1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Manipulation routines for Job Control Records and
21 * handling of last_jobs_list.
22 *
23 * Kern E. Sibbald, December 2000
24 *
25 * These routines are thread safe.
26 *
27 * The job list routines were re-written in May 2005 to
28 * eliminate the global lock while traversing the list, and
29 * to use the dlist subroutines. The locking is now done
30 * on the list each time the list is modified or traversed.
31 * That is it is "micro-locked" rather than globally locked.
32 * The result is that there is one lock/unlock for each entry
33 * in the list while traversing it rather than a single lock
34 * at the beginning of a traversal and one at the end. This
35 * incurs slightly more overhead, but effectively eliminates
 * the possibility of race conditions. In addition, with the
37 * exception of the global locking of the list during the
38 * re-reading of the config file, no recursion is needed.
39 *
40 */
41
42 #include "bacula.h"
43 #include "jcr.h"
44
45 const int dbglvl = 3400;
46
47 /* External variables we reference */
48
49 /* External referenced functions */
50 void free_bregexps(alist *bregexps);
51
52 /* Forward referenced functions */
53 extern "C" void timeout_handler(int sig);
54 static void jcr_timeout_check(watchdog_t *self);
55 #ifdef TRACE_JCR_CHAIN
56 static void b_lock_jcr_chain(const char *filen, int line);
57 static void b_unlock_jcr_chain(const char *filen, int line);
58 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
59 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
60 #else
61 static void lock_jcr_chain();
62 static void unlock_jcr_chain();
63 #endif
64
65
66 int num_jobs_run;
67 dlist *last_jobs = NULL;
68 const int max_last_jobs = 10;
69
70 static dlist *jcrs = NULL; /* JCR chain */
71 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
72
73 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
74
75 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
76
77 static pthread_key_t jcr_key; /* Pointer to jcr for each thread */
78
79 pthread_once_t key_once = PTHREAD_ONCE_INIT;
80
81 static char Job_status[] = "Status JobId=%ld JobStatus=%d\n";
82
83
/*
 * Acquire the global job start mutex (job_start_mutex).
 * new_jcr() takes this same lock before linking a JCR into the chain,
 * so a caller holding it keeps new JCRs from being added.
 */
void lock_jobs()
{
   P(job_start_mutex);
}
88
/*
 * Release the global job start mutex taken by lock_jobs().
 */
void unlock_jobs()
{
   V(job_start_mutex);
}
93
init_last_jobs_list()94 void init_last_jobs_list()
95 {
96 JCR *jcr = NULL;
97 struct s_last_job *job_entry = NULL;
98 if (!last_jobs) {
99 last_jobs = New(dlist(job_entry, &job_entry->link));
100 }
101 if (!jcrs) {
102 jcrs = New(dlist(jcr, &jcr->link));
103 }
104 }
105
term_last_jobs_list()106 void term_last_jobs_list()
107 {
108 if (last_jobs) {
109 lock_last_jobs_list();
110 while (!last_jobs->empty()) {
111 void *je = last_jobs->first();
112 last_jobs->remove(je);
113 free(je);
114 }
115 delete last_jobs;
116 last_jobs = NULL;
117 unlock_last_jobs_list();
118 }
119 if (jcrs) {
120 delete jcrs;
121 jcrs = NULL;
122 }
123 }
124
read_last_jobs_list(int fd,uint64_t addr)125 bool read_last_jobs_list(int fd, uint64_t addr)
126 {
127 struct s_last_job *je, job;
128 uint32_t num;
129 bool ok = true;
130
131 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
132 if (addr == 0 || lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
133 return false;
134 }
135 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
136 return false;
137 }
138 Dmsg1(100, "Read num_items=%d\n", num);
139 if (num > 4 * max_last_jobs) { /* sanity check */
140 return false;
141 }
142 lock_last_jobs_list();
143 for ( ; num; num--) {
144 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
145 berrno be;
146 Pmsg1(000, "Read job entry. ERR=%s\n", be.bstrerror());
147 ok = false;
148 break;
149 }
150 if (job.JobId > 0) {
151 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
152 memcpy((char *)je, (char *)&job, sizeof(job));
153 if (!last_jobs) {
154 init_last_jobs_list();
155 }
156 last_jobs->append(je);
157 if (last_jobs->size() > max_last_jobs) {
158 je = (struct s_last_job *)last_jobs->first();
159 last_jobs->remove(je);
160 free(je);
161 }
162 }
163 }
164 unlock_last_jobs_list();
165 return ok;
166 }
167
write_last_jobs_list(int fd,uint64_t addr)168 uint64_t write_last_jobs_list(int fd, uint64_t addr)
169 {
170 struct s_last_job *je;
171 uint32_t num;
172 ssize_t stat;
173
174 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
175 if (lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
176 return 0;
177 }
178 if (last_jobs) {
179 lock_last_jobs_list();
180 /* First record is number of entires */
181 num = last_jobs->size();
182 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
183 berrno be;
184 Pmsg1(000, "Error writing num_items: ERR=%s\n", be.bstrerror());
185 goto bail_out;
186 }
187 foreach_dlist(je, last_jobs) {
188 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
189 berrno be;
190 Pmsg1(000, "Error writing job: ERR=%s\n", be.bstrerror());
191 goto bail_out;
192 }
193 }
194 unlock_last_jobs_list();
195 }
196 /* Return current address */
197 stat = lseek(fd, 0, SEEK_CUR);
198 if (stat < 0) {
199 stat = 0;
200 }
201 return stat;
202
203 bail_out:
204 unlock_last_jobs_list();
205 return 0;
206 }
207
/*
 * Acquire the mutex (last_jobs_mutex) that this file holds while it
 * reads, writes or modifies the last_jobs list.
 */
void lock_last_jobs_list()
{
   P(last_jobs_mutex);
}
212
/*
 * Release the mutex taken by lock_last_jobs_list().
 */
void unlock_last_jobs_list()
{
   V(last_jobs_mutex);
}
217
218 /* Get an ASCII representation of the Operation being performed as an english Noun */
get_OperationName()219 const char *JCR::get_OperationName()
220 {
221 switch(m_JobType) {
222 case JT_BACKUP:
223 return _("Backup");
224 case JT_VERIFY:
225 return _("Verifying");
226 case JT_RESTORE:
227 return _("Restoring");
228 case JT_ARCHIVE:
229 return _("Archiving");
230 case JT_COPY:
231 return _("Copying");
232 case JT_MIGRATE:
233 return _("Migration");
234 case JT_SCAN:
235 return _("Scanning");
236 default:
237 return _("Unknown operation");
238 }
239 }
240
241 /* Get an ASCII representation of the Action being performed either an english Verb or Adjective */
get_ActionName(bool past)242 const char *JCR::get_ActionName(bool past)
243 {
244 switch(m_JobType) {
245 case JT_BACKUP:
246 return _("backup");
247 case JT_VERIFY:
248 return (past == true) ? _("verified") : _("verify");
249 case JT_RESTORE:
250 return (past == true) ? _("restored") : _("restore");
251 case JT_ARCHIVE:
252 return (past == true) ? _("archived") : _("archive");
253 case JT_COPY:
254 return (past == true) ? _("copied") : _("copy");
255 case JT_MIGRATE:
256 return (past == true) ? _("migrated") : _("migrate");
257 case JT_SCAN:
258 return (past == true) ? _("scanned") : _("scan");
259 default:
260 return _("unknown action");
261 }
262 }
263
JobReads()264 bool JCR::JobReads()
265 {
266 switch (m_JobType) {
267 case JT_VERIFY:
268 case JT_RESTORE:
269 case JT_COPY:
270 case JT_MIGRATE:
271 return true;
272 case JT_BACKUP:
273 if (m_JobLevel == L_VIRTUAL_FULL) {
274 return true;
275 }
276 break;
277 default:
278 break;
279 }
280 return false;
281 }
282
283 /* We can stop only Backup jobs connected to a client. It doesn't make sens at
284 * this time to stop a copy, migraton, restore or a verify job. The specific
285 * code should be implemented first.
286 */
can_be_stopped()287 bool JCR::can_be_stopped()
288 {
289 bool ok=true;
290 if (getJobType() == JT_BACKUP) { /* Is a Backup */
291 if (getJobLevel() == L_VIRTUAL_FULL) { /* Is a VirtualFull */
292 ok = false;
293 }
294 } else { /* Is not a backup (so, copy, migration, admin, verify, ... */
295 ok = false;
296 }
297 return ok;
298 }
299
300 /*
301 * Push a subroutine address into the job end callback stack
302 */
job_end_push(JCR * jcr,void job_end_cb (JCR * jcr,void *),void * ctx)303 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
304 {
305 jcr->job_end_push.append((void *)job_end_cb);
306 jcr->job_end_push.append(ctx);
307 }
308
309 /* DELETE ME when bugs in MA1512, MA1632 MA1639 are fixed */
310 void (*MA1512_reload_job_end_cb)(JCR *,void *) = NULL;
311
312 /* Pop each job_end subroutine and call it */
job_end_pop(JCR * jcr)313 static void job_end_pop(JCR *jcr)
314 {
315 void (*job_end_cb)(JCR *jcr, void *ctx);
316 void *ctx;
317 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
318 ctx = jcr->job_end_push.get(i--);
319 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
320 /* check for bug MA1512, MA1632 MA1639,
321 * today, job_end_cb can only be reload_job_end_cb() from DIR */
322 if (job_end_cb != MA1512_reload_job_end_cb && MA1512_reload_job_end_cb != NULL) {
323 Tmsg2(0, "Bug 'job_end_pop' detected, skip ! job_end_cb=0x%p ctx=0x%p\n", job_end_cb, ctx);
324 Tmsg0(0, "Display job_end_push list\n");
325 for (int j=jcr->job_end_push.size()-1; j > 0; ) {
326 void *ctx2 = jcr->job_end_push.get(j--);
327 void *job_end_cb2 = jcr->job_end_push.get(j--);
328 Tmsg3(0, "Bug 'job_end_pop' entry[%d] job_end_cb=0x%p ctx=0x%p\n", j+1, job_end_cb2, ctx2);
329 }
330 } else
331 {
332 job_end_cb(jcr, ctx);
333 }
334 }
335 }
336
337 /*
338 * Create thread key for thread specific data
339 */
create_jcr_key()340 void create_jcr_key()
341 {
342 int status = pthread_key_create(&jcr_key, NULL);
343 if (status != 0) {
344 berrno be;
345 Jmsg1(NULL, M_ABORT, 0, _("pthread key create failed: ERR=%s\n"),
346 be.bstrerror(status));
347 }
348 }
349
/*
 * Create a Job Control Record and link it into the JCR chain.
 *
 *  size             - number of bytes to allocate; each daemon has a
 *                     different JCR, so the caller passes us the size
 *  daemon_free_jcr  - daemon specific free routine, stored in the JCR
 *
 * Returns: the newly allocated JCR. It is zero filled, then set up as a
 *          "*System*" job (JobId 0, JT_SYSTEM, L_NONE, JS_Created) with
 *          its use count incremented once.
 */
JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
{
   JCR *jcr;
   MQUEUE_ITEM *item = NULL;
   int status;

   Dmsg0(dbglvl, "Enter new_jcr\n");
   /* Create the thread specific data key through pthread_once() */
   status = pthread_once(&key_once, create_jcr_key);
   if (status != 0) {
      berrno be;
      Jmsg1(NULL, M_ABORT, 0, _("pthread_once failed. ERR=%s\n"), be.bstrerror(status));
   }
   jcr = (JCR *)malloc(size);
   bmemzero(jcr, size);
   /* Note for the director, this value is changed in jobq.c */
   jcr->my_thread_id = pthread_self();
   /* Per-job message queue and the mutex that goes with it */
   jcr->msg_queue = New(dlist(item, &item->link));
   if ((status = pthread_mutex_init(&jcr->msg_queue_mutex, NULL)) != 0) {
      berrno be;
      Jmsg(NULL, M_ABORT, 0, _("Could not init msg_queue mutex. ERR=%s\n"),
         be.bstrerror(status));
   }
   jcr->job_end_push.init(1, false);
   jcr->sched_time = time(NULL);
   jcr->initial_sched_time = jcr->sched_time;
   jcr->daemon_free_jcr = daemon_free_jcr;    /* plug daemon free routine */
   jcr->init_mutex();
   jcr->inc_use_count();
   /* Pool memory strings, all starting out empty */
   jcr->VolumeName = get_pool_memory(PM_FNAME);
   jcr->VolumeName[0] = 0;
   jcr->errmsg = get_pool_memory(PM_MESSAGE);
   jcr->errmsg[0] = 0;
   jcr->comment = get_pool_memory(PM_FNAME);
   jcr->comment[0] = 0;
   jcr->StatusErrMsg = get_pool_memory(PM_FNAME);
   jcr->StatusErrMsg[0] = 0;
   jcr->job_uid = -1;
   /* Setup some dummy values */
   bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
   jcr->JobId = 0;
   jcr->setJobType(JT_SYSTEM);          /* internal job until defined */
   jcr->setJobLevel(L_NONE);
   jcr->setJobStatus(JS_Created);       /* ready to run */
#ifndef HAVE_WIN32
   /*
    * Install timeout_handler for TIMEOUT_SIGNAL. sa_flags is 0 and every
    * signal is blocked while the handler runs.
    */
   struct sigaction sigtimer;
   sigtimer.sa_flags = 0;
   sigtimer.sa_handler = timeout_handler;
   sigfillset(&sigtimer.sa_mask);
   sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
#endif

   /*
    * Locking jobs is a global lock that is needed
    * so that the Director can stop new jobs from being
    * added to the jcr chain while it processes a new
    * conf file and does the job_end_push().
    */
   lock_jobs();
   lock_jcr_chain();
   /* The chain itself is created on first use */
   if (!jcrs) {
      jcrs = New(dlist(jcr, &jcr->link));
   }
   jcrs->append(jcr);
   unlock_jcr_chain();
   unlock_jobs();

   return jcr;
}
424
425
426 /*
427 * Remove a JCR from the chain
428 * NOTE! The chain must be locked prior to calling
429 * this routine.
430 */
remove_jcr(JCR * jcr)431 static void remove_jcr(JCR *jcr)
432 {
433 Dmsg0(dbglvl, "Enter remove_jcr\n");
434 if (!jcr) {
435 Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
436 }
437 jcrs->remove(jcr);
438 Dmsg0(dbglvl, "Leave remove_jcr\n");
439 }
440
/*
 * Free stuff common to all JCRs, then the JCR itself. N.B. Be careful
 * to include only generic stuff in the common part of the jcr.
 *
 * The jcr pointer is invalid once this routine returns.
 */
static void free_common_jcr(JCR *jcr)
{
   /* Detach the jcr from this thread's specific data if it is the one attached */
   remove_jcr_from_tsd(jcr);
   /* set_killable() takes the jcr lock, so it must run before destroy_mutex() */
   jcr->set_killable(false);

   jcr->destroy_mutex();

   if (jcr->msg_queue) {
      delete jcr->msg_queue;
      jcr->msg_queue = NULL;
      pthread_mutex_destroy(&jcr->msg_queue_mutex);
   }

   /* do this after closing messages */
   free_and_null_pool_memory(jcr->JobIds);
   free_and_null_pool_memory(jcr->client_name);
   free_and_null_pool_memory(jcr->attr);
   free_and_null_pool_memory(jcr->VolumeName);
   free_and_null_pool_memory(jcr->errmsg);
   free_and_null_pool_memory(jcr->StatusErrMsg);
   bfree_and_null(jcr->job_user);
   bfree_and_null(jcr->job_group);

   if (jcr->sd_auth_key) {
      free(jcr->sd_auth_key);
      jcr->sd_auth_key = NULL;
   }

   free_bsock(jcr->dir_bsock);

   if (jcr->where) {
      free(jcr->where);
      jcr->where = NULL;
   }
   if (jcr->RegexWhere) {
      free(jcr->RegexWhere);
      jcr->RegexWhere = NULL;
   }
   /* Free the regexps held in the list first, then the list itself */
   if (jcr->where_bregexp) {
      free_bregexps(jcr->where_bregexp);
      delete jcr->where_bregexp;
      jcr->where_bregexp = NULL;
   }
   if (jcr->cached_path) {
      free_pool_memory(jcr->cached_path);
      jcr->cached_path = NULL;
      jcr->cached_pnl = 0;
   }
   if (jcr->id_list) {
      free_guid_list(jcr->id_list);
      jcr->id_list = NULL;
   }
   if (jcr->comment) {
      free_pool_memory(jcr->comment);
      jcr->comment = NULL;
   }
   /* Finally release the JCR memory itself */
   free(jcr);
}
504
/*
 * Global routine to free a jcr.
 *
 * Drops one reference. While other references remain the routine only
 * decrements the use count. When the count reaches zero the jcr is
 * removed from the chain, its messages are flushed and closed, the
 * job end callbacks are run, an entry is added to the last_jobs list
 * (for the job types listed below and JobId > 0), the daemon specific
 * free routine is called and finally the jcr memory is released.
 *
 * With DEBUG defined the routine is compiled as b_free_jcr() and also
 * receives the file and line of the caller for tracing.
 */
#ifdef DEBUG
void b_free_jcr(const char *file, int line, JCR *jcr)
{
   struct s_last_job *je;

   Dmsg3(dbglvl, "Enter free_jcr jid=%u from %s:%d\n", jcr->JobId, file, line);

#else

void free_jcr(JCR *jcr)
{
   struct s_last_job *je;

   Dmsg3(dbglvl, "Enter free_jcr jid=%u use_count=%d Job=%s\n",
         jcr->JobId, jcr->use_count(), jcr->Job);

#endif

   /* The use count is changed and tested with the chain locked */
   lock_jcr_chain();
   jcr->dec_use_count();              /* decrement use count */
   ASSERT2(jcr->use_count() >= 0, "JCR use_count < 0");
   //    Jmsg2(jcr, M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
   //       jcr->use_count(), jcr->JobId);
   //}
   if (jcr->JobId > 0) {
      Dmsg3(dbglvl, "Dec free_jcr jid=%u use_count=%d Job=%s\n",
            jcr->JobId, jcr->use_count(), jcr->Job);
   }
   if (jcr->use_count() > 0) {          /* if in use */
      unlock_jcr_chain();
      return;
   }
   if (jcr->JobId > 0) {
      Dmsg3(dbglvl, "remove jcr jid=%u use_count=%d Job=%s\n",
            jcr->JobId, jcr->use_count(), jcr->Job);
   }
   /* my_thread_send_signal() sends nothing once exiting is set */
   jcr->exiting = true;
   remove_jcr(jcr);                   /* remove Jcr from chain */
   unlock_jcr_chain();

   /* From here on the jcr is no longer in the chain */
   if (jcr->JobId > 0) {
      dequeue_messages(jcr);
      dequeue_daemon_messages(jcr);
   }
   close_msg(jcr);                    /* close messages for this job */
   job_end_pop(jcr);                  /* pop and call hooked routines */

   Dmsg1(dbglvl, "End job=%d\n", jcr->JobId);

   /* Keep some statistics */
   switch (jcr->getJobType()) {
   case JT_BACKUP:
   case JT_VERIFY:
   case JT_RESTORE:
   case JT_MIGRATE:
   case JT_COPY:
   case JT_ADMIN:
      /* Keep list of last jobs, but not Console where JobId==0 */
      if (jcr->JobId > 0) {
         lock_last_jobs_list();
         num_jobs_run++;
         je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
         memset(je, 0, sizeof(struct s_last_job));  /* zero in case unset fields */
         je->Errors = jcr->JobErrors;
         je->JobType = jcr->getJobType();
         je->JobId = jcr->JobId;
         je->VolSessionId = jcr->VolSessionId;
         je->VolSessionTime = jcr->VolSessionTime;
         bstrncpy(je->Job, jcr->Job, sizeof(je->Job));
         je->JobFiles = jcr->JobFiles;
         je->JobBytes = jcr->JobBytes;
         je->JobStatus = jcr->JobStatus;
         je->JobLevel = jcr->getJobLevel();
         je->start_time = jcr->start_time;
         je->end_time = time(NULL);

         if (!last_jobs) {
            init_last_jobs_list();
         }
         last_jobs->append(je);
         /* Keep at most max_last_jobs entries: drop the one at the head */
         if (last_jobs->size() > max_last_jobs) {
            je = (struct s_last_job *)last_jobs->first();
            last_jobs->remove(je);
            free(je);
         }
         unlock_last_jobs_list();
      }
      break;
   default:
      break;
   }

   if (jcr->daemon_free_jcr) {
      jcr->daemon_free_jcr(jcr);      /* call daemon free routine */
   }

   /* Releases the common fields and the jcr itself; jcr is invalid afterwards */
   free_common_jcr(jcr);
   close_msg(NULL);                   /* flush any daemon messages */
   Dmsg0(dbglvl, "Exit free_jcr\n");
}
608
609 /*
610 * Remove jcr from thread specific data, but
611 * but make sure it is us who are attached.
612 */
613 void remove_jcr_from_tsd(JCR *jcr)
614 {
615 JCR *tjcr = get_jcr_from_tsd();
616 if (tjcr == jcr) {
617 set_jcr_in_tsd(INVALID_JCR);
618 }
619 }
620
/*
 * Set the my_thread_killable flag while holding the jcr lock.
 *
 *  killable - new value of the flag
 */
void JCR::set_killable(bool killable)
{
   lock();
   my_thread_killable = killable;
   unlock();
}
627
628 /*
629 * Put this jcr in the thread specifc data
630 * if update_thread_info is true and the jcr is valide,
631 * we update the my_thread_id in the JCR
632 */
633 void set_jcr_in_tsd(JCR *jcr)
634 {
635 int status = pthread_setspecific(jcr_key, (void *)jcr);
636 if (status != 0) {
637 berrno be;
638 Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"),
639 be.bstrerror(status));
640 }
641 }
642
643 void JCR::my_thread_send_signal(int sig)
644 {
645 lock_jcr_chain(); /* use global lock */
646 this->lock();
647 if (this->exiting) {
648 goto get_out;
649 }
650 if (this->is_killable() &&
651 !pthread_equal(this->my_thread_id, pthread_self()))
652 {
653 Dmsg1(800, "Send kill to jid=%d\n", this->JobId);
654 pthread_kill(this->my_thread_id, sig);
655 this->exiting = true;
656
657 } else if (!this->is_killable()) {
658 Dmsg1(10, "Warning, cannot send kill to jid=%d marked not killable.\n", this->JobId);
659 }
660 get_out:
661 this->unlock();
662 unlock_jcr_chain();
663 }
664
665 /*
666 * Give me the jcr that is attached to this thread
667 */
668 JCR *get_jcr_from_tsd()
669 {
670 JCR *jcr = (JCR *)pthread_getspecific(jcr_key);
671 // printf("get_jcr_from_tsd: jcr=%p\n", jcr);
672 /* set any INVALID_JCR to NULL which the rest of Bacula understands */
673 if (jcr == INVALID_JCR) {
674 jcr = NULL;
675 }
676 return jcr;
677 }
678
679
680 /*
681 * Find which JobId corresponds to the current thread
682 */
683 uint32_t get_jobid_from_tsd()
684 {
685 JCR *jcr;
686 uint32_t JobId = 0;
687 jcr = get_jcr_from_tsd();
688 // printf("get_jobid_from_tsr: jcr=%p\n", jcr);
689 if (jcr) {
690 JobId = (uint32_t)jcr->JobId;
691 }
692 return JobId;
693 }
694
695 /*
696 * Given a JobId, find the JCR
697 * Returns: jcr on success
698 * NULL on failure
699 */
700 JCR *get_jcr_by_id(uint32_t JobId)
701 {
702 JCR *jcr;
703
704 foreach_jcr(jcr) {
705 if (jcr->JobId == JobId) {
706 jcr->inc_use_count();
707 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
708 jcr->JobId, jcr->use_count(), jcr->Job);
709 break;
710 }
711 }
712 endeach_jcr(jcr);
713 return jcr;
714 }
715
716 /*
717 * Given a thread id, find the JobId
718 * Returns: JobId on success
719 * 0 on failure
720 */
721 uint32_t get_jobid_from_tid(pthread_t tid)
722 {
723 JCR *jcr = NULL;
724 bool found = false;
725
726 foreach_jcr(jcr) {
727 if (pthread_equal(jcr->my_thread_id, tid)) {
728 found = true;
729 break;
730 }
731 }
732 endeach_jcr(jcr);
733 if (found) {
734 return jcr->JobId;
735 }
736 return 0;
737 }
738
739
740 /*
741 * Given a SessionId and SessionTime, find the JCR
742 * Returns: jcr on success
743 * NULL on failure
744 */
745 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
746 {
747 JCR *jcr;
748
749 foreach_jcr(jcr) {
750 if (jcr->VolSessionId == SessionId &&
751 jcr->VolSessionTime == SessionTime) {
752 jcr->inc_use_count();
753 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
754 jcr->JobId, jcr->use_count(), jcr->Job);
755 break;
756 }
757 }
758 endeach_jcr(jcr);
759 return jcr;
760 }
761
762
763 /*
764 * Given a Job, find the JCR
765 * compares on the number of characters in Job
766 * thus allowing partial matches.
767 * Returns: jcr on success
768 * NULL on failure
769 */
770 JCR *get_jcr_by_partial_name(char *Job)
771 {
772 JCR *jcr;
773 int len;
774
775 if (!Job) {
776 return NULL;
777 }
778 len = strlen(Job);
779 foreach_jcr(jcr) {
780 if (strncmp(Job, jcr->Job, len) == 0) {
781 jcr->inc_use_count();
782 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
783 jcr->JobId, jcr->use_count(), jcr->Job);
784 break;
785 }
786 }
787 endeach_jcr(jcr);
788 return jcr;
789 }
790
791
792 /*
793 * Given a Job, find the JCR
794 * requires an exact match of names.
795 * Returns: jcr on success
796 * NULL on failure
797 */
798 JCR *get_jcr_by_full_name(char *Job)
799 {
800 JCR *jcr;
801
802 if (!Job) {
803 return NULL;
804 }
805 foreach_jcr(jcr) {
806 if (strcmp(jcr->Job, Job) == 0) {
807 jcr->inc_use_count();
808 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
809 jcr->JobId, jcr->use_count(), jcr->Job);
810 break;
811 }
812 }
813 endeach_jcr(jcr);
814 return jcr;
815 }
816
817 static void update_wait_time(JCR *jcr, int newJobStatus)
818 {
819 bool enter_in_waittime;
820 int oldJobStatus = jcr->JobStatus;
821
822 switch (newJobStatus) {
823 case JS_WaitFD:
824 case JS_WaitSD:
825 case JS_WaitMedia:
826 case JS_WaitMount:
827 case JS_WaitStoreRes:
828 case JS_WaitJobRes:
829 case JS_WaitClientRes:
830 case JS_WaitMaxJobs:
831 case JS_WaitPriority:
832 enter_in_waittime = true;
833 break;
834 default:
835 enter_in_waittime = false; /* not a Wait situation */
836 break;
837 }
838
839 /*
840 * If we were previously waiting and are not any more
841 * we want to update the wait_time variable, which is
842 * the start of waiting.
843 */
844 switch (oldJobStatus) {
845 case JS_WaitFD:
846 case JS_WaitSD:
847 case JS_WaitMedia:
848 case JS_WaitMount:
849 case JS_WaitStoreRes:
850 case JS_WaitJobRes:
851 case JS_WaitClientRes:
852 case JS_WaitMaxJobs:
853 case JS_WaitPriority:
854 if (!enter_in_waittime) { /* we get out the wait time */
855 jcr->wait_time_sum += (time(NULL) - jcr->wait_time);
856 jcr->wait_time = 0;
857 }
858 break;
859
860 /* if wait state is new, we keep current time for watchdog MaxWaitTime */
861 default:
862 if (enter_in_waittime) {
863 jcr->wait_time = time(NULL);
864 }
865 break;
866 }
867 }
868
869 /*
870 * Priority runs from 0 (lowest) to 10 (highest)
871 */
872 static int get_status_priority(int JobStatus)
873 {
874 int priority = 0;
875 switch (JobStatus) {
876 case JS_Incomplete:
877 priority = 10;
878 break;
879 case JS_ErrorTerminated:
880 case JS_FatalError:
881 case JS_Canceled:
882 priority = 9;
883 break;
884 case JS_Error:
885 priority = 8;
886 break;
887 case JS_Differences:
888 priority = 7;
889 break;
890 }
891 return priority;
892 }
893
894 /*
895 * Send Job status to Director
896 */
897 bool JCR::sendJobStatus()
898 {
899 if (dir_bsock) {
900 return dir_bsock->fsend(Job_status, JobId, JobStatus);
901 }
902 return true;
903 }
904
905 /*
906 * Set and send Job status to Director
907 */
908 bool JCR::sendJobStatus(int aJobStatus)
909 {
910 if (!is_JobStatus(aJobStatus)) {
911 setJobStatus(aJobStatus);
912 if (dir_bsock) {
913 return dir_bsock->fsend(Job_status, JobId, JobStatus);
914 }
915 }
916 return true;
917 }
918
/*
 * Mark the job as started and record the current time as its start time.
 */
void JCR::setJobStarted()
{
   job_started = true;
   job_started_time = time(NULL);
}
924
925 static pthread_mutex_t status_lock = PTHREAD_MUTEX_INITIALIZER;
926
927 void JCR::setJobStatus(int newJobStatus)
928 {
929 int priority, old_priority;
930 int oldJobStatus = JobStatus;
931
932 P(status_lock);
933 priority = get_status_priority(newJobStatus);
934 old_priority = get_status_priority(oldJobStatus);
935
936 Dmsg2(800, "set_jcr_job_status(%ld, %c)\n", JobId, newJobStatus);
937
938 /* Update wait_time depending on newJobStatus and oldJobStatus */
939 update_wait_time(this, newJobStatus);
940
941 /*
942 * For a set of errors, ... keep the current status
943 * so it isn't lost. For all others, set it.
944 */
945 Dmsg2(800, "OnEntry JobStatus=%c newJobstatus=%c\n", (oldJobStatus==0)?'0':oldJobStatus, newJobStatus);
946 /*
947 * If status priority is > than proposed new status, change it.
948 * If status priority == new priority and both are zero, take
949 * the new status.
950 * If it is not zero, then we keep the first non-zero "error" that
951 * occurred.
952 */
953 if (priority > old_priority || (
954 priority == 0 && old_priority == 0)) {
955 Dmsg4(800, "Set new stat. old: %c,%d new: %c,%d\n",
956 (oldJobStatus==0)?'0':oldJobStatus, old_priority, newJobStatus, priority);
957 JobStatus = newJobStatus; /* replace with new status */
958 }
959
960 if (oldJobStatus != JobStatus) {
961 Dmsg2(800, "leave setJobStatus old=%c new=%c\n", (oldJobStatus==0)?'0':oldJobStatus, newJobStatus);
962 // generate_plugin_event(this, bEventStatusChange, NULL);
963 }
964 V(status_lock);
965 }
966
967 #ifdef TRACE_JCR_CHAIN
968 static int lock_count = 0;
969 #endif
970
971 /*
972 * Lock the chain
973 */
974 #ifdef TRACE_JCR_CHAIN
975 static void b_lock_jcr_chain(const char *fname, int line)
976 #else
977 static void lock_jcr_chain()
978 #endif
979 {
980 #ifdef TRACE_JCR_CHAIN
981 Dmsg3(dbglvl, "Lock jcr chain %d from %s:%d\n", ++lock_count, fname, line);
982 #endif
983 P(jcr_lock);
984 }
985
986 /*
987 * Unlock the chain
988 */
989 #ifdef TRACE_JCR_CHAIN
990 static void b_unlock_jcr_chain(const char *fname, int line)
991 #else
992 static void unlock_jcr_chain()
993 #endif
994 {
995 #ifdef TRACE_JCR_CHAIN
996 Dmsg3(dbglvl, "Unlock jcr chain %d from %s:%d\n", lock_count--, fname, line);
997 #endif
998 V(jcr_lock);
999 }
1000
1001 /*
1002 * Start walk of jcr chain
1003 * The proper way to walk the jcr chain is:
1004 * JCR *jcr;
1005 * foreach_jcr(jcr) {
1006 * ...
1007 * }
1008 * endeach_jcr(jcr);
1009 *
1010 * It is possible to leave out the endeach_jcr(jcr), but
1011 * in that case, the last jcr referenced must be explicitly
1012 * released with:
1013 *
1014 * free_jcr(jcr);
1015 *
1016 */
1017 JCR *jcr_walk_start()
1018 {
1019 JCR *jcr;
1020 lock_jcr_chain();
1021 jcr = (JCR *)jcrs->first();
1022 if (jcr) {
1023 jcr->inc_use_count();
1024 if (jcr->JobId > 0) {
1025 Dmsg3(dbglvl, "Inc walk_start jid=%u use_count=%d Job=%s\n",
1026 jcr->JobId, jcr->use_count(), jcr->Job);
1027 }
1028 }
1029 unlock_jcr_chain();
1030 return jcr;
1031 }
1032
1033 /*
1034 * Get next jcr from chain, and release current one
1035 */
1036 JCR *jcr_walk_next(JCR *prev_jcr)
1037 {
1038 JCR *jcr;
1039
1040 lock_jcr_chain();
1041 jcr = (JCR *)jcrs->next(prev_jcr);
1042 if (jcr) {
1043 jcr->inc_use_count();
1044 if (jcr->JobId > 0) {
1045 Dmsg3(dbglvl, "Inc walk_next jid=%u use_count=%d Job=%s\n",
1046 jcr->JobId, jcr->use_count(), jcr->Job);
1047 }
1048 }
1049 unlock_jcr_chain();
1050 if (prev_jcr) {
1051 free_jcr(prev_jcr);
1052 }
1053 return jcr;
1054 }
1055
1056 /*
1057 * Release last jcr referenced
1058 */
1059 void jcr_walk_end(JCR *jcr)
1060 {
1061 if (jcr) {
1062 if (jcr->JobId > 0) {
1063 Dmsg3(dbglvl, "Free walk_end jid=%u use_count=%d Job=%s\n",
1064 jcr->JobId, jcr->use_count(), jcr->Job);
1065 }
1066 free_jcr(jcr);
1067 }
1068 }
1069
1070 /*
1071 * Return number of Jobs
1072 */
1073 int job_count()
1074 {
1075 JCR *jcr;
1076 int count = 0;
1077
1078 lock_jcr_chain();
1079 for (jcr = (JCR *)jcrs->first(); jcr ; jcr = (JCR *)jcrs->next(jcr)) {
1080 if (jcr->JobId > 0) {
1081 count++;
1082 }
1083 }
1084 unlock_jcr_chain();
1085 return count;
1086 }
1087
1088
1089 /*
1090 * Setup to call the timeout check routine every 30 seconds
1091 * This routine will check any timers that have been enabled.
1092 */
1093 bool init_jcr_subsystem(void)
1094 {
1095 watchdog_t *wd = new_watchdog();
1096
1097 wd->one_shot = false;
1098 wd->interval = 30; /* FIXME: should be configurable somewhere, even
1099 if only with a #define */
1100 wd->callback = jcr_timeout_check;
1101
1102 register_watchdog(wd);
1103
1104 return true;
1105 }
1106
1107 static void jcr_timeout_check(watchdog_t *self)
1108 {
1109 JCR *jcr;
1110 BSOCK *bs;
1111 time_t timer_start;
1112
1113 Dmsg0(dbglvl, "Start JCR timeout checks\n");
1114
1115 /* Walk through all JCRs checking if any one is
1116 * blocked for more than specified max time.
1117 */
1118 foreach_jcr(jcr) {
1119 Dmsg2(dbglvl, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
1120 if (jcr->JobId == 0) {
1121 continue;
1122 }
1123 bs = jcr->store_bsock;
1124 if (bs) {
1125 timer_start = bs->timer_start;
1126 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
1127 bs->timer_start = 0; /* turn off timer */
1128 bs->set_timed_out();
1129 Qmsg(jcr, M_ERROR, 0, _(
1130 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
1131 (int)(watchdog_time - timer_start));
1132 jcr->my_thread_send_signal(TIMEOUT_SIGNAL);
1133 }
1134 }
1135 bs = jcr->file_bsock;
1136 if (bs) {
1137 timer_start = bs->timer_start;
1138 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
1139 bs->timer_start = 0; /* turn off timer */
1140 bs->set_timed_out();
1141 Qmsg(jcr, M_ERROR, 0, _(
1142 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
1143 (int)(watchdog_time - timer_start));
1144 jcr->my_thread_send_signal(TIMEOUT_SIGNAL);
1145 }
1146 }
1147 bs = jcr->dir_bsock;
1148 if (bs) {
1149 timer_start = bs->timer_start;
1150 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
1151 bs->timer_start = 0; /* turn off timer */
1152 bs->set_timed_out();
1153 Qmsg(jcr, M_ERROR, 0, _(
1154 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
1155 (int)(watchdog_time - timer_start));
1156 jcr->my_thread_send_signal(TIMEOUT_SIGNAL);
1157 }
1158 }
1159 }
1160 endeach_jcr(jcr);
1161
1162 Dmsg0(dbglvl, "Finished JCR timeout checks\n");
1163 }
1164
1165 /*
1166 * Return next JobId from comma separated list
1167 *
1168 * Returns:
1169 * 1 if next JobId returned
1170 * 0 if no more JobIds are in list
1171 * -1 there is an error
1172 */
1173 int get_next_jobid_from_list(char **p, uint32_t *JobId)
1174 {
1175 const int maxlen = 30;
1176 char jobid[maxlen+1];
1177 char *q = *p;
1178
1179 jobid[0] = 0;
1180 for (int i=0; i<maxlen; i++) {
1181 if (*q == 0) {
1182 break;
1183 } else if (*q == ',') {
1184 q++;
1185 break;
1186 }
1187 jobid[i] = *q++;
1188 jobid[i+1] = 0;
1189 }
1190 if (jobid[0] == 0) {
1191 return 0;
1192 } else if (!is_a_number(jobid)) {
1193 return -1; /* error */
1194 }
1195 *p = q;
1196 *JobId = str_to_int64(jobid);
1197 return 1;
1198 }
1199
/*
 * Timeout signal comes here.
 *
 * The handler does nothing with the signal. new_jcr() installs it for
 * TIMEOUT_SIGNAL with sa_flags set to 0.
 *
 *  sig - signal number (not used)
 */
extern "C" void timeout_handler(int sig)
{
   return;                            /* thus interrupting the function */
}
1207
1208 /* Used to display specific daemon information after a fatal signal
1209 * (like BDB in the director)
1210 */
1211 #define MAX_DBG_HOOK 10
1212 static dbg_jcr_hook_t *dbg_jcr_hooks[MAX_DBG_HOOK];
1213 static int dbg_jcr_handler_count=0;
1214
1215 void dbg_jcr_add_hook(dbg_jcr_hook_t *hook)
1216 {
1217 ASSERT(dbg_jcr_handler_count < MAX_DBG_HOOK);
1218 dbg_jcr_hooks[dbg_jcr_handler_count++] = hook;
1219 }
1220
1221 /* on win32, the pthread_t is a struct, so we don't display it */
1222 #ifdef HAVE_MINGW_W64
1223 # define get_threadid(a) (void *)0
1224 #else
1225 # define get_threadid(a) (void *)(a)
1226 #endif
1227 /*
1228 * !!! WARNING !!!
1229 *
1230 * This function should be used ONLY after a fatal signal. We walk through the
1231 * JCR chain without doing any lock, Bacula should not be running.
1232 */
1233 void dbg_print_jcr(FILE *fp)
1234 {
1235 char buf1[128], buf2[128], buf3[128], buf4[128];
1236
1237 if (!jcrs) {
1238 return;
1239 }
1240
1241 fprintf(fp, "Attempt to dump current JCRs. njcrs=%d\n", jcrs->size());
1242
1243 for (JCR *jcr = (JCR *)jcrs->first(); jcr ; jcr = (JCR *)jcrs->next(jcr)) {
1244 fprintf(fp, "threadid=%p JobId=%d JobStatus=%c jcr=%p name=%s\n",
1245 get_threadid(jcr->my_thread_id), (int)jcr->JobId, jcr->JobStatus, jcr, jcr->Job);
1246 fprintf(fp, "\tuse_count=%i killable=%d\n",
1247 jcr->use_count(), jcr->is_killable());
1248 fprintf(fp, "\tJobType=%c JobLevel=%c\n",
1249 jcr->getJobType(), jcr->getJobLevel());
1250 bstrftime(buf1, sizeof(buf1), jcr->sched_time);
1251 bstrftime(buf2, sizeof(buf2), jcr->start_time);
1252 bstrftime(buf3, sizeof(buf3), jcr->end_time);
1253 bstrftime(buf4, sizeof(buf4), jcr->wait_time);
1254 fprintf(fp, "\tsched_time=%s start_time=%s\n\tend_time=%s wait_time=%s\n",
1255 buf1, buf2, buf3, buf4);
1256 fprintf(fp, "\tdb=%p db_batch=%p batch_started=%i\n",
1257 jcr->db, jcr->db_batch, jcr->batch_started);
1258
1259 /*
1260 * Call all the jcr debug hooks
1261 */
1262 for(int i=0; i < dbg_jcr_handler_count; i++) {
1263 dbg_jcr_hook_t *hook = dbg_jcr_hooks[i];
1264 hook(jcr, fp);
1265 }
1266 }
1267 }
1268