1 /*
2    Bacula(R) - The Network Backup Solution
3 
4    Copyright (C) 2000-2020 Kern Sibbald
5 
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8 
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13 
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16 
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Append code for Storage daemon
21  *  Kern Sibbald, May MM
22  */
23 
24 #include "bacula.h"
25 #include "stored.h"
26 #include "prepare.h"
27 
28 
/* Responses sent to the File daemon */
/* Sent once the session label is written, telling the FD to start sending data */
static char OK_data[]    = "3000 OK data\n";
/* Sent after a successful append, before finishing the dialog with the Client */
static char OK_append[]  = "3000 OK append data\n";
32 
33 /*
34  * Check if we can mark this job incomplete
35  *
36  */
possible_incomplete_job(JCR * jcr,uint32_t last_file_index)37 void possible_incomplete_job(JCR *jcr, uint32_t last_file_index)
38 {
39    BSOCK *dir = jcr->dir_bsock;
40    /*
41     * Note, here we decide if it is worthwhile to restart
42     *  the Job at this point. For the moment, if at least
43     *  10 Files have been seen.
44     * We must be sure that the saved files are safe.
45     * Using this function when there is as comm line problem is probably safe,
46     *  it is inappropriate to use it for a any failure that could
47     *  involve corrupted data.
48     * We cannot mark a job Incomplete if we have already flushed
49     *  a bad JobMedia record (i.e. one beyond the last FileIndex
50     *  that is known to be good).
51     */
52    if (jcr->spool_attributes && last_file_index > 10 &&
53        dir->get_lastFlushIndex() < last_file_index) {
54       jcr->setJobStatus(JS_Incomplete);
55    }
56 }
57 
58 /*
59  *  Append Data sent from Client (FD/SD)
60  *
61  */
do_append_data(JCR * jcr)62 bool do_append_data(JCR *jcr)
63 {
64    int32_t n;
65    int32_t file_index, stream, last_file_index;
66    uint64_t stream_len;
67    BSOCK *fd = jcr->file_bsock;
68    bool ok = true;
69    DEV_RECORD rec;
70    prepare_ctx pctx;
71    char buf1[100], buf2[100];
72    DCR *dcr = jcr->dcr;
73    DEVICE *dev;
74    char ec[50];
75    POOL_MEM errmsg(PM_EMSG);
76 
77    if (!dcr) {
78       pm_strcpy(jcr->errmsg, _("DCR is NULL!!!\n"));
79       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
80       return false;
81    }
82    dev = dcr->dev;
83    if (!dev) {
84       pm_strcpy(jcr->errmsg, _("DEVICE is NULL!!!\n"));
85       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
86       return false;
87    }
88 
89    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
90 
91    memset(&rec, 0, sizeof(rec));
92 
93    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
94       jcr->setJobStatus(JS_ErrorTerminated);
95       pm_strcpy(jcr->errmsg, _("Unable to set network buffer size.\n"));
96       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
97       return false;
98    }
99 
100    if (!acquire_device_for_append(dcr)) {
101       jcr->setJobStatus(JS_ErrorTerminated);
102       return false;
103    }
104 
105    dev->start_of_job(dcr);
106    jcr->sendJobStatus(JS_Running);
107 
108    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
109 
110    begin_data_spool(dcr);
111    begin_attribute_spool(jcr);
112 
113    /*
114     * Write Begin Session Record
115     */
116    if (!write_session_label(dcr, SOS_LABEL)) {
117       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
118          dev->bstrerror());
119       jcr->setJobStatus(JS_ErrorTerminated);
120       ok = false;
121    }
122 
123    /* Tell File daemon to send data */
124    if (!fd->fsend(OK_data)) {
125       berrno be;
126       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
127             be.bstrerror(fd->b_errno));
128       ok = false;
129    }
130 
131    /*
132     * Get Data from File daemon, write to device.  To clarify what is
133     *   going on here.  We expect:
134     *     - A stream header
135     *     - Multiple records of data
136     *     - EOD record
137     *
138     *    The Stream header is just used to synchronize things, and
139     *    none of the stream header is written to tape.
140     *    The Multiple records of data, contain first the Attributes,
141     *    then after another stream header, the file data, then
142     *    after another stream header, the MD5 data if any.
143     *
144     *   So we get the (stream header, data, EOD) three time for each
145     *   file. 1. for the Attributes, 2. for the file data if any,
146     *   and 3. for the MD5 if any.
147     */
148    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
149    jcr->run_time = time(NULL);              /* start counting time for rates */
150 
151    GetMsg *qfd = dcr->dev->get_msg_queue(jcr, fd, DEDUP_MAX_MSG_SIZE);
152 
153    qfd->start_read_sock();
154 
155    for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
156       /* assume no server side deduplication at first */
157       bool dedup_srv_side = false;
158       /* used to store references and hash in the volume */
159       char dedup_ref_buf[DEDUP_MAX_REF_SIZE+OFFSET_FADDR_SIZE+100];
160 
161       /* Read Stream header from the File daemon.
162        *  The stream header consists of the following:
163        *    file_index (sequential Bacula file index, base 1)
164        *    stream     (Bacula number to distinguish parts of data)
165        *    stream_len (Expected length of this stream. This
166        *       will be the size backed up if the file does not
167        *       grow during the backup.
168        */
169       n = qfd->bget_msg(NULL);
170       if (n <= 0) {
171          if (n == BNET_SIGNAL && qfd->msglen == BNET_EOD) {
172             Dmsg0(200, "Got EOD on reading header.\n");
173             break;                    /* end of data */
174          }
175          Jmsg3(jcr, M_FATAL, 0, _("Error reading data header from FD. n=%d msglen=%d ERR=%s\n"),
176                n, qfd->msglen, fd->bstrerror());
177          // ASX TODO the fd->bstrerror() can be related to the wrong error, I should Queue the error too
178          possible_incomplete_job(jcr, last_file_index);
179          ok = false;
180          break;
181       }
182 
183       if (sscanf(qfd->msg, "%ld %ld %lld", &file_index, &stream, &stream_len) != 3) {
184          // TODO ASX already done in bufmsg, should reuse the values
185          char buf[256];
186          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), asciidump(qfd->msg, qfd->msglen, buf, sizeof(buf)));
187          ok = false;
188          possible_incomplete_job(jcr, last_file_index);
189          break;
190       }
191 
192       Dmsg3(890, "<filed: Header FilInx=%d stream=%d stream_len=%lld\n",
193          file_index, stream, stream_len);
194 
195       /*
196        * We make sure the file_index is advancing sequentially.
197        * An incomplete job can start the file_index at any number.
198        * otherwise, it must start at 1.
199        */
200       if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
201          goto fi_checked;
202       }
203       Dmsg2(400, "file_index=%d last_file_index=%d\n", file_index, last_file_index);
204       if (file_index > 0 && (file_index == last_file_index ||
205           file_index == last_file_index + 1)) {
206          goto fi_checked;
207       }
208       Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or last_FI=%d\n"),
209             file_index, last_file_index);
210       possible_incomplete_job(jcr, last_file_index);
211       ok = false;
212       break;
213 
214 fi_checked:
215       if (file_index != last_file_index) {
216          jcr->JobFiles = file_index;
217          last_file_index = file_index;
218       }
219 
220       dedup_srv_side = is_dedup_server_side(dev, stream, stream_len);
221 
222       /* Read data stream from the File daemon.
223        *  The data stream is just raw bytes
224        */
225       while ((n=qfd->bget_msg(NULL)) > 0 && !jcr->is_job_canceled()) {
226          rec.VolSessionId = jcr->VolSessionId;
227          rec.VolSessionTime = jcr->VolSessionTime;
228          rec.FileIndex = file_index;
229          rec.Stream = stream | (dedup_srv_side ? STREAM_BIT_DEDUPLICATION_DATA : 0);
230          rec.StreamLen = stream_len;
231          rec.maskedStream = stream & STREAMMASK_TYPE;   /* strip high bits */
232          rec.data_len = qfd->msglen;
233          rec.data = qfd->msg;            /* use message buffer */
234          rec.extra_bytes = 0;
235 
236          /* Debug code: check if we must hangup or blowup */
237          if (handle_hangup_blowup(jcr, jcr->JobFiles, jcr->JobBytes)) {
238             fd->close();
239             return false;
240          }
241          Dmsg4(850, "before write_rec FI=%d SessId=%d Strm=%s len=%d\n",
242             rec.FileIndex, rec.VolSessionId,
243             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
244             rec.data_len);
245          /*
246           * Check for any last minute Storage daemon preparation
247           *   of the files being backed up proir to doing so.  E.g.
248           *   we might do a Percona prepare or a virus check.
249           */
250          if (prepare(jcr, pctx, rec)) {
251             /* All done in prepare */
252          } else {
253             /* Normal non "prepare" backup */
254 
255             char *rbuf = qfd->msg;
256             char *wdedup_ref_buf = dedup_ref_buf;
257             int rbuflen = qfd->msglen;
258             if (is_offset_stream(stream)) {
259                if (stream & STREAM_BIT_OFFSETS) {
260                   /* Prepare to update the index */
261                   unser_declare;
262                   unser_begin(rbuf, 0);
263                   unser_uint64(rec.FileOffset);
264                }
265                rbuf += OFFSET_FADDR_SIZE;
266                rbuflen -= OFFSET_FADDR_SIZE;
267                if (dedup_srv_side) {
268                   wdedup_ref_buf += OFFSET_FADDR_SIZE;
269                   memcpy(dedup_ref_buf, qfd->msg, OFFSET_FADDR_SIZE);
270                }
271             }
272 
273             if (dedup_srv_side) {
274                /* if dedup is in use then store and replace the chunk by its ref */
275                ok = qfd->dedup_store_chunk(&rec, rbuf, rbuflen, dedup_ref_buf, wdedup_ref_buf, errmsg.addr());
276                if (!ok) {
277                   // TODO ASX, I have used successfully a Jmsg and no break from the beginning
278                   // a Dmsg and a break looks more appropriate, hope this works
279                   Jmsg1(jcr, M_FATAL, 0, "%s", errmsg.c_str());
280                   break;
281                }
282             } else {
283                if (stream & STREAM_BIT_DEDUPLICATION_DATA) {
284                   /* do accounting for VolABytes */
285                   rec.extra_bytes = qfd->bmsg->dedup_size;
286                } else {
287                   rec.extra_bytes = 0;
288                }
289             }
290 
291             Dmsg4(850, "before write_rec FI=%d SessId=%d Strm=%s len=%d\n",
292                   rec.FileIndex, rec.VolSessionId,
293                   stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
294                   rec.data_len);
295             /* Do the detection here because references are also created by the FD when dedup=bothside */
296             rec.state_bits |= is_dedup_ref(&rec, true) ? REC_NO_SPLIT : 0;
297             ok = dcr->write_record(&rec);
298             if (!ok) {
299                Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
300                      dcr->dev->print_name(), dcr->dev->bstrerror());
301                break;
302             }
303 
304             jcr->JobBytes += rec.data_len;   /* increment bytes this job */
305             jcr->JobBytes += qfd->bmsg->jobbytes; // if the block as been downloaded, count it
306             Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
307                   FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
308                   stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
309 
310             send_attrs_to_dir(jcr, &rec);
311          }
312          Dmsg0(650, "Enter bnet_get\n");
313       }
314       Dmsg2(650, "End read loop with FD. JobFiles=%d Stat=%d\n", jcr->JobFiles, n);
315 
316       if (fd->is_error()) {
317          if (!jcr->is_job_canceled()) {
318             Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
319             Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
320                   fd->bstrerror());
321             possible_incomplete_job(jcr, last_file_index);
322          }
323          ok = false;
324          break;
325       }
326    }
327 
328    /* stop local and remote dedup  */
329    Dmsg2(DT_DEDUP|215, "Wait for deduplication quarantine: emergency_exit=%d device=%s\n", ok?0:1, dev->print_name());
330    qfd->wait_read_sock((ok == false) || jcr->is_job_canceled());
331 
332    if (qfd->commit(errmsg.addr(), jcr->JobId)) {
333       ok = false;
334       Jmsg1(jcr, M_ERROR, 0, _("DDE commit failed. ERR=%s\n"),
335             errmsg.c_str());
336    }
337 
338    /* Create Job status for end of session label */
339    jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
340 
341    if (ok) {
342       /* Terminate connection with Client */
343       fd->fsend(OK_append);
344       do_client_commands(jcr);            /* finish dialog with Client */
345    } else {
346       fd->fsend("3999 Failed append\n");
347    }
348 
349    prepare_sd_end(jcr, pctx, rec);
350 
351    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
352 
353    /*
354     * Check if we can still write. This may not be the case
355     *  if we are at the end of the tape or we got a fatal I/O error.
356     */
357    dcr->set_ameta();
358    if (ok || dev->can_write()) {
359       if (!dev->flush_before_eos(dcr)) {
360          /* Print only if ok and not cancelled to avoid spurious messages */
361          if (ok && !jcr->is_job_canceled()) {
362             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
363                   dev->print_name(), dev->bstrerror());
364             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
365             possible_incomplete_job(jcr, last_file_index);
366          }
367          jcr->setJobStatus(JS_ErrorTerminated);
368          ok = false;
369       }
370       if (!write_session_label(dcr, EOS_LABEL)) {
371          /* Print only if ok and not cancelled to avoid spurious messages */
372          if (ok && !jcr->is_job_canceled()) {
373             Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
374                   dev->bstrerror());
375             possible_incomplete_job(jcr, last_file_index);
376          }
377          jcr->setJobStatus(JS_ErrorTerminated);
378          ok = false;
379       }
380       /* Flush out final partial ameta block of this session */
381       Dmsg1(200, "=== Flush adata=%d last block.\n", dcr->block->adata);
382       ASSERT(!dcr->block->adata);
383       if (!dcr->write_final_block_to_device()) {
384          /* Print only if ok and not cancelled to avoid spurious messages */
385          if (ok && !jcr->is_job_canceled()) {
386             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
387                   dev->print_name(), dev->bstrerror());
388             Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
389             possible_incomplete_job(jcr, last_file_index);
390          }
391          jcr->setJobStatus(JS_ErrorTerminated);
392          ok = false;
393       }
394    }
395    /* Must keep the dedup connection alive (and the "last" hashes buffer)
396     * until the last block has been written into the volume for the vacuum */
397    free_GetMsg(qfd);
398 
399    flush_jobmedia_queue(jcr);
400    if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
401       discard_data_spool(dcr);
402    } else {
403       /* Note: if commit is OK, the device will remain blocked */
404       commit_data_spool(dcr);
405    }
406 
407    /*
408     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
409     *   and the subsequent Jmsg() editing will break
410     */
411    int32_t job_elapsed = time(NULL) - jcr->run_time;
412 
413    if (job_elapsed <= 0) {
414       job_elapsed = 1;
415    }
416 
417    Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
418          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
419          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
420 
421    /*
422     * Release the device -- and send final Vol info to DIR
423     *  and unlock it.
424     */
425    release_device(dcr);
426 
427    if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
428       discard_attribute_spool(jcr);
429    } else {
430       commit_attribute_spool(jcr);
431    }
432 
433    jcr->sendJobStatus();          /* update director */
434 
435    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
436    return ok;
437 }
438 
439 
440 /* Send attributes and digest to Director for Catalog */
send_attrs_to_dir(JCR * jcr,DEV_RECORD * rec)441 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
442 {
443    if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES    ||
444        rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
445        rec->maskedStream == STREAM_RESTORE_OBJECT     ||
446        crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
447       if (!jcr->no_attributes) {
448          BSOCK *dir = jcr->dir_bsock;
449          if (are_attributes_spooled(jcr)) {
450             dir->set_spooling();
451          }
452          Dmsg1(850, "Send attributes to dir. FI=%d\n", rec->FileIndex);
453          if (!dir_update_file_attributes(jcr->dcr, rec)) {
454             Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
455                dir->bstrerror());
456             dir->clear_spooling();
457             return false;
458          }
459          dir->clear_spooling();
460       }
461    }
462    return true;
463 }
464