1 /*
2    Bacula(R) - The Network Backup Solution
3 
4    Copyright (C) 2000-2020 Kern Sibbald
5 
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8 
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13 
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16 
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * SD -- vbackup.c --  responsible for doing virtual backup jobs.
21  *
22  *     Kern Sibbald, January MMVI
23  *
24  */
25 
26 #include "bacula.h"
27 #include "stored.h"
28 
29 /* Import functions */
30 extern char Job_end[];
31 
32 /* Forward referenced subroutines */
33 static bool record_cb(DCR *dcr, DEV_RECORD *rec);
34 
35 /*
36  *  Read Data and send to File Daemon
37  *   Returns: false on failure
38  *            true  on success
39  */
do_vbackup(JCR * jcr)40 bool do_vbackup(JCR *jcr)
41 {
42    bool ok = true;
43    BSOCK *dir = jcr->dir_bsock;
44    const char *Type;
45    char ec1[50];
46    DEVICE *dev;
47 
48    switch(jcr->getJobType()) {
49    case JT_MIGRATE:
50       Type = "Migration";
51       break;
52    case JT_ARCHIVE:
53       Type = "Archive";
54       break;
55    case JT_COPY:
56       Type = "Copy";
57       break;
58    case JT_BACKUP:
59       Type = "Virtual Backup";
60       break;
61    default:
62       Type = "Unknown";
63       break;
64    }
65 
66    /* TODO: Remove when the new match_all is well tested */
67    jcr->use_new_match_all = use_new_match_all;
68 
69    Dmsg1(20, "Start read data. newbsr=%d\n", jcr->use_new_match_all);
70 
71    if (!jcr->read_dcr) {
72       Jmsg(jcr, M_FATAL, 0, _("Read device not initialized or defined for this job.\n"));
73       goto bail_out;
74    }
75    if (!jcr->dcr) {
76       Jmsg(jcr, M_FATAL, 0, _("Write device not initialized or defined for this job.\n"));
77       goto bail_out;
78    }
79    Dmsg2(100, "read_dcr=%p write_dcr=%p\n", jcr->read_dcr, jcr->dcr);
80 
81    if (jcr->NumReadVolumes == 0) {
82       Jmsg(jcr, M_FATAL, 0, _("No Volume names found for %s.\n"), Type);
83       goto bail_out;
84    }
85 
86    Dmsg3(200, "Found %d volumes names for %s. First=%s\n", jcr->NumReadVolumes,
87       jcr->VolList->VolumeName, Type);
88 
89    ASSERT(jcr->read_dcr != jcr->dcr);
90    ASSERT(jcr->read_dcr->dev != jcr->dcr->dev);
91    /* Ready devices for reading and writing */
92    if (!acquire_device_for_read(jcr->read_dcr) ||
93        !acquire_device_for_append(jcr->dcr)) {
94       jcr->setJobStatus(JS_ErrorTerminated);
95       goto bail_out;
96    }
97    jcr->dcr->dev->start_of_job(jcr->dcr);
98 
99    Dmsg2(200, "===== After acquire pos %u:%u\n", jcr->dcr->dev->file, jcr->dcr->dev->block_num);
100    jcr->sendJobStatus(JS_Running);
101 
102    begin_data_spool(jcr->dcr);
103    begin_attribute_spool(jcr);
104 
105    jcr->dcr->VolFirstIndex = jcr->dcr->VolLastIndex = 0;
106    jcr->run_time = time(NULL);
107    set_start_vol_position(jcr->dcr);
108 
109    jcr->JobFiles = 0;
110    jcr->dcr->set_ameta();
111    jcr->read_dcr->set_ameta();
112    ok = read_records(jcr->read_dcr, record_cb, mount_next_read_volume);
113    goto ok_out;
114 
115 bail_out:
116    ok = false;
117 
118 ok_out:
119    if (jcr->dedup) {
120       /* was create in record_cb() to do rehydration */
121       delete jcr->dedup;
122       jcr->dedup = NULL;
123    }
124    if (jcr->dcr) {
125       jcr->dcr->set_ameta();
126       dev = jcr->dcr->dev;
127       Dmsg1(100, "ok=%d\n", ok);
128       if (ok || dev->can_write()) {
129          if (!dev->flush_before_eos(jcr->dcr)) {
130             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
131                   dev->print_name(), dev->bstrerror());
132             Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
133             ok = false;
134          }
135          /* Flush out final ameta partial block of this session */
136          if (!jcr->dcr->write_final_block_to_device()) {
137             Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
138                   dev->print_name(), dev->bstrerror());
139             Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
140             ok = false;
141          }
142          Dmsg2(200, "Flush block to device pos %u:%u\n", dev->file, dev->block_num);
143       }
144       flush_jobmedia_queue(jcr);
145       if (!ok) {
146          discard_data_spool(jcr->dcr);
147       } else {
148          /* Note: if commit is OK, the device will remain blocked */
149          commit_data_spool(jcr->dcr);
150       }
151 
152       /*
153        * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
154        *   and the subsequent Jmsg() editing will break
155        */
156       int32_t job_elapsed = time(NULL) - jcr->run_time;
157 
158       if (job_elapsed <= 0) {
159          job_elapsed = 1;
160       }
161 
162       Jmsg(jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
163             job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
164             edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec1));
165 
166       /* Release the device -- and send final Vol info to DIR */
167       release_device(jcr->dcr);
168 
169       if (!ok || job_canceled(jcr)) {
170          discard_attribute_spool(jcr);
171       } else {
172          commit_attribute_spool(jcr);
173       }
174    }
175 
176    if (jcr->read_dcr) {
177       if (!release_device(jcr->read_dcr)) {
178          ok = false;
179       }
180    }
181 
182    jcr->sendJobStatus();              /* update director */
183 
184    Dmsg0(30, "Done reading.\n");
185    jcr->end_time = time(NULL);
186    dequeue_messages(jcr);             /* send any queued messages */
187    if (ok) {
188       jcr->setJobStatus(JS_Terminated);
189    }
190    generate_daemon_event(jcr, "JobEnd");
191    generate_plugin_event(jcr, bsdEventJobEnd);
192    dir->fsend(Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles,
193       edit_uint64(jcr->JobBytes, ec1), jcr->JobErrors, jcr->StatusErrMsg);
194    Dmsg6(100, Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles, ec1, jcr->JobErrors, jcr->StatusErrMsg);
195    dequeue_daemon_messages(jcr);
196 
197    dir->signal(BNET_EOD);             /* send EOD to Director daemon */
198    free_plugins(jcr);                 /* release instantiated plugins */
199 
200    return ok;
201 }
202 
203 
204 /*
205  * Called here for each record from read_records()
206  *  Returns: true if OK
207  *           false if error
208  */
record_cb(DCR * dcr,DEV_RECORD * rec)209 static bool record_cb(DCR *dcr, DEV_RECORD *rec)
210 {
211    JCR *jcr = dcr->jcr;
212    DEVICE *dev = jcr->dcr->dev;
213    char buf1[100], buf2[100];
214    bool     restoredatap = false;
215    POOLMEM *orgdata = NULL;
216    uint32_t orgdata_len = 0;
217    bool ret = false;
218 
219    /* If label and not for us, discard it */
220    if (rec->FileIndex < 0 && rec->match_stat <= 0) {
221       ret = true;
222       goto bail_out;
223    }
224    /* We want to write SOS_LABEL and EOS_LABEL discard all others */
225    switch (rec->FileIndex) {
226    case PRE_LABEL:
227    case VOL_LABEL:
228    case EOT_LABEL:
229    case EOM_LABEL:
230       ret = true;                    /* don't write vol labels */
231       goto bail_out;
232    }
233 
234    /*
235     * For normal migration jobs, FileIndex values are sequential because
236     *  we are dealing with one job.  However, for Vbackup (consolidation),
237     *  we will be getting records from multiple jobs and writing them back
238     *  out, so we need to ensure that the output FileIndex is sequential.
239     *  We do so by detecting a FileIndex change and incrementing the
240     *  JobFiles, which we then use as the output FileIndex.
241     */
242    if (rec->FileIndex >= 0) {
243       /* If something changed, increment FileIndex */
244       if (rec->VolSessionId != rec->last_VolSessionId ||
245           rec->VolSessionTime != rec->last_VolSessionTime ||
246           rec->FileIndex != rec->last_FileIndex) {
247          jcr->JobFiles++;
248          rec->last_VolSessionId = rec->VolSessionId;
249          rec->last_VolSessionTime = rec->VolSessionTime;
250          rec->last_FileIndex = rec->FileIndex;
251       }
252       rec->FileIndex = jcr->JobFiles;     /* set sequential output FileIndex */
253    }
254 
255    /* TODO: If user really wants to do rehydrate the data, we should propose
256     * this option.
257     */
258 
259    /* Do rehydration if the write device doesn't support deduplication */
260    if (jcr->dcr->device->dev_type != B_DEDUP_DEV &&
261        rec->Stream & STREAM_BIT_DEDUPLICATION_DATA) {
262       if (!jcr->read_dcr->dev->setup_dedup_rehydration_interface(jcr->read_dcr)) {
263          Jmsg0(jcr, M_FATAL, 0, _("Cannot do rehydration, device is not dedup aware\n"));
264          goto bail_out;
265       }
266       bool despite_of_error = false; /* don't try to cheat for now */
267       int size;
268       int err = jcr->dedup->record_rehydration(dcr, rec, jcr->dedup->get_msgbuf(), jcr->errmsg, despite_of_error, &size);
269       if (err < 0) {
270          /* cannot read data from DSE */
271          Jmsg(jcr, M_FATAL, 0, "%s", jcr->errmsg);
272          goto bail_out;
273       }
274 
275       /* Keep old pointers and restore them when at the end,
276        * after this point, we need to use goto bail_out
277        */
278       orgdata = rec->data;
279       orgdata_len = rec->data_len;
280       restoredatap = true;
281 
282       rec->data = jcr->dedup->get_msgbuf();
283       rec->data_len = size;
284    }
285 
286    /*
287     * Modify record SessionId and SessionTime to correspond to
288     * output.
289     */
290    rec->VolSessionId = jcr->VolSessionId;
291    rec->VolSessionTime = jcr->VolSessionTime;
292    Dmsg5(200, "before write JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
293       jcr->JobId,
294       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
295       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
296 
297    rec->state_bits |= is_dedup_ref(rec, true) ? REC_NO_SPLIT : 0;
298    if (!jcr->dcr->write_record(rec)) {
299       Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
300             dev->print_name(), dev->bstrerror());
301       goto bail_out;
302    }
303    /* Restore packet */
304    rec->VolSessionId = rec->last_VolSessionId;
305    rec->VolSessionTime = rec->last_VolSessionTime;
306    if (rec->FileIndex < 0) {
307       ret = true;                    /* don't send LABELs to Dir */
308       goto bail_out;
309    }
310    jcr->JobBytes += rec->data_len;   /* increment bytes this job */
311    Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
312       jcr->JobId,
313       FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
314       stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
315 
316    send_attrs_to_dir(jcr, rec);
317    ret = true;
318 
319 bail_out:
320    if (restoredatap) {
321       rec->data = orgdata;
322       rec->data_len = orgdata_len;
323    }
324    return ret;
325 }
326