1 /*
2    Bacula(R) - The Network Backup Solution
3 
4    Copyright (C) 2000-2020 Kern Sibbald
5 
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8 
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13 
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16 
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Append code for Storage daemon
21  *  Kern Sibbald, May MM
22  */
23 
24 #include "bacula.h"
25 #include "stored.h"
26 
27 
/*
 * Responses sent to the File daemon
 *   OK_data   - sent by do_append_data() just before it starts reading
 *               the data stream.
 *   OK_append - sent by do_append_data() once the whole stream has been
 *               appended without error.
 */
static char OK_data[]    = "3000 OK data\n";
static char OK_append[]  = "3000 OK append data\n";
31 
32 /* Forward referenced functions */
33 
34 
35 /*
36  * Check if we can mark this job incomplete
37  *
38  */
possible_incomplete_job(JCR * jcr,uint32_t last_file_index)39 void possible_incomplete_job(JCR *jcr, uint32_t last_file_index)
40 {
41    BSOCK *dir = jcr->dir_bsock;
42    /*
43     * Note, here we decide if it is worthwhile to restart
44     *  the Job at this point. For the moment, if at least
45     *  10 Files have been seen.
46     * We must be sure that the saved files are safe.
47     * Using this function when there is as comm line problem is probably safe,
48     *  it is inappropriate to use it for a any failure that could
49     *  involve corrupted data.
50     * We cannot mark a job Incomplete if we have already flushed
51     *  a bad JobMedia record (i.e. one beyond the last FileIndex
52     *  that is known to be good).
53     */
54    if (jcr->spool_attributes && last_file_index > 10 &&
55        dir->get_lastFlushIndex() < last_file_index) {
56       jcr->setJobStatus(JS_Incomplete);
57    }
58 }
59 
60 /*
61  *  Append Data sent from Client (FD/SD)
62  *
63  */
do_append_data(JCR * jcr)64 bool do_append_data(JCR *jcr)
65 {
66    int32_t n;
67    int32_t file_index, stream, last_file_index;
68    uint64_t stream_len;
69    BSOCK *fd = jcr->file_bsock;
70    bool ok = true;
71    DEV_RECORD rec;
72    char buf1[100], buf2[100];
73    DCR *dcr = jcr->dcr;
74    DEVICE *dev;
75    char ec[50];
76    POOLMEM *eblock = NULL;
77    POOL_MEM errmsg(PM_EMSG);
78 
79    if (!dcr) {
80       pm_strcpy(jcr->errmsg, _("DCR is NULL!!!\n"));
81       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
82       return false;
83    }
84    dev = dcr->dev;
85    if (!dev) {
86       pm_strcpy(jcr->errmsg, _("DEVICE is NULL!!!\n"));
87       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
88       return false;
89    }
90 
91    Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
92 
93    memset(&rec, 0, sizeof(rec));
94 
95    if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
96       jcr->setJobStatus(JS_ErrorTerminated);
97       pm_strcpy(jcr->errmsg, _("Unable to set network buffer size.\n"));
98       Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
99       return false;
100    }
101 
102    if (!acquire_device_for_append(dcr)) {
103       jcr->setJobStatus(JS_ErrorTerminated);
104       return false;
105    }
106 
107    dev->start_of_job(dcr);
108    jcr->sendJobStatus(JS_Running);
109 
110    Dmsg1(50, "Begin append device=%s\n", dev->print_name());
111 
112    begin_data_spool(dcr);
113    begin_attribute_spool(jcr);
114 
115    /*
116     * Write Begin Session Record
117     */
118    if (!write_session_label(dcr, SOS_LABEL)) {
119       Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
120          dev->bstrerror());
121       jcr->setJobStatus(JS_ErrorTerminated);
122       ok = false;
123    }
124 
125    /* Tell File daemon to send data */
126    if (!fd->fsend(OK_data)) {
127       berrno be;
128       Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
129             be.bstrerror(fd->b_errno));
130       ok = false;
131    }
132 
133    /*
134     * Get Data from File daemon, write to device.  To clarify what is
135     *   going on here.  We expect:
136     *     - A stream header
137     *     - Multiple records of data
138     *     - EOD record
139     *
140     *    The Stream header is just used to synchronize things, and
141     *    none of the stream header is written to tape.
142     *    The Multiple records of data, contain first the Attributes,
143     *    then after another stream header, the file data, then
144     *    after another stream header, the MD5 data if any.
145     *
146     *   So we get the (stream header, data, EOD) three time for each
147     *   file. 1. for the Attributes, 2. for the file data if any,
148     *   and 3. for the MD5 if any.
149     */
150    dcr->VolFirstIndex = dcr->VolLastIndex = 0;
151    jcr->run_time = time(NULL);              /* start counting time for rates */
152 
153    GetMsg *qfd;
154 
155    qfd = New(GetMsg(jcr, fd, NULL, GETMSG_MAX_MSG_SIZE));
156    qfd->start_read_sock();
157 
158    for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
159 
160       /* Read Stream header from the File daemon.
161        *  The stream header consists of the following:
162        *    file_index (sequential Bacula file index, base 1)
163        *    stream     (Bacula number to distinguish parts of data)
164        *    stream_len (Expected length of this stream. This
165        *       will be the size backed up if the file does not
166        *       grow during the backup.
167        */
168       n = qfd->bget_msg(NULL);
169       if (n <= 0) {
170          if (n == BNET_SIGNAL && qfd->msglen == BNET_EOD) {
171             Dmsg0(200, "Got EOD on reading header.\n");
172             break;                    /* end of data */
173          }
174          Jmsg3(jcr, M_FATAL, 0, _("Error reading data header from FD. n=%d msglen=%d ERR=%s\n"),
175                n, qfd->msglen, fd->bstrerror());
176          // ASX TODO the fd->bstrerror() can be related to the wrong error, I should Queue the error too
177          possible_incomplete_job(jcr, last_file_index);
178          ok = false;
179          break;
180       }
181 
182       if (sscanf(qfd->msg, "%ld %ld %lld", &file_index, &stream, &stream_len) != 3) {
183          // TODO ASX already done in bufmsg, should reuse the values
184          char buf[256];
185          Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), asciidump(qfd->msg, qfd->msglen, buf, sizeof(buf)));
186          ok = false;
187          possible_incomplete_job(jcr, last_file_index);
188          break;
189       }
190 
191       Dmsg3(890, "<filed: Header FilInx=%d stream=%d stream_len=%lld\n",
192          file_index, stream, stream_len);
193 
194       /*
195        * We make sure the file_index is advancing sequentially.
196        * An incomplete job can start the file_index at any number.
197        * otherwise, it must start at 1.
198        */
199       if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
200          goto fi_checked;
201       }
202       Dmsg2(400, "file_index=%d last_file_index=%d\n", file_index, last_file_index);
203       if (file_index > 0 && (file_index == last_file_index ||
204           file_index == last_file_index + 1)) {
205          goto fi_checked;
206       }
207       Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or last_FI=%d\n"),
208             file_index, last_file_index);
209       possible_incomplete_job(jcr, last_file_index);
210       ok = false;
211       break;
212 
213 fi_checked:
214       if (file_index != last_file_index) {
215          jcr->JobFiles = file_index;
216          last_file_index = file_index;
217       }
218 
219       /* Read data stream from the File daemon.
220        *  The data stream is just raw bytes
221        */
222       while ((n=qfd->bget_msg(NULL)) > 0 && !jcr->is_job_canceled()) {
223 
224          rec.VolSessionId = jcr->VolSessionId;
225          rec.VolSessionTime = jcr->VolSessionTime;
226          rec.FileIndex = file_index;
227          rec.Stream = stream;
228          rec.StreamLen = stream_len;
229          rec.maskedStream = stream & STREAMMASK_TYPE;   /* strip high bits */
230          rec.data_len = qfd->msglen;
231          rec.data = qfd->msg;            /* use message buffer */
232 
233          /* Debug code: check if we must hangup or blowup */
234          if (handle_hangup_blowup(jcr, jcr->JobFiles, jcr->JobBytes)) {
235             return false;
236          }
237          Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
238             rec.FileIndex, rec.VolSessionId,
239             stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
240             rec.data_len);
241          ok = dcr->write_record(&rec);
242          if (!ok) {
243             Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
244                   dcr->dev->print_name(), dcr->dev->bstrerror());
245             break;
246          }
247          jcr->JobBytes += rec.data_len;   /* increment bytes this job */
248          jcr->JobBytes += qfd->bmsg->jobbytes; // if the block as been downloaded, count it
249          Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
250             FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
251             stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
252 
253          send_attrs_to_dir(jcr, &rec);
254          Dmsg0(650, "Enter bnet_get\n");
255       }
256       Dmsg2(650, "End read loop with FD. JobFiles=%d Stat=%d\n", jcr->JobFiles, n);
257 
258       if (fd->is_error()) {
259          if (!jcr->is_job_canceled()) {
260             Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
261             Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
262                   fd->bstrerror());
263             possible_incomplete_job(jcr, last_file_index);
264          }
265          ok = false;
266          break;
267       }
268    }
269 
270    qfd->wait_read_sock((ok == false) || jcr->is_job_canceled());
271    free_GetMsg(qfd);
272 
273    if (eblock != NULL) {
274       free_pool_memory(eblock);
275    }
276 
277    /* Create Job status for end of session label */
278    jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
279 
280    if (ok) {
281       /* Terminate connection with Client */
282       fd->fsend(OK_append);
283       do_client_commands(jcr);            /* finish dialog with Client */
284    } else {
285       fd->fsend("3999 Failed append\n");
286    }
287 
288    Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
289 
290    /*
291     * Check if we can still write. This may not be the case
292     *  if we are at the end of the tape or we got a fatal I/O error.
293     */
294    dcr->set_ameta();
295    if (ok || dev->can_write()) {
296       if (!dev->flush_before_eos(dcr)) {
297          /* Print only if ok and not cancelled to avoid spurious messages */
298          if (ok && !jcr->is_job_canceled()) {
299             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
300                   dev->print_name(), dev->bstrerror());
301             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
302             possible_incomplete_job(jcr, last_file_index);
303          }
304          jcr->setJobStatus(JS_ErrorTerminated);
305          ok = false;
306       }
307       if (!write_session_label(dcr, EOS_LABEL)) {
308          /* Print only if ok and not cancelled to avoid spurious messages */
309          if (ok && !jcr->is_job_canceled()) {
310             Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
311                   dev->bstrerror());
312             possible_incomplete_job(jcr, last_file_index);
313          }
314          jcr->setJobStatus(JS_ErrorTerminated);
315          ok = false;
316       }
317       /* Flush out final partial block of this session */
318       Dmsg1(200, "=== Flush adata=%d last block.\n", dcr->block->adata);
319       ASSERT(!dcr->block->adata);
320       if (!dcr->write_final_block_to_device()) {
321          /* Print only if ok and not cancelled to avoid spurious messages */
322          if (ok && !jcr->is_job_canceled()) {
323             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
324                   dev->print_name(), dev->bstrerror());
325             Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
326             possible_incomplete_job(jcr, last_file_index);
327          }
328          jcr->setJobStatus(JS_ErrorTerminated);
329          ok = false;
330       }
331    }
332    flush_jobmedia_queue(jcr);
333    if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
334       discard_data_spool(dcr);
335    } else {
336       /* Note: if commit is OK, the device will remain blocked */
337       commit_data_spool(dcr);
338    }
339 
340    /*
341     * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
342     *   and the subsequent Jmsg() editing will break
343     */
344    int32_t job_elapsed = time(NULL) - jcr->run_time;
345 
346    if (job_elapsed <= 0) {
347       job_elapsed = 1;
348    }
349 
350    Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
351          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
352          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
353 
354    /*
355     * Release the device -- and send final Vol info to DIR
356     *  and unlock it.
357     */
358    release_device(dcr);
359 
360    if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
361       discard_attribute_spool(jcr);
362    } else {
363       commit_attribute_spool(jcr);
364    }
365 
366    jcr->sendJobStatus();          /* update director */
367 
368    Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
369    return ok;
370 }
371 
372 
373 /* Send attributes and digest to Director for Catalog */
send_attrs_to_dir(JCR * jcr,DEV_RECORD * rec)374 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
375 {
376    if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES    ||
377        rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
378        rec->maskedStream == STREAM_RESTORE_OBJECT     ||
379        crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
380       if (!jcr->no_attributes) {
381          BSOCK *dir = jcr->dir_bsock;
382          if (are_attributes_spooled(jcr)) {
383             dir->set_spooling();
384          }
385          Dmsg1(850, "Send attributes to dir. FI=%d\n", rec->FileIndex);
386          if (!dir_update_file_attributes(jcr->dcr, rec)) {
387             Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
388                dir->bstrerror());
389             dir->clear_spooling();
390             return false;
391          }
392          dir->clear_spooling();
393       }
394    }
395    return true;
396 }
397