/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cronjob

/*
I did not use watch or expectations.  Those add a lot of corner cases, and we aren't
expecting a large volume of jobs or cronJobs.  (We are favoring correctness
over scalability.  If we find a single controller thread is too slow because
there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch and
UndeltaStore.)

Just periodically list jobs and cronJobs, and then reconcile them.
*/

import (
	"context"
	"fmt"
	"sort"
	"time"

	"k8s.io/klog/v2"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/pager"
	"k8s.io/client-go/tools/record"
	ref "k8s.io/client-go/tools/reference"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
)

// Utilities for dealing with Jobs, CronJobs, and time.

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
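
// For reference, a GroupVersionKind like controllerKind is what typically gets
// stamped into the OwnerReferences of child Jobs. A minimal sketch using the
// generic apimachinery helper (illustrative only; not necessarily how this
// package wires up ownership):
//
//	controllerRef := metav1.NewControllerRef(cj, controllerKind)
//	job.OwnerReferences = append(job.OwnerReferences, *controllerRef)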

// Controller is a controller for CronJobs.
type Controller struct {
	kubeClient clientset.Interface
	jobControl jobControlInterface
	cjControl  cjControlInterface
	podControl podControlInterface
	recorder   record.EventRecorder
}
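
// The jobControl, cjControl, and podControl fields are narrow interfaces so
// that tests can substitute fakes for the real API-backed implementations.
// A sketch of that substitution (fakeJobControl here is a hypothetical test
// double, not necessarily the one this package's tests define):
//
//	jm := &Controller{
//		jobControl: &fakeJobControl{},
//		recorder:   record.NewFakeRecorder(10),
//	}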

// NewController creates and initializes a new Controller.
func NewController(kubeClient clientset.Interface) (*Controller, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}

	jm := &Controller{
		kubeClient: kubeClient,
		jobControl: realJobControl{KubeClient: kubeClient},
		cjControl:  &realCJControl{KubeClient: kubeClient},
		podControl: &realPodControl{KubeClient: kubeClient},
		recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
	}

	return jm, nil
}
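
// A minimal sketch of wiring the controller up out-of-cluster (the kubeconfig
// path and the surrounding setup are assumptions for illustration, not how
// kube-controller-manager actually starts it):
//
//	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
//	if err != nil {
//		klog.Fatal(err)
//	}
//	jm, err := NewController(clientset.NewForConfigOrDie(config))
//	if err != nil {
//		klog.Fatal(err)
//	}
//	stopCh := make(chan struct{})
//	jm.Run(stopCh) // blocks; close(stopCh) from elsewhere to shut down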

// Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	klog.Infof("Starting CronJob Manager")
	// Check things every 10 seconds.
	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
	<-stopCh
	klog.Infof("Shutting down CronJob Manager")
}

// syncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *Controller) syncAll() {
	// List children (Jobs) before parents (CronJobs).
	// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
	// we must also see that the parent CronJob has a non-nil DeletionTimestamp (see #42639).
	// Note that this only works because we are NOT using any caches here.
	jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
	}

	js := make([]batchv1.Job, 0)
	err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		jobTmp, ok := object.(*batchv1.Job)
		if !ok {
			// Report the concrete type of the unexpected object, not the (nil) jobTmp.
			return fmt.Errorf("expected type *batchv1.Job, got type %T", object)
		}
		js = append(js, *jobTmp)
		return nil
	})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
		return
	}
	klog.V(4).Infof("Found %d jobs", len(js))

	jobsByCj := groupJobsByParent(js)
	klog.V(4).Infof("Found %d groups", len(jobsByCj))

	err = pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1().CronJobs(metav1.NamespaceAll).List(ctx, opts)
	}).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		cj, ok := object.(*batchv1.CronJob)
		if !ok {
			// Report the concrete type of the unexpected object, not the (nil) cj.
			return fmt.Errorf("expected type *batchv1.CronJob, got type %T", object)
		}
		syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
		cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
		return nil
	})

	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
		return
	}
}
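
// Note on the pager used above: rather than one full LIST per resync, it
// issues chunked LIST requests. The chunk size is tunable via the PageSize
// field on the ListPager (default 500); a sketch, reusing the same
// jobListFunc as above (handleItem stands in for the per-item callback):
//
//	p := pager.New(pager.SimplePageFunc(jobListFunc))
//	p.PageSize = 100 // more, smaller LIST requests
//	err := p.EachListItem(context.Background(), metav1.ListOptions{}, handleItem)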

// cleanupFinishedJobs cleans up the finished Jobs created by a CronJob,
// enforcing its history limits.
func cleanupFinishedJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface,
	cjc cjControlInterface, recorder record.EventRecorder) {
	// If neither limit is set, there is no need to do anything.
	if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
		return
	}

	failedJobs := []batchv1.Job{}
	successfulJobs := []batchv1.Job{}

	for _, job := range js {
		isFinished, finishedStatus := getFinishedStatus(&job)
		if isFinished && finishedStatus == batchv1.JobComplete {
			successfulJobs = append(successfulJobs, job)
		} else if isFinished && finishedStatus == batchv1.JobFailed {
			failedJobs = append(failedJobs, job)
		}
	}

	if cj.Spec.SuccessfulJobsHistoryLimit != nil {
		removeOldestJobs(cj,
			successfulJobs,
			jc,
			*cj.Spec.SuccessfulJobsHistoryLimit,
			recorder)
	}

	if cj.Spec.FailedJobsHistoryLimit != nil {
		removeOldestJobs(cj,
			failedJobs,
			jc,
			*cj.Spec.FailedJobsHistoryLimit,
			recorder)
	}

	// Update the CronJob, in case jobs were removed from the active list.
	if _, err := cjc.UpdateStatus(cj); err != nil {
		nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
	}
}
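
// For illustration, the limits enforced above live on the CronJob spec as
// optional *int32 fields; e.g. to keep the three most recent successful Jobs
// and a single failed one:
//
//	keepOK, keepFailed := int32(3), int32(1)
//	cj.Spec.SuccessfulJobsHistoryLimit = &keepOK
//	cj.Spec.FailedJobsHistoryLimit = &keepFailed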

// removeOldestJobs removes the oldest jobs from a list of jobs until at most
// maxJobs remain.
func removeOldestJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
	numToDelete := len(js) - int(maxJobs)
	if numToDelete <= 0 {
		return
	}

	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
	klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)

	sort.Sort(byJobStartTime(js))
	for i := 0; i < numToDelete; i++ {
		klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
		deleteJob(cj, &js[i], jc, recorder)
	}
}
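
// byJobStartTime (defined elsewhere in this package) orders jobs oldest
// first, so the loop above always deletes from the front of the sorted slice.
// Roughly, the ordering looks like this sketch (details such as nil handling
// and tie-breaking may differ in the real implementation):
//
//	type byJobStartTime []batchv1.Job
//
//	func (o byJobStartTime) Len() int      { return len(o) }
//	func (o byJobStartTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
//	func (o byJobStartTime) Less(i, j int) bool {
//		if o[j].Status.StartTime == nil {
//			return o[i].Status.StartTime != nil // nil start times sort last
//		}
//		if o[i].Status.StartTime == nil {
//			return false
//		}
//		return o[i].Status.StartTime.Before(o[j].Status.StartTime)
//	}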

// syncOne reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "cj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func syncOne(cj *batchv1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)

	childrenJobs := make(map[types.UID]bool)
	for _, j := range js {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(*cj, j.ObjectMeta.UID)
		if !found && !IsJobFinished(&j) {
			recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
			// We found an unfinished job that has us as the parent, but it is not in our Active list.
			// This could happen if we crashed right after creating the Job and before updating the status,
			// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
			// a job that they wanted us to adopt.

			// TODO: maybe handle the adoption case?  Concurrency/suspend rules will not apply in that case, obviously, since we can't
			// stop users from creating jobs if they have permission.  It is assumed that if a
			// user has permission to create a job within a namespace, then they have permission to make any cronJob
			// in the same namespace "adopt" that job.  ReplicaSets and their Pods work the same way.
			// TBS: how to update cj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
		} else if found && IsJobFinished(&j) {
			_, status := getFinishedStatus(&j)
			deleteFromActiveList(cj, j.ObjectMeta.UID)
			recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
		}
	}

	// Remove any job reference from the active list if the corresponding job does not exist any more.
	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
	// job running.
	for _, j := range cj.Status.Active {
		if found := childrenJobs[j.UID]; !found {
			recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(cj, j.UID)
		}
	}

	updatedCJ, err := cjc.UpdateStatus(cj)
	if err != nil {
		klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
		return
	}
	*cj = *updatedCJ

	if cj.DeletionTimestamp != nil {
		// The CronJob is being deleted.
		// Don't do anything other than updating status.
		return
	}

	if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
		klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
		return
	}

	times, err := getRecentUnmetScheduleTimes(*cj, now)
	if err != nil {
		recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
		klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
		return
	}
	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
	if len(times) == 0 {
		klog.V(4).Infof("No unmet start times for %s", nameForLog)
		return
	}
	if len(times) > 1 {
		klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
	}
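
	// Worked example: with schedule "0 * * * *" (hourly), a LastScheduleTime
	// of 10:00 and now 12:30, the unmet times are 11:00 and 12:00; only the
	// most recent (12:00) is started below, per the TODO above.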
	scheduledTime := times[len(times)-1]
	tooLate := false
	if cj.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
	}
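	// For example, with StartingDeadlineSeconds = 200, a run scheduled for
	// 10:00:00 may still be started until 10:03:20; past that point it is
	// treated as too late and skipped.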
	if tooLate {
		klog.V(4).Infof("Missed starting window for %s", nameForLog)
		recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
		// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
		// the miss every cycle.  In order to avoid sending multiple events, and to avoid processing
		// the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
		// Then, when we call getRecentUnmetScheduleTimes, we could take max(creationTimestamp,
		// Status.LastScheduleTime, Status.LastMissedTime), so we won't generate an event the next
		// time we process it, and so a user looking at the status can easily see that there was a
		// missed execution.
		return
	}
	if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
		// Regardless of which source of information we use for the set of active jobs,
		// there is some risk that we won't see an active job when there is one
		// (because we haven't seen the status update to the CronJob or the created pod).
		// So it is theoretically possible to have concurrency with Forbid.
		// As long as the invocations are "far enough apart in time", this usually won't happen.
		//
		// TODO: for Forbid, we could use the same name for every execution, as a lock.
		// With Replace, we could use a name that is deterministic per execution time.
		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
		klog.V(4).Infof("Not starting job for %s because a prior execution is still running and concurrency policy is Forbid", nameForLog)
		return
	}
	if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
		for _, j := range cj.Status.Active {
			klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

			job, err := jc.GetJob(j.Namespace, j.Name)
			if err != nil {
				recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return
			}
			if !deleteJob(cj, job, jc, recorder) {
				return
			}
		}
	}

	jobReq, err := getJobFromTemplate(cj, scheduledTime)
	if err != nil {
		klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
		return
	}
	jobResp, err := jc.CreateJob(cj.Namespace, jobReq)
	if err != nil {
		// If the namespace is being torn down, we can safely ignore
		// this error since all subsequent creations will fail.
		if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		}
		return
	}
	klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
	recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

	// ------------------------------------------------------------------ //

	// If this process restarts at this point (after posting a job, but
	// before updating the status), then we might try to start the job
	// again the next time.  In fact, if we re-list the CronJobs and Jobs
	// on the next iteration of syncAll, we might not see our own status
	// update, and then post one again.  So, we need to use the job name
	// as a lock to prevent us from making the job twice (name the job
	// with a hash of its scheduled time).

	// Add the just-started job to the status list.
	ref, err := getRef(jobResp)
	if err != nil {
		klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
	} else {
		cj.Status.Active = append(cj.Status.Active, *ref)
	}
	cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
	if _, err := cjc.UpdateStatus(cj); err != nil {
		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
	}
}

// deleteJob reaps a job: it deletes the Job itself, its Pods, and its
// reference in the CronJob's active list.
func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)

	// delete the job itself...
	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
		recorder.Eventf(cj, v1.EventTypeWarning, "FailedDelete", "Error deleting job: %v", err)
		klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
		return false
	}
	// ... and its reference from the active list
	deleteFromActiveList(cj, job.ObjectMeta.UID)
	recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)

	return true
}

func getRef(object runtime.Object) (*v1.ObjectReference, error) {
	return ref.GetReference(scheme.Scheme, object)
}