1 /*
2    BAREOS® - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5    Copyright (C) 2011-2012 Planets Communications B.V.
6    Copyright (C) 2013-2018 Bareos GmbH & Co. KG
7 
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12 
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    Affero General Public License for more details.
17 
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22 */
23 /**
24  * BAREOS job queue routines.
25  *
26  * This code consists of three queues, the waiting_jobs
27  * queue, where jobs are initially queued, the ready_jobs
28  * queue, where jobs are placed when all the resources are
29  * allocated and they can immediately be run, and the
30  * running queue where jobs are placed when they are
31  * running.
32  */
33 
34 #include "include/bareos.h"
35 #include "dird.h"
36 #include "dird/jcr_private.h"
37 #include "dird/job.h"
38 #include "dird/jobq.h"
39 #include "dird/storage.h"
40 #include "lib/berrno.h"
41 #include "lib/thread_specific_data.h"
42 
43 namespace directordaemon {
44 
/*
 * File-wide lock taken by the Inc...()/Dec...() helpers at the end of this
 * file while they read or modify the NumConcurrentJobs style counters of
 * client, job and storage resources. It is separate from the per-queue
 * jq->mutex that protects the job lists.
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Forward referenced functions */
/* Thread entry points handed to pthread_create(), hence the C linkage */
extern "C" void* jobq_server(void* arg);
extern "C" void* sched_wait(void* arg);

/* File-local helpers, defined further down */
static int StartServer(jobq_t* jq);
static bool AcquireResources(JobControlRecord* jcr);
static bool RescheduleJob(JobControlRecord* jcr, jobq_t* jq, jobq_item_t* je);
static bool IncClientConcurrency(JobControlRecord* jcr);
static void DecClientConcurrency(JobControlRecord* jcr);
static bool IncJobConcurrency(JobControlRecord* jcr);
static void DecJobConcurrency(JobControlRecord* jcr);
static bool IncWriteStore(JobControlRecord* jcr);
static void DecWriteStore(JobControlRecord* jcr);
60 
61 /*
62  * Initialize a job queue
63  *
64  * Returns: 0 on success
65  *          errno on failure
66  */
JobqInit(jobq_t * jq,int max_workers,void * (* engine)(void * arg))67 int JobqInit(jobq_t* jq, int max_workers, void* (*engine)(void* arg))
68 {
69   int status;
70   jobq_item_t* item = NULL;
71 
72   if ((status = pthread_attr_init(&jq->attr)) != 0) {
73     BErrNo be;
74     Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"),
75           be.bstrerror(status));
76     return status;
77   }
78   if ((status = pthread_attr_setdetachstate(&jq->attr,
79                                             PTHREAD_CREATE_DETACHED)) != 0) {
80     pthread_attr_destroy(&jq->attr);
81     return status;
82   }
83   if ((status = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
84     BErrNo be;
85     Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"),
86           be.bstrerror(status));
87     pthread_attr_destroy(&jq->attr);
88     return status;
89   }
90   if ((status = pthread_cond_init(&jq->work, NULL)) != 0) {
91     BErrNo be;
92     Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"),
93           be.bstrerror(status));
94     pthread_mutex_destroy(&jq->mutex);
95     pthread_attr_destroy(&jq->attr);
96     return status;
97   }
98   jq->quit = false;
99   jq->max_workers = max_workers; /* max threads to create */
100   jq->num_workers = 0;           /* no threads yet */
101   jq->engine = engine;           /* routine to run */
102   jq->valid = JOBQ_VALID;
103 
104   /*
105    * Initialize the job queues
106    */
107   jq->waiting_jobs = new dlist(item, &item->link);
108   jq->running_jobs = new dlist(item, &item->link);
109   jq->ready_jobs = new dlist(item, &item->link);
110 
111   return 0;
112 }
113 
114 /**
115  * Destroy the job queue
116  *
117  * Returns: 0 on success
118  *          errno on failure
119  */
JobqDestroy(jobq_t * jq)120 int JobqDestroy(jobq_t* jq)
121 {
122   int status, status1, status2;
123 
124   if (jq->valid != JOBQ_VALID) { return EINVAL; }
125   P(jq->mutex);
126   jq->valid = 0; /* prevent any more operations */
127 
128   /*
129    * If any threads are active, wake them
130    */
131   if (jq->num_workers > 0) {
132     jq->quit = true;
133     while (jq->num_workers > 0) {
134       if ((status = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
135         BErrNo be;
136         Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"),
137               be.bstrerror(status));
138         V(jq->mutex);
139         return status;
140       }
141     }
142   }
143   V(jq->mutex);
144   status = pthread_mutex_destroy(&jq->mutex);
145   status1 = pthread_cond_destroy(&jq->work);
146   status2 = pthread_attr_destroy(&jq->attr);
147   delete jq->waiting_jobs;
148   delete jq->running_jobs;
149   delete jq->ready_jobs;
150   return (status != 0 ? status : (status1 != 0 ? status1 : status2));
151 }
152 
/*
 * Argument packet for the sched_wait() thread. JobqAdd() allocates it with
 * malloc() and sched_wait() releases it with free() after copying both
 * members out.
 */
struct wait_pkt {
  JobControlRecord* jcr; /* job that waits for its scheduled start time */
  jobq_t* jq;            /* queue the job is added to once the wait is over */
};
157 
158 /**
159  * Wait until schedule time arrives before starting. Normally
160  * this routine is only used for jobs started from the console
161  * for which the user explicitly specified a start time. Otherwise
162  * most jobs are put into the job queue only when their
163  * scheduled time arives.
164  */
sched_wait(void * arg)165 extern "C" void* sched_wait(void* arg)
166 {
167   JobControlRecord* jcr = ((wait_pkt*)arg)->jcr;
168   jobq_t* jq = ((wait_pkt*)arg)->jq;
169 
170   SetJcrInThreadSpecificData(nullptr);
171   Dmsg0(2300, "Enter sched_wait.\n");
172   free(arg);
173   time_t wtime = jcr->sched_time - time(NULL);
174   jcr->setJobStatus(JS_WaitStartTime);
175 
176   /*
177    * Wait until scheduled time arrives
178    */
179   if (wtime > 0) {
180     Jmsg(jcr, M_INFO, 0,
181          _("Job %s waiting %d seconds for scheduled start time.\n"), jcr->Job,
182          wtime);
183   }
184 
185   /*
186    * Check every 30 seconds if canceled
187    */
188   while (wtime > 0) {
189     Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", jcr->JobId,
190           wtime, jcr->UseCount());
191     if (wtime > 30) { wtime = 30; }
192     Bmicrosleep(wtime, 0);
193     if (JobCanceled(jcr)) { break; }
194     wtime = jcr->sched_time - time(NULL);
195   }
196   Dmsg1(200, "resched use=%d\n", jcr->UseCount());
197   JobqAdd(jq, jcr);
198   FreeJcr(jcr); /* we are done with jcr */
199   Dmsg0(2300, "Exit sched_wait\n");
200 
201   return NULL;
202 }
203 
204 /**
205  * Add a job to the queue
206  * jq is a queue that was created with jobq_init
207  */
JobqAdd(jobq_t * jq,JobControlRecord * jcr)208 int JobqAdd(jobq_t* jq, JobControlRecord* jcr)
209 {
210   int status;
211   jobq_item_t *item, *li;
212   bool inserted = false;
213   time_t wtime = jcr->sched_time - time(NULL);
214   pthread_t id;
215   wait_pkt* sched_pkt;
216 
217   if (!jcr->impl->term_wait_inited) {
218     /*
219      * Initialize termination condition variable
220      */
221     if ((status = pthread_cond_init(&jcr->impl->term_wait, NULL)) != 0) {
222       BErrNo be;
223       Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"),
224             be.bstrerror(status));
225       return status;
226     }
227     jcr->impl->term_wait_inited = true;
228   }
229 
230   Dmsg3(2300, "JobqAdd jobid=%d jcr=0x%x UseCount=%d\n", jcr->JobId, jcr,
231         jcr->UseCount());
232   if (jq->valid != JOBQ_VALID) {
233     Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
234     return EINVAL;
235   }
236 
237   jcr->IncUseCount(); /* mark jcr in use by us */
238   Dmsg3(2300, "JobqAdd jobid=%d jcr=0x%x UseCount=%d\n", jcr->JobId, jcr,
239         jcr->UseCount());
240   if (!JobCanceled(jcr) && wtime > 0) {
241     SetThreadConcurrency(jq->max_workers + 2);
242     sched_pkt = (wait_pkt*)malloc(sizeof(wait_pkt));
243     sched_pkt->jcr = jcr;
244     sched_pkt->jq = jq;
245     status = pthread_create(&id, &jq->attr, sched_wait, (void*)sched_pkt);
246     if (status != 0) { /* thread not created */
247       BErrNo be;
248       Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"),
249             be.bstrerror(status));
250     }
251     return status;
252   }
253 
254   P(jq->mutex);
255 
256   if ((item = (jobq_item_t*)malloc(sizeof(jobq_item_t))) == NULL) {
257     FreeJcr(jcr); /* release jcr */
258     return ENOMEM;
259   }
260   item->jcr = jcr;
261 
262   /*
263    * While waiting in a queue this job is not attached to a thread
264    */
265   SetJcrInThreadSpecificData(nullptr);
266   if (JobCanceled(jcr)) {
267     /*
268      * Add job to ready queue so that it is canceled quickly
269      */
270     jq->ready_jobs->prepend(item);
271     Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
272   } else {
273     /*
274      * Add this job to the wait queue in priority sorted order
275      */
276     foreach_dlist (li, jq->waiting_jobs) {
277       Dmsg2(2300, "waiting item jobid=%d priority=%d\n", li->jcr->JobId,
278             li->jcr->JobPriority);
279       if (li->jcr->JobPriority > jcr->JobPriority) {
280         jq->waiting_jobs->InsertBefore(item, li);
281         Dmsg2(2300, "InsertBefore jobid=%d before waiting job=%d\n",
282               li->jcr->JobId, jcr->JobId);
283         inserted = true;
284         break;
285       }
286     }
287 
288     /*
289      * If not jobs in wait queue, append it
290      */
291     if (!inserted) {
292       jq->waiting_jobs->append(item);
293       Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
294     }
295   }
296 
297   /*
298    * Ensure that at least one server looks at the queue.
299    */
300   status = StartServer(jq);
301 
302   V(jq->mutex);
303   Dmsg0(2300, "Return JobqAdd\n");
304   return status;
305 }
306 
307 /**
308  * Remove a job from the job queue. Used only by CancelJob().
309  *
310  * Note, it is "removed" from the job queue.
311  * If you want to cancel it, you need to provide some external means
312  * of doing so (e.g. pthread_kill()).
313  */
JobqRemove(jobq_t * jq,JobControlRecord * jcr)314 int JobqRemove(jobq_t* jq, JobControlRecord* jcr)
315 {
316   int status;
317   bool found = false;
318   jobq_item_t* item;
319 
320   Dmsg2(2300, "JobqRemove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
321   if (jq->valid != JOBQ_VALID) { return EINVAL; }
322 
323   P(jq->mutex);
324   foreach_dlist (item, jq->waiting_jobs) {
325     if (jcr == item->jcr) {
326       found = true;
327       break;
328     }
329   }
330   if (!found) {
331     V(jq->mutex);
332     Dmsg2(2300, "JobqRemove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId,
333           jcr);
334     return EINVAL;
335   }
336 
337   /*
338    * Move item to be the first on the list
339    */
340   jq->waiting_jobs->remove(item);
341   jq->ready_jobs->prepend(item);
342   Dmsg2(2300, "JobqRemove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId,
343         jcr);
344 
345   status = StartServer(jq);
346 
347   V(jq->mutex);
348   Dmsg0(2300, "Return JobqRemove\n");
349   return status;
350 }
351 
352 /**
353  * Start the server thread if it isn't already running
354  */
StartServer(jobq_t * jq)355 static int StartServer(jobq_t* jq)
356 {
357   int status = 0;
358   pthread_t id;
359 
360   if (jq->num_workers < jq->max_workers) {
361     Dmsg0(2300, "Create worker thread\n");
362     SetThreadConcurrency(jq->max_workers + 1);
363     if ((status = pthread_create(&id, &jq->attr, jobq_server, (void*)jq)) !=
364         0) {
365       BErrNo be;
366       Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"),
367             be.bstrerror(status));
368       return status;
369     }
370     jq->num_workers++;
371   }
372   return status;
373 }
374 
/**
 * This is the worker thread that serves the job queue.
 * When all the resources are acquired for the job,
 * it will call the user's engine.
 *
 * arg is the jobq_t the worker belongs to. The worker holds jq->mutex for
 * the whole loop, except while the engine runs, while a finished jcr and
 * its queue entry are freed, during the 2 second pause at the bottom of the
 * loop, and inside RescheduleJob() (which drops the lock around JobqAdd()
 * and RunJob()).
 *
 * Each pass of the main loop:
 *   1. if the previous pass found no work, waits up to 4 seconds on jq->work
 *   2. runs every job in the ready queue, one after the other
 *   3. moves waiting jobs that match the wanted priority and got their
 *      resources to the ready queue
 *   4. leaves if asked to quit, or if the wait timed out, and the ready
 *      queue is empty
 *   5. otherwise pauses 2 seconds when work is still pending
 *
 * jq->num_workers is decremented on every path that leaves the function.
 *
 * Returns: always NULL
 */
extern "C" void* jobq_server(void* arg)
{
  struct timespec timeout;
  jobq_t* jq = (jobq_t*)arg;
  jobq_item_t* je; /* job entry in queue */
  int status;
  bool timedout = false;
  bool work = true; /* true on entry, so the first pass skips the wait */

  SetJcrInThreadSpecificData(nullptr);
  Dmsg0(2300, "Start jobq_server\n");
  P(jq->mutex);

  for (;;) {
    struct timeval tv;
    struct timezone tz;

    Dmsg0(2300, "Top of for loop\n");
    if (!work && !jq->quit) {
      gettimeofday(&tv, &tz);
      timeout.tv_nsec = 0;
      timeout.tv_sec = tv.tv_sec + 4;

      /*
       * Every branch of the body either breaks or returns, so at most one
       * wait is performed per pass.
       */
      while (!jq->quit) {
        /*
         * Wait 4 seconds, then if no more work, exit
         */
        Dmsg0(2300, "pthread_cond_timedwait()\n");
        status = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
        if (status == ETIMEDOUT) {
          Dmsg0(2300, "timedwait timedout.\n");
          timedout = true;
          break;
        } else if (status != 0) {
          /*
           * This shouldn't happen
           */
          Dmsg0(2300, "This shouldn't happen\n");
          jq->num_workers--;
          V(jq->mutex);
          return NULL;
        }
        break;
      }
    }

    /*
     * If anything is in the ready queue, run it
     */
    Dmsg0(2300, "Checking ready queue.\n");
    while (!jq->ready_jobs->empty() && !jq->quit) {
      JobControlRecord* jcr;

      je = (jobq_item_t*)jq->ready_jobs->first();
      jcr = je->jcr;
      jq->ready_jobs->remove(je);
      if (!jq->ready_jobs->empty()) {
        /*
         * More ready jobs than this thread can run right now: try to get
         * another worker going for them.
         *
         * NOTE(review): on failure je has already been taken off the ready
         * queue and is neither put back nor freed before returning -- looks
         * like the entry (and its jcr reference) is dropped; confirm.
         */
        Dmsg0(2300, "ready queue not empty start server\n");
        if (StartServer(jq) != 0) {
          jq->num_workers--;
          V(jq->mutex);
          return NULL;
        }
      }
      jq->running_jobs->append(je);

      /*
       * Attach jcr to this thread while we run the job
       */
      jcr->SetKillable(true);
      SetJcrInThreadSpecificData(jcr);
      Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);

      /*
       * Release job queue lock
       */
      V(jq->mutex);

      /*
       * Call user's routine here
       */
      Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n",
            jcr->JobId, jcr->UseCount(), jcr->JobStatus);
      jq->engine(je->jcr);

      /*
       * Job finished detach from thread
       */
      RemoveJcrFromThreadSpecificData(je->jcr);
      je->jcr->SetKillable(false);

      Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
            jcr->UseCount());

      /*
       * Reacquire job queue lock
       */
      P(jq->mutex);
      Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
      jq->running_jobs->remove(je);

      /*
       * Release locks if acquired. Note, they will not have
       * been acquired for jobs canceled before they were put into the ready
       * queue.
       */
      if (jcr->impl->acquired_resource_locks) {
        DecReadStore(jcr);
        DecWriteStore(jcr);
        DecClientConcurrency(jcr);
        DecJobConcurrency(jcr);
        jcr->impl->acquired_resource_locks = false;
      }

      /*
       * A true return means RescheduleJob() requeued the job and already
       * released jcr and je, so they must not be touched again here.
       */
      if (RescheduleJob(jcr, jq, je)) { continue; /* go look for more work */ }

      /*
       * Clean up and release old jcr
       */
      Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId,
            jcr->UseCount());
      jcr->impl->SDJobStatus = 0;
      V(jq->mutex); /* release internal lock */
      FreeJcr(jcr);
      free(je);     /* release job entry */
      P(jq->mutex); /* reacquire job queue lock */
    }

    /*
     * If any job in the wait queue can be run, move it to the ready queue
     */
    Dmsg0(2300, "Done check ready, now check wait queue.\n");
    if (!jq->waiting_jobs->empty() && !jq->quit) {
      int Priority;
      bool running_allow_mix = false;
      je = (jobq_item_t*)jq->waiting_jobs->first();
      jobq_item_t* re = (jobq_item_t*)jq->running_jobs->first();
      if (re) {
        /*
         * Something is running: the wanted priority is that of the first
         * running job. Mixing priorities is only allowed when every running
         * job's Job resource has allow_mixed_priority set.
         */
        Priority = re->jcr->JobPriority;
        Dmsg2(2300, "JobId %d is running. Look for pri=%d\n", re->jcr->JobId,
              Priority);
        running_allow_mix = true;

        for (; re;) {
          Dmsg2(
              2300, "JobId %d is also running with %s\n", re->jcr->JobId,
              re->jcr->impl->res.job->allow_mixed_priority ? "mix" : "no mix");
          if (!re->jcr->impl->res.job->allow_mixed_priority) {
            running_allow_mix = false;
            break;
          }
          re = (jobq_item_t*)jq->running_jobs->next(re);
        }
        Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
              running_allow_mix ? "allow" : "don't allow");
      } else {
        /*
         * Nothing is running: the first waiting job sets the wanted priority
         */
        Priority = je->jcr->JobPriority;
        Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
      }

      /*
       * Walk down the list of waiting jobs and attempt to acquire the resources
       * it needs.
       */
      for (; je;) {
        /*
         * je is current job item on the queue, jn is the next one
         * (fetched up front because je may be removed from the list below)
         */
        JobControlRecord* jcr = je->jcr;
        jobq_item_t* jn = (jobq_item_t*)jq->waiting_jobs->next(je);

        Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n", jcr->JobId,
              jcr->JobPriority, Priority,
              jcr->impl->res.job->allow_mixed_priority ? "mix" : "no mix");

        /*
         * Take only jobs of correct Priority: either exactly the wanted
         * priority, or a smaller priority value when both this job and all
         * running jobs allow mixing. The walk stops at the first job that
         * does not qualify.
         */
        if (!(jcr->JobPriority == Priority ||
              (jcr->JobPriority < Priority &&
               jcr->impl->res.job->allow_mixed_priority &&
               running_allow_mix))) {
          jcr->setJobStatus(JS_WaitPriority);
          break;
        }

        if (!AcquireResources(jcr)) {
          /*
           * If resource conflict, job is canceled
           *
           * A job that is not canceled stays in the waiting queue and is
           * tried again on a later pass; a canceled one falls through and
           * is moved to the ready queue without holding resources.
           */
          if (!JobCanceled(jcr)) {
            je = jn; /* point to next waiting job */
            continue;
          }
        }

        /*
         * Got all locks, now remove it from wait queue and append it
         * to the ready queue.  Note, we may also get here if the
         * job was canceled.  Once it is "run", it will quickly Terminate.
         */
        jq->waiting_jobs->remove(je);
        jq->ready_jobs->append(je);
        Dmsg1(2300, "moved JobId=%d from wait to ready queue\n",
              je->jcr->JobId);
        je = jn; /* Point to next waiting job */
      }          /* end for loop */
    }            /* end if */

    Dmsg0(2300, "Done checking wait queue.\n");

    /*
     * If no more ready work and we are asked to quit, then do it
     */
    if (jq->ready_jobs->empty() && jq->quit) {
      jq->num_workers--;
      if (jq->num_workers == 0) {
        Dmsg0(2300, "Wake up destroy routine\n");

        /*
         * Wake up destroy routine if he is waiting
         */
        pthread_cond_broadcast(&jq->work);
      }
      break;
    }

    Dmsg0(2300, "Check for work request\n");

    /*
     * If no more work requests, and we waited long enough, quit
     */
    Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
          jq->ready_jobs->empty());

    /*
     * NOTE(review): timedout is never reset to false inside the loop, so
     * once a wait has timed out every later pass with an empty ready queue
     * ends the worker -- confirm this is intended.
     */
    if (jq->ready_jobs->empty() && timedout) {
      Dmsg0(2300, "break big loop\n");
      jq->num_workers--;
      break;
    }

    work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
    if (work) {
      /*
       * If a job is waiting on a Resource, don't consume all
       * the CPU time looping looking for work, and even more
       * important, release the lock so that a job that has
       * terminated can give us the resource.
       */
      V(jq->mutex);
      Bmicrosleep(2, 0); /* pause for 2 seconds */
      P(jq->mutex);

      /*
       * Recompute work as something may have changed in last 2 secs
       */
      work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
    }
    Dmsg1(2300, "Loop again. work=%d\n", work);
  } /* end of big for loop */

  Dmsg0(200, "unlock mutex\n");
  V(jq->mutex);
  Dmsg0(2300, "End jobq_server\n");

  return NULL;
}
647 
648 /**
649  * Returns true if cleanup done and we should look for more work
650  */
RescheduleJob(JobControlRecord * jcr,jobq_t * jq,jobq_item_t * je)651 static bool RescheduleJob(JobControlRecord* jcr, jobq_t* jq, jobq_item_t* je)
652 {
653   bool resched = false, retval = false;
654 
655   /*
656    * Reschedule the job if requested and possible
657    */
658 
659   /*
660    * Basic condition is that more reschedule times remain
661    */
662   if (jcr->impl->res.job->RescheduleTimes == 0 ||
663       jcr->impl->reschedule_count < jcr->impl->res.job->RescheduleTimes) {
664     resched =
665         /*
666          * Check for incomplete jobs
667          */
668         (jcr->impl->res.job->RescheduleIncompleteJobs && jcr->IsIncomplete() &&
669          jcr->is_JobType(JT_BACKUP) && !jcr->is_JobLevel(L_BASE)) ||
670         /*
671          * Check for failed jobs
672          */
673         (jcr->impl->res.job->RescheduleOnError && !jcr->IsTerminatedOk() &&
674          !jcr->is_JobStatus(JS_Canceled) && jcr->is_JobType(JT_BACKUP));
675   }
676 
677   if (resched) {
678     char dt[50], dt2[50];
679     time_t now;
680 
681     /*
682      * Reschedule this job by cleaning it up, but reuse the same JobId if
683      * possible.
684      */
685     now = time(NULL);
686     jcr->impl->reschedule_count++;
687     jcr->sched_time = now + jcr->impl->res.job->RescheduleInterval;
688     bstrftime(dt, sizeof(dt), now);
689     bstrftime(dt2, sizeof(dt2), jcr->sched_time);
690     Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n",
691           jcr->Job, (int)jcr->impl->res.job->RescheduleInterval, now,
692           jcr->sched_time);
693     Jmsg(jcr, M_INFO, 0,
694          _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
695          jcr->Job, dt, (int)jcr->impl->res.job->RescheduleInterval, dt2);
696     DirdFreeJcrPointers(jcr); /* partial cleanup old stuff */
697     jcr->JobStatus = -1;
698     jcr->impl->SDJobStatus = 0;
699     jcr->JobErrors = 0;
700     if (!AllowDuplicateJob(jcr)) { return false; }
701 
702     /*
703      * Only jobs with no output or Incomplete jobs can run on same
704      * JobControlRecord
705      */
706     if (jcr->JobBytes == 0) {
707       UpdateJobEnd(jcr, JS_WaitStartTime);
708       Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->UseCount());
709       V(jq->mutex);
710       jcr->impl->jr.RealEndTime = 0;
711       JobqAdd(jq, jcr); /* queue the job to run again */
712       P(jq->mutex);
713       FreeJcr(jcr);  /* release jcr */
714       free(je);      /* free the job entry */
715       retval = true; /* we already cleaned up */
716     } else {
717       JobControlRecord* njcr;
718 
719       /*
720        * Something was actually backed up, so we cannot reuse
721        * the old JobId or there will be database record
722        * conflicts.  We now create a new job, copying the
723        * appropriate fields.
724        */
725       jcr->setJobStatus(JS_WaitStartTime);
726       njcr = NewDirectorJcr();
727       SetJcrDefaults(njcr, jcr->impl->res.job);
728       njcr->impl->reschedule_count = jcr->impl->reschedule_count;
729       njcr->sched_time = jcr->sched_time;
730       njcr->initial_sched_time = jcr->initial_sched_time;
731 
732       njcr->setJobLevel(jcr->getJobLevel());
733       njcr->impl->res.pool = jcr->impl->res.pool;
734       njcr->impl->res.run_pool_override = jcr->impl->res.run_pool_override;
735       njcr->impl->res.full_pool = jcr->impl->res.full_pool;
736       njcr->impl->res.run_full_pool_override =
737           jcr->impl->res.run_full_pool_override;
738       njcr->impl->res.inc_pool = jcr->impl->res.inc_pool;
739       njcr->impl->res.run_inc_pool_override =
740           jcr->impl->res.run_inc_pool_override;
741       njcr->impl->res.diff_pool = jcr->impl->res.diff_pool;
742       njcr->impl->res.run_diff_pool_override =
743           jcr->impl->res.run_diff_pool_override;
744       njcr->impl->res.next_pool = jcr->impl->res.next_pool;
745       njcr->impl->res.run_next_pool_override =
746           jcr->impl->res.run_next_pool_override;
747       njcr->JobStatus = -1;
748       njcr->setJobStatus(jcr->JobStatus);
749       if (jcr->impl->res.read_storage) {
750         CopyRstorage(njcr, jcr->impl->res.read_storage_list,
751                      _("previous Job"));
752       } else {
753         FreeRstorage(njcr);
754       }
755       if (jcr->impl->res.write_storage) {
756         CopyWstorage(njcr, jcr->impl->res.write_storage_list,
757                      _("previous Job"));
758       } else {
759         FreeWstorage(njcr);
760       }
761       njcr->impl->res.messages = jcr->impl->res.messages;
762       njcr->impl->spool_data = jcr->impl->spool_data;
763       Dmsg0(2300, "Call to run new job\n");
764       V(jq->mutex);
765       RunJob(njcr);  /* This creates a "new" job */
766       FreeJcr(njcr); /* release "new" jcr */
767       P(jq->mutex);
768       Dmsg0(2300, "Back from running new job.\n");
769     }
770   }
771 
772   return retval;
773 }
774 
775 /**
776  * See if we can acquire all the necessary resources for the job
777  * (JobControlRecord)
778  *
779  *  Returns: true  if successful
780  *           false if resource failure
781  */
AcquireResources(JobControlRecord * jcr)782 static bool AcquireResources(JobControlRecord* jcr)
783 {
784   /*
785    * Set that we didn't acquire any resourse locks yet.
786    */
787   jcr->impl->acquired_resource_locks = false;
788 
789   /*
790    * Some Job Types are excluded from the client and storage concurrency
791    * as they have no interaction with the client or storage at all.
792    */
793   switch (jcr->getJobType()) {
794     case JT_MIGRATE:
795     case JT_COPY:
796     case JT_CONSOLIDATE:
797       /*
798        * Migration/Copy and Consolidation jobs are not counted for client
799        * concurrency as they do not touch the client at all
800        */
801       jcr->impl->IgnoreClientConcurrency = true;
802       Dmsg1(200, "Skipping migrate/copy Job %s for client concurrency\n",
803             jcr->Job);
804 
805       if (jcr->impl->MigrateJobId == 0) {
806         /*
807          * Migration/Copy control jobs are not counted for storage concurrency
808          * as they do not touch the storage at all
809          */
810         Dmsg1(200,
811               "Skipping migrate/copy Control Job %s for storage concurrency\n",
812               jcr->Job);
813         jcr->impl->IgnoreStorageConcurrency = true;
814       }
815       break;
816     default:
817       break;
818   }
819 
820   if (jcr->impl->res.read_storage) {
821     if (!IncReadStore(jcr)) {
822       jcr->setJobStatus(JS_WaitStoreRes);
823 
824       return false;
825     }
826   }
827 
828   if (jcr->impl->res.write_storage) {
829     if (!IncWriteStore(jcr)) {
830       DecReadStore(jcr);
831       jcr->setJobStatus(JS_WaitStoreRes);
832 
833       return false;
834     }
835   }
836 
837   if (!IncClientConcurrency(jcr)) {
838     /*
839      * Back out previous locks
840      */
841     DecWriteStore(jcr);
842     DecReadStore(jcr);
843     jcr->setJobStatus(JS_WaitClientRes);
844 
845     return false;
846   }
847 
848   if (!IncJobConcurrency(jcr)) {
849     /*
850      * Back out previous locks
851      */
852     DecWriteStore(jcr);
853     DecReadStore(jcr);
854     DecClientConcurrency(jcr);
855     jcr->setJobStatus(JS_WaitJobRes);
856 
857     return false;
858   }
859 
860   jcr->impl->acquired_resource_locks = true;
861 
862   return true;
863 }
864 
IncClientConcurrency(JobControlRecord * jcr)865 static bool IncClientConcurrency(JobControlRecord* jcr)
866 {
867   if (!jcr->impl->res.client || jcr->impl->IgnoreClientConcurrency) {
868     return true;
869   }
870 
871   P(mutex);
872   if (jcr->impl->res.client->rcs->NumConcurrentJobs <
873       jcr->impl->res.client->MaxConcurrentJobs) {
874     jcr->impl->res.client->rcs->NumConcurrentJobs++;
875     Dmsg2(50, "Inc Client=%s rncj=%d\n", jcr->impl->res.client->resource_name_,
876           jcr->impl->res.client->rcs->NumConcurrentJobs);
877     V(mutex);
878 
879     return true;
880   }
881 
882   V(mutex);
883 
884   return false;
885 }
886 
DecClientConcurrency(JobControlRecord * jcr)887 static void DecClientConcurrency(JobControlRecord* jcr)
888 {
889   if (jcr->impl->IgnoreClientConcurrency) { return; }
890 
891   P(mutex);
892   if (jcr->impl->res.client) {
893     jcr->impl->res.client->rcs->NumConcurrentJobs--;
894     Dmsg2(50, "Dec Client=%s rncj=%d\n", jcr->impl->res.client->resource_name_,
895           jcr->impl->res.client->rcs->NumConcurrentJobs);
896   }
897   V(mutex);
898 }
899 
IncJobConcurrency(JobControlRecord * jcr)900 static bool IncJobConcurrency(JobControlRecord* jcr)
901 {
902   P(mutex);
903   if (jcr->impl->res.job->rjs->NumConcurrentJobs <
904       jcr->impl->res.job->MaxConcurrentJobs) {
905     jcr->impl->res.job->rjs->NumConcurrentJobs++;
906     Dmsg2(50, "Inc Job=%s rncj=%d\n", jcr->impl->res.job->resource_name_,
907           jcr->impl->res.job->rjs->NumConcurrentJobs);
908     V(mutex);
909 
910     return true;
911   }
912 
913   V(mutex);
914 
915   return false;
916 }
917 
DecJobConcurrency(JobControlRecord * jcr)918 static void DecJobConcurrency(JobControlRecord* jcr)
919 {
920   P(mutex);
921   jcr->impl->res.job->rjs->NumConcurrentJobs--;
922   Dmsg2(50, "Dec Job=%s rncj=%d\n", jcr->impl->res.job->resource_name_,
923         jcr->impl->res.job->rjs->NumConcurrentJobs);
924   V(mutex);
925 }
926 
927 /**
928  * Note: IncReadStore() and DecReadStore() are
929  * called from SelectNextRstore() in src/dird/job.c
930  */
IncReadStore(JobControlRecord * jcr)931 bool IncReadStore(JobControlRecord* jcr)
932 {
933   if (jcr->impl->IgnoreStorageConcurrency) { return true; }
934 
935   P(mutex);
936   if (jcr->impl->res.read_storage->runtime_storage_status->NumConcurrentJobs <
937       jcr->impl->res.read_storage->MaxConcurrentJobs) {
938     jcr->impl->res.read_storage->runtime_storage_status
939         ->NumConcurrentReadJobs++;
940     jcr->impl->res.read_storage->runtime_storage_status->NumConcurrentJobs++;
941     Dmsg2(50, "Inc Rstore=%s rncj=%d\n",
942           jcr->impl->res.read_storage->resource_name_,
943           jcr->impl->res.read_storage->runtime_storage_status
944               ->NumConcurrentJobs);
945     V(mutex);
946 
947     return true;
948   }
949   V(mutex);
950 
951   Dmsg2(
952       50, "Fail to acquire Rstore=%s rncj=%d\n",
953       jcr->impl->res.read_storage->resource_name_,
954       jcr->impl->res.read_storage->runtime_storage_status->NumConcurrentJobs);
955 
956   return false;
957 }
958 
DecReadStore(JobControlRecord * jcr)959 void DecReadStore(JobControlRecord* jcr)
960 {
961   if (jcr->impl->res.read_storage && !jcr->impl->IgnoreStorageConcurrency) {
962     P(mutex);
963     jcr->impl->res.read_storage->runtime_storage_status
964         ->NumConcurrentReadJobs--;
965     jcr->impl->res.read_storage->runtime_storage_status->NumConcurrentJobs--;
966     Dmsg2(50, "Dec Rstore=%s rncj=%d\n",
967           jcr->impl->res.read_storage->resource_name_,
968           jcr->impl->res.read_storage->runtime_storage_status
969               ->NumConcurrentJobs);
970 
971     if (jcr->impl->res.read_storage->runtime_storage_status
972             ->NumConcurrentReadJobs < 0) {
973       Jmsg(jcr, M_FATAL, 0, _("NumConcurrentReadJobs Dec Rstore=%s rncj=%d\n"),
974            jcr->impl->res.read_storage->resource_name_,
975            jcr->impl->res.read_storage->runtime_storage_status
976                ->NumConcurrentReadJobs);
977     }
978 
979     if (jcr->impl->res.read_storage->runtime_storage_status
980             ->NumConcurrentJobs < 0) {
981       Jmsg(jcr, M_FATAL, 0, _("NumConcurrentJobs Dec Rstore=%s rncj=%d\n"),
982            jcr->impl->res.read_storage->resource_name_,
983            jcr->impl->res.read_storage->runtime_storage_status
984                ->NumConcurrentJobs);
985     }
986     V(mutex);
987   }
988 }
989 
IncWriteStore(JobControlRecord * jcr)990 static bool IncWriteStore(JobControlRecord* jcr)
991 {
992   if (jcr->impl->IgnoreStorageConcurrency) { return true; }
993 
994   P(mutex);
995   if (jcr->impl->res.write_storage->runtime_storage_status->NumConcurrentJobs <
996       jcr->impl->res.write_storage->MaxConcurrentJobs) {
997     jcr->impl->res.write_storage->runtime_storage_status->NumConcurrentJobs++;
998     Dmsg2(50, "Inc Wstore=%s wncj=%d\n",
999           jcr->impl->res.write_storage->resource_name_,
1000           jcr->impl->res.write_storage->runtime_storage_status
1001               ->NumConcurrentJobs);
1002     V(mutex);
1003 
1004     return true;
1005   }
1006   V(mutex);
1007 
1008   Dmsg2(
1009       50, "Fail to acquire Wstore=%s wncj=%d\n",
1010       jcr->impl->res.write_storage->resource_name_,
1011       jcr->impl->res.write_storage->runtime_storage_status->NumConcurrentJobs);
1012 
1013   return false;
1014 }
1015 
DecWriteStore(JobControlRecord * jcr)1016 static void DecWriteStore(JobControlRecord* jcr)
1017 {
1018   if (jcr->impl->res.write_storage && !jcr->impl->IgnoreStorageConcurrency) {
1019     P(mutex);
1020     jcr->impl->res.write_storage->runtime_storage_status->NumConcurrentJobs--;
1021     Dmsg2(50, "Dec Wstore=%s wncj=%d\n",
1022           jcr->impl->res.write_storage->resource_name_,
1023           jcr->impl->res.write_storage->runtime_storage_status
1024               ->NumConcurrentJobs);
1025 
1026     if (jcr->impl->res.write_storage->runtime_storage_status
1027             ->NumConcurrentJobs < 0) {
1028       Jmsg(jcr, M_FATAL, 0, _("NumConcurrentJobs Dec Wstore=%s wncj=%d\n"),
1029            jcr->impl->res.write_storage->resource_name_,
1030            jcr->impl->res.write_storage->runtime_storage_status
1031                ->NumConcurrentJobs);
1032     }
1033     V(mutex);
1034   }
1035 }
1036 } /* namespace directordaemon */
1037