1 /* Copyright 2008-2015 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
15
16 #include "wsrep_sst.h"
17
18 #include <mysqld.h>
19 #include <sql_class.h>
20 #include <set_var.h>
21 #include <sql_acl.h>
22 #include <sql_reload.h>
23 #include <sql_parse.h>
24 #include "wsrep_priv.h"
25 #include "wsrep_utils.h"
26 #include "wsrep_xid.h"
27 #include <cstdio>
28 #include <cstdlib>
29 #include <cctype>
30
31 extern const char wsrep_defaults_file[];
32 extern const char wsrep_defaults_group_suffix[];
33
34 #define WSREP_SST_OPT_ROLE "--role"
35 #define WSREP_SST_OPT_ADDR "--address"
36 #define WSREP_SST_OPT_AUTH "--auth"
37 #define WSREP_SST_OPT_DATA "--datadir"
38 #define WSREP_SST_OPT_CONF "--defaults-file"
39 #define WSREP_SST_OPT_CONF_SUFFIX "--defaults-group-suffix"
40 #define WSREP_SST_OPT_PARENT "--parent"
41 #define WSREP_SST_OPT_BINLOG "--binlog"
42
43 // mysqldump-specific options
44 #define WSREP_SST_OPT_USER "--user"
45 #define WSREP_SST_OPT_PSWD "--password"
46 #define WSREP_SST_OPT_HOST "--host"
47 #define WSREP_SST_OPT_PORT "--port"
48 #define WSREP_SST_OPT_LPORT "--local-port"
49
50 // donor-specific
51 #define WSREP_SST_OPT_SOCKET "--socket"
52 #define WSREP_SST_OPT_GTID "--gtid"
53 #define WSREP_SST_OPT_BYPASS "--bypass"
54
55 #define WSREP_SST_MYSQLDUMP "mysqldump"
56 #define WSREP_SST_RSYNC "rsync"
57 #define WSREP_SST_SKIP "skip"
58 #define WSREP_SST_XTRABACKUP "xtrabackup"
59 #define WSREP_SST_XTRABACKUP_V2 "xtrabackup-v2"
60 #define WSREP_SST_DEFAULT WSREP_SST_RSYNC
61 #define WSREP_SST_ADDRESS_AUTO "AUTO"
62 #define WSREP_SST_AUTH_MASK "********"
63
64 const char* wsrep_sst_method = WSREP_SST_DEFAULT;
65 const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO;
66 const char* wsrep_sst_donor = "";
67 char* wsrep_sst_auth = NULL;
68
69 // container for real auth string
70 static const char* sst_auth_real = NULL;
71 my_bool wsrep_sst_donor_rejects_queries = FALSE;
72
wsrep_sst_method_check(sys_var * self,THD * thd,set_var * var)73 bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
74 {
75 char buff[FN_REFLEN];
76 String str(buff, sizeof(buff), system_charset_info), *res;
77 const char* c_str = NULL;
78
79 if ((res = var->value->val_str(&str)) &&
80 (c_str = res->c_ptr()) &&
81 strlen(c_str) > 0)
82 return 0;
83
84 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL");
85 return 1;
86 }
87
wsrep_sst_method_update(sys_var * self,THD * thd,enum_var_type type)88 bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type)
89 {
90 return 0;
91 }
92
sst_receive_address_check(const char * str)93 static bool sst_receive_address_check (const char* str)
94 {
95 if (!strncasecmp(str, "127.0.0.1", strlen("127.0.0.1")) ||
96 !strncasecmp(str, "localhost", strlen("localhost")))
97 {
98 return 1;
99 }
100
101 return 0;
102 }
103
wsrep_sst_receive_address_check(sys_var * self,THD * thd,set_var * var)104 bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
105 {
106 const char* c_str = var->value->str_value.c_ptr();
107
108 if (sst_receive_address_check (c_str))
109 {
110 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL");
111 return 1;
112 }
113
114 return 0;
115 }
116
wsrep_sst_receive_address_update(sys_var * self,THD * thd,enum_var_type type)117 bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
118 enum_var_type type)
119 {
120 return 0;
121 }
122
wsrep_sst_auth_check(sys_var * self,THD * thd,set_var * var)123 bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
124 {
125 return 0;
126 }
sst_auth_real_set(const char * value)127 static bool sst_auth_real_set (const char* value)
128 {
129 const char* v = strdup (value);
130
131 if (v)
132 {
133 if (sst_auth_real) free (const_cast<char*>(sst_auth_real));
134 sst_auth_real = v;
135
136 if (strlen(sst_auth_real))
137 {
138 if (wsrep_sst_auth)
139 {
140 my_free ((void*)wsrep_sst_auth);
141 wsrep_sst_auth = my_strdup(WSREP_SST_AUTH_MASK, MYF(0));
142 //strncpy (wsrep_sst_auth, WSREP_SST_AUTH_MASK,
143 // sizeof(wsrep_sst_auth) - 1);
144 }
145 else
146 wsrep_sst_auth = my_strdup (WSREP_SST_AUTH_MASK, MYF(0));
147 }
148 return 0;
149 }
150
151 return 1;
152 }
153
wsrep_sst_auth_update(sys_var * self,THD * thd,enum_var_type type)154 bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
155 {
156 return sst_auth_real_set (wsrep_sst_auth);
157 }
158
wsrep_sst_auth_init(const char * value)159 void wsrep_sst_auth_init (const char* value)
160 {
161 if (wsrep_sst_auth == value) wsrep_sst_auth = NULL;
162 if (value) sst_auth_real_set (value);
163 }
164
wsrep_sst_auth_free()165 void wsrep_sst_auth_free()
166 {
167 if (wsrep_sst_auth) { my_free ((void*)wsrep_sst_auth); }
168 if (sst_auth_real) { free (const_cast<char*>(sst_auth_real)); }
169 wsrep_sst_auth= NULL;
170 sst_auth_real= NULL;
171 }
172
wsrep_sst_donor_check(sys_var * self,THD * thd,set_var * var)173 bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
174 {
175 return 0;
176 }
177
wsrep_sst_donor_update(sys_var * self,THD * thd,enum_var_type type)178 bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
179 {
180 return 0;
181 }
182
183 static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
184
wsrep_before_SE()185 bool wsrep_before_SE()
186 {
187 return (wsrep_provider != NULL
188 && strcmp (wsrep_provider, WSREP_NONE)
189 && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
190 && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
191 }
192
193 static bool sst_complete = false;
194 static bool sst_needed = false;
195
wsrep_sst_grab()196 void wsrep_sst_grab ()
197 {
198 WSREP_INFO("wsrep_sst_grab()");
199 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
200 sst_complete = false;
201 mysql_mutex_unlock (&LOCK_wsrep_sst);
202 }
203
204 // Wait for end of SST
wsrep_sst_wait()205 bool wsrep_sst_wait ()
206 {
207 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
208 while (!sst_complete)
209 {
210 WSREP_INFO("Waiting for SST to complete.");
211 mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst);
212 }
213
214 if (local_seqno >= 0)
215 {
216 WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
217 }
218 else
219 {
220 WSREP_ERROR("SST failed: %d (%s)",
221 int(-local_seqno), strerror(-local_seqno));
222 }
223
224 mysql_mutex_unlock (&LOCK_wsrep_sst);
225
226 return (local_seqno >= 0);
227 }
228
229 // Signal end of SST
wsrep_sst_complete(const wsrep_uuid_t * sst_uuid,wsrep_seqno_t sst_seqno,bool needed)230 void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
231 wsrep_seqno_t sst_seqno,
232 bool needed)
233 {
234 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
235 if (!sst_complete)
236 {
237 sst_complete = true;
238 sst_needed = needed;
239 local_uuid = *sst_uuid;
240 local_seqno = sst_seqno;
241 mysql_cond_signal (&COND_wsrep_sst);
242 }
243 else
244 {
245 /* This can happen when called from wsrep_synced_cb().
246 At the moment there is no way to check there
247 if main thread is still waiting for signal,
248 so wsrep_sst_complete() is called from there
249 each time wsrep_ready changes from FALSE -> TRUE.
250 */
251 WSREP_DEBUG("Nobody is waiting for SST.");
252 }
253 mysql_mutex_unlock (&LOCK_wsrep_sst);
254 }
255
wsrep_sst_received(wsrep_t * const wsrep,const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno,const void * const state,size_t const state_len)256 void wsrep_sst_received (wsrep_t* const wsrep,
257 const wsrep_uuid_t& uuid,
258 wsrep_seqno_t const seqno,
259 const void* const state,
260 size_t const state_len)
261 {
262 wsrep_get_SE_checkpoint(local_uuid, local_seqno);
263
264 if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
265 local_seqno < seqno || seqno < 0)
266 {
267 wsrep_set_SE_checkpoint(uuid, seqno);
268 local_uuid = uuid;
269 local_seqno = seqno;
270 }
271 else if (local_seqno > seqno)
272 {
273 WSREP_WARN("SST postion is in the past: %lld, current: %lld. "
274 "Can't continue.",
275 (long long)seqno, (long long)local_seqno);
276 unireg_abort(1);
277 }
278
279 wsrep_init_sidno(uuid);
280
281 if (wsrep)
282 {
283 int const rcode(seqno < 0 ? seqno : 0);
284 wsrep_gtid_t const state_id = {
285 uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
286 };
287
288 wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
289 }
290 }
291
292 // Let applier threads to continue
wsrep_sst_continue()293 void wsrep_sst_continue ()
294 {
295 if (sst_needed)
296 {
297 WSREP_INFO("Signalling provider to continue.");
298 // local_uuid and local_seqno are global variables and are volatile
299 wsrep_uuid_t const sst_uuid = local_uuid;
300 wsrep_seqno_t const sst_seqno = local_seqno;
301 wsrep_sst_received (wsrep, sst_uuid, sst_seqno, NULL, 0);
302 }
303 }
304
305 struct sst_thread_arg
306 {
307 const char* cmd;
308 char** env;
309 char* ret_str;
310 int err;
311 mysql_mutex_t lock;
312 mysql_cond_t cond;
313
sst_thread_argsst_thread_arg314 sst_thread_arg (const char* c, char** e)
315 : cmd(c), env(e), ret_str(0), err(-1)
316 {
317 mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
318 mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
319 }
320
~sst_thread_argsst_thread_arg321 ~sst_thread_arg()
322 {
323 mysql_cond_destroy (&cond);
324 mysql_mutex_unlock (&lock);
325 mysql_mutex_destroy (&lock);
326 }
327 };
328
sst_scan_uuid_seqno(const char * str,wsrep_uuid_t * uuid,wsrep_seqno_t * seqno)329 static int sst_scan_uuid_seqno (const char* str,
330 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
331 {
332 int offt = wsrep_uuid_scan (str, strlen(str), uuid);
333 if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
334 {
335 *seqno = strtoll (str + offt + 1, NULL, 10);
336 if (*seqno != LLONG_MAX || errno != ERANGE)
337 {
338 return 0;
339 }
340 }
341
342 WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
343 return EINVAL;
344 }
345
346 // get rid of trailing \n
my_fgets(char * buf,size_t buf_len,FILE * stream)347 static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
348 {
349 char* ret= fgets (buf, buf_len, stream);
350
351 if (ret)
352 {
353 size_t len = strlen(ret);
354 if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0';
355 }
356
357 return ret;
358 }
359
360 /*
361 Generate opt_binlog_opt_val for sst_donate_other(), sst_prepare_other().
362
363 Returns zero on success, negative error code otherwise.
364
365 String containing binlog name is stored in param ret if binlog is enabled
366 and GTID mode is on, otherwise empty string. Returned string should be
367 freed with my_free().
368 */
generate_binlog_opt_val(char ** ret)369 static int generate_binlog_opt_val(char** ret)
370 {
371 DBUG_ASSERT(ret);
372 *ret= NULL;
373 if (opt_bin_log && gtid_mode > 0)
374 {
375 assert(opt_bin_logname);
376 *ret= my_strdup(opt_bin_logname, MYF(0));
377 }
378 else
379 {
380 *ret= my_strdup("", MYF(0));
381 }
382 if (!*ret) return -ENOMEM;
383 return 0;
384 }
385
sst_joiner_thread(void * a)386 static void* sst_joiner_thread (void* a)
387 {
388 sst_thread_arg* arg= (sst_thread_arg*) a;
389 int err= 1;
390
391 {
392 const char magic[] = "ready";
393 const size_t magic_len = sizeof(magic) - 1;
394 const size_t out_len = 512;
395 char out[out_len];
396
397 WSREP_INFO("Running: '%s'", arg->cmd);
398
399 wsp::process proc (arg->cmd, "r", arg->env);
400
401 if (proc.pipe() && !proc.error())
402 {
403 const char* tmp= my_fgets (out, out_len, proc.pipe());
404
405 if (!tmp || strlen(tmp) < (magic_len + 2) ||
406 strncasecmp (tmp, magic, magic_len))
407 {
408 WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
409 magic, arg->cmd, tmp);
410 proc.wait();
411 if (proc.error()) err = proc.error();
412 }
413 else
414 {
415 err = 0;
416 }
417 }
418 else
419 {
420 err = proc.error();
421 WSREP_ERROR("Failed to execute: %s : %d (%s)",
422 arg->cmd, err, strerror(err));
423 }
424
425 // signal sst_prepare thread with ret code,
426 // it will go on sending SST request
427 mysql_mutex_lock (&arg->lock);
428 if (!err)
429 {
430 arg->ret_str = strdup (out + magic_len + 1);
431 if (!arg->ret_str) err = ENOMEM;
432 }
433 arg->err = -err;
434 mysql_cond_signal (&arg->cond);
435 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
436
437 if (err) return NULL; /* lp:808417 - return immediately, don't signal
438 * initializer thread to ensure single thread of
439 * shutdown. */
440
441 wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
442 wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
443
444 // in case of successfull receiver start, wait for SST completion/end
445 char* tmp = my_fgets (out, out_len, proc.pipe());
446
447 proc.wait();
448 err= EINVAL;
449
450 if (!tmp)
451 {
452 WSREP_ERROR("Failed to read uuid:seqno from joiner script.");
453 if (proc.error()) err = proc.error();
454 }
455 else
456 {
457 err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
458 }
459
460 if (err)
461 {
462 ret_uuid= WSREP_UUID_UNDEFINED;
463 ret_seqno= -err;
464 }
465
466 // Tell initializer thread that SST is complete
467 wsrep_sst_complete (&ret_uuid, ret_seqno, true);
468 }
469
470 return NULL;
471 }
472
473 #define WSREP_SST_AUTH_ENV "WSREP_SST_OPT_AUTH"
474
sst_append_auth_env(wsp::env & env,const char * sst_auth)475 static int sst_append_auth_env(wsp::env& env, const char* sst_auth)
476 {
477 int const sst_auth_size= strlen(WSREP_SST_AUTH_ENV) + 1 /* = */
478 + (sst_auth ? strlen(sst_auth) : 0) + 1 /* \0 */;
479
480 wsp::string sst_auth_str(sst_auth_size); // for automatic cleanup on return
481 if (!sst_auth_str()) return -ENOMEM;
482
483 int ret= snprintf(sst_auth_str(), sst_auth_size, "%s=%s",
484 WSREP_SST_AUTH_ENV, sst_auth ? sst_auth : "");
485
486 if (ret < 0 || ret >= sst_auth_size)
487 {
488 WSREP_ERROR("sst_append_auth_env(): snprintf() failed: %d", ret);
489 return (ret < 0 ? ret : -EMSGSIZE);
490 }
491
492 env.append(sst_auth_str());
493 return -env.error();
494 }
495
sst_prepare_other(const char * method,const char * sst_auth,const char * addr_in,const char ** addr_out)496 static ssize_t sst_prepare_other (const char* method,
497 const char* sst_auth,
498 const char* addr_in,
499 const char** addr_out)
500 {
501 int const cmd_len= 4096;
502 wsp::string cmd_str(cmd_len);
503
504 if (!cmd_str())
505 {
506 WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %d bytes",
507 cmd_len);
508 return -ENOMEM;
509 }
510
511 const char* binlog_opt= "";
512 char* binlog_opt_val= NULL;
513
514 int ret;
515 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
516 {
517 WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
518 ret);
519 return ret;
520 }
521 if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
522
523 ret= snprintf (cmd_str(), cmd_len,
524 "wsrep_sst_%s "
525 WSREP_SST_OPT_ROLE" 'joiner' "
526 WSREP_SST_OPT_ADDR" '%s' "
527 WSREP_SST_OPT_DATA" '%s' "
528 WSREP_SST_OPT_CONF" '%s' "
529 WSREP_SST_OPT_CONF_SUFFIX" '%s' "
530 WSREP_SST_OPT_PARENT" '%d'"
531 " %s '%s' ",
532 method, addr_in, mysql_real_data_home,
533 wsrep_defaults_file, wsrep_defaults_group_suffix,
534 (int)getpid(), binlog_opt, binlog_opt_val);
535 my_free(binlog_opt_val);
536
537 if (ret < 0 || ret >= cmd_len)
538 {
539 WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
540 return (ret < 0 ? ret : -EMSGSIZE);
541 }
542
543 wsp::env env(NULL);
544 if (env.error())
545 {
546 WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
547 return -env.error();
548 }
549
550 if ((ret= sst_append_auth_env(env, sst_auth)))
551 {
552 WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
553 return ret;
554 }
555
556 pthread_t tmp;
557 sst_thread_arg arg(cmd_str(), env());
558 mysql_mutex_lock (&arg.lock);
559 ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg);
560 if (ret)
561 {
562 WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)",
563 ret, strerror(ret));
564 return -ret;
565 }
566 mysql_cond_wait (&arg.cond, &arg.lock);
567
568 *addr_out= arg.ret_str;
569
570 if (!arg.err)
571 ret = strlen(*addr_out);
572 else
573 {
574 assert (arg.err < 0);
575 ret = arg.err;
576 }
577
578 pthread_detach (tmp);
579
580 return ret;
581 }
582
583 extern uint mysqld_port;
584
585 /*! Just tells donor where to send mysqldump */
sst_prepare_mysqldump(const char * addr_in,const char ** addr_out)586 static ssize_t sst_prepare_mysqldump (const char* addr_in,
587 const char** addr_out)
588 {
589 ssize_t ret = strlen (addr_in);
590
591 if (!strrchr(addr_in, ':'))
592 {
593 ssize_t s = ret + 7;
594 char* tmp = (char*) malloc (s);
595
596 if (tmp)
597 {
598 ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
599
600 if (ret > 0 && ret < s)
601 {
602 *addr_out= tmp;
603 return ret;
604 }
605 if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
606 free (tmp);
607 }
608 else {
609 ret= -ENOMEM;
610 }
611
612 WSREP_ERROR ("Could not prepare state transfer request: "
613 "adding default port failed: %zd.", ret);
614 }
615 else {
616 *addr_out= addr_in;
617 }
618
619 return ret;
620 }
621
622 static enum Wsrep_SE_init_result SE_initialized= WSREP_SE_INIT_RESULT_NONE;
623
wsrep_sst_prepare(void ** msg)624 ssize_t wsrep_sst_prepare (void** msg)
625 {
626 const ssize_t ip_max= 256;
627 char ip_buf[ip_max];
628 const char* addr_in= NULL;
629 const char* addr_out= NULL;
630
631 if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
632 {
633 ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1;
634 *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL);
635 if (!msg)
636 {
637 WSREP_ERROR("Could not allocate %zd bytes for state request", ret);
638 unireg_abort(1);
639 }
640 return ret;
641 }
642
643 // Figure out SST address. Common for all SST methods
644 if (wsrep_sst_receive_address &&
645 strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
646 {
647 addr_in= wsrep_sst_receive_address;
648 }
649 else if (wsrep_node_address && strlen(wsrep_node_address))
650 {
651 size_t const addr_len= strlen(wsrep_node_address);
652 size_t const host_len= wsrep_host_len(wsrep_node_address, addr_len);
653
654 if (host_len < addr_len)
655 {
656 strncpy (ip_buf, wsrep_node_address, host_len);
657 ip_buf[host_len]= '\0';
658 addr_in= ip_buf;
659 }
660 else
661 {
662 addr_in= wsrep_node_address;
663 }
664 }
665 else
666 {
667 ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
668
669 if (ret && ret < ip_max)
670 {
671 addr_in= ip_buf;
672 }
673 else
674 {
675 WSREP_ERROR("Could not prepare state transfer request: "
676 "failed to guess address to accept state transfer at. "
677 "wsrep_sst_receive_address must be set manually.");
678 unireg_abort(1);
679 }
680 }
681
682 ssize_t addr_len= -ENOSYS;
683 if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP))
684 {
685 addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
686 if (addr_len < 0) unireg_abort(1);
687 }
688 else
689 {
690 /*! A heuristic workaround until we learn how to stop and start engines */
691 if (SE_initialized)
692 {
693 // we already did SST at initializaiton, now engines are running
694 // sql_print_information() is here because the message is too long
695 // for WSREP_INFO.
696 sql_print_information ("WSREP: "
697 "You have configured '%s' state snapshot transfer method "
698 "which cannot be performed on a running server. "
699 "Wsrep provider won't be able to fall back to it "
700 "if other means of state transfer are unavailable. "
701 "In that case you will need to restart the server.",
702 wsrep_sst_method);
703 *msg = 0;
704 return 0;
705 }
706
707 addr_len = sst_prepare_other (wsrep_sst_method, sst_auth_real,
708 addr_in, &addr_out);
709 if (addr_len < 0)
710 {
711 WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
712 wsrep_sst_method);
713 unireg_abort(1);
714 }
715 }
716
717 size_t const method_len(strlen(wsrep_sst_method));
718 size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/);
719
720 *msg = malloc (msg_len);
721 if (NULL != *msg) {
722 char* const method_ptr(reinterpret_cast<char*>(*msg));
723 strcpy (method_ptr, wsrep_sst_method);
724 char* const addr_ptr(method_ptr + method_len + 1);
725 strcpy (addr_ptr, addr_out);
726
727 WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr);
728 }
729 else {
730 WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
731 msg_len);
732 unireg_abort(1);
733 }
734
735 if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
736
737 return msg_len;
738 }
739
740 // helper method for donors
sst_run_shell(const char * cmd_str,char ** env,int max_tries)741 static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
742 {
743 int ret = 0;
744
745 for (int tries=1; tries <= max_tries; tries++)
746 {
747 wsp::process proc (cmd_str, "r", env);
748
749 if (NULL != proc.pipe())
750 {
751 proc.wait();
752 }
753
754 if ((ret = proc.error()))
755 {
756 WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
757 tries, max_tries, proc.cmd(), ret, strerror(ret));
758 sleep (1);
759 }
760 else
761 {
762 WSREP_DEBUG("SST script successfully completed.");
763 break;
764 }
765 }
766
767 return -ret;
768 }
769
sst_reject_queries(my_bool close_conn)770 static void sst_reject_queries(my_bool close_conn)
771 {
772 wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced
773 WSREP_INFO("Rejecting client queries for the duration of SST.");
774 if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
775 }
776
sst_donate_mysqldump(const char * addr,const wsrep_uuid_t * uuid,const char * uuid_str,wsrep_seqno_t seqno,bool bypass,char ** env)777 static int sst_donate_mysqldump (const char* addr,
778 const wsrep_uuid_t* uuid,
779 const char* uuid_str,
780 wsrep_seqno_t seqno,
781 bool bypass,
782 char** env) // carries auth info
783 {
784 int const cmd_len= 4096;
785 wsp::string cmd_str(cmd_len);
786
787 if (!cmd_str())
788 {
789 WSREP_ERROR("sst_donate_mysqldump(): "
790 "could not allocate cmd buffer of %d bytes", cmd_len);
791 return -ENOMEM;
792 }
793
794 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
795
796 int ret= snprintf (cmd_str(), cmd_len,
797 "wsrep_sst_mysqldump "
798 WSREP_SST_OPT_ADDR" '%s' "
799 WSREP_SST_OPT_LPORT" '%u' "
800 WSREP_SST_OPT_SOCKET" '%s' "
801 WSREP_SST_OPT_CONF" '%s' "
802 WSREP_SST_OPT_GTID" '%s:%lld'"
803 "%s",
804 addr, mysqld_port, mysqld_unix_port,
805 wsrep_defaults_file, uuid_str,
806 (long long)seqno, bypass ? " " WSREP_SST_OPT_BYPASS : "");
807
808 if (ret < 0 || ret >= cmd_len)
809 {
810 WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
811 return (ret < 0 ? ret : -EMSGSIZE);
812 }
813
814 WSREP_DEBUG("Running: '%s'", cmd_str());
815
816 ret= sst_run_shell (cmd_str(), env, 3);
817
818 wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
819
820 wsrep->sst_sent (wsrep, &state_id, ret);
821
822 return ret;
823 }
824
825 wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
826
run_sql_command(THD * thd,const char * query)827 static int run_sql_command(THD *thd, const char *query)
828 {
829 thd->set_query((char *)query, strlen(query));
830
831 Parser_state ps;
832 if (ps.init(thd, thd->query(), thd->query_length()))
833 {
834 WSREP_ERROR("SST query: %s failed", query);
835 return -1;
836 }
837
838 mysql_parse(thd, thd->query(), thd->query_length(), &ps);
839 if (thd->is_error())
840 {
841 int const err= thd->get_stmt_da()->sql_errno();
842 WSREP_WARN ("error executing '%s': %d (%s)%s",
843 query, err, thd->get_stmt_da()->message(),
844 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
845 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
846 thd->clear_error();
847 return -1;
848 }
849 return 0;
850 }
851
sst_flush_tables(THD * thd)852 static int sst_flush_tables(THD* thd)
853 {
854 WSREP_INFO("Flushing tables for SST...");
855
856 int err;
857 int not_used;
858 if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
859 {
860 WSREP_ERROR("Failed to flush and lock tables");
861 err = -1;
862 }
863 else
864 {
865 /* make sure logs are flushed after global read lock acquired */
866 err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
867 (TABLE_LIST*) 0, ¬_used);
868 }
869
870 if (err)
871 {
872 WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err));
873 }
874 else
875 {
876 WSREP_INFO("Tables flushed.");
877 const char base_name[]= "tables_flushed";
878 ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
879 char *real_name = (char*) malloc(full_len);
880 sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
881 char *tmp_name = (char*) malloc(full_len + 4);
882 sprintf(tmp_name, "%s.tmp", real_name);
883
884 FILE* file= fopen(tmp_name, "w+");
885 if (0 == file)
886 {
887 err= errno;
888 WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err));
889 }
890 else
891 {
892 fprintf(file, "%s:%lld\n",
893 wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno);
894 fsync(fileno(file));
895 fclose(file);
896 if (rename(tmp_name, real_name) == -1)
897 {
898 err= errno;
899 WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)",
900 tmp_name, real_name, err,strerror(err));
901 }
902 }
903 free(real_name);
904 free(tmp_name);
905 }
906
907 return err;
908 }
909
sst_disallow_writes(THD * thd,bool yes)910 static void sst_disallow_writes (THD* thd, bool yes)
911 {
912 char query_str[64] = { 0, };
913 ssize_t const query_max = sizeof(query_str) - 1;
914 snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
915 yes ? 1 : 0);
916
917 if (run_sql_command(thd, query_str))
918 {
919 WSREP_ERROR("Failed to disallow InnoDB writes");
920 }
921 }
922
sst_donor_thread(void * a)923 static void* sst_donor_thread (void* a)
924 {
925 sst_thread_arg* arg= (sst_thread_arg*)a;
926
927 WSREP_INFO("Running: '%s'", arg->cmd);
928
929 int err= 1;
930 bool locked= false;
931
932 const char* out= NULL;
933 const size_t out_len= 128;
934 char out_buf[out_len];
935
936 wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED;
937 wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST
938
939 wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
940 // operate with wsrep_ready == OFF
941 wsp::process proc(arg->cmd, "r", arg->env);
942
943 err= proc.error();
944
945 /* Inform server about SST script startup and release TO isolation */
946 mysql_mutex_lock (&arg->lock);
947 arg->err = -err;
948 mysql_cond_signal (&arg->cond);
949 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
950
951 if (proc.pipe() && !err)
952 {
953 wait_signal:
954 out= my_fgets (out_buf, out_len, proc.pipe());
955
956 if (out)
957 {
958 const char magic_flush[]= "flush tables";
959 const char magic_cont[]= "continue";
960 const char magic_done[]= "done";
961
962 if (!strcasecmp (out, magic_flush))
963 {
964 err= sst_flush_tables (thd.ptr);
965 if (!err)
966 {
967 sst_disallow_writes (thd.ptr, true);
968 locked= true;
969 goto wait_signal;
970 }
971 }
972 else if (!strcasecmp (out, magic_cont))
973 {
974 if (locked)
975 {
976 sst_disallow_writes (thd.ptr, false);
977 thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
978 locked= false;
979 }
980 err= 0;
981 goto wait_signal;
982 }
983 else if (!strncasecmp (out, magic_done, strlen(magic_done)))
984 {
985 err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
986 &ret_uuid, &ret_seqno);
987 }
988 else
989 {
990 WSREP_WARN("Received unknown signal: '%s'", out);
991 }
992 }
993 else
994 {
995 WSREP_ERROR("Failed to read from: %s", proc.cmd());
996 proc.wait();
997 }
998 if (!err && proc.error()) err= proc.error();
999 }
1000 else
1001 {
1002 WSREP_ERROR("Failed to execute: %s : %d (%s)",
1003 proc.cmd(), err, strerror(err));
1004 }
1005
1006 if (locked) // don't forget to unlock server before return
1007 {
1008 sst_disallow_writes (thd.ptr, false);
1009 thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
1010 }
1011
1012 // signal to donor that SST is over
1013 struct wsrep_gtid const state_id = {
1014 ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
1015 };
1016 wsrep->sst_sent (wsrep, &state_id, -err);
1017 proc.wait();
1018
1019 return NULL;
1020 }
1021
1022
1023
sst_donate_other(const char * method,const char * addr,const char * uuid,wsrep_seqno_t seqno,bool bypass,char ** env)1024 static int sst_donate_other (const char* method,
1025 const char* addr,
1026 const char* uuid,
1027 wsrep_seqno_t seqno,
1028 bool bypass,
1029 char** env) // carries auth info
1030 {
1031 int const cmd_len= 4096;
1032 wsp::string cmd_str(cmd_len);
1033
1034 if (!cmd_str())
1035 {
1036 WSREP_ERROR("sst_donate_other(): "
1037 "could not allocate cmd buffer of %d bytes", cmd_len);
1038 return -ENOMEM;
1039 }
1040
1041 const char* binlog_opt= "";
1042 char* binlog_opt_val= NULL;
1043
1044 int ret;
1045 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1046 {
1047 WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1048 return ret;
1049 }
1050 if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
1051
1052 ret= snprintf (cmd_str(), cmd_len,
1053 "wsrep_sst_%s "
1054 WSREP_SST_OPT_ROLE" 'donor' "
1055 WSREP_SST_OPT_ADDR" '%s' "
1056 WSREP_SST_OPT_SOCKET" '%s' "
1057 WSREP_SST_OPT_DATA" '%s' "
1058 WSREP_SST_OPT_CONF" '%s' "
1059 WSREP_SST_OPT_CONF_SUFFIX" '%s' "
1060 " %s '%s' "
1061 WSREP_SST_OPT_GTID" '%s:%lld'"
1062 "%s",
1063 method, addr, mysqld_unix_port, mysql_real_data_home,
1064 wsrep_defaults_file, wsrep_defaults_group_suffix,
1065 binlog_opt, binlog_opt_val,
1066 uuid, (long long) seqno,
1067 bypass ? " " WSREP_SST_OPT_BYPASS : "");
1068 my_free(binlog_opt_val);
1069
1070 if (ret < 0 || ret >= cmd_len)
1071 {
1072 WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1073 return (ret < 0 ? ret : -EMSGSIZE);
1074 }
1075
1076 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1077
1078 pthread_t tmp;
1079 sst_thread_arg arg(cmd_str(), env);
1080 mysql_mutex_lock (&arg.lock);
1081 ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg);
1082 if (ret)
1083 {
1084 WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)",
1085 ret, strerror(ret));
1086 return ret;
1087 }
1088 mysql_cond_wait (&arg.cond, &arg.lock);
1089
1090 WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1091 return arg.err;
1092 }
1093
1094 /* return true if character can be a part of a filename */
filename_char(int const c)1095 static bool filename_char(int const c)
1096 {
1097 return isalnum(c) || (c == '-') || (c == '_') || (c == '.');
1098 }
1099
1100 /* return true if character can be a part of an address string */
address_char(int const c)1101 static bool address_char(int const c)
1102 {
1103 return filename_char(c) ||
1104 (c == ':') || (c == '[') || (c == ']') || (c == '/');
1105 }
1106
check_request_str(const char * const str,bool (* check)(int c))1107 static bool check_request_str(const char* const str,
1108 bool (*check) (int c))
1109 {
1110 for (size_t i(0); str[i] != '\0'; ++i)
1111 {
1112 if (!check(str[i]))
1113 {
1114 WSREP_WARN("Illegal character in state transfer request: %i (%c).",
1115 str[i], str[i]);
1116 return true;
1117 }
1118 }
1119
1120 return false;
1121 }
1122
wsrep_sst_donate_cb(void * app_ctx,void * recv_ctx,const void * msg,size_t msg_len,const wsrep_gtid_t * current_gtid,const char * state,size_t state_len,bool bypass)1123 wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
1124 const void* msg, size_t msg_len,
1125 const wsrep_gtid_t* current_gtid,
1126 const char* state, size_t state_len,
1127 bool bypass)
1128 {
1129 const char* method = (char*)msg;
1130 size_t method_len = strlen (method);
1131
1132 if (check_request_str(method, filename_char))
1133 {
1134 WSREP_ERROR("Bad SST method name. SST canceled.");
1135 return WSREP_CB_FAILURE;
1136 }
1137
1138 const char* data = method + method_len + 1;
1139
1140 if (check_request_str(data, address_char))
1141 {
1142 WSREP_ERROR("Bad SST address string. SST canceled.");
1143 return WSREP_CB_FAILURE;
1144 }
1145
1146 char uuid_str[37];
1147 wsrep_uuid_print (¤t_gtid->uuid, uuid_str, sizeof(uuid_str));
1148
1149 /* This will be reset when sync callback is called.
1150 * Should we set wsrep_ready to FALSE here too? */
1151 local_status.set(WSREP_MEMBER_DONOR);
1152
1153 wsp::env env(NULL);
1154 if (env.error())
1155 {
1156 WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1157 return WSREP_CB_FAILURE;
1158 }
1159
1160 int ret;
1161 if ((ret= sst_append_auth_env(env, sst_auth_real)))
1162 {
1163 WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1164 return WSREP_CB_FAILURE;
1165 }
1166
1167 if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1168 {
1169 ret = sst_donate_mysqldump(data, ¤t_gtid->uuid, uuid_str,
1170 current_gtid->seqno, bypass, env());
1171 }
1172 else
1173 {
1174 ret = sst_donate_other(method, data, uuid_str,
1175 current_gtid->seqno, bypass, env());
1176 }
1177
1178 return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
1179 }
1180
wsrep_SE_init_wait()1181 enum Wsrep_SE_init_result wsrep_SE_init_wait()
1182 {
1183 mysql_mutex_lock (&LOCK_wsrep_sst_init);
1184 while (SE_initialized == WSREP_SE_INIT_RESULT_NONE)
1185 {
1186 mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init);
1187 }
1188 enum Wsrep_SE_init_result ret= SE_initialized;
1189 mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1190 return ret;
1191 }
1192
wsrep_SE_initialized(enum Wsrep_SE_init_result result)1193 void wsrep_SE_initialized(enum Wsrep_SE_init_result result)
1194 {
1195 mysql_mutex_lock (&LOCK_wsrep_sst_init);
1196 if (SE_initialized == WSREP_SE_INIT_RESULT_NONE)
1197 {
1198 SE_initialized= result;
1199 }
1200 mysql_cond_signal (&COND_wsrep_sst_init);
1201 mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1202 }
1203