/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apps

import (
	"context"
	"fmt"
	"strconv"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
	"k8s.io/utils/pointer"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
)

var _ = SIGDescribe("Job", func() {
	f := framework.NewDefaultFramework("job")
	parallelism := int32(2)
	completions := int32(4)
	backoffLimit := int32(6) // default value

	// Simplest case: N pods succeed
	ginkgo.It("should run a job to completion when tasks succeed", func() {
		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods for job exist")
		pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
		successes := int32(0)
		for _, pod := range pods.Items {
			if pod.Status.Phase == v1.PodSucceeded {
				successes++
			}
		}
		framework.ExpectEqual(successes, completions, "expected %d successful job pods, but got %d", completions, successes)
	})

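	/*
		Testcase: Ensure that a Job created in a suspended state does not create Pods until resumed.
		Description: Create a Job with suspend=true, verify that no Pods are created and that the Job
		reports a Suspended condition, then set suspend=false and verify the Job runs to completion.
	*/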
	ginkgo.It("should not create pods when created in suspend state", func() {
		ginkgo.By("Creating a job with suspend=true")
		job := e2ejob.NewTestJob("succeed", "suspend-true-to-false", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Spec.Suspend = pointer.BoolPtr(true)
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods aren't created for job")
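		// This poll is expected to time out: its condition only returns true if
		// pods appear, so receiving wait.ErrWaitTimeout means no pods were
		// created while the job remained suspended.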
		framework.ExpectEqual(wait.Poll(framework.Poll, wait.ForeverTestTimeout, func() (bool, error) {
			pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
			if err != nil {
				return false, err
			}
			return len(pods.Items) > 0, nil
		}), wait.ErrWaitTimeout)

		ginkgo.By("Checking Job status to observe Suspended state")
		job, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to retrieve latest job object")
		framework.ExpectEqual(jobHasCondition(job, batchv1.JobSuspended), true, "job should report a Suspended condition")

		ginkgo.By("Updating the job with suspend=false")
		job.Spec.Suspend = pointer.BoolPtr(false)
		job, err = e2ejob.UpdateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Waiting for job to complete")
		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
	})

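	/*
		Testcase: Ensure that suspending a running Job deletes its active Pods.
		Description: Create a Job, wait until its Pods are running, set suspend=true, and verify that
		the Pods are deleted and that the Job reports a Suspended condition.
	*/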
	ginkgo.It("should delete pods when suspended", func() {
		ginkgo.By("Creating a job with suspend=false")
		job := e2ejob.NewTestJob("notTerminate", "suspend-false-to-true", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Spec.Suspend = pointer.BoolPtr(false)
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring the number of pods attached to the job equals the parallelism count")
		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)

		ginkgo.By("Updating the job with suspend=true")
		job, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to retrieve latest job object")
		job.Spec.Suspend = pointer.BoolPtr(true)
		job, err = e2ejob.UpdateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods are deleted")
		err = e2ejob.WaitForAllJobPodsGone(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to ensure pods are deleted after suspend=true")

		ginkgo.By("Checking Job status to observe Suspended state")
		job, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to retrieve latest job object")
		framework.ExpectEqual(jobHasCondition(job, batchv1.JobSuspended), true, "job should report a Suspended condition")
	})

	/*
		Testcase: Ensure Pods of an Indexed Job get a unique index.
		Description: Create an Indexed Job, wait for completion, and verify that each succeeded Pod
		carries the completion index annotation and a hostname derived from that index.
	*/
	ginkgo.It("should create pods for an Indexed job with completion indexes and specified hostname", func() {
		ginkgo.By("Creating Indexed job")
		job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		mode := batchv1.IndexedCompletion
		job.Spec.CompletionMode = &mode
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods with index for job exist")
		pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
		succeededIndexes := sets.NewInt()
		for _, pod := range pods.Items {
			if pod.Status.Phase == v1.PodSucceeded && pod.Annotations != nil {
				ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotation])
				framework.ExpectNoError(err, "failed obtaining completion index from pod in namespace: %s", f.Namespace.Name)
				succeededIndexes.Insert(ix)
				expectedName := fmt.Sprintf("%s-%d", job.Name, ix)
				framework.ExpectEqual(pod.Spec.Hostname, expectedName, "expected completed pod with hostname %s, but got %s", expectedName, pod.Spec.Hostname)
			}
		}
		gotIndexes := succeededIndexes.List()
		wantIndexes := []int{0, 1, 2, 3}
		framework.ExpectEqual(gotIndexes, wantIndexes, "expected completed indexes %s, but got %s", wantIndexes, gotIndexes)
	})

	/*
		Testcase: Ensure that the pods associated with the job are removed once the job is deleted.
		Description: Create a job and ensure the number of associated pods equals the parallelism count.
		Delete the job and ensure that the pods associated with the job have been removed.
	*/
	ginkgo.It("should remove pods when job is deleted", func() {
		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("notTerminate", "all-pods-removed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring the number of pods attached to the job equals the parallelism count")
		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)

		ginkgo.By("Delete the job")
		err = e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to delete the job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensure the pods associated with the job are also deleted")
		err = e2ejob.WaitForAllJobPodsGone(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
	})

	/*
		Release: v1.16
		Testname: Jobs, completion after task failure
		Description: Explicitly cause the tasks to fail once initially. After restarting, the Job MUST
		execute to completion.
	*/
	framework.ConformanceIt("should run a job to completion when tasks sometimes fail and are locally restarted", func() {
		ginkgo.By("Creating a job")
		// One failure, then a success, local restarts.
		// We can't use the random failure approach, because kubelet will
		// throttle frequently failing containers in a given pod, ramping
		// up to 5 minutes between restarts, making a test timeout due to
		// successive failures too likely within a reasonable test duration.
		job := e2ejob.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
	})

	// Pods sometimes fail, but eventually succeed, after pod restarts
	ginkgo.It("should run a job to completion when tasks sometimes fail and are not locally restarted", func() {
		// One failure, then a success, no local restarts.
		// We can't use the random failure approach, because JobController
		// will throttle frequently failing Pods of a given Job, ramping
		// up to 6 minutes between restarts, making a test timeout due to
		// successive failures likely.
		// Instead, we force the Job's Pods to be scheduled to a single Node
		// and use a hostPath volume to persist data across new Pods.
		ginkgo.By("Looking for a node to schedule job pod")
		node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
		framework.ExpectNoError(err)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJobOnNode("failOnce", "fail-once-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
		job, err = e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
	})

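	/*
		Testcase: Ensure a Job fails once it runs past its active deadline.
		Description: Create a Job with a short activeDeadlineSeconds and verify that it fails with
		reason DeadlineExceeded.
	*/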
	ginkgo.It("should fail when exceeds active deadline", func() {
		ginkgo.By("Creating a job")
		var activeDeadlineSeconds int64 = 1
		job := e2ejob.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
		ginkgo.By("Ensuring job past active deadline")
		err = waitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+15)*time.Second, "DeadlineExceeded")
		framework.ExpectNoError(err, "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name)
	})

	/*
		Release: v1.15
		Testname: Jobs, active pods, graceful termination
		Description: Create a job. Ensure the active pods reflect parallelism in the namespace and delete the job. Job MUST be deleted successfully.
	*/
	framework.ConformanceIt("should delete a job", func() {
		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring active pods == parallelism")
		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)

		ginkgo.By("delete a job")
		framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))

		ginkgo.By("Ensuring job was deleted")
		_, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectError(err, "failed to ensure job %s was deleted in namespace: %s", job.Name, f.Namespace.Name)
		framework.ExpectEqual(apierrors.IsNotFound(err), true)
	})

	/*
		Release: v1.16
		Testname: Jobs, orphan pods, re-adoption
		Description: Create a parallel job. The number of Pods MUST equal the level of parallelism.
		Orphan a Pod by modifying its owner reference. The Job MUST re-adopt the orphan pod.
		Modify the labels of one of the Job's Pods. The Job MUST release the Pod.
	*/
	framework.ConformanceIt("should adopt matching orphans and release non-matching pods", func() {
		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		// Replace job with the one returned from Create() so it has the UID.
		// Save Kind since it won't be populated in the returned job.
		kind := job.Kind
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
		job.Kind = kind

		ginkgo.By("Ensuring active pods == parallelism")
		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)

		ginkgo.By("Orphaning one of the Job's Pods")
		pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
		gomega.Expect(pods.Items).To(gomega.HaveLen(int(parallelism)))
		pod := pods.Items[0]
		f.PodClient().Update(pod.Name, func(pod *v1.Pod) {
			pod.OwnerReferences = nil
		})

		ginkgo.By("Checking that the Job readopts the Pod")
		gomega.Expect(e2epod.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "adopted", e2ejob.JobTimeout,
			func(pod *v1.Pod) (bool, error) {
				controllerRef := metav1.GetControllerOf(pod)
				if controllerRef == nil {
					return false, nil
				}
				if controllerRef.Kind != job.Kind || controllerRef.Name != job.Name || controllerRef.UID != job.UID {
					return false, fmt.Errorf("pod has wrong controllerRef: got %v, want %v", controllerRef, job)
				}
				return true, nil
			},
		)).To(gomega.Succeed(), "wait for pod %q to be readopted", pod.Name)

		ginkgo.By("Removing the labels from the Job's Pod")
		f.PodClient().Update(pod.Name, func(pod *v1.Pod) {
			pod.Labels = nil
		})

		ginkgo.By("Checking that the Job releases the Pod")
		gomega.Expect(e2epod.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "released", e2ejob.JobTimeout,
			func(pod *v1.Pod) (bool, error) {
				controllerRef := metav1.GetControllerOf(pod)
				if controllerRef != nil {
					return false, nil
				}
				return true, nil
			},
		)).To(gomega.Succeed(), "wait for pod %q to be released", pod.Name)
	})

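	/*
		Testcase: Ensure a Job fails once its Pod failures exceed backoffLimit.
		Description: Create a Job whose Pods always fail, with backoffLimit=1, and verify that the
		Job fails with reason BackoffLimitExceeded and that backoffLimit+1 failed Pods were created.
	*/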
	ginkgo.It("should fail to exceed backoffLimit", func() {
		ginkgo.By("Creating a job")
		backoff := 1
		job := e2ejob.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff))
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
		ginkgo.By("Ensuring job exceeds backoffLimit")

		err = waitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, e2ejob.JobTimeout, "BackoffLimitExceeded")
		framework.ExpectNoError(err, "failed to ensure job exceeds backoffLimit in namespace: %s", f.Namespace.Name)

		ginkgo.By(fmt.Sprintf("Checking that %d pods were created and have a failed status", backoff+1))
		pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
		gomega.Expect(pods.Items).To(gomega.HaveLen(backoff + 1))
		for _, pod := range pods.Items {
			framework.ExpectEqual(pod.Status.Phase, v1.PodFailed)
		}
	})
})

// waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.
func waitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error {
	return wait.Poll(framework.Poll, timeout, func() (bool, error) {
		curr, err := c.BatchV1().Jobs(ns).Get(context.TODO(), jobName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, c := range curr.Status.Conditions {
			if c.Type == batchv1.JobFailed && c.Status == v1.ConditionTrue {
				if reason == "" || reason == c.Reason {
					return true, nil
				}
			}
		}
		return false, nil
	})
}
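
// jobHasCondition reports whether the Job's status contains a condition of the
// given type, regardless of the condition's status.
func jobHasCondition(job *batchv1.Job, condType batchv1.JobConditionType) bool {
	for _, c := range job.Status.Conditions {
		if c.Type == condType {
			return true
		}
	}
	return false
}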