1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Bacula job queue routines.
21 *
 * This code manages three queues: the waiting_jobs
 * queue, where jobs are initially queued; the ready_jobs
 * queue, where jobs are placed when all the resources are
 * allocated and they can immediately be run; and the
 * running_jobs queue, where jobs are placed while they are
 * running.
28 *
29 * Kern Sibbald, July MMIII
30 *
31 *
32 * This code was adapted from the Bacula workq, which was
33 * adapted from "Programming with POSIX Threads", by
34 * David R. Butenhof
35 *
36 */
37
38 #include "bacula.h"
39 #include "dird.h"
40
/* NOTE(review): presumably the director's global JCR chain; not referenced in this file's visible code -- confirm */
extern JCR *jobs;

/* Forward referenced functions */
/* Thread entry points: both are handed to pthread_create() in this file, hence C linkage */
extern "C" void *jobq_server(void *arg);
extern "C" void *sched_wait(void *arg);

static int start_server(jobq_t *jq);
static bool acquire_resources(JCR *jcr);
static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
static void dec_write_store(JCR *jcr);
51
52 /*
53 * Initialize a job queue
54 *
55 * Returns: 0 on success
56 * errno on failure
57 */
jobq_init(jobq_t * jq,int threads,void * (* engine)(void * arg))58 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
59 {
60 int stat;
61 jobq_item_t *item = NULL;
62
63 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
64 berrno be;
65 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
66 return stat;
67 }
68 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69 pthread_attr_destroy(&jq->attr);
70 return stat;
71 }
72 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
73 berrno be;
74 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
75 pthread_attr_destroy(&jq->attr);
76 return stat;
77 }
78 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
79 berrno be;
80 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
81 pthread_mutex_destroy(&jq->mutex);
82 pthread_attr_destroy(&jq->attr);
83 return stat;
84 }
85 jq->quit = false;
86 jq->max_workers = threads; /* max threads to create */
87 jq->num_workers = 0; /* no threads yet */
88 jq->idle_workers = 0; /* no idle threads */
89 jq->engine = engine; /* routine to run */
90 jq->valid = JOBQ_VALID;
91 /* Initialize the job queues */
92 jq->waiting_jobs = New(dlist(item, &item->link));
93 jq->running_jobs = New(dlist(item, &item->link));
94 jq->ready_jobs = New(dlist(item, &item->link));
95 return 0;
96 }
97
98 /*
99 * Destroy the job queue
100 *
101 * Returns: 0 on success
102 * errno on failure
103 */
jobq_destroy(jobq_t * jq)104 int jobq_destroy(jobq_t *jq)
105 {
106 int stat, stat1, stat2;
107
108 if (jq->valid != JOBQ_VALID) {
109 return EINVAL;
110 }
111 P(jq->mutex);
112 jq->valid = 0; /* prevent any more operations */
113
114 /*
115 * If any threads are active, wake them
116 */
117 if (jq->num_workers > 0) {
118 jq->quit = true;
119 if (jq->idle_workers) {
120 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
121 berrno be;
122 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
123 V(jq->mutex);
124 return stat;
125 }
126 }
127 while (jq->num_workers > 0) {
128 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
129 berrno be;
130 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
131 V(jq->mutex);
132 return stat;
133 }
134 }
135 }
136 V(jq->mutex);
137 stat = pthread_mutex_destroy(&jq->mutex);
138 stat1 = pthread_cond_destroy(&jq->work);
139 stat2 = pthread_attr_destroy(&jq->attr);
140 delete jq->waiting_jobs;
141 delete jq->running_jobs;
142 delete jq->ready_jobs;
143 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
144 }
145
/*
 * Argument block passed from jobq_add() to a sched_wait() thread.
 * jobq_add() malloc()s it; sched_wait() copies the fields and free()s it.
 */
struct wait_pkt {
   JCR *jcr;                           /* job waiting for its sched_time */
   jobq_t *jq;                         /* queue to jobq_add() it to afterwards */
};
150
151 /*
152 * Wait until schedule time arrives before starting. Normally
153 * this routine is only used for jobs started from the console
154 * for which the user explicitly specified a start time. Otherwise
155 * most jobs are put into the job queue only when their
156 * scheduled time arrives.
157 */
158 extern "C"
sched_wait(void * arg)159 void *sched_wait(void *arg)
160 {
161 JCR *jcr = ((wait_pkt *)arg)->jcr;
162 jobq_t *jq = ((wait_pkt *)arg)->jq;
163
164 set_jcr_in_tsd(INVALID_JCR);
165 Dmsg0(2300, "Enter sched_wait.\n");
166 free(arg);
167 time_t wtime = jcr->sched_time - time(NULL);
168 jcr->setJobStatus(JS_WaitStartTime);
169 /* Wait until scheduled time arrives */
170 if (wtime > 0) {
171 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
172 jcr->Job, wtime);
173 }
174 /* Check every 30 seconds if canceled */
175 while (wtime > 0) {
176 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
177 jcr->JobId, wtime, jcr->use_count());
178 if (wtime > 30) {
179 wtime = 30;
180 }
181 bmicrosleep(wtime, 0);
182 if (job_canceled(jcr)) {
183 break;
184 }
185 wtime = jcr->sched_time - time(NULL);
186 }
187 Dmsg1(200, "resched use=%d\n", jcr->use_count());
188 jobq_add(jq, jcr);
189 free_jcr(jcr); /* we are done with jcr */
190 Dmsg0(2300, "Exit sched_wait\n");
191 return NULL;
192 }
193
194 /* Procedure to update the client->NumConcurrentJobs */
update_client_numconcurrentjobs(JCR * jcr,int val)195 static void update_client_numconcurrentjobs(JCR *jcr, int val)
196 {
197 if (!jcr->client) {
198 return;
199 }
200
201 switch (jcr->getJobType())
202 {
203 case JT_MIGRATE:
204 case JT_COPY:
205 case JT_ADMIN:
206 break;
207 case JT_BACKUP:
208 /* Fall through wanted */
209 default:
210 if (jcr->no_client_used() || jcr->wasVirtualFull) {
211 break;
212 }
213 jcr->client->incNumConcurrentJobs(val);
214 break;
215 }
216 }
217
218 /*
219 * Add a job to the queue
220 * jq is a queue that was created with jobq_init
221 */
jobq_add(jobq_t * jq,JCR * jcr)222 int jobq_add(jobq_t *jq, JCR *jcr)
223 {
224 int stat;
225 jobq_item_t *item, *li;
226 bool inserted = false;
227 time_t wtime = jcr->sched_time - time(NULL);
228 pthread_t id;
229 wait_pkt *sched_pkt;
230
231 if (!jcr->term_wait_inited) {
232 /* Initialize termination condition variable */
233 if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
234 berrno be;
235 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
236 return stat;
237 }
238 jcr->term_wait_inited = true;
239 }
240
241 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
242 if (jq->valid != JOBQ_VALID) {
243 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
244 return EINVAL;
245 }
246
247 jcr->inc_use_count(); /* mark jcr in use by us */
248 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
249 if (!job_canceled(jcr) && wtime > 0) {
250 set_thread_concurrency(jq->max_workers + 2);
251 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
252 sched_pkt->jcr = jcr;
253 sched_pkt->jq = jq;
254 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
255 if (stat != 0) { /* thread not created */
256 berrno be;
257 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
258 }
259 return stat;
260 }
261
262 P(jq->mutex);
263
264 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
265 free_jcr(jcr); /* release jcr */
266 return ENOMEM;
267 }
268 item->jcr = jcr;
269
270 /* While waiting in a queue this job is not attached to a thread */
271 set_jcr_in_tsd(INVALID_JCR);
272 if (job_canceled(jcr)) {
273 /* Add job to ready queue so that it is canceled quickly */
274 jq->ready_jobs->prepend(item);
275 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
276 } else {
277 /* Add this job to the wait queue in priority sorted order */
278 foreach_dlist(li, jq->waiting_jobs) {
279 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
280 li->jcr->JobId, li->jcr->JobPriority);
281 if (li->jcr->JobPriority > jcr->JobPriority) {
282 jq->waiting_jobs->insert_before(item, li);
283 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
284 li->jcr->JobId, jcr->JobId);
285 inserted = true;
286 break;
287 }
288 }
289 /* If not jobs in wait queue, append it */
290 if (!inserted) {
291 jq->waiting_jobs->append(item);
292 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
293 }
294 }
295
296 /* Ensure that at least one server looks at the queue. */
297 stat = start_server(jq);
298
299 V(jq->mutex);
300 Dmsg0(2300, "Return jobq_add\n");
301 return stat;
302 }
303
304 /*
305 * Remove a job from the job queue. Used only by cancel_job().
306 * jq is a queue that was created with jobq_init
307 * work_item is an element of work
308 *
309 * Note, it is "removed" from the job queue.
310 * If you want to cancel it, you need to provide some external means
311 * of doing so (e.g. pthread_kill()).
312 */
jobq_remove(jobq_t * jq,JCR * jcr)313 int jobq_remove(jobq_t *jq, JCR *jcr)
314 {
315 int stat;
316 bool found = false;
317 jobq_item_t *item;
318
319 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
320 if (jq->valid != JOBQ_VALID) {
321 return EINVAL;
322 }
323
324 P(jq->mutex);
325 foreach_dlist(item, jq->waiting_jobs) {
326 if (jcr == item->jcr) {
327 found = true;
328 break;
329 }
330 }
331 if (!found) {
332 V(jq->mutex);
333 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
334 return EINVAL;
335 }
336
337 /* Move item to be the first on the list */
338 jq->waiting_jobs->remove(item);
339 jq->ready_jobs->prepend(item);
340 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
341
342 stat = start_server(jq);
343
344 V(jq->mutex);
345 Dmsg0(2300, "Return jobq_remove\n");
346 return stat;
347 }
348
349
350 /*
351 * Start the server thread if it isn't already running
352 */
start_server(jobq_t * jq)353 static int start_server(jobq_t *jq)
354 {
355 int stat = 0;
356 pthread_t id;
357
358 /*
359 * if any threads are idle, wake one.
360 * Actually we do a broadcast because on /lib/tls
361 * these signals seem to get lost from time to time.
362 */
363 if (jq->idle_workers > 0) {
364 Dmsg0(2300, "Signal worker to wake up\n");
365 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
366 berrno be;
367 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
368 return stat;
369 }
370 } else if (jq->num_workers < jq->max_workers) {
371 Dmsg0(2300, "Create worker thread\n");
372 /* No idle threads so create a new one */
373 set_thread_concurrency(jq->max_workers + 1);
374 jq->num_workers++;
375 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
376 berrno be;
377 jq->num_workers--;
378 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
379 return stat;
380 }
381 }
382 return stat;
383 }
384
385
/*
 * This is the worker thread that serves the job queue.
 *  When all the resources are acquired for the job,
 *  it will call the user's engine.
 *
 * arg is the jobq_t being served.  The thread is created by
 * start_server() with jq->attr (detached).  It holds jq->mutex except
 * while a job's engine runs, while a finished JCR is released, during
 * the 2 second back-off sleep, and where reschedule_job() drops it.
 *
 * Each pass of the main loop:
 *   1. if the previous pass found no work, waits up to 4 seconds on jq->work;
 *   2. runs every job on ready_jobs (on running_jobs while the engine runs),
 *      then releases the job's resource counters and reschedules or frees it;
 *   3. moves waiting jobs whose priority fits and whose resources can be
 *      acquired onto ready_jobs;
 *   4. exits on quit once ready_jobs is empty, or after a timed-out wait
 *      when ready_jobs is empty.
 * jq->num_workers is decremented on every exit path.
 */
extern "C"
void *jobq_server(void *arg)
{
   struct timespec timeout;
   jobq_t *jq = (jobq_t *)arg;
   jobq_item_t *je;                    /* job entry in queue */
   int stat;
   /* Never reset: after one timeout the worker leaves at the first empty ready queue */
   bool timedout = false;
   bool work = true;

   set_jcr_in_tsd(INVALID_JCR);
   Dmsg0(2300, "Start jobq_server\n");
   P(jq->mutex);

   for (;;) {
      struct timeval tv;
      struct timezone tz;

      Dmsg0(2300, "Top of for loop\n");
      if (!work && !jq->quit) {
         /* Absolute deadline 4 seconds from now (whole seconds) */
         gettimeofday(&tv, &tz);
         timeout.tv_nsec = 0;
         timeout.tv_sec = tv.tv_sec + 4;

         /* Body runs at most once: every path out of it breaks or returns */
         while (!jq->quit) {
            /*
             * Wait 4 seconds, then if no more work, exit
             */
            Dmsg0(2300, "pthread_cond_timedwait()\n");
            stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
            if (stat == ETIMEDOUT) {
               Dmsg0(2300, "timedwait timedout.\n");
               timedout = true;
               break;
            } else if (stat != 0) {
               /* This shouldn't happen */
               Dmsg0(2300, "This shouldn't happen\n");
               jq->num_workers--;
               V(jq->mutex);
               return NULL;
            }
            break;
         }
      }
      /*
       * If anything is in the ready queue, run it
       */
      Dmsg0(2300, "Checking ready queue.\n");
      while (!jq->ready_jobs->empty() && !jq->quit) {
         JCR *jcr;
         je = (jobq_item_t *)jq->ready_jobs->first();
         jcr = je->jcr;
         jq->ready_jobs->remove(je);
         /* More ready work than this thread can take: get another worker going */
         if (!jq->ready_jobs->empty()) {
            Dmsg0(2300, "ready queue not empty start server\n");
            if (start_server(jq) != 0) {
               /*
                * NOTE(review): je was already removed from ready_jobs and is
                * not re-queued, so it and its JCR reference appear to be
                * leaked on this path -- confirm.
                */
               jq->num_workers--;
               V(jq->mutex);
               return NULL;
            }
         }
         jq->running_jobs->append(je);

         /* Attach jcr to this thread while we run the job */
         jcr->my_thread_id = pthread_self();
         jcr->set_killable(true);
         set_jcr_in_tsd(jcr);
         Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);

         /* Release job queue lock */
         V(jq->mutex);

         /* Call user's routine here */
         Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
            jcr->use_count(), jcr->JobStatus);
         jq->engine(je->jcr);

         /* Job finished detach from thread */
         remove_jcr_from_tsd(je->jcr);
         je->jcr->set_killable(false);

         Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
            jcr->use_count());

         /* Reacquire job queue lock */
         P(jq->mutex);
         Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
         jq->running_jobs->remove(je);
         /*
          * Release locks if acquired. Note, they will not have
          *  been acquired for jobs canceled before they were
          *  put into the ready queue.
          */
         if (jcr->acquired_resource_locks) {
            dec_read_store(jcr);
            dec_write_store(jcr);
            update_client_numconcurrentjobs(jcr, -1);
            jcr->job->incNumConcurrentJobs(-1);
            jcr->acquired_resource_locks = false;
         }

         /* true means reschedule_job() already released jcr and je */
         if (reschedule_job(jcr, jq, je)) {
            continue;              /* go look for more work */
         }

         /* Clean up and release old jcr */
         Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
         jcr->SDJobStatus = 0;
         V(jq->mutex);                /* release internal lock */
         free_jcr(jcr);
         free(je);                    /* release job entry */
         P(jq->mutex);                /* reacquire job queue lock */
      }
      /*
       * If any job in the wait queue can be run,
       *  move it to the ready queue
       */
      Dmsg0(2300, "Done check ready, now check wait queue.\n");
      if (!jq->waiting_jobs->empty() && !jq->quit) {
         int Priority;
         bool running_allow_mix = false;
         je = (jobq_item_t *)jq->waiting_jobs->first();
         jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
         /*
          * Target priority: that of the first running job, or of the first
          * waiting job when nothing runs.  Mixing is allowed only if every
          * running job's Job resource allows mixed priority.
          */
         if (re) {
            Priority = re->jcr->JobPriority;
            Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
                  re->jcr->JobId, Priority);
            running_allow_mix = true;
            for ( ; re; ) {
               Dmsg2(2300, "JobId %d is also running with %s\n",
                     re->jcr->JobId,
                     re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
               if (!re->jcr->job->allow_mixed_priority) {
                  running_allow_mix = false;
                  break;
               }
               re = (jobq_item_t *)jq->running_jobs->next(re);
            }
            Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
                  running_allow_mix ? "allow" : "don't allow");
         } else {
            Priority = je->jcr->JobPriority;
            Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
         }
         /*
          * Walk down the list of waiting jobs and attempt
          *   to acquire the resources it needs.
          */
         for ( ; je;  ) {
            /* je is current job item on the queue, jn is the next one */
            JCR *jcr = je->jcr;
            jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);

            Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
                  jcr->JobId, jcr->JobPriority, Priority,
                  jcr->job->allow_mixed_priority ? "mix" : "no mix");

            /*
             * Take only jobs of correct Priority.  A lower JobPriority value
             * is accepted only when both this job and all running jobs allow
             * mixing.  The first unacceptable job ends the scan.
             */
            if (!(jcr->JobPriority == Priority
                  || (jcr->JobPriority < Priority &&
                      jcr->job->allow_mixed_priority && running_allow_mix))) {
               jcr->setJobStatus(JS_WaitPriority);
               break;
            }

            if (!acquire_resources(jcr)) {
               /* If resource conflict, job is canceled */
               if (!job_canceled(jcr)) {
                  je = jn;            /* point to next waiting job */
                  continue;
               }
            }

            /*
             * Got all locks, now remove it from wait queue and append it
             *   to the ready queue.  Note, we may also get here if the
             *   job was canceled.  Once it is "run", it will quickly
             *   terminate.
             */
            jq->waiting_jobs->remove(je);
            jq->ready_jobs->append(je);
            Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
            je = jn;                  /* Point to next waiting job */
         } /* end for loop */

      } /* end if */

      Dmsg0(2300, "Done checking wait queue.\n");
      /*
       * If no more ready work and we are asked to quit, then do it
       *
       * NOTE(review): if quit is set while ready_jobs is non-empty, the
       * ready loop above does not run and this exit is never taken, so the
       * worker keeps cycling through the 2 second sleep below and
       * jobq_destroy() would wait indefinitely -- confirm whether that can
       * happen.
       */
      if (jq->ready_jobs->empty() && jq->quit) {
         jq->num_workers--;
         if (jq->num_workers == 0) {
            Dmsg0(2300, "Wake up destroy routine\n");
            /* Wake up destroy routine if he is waiting */
            pthread_cond_broadcast(&jq->work);
         }
         break;
      }
      Dmsg0(2300, "Check for work request\n");
      /*
       * If no more work requests, and we waited long enough, quit
       */
      Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
         jq->ready_jobs->empty());
      if (jq->ready_jobs->empty() && timedout) {
         Dmsg0(2300, "break big loop\n");
         jq->num_workers--;
         break;
      }

      work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
      if (work) {
         /*
          * If a job is waiting on a Resource, don't consume all
          *   the CPU time looping looking for work, and even more
          *   important, release the lock so that a job that has
          *   terminated can give us the resource.
          */
         V(jq->mutex);
         bmicrosleep(2, 0);              /* pause for 2 seconds */
         P(jq->mutex);
         /* Recompute work as something may have changed in last 2 secs */
         work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
      }
      Dmsg1(2300, "Loop again. work=%d\n", work);
   } /* end of big for loop */

   Dmsg0(200, "unlock mutex\n");
   V(jq->mutex);
   Dmsg0(2300, "End jobq_server\n");
   return NULL;
}
625
626 /*
627 * Returns true if cleanup done and we should look for more work
628 */
reschedule_job(JCR * jcr,jobq_t * jq,jobq_item_t * je)629 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
630 {
631 bool resched = false;
632 /*
633 * Reschedule the job if requested and possible
634 */
635 /* Basic condition is that more reschedule times remain */
636 if (jcr->job->RescheduleTimes == 0 ||
637 jcr->reschedule_count < jcr->job->RescheduleTimes) {
638
639 /* Check for incomplete jobs */
640 if (jcr->is_incomplete()) {
641 resched = (jcr->RescheduleIncompleteJobs && jcr->is_JobType(JT_BACKUP) &&
642 !(jcr->HasBase||jcr->is_JobLevel(L_BASE)));
643 } else {
644 /* Check for failed jobs */
645 resched = (jcr->job->RescheduleOnError &&
646 !jcr->is_JobStatus(JS_Terminated) &&
647 !jcr->is_JobStatus(JS_Canceled) &&
648 jcr->is_JobType(JT_BACKUP));
649 }
650 }
651 if (resched) {
652 char dt[50], dt2[50];
653
654 /*
655 * Reschedule this job by cleaning it up, but
656 * reuse the same JobId if possible.
657 */
658 jcr->rerunning = jcr->is_incomplete(); /* save incomplete status */
659 time_t now = time(NULL);
660 jcr->reschedule_count++;
661 jcr->sched_time = now + jcr->job->RescheduleInterval;
662 bstrftime(dt, sizeof(dt), now);
663 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
664 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
665 (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
666 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
667 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
668 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
669 jcr->JobStatus = -1;
670 jcr->setJobStatus(JS_WaitStartTime);
671 jcr->SDJobStatus = 0;
672 jcr->JobErrors = 0;
673 if (!allow_duplicate_job(jcr)) {
674 return false;
675 }
676 /* Only jobs with no output or Incomplete jobs can run on same JCR */
677 if (jcr->JobBytes == 0 || jcr->rerunning) {
678 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
679 V(jq->mutex);
680 /*
681 * Special test here since a Virtual Full gets marked
682 * as a Full, so we look at the resource record
683 */
684 if (jcr->wasVirtualFull) {
685 jcr->setJobLevel(L_VIRTUAL_FULL);
686 }
687 /*
688 * When we are using the same jcr then make sure to reset
689 * RealEndTime back to zero.
690 */
691 jcr->jr.RealEndTime = 0;
692 jobq_add(jq, jcr); /* queue the job to run again */
693 P(jq->mutex);
694 free_jcr(jcr); /* release jcr */
695 free(je); /* free the job entry */
696 return true; /* we already cleaned up */
697 }
698 /*
699 * Something was actually backed up, so we cannot reuse
700 * the old JobId or there will be database record
701 * conflicts. We now create a new job, copying the
702 * appropriate fields.
703 */
704 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
705 set_jcr_defaults(njcr, jcr->job);
706 /*
707 * Eliminate the new job_end_push, then copy the one from
708 * the old job, and set the old one to be empty.
709 */
710 void *v;
711 lock_jobs(); /* protect ourself from reload_config() */
712 LockRes();
713 foreach_alist(v, (&jcr->job_end_push)) {
714 njcr->job_end_push.append(v);
715 }
716 jcr->job_end_push.destroy();
717 jcr->job_end_push.init(1, false);
718 UnlockRes();
719 unlock_jobs();
720
721 njcr->reschedule_count = jcr->reschedule_count;
722 njcr->sched_time = jcr->sched_time;
723 njcr->initial_sched_time = jcr->initial_sched_time;
724 /*
725 * Special test here since a Virtual Full gets marked
726 * as a Full, so we look at the resource record
727 */
728 if (jcr->wasVirtualFull) {
729 njcr->setJobLevel(L_VIRTUAL_FULL);
730 } else {
731 njcr->setJobLevel(jcr->getJobLevel());
732 }
733 njcr->pool = jcr->pool;
734 njcr->run_pool_override = jcr->run_pool_override;
735 njcr->next_pool = jcr->next_pool;
736 njcr->run_next_pool_override = jcr->run_next_pool_override;
737 njcr->full_pool = jcr->full_pool;
738 njcr->vfull_pool = jcr->vfull_pool;
739 njcr->run_full_pool_override = jcr->run_full_pool_override;
740 njcr->run_vfull_pool_override = jcr->run_vfull_pool_override;
741 njcr->inc_pool = jcr->inc_pool;
742 njcr->run_inc_pool_override = jcr->run_inc_pool_override;
743 njcr->diff_pool = jcr->diff_pool;
744 njcr->JobStatus = -1;
745 njcr->setJobStatus(jcr->JobStatus);
746 if (jcr->rstore) {
747 copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
748 } else {
749 free_rstorage(njcr);
750 }
751 if (jcr->wstore) {
752 copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
753 } else {
754 free_wstorage(njcr);
755 }
756 njcr->messages = jcr->messages;
757 njcr->spool_data = jcr->spool_data;
758 njcr->write_part_after_job = jcr->write_part_after_job;
759 Dmsg0(2300, "Call to run new job\n");
760 V(jq->mutex);
761 run_job(njcr); /* This creates a "new" job */
762 free_jcr(njcr); /* release "new" jcr */
763 P(jq->mutex);
764 Dmsg0(2300, "Back from running new job.\n");
765 }
766 return false;
767 }
768
769 /*
770 * See if we can acquire all the necessary resources for the job (JCR)
771 *
772 * Returns: true if successful
773 * false if resource failure
774 */
acquire_resources(JCR * jcr)775 static bool acquire_resources(JCR *jcr)
776 {
777 bool skip_this_jcr = false;
778
779 jcr->acquired_resource_locks = false;
780 /*
781 * Turning this code off is likely to cause some deadlocks,
782 * but we do not really have enough information here to
783 * know if this is really a deadlock (it may be a dual drive
784 * autochanger), and in principle, the SD reservation system
785 * should detect these deadlocks, so push the work off on it.
786 */
787 #ifdef xxx
788 if (jcr->rstore && jcr->rstore == jcr->wstore) { /* possible deadlock */
789 Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
790 " Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
791 jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
792 jcr->setJobStatus(JS_Canceled);
793 return false;
794 }
795 #endif
796 if (jcr->rstore) {
797 Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
798 if (!inc_read_store(jcr)) {
799 Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->getNumConcurrentJobs());
800 jcr->setJobStatus(JS_WaitStoreRes);
801 return false;
802 }
803 }
804
805 if (jcr->wstore) {
806 Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
807 int num = jcr->wstore->getNumConcurrentJobs();
808 if (num < jcr->wstore->MaxConcurrentJobs) {
809 num = jcr->wstore->incNumConcurrentJobs(1);
810 Dmsg1(200, "Inc wncj=%d\n", num);
811 } else if (jcr->rstore) {
812 dec_read_store(jcr);
813 skip_this_jcr = true;
814 } else {
815 Dmsg1(200, "Fail wncj=%d\n", num);
816 skip_this_jcr = true;
817 }
818 }
819 if (skip_this_jcr) {
820 jcr->setJobStatus(JS_WaitStoreRes);
821 return false;
822 }
823
824 if (jcr->client) {
825 if (jcr->client->getNumConcurrentJobs() < jcr->client->MaxConcurrentJobs) {
826 update_client_numconcurrentjobs(jcr, 1);
827 } else {
828 /* Back out previous locks */
829 dec_write_store(jcr);
830 dec_read_store(jcr);
831 jcr->setJobStatus(JS_WaitClientRes);
832 return false;
833 }
834 }
835 if (jcr->job->getNumConcurrentJobs() < jcr->job->MaxConcurrentJobs) {
836 jcr->job->incNumConcurrentJobs(1);
837 } else {
838 /* Back out previous locks */
839 dec_write_store(jcr);
840 dec_read_store(jcr);
841 update_client_numconcurrentjobs(jcr, -1);
842 jcr->setJobStatus(JS_WaitJobRes);
843 return false;
844 }
845
846 jcr->acquired_resource_locks = true;
847 return true;
848 }
849
850 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
851
852 /*
853 * Note: inc_read_store() and dec_read_store() are
854 * called from select_rstore() in src/dird/restore.c
855 */
inc_read_store(JCR * jcr)856 bool inc_read_store(JCR *jcr)
857 {
858 P(rstore_mutex);
859 int num = jcr->rstore->getNumConcurrentJobs();
860 int numread = jcr->rstore->getNumConcurrentReadJobs();
861 int maxread = jcr->rstore->MaxConcurrentReadJobs;
862 if (num < jcr->rstore->MaxConcurrentJobs &&
863 (jcr->getJobType() == JT_RESTORE ||
864 numread == 0 ||
865 maxread == 0 || /* No limit set */
866 numread < maxread)) /* Below the limit */
867 {
868 numread = jcr->rstore->incNumConcurrentReadJobs(1);
869 num = jcr->rstore->incNumConcurrentJobs(1);
870 Dmsg1(200, "Inc rncj=%d\n", num);
871 V(rstore_mutex);
872 return true;
873 }
874 V(rstore_mutex);
875 return false;
876 }
877
dec_read_store(JCR * jcr)878 void dec_read_store(JCR *jcr)
879 {
880 if (jcr->rstore) {
881 P(rstore_mutex);
882 jcr->rstore->incNumConcurrentReadJobs(-1);
883 int num = jcr->rstore->incNumConcurrentJobs(-1);
884 Dmsg1(200, "Dec rncj=%d\n", num);
885 V(rstore_mutex);
886 }
887 }
888
dec_write_store(JCR * jcr)889 static void dec_write_store(JCR *jcr)
890 {
891 if (jcr->wstore) {
892 int num = jcr->wstore->incNumConcurrentJobs(-1);
893 Dmsg1(200, "Dec wncj=%d\n", num);
894 }
895 }
896