1 /*
2 BAREOS® - Backup Archiving REcovery Open Sourced
3
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5 Copyright (C) 2011-2012 Planets Communications B.V.
6 Copyright (C) 2013-2018 Bareos GmbH & Co. KG
7
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
11 in the file LICENSE.
12
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Affero General Public License for more details.
17
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 02110-1301, USA.
22 */
23 /**
24 * BAREOS job queue routines.
25 *
26 * This code consists of three queues, the waiting_jobs
27 * queue, where jobs are initially queued, the ready_jobs
28 * queue, where jobs are placed when all the resources are
29 * allocated and they can immediately be run, and the
30 * running queue where jobs are placed when they are
31 * running.
32 */
33
34 #include "include/bareos.h"
35 #include "dird.h"
36 #include "dird/jcr_private.h"
37 #include "dird/job.h"
38 #include "dird/jobq.h"
39 #include "dird/storage.h"
40 #include "lib/berrno.h"
41 #include "lib/thread_specific_data.h"
42
43 namespace directordaemon {
44
/*
 * File-wide mutex taken (via P/V) by the Inc.../Dec... concurrency helpers
 * below while they read or modify the NumConcurrentJobs counters of the
 * client, job and storage resources.
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Forward referenced functions */
/* Thread entry points handed to pthread_create(), hence C linkage. */
extern "C" void* jobq_server(void* arg);
extern "C" void* sched_wait(void* arg);

/* File-local helpers, defined further down in this file. */
static int StartServer(jobq_t* jq);
static bool AcquireResources(JobControlRecord* jcr);
static bool RescheduleJob(JobControlRecord* jcr, jobq_t* jq, jobq_item_t* je);
static bool IncClientConcurrency(JobControlRecord* jcr);
static void DecClientConcurrency(JobControlRecord* jcr);
static bool IncJobConcurrency(JobControlRecord* jcr);
static void DecJobConcurrency(JobControlRecord* jcr);
static bool IncWriteStore(JobControlRecord* jcr);
static void DecWriteStore(JobControlRecord* jcr);
60
61 /*
62 * Initialize a job queue
63 *
64 * Returns: 0 on success
65 * errno on failure
66 */
JobqInit(jobq_t * jq,int max_workers,void * (* engine)(void * arg))67 int JobqInit(jobq_t* jq, int max_workers, void* (*engine)(void* arg))
68 {
69 int status;
70 jobq_item_t* item = NULL;
71
72 if ((status = pthread_attr_init(&jq->attr)) != 0) {
73 BErrNo be;
74 Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"),
75 be.bstrerror(status));
76 return status;
77 }
78 if ((status = pthread_attr_setdetachstate(&jq->attr,
79 PTHREAD_CREATE_DETACHED)) != 0) {
80 pthread_attr_destroy(&jq->attr);
81 return status;
82 }
83 if ((status = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
84 BErrNo be;
85 Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"),
86 be.bstrerror(status));
87 pthread_attr_destroy(&jq->attr);
88 return status;
89 }
90 if ((status = pthread_cond_init(&jq->work, NULL)) != 0) {
91 BErrNo be;
92 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"),
93 be.bstrerror(status));
94 pthread_mutex_destroy(&jq->mutex);
95 pthread_attr_destroy(&jq->attr);
96 return status;
97 }
98 jq->quit = false;
99 jq->max_workers = max_workers; /* max threads to create */
100 jq->num_workers = 0; /* no threads yet */
101 jq->engine = engine; /* routine to run */
102 jq->valid = JOBQ_VALID;
103
104 /*
105 * Initialize the job queues
106 */
107 jq->waiting_jobs = new dlist(item, &item->link);
108 jq->running_jobs = new dlist(item, &item->link);
109 jq->ready_jobs = new dlist(item, &item->link);
110
111 return 0;
112 }
113
114 /**
115 * Destroy the job queue
116 *
117 * Returns: 0 on success
118 * errno on failure
119 */
JobqDestroy(jobq_t * jq)120 int JobqDestroy(jobq_t* jq)
121 {
122 int status, status1, status2;
123
124 if (jq->valid != JOBQ_VALID) { return EINVAL; }
125 P(jq->mutex);
126 jq->valid = 0; /* prevent any more operations */
127
128 /*
129 * If any threads are active, wake them
130 */
131 if (jq->num_workers > 0) {
132 jq->quit = true;
133 while (jq->num_workers > 0) {
134 if ((status = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
135 BErrNo be;
136 Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"),
137 be.bstrerror(status));
138 V(jq->mutex);
139 return status;
140 }
141 }
142 }
143 V(jq->mutex);
144 status = pthread_mutex_destroy(&jq->mutex);
145 status1 = pthread_cond_destroy(&jq->work);
146 status2 = pthread_attr_destroy(&jq->attr);
147 delete jq->waiting_jobs;
148 delete jq->running_jobs;
149 delete jq->ready_jobs;
150 return (status != 0 ? status : (status1 != 0 ? status1 : status2));
151 }
152
/*
 * Argument packet handed to the sched_wait() thread. It is allocated with
 * malloc() in JobqAdd() and released with free() by sched_wait().
 */
struct wait_pkt {
  JobControlRecord* jcr; /* job that has to wait for its start time */
  jobq_t* jq;            /* queue the job is added to afterwards */
};
157
158 /**
159 * Wait until schedule time arrives before starting. Normally
160 * this routine is only used for jobs started from the console
161 * for which the user explicitly specified a start time. Otherwise
162 * most jobs are put into the job queue only when their
163 * scheduled time arives.
164 */
sched_wait(void * arg)165 extern "C" void* sched_wait(void* arg)
166 {
167 JobControlRecord* jcr = ((wait_pkt*)arg)->jcr;
168 jobq_t* jq = ((wait_pkt*)arg)->jq;
169
170 SetJcrInThreadSpecificData(nullptr);
171 Dmsg0(2300, "Enter sched_wait.\n");
172 free(arg);
173 time_t wtime = jcr->sched_time - time(NULL);
174 jcr->setJobStatus(JS_WaitStartTime);
175
176 /*
177 * Wait until scheduled time arrives
178 */
179 if (wtime > 0) {
180 Jmsg(jcr, M_INFO, 0,
181 _("Job %s waiting %d seconds for scheduled start time.\n"), jcr->Job,
182 wtime);
183 }
184
185 /*
186 * Check every 30 seconds if canceled
187 */
188 while (wtime > 0) {
189 Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", jcr->JobId,
190 wtime, jcr->UseCount());
191 if (wtime > 30) { wtime = 30; }
192 Bmicrosleep(wtime, 0);
193 if (JobCanceled(jcr)) { break; }
194 wtime = jcr->sched_time - time(NULL);
195 }
196 Dmsg1(200, "resched use=%d\n", jcr->UseCount());
197 JobqAdd(jq, jcr);
198 FreeJcr(jcr); /* we are done with jcr */
199 Dmsg0(2300, "Exit sched_wait\n");
200
201 return NULL;
202 }
203
204 /**
205 * Add a job to the queue
206 * jq is a queue that was created with jobq_init
207 */
JobqAdd(jobq_t * jq,JobControlRecord * jcr)208 int JobqAdd(jobq_t* jq, JobControlRecord* jcr)
209 {
210 int status;
211 jobq_item_t *item, *li;
212 bool inserted = false;
213 time_t wtime = jcr->sched_time - time(NULL);
214 pthread_t id;
215 wait_pkt* sched_pkt;
216
217 if (!jcr->impl->term_wait_inited) {
218 /*
219 * Initialize termination condition variable
220 */
221 if ((status = pthread_cond_init(&jcr->impl->term_wait, NULL)) != 0) {
222 BErrNo be;
223 Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"),
224 be.bstrerror(status));
225 return status;
226 }
227 jcr->impl->term_wait_inited = true;
228 }
229
230 Dmsg3(2300, "JobqAdd jobid=%d jcr=0x%x UseCount=%d\n", jcr->JobId, jcr,
231 jcr->UseCount());
232 if (jq->valid != JOBQ_VALID) {
233 Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
234 return EINVAL;
235 }
236
237 jcr->IncUseCount(); /* mark jcr in use by us */
238 Dmsg3(2300, "JobqAdd jobid=%d jcr=0x%x UseCount=%d\n", jcr->JobId, jcr,
239 jcr->UseCount());
240 if (!JobCanceled(jcr) && wtime > 0) {
241 SetThreadConcurrency(jq->max_workers + 2);
242 sched_pkt = (wait_pkt*)malloc(sizeof(wait_pkt));
243 sched_pkt->jcr = jcr;
244 sched_pkt->jq = jq;
245 status = pthread_create(&id, &jq->attr, sched_wait, (void*)sched_pkt);
246 if (status != 0) { /* thread not created */
247 BErrNo be;
248 Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"),
249 be.bstrerror(status));
250 }
251 return status;
252 }
253
254 P(jq->mutex);
255
256 if ((item = (jobq_item_t*)malloc(sizeof(jobq_item_t))) == NULL) {
257 FreeJcr(jcr); /* release jcr */
258 return ENOMEM;
259 }
260 item->jcr = jcr;
261
262 /*
263 * While waiting in a queue this job is not attached to a thread
264 */
265 SetJcrInThreadSpecificData(nullptr);
266 if (JobCanceled(jcr)) {
267 /*
268 * Add job to ready queue so that it is canceled quickly
269 */
270 jq->ready_jobs->prepend(item);
271 Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
272 } else {
273 /*
274 * Add this job to the wait queue in priority sorted order
275 */
276 foreach_dlist (li, jq->waiting_jobs) {
277 Dmsg2(2300, "waiting item jobid=%d priority=%d\n", li->jcr->JobId,
278 li->jcr->JobPriority);
279 if (li->jcr->JobPriority > jcr->JobPriority) {
280 jq->waiting_jobs->InsertBefore(item, li);
281 Dmsg2(2300, "InsertBefore jobid=%d before waiting job=%d\n",
282 li->jcr->JobId, jcr->JobId);
283 inserted = true;
284 break;
285 }
286 }
287
288 /*
289 * If not jobs in wait queue, append it
290 */
291 if (!inserted) {
292 jq->waiting_jobs->append(item);
293 Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
294 }
295 }
296
297 /*
298 * Ensure that at least one server looks at the queue.
299 */
300 status = StartServer(jq);
301
302 V(jq->mutex);
303 Dmsg0(2300, "Return JobqAdd\n");
304 return status;
305 }
306
307 /**
308 * Remove a job from the job queue. Used only by CancelJob().
309 *
310 * Note, it is "removed" from the job queue.
311 * If you want to cancel it, you need to provide some external means
312 * of doing so (e.g. pthread_kill()).
313 */
JobqRemove(jobq_t * jq,JobControlRecord * jcr)314 int JobqRemove(jobq_t* jq, JobControlRecord* jcr)
315 {
316 int status;
317 bool found = false;
318 jobq_item_t* item;
319
320 Dmsg2(2300, "JobqRemove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
321 if (jq->valid != JOBQ_VALID) { return EINVAL; }
322
323 P(jq->mutex);
324 foreach_dlist (item, jq->waiting_jobs) {
325 if (jcr == item->jcr) {
326 found = true;
327 break;
328 }
329 }
330 if (!found) {
331 V(jq->mutex);
332 Dmsg2(2300, "JobqRemove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId,
333 jcr);
334 return EINVAL;
335 }
336
337 /*
338 * Move item to be the first on the list
339 */
340 jq->waiting_jobs->remove(item);
341 jq->ready_jobs->prepend(item);
342 Dmsg2(2300, "JobqRemove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId,
343 jcr);
344
345 status = StartServer(jq);
346
347 V(jq->mutex);
348 Dmsg0(2300, "Return JobqRemove\n");
349 return status;
350 }
351
352 /**
353 * Start the server thread if it isn't already running
354 */
StartServer(jobq_t * jq)355 static int StartServer(jobq_t* jq)
356 {
357 int status = 0;
358 pthread_t id;
359
360 if (jq->num_workers < jq->max_workers) {
361 Dmsg0(2300, "Create worker thread\n");
362 SetThreadConcurrency(jq->max_workers + 1);
363 if ((status = pthread_create(&id, &jq->attr, jobq_server, (void*)jq)) !=
364 0) {
365 BErrNo be;
366 Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"),
367 be.bstrerror(status));
368 return status;
369 }
370 jq->num_workers++;
371 }
372 return status;
373 }
374
375 /**
376 * This is the worker thread that serves the job queue.
377 * When all the resources are acquired for the job,
378 * it will call the user's engine.
379 */
jobq_server(void * arg)380 extern "C" void* jobq_server(void* arg)
381 {
382 struct timespec timeout;
383 jobq_t* jq = (jobq_t*)arg;
384 jobq_item_t* je; /* job entry in queue */
385 int status;
386 bool timedout = false;
387 bool work = true;
388
389 SetJcrInThreadSpecificData(nullptr);
390 Dmsg0(2300, "Start jobq_server\n");
391 P(jq->mutex);
392
393 for (;;) {
394 struct timeval tv;
395 struct timezone tz;
396
397 Dmsg0(2300, "Top of for loop\n");
398 if (!work && !jq->quit) {
399 gettimeofday(&tv, &tz);
400 timeout.tv_nsec = 0;
401 timeout.tv_sec = tv.tv_sec + 4;
402
403 while (!jq->quit) {
404 /*
405 * Wait 4 seconds, then if no more work, exit
406 */
407 Dmsg0(2300, "pthread_cond_timedwait()\n");
408 status = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
409 if (status == ETIMEDOUT) {
410 Dmsg0(2300, "timedwait timedout.\n");
411 timedout = true;
412 break;
413 } else if (status != 0) {
414 /*
415 * This shouldn't happen
416 */
417 Dmsg0(2300, "This shouldn't happen\n");
418 jq->num_workers--;
419 V(jq->mutex);
420 return NULL;
421 }
422 break;
423 }
424 }
425
426 /*
427 * If anything is in the ready queue, run it
428 */
429 Dmsg0(2300, "Checking ready queue.\n");
430 while (!jq->ready_jobs->empty() && !jq->quit) {
431 JobControlRecord* jcr;
432
433 je = (jobq_item_t*)jq->ready_jobs->first();
434 jcr = je->jcr;
435 jq->ready_jobs->remove(je);
436 if (!jq->ready_jobs->empty()) {
437 Dmsg0(2300, "ready queue not empty start server\n");
438 if (StartServer(jq) != 0) {
439 jq->num_workers--;
440 V(jq->mutex);
441 return NULL;
442 }
443 }
444 jq->running_jobs->append(je);
445
446 /*
447 * Attach jcr to this thread while we run the job
448 */
449 jcr->SetKillable(true);
450 SetJcrInThreadSpecificData(jcr);
451 Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
452
453 /*
454 * Release job queue lock
455 */
456 V(jq->mutex);
457
458 /*
459 * Call user's routine here
460 */
461 Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n",
462 jcr->JobId, jcr->UseCount(), jcr->JobStatus);
463 jq->engine(je->jcr);
464
465 /*
466 * Job finished detach from thread
467 */
468 RemoveJcrFromThreadSpecificData(je->jcr);
469 je->jcr->SetKillable(false);
470
471 Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
472 jcr->UseCount());
473
474 /*
475 * Reacquire job queue lock
476 */
477 P(jq->mutex);
478 Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
479 jq->running_jobs->remove(je);
480
481 /*
482 * Release locks if acquired. Note, they will not have
483 * been acquired for jobs canceled before they were put into the ready
484 * queue.
485 */
486 if (jcr->impl->acquired_resource_locks) {
487 DecReadStore(jcr);
488 DecWriteStore(jcr);
489 DecClientConcurrency(jcr);
490 DecJobConcurrency(jcr);
491 jcr->impl->acquired_resource_locks = false;
492 }
493
494 if (RescheduleJob(jcr, jq, je)) { continue; /* go look for more work */ }
495
496 /*
497 * Clean up and release old jcr
498 */
499 Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId,
500 jcr->UseCount());
501 jcr->impl->SDJobStatus = 0;
502 V(jq->mutex); /* release internal lock */
503 FreeJcr(jcr);
504 free(je); /* release job entry */
505 P(jq->mutex); /* reacquire job queue lock */
506 }
507
508 /*
509 * If any job in the wait queue can be run, move it to the ready queue
510 */
511 Dmsg0(2300, "Done check ready, now check wait queue.\n");
512 if (!jq->waiting_jobs->empty() && !jq->quit) {
513 int Priority;
514 bool running_allow_mix = false;
515 je = (jobq_item_t*)jq->waiting_jobs->first();
516 jobq_item_t* re = (jobq_item_t*)jq->running_jobs->first();
517 if (re) {
518 Priority = re->jcr->JobPriority;
519 Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId,
520 Priority);
521 running_allow_mix = true;
522
523 for (; re;) {
524 Dmsg2(
525 2300, "JobId %d is also running with %s\n", re->jcr->JobId,
526 re->jcr->impl->res.job->allow_mixed_priority ? "mix" : "no mix");
527 if (!re->jcr->impl->res.job->allow_mixed_priority) {
528 running_allow_mix = false;
529 break;
530 }
531 re = (jobq_item_t*)jq->running_jobs->next(re);
532 }
533 Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
534 running_allow_mix ? "allow" : "don't allow");
535 } else {
536 Priority = je->jcr->JobPriority;
537 Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
538 }
539
540 /*
541 * Walk down the list of waiting jobs and attempt to acquire the resources
542 * it needs.
543 */
544 for (; je;) {
545 /*
546 * je is current job item on the queue, jn is the next one
547 */
548 JobControlRecord* jcr = je->jcr;
549 jobq_item_t* jn = (jobq_item_t*)jq->waiting_jobs->next(je);
550
551 Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n", jcr->JobId,
552 jcr->JobPriority, Priority,
553 jcr->impl->res.job->allow_mixed_priority ? "mix" : "no mix");
554
555 /*
556 * Take only jobs of correct Priority
557 */
558 if (!(jcr->JobPriority == Priority ||
559 (jcr->JobPriority < Priority &&
560 jcr->impl->res.job->allow_mixed_priority &&
561 running_allow_mix))) {
562 jcr->setJobStatus(JS_WaitPriority);
563 break;
564 }
565
566 if (!AcquireResources(jcr)) {
567 /*
568 * If resource conflict, job is canceled
569 */
570 if (!JobCanceled(jcr)) {
571 je = jn; /* point to next waiting job */
572 continue;
573 }
574 }
575
576 /*
577 * Got all locks, now remove it from wait queue and append it
578 * to the ready queue. Note, we may also get here if the
579 * job was canceled. Once it is "run", it will quickly Terminate.
580 */
581 jq->waiting_jobs->remove(je);
582 jq->ready_jobs->append(je);
583 Dmsg1(2300, "moved JobId=%d from wait to ready queue\n",
584 je->jcr->JobId);
585 je = jn; /* Point to next waiting job */
586 } /* end for loop */
587 } /* end if */
588
589 Dmsg0(2300, "Done checking wait queue.\n");
590
591 /*
592 * If no more ready work and we are asked to quit, then do it
593 */
594 if (jq->ready_jobs->empty() && jq->quit) {
595 jq->num_workers--;
596 if (jq->num_workers == 0) {
597 Dmsg0(2300, "Wake up destroy routine\n");
598
599 /*
600 * Wake up destroy routine if he is waiting
601 */
602 pthread_cond_broadcast(&jq->work);
603 }
604 break;
605 }
606
607 Dmsg0(2300, "Check for work request\n");
608
609 /*
610 * If no more work requests, and we waited long enough, quit
611 */
612 Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
613 jq->ready_jobs->empty());
614
615 if (jq->ready_jobs->empty() && timedout) {
616 Dmsg0(2300, "break big loop\n");
617 jq->num_workers--;
618 break;
619 }
620
621 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
622 if (work) {
623 /*
624 * If a job is waiting on a Resource, don't consume all
625 * the CPU time looping looking for work, and even more
626 * important, release the lock so that a job that has
627 * terminated can give us the resource.
628 */
629 V(jq->mutex);
630 Bmicrosleep(2, 0); /* pause for 2 seconds */
631 P(jq->mutex);
632
633 /*
634 * Recompute work as something may have changed in last 2 secs
635 */
636 work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
637 }
638 Dmsg1(2300, "Loop again. work=%d\n", work);
639 } /* end of big for loop */
640
641 Dmsg0(200, "unlock mutex\n");
642 V(jq->mutex);
643 Dmsg0(2300, "End jobq_server\n");
644
645 return NULL;
646 }
647
648 /**
649 * Returns true if cleanup done and we should look for more work
650 */
RescheduleJob(JobControlRecord * jcr,jobq_t * jq,jobq_item_t * je)651 static bool RescheduleJob(JobControlRecord* jcr, jobq_t* jq, jobq_item_t* je)
652 {
653 bool resched = false, retval = false;
654
655 /*
656 * Reschedule the job if requested and possible
657 */
658
659 /*
660 * Basic condition is that more reschedule times remain
661 */
662 if (jcr->impl->res.job->RescheduleTimes == 0 ||
663 jcr->impl->reschedule_count < jcr->impl->res.job->RescheduleTimes) {
664 resched =
665 /*
666 * Check for incomplete jobs
667 */
668 (jcr->impl->res.job->RescheduleIncompleteJobs && jcr->IsIncomplete() &&
669 jcr->is_JobType(JT_BACKUP) && !jcr->is_JobLevel(L_BASE)) ||
670 /*
671 * Check for failed jobs
672 */
673 (jcr->impl->res.job->RescheduleOnError && !jcr->IsTerminatedOk() &&
674 !jcr->is_JobStatus(JS_Canceled) && jcr->is_JobType(JT_BACKUP));
675 }
676
677 if (resched) {
678 char dt[50], dt2[50];
679 time_t now;
680
681 /*
682 * Reschedule this job by cleaning it up, but reuse the same JobId if
683 * possible.
684 */
685 now = time(NULL);
686 jcr->impl->reschedule_count++;
687 jcr->sched_time = now + jcr->impl->res.job->RescheduleInterval;
688 bstrftime(dt, sizeof(dt), now);
689 bstrftime(dt2, sizeof(dt2), jcr->sched_time);
690 Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n",
691 jcr->Job, (int)jcr->impl->res.job->RescheduleInterval, now,
692 jcr->sched_time);
693 Jmsg(jcr, M_INFO, 0,
694 _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
695 jcr->Job, dt, (int)jcr->impl->res.job->RescheduleInterval, dt2);
696 DirdFreeJcrPointers(jcr); /* partial cleanup old stuff */
697 jcr->JobStatus = -1;
698 jcr->impl->SDJobStatus = 0;
699 jcr->JobErrors = 0;
700 if (!AllowDuplicateJob(jcr)) { return false; }
701
702 /*
703 * Only jobs with no output or Incomplete jobs can run on same
704 * JobControlRecord
705 */
706 if (jcr->JobBytes == 0) {
707 UpdateJobEnd(jcr, JS_WaitStartTime);
708 Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->UseCount());
709 V(jq->mutex);
710 jcr->impl->jr.RealEndTime = 0;
711 JobqAdd(jq, jcr); /* queue the job to run again */
712 P(jq->mutex);
713 FreeJcr(jcr); /* release jcr */
714 free(je); /* free the job entry */
715 retval = true; /* we already cleaned up */
716 } else {
717 JobControlRecord* njcr;
718
719 /*
720 * Something was actually backed up, so we cannot reuse
721 * the old JobId or there will be database record
722 * conflicts. We now create a new job, copying the
723 * appropriate fields.
724 */
725 jcr->setJobStatus(JS_WaitStartTime);
726 njcr = NewDirectorJcr();
727 SetJcrDefaults(njcr, jcr->impl->res.job);
728 njcr->impl->reschedule_count = jcr->impl->reschedule_count;
729 njcr->sched_time = jcr->sched_time;
730 njcr->initial_sched_time = jcr->initial_sched_time;
731
732 njcr->setJobLevel(jcr->getJobLevel());
733 njcr->impl->res.pool = jcr->impl->res.pool;
734 njcr->impl->res.run_pool_override = jcr->impl->res.run_pool_override;
735 njcr->impl->res.full_pool = jcr->impl->res.full_pool;
736 njcr->impl->res.run_full_pool_override =
737 jcr->impl->res.run_full_pool_override;
738 njcr->impl->res.inc_pool = jcr->impl->res.inc_pool;
739 njcr->impl->res.run_inc_pool_override =
740 jcr->impl->res.run_inc_pool_override;
741 njcr->impl->res.diff_pool = jcr->impl->res.diff_pool;
742 njcr->impl->res.run_diff_pool_override =
743 jcr->impl->res.run_diff_pool_override;
744 njcr->impl->res.next_pool = jcr->impl->res.next_pool;
745 njcr->impl->res.run_next_pool_override =
746 jcr->impl->res.run_next_pool_override;
747 njcr->JobStatus = -1;
748 njcr->setJobStatus(jcr->JobStatus);
749 if (jcr->impl->res.read_storage) {
750 CopyRstorage(njcr, jcr->impl->res.read_storage_list,
751 _("previous Job"));
752 } else {
753 FreeRstorage(njcr);
754 }
755 if (jcr->impl->res.write_storage) {
756 CopyWstorage(njcr, jcr->impl->res.write_storage_list,
757 _("previous Job"));
758 } else {
759 FreeWstorage(njcr);
760 }
761 njcr->impl->res.messages = jcr->impl->res.messages;
762 njcr->impl->spool_data = jcr->impl->spool_data;
763 Dmsg0(2300, "Call to run new job\n");
764 V(jq->mutex);
765 RunJob(njcr); /* This creates a "new" job */
766 FreeJcr(njcr); /* release "new" jcr */
767 P(jq->mutex);
768 Dmsg0(2300, "Back from running new job.\n");
769 }
770 }
771
772 return retval;
773 }
774
775 /**
776 * See if we can acquire all the necessary resources for the job
777 * (JobControlRecord)
778 *
779 * Returns: true if successful
780 * false if resource failure
781 */
AcquireResources(JobControlRecord * jcr)782 static bool AcquireResources(JobControlRecord* jcr)
783 {
784 /*
785 * Set that we didn't acquire any resourse locks yet.
786 */
787 jcr->impl->acquired_resource_locks = false;
788
789 /*
790 * Some Job Types are excluded from the client and storage concurrency
791 * as they have no interaction with the client or storage at all.
792 */
793 switch (jcr->getJobType()) {
794 case JT_MIGRATE:
795 case JT_COPY:
796 case JT_CONSOLIDATE:
797 /*
798 * Migration/Copy and Consolidation jobs are not counted for client
799 * concurrency as they do not touch the client at all
800 */
801 jcr->impl->IgnoreClientConcurrency = true;
802 Dmsg1(200, "Skipping migrate/copy Job %s for client concurrency\n",
803 jcr->Job);
804
805 if (jcr->impl->MigrateJobId == 0) {
806 /*
807 * Migration/Copy control jobs are not counted for storage concurrency
808 * as they do not touch the storage at all
809 */
810 Dmsg1(200,
811 "Skipping migrate/copy Control Job %s for storage concurrency\n",
812 jcr->Job);
813 jcr->impl->IgnoreStorageConcurrency = true;
814 }
815 break;
816 default:
817 break;
818 }
819
820 if (jcr->impl->res.read_storage) {
821 if (!IncReadStore(jcr)) {
822 jcr->setJobStatus(JS_WaitStoreRes);
823
824 return false;
825 }
826 }
827
828 if (jcr->impl->res.write_storage) {
829 if (!IncWriteStore(jcr)) {
830 DecReadStore(jcr);
831 jcr->setJobStatus(JS_WaitStoreRes);
832
833 return false;
834 }
835 }
836
837 if (!IncClientConcurrency(jcr)) {
838 /*
839 * Back out previous locks
840 */
841 DecWriteStore(jcr);
842 DecReadStore(jcr);
843 jcr->setJobStatus(JS_WaitClientRes);
844
845 return false;
846 }
847
848 if (!IncJobConcurrency(jcr)) {
849 /*
850 * Back out previous locks
851 */
852 DecWriteStore(jcr);
853 DecReadStore(jcr);
854 DecClientConcurrency(jcr);
855 jcr->setJobStatus(JS_WaitJobRes);
856
857 return false;
858 }
859
860 jcr->impl->acquired_resource_locks = true;
861
862 return true;
863 }
864
IncClientConcurrency(JobControlRecord * jcr)865 static bool IncClientConcurrency(JobControlRecord* jcr)
866 {
867 if (!jcr->impl->res.client || jcr->impl->IgnoreClientConcurrency) {
868 return true;
869 }
870
871 P(mutex);
872 if (jcr->impl->res.client->rcs->NumConcurrentJobs <
873 jcr->impl->res.client->MaxConcurrentJobs) {
874 jcr->impl->res.client->rcs->NumConcurrentJobs++;
875 Dmsg2(50, "Inc Client=%s rncj=%d\n", jcr->impl->res.client->resource_name_,
876 jcr->impl->res.client->rcs->NumConcurrentJobs);
877 V(mutex);
878
879 return true;
880 }
881
882 V(mutex);
883
884 return false;
885 }
886
DecClientConcurrency(JobControlRecord * jcr)887 static void DecClientConcurrency(JobControlRecord* jcr)
888 {
889 if (jcr->impl->IgnoreClientConcurrency) { return; }
890
891 P(mutex);
892 if (jcr->impl->res.client) {
893 jcr->impl->res.client->rcs->NumConcurrentJobs--;
894 Dmsg2(50, "Dec Client=%s rncj=%d\n", jcr->impl->res.client->resource_name_,
895 jcr->impl->res.client->rcs->NumConcurrentJobs);
896 }
897 V(mutex);
898 }
899
IncJobConcurrency(JobControlRecord * jcr)900 static bool IncJobConcurrency(JobControlRecord* jcr)
901 {
902 P(mutex);
903 if (jcr->impl->res.job->rjs->NumConcurrentJobs <
904 jcr->impl->res.job->MaxConcurrentJobs) {
905 jcr->impl->res.job->rjs->NumConcurrentJobs++;
906 Dmsg2(50, "Inc Job=%s rncj=%d\n", jcr->impl->res.job->resource_name_,
907 jcr->impl->res.job->rjs->NumConcurrentJobs);
908 V(mutex);
909
910 return true;
911 }
912
913 V(mutex);
914
915 return false;
916 }
917
DecJobConcurrency(JobControlRecord * jcr)918 static void DecJobConcurrency(JobControlRecord* jcr)
919 {
920 P(mutex);
921 jcr->impl->res.job->rjs->NumConcurrentJobs--;
922 Dmsg2(50, "Dec Job=%s rncj=%d\n", jcr->impl->res.job->resource_name_,
923 jcr->impl->res.job->rjs->NumConcurrentJobs);
924 V(mutex);
925 }
926
927 /**
928 * Note: IncReadStore() and DecReadStore() are
929 * called from SelectNextRstore() in src/dird/job.c
930 */
IncReadStore(JobControlRecord * jcr)931 bool IncReadStore(JobControlRecord* jcr)
932 {
933 if (jcr->impl->IgnoreStorageConcurrency) { return true; }
934
935 P(mutex);
936 if (jcr->impl->res.read_storage->runtime_storage_status->NumConcurrentJobs <
937 jcr->impl->res.read_storage->MaxConcurrentJobs) {
938 jcr->impl->res.read_storage->runtime_storage_status
939 ->NumConcurrentReadJobs++;
940 jcr->impl->res.read_storage->runtime_storage_status->NumConcurrentJobs++;
941 Dmsg2(50, "Inc Rstore=%s rncj=%d\n",
942 jcr->impl->res.read_storage->resource_name_,
943 jcr->impl->res.read_storage->runtime_storage_status
944 ->NumConcurrentJobs);
945 V(mutex);
946
947 return true;
948 }
949 V(mutex);
950
951 Dmsg2(
952 50, "Fail to acquire Rstore=%s rncj=%d\n",
953 jcr->impl->res.read_storage->resource_name_,
954 jcr->impl->res.read_storage->runtime_storage_status->NumConcurrentJobs);
955
956 return false;
957 }
958
DecReadStore(JobControlRecord * jcr)959 void DecReadStore(JobControlRecord* jcr)
960 {
961 if (jcr->impl->res.read_storage && !jcr->impl->IgnoreStorageConcurrency) {
962 P(mutex);
963 jcr->impl->res.read_storage->runtime_storage_status
964 ->NumConcurrentReadJobs--;
965 jcr->impl->res.read_storage->runtime_storage_status->NumConcurrentJobs--;
966 Dmsg2(50, "Dec Rstore=%s rncj=%d\n",
967 jcr->impl->res.read_storage->resource_name_,
968 jcr->impl->res.read_storage->runtime_storage_status
969 ->NumConcurrentJobs);
970
971 if (jcr->impl->res.read_storage->runtime_storage_status
972 ->NumConcurrentReadJobs < 0) {
973 Jmsg(jcr, M_FATAL, 0, _("NumConcurrentReadJobs Dec Rstore=%s rncj=%d\n"),
974 jcr->impl->res.read_storage->resource_name_,
975 jcr->impl->res.read_storage->runtime_storage_status
976 ->NumConcurrentReadJobs);
977 }
978
979 if (jcr->impl->res.read_storage->runtime_storage_status
980 ->NumConcurrentJobs < 0) {
981 Jmsg(jcr, M_FATAL, 0, _("NumConcurrentJobs Dec Rstore=%s rncj=%d\n"),
982 jcr->impl->res.read_storage->resource_name_,
983 jcr->impl->res.read_storage->runtime_storage_status
984 ->NumConcurrentJobs);
985 }
986 V(mutex);
987 }
988 }
989
IncWriteStore(JobControlRecord * jcr)990 static bool IncWriteStore(JobControlRecord* jcr)
991 {
992 if (jcr->impl->IgnoreStorageConcurrency) { return true; }
993
994 P(mutex);
995 if (jcr->impl->res.write_storage->runtime_storage_status->NumConcurrentJobs <
996 jcr->impl->res.write_storage->MaxConcurrentJobs) {
997 jcr->impl->res.write_storage->runtime_storage_status->NumConcurrentJobs++;
998 Dmsg2(50, "Inc Wstore=%s wncj=%d\n",
999 jcr->impl->res.write_storage->resource_name_,
1000 jcr->impl->res.write_storage->runtime_storage_status
1001 ->NumConcurrentJobs);
1002 V(mutex);
1003
1004 return true;
1005 }
1006 V(mutex);
1007
1008 Dmsg2(
1009 50, "Fail to acquire Wstore=%s wncj=%d\n",
1010 jcr->impl->res.write_storage->resource_name_,
1011 jcr->impl->res.write_storage->runtime_storage_status->NumConcurrentJobs);
1012
1013 return false;
1014 }
1015
DecWriteStore(JobControlRecord * jcr)1016 static void DecWriteStore(JobControlRecord* jcr)
1017 {
1018 if (jcr->impl->res.write_storage && !jcr->impl->IgnoreStorageConcurrency) {
1019 P(mutex);
1020 jcr->impl->res.write_storage->runtime_storage_status->NumConcurrentJobs--;
1021 Dmsg2(50, "Dec Wstore=%s wncj=%d\n",
1022 jcr->impl->res.write_storage->resource_name_,
1023 jcr->impl->res.write_storage->runtime_storage_status
1024 ->NumConcurrentJobs);
1025
1026 if (jcr->impl->res.write_storage->runtime_storage_status
1027 ->NumConcurrentJobs < 0) {
1028 Jmsg(jcr, M_FATAL, 0, _("NumConcurrentJobs Dec Wstore=%s wncj=%d\n"),
1029 jcr->impl->res.write_storage->resource_name_,
1030 jcr->impl->res.write_storage->runtime_storage_status
1031 ->NumConcurrentJobs);
1032 }
1033 V(mutex);
1034 }
1035 }
1036 } /* namespace directordaemon */
1037