1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Append code for Storage daemon
21 * Kern Sibbald, May MM
22 */
23
24 #include "bacula.h"
25 #include "stored.h"
26
27
/* Responses sent to the File daemon */
/* Sent after the begin-of-session label attempt: asks the client to start sending data */
static char OK_data[] = "3000 OK data\n";
/* Sent when the whole data stream was received and written without error */
static char OK_append[] = "3000 OK append data\n";
31
32 /* Forward referenced functions */
33
34
35 /*
36 * Check if we can mark this job incomplete
37 *
38 */
possible_incomplete_job(JCR * jcr,uint32_t last_file_index)39 void possible_incomplete_job(JCR *jcr, uint32_t last_file_index)
40 {
41 BSOCK *dir = jcr->dir_bsock;
42 /*
43 * Note, here we decide if it is worthwhile to restart
44 * the Job at this point. For the moment, if at least
45 * 10 Files have been seen.
46 * We must be sure that the saved files are safe.
47 * Using this function when there is as comm line problem is probably safe,
48 * it is inappropriate to use it for a any failure that could
49 * involve corrupted data.
50 * We cannot mark a job Incomplete if we have already flushed
51 * a bad JobMedia record (i.e. one beyond the last FileIndex
52 * that is known to be good).
53 */
54 if (jcr->spool_attributes && last_file_index > 10 &&
55 dir->get_lastFlushIndex() < last_file_index) {
56 jcr->setJobStatus(JS_Incomplete);
57 }
58 }
59
60 /*
61 * Append Data sent from Client (FD/SD)
62 *
63 */
do_append_data(JCR * jcr)64 bool do_append_data(JCR *jcr)
65 {
66 int32_t n;
67 int32_t file_index, stream, last_file_index;
68 uint64_t stream_len;
69 BSOCK *fd = jcr->file_bsock;
70 bool ok = true;
71 DEV_RECORD rec;
72 char buf1[100], buf2[100];
73 DCR *dcr = jcr->dcr;
74 DEVICE *dev;
75 char ec[50];
76 POOLMEM *eblock = NULL;
77 POOL_MEM errmsg(PM_EMSG);
78
79 if (!dcr) {
80 pm_strcpy(jcr->errmsg, _("DCR is NULL!!!\n"));
81 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
82 return false;
83 }
84 dev = dcr->dev;
85 if (!dev) {
86 pm_strcpy(jcr->errmsg, _("DEVICE is NULL!!!\n"));
87 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
88 return false;
89 }
90
91 Dmsg1(100, "Start append data. res=%d\n", dev->num_reserved());
92
93 memset(&rec, 0, sizeof(rec));
94
95 if (!fd->set_buffer_size(dcr->device->max_network_buffer_size, BNET_SETBUF_WRITE)) {
96 jcr->setJobStatus(JS_ErrorTerminated);
97 pm_strcpy(jcr->errmsg, _("Unable to set network buffer size.\n"));
98 Jmsg0(jcr, M_FATAL, 0, jcr->errmsg);
99 return false;
100 }
101
102 if (!acquire_device_for_append(dcr)) {
103 jcr->setJobStatus(JS_ErrorTerminated);
104 return false;
105 }
106
107 dev->start_of_job(dcr);
108 jcr->sendJobStatus(JS_Running);
109
110 Dmsg1(50, "Begin append device=%s\n", dev->print_name());
111
112 begin_data_spool(dcr);
113 begin_attribute_spool(jcr);
114
115 /*
116 * Write Begin Session Record
117 */
118 if (!write_session_label(dcr, SOS_LABEL)) {
119 Jmsg1(jcr, M_FATAL, 0, _("Write session label failed. ERR=%s\n"),
120 dev->bstrerror());
121 jcr->setJobStatus(JS_ErrorTerminated);
122 ok = false;
123 }
124
125 /* Tell File daemon to send data */
126 if (!fd->fsend(OK_data)) {
127 berrno be;
128 Jmsg1(jcr, M_FATAL, 0, _("Network send error to FD. ERR=%s\n"),
129 be.bstrerror(fd->b_errno));
130 ok = false;
131 }
132
133 /*
134 * Get Data from File daemon, write to device. To clarify what is
135 * going on here. We expect:
136 * - A stream header
137 * - Multiple records of data
138 * - EOD record
139 *
140 * The Stream header is just used to synchronize things, and
141 * none of the stream header is written to tape.
142 * The Multiple records of data, contain first the Attributes,
143 * then after another stream header, the file data, then
144 * after another stream header, the MD5 data if any.
145 *
146 * So we get the (stream header, data, EOD) three time for each
147 * file. 1. for the Attributes, 2. for the file data if any,
148 * and 3. for the MD5 if any.
149 */
150 dcr->VolFirstIndex = dcr->VolLastIndex = 0;
151 jcr->run_time = time(NULL); /* start counting time for rates */
152
153 GetMsg *qfd;
154
155 qfd = New(GetMsg(jcr, fd, NULL, GETMSG_MAX_MSG_SIZE));
156 qfd->start_read_sock();
157
158 for (last_file_index = 0; ok && !jcr->is_job_canceled(); ) {
159
160 /* Read Stream header from the File daemon.
161 * The stream header consists of the following:
162 * file_index (sequential Bacula file index, base 1)
163 * stream (Bacula number to distinguish parts of data)
164 * stream_len (Expected length of this stream. This
165 * will be the size backed up if the file does not
166 * grow during the backup.
167 */
168 n = qfd->bget_msg(NULL);
169 if (n <= 0) {
170 if (n == BNET_SIGNAL && qfd->msglen == BNET_EOD) {
171 Dmsg0(200, "Got EOD on reading header.\n");
172 break; /* end of data */
173 }
174 Jmsg3(jcr, M_FATAL, 0, _("Error reading data header from FD. n=%d msglen=%d ERR=%s\n"),
175 n, qfd->msglen, fd->bstrerror());
176 // ASX TODO the fd->bstrerror() can be related to the wrong error, I should Queue the error too
177 possible_incomplete_job(jcr, last_file_index);
178 ok = false;
179 break;
180 }
181
182 if (sscanf(qfd->msg, "%ld %ld %lld", &file_index, &stream, &stream_len) != 3) {
183 // TODO ASX already done in bufmsg, should reuse the values
184 char buf[256];
185 Jmsg1(jcr, M_FATAL, 0, _("Malformed data header from FD: %s\n"), asciidump(qfd->msg, qfd->msglen, buf, sizeof(buf)));
186 ok = false;
187 possible_incomplete_job(jcr, last_file_index);
188 break;
189 }
190
191 Dmsg3(890, "<filed: Header FilInx=%d stream=%d stream_len=%lld\n",
192 file_index, stream, stream_len);
193
194 /*
195 * We make sure the file_index is advancing sequentially.
196 * An incomplete job can start the file_index at any number.
197 * otherwise, it must start at 1.
198 */
199 if (jcr->rerunning && file_index > 0 && last_file_index == 0) {
200 goto fi_checked;
201 }
202 Dmsg2(400, "file_index=%d last_file_index=%d\n", file_index, last_file_index);
203 if (file_index > 0 && (file_index == last_file_index ||
204 file_index == last_file_index + 1)) {
205 goto fi_checked;
206 }
207 Jmsg2(jcr, M_FATAL, 0, _("FI=%d from FD not positive or last_FI=%d\n"),
208 file_index, last_file_index);
209 possible_incomplete_job(jcr, last_file_index);
210 ok = false;
211 break;
212
213 fi_checked:
214 if (file_index != last_file_index) {
215 jcr->JobFiles = file_index;
216 last_file_index = file_index;
217 }
218
219 /* Read data stream from the File daemon.
220 * The data stream is just raw bytes
221 */
222 while ((n=qfd->bget_msg(NULL)) > 0 && !jcr->is_job_canceled()) {
223
224 rec.VolSessionId = jcr->VolSessionId;
225 rec.VolSessionTime = jcr->VolSessionTime;
226 rec.FileIndex = file_index;
227 rec.Stream = stream;
228 rec.StreamLen = stream_len;
229 rec.maskedStream = stream & STREAMMASK_TYPE; /* strip high bits */
230 rec.data_len = qfd->msglen;
231 rec.data = qfd->msg; /* use message buffer */
232
233 /* Debug code: check if we must hangup or blowup */
234 if (handle_hangup_blowup(jcr, jcr->JobFiles, jcr->JobBytes)) {
235 return false;
236 }
237 Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
238 rec.FileIndex, rec.VolSessionId,
239 stream_to_ascii(buf1, rec.Stream,rec.FileIndex),
240 rec.data_len);
241 ok = dcr->write_record(&rec);
242 if (!ok) {
243 Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n",
244 dcr->dev->print_name(), dcr->dev->bstrerror());
245 break;
246 }
247 jcr->JobBytes += rec.data_len; /* increment bytes this job */
248 jcr->JobBytes += qfd->bmsg->jobbytes; // if the block as been downloaded, count it
249 Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n",
250 FI_to_ascii(buf1, rec.FileIndex), rec.VolSessionId,
251 stream_to_ascii(buf2, rec.Stream, rec.FileIndex), rec.data_len);
252
253 send_attrs_to_dir(jcr, &rec);
254 Dmsg0(650, "Enter bnet_get\n");
255 }
256 Dmsg2(650, "End read loop with FD. JobFiles=%d Stat=%d\n", jcr->JobFiles, n);
257
258 if (fd->is_error()) {
259 if (!jcr->is_job_canceled()) {
260 Dmsg1(350, "Network read error from FD. ERR=%s\n", fd->bstrerror());
261 Jmsg1(jcr, M_FATAL, 0, _("Network error reading from FD. ERR=%s\n"),
262 fd->bstrerror());
263 possible_incomplete_job(jcr, last_file_index);
264 }
265 ok = false;
266 break;
267 }
268 }
269
270 qfd->wait_read_sock((ok == false) || jcr->is_job_canceled());
271 free_GetMsg(qfd);
272
273 if (eblock != NULL) {
274 free_pool_memory(eblock);
275 }
276
277 /* Create Job status for end of session label */
278 jcr->setJobStatus(ok?JS_Terminated:JS_ErrorTerminated);
279
280 if (ok) {
281 /* Terminate connection with Client */
282 fd->fsend(OK_append);
283 do_client_commands(jcr); /* finish dialog with Client */
284 } else {
285 fd->fsend("3999 Failed append\n");
286 }
287
288 Dmsg1(200, "Write EOS label JobStatus=%c\n", jcr->JobStatus);
289
290 /*
291 * Check if we can still write. This may not be the case
292 * if we are at the end of the tape or we got a fatal I/O error.
293 */
294 dcr->set_ameta();
295 if (ok || dev->can_write()) {
296 if (!dev->flush_before_eos(dcr)) {
297 /* Print only if ok and not cancelled to avoid spurious messages */
298 if (ok && !jcr->is_job_canceled()) {
299 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
300 dev->print_name(), dev->bstrerror());
301 Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
302 possible_incomplete_job(jcr, last_file_index);
303 }
304 jcr->setJobStatus(JS_ErrorTerminated);
305 ok = false;
306 }
307 if (!write_session_label(dcr, EOS_LABEL)) {
308 /* Print only if ok and not cancelled to avoid spurious messages */
309 if (ok && !jcr->is_job_canceled()) {
310 Jmsg1(jcr, M_FATAL, 0, _("Error writing end session label. ERR=%s\n"),
311 dev->bstrerror());
312 possible_incomplete_job(jcr, last_file_index);
313 }
314 jcr->setJobStatus(JS_ErrorTerminated);
315 ok = false;
316 }
317 /* Flush out final partial block of this session */
318 Dmsg1(200, "=== Flush adata=%d last block.\n", dcr->block->adata);
319 ASSERT(!dcr->block->adata);
320 if (!dcr->write_final_block_to_device()) {
321 /* Print only if ok and not cancelled to avoid spurious messages */
322 if (ok && !jcr->is_job_canceled()) {
323 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
324 dev->print_name(), dev->bstrerror());
325 Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
326 possible_incomplete_job(jcr, last_file_index);
327 }
328 jcr->setJobStatus(JS_ErrorTerminated);
329 ok = false;
330 }
331 }
332 flush_jobmedia_queue(jcr);
333 if (!ok && !jcr->is_JobStatus(JS_Incomplete)) {
334 discard_data_spool(dcr);
335 } else {
336 /* Note: if commit is OK, the device will remain blocked */
337 commit_data_spool(dcr);
338 }
339
340 /*
341 * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
342 * and the subsequent Jmsg() editing will break
343 */
344 int32_t job_elapsed = time(NULL) - jcr->run_time;
345
346 if (job_elapsed <= 0) {
347 job_elapsed = 1;
348 }
349
350 Jmsg(dcr->jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
351 job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
352 edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
353
354 /*
355 * Release the device -- and send final Vol info to DIR
356 * and unlock it.
357 */
358 release_device(dcr);
359
360 if ((!ok || jcr->is_job_canceled()) && !jcr->is_JobStatus(JS_Incomplete)) {
361 discard_attribute_spool(jcr);
362 } else {
363 commit_attribute_spool(jcr);
364 }
365
366 jcr->sendJobStatus(); /* update director */
367
368 Dmsg1(100, "return from do_append_data() ok=%d\n", ok);
369 return ok;
370 }
371
372
373 /* Send attributes and digest to Director for Catalog */
send_attrs_to_dir(JCR * jcr,DEV_RECORD * rec)374 bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec)
375 {
376 if (rec->maskedStream == STREAM_UNIX_ATTRIBUTES ||
377 rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX ||
378 rec->maskedStream == STREAM_RESTORE_OBJECT ||
379 crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) {
380 if (!jcr->no_attributes) {
381 BSOCK *dir = jcr->dir_bsock;
382 if (are_attributes_spooled(jcr)) {
383 dir->set_spooling();
384 }
385 Dmsg1(850, "Send attributes to dir. FI=%d\n", rec->FileIndex);
386 if (!dir_update_file_attributes(jcr->dcr, rec)) {
387 Jmsg(jcr, M_FATAL, 0, _("Error updating file attributes. ERR=%s\n"),
388 dir->bstrerror());
389 dir->clear_spooling();
390 return false;
391 }
392 dir->clear_spooling();
393 }
394 }
395 return true;
396 }
397