1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Manipulation routines for Job Control Records and
21 * handling of last_jobs_list.
22 *
23 * Kern E. Sibbald, December 2000
24 *
25 * These routines are thread safe.
26 *
27 * The job list routines were re-written in May 2005 to
28 * eliminate the global lock while traversing the list, and
29 * to use the dlist subroutines. The locking is now done
30 * on the list each time the list is modified or traversed.
31 * That is it is "micro-locked" rather than globally locked.
32 * The result is that there is one lock/unlock for each entry
33 * in the list while traversing it rather than a single lock
34 * at the beginning of a traversal and one at the end. This
35 * incurs slightly more overhead, but effectively eliminates
 *  the possibility of race conditions. In addition, with the
37 * exception of the global locking of the list during the
38 * re-reading of the config file, no recursion is needed.
39 *
40 */
41
42 #include "bacula.h"
43 #include "jcr.h"
44
const int dbglvl = 3400;               /* debug level passed to the Dmsg calls below */

/* External variables we reference */

/* External referenced functions */
void free_bregexps(alist *bregexps);

/* Forward referenced functions */
extern "C" void timeout_handler(int sig);
static void jcr_timeout_check(watchdog_t *self);
#ifdef TRACE_JCR_CHAIN
/* Tracing build: the lock/unlock names are macros that pass the call site */
static void b_lock_jcr_chain(const char *filen, int line);
static void b_unlock_jcr_chain(const char *filen, int line);
#define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
#define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
#else
static void lock_jcr_chain();
static void unlock_jcr_chain();
#endif


int num_jobs_run;                      /* incremented in free_jcr() for each job recorded in last_jobs */
dlist *last_jobs = NULL;               /* list of struct s_last_job entries */
const int max_last_jobs = 10;          /* last_jobs is trimmed back to this many entries */

static dlist *jcrs = NULL;             /* JCR chain */
static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;   /* taken by lock_jcr_chain() */

static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;   /* taken by lock_jobs() */

static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;   /* taken by lock_last_jobs_list() */

static pthread_key_t jcr_key;          /* Pointer to jcr for each thread */

pthread_once_t key_once = PTHREAD_ONCE_INIT;   /* guards create_jcr_key() in new_jcr() */

/* Format used by JCR::sendJobStatus() when reporting to the Director */
static char Job_status[] = "Status JobId=%ld JobStatus=%d\n";
82
83
/*
 * Acquire the global job start mutex.  new_jcr() holds this lock
 *  while it appends a new JCR to the chain.
 */
void lock_jobs()
{
   P(job_start_mutex);
}
88
/*
 * Release the global job start mutex taken by lock_jobs().
 */
void unlock_jobs()
{
   V(job_start_mutex);
}
93
/*
 * Create the last_jobs list and the JCR chain if they do not
 *  exist yet.  Calling it again when both exist does nothing.
 */
void init_last_jobs_list()
{
   /* The NULL pointers are only used to hand the dlist constructor
    *  an item pointer and the address of its link member. */
   JCR *jcr = NULL;
   struct s_last_job *job_entry = NULL;
   if (!last_jobs) {
      last_jobs = New(dlist(job_entry, &job_entry->link));
   }
   if (!jcrs) {
      jcrs = New(dlist(jcr, &jcr->link));
   }
}
105
term_last_jobs_list()106 void term_last_jobs_list()
107 {
108 if (last_jobs) {
109 lock_last_jobs_list();
110 while (!last_jobs->empty()) {
111 void *je = last_jobs->first();
112 last_jobs->remove(je);
113 free(je);
114 }
115 delete last_jobs;
116 last_jobs = NULL;
117 unlock_last_jobs_list();
118 }
119 if (jcrs) {
120 delete jcrs;
121 jcrs = NULL;
122 }
123 }
124
read_last_jobs_list(int fd,uint64_t addr)125 bool read_last_jobs_list(int fd, uint64_t addr)
126 {
127 struct s_last_job *je, job;
128 uint32_t num;
129 bool ok = true;
130
131 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
132 if (addr == 0 || lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
133 return false;
134 }
135 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
136 return false;
137 }
138 Dmsg1(100, "Read num_items=%d\n", num);
139 if (num > 4 * max_last_jobs) { /* sanity check */
140 return false;
141 }
142 lock_last_jobs_list();
143 for ( ; num; num--) {
144 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
145 berrno be;
146 Pmsg1(000, "Read job entry. ERR=%s\n", be.bstrerror());
147 ok = false;
148 break;
149 }
150 if (job.JobId > 0) {
151 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
152 memcpy((char *)je, (char *)&job, sizeof(job));
153 if (!last_jobs) {
154 init_last_jobs_list();
155 }
156 last_jobs->append(je);
157 if (last_jobs->size() > max_last_jobs) {
158 je = (struct s_last_job *)last_jobs->first();
159 last_jobs->remove(je);
160 free(je);
161 }
162 }
163 }
164 unlock_last_jobs_list();
165 return ok;
166 }
167
write_last_jobs_list(int fd,uint64_t addr)168 uint64_t write_last_jobs_list(int fd, uint64_t addr)
169 {
170 struct s_last_job *je;
171 uint32_t num;
172 ssize_t stat;
173
174 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
175 if (lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
176 return 0;
177 }
178 if (last_jobs) {
179 lock_last_jobs_list();
180 /* First record is number of entires */
181 num = last_jobs->size();
182 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
183 berrno be;
184 Pmsg1(000, "Error writing num_items: ERR=%s\n", be.bstrerror());
185 goto bail_out;
186 }
187 foreach_dlist(je, last_jobs) {
188 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
189 berrno be;
190 Pmsg1(000, "Error writing job: ERR=%s\n", be.bstrerror());
191 goto bail_out;
192 }
193 }
194 unlock_last_jobs_list();
195 }
196 /* Return current address */
197 stat = lseek(fd, 0, SEEK_CUR);
198 if (stat < 0) {
199 stat = 0;
200 }
201 return stat;
202
203 bail_out:
204 unlock_last_jobs_list();
205 return 0;
206 }
207
/*
 * Acquire the mutex that protects the last_jobs list.
 */
void lock_last_jobs_list()
{
   P(last_jobs_mutex);
}
212
/*
 * Release the mutex taken by lock_last_jobs_list().
 */
void unlock_last_jobs_list()
{
   V(last_jobs_mutex);
}
217
218 /* Get an ASCII representation of the Operation being performed as an english Noun */
get_OperationName()219 const char *JCR::get_OperationName()
220 {
221 switch(m_JobType) {
222 case JT_BACKUP:
223 return _("Backup");
224 case JT_VERIFY:
225 return _("Verifying");
226 case JT_RESTORE:
227 return _("Restoring");
228 case JT_ARCHIVE:
229 return _("Archiving");
230 case JT_COPY:
231 return _("Copying");
232 case JT_MIGRATE:
233 return _("Migration");
234 case JT_SCAN:
235 return _("Scanning");
236 default:
237 return _("Unknown operation");
238 }
239 }
240
241 /* Get an ASCII representation of the Action being performed either an english Verb or Adjective */
get_ActionName(bool past)242 const char *JCR::get_ActionName(bool past)
243 {
244 switch(m_JobType) {
245 case JT_BACKUP:
246 return _("backup");
247 case JT_VERIFY:
248 return (past == true) ? _("verified") : _("verify");
249 case JT_RESTORE:
250 return (past == true) ? _("restored") : _("restore");
251 case JT_ARCHIVE:
252 return (past == true) ? _("archived") : _("archive");
253 case JT_COPY:
254 return (past == true) ? _("copied") : _("copy");
255 case JT_MIGRATE:
256 return (past == true) ? _("migrated") : _("migrate");
257 case JT_SCAN:
258 return (past == true) ? _("scanned") : _("scan");
259 default:
260 return _("unknown action");
261 }
262 }
263
JobReads()264 bool JCR::JobReads()
265 {
266 switch (m_JobType) {
267 case JT_VERIFY:
268 case JT_RESTORE:
269 case JT_COPY:
270 case JT_MIGRATE:
271 return true;
272 case JT_BACKUP:
273 if (m_JobLevel == L_VIRTUAL_FULL) {
274 return true;
275 }
276 break;
277 default:
278 break;
279 }
280 return false;
281 }
282
283 /* We can stop only Backup jobs connected to a client. It doesn't make sens at
284 * this time to stop a copy, migraton, restore or a verify job. The specific
285 * code should be implemented first.
286 */
can_be_stopped()287 bool JCR::can_be_stopped()
288 {
289 bool ok=true;
290 if (getJobType() == JT_BACKUP) { /* Is a Backup */
291 if (getJobLevel() == L_VIRTUAL_FULL) { /* Is a VirtualFull */
292 ok = false;
293 }
294 } else { /* Is not a backup (so, copy, migration, admin, verify, ... */
295 ok = false;
296 }
297 return ok;
298 }
299
/*
 * Push a subroutine address into the job end callback stack
 *
 *  jcr         job whose stack receives the entry
 *  job_end_cb  routine to register
 *  ctx         opaque pointer stored next to the routine
 *
 * Each registration adds two consecutive items to the list: the
 *  routine first, then its context.  job_end_pop() reads them
 *  back as a pair.
 */
void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
{
   jcr->job_end_push.append((void *)job_end_cb);
   jcr->job_end_push.append(ctx);
}
308
/* DELETE ME when bugs in MA1512, MA1632 MA1639 are fixed */
/* When non-NULL, the only routine job_end_pop() accepts to call */
void (*MA1512_reload_job_end_cb)(JCR *,void *) = NULL;

/*
 * Pop each job_end subroutine and call it
 *
 * The list holds (routine, context) pairs as stored by
 *  job_end_push(); it is walked from the last pair to the first,
 *  so the most recently pushed routine is handled first.
 *  The list itself is not modified here.
 */
static void job_end_pop(JCR *jcr)
{
   void (*job_end_cb)(JCR *jcr, void *ctx);
   void *ctx;
   /* i is decremented twice per turn: context first, then routine */
   for (int i=jcr->job_end_push.size()-1; i > 0; ) {
      ctx = jcr->job_end_push.get(i--);
      job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
      /* check for bug MA1512, MA1632 MA1639,
       * today, job_end_cb can only be reload_job_end_cb() from DIR */
      if (job_end_cb != MA1512_reload_job_end_cb && MA1512_reload_job_end_cb != NULL) {
         /* Unexpected routine: do not call it, dump the whole list instead */
         Tmsg2(0, "Bug 'job_end_pop' detected, skip ! job_end_cb=0x%p ctx=0x%p\n", job_end_cb, ctx);
         Tmsg0(0, "Display job_end_push list\n");
         for (int j=jcr->job_end_push.size()-1; j > 0; ) {
            void *ctx2 = jcr->job_end_push.get(j--);
            void *job_end_cb2 = jcr->job_end_push.get(j--);
            /* j+1 is the list index of the routine just read */
            Tmsg3(0, "Bug 'job_end_pop' entry[%d] job_end_cb=0x%p ctx=0x%p\n", j+1, job_end_cb2, ctx2);
         }
      } else
      {
         job_end_cb(jcr, ctx);
      }
   }
}
336
337 /*
338 * Create thread key for thread specific data
339 */
create_jcr_key()340 void create_jcr_key()
341 {
342 int status = pthread_key_create(&jcr_key, NULL);
343 if (status != 0) {
344 berrno be;
345 Jmsg1(NULL, M_ABORT, 0, _("pthread key create failed: ERR=%s\n"),
346 be.bstrerror(status));
347 }
348 }
349
/*
 * Create a Job Control Record and link it into JCR chain
 *  Returns newly allocated JCR
 * Note, since each daemon has a different JCR, he passes
 *  us the size.
 *
 *  size             number of bytes to allocate for the JCR
 *  daemon_free_jcr  daemon specific free routine; stored in the
 *                   JCR and called from free_jcr()
 *
 * The new JCR is zeroed, gets inc_use_count() called once, and
 *  starts with Job "*System*", JobId 0, type JT_SYSTEM, level
 *  L_NONE and status JS_Created.
 */
JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
{
   JCR *jcr;
   MQUEUE_ITEM *item = NULL;
   int status;

   Dmsg0(dbglvl, "Enter new_jcr\n");
   /* Create the thread specific data key on the first call only */
   status = pthread_once(&key_once, create_jcr_key);
   if (status != 0) {
      berrno be;
      Jmsg1(NULL, M_ABORT, 0, _("pthread_once failed. ERR=%s\n"), be.bstrerror(status));
   }
   jcr = (JCR *)malloc(size);
   bmemzero(jcr, size);
   /* Note for the director, this value is changed in jobq.c */
   jcr->my_thread_id = pthread_self();
   jcr->msg_queue = New(dlist(item, &item->link));
   if ((status = pthread_mutex_init(&jcr->msg_queue_mutex, NULL)) != 0) {
      berrno be;
      Jmsg(NULL, M_ABORT, 0, _("Could not init msg_queue mutex. ERR=%s\n"),
         be.bstrerror(status));
   }
   jcr->job_end_push.init(1, false);
   jcr->sched_time = time(NULL);
   jcr->initial_sched_time = jcr->sched_time;
   jcr->daemon_free_jcr = daemon_free_jcr;    /* plug daemon free routine */
   jcr->init_mutex();
   jcr->inc_use_count();
   /* Pool memory strings, each starts out empty */
   jcr->VolumeName = get_pool_memory(PM_FNAME);
   jcr->VolumeName[0] = 0;
   jcr->errmsg = get_pool_memory(PM_MESSAGE);
   jcr->errmsg[0] = 0;
   jcr->comment = get_pool_memory(PM_FNAME);
   jcr->comment[0] = 0;
   jcr->StatusErrMsg = get_pool_memory(PM_FNAME);
   jcr->StatusErrMsg[0] = 0;
   jcr->job_uid = -1;
   /* Setup some dummy values */
   bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
   jcr->JobId = 0;
   jcr->setJobType(JT_SYSTEM);          /* internal job until defined */
   jcr->setJobLevel(L_NONE);
   jcr->setJobStatus(JS_Created);       /* ready to run */
#ifndef HAVE_WIN32
   /* Install timeout_handler for TIMEOUT_SIGNAL, all signals
    *  blocked while the handler runs */
   struct sigaction sigtimer;
   sigtimer.sa_flags = 0;
   sigtimer.sa_handler = timeout_handler;
   sigfillset(&sigtimer.sa_mask);
   sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
#endif

   /*
    * Locking jobs is a global lock that is needed
    * so that the Director can stop new jobs from being
    * added to the jcr chain while it processes a new
    * conf file and does the job_end_push().
    */
   lock_jobs();
   lock_jcr_chain();
   if (!jcrs) {
      jcrs = New(dlist(jcr, &jcr->link));
   }
   jcrs->append(jcr);
   unlock_jcr_chain();
   unlock_jobs();

   return jcr;
}
424
425
/*
 * Remove a JCR from the chain
 *   NOTE! The chain must be locked prior to calling
 *         this routine.
 *
 *  jcr  record to unlink; a NULL pointer is reported with M_ABORT
 *
 * The JCR is only unlinked from jcrs, it is not freed here.
 */
static void remove_jcr(JCR *jcr)
{
   Dmsg0(dbglvl, "Enter remove_jcr\n");
   if (!jcr) {
      Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
   }
   jcrs->remove(jcr);
   Dmsg0(dbglvl, "Leave remove_jcr\n");
}
440
/*
 * Free stuff common to all JCRs.  N.B. Be careful to include only
 *  generic stuff in the common part of the jcr.
 *
 * Called as the last step of free_jcr(): detaches the JCR from
 *  the calling thread's specific data, releases the members
 *  listed below and finally frees the JCR itself, so jcr must
 *  not be used after this routine returns.
 */
static void free_common_jcr(JCR *jcr)
{
   /* Uses jcr lock/unlock */
   remove_jcr_from_tsd(jcr);
   jcr->set_killable(false);

   /* set_killable() above was the last user of the jcr mutex */
   jcr->destroy_mutex();

   if (jcr->msg_queue) {
      delete jcr->msg_queue;
      jcr->msg_queue = NULL;
      pthread_mutex_destroy(&jcr->msg_queue_mutex);
   }

   /* do this after closing messages */
   free_and_null_pool_memory(jcr->JobIds);
   free_and_null_pool_memory(jcr->client_name);
   free_and_null_pool_memory(jcr->attr);
   free_and_null_pool_memory(jcr->VolumeName);
   free_and_null_pool_memory(jcr->errmsg);
   free_and_null_pool_memory(jcr->StatusErrMsg);

   if (jcr->sd_auth_key) {
      free(jcr->sd_auth_key);
      jcr->sd_auth_key = NULL;
   }

   free_bsock(jcr->dir_bsock);

   if (jcr->where) {
      free(jcr->where);
      jcr->where = NULL;
   }
   if (jcr->RegexWhere) {
      free(jcr->RegexWhere);
      jcr->RegexWhere = NULL;
   }
   if (jcr->where_bregexp) {
      /* free_bregexps() is called on the list first, then the list is deleted */
      free_bregexps(jcr->where_bregexp);
      delete jcr->where_bregexp;
      jcr->where_bregexp = NULL;
   }
   if (jcr->cached_path) {
      free_pool_memory(jcr->cached_path);
      jcr->cached_path = NULL;
      jcr->cached_pnl = 0;
   }
   if (jcr->id_list) {
      free_guid_list(jcr->id_list);
      jcr->id_list = NULL;
   }
   if (jcr->comment) {
      free_pool_memory(jcr->comment);
      jcr->comment = NULL;
   }
   free(jcr);
}
502
/*
 * Global routine to free a jcr
 *
 * Drops one reference on the JCR.  While other references remain
 *  the routine returns immediately.  When the count reaches zero
 *  the JCR is unlinked from the chain, its messages are flushed
 *  and closed, the job end routines are run, an entry is added to
 *  last_jobs for the job types listed below (JobId > 0 only), the
 *  daemon specific free routine is called and the JCR is freed.
 *
 * With DEBUG defined the routine is named b_free_jcr() and also
 *  receives the file and line of the caller.
 */
#ifdef DEBUG
void b_free_jcr(const char *file, int line, JCR *jcr)
{
   struct s_last_job *je;

   Dmsg3(dbglvl, "Enter free_jcr jid=%u from %s:%d\n", jcr->JobId, file, line);

#else

void free_jcr(JCR *jcr)
{
   struct s_last_job *je;

   Dmsg3(dbglvl, "Enter free_jcr jid=%u use_count=%d Job=%s\n",
         jcr->JobId, jcr->use_count(), jcr->Job);

#endif

   /* The use count is only changed with the chain locked */
   lock_jcr_chain();
   jcr->dec_use_count();              /* decrement use count */
   ASSERT2(jcr->use_count() >= 0, "JCR use_count < 0");
   if (jcr->JobId > 0) {
      Dmsg3(dbglvl, "Dec free_jcr jid=%u use_count=%d Job=%s\n",
         jcr->JobId, jcr->use_count(), jcr->Job);
   }
   if (jcr->use_count() > 0) {          /* if in use */
      unlock_jcr_chain();
      return;
   }
   if (jcr->JobId > 0) {
      Dmsg3(dbglvl, "remove jcr jid=%u use_count=%d Job=%s\n",
            jcr->JobId, jcr->use_count(), jcr->Job);
   }
   /* Mark the JCR before unlinking; my_thread_send_signal() checks
    *  this flag under the same chain lock */
   jcr->exiting = true;
   remove_jcr(jcr);                   /* remove Jcr from chain */
   unlock_jcr_chain();

   if (jcr->JobId > 0) {
      dequeue_messages(jcr);
      dequeue_daemon_messages(jcr);
   }
   close_msg(jcr);                    /* close messages for this job */
   job_end_pop(jcr);                  /* pop and call hooked routines */

   Dmsg1(dbglvl, "End job=%d\n", jcr->JobId);

   /* Keep some statistics */
   switch (jcr->getJobType()) {
   case JT_BACKUP:
   case JT_VERIFY:
   case JT_RESTORE:
   case JT_MIGRATE:
   case JT_COPY:
   case JT_ADMIN:
      /* Keep list of last jobs, but not Console where JobId==0 */
      if (jcr->JobId > 0) {
         lock_last_jobs_list();
         num_jobs_run++;
         je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
         memset(je, 0, sizeof(struct s_last_job));  /* zero in case unset fields */
         je->Errors = jcr->JobErrors;
         je->JobType = jcr->getJobType();
         je->JobId = jcr->JobId;
         je->VolSessionId = jcr->VolSessionId;
         je->VolSessionTime = jcr->VolSessionTime;
         bstrncpy(je->Job, jcr->Job, sizeof(je->Job));
         je->JobFiles = jcr->JobFiles;
         je->JobBytes = jcr->JobBytes;
         je->JobStatus = jcr->JobStatus;
         je->JobLevel = jcr->getJobLevel();
         je->start_time = jcr->start_time;
         je->end_time = time(NULL);

         if (!last_jobs) {
            init_last_jobs_list();
         }
         last_jobs->append(je);
         /* Keep the list bounded: drop the oldest entry */
         if (last_jobs->size() > max_last_jobs) {
            je = (struct s_last_job *)last_jobs->first();
            last_jobs->remove(je);
            free(je);
         }
         unlock_last_jobs_list();
      }
      break;
   default:
      break;
   }

   if (jcr->daemon_free_jcr) {
      jcr->daemon_free_jcr(jcr);      /* call daemon free routine */
   }

   free_common_jcr(jcr);              /* jcr is gone after this call */
   close_msg(NULL);                   /* flush any daemon messages */
   Dmsg0(dbglvl, "Exit free_jcr\n");
}
606
607 /*
608 * Remove jcr from thread specific data, but
609 * but make sure it is us who are attached.
610 */
611 void remove_jcr_from_tsd(JCR *jcr)
612 {
613 JCR *tjcr = get_jcr_from_tsd();
614 if (tjcr == jcr) {
615 set_jcr_in_tsd(INVALID_JCR);
616 }
617 }
618
/*
 * Set the my_thread_killable flag while holding the JCR lock.
 *
 *  killable  new value of the flag
 */
void JCR::set_killable(bool killable)
{
   lock();
   my_thread_killable = killable;
   unlock();
}
625
626 /*
627 * Put this jcr in the thread specifc data
628 * if update_thread_info is true and the jcr is valide,
629 * we update the my_thread_id in the JCR
630 */
631 void set_jcr_in_tsd(JCR *jcr)
632 {
633 int status = pthread_setspecific(jcr_key, (void *)jcr);
634 if (status != 0) {
635 berrno be;
636 Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"),
637 be.bstrerror(status));
638 }
639 }
640
641 void JCR::my_thread_send_signal(int sig)
642 {
643 lock_jcr_chain(); /* use global lock */
644 this->lock();
645 if (this->exiting) {
646 goto get_out;
647 }
648 if (this->is_killable() &&
649 !pthread_equal(this->my_thread_id, pthread_self()))
650 {
651 Dmsg1(800, "Send kill to jid=%d\n", this->JobId);
652 pthread_kill(this->my_thread_id, sig);
653 this->exiting = true;
654
655 } else if (!this->is_killable()) {
656 Dmsg1(10, "Warning, cannot send kill to jid=%d marked not killable.\n", this->JobId);
657 }
658 get_out:
659 this->unlock();
660 unlock_jcr_chain();
661 }
662
663 /*
664 * Give me the jcr that is attached to this thread
665 */
666 JCR *get_jcr_from_tsd()
667 {
668 JCR *jcr = (JCR *)pthread_getspecific(jcr_key);
669 // printf("get_jcr_from_tsd: jcr=%p\n", jcr);
670 /* set any INVALID_JCR to NULL which the rest of Bacula understands */
671 if (jcr == INVALID_JCR) {
672 jcr = NULL;
673 }
674 return jcr;
675 }
676
677
678 /*
679 * Find which JobId corresponds to the current thread
680 */
681 uint32_t get_jobid_from_tsd()
682 {
683 JCR *jcr;
684 uint32_t JobId = 0;
685 jcr = get_jcr_from_tsd();
686 // printf("get_jobid_from_tsr: jcr=%p\n", jcr);
687 if (jcr) {
688 JobId = (uint32_t)jcr->JobId;
689 }
690 return JobId;
691 }
692
693 /*
694 * Given a JobId, find the JCR
695 * Returns: jcr on success
696 * NULL on failure
697 */
698 JCR *get_jcr_by_id(uint32_t JobId)
699 {
700 JCR *jcr;
701
702 foreach_jcr(jcr) {
703 if (jcr->JobId == JobId) {
704 jcr->inc_use_count();
705 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
706 jcr->JobId, jcr->use_count(), jcr->Job);
707 break;
708 }
709 }
710 endeach_jcr(jcr);
711 return jcr;
712 }
713
714 /*
715 * Given a thread id, find the JobId
716 * Returns: JobId on success
717 * 0 on failure
718 */
719 uint32_t get_jobid_from_tid(pthread_t tid)
720 {
721 JCR *jcr = NULL;
722 bool found = false;
723
724 foreach_jcr(jcr) {
725 if (pthread_equal(jcr->my_thread_id, tid)) {
726 found = true;
727 break;
728 }
729 }
730 endeach_jcr(jcr);
731 if (found) {
732 return jcr->JobId;
733 }
734 return 0;
735 }
736
737
738 /*
739 * Given a SessionId and SessionTime, find the JCR
740 * Returns: jcr on success
741 * NULL on failure
742 */
743 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
744 {
745 JCR *jcr;
746
747 foreach_jcr(jcr) {
748 if (jcr->VolSessionId == SessionId &&
749 jcr->VolSessionTime == SessionTime) {
750 jcr->inc_use_count();
751 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
752 jcr->JobId, jcr->use_count(), jcr->Job);
753 break;
754 }
755 }
756 endeach_jcr(jcr);
757 return jcr;
758 }
759
760
761 /*
762 * Given a Job, find the JCR
763 * compares on the number of characters in Job
764 * thus allowing partial matches.
765 * Returns: jcr on success
766 * NULL on failure
767 */
768 JCR *get_jcr_by_partial_name(char *Job)
769 {
770 JCR *jcr;
771 int len;
772
773 if (!Job) {
774 return NULL;
775 }
776 len = strlen(Job);
777 foreach_jcr(jcr) {
778 if (strncmp(Job, jcr->Job, len) == 0) {
779 jcr->inc_use_count();
780 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
781 jcr->JobId, jcr->use_count(), jcr->Job);
782 break;
783 }
784 }
785 endeach_jcr(jcr);
786 return jcr;
787 }
788
789
790 /*
791 * Given a Job, find the JCR
792 * requires an exact match of names.
793 * Returns: jcr on success
794 * NULL on failure
795 */
796 JCR *get_jcr_by_full_name(char *Job)
797 {
798 JCR *jcr;
799
800 if (!Job) {
801 return NULL;
802 }
803 foreach_jcr(jcr) {
804 if (strcmp(jcr->Job, Job) == 0) {
805 jcr->inc_use_count();
806 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
807 jcr->JobId, jcr->use_count(), jcr->Job);
808 break;
809 }
810 }
811 endeach_jcr(jcr);
812 return jcr;
813 }
814
815 static void update_wait_time(JCR *jcr, int newJobStatus)
816 {
817 bool enter_in_waittime;
818 int oldJobStatus = jcr->JobStatus;
819
820 switch (newJobStatus) {
821 case JS_WaitFD:
822 case JS_WaitSD:
823 case JS_WaitMedia:
824 case JS_WaitMount:
825 case JS_WaitStoreRes:
826 case JS_WaitJobRes:
827 case JS_WaitClientRes:
828 case JS_WaitMaxJobs:
829 case JS_WaitPriority:
830 enter_in_waittime = true;
831 break;
832 default:
833 enter_in_waittime = false; /* not a Wait situation */
834 break;
835 }
836
837 /*
838 * If we were previously waiting and are not any more
839 * we want to update the wait_time variable, which is
840 * the start of waiting.
841 */
842 switch (oldJobStatus) {
843 case JS_WaitFD:
844 case JS_WaitSD:
845 case JS_WaitMedia:
846 case JS_WaitMount:
847 case JS_WaitStoreRes:
848 case JS_WaitJobRes:
849 case JS_WaitClientRes:
850 case JS_WaitMaxJobs:
851 case JS_WaitPriority:
852 if (!enter_in_waittime) { /* we get out the wait time */
853 jcr->wait_time_sum += (time(NULL) - jcr->wait_time);
854 jcr->wait_time = 0;
855 }
856 break;
857
858 /* if wait state is new, we keep current time for watchdog MaxWaitTime */
859 default:
860 if (enter_in_waittime) {
861 jcr->wait_time = time(NULL);
862 }
863 break;
864 }
865 }
866
867 /*
868 * Priority runs from 0 (lowest) to 10 (highest)
869 */
870 static int get_status_priority(int JobStatus)
871 {
872 int priority = 0;
873 switch (JobStatus) {
874 case JS_Incomplete:
875 priority = 10;
876 break;
877 case JS_ErrorTerminated:
878 case JS_FatalError:
879 case JS_Canceled:
880 priority = 9;
881 break;
882 case JS_Error:
883 priority = 8;
884 break;
885 case JS_Differences:
886 priority = 7;
887 break;
888 }
889 return priority;
890 }
891
892 /*
893 * Send Job status to Director
894 */
895 bool JCR::sendJobStatus()
896 {
897 if (dir_bsock) {
898 return dir_bsock->fsend(Job_status, JobId, JobStatus);
899 }
900 return true;
901 }
902
903 /*
904 * Set and send Job status to Director
905 */
906 bool JCR::sendJobStatus(int aJobStatus)
907 {
908 if (!is_JobStatus(aJobStatus)) {
909 setJobStatus(aJobStatus);
910 if (dir_bsock) {
911 return dir_bsock->fsend(Job_status, JobId, JobStatus);
912 }
913 }
914 return true;
915 }
916
/*
 * Mark the job as started and record the current time as its
 *  job_started_time.
 */
void JCR::setJobStarted()
{
   job_started = true;
   job_started_time = time(NULL);
}
922
923 static pthread_mutex_t status_lock = PTHREAD_MUTEX_INITIALIZER;
924
925 void JCR::setJobStatus(int newJobStatus)
926 {
927 int priority, old_priority;
928 int oldJobStatus = JobStatus;
929
930 P(status_lock);
931 priority = get_status_priority(newJobStatus);
932 old_priority = get_status_priority(oldJobStatus);
933
934 Dmsg2(800, "set_jcr_job_status(%ld, %c)\n", JobId, newJobStatus);
935
936 /* Update wait_time depending on newJobStatus and oldJobStatus */
937 update_wait_time(this, newJobStatus);
938
939 /*
940 * For a set of errors, ... keep the current status
941 * so it isn't lost. For all others, set it.
942 */
943 Dmsg2(800, "OnEntry JobStatus=%c newJobstatus=%c\n", (oldJobStatus==0)?'0':oldJobStatus, newJobStatus);
944 /*
945 * If status priority is > than proposed new status, change it.
946 * If status priority == new priority and both are zero, take
947 * the new status.
948 * If it is not zero, then we keep the first non-zero "error" that
949 * occurred.
950 */
951 if (priority > old_priority || (
952 priority == 0 && old_priority == 0)) {
953 Dmsg4(800, "Set new stat. old: %c,%d new: %c,%d\n",
954 (oldJobStatus==0)?'0':oldJobStatus, old_priority, newJobStatus, priority);
955 JobStatus = newJobStatus; /* replace with new status */
956 }
957
958 if (oldJobStatus != JobStatus) {
959 Dmsg2(800, "leave setJobStatus old=%c new=%c\n", (oldJobStatus==0)?'0':oldJobStatus, newJobStatus);
960 // generate_plugin_event(this, bEventStatusChange, NULL);
961 }
962 V(status_lock);
963 }
964
#ifdef TRACE_JCR_CHAIN
/* Counter printed by the trace messages of the lock/unlock routines */
static int lock_count = 0;
#endif

/*
 * Lock the chain
 *
 * With TRACE_JCR_CHAIN defined the routine is b_lock_jcr_chain()
 *  and logs the counter and the caller (fname:line) before
 *  taking jcr_lock.
 */
#ifdef TRACE_JCR_CHAIN
static void b_lock_jcr_chain(const char *fname, int line)
#else
static void lock_jcr_chain()
#endif
{
#ifdef TRACE_JCR_CHAIN
   Dmsg3(dbglvl, "Lock jcr chain %d from %s:%d\n", ++lock_count, fname, line);
#endif
   P(jcr_lock);
}
983
/*
 * Unlock the chain
 *
 * With TRACE_JCR_CHAIN defined the routine is b_unlock_jcr_chain()
 *  and logs the counter and the caller (fname:line) before
 *  releasing jcr_lock.
 */
#ifdef TRACE_JCR_CHAIN
static void b_unlock_jcr_chain(const char *fname, int line)
#else
static void unlock_jcr_chain()
#endif
{
#ifdef TRACE_JCR_CHAIN
   Dmsg3(dbglvl, "Unlock jcr chain %d from %s:%d\n", lock_count--, fname, line);
#endif
   V(jcr_lock);
}
998
999 /*
1000 * Start walk of jcr chain
1001 * The proper way to walk the jcr chain is:
1002 * JCR *jcr;
1003 * foreach_jcr(jcr) {
1004 * ...
1005 * }
1006 * endeach_jcr(jcr);
1007 *
1008 * It is possible to leave out the endeach_jcr(jcr), but
1009 * in that case, the last jcr referenced must be explicitly
1010 * released with:
1011 *
1012 * free_jcr(jcr);
1013 *
1014 */
1015 JCR *jcr_walk_start()
1016 {
1017 JCR *jcr;
1018 lock_jcr_chain();
1019 jcr = (JCR *)jcrs->first();
1020 if (jcr) {
1021 jcr->inc_use_count();
1022 if (jcr->JobId > 0) {
1023 Dmsg3(dbglvl, "Inc walk_start jid=%u use_count=%d Job=%s\n",
1024 jcr->JobId, jcr->use_count(), jcr->Job);
1025 }
1026 }
1027 unlock_jcr_chain();
1028 return jcr;
1029 }
1030
1031 /*
1032 * Get next jcr from chain, and release current one
1033 */
1034 JCR *jcr_walk_next(JCR *prev_jcr)
1035 {
1036 JCR *jcr;
1037
1038 lock_jcr_chain();
1039 jcr = (JCR *)jcrs->next(prev_jcr);
1040 if (jcr) {
1041 jcr->inc_use_count();
1042 if (jcr->JobId > 0) {
1043 Dmsg3(dbglvl, "Inc walk_next jid=%u use_count=%d Job=%s\n",
1044 jcr->JobId, jcr->use_count(), jcr->Job);
1045 }
1046 }
1047 unlock_jcr_chain();
1048 if (prev_jcr) {
1049 free_jcr(prev_jcr);
1050 }
1051 return jcr;
1052 }
1053
1054 /*
1055 * Release last jcr referenced
1056 */
1057 void jcr_walk_end(JCR *jcr)
1058 {
1059 if (jcr) {
1060 if (jcr->JobId > 0) {
1061 Dmsg3(dbglvl, "Free walk_end jid=%u use_count=%d Job=%s\n",
1062 jcr->JobId, jcr->use_count(), jcr->Job);
1063 }
1064 free_jcr(jcr);
1065 }
1066 }
1067
1068 /*
1069 * Return number of Jobs
1070 */
1071 int job_count()
1072 {
1073 JCR *jcr;
1074 int count = 0;
1075
1076 lock_jcr_chain();
1077 for (jcr = (JCR *)jcrs->first(); jcr ; jcr = (JCR *)jcrs->next(jcr)) {
1078 if (jcr->JobId > 0) {
1079 count++;
1080 }
1081 }
1082 unlock_jcr_chain();
1083 return count;
1084 }
1085
1086
1087 /*
1088 * Setup to call the timeout check routine every 30 seconds
1089 * This routine will check any timers that have been enabled.
1090 */
1091 bool init_jcr_subsystem(void)
1092 {
1093 watchdog_t *wd = new_watchdog();
1094
1095 wd->one_shot = false;
1096 wd->interval = 30; /* FIXME: should be configurable somewhere, even
1097 if only with a #define */
1098 wd->callback = jcr_timeout_check;
1099
1100 register_watchdog(wd);
1101
1102 return true;
1103 }
1104
1105 static void jcr_timeout_check(watchdog_t *self)
1106 {
1107 JCR *jcr;
1108 BSOCK *bs;
1109 time_t timer_start;
1110
1111 Dmsg0(dbglvl, "Start JCR timeout checks\n");
1112
1113 /* Walk through all JCRs checking if any one is
1114 * blocked for more than specified max time.
1115 */
1116 foreach_jcr(jcr) {
1117 Dmsg2(dbglvl, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
1118 if (jcr->JobId == 0) {
1119 continue;
1120 }
1121 bs = jcr->store_bsock;
1122 if (bs) {
1123 timer_start = bs->timer_start;
1124 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
1125 bs->timer_start = 0; /* turn off timer */
1126 bs->set_timed_out();
1127 Qmsg(jcr, M_ERROR, 0, _(
1128 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
1129 (int)(watchdog_time - timer_start));
1130 jcr->my_thread_send_signal(TIMEOUT_SIGNAL);
1131 }
1132 }
1133 bs = jcr->file_bsock;
1134 if (bs) {
1135 timer_start = bs->timer_start;
1136 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
1137 bs->timer_start = 0; /* turn off timer */
1138 bs->set_timed_out();
1139 Qmsg(jcr, M_ERROR, 0, _(
1140 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
1141 (int)(watchdog_time - timer_start));
1142 jcr->my_thread_send_signal(TIMEOUT_SIGNAL);
1143 }
1144 }
1145 bs = jcr->dir_bsock;
1146 if (bs) {
1147 timer_start = bs->timer_start;
1148 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
1149 bs->timer_start = 0; /* turn off timer */
1150 bs->set_timed_out();
1151 Qmsg(jcr, M_ERROR, 0, _(
1152 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
1153 (int)(watchdog_time - timer_start));
1154 jcr->my_thread_send_signal(TIMEOUT_SIGNAL);
1155 }
1156 }
1157 }
1158 endeach_jcr(jcr);
1159
1160 Dmsg0(dbglvl, "Finished JCR timeout checks\n");
1161 }
1162
1163 /*
1164 * Return next JobId from comma separated list
1165 *
1166 * Returns:
1167 * 1 if next JobId returned
1168 * 0 if no more JobIds are in list
1169 * -1 there is an error
1170 */
1171 int get_next_jobid_from_list(char **p, uint32_t *JobId)
1172 {
1173 const int maxlen = 30;
1174 char jobid[maxlen+1];
1175 char *q = *p;
1176
1177 jobid[0] = 0;
1178 for (int i=0; i<maxlen; i++) {
1179 if (*q == 0) {
1180 break;
1181 } else if (*q == ',') {
1182 q++;
1183 break;
1184 }
1185 jobid[i] = *q++;
1186 jobid[i+1] = 0;
1187 }
1188 if (jobid[0] == 0) {
1189 return 0;
1190 } else if (!is_a_number(jobid)) {
1191 return -1; /* error */
1192 }
1193 *p = q;
1194 *JobId = str_to_int64(jobid);
1195 return 1;
1196 }
1197
/*
 * Timeout signal comes here
 *
 * Installed for TIMEOUT_SIGNAL by new_jcr().  The handler does
 *  nothing by itself: its only purpose is to be delivered.
 *
 *  sig  signal number (not used)
 */
extern "C" void timeout_handler(int sig)
{
   return;                            /* thus interrupting the function */
}
1205
/* Used to display specific daemon information after a fatal signal
 * (like BDB in the director)
 */
#define MAX_DBG_HOOK 10                /* capacity of the hook table */
static dbg_jcr_hook_t *dbg_jcr_hooks[MAX_DBG_HOOK];
static int dbg_jcr_handler_count=0;    /* number of hooks registered so far */

/*
 * Register a hook that dbg_print_jcr() calls for every JCR.
 *
 *  hook  routine to add; at most MAX_DBG_HOOK hooks can be
 *        registered, which is enforced with an ASSERT
 */
void dbg_jcr_add_hook(dbg_jcr_hook_t *hook)
{
   ASSERT(dbg_jcr_handler_count < MAX_DBG_HOOK);
   dbg_jcr_hooks[dbg_jcr_handler_count++] = hook;
}
1218
/* on win32, the pthread_t is a struct, so we don't display it */
#ifdef HAVE_MINGW_W64
# define get_threadid(a) (void *)0
#else
# define get_threadid(a) (void *)(a)
#endif
/*
 * !!! WARNING !!!
 *
 * This function should be used ONLY after a fatal signal. We walk through the
 * JCR chain without doing any lock, Bacula should not be running.
 *
 *  fp  stream that receives the dump
 *
 * For each JCR a few generic fields are printed, then every hook
 *  registered with dbg_jcr_add_hook() is called.  Nothing is
 *  printed when the chain does not exist.
 */
void dbg_print_jcr(FILE *fp)
{
   char buf1[128], buf2[128], buf3[128], buf4[128];

   if (!jcrs) {
      return;
   }

   fprintf(fp, "Attempt to dump current JCRs. njcrs=%d\n", jcrs->size());

   for (JCR *jcr = (JCR *)jcrs->first(); jcr ; jcr = (JCR *)jcrs->next(jcr)) {
      fprintf(fp, "threadid=%p JobId=%d JobStatus=%c jcr=%p name=%s\n",
              get_threadid(jcr->my_thread_id), (int)jcr->JobId, jcr->JobStatus, jcr, jcr->Job);
      fprintf(fp, "\tuse_count=%i killable=%d\n",
              jcr->use_count(), jcr->is_killable());
      fprintf(fp, "\tJobType=%c JobLevel=%c\n",
              jcr->getJobType(), jcr->getJobLevel());
      /* Format the four time stamps of the job */
      bstrftime(buf1, sizeof(buf1), jcr->sched_time);
      bstrftime(buf2, sizeof(buf2), jcr->start_time);
      bstrftime(buf3, sizeof(buf3), jcr->end_time);
      bstrftime(buf4, sizeof(buf4), jcr->wait_time);
      fprintf(fp, "\tsched_time=%s start_time=%s\n\tend_time=%s wait_time=%s\n",
              buf1, buf2, buf3, buf4);
      fprintf(fp, "\tdb=%p db_batch=%p batch_started=%i\n",
              jcr->db, jcr->db_batch, jcr->batch_started);

      /*
       * Call all the jcr debug hooks
       */
      for(int i=0; i < dbg_jcr_handler_count; i++) {
         dbg_jcr_hook_t *hook = dbg_jcr_hooks[i];
         hook(jcr, fp);
      }
   }
}
1266