1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package apps 18 19import ( 20 "context" 21 "fmt" 22 "strconv" 23 "time" 24 25 batchv1 "k8s.io/api/batch/v1" 26 v1 "k8s.io/api/core/v1" 27 apierrors "k8s.io/apimachinery/pkg/api/errors" 28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 29 "k8s.io/apimachinery/pkg/util/sets" 30 "k8s.io/apimachinery/pkg/util/wait" 31 clientset "k8s.io/client-go/kubernetes" 32 batchinternal "k8s.io/kubernetes/pkg/apis/batch" 33 "k8s.io/kubernetes/test/e2e/framework" 34 e2ejob "k8s.io/kubernetes/test/e2e/framework/job" 35 e2enode "k8s.io/kubernetes/test/e2e/framework/node" 36 e2epod "k8s.io/kubernetes/test/e2e/framework/pod" 37 e2eresource "k8s.io/kubernetes/test/e2e/framework/resource" 38 "k8s.io/utils/pointer" 39 40 "github.com/onsi/ginkgo" 41 "github.com/onsi/gomega" 42) 43 44var _ = SIGDescribe("Job", func() { 45 f := framework.NewDefaultFramework("job") 46 parallelism := int32(2) 47 completions := int32(4) 48 backoffLimit := int32(6) // default value 49 50 // Simplest case: N pods succeed 51 ginkgo.It("should run a job to completion when tasks succeed", func() { 52 ginkgo.By("Creating a job") 53 job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) 54 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 55 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 56 57 ginkgo.By("Ensuring job reaches completions") 58 err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions) 59 framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) 60 61 ginkgo.By("Ensuring pods for job exist") 62 pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) 63 framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name) 64 successes := int32(0) 65 for _, pod := range pods.Items { 66 if pod.Status.Phase == v1.PodSucceeded { 67 successes++ 68 } 69 } 70 framework.ExpectEqual(successes, completions, "epected %d successful job pods, but got %d", completions, successes) 71 }) 72 73 ginkgo.It("should not create pods when created in suspend state", func() { 74 ginkgo.By("Creating a job with suspend=true") 75 job := e2ejob.NewTestJob("succeed", "suspend-true-to-false", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) 76 job.Spec.Suspend = pointer.BoolPtr(true) 77 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 78 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 79 80 ginkgo.By("Ensuring pods aren't created for job") 81 framework.ExpectEqual(wait.Poll(framework.Poll, wait.ForeverTestTimeout, func() (bool, error) { 82 pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) 83 if err != nil { 84 return false, err 85 } 86 return len(pods.Items) > 0, nil 87 }), wait.ErrWaitTimeout) 88 89 ginkgo.By("Checking Job status to observe Suspended state") 90 job, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name) 91 framework.ExpectNoError(err, "failed to retrieve latest job object") 92 exists := false 93 for _, c := range job.Status.Conditions { 94 if c.Type == batchv1.JobSuspended { 95 exists = true 96 break 97 } 98 } 99 framework.ExpectEqual(exists, true) 100 101 ginkgo.By("Updating the job with suspend=false") 102 job.Spec.Suspend = pointer.BoolPtr(false) 103 job, err = e2ejob.UpdateJob(f.ClientSet, f.Namespace.Name, job) 104 framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name) 105 106 ginkgo.By("Waiting for job to complete") 107 err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions) 108 framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) 109 }) 110 111 ginkgo.It("should delete pods when suspended", func() { 112 ginkgo.By("Creating a job with suspend=false") 113 job := e2ejob.NewTestJob("notTerminate", "suspend-false-to-true", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) 114 job.Spec.Suspend = pointer.BoolPtr(false) 115 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 116 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 117 118 ginkgo.By("Ensure pods equal to paralellism count is attached to the job") 119 err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism) 120 framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name) 121 122 ginkgo.By("Updating the job with suspend=true") 123 job, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name) 124 framework.ExpectNoError(err, "failed to retrieve latest job object") 125 job.Spec.Suspend = pointer.BoolPtr(true) 126 job, err = e2ejob.UpdateJob(f.ClientSet, f.Namespace.Name, job) 127 framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name) 128 129 ginkgo.By("Ensuring pods are deleted") 130 err = e2ejob.WaitForAllJobPodsGone(f.ClientSet, f.Namespace.Name, job.Name) 131 framework.ExpectNoError(err, "failed to ensure pods are deleted after suspend=true") 132 133 ginkgo.By("Checking Job status to observe Suspended state") 134 job, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name) 135 framework.ExpectNoError(err, "failed to retrieve latest job object") 136 exists := false 137 for _, c := range job.Status.Conditions { 138 if c.Type == batchv1.JobSuspended { 139 exists = true 140 break 141 } 142 } 143 framework.ExpectEqual(exists, true) 144 }) 145 146 /* 147 Testcase: Ensure Pods of an Indexed Job get a unique index. 148 Description: Create an Indexed Job, wait for completion, capture the output of the pods and verify that they contain the completion index. 149 */ 150 ginkgo.It("should create pods for an Indexed job with completion indexes and specified hostname", func() { 151 ginkgo.By("Creating Indexed job") 152 job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) 153 mode := batchv1.IndexedCompletion 154 job.Spec.CompletionMode = &mode 155 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 156 framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name) 157 158 ginkgo.By("Ensuring job reaches completions") 159 err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions) 160 framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) 161 162 ginkgo.By("Ensuring pods with index for job exist") 163 pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) 164 framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name) 165 succeededIndexes := sets.NewInt() 166 for _, pod := range pods.Items { 167 if pod.Status.Phase == v1.PodSucceeded && pod.Annotations != nil { 168 ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotation]) 169 framework.ExpectNoError(err, "failed obtaining completion index from pod in namespace: %s", f.Namespace.Name) 170 succeededIndexes.Insert(ix) 171 expectedName := fmt.Sprintf("%s-%d", job.Name, ix) 172 framework.ExpectEqual(pod.Spec.Hostname, expectedName, "expected completed pod with hostname %s, but got %s", expectedName, pod.Spec.Hostname) 173 } 174 } 175 gotIndexes := succeededIndexes.List() 176 wantIndexes := []int{0, 1, 2, 3} 177 framework.ExpectEqual(gotIndexes, wantIndexes, "expected completed indexes %s, but got %s", wantIndexes, gotIndexes) 178 }) 179 180 /* 181 Testcase: Ensure that the pods associated with the job are removed once the job is deleted 182 Description: Create a job and ensure the associated pod count is equal to paralellism count. Delete the 183 job and ensure if the pods associated with the job have been removed 184 */ 185 ginkgo.It("should remove pods when job is deleted", func() { 186 ginkgo.By("Creating a job") 187 job := e2ejob.NewTestJob("notTerminate", "all-pods-removed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) 188 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 189 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 190 191 ginkgo.By("Ensure pods equal to paralellism count is attached to the job") 192 err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism) 193 framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name) 194 195 ginkgo.By("Delete the job") 196 err = e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name) 197 framework.ExpectNoError(err, "failed to delete the job in namespace: %s", f.Namespace.Name) 198 199 ginkgo.By("Ensure the pods associated with the job are also deleted") 200 err = e2ejob.WaitForAllJobPodsGone(f.ClientSet, f.Namespace.Name, job.Name) 201 framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name) 202 }) 203 204 /* 205 Release: v1.16 206 Testname: Jobs, completion after task failure 207 Description: Explicitly cause the tasks to fail once initially. After restarting, the Job MUST 208 execute to completion. 209 */ 210 framework.ConformanceIt("should run a job to completion when tasks sometimes fail and are locally restarted", func() { 211 ginkgo.By("Creating a job") 212 // One failure, then a success, local restarts. 213 // We can't use the random failure approach, because kubelet will 214 // throttle frequently failing containers in a given pod, ramping 215 // up to 5 minutes between restarts, making test timeout due to 216 // successive failures too likely with a reasonable test timeout. 217 job := e2ejob.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit) 218 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 219 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 220 221 ginkgo.By("Ensuring job reaches completions") 222 err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions) 223 framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) 224 }) 225 226 // Pods sometimes fail, but eventually succeed, after pod restarts 227 ginkgo.It("should run a job to completion when tasks sometimes fail and are not locally restarted", func() { 228 // One failure, then a success, no local restarts. 229 // We can't use the random failure approach, because JobController 230 // will throttle frequently failing Pods of a given Job, ramping 231 // up to 6 minutes between restarts, making test timeout due to 232 // successive failures. 233 // Instead, we force the Job's Pods to be scheduled to a single Node 234 // and use a hostPath volume to persist data across new Pods. 235 ginkgo.By("Looking for a node to schedule job pod") 236 node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet) 237 framework.ExpectNoError(err) 238 239 ginkgo.By("Creating a job") 240 job := e2ejob.NewTestJobOnNode("failOnce", "fail-once-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name) 241 job, err = e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 242 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 243 244 ginkgo.By("Ensuring job reaches completions") 245 err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions) 246 framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) 247 }) 248 249 ginkgo.It("should fail when exceeds active deadline", func() { 250 ginkgo.By("Creating a job") 251 var activeDeadlineSeconds int64 = 1 252 job := e2ejob.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit) 253 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 254 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 255 ginkgo.By("Ensuring job past active deadline") 256 err = waitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+15)*time.Second, "DeadlineExceeded") 257 framework.ExpectNoError(err, "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name) 258 }) 259 260 /* 261 Release: v1.15 262 Testname: Jobs, active pods, graceful termination 263 Description: Create a job. Ensure the active pods reflect paralellism in the namespace and delete the job. Job MUST be deleted successfully. 264 */ 265 framework.ConformanceIt("should delete a job", func() { 266 ginkgo.By("Creating a job") 267 job := e2ejob.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) 268 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 269 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 270 271 ginkgo.By("Ensuring active pods == parallelism") 272 err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism) 273 framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name) 274 275 ginkgo.By("delete a job") 276 framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name)) 277 278 ginkgo.By("Ensuring job was deleted") 279 _, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name) 280 framework.ExpectError(err, "failed to ensure job %s was deleted in namespace: %s", job.Name, f.Namespace.Name) 281 framework.ExpectEqual(apierrors.IsNotFound(err), true) 282 }) 283 284 /* 285 Release: v1.16 286 Testname: Jobs, orphan pods, re-adoption 287 Description: Create a parallel job. The number of Pods MUST equal the level of parallelism. 288 Orphan a Pod by modifying its owner reference. The Job MUST re-adopt the orphan pod. 289 Modify the labels of one of the Job's Pods. The Job MUST release the Pod. 290 */ 291 framework.ConformanceIt("should adopt matching orphans and release non-matching pods", func() { 292 ginkgo.By("Creating a job") 293 job := e2ejob.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) 294 // Replace job with the one returned from Create() so it has the UID. 295 // Save Kind since it won't be populated in the returned job. 296 kind := job.Kind 297 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 298 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 299 job.Kind = kind 300 301 ginkgo.By("Ensuring active pods == parallelism") 302 err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism) 303 framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name) 304 305 ginkgo.By("Orphaning one of the Job's Pods") 306 pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) 307 framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name) 308 gomega.Expect(pods.Items).To(gomega.HaveLen(int(parallelism))) 309 pod := pods.Items[0] 310 f.PodClient().Update(pod.Name, func(pod *v1.Pod) { 311 pod.OwnerReferences = nil 312 }) 313 314 ginkgo.By("Checking that the Job readopts the Pod") 315 gomega.Expect(e2epod.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "adopted", e2ejob.JobTimeout, 316 func(pod *v1.Pod) (bool, error) { 317 controllerRef := metav1.GetControllerOf(pod) 318 if controllerRef == nil { 319 return false, nil 320 } 321 if controllerRef.Kind != job.Kind || controllerRef.Name != job.Name || controllerRef.UID != job.UID { 322 return false, fmt.Errorf("pod has wrong controllerRef: got %v, want %v", controllerRef, job) 323 } 324 return true, nil 325 }, 326 )).To(gomega.Succeed(), "wait for pod %q to be readopted", pod.Name) 327 328 ginkgo.By("Removing the labels from the Job's Pod") 329 f.PodClient().Update(pod.Name, func(pod *v1.Pod) { 330 pod.Labels = nil 331 }) 332 333 ginkgo.By("Checking that the Job releases the Pod") 334 gomega.Expect(e2epod.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "released", e2ejob.JobTimeout, 335 func(pod *v1.Pod) (bool, error) { 336 controllerRef := metav1.GetControllerOf(pod) 337 if controllerRef != nil { 338 return false, nil 339 } 340 return true, nil 341 }, 342 )).To(gomega.Succeed(), "wait for pod %q to be released", pod.Name) 343 }) 344 345 ginkgo.It("should fail to exceed backoffLimit", func() { 346 ginkgo.By("Creating a job") 347 backoff := 1 348 job := e2ejob.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff)) 349 job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) 350 framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) 351 ginkgo.By("Ensuring job exceed backofflimit") 352 353 err = waitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, e2ejob.JobTimeout, "BackoffLimitExceeded") 354 framework.ExpectNoError(err, "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name) 355 356 ginkgo.By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1)) 357 pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) 358 framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name) 359 gomega.Expect(pods.Items).To(gomega.HaveLen(backoff + 1)) 360 for _, pod := range pods.Items { 361 framework.ExpectEqual(pod.Status.Phase, v1.PodFailed) 362 } 363 }) 364}) 365 366// waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail. 367func waitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error { 368 return wait.Poll(framework.Poll, timeout, func() (bool, error) { 369 curr, err := c.BatchV1().Jobs(ns).Get(context.TODO(), jobName, metav1.GetOptions{}) 370 if err != nil { 371 return false, err 372 } 373 for _, c := range curr.Status.Conditions { 374 if c.Type == batchv1.JobFailed && c.Status == v1.ConditionTrue { 375 if reason == "" || reason == c.Reason { 376 return true, nil 377 } 378 } 379 } 380 return false, nil 381 }) 382} 383