/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	schedulingv1 "k8s.io/api/scheduling/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/apis/scheduling"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"

	// ensure libs have a chance to initialize
	_ "github.com/stretchr/testify/assert"
)

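// priorityPair associates a PriorityClass name with its priority value.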
type priorityPair struct {
	name  string
	value int32
}

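// testExtendedResource is a fake extended resource advertised on node status
// by these tests; unlike real CPU or memory, its value is not corrected back
// by the kubelet, so the tests fully control how much of it is allocatable.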
var testExtendedResource = v1.ResourceName("scheduling.k8s.io/foo")

var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
	var cs clientset.Interface
	var nodeList *v1.NodeList
	var ns string
	f := framework.NewDefaultFramework("sched-preemption")

	lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000)
	lowPriorityClassName := f.BaseName + "-low-priority"
	mediumPriorityClassName := f.BaseName + "-medium-priority"
	highPriorityClassName := f.BaseName + "-high-priority"
	priorityPairs := []priorityPair{
		{name: lowPriorityClassName, value: lowPriority},
		{name: mediumPriorityClassName, value: mediumPriority},
		{name: highPriorityClassName, value: highPriority},
	}

	ginkgo.AfterEach(func() {
		for _, pair := range priorityPairs {
			cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
		}
		for _, node := range nodeList.Items {
			nodeCopy := node.DeepCopy()
			delete(nodeCopy.Status.Capacity, testExtendedResource)
			err := patchNode(cs, &node, nodeCopy)
			framework.ExpectNoError(err)
		}
	})

	ginkgo.BeforeEach(func() {
		cs = f.ClientSet
		ns = f.Namespace.Name
		nodeList = &v1.NodeList{}
		var err error
		for _, pair := range priorityPairs {
			_, err := f.ClientSet.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: pair.name}, Value: pair.value}, metav1.CreateOptions{})
			framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
		}

		e2enode.WaitForTotalHealthy(cs, time.Minute)
		nodeList, err = e2enode.GetReadySchedulableNodes(cs)
		if err != nil {
			framework.Logf("Unexpected error occurred: %v", err)
		}
		framework.ExpectNoErrorWithOffset(0, err)
		for _, n := range nodeList.Items {
			workerNodes.Insert(n.Name)
		}

		err = framework.CheckTestingNSDeletedExcept(cs, ns)
		framework.ExpectNoError(err)
	})

	/*
		Release: v1.19
		Testname: Scheduler, Basic Preemption
		Description: When a higher priority pod is created and no node with enough
		resources is found, the scheduler MUST preempt a lower priority pod and
		schedule the high priority pod.
	*/
	framework.ConformanceIt("validates basic preemption works", func() {
		var podRes v1.ResourceList

		// Create two pods per node that use a lot of the node's resources.
		ginkgo.By("Create pods that use 4/5 of node resources.")
		pods := make([]*v1.Pod, 0, 2*len(nodeList.Items))
		// Create pods in the cluster.
		// One of them has low priority, making it the victim for preemption.
		for i, node := range nodeList.Items {
			// Update each node to advertise 5 units of the test extended resource.
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("5")
			err := patchNode(cs, &node, nodeCopy)
			framework.ExpectNoError(err)

			for j := 0; j < 2; j++ {
				// Request 2 of the available resources for the victim pods
				podRes = v1.ResourceList{}
				podRes[testExtendedResource] = resource.MustParse("2")

				// make the first pod low priority and the rest medium priority.
				priorityName := mediumPriorityClassName
				if len(pods) == 0 {
					priorityName = lowPriorityClassName
				}
				pausePod := createPausePod(f, pausePodConfig{
					Name:              fmt.Sprintf("pod%d-%d-%v", i, j, priorityName),
					PriorityClassName: priorityName,
					Resources: &v1.ResourceRequirements{
						Requests: podRes,
						Limits:   podRes,
					},
					Affinity: &v1.Affinity{
						NodeAffinity: &v1.NodeAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
								NodeSelectorTerms: []v1.NodeSelectorTerm{
									{
										MatchFields: []v1.NodeSelectorRequirement{
											{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
										},
									},
								},
							},
						},
					},
				})
				pods = append(pods, pausePod)
				framework.Logf("Created pod: %v", pausePod.Name)
			}
		}
		if len(pods) < 2 {
			framework.Failf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
		}

		// Set the pod request to the first pod's resources (should be low priority pod)
		podRes = pods[0].Spec.Containers[0].Resources.Requests

		ginkgo.By("Run a high priority pod that has the same requirements as the lower priority pod")
		// Create a high priority pod and make sure it is scheduled on the same node as the low priority pod.
		runPausePodWithTimeout(f, pausePodConfig{
			Name:              "preemptor-pod",
			PriorityClassName: highPriorityClassName,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
				Limits:   podRes,
			},
		}, framework.PodStartShortTimeout)

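		// The lowest priority pod should have been preempted: it is either already
		// gone (NotFound) or carries a deletionTimestamp, while every medium
		// priority pod must still be intact.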
		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}

		framework.ExpectEqual(podPreempted, true)
	})

	/*
		Release: v1.19
		Testname: Scheduler, Preemption for critical pod
		Description: When a critical pod is created and no node with enough
		resources is found, the scheduler MUST preempt a lower priority pod to
		schedule the critical pod.
	*/
	framework.ConformanceIt("validates lower priority pod preemption by critical pod", func() {
		var podRes v1.ResourceList

		ginkgo.By("Create pods that use 4/5 of node resources.")
		pods := make([]*v1.Pod, 0, len(nodeList.Items))
		for i, node := range nodeList.Items {
			// Update each node to advertise 5 units of the test extended resource.
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("5")
			err := patchNode(cs, &node, nodeCopy)
			framework.ExpectNoError(err)

			for j := 0; j < 2; j++ {
				// Request 2 of the available resources for the victim pods
				podRes = v1.ResourceList{}
				podRes[testExtendedResource] = resource.MustParse("2")

				// make the first pod low priority and the rest medium priority.
				priorityName := mediumPriorityClassName
				if len(pods) == 0 {
					priorityName = lowPriorityClassName
				}
				pausePod := createPausePod(f, pausePodConfig{
					Name:              fmt.Sprintf("pod%d-%d-%v", i, j, priorityName),
					PriorityClassName: priorityName,
					Resources: &v1.ResourceRequirements{
						Requests: podRes,
						Limits:   podRes,
					},
					Affinity: &v1.Affinity{
						NodeAffinity: &v1.NodeAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
								NodeSelectorTerms: []v1.NodeSelectorTerm{
									{
										MatchFields: []v1.NodeSelectorRequirement{
											{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
										},
									},
								},
							},
						},
					},
				})
				pods = append(pods, pausePod)
				framework.Logf("Created pod: %v", pausePod.Name)
			}
		}
		if len(pods) < 2 {
			framework.Failf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
		}

		// We want this pod to be preempted
		podRes = pods[0].Spec.Containers[0].Resources.Requests
		ginkgo.By("Run a critical pod that uses the same resources as a lower priority pod")
		// Create a critical pod and make sure it is scheduled.
		defer func() {
			// Clean-up the critical pod
			// Always run cleanup to make sure the pod is properly cleaned up.
			err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(context.TODO(), "critical-pod", *metav1.NewDeleteOptions(0))
			if err != nil && !apierrors.IsNotFound(err) {
				framework.Failf("Error cleaning up pod `%s/%s`: %v", metav1.NamespaceSystem, "critical-pod", err)
			}
		}()
		runPausePodWithTimeout(f, pausePodConfig{
			Name:              "critical-pod",
			Namespace:         metav1.NamespaceSystem,
			PriorityClassName: scheduling.SystemClusterCritical,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
				Limits:   podRes,
			},
		}, framework.PodStartShortTimeout)

		defer func() {
			// Clean-up the critical pod
			err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(context.TODO(), "critical-pod", *metav1.NewDeleteOptions(0))
			framework.ExpectNoError(err)
		}()
		// Make sure that the lowest priority pod is deleted.
		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}

		framework.ExpectEqual(podPreempted, true)
	})

	ginkgo.Context("PodTopologySpread Preemption", func() {
		var nodeNames []string
		var nodes []*v1.Node
		topologyKey := "kubernetes.io/e2e-pts-preemption"
		var fakeRes v1.ResourceName = "example.com/fakePTSRes"

		ginkgo.BeforeEach(func() {
			if len(nodeList.Items) < 2 {
				ginkgo.Skip("At least 2 nodes are required to run the test")
			}
			ginkgo.By("Trying to get 2 available nodes which can run pod")
			nodeNames = Get2NodesThatCanRunPod(f)
			ginkgo.By(fmt.Sprintf("Apply dedicated topologyKey %v for this test on the 2 nodes.", topologyKey))
			for _, nodeName := range nodeNames {
				framework.AddOrUpdateLabelOnNode(cs, nodeName, topologyKey, nodeName)

				node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
				framework.ExpectNoError(err)
				// update Node API object with a fake resource
				ginkgo.By(fmt.Sprintf("Apply 10 units of fake resource to node %v.", node.Name))
				nodeCopy := node.DeepCopy()
				nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10")
				err = patchNode(cs, node, nodeCopy)
				framework.ExpectNoError(err)
				nodes = append(nodes, node)
			}
		})
		ginkgo.AfterEach(func() {
			for _, nodeName := range nodeNames {
				framework.RemoveLabelOffNode(cs, nodeName, topologyKey)
			}
			for _, node := range nodes {
				nodeCopy := node.DeepCopy()
				delete(nodeCopy.Status.Capacity, fakeRes)
				err := patchNode(cs, node, nodeCopy)
				framework.ExpectNoError(err)
			}
		})

		ginkgo.It("validates proper pods are preempted", func() {
			podLabel := "e2e-pts-preemption"
			nodeAffinity := &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{
							{
								MatchExpressions: []v1.NodeSelectorRequirement{
									{
										Key:      topologyKey,
										Operator: v1.NodeSelectorOpIn,
										Values:   nodeNames,
									},
								},
							},
						},
					},
				},
			}
			highPodCfg := pausePodConfig{
				Name:              "high",
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: highPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("9")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("9")},
				},
			}
			lowPodCfg := pausePodConfig{
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: lowPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("3")},
				},
			}

			ginkgo.By("Create 1 High Pod and 3 Low Pods to occupy 9/10 of fake resources on both nodes.")
			// Prepare 1 High Pod and 3 Low Pods
			runPausePod(f, highPodCfg)
			for i := 1; i <= 3; i++ {
				lowPodCfg.Name = fmt.Sprintf("low-%v", i)
				runPausePod(f, lowPodCfg)
			}

			ginkgo.By("Create 1 Medium Pod with TopologySpreadConstraints")
			mediumPodCfg := pausePodConfig{
				Name:              "medium",
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: mediumPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("3")},
				},
				TopologySpreadConstraints: []v1.TopologySpreadConstraint{
					{
						MaxSkew:           1,
						TopologyKey:       topologyKey,
						WhenUnsatisfiable: v1.DoNotSchedule,
						LabelSelector: &metav1.LabelSelector{
							MatchExpressions: []metav1.LabelSelectorRequirement{
								{
									Key:      podLabel,
									Operator: metav1.LabelSelectorOpExists,
								},
							},
						},
					},
				},
			}
			// To fulfil resource.requests, the medium Pod only needs to preempt one low pod.
			// However, in that case, the Pods spread becomes [<high>, <medium, low, low>], which doesn't
			// satisfy the pod topology spread constraints. Hence it needs to preempt another low pod
			// to make the Pods spread like [<high>, <medium, low>].
			runPausePod(f, mediumPodCfg)

			ginkgo.By("Verify there are 3 Pods left in this namespace")
			wantPods := sets.NewString("high", "medium", "low")

			// Wait until the number of pods stabilizes. Note that `medium` pod can get scheduled once the
			// second low priority pod is marked as terminating.
			pods, err := e2epod.WaitForNumberOfPods(cs, ns, 3, framework.PollShortTimeout)
			framework.ExpectNoError(err)

			for _, pod := range pods.Items {
				// Strip the ordinal suffix from the low pods' names.
				podName := strings.Split(pod.Name, "-")[0]
				if wantPods.Has(podName) {
					ginkgo.By(fmt.Sprintf("Pod %q is running, as expected.", pod.Name))
					wantPods.Delete(podName)
				} else {
					framework.Failf("Pod %q conflicted with expected PodSet %v", podName, wantPods)
				}
			}
		})
	})

	ginkgo.Context("PreemptionExecutionPath", func() {
		// Use a fake "fakecpu" resource and set it in the Node object's status:
		// if we updated real CPU/Memory/etc values, the kubelet would correct them back.
		var fakecpu v1.ResourceName = "example.com/fakecpu"
		var cs clientset.Interface
		var node *v1.Node
		var ns, nodeHostNameLabel string
		f := framework.NewDefaultFramework("sched-preemption-path")

		priorityPairs := make([]priorityPair, 0)

		ginkgo.AfterEach(func() {
			// print out additional info if tests failed
			if ginkgo.CurrentGinkgoTestDescription().Failed {
				// List existing PriorityClasses.
				priorityList, err := cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
				if err != nil {
					framework.Logf("Unable to list PriorityClasses: %v", err)
				} else {
					framework.Logf("List existing PriorityClasses:")
					for _, p := range priorityList.Items {
						framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
					}
				}
			}

			if node != nil {
				nodeCopy := node.DeepCopy()
				delete(nodeCopy.Status.Capacity, fakecpu)
				err := patchNode(cs, node, nodeCopy)
				framework.ExpectNoError(err)
			}
			for _, pair := range priorityPairs {
				cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
			}
		})

		ginkgo.BeforeEach(func() {
			cs = f.ClientSet
			ns = f.Namespace.Name

			// find an available node
			ginkgo.By("Finding an available node")
			nodeName := GetNodeThatCanRunPod(f)
			framework.Logf("found a healthy node: %s", nodeName)

			// get the node API object
			var err error
			node, err = cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
			if err != nil {
				framework.Failf("error getting node %q: %v", nodeName, err)
			}
			var ok bool
			nodeHostNameLabel, ok = node.GetObjectMeta().GetLabels()["kubernetes.io/hostname"]
			if !ok {
				framework.Failf("error getting kubernetes.io/hostname label on node %s", nodeName)
			}

			// update Node API object with a fake resource
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
			err = patchNode(cs, node, nodeCopy)
			framework.ExpectNoError(err)

			// create four PriorityClasses: p1, p2, p3, p4
			for i := 1; i <= 4; i++ {
				priorityName := fmt.Sprintf("p%d", i)
				priorityVal := int32(i)
				priorityPairs = append(priorityPairs, priorityPair{name: priorityName, value: priorityVal})
				_, err := cs.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: priorityName}, Value: priorityVal}, metav1.CreateOptions{})
				if err != nil {
					framework.Logf("Failed to create priority '%v/%v'. Reason: %v. Msg: %v", priorityName, priorityVal, apierrors.ReasonForError(err), err)
				}
				framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
			}
		})

		/*
			Release: v1.19
			Testname: Pod preemption verification
			Description: Four levels of Pods in ReplicaSets with different levels of Priority, restricted by given CPU limits MUST launch. Priority 1 - 3 Pods MUST spawn first followed by Priority 4 Pod. The ReplicaSets with Replicas MUST contain the expected number of Replicas.
		*/
		framework.ConformanceIt("runs ReplicaSets to verify preemption running path", func() {
			podNamesSeen := []int32{0, 0, 0}
			stopCh := make(chan struct{})

			// create a pod controller to list/watch pod events from the test framework namespace
			_, podController := cache.NewInformer(
				&cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						obj, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), options)
						return runtime.Object(obj), err
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options)
					},
				},
				&v1.Pod{},
				0,
				cache.ResourceEventHandlerFuncs{
					AddFunc: func(obj interface{}) {
						if pod, ok := obj.(*v1.Pod); ok {
							if strings.HasPrefix(pod.Name, "rs-pod1") {
								atomic.AddInt32(&podNamesSeen[0], 1)
							} else if strings.HasPrefix(pod.Name, "rs-pod2") {
								atomic.AddInt32(&podNamesSeen[1], 1)
							} else if strings.HasPrefix(pod.Name, "rs-pod3") {
								atomic.AddInt32(&podNamesSeen[2], 1)
							}
						}
					},
				},
			)
			go podController.Run(stopCh)
			defer close(stopCh)

			// prepare three ReplicaSets
			rsConfs := []pauseRSConfig{
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod1",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod1"},
						PriorityClassName: "p1",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("200")},
						},
					},
				},
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod2",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod2"},
						PriorityClassName: "p2",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("300")},
						},
					},
				},
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod3",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod3"},
						PriorityClassName: "p3",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("450")},
						},
					},
				},
			}
			// create ReplicaSet{1,2,3} so as to occupy 950/1000 of the fake resource
			for i := range rsConfs {
				runPauseRS(f, rsConfs[i])
			}

			framework.Logf("pods created so far: %v", podNamesSeen)
			framework.Logf("length of pods created so far: %v", len(podNamesSeen))

			// create a Preemptor Pod
			preemptorPodConf := pausePodConfig{
				Name:              "pod4",
				Namespace:         ns,
				Labels:            map[string]string{"name": "pod4"},
				PriorityClassName: "p4",
				NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
					Limits:   v1.ResourceList{fakecpu: resource.MustParse("500")},
				},
			}
			preemptorPod := createPod(f, preemptorPodConf)
			waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout)

			framework.Logf("pods created so far: %v", podNamesSeen)

			// Count the pod creations observed for ReplicaSet{1,2,3}:
			// - if it's more than expected, its pods have been over-preempted
			// - if it's less than expected, its pods are under-preempted
			// "*2" because the pod of ReplicaSet{1,2} is expected to be preempted (and
			// recreated by its ReplicaSet) exactly once, so two creations are observed.
			expectedRSPods := []int32{1 * 2, 1 * 2, 1}
			err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
				for i := 0; i < len(podNamesSeen); i++ {
					got := atomic.LoadInt32(&podNamesSeen[i])
					if got < expectedRSPods[i] {
						framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
						return false, nil
					} else if got > expectedRSPods[i] {
						return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
					}
				}
				return true, nil
			})
			if err != nil {
				framework.Logf("pods created so far: %v", podNamesSeen)
				framework.Failf("failed pod observation expectations: %v", err)
			}

			// If we get here, do a final check that the state stays stable for a short
			// period; otherwise, pods may still be getting over-preempted.
			time.Sleep(5 * time.Second)
			for i := 0; i < len(podNamesSeen); i++ {
				got := atomic.LoadInt32(&podNamesSeen[i])
				if got < expectedRSPods[i] {
					framework.Failf("pods of ReplicaSet%d have been under-preempted: expected %v pod creations, but got %d", i+1, expectedRSPods[i], got)
				} else if got > expectedRSPods[i] {
					framework.Failf("pods of ReplicaSet%d have been over-preempted: expected %v pod creations, but got %d", i+1, expectedRSPods[i], got)
				}
			}
		})
	})

	ginkgo.Context("PriorityClass endpoints", func() {
		var cs clientset.Interface
		f := framework.NewDefaultFramework("sched-preemption-path")
		testUUID := uuid.New().String()
		var pcs []*schedulingv1.PriorityClass

		ginkgo.BeforeEach(func() {
			cs = f.ClientSet
			// Create 2 PriorityClasses: p1, p2.
			for i := 1; i <= 2; i++ {
				name, val := fmt.Sprintf("p%d", i), int32(i)
				pc, err := cs.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{"e2e": testUUID}}, Value: val}, metav1.CreateOptions{})
				if err != nil {
					framework.Logf("Failed to create priority '%v/%v'. Reason: %v. Msg: %v", name, val, apierrors.ReasonForError(err), err)
				}
				framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
				pcs = append(pcs, pc)
			}
		})

		ginkgo.AfterEach(func() {
			// Print out additional info if tests failed.
			if ginkgo.CurrentGinkgoTestDescription().Failed {
				// List existing PriorityClasses.
				priorityList, err := cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
				if err != nil {
					framework.Logf("Unable to list PriorityClasses: %v", err)
				} else {
					framework.Logf("List existing PriorityClasses:")
					for _, p := range priorityList.Items {
						framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
					}
				}
			}

			// Delete the created PriorityClasses via DeleteCollection, selected by the test label.
			err := cs.SchedulingV1().PriorityClasses().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: fmt.Sprintf("e2e=%v", testUUID)})
			framework.ExpectNoError(err)
		})

		/*
			Release: v1.20
			Testname: Scheduler, Verify PriorityClass endpoints
			Description: Verify that PriorityClass endpoints can be listed. When any mutable field is
			either patched or updated it MUST succeed. When any immutable field is either patched or
			updated it MUST fail.
		*/
		framework.ConformanceIt("verify PriorityClass endpoints can be operated with different HTTP methods", func() {
			// 1. Patch/Update on immutable fields will fail.
			pcCopy := pcs[0].DeepCopy()
			pcCopy.Value = pcCopy.Value * 10
			err := patchPriorityClass(cs, pcs[0], pcCopy)
			framework.ExpectError(err, "expect a patch error on an immutable field")
			framework.Logf("%v", err)

			pcCopy = pcs[1].DeepCopy()
			pcCopy.Value = pcCopy.Value * 10
			_, err = cs.SchedulingV1().PriorityClasses().Update(context.TODO(), pcCopy, metav1.UpdateOptions{})
			framework.ExpectError(err, "expect an update error on an immutable field")
			framework.Logf("%v", err)

			// 2. Patch/Update on mutable fields will succeed.
			newDesc := "updated description"
			pcCopy = pcs[0].DeepCopy()
			pcCopy.Description = newDesc
			err = patchPriorityClass(cs, pcs[0], pcCopy)
			framework.ExpectNoError(err)

			pcCopy = pcs[1].DeepCopy()
			pcCopy.Description = newDesc
			_, err = cs.SchedulingV1().PriorityClasses().Update(context.TODO(), pcCopy, metav1.UpdateOptions{})
			framework.ExpectNoError(err)

			// 3. List existing PriorityClasses.
			_, err = cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
			framework.ExpectNoError(err)

			// 4. Verify fields of updated PriorityClasses.
			for _, pc := range pcs {
				livePC, err := cs.SchedulingV1().PriorityClasses().Get(context.TODO(), pc.Name, metav1.GetOptions{})
				framework.ExpectNoError(err)
				framework.ExpectEqual(livePC.Value, pc.Value)
				framework.ExpectEqual(livePC.Description, newDesc)
			}
		})
	})
})

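// pauseRSConfig describes a ReplicaSet of pause pods used by the preemption tests.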
type pauseRSConfig struct {
	Replicas  int32
	PodConfig pausePodConfig
}

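// initPauseRS returns a ReplicaSet object that wraps the pause pod described by
// conf.PodConfig, using the pod's labels as the ReplicaSet selector.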
func initPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	pausePod := initPausePod(f, conf.PodConfig)
	pauseRS := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rs-" + pausePod.Name,
			Namespace: pausePod.Namespace,
		},
		Spec: appsv1.ReplicaSetSpec{
			Replicas: &conf.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: pausePod.Labels,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: pausePod.ObjectMeta.Labels},
				Spec:       pausePod.Spec,
			},
		},
	}
	return pauseRS
}

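// createPauseRS creates the ReplicaSet built by initPauseRS in the configured
// namespace (defaulting to the framework's test namespace) and returns it.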
func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	namespace := conf.PodConfig.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	rs, err := f.ClientSet.AppsV1().ReplicaSets(namespace).Create(context.TODO(), initPauseRS(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return rs
}

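// runPauseRS creates the ReplicaSet and waits until its target number of
// replicas is available.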
func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	rs := createPauseRS(f, conf)
	framework.ExpectNoError(e2ereplicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
	return rs
}

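// createPod creates a pause pod from conf in the configured namespace
// (defaulting to the framework's test namespace) without waiting for it to run.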
func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
	namespace := conf.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(context.TODO(), initPausePod(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return pod
}

// waitForPreemptingWithTimeout verifies that the preemptor 'pod' gets scheduled within 'timeout';
// specifically, it polls until the pod's 'spec.NodeName' field has been set.
func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
	err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
		pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if len(pod.Spec.NodeName) > 0 {
			return true, nil
		}
		return false, err
	})
	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
}

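// patchNode applies a strategic merge patch to the node's status subresource,
// updating it from 'old' to 'new'. The tests use it to add and remove fake
// extended resources in the node's capacity.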
func patchNode(client clientset.Interface, old *v1.Node, new *v1.Node) error {
	oldData, err := json.Marshal(old)
	if err != nil {
		return err
	}

	newData, err := json.Marshal(new)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
	if err != nil {
		return fmt.Errorf("failed to create merge patch for node %q: %v", old.Name, err)
	}
	_, err = client.CoreV1().Nodes().Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	return err
}

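// patchPriorityClass applies a strategic merge patch that updates the
// PriorityClass from 'old' to 'new'.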
func patchPriorityClass(cs clientset.Interface, old, new *schedulingv1.PriorityClass) error {
	oldData, err := json.Marshal(old)
	if err != nil {
		return err
	}

	newData, err := json.Marshal(new)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &schedulingv1.PriorityClass{})
	if err != nil {
		return fmt.Errorf("failed to create merge patch for PriorityClass %q: %v", old.Name, err)
	}
	_, err = cs.SchedulingV1().PriorityClasses().Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	return err
}