1 /*
2    Bacula(R) - The Network Backup Solution
3 
4    Copyright (C) 2000-2020 Kern Sibbald
5 
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8 
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13 
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16 
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula job queue routines.
21  *
22  *  This code consists of three queues, the waiting_jobs
23  *  queue, where jobs are initially queued, the ready_jobs
24  *  queue, where jobs are placed when all the resources are
25  *  allocated and they can immediately be run, and the
26  *  running queue where jobs are placed when they are
27  *  running.
28  *
29  *  Kern Sibbald, July MMIII
30  *
31  *
32  *  This code was adapted from the Bacula workq, which was
33  *    adapted from "Programming with POSIX Threads", by
34  *    David R. Butenhof
35  *
36  */
37 
38 #include "bacula.h"
39 #include "dird.h"
40 
/* NOTE(review): declared elsewhere; not referenced by the functions visible in this section -- confirm it is still needed */
extern JCR *jobs;

/* Forward referenced functions */
/* Thread entry points: given C linkage because they are passed to pthread_create() below */
extern "C" void *jobq_server(void *arg);
extern "C" void *sched_wait(void *arg);

static int  start_server(jobq_t *jq);
static bool acquire_resources(JCR *jcr);
static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
static void dec_write_store(JCR *jcr);
51 
52 /*
53  * Initialize a job queue
54  *
55  *  Returns: 0 on success
56  *           errno on failure
57  */
jobq_init(jobq_t * jq,int threads,void * (* engine)(void * arg))58 int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg))
59 {
60    int stat;
61    jobq_item_t *item = NULL;
62 
63    if ((stat = pthread_attr_init(&jq->attr)) != 0) {
64       berrno be;
65       Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat));
66       return stat;
67    }
68    if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69       pthread_attr_destroy(&jq->attr);
70       return stat;
71    }
72    if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) {
73       berrno be;
74       Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat));
75       pthread_attr_destroy(&jq->attr);
76       return stat;
77    }
78    if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) {
79       berrno be;
80       Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat));
81       pthread_mutex_destroy(&jq->mutex);
82       pthread_attr_destroy(&jq->attr);
83       return stat;
84    }
85    jq->quit = false;
86    jq->max_workers = threads;         /* max threads to create */
87    jq->num_workers = 0;               /* no threads yet */
88    jq->idle_workers = 0;              /* no idle threads */
89    jq->engine = engine;               /* routine to run */
90    jq->valid = JOBQ_VALID;
91    /* Initialize the job queues */
92    jq->waiting_jobs = New(dlist(item, &item->link));
93    jq->running_jobs = New(dlist(item, &item->link));
94    jq->ready_jobs = New(dlist(item, &item->link));
95    return 0;
96 }
97 
98 /*
99  * Destroy the job queue
100  *
101  * Returns: 0 on success
102  *          errno on failure
103  */
jobq_destroy(jobq_t * jq)104 int jobq_destroy(jobq_t *jq)
105 {
106    int stat, stat1, stat2;
107 
108    if (jq->valid != JOBQ_VALID) {
109       return EINVAL;
110    }
111    P(jq->mutex);
112    jq->valid = 0;                      /* prevent any more operations */
113 
114    /*
115     * If any threads are active, wake them
116     */
117    if (jq->num_workers > 0) {
118       jq->quit = true;
119       if (jq->idle_workers) {
120          if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
121             berrno be;
122             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
123             V(jq->mutex);
124             return stat;
125          }
126       }
127       while (jq->num_workers > 0) {
128          if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
129             berrno be;
130             Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
131             V(jq->mutex);
132             return stat;
133          }
134       }
135    }
136    V(jq->mutex);
137    stat  = pthread_mutex_destroy(&jq->mutex);
138    stat1 = pthread_cond_destroy(&jq->work);
139    stat2 = pthread_attr_destroy(&jq->attr);
140    delete jq->waiting_jobs;
141    delete jq->running_jobs;
142    delete jq->ready_jobs;
143    return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
144 }
145 
/*
 * Argument bundle handed from jobq_add() to a sched_wait() thread.
 * jobq_add() allocates it with malloc(); sched_wait() frees it.
 */
struct wait_pkt {
   JCR *jcr;                          /* job waiting for its scheduled start time */
   jobq_t *jq;                        /* queue the job is added to once the time arrives */
};
150 
151 /*
152  * Wait until schedule time arrives before starting. Normally
153  *  this routine is only used for jobs started from the console
154  *  for which the user explicitly specified a start time. Otherwise
155  *  most jobs are put into the job queue only when their
156  *  scheduled time arrives.
157  */
158 extern "C"
sched_wait(void * arg)159 void *sched_wait(void *arg)
160 {
161    JCR *jcr = ((wait_pkt *)arg)->jcr;
162    jobq_t *jq = ((wait_pkt *)arg)->jq;
163 
164    set_jcr_in_tsd(INVALID_JCR);
165    Dmsg0(2300, "Enter sched_wait.\n");
166    free(arg);
167    time_t wtime = jcr->sched_time - time(NULL);
168    jcr->setJobStatus(JS_WaitStartTime);
169    /* Wait until scheduled time arrives */
170    if (wtime > 0) {
171       Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"),
172          jcr->Job, wtime);
173    }
174    /* Check every 30 seconds if canceled */
175    while (wtime > 0) {
176       Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n",
177          jcr->JobId, wtime, jcr->use_count());
178       if (wtime > 30) {
179          wtime = 30;
180       }
181       bmicrosleep(wtime, 0);
182       if (job_canceled(jcr)) {
183          break;
184       }
185       wtime = jcr->sched_time - time(NULL);
186    }
187    Dmsg1(200, "resched use=%d\n", jcr->use_count());
188    jobq_add(jq, jcr);
189    free_jcr(jcr);                     /* we are done with jcr */
190    Dmsg0(2300, "Exit sched_wait\n");
191    return NULL;
192 }
193 
194 /* Procedure to update the client->NumConcurrentJobs */
update_client_numconcurrentjobs(JCR * jcr,int val)195 static void update_client_numconcurrentjobs(JCR *jcr, int val)
196 {
197    if (!jcr->client) {
198       return;
199    }
200 
201    switch (jcr->getJobType())
202    {
203    case JT_MIGRATE:
204    case JT_COPY:
205    case JT_ADMIN:
206       break;
207    case JT_BACKUP:
208    /* Fall through wanted */
209    default:
210       if (jcr->no_client_used() || jcr->wasVirtualFull) {
211          break;
212       }
213       jcr->client->incNumConcurrentJobs(val);
214       break;
215    }
216 }
217 
218 /*
219  *  Add a job to the queue
220  *    jq is a queue that was created with jobq_init
221  */
jobq_add(jobq_t * jq,JCR * jcr)222 int jobq_add(jobq_t *jq, JCR *jcr)
223 {
224    int stat;
225    jobq_item_t *item, *li;
226    bool inserted = false;
227    time_t wtime = jcr->sched_time - time(NULL);
228    pthread_t id;
229    wait_pkt *sched_pkt;
230 
231    if (!jcr->term_wait_inited) {
232       /* Initialize termination condition variable */
233       if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
234          berrno be;
235          Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat));
236          return stat;
237       }
238       jcr->term_wait_inited = true;
239    }
240 
241    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
242    if (jq->valid != JOBQ_VALID) {
243       Jmsg0(jcr, M_ERROR, 0, "Jobq_add queue not initialized.\n");
244       return EINVAL;
245    }
246 
247    jcr->inc_use_count();                 /* mark jcr in use by us */
248    Dmsg3(2300, "jobq_add jobid=%d jcr=0x%x use_count=%d\n", jcr->JobId, jcr, jcr->use_count());
249    if (!job_canceled(jcr) && wtime > 0) {
250       set_thread_concurrency(jq->max_workers + 2);
251       sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
252       sched_pkt->jcr = jcr;
253       sched_pkt->jq = jq;
254       stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
255       if (stat != 0) {                /* thread not created */
256          berrno be;
257          Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat));
258       }
259       return stat;
260    }
261 
262    P(jq->mutex);
263 
264    if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
265       free_jcr(jcr);                    /* release jcr */
266       return ENOMEM;
267    }
268    item->jcr = jcr;
269 
270    /* While waiting in a queue this job is not attached to a thread */
271    set_jcr_in_tsd(INVALID_JCR);
272    if (job_canceled(jcr)) {
273       /* Add job to ready queue so that it is canceled quickly */
274       jq->ready_jobs->prepend(item);
275       Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId);
276    } else {
277       /* Add this job to the wait queue in priority sorted order */
278       foreach_dlist(li, jq->waiting_jobs) {
279          Dmsg2(2300, "waiting item jobid=%d priority=%d\n",
280             li->jcr->JobId, li->jcr->JobPriority);
281          if (li->jcr->JobPriority > jcr->JobPriority) {
282             jq->waiting_jobs->insert_before(item, li);
283             Dmsg2(2300, "insert_before jobid=%d before waiting job=%d\n",
284                li->jcr->JobId, jcr->JobId);
285             inserted = true;
286             break;
287          }
288       }
289       /* If not jobs in wait queue, append it */
290       if (!inserted) {
291          jq->waiting_jobs->append(item);
292          Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId);
293       }
294    }
295 
296    /* Ensure that at least one server looks at the queue. */
297    stat = start_server(jq);
298 
299    V(jq->mutex);
300    Dmsg0(2300, "Return jobq_add\n");
301    return stat;
302 }
303 
304 /*
305  *  Remove a job from the job queue. Used only by cancel_job().
306  *    jq is a queue that was created with jobq_init
307  *    work_item is an element of work
308  *
309  *   Note, it is "removed" from the job queue.
310  *    If you want to cancel it, you need to provide some external means
311  *    of doing so (e.g. pthread_kill()).
312  */
jobq_remove(jobq_t * jq,JCR * jcr)313 int jobq_remove(jobq_t *jq, JCR *jcr)
314 {
315    int stat;
316    bool found = false;
317    jobq_item_t *item;
318 
319    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x\n", jcr->JobId, jcr);
320    if (jq->valid != JOBQ_VALID) {
321       return EINVAL;
322    }
323 
324    P(jq->mutex);
325    foreach_dlist(item, jq->waiting_jobs) {
326       if (jcr == item->jcr) {
327          found = true;
328          break;
329       }
330    }
331    if (!found) {
332       V(jq->mutex);
333       Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
334       return EINVAL;
335    }
336 
337    /* Move item to be the first on the list */
338    jq->waiting_jobs->remove(item);
339    jq->ready_jobs->prepend(item);
340    Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr);
341 
342    stat = start_server(jq);
343 
344    V(jq->mutex);
345    Dmsg0(2300, "Return jobq_remove\n");
346    return stat;
347 }
348 
349 
350 /*
351  * Start the server thread if it isn't already running
352  */
start_server(jobq_t * jq)353 static int start_server(jobq_t *jq)
354 {
355    int stat = 0;
356    pthread_t id;
357 
358    /*
359     * if any threads are idle, wake one.
360     *   Actually we do a broadcast because on /lib/tls
361     *   these signals seem to get lost from time to time.
362     */
363    if (jq->idle_workers > 0) {
364       Dmsg0(2300, "Signal worker to wake up\n");
365       if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
366          berrno be;
367          Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat));
368          return stat;
369       }
370    } else if (jq->num_workers < jq->max_workers) {
371       Dmsg0(2300, "Create worker thread\n");
372       /* No idle threads so create a new one */
373       set_thread_concurrency(jq->max_workers + 1);
374       jq->num_workers++;
375       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
376          berrno be;
377          jq->num_workers--;
378          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
379          return stat;
380       }
381    }
382    return stat;
383 }
384 
385 
/*
 * This is the worker thread that serves the job queue.
 * When all the resources are acquired for the job,
 *  it will call the user's engine.
 *
 * arg is the jobq_t * passed to pthread_create() by start_server().
 * The thread holds jq->mutex except while running a job's engine,
 *  releasing a finished JCR, or sleeping between passes.  Each pass:
 *   1. if the previous pass found both queues empty, waits on jq->work
 *      until 4 seconds from now;
 *   2. runs every job on the ready queue, calling start_server() while
 *      more ready jobs remain so they can run in parallel;
 *   3. moves waiting jobs of an acceptable priority whose resources can
 *      be acquired (or which are canceled) to the ready queue;
 *   4. exits when quit is set and nothing is ready, or when a timed wait
 *      has expired and nothing is ready.
 * num_workers is decremented on every exit path.  Always returns NULL.
 */
extern "C"
void *jobq_server(void *arg)
{
   struct timespec timeout;
   jobq_t *jq = (jobq_t *)arg;
   jobq_item_t *je;                   /* job entry in queue */
   int stat;
   bool timedout = false;             /* set when a timed wait expires; never cleared afterwards */
   bool work = true;                  /* false => last pass saw ready and waiting queues empty */

   set_jcr_in_tsd(INVALID_JCR);
   Dmsg0(2300, "Start jobq_server\n");
   P(jq->mutex);

   for (;;) {
      struct timeval tv;
      struct timezone tz;

      Dmsg0(2300, "Top of for loop\n");
      if (!work && !jq->quit) {
         gettimeofday(&tv, &tz);
         timeout.tv_nsec = 0;
         timeout.tv_sec = tv.tv_sec + 4;

         /*
          * Every path through this loop body breaks or returns, so it
          *  runs at most once.
          * NOTE(review): idle_workers is not incremented anywhere in this
          *  file, so start_server() never broadcasts to a thread waiting
          *  here; in the code visible here this wait ends by timeout.
          */
         while (!jq->quit) {
            /*
             * Wait 4 seconds, then if no more work, exit
             */
            Dmsg0(2300, "pthread_cond_timedwait()\n");
            stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
            if (stat == ETIMEDOUT) {
               Dmsg0(2300, "timedwait timedout.\n");
               timedout = true;
               break;
            } else if (stat != 0) {
               /* This shouldn't happen */
               Dmsg0(2300, "This shouldn't happen\n");
               jq->num_workers--;
               V(jq->mutex);
               return NULL;
            }
            break;
         }
      }
      /*
       * If anything is in the ready queue, run it
       */
      Dmsg0(2300, "Checking ready queue.\n");
      while (!jq->ready_jobs->empty() && !jq->quit) {
         JCR *jcr;
         je = (jobq_item_t *)jq->ready_jobs->first();
         jcr = je->jcr;
         jq->ready_jobs->remove(je);
         /* More ready work remains: get another worker going on it */
         if (!jq->ready_jobs->empty()) {
            Dmsg0(2300, "ready queue not empty start server\n");
            if (start_server(jq) != 0) {
               jq->num_workers--;
               V(jq->mutex);
               return NULL;
            }
         }
         jq->running_jobs->append(je);

         /* Attach jcr to this thread while we run the job */
         jcr->my_thread_id = pthread_self();
         jcr->set_killable(true);
         set_jcr_in_tsd(jcr);
         Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);

         /* Release job queue lock */
         V(jq->mutex);

         /* Call user's routine here */
         Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
            jcr->use_count(), jcr->JobStatus);
         jq->engine(je->jcr);

         /* Job finished detach from thread */
         remove_jcr_from_tsd(je->jcr);
         je->jcr->set_killable(false);

         Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
            jcr->use_count());

         /* Reacquire job queue lock */
         P(jq->mutex);
         Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
         jq->running_jobs->remove(je);
         /*
          * Release locks if acquired. Note, they will not have
          *  been acquired for jobs canceled before they were
          *  put into the ready queue.
          */
         if (jcr->acquired_resource_locks) {
            dec_read_store(jcr);
            dec_write_store(jcr);
            update_client_numconcurrentjobs(jcr, -1);
            jcr->job->incNumConcurrentJobs(-1);
            jcr->acquired_resource_locks = false;
         }

         /* When true, reschedule_job() has already freed jcr and je */
         if (reschedule_job(jcr, jq, je)) {
            continue;              /* go look for more work */
         }

         /* Clean up and release old jcr */
         Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
         jcr->SDJobStatus = 0;
         V(jq->mutex);                /* release internal lock */
         free_jcr(jcr);
         free(je);                    /* release job entry */
         P(jq->mutex);                /* reacquire job queue lock */
      }
      /*
       * If any job in the wait queue can be run,
       *  move it to the ready queue
       */
      Dmsg0(2300, "Done check ready, now check wait queue.\n");
      if (!jq->waiting_jobs->empty() && !jq->quit) {
         int Priority;
         bool running_allow_mix = false;
         je = (jobq_item_t *)jq->waiting_jobs->first();
         jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
         /*
          * Target priority: that of the first running job if any,
          *  otherwise that of the head of the waiting queue.  Mixing is
          *  allowed only if every running job's resource allows it.
          */
         if (re) {
            Priority = re->jcr->JobPriority;
            Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
                  re->jcr->JobId, Priority);
            running_allow_mix = true;
            for ( ; re; ) {
               Dmsg2(2300, "JobId %d is also running with %s\n",
                     re->jcr->JobId,
                     re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
               if (!re->jcr->job->allow_mixed_priority) {
                  running_allow_mix = false;
                  break;
               }
               re = (jobq_item_t *)jq->running_jobs->next(re);
            }
            Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
                  running_allow_mix ? "allow" : "don't allow");
         } else {
            Priority = je->jcr->JobPriority;
            Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
         }
         /*
          * Walk down the list of waiting jobs and attempt
          *   to acquire the resources it needs.
          */
         for ( ; je;  ) {
            /* je is current job item on the queue, jn is the next one */
            JCR *jcr = je->jcr;
            jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);

            Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
                  jcr->JobId, jcr->JobPriority, Priority,
                  jcr->job->allow_mixed_priority ? "mix" : "no mix");

            /*
             * Take only jobs of correct Priority.  The waiting queue is
             *  sorted by priority, so the first mismatch ends the scan.
             */
            if (!(jcr->JobPriority == Priority
                  || (jcr->JobPriority < Priority &&
                      jcr->job->allow_mixed_priority && running_allow_mix))) {
               jcr->setJobStatus(JS_WaitPriority);
               break;
            }

            if (!acquire_resources(jcr)) {
               /* If resource conflict, job is canceled */
               if (!job_canceled(jcr)) {
                  je = jn;            /* point to next waiting job */
                  continue;
               }
            }

            /*
             * Got all locks, now remove it from wait queue and append it
             *   to the ready queue.  Note, we may also get here if the
             *    job was canceled.  Once it is "run", it will quickly
             *    terminate.
             */
            jq->waiting_jobs->remove(je);
            jq->ready_jobs->append(je);
            Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
            je = jn;                  /* Point to next waiting job */
         } /* end for loop */

      } /* end if */

      Dmsg0(2300, "Done checking wait queue.\n");
      /*
       * If no more ready work and we are asked to quit, then do it
       */
      if (jq->ready_jobs->empty() && jq->quit) {
         jq->num_workers--;
         if (jq->num_workers == 0) {
            Dmsg0(2300, "Wake up destroy routine\n");
            /* Wake up destroy routine if he is waiting */
            pthread_cond_broadcast(&jq->work);
         }
         break;
      }
      Dmsg0(2300, "Check for work request\n");
      /*
       * If no more work requests, and we waited long enough, quit
       *
       * NOTE(review): this test ignores waiting_jobs and timedout is
       *  sticky, so a worker that once timed out exits whenever the ready
       *  queue is empty, even with jobs still waiting for resources;
       *  presumably another worker or a later start_server() picks them
       *  up -- confirm this is intended.
       */
      Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
         jq->ready_jobs->empty());
      if (jq->ready_jobs->empty() && timedout) {
         Dmsg0(2300, "break big loop\n");
         jq->num_workers--;
         break;
      }

      work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
      if (work) {
         /*
          * If a job is waiting on a Resource, don't consume all
          *   the CPU time looping looking for work, and even more
          *   important, release the lock so that a job that has
          *   terminated can give us the resource.
          */
         V(jq->mutex);
         bmicrosleep(2, 0);              /* pause for 2 seconds */
         P(jq->mutex);
         /* Recompute work as something may have changed in last 2 secs */
         work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
      }
      Dmsg1(2300, "Loop again. work=%d\n", work);
   } /* end of big for loop */

   Dmsg0(200, "unlock mutex\n");
   V(jq->mutex);
   Dmsg0(2300, "End jobq_server\n");
   return NULL;
}
625 
626 /*
627  * Returns true if cleanup done and we should look for more work
628  */
reschedule_job(JCR * jcr,jobq_t * jq,jobq_item_t * je)629 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
630 {
631    bool resched = false;
632    /*
633     * Reschedule the job if requested and possible
634     */
635    /* Basic condition is that more reschedule times remain */
636    if (jcr->job->RescheduleTimes == 0 ||
637        jcr->reschedule_count < jcr->job->RescheduleTimes) {
638 
639       /* Check for incomplete jobs */
640       if (jcr->is_incomplete()) {
641          resched = (jcr->RescheduleIncompleteJobs && jcr->is_JobType(JT_BACKUP) &&
642                     !(jcr->HasBase||jcr->is_JobLevel(L_BASE)));
643       } else {
644          /* Check for failed jobs */
645          resched = (jcr->job->RescheduleOnError &&
646                     !jcr->is_JobStatus(JS_Terminated) &&
647                     !jcr->is_JobStatus(JS_Canceled) &&
648                     jcr->is_JobType(JT_BACKUP));
649       }
650    }
651    if (resched) {
652        char dt[50], dt2[50];
653 
654        /*
655         * Reschedule this job by cleaning it up, but
656         *  reuse the same JobId if possible.
657         */
658       jcr->rerunning = jcr->is_incomplete();   /* save incomplete status */
659       time_t now = time(NULL);
660       jcr->reschedule_count++;
661       jcr->sched_time = now + jcr->job->RescheduleInterval;
662       bstrftime(dt, sizeof(dt), now);
663       bstrftime(dt2, sizeof(dt2), jcr->sched_time);
664       Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
665             (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
666       Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
667            jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
668       dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
669       jcr->JobStatus = -1;
670       jcr->setJobStatus(JS_WaitStartTime);
671       jcr->SDJobStatus = 0;
672       jcr->JobErrors = 0;
673       if (!allow_duplicate_job(jcr)) {
674          return false;
675       }
676       /* Only jobs with no output or Incomplete jobs can run on same JCR */
677       if (jcr->JobBytes == 0 || jcr->rerunning) {
678          Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
679          V(jq->mutex);
680          /*
681           * Special test here since a Virtual Full gets marked
682           *  as a Full, so we look at the resource record
683           */
684          if (jcr->wasVirtualFull) {
685             jcr->setJobLevel(L_VIRTUAL_FULL);
686          }
687          /*
688           * When we are using the same jcr then make sure to reset
689           *   RealEndTime back to zero.
690           */
691          jcr->jr.RealEndTime = 0;
692          jobq_add(jq, jcr);     /* queue the job to run again */
693          P(jq->mutex);
694          free_jcr(jcr);         /* release jcr */
695          free(je);              /* free the job entry */
696          return true;           /* we already cleaned up */
697       }
698       /*
699        * Something was actually backed up, so we cannot reuse
700        *   the old JobId or there will be database record
701        *   conflicts.  We now create a new job, copying the
702        *   appropriate fields.
703        */
704       JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
705       set_jcr_defaults(njcr, jcr->job);
706       /*
707        * Eliminate the new job_end_push, then copy the one from
708        *  the old job, and set the old one to be empty.
709        */
710       void *v;
711       lock_jobs();              /* protect ourself from reload_config() */
712       LockRes();
713       foreach_alist(v, (&jcr->job_end_push)) {
714          njcr->job_end_push.append(v);
715       }
716       jcr->job_end_push.destroy();
717       jcr->job_end_push.init(1, false);
718       UnlockRes();
719       unlock_jobs();
720 
721       njcr->reschedule_count = jcr->reschedule_count;
722       njcr->sched_time = jcr->sched_time;
723       njcr->initial_sched_time = jcr->initial_sched_time;
724       /*
725        * Special test here since a Virtual Full gets marked
726        *  as a Full, so we look at the resource record
727        */
728       if (jcr->wasVirtualFull) {
729          njcr->setJobLevel(L_VIRTUAL_FULL);
730       } else {
731          njcr->setJobLevel(jcr->getJobLevel());
732       }
733       njcr->pool = jcr->pool;
734       njcr->run_pool_override = jcr->run_pool_override;
735       njcr->next_pool = jcr->next_pool;
736       njcr->run_next_pool_override = jcr->run_next_pool_override;
737       njcr->full_pool = jcr->full_pool;
738       njcr->vfull_pool = jcr->vfull_pool;
739       njcr->run_full_pool_override = jcr->run_full_pool_override;
740       njcr->run_vfull_pool_override = jcr->run_vfull_pool_override;
741       njcr->inc_pool = jcr->inc_pool;
742       njcr->run_inc_pool_override = jcr->run_inc_pool_override;
743       njcr->diff_pool = jcr->diff_pool;
744       njcr->JobStatus = -1;
745       njcr->setJobStatus(jcr->JobStatus);
746       if (jcr->rstore) {
747          copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
748       } else {
749          free_rstorage(njcr);
750       }
751       if (jcr->wstore) {
752          copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
753       } else {
754          free_wstorage(njcr);
755       }
756       njcr->messages = jcr->messages;
757       njcr->spool_data = jcr->spool_data;
758       njcr->write_part_after_job = jcr->write_part_after_job;
759       Dmsg0(2300, "Call to run new job\n");
760       V(jq->mutex);
761       run_job(njcr);            /* This creates a "new" job */
762       free_jcr(njcr);           /* release "new" jcr */
763       P(jq->mutex);
764       Dmsg0(2300, "Back from running new job.\n");
765    }
766    return false;
767 }
768 
769 /*
770  * See if we can acquire all the necessary resources for the job (JCR)
771  *
772  *  Returns: true  if successful
773  *           false if resource failure
774  */
acquire_resources(JCR * jcr)775 static bool acquire_resources(JCR *jcr)
776 {
777    bool skip_this_jcr = false;
778 
779    jcr->acquired_resource_locks = false;
780 /*
781  * Turning this code off is likely to cause some deadlocks,
782  *   but we do not really have enough information here to
783  *   know if this is really a deadlock (it may be a dual drive
784  *   autochanger), and in principle, the SD reservation system
785  *   should detect these deadlocks, so push the work off on it.
786  */
787 #ifdef xxx
788    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
789       Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n"
790          "    Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
791          jcr->rstore->name(), jcr->rstore_source, jcr->wstore->name(), jcr->wstore_source);
792       jcr->setJobStatus(JS_Canceled);
793       return false;
794    }
795 #endif
796    if (jcr->rstore) {
797       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
798       if (!inc_read_store(jcr)) {
799          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->getNumConcurrentJobs());
800          jcr->setJobStatus(JS_WaitStoreRes);
801          return false;
802       }
803    }
804 
805    if (jcr->wstore) {
806       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
807       int num = jcr->wstore->getNumConcurrentJobs();
808       if (num < jcr->wstore->MaxConcurrentJobs) {
809          num = jcr->wstore->incNumConcurrentJobs(1);
810          Dmsg1(200, "Inc wncj=%d\n", num);
811       } else if (jcr->rstore) {
812          dec_read_store(jcr);
813          skip_this_jcr = true;
814       } else {
815          Dmsg1(200, "Fail wncj=%d\n", num);
816          skip_this_jcr = true;
817       }
818    }
819    if (skip_this_jcr) {
820       jcr->setJobStatus(JS_WaitStoreRes);
821       return false;
822    }
823 
824    if (jcr->client) {
825       if (jcr->client->getNumConcurrentJobs() < jcr->client->MaxConcurrentJobs) {
826          update_client_numconcurrentjobs(jcr, 1);
827       } else {
828          /* Back out previous locks */
829          dec_write_store(jcr);
830          dec_read_store(jcr);
831          jcr->setJobStatus(JS_WaitClientRes);
832          return false;
833       }
834    }
835    if (jcr->job->getNumConcurrentJobs() < jcr->job->MaxConcurrentJobs) {
836       jcr->job->incNumConcurrentJobs(1);
837    } else {
838       /* Back out previous locks */
839       dec_write_store(jcr);
840       dec_read_store(jcr);
841       update_client_numconcurrentjobs(jcr, -1);
842       jcr->setJobStatus(JS_WaitJobRes);
843       return false;
844    }
845 
846    jcr->acquired_resource_locks = true;
847    return true;
848 }
849 
850 static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
851 
852 /*
853  * Note: inc_read_store() and dec_read_store() are
854  *   called from select_rstore() in src/dird/restore.c
855  */
inc_read_store(JCR * jcr)856 bool inc_read_store(JCR *jcr)
857 {
858    P(rstore_mutex);
859    int num = jcr->rstore->getNumConcurrentJobs();
860    int numread = jcr->rstore->getNumConcurrentReadJobs();
861    int maxread = jcr->rstore->MaxConcurrentReadJobs;
862    if (num < jcr->rstore->MaxConcurrentJobs &&
863        (jcr->getJobType() == JT_RESTORE ||
864         numread == 0     ||
865         maxread == 0     ||     /* No limit set */
866         numread < maxread))     /* Below the limit */
867    {
868       numread = jcr->rstore->incNumConcurrentReadJobs(1);
869       num = jcr->rstore->incNumConcurrentJobs(1);
870       Dmsg1(200, "Inc rncj=%d\n", num);
871       V(rstore_mutex);
872       return true;
873    }
874    V(rstore_mutex);
875    return false;
876 }
877 
dec_read_store(JCR * jcr)878 void dec_read_store(JCR *jcr)
879 {
880    if (jcr->rstore) {
881       P(rstore_mutex);
882       jcr->rstore->incNumConcurrentReadJobs(-1);
883       int num = jcr->rstore->incNumConcurrentJobs(-1);
884       Dmsg1(200, "Dec rncj=%d\n", num);
885       V(rstore_mutex);
886    }
887 }
888 
dec_write_store(JCR * jcr)889 static void dec_write_store(JCR *jcr)
890 {
891    if (jcr->wstore) {
892       int num = jcr->wstore->incNumConcurrentJobs(-1);
893       Dmsg1(200, "Dec wncj=%d\n", num);
894    }
895 }
896