1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package scheduler 18 19import ( 20 "context" 21 "fmt" 22 "testing" 23 "time" 24 25 v1 "k8s.io/api/core/v1" 26 policy "k8s.io/api/policy/v1beta1" 27 apierrors "k8s.io/apimachinery/pkg/api/errors" 28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 29 "k8s.io/apimachinery/pkg/util/wait" 30 cacheddiscovery "k8s.io/client-go/discovery/cached/memory" 31 "k8s.io/client-go/dynamic" 32 "k8s.io/client-go/informers" 33 clientset "k8s.io/client-go/kubernetes" 34 corelisters "k8s.io/client-go/listers/core/v1" 35 restclient "k8s.io/client-go/rest" 36 "k8s.io/client-go/restmapper" 37 "k8s.io/client-go/scale" 38 "k8s.io/kube-scheduler/config/v1beta2" 39 podutil "k8s.io/kubernetes/pkg/api/v1/pod" 40 "k8s.io/kubernetes/pkg/controller/disruption" 41 "k8s.io/kubernetes/pkg/scheduler" 42 configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" 43 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption" 44 st "k8s.io/kubernetes/pkg/scheduler/testing" 45 testutils "k8s.io/kubernetes/test/integration/util" 46 imageutils "k8s.io/kubernetes/test/utils/image" 47 "k8s.io/utils/pointer" 48) 49 50// initDisruptionController initializes and runs a Disruption Controller to properly 51// update PodDisuptionBudget objects. 
52func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *disruption.DisruptionController { 53 informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour) 54 55 discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery()) 56 mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) 57 58 config := restclient.Config{Host: testCtx.HTTPServer.URL} 59 scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery()) 60 scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) 61 if err != nil { 62 t.Fatalf("Error in create scaleClient: %v", err) 63 } 64 65 dc := disruption.NewDisruptionController( 66 informers.Core().V1().Pods(), 67 informers.Policy().V1().PodDisruptionBudgets(), 68 informers.Core().V1().ReplicationControllers(), 69 informers.Apps().V1().ReplicaSets(), 70 informers.Apps().V1().Deployments(), 71 informers.Apps().V1().StatefulSets(), 72 testCtx.ClientSet, 73 mapper, 74 scaleClient, 75 testCtx.ClientSet.Discovery()) 76 77 informers.Start(testCtx.Scheduler.StopEverything) 78 informers.WaitForCacheSync(testCtx.Scheduler.StopEverything) 79 go dc.Run(testCtx.Scheduler.StopEverything) 80 return dc 81} 82 83// initTest initializes a test environment and creates API server and scheduler with default 84// configuration. 85func initTest(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext { 86 testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), nil, opts...) 87 testutils.SyncInformerFactory(testCtx) 88 go testCtx.Scheduler.Run(testCtx.Ctx) 89 return testCtx 90} 91 92// initTestDisablePreemption initializes a test environment and creates API server and scheduler with default 93// configuration but with pod preemption disabled. 
func initTestDisablePreemption(t *testing.T, nsPrefix string) *testutils.TestContext {
	// Preemption is carried out by the DefaultPreemption PostFilter plugin, so
	// disabling that plugin in the scheduler profile turns preemption off.
	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
		Profiles: []v1beta2.KubeSchedulerProfile{{
			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
			Plugins: &v1beta2.Plugins{
				PostFilter: v1beta2.PluginSet{
					Disabled: []v1beta2.Plugin{
						{Name: defaultpreemption.Name},
					},
				},
			},
		}},
	})
	testCtx := testutils.InitTestSchedulerWithOptions(
		t, testutils.InitTestAPIServer(t, nsPrefix, nil), nil,
		scheduler.WithProfiles(cfg.Profiles...))
	testutils.SyncInformerFactory(testCtx)
	// The scheduler runs until testCtx.Ctx is cancelled during test teardown.
	go testCtx.Scheduler.Run(testCtx.Ctx)
	return testCtx
}

// waitForReflection waits till the passFunc confirms that the object it expects
// to see is in the store. Used to observe reflected events.
func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
	passFunc func(n interface{}) bool) error {
	// Every observed state is recorded so that a timeout failure can log the
	// sequence of node versions that were actually seen.
	var nodes []*v1.Node
	err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
		n, err := nodeLister.Get(key)

		// Case order matters: a successful passFunc ends the poll; NotFound is
		// recorded as a nil entry; any other error is reported via t.Errorf but
		// polling continues.
		switch {
		case err == nil && passFunc(n):
			return true, nil
		case apierrors.IsNotFound(err):
			nodes = append(nodes, nil)
		case err != nil:
			t.Errorf("Unexpected error: %v", err)
		default:
			nodes = append(nodes, n)
		}

		return false, nil
	})
	if err != nil {
		t.Logf("Logging consecutive node versions received from store:")
		for i, n := range nodes {
			t.Logf("%d: %#v", i, n)
		}
	}
	return err
}

// createNode creates the given node object via the API server.
func createNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
	return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
}

// createNodes creates `numNodes` nodes. The created node names will be in the
// form of "`prefix`-X" where X is an ordinal.
151// DEPRECATED 152// use createAndWaitForNodesInCache instead, which ensures the created nodes 153// to be present in scheduler cache. 154func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { 155 nodes := make([]*v1.Node, numNodes) 156 for i := 0; i < numNodes; i++ { 157 nodeName := fmt.Sprintf("%v-%d", prefix, i) 158 node, err := createNode(cs, wrapper.Name(nodeName).Obj()) 159 if err != nil { 160 return nodes[:], err 161 } 162 nodes[i] = node 163 } 164 return nodes[:], nil 165} 166 167// createAndWaitForNodesInCache calls createNodes(), and wait for the created 168// nodes to be present in scheduler cache. 169func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { 170 existingNodes := testCtx.Scheduler.SchedulerCache.NodeCount() 171 nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes) 172 if err != nil { 173 return nodes, fmt.Errorf("cannot create nodes: %v", err) 174 } 175 return nodes, waitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes) 176} 177 178// waitForNodesInCache ensures at least <nodeCount> nodes are present in scheduler cache 179// within 30 seconds; otherwise returns false. 
180func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error { 181 err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { 182 return sched.SchedulerCache.NodeCount() >= nodeCount, nil 183 }) 184 if err != nil { 185 return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err) 186 } 187 return nil 188} 189 190type pausePodConfig struct { 191 Name string 192 Namespace string 193 Affinity *v1.Affinity 194 Annotations, Labels, NodeSelector map[string]string 195 Resources *v1.ResourceRequirements 196 Tolerations []v1.Toleration 197 NodeName string 198 SchedulerName string 199 Priority *int32 200 PreemptionPolicy *v1.PreemptionPolicy 201 PriorityClassName string 202} 203 204// initPausePod initializes a pod API object from the given config. It is used 205// mainly in pod creation process. 206func initPausePod(conf *pausePodConfig) *v1.Pod { 207 pod := &v1.Pod{ 208 ObjectMeta: metav1.ObjectMeta{ 209 Name: conf.Name, 210 Namespace: conf.Namespace, 211 Labels: conf.Labels, 212 Annotations: conf.Annotations, 213 }, 214 Spec: v1.PodSpec{ 215 NodeSelector: conf.NodeSelector, 216 Affinity: conf.Affinity, 217 Containers: []v1.Container{ 218 { 219 Name: conf.Name, 220 Image: imageutils.GetPauseImageName(), 221 }, 222 }, 223 Tolerations: conf.Tolerations, 224 NodeName: conf.NodeName, 225 SchedulerName: conf.SchedulerName, 226 Priority: conf.Priority, 227 PreemptionPolicy: conf.PreemptionPolicy, 228 PriorityClassName: conf.PriorityClassName, 229 }, 230 } 231 if conf.Resources != nil { 232 pod.Spec.Containers[0].Resources = *conf.Resources 233 } 234 return pod 235} 236 237// createPausePod creates a pod with "Pause" image and the given config and 238// return its pointer and error status. 
239func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) { 240 return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{}) 241} 242 243// createPausePodWithResource creates a pod with "Pause" image and the given 244// resources and returns its pointer and error status. The resource list can be 245// nil. 246func createPausePodWithResource(cs clientset.Interface, podName string, 247 nsName string, res *v1.ResourceList) (*v1.Pod, error) { 248 var conf pausePodConfig 249 if res == nil { 250 conf = pausePodConfig{ 251 Name: podName, 252 Namespace: nsName, 253 } 254 } else { 255 conf = pausePodConfig{ 256 Name: podName, 257 Namespace: nsName, 258 Resources: &v1.ResourceRequirements{ 259 Requests: *res, 260 }, 261 } 262 } 263 return createPausePod(cs, initPausePod(&conf)) 264} 265 266// runPausePod creates a pod with "Pause" image and the given config and waits 267// until it is scheduled. It returns its pointer and error status. 268func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) { 269 pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) 270 if err != nil { 271 return nil, fmt.Errorf("failed to create pause pod: %v", err) 272 } 273 if err = testutils.WaitForPodToSchedule(cs, pod); err != nil { 274 return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err) 275 } 276 if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { 277 return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err) 278 } 279 return pod, nil 280} 281 282type podWithContainersConfig struct { 283 Name string 284 Namespace string 285 Containers []v1.Container 286} 287 288// initPodWithContainers initializes a pod API object from the given config. This is used primarily for generating 289// pods with containers each having a specific image. 
290func initPodWithContainers(cs clientset.Interface, conf *podWithContainersConfig) *v1.Pod { 291 pod := &v1.Pod{ 292 ObjectMeta: metav1.ObjectMeta{ 293 Name: conf.Name, 294 Namespace: conf.Namespace, 295 }, 296 Spec: v1.PodSpec{ 297 Containers: conf.Containers, 298 }, 299 } 300 return pod 301} 302 303// runPodWithContainers creates a pod with given config and containers and waits 304// until it is scheduled. It returns its pointer and error status. 305func runPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) { 306 pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) 307 if err != nil { 308 return nil, fmt.Errorf("failed to create pod-with-containers: %v", err) 309 } 310 if err = testutils.WaitForPodToSchedule(cs, pod); err != nil { 311 return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err) 312 } 313 if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { 314 return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err) 315 } 316 return pod, nil 317} 318 319// podIsGettingEvicted returns true if the pod's deletion timestamp is set. 320func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { 321 return func() (bool, error) { 322 pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) 323 if err != nil { 324 return false, err 325 } 326 if pod.DeletionTimestamp != nil { 327 return true, nil 328 } 329 return false, nil 330 } 331} 332 333// podScheduledIn returns true if a given pod is placed onto one of the expected nodes. 
334func podScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionFunc { 335 return func() (bool, error) { 336 pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) 337 if err != nil { 338 // This could be a connection error so we want to retry. 339 return false, nil 340 } 341 if pod.Spec.NodeName == "" { 342 return false, nil 343 } 344 for _, nodeName := range nodeNames { 345 if pod.Spec.NodeName == nodeName { 346 return true, nil 347 } 348 } 349 return false, nil 350 } 351} 352 353// podUnschedulable returns a condition function that returns true if the given pod 354// gets unschedulable status. 355func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { 356 return func() (bool, error) { 357 pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) 358 if err != nil { 359 // This could be a connection error so we want to retry. 360 return false, nil 361 } 362 _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) 363 return cond != nil && cond.Status == v1.ConditionFalse && 364 cond.Reason == v1.PodReasonUnschedulable, nil 365 } 366} 367 368// podSchedulingError returns a condition function that returns true if the given pod 369// gets unschedulable status for reasons other than "Unschedulable". The scheduler 370// records such reasons in case of error. 371func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { 372 return func() (bool, error) { 373 pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) 374 if err != nil { 375 // This could be a connection error so we want to retry. 
376 return false, nil 377 } 378 _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) 379 return cond != nil && cond.Status == v1.ConditionFalse && 380 cond.Reason != v1.PodReasonUnschedulable, nil 381 } 382} 383 384// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns 385// an error if it does not become unschedulable within the given timeout. 386func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { 387 return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name)) 388} 389 390// waitForPodUnschedule waits for a pod to fail scheduling and returns 391// an error if it does not become unschedulable within the timeout duration (30 seconds). 392func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error { 393 return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second) 394} 395 396// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns 397// an error if it does not scheduled within the given timeout. 398func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { 399 return wait.Poll(100*time.Millisecond, timeout, podScheduled(cs, pod.Namespace, pod.Name)) 400} 401 402// waitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to 403// the expected values. 
404func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error { 405 return wait.Poll(time.Second, 60*time.Second, func() (bool, error) { 406 pdbList, err := testCtx.ClientSet.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{}) 407 if err != nil { 408 return false, err 409 } 410 if len(pdbList.Items) != len(pdbs) { 411 return false, nil 412 } 413 for i, pdb := range pdbs { 414 found := false 415 for _, cpdb := range pdbList.Items { 416 if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace { 417 found = true 418 if cpdb.Status.CurrentHealthy != pdbPodNum[i] { 419 return false, nil 420 } 421 } 422 } 423 if !found { 424 return false, nil 425 } 426 } 427 return true, nil 428 }) 429} 430 431// waitCachedPodsStable waits until scheduler cache has the given pods. 432func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error { 433 return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { 434 cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount() 435 if err != nil { 436 return false, err 437 } 438 if len(pods) != cachedPods { 439 return false, nil 440 } 441 for _, p := range pods { 442 actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{}) 443 if err1 != nil { 444 return false, err1 445 } 446 cachedPod, err2 := testCtx.Scheduler.SchedulerCache.GetPod(actualPod) 447 if err2 != nil || cachedPod == nil { 448 return false, err2 449 } 450 } 451 return true, nil 452 }) 453} 454 455// deletePod deletes the given pod in the given namespace. 
456func deletePod(cs clientset.Interface, podName string, nsName string) error { 457 return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0)) 458} 459 460func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) { 461 return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) 462} 463 464// noPodsInNamespace returns true if no pods in the given namespace. 465func noPodsInNamespace(c clientset.Interface, podNamespace string) wait.ConditionFunc { 466 return func() (bool, error) { 467 pods, err := c.CoreV1().Pods(podNamespace).List(context.TODO(), metav1.ListOptions{}) 468 if err != nil { 469 return false, err 470 } 471 472 return len(pods.Items) == 0, nil 473 } 474} 475 476// cleanupPodsInNamespace deletes the pods in the given namespace and waits for them to 477// be actually deleted. They are removed with no grace. 478func cleanupPodsInNamespace(cs clientset.Interface, t *testing.T, ns string) { 479 t.Helper() 480 481 zero := int64(0) 482 if err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{GracePeriodSeconds: &zero}, metav1.ListOptions{}); err != nil { 483 t.Errorf("error while listing pod in namespace %v: %v", ns, err) 484 return 485 } 486 487 if err := wait.Poll(time.Second, wait.ForeverTestTimeout, 488 noPodsInNamespace(cs, ns)); err != nil { 489 t.Errorf("error while waiting for pods in namespace %v: %v", ns, err) 490 } 491} 492 493// podScheduled returns true if a node is assigned to the given pod. 494func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { 495 return func() (bool, error) { 496 pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) 497 if err != nil { 498 // This could be a connection error so we want to retry. 
499 return false, nil 500 } 501 if pod.Spec.NodeName == "" { 502 return false, nil 503 } 504 return true, nil 505 } 506} 507 508func createNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error { 509 for _, n := range namespaces { 510 ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}} 511 if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil { 512 return err 513 } 514 } 515 return nil 516} 517