1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * This file handles commands from the File daemon.
21 *
22 * Written by Kern Sibbald, MM
23 *
24 * We get here because the Director has initiated a Job with
25 * the Storage daemon, then done the same with the File daemon,
26 * then when the Storage daemon receives a proper connection from
27 * the File daemon, control is passed here to handle the
28 * subsequent File daemon commands.
29 *
30 *
31 */
32
33 #include "bacula.h"
34 #include "stored.h"
35
36 /* Forward referenced functions */
37 static bool response(JCR *jcr, BSOCK *bs, const char *resp, const char *cmd);
38
39 /* Imported variables */
40 extern STORES *me;
41
42 /* Static variables */
43 static char ferrmsg[] = "3900 Invalid command\n";
44 static char OK_data[] = "3000 OK data\n";
45
46 /* Imported functions */
47 extern bool do_append_data(JCR *jcr);
48 extern bool do_read_data(JCR *jcr);
49 extern bool do_backup_job(JCR *jcr);
50
51 /* Forward referenced FD commands */
52 static bool append_open_session(JCR *jcr);
53 static bool append_close_session(JCR *jcr);
54 static bool append_data_cmd(JCR *jcr);
55 static bool append_end_session(JCR *jcr);
56 static bool read_open_session(JCR *jcr);
57 static bool read_data_cmd(JCR *jcr);
58 static bool read_close_session(JCR *jcr);
59 static bool read_control_cmd(JCR *jcr);
60 static bool sd_testnetwork_cmd(JCR *jcr);
61
62 /* Exported function */
63 bool get_bootstrap_file(JCR *jcr, BSOCK *bs);
64
65 struct s_cmds {
66 const char *cmd;
67 bool (*func)(JCR *jcr);
68 };
69
70 /*
71 * The following are the recognized commands from the File daemon
72 */
73 static struct s_cmds fd_cmds[] = {
74 {"append open", append_open_session},
75 {"append data", append_data_cmd},
76 {"append end", append_end_session},
77 {"append close", append_close_session},
78 {"read open", read_open_session},
79 {"read data", read_data_cmd},
80 {"read close", read_close_session},
81 {"read control", read_control_cmd},
82 {"testnetwork", sd_testnetwork_cmd},
83 {NULL, NULL} /* list terminator */
84 };
85
86 /* Commands from the File daemon that require additional scanning */
87 static char read_open[] = "read open session = %127s %ld %ld %ld %ld %ld %ld\n";
88
89 /* Responses sent to the File daemon */
90 static char NO_open[] = "3901 Error session already open\n";
91 static char NOT_opened[] = "3902 Error session not opened\n";
92 static char ERROR_open[] = "3904 Error open session, bad parameters\n";
93 static char OK_end[] = "3000 OK end\n";
94 static char OK_close[] = "3000 OK close Status = %d\n";
95 static char OK_open[] = "3000 OK open ticket = %d\n";
96 static char ERROR_append[] = "3903 Error append data: %s\n";
97
98 /* Information sent to the Director */
99 static char Job_start[] = "3010 Job %s start\n";
100 char Job_end[] =
101 "3099 Job %s end JobStatus=%d JobFiles=%d JobBytes=%s JobErrors=%u ErrMsg=%s\n";
102
103 /*
104 * Run a Client Job -- Client already authorized
105 * Note: this can be either a backup or restore or
106 * migrate/copy job.
107 *
108 * Basic task here is:
109 * - Read a command from the Client -- FD or SD
110 * - Execute it
111 *
112 */
run_job(JCR * jcr)113 void run_job(JCR *jcr)
114 {
115 BSOCK *dir = jcr->dir_bsock;
116 char ec1[30];
117
118 dir->set_jcr(jcr);
119 Dmsg1(120, "Start run Job=%s\n", jcr->Job);
120 dir->fsend(Job_start, jcr->Job);
121 jcr->start_time = time(NULL);
122 jcr->run_time = jcr->start_time;
123 jcr->sendJobStatus(JS_Running);
124
125 /* TODO: Remove when the new match_all is well tested */
126 jcr->use_new_match_all = use_new_match_all;
127 /*
128 * A migrate or copy job does both a restore (read_data) and
129 * a backup (append_data).
130 * Otherwise we do the commands that the client sends
131 * which are for normal backup or restore jobs.
132 */
133 Dmsg3(050, "==== JobType=%c run_job=%d sd_client=%d\n", jcr->getJobType(), jcr->JobId, jcr->sd_client);
134 if (jcr->is_JobType(JT_BACKUP) && jcr->sd_client) {
135 jcr->session_opened = true;
136 Dmsg0(050, "Do: receive for 3000 OK data then append\n");
137 if (!response(jcr, jcr->file_bsock, "3000 OK data\n", "Append data")) {
138 Dmsg1(050, "Expect: 3000 OK data, got: %s", jcr->file_bsock->msg);
139 Jmsg0(jcr, M_FATAL, 0, "Append data not accepted\n");
140 goto bail_out;
141 }
142 append_data_cmd(jcr);
143 append_end_session(jcr);
144 } else if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)) {
145 jcr->session_opened = true;
146 /* send "3000 OK data" now to avoid a dead lock, the other side is also
147 * waiting for one. The old peace of code was reading the "3000 OK" reply
148 * at the end of the backup (not really appropriate).
149 * dedup need duplex communication with the other side and need the
150 * "3000 OK" to be out of the socket, and be handle here by the right
151 * peace of code */
152 Dmsg0(DT_DEDUP|215, "send OK_data\n");
153 jcr->file_bsock->fsend(OK_data);
154 jcr->is_ok_data_sent = true;
155 Dmsg1(050, "Do: read_data_cmd file_bsock=%p\n", jcr->file_bsock);
156 Dmsg0(050, "Do: receive for 3000 OK data then read\n");
157 if (!response(jcr, jcr->file_bsock, "3000 OK data\n", "Data received")) {
158 Dmsg1(050, "Expect 3000 OK data, got: %s", jcr->file_bsock->msg);
159 Jmsg0(jcr, M_FATAL, 0, "Read data not accepted\n");
160 jcr->file_bsock->signal(BNET_EOD);
161 goto bail_out;
162 }
163 read_data_cmd(jcr);
164 jcr->file_bsock->signal(BNET_EOD);
165 } else {
166 /* Either a Backup or Restore job */
167 Dmsg0(050, "Do: do_client_commands\n");
168 do_client_commands(jcr);
169 }
170 bail_out:
171 jcr->end_time = time(NULL);
172 flush_jobmedia_queue(jcr);
173 dequeue_messages(jcr); /* send any queued messages */
174 jcr->setJobStatus(JS_Terminated);
175
176 /* Keep track of the important events */
177 events_send_msg(jcr, "SJ0002",
178 EVENTS_TYPE_JOB, jcr->director->hdr.name, (intptr_t)jcr,
179 "Job End jobid=%i job=%s status=%c",
180 jcr->JobId, jcr->Job, jcr->JobStatus);
181
182 generate_daemon_event(jcr, "JobEnd");
183 generate_plugin_event(jcr, bsdEventJobEnd);
184 bash_spaces(jcr->StatusErrMsg);
185 dir->fsend(Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles,
186 edit_uint64(jcr->JobBytes, ec1), jcr->JobErrors, jcr->StatusErrMsg);
187 Dmsg1(100, "==== %s", dir->msg);
188 unbash_spaces(jcr->StatusErrMsg);
189 dequeue_daemon_messages(jcr);
190 dir->signal(BNET_EOD); /* send EOD to Director daemon */
191 free_plugins(jcr); /* release instantiated plugins */
192 garbage_collect_memory_pool();
193 return;
194 }
195
196 /*
197 * Now talk to the Client (FD/SD) and do what he says
198 */
do_client_commands(JCR * jcr)199 void do_client_commands(JCR *jcr)
200 {
201 int i;
202 bool found, quit;
203 BSOCK *fd = jcr->file_bsock;
204
205 if (!fd) {
206 return;
207 }
208 fd->set_jcr(jcr);
209 for (quit=false; !quit;) {
210 int stat;
211
212 /* Read command coming from the File daemon */
213 stat = fd->recv();
214 if (fd->is_stop()) { /* hard eof or error */
215 break; /* connection terminated */
216 }
217 if (stat <= 0) {
218 continue; /* ignore signals and zero length msgs */
219 }
220 Dmsg1(110, "<filed: %s", fd->msg);
221 found = false;
222 for (i=0; fd_cmds[i].cmd; i++) {
223 if (strncmp(fd_cmds[i].cmd, fd->msg, strlen(fd_cmds[i].cmd)) == 0) {
224 found = true; /* indicate command found */
225 jcr->errmsg[0] = 0;
226 if (!fd_cmds[i].func(jcr)) { /* do command */
227 /* Note fd->msg command may be destroyed by comm activity */
228 if (!job_canceled(jcr) && !jcr->is_incomplete()) {
229 strip_trailing_junk(fd->msg);
230 if (jcr->errmsg[0]) {
231 strip_trailing_junk(jcr->errmsg);
232 Jmsg2(jcr, M_FATAL, 0, _("Command error with FD msg=\"%s\", SD hanging up. ERR=%s\n"),
233 fd->msg, jcr->errmsg);
234 } else {
235 Jmsg1(jcr, M_FATAL, 0, _("Command error with FD msg=\"%s\", SD hanging up.\n"),
236 fd->msg);
237 }
238 jcr->setJobStatus(JS_ErrorTerminated);
239 }
240 quit = true;
241 }
242 break;
243 }
244 }
245 if (!found) { /* command not found */
246 if (!job_canceled(jcr)) {
247 Jmsg1(jcr, M_FATAL, 0, _("FD command not found: %s\n"), fd->msg);
248 Dmsg1(110, "<filed: Command not found: %s\n", fd->msg);
249 }
250 fd->fsend(ferrmsg);
251 break;
252 }
253 }
254 fd->signal(BNET_TERMINATE); /* signal to FD job is done */
255 }
256
257 /*
258 * Append Data command
259 * Open Data Channel and receive Data for archiving
260 * Write the Data to the archive device
261 */
append_data_cmd(JCR * jcr)262 static bool append_data_cmd(JCR *jcr)
263 {
264 BSOCK *fd = jcr->file_bsock;
265
266 Dmsg1(120, "Append data: %s", fd->msg);
267 if (jcr->session_opened) {
268 Dmsg1(110, "<bfiled: %s", fd->msg);
269 jcr->setJobType(JT_BACKUP);
270 jcr->errmsg[0] = 0;
271 if (do_append_data(jcr)) {
272 return true;
273 } else {
274 fd->suppress_error_messages(true); /* ignore errors at this point */
275 fd->fsend(ERROR_append, jcr->errmsg);
276 }
277 } else {
278 pm_strcpy(jcr->errmsg, _("Attempt to append on non-open session.\n"));
279 fd->fsend(NOT_opened);
280 }
281 return false;
282 }
283
append_end_session(JCR * jcr)284 static bool append_end_session(JCR *jcr)
285 {
286 BSOCK *fd = jcr->file_bsock;
287
288 Dmsg1(120, "store<file: %s", fd->msg);
289 if (!jcr->session_opened) {
290 pm_strcpy(jcr->errmsg, _("Attempt to close non-open session.\n"));
291 fd->fsend(NOT_opened);
292 return false;
293 }
294 return fd->fsend(OK_end);
295 }
296
297 /*
298 * Test the FD/SD connectivity
299 */
sd_testnetwork_cmd(JCR * jcr)300 static bool sd_testnetwork_cmd(JCR *jcr)
301 {
302 BSOCK *fd = jcr->file_bsock;
303 int64_t nb=0, nbrtt=0, rtt=0, bandwidth=0;
304 int32_t ok=1;
305 bool can_compress;
306 btime_t start, end;
307
308 if (scan_string(fd->msg, "testnetwork bytes=%lld rtt=%lld bw=%lld", &nb, &nbrtt, &bandwidth) != 3) {
309 if (scan_string(fd->msg, "testnetwork bytes=%lld", &nb) != 1) {
310 Dmsg1(0, "Invalid command %s\n", fd->msg);
311 return false;
312 }
313 }
314
315 /* We disable the comline compression for this test */
316 can_compress = fd->can_compress();
317 fd->clear_compress();
318
319 if (nb > 0) {
320 /* First, get data from the FD */
321 while (fd->recv() > 0) { }
322
323 /* Then, send back data to the FD */
324 memset(fd->msg, 0xBB, sizeof_pool_memory(fd->msg));
325 fd->msglen = sizeof_pool_memory(fd->msg);
326
327 while(nb > 0 && ok > 0) {
328 if (nb < fd->msglen) {
329 fd->msglen = nb;
330 }
331 ok = fd->send();
332 nb -= fd->msglen;
333 }
334 fd->signal(BNET_EOD);
335 }
336
337 /* Compute RTT */
338 ok = 1;
339 if (nbrtt > 0) {
340 while (ok > 0) {
341 start = get_current_btime();
342 ok = fd->recv();
343 if (ok > 0) {
344 ok = fd->send();
345 end = get_current_btime() + 1;
346 rtt += end - start;
347 }
348 }
349
350 if (nbrtt) {
351 fd->set_bandwidth(bandwidth);
352 fd->set_rtt(rtt / nbrtt);
353 }
354 }
355
356 if (can_compress) {
357 fd->set_compress();
358 }
359 return true;
360 }
361
362 /*
363 * Append Open session command
364 *
365 */
append_open_session(JCR * jcr)366 static bool append_open_session(JCR *jcr)
367 {
368 BSOCK *fd = jcr->file_bsock;
369
370 Dmsg1(120, "Append open session: %s", fd->msg);
371 if (jcr->session_opened) {
372 pm_strcpy(jcr->errmsg, _("Attempt to open already open session.\n"));
373 fd->fsend(NO_open);
374 return false;
375 }
376
377 jcr->session_opened = true;
378
379 /* Send "Ticket" to File Daemon */
380 fd->fsend(OK_open, jcr->VolSessionId);
381 Dmsg1(110, ">filed: %s", fd->msg);
382
383 return true;
384 }
385
386 /*
387 * Append Close session command
388 * Close the append session and send back Statistics
389 * (need to fix statistics)
390 */
append_close_session(JCR * jcr)391 static bool append_close_session(JCR *jcr)
392 {
393 BSOCK *fd = jcr->file_bsock;
394
395 Dmsg1(120, "<filed: %s", fd->msg);
396 if (!jcr->session_opened) {
397 pm_strcpy(jcr->errmsg, _("Attempt to close non-open session.\n"));
398 fd->fsend(NOT_opened);
399 return false;
400 }
401 /* Send final statistics to File daemon */
402 fd->fsend(OK_close, jcr->JobStatus);
403 Dmsg1(120, ">filed: %s", fd->msg);
404
405 fd->signal(BNET_EOD); /* send EOD to File daemon */
406
407 jcr->session_opened = false;
408 return true;
409 }
410
411 /*
412 * Read Data command
413 * Open Data Channel, read the data from
414 * the archive device and send to File
415 * daemon.
416 */
read_data_cmd(JCR * jcr)417 static bool read_data_cmd(JCR *jcr)
418 {
419 BSOCK *fd = jcr->file_bsock;
420
421 Dmsg1(120, "Read data: %s", fd->msg);
422 if (jcr->session_opened) {
423 Dmsg1(120, "<bfiled: %s", fd->msg);
424 return do_read_data(jcr);
425 } else {
426 pm_strcpy(jcr->errmsg, _("Attempt to read on non-open session.\n"));
427 fd->fsend(NOT_opened);
428 return false;
429 }
430 }
431
432 /*
433 * Read Open session command
434 *
435 * We need to scan for the parameters of the job
436 * to be restored.
437 */
read_open_session(JCR * jcr)438 static bool read_open_session(JCR *jcr)
439 {
440 BSOCK *fd = jcr->file_bsock;
441
442 Dmsg1(120, "%s", fd->msg);
443 if (jcr->session_opened) {
444 pm_strcpy(jcr->errmsg, _("Attempt to open an already open session.\n"));
445 fd->fsend(NO_open);
446 return false;
447 }
448
449 if (sscanf(fd->msg, read_open, jcr->read_dcr->VolumeName, &jcr->read_VolSessionId,
450 &jcr->read_VolSessionTime, &jcr->read_StartFile, &jcr->read_EndFile,
451 &jcr->read_StartBlock, &jcr->read_EndBlock) == 7) {
452 Dmsg4(100, "read_open_session got: JobId=%d Vol=%s VolSessId=%ld VolSessT=%ld\n",
453 jcr->JobId, jcr->read_dcr->VolumeName, jcr->read_VolSessionId,
454 jcr->read_VolSessionTime);
455 Dmsg4(100, " StartF=%ld EndF=%ld StartB=%ld EndB=%ld\n",
456 jcr->read_StartFile, jcr->read_EndFile, jcr->read_StartBlock,
457 jcr->read_EndBlock);
458
459 } else {
460 pm_strcpy(jcr->errmsg, _("Cannot open session, received bad parameters.\n"));
461 fd->fsend(ERROR_open);
462 return false;
463 }
464
465 jcr->session_opened = true;
466 jcr->setJobType(JT_RESTORE);
467
468 /* Send "Ticket" to File Daemon */
469 fd->fsend(OK_open, jcr->VolSessionId);
470 Dmsg1(110, ">filed: %s", fd->msg);
471
472 return true;
473 }
474
read_control_cmd(JCR * jcr)475 static bool read_control_cmd(JCR *jcr)
476 {
477 BSOCK *fd = jcr->file_bsock;
478
479 Dmsg1(120, "Read control: %s\n", fd->msg);
480 if (!jcr->session_opened) {
481 fd->fsend(NOT_opened);
482 return false;
483 }
484 jcr->interactive_session = true;
485 return true;
486 }
487
488 /*
489 * Read Close session command
490 * Close the read session
491 */
read_close_session(JCR * jcr)492 static bool read_close_session(JCR *jcr)
493 {
494 BSOCK *fd = jcr->file_bsock;
495
496 Dmsg1(120, "Read close session: %s\n", fd->msg);
497 if (!jcr->session_opened) {
498 fd->fsend(NOT_opened);
499 return false;
500 }
501 /* Send final close msg to File daemon */
502 fd->fsend(OK_close, jcr->JobStatus);
503 Dmsg1(160, ">filed: %s\n", fd->msg);
504
505 fd->signal(BNET_EOD); /* send EOD to File daemon */
506
507 jcr->session_opened = false;
508 return true;
509 }
510
511 /*
512 * Get response from FD or SD
513 * sent. Check that the response agrees with what we expect.
514 *
515 * Returns: false on failure
516 * true on success
517 */
response(JCR * jcr,BSOCK * bs,const char * resp,const char * cmd)518 static bool response(JCR *jcr, BSOCK *bs, const char *resp, const char *cmd)
519 {
520 int n;
521
522 if (bs->is_error()) {
523 return false;
524 }
525 if ((n = bs->recv()) >= 0) {
526 if (strcmp(bs->msg, resp) == 0) {
527 return true;
528 }
529 Jmsg(jcr, M_FATAL, 0, _("Bad response to %s command: wanted %s, got %s\n"),
530 cmd, resp, bs->msg);
531 return false;
532 }
533 Jmsg(jcr, M_FATAL, 0, _("Socket error on %s command: ERR=%s\n"),
534 cmd, bs->bstrerror());
535 return false;
536 }
537