17package cronjob
/*
I did not use watch or expectations.  Those add a lot of corner cases, and we aren't
expecting a large volume of Jobs or CronJobs.  (We are favoring correctness
over scalability.)  If we find a single controller thread is too slow because
there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch instead.

Just periodically list Jobs and CronJobs, and then reconcile them.
*/
30import (
31	"context"
32	"fmt"
33	"sort"
34	"time"
36	"k8s.io/klog/v2"
38	batchv1 "k8s.io/api/batch/v1"
39	v1 "k8s.io/api/core/v1"
40	"k8s.io/apimachinery/pkg/api/errors"
41	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42	"k8s.io/apimachinery/pkg/runtime"
43	"k8s.io/apimachinery/pkg/types"
44	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
45	"k8s.io/apimachinery/pkg/util/wait"
46	clientset "k8s.io/client-go/kubernetes"
47	"k8s.io/client-go/kubernetes/scheme"
48	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
49	"k8s.io/client-go/tools/pager"
50	"k8s.io/client-go/tools/record"
51	ref "k8s.io/client-go/tools/reference"
52	"k8s.io/component-base/metrics/prometheus/ratelimiter"
55// Utilities for dealing with Jobs and CronJobs and time.
// controllerKind is the GroupVersionKind for this controller type: the batch/v1
// scheme group version (batchv1.SchemeGroupVersion) combined with kind "CronJob".
var controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
// Controller is a controller for CronJobs.
//
// kubeClient is the API client that syncAll uses to list Jobs and CronJobs.
// jobControl and cjControl are the Job and CronJob helpers handed to syncOne
// and cleanupFinishedJobs (getting, creating and deleting Jobs; updating
// CronJob status). podControl is populated by NewController. recorder emits
// the events recorded against CronJobs during reconciliation.
type Controller struct {
	kubeClient clientset.Interface
	jobControl jobControlInterface
	cjControl  cjControlInterface
	podControl podControlInterface
	recorder   record.EventRecorder
}
69// NewController creates and initializes a new Controller.
70func NewController(kubeClient clientset.Interface) (*Controller, error) {
71	eventBroadcaster := record.NewBroadcaster()
72	eventBroadcaster.StartStructuredLogging(0)
73	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
75	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
76		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
77			return nil, err
78		}
79	}
81	jm := &Controller{
82		kubeClient: kubeClient,
83		jobControl: realJobControl{KubeClient: kubeClient},
84		cjControl:  &realCJControl{KubeClient: kubeClient},
85		podControl: &realPodControl{KubeClient: kubeClient},
86		recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
87	}
89	return jm, nil
92// Run starts the main goroutine responsible for watching and syncing jobs.
93func (jm *Controller) Run(stopCh <-chan struct{}) {
94	defer utilruntime.HandleCrash()
95	klog.Infof("Starting CronJob Manager")
96	// Check things every 10 second.
97	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
98	<-stopCh
99	klog.Infof("Shutting down CronJob Manager")
102// syncAll lists all the CronJobs and Jobs and reconciles them.
103func (jm *Controller) syncAll() {
104	// List children (Jobs) before parents (CronJob).
105	// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
106	// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
107	// Note that this only works because we are NOT using any caches here.
108	jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
109		return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
110	}
112	js := make([]batchv1.Job, 0)
113	err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
114		jobTmp, ok := object.(*batchv1.Job)
115		if !ok {
116			return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
117		}
118		js = append(js, *jobTmp)
119		return nil
120	})
121	if err != nil {
122		utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
123		return
124	}
125	klog.V(4).Infof("Found %d jobs", len(js))
127	jobsByCj := groupJobsByParent(js)
128	klog.V(4).Infof("Found %d groups", len(jobsByCj))
130	err = pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
131		return jm.kubeClient.BatchV1().CronJobs(metav1.NamespaceAll).List(ctx, opts)
132	}).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
133		cj, ok := object.(*batchv1.CronJob)
134		if !ok {
135			return fmt.Errorf("expected type *batchv1.CronJob, got type %T", cj)
136		}
137		syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
138		cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
139		return nil
140	})
142	if err != nil {
143		utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
144		return
145	}
148// cleanupFinishedJobs cleanups finished jobs created by a CronJob
149func cleanupFinishedJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface,
150	cjc cjControlInterface, recorder record.EventRecorder) {
151	// If neither limits are active, there is no need to do anything.
152	if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
153		return
154	}
156	failedJobs := []batchv1.Job{}
157	successfulJobs := []batchv1.Job{}
159	for _, job := range js {
160		isFinished, finishedStatus := getFinishedStatus(&job)
161		if isFinished && finishedStatus == batchv1.JobComplete {
162			successfulJobs = append(successfulJobs, job)
163		} else if isFinished && finishedStatus == batchv1.JobFailed {
164			failedJobs = append(failedJobs, job)
165		}
166	}
168	if cj.Spec.SuccessfulJobsHistoryLimit != nil {
169		removeOldestJobs(cj,
170			successfulJobs,
171			jc,
172			*cj.Spec.SuccessfulJobsHistoryLimit,
173			recorder)
174	}
176	if cj.Spec.FailedJobsHistoryLimit != nil {
177		removeOldestJobs(cj,
178			failedJobs,
179			jc,
180			*cj.Spec.FailedJobsHistoryLimit,
181			recorder)
182	}
184	// Update the CronJob, in case jobs were removed from the list.
185	if _, err := cjc.UpdateStatus(cj); err != nil {
186		nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
187		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
188	}
191// removeOldestJobs removes the oldest jobs from a list of jobs
192func removeOldestJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
193	numToDelete := len(js) - int(maxJobs)
194	if numToDelete <= 0 {
195		return
196	}
198	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
199	klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
201	sort.Sort(byJobStartTime(js))
202	for i := 0; i < numToDelete; i++ {
203		klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
204		deleteJob(cj, &js[i], jc, recorder)
205	}
208// syncOne reconciles a CronJob with a list of any Jobs that it created.
209// All known jobs created by "cj" should be included in "js".
210// The current time is passed in to facilitate testing.
211// It has no receiver, to facilitate testing.
212func syncOne(cj *batchv1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
213	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
215	childrenJobs := make(map[types.UID]bool)
216	for _, j := range js {
217		childrenJobs[j.ObjectMeta.UID] = true
218		found := inActiveList(*cj, j.ObjectMeta.UID)
219		if !found && !IsJobFinished(&j) {
220			recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
221			// We found an unfinished job that has us as the parent, but it is not in our Active list.
222			// This could happen if we crashed right after creating the Job and before updating the status,
223			// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
224			// a job that they wanted us to adopt.
226			// TODO: maybe handle the adoption case?  Concurrency/suspend rules will not apply in that case, obviously, since we can't
227			// stop users from creating jobs if they have permission.  It is assumed that if a
228			// user has permission to create a job within a namespace, then they have permission to make any cronJob
229			// in the same namespace "adopt" that job.  ReplicaSets and their Pods work the same way.
230			// TBS: how to update cj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
231		} else if found && IsJobFinished(&j) {
232			_, status := getFinishedStatus(&j)
233			deleteFromActiveList(cj, j.ObjectMeta.UID)
234			recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
235		}
236	}
238	// Remove any job reference from the active list if the corresponding job does not exist any more.
239	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
240	// job running.
241	for _, j := range cj.Status.Active {
242		if found := childrenJobs[j.UID]; !found {
243			recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
244			deleteFromActiveList(cj, j.UID)
245		}
246	}
248	updatedCJ, err := cjc.UpdateStatus(cj)
249	if err != nil {
250		klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
251		return
252	}
253	*cj = *updatedCJ
255	if cj.DeletionTimestamp != nil {
256		// The CronJob is being deleted.
257		// Don't do anything other than updating status.
258		return
259	}
261	if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
262		klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
263		return
264	}
266	times, err := getRecentUnmetScheduleTimes(*cj, now)
267	if err != nil {
268		recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
269		klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
270		return
271	}
272	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
273	if len(times) == 0 {
274		klog.V(4).Infof("No unmet start times for %s", nameForLog)
275		return
276	}
277	if len(times) > 1 {
278		klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
279	}
281	scheduledTime := times[len(times)-1]
282	tooLate := false
283	if cj.Spec.StartingDeadlineSeconds != nil {
284		tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
285	}
286	if tooLate {
287		klog.V(4).Infof("Missed starting window for %s", nameForLog)
288		recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
289		// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
290		// the miss every cycle.  In order to avoid sending multiple events, and to avoid processing
291		// the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
292		// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
293		// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
294		// and event the next time we process it, and also so the user looking at the status
295		// can see easily that there was a missed execution.
296		return
297	}
298	if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
299		// Regardless which source of information we use for the set of active jobs,
300		// there is some risk that we won't see an active job when there is one.
301		// (because we haven't seen the status update to the SJ or the created pod).
302		// So it is theoretically possible to have concurrency with Forbid.
303		// As long the as the invocations are "far enough apart in time", this usually won't happen.
304		//
305		// TODO: for Forbid, we could use the same name for every execution, as a lock.
306		// With replace, we could use a name that is deterministic per execution time.
307		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
308		klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
309		return
310	}
311	if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
312		for _, j := range cj.Status.Active {
313			klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
315			job, err := jc.GetJob(j.Namespace, j.Name)
316			if err != nil {
317				recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
318				return
319			}
320			if !deleteJob(cj, job, jc, recorder) {
321				return
322			}
323		}
324	}
326	jobReq, err := getJobFromTemplate(cj, scheduledTime)
327	if err != nil {
328		klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
329		return
330	}
331	jobResp, err := jc.CreateJob(cj.Namespace, jobReq)
332	if err != nil {
333		// If the namespace is being torn down, we can safely ignore
334		// this error since all subsequent creations will fail.
335		if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
336			recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
337		}
338		return
339	}
340	klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
341	recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
343	// ------------------------------------------------------------------ //
345	// If this process restarts at this point (after posting a job, but
346	// before updating the status), then we might try to start the job on
347	// the next time.  Actually, if we re-list the SJs and Jobs on the next
348	// iteration of syncAll, we might not see our own status update, and
349	// then post one again.  So, we need to use the job name as a lock to
350	// prevent us from making the job twice (name the job with hash of its
351	// scheduled time).
353	// Add the just-started job to the status list.
354	ref, err := getRef(jobResp)
355	if err != nil {
356		klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
357	} else {
358		cj.Status.Active = append(cj.Status.Active, *ref)
359	}
360	cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
361	if _, err := cjc.UpdateStatus(cj); err != nil {
362		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
363	}
365	return
368// deleteJob reaps a job, deleting the job, the pods and the reference in the active list
369func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
370	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
372	// delete the job itself...
373	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
374		recorder.Eventf(cj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
375		klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
376		return false
377	}
378	// ... and its reference from active list
379	deleteFromActiveList(cj, job.ObjectMeta.UID)
380	recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
382	return true
385func getRef(object runtime.Object) (*v1.ObjectReference, error) {
386	return ref.GetReference(scheme.Scheme, object)