1 /*
2 BAREOS - Backup Archiving REcovery Open Sourced
3
4 Copyright (C) 2012 Planets Communications B.V.
5 Copyright (C) 2013-2020 Bareos GmbH & Co. KG
6
7 This program is Free Software; you can redistribute it and/or
8 modify it under the terms of version three of the GNU Affero General Public
9 License as published by the Free Software Foundation and included
10 in the file LICENSE.
11
12 This program is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Affero General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20 02110-1301, USA.
21 */
22 /*
23 * Marco van Wieringen, November 2012
24 */
25 /**
26 * @file
27 * This file handles commands from another Storage daemon.
28 *
29 * We get here because the Director has initiated a Job with
30 * another Storage daemon, then done the same with this
31 * Storage daemon. When the Storage daemon receives a proper
32 * connection from the other Storage daemon, control is
33 * passed here to handle the subsequent Storage daemon commands.
34 */
35
36 #include "include/bareos.h"
37 #include "stored/stored.h"
38 #include "stored/stored_globals.h"
39 #include "stored/append.h"
40 #include "stored/authenticate.h"
41 #include "stored/jcr_private.h"
42 #include "stored/sd_stats.h"
43 #include "stored/sd_stats.h"
44 #include "lib/bnet.h"
45 #include "lib/bsock.h"
46 #include "lib/edit.h"
47 #include "include/jcr.h"
48
49 namespace storagedaemon {
50
/*
 * File-wide mutex handed to pthread_cond_wait() in DoListenRun() while a job
 * waits on its job_start_wait condition. It is not per job: every job
 * waiting in this file shares this one mutex.
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
52
53 /* Imported variables */
54
55 /* Static variables */
/* Reply sent by DoSdCommands() when a message matches no known command */
static char serrmsg[] = "3900 Invalid command\n";
57
58 /* Imported functions */
59
/*
 * Forward referenced SD commands.
 *
 * These are the handlers stored in the sd_cmds table below. Each one returns
 * false to make DoSdCommands() stop talking to the remote Storage daemon.
 */
static bool StartReplicationSession(JobControlRecord* jcr);
static bool ReplicateData(JobControlRecord* jcr);
static bool EndReplicationSession(JobControlRecord* jcr);
64
/* One entry of the command table: a command text and the handler it selects */
struct s_cmds {
  /* Text a received message has to start with to select this entry */
  const char* cmd;
  /* Handler to run; a false return makes DoSdCommands() hang up */
  bool (*func)(JobControlRecord* jcr);
};
69
/**
 * The following are the recognized commands from the Remote Storage daemon.
 *
 * DoSdCommands() walks this table from the top and runs the handler of the
 * first entry whose command text is a prefix of the received message. The
 * walk stops at the entry whose cmd is NULL.
 */
static struct s_cmds sd_cmds[] = {
    {"start replicate", StartReplicationSession},
    {"replicate data", ReplicateData},
    {"end replicate", EndReplicationSession},
    {NULL, NULL} /* list terminator */
};
79
/**
 * Responses sent to the Remote Storage daemon.
 *
 * All of them are used as format strings for BareosSocket::fsend() by the
 * command handlers in this file.
 */
/* "start replicate" received while a session is already open */
static char NO_open[] = "3901 Error replicate session already open\n";
/* "replicate data" or "end replicate" received without an open session */
static char NOT_opened[] = "3902 Error replicate session not opened\n";
/* DoAppendData() reported a failure while replicating */
static char ERROR_replicate[] = "3903 Error replicate data\n";
/* Acknowledges "end replicate" */
static char OK_end_replicate[] = "3000 OK end replicate\n";
/* Acknowledges "start replicate"; %d is filled with jcr->VolSessionId */
static char OK_start_replicate[] = "3000 OK start replicate ticket = %d\n";
88
/**
 * Responses sent to the Director.
 *
 * Both are format strings used by DoListenRun(): Job_start is sent once the
 * remote Storage daemon has authenticated, Job_end is sent on every exit
 * path and carries the final status and counters of the job.
 */
static char Job_start[] = "3010 Job %s start\n";
static char Job_end[]
    = "3099 Job %s end JobStatus=%d JobFiles=%d JobBytes=%s JobErrors=%u\n";
95
96 /**
97 * After receiving a connection (in socket_server.c) if it is
98 * from the Storage daemon, this routine is called.
99 */
handle_stored_connection(BareosSocket * sd,char * job_name)100 void* handle_stored_connection(BareosSocket* sd, char* job_name)
101 {
102 JobControlRecord* jcr;
103
104 /**
105 * With the following Bmicrosleep on, running the
106 * SD under the debugger fails.
107 */
108 // Bmicrosleep(0, 50000); /* wait 50 millisecs */
109 if (!(jcr = get_jcr_by_full_name(job_name))) {
110 Jmsg1(NULL, M_FATAL, 0, _("SD connect failed: Job name not found: %s\n"),
111 job_name);
112 Dmsg1(3, "**** Job \"%s\" not found.\n", job_name);
113 sd->close();
114 delete sd;
115 return NULL;
116 }
117
118 Dmsg1(50, "Found Job %s\n", job_name);
119
120 if (jcr->authenticated) {
121 Jmsg2(jcr, M_FATAL, 0,
122 _("Hey!!!! JobId %u Job %s already authenticated.\n"),
123 (uint32_t)jcr->JobId, jcr->Job);
124 Dmsg2(50, "Hey!!!! JobId %u Job %s already authenticated.\n",
125 (uint32_t)jcr->JobId, jcr->Job);
126 sd->close();
127 delete sd;
128 FreeJcr(jcr);
129 return NULL;
130 }
131
132 jcr->store_bsock = sd;
133 jcr->store_bsock->SetJcr(jcr);
134
135 /*
136 * Authenticate the Storage daemon
137 */
138 if (jcr->authenticated || !AuthenticateStoragedaemon(jcr)) {
139 Dmsg1(50, "Authentication failed Job %s\n", jcr->Job);
140 Jmsg(jcr, M_FATAL, 0, _("Unable to authenticate Storage daemon\n"));
141 } else {
142 jcr->authenticated = true;
143 Dmsg2(50, "OK Authentication jid=%u Job %s\n", (uint32_t)jcr->JobId,
144 jcr->Job);
145 }
146
147 if (!jcr->authenticated) { jcr->setJobStatus(JS_ErrorTerminated); }
148
149 pthread_cond_signal(&jcr->impl->job_start_wait); /* wake waiting job */
150 FreeJcr(jcr);
151
152 return NULL;
153 }
154
155 /**
156 * Now talk to the SD and do what he says
157 */
DoSdCommands(JobControlRecord * jcr)158 static void DoSdCommands(JobControlRecord* jcr)
159 {
160 int i, status;
161 bool found, quit;
162 BareosSocket* sd = jcr->store_bsock;
163
164 sd->SetJcr(jcr);
165 quit = false;
166 while (!quit) {
167 /*
168 * Read command coming from the Storage daemon
169 */
170 status = sd->recv();
171 if (IsBnetStop(sd)) { /* hardeof or error */
172 break; /* connection terminated */
173 }
174 if (status <= 0) { continue; /* ignore signals and zero length msgs */ }
175
176 Dmsg1(110, "<stored: %s", sd->msg);
177 found = false;
178 for (i = 0; sd_cmds[i].cmd; i++) {
179 if (bstrncmp(sd_cmds[i].cmd, sd->msg, strlen(sd_cmds[i].cmd))) {
180 found = true; /* indicate command found */
181 jcr->errmsg[0] = 0;
182 if (!sd_cmds[i].func(jcr)) { /* do command */
183 /*
184 * Note sd->msg command may be destroyed by comm activity
185 */
186 if (!JobCanceled(jcr)) {
187 if (jcr->errmsg[0]) {
188 Jmsg1(jcr, M_FATAL, 0,
189 _("Command error with SD, hanging up. %s\n"), jcr->errmsg);
190 } else {
191 Jmsg0(jcr, M_FATAL, 0, _("Command error with SD, hanging up.\n"));
192 }
193 jcr->setJobStatus(JS_ErrorTerminated);
194 }
195 quit = true;
196 }
197 break;
198 }
199 }
200
201 if (!found) { /* command not found */
202 if (!JobCanceled(jcr)) {
203 Jmsg1(jcr, M_FATAL, 0, _("SD command not found: %s\n"), sd->msg);
204 Dmsg1(110, "<stored: Command not found: %s\n", sd->msg);
205 }
206 sd->fsend(serrmsg);
207 break;
208 }
209 }
210 sd->signal(BNET_TERMINATE); /* signal to SD job is done */
211 }
212
213 /**
214 * Run a Storage daemon replicate Job -- Wait for remote Storage daemon
215 * to connect and authenticate it we then will get a wakeup sign using
216 * the job_start_wait conditional
217 *
218 * Director sends us this command.
219 *
220 * Basic task here is:
221 * - Read a command from the Storage daemon
222 * - Execute it
223 */
DoListenRun(JobControlRecord * jcr)224 bool DoListenRun(JobControlRecord* jcr)
225 {
226 char ec1[30];
227 int errstat = 0;
228 BareosSocket* dir = jcr->dir_bsock;
229
230 jcr->sendJobStatus(JS_WaitSD); /* wait for SD to connect */
231
232 Dmsg2(50, "%s waiting for SD to contact SD key=%s\n", jcr->Job,
233 jcr->sd_auth_key);
234 Dmsg2(800, "Wait SD for jid=%d %p\n", jcr->JobId, jcr);
235
236 /*
237 * Wait for the Storage daemon to contact us to start the Job, when he does,
238 * we will be released.
239 */
240 P(mutex);
241 while (!jcr->authenticated && !JobCanceled(jcr)) {
242 errstat = pthread_cond_wait(&jcr->impl->job_start_wait, &mutex);
243 if (errstat == EINVAL || errstat == EPERM) { break; }
244 Dmsg1(800, "=== Auth cond errstat=%d\n", errstat);
245 }
246 Dmsg3(50, "Auth=%d canceled=%d errstat=%d\n", jcr->authenticated,
247 JobCanceled(jcr), errstat);
248 V(mutex);
249
250 if (!jcr->authenticated || !jcr->store_bsock) {
251 Dmsg2(800, "Auth fail or cancel for jid=%d %p\n", jcr->JobId, jcr);
252 DequeueMessages(jcr); /* send any queued messages */
253
254 goto cleanup;
255 }
256
257 Dmsg1(120, "Start run Job=%s\n", jcr->Job);
258
259 dir->fsend(Job_start, jcr->Job);
260 jcr->start_time = time(NULL);
261 jcr->run_time = jcr->start_time;
262 jcr->sendJobStatus(JS_Running);
263
264 /*
265 * See if we need to limit the inbound bandwidth.
266 */
267 if (me->max_bandwidth_per_job && jcr->store_bsock) {
268 jcr->store_bsock->SetBwlimit(me->max_bandwidth_per_job);
269 if (me->allow_bw_bursting) { jcr->store_bsock->SetBwlimitBursting(); }
270 }
271
272 DoSdCommands(jcr);
273
274 jcr->end_time = time(NULL);
275
276 DequeueMessages(jcr); /* send any queued messages */
277 jcr->setJobStatus(JS_Terminated);
278
279 cleanup:
280 GeneratePluginEvent(jcr, bSdEventJobEnd);
281
282 dir->fsend(Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles,
283 edit_uint64(jcr->JobBytes, ec1), jcr->JobErrors);
284
285 dir->signal(BNET_EOD); /* send EOD to Director daemon */
286
287 FreePlugins(jcr); /* release instantiated plugins */
288
289 /*
290 * After a listen cmd we are done e.g. return false.
291 */
292 return false;
293 }
294
295 /**
296 * Start of replication.
297 */
StartReplicationSession(JobControlRecord * jcr)298 static bool StartReplicationSession(JobControlRecord* jcr)
299 {
300 BareosSocket* sd = jcr->store_bsock;
301
302 Dmsg1(120, "Start replication session: %s", sd->msg);
303 if (jcr->impl->session_opened) {
304 PmStrcpy(jcr->errmsg, _("Attempt to open already open session.\n"));
305 sd->fsend(NO_open);
306 return false;
307 }
308
309 jcr->impl->session_opened = true;
310
311 /*
312 * Send "Ticket" to Storage Daemon
313 */
314 sd->fsend(OK_start_replicate, jcr->VolSessionId);
315 Dmsg1(110, ">stored: %s", sd->msg);
316
317 return true;
318 }
319
320 /**
321 * Replicate data.
322 * Open Data Channel and receive Data for archiving
323 * Write the Data to the archive device
324 */
ReplicateData(JobControlRecord * jcr)325 static bool ReplicateData(JobControlRecord* jcr)
326 {
327 BareosSocket* sd = jcr->store_bsock;
328
329 Dmsg1(120, "Replicate data: %s", sd->msg);
330 if (jcr->impl->session_opened) {
331 utime_t now;
332
333 /*
334 * Update the initial Job Statistics.
335 */
336 now = (utime_t)time(NULL);
337 UpdateJobStatistics(jcr, now);
338
339 Dmsg1(110, "<stored: %s", sd->msg);
340 if (DoAppendData(jcr, sd, "SD")) {
341 return true;
342 } else {
343 PmStrcpy(jcr->errmsg, _("Replicate data error.\n"));
344 BnetSuppressErrorMessages(sd, 1); /* ignore errors at this point */
345 sd->fsend(ERROR_replicate);
346 }
347 } else {
348 PmStrcpy(jcr->errmsg, _("Attempt to replicate on non-open session.\n"));
349 sd->fsend(NOT_opened);
350 }
351
352 return false;
353 }
354
355 /**
356 * End a replication session.
357 */
EndReplicationSession(JobControlRecord * jcr)358 static bool EndReplicationSession(JobControlRecord* jcr)
359 {
360 BareosSocket* sd = jcr->store_bsock;
361
362 Dmsg1(120, "stored<stored: %s", sd->msg);
363 if (!jcr->impl->session_opened) {
364 PmStrcpy(jcr->errmsg, _("Attempt to close non-open session.\n"));
365 sd->fsend(NOT_opened);
366 return false;
367 }
368 return sd->fsend(OK_end_replicate);
369 }
370
371 } /* namespace storagedaemon */
372