1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Bacula job queue routines.
21 *
22 * This code consists of three queues, the waiting_jobs
23 * queue, where jobs are initially queued, the ready_jobs
24 * queue, where jobs are placed when all the resources are
25 * allocated and they can immediately be run, and the
26 * running queue where jobs are placed when they are
27 * running.
28 *
29 * Kern Sibbald, July MMIII
30 *
31 *
32 * This code was adapted from the Bacula workq, which was
33 * adapted from "Programming with POSIX Threads", by
34 * David R. Butenhof
35 *
36 */
37
38 #include "bacula.h"
39 #include "dird.h"
40
41 extern JCR *jobs;
42
43 /* Forward referenced functions */
44 extern "C" void *jobq_server(void *arg);
45 extern "C" void *sched_wait(void *arg);
46
47 static int start_server(jobq_t *jq);
48 static bool acquire_resources(JCR *jcr);
49 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
50 static void dec_write_store(JCR *jcr);
51
52 /*
53 * Initialize a job queue
54 *
55 * Returns: 0 on success
56 * errno on failure
57 */
jobq_init(jobq_t * jq,int threads,void * (* engine)(void * arg))58 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
59 {
60 int stat;
61 jobq_item_t *item = NULL;
62
63 if ((stat = pthread_attr_init(&jq->attr)) != 0) {
64 berrno be;
65 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
66 return stat;
67 }
68 if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69 pthread_attr_destroy(&jq->attr);
70 return stat;
71 }
72 if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
73 berrno be;
74 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
75 pthread_attr_destroy(&jq->attr);
76 return stat;
77 }
78 if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
79 berrno be;
80 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
81 pthread_mutex_destroy(&jq->mutex);
82 pthread_attr_destroy(&jq->attr);
83 return stat;
84 }
85 jq->quit = false;
86 jq->max_workers = threads; /* max threads to create */
87 jq->num_workers = 0; /* no threads yet */
88 jq->idle_workers = 0; /* no idle threads */
89 jq->engine = engine; /* routine to run */
90 jq->valid = JOBQ_VALID;
91 /* Initialize the job queues */
92 jq->waiting_jobs = New(dlist(item, &item->link));
93 jq->running_jobs = New(dlist(item, &item->link));
94 jq->ready_jobs = New(dlist(item, &item->link));
95 return 0;
96 }
97
98 /*
99 * Destroy the job queue
100 *
101 * Returns: 0 on success
102 * errno on failure
103 */
jobq_destroy(jobq_t * jq)104 int jobq_destroy(jobq_t *jq)
105 {
106 int stat, stat1, stat2;
107
108 if (jq->valid != JOBQ_VALID) {
109 return EINVAL;
110 }
111 P(jq->mutex);
112 jq->valid = 0; /* prevent any more operations */
113
114 /*
115 * If any threads are active, wake them
116 */
117 if (jq->num_workers > 0) {
118 jq->quit = true;
119 if (jq->idle_workers) {
120 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
121 berrno be;
122 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
123 V(jq->mutex);
124 return stat;
125 }
126 }
127 while (jq->num_workers > 0) {
128 if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
129 berrno be;
130 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
131 V(jq->mutex);
132 return stat;
133 }
134 }
135 }
136 V(jq->mutex);
137 stat = pthread_mutex_destroy(&jq->mutex);
138 stat1 = pthread_cond_destroy(&jq->work);
139 stat2 = pthread_attr_destroy(&jq->attr);
140 delete jq->waiting_jobs;
141 delete jq->running_jobs;
142 delete jq->ready_jobs;
143 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
144 }
145
146 struct wait_pkt {
147 JCR *jcr;
148 jobq_t *jq;
149 };
150
151 /*
152 * Wait until schedule time arrives before starting. Normally
153 * this routine is only used for jobs started from the console
154 * for which the user explicitly specified a start time. Otherwise
155 * most jobs are put into the job queue only when their
156 * scheduled time arrives.
157 */
158 extern "C"
sched_wait(void * arg)159 void *sched_wait(void *arg)
160 {
161 JCR *jcr = ((wait_pkt *)arg)->jcr;
162 jobq_t *jq = ((wait_pkt *)arg)->jq;
163
164 set_jcr_in_tsd(INVALID_JCR);
165 Dmsg0(2300, "Enter sched_wait.\n");
166 free(arg);
167 time_t wtime = jcr->sched_time - time(NULL);
168 jcr->setJobStatus(JS_WaitStartTime);
169 /* Wait until scheduled time arrives */
170 if (wtime > 0) {
171 Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
172 jcr->Job, wtime);
173 }
174 /* Check every 30 seconds if canceled */
175 while (wtime > 0) {
176 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
177 jcr->JobId, wtime, jcr->use_count());
178 if (wtime > 30) {
179 wtime = 30;
180 }
181 bmicrosleep(wtime, 0);
182 if (job_canceled(jcr)) {
183 break;
184 }
185 wtime = jcr->sched_time - time(NULL);
186 }
187 Dmsg1(200, "resched use=%d\n", jcr->use_count());
188 jobq_add(jq, jcr);
189 free_jcr(jcr); /* we are done with jcr */
190 Dmsg0(2300, "Exit sched_wait\n");
191 return NULL;
192 }
193
194 /* Procedure to update the client->NumConcurrentJobs */
update_client_numconcurrentjobs(JCR * jcr,int val)195 static void update_client_numconcurrentjobs(JCR *jcr, int val)
196 {
197 int num;
198 if (!jcr->client) {
199 return;
200 }
201
202 switch (jcr->getJobType())
203 {
204 case JT_MIGRATE:
205 case JT_COPY:
206 case JT_ADMIN:
207 break;
208 case JT_BACKUP:
209 /* Fall through wanted */
210 default:
211 if (jcr->no_client_used() || jcr->wasVirtualFull) {
212 break;
213 }
214 num = jcr->client->getNumConcurrentJobs();
215 jcr->client->setNumConcurrentJobs(num + val);
216 break;
217 }
218 }
219
220 /*
221 * Add a job to the queue
222 * jq is a queue that was created with jobq_init
223 */
jobq_add(jobq_t * jq,JCR * jcr)224 int jobq_add(jobq_t *jq, JCR *jcr)
225 {
226 int stat;
227 jobq_item_t *item, *li;
228 bool inserted = false;
229 time_t wtime = jcr->sched_time - time(NULL);
230 pthread_t id;
231 wait_pkt *sched_pkt;
232
233 if (!jcr->term_wait_inited) {
234 /* Initialize termination condition variable */
235 if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
236 berrno be;
237 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
238 return stat;
239 }
240 jcr->term_wait_inited = true;
241 }
242
243 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
244 if (jq->valid != JOBQ_VALID) {
245 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
246 return EINVAL;
247 }
248
249 jcr->inc_use_count(); /* mark jcr in use by us */
250 Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
251 if (!job_canceled(jcr) && wtime > 0) {
252 set_thread_concurrency(jq->max_workers + 2);
253 sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
254 sched_pkt->jcr = jcr;
255 sched_pkt->jq = jq;
256 stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
257 if (stat != 0) { /* thread not created */
258 berrno be;
259 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
260 }
261 return stat;
262 }
263
264 P(jq->mutex);
265
266 if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
267 free_jcr(jcr); /* release jcr */
268 return ENOMEM;
269 }
270 item->jcr = jcr;
271
272 /* While waiting in a queue this job is not attached to a thread */
273 set_jcr_in_tsd(INVALID_JCR);
274 if (job_canceled(jcr)) {
275 /* Add job to ready queue so that it is canceled quickly */
276 jq->ready_jobs->prepend(item);
277 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
278 } else {
279 /* Add this job to the wait queue in priority sorted order */
280 foreach_dlist(li, jq->waiting_jobs) {
281 Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
282 li->jcr->JobId, li->jcr->JobPriority);
283 if (li->jcr->JobPriority > jcr->JobPriority) {
284 jq->waiting_jobs->insert_before(item, li);
285 Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
286 li->jcr->JobId, jcr->JobId);
287 inserted = true;
288 break;
289 }
290 }
291 /* If not jobs in wait queue, append it */
292 if (!inserted) {
293 jq->waiting_jobs->append(item);
294 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
295 }
296 }
297
298 /* Ensure that at least one server looks at the queue. */
299 stat = start_server(jq);
300
301 V(jq->mutex);
302 Dmsg0(2300, "Return jobq_add\n");
303 return stat;
304 }
305
306 /*
307 * Remove a job from the job queue. Used only by cancel_job().
308 * jq is a queue that was created with jobq_init
309 * work_item is an element of work
310 *
311 * Note, it is "removed" from the job queue.
312 * If you want to cancel it, you need to provide some external means
313 * of doing so (e.g. pthread_kill()).
314 */
jobq_remove(jobq_t * jq,JCR * jcr)315 int jobq_remove(jobq_t *jq, JCR *jcr)
316 {
317 int stat;
318 bool found = false;
319 jobq_item_t *item;
320
321 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
322 if (jq->valid != JOBQ_VALID) {
323 return EINVAL;
324 }
325
326 P(jq->mutex);
327 foreach_dlist(item, jq->waiting_jobs) {
328 if (jcr == item->jcr) {
329 found = true;
330 break;
331 }
332 }
333 if (!found) {
334 V(jq->mutex);
335 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
336 return EINVAL;
337 }
338
339 /* Move item to be the first on the list */
340 jq->waiting_jobs->remove(item);
341 jq->ready_jobs->prepend(item);
342 Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
343
344 stat = start_server(jq);
345
346 V(jq->mutex);
347 Dmsg0(2300, "Return jobq_remove\n");
348 return stat;
349 }
350
351
352 /*
353 * Start the server thread if it isn't already running
354 */
start_server(jobq_t * jq)355 static int start_server(jobq_t *jq)
356 {
357 int stat = 0;
358 pthread_t id;
359
360 /*
361 * if any threads are idle, wake one.
362 * Actually we do a broadcast because on /lib/tls
363 * these signals seem to get lost from time to time.
364 */
365 if (jq->idle_workers > 0) {
366 Dmsg0(2300, "Signal worker to wake up\n");
367 if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
368 berrno be;
369 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
370 return stat;
371 }
372 } else if (jq->num_workers < jq->max_workers) {
373 Dmsg0(2300, "Create worker thread\n");
374 /* No idle threads so create a new one */
375 set_thread_concurrency(jq->max_workers + 1);
376 jq->num_workers++;
377 if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
378 berrno be;
379 jq->num_workers--;
380 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
381 return stat;
382 }
383 }
384 return stat;
385 }
386
387
388 /*
389 * This is the worker thread that serves the job queue.
390 * When all the resources are acquired for the job,
391 * it will call the user's engine.
392 */
393 extern "C"
jobq_server(void * arg)394 void *jobq_server(void *arg)
395 {
396 struct timespec timeout;
397 jobq_t *jq = (jobq_t *)arg;
398 jobq_item_t *je; /* job entry in queue */
399 int stat;
400 bool timedout = false;
401 bool work = true;
402
403 set_jcr_in_tsd(INVALID_JCR);
404 Dmsg0(2300, "Start jobq_server\n");
405 P(jq->mutex);
406
407 for (;;) {
408 struct timeval tv;
409 struct timezone tz;
410
411 Dmsg0(2300, "Top of for loop\n");
412 if (!work && !jq->quit) {
413 gettimeofday(&tv, &tz);
414 timeout.tv_nsec = 0;
415 timeout.tv_sec = tv.tv_sec + 4;
416
417 while (!jq->quit) {
418 /*
419 * Wait 4 seconds, then if no more work, exit
420 */
421 Dmsg0(2300, "pthread_cond_timedwait()\n");
422 stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
423 if (stat == ETIMEDOUT) {
424 Dmsg0(2300, "timedwait timedout.\n");
425 timedout = true;
426 break;
427 } else if (stat != 0) {
428 /* This shouldn't happen */
429 Dmsg0(2300, "This shouldn't happen\n");
430 jq->num_workers--;
431 V(jq->mutex);
432 return NULL;
433 }
434 break;
435 }
436 }
437 /*
438 * If anything is in the ready queue, run it
439 */
440 Dmsg0(2300, "Checking ready queue.\n");
441 while (!jq->ready_jobs->empty() && !jq->quit) {
442 JCR *jcr;
443 je = (jobq_item_t *)jq->ready_jobs->first();
444 jcr = je->jcr;
445 jq->ready_jobs->remove(je);
446 if (!jq->ready_jobs->empty()) {
447 Dmsg0(2300, "ready queue not empty start server\n");
448 if (start_server(jq) != 0) {
449 jq->num_workers--;
450 V(jq->mutex);
451 return NULL;
452 }
453 }
454 jq->running_jobs->append(je);
455
456 /* Attach jcr to this thread while we run the job */
457 jcr->my_thread_id = pthread_self();
458 jcr->set_killable(true);
459 set_jcr_in_tsd(jcr);
460 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
461
462 /* Release job queue lock */
463 V(jq->mutex);
464
465 /* Call user's routine here */
466 Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
467 jcr->use_count(), jcr->JobStatus);
468 jq->engine(je->jcr);
469
470 /* Job finished detach from thread */
471 remove_jcr_from_tsd(je->jcr);
472 je->jcr->set_killable(false);
473
474 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
475 jcr->use_count());
476
477 /* Reacquire job queue lock */
478 P(jq->mutex);
479 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
480 jq->running_jobs->remove(je);
481 /*
482 * Release locks if acquired. Note, they will not have
483 * been acquired for jobs canceled before they were
484 * put into the ready queue.
485 */
486 if (jcr->acquired_resource_locks) {
487 int num;
488 dec_read_store(jcr);
489 dec_write_store(jcr);
490 update_client_numconcurrentjobs(jcr, -1);
491 num = jcr->job->getNumConcurrentJobs() - 1;
492 jcr->job->setNumConcurrentJobs(num);
493 jcr->acquired_resource_locks = false;
494 }
495
496 if (reschedule_job(jcr, jq, je)) {
497 continue; /* go look for more work */
498 }
499
500 /* Clean up and release old jcr */
501 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
502 jcr->SDJobStatus = 0;
503 V(jq->mutex); /* release internal lock */
504 free_jcr(jcr);
505 free(je); /* release job entry */
506 P(jq->mutex); /* reacquire job queue lock */
507 }
508 /*
509 * If any job in the wait queue can be run,
510 * move it to the ready queue
511 */
512 Dmsg0(2300, "Done check ready, now check wait queue.\n");
513 if (!jq->waiting_jobs->empty() && !jq->quit) {
514 int Priority;
515 bool running_allow_mix = false;
516 je = (jobq_item_t *)jq->waiting_jobs->first();
517 jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
518 if (re) {
519 Priority = re->jcr->JobPriority;
520 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
521 re->jcr->JobId, Priority);
522 running_allow_mix = true;
523 for ( ; re; ) {
524 Dmsg2(2300, "JobId %d is also running with %s\n",
525 re->jcr->JobId,
526 re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
527 if (!re->jcr->job->allow_mixed_priority) {
528 running_allow_mix = false;
529 break;
530 }
531 re = (jobq_item_t *)jq->running_jobs->next(re);
532 }
533 Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
534 running_allow_mix ? "allow" : "don't allow");
535 } else {
536 Priority = je->jcr->JobPriority;
537 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
538 }
539 /*
540 * Walk down the list of waiting jobs and attempt
541 * to acquire the resources it needs.
542 */
543 for ( ; je; ) {
544 /* je is current job item on the queue, jn is the next one */
545 JCR *jcr = je->jcr;
546 jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
547
548 Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
549 jcr->JobId, jcr->JobPriority, Priority,
550 jcr->job->allow_mixed_priority ? "mix" : "no mix");
551
552 /* Take only jobs of correct Priority */
553 if (!(jcr->JobPriority == Priority
554 || (jcr->JobPriority < Priority &&
555 jcr->job->allow_mixed_priority && running_allow_mix))) {
556 jcr->setJobStatus(JS_WaitPriority);
557 break;
558 }
559
560 if (!acquire_resources(jcr)) {
561 /* If resource conflict, job is canceled */
562 if (!job_canceled(jcr)) {
563 je = jn; /* point to next waiting job */
564 continue;
565 }
566 }
567
568 /*
569 * Got all locks, now remove it from wait queue and append it
570 * to the ready queue. Note, we may also get here if the
571 * job was canceled. Once it is "run", it will quickly
572 * terminate.
573 */
574 jq->waiting_jobs->remove(je);
575 jq->ready_jobs->append(je);
576 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
577 je = jn; /* Point to next waiting job */
578 } /* end for loop */
579
580 } /* end if */
581
582 Dmsg0(2300, "Done checking wait queue.\n");
583 /*
584 * If no more ready work and we are asked to quit, then do it
585 */
586 if (jq->ready_jobs->empty() && jq->quit) {
587 jq->num_workers--;
588 if (jq->num_workers == 0) {
589 Dmsg0(2300, "Wake up destroy routine\n");
590 /* Wake up destroy routine if he is waiting */
591 pthread_cond_broadcast(&jq->work);
592 }
593 break;
594 }
595 Dmsg0(2300, "Check for work request\n");
596 /*
597 * If no more work requests, and we waited long enough, quit
598 */
599 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
600 jq->ready_jobs->empty());
601 if (jq->ready_jobs->empty() && timedout) {
602 Dmsg0(2300, "break big loop\n");
603 jq->num_workers--;
604 break;
605 }
606
607 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
608 if (work) {
609 /*
610 * If a job is waiting on a Resource, don't consume all
611 * the CPU time looping looking for work, and even more
612 * important, release the lock so that a job that has
613 * terminated can give us the resource.
614 */
615 V(jq->mutex);
616 bmicrosleep(2, 0); /* pause for 2 seconds */
617 P(jq->mutex);
618 /* Recompute work as something may have changed in last 2 secs */
619 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
620 }
621 Dmsg1(2300, "Loop again. work=%d\n", work);
622 } /* end of big for loop */
623
624 Dmsg0(200, "unlock mutex\n");
625 V(jq->mutex);
626 Dmsg0(2300, "End jobq_server\n");
627 return NULL;
628 }
629
630 /*
631 * Returns true if cleanup done and we should look for more work
632 */
reschedule_job(JCR * jcr,jobq_t * jq,jobq_item_t * je)633 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
634 {
635 bool resched = false;
636 /*
637 * Reschedule the job if requested and possible
638 */
639 /* Basic condition is that more reschedule times remain */
640 if (jcr->job->RescheduleTimes == 0 ||
641 jcr->reschedule_count < jcr->job->RescheduleTimes) {
642
643 /* Check for incomplete jobs */
644 if (jcr->is_incomplete()) {
645 resched = (jcr->RescheduleIncompleteJobs && jcr->is_JobType(JT_BACKUP) &&
646 !(jcr->HasBase||jcr->is_JobLevel(L_BASE)));
647 } else {
648 /* Check for failed jobs */
649 resched = (jcr->job->RescheduleOnError &&
650 !jcr->is_JobStatus(JS_Terminated) &&
651 !jcr->is_JobStatus(JS_Canceled) &&
652 jcr->is_JobType(JT_BACKUP));
653 }
654 }
655 if (resched) {
656 char dt[50], dt2[50];
657
658 /*
659 * Reschedule this job by cleaning it up, but
660 * reuse the same JobId if possible.
661 */
662 jcr->rerunning = jcr->is_incomplete(); /* save incomplete status */
663 time_t now = time(NULL);
664 jcr->reschedule_count++;
665 jcr->sched_time = now + jcr->job->RescheduleInterval;
666 bstrftime(dt, sizeof(dt), now);
667 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
668 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
669 (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
670 Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
671 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
672 dird_free_jcr_pointers(jcr); /* partial cleanup old stuff */
673 jcr->JobStatus = -1;
674 jcr->setJobStatus(JS_WaitStartTime);
675 jcr->SDJobStatus = 0;
676 jcr->JobErrors = 0;
677 if (!allow_duplicate_job(jcr)) {
678 return false;
679 }
680 /* Only jobs with no output or Incomplete jobs can run on same JCR */
681 if (jcr->JobBytes == 0 || jcr->rerunning) {
682 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
683 V(jq->mutex);
684 /*
685 * Special test here since a Virtual Full gets marked
686 * as a Full, so we look at the resource record
687 */
688 if (jcr->wasVirtualFull) {
689 jcr->setJobLevel(L_VIRTUAL_FULL);
690 }
691 /*
692 * When we are using the same jcr then make sure to reset
693 * RealEndTime back to zero.
694 */
695 jcr->jr.RealEndTime = 0;
696 jobq_add(jq, jcr); /* queue the job to run again */
697 P(jq->mutex);
698 free_jcr(jcr); /* release jcr */
699 free(je); /* free the job entry */
700 return true; /* we already cleaned up */
701 }
702 /*
703 * Something was actually backed up, so we cannot reuse
704 * the old JobId or there will be database record
705 * conflicts. We now create a new job, copying the
706 * appropriate fields.
707 */
708 JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
709 set_jcr_defaults(njcr, jcr->job);
710 /*
711 * Eliminate the new job_end_push, then copy the one from
712 * the old job, and set the old one to be empty.
713 */
714 void *v;
715 lock_jobs(); /* protect ourself from reload_config() */
716 LockRes();
717 foreach_alist(v, (&jcr->job_end_push)) {
718 njcr->job_end_push.append(v);
719 }
720 jcr->job_end_push.destroy();
721 jcr->job_end_push.init(1, false);
722 UnlockRes();
723 unlock_jobs();
724
725 njcr->reschedule_count = jcr->reschedule_count;
726 njcr->sched_time = jcr->sched_time;
727 njcr->initial_sched_time = jcr->initial_sched_time;
728 /*
729 * Special test here since a Virtual Full gets marked
730 * as a Full, so we look at the resource record
731 */
732 if (jcr->wasVirtualFull) {
733 njcr->setJobLevel(L_VIRTUAL_FULL);
734 } else {
735 njcr->setJobLevel(jcr->getJobLevel());
736 }
737 njcr->pool = jcr->pool;
738 njcr->run_pool_override = jcr->run_pool_override;
739 njcr->next_pool = jcr->next_pool;
740 njcr->run_next_pool_override = jcr->run_next_pool_override;
741 njcr->full_pool = jcr->full_pool;
742 njcr->vfull_pool = jcr->vfull_pool;
743 njcr->run_full_pool_override = jcr->run_full_pool_override;
744 njcr->run_vfull_pool_override = jcr->run_vfull_pool_override;
745 njcr->inc_pool = jcr->inc_pool;
746 njcr->run_inc_pool_override = jcr->run_inc_pool_override;
747 njcr->diff_pool = jcr->diff_pool;
748 njcr->JobStatus = -1;
749 njcr->setJobStatus(jcr->JobStatus);
750 if (jcr->rstore) {
751 copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
752 } else {
753 free_rstorage(njcr);
754 }
755 if (jcr->wstore) {
756 copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
757 } else {
758 free_wstorage(njcr);
759 }
760 njcr->messages = jcr->messages;
761 njcr->spool_data = jcr->spool_data;
762 njcr->write_part_after_job = jcr->write_part_after_job;
763 Dmsg0(2300, "Call to run new job\n");
764 V(jq->mutex);
765 run_job(njcr); /* This creates a "new" job */
766 free_jcr(njcr); /* release "new" jcr */
767 P(jq->mutex);
768 Dmsg0(2300, "Back from running new job.\n");
769 }
770 return false;
771 }
772
773 /*
774 * See if we can acquire all the necessary resources for the job (JCR)
775 *
776 * Returns: true if successful
777 * false if resource failure
778 */
acquire_resources(JCR * jcr)779 static bool acquire_resources(JCR *jcr)
780 {
781 bool skip_this_jcr = false;
782
783 jcr->acquired_resource_locks = false;
784 /*
785 * Turning this code off is likely to cause some deadlocks,
786 * but we do not really have enough information here to
787 * know if this is really a deadlock (it may be a dual drive
788 * autochanger), and in principle, the SD reservation system
789 * should detect these deadlocks, so push the work off on it.
790 */
791 #ifdef xxx
792 if (jcr->rstore && jcr->rstore == jcr->wstore) { /* possible deadlock */
793 Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
794 " Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
795 jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
796 jcr->setJobStatus(JS_Canceled);
797 return false;
798 }
799 #endif
800 if (jcr->rstore) {
801 Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
802 if (!inc_read_store(jcr)) {
803 Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->getNumConcurrentJobs());
804 jcr->setJobStatus(JS_WaitStoreRes);
805 return false;
806 }
807 }
808
809 if (jcr->wstore) {
810 Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
811 int num = jcr->wstore->getNumConcurrentJobs();
812 if (num < jcr->wstore->MaxConcurrentJobs) {
813 Dmsg1(200, "Inc wncj=%d\n", num + 1);
814 jcr->wstore->setNumConcurrentJobs(num + 1);
815 } else if (jcr->rstore) {
816 dec_read_store(jcr);
817 skip_this_jcr = true;
818 } else {
819 Dmsg1(200, "Fail wncj=%d\n", num);
820 skip_this_jcr = true;
821 }
822 }
823 if (skip_this_jcr) {
824 jcr->setJobStatus(JS_WaitStoreRes);
825 return false;
826 }
827
828 if (jcr->client) {
829 if (jcr->client->getNumConcurrentJobs() < jcr->client->MaxConcurrentJobs) {
830 update_client_numconcurrentjobs(jcr, 1);
831 } else {
832 /* Back out previous locks */
833 dec_write_store(jcr);
834 dec_read_store(jcr);
835 jcr->setJobStatus(JS_WaitClientRes);
836 return false;
837 }
838 }
839 if (jcr->job->getNumConcurrentJobs() < jcr->job->MaxConcurrentJobs) {
840 int num;
841 num = jcr->job->getNumConcurrentJobs() + 1;
842 jcr->job->setNumConcurrentJobs(num);
843 } else {
844 /* Back out previous locks */
845 dec_write_store(jcr);
846 dec_read_store(jcr);
847 update_client_numconcurrentjobs(jcr, -1);
848 jcr->setJobStatus(JS_WaitJobRes);
849 return false;
850 }
851
852 jcr->acquired_resource_locks = true;
853 return true;
854 }
855
856 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
857
858 /*
859 * Note: inc_read_store() and dec_read_store() are
860 * called from select_rstore() in src/dird/restore.c
861 */
inc_read_store(JCR * jcr)862 bool inc_read_store(JCR *jcr)
863 {
864 P(rstore_mutex);
865 int num = jcr->rstore->getNumConcurrentJobs();
866 int numread = jcr->rstore->getNumConcurrentReadJobs();
867 int maxread = jcr->rstore->MaxConcurrentReadJobs;
868 if (num < jcr->rstore->MaxConcurrentJobs &&
869 (jcr->getJobType() == JT_RESTORE ||
870 numread == 0 ||
871 maxread == 0 || /* No limit set */
872 numread < maxread)) /* Below the limit */
873 {
874 num++;
875 numread++;
876 jcr->rstore->setNumConcurrentReadJobs(numread);
877 jcr->rstore->setNumConcurrentJobs(num);
878 Dmsg1(200, "Inc rncj=%d\n", num);
879 V(rstore_mutex);
880 return true;
881 }
882 V(rstore_mutex);
883 return false;
884 }
885
dec_read_store(JCR * jcr)886 void dec_read_store(JCR *jcr)
887 {
888 if (jcr->rstore) {
889 P(rstore_mutex);
890 int numread = jcr->rstore->getNumConcurrentReadJobs() - 1;
891 int num = jcr->rstore->getNumConcurrentJobs() - 1;
892 jcr->rstore->setNumConcurrentReadJobs(numread);
893 jcr->rstore->setNumConcurrentJobs(num);
894 Dmsg1(200, "Dec rncj=%d\n", num);
895 V(rstore_mutex);
896 }
897 }
898
dec_write_store(JCR * jcr)899 static void dec_write_store(JCR *jcr)
900 {
901 if (jcr->wstore) {
902 int num = jcr->wstore->getNumConcurrentJobs() - 1;
903 Dmsg1(200, "Dec wncj=%d\n", num);
904 jcr->wstore->setNumConcurrentJobs(num);
905 }
906 }
907