/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cronjob

/*
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
expecting a large volume of jobs or cronJobs. (We are favoring correctness
over scalability. If we find a single controller thread is too slow because
there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch and
UndeltaStore.)

Just periodically list jobs and cronJobs, and then reconcile them.
*/

import (
	"context"
	"fmt"
	"sort"
	"time"

	"k8s.io/klog/v2"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/pager"
	"k8s.io/client-go/tools/record"
	ref "k8s.io/client-go/tools/reference"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
)

// Utilities for dealing with Jobs and CronJobs and time.

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")

// Controller is a controller for CronJobs.
type Controller struct {
	kubeClient clientset.Interface
	jobControl jobControlInterface
	cjControl  cjControlInterface
	podControl podControlInterface
	recorder   record.EventRecorder
}

// NewController creates and initializes a new Controller.
func NewController(kubeClient clientset.Interface) (*Controller, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}

	jm := &Controller{
		kubeClient: kubeClient,
		jobControl: realJobControl{KubeClient: kubeClient},
		cjControl:  &realCJControl{KubeClient: kubeClient},
		podControl: &realPodControl{KubeClient: kubeClient},
		recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
	}

	return jm, nil
}
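// startControllerSketch is a minimal illustrative sketch, not used by the
// controller itself, of the intended lifecycle: construct the controller with
// a configured clientset and run it until the stop channel closes. The
// function name is an assumption for illustration; the real wiring lives in
// the kube-controller-manager.
func startControllerSketch(kubeClient clientset.Interface, stopCh <-chan struct{}) error {
	jm, err := NewController(kubeClient)
	if err != nil {
		return err
	}
	// Run blocks until stopCh is closed, so callers typically run it in its
	// own goroutine.
	go jm.Run(stopCh)
	return nil
}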
// Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	klog.Infof("Starting CronJob Manager")
	// Check things every 10 seconds.
	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
	<-stopCh
	klog.Infof("Shutting down CronJob Manager")
}

// syncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *Controller) syncAll() {
	// List children (Jobs) before parents (CronJobs).
	// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
	// we must also see that the parent CronJob has a non-nil DeletionTimestamp (see #42639).
	// Note that this only works because we are NOT using any caches here.
	jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
	}

	js := make([]batchv1.Job, 0)
	err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		jobTmp, ok := object.(*batchv1.Job)
		if !ok {
			return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
		}
		js = append(js, *jobTmp)
		return nil
	})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
		return
	}
	klog.V(4).Infof("Found %d jobs", len(js))

	jobsByCj := groupJobsByParent(js)
	klog.V(4).Infof("Found %d groups", len(jobsByCj))

	err = pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1().CronJobs(metav1.NamespaceAll).List(ctx, opts)
	}).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		cj, ok := object.(*batchv1.CronJob)
		if !ok {
			return fmt.Errorf("expected type *batchv1.CronJob, got type %T", cj)
		}
		syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
		cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
		return nil
	})

	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
		return
	}
}
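// groupJobsByParentSketch is an illustrative sketch of the contract assumed
// of groupJobsByParent (defined elsewhere in this package): bucket Jobs by
// the UID of the CronJob named in each Job's controller owner reference.
// The name and body here are assumptions for illustration, not the actual
// helper, and the controller does not call this function.
func groupJobsByParentSketch(js []batchv1.Job) map[types.UID][]batchv1.Job {
	jobsByCj := make(map[types.UID][]batchv1.Job)
	for i := range js {
		// Jobs without a controller owner reference are not ours to manage.
		if controllerRef := metav1.GetControllerOf(&js[i]); controllerRef != nil {
			jobsByCj[controllerRef.UID] = append(jobsByCj[controllerRef.UID], js[i])
		}
	}
	return jobsByCj
}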
// cleanupFinishedJobs cleans up finished jobs created by a CronJob.
func cleanupFinishedJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface,
	cjc cjControlInterface, recorder record.EventRecorder) {
	// If neither limit is set, there is nothing to do.
	if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
		return
	}

	failedJobs := []batchv1.Job{}
	successfulJobs := []batchv1.Job{}

	for _, job := range js {
		isFinished, finishedStatus := getFinishedStatus(&job)
		if isFinished && finishedStatus == batchv1.JobComplete {
			successfulJobs = append(successfulJobs, job)
		} else if isFinished && finishedStatus == batchv1.JobFailed {
			failedJobs = append(failedJobs, job)
		}
	}

	if cj.Spec.SuccessfulJobsHistoryLimit != nil {
		removeOldestJobs(cj,
			successfulJobs,
			jc,
			*cj.Spec.SuccessfulJobsHistoryLimit,
			recorder)
	}

	if cj.Spec.FailedJobsHistoryLimit != nil {
		removeOldestJobs(cj,
			failedJobs,
			jc,
			*cj.Spec.FailedJobsHistoryLimit,
			recorder)
	}

	// Update the CronJob, in case jobs were removed from the list.
	if _, err := cjc.UpdateStatus(cj); err != nil {
		nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
	}
}

// removeOldestJobs removes the oldest jobs from a list of jobs.
func removeOldestJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
	numToDelete := len(js) - int(maxJobs)
	if numToDelete <= 0 {
		return
	}

	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
	klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)

	// Sort so the oldest jobs come first, then delete from the front.
	sort.Sort(byJobStartTime(js))
	for i := 0; i < numToDelete; i++ {
		klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
		deleteJob(cj, &js[i], jc, recorder)
	}
}
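// oldestFirstSketch illustrates the ordering contract removeOldestJobs relies
// on: after sort.Sort(byJobStartTime(js)), jobs with the earliest start times
// come first, so deleting the first numToDelete entries removes the oldest
// jobs. The name, the body, and the nil-sorts-oldest rule are assumptions
// about byJobStartTime (defined elsewhere in this package), for illustration
// only; the controller does not call this function.
func oldestFirstSketch(js []batchv1.Job) []batchv1.Job {
	sort.Slice(js, func(i, j int) bool {
		si, sj := js[i].Status.StartTime, js[j].Status.StartTime
		if si == nil {
			// A job that never started sorts as oldest.
			return sj != nil
		}
		if sj == nil {
			return false
		}
		return si.Before(sj)
	})
	return js
}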
// syncOne reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "cj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func syncOne(cj *batchv1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)

	childrenJobs := make(map[types.UID]bool)
	for _, j := range js {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(*cj, j.ObjectMeta.UID)
		if !found && !IsJobFinished(&j) {
			recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
			// We found an unfinished job that has us as the parent, but it is not in our Active list.
			// This could happen if we crashed right after creating the Job and before updating the status,
			// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
			// a job that they wanted us to adopt.

			// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
			// stop users from creating jobs if they have permission. It is assumed that if a
			// user has permission to create a job within a namespace, then they have permission to make any cronJob
			// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
			// TBS: how to update cj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
		} else if found && IsJobFinished(&j) {
			_, status := getFinishedStatus(&j)
			deleteFromActiveList(cj, j.ObjectMeta.UID)
			recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
		}
	}

	// Remove any job reference from the active list if the corresponding job does not exist any more.
	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
	// job running.
	for _, j := range cj.Status.Active {
		if found := childrenJobs[j.UID]; !found {
			recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(cj, j.UID)
		}
	}

	updatedCJ, err := cjc.UpdateStatus(cj)
	if err != nil {
		klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
		return
	}
	*cj = *updatedCJ

	if cj.DeletionTimestamp != nil {
		// The CronJob is being deleted.
		// Don't do anything other than updating status.
		return
	}

	if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
		klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
		return
	}

	times, err := getRecentUnmetScheduleTimes(*cj, now)
	if err != nil {
		recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
		klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
		return
	}
	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
	if len(times) == 0 {
		klog.V(4).Infof("No unmet start times for %s", nameForLog)
		return
	}
	if len(times) > 1 {
		klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
	}

	scheduledTime := times[len(times)-1]
	tooLate := false
	if cj.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
	}
	if tooLate {
		klog.V(4).Infof("Missed starting window for %s", nameForLog)
		recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
		// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
		// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
		// the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
		// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
		// Status.LastScheduleTime, Status.LastMissedTime), so we won't generate an event the
		// next time we process it, and so a user looking at the status can easily see that
		// there was a missed execution.
		return
	}
	if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
		// Regardless of which source of information we use for the set of active jobs,
		// there is some risk that we won't see an active job when there is one
		// (because we haven't seen the status update to the CronJob or the created pod).
		// So it is theoretically possible to have concurrency with Forbid.
		// As long as the invocations are "far enough apart in time", this usually won't happen.
		//
		// TODO: for Forbid, we could use the same name for every execution, as a lock.
		// With Replace, we could use a name that is deterministic per execution time.
		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
		klog.V(4).Infof("Not starting job for %s because a prior execution is still running and concurrency policy is Forbid", nameForLog)
		return
	}
	if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
		for _, j := range cj.Status.Active {
			klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

			job, err := jc.GetJob(j.Namespace, j.Name)
			if err != nil {
				recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return
			}
			if !deleteJob(cj, job, jc, recorder) {
				return
			}
		}
	}

	jobReq, err := getJobFromTemplate(cj, scheduledTime)
	if err != nil {
		klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
		return
	}
	jobResp, err := jc.CreateJob(cj.Namespace, jobReq)
	if err != nil {
		// If the namespace is being torn down, we can safely ignore
		// this error since all subsequent creations will fail.
		if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		}
		return
	}
	klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
	recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

	// ------------------------------------------------------------------ //

	// If this process restarts at this point (after posting a job, but
	// before updating the status), then we might try to start the job
	// again the next time. Actually, if we re-list the CronJobs and Jobs
	// on the next iteration of syncAll, we might not see our own status
	// update, and then post one again. So, we need to use the job name as
	// a lock to prevent us from making the same job twice (name the job
	// with a hash of its scheduled time); see the illustrative sketch
	// after this function.

	// Add the just-started job to the status list.
	ref, err := getRef(jobResp)
	if err != nil {
		klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
	} else {
		cj.Status.Active = append(cj.Status.Active, *ref)
	}
	cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
	if _, err := cjc.UpdateStatus(cj); err != nil {
		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
	}
}
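// jobNameForScheduledTimeSketch is an illustrative sketch of the "job name as
// a lock" idea referenced in syncOne above: derive the Job name
// deterministically from the scheduled time so that a duplicate create for
// the same run fails with AlreadyExists instead of producing a second Job.
// The name and naming scheme here are assumptions for illustration; the real
// naming is done by getJobFromTemplate and may differ.
func jobNameForScheduledTimeSketch(cj *batchv1.CronJob, scheduledTime time.Time) string {
	// Unix seconds of the scheduled time are unique per schedule tick, so two
	// controller instances starting the same run would produce the same name.
	return fmt.Sprintf("%s-%d", cj.Name, scheduledTime.Unix())
}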
// deleteJob reaps a job, deleting the job, its pods, and its reference in the
// CronJob's active list.
func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)

	// delete the job itself...
	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
		recorder.Eventf(cj, v1.EventTypeWarning, "FailedDelete", "Error deleting job: %v", err)
		klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
		return false
	}
	// ... and its reference from the active list.
	deleteFromActiveList(cj, job.ObjectMeta.UID)
	recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)

	return true
}

func getRef(object runtime.Object) (*v1.ObjectReference, error) {
	return ref.GetReference(scheme.Scheme, object)
}
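// isMissedDeadlineSketch restates syncOne's starting-deadline check as a
// standalone helper for illustration; the name is an assumption and the
// controller does not call it. A scheduled run is "too late" once
// scheduledTime + StartingDeadlineSeconds has passed. For example, with a
// deadline of 300 seconds, a run scheduled for 10:00:00 may still be started
// until 10:05:00.
func isMissedDeadlineSketch(scheduledTime, now time.Time, startingDeadlineSeconds int64) bool {
	return scheduledTime.Add(time.Duration(startingDeadlineSeconds) * time.Second).Before(now)
}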