1 /*
2 BAREOS® - Backup Archiving REcovery Open Sourced
3
4 Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5 Copyright (C) 2011-2016 Planets Communications B.V.
6 Copyright (C) 2013-2018 Bareos GmbH & Co. KG
7
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
11 in the file LICENSE.
12
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Affero General Public License for more details.
17
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 02110-1301, USA.
22 */
23 /*
24 * Kern Sibbald, August MM
25 */
26
27 /**
28 * @file
29 *
30 * handles the message channel to the Storage daemon and the File daemon.
31 *
32 * * Basic tasks done here:
33 * * Open a message channel with the Storage daemon
34 * to authenticate ourself and to pass the JobId.
35 * * Create a thread to interact with the Storage daemon
36 * who returns a job status and requests Catalog services, etc.
37 */
38 #include "include/bareos.h"
39 #include "dird.h"
40 #include "dird/getmsg.h"
41 #include "dird/job.h"
42 #include "dird/jcr_private.h"
43 #include "dird/msgchan.h"
44 #include "dird/quota.h"
45 #include "dird/sd_cmds.h"
46 #include "lib/berrno.h"
47 #include "lib/bnet.h"
48 #include "lib/edit.h"
49 #include "lib/util.h"
50 #include "lib/thread_specific_data.h"
51 #include "lib/watchdog.h"
52
53 namespace directordaemon {
54
/*
 * Mutex handed to pthread_cond_timedwait() in
 * WaitForStorageDaemonTermination() while waiting on a job's term_wait
 * condition variable.
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Commands sent to Storage daemon */
/* Job command; formatted and sent by StartStorageDaemonJob(). */
static char jobcmd[] =
    "JobId=%s job=%s job_name=%s client_name=%s "
    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
    "SpoolData=%d PreferMountedVols=%d SpoolSize=%s "
    "rerunning=%d VolSessionId=%d VolSessionTime=%d Quota=%llu "
    "Protocol=%d BackupFormat=%s\n";
/* Sent once per storage; append=0 is used for reading, append=1 for writing. */
static char use_storage[] =
    "use storage=%s media_type=%s pool_name=%s "
    "pool_type=%s append=%d copy=%d stripe=%d\n";
/* Sent once per device of the storage announced just before. */
static char use_device[] = "use device=%s\n";
// static char query_device[] =
// "query device=%s";

/* Response from Storage daemon */
/* Expected acknowledgement after the bootstrap file has been sent. */
static char OKbootstrap[] = "3000 OK bootstrap\n";
/*
 * Answer to the Job command. Note that "%100s" stores up to 100 characters
 * plus a terminating NUL, so the destination must hold at least 101 bytes.
 */
static char OK_job[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
/* Parsed by msg_thread(); same "%100s" sizing rule as OK_job. */
static char OK_nextrun[] = "3000 OK Job Authorization=%100s\n";
/* Answer to a use storage/use device sequence. */
static char OK_device[] = "3000 OK use device device=%s\n";

/* Storage Daemon requests */
/* Both are parsed by msg_thread(). */
static char Job_start[] = "3010 Job %127s start\n";
static char Job_end[] =
    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";

/* Forward referenced functions */
extern "C" void* msg_thread(void* arg);
84
85 /** Send bootstrap file to Storage daemon.
86 * This is used for
87 * * restore
88 * * verify
89 * * VolumeToCatalog
90 * * migration and
91 * * copy Jobs
92 */
SendBootstrapFileToSd(JobControlRecord * jcr,BareosSocket * sd)93 static inline bool SendBootstrapFileToSd(JobControlRecord* jcr,
94 BareosSocket* sd)
95 {
96 FILE* bs;
97 char buf[1000];
98 const char* bootstrap = "bootstrap\n";
99
100 Dmsg1(400, "SendBootstrapFileToSd: %s\n", jcr->RestoreBootstrap);
101 if (!jcr->RestoreBootstrap) { return true; }
102 bs = fopen(jcr->RestoreBootstrap, "rb");
103 if (!bs) {
104 BErrNo be;
105 Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
106 jcr->RestoreBootstrap, be.bstrerror());
107 jcr->setJobStatus(JS_ErrorTerminated);
108 return false;
109 }
110 sd->fsend(bootstrap);
111 while (fgets(buf, sizeof(buf), bs)) { sd->fsend("%s", buf); }
112 sd->signal(BNET_EOD);
113 fclose(bs);
114 if (jcr->impl->unlink_bsr) {
115 SecureErase(jcr, jcr->RestoreBootstrap);
116 jcr->impl->unlink_bsr = false;
117 }
118 return true;
119 }
120
121 /** Start a job with the Storage daemon
122 */
StartStorageDaemonJob(JobControlRecord * jcr,alist * read_storage,alist * write_storage,bool send_bsr)123 bool StartStorageDaemonJob(JobControlRecord* jcr,
124 alist* read_storage,
125 alist* write_storage,
126 bool send_bsr)
127 {
128 bool ok = true;
129 StorageResource* storage = nullptr;
130 char auth_key[100];
131 const char* fileset_md5;
132 PoolMem StoreName, device_name, pool_name, pool_type, media_type,
133 backup_format;
134 PoolMem job_name, client_name, fileset_name;
135 int copy = 0;
136 int stripe = 0;
137 uint64_t remainingquota = 0;
138 char ed1[30], ed2[30];
139 BareosSocket* sd = jcr->store_bsock;
140
141 /*
142 * Before actually starting a new Job on the SD make sure we send any specific
143 * plugin options for this Job.
144 */
145 if (!SendStoragePluginOptions(jcr)) {
146 Jmsg(jcr, M_FATAL, 0,
147 _("Storage daemon rejected Plugin Options command: %s\n"), sd->msg);
148 return false;
149 }
150
151 /*
152 * Now send JobId and permissions, and get back the authorization key.
153 */
154 PmStrcpy(job_name, jcr->impl->res.job->resource_name_);
155 BashSpaces(job_name);
156
157 if (jcr->impl->res.client) {
158 PmStrcpy(client_name, jcr->impl->res.client->resource_name_);
159 } else {
160 PmStrcpy(client_name, "**None**");
161 }
162 BashSpaces(client_name);
163
164 if (jcr->impl->res.fileset) {
165 PmStrcpy(fileset_name, jcr->impl->res.fileset->resource_name_);
166 } else {
167 PmStrcpy(fileset_name, "**None**");
168 }
169 BashSpaces(fileset_name);
170
171 PmStrcpy(backup_format, jcr->impl->backup_format);
172 BashSpaces(backup_format);
173
174 if (jcr->impl->res.fileset && jcr->impl->res.fileset->MD5[0] == 0) {
175 bstrncpy(jcr->impl->res.fileset->MD5, "**Dummy**",
176 sizeof(jcr->impl->res.fileset->MD5));
177 fileset_md5 = jcr->impl->res.fileset->MD5;
178 } else if (jcr->impl->res.fileset) {
179 fileset_md5 = jcr->impl->res.fileset->MD5;
180 } else {
181 fileset_md5 = "**Dummy**";
182 }
183
184 /*
185 * If rescheduling, cancel the previous incarnation of this job
186 * with the SD, which might be waiting on the FD connection.
187 * If we do not cancel it the SD will not accept a new connection
188 * for the same jobid.
189 */
190 if (jcr->impl->reschedule_count) {
191 sd->fsend("cancel Job=%s\n", jcr->Job);
192 while (sd->recv() >= 0) { continue; }
193 }
194
195 /*
196 * Retrieve available quota 0 bytes means dont perform the check
197 */
198 remainingquota = FetchRemainingQuotas(jcr);
199 Dmsg1(50, "Remainingquota: %llu\n", remainingquota);
200
201 sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, job_name.c_str(),
202 client_name.c_str(), jcr->getJobType(), jcr->getJobLevel(),
203 fileset_name.c_str(), !jcr->impl->res.pool->catalog_files,
204 jcr->impl->res.job->SpoolAttributes, fileset_md5,
205 jcr->impl->spool_data, jcr->impl->res.job->PreferMountedVolumes,
206 edit_int64(jcr->impl->spool_size, ed2), jcr->rerunning,
207 jcr->VolSessionId, jcr->VolSessionTime, remainingquota,
208 jcr->getJobProtocol(), backup_format.c_str());
209
210 Dmsg1(100, ">stored: %s", sd->msg);
211 if (BgetDirmsg(sd) > 0) {
212 Dmsg1(100, "<stored: %s", sd->msg);
213 if (sscanf(sd->msg, OK_job, &jcr->VolSessionId, &jcr->VolSessionTime,
214 &auth_key) != 3) {
215 Dmsg1(100, "BadJob=%s\n", sd->msg);
216 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"),
217 sd->msg);
218 return false;
219 } else {
220 BfreeAndNull(jcr->sd_auth_key);
221 jcr->sd_auth_key = strdup(auth_key);
222 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
223 }
224 } else {
225 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
226 sd->bstrerror());
227 return false;
228 }
229
230 if (send_bsr &&
231 (!SendBootstrapFileToSd(jcr, sd) ||
232 !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
233 return false;
234 }
235
236 /*
237 * request sd to reply the secure erase cmd
238 * or "*None*" if not set
239 */
240 if (!SendSecureEraseReqToSd(jcr)) {
241 Dmsg1(400, "Unexpected %s Secure Erase Reply\n", "SD");
242 }
243
244 /*
245 * We have two loops here. The first comes from the
246 * Storage = associated with the Job, and we need
247 * to attach to each one.
248 * The inner loop loops over all the alternative devices
249 * associated with each Storage. It selects the first
250 * available one.
251 *
252 */
253 /* Do read side of storage daemon */
254 if (ok && read_storage) {
255 /* For the moment, only migrate, copy and vbackup have rpool */
256 if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
257 (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
258 PmStrcpy(pool_type, jcr->impl->res.rpool->pool_type);
259 PmStrcpy(pool_name, jcr->impl->res.rpool->resource_name_);
260 } else {
261 PmStrcpy(pool_type, jcr->impl->res.pool->pool_type);
262 PmStrcpy(pool_name, jcr->impl->res.pool->resource_name_);
263 }
264 BashSpaces(pool_type);
265 BashSpaces(pool_name);
266 foreach_alist (storage, read_storage) {
267 Dmsg1(100, "Rstore=%s\n", storage->resource_name_);
268 PmStrcpy(StoreName, storage->resource_name_);
269 BashSpaces(StoreName);
270 PmStrcpy(media_type, storage->media_type);
271 BashSpaces(media_type);
272 sd->fsend(use_storage, StoreName.c_str(), media_type.c_str(),
273 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
274 Dmsg1(100, "read_storage >stored: %s", sd->msg);
275 DeviceResource* dev = nullptr;
276 /* Loop over alternative storage Devices until one is OK */
277 foreach_alist (dev, storage->device) {
278 PmStrcpy(device_name, dev->resource_name_);
279 BashSpaces(device_name);
280 sd->fsend(use_device, device_name.c_str());
281 Dmsg1(100, ">stored: %s", sd->msg);
282 }
283 sd->signal(BNET_EOD); /* end of Devices */
284 }
285 sd->signal(BNET_EOD); /* end of Storages */
286 if (BgetDirmsg(sd) > 0) {
287 Dmsg1(100, "<stored: %s", sd->msg);
288 /* ****FIXME**** save actual device name */
289 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
290 } else {
291 ok = false;
292 }
293 if (ok) {
294 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"),
295 device_name.c_str());
296 }
297 }
298
299 /* Do write side of storage daemon */
300 if (ok && write_storage) {
301 PmStrcpy(pool_type, jcr->impl->res.pool->pool_type);
302 PmStrcpy(pool_name, jcr->impl->res.pool->resource_name_);
303 BashSpaces(pool_type);
304 BashSpaces(pool_name);
305 foreach_alist (storage, write_storage) {
306 PmStrcpy(StoreName, storage->resource_name_);
307 BashSpaces(StoreName);
308 PmStrcpy(media_type, storage->media_type);
309 BashSpaces(media_type);
310 sd->fsend(use_storage, StoreName.c_str(), media_type.c_str(),
311 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
312
313 Dmsg1(100, "write_storage >stored: %s", sd->msg);
314 DeviceResource* dev = nullptr;
315 /* Loop over alternative storage Devices until one is OK */
316 foreach_alist (dev, storage->device) {
317 PmStrcpy(device_name, dev->resource_name_);
318 BashSpaces(device_name);
319 sd->fsend(use_device, device_name.c_str());
320 Dmsg1(100, ">stored: %s", sd->msg);
321 }
322 sd->signal(BNET_EOD); /* end of Devices */
323 }
324 sd->signal(BNET_EOD); /* end of Storages */
325 if (BgetDirmsg(sd) > 0) {
326 Dmsg1(100, "<stored: %s", sd->msg);
327 /* ****FIXME**** save actual device name */
328 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
329 } else {
330 ok = false;
331 }
332 if (ok) {
333 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"),
334 device_name.c_str());
335 }
336 }
337 if (!ok) {
338 PoolMem err_msg;
339 if (sd->msg[0]) {
340 PmStrcpy(err_msg, sd->msg); /* save message */
341 Jmsg(jcr, M_FATAL, 0,
342 _("\n"
343 " Storage daemon didn't accept Device \"%s\" because:\n "
344 "%s"),
345 device_name.c_str(), err_msg.c_str() /* sd->msg */);
346 } else {
347 Jmsg(jcr, M_FATAL, 0,
348 _("\n"
349 " Storage daemon didn't accept Device \"%s\" command.\n"),
350 device_name.c_str());
351 }
352 }
353 return ok;
354 }
355
356 /** Start a thread to handle Storage daemon messages and
357 * Catalog requests.
358 */
StartStorageDaemonMessageThread(JobControlRecord * jcr)359 bool StartStorageDaemonMessageThread(JobControlRecord* jcr)
360 {
361 int status;
362 pthread_t thid;
363
364 jcr->IncUseCount(); /* mark in use by msg thread */
365 jcr->impl->sd_msg_thread_done = false;
366 jcr->impl->SD_msg_chan_started = false;
367 Dmsg0(100, "Start SD msg_thread.\n");
368 if ((status = pthread_create(&thid, NULL, msg_thread, (void*)jcr)) != 0) {
369 BErrNo be;
370 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"),
371 be.bstrerror(status));
372 }
373 /* Wait for thread to start */
374 while (!jcr->impl->SD_msg_chan_started) {
375 Bmicrosleep(0, 50);
376 if (JobCanceled(jcr) || jcr->impl->sd_msg_thread_done) { return false; }
377 }
378 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->UseCount());
379 return true;
380 }
381
MsgThreadCleanup(void * arg)382 extern "C" void MsgThreadCleanup(void* arg)
383 {
384 JobControlRecord* jcr = (JobControlRecord*)arg;
385
386 jcr->db->EndTransaction(jcr); /* Terminate any open transaction */
387 jcr->lock();
388 jcr->impl->sd_msg_thread_done = true;
389 jcr->impl->SD_msg_chan_started = false;
390 jcr->unlock();
391 pthread_cond_broadcast(
392 &jcr->impl->nextrun_ready); /* wakeup any waiting threads */
393 pthread_cond_broadcast(
394 &jcr->impl->term_wait); /* wakeup any waiting threads */
395 Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId,
396 jcr->UseCount());
397 jcr->db->ThreadCleanup(); /* remove thread specific data */
398 FreeJcr(jcr); /* release jcr */
399 }
400
401 /** Handle the message channel (i.e. requests from the
402 * Storage daemon).
403 * Note, we are running in a separate thread.
404 */
msg_thread(void * arg)405 extern "C" void* msg_thread(void* arg)
406 {
407 JobControlRecord* jcr = (JobControlRecord*)arg;
408 BareosSocket* sd;
409 int JobStatus;
410 int n;
411 char auth_key[100];
412 char Job[MAX_NAME_LENGTH];
413 uint32_t JobFiles, JobErrors;
414 uint64_t JobBytes;
415
416 pthread_detach(pthread_self());
417 SetJcrInThreadSpecificData(jcr);
418 jcr->impl->SD_msg_chan = pthread_self();
419 jcr->impl->SD_msg_chan_started = true;
420 pthread_cleanup_push(MsgThreadCleanup, arg);
421 sd = jcr->store_bsock;
422
423 /*
424 * Read the Storage daemon's output.
425 */
426 Dmsg0(100, "Start msg_thread loop\n");
427 n = 0;
428 while (!JobCanceled(jcr) && (n = BgetDirmsg(sd)) >= 0) {
429 Dmsg1(400, "<stored: %s", sd->msg);
430 /*
431 * Check for "3000 OK Job Authorization="
432 * Returned by a rerun cmd.
433 */
434 if (sscanf(sd->msg, OK_nextrun, &auth_key) == 1) {
435 if (jcr->sd_auth_key) { free(jcr->sd_auth_key); }
436 jcr->sd_auth_key = strdup(auth_key);
437 pthread_cond_broadcast(
438 &jcr->impl->nextrun_ready); /* wakeup any waiting threads */
439 continue;
440 }
441
442 /*
443 * Check for "3010 Job <jobid> start"
444 */
445 if (sscanf(sd->msg, Job_start, Job) == 1) { continue; }
446
447 /*
448 * Check for "3099 Job <JobId> end JobStatus= JobFiles= JobBytes=
449 * JobErrors="
450 */
451 if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles, &JobBytes,
452 &JobErrors) == 5) {
453 jcr->impl->SDJobStatus = JobStatus; /* termination status */
454 jcr->impl->SDJobFiles = JobFiles;
455 jcr->impl->SDJobBytes = JobBytes;
456 jcr->impl->SDErrors = JobErrors;
457 break;
458 }
459 Dmsg1(400, "end loop use=%d\n", jcr->UseCount());
460 }
461 if (n == BNET_HARDEOF) {
462 /*
463 * A lost connection to the storage daemon is FATAL.
464 * This is required, as otherwise
465 * the job could failed to write data
466 * but still end as JS_Warnings (OK -- with warnings).
467 */
468 Qmsg(jcr, M_FATAL, 0, _("Director's comm line to SD dropped.\n"));
469 }
470 if (IsBnetError(sd)) { jcr->impl->SDJobStatus = JS_ErrorTerminated; }
471 pthread_cleanup_pop(1); /* remove and execute the handler */
472 return NULL;
473 }
474
WaitForStorageDaemonTermination(JobControlRecord * jcr)475 void WaitForStorageDaemonTermination(JobControlRecord* jcr)
476 {
477 int cancel_count = 0;
478 /* Now wait for Storage daemon to Terminate our message thread */
479 while (!jcr->impl->sd_msg_thread_done) {
480 struct timeval tv;
481 struct timezone tz;
482 struct timespec timeout;
483
484 gettimeofday(&tv, &tz);
485 timeout.tv_nsec = 0;
486 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
487 Dmsg0(400, "I'm waiting for message thread termination.\n");
488 P(mutex);
489 pthread_cond_timedwait(&jcr->impl->term_wait, &mutex, &timeout);
490 V(mutex);
491 if (jcr->IsCanceled()) {
492 if (jcr->impl->SD_msg_chan_started) {
493 jcr->store_bsock->SetTimedOut();
494 jcr->store_bsock->SetTerminated();
495 SdMsgThreadSendSignal(jcr, TIMEOUT_SIGNAL);
496 }
497 cancel_count++;
498 }
499 /* Give SD 30 seconds to clean up after cancel */
500 if (cancel_count == 6) { break; }
501 }
502 jcr->setJobStatus(JS_Terminated);
503 }
504
505 } /* namespace directordaemon */
506