1 /*
2    BAREOS - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2012 Planets Communications B.V.
5    Copyright (C) 2013-2020 Bareos GmbH & Co. KG
6 
7    This program is Free Software; you can redistribute it and/or
8    modify it under the terms of version three of the GNU Affero General Public
9    License as published by the Free Software Foundation and included
10    in the file LICENSE.
11 
12    This program is distributed in the hope that it will be useful, but
13    WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15    General Public License for more details.
16 
17    You should have received a copy of the GNU Affero General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20    02110-1301, USA.
21 */
22 /*
23  * Marco van Wieringen, November 2012
24  */
25 /**
26  * @file
27  * This file handles commands from another Storage daemon.
28  *
29  * We get here because the Director has initiated a Job with
30  * another Storage daemon, then done the same with this
31  * Storage daemon. When the Storage daemon receives a proper
32  * connection from the other Storage daemon, control is
33  * passed here to handle the subsequent Storage daemon commands.
34  */
35 
36 #include "include/bareos.h"
37 #include "stored/stored.h"
38 #include "stored/stored_globals.h"
39 #include "stored/append.h"
40 #include "stored/authenticate.h"
41 #include "stored/jcr_private.h"
42 #include "stored/sd_stats.h"
43 #include "stored/sd_stats.h"
44 #include "lib/bnet.h"
45 #include "lib/bsock.h"
46 #include "lib/edit.h"
47 #include "include/jcr.h"
48 
49 namespace storagedaemon {
50 
/*
 * Mutex paired with each job's job_start_wait condition variable; it is
 * held by DoListenRun() while it waits for the remote Storage daemon.
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Imported variables */

/* Static variables */

/* Reply sent by DoSdCommands() when a message matches no known command. */
static char serrmsg[] = "3900 Invalid command\n";

/* Imported functions */

/* Forward referenced SD commands (handlers listed in the sd_cmds table) */
static bool StartReplicationSession(JobControlRecord* jcr);
static bool ReplicateData(JobControlRecord* jcr);
static bool EndReplicationSession(JobControlRecord* jcr);
64 
/**
 * One entry of the command dispatch table: the command text and the
 * handler that DoSdCommands() calls when a received message begins with
 * that text.
 */
struct s_cmds {
  /* command text; compared against the message over strlen(cmd) chars */
  const char* cmd;
  /* handler; a false return makes DoSdCommands() stop reading commands */
  bool (*func)(JobControlRecord* jcr);
};
69 
/**
 * The following are the recognized commands from the Remote Storage daemon.
 *
 * DoSdCommands() scans the entries in order and stops at the entry whose
 * cmd is NULL, so that terminator must stay last.
 */
static struct s_cmds sd_cmds[] = {
    {"start replicate", StartReplicationSession},
    {"replicate data", ReplicateData},
    {"end replicate", EndReplicationSession},
    {NULL, NULL} /* list terminator */
};
79 
/**
 * Responses sent to the Remote Storage daemon
 */
/* "start replicate" received while a session is already open */
static char NO_open[] = "3901 Error replicate session already open\n";
/* "replicate data" or "end replicate" received without an open session */
static char NOT_opened[] = "3902 Error replicate session not opened\n";
/* DoAppendData() reported failure while handling "replicate data" */
static char ERROR_replicate[] = "3903 Error replicate data\n";
/* positive reply to "end replicate" */
static char OK_end_replicate[] = "3000 OK end replicate\n";
/* positive reply to "start replicate"; %d is filled with jcr->VolSessionId */
static char OK_start_replicate[] = "3000 OK start replicate ticket = %d\n";
/**
 * Responses sent to the Director
 */
/* sent by DoListenRun() just before commands are read from the remote SD */
static char Job_start[] = "3010 Job %s start\n";
/* final job summary sent by DoListenRun() on every exit path */
static char Job_end[]
    = "3099 Job %s end JobStatus=%d JobFiles=%d JobBytes=%s JobErrors=%u\n";
95 
96 /**
97  * After receiving a connection (in socket_server.c) if it is
98  * from the Storage daemon, this routine is called.
99  */
handle_stored_connection(BareosSocket * sd,char * job_name)100 void* handle_stored_connection(BareosSocket* sd, char* job_name)
101 {
102   JobControlRecord* jcr;
103 
104   /**
105    * With the following Bmicrosleep on, running the
106    * SD under the debugger fails.
107    */
108   // Bmicrosleep(0, 50000);             /* wait 50 millisecs */
109   if (!(jcr = get_jcr_by_full_name(job_name))) {
110     Jmsg1(NULL, M_FATAL, 0, _("SD connect failed: Job name not found: %s\n"),
111           job_name);
112     Dmsg1(3, "**** Job \"%s\" not found.\n", job_name);
113     sd->close();
114     delete sd;
115     return NULL;
116   }
117 
118   Dmsg1(50, "Found Job %s\n", job_name);
119 
120   if (jcr->authenticated) {
121     Jmsg2(jcr, M_FATAL, 0,
122           _("Hey!!!! JobId %u Job %s already authenticated.\n"),
123           (uint32_t)jcr->JobId, jcr->Job);
124     Dmsg2(50, "Hey!!!! JobId %u Job %s already authenticated.\n",
125           (uint32_t)jcr->JobId, jcr->Job);
126     sd->close();
127     delete sd;
128     FreeJcr(jcr);
129     return NULL;
130   }
131 
132   jcr->store_bsock = sd;
133   jcr->store_bsock->SetJcr(jcr);
134 
135   /*
136    * Authenticate the Storage daemon
137    */
138   if (jcr->authenticated || !AuthenticateStoragedaemon(jcr)) {
139     Dmsg1(50, "Authentication failed Job %s\n", jcr->Job);
140     Jmsg(jcr, M_FATAL, 0, _("Unable to authenticate Storage daemon\n"));
141   } else {
142     jcr->authenticated = true;
143     Dmsg2(50, "OK Authentication jid=%u Job %s\n", (uint32_t)jcr->JobId,
144           jcr->Job);
145   }
146 
147   if (!jcr->authenticated) { jcr->setJobStatus(JS_ErrorTerminated); }
148 
149   pthread_cond_signal(&jcr->impl->job_start_wait); /* wake waiting job */
150   FreeJcr(jcr);
151 
152   return NULL;
153 }
154 
155 /**
156  * Now talk to the SD and do what he says
157  */
DoSdCommands(JobControlRecord * jcr)158 static void DoSdCommands(JobControlRecord* jcr)
159 {
160   int i, status;
161   bool found, quit;
162   BareosSocket* sd = jcr->store_bsock;
163 
164   sd->SetJcr(jcr);
165   quit = false;
166   while (!quit) {
167     /*
168      * Read command coming from the Storage daemon
169      */
170     status = sd->recv();
171     if (IsBnetStop(sd)) { /* hardeof or error */
172       break;              /* connection terminated */
173     }
174     if (status <= 0) { continue; /* ignore signals and zero length msgs */ }
175 
176     Dmsg1(110, "<stored: %s", sd->msg);
177     found = false;
178     for (i = 0; sd_cmds[i].cmd; i++) {
179       if (bstrncmp(sd_cmds[i].cmd, sd->msg, strlen(sd_cmds[i].cmd))) {
180         found = true; /* indicate command found */
181         jcr->errmsg[0] = 0;
182         if (!sd_cmds[i].func(jcr)) { /* do command */
183           /*
184            * Note sd->msg command may be destroyed by comm activity
185            */
186           if (!JobCanceled(jcr)) {
187             if (jcr->errmsg[0]) {
188               Jmsg1(jcr, M_FATAL, 0,
189                     _("Command error with SD, hanging up. %s\n"), jcr->errmsg);
190             } else {
191               Jmsg0(jcr, M_FATAL, 0, _("Command error with SD, hanging up.\n"));
192             }
193             jcr->setJobStatus(JS_ErrorTerminated);
194           }
195           quit = true;
196         }
197         break;
198       }
199     }
200 
201     if (!found) { /* command not found */
202       if (!JobCanceled(jcr)) {
203         Jmsg1(jcr, M_FATAL, 0, _("SD command not found: %s\n"), sd->msg);
204         Dmsg1(110, "<stored: Command not found: %s\n", sd->msg);
205       }
206       sd->fsend(serrmsg);
207       break;
208     }
209   }
210   sd->signal(BNET_TERMINATE); /* signal to SD job is done */
211 }
212 
213 /**
214  * Run a Storage daemon replicate Job -- Wait for remote Storage daemon
215  * to connect and authenticate it we then will get a wakeup sign using
216  * the job_start_wait conditional
217  *
218  * Director sends us this command.
219  *
220  * Basic task here is:
221  * - Read a command from the Storage daemon
222  * - Execute it
223  */
DoListenRun(JobControlRecord * jcr)224 bool DoListenRun(JobControlRecord* jcr)
225 {
226   char ec1[30];
227   int errstat = 0;
228   BareosSocket* dir = jcr->dir_bsock;
229 
230   jcr->sendJobStatus(JS_WaitSD); /* wait for SD to connect */
231 
232   Dmsg2(50, "%s waiting for SD to contact SD key=%s\n", jcr->Job,
233         jcr->sd_auth_key);
234   Dmsg2(800, "Wait SD for jid=%d %p\n", jcr->JobId, jcr);
235 
236   /*
237    * Wait for the Storage daemon to contact us to start the Job, when he does,
238    * we will be released.
239    */
240   P(mutex);
241   while (!jcr->authenticated && !JobCanceled(jcr)) {
242     errstat = pthread_cond_wait(&jcr->impl->job_start_wait, &mutex);
243     if (errstat == EINVAL || errstat == EPERM) { break; }
244     Dmsg1(800, "=== Auth cond errstat=%d\n", errstat);
245   }
246   Dmsg3(50, "Auth=%d canceled=%d errstat=%d\n", jcr->authenticated,
247         JobCanceled(jcr), errstat);
248   V(mutex);
249 
250   if (!jcr->authenticated || !jcr->store_bsock) {
251     Dmsg2(800, "Auth fail or cancel for jid=%d %p\n", jcr->JobId, jcr);
252     DequeueMessages(jcr); /* send any queued messages */
253 
254     goto cleanup;
255   }
256 
257   Dmsg1(120, "Start run Job=%s\n", jcr->Job);
258 
259   dir->fsend(Job_start, jcr->Job);
260   jcr->start_time = time(NULL);
261   jcr->run_time = jcr->start_time;
262   jcr->sendJobStatus(JS_Running);
263 
264   /*
265    * See if we need to limit the inbound bandwidth.
266    */
267   if (me->max_bandwidth_per_job && jcr->store_bsock) {
268     jcr->store_bsock->SetBwlimit(me->max_bandwidth_per_job);
269     if (me->allow_bw_bursting) { jcr->store_bsock->SetBwlimitBursting(); }
270   }
271 
272   DoSdCommands(jcr);
273 
274   jcr->end_time = time(NULL);
275 
276   DequeueMessages(jcr); /* send any queued messages */
277   jcr->setJobStatus(JS_Terminated);
278 
279 cleanup:
280   GeneratePluginEvent(jcr, bSdEventJobEnd);
281 
282   dir->fsend(Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles,
283              edit_uint64(jcr->JobBytes, ec1), jcr->JobErrors);
284 
285   dir->signal(BNET_EOD); /* send EOD to Director daemon */
286 
287   FreePlugins(jcr); /* release instantiated plugins */
288 
289   /*
290    * After a listen cmd we are done e.g. return false.
291    */
292   return false;
293 }
294 
295 /**
296  * Start of replication.
297  */
StartReplicationSession(JobControlRecord * jcr)298 static bool StartReplicationSession(JobControlRecord* jcr)
299 {
300   BareosSocket* sd = jcr->store_bsock;
301 
302   Dmsg1(120, "Start replication session: %s", sd->msg);
303   if (jcr->impl->session_opened) {
304     PmStrcpy(jcr->errmsg, _("Attempt to open already open session.\n"));
305     sd->fsend(NO_open);
306     return false;
307   }
308 
309   jcr->impl->session_opened = true;
310 
311   /*
312    * Send "Ticket" to Storage Daemon
313    */
314   sd->fsend(OK_start_replicate, jcr->VolSessionId);
315   Dmsg1(110, ">stored: %s", sd->msg);
316 
317   return true;
318 }
319 
320 /**
321  * Replicate data.
322  *    Open Data Channel and receive Data for archiving
323  *    Write the Data to the archive device
324  */
ReplicateData(JobControlRecord * jcr)325 static bool ReplicateData(JobControlRecord* jcr)
326 {
327   BareosSocket* sd = jcr->store_bsock;
328 
329   Dmsg1(120, "Replicate data: %s", sd->msg);
330   if (jcr->impl->session_opened) {
331     utime_t now;
332 
333     /*
334      * Update the initial Job Statistics.
335      */
336     now = (utime_t)time(NULL);
337     UpdateJobStatistics(jcr, now);
338 
339     Dmsg1(110, "<stored: %s", sd->msg);
340     if (DoAppendData(jcr, sd, "SD")) {
341       return true;
342     } else {
343       PmStrcpy(jcr->errmsg, _("Replicate data error.\n"));
344       BnetSuppressErrorMessages(sd, 1); /* ignore errors at this point */
345       sd->fsend(ERROR_replicate);
346     }
347   } else {
348     PmStrcpy(jcr->errmsg, _("Attempt to replicate on non-open session.\n"));
349     sd->fsend(NOT_opened);
350   }
351 
352   return false;
353 }
354 
355 /**
356  * End a replication session.
357  */
EndReplicationSession(JobControlRecord * jcr)358 static bool EndReplicationSession(JobControlRecord* jcr)
359 {
360   BareosSocket* sd = jcr->store_bsock;
361 
362   Dmsg1(120, "stored<stored: %s", sd->msg);
363   if (!jcr->impl->session_opened) {
364     PmStrcpy(jcr->errmsg, _("Attempt to close non-open session.\n"));
365     sd->fsend(NOT_opened);
366     return false;
367   }
368   return sd->fsend(OK_end_replicate);
369 }
370 
371 } /* namespace storagedaemon */
372