1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Append code for Storage daemon
21 * Kern Sibbald, May MM
22 */
23
24 #include "bacula.h"
25 #include "stored.h"
26 #include "prepare.h"
27
28
/*
 * Responses sent to the File daemon
 */

/* Sent once the data transfer ended without error */
static char OK_append[] = "3000 OK append data\n";

/* Sent before the transfer to tell the File daemon to start sending data */
static char OK_data[]   = "3000 OK data\n";
32
33 /*
34 * Check if we can mark this job incomplete
35 *
36 */
possible_incomplete_job(JCR * jcr,uint32_t last_file_index)37 void possible_incomplete_job(JCR *jcr, uint32_t last_file_index)
38 {
39 BSOCK *dir = jcr->dir_bsock;
40 /*
41 * Note, here we decide if it is worthwhile to restart
42 * the Job at this point. For the moment, if at least
43 * 10 Files have been seen.
44 * We must be sure that the saved files are safe.
45 * Using this function when there is as comm line problem is probably safe,
46 * it is inappropriate to use it for a any failure that could
47 * involve corrupted data.
48 * We cannot mark a job Incomplete if we have already flushed
49 * a bad JobMedia record (i.e. one beyond the last FileIndex
50 * that is known to be good).
51 */
52 if (jcr->spool_attributes && last_file_index > 10 &&
53 dir->get_lastFlushIndex() < last_file_index) {
54 jcr->setJobStatus(JS_Incomplete);
55 }
56 }
57
58 /*
59 * Append Data sent from Client (FD/SD)
60 *
61 */
do_append_data(JCR * jcr)62 bool do_append_data(JCR *jcr)
63 {
64 int32_t n;
65 int32_t file_index, stream, last_file_index;
66 uint64_t stream_len;
67 BSOCK *fd = jcr->file_bsock;
68 bool ok = true;
69 DEV_RECORD rec;
70 prepare_ctx pctx;
71 char buf1[100], buf2[100];
72 DCR *dcr = jcr->dcr;
73 DEVICE *dev;
74 char ec[50];
75 POOL_MEM errmsg(PM_EMSG);
76
77 if (!dcr) {
78 pm_strcpy(jcr->errmsg, _("DCR is NULL!!!\n"));
79 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
80 return false;
81 }
82 dev = dcr->dev;
83 if (!dev) {
84 pm_strcpy(jcr->errmsg, _("DEVICE is NULL!!!\n"));
85 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
86 return false;
87 }
88
89 Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
90
91 memset(&rec, 0, sizeof(rec));
92
93 if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
94 jcr->setJobStatus(JS_ErrorTerminated);
95 pm_strcpy(jcr->errmsg, _("Unable to set network buffer size.\n"));
96 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
97 return false;
98 }
99
100 if (!acquire_device_for_append(dcr)) {
101 jcr->setJobStatus(JS_ErrorTerminated);
102 return false;
103 }
104
105 dev->start_of_job(dcr);
106 jcr->sendJobStatus(JS_Running);
107
108 Dmsg1(50, "Begin append device=%s\n", dev->print_name());
109
110 begin_data_spool(dcr);
111 begin_attribute_spool(jcr);
112
113 /*
114 * Write Begin Session Record
115 */
116 if (!write_session_label(dcr, SOS_LABEL)) {
117 Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
118 dev->bstrerror());
119 jcr->setJobStatus(JS_ErrorTerminated);
120 ok = false;
121 }
122
123 /* Tell File daemon to send data */
124 if (!fd->fsend(OK_data)) {
125 berrno be;
126 Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
127 be.bstrerror(fd->b_errno));
128 ok = false;
129 }
130
131 /*
132 * Get Data from File daemon, write to device. To clarify what is
133 * going on here. We expect:
134 * - A stream header
135 * - Multiple records of data
136 * - EOD record
137 *
138 * The Stream header is just used to synchronize things, and
139 * none of the stream header is written to tape.
140 * The Multiple records of data, contain first the Attributes,
141 * then after another stream header, the file data, then
142 * after another stream header, the MD5 data if any.
143 *
144 * So we get the (stream header, data, EOD) three time for each
145 * file. 1. for the Attributes, 2. for the file data if any,
146 * and 3. for the MD5 if any.
147 */
148 dcr->VolFirstIndex = dcr->VolLastIndex = 0;
149 jcr->run_time = time(NULL); /* start counting time for rates */
150
151 GetMsg *qfd = dcr->dev->get_msg_queue(jcr, fd, DEDUP_MAX_MSG_SIZE);
152
153 qfd->start_read_sock();
154
155 for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
156 /* assume no server side deduplication at first */
157 bool dedup_srv_side = false;
158 /* used to store references and hash in the volume */
159 char dedup_ref_buf[DEDUP_MAX_REF_SIZE+OFFSET_FADDR_SIZE+100];
160
161 /* Read Stream header from the File daemon.
162 * The stream header consists of the following:
163 * file_index (sequential Bacula file index, base 1)
164 * stream (Bacula number to distinguish parts of data)
165 * stream_len (Expected length of this stream. This
166 * will be the size backed up if the file does not
167 * grow during the backup.
168 */
169 n = qfd->bget_msg(NULL);
170 if (n <= 0) {
171 if (n == BNET_SIGNAL && qfd->msglen == BNET_EOD) {
172 Dmsg0(200, "Got EOD on reading header.\n");
173 break; /* end of data */
174 }
175 Jmsg3(jcr, M_FATAL, 0, _("Error reading data header from FD. n=%d msglen=%d ERR=%s\n"),
176 n, qfd->msglen, fd->bstrerror());
177 // ASX TODO the fd->bstrerror() can be related to the wrong error, I should Queue the error too
178 possible_incomplete_job(jcr, last_file_index);
179 ok = false;
180 break;
181 }
182
183 if (sscanf(qfd->msg, "%ld %ld %lld", &file_index, &stream, &stream_len) != 3) {
184 // TODO ASX already done in bufmsg, should reuse the values
185 char buf[256];
186 Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), asciidump(qfd->msg, qfd->msglen, buf, sizeof(buf)));
187 ok = false;
188 possible_incomplete_job(jcr, last_file_index);
189 break;
190 }
191
192 Dmsg3(890, "<filed: Header FilInx=%d stream=%d stream_len=%lld\n",
193 file_index, stream, stream_len);
194
195 /*
196 * We make sure the file_index is advancing sequentially.
197 * An incomplete job can start the file_index at any number.
198 * otherwise, it must start at 1.
199 */
200 if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
201 goto fi_checked;
202 }
203 Dmsg2(400, "file_index=%d last_file_index=%d\n", file_index, last_file_index);
204 if (file_index > 0 && (file_index == last_file_index ||
205 file_index == last_file_index + 1)) {
206 goto fi_checked;
207 }
208 Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or last_FI=%d\n"),
209 file_index, last_file_index);
210 possible_incomplete_job(jcr, last_file_index);
211 ok = false;
212 break;
213
214 fi_checked:
215 if (file_index != last_file_index) {
216 jcr->JobFiles = file_index;
217 last_file_index = file_index;
218 }
219
220 dedup_srv_side = is_dedup_server_side(dev, stream, stream_len);
221
222 /* Read data stream from the File daemon.
223 * The data stream is just raw bytes
224 */
225 while ((n=qfd->bget_msg(NULL)) > 0 && !jcr->is_job_canceled()) {
226 rec.VolSessionId = jcr->VolSessionId;
227 rec.VolSessionTime = jcr->VolSessionTime;
228 rec.FileIndex = file_index;
229 rec.Stream = stream | (dedup_srv_side ? STREAM_BIT_DEDUPLICATION_DATA : 0);
230 rec.StreamLen = stream_len;
231 rec.maskedStream = stream & STREAMMASK_TYPE; /* strip high bits */
232 rec.data_len = qfd->msglen;
233 rec.data = qfd->msg; /* use message buffer */
234 rec.extra_bytes = 0;
235
236 /* Debug code: check if we must hangup or blowup */
237 if (handle_hangup_blowup(jcr, jcr->JobFiles, jcr->JobBytes)) {
238 fd->close();
239 return false;
240 }
241 Dmsg4(850, "before write_rec FI=%d SessId=%d Strm=%s len=%d\n",
242 rec.FileIndex, rec.VolSessionId,
243 stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
244 rec.data_len);
245 /*
246 * Check for any last minute Storage daemon preparation
247 * of the files being backed up proir to doing so. E.g.
248 * we might do a Percona prepare or a virus check.
249 */
250 if (prepare(jcr, pctx, rec)) {
251 /* All done in prepare */
252 } else {
253 /* Normal non "prepare" backup */
254
255 char *rbuf = qfd->msg;
256 char *wdedup_ref_buf = dedup_ref_buf;
257 int rbuflen = qfd->msglen;
258 if (is_offset_stream(stream)) {
259 if (stream & STREAM_BIT_OFFSETS) {
260 /* Prepare to update the index */
261 unser_declare;
262 unser_begin(rbuf, 0);
263 unser_uint64(rec.FileOffset);
264 }
265 rbuf += OFFSET_FADDR_SIZE;
266 rbuflen -= OFFSET_FADDR_SIZE;
267 if (dedup_srv_side) {
268 wdedup_ref_buf += OFFSET_FADDR_SIZE;
269 memcpy(dedup_ref_buf, qfd->msg, OFFSET_FADDR_SIZE);
270 }
271 }
272
273 if (dedup_srv_side) {
274 /* if dedup is in use then store and replace the chunk by its ref */
275 ok = qfd->dedup_store_chunk(&rec, rbuf, rbuflen, dedup_ref_buf, wdedup_ref_buf, errmsg.addr());
276 if (!ok) {
277 // TODO ASX, I have used successfully a Jmsg and no break from the beginning
278 // a Dmsg and a break looks more appropriate, hope this works
279 Jmsg1(jcr, M_FATAL, 0, "%s", errmsg.c_str());
280 break;
281 }
282 } else {
283 if (stream & STREAM_BIT_DEDUPLICATION_DATA) {
284 /* do accounting for VolABytes */
285 rec.extra_bytes = qfd->bmsg->dedup_size;
286 } else {
287 rec.extra_bytes = 0;
288 }
289 }
290
291 Dmsg4(850, "before write_rec FI=%d SessId=%d Strm=%s len=%d\n",
292 rec.FileIndex, rec.VolSessionId,
293 stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
294 rec.data_len);
295 /* Do the detection here because references are also created by the FD when dedup=bothside */
296 rec.state_bits |= is_dedup_ref(&rec, true) ? REC_NO_SPLIT : 0;
297 ok = dcr->write_record(&rec);
298 if (!ok) {
299 Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
300 dcr->dev->print_name(), dcr->dev->bstrerror());
301 break;
302 }
303
304 jcr->JobBytes += rec.data_len; /* increment bytes this job */
305 jcr->JobBytes += qfd->bmsg->jobbytes; // if the block as been downloaded, count it
306 Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
307 FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
308 stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
309
310 send_attrs_to_dir(jcr, &rec);
311 }
312 Dmsg0(650, "Enter bnet_get\n");
313 }
314 Dmsg2(650, "End read loop with FD. JobFiles=%d Stat=%d\n", jcr->JobFiles, n);
315
316 if (fd->is_error()) {
317 if (!jcr->is_job_canceled()) {
318 Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
319 Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
320 fd->bstrerror());
321 possible_incomplete_job(jcr, last_file_index);
322 }
323 ok = false;
324 break;
325 }
326 }
327
328 /* stop local and remote dedup */
329 Dmsg2(DT_DEDUP|215, "Wait for deduplication quarantine: emergency_exit=%d device=%s\n", ok?0:1, dev->print_name());
330 qfd->wait_read_sock((ok == false) || jcr->is_job_canceled());
331
332 if (qfd->commit(errmsg.addr(), jcr->JobId)) {
333 ok = false;
334 Jmsg1(jcr, M_ERROR, 0, _("DDE commit failed. ERR=%s\n"),
335 errmsg.c_str());
336 }
337
338 /* Create Job status for end of session label */
339 jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
340
341 if (ok) {
342 /* Terminate connection with Client */
343 fd->fsend(OK_append);
344 do_client_commands(jcr); /* finish dialog with Client */
345 } else {
346 fd->fsend("3999 Failed append\n");
347 }
348
349 prepare_sd_end(jcr, pctx, rec);
350
351 Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
352
353 /*
354 * Check if we can still write. This may not be the case
355 * if we are at the end of the tape or we got a fatal I/O error.
356 */
357 dcr->set_ameta();
358 if (ok || dev->can_write()) {
359 if (!dev->flush_before_eos(dcr)) {
360 /* Print only if ok and not cancelled to avoid spurious messages */
361 if (ok && !jcr->is_job_canceled()) {
362 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
363 dev->print_name(), dev->bstrerror());
364 Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
365 possible_incomplete_job(jcr, last_file_index);
366 }
367 jcr->setJobStatus(JS_ErrorTerminated);
368 ok = false;
369 }
370 if (!write_session_label(dcr, EOS_LABEL)) {
371 /* Print only if ok and not cancelled to avoid spurious messages */
372 if (ok && !jcr->is_job_canceled()) {
373 Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
374 dev->bstrerror());
375 possible_incomplete_job(jcr, last_file_index);
376 }
377 jcr->setJobStatus(JS_ErrorTerminated);
378 ok = false;
379 }
380 /* Flush out final partial ameta block of this session */
381 Dmsg1(200, "=== Flush adata=%d last block.\n", dcr->block->adata);
382 ASSERT(!dcr->block->adata);
383 if (!dcr->write_final_block_to_device()) {
384 /* Print only if ok and not cancelled to avoid spurious messages */
385 if (ok && !jcr->is_job_canceled()) {
386 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
387 dev->print_name(), dev->bstrerror());
388 Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
389 possible_incomplete_job(jcr, last_file_index);
390 }
391 jcr->setJobStatus(JS_ErrorTerminated);
392 ok = false;
393 }
394 }
395 /* Must keep the dedup connection alive (and the "last" hashes buffer)
396 * until the last block has been written into the volume for the vacuum */
397 free_GetMsg(qfd);
398
399 flush_jobmedia_queue(jcr);
400 if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
401 discard_data_spool(dcr);
402 } else {
403 /* Note: if commit is OK, the device will remain blocked */
404 commit_data_spool(dcr);
405 }
406
407 /*
408 * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
409 * and the subsequent Jmsg() editing will break
410 */
411 int32_t job_elapsed = time(NULL) - jcr->run_time;
412
413 if (job_elapsed <= 0) {
414 job_elapsed = 1;
415 }
416
417 Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
418 job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
419 edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
420
421 /*
422 * Release the device -- and send final Vol info to DIR
423 * and unlock it.
424 */
425 release_device(dcr);
426
427 if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
428 discard_attribute_spool(jcr);
429 } else {
430 commit_attribute_spool(jcr);
431 }
432
433 jcr->sendJobStatus(); /* update director */
434
435 Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
436 return ok;
437 }
438
439
440 /* Send attributes and digest to Director for Catalog */
send_attrs_to_dir(JCR * jcr,DEV_RECORD * rec)441 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
442 {
443 if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES ||
444 rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
445 rec->maskedStream == STREAM_RESTORE_OBJECT ||
446 crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
447 if (!jcr->no_attributes) {
448 BSOCK *dir = jcr->dir_bsock;
449 if (are_attributes_spooled(jcr)) {
450 dir->set_spooling();
451 }
452 Dmsg1(850, "Send attributes to dir. FI=%d\n", rec->FileIndex);
453 if (!dir_update_file_attributes(jcr->dcr, rec)) {
454 Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
455 dir->bstrerror());
456 dir->clear_spooling();
457 return false;
458 }
459 dir->clear_spooling();
460 }
461 }
462 return true;
463 }
464