/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"context"
	"fmt"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1beta1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/scale"
	"k8s.io/kube-scheduler/config/v1beta2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller/disruption"
	"k8s.io/kubernetes/pkg/scheduler"
	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	testutils "k8s.io/kubernetes/test/integration/util"
	imageutils "k8s.io/kubernetes/test/utils/image"
	"k8s.io/utils/pointer"
)

// initDisruptionController initializes and runs a Disruption Controller to properly
// update PodDisruptionBudget objects.
func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *disruption.DisruptionController {
	informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)

	discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)

	config := restclient.Config{Host: testCtx.HTTPServer.URL}
	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery())
	scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
	if err != nil {
		t.Fatalf("Error creating scaleClient: %v", err)
	}

	dc := disruption.NewDisruptionController(
		informers.Core().V1().Pods(),
		informers.Policy().V1().PodDisruptionBudgets(),
		informers.Core().V1().ReplicationControllers(),
		informers.Apps().V1().ReplicaSets(),
		informers.Apps().V1().Deployments(),
		informers.Apps().V1().StatefulSets(),
		testCtx.ClientSet,
		mapper,
		scaleClient,
		testCtx.ClientSet.Discovery())

	informers.Start(testCtx.Scheduler.StopEverything)
	informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
	go dc.Run(testCtx.Scheduler.StopEverything)
	return dc
}
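
// A minimal usage sketch, assuming testCtx was produced by initTest below and that
// the test has already created pods covered by a PodDisruptionBudget; "pdb" and the
// expected healthy count of 2 are hypothetical values, not defined in this file:
//
//	initDisruptionController(t, testCtx)
//	// ... create pods and a PodDisruptionBudget "pdb" through testCtx.ClientSet ...
//	if err := waitForPDBsStable(testCtx, []*policy.PodDisruptionBudget{pdb}, []int32{2}); err != nil {
//		t.Fatalf("PDB status never stabilized: %v", err)
//	}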

// initTest initializes a test environment and creates an API server and a scheduler
// with the default configuration.
func initTest(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext {
	testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), nil, opts...)
	testutils.SyncInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)
	return testCtx
}
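
// A minimal sketch of the typical test lifecycle built on this helper. The namespace
// prefix "sched" is an arbitrary example, and CleanupTest is assumed to be the
// teardown helper provided by the shared test/integration/util package:
//
//	func TestExample(t *testing.T) {
//		testCtx := initTest(t, "sched")
//		defer testutils.CleanupTest(t, testCtx)
//		// ... create nodes and pods, then assert on scheduling results ...
//	}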

// initTestDisablePreemption initializes a test environment and creates an API server
// and a scheduler with the default configuration, but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *testutils.TestContext {
	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
		Profiles: []v1beta2.KubeSchedulerProfile{{
			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
			Plugins: &v1beta2.Plugins{
				PostFilter: v1beta2.PluginSet{
					Disabled: []v1beta2.Plugin{
						{Name: defaultpreemption.Name},
					},
				},
			},
		}},
	})
	testCtx := testutils.InitTestSchedulerWithOptions(
		t, testutils.InitTestAPIServer(t, nsPrefix, nil), nil,
		scheduler.WithProfiles(cfg.Profiles...))
	testutils.SyncInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)
	return testCtx
}

// waitForReflection waits until passFunc confirms that the object it expects
// to see is in the store. Used to observe reflected events.
func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
	passFunc func(n interface{}) bool) error {
	var nodes []*v1.Node
	err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
		n, err := nodeLister.Get(key)

		switch {
		case err == nil && passFunc(n):
			return true, nil
		case apierrors.IsNotFound(err):
			nodes = append(nodes, nil)
		case err != nil:
			t.Errorf("Unexpected error: %v", err)
		default:
			nodes = append(nodes, n)
		}

		return false, nil
	})
	if err != nil {
		t.Logf("Logging consecutive node versions received from store:")
		for i, n := range nodes {
			t.Logf("%d: %#v", i, n)
		}
	}
	return err
}

// createNode creates the given node using the provided client.
func createNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
	return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
}

// createNodes creates `numNodes` nodes. The created node names will be in the
// form of "`prefix`-X" where X is an ordinal.
//
// Deprecated: use createAndWaitForNodesInCache instead, which ensures the
// created nodes are present in the scheduler cache.
func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
	nodes := make([]*v1.Node, numNodes)
	for i := 0; i < numNodes; i++ {
		nodeName := fmt.Sprintf("%v-%d", prefix, i)
		node, err := createNode(cs, wrapper.Name(nodeName).Obj())
		if err != nil {
			return nodes[:], err
		}
		nodes[i] = node
	}
	return nodes[:], nil
}

// createAndWaitForNodesInCache calls createNodes(), and waits for the created
// nodes to be present in the scheduler cache.
func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
	existingNodes := testCtx.Scheduler.SchedulerCache.NodeCount()
	nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
	if err != nil {
		return nodes, fmt.Errorf("cannot create nodes: %v", err)
	}
	return nodes, waitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes)
}
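
// A minimal usage sketch, assuming testCtx was produced by initTest and that
// st.MakeNode() from the scheduler testing package is the node wrapper in use;
// the prefix "node" and count of 3 are arbitrary example values:
//
//	nodes, err := createAndWaitForNodesInCache(testCtx, "node", st.MakeNode(), 3)
//	if err != nil {
//		t.Fatalf("failed to create nodes: %v", err)
//	}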

// waitForNodesInCache ensures at least <nodeCount> nodes are present in the scheduler
// cache before the test timeout expires; otherwise it returns an error.
func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
	err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
		return sched.SchedulerCache.NodeCount() >= nodeCount, nil
	})
	if err != nil {
		return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
	}
	return nil
}

// pausePodConfig describes the fields used to build a pause pod for tests.
type pausePodConfig struct {
	Name                              string
	Namespace                         string
	Affinity                          *v1.Affinity
	Annotations, Labels, NodeSelector map[string]string
	Resources                         *v1.ResourceRequirements
	Tolerations                       []v1.Toleration
	NodeName                          string
	SchedulerName                     string
	Priority                          *int32
	PreemptionPolicy                  *v1.PreemptionPolicy
	PriorityClassName                 string
}

// initPausePod initializes a pod API object from the given config. It is mainly
// used in the pod creation process.
func initPausePod(conf *pausePodConfig) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        conf.Name,
			Namespace:   conf.Namespace,
			Labels:      conf.Labels,
			Annotations: conf.Annotations,
		},
		Spec: v1.PodSpec{
			NodeSelector: conf.NodeSelector,
			Affinity:     conf.Affinity,
			Containers: []v1.Container{
				{
					Name:  conf.Name,
					Image: imageutils.GetPauseImageName(),
				},
			},
			Tolerations:       conf.Tolerations,
			NodeName:          conf.NodeName,
			SchedulerName:     conf.SchedulerName,
			Priority:          conf.Priority,
			PreemptionPolicy:  conf.PreemptionPolicy,
			PriorityClassName: conf.PriorityClassName,
		},
	}
	if conf.Resources != nil {
		pod.Spec.Containers[0].Resources = *conf.Resources
	}
	return pod
}
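
// A minimal usage sketch, assuming testCtx was produced by initTest; the pod name
// below is an arbitrary example value:
//
//	pod, err := runPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{
//		Name:      "example-pod",
//		Namespace: testCtx.NS.Name,
//	}))
//	if err != nil {
//		t.Fatalf("failed to run pause pod: %v", err)
//	}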

// createPausePod creates a pod with "Pause" image and the given config and
// returns its pointer and error status.
func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
	return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{})
}

// createPausePodWithResource creates a pod with "Pause" image and the given
// resources and returns its pointer and error status. The resource list can be
// nil.
func createPausePodWithResource(cs clientset.Interface, podName string,
	nsName string, res *v1.ResourceList) (*v1.Pod, error) {
	var conf pausePodConfig
	if res == nil {
		conf = pausePodConfig{
			Name:      podName,
			Namespace: nsName,
		}
	} else {
		conf = pausePodConfig{
			Name:      podName,
			Namespace: nsName,
			Resources: &v1.ResourceRequirements{
				Requests: *res,
			},
		}
	}
	return createPausePod(cs, initPausePod(&conf))
}

// runPausePod creates a pod with "Pause" image and the given config and waits
// until it is scheduled. It returns its pointer and error status.
func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
	pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create pause pod: %v", err)
	}
	if err = testutils.WaitForPodToSchedule(cs, pod); err != nil {
		return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err)
	}
	if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
		return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err)
	}
	return pod, nil
}

// podWithContainersConfig describes the fields used to build a pod with
// caller-supplied containers.
type podWithContainersConfig struct {
	Name       string
	Namespace  string
	Containers []v1.Container
}

// initPodWithContainers initializes a pod API object from the given config. This is used primarily for generating
// pods with containers each having a specific image.
func initPodWithContainers(cs clientset.Interface, conf *podWithContainersConfig) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      conf.Name,
			Namespace: conf.Namespace,
		},
		Spec: v1.PodSpec{
			Containers: conf.Containers,
		},
	}
	return pod
}

// runPodWithContainers creates a pod with the given config and containers and waits
// until it is scheduled. It returns its pointer and error status.
func runPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
	pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create pod-with-containers: %v", err)
	}
	if err = testutils.WaitForPodToSchedule(cs, pod); err != nil {
		return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
	}
	if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
		return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err)
	}
	return pod, nil
}

// podIsGettingEvicted returns a condition function that returns true if the
// given pod's deletion timestamp is set.
func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if pod.DeletionTimestamp != nil {
			return true, nil
		}
		return false, nil
	}
}

// podScheduledIn returns a condition function that returns true if the given pod
// is placed onto one of the expected nodes.
func podScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error so we want to retry.
			return false, nil
		}
		if pod.Spec.NodeName == "" {
			return false, nil
		}
		for _, nodeName := range nodeNames {
			if pod.Spec.NodeName == nodeName {
				return true, nil
			}
		}
		return false, nil
	}
}

// podUnschedulable returns a condition function that returns true if the given pod
// gets unschedulable status.
func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error so we want to retry.
			return false, nil
		}
		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
		return cond != nil && cond.Status == v1.ConditionFalse &&
			cond.Reason == v1.PodReasonUnschedulable, nil
	}
}

// podSchedulingError returns a condition function that returns true if the given pod
// fails scheduling with a reason other than "Unschedulable". The scheduler
// records such reasons when it encounters an error.
func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error so we want to retry.
			return false, nil
		}
		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
		return cond != nil && cond.Status == v1.ConditionFalse &&
			cond.Reason != v1.PodReasonUnschedulable, nil
	}
}

// waitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the given timeout.
func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
	return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
}

// waitForPodUnschedulable waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the timeout duration (30 seconds).
func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
	return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
}
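
// A minimal usage sketch, assuming testCtx was produced by initTest and a test
// scenario in which the pod cannot fit on any node (the pod name below is an
// arbitrary example value):
//
//	pod, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{
//		Name:      "unschedulable-pod",
//		Namespace: testCtx.NS.Name,
//	}))
//	if err != nil {
//		t.Fatalf("failed to create pod: %v", err)
//	}
//	if err := waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
//		t.Errorf("expected pod to be unschedulable: %v", err)
//	}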

// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
// an error if it does not get scheduled within the given timeout.
func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
	return wait.Poll(100*time.Millisecond, timeout, podScheduled(cs, pod.Namespace, pod.Name))
}

// waitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
// the expected values.
func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
	return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
		pdbList, err := testCtx.ClientSet.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		if len(pdbList.Items) != len(pdbs) {
			return false, nil
		}
		for i, pdb := range pdbs {
			found := false
			for _, cpdb := range pdbList.Items {
				if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
					found = true
					if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
						return false, nil
					}
				}
			}
			if !found {
				return false, nil
			}
		}
		return true, nil
	})
}

// waitCachedPodsStable waits until the scheduler cache has exactly the given pods.
func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
		cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount()
		if err != nil {
			return false, err
		}
		if len(pods) != cachedPods {
			return false, nil
		}
		for _, p := range pods {
			actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
			if err1 != nil {
				return false, err1
			}
			cachedPod, err2 := testCtx.Scheduler.SchedulerCache.GetPod(actualPod)
			if err2 != nil || cachedPod == nil {
				return false, err2
			}
		}
		return true, nil
	})
}

// deletePod deletes the given pod in the given namespace with a zero grace period.
func deletePod(cs clientset.Interface, podName string, nsName string) error {
	return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0))
}

// getPod fetches the given pod from the API server.
func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) {
	return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
}

// noPodsInNamespace returns a condition function that returns true if there are
// no pods left in the given namespace.
func noPodsInNamespace(c clientset.Interface, podNamespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pods, err := c.CoreV1().Pods(podNamespace).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			return false, err
		}

		return len(pods.Items) == 0, nil
	}
}

// cleanupPodsInNamespace deletes the pods in the given namespace and waits for them to
// be actually deleted. They are removed with no grace period.
func cleanupPodsInNamespace(cs clientset.Interface, t *testing.T, ns string) {
	t.Helper()

	zero := int64(0)
	if err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{GracePeriodSeconds: &zero}, metav1.ListOptions{}); err != nil {
		t.Errorf("error while deleting pods in namespace %v: %v", ns, err)
		return
	}

	if err := wait.Poll(time.Second, wait.ForeverTestTimeout,
		noPodsInNamespace(cs, ns)); err != nil {
		t.Errorf("error while waiting for pods in namespace %v to be deleted: %v", ns, err)
	}
}

// podScheduled returns a condition function that returns true if a node is
// assigned to the given pod.
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error so we want to retry.
			return false, nil
		}
		if pod.Spec.NodeName == "" {
			return false, nil
		}
		return true, nil
	}
}

// createNamespacesWithLabels creates the given namespaces, applying the same set
// of labels to each of them.
func createNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error {
	for _, n := range namespaces {
		ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}}
		if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil {
			return err
		}
	}
	return nil
}