1 /*
2    BAREOS® - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5    Copyright (C) 2011-2016 Planets Communications B.V.
6    Copyright (C) 2013-2018 Bareos GmbH & Co. KG
7 
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12 
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    Affero General Public License for more details.
17 
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22 */
23 /*
24  * Kern Sibbald, August MM
25  */
26 
27 /**
28  * @file
29  *
30  * handles the message channel to the Storage daemon and the File daemon.
31  *
32  * * Basic tasks done here:
33  *    * Open a message channel with the Storage daemon
34  *      to authenticate ourself and to pass the JobId.
35  *    * Create a thread to interact with the Storage daemon
36  *      who returns a job status and requests Catalog services, etc.
37  */
38 #include "include/bareos.h"
39 #include "dird.h"
40 #include "dird/getmsg.h"
41 #include "dird/job.h"
42 #include "dird/jcr_private.h"
43 #include "dird/msgchan.h"
44 #include "dird/quota.h"
45 #include "dird/sd_cmds.h"
46 #include "lib/berrno.h"
47 #include "lib/bnet.h"
48 #include "lib/edit.h"
49 #include "lib/util.h"
50 #include "lib/thread_specific_data.h"
51 #include "lib/watchdog.h"
52 
53 namespace directordaemon {
54 
/*
 * Mutex handed to pthread_cond_timedwait() while waiting for the
 * message thread in WaitForStorageDaemonTermination().
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/*
 * printf-style templates for the commands the Director sends
 * to the Storage daemon.
 */
static char jobcmd[] =
    "JobId=%s job=%s job_name=%s client_name=%s "
    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
    "SpoolData=%d PreferMountedVols=%d SpoolSize=%s "
    "rerunning=%d VolSessionId=%d VolSessionTime=%d Quota=%llu "
    "Protocol=%d BackupFormat=%s\n";
static char use_storage[] =
    "use storage=%s media_type=%s pool_name=%s "
    "pool_type=%s append=%d copy=%d stripe=%d\n";
static char use_device[] = "use device=%s\n";

/*
 * scanf-style templates for the replies expected from the Storage daemon.
 * Note that a "%100s" conversion may store up to 100 characters plus a
 * terminating NUL, so the receiving buffer needs at least 101 bytes.
 */
static char OKbootstrap[] = "3000 OK bootstrap\n";
static char OK_job[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
static char OK_nextrun[] = "3000 OK Job Authorization=%100s\n";
static char OK_device[] = "3000 OK use device device=%s\n";

/*
 * scanf-style templates for the unsolicited status messages the
 * Storage daemon sends while a Job is running.
 */
static char Job_start[] = "3010 Job %127s start\n";
static char Job_end[] =
    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
81 
82 /* Forward referenced functions */
83 extern "C" void* msg_thread(void* arg);
84 
85 /** Send bootstrap file to Storage daemon.
86  *  This is used for
87  *    * restore
88  *    * verify
89  *    * VolumeToCatalog
90  *    * migration and
91  *    * copy Jobs
92  */
SendBootstrapFileToSd(JobControlRecord * jcr,BareosSocket * sd)93 static inline bool SendBootstrapFileToSd(JobControlRecord* jcr,
94                                          BareosSocket* sd)
95 {
96   FILE* bs;
97   char buf[1000];
98   const char* bootstrap = "bootstrap\n";
99 
100   Dmsg1(400, "SendBootstrapFileToSd: %s\n", jcr->RestoreBootstrap);
101   if (!jcr->RestoreBootstrap) { return true; }
102   bs = fopen(jcr->RestoreBootstrap, "rb");
103   if (!bs) {
104     BErrNo be;
105     Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
106          jcr->RestoreBootstrap, be.bstrerror());
107     jcr->setJobStatus(JS_ErrorTerminated);
108     return false;
109   }
110   sd->fsend(bootstrap);
111   while (fgets(buf, sizeof(buf), bs)) { sd->fsend("%s", buf); }
112   sd->signal(BNET_EOD);
113   fclose(bs);
114   if (jcr->impl->unlink_bsr) {
115     SecureErase(jcr, jcr->RestoreBootstrap);
116     jcr->impl->unlink_bsr = false;
117   }
118   return true;
119 }
120 
121 /** Start a job with the Storage daemon
122  */
StartStorageDaemonJob(JobControlRecord * jcr,alist * read_storage,alist * write_storage,bool send_bsr)123 bool StartStorageDaemonJob(JobControlRecord* jcr,
124                            alist* read_storage,
125                            alist* write_storage,
126                            bool send_bsr)
127 {
128   bool ok = true;
129   StorageResource* storage = nullptr;
130   char auth_key[100];
131   const char* fileset_md5;
132   PoolMem StoreName, device_name, pool_name, pool_type, media_type,
133       backup_format;
134   PoolMem job_name, client_name, fileset_name;
135   int copy = 0;
136   int stripe = 0;
137   uint64_t remainingquota = 0;
138   char ed1[30], ed2[30];
139   BareosSocket* sd = jcr->store_bsock;
140 
141   /*
142    * Before actually starting a new Job on the SD make sure we send any specific
143    * plugin options for this Job.
144    */
145   if (!SendStoragePluginOptions(jcr)) {
146     Jmsg(jcr, M_FATAL, 0,
147          _("Storage daemon rejected Plugin Options command: %s\n"), sd->msg);
148     return false;
149   }
150 
151   /*
152    * Now send JobId and permissions, and get back the authorization key.
153    */
154   PmStrcpy(job_name, jcr->impl->res.job->resource_name_);
155   BashSpaces(job_name);
156 
157   if (jcr->impl->res.client) {
158     PmStrcpy(client_name, jcr->impl->res.client->resource_name_);
159   } else {
160     PmStrcpy(client_name, "**None**");
161   }
162   BashSpaces(client_name);
163 
164   if (jcr->impl->res.fileset) {
165     PmStrcpy(fileset_name, jcr->impl->res.fileset->resource_name_);
166   } else {
167     PmStrcpy(fileset_name, "**None**");
168   }
169   BashSpaces(fileset_name);
170 
171   PmStrcpy(backup_format, jcr->impl->backup_format);
172   BashSpaces(backup_format);
173 
174   if (jcr->impl->res.fileset && jcr->impl->res.fileset->MD5[0] == 0) {
175     bstrncpy(jcr->impl->res.fileset->MD5, "**Dummy**",
176              sizeof(jcr->impl->res.fileset->MD5));
177     fileset_md5 = jcr->impl->res.fileset->MD5;
178   } else if (jcr->impl->res.fileset) {
179     fileset_md5 = jcr->impl->res.fileset->MD5;
180   } else {
181     fileset_md5 = "**Dummy**";
182   }
183 
184   /*
185    * If rescheduling, cancel the previous incarnation of this job
186    * with the SD, which might be waiting on the FD connection.
187    * If we do not cancel it the SD will not accept a new connection
188    * for the same jobid.
189    */
190   if (jcr->impl->reschedule_count) {
191     sd->fsend("cancel Job=%s\n", jcr->Job);
192     while (sd->recv() >= 0) { continue; }
193   }
194 
195   /*
196    * Retrieve available quota 0 bytes means dont perform the check
197    */
198   remainingquota = FetchRemainingQuotas(jcr);
199   Dmsg1(50, "Remainingquota: %llu\n", remainingquota);
200 
201   sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, job_name.c_str(),
202             client_name.c_str(), jcr->getJobType(), jcr->getJobLevel(),
203             fileset_name.c_str(), !jcr->impl->res.pool->catalog_files,
204             jcr->impl->res.job->SpoolAttributes, fileset_md5,
205             jcr->impl->spool_data, jcr->impl->res.job->PreferMountedVolumes,
206             edit_int64(jcr->impl->spool_size, ed2), jcr->rerunning,
207             jcr->VolSessionId, jcr->VolSessionTime, remainingquota,
208             jcr->getJobProtocol(), backup_format.c_str());
209 
210   Dmsg1(100, ">stored: %s", sd->msg);
211   if (BgetDirmsg(sd) > 0) {
212     Dmsg1(100, "<stored: %s", sd->msg);
213     if (sscanf(sd->msg, OK_job, &jcr->VolSessionId, &jcr->VolSessionTime,
214                &auth_key) != 3) {
215       Dmsg1(100, "BadJob=%s\n", sd->msg);
216       Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"),
217            sd->msg);
218       return false;
219     } else {
220       BfreeAndNull(jcr->sd_auth_key);
221       jcr->sd_auth_key = strdup(auth_key);
222       Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
223     }
224   } else {
225     Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
226          sd->bstrerror());
227     return false;
228   }
229 
230   if (send_bsr &&
231       (!SendBootstrapFileToSd(jcr, sd) ||
232        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
233     return false;
234   }
235 
236   /*
237    * request sd to reply the secure erase cmd
238    * or "*None*" if not set
239    */
240   if (!SendSecureEraseReqToSd(jcr)) {
241     Dmsg1(400, "Unexpected %s Secure Erase Reply\n", "SD");
242   }
243 
244   /*
245    * We have two loops here. The first comes from the
246    *  Storage = associated with the Job, and we need
247    *  to attach to each one.
248    * The inner loop loops over all the alternative devices
249    *  associated with each Storage. It selects the first
250    *  available one.
251    *
252    */
253   /* Do read side of storage daemon */
254   if (ok && read_storage) {
255     /* For the moment, only migrate, copy and vbackup have rpool */
256     if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
257         (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
258       PmStrcpy(pool_type, jcr->impl->res.rpool->pool_type);
259       PmStrcpy(pool_name, jcr->impl->res.rpool->resource_name_);
260     } else {
261       PmStrcpy(pool_type, jcr->impl->res.pool->pool_type);
262       PmStrcpy(pool_name, jcr->impl->res.pool->resource_name_);
263     }
264     BashSpaces(pool_type);
265     BashSpaces(pool_name);
266     foreach_alist (storage, read_storage) {
267       Dmsg1(100, "Rstore=%s\n", storage->resource_name_);
268       PmStrcpy(StoreName, storage->resource_name_);
269       BashSpaces(StoreName);
270       PmStrcpy(media_type, storage->media_type);
271       BashSpaces(media_type);
272       sd->fsend(use_storage, StoreName.c_str(), media_type.c_str(),
273                 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
274       Dmsg1(100, "read_storage >stored: %s", sd->msg);
275       DeviceResource* dev = nullptr;
276       /* Loop over alternative storage Devices until one is OK */
277       foreach_alist (dev, storage->device) {
278         PmStrcpy(device_name, dev->resource_name_);
279         BashSpaces(device_name);
280         sd->fsend(use_device, device_name.c_str());
281         Dmsg1(100, ">stored: %s", sd->msg);
282       }
283       sd->signal(BNET_EOD); /* end of Devices */
284     }
285     sd->signal(BNET_EOD); /* end of Storages */
286     if (BgetDirmsg(sd) > 0) {
287       Dmsg1(100, "<stored: %s", sd->msg);
288       /* ****FIXME**** save actual device name */
289       ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
290     } else {
291       ok = false;
292     }
293     if (ok) {
294       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"),
295            device_name.c_str());
296     }
297   }
298 
299   /* Do write side of storage daemon */
300   if (ok && write_storage) {
301     PmStrcpy(pool_type, jcr->impl->res.pool->pool_type);
302     PmStrcpy(pool_name, jcr->impl->res.pool->resource_name_);
303     BashSpaces(pool_type);
304     BashSpaces(pool_name);
305     foreach_alist (storage, write_storage) {
306       PmStrcpy(StoreName, storage->resource_name_);
307       BashSpaces(StoreName);
308       PmStrcpy(media_type, storage->media_type);
309       BashSpaces(media_type);
310       sd->fsend(use_storage, StoreName.c_str(), media_type.c_str(),
311                 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
312 
313       Dmsg1(100, "write_storage >stored: %s", sd->msg);
314       DeviceResource* dev = nullptr;
315       /* Loop over alternative storage Devices until one is OK */
316       foreach_alist (dev, storage->device) {
317         PmStrcpy(device_name, dev->resource_name_);
318         BashSpaces(device_name);
319         sd->fsend(use_device, device_name.c_str());
320         Dmsg1(100, ">stored: %s", sd->msg);
321       }
322       sd->signal(BNET_EOD); /* end of Devices */
323     }
324     sd->signal(BNET_EOD); /* end of Storages */
325     if (BgetDirmsg(sd) > 0) {
326       Dmsg1(100, "<stored: %s", sd->msg);
327       /* ****FIXME**** save actual device name */
328       ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
329     } else {
330       ok = false;
331     }
332     if (ok) {
333       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"),
334            device_name.c_str());
335     }
336   }
337   if (!ok) {
338     PoolMem err_msg;
339     if (sd->msg[0]) {
340       PmStrcpy(err_msg, sd->msg); /* save message */
341       Jmsg(jcr, M_FATAL, 0,
342            _("\n"
343              "     Storage daemon didn't accept Device \"%s\" because:\n     "
344              "%s"),
345            device_name.c_str(), err_msg.c_str() /* sd->msg */);
346     } else {
347       Jmsg(jcr, M_FATAL, 0,
348            _("\n"
349              "     Storage daemon didn't accept Device \"%s\" command.\n"),
350            device_name.c_str());
351     }
352   }
353   return ok;
354 }
355 
356 /** Start a thread to handle Storage daemon messages and
357  *  Catalog requests.
358  */
StartStorageDaemonMessageThread(JobControlRecord * jcr)359 bool StartStorageDaemonMessageThread(JobControlRecord* jcr)
360 {
361   int status;
362   pthread_t thid;
363 
364   jcr->IncUseCount(); /* mark in use by msg thread */
365   jcr->impl->sd_msg_thread_done = false;
366   jcr->impl->SD_msg_chan_started = false;
367   Dmsg0(100, "Start SD msg_thread.\n");
368   if ((status = pthread_create(&thid, NULL, msg_thread, (void*)jcr)) != 0) {
369     BErrNo be;
370     Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"),
371           be.bstrerror(status));
372   }
373   /* Wait for thread to start */
374   while (!jcr->impl->SD_msg_chan_started) {
375     Bmicrosleep(0, 50);
376     if (JobCanceled(jcr) || jcr->impl->sd_msg_thread_done) { return false; }
377   }
378   Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->UseCount());
379   return true;
380 }
381 
MsgThreadCleanup(void * arg)382 extern "C" void MsgThreadCleanup(void* arg)
383 {
384   JobControlRecord* jcr = (JobControlRecord*)arg;
385 
386   jcr->db->EndTransaction(jcr); /* Terminate any open transaction */
387   jcr->lock();
388   jcr->impl->sd_msg_thread_done = true;
389   jcr->impl->SD_msg_chan_started = false;
390   jcr->unlock();
391   pthread_cond_broadcast(
392       &jcr->impl->nextrun_ready); /* wakeup any waiting threads */
393   pthread_cond_broadcast(
394       &jcr->impl->term_wait); /* wakeup any waiting threads */
395   Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId,
396         jcr->UseCount());
397   jcr->db->ThreadCleanup(); /* remove thread specific data */
398   FreeJcr(jcr);             /* release jcr */
399 }
400 
401 /** Handle the message channel (i.e. requests from the
402  *  Storage daemon).
403  * Note, we are running in a separate thread.
404  */
msg_thread(void * arg)405 extern "C" void* msg_thread(void* arg)
406 {
407   JobControlRecord* jcr = (JobControlRecord*)arg;
408   BareosSocket* sd;
409   int JobStatus;
410   int n;
411   char auth_key[100];
412   char Job[MAX_NAME_LENGTH];
413   uint32_t JobFiles, JobErrors;
414   uint64_t JobBytes;
415 
416   pthread_detach(pthread_self());
417   SetJcrInThreadSpecificData(jcr);
418   jcr->impl->SD_msg_chan = pthread_self();
419   jcr->impl->SD_msg_chan_started = true;
420   pthread_cleanup_push(MsgThreadCleanup, arg);
421   sd = jcr->store_bsock;
422 
423   /*
424    * Read the Storage daemon's output.
425    */
426   Dmsg0(100, "Start msg_thread loop\n");
427   n = 0;
428   while (!JobCanceled(jcr) && (n = BgetDirmsg(sd)) >= 0) {
429     Dmsg1(400, "<stored: %s", sd->msg);
430     /*
431      * Check for "3000 OK Job Authorization="
432      * Returned by a rerun cmd.
433      */
434     if (sscanf(sd->msg, OK_nextrun, &auth_key) == 1) {
435       if (jcr->sd_auth_key) { free(jcr->sd_auth_key); }
436       jcr->sd_auth_key = strdup(auth_key);
437       pthread_cond_broadcast(
438           &jcr->impl->nextrun_ready); /* wakeup any waiting threads */
439       continue;
440     }
441 
442     /*
443      * Check for "3010 Job <jobid> start"
444      */
445     if (sscanf(sd->msg, Job_start, Job) == 1) { continue; }
446 
447     /*
448      * Check for "3099 Job <JobId> end JobStatus= JobFiles= JobBytes=
449      * JobErrors="
450      */
451     if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles, &JobBytes,
452                &JobErrors) == 5) {
453       jcr->impl->SDJobStatus = JobStatus; /* termination status */
454       jcr->impl->SDJobFiles = JobFiles;
455       jcr->impl->SDJobBytes = JobBytes;
456       jcr->impl->SDErrors = JobErrors;
457       break;
458     }
459     Dmsg1(400, "end loop use=%d\n", jcr->UseCount());
460   }
461   if (n == BNET_HARDEOF) {
462     /*
463      * A lost connection to the storage daemon is FATAL.
464      * This is required, as otherwise
465      * the job could failed to write data
466      * but still end as JS_Warnings (OK -- with warnings).
467      */
468     Qmsg(jcr, M_FATAL, 0, _("Director's comm line to SD dropped.\n"));
469   }
470   if (IsBnetError(sd)) { jcr->impl->SDJobStatus = JS_ErrorTerminated; }
471   pthread_cleanup_pop(1); /* remove and execute the handler */
472   return NULL;
473 }
474 
WaitForStorageDaemonTermination(JobControlRecord * jcr)475 void WaitForStorageDaemonTermination(JobControlRecord* jcr)
476 {
477   int cancel_count = 0;
478   /* Now wait for Storage daemon to Terminate our message thread */
479   while (!jcr->impl->sd_msg_thread_done) {
480     struct timeval tv;
481     struct timezone tz;
482     struct timespec timeout;
483 
484     gettimeofday(&tv, &tz);
485     timeout.tv_nsec = 0;
486     timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
487     Dmsg0(400, "I'm waiting for message thread termination.\n");
488     P(mutex);
489     pthread_cond_timedwait(&jcr->impl->term_wait, &mutex, &timeout);
490     V(mutex);
491     if (jcr->IsCanceled()) {
492       if (jcr->impl->SD_msg_chan_started) {
493         jcr->store_bsock->SetTimedOut();
494         jcr->store_bsock->SetTerminated();
495         SdMsgThreadSendSignal(jcr, TIMEOUT_SIGNAL);
496       }
497       cancel_count++;
498     }
499     /* Give SD 30 seconds to clean up after cancel */
500     if (cancel_count == 6) { break; }
501   }
502   jcr->setJobStatus(JS_Terminated);
503 }
504 
505 } /* namespace directordaemon */
506