/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	schedulingv1 "k8s.io/api/scheduling/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/apis/scheduling"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"

	// ensure libs have a chance to initialize
	_ "github.com/stretchr/testify/assert"
)

type priorityPair struct {
	name  string
	value int32
}

var testExtendedResource = v1.ResourceName("scheduling.k8s.io/foo")

var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
	var cs clientset.Interface
	var nodeList *v1.NodeList
	var ns string
	f := framework.NewDefaultFramework("sched-preemption")

	lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000)
	lowPriorityClassName := f.BaseName + "-low-priority"
	mediumPriorityClassName := f.BaseName + "-medium-priority"
	highPriorityClassName := f.BaseName + "-high-priority"
	priorityPairs := []priorityPair{
		{name: lowPriorityClassName, value: lowPriority},
		{name: mediumPriorityClassName, value: mediumPriority},
		{name: highPriorityClassName, value: highPriority},
	}

	ginkgo.AfterEach(func() {
		for _, pair := range priorityPairs {
			cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
		}
		for _, node := range nodeList.Items {
			nodeCopy := node.DeepCopy()
			delete(nodeCopy.Status.Capacity, testExtendedResource)
			err := patchNode(cs, &node, nodeCopy)
			framework.ExpectNoError(err)
		}
	})
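	// Note: the tests below patch a fake extended resource into each node's
	// status so that pod resource accounting is fully under the tests'
	// control; the AfterEach above removes it again after every test.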
	ginkgo.BeforeEach(func() {
		cs = f.ClientSet
		ns = f.Namespace.Name
		nodeList = &v1.NodeList{}
		var err error
		for _, pair := range priorityPairs {
			_, err := f.ClientSet.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: pair.name}, Value: pair.value}, metav1.CreateOptions{})
			framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
		}

		e2enode.WaitForTotalHealthy(cs, time.Minute)
		nodeList, err = e2enode.GetReadySchedulableNodes(cs)
		if err != nil {
			framework.Logf("Unexpected error occurred: %v", err)
		}
		framework.ExpectNoErrorWithOffset(0, err)
		for _, n := range nodeList.Items {
			workerNodes.Insert(n.Name)
		}

		err = framework.CheckTestingNSDeletedExcept(cs, ns)
		framework.ExpectNoError(err)
	})

	/*
		Release: v1.19
		Testname: Scheduler, Basic Preemption
		Description: When a higher priority pod is created and no node with enough
		resources is found, the scheduler MUST preempt a lower priority pod and
		schedule the high priority pod.
	*/
	framework.ConformanceIt("validates basic preemption works", func() {
		var podRes v1.ResourceList

		// Create two pods per node that use a lot of the node's resources.
		ginkgo.By("Create pods that use 4/5 of node resources.")
		pods := make([]*v1.Pod, 0, 2*len(nodeList.Items))
		// Create pods in the cluster.
		// One of them has low priority, making it the victim for preemption.
		for i, node := range nodeList.Items {
			// Update each node to advertise 5 units of the extended resource.
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("5")
			err := patchNode(cs, &node, nodeCopy)
			framework.ExpectNoError(err)

			for j := 0; j < 2; j++ {
				// Request 2 of the available resources for the victim pods.
				podRes = v1.ResourceList{}
				podRes[testExtendedResource] = resource.MustParse("2")

				// Make the first pod low priority and the rest medium priority.
				priorityName := mediumPriorityClassName
				if len(pods) == 0 {
					priorityName = lowPriorityClassName
				}
				pausePod := createPausePod(f, pausePodConfig{
					Name:              fmt.Sprintf("pod%d-%d-%v", i, j, priorityName),
					PriorityClassName: priorityName,
					Resources: &v1.ResourceRequirements{
						Requests: podRes,
						Limits:   podRes,
					},
					Affinity: &v1.Affinity{
						NodeAffinity: &v1.NodeAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
								NodeSelectorTerms: []v1.NodeSelectorTerm{
									{
										MatchFields: []v1.NodeSelectorRequirement{
											{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
										},
									},
								},
							},
						},
					},
				})
				pods = append(pods, pausePod)
				framework.Logf("Created pod: %v", pausePod.Name)
			}
		}
		if len(pods) < 2 {
			framework.Failf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
		}

		// Set the pod request to the first pod's resources (the low-priority victim).
		podRes = pods[0].Spec.Containers[0].Resources.Requests
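		// The preemptor requests exactly the victim's share of the extended
		// resource, so scheduling it requires evicting a single pod, and the
		// lowest-priority pod should be the one chosen.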
		ginkgo.By("Run a high priority pod that has the same requirements as the lower-priority pod")
		// Create a high priority pod and make sure it is scheduled on the same node as the low priority pod.
		runPausePodWithTimeout(f, pausePodConfig{
			Name:              "preemptor-pod",
			PriorityClassName: highPriorityClassName,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
				Limits:   podRes,
			},
		}, framework.PodStartShortTimeout)

		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}

		framework.ExpectEqual(podPreempted, true)
	})

	/*
		Release: v1.19
		Testname: Scheduler, Preemption for critical pod
		Description: When a critical pod is created and no node with enough
		resources is found, the scheduler MUST preempt a lower priority pod to
		schedule the critical pod.
	*/
	framework.ConformanceIt("validates lower priority pod preemption by critical pod", func() {
		var podRes v1.ResourceList

		ginkgo.By("Create pods that use 4/5 of node resources.")
		pods := make([]*v1.Pod, 0, len(nodeList.Items))
		for i, node := range nodeList.Items {
			// Update each node to advertise 5 units of the extended resource.
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("5")
			err := patchNode(cs, &node, nodeCopy)
			framework.ExpectNoError(err)

			for j := 0; j < 2; j++ {
				// Request 2 of the available resources for the victim pods.
				podRes = v1.ResourceList{}
				podRes[testExtendedResource] = resource.MustParse("2")

				// Make the first pod low priority and the rest medium priority.
				priorityName := mediumPriorityClassName
				if len(pods) == 0 {
					priorityName = lowPriorityClassName
				}
				pausePod := createPausePod(f, pausePodConfig{
					Name:              fmt.Sprintf("pod%d-%d-%v", i, j, priorityName),
					PriorityClassName: priorityName,
					Resources: &v1.ResourceRequirements{
						Requests: podRes,
						Limits:   podRes,
					},
					Affinity: &v1.Affinity{
						NodeAffinity: &v1.NodeAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
								NodeSelectorTerms: []v1.NodeSelectorTerm{
									{
										MatchFields: []v1.NodeSelectorRequirement{
											{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
										},
									},
								},
							},
						},
					},
				})
				pods = append(pods, pausePod)
				framework.Logf("Created pod: %v", pausePod.Name)
			}
		}
		if len(pods) < 2 {
			framework.Failf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
		}

		// We want this pod to be preempted.
		podRes = pods[0].Spec.Containers[0].Resources.Requests
		ginkgo.By("Run a critical pod that uses the same resources as a lower priority pod")
		// Create a critical pod and make sure it is scheduled.
		defer func() {
			// Clean up the critical pod. Always run the cleanup, tolerating
			// NotFound, so the pod is removed even if the test fails early.
			err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(context.TODO(), "critical-pod", *metav1.NewDeleteOptions(0))
			if err != nil && !apierrors.IsNotFound(err) {
				framework.Failf("Error cleaning up pod `%s/%s`: %v", metav1.NamespaceSystem, "critical-pod", err)
			}
		}()
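		// system-cluster-critical is one of the built-in priority classes
		// reserved for cluster-critical workloads; it carries a very high
		// priority value, so the scheduler preempts lower-priority pods on
		// its behalf.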
		runPausePodWithTimeout(f, pausePodConfig{
			Name:              "critical-pod",
			Namespace:         metav1.NamespaceSystem,
			PriorityClassName: scheduling.SystemClusterCritical,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
				Limits:   podRes,
			},
		}, framework.PodStartShortTimeout)

		// Make sure that the lowest priority pod is deleted.
		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}

		framework.ExpectEqual(podPreempted, true)
	})

	ginkgo.Context("PodTopologySpread Preemption", func() {
		var nodeNames []string
		var nodes []*v1.Node
		topologyKey := "kubernetes.io/e2e-pts-preemption"
		var fakeRes v1.ResourceName = "example.com/fakePTSRes"

		ginkgo.BeforeEach(func() {
			if len(nodeList.Items) < 2 {
				ginkgo.Skip("At least 2 nodes are required to run the test")
			}
			ginkgo.By("Trying to get 2 available nodes which can run pod")
			nodeNames = Get2NodesThatCanRunPod(f)
			ginkgo.By(fmt.Sprintf("Apply dedicated topologyKey %v for this test on the 2 nodes.", topologyKey))
			for _, nodeName := range nodeNames {
				framework.AddOrUpdateLabelOnNode(cs, nodeName, topologyKey, nodeName)

				node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
				framework.ExpectNoError(err)
				// Update the Node API object with a fake resource.
				ginkgo.By(fmt.Sprintf("Apply 10 units of fake resource to node %v.", node.Name))
				nodeCopy := node.DeepCopy()
				nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10")
				err = patchNode(cs, node, nodeCopy)
				framework.ExpectNoError(err)
				nodes = append(nodes, node)
			}
		})
		ginkgo.AfterEach(func() {
			for _, nodeName := range nodeNames {
				framework.RemoveLabelOffNode(cs, nodeName, topologyKey)
			}
			for _, node := range nodes {
				nodeCopy := node.DeepCopy()
				delete(nodeCopy.Status.Capacity, fakeRes)
				err := patchNode(cs, node, nodeCopy)
				framework.ExpectNoError(err)
			}
		})
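		// The test below verifies that preemption respects pod topology
		// spread: the scheduler must pick victims such that the surviving
		// pods still satisfy the maxSkew constraint across the two nodes.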
		ginkgo.It("validates proper pods are preempted", func() {
			podLabel := "e2e-pts-preemption"
			nodeAffinity := &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{
							{
								MatchExpressions: []v1.NodeSelectorRequirement{
									{
										Key:      topologyKey,
										Operator: v1.NodeSelectorOpIn,
										Values:   nodeNames,
									},
								},
							},
						},
					},
				},
			}
			highPodCfg := pausePodConfig{
				Name:              "high",
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: highPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("9")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("9")},
				},
			}
			lowPodCfg := pausePodConfig{
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: lowPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("3")},
				},
			}

			ginkgo.By("Create 1 High Pod and 3 Low Pods to occupy 9/10 of fake resources on both nodes.")
			// Prepare 1 High Pod and 3 Low Pods.
			runPausePod(f, highPodCfg)
			for i := 1; i <= 3; i++ {
				lowPodCfg.Name = fmt.Sprintf("low-%v", i)
				runPausePod(f, lowPodCfg)
			}

			ginkgo.By("Create 1 Medium Pod with TopologySpreadConstraints")
			mediumPodCfg := pausePodConfig{
				Name:              "medium",
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: mediumPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("3")},
				},
				TopologySpreadConstraints: []v1.TopologySpreadConstraint{
					{
						MaxSkew:           1,
						TopologyKey:       topologyKey,
						WhenUnsatisfiable: v1.DoNotSchedule,
						LabelSelector: &metav1.LabelSelector{
							MatchExpressions: []metav1.LabelSelectorRequirement{
								{
									Key:      podLabel,
									Operator: metav1.LabelSelectorOpExists,
								},
							},
						},
					},
				},
			}
			// To fulfil resource.requests, the medium Pod only needs to preempt one low pod.
			// However, in that case, the Pods spread becomes [<high>, <medium, low, low>], which doesn't
			// satisfy the pod topology spread constraints. Hence it needs to preempt another low pod
			// to make the Pods spread like [<high>, <medium, low>].
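			// Resource math: one node runs the high pod (9/10 used) and the
			// other runs three low pods (9/10 used). The medium pod needs 3,
			// so preemption is required; evicting one low pod would leave a
			// spread of [1, 3] (skew 2 > maxSkew 1), so a second low pod must
			// be evicted as well, yielding [1, 2].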
			runPausePod(f, mediumPodCfg)

			ginkgo.By("Verify there are 3 Pods left in this namespace")
			wantPods := sets.NewString("high", "medium", "low")

			// Wait until the number of pods stabilizes. Note that the `medium` pod can get
			// scheduled once the second low priority pod is marked as terminating.
			pods, err := e2epod.WaitForNumberOfPods(cs, ns, 3, framework.PollShortTimeout)
			framework.ExpectNoError(err)

			for _, pod := range pods.Items {
				// Strip the ordinal suffix from the low pods' names.
				podName := strings.Split(pod.Name, "-")[0]
				if wantPods.Has(podName) {
					ginkgo.By(fmt.Sprintf("Pod %q is as expected to be running.", pod.Name))
					wantPods.Delete(podName)
				} else {
					framework.Failf("Pod %q conflicted with expected PodSet %v", podName, wantPods)
				}
			}
		})
	})

	ginkgo.Context("PreemptionExecutionPath", func() {
		// Use a fake resource ("fakecpu") in the Node status: if we updated real
		// CPU/memory/etc. values, the kubelet would correct them right back.
		var fakecpu v1.ResourceName = "example.com/fakecpu"
		var cs clientset.Interface
		var node *v1.Node
		var ns, nodeHostNameLabel string
		f := framework.NewDefaultFramework("sched-preemption-path")

		priorityPairs := make([]priorityPair, 0)

		ginkgo.AfterEach(func() {
			// Print out additional info if the test failed.
			if ginkgo.CurrentGinkgoTestDescription().Failed {
				// List existing PriorityClasses.
				priorityList, err := cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
				if err != nil {
					framework.Logf("Unable to list PriorityClasses: %v", err)
				} else {
					framework.Logf("List existing PriorityClasses:")
					for _, p := range priorityList.Items {
						framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
					}
				}
			}

			if node != nil {
				nodeCopy := node.DeepCopy()
				delete(nodeCopy.Status.Capacity, fakecpu)
				err := patchNode(cs, node, nodeCopy)
				framework.ExpectNoError(err)
			}
			for _, pair := range priorityPairs {
				cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
			}
		})

		ginkgo.BeforeEach(func() {
			cs = f.ClientSet
			ns = f.Namespace.Name

			// Find an available node.
			ginkgo.By("Finding an available node")
			nodeName := GetNodeThatCanRunPod(f)
			framework.Logf("found a healthy node: %s", nodeName)

			// Get the node API object.
			var err error
			node, err = cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
			if err != nil {
				framework.Failf("error getting node %q: %v", nodeName, err)
			}
			var ok bool
			nodeHostNameLabel, ok = node.GetObjectMeta().GetLabels()["kubernetes.io/hostname"]
			if !ok {
				framework.Failf("error getting kubernetes.io/hostname label on node %s", nodeName)
			}

			// Update the Node API object with a fake resource.
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
			err = patchNode(cs, node, nodeCopy)
			framework.ExpectNoError(err)

			// Create four PriorityClasses: p1, p2, p3, p4.
			for i := 1; i <= 4; i++ {
				priorityName := fmt.Sprintf("p%d", i)
				priorityVal := int32(i)
				priorityPairs = append(priorityPairs, priorityPair{name: priorityName, value: priorityVal})
				_, err := cs.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: priorityName}, Value: priorityVal}, metav1.CreateOptions{})
				if err != nil {
					framework.Logf("Failed to create priority '%v/%v'. Reason: %v. Msg: %v", priorityName, priorityVal, apierrors.ReasonForError(err), err)
				}
				framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
			}
		})
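		// The test below counts pod creations per ReplicaSet through an
		// informer: each time a ReplicaSet pod is preempted, the ReplicaSet
		// controller creates a replacement, so the per-ReplicaSet creation
		// counts reveal exactly how many preemptions happened.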
		/*
			Release: v1.19
			Testname: Pod preemption verification
			Description: Four levels of Pods in ReplicaSets with different levels of Priority,
			restricted by given CPU limits, MUST launch. Priority 1-3 Pods MUST spawn first,
			followed by the Priority 4 Pod. The ReplicaSets MUST contain the expected number
			of Replicas.
		*/
		framework.ConformanceIt("runs ReplicaSets to verify preemption running path", func() {
			podNamesSeen := []int32{0, 0, 0}
			stopCh := make(chan struct{})

			// Create a pod controller to list/watch pod events from the test framework namespace.
			_, podController := cache.NewInformer(
				&cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						obj, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), options)
						return runtime.Object(obj), err
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options)
					},
				},
				&v1.Pod{},
				0,
				cache.ResourceEventHandlerFuncs{
					AddFunc: func(obj interface{}) {
						if pod, ok := obj.(*v1.Pod); ok {
							if strings.HasPrefix(pod.Name, "rs-pod1") {
								atomic.AddInt32(&podNamesSeen[0], 1)
							} else if strings.HasPrefix(pod.Name, "rs-pod2") {
								atomic.AddInt32(&podNamesSeen[1], 1)
							} else if strings.HasPrefix(pod.Name, "rs-pod3") {
								atomic.AddInt32(&podNamesSeen[2], 1)
							}
						}
					},
				},
			)
			go podController.Run(stopCh)
			defer close(stopCh)

			// Prepare three ReplicaSets.
			rsConfs := []pauseRSConfig{
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod1",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod1"},
						PriorityClassName: "p1",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("200")},
						},
					},
				},
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod2",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod2"},
						PriorityClassName: "p2",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("300")},
						},
					},
				},
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod3",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod3"},
						PriorityClassName: "p3",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("450")},
						},
					},
				},
			}
			// Create ReplicaSet{1,2,3} so as to occupy 950/1000 of the fake resource.
			for i := range rsConfs {
				runPauseRS(f, rsConfs[i])
			}

			framework.Logf("pods created so far: %v", podNamesSeen)
			framework.Logf("length of pods created so far: %v", len(podNamesSeen))

			// Create a preemptor pod.
			preemptorPodConf := pausePodConfig{
				Name:              "pod4",
				Namespace:         ns,
				Labels:            map[string]string{"name": "pod4"},
				PriorityClassName: "p4",
				NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
					Limits:   v1.ResourceList{fakecpu: resource.MustParse("500")},
				},
			}
			preemptorPod := createPod(f, preemptorPodConf)
			waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout)

			framework.Logf("pods created so far: %v", podNamesSeen)
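			// Victim selection sketch: the preemptor needs 500 fakecpu and only
			// 50 is free. The scheduler tentatively removes all lower-priority
			// pods, then reprieves them from the highest priority down: pod3
			// (450) is reprieved because evicting pod1 and pod2 already frees
			// 200+300+50 >= 500, so pod1 and pod2 are the victims.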
			// Count the number of pods seen for each of ReplicaSet{1,2,3}:
			// - if it's more than the expected replicas, its pods have been over-preempted
			// - if it's less than the expected replicas, its pods are under-preempted
			// "*2" means pods of ReplicaSet{1,2} are expected to be preempted exactly once.
			expectedRSPods := []int32{1 * 2, 1 * 2, 1}
			err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
				for i := 0; i < len(podNamesSeen); i++ {
					got := atomic.LoadInt32(&podNamesSeen[i])
					if got < expectedRSPods[i] {
						framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
						return false, nil
					} else if got > expectedRSPods[i] {
						return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
					}
				}
				return true, nil
			})
			if err != nil {
				framework.Logf("pods created so far: %v", podNamesSeen)
				framework.Failf("failed pod observation expectations: %v", err)
			}

			// If the logic gets here, do a final check to ensure the state stays
			// stable for a period of time; otherwise, pods may have been over-preempted.
			time.Sleep(5 * time.Second)
			for i := 0; i < len(podNamesSeen); i++ {
				got := atomic.LoadInt32(&podNamesSeen[i])
				if got < expectedRSPods[i] {
					framework.Failf("pods of ReplicaSet%d have been under-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
				} else if got > expectedRSPods[i] {
					framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
				}
			}
		})
	})

	ginkgo.Context("PriorityClass endpoints", func() {
		var cs clientset.Interface
		f := framework.NewDefaultFramework("sched-preemption-path")
		testUUID := uuid.New().String()
		var pcs []*schedulingv1.PriorityClass

		ginkgo.BeforeEach(func() {
			cs = f.ClientSet
			// Create 2 PriorityClasses: p1, p2.
			for i := 1; i <= 2; i++ {
				name, val := fmt.Sprintf("p%d", i), int32(i)
				pc, err := cs.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{"e2e": testUUID}}, Value: val}, metav1.CreateOptions{})
				if err != nil {
					framework.Logf("Failed to create priority '%v/%v'. Reason: %v. Msg: %v", name, val, apierrors.ReasonForError(err), err)
				}
				framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
				pcs = append(pcs, pc)
			}
		})

		ginkgo.AfterEach(func() {
			// Print out additional info if the test failed.
			if ginkgo.CurrentGinkgoTestDescription().Failed {
				// List existing PriorityClasses.
				priorityList, err := cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
				if err != nil {
					framework.Logf("Unable to list PriorityClasses: %v", err)
				} else {
					framework.Logf("List existing PriorityClasses:")
					for _, p := range priorityList.Items {
						framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
					}
				}
			}

			// Delete the created PriorityClasses as a collection, selected by the test label.
			err := cs.SchedulingV1().PriorityClasses().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: fmt.Sprintf("e2e=%v", testUUID)})
			framework.ExpectNoError(err)
		})
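		// Note: PriorityClass.Value is immutable after creation, while
		// Description (and object metadata) may be changed; the test below
		// exercises both cases via Patch and Update.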
		/*
			Release: v1.20
			Testname: Scheduler, Verify PriorityClass endpoints
			Description: Verify that PriorityClass endpoints can be listed. When any mutable
			field is either patched or updated it MUST succeed. When any immutable field is
			either patched or updated it MUST fail.
		*/
		framework.ConformanceIt("verify PriorityClass endpoints can be operated with different HTTP methods", func() {
			// 1. Patch/Update on immutable fields will fail.
			pcCopy := pcs[0].DeepCopy()
			pcCopy.Value = pcCopy.Value * 10
			err := patchPriorityClass(cs, pcs[0], pcCopy)
			framework.ExpectError(err, "expect a patch error on an immutable field")
			framework.Logf("%v", err)

			pcCopy = pcs[1].DeepCopy()
			pcCopy.Value = pcCopy.Value * 10
			_, err = cs.SchedulingV1().PriorityClasses().Update(context.TODO(), pcCopy, metav1.UpdateOptions{})
			framework.ExpectError(err, "expect an update error on an immutable field")
			framework.Logf("%v", err)

			// 2. Patch/Update on mutable fields will succeed.
			newDesc := "updated description"
			pcCopy = pcs[0].DeepCopy()
			pcCopy.Description = newDesc
			err = patchPriorityClass(cs, pcs[0], pcCopy)
			framework.ExpectNoError(err)

			pcCopy = pcs[1].DeepCopy()
			pcCopy.Description = newDesc
			_, err = cs.SchedulingV1().PriorityClasses().Update(context.TODO(), pcCopy, metav1.UpdateOptions{})
			framework.ExpectNoError(err)

			// 3. List existing PriorityClasses.
			_, err = cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
			framework.ExpectNoError(err)

			// 4. Verify the fields of the updated PriorityClasses.
			for _, pc := range pcs {
				livePC, err := cs.SchedulingV1().PriorityClasses().Get(context.TODO(), pc.Name, metav1.GetOptions{})
				framework.ExpectNoError(err)
				framework.ExpectEqual(livePC.Value, pc.Value)
				framework.ExpectEqual(livePC.Description, newDesc)
			}
		})
	})
})

// pauseRSConfig describes a ReplicaSet of pause pods.
type pauseRSConfig struct {
	Replicas  int32
	PodConfig pausePodConfig
}

// initPauseRS builds a ReplicaSet object whose pod template is derived from 'conf'.
func initPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	pausePod := initPausePod(f, conf.PodConfig)
	pauseRS := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rs-" + pausePod.Name,
			Namespace: pausePod.Namespace,
		},
		Spec: appsv1.ReplicaSetSpec{
			Replicas: &conf.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: pausePod.Labels,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: pausePod.ObjectMeta.Labels},
				Spec:       pausePod.Spec,
			},
		},
	}
	return pauseRS
}

// createPauseRS creates the ReplicaSet in the config's namespace (defaulting
// to the framework's namespace) and returns the created object.
func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	namespace := conf.PodConfig.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	rs, err := f.ClientSet.AppsV1().ReplicaSets(namespace).Create(context.TODO(), initPauseRS(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return rs
}

// runPauseRS creates the ReplicaSet and waits until its target number of
// replicas is available.
func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	rs := createPauseRS(f, conf)
	framework.ExpectNoError(e2ereplicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
	return rs
}

// createPod creates a pause pod without waiting for it to run.
func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
	namespace := conf.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(context.TODO(), initPausePod(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return pod
}
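// examplePreemptorFlow is an illustrative (unused) sketch of how the helpers
// in this file combine to exercise preemption: create a pod without waiting
// for it to run, then wait only until the scheduler binds it to a node. The
// pod name and priority class below are hypothetical.
func examplePreemptorFlow(f *framework.Framework) {
	preemptor := createPod(f, pausePodConfig{
		Name:              "example-preemptor",
		PriorityClassName: "p4",
	})
	waitForPreemptingWithTimeout(f, preemptor, framework.PodGetTimeout)
}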
// waitForPreemptingWithTimeout verifies that 'pod' is scheduled within 'timeout';
// specifically, it checks whether the preemptor pod's 'spec.nodeName' field has been set.
func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
	err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
		pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if len(pod.Spec.NodeName) > 0 {
			return true, nil
		}
		return false, err
	})
	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
}

// patchNode applies a two-way strategic merge patch to the node's "status"
// subresource, containing the diff between 'old' and 'newNode'.
func patchNode(client clientset.Interface, old *v1.Node, newNode *v1.Node) error {
	oldData, err := json.Marshal(old)
	if err != nil {
		return err
	}

	newData, err := json.Marshal(newNode)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
	if err != nil {
		return fmt.Errorf("failed to create merge patch for node %q: %v", old.Name, err)
	}
	_, err = client.CoreV1().Nodes().Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	return err
}

// patchPriorityClass applies a two-way strategic merge patch containing the
// diff between 'old' and 'newPC'.
func patchPriorityClass(cs clientset.Interface, old, newPC *schedulingv1.PriorityClass) error {
	oldData, err := json.Marshal(old)
	if err != nil {
		return err
	}

	newData, err := json.Marshal(newPC)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &schedulingv1.PriorityClass{})
	if err != nil {
		return fmt.Errorf("failed to create merge patch for PriorityClass %q: %v", old.Name, err)
	}
	_, err = cs.SchedulingV1().PriorityClasses().Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	return err
}
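// examplePatchNodeStatus is an illustrative (unused) sketch of the status-patch
// technique used throughout this file: deep-copy the node, mutate the copy's
// status, and let patchNode send the strategic merge patch to the "status"
// subresource. The resource name and quantity here are hypothetical.
func examplePatchNodeStatus(cs clientset.Interface, node *v1.Node) error {
	nodeCopy := node.DeepCopy()
	nodeCopy.Status.Capacity[v1.ResourceName("example.com/fakeRes")] = resource.MustParse("10")
	return patchNode(cs, node, nodeCopy)
}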