1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * SD -- vbackup.c -- responsible for doing virtual backup jobs.
21 *
22 * Kern Sibbald, January MMVI
23 *
24 */
25
26 #include "bacula.h"
27 #include "stored.h"
28
29 /* Import functions */
30 extern char Job_end[];
31
32 /* Forward referenced subroutines */
33 static bool record_cb(DCR *dcr, DEV_RECORD *rec);
34
35 /*
36 * Read Data and send to File Daemon
37 * Returns: false on failure
38 * true on success
39 */
do_vbackup(JCR * jcr)40 bool do_vbackup(JCR *jcr)
41 {
42 bool ok = true;
43 BSOCK *dir = jcr->dir_bsock;
44 const char *Type;
45 char ec1[50];
46 DEVICE *dev;
47
48 switch(jcr->getJobType()) {
49 case JT_MIGRATE:
50 Type = "Migration";
51 break;
52 case JT_ARCHIVE:
53 Type = "Archive";
54 break;
55 case JT_COPY:
56 Type = "Copy";
57 break;
58 case JT_BACKUP:
59 Type = "Virtual Backup";
60 break;
61 default:
62 Type = "Unknown";
63 break;
64 }
65
66 /* TODO: Remove when the new match_all is well tested */
67 jcr->use_new_match_all = use_new_match_all;
68
69 Dmsg1(20, "Start read data. newbsr=%d\n", jcr->use_new_match_all);
70
71 if (!jcr->read_dcr) {
72 Jmsg(jcr, M_FATAL, 0, _("Read device not initialized or defined for this job.\n"));
73 goto bail_out;
74 }
75 if (!jcr->dcr) {
76 Jmsg(jcr, M_FATAL, 0, _("Write device not initialized or defined for this job.\n"));
77 goto bail_out;
78 }
79 Dmsg2(100, "read_dcr=%p write_dcr=%p\n", jcr->read_dcr, jcr->dcr);
80
81 if (jcr->NumReadVolumes == 0) {
82 Jmsg(jcr, M_FATAL, 0, _("No Volume names found for %s.\n"), Type);
83 goto bail_out;
84 }
85
86 Dmsg3(200, "Found %d volumes names for %s. First=%s\n", jcr->NumReadVolumes,
87 jcr->VolList->VolumeName, Type);
88
89 ASSERT(jcr->read_dcr != jcr->dcr);
90 ASSERT(jcr->read_dcr->dev != jcr->dcr->dev);
91 /* Ready devices for reading and writing */
92 if (!acquire_device_for_read(jcr->read_dcr) ||
93 !acquire_device_for_append(jcr->dcr)) {
94 jcr->setJobStatus(JS_ErrorTerminated);
95 goto bail_out;
96 }
97 jcr->dcr->dev->start_of_job(jcr->dcr);
98
99 Dmsg2(200, "===== After acquire pos %u:%u\n", jcr->dcr->dev->file, jcr->dcr->dev->block_num);
100 jcr->sendJobStatus(JS_Running);
101
102 begin_data_spool(jcr->dcr);
103 begin_attribute_spool(jcr);
104
105 jcr->dcr->VolFirstIndex = jcr->dcr->VolLastIndex = 0;
106 jcr->run_time = time(NULL);
107 set_start_vol_position(jcr->dcr);
108
109 jcr->JobFiles = 0;
110 jcr->dcr->set_ameta();
111 jcr->read_dcr->set_ameta();
112 ok = read_records(jcr->read_dcr, record_cb, mount_next_read_volume);
113 goto ok_out;
114
115 bail_out:
116 ok = false;
117
118 ok_out:
119 if (jcr->dedup) {
120 /* was create in record_cb() to do rehydration */
121 delete jcr->dedup;
122 jcr->dedup = NULL;
123 }
124 if (jcr->dcr) {
125 jcr->dcr->set_ameta();
126 dev = jcr->dcr->dev;
127 Dmsg1(100, "ok=%d\n", ok);
128 if (ok || dev->can_write()) {
129 if (!dev->flush_before_eos(jcr->dcr)) {
130 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
131 dev->print_name(), dev->bstrerror());
132 Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n"));
133 ok = false;
134 }
135 /* Flush out final ameta partial block of this session */
136 if (!jcr->dcr->write_final_block_to_device()) {
137 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
138 dev->print_name(), dev->bstrerror());
139 Dmsg0(100, _("Set ok=FALSE after write_final_block_to_device.\n"));
140 ok = false;
141 }
142 Dmsg2(200, "Flush block to device pos %u:%u\n", dev->file, dev->block_num);
143 }
144 flush_jobmedia_queue(jcr);
145 if (!ok) {
146 discard_data_spool(jcr->dcr);
147 } else {
148 /* Note: if commit is OK, the device will remain blocked */
149 commit_data_spool(jcr->dcr);
150 }
151
152 /*
153 * Don't use time_t for job_elapsed as time_t can be 32 or 64 bits,
154 * and the subsequent Jmsg() editing will break
155 */
156 int32_t job_elapsed = time(NULL) - jcr->run_time;
157
158 if (job_elapsed <= 0) {
159 job_elapsed = 1;
160 }
161
162 Jmsg(jcr, M_INFO, 0, _("Elapsed time=%02d:%02d:%02d, Transfer rate=%s Bytes/second\n"),
163 job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
164 edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec1));
165
166 /* Release the device -- and send final Vol info to DIR */
167 release_device(jcr->dcr);
168
169 if (!ok || job_canceled(jcr)) {
170 discard_attribute_spool(jcr);
171 } else {
172 commit_attribute_spool(jcr);
173 }
174 }
175
176 if (jcr->read_dcr) {
177 if (!release_device(jcr->read_dcr)) {
178 ok = false;
179 }
180 }
181
182 jcr->sendJobStatus(); /* update director */
183
184 Dmsg0(30, "Done reading.\n");
185 jcr->end_time = time(NULL);
186 dequeue_messages(jcr); /* send any queued messages */
187 if (ok) {
188 jcr->setJobStatus(JS_Terminated);
189 }
190 generate_daemon_event(jcr, "JobEnd");
191 generate_plugin_event(jcr, bsdEventJobEnd);
192 dir->fsend(Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles,
193 edit_uint64(jcr->JobBytes, ec1), jcr->JobErrors, jcr->StatusErrMsg);
194 Dmsg6(100, Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles, ec1, jcr->JobErrors, jcr->StatusErrMsg);
195 dequeue_daemon_messages(jcr);
196
197 dir->signal(BNET_EOD); /* send EOD to Director daemon */
198 free_plugins(jcr); /* release instantiated plugins */
199
200 return ok;
201 }
202
203
204 /*
205 * Called here for each record from read_records()
206 * Returns: true if OK
207 * false if error
208 */
record_cb(DCR * dcr,DEV_RECORD * rec)209 static bool record_cb(DCR *dcr, DEV_RECORD *rec)
210 {
211 JCR *jcr = dcr->jcr;
212 DEVICE *dev = jcr->dcr->dev;
213 char buf1[100], buf2[100];
214 bool restoredatap = false;
215 POOLMEM *orgdata = NULL;
216 uint32_t orgdata_len = 0;
217 bool ret = false;
218
219 /* If label and not for us, discard it */
220 if (rec->FileIndex < 0 && rec->match_stat <= 0) {
221 ret = true;
222 goto bail_out;
223 }
224 /* We want to write SOS_LABEL and EOS_LABEL discard all others */
225 switch (rec->FileIndex) {
226 case PRE_LABEL:
227 case VOL_LABEL:
228 case EOT_LABEL:
229 case EOM_LABEL:
230 ret = true; /* don't write vol labels */
231 goto bail_out;
232 }
233
234 /*
235 * For normal migration jobs, FileIndex values are sequential because
236 * we are dealing with one job. However, for Vbackup (consolidation),
237 * we will be getting records from multiple jobs and writing them back
238 * out, so we need to ensure that the output FileIndex is sequential.
239 * We do so by detecting a FileIndex change and incrementing the
240 * JobFiles, which we then use as the output FileIndex.
241 */
242 if (rec->FileIndex >= 0) {
243 /* If something changed, increment FileIndex */
244 if (rec->VolSessionId != rec->last_VolSessionId ||
245 rec->VolSessionTime != rec->last_VolSessionTime ||
246 rec->FileIndex != rec->last_FileIndex) {
247 jcr->JobFiles++;
248 rec->last_VolSessionId = rec->VolSessionId;
249 rec->last_VolSessionTime = rec->VolSessionTime;
250 rec->last_FileIndex = rec->FileIndex;
251 }
252 rec->FileIndex = jcr->JobFiles; /* set sequential output FileIndex */
253 }
254
255 /* TODO: If user really wants to do rehydrate the data, we should propose
256 * this option.
257 */
258
259 /* Do rehydration if the write device doesn't support deduplication */
260 if (jcr->dcr->device->dev_type != B_DEDUP_DEV &&
261 rec->Stream & STREAM_BIT_DEDUPLICATION_DATA) {
262 if (!jcr->read_dcr->dev->setup_dedup_rehydration_interface(jcr->read_dcr)) {
263 Jmsg0(jcr, M_FATAL, 0, _("Cannot do rehydration, device is not dedup aware\n"));
264 goto bail_out;
265 }
266 bool despite_of_error = false; /* don't try to cheat for now */
267 int size;
268 int err = jcr->dedup->record_rehydration(dcr, rec, jcr->dedup->get_msgbuf(), jcr->errmsg, despite_of_error, &size);
269 if (err < 0) {
270 /* cannot read data from DSE */
271 Jmsg(jcr, M_FATAL, 0, "%s", jcr->errmsg);
272 goto bail_out;
273 }
274
275 /* Keep old pointers and restore them when at the end,
276 * after this point, we need to use goto bail_out
277 */
278 orgdata = rec->data;
279 orgdata_len = rec->data_len;
280 restoredatap = true;
281
282 rec->data = jcr->dedup->get_msgbuf();
283 rec->data_len = size;
284 }
285
286 /*
287 * Modify record SessionId and SessionTime to correspond to
288 * output.
289 */
290 rec->VolSessionId = jcr->VolSessionId;
291 rec->VolSessionTime = jcr->VolSessionTime;
292 Dmsg5(200, "before write JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
293 jcr->JobId,
294 FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
295 stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
296
297 rec->state_bits |= is_dedup_ref(rec, true) ? REC_NO_SPLIT : 0;
298 if (!jcr->dcr->write_record(rec)) {
299 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
300 dev->print_name(), dev->bstrerror());
301 goto bail_out;
302 }
303 /* Restore packet */
304 rec->VolSessionId = rec->last_VolSessionId;
305 rec->VolSessionTime = rec->last_VolSessionTime;
306 if (rec->FileIndex < 0) {
307 ret = true; /* don't send LABELs to Dir */
308 goto bail_out;
309 }
310 jcr->JobBytes += rec->data_len; /* increment bytes this job */
311 Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n",
312 jcr->JobId,
313 FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId,
314 stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len);
315
316 send_attrs_to_dir(jcr, rec);
317 ret = true;
318
319 bail_out:
320 if (restoredatap) {
321 rec->data = orgdata;
322 rec->data_len = orgdata_len;
323 }
324 return ret;
325 }
326