1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Spooling code
21 *
22 * Kern Sibbald, March 2004
23 *
24 */
25
26 #include "bacula.h"
27 #include "stored.h"
28
29 /* Forward referenced subroutines */
30 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
31 static bool open_data_spool_file(DCR *dcr);
32 static bool close_data_spool_file(DCR *dcr);
33 static bool despool_data(DCR *dcr, bool commit);
34 static int read_block_from_spool_file(DCR *dcr);
35 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
36 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
37 static ssize_t write_spool_header(DCR *dcr, ssize_t *expected);
38 static ssize_t write_spool_data(DCR *dcr, ssize_t *expected);
39 static bool write_spool_block(DCR *dcr);
40
41 struct spool_stats_t {
42 uint32_t data_jobs; /* current jobs spooling data */
43 uint32_t attr_jobs;
44 uint32_t total_data_jobs; /* total jobs to have spooled data */
45 uint32_t total_attr_jobs;
46 int64_t max_data_size; /* max data size */
47 int64_t max_attr_size;
48 int64_t data_size; /* current data size (all jobs running) */
49 int64_t attr_size;
50 };
51
52 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
53 spool_stats_t spool_stats;
54
55 /*
56 * Header for data spool record */
57 struct spool_hdr {
58 int32_t FirstIndex; /* FirstIndex for buffer */
59 int32_t LastIndex; /* LastIndex for buffer */
60 uint32_t len; /* length of next buffer */
61 };
62
63 enum {
64 RB_EOT = 1,
65 RB_ERROR,
66 RB_OK
67 };
68
list_spool_stats(void sendit (const char * msg,int len,void * sarg),void * arg)69 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
70 {
71 char ed1[30], ed2[30];
72 POOL_MEM msg(PM_MESSAGE);
73 int len;
74
75 len = Mmsg(msg, _("Spooling statistics:\n"));
76
77 if (spool_stats.data_jobs || spool_stats.max_data_size) {
78 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
79 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
80 spool_stats.total_data_jobs,
81 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
82
83 sendit(msg.c_str(), len, arg);
84 }
85 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
86 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
87 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
88 spool_stats.total_attr_jobs,
89 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
90
91 sendit(msg.c_str(), len, arg);
92 }
93 }
94
begin_data_spool(DCR * dcr)95 bool begin_data_spool(DCR *dcr)
96 {
97 bool stat = true;
98 if (dcr->dev->is_aligned()) {
99 dcr->jcr->spool_data = false;
100 }
101 if (dcr->jcr->spool_data) {
102 Dmsg0(100, "Turning on data spooling\n");
103 dcr->spool_data = true;
104 stat = open_data_spool_file(dcr);
105 if (stat) {
106 dcr->spooling = true;
107 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
108 P(mutex);
109 spool_stats.data_jobs++;
110 V(mutex);
111 }
112 }
113 return stat;
114 }
115
discard_data_spool(DCR * dcr)116 bool discard_data_spool(DCR *dcr)
117 {
118 if (dcr->spooling) {
119 Dmsg0(100, "Data spooling discarded\n");
120 return close_data_spool_file(dcr);
121 }
122 return true;
123 }
124
commit_data_spool(DCR * dcr)125 bool commit_data_spool(DCR *dcr)
126 {
127 bool stat;
128
129 if (dcr->spooling) {
130 Dmsg0(100, "Committing spooled data\n");
131 stat = despool_data(dcr, true /*commit*/);
132 if (!stat) {
133 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
134 close_data_spool_file(dcr);
135 return false;
136 }
137 return close_data_spool_file(dcr);
138 }
139 return true;
140 }
141
make_unique_data_spool_filename(DCR * dcr,POOLMEM ** name)142 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
143 {
144 const char *dir;
145 if (dcr->dev->device->spool_directory) {
146 dir = dcr->dev->device->spool_directory;
147 } else {
148 dir = working_directory;
149 }
150 Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
151 dcr->jcr->Job, dcr->device->hdr.name);
152 }
153
154
open_data_spool_file(DCR * dcr)155 static bool open_data_spool_file(DCR *dcr)
156 {
157 POOLMEM *name = get_pool_memory(PM_MESSAGE);
158 int spool_fd;
159
160 make_unique_data_spool_filename(dcr, &name);
161 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY|O_CLOEXEC, 0640)) >= 0) {
162 dcr->spool_fd = spool_fd;
163 dcr->jcr->spool_attributes = true;
164 } else {
165 berrno be;
166 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
167 be.bstrerror());
168 free_pool_memory(name);
169 return false;
170 }
171 Dmsg1(100, "Created spool file: %s\n", name);
172 free_pool_memory(name);
173 return true;
174 }
175
176 static const char *spool_name = "*spool*";
177
178 /*
179 * NB! This routine locks the device, but if committing will
180 * not unlock it. If not committing, it will be unlocked.
181 */
despool_data(DCR * dcr,bool commit)182 static bool despool_data(DCR *dcr, bool commit)
183 {
184 DEVICE *rdev;
185 DCR *rdcr;
186 bool ok = true;
187 DEV_BLOCK *block;
188 JCR *jcr = dcr->jcr;
189 int stat;
190 char ec1[50];
191
192 Dmsg0(100, "Despooling data\n");
193 if (jcr->dcr->job_spool_size == 0) {
194 Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
195 }
196
197 /*
198 * Commit means that the job is done, so we commit, otherwise, we
199 * are despooling because of user spool size max or some error
200 * (e.g. filesystem full).
201 */
202 if (commit) {
203 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
204 jcr->dcr->VolumeName,
205 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
206 jcr->setJobStatus(JS_DataCommitting);
207 } else {
208 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
209 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
210 jcr->setJobStatus(JS_DataDespooling);
211 }
212 jcr->sendJobStatus(JS_DataDespooling);
213 dcr->despool_wait = true;
214 dcr->spooling = false;
215 /*
216 * We work with device blocked, but not locked so that
217 * other threads -- e.g. reservations can lock the device
218 * structure.
219 */
220 dcr->dblock(BST_DESPOOLING);
221 dcr->despool_wait = false;
222 dcr->despooling = true;
223
224 /*
225 * This is really quite kludgy and should be fixed some time.
226 * We create a dev structure to read from the spool file
227 * in rdev and rdcr.
228 */
229 rdev = New(file_dev);
230 rdev->dev_name = get_memory(strlen(spool_name)+1);
231 bstrncpy(rdev->dev_name, spool_name, strlen(spool_name)+1);
232 rdev->errmsg = get_pool_memory(PM_EMSG);
233 *rdev->errmsg = 0;
234 rdev->max_block_size = dcr->dev->max_block_size;
235 rdev->min_block_size = dcr->dev->min_block_size;
236 rdev->device = dcr->dev->device;
237 rdcr = new_dcr(jcr, NULL, rdev, SD_READ);
238 rdcr->spool_fd = dcr->spool_fd;
239 block = dcr->block; /* save block */
240 dcr->block = rdcr->block; /* make read and write block the same */
241
242 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
243 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
244
245 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
246 posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
247 #endif
248
249 /* Add run time, to get current wait time */
250 int32_t despool_start = time(NULL) - jcr->run_time;
251
252 set_new_file_parameters(dcr);
253
254 for ( ; ok; ) {
255 stat = read_block_from_spool_file(rdcr);
256 if (stat == RB_EOT) {
257 break;
258 } else if (stat == RB_ERROR) {
259 ok = false;
260 break;
261 }
262 ok = dcr->write_block_to_device();
263
264 if (jcr->is_canceled()) {
265 ok = false;
266 break;
267 }
268 if (!ok) {
269 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
270 dcr->dev->print_name(), dcr->dev->bstrerror());
271 Pmsg2(000, "Fatal append error on device %s: ERR=%s\n",
272 dcr->dev->print_name(), dcr->dev->bstrerror());
273 /* Force in case Incomplete set */
274 jcr->forceJobStatus(JS_FatalError);
275 }
276 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
277 }
278
279 if (!dir_create_jobmedia_record(dcr)) {
280 Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
281 dcr->getVolCatName(), jcr->Job);
282 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
283 }
284 flush_jobmedia_queue(jcr);
285 /* Set new file/block parameters for current dcr */
286 set_new_file_parameters(dcr);
287
288 /*
289 * Subtracting run_time give us elapsed time - wait_time since
290 * we started despooling. Note, don't use time_t as it is 32 or 64
291 * bits depending on the OS and doesn't edit with %d
292 */
293 int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
294
295 if (despool_elapsed <= 0) {
296 despool_elapsed = 1;
297 }
298
299 Jmsg(jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
300 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
301 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
302
303 dcr->block = block; /* reset block */
304
305 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
306 if (ftruncate(rdcr->spool_fd, 0) != 0) {
307 berrno be;
308 Jmsg(jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
309 be.bstrerror());
310 /* Note, try continuing despite ftruncate problem */
311 }
312
313 P(mutex);
314 if (spool_stats.data_size < dcr->job_spool_size) {
315 spool_stats.data_size = 0;
316 } else {
317 spool_stats.data_size -= dcr->job_spool_size;
318 }
319 V(mutex);
320 P(dcr->dev->spool_mutex);
321 dcr->dev->spool_size -= dcr->job_spool_size;
322 dcr->job_spool_size = 0; /* zap size in input dcr */
323 V(dcr->dev->spool_mutex);
324 free_memory(rdev->dev_name);
325 free_pool_memory(rdev->errmsg);
326 /* Be careful to NULL the jcr and free rdev after free_dcr() */
327 rdcr->jcr = NULL;
328 rdcr->set_dev(NULL);
329 free_dcr(rdcr);
330 free(rdev);
331 dcr->spooling = true; /* turn on spooling again */
332 dcr->despooling = false;
333 /*
334 * Note, if committing we leave the device blocked. It will be removed in
335 * release_device();
336 */
337 if (!commit) {
338 dcr->dev->dunblock();
339 }
340 jcr->sendJobStatus(JS_Running);
341 return ok;
342 }
343
344 /*
345 * Read a block from the spool file
346 *
347 * Returns RB_OK on success
348 * RB_EOT when file done
349 * RB_ERROR on error
350 */
read_block_from_spool_file(DCR * dcr)351 static int read_block_from_spool_file(DCR *dcr)
352 {
353 uint32_t rlen;
354 ssize_t stat;
355 spool_hdr hdr;
356 DEV_BLOCK *block = dcr->block;
357 JCR *jcr = dcr->jcr;
358
359 rlen = sizeof(hdr);
360 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
361 if (stat == 0) {
362 Dmsg0(100, "EOT on spool read.\n");
363 return RB_EOT;
364 } else if (stat != (ssize_t)rlen) {
365 if (stat == -1) {
366 berrno be;
367 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
368 be.bstrerror());
369 } else {
370 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
371 Jmsg2(jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
372 }
373 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
374 return RB_ERROR;
375 }
376 rlen = hdr.len;
377 if (rlen > block->buf_len) {
378 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
379 Jmsg2(jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
380 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
381 return RB_ERROR;
382 }
383 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
384 if (stat != (ssize_t)rlen) {
385 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
386 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
387 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
388 return RB_ERROR;
389 }
390 /* Setup write pointers */
391 block->binbuf = rlen;
392 block->bufp = block->buf + block->binbuf;
393 block->FirstIndex = hdr.FirstIndex;
394 block->LastIndex = hdr.LastIndex;
395 block->VolSessionId = dcr->jcr->VolSessionId;
396 block->VolSessionTime = dcr->jcr->VolSessionTime;
397
398 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
399 return RB_OK;
400 }
401
402 /*
403 * Write a block to the spool file
404 *
405 * Returns: true on success or EOT
406 * false on hard error
407 */
write_block_to_spool_file(DCR * dcr)408 bool write_block_to_spool_file(DCR *dcr)
409 {
410 uint32_t wlen, hlen; /* length to write */
411 bool despool = false;
412 DEV_BLOCK *block = dcr->block;
413
414 if (job_canceled(dcr->jcr)) {
415 return false;
416 }
417 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
418 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
419 return true;
420 }
421
422 hlen = sizeof(spool_hdr);
423 wlen = block->binbuf;
424 P(dcr->dev->spool_mutex);
425 dcr->job_spool_size += hlen + wlen;
426 dcr->dev->spool_size += hlen + wlen;
427 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
428 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
429 despool = true;
430 }
431 V(dcr->dev->spool_mutex);
432 P(mutex);
433 spool_stats.data_size += hlen + wlen;
434 if (spool_stats.data_size > spool_stats.max_data_size) {
435 spool_stats.max_data_size = spool_stats.data_size;
436 }
437 V(mutex);
438 if (despool) {
439 char ec1[30], ec2[30];
440 if (dcr->max_job_spool_size > 0) {
441 Jmsg(dcr->jcr, M_INFO, 0, _("User specified Job spool size reached: "
442 "JobSpoolSize=%s MaxJobSpoolSize=%s\n"),
443 edit_uint64_with_commas(dcr->job_spool_size, ec1),
444 edit_uint64_with_commas(dcr->max_job_spool_size, ec2));
445 } else {
446 Jmsg(dcr->jcr, M_INFO, 0, _("User specified Device spool size reached: "
447 "DevSpoolSize=%s MaxDevSpoolSize=%s\n"),
448 edit_uint64_with_commas(dcr->dev->spool_size, ec1),
449 edit_uint64_with_commas(dcr->dev->max_spool_size, ec2));
450 }
451
452 if (!despool_data(dcr, false)) {
453 Pmsg0(000, _("Bad return from despool in write_block.\n"));
454 return false;
455 }
456 /* Despooling cleared these variables so reset them */
457 P(dcr->dev->spool_mutex);
458 dcr->job_spool_size += hlen + wlen;
459 dcr->dev->spool_size += hlen + wlen;
460 V(dcr->dev->spool_mutex);
461 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
462 }
463
464 if (!write_spool_block(dcr)) {
465 return false;
466 }
467
468 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
469 empty_block(block);
470 return true;
471 }
472
rewind_spoolfile(DCR * dcr,ssize_t size,ssize_t expected)473 static bool rewind_spoolfile(DCR *dcr, ssize_t size, ssize_t expected)
474 {
475 JCR *jcr = dcr->jcr;
476 if (size == 0) {
477 return true; /* nothing to do */
478 }
479 Jmsg(jcr, M_ERROR, 0, _("Error writing header to spool file."
480 " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
481 (int)expected, (int)size);
482 #if defined(HAVE_WIN32)
483 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
484 #else
485 boffset_t pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
486 #endif
487 if (ftruncate(dcr->spool_fd, pos - size) != 0) {
488 berrno be;
489 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
490 be.bstrerror());
491 /* Note, try continuing despite ftruncate problem */
492 }
493 if (!despool_data(dcr, false)) {
494 Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
495 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
496 return false;
497 }
498 return true;
499 }
500
write_spool_block(DCR * dcr)501 static bool write_spool_block(DCR *dcr)
502 {
503 ssize_t size = 0, ret;
504 ssize_t expected = 0;
505
506 for (int retry=0; retry <= 1; retry++) {
507 /* Rewind if needed */
508 if (size > 0 && !rewind_spoolfile(dcr, size, expected)) {
509 return false;
510 }
511
512 /* Try to write the header */
513 ret = write_spool_header(dcr, &expected);
514 if (ret == -1) { /* I/O error, it's fatal */
515 goto bail_out;
516
517 } else {
518 size += ret; /* Keep the size written for a future rewind */
519 }
520
521 if (ret != expected) { /* We don't have the size expected, rewind, despool and retry */
522 continue;
523 }
524
525 ret = write_spool_data(dcr, &expected);
526 if (ret == -1) { /* I/O Error, it's fatal */
527 goto bail_out;
528
529 } else {
530 size += ret; /* Keep the size written for a furture rewind */
531 }
532
533 if (ret != expected) { /* We don't have the size expected, rewind, despool and retry */
534 continue;
535 }
536
537 return true;
538 }
539
540 bail_out:
541 berrno be;
542 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing block to spool file. ERR=%s\n"),
543 be.bstrerror());
544 dcr->jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
545 return false;
546 }
547
write_spool_header(DCR * dcr,ssize_t * expected)548 static ssize_t write_spool_header(DCR *dcr, ssize_t *expected)
549 {
550 spool_hdr hdr;
551 DEV_BLOCK *block = dcr->block;
552
553 hdr.FirstIndex = block->FirstIndex;
554 hdr.LastIndex = block->LastIndex;
555 hdr.len = block->binbuf;
556 *expected = sizeof(hdr);
557
558 /* Write header */
559 return write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
560 }
561
562
write_spool_data(DCR * dcr,ssize_t * expected)563 static ssize_t write_spool_data(DCR *dcr, ssize_t *expected)
564 {
565 DEV_BLOCK *block = dcr->block;
566 *expected = block->binbuf;
567 return write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
568 }
569
close_data_spool_file(DCR * dcr)570 static bool close_data_spool_file(DCR *dcr)
571 {
572 POOLMEM *name = get_pool_memory(PM_MESSAGE);
573
574 P(mutex);
575 spool_stats.data_jobs--;
576 spool_stats.total_data_jobs++;
577 if (spool_stats.data_size < dcr->job_spool_size) {
578 spool_stats.data_size = 0;
579 } else {
580 spool_stats.data_size -= dcr->job_spool_size;
581 }
582 V(mutex);
583 P(dcr->dev->spool_mutex);
584 dcr->job_spool_size = 0;
585 V(dcr->dev->spool_mutex);
586
587 make_unique_data_spool_filename(dcr, &name);
588 close(dcr->spool_fd);
589 dcr->spool_fd = -1;
590 dcr->spooling = false;
591 unlink(name);
592 Dmsg1(100, "Deleted spool file: %s\n", name);
593 free_pool_memory(name);
594 return true;
595 }
596
are_attributes_spooled(JCR * jcr)597 bool are_attributes_spooled(JCR *jcr)
598 {
599 return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
600 }
601
602 /*
603 * Create spool file for attributes.
604 * This is done by "attaching" to the bsock, and when
605 * it is called, the output is written to a file.
606 * The actual spooling is turned on and off in
607 * append.c only during writing of the attributes.
608 */
begin_attribute_spool(JCR * jcr)609 bool begin_attribute_spool(JCR *jcr)
610 {
611 if (!jcr->no_attributes && jcr->spool_attributes) {
612 return open_attr_spool_file(jcr, jcr->dir_bsock);
613 }
614 return true;
615 }
616
update_attr_spool_size(ssize_t size)617 static void update_attr_spool_size(ssize_t size)
618 {
619 P(mutex);
620 if (size > 0) {
621 if ((spool_stats.attr_size - size) > 0) {
622 spool_stats.attr_size -= size;
623 } else {
624 spool_stats.attr_size = 0;
625 }
626 }
627 V(mutex);
628 }
629
make_unique_spool_filename(JCR * jcr,POOLMEM ** name,int fd)630 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
631 {
632 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
633 jcr->Job, fd);
634 }
635
636 /*
637 * Tell Director where to find the attributes spool file
638 * Note, if we are not on the same machine, the Director will
639 * return an error, and the higher level routine will transmit
640 * the data record by record -- using bsock->despool().
641 */
blast_attr_spool_file(JCR * jcr,boffset_t size)642 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
643 {
644 /* send full spool file name */
645 POOLMEM *name = get_pool_memory(PM_MESSAGE);
646 make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
647 bash_spaces(name);
648 jcr->dir_bsock->fsend("BlastAttr JobId=%d File=%s\n", jcr->JobId, name);
649 free_pool_memory(name);
650
651 if (jcr->dir_bsock->recv() <= 0) {
652 Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
653 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
654 return false;
655 }
656
657 if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
658 return false;
659 }
660 return true;
661 }
662
commit_attribute_spool(JCR * jcr)663 bool commit_attribute_spool(JCR *jcr)
664 {
665 boffset_t size, data_end;
666 char ec1[30];
667 char tbuf[100];
668 BSOCK *dir;
669
670 Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
671 (utime_t)time(NULL)));
672 if (are_attributes_spooled(jcr)) {
673 dir = jcr->dir_bsock;
674 if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
675 berrno be;
676 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
677 be.bstrerror());
678 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
679 goto bail_out;
680 }
681 size = ftello(dir->m_spool_fd);
682 /* For Incomplete Job truncate spool file to last valid data_end if necssary */
683 if (jcr->is_JobStatus(JS_Incomplete)) {
684 data_end = dir->get_last_data_end();
685 if (size > data_end) {
686 if (ftruncate(fileno(dir->m_spool_fd), data_end) != 0) {
687 berrno be;
688 Jmsg(jcr, M_FATAL, 0, _("Truncate on attributes file failed: ERR=%s\n"),
689 be.bstrerror());
690 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
691 goto bail_out;
692 }
693 Dmsg2(100, "=== Attrib spool truncated from %lld to %lld\n",
694 size, data_end);
695 size = data_end;
696 }
697 }
698 if (size < 0) {
699 berrno be;
700 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
701 be.bstrerror());
702 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
703 goto bail_out;
704 }
705 P(mutex);
706 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
707 spool_stats.max_attr_size = spool_stats.attr_size + size;
708 }
709 spool_stats.attr_size += size;
710 V(mutex);
711 jcr->sendJobStatus(JS_AttrDespooling);
712 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
713 edit_uint64_with_commas(size, ec1));
714
715 if (!blast_attr_spool_file(jcr, size)) {
716 /* Can't read spool file from director side,
717 * send content over network.
718 */
719 dir->despool(update_attr_spool_size, size);
720 }
721 return close_attr_spool_file(jcr, dir);
722 }
723 return true;
724
725 bail_out:
726 close_attr_spool_file(jcr, dir);
727 return false;
728 }
729
open_attr_spool_file(JCR * jcr,BSOCK * bs)730 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
731 {
732 POOLMEM *name = get_pool_memory(PM_MESSAGE);
733
734 make_unique_spool_filename(jcr, &name, bs->m_fd);
735 bs->m_spool_fd = bfopen(name, "w+b");
736 if (!bs->m_spool_fd) {
737 berrno be;
738 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
739 be.bstrerror());
740 jcr->forceJobStatus(JS_FatalError); /* override any Incomplete */
741 free_pool_memory(name);
742 return false;
743 }
744 P(mutex);
745 spool_stats.attr_jobs++;
746 V(mutex);
747 free_pool_memory(name);
748 return true;
749 }
750
close_attr_spool_file(JCR * jcr,BSOCK * bs)751 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
752 {
753 POOLMEM *name;
754
755 char tbuf[100];
756
757 Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
758 (utime_t)time(NULL)));
759 if (!bs->m_spool_fd) {
760 return true;
761 }
762 name = get_pool_memory(PM_MESSAGE);
763 P(mutex);
764 spool_stats.attr_jobs--;
765 spool_stats.total_attr_jobs++;
766 V(mutex);
767 make_unique_spool_filename(jcr, &name, bs->m_fd);
768 fclose(bs->m_spool_fd);
769 unlink(name);
770 free_pool_memory(name);
771 bs->m_spool_fd = NULL;
772 bs->clear_spooling();
773 return true;
774 }
775
discard_attribute_spool(JCR * jcr)776 bool discard_attribute_spool(JCR *jcr)
777 {
778 if (are_attributes_spooled(jcr)) {
779 return close_attr_spool_file(jcr, jcr->dir_bsock);
780 }
781 return true;
782 }
783