1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 *
21 * Bacula scheduler
22 * It looks at what jobs are to be run and when
23 * and waits around until it is time to
24 * fire them up.
25 *
26 * Kern Sibbald, May MM, major revision December MMIII
27 *
28 */
29
30 #include "bacula.h"
31 #include "dird.h"
32
33 #if 0
34 #define SCHED_DEBUG
35 #define DBGLVL 0
36 #else
37 #undef SCHED_DEBUG
38 #define DBGLVL DT_SCHEDULER|200
39 #endif
40
41 const int dbglvl = DBGLVL;
42
43 /* Local variables */
44 struct job_item {
45 RUN *run;
46 JOB *job;
47 time_t runtime;
48 int Priority;
49 dlink link; /* link for list */
50 };
51
52 /* List of jobs to be run. They were scheduled in this hour or the next */
53 static dlist *jobs_to_run; /* list of jobs to be run */
54
55 /* Time interval in secs to sleep if nothing to be run */
56 static int const next_check_secs = 60;
57
58 /* Forward referenced subroutines */
59 static void find_runs();
60 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime);
61 static void dump_job(job_item *ji, const char *msg);
62
63 /* Imported subroutines */
64
65 /* Imported variables */
66
67 /*
68 * called by reload_config to tell us that the schedules
69 * we may have based our next jobs to run queues have been
70 * invalidated. In fact the schedules may not have changed
71 * but the run object that we have recorded the last_run time
72 * on are new and no longer have a valid last_run time which
73 * causes us to double run schedules that get put into the list
74 * because run_nh = 1.
75 */
76 static bool schedules_invalidated = false;
invalidate_schedules(void)77 void invalidate_schedules(void) {
78 schedules_invalidated = true;
79 }
80
81 /*********************************************************************
82 *
83 * Main Bacula Scheduler
84 *
85 */
wait_for_next_job(char * one_shot_job_to_run)86 JCR *wait_for_next_job(char *one_shot_job_to_run)
87 {
88 JCR *jcr;
89 JOB *job;
90 RUN *run;
91 time_t now, prev;
92 static bool first = true;
93 job_item *next_job = NULL;
94
95 Dmsg0(dbglvl, "Enter wait_for_next_job\n");
96 if (first) {
97 first = false;
98 /* Create scheduled jobs list */
99 jobs_to_run = New(dlist(next_job, &next_job->link));
100 if (one_shot_job_to_run) { /* one shot */
101 job = (JOB *)GetResWithName(R_JOB, one_shot_job_to_run);
102 if (!job) {
103 Emsg1(M_ABORT, 0, _("Job %s not found\n"), one_shot_job_to_run);
104 }
105 Dmsg1(5, "Found one_shot_job_to_run %s\n", one_shot_job_to_run);
106 jcr = new_jcr(sizeof(JCR), dird_free_jcr);
107 set_jcr_defaults(jcr, job);
108 return jcr;
109 }
110 }
111
112 /* Wait until we have something in the
113 * next hour or so.
114 */
115 again:
116 while (jobs_to_run->empty()) {
117 find_runs();
118 if (!jobs_to_run->empty()) {
119 break;
120 }
121 bmicrosleep(next_check_secs, 0); /* recheck once per minute */
122 }
123
124 #ifdef list_chain
125 job_item *je;
126 foreach_dlist(je, jobs_to_run) {
127 dump_job(je, _("Walk queue"));
128 }
129 #endif
130 /*
131 * Pull the first job to run (already sorted by runtime and
132 * Priority, then wait around until it is time to run it.
133 */
134 next_job = (job_item *)jobs_to_run->first();
135 jobs_to_run->remove(next_job);
136
137 dump_job(next_job, _("Dequeued job"));
138
139 if (!next_job) { /* we really should have something now */
140 Emsg0(M_ABORT, 0, _("Scheduler logic error\n"));
141 }
142
143 /* Now wait for the time to run the job */
144 for (;;) {
145 time_t twait;
146 /** discard scheduled queue and rebuild with new schedule objects. **/
147 lock_jobs();
148 if (schedules_invalidated) {
149 dump_job(next_job, "Invalidated job");
150 free(next_job);
151 while (!jobs_to_run->empty()) {
152 next_job = (job_item *)jobs_to_run->first();
153 jobs_to_run->remove(next_job);
154 dump_job(next_job, "Invalidated job");
155 free(next_job);
156 }
157 schedules_invalidated = false;
158 unlock_jobs();
159 goto again;
160 }
161 unlock_jobs();
162 prev = now = time(NULL);
163 twait = next_job->runtime - now;
164 if (twait <= 0) { /* time to run it */
165 break;
166 }
167 /* Recheck at least once per minute */
168 bmicrosleep((next_check_secs < twait)?next_check_secs:twait, 0);
169 /* Attempt to handle clock shift (but not daylight savings time changes)
170 * we allow a skew of 10 seconds before invalidating everything.
171 */
172 now = time(NULL);
173 if (now < prev-10 || now > (prev+next_check_secs+10)) {
174 schedules_invalidated = true;
175 }
176 }
177 jcr = new_jcr(sizeof(JCR), dird_free_jcr);
178 run = next_job->run; /* pick up needed values */
179 job = next_job->job;
180 if (job->is_enabled() && (!job->client || job->client->is_enabled())) {
181 dump_job(next_job, _("Run job")); /* no client and job enabled */
182 }
183 free(next_job);
184 if (!job->is_enabled() || (job->client && !job->client->is_enabled())) {
185 free_jcr(jcr);
186 goto again; /* ignore this job */
187 }
188 run->last_run = now; /* mark as run now */
189
190 ASSERT(job);
191 set_jcr_defaults(jcr, job);
192 if (run->level) {
193 jcr->setJobLevel(run->level); /* override run level */
194 }
195 if (run->pool) {
196 jcr->pool = run->pool; /* override pool */
197 jcr->run_pool_override = true;
198 }
199 if (run->next_pool) {
200 jcr->next_pool = run->next_pool; /* override next pool */
201 jcr->run_next_pool_override = true;
202 }
203 if (run->full_pool) {
204 jcr->full_pool = run->full_pool; /* override full pool */
205 jcr->run_full_pool_override = true;
206 }
207 if (run->vfull_pool) {
208 jcr->vfull_pool = run->vfull_pool; /* override virtual full pool */
209 jcr->run_vfull_pool_override = true;
210 }
211 if (run->inc_pool) {
212 jcr->inc_pool = run->inc_pool; /* override inc pool */
213 jcr->run_inc_pool_override = true;
214 }
215 if (run->diff_pool) {
216 jcr->diff_pool = run->diff_pool; /* override dif pool */
217 jcr->run_diff_pool_override = true;
218 }
219 if (run->storage) {
220 USTORE store;
221 store.store = run->storage;
222 pm_strcpy(store.store_source, _("run override"));
223 set_rwstorage(jcr, &store); /* override storage */
224 }
225 if (run->msgs) {
226 jcr->messages = run->msgs; /* override messages */
227 }
228 if (run->Priority) {
229 jcr->JobPriority = run->Priority;
230 }
231 if (run->spool_data_set) {
232 jcr->spool_data = run->spool_data;
233 }
234 if (run->accurate_set) { /* overwrite accurate mode */
235 jcr->accurate = run->accurate;
236 }
237 if (run->write_part_after_job_set) {
238 jcr->write_part_after_job = run->write_part_after_job;
239 }
240 if (run->MaxRunSchedTime_set) {
241 jcr->MaxRunSchedTime = run->MaxRunSchedTime;
242 }
243 Dmsg0(dbglvl, "Leave wait_for_next_job()\n");
244 return jcr;
245 }
246
247
248 /*
249 * Shutdown the scheduler
250 */
term_scheduler()251 void term_scheduler()
252 {
253 if (jobs_to_run) {
254 delete jobs_to_run;
255 }
256 }
257
258 /*
259 * Find all jobs to be run this hour and the next hour.
260 */
find_runs()261 static void find_runs()
262 {
263 time_t now, next_hour, runtime;
264 RUN *run;
265 JOB *job;
266 SCHED *sched;
267 struct tm tm;
268 int hour, mday, wday, month, wom, woy, ldom;
269 /* Items corresponding to above at the next hour */
270 int nh_hour, nh_mday, nh_wday, nh_month, nh_wom, nh_woy, nh_ldom;
271
272 Dmsg0(dbglvl, "enter find_runs()\n");
273
274 /* compute values for time now */
275 now = time(NULL);
276 (void)localtime_r(&now, &tm);
277 hour = tm.tm_hour;
278 mday = tm.tm_mday - 1;
279 wday = tm.tm_wday;
280 month = tm.tm_mon;
281 wom = mday / 7;
282 woy = tm_woy(now); /* get week of year */
283 ldom = tm_ldom(month, tm.tm_year + 1900);
284
285 Dmsg7(dbglvl, "now = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
286 now, hour, month, mday, wday, wom, woy);
287
288 /*
289 * Compute values for next hour from now.
290 * We do this to be sure we don't miss a job while
291 * sleeping.
292 */
293 next_hour = now + 3600;
294 (void)localtime_r(&next_hour, &tm);
295 nh_hour = tm.tm_hour;
296 nh_mday = tm.tm_mday - 1;
297 nh_wday = tm.tm_wday;
298 nh_month = tm.tm_mon;
299 nh_wom = nh_mday / 7;
300 nh_woy = tm_woy(next_hour); /* get week of year */
301 nh_ldom = tm_ldom(nh_month, tm.tm_year + 1900);
302
303 Dmsg7(dbglvl, "nh = %x: h=%d m=%d md=%d wd=%d wom=%d woy=%d\n",
304 next_hour, nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy);
305
306 /* Loop through all jobs */
307 LockRes();
308 foreach_res(job, R_JOB) {
309 sched = job->schedule;
310 if (!sched || !job->is_enabled() || (sched && !sched->is_enabled()) ||
311 (job->client && !job->client->is_enabled())) {
312 continue; /* no, skip this job */
313 }
314 Dmsg1(dbglvl, "Got job: %s\n", job->hdr.name);
315 for (run=sched->run; run; run=run->next) {
316 bool run_now, run_nh;
317 /*
318 * Find runs scheduled between now and the next hour.
319 */
320 #ifdef xxxx
321 Dmsg0(000, "\n");
322 Dmsg7(000, "run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
323 hour, month, mday, wday, wom, woy, ldom);
324 Dmsg7(000, "bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
325 bit_is_set(hour, run->hour),
326 bit_is_set(month, run->month),
327 bit_is_set(mday, run->mday),
328 bit_is_set(wday, run->wday),
329 bit_is_set(wom, run->wom),
330 bit_is_set(woy, run->woy),
331 bit_is_set(31, run->mday));
332
333
334 Dmsg7(000, "nh_run h=%d m=%d md=%d wd=%d wom=%d woy=%d ldom=%d\n",
335 nh_hour, nh_month, nh_mday, nh_wday, nh_wom, nh_woy, nh_ldom);
336 Dmsg7(000, "nh_bitset bsh=%d bsm=%d bsmd=%d bswd=%d bswom=%d bswoy=%d bsldom=%d\n",
337 bit_is_set(nh_hour, run->hour),
338 bit_is_set(nh_month, run->month),
339 bit_is_set(nh_mday, run->mday),
340 bit_is_set(nh_wday, run->wday),
341 bit_is_set(nh_wom, run->wom),
342 bit_is_set(nh_woy, run->woy),
343 bit_is_set(31, run->mday));
344 #endif
345
346 run_now = bit_is_set(hour, run->hour) &&
347 ((bit_is_set(mday, run->mday) &&
348 bit_is_set(wday, run->wday) &&
349 bit_is_set(month, run->month) &&
350 bit_is_set(wom, run->wom) &&
351 bit_is_set(woy, run->woy)) ||
352 (bit_is_set(month, run->month) &&
353 bit_is_set(31, run->mday) && mday == ldom));
354
355 run_nh = bit_is_set(nh_hour, run->hour) &&
356 ((bit_is_set(nh_mday, run->mday) &&
357 bit_is_set(nh_wday, run->wday) &&
358 bit_is_set(nh_month, run->month) &&
359 bit_is_set(nh_wom, run->wom) &&
360 bit_is_set(nh_woy, run->woy)) ||
361 (bit_is_set(nh_month, run->month) &&
362 bit_is_set(31, run->mday) && nh_mday == nh_ldom));
363
364 Dmsg3(dbglvl, "run@%p: run_now=%d run_nh=%d\n", run, run_now, run_nh);
365
366 if (run_now || run_nh) {
367 /* find time (time_t) job is to be run */
368 (void)localtime_r(&now, &tm); /* reset tm structure */
369 tm.tm_min = run->minute; /* set run minute */
370 tm.tm_sec = 0; /* zero secs */
371 runtime = mktime(&tm);
372 if (run_now) {
373 add_job(job, run, now, runtime);
374 }
375 /* If job is to be run in the next hour schedule it */
376 if (run_nh) {
377 add_job(job, run, now, runtime + 3600);
378 }
379 }
380 }
381 }
382 UnlockRes();
383 Dmsg0(dbglvl, "Leave find_runs()\n");
384 }
385
add_job(JOB * job,RUN * run,time_t now,time_t runtime)386 static void add_job(JOB *job, RUN *run, time_t now, time_t runtime)
387 {
388 job_item *ji;
389 bool inserted = false;
390 /*
391 * Don't run any job that ran less than a minute ago, but
392 * do run any job scheduled less than a minute ago.
393 */
394 if (((runtime - run->last_run) < 61) || ((runtime+59) < now)) {
395 #ifdef SCHED_DEBUG
396 Dmsg4(000, "Drop: Job=\"%s\" run=%lld. last_run=%lld. now=%lld\n", job->hdr.name,
397 (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
398 fflush(stdout);
399 #endif
400 return;
401 }
402 #ifdef SCHED_DEBUG
403 Dmsg4(000, "Add: Job=\"%s\" run=%lld last_run=%lld now=%lld\n", job->hdr.name,
404 (utime_t)runtime, (utime_t)run->last_run, (utime_t)now);
405 #endif
406 /* accept to run this job */
407 job_item *je = (job_item *)malloc(sizeof(job_item));
408 je->run = run;
409 je->job = job;
410 je->runtime = runtime;
411 if (run->Priority) {
412 je->Priority = run->Priority;
413 } else {
414 je->Priority = job->Priority;
415 }
416
417 /* Add this job to the wait queue in runtime, priority sorted order */
418 foreach_dlist(ji, jobs_to_run) {
419 if (ji->runtime > je->runtime ||
420 (ji->runtime == je->runtime && ji->Priority > je->Priority)) {
421 jobs_to_run->insert_before(je, ji);
422 dump_job(je, _("Inserted job"));
423 inserted = true;
424 break;
425 }
426 }
427 /* If place not found in queue, append it */
428 if (!inserted) {
429 jobs_to_run->append(je);
430 dump_job(je, _("Appended job"));
431 }
432 #ifdef SCHED_DEBUG
433 foreach_dlist(ji, jobs_to_run) {
434 dump_job(ji, _("Run queue"));
435 }
436 Dmsg0(000, "End run queue\n");
437 #endif
438 }
439
dump_job(job_item * ji,const char * msg)440 static void dump_job(job_item *ji, const char *msg)
441 {
442 #ifdef SCHED_DEBUG
443 char dt[MAX_TIME_LENGTH];
444 int64_t save_debug = debug_level;
445
446 if (!chk_dbglvl(dbglvl)) {
447 return;
448 }
449 bstrftime_nc(dt, sizeof(dt), ji->runtime);
450 Dmsg4(dbglvl, "%s: Job=%s priority=%d run %s\n", msg, ji->job->hdr.name,
451 ji->Priority, dt);
452 fflush(stdout);
453 debug_level = save_debug;
454 #endif
455 }
456