1 /* Copyright 2008-2015 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
15
16
17 #include <mysqld.h>
18 #include "wsrep_sst.h"
19 #include <sql_class.h>
20 #include <set_var.h>
21 #include <sql_acl.h>
22 #include <sql_reload.h>
23 #include <sql_parse.h>
24 #include "wsrep_priv.h"
25 #include "wsrep_utils.h"
26 #include "wsrep_xid.h"
27 #include <cstdio>
28 #include <cstdlib>
29 #include <cctype>
30
31 extern const char wsrep_defaults_file[];
32 extern const char wsrep_defaults_group_suffix[];
33
34 #define WSREP_SST_OPT_ROLE "--role"
35 #define WSREP_SST_OPT_ADDR "--address"
36 #define WSREP_SST_OPT_AUTH "--auth"
37 #define WSREP_SST_OPT_DATA "--datadir"
38 #define WSREP_SST_OPT_CONF "--defaults-file"
39 #define WSREP_SST_OPT_CONF_SUFFIX "--defaults-group-suffix"
40 #define WSREP_SST_OPT_PARENT "--parent"
41 #define WSREP_SST_OPT_BINLOG "--binlog"
42
43 // mysqldump-specific options
44 #define WSREP_SST_OPT_USER "--user"
45 #define WSREP_SST_OPT_PSWD "--password"
46 #define WSREP_SST_OPT_HOST "--host"
47 #define WSREP_SST_OPT_PORT "--port"
48 #define WSREP_SST_OPT_LPORT "--local-port"
49
50 // donor-specific
51 #define WSREP_SST_OPT_SOCKET "--socket"
52 #define WSREP_SST_OPT_GTID "--gtid"
53 #define WSREP_SST_OPT_BYPASS "--bypass"
54
55 #define WSREP_SST_MYSQLDUMP "mysqldump"
56 #define WSREP_SST_RSYNC "rsync"
57 #define WSREP_SST_SKIP "skip"
58 #define WSREP_SST_XTRABACKUP "xtrabackup"
59 #define WSREP_SST_XTRABACKUP_V2 "xtrabackup-v2"
60 #define WSREP_SST_DEFAULT WSREP_SST_RSYNC
61 #define WSREP_SST_ADDRESS_AUTO "AUTO"
62 #define WSREP_SST_AUTH_MASK "********"
63
64 const char* wsrep_sst_method = WSREP_SST_DEFAULT;
65 const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO;
66 const char* wsrep_sst_donor = "";
67 char* wsrep_sst_auth = NULL;
68
69 // container for real auth string
70 static const char* sst_auth_real = NULL;
71 my_bool wsrep_sst_donor_rejects_queries = FALSE;
72
wsrep_sst_method_check(sys_var * self,THD * thd,set_var * var)73 bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
74 {
75 char buff[FN_REFLEN];
76 String str(buff, sizeof(buff), system_charset_info), *res;
77 const char* c_str = NULL;
78
79 if ((res = var->value->val_str(&str)) &&
80 (c_str = res->c_ptr()) &&
81 strlen(c_str) > 0)
82 return 0;
83
84 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL");
85 return 1;
86 }
87
wsrep_sst_method_update(sys_var * self,THD * thd,enum_var_type type)88 bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type)
89 {
90 return 0;
91 }
92
/*
  Tell whether an SST receive address starts with a loopback name.

  The comparison is a case-insensitive prefix match, so a trailing
  ":port" (or anything else) after the loopback name still matches.

  @return 1 if str begins with "127.0.0.1" or "localhost", 0 otherwise.
*/
static bool sst_receive_address_check (const char* str)
{
  static const char* const loopback[] = { "127.0.0.1", "localhost" };
  size_t const count = sizeof(loopback) / sizeof(loopback[0]);

  for (size_t i = 0; i < count; ++i)
  {
    if (strncasecmp(str, loopback[i], strlen(loopback[i])) == 0)
    {
      return 1;
    }
  }

  return 0;
}
103
wsrep_sst_receive_address_check(sys_var * self,THD * thd,set_var * var)104 bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
105 {
106 const char* c_str = var->value->str_value.c_ptr();
107
108 if (sst_receive_address_check (c_str))
109 {
110 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL");
111 return 1;
112 }
113
114 return 0;
115 }
116
wsrep_sst_receive_address_update(sys_var * self,THD * thd,enum_var_type type)117 bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
118 enum_var_type type)
119 {
120 return 0;
121 }
122
wsrep_sst_auth_check(sys_var * self,THD * thd,set_var * var)123 bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
124 {
125 return 0;
126 }
sst_auth_real_set(const char * value)127 static bool sst_auth_real_set (const char* value)
128 {
129 const char* v = strdup (value);
130
131 if (v)
132 {
133 if (sst_auth_real) free (const_cast<char*>(sst_auth_real));
134 sst_auth_real = v;
135
136 if (strlen(sst_auth_real))
137 {
138 if (wsrep_sst_auth)
139 {
140 my_free ((void*)wsrep_sst_auth);
141 wsrep_sst_auth = my_strdup(key_memory_wsrep,
142 WSREP_SST_AUTH_MASK, MYF(0));
143 }
144 else
145 wsrep_sst_auth = my_strdup (key_memory_wsrep,
146 WSREP_SST_AUTH_MASK, MYF(0));
147 }
148 return 0;
149 }
150
151 return 1;
152 }
153
wsrep_sst_auth_update(sys_var * self,THD * thd,enum_var_type type)154 bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
155 {
156 return sst_auth_real_set (wsrep_sst_auth);
157 }
158
wsrep_sst_auth_init(const char * value)159 void wsrep_sst_auth_init (const char* value)
160 {
161 if (wsrep_sst_auth == value) wsrep_sst_auth = NULL;
162 if (value) sst_auth_real_set (value);
163 }
164
wsrep_sst_auth_free()165 void wsrep_sst_auth_free()
166 {
167 if (wsrep_sst_auth) { my_free ((void*)wsrep_sst_auth); }
168 if (sst_auth_real) { free (const_cast<char*>(sst_auth_real)); }
169 wsrep_sst_auth= NULL;
170 sst_auth_real= NULL;
171 }
172
wsrep_sst_donor_check(sys_var * self,THD * thd,set_var * var)173 bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
174 {
175 return 0;
176 }
177
wsrep_sst_donor_update(sys_var * self,THD * thd,enum_var_type type)178 bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
179 {
180 return 0;
181 }
182
wsrep_before_SE()183 bool wsrep_before_SE()
184 {
185 return (wsrep_provider != NULL
186 && strcmp (wsrep_provider, WSREP_NONE)
187 && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
188 && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
189 }
190
/* SST progress flags, written by wsrep_sst_grab() and wsrep_sst_complete(). */
static bool sst_complete = false; /* set once the end of SST was signalled  */
static bool sst_needed = false;   /* "needed" flag passed with that signal  */
193
wsrep_sst_grab()194 void wsrep_sst_grab ()
195 {
196 WSREP_INFO("wsrep_sst_grab()");
197 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
198 sst_complete = false;
199 mysql_mutex_unlock (&LOCK_wsrep_sst);
200 }
201
202 // Wait for end of SST
wsrep_sst_wait()203 bool wsrep_sst_wait ()
204 {
205 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
206 while (!sst_complete)
207 {
208 WSREP_INFO("Waiting for SST to complete.");
209 mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst);
210 }
211
212 if (local_seqno >= 0)
213 {
214 WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
215 }
216 else
217 {
218 WSREP_ERROR("SST failed: %d (%s)",
219 int(-local_seqno), strerror(-local_seqno));
220 }
221
222 mysql_mutex_unlock (&LOCK_wsrep_sst);
223
224 return (local_seqno >= 0);
225 }
226
227 // Signal end of SST
wsrep_sst_complete(const wsrep_uuid_t * sst_uuid,wsrep_seqno_t sst_seqno,bool needed)228 void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
229 wsrep_seqno_t sst_seqno,
230 bool needed)
231 {
232 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
233 if (!sst_complete)
234 {
235 sst_complete = true;
236 sst_needed = needed;
237 local_uuid = *sst_uuid;
238 local_seqno = sst_seqno;
239 mysql_cond_signal (&COND_wsrep_sst);
240 }
241 else
242 {
243 /* This can happen when called from wsrep_synced_cb().
244 At the moment there is no way to check there
245 if main thread is still waiting for signal,
246 so wsrep_sst_complete() is called from there
247 each time wsrep_ready changes from FALSE -> TRUE.
248 */
249 WSREP_DEBUG("Nobody is waiting for SST.");
250 }
251 mysql_mutex_unlock (&LOCK_wsrep_sst);
252 }
253
wsrep_sst_received(wsrep_t * const wsrep,const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno,const void * const state,size_t const state_len)254 void wsrep_sst_received (wsrep_t* const wsrep,
255 const wsrep_uuid_t& uuid,
256 wsrep_seqno_t const seqno,
257 const void* const state,
258 size_t const state_len)
259 {
260 wsrep_get_SE_checkpoint(local_uuid, local_seqno);
261
262 if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
263 local_seqno < seqno || seqno < 0)
264 {
265 wsrep_set_SE_checkpoint(uuid, seqno);
266 local_uuid = uuid;
267 local_seqno = seqno;
268 }
269 else if (local_seqno > seqno)
270 {
271 WSREP_WARN("SST postion is in the past: %lld, current: %lld. "
272 "Can't continue.",
273 (long long)seqno, (long long)local_seqno);
274 unireg_abort(1);
275 }
276
277 wsrep_init_sidno(uuid);
278
279 if (wsrep)
280 {
281 int const rcode(seqno < 0 ? seqno : 0);
282 wsrep_gtid_t const state_id = {
283 uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
284 };
285
286 wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
287 }
288 }
289
290 // Let applier threads to continue
wsrep_sst_continue()291 void wsrep_sst_continue ()
292 {
293 if (sst_needed)
294 {
295 WSREP_INFO("Signalling provider to continue.");
296 // local_uuid and local_seqno are global variables and are volatile
297 wsrep_uuid_t const sst_uuid = local_uuid;
298 wsrep_seqno_t const sst_seqno = local_seqno;
299 wsrep_sst_received (wsrep, sst_uuid, sst_seqno, NULL, 0);
300 }
301 }
302
303 struct sst_thread_arg
304 {
305 const char* cmd;
306 char** env;
307 char* ret_str;
308 int err;
309 mysql_mutex_t lock;
310 mysql_cond_t cond;
311
sst_thread_argsst_thread_arg312 sst_thread_arg (const char* c, char** e)
313 : cmd(c), env(e), ret_str(0), err(-1)
314 {
315 mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
316 mysql_cond_init(key_COND_wsrep_sst_thread, &cond);
317 }
318
~sst_thread_argsst_thread_arg319 ~sst_thread_arg()
320 {
321 mysql_cond_destroy (&cond);
322 mysql_mutex_unlock (&lock);
323 mysql_mutex_destroy (&lock);
324 }
325 };
326
sst_scan_uuid_seqno(const char * str,wsrep_uuid_t * uuid,wsrep_seqno_t * seqno)327 static int sst_scan_uuid_seqno (const char* str,
328 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
329 {
330 int offt = wsrep_uuid_scan (str, strlen(str), uuid);
331 if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
332 {
333 *seqno = strtoll (str + offt + 1, NULL, 10);
334 if (*seqno != LLONG_MAX || errno != ERANGE)
335 {
336 return 0;
337 }
338 }
339
340 WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
341 return EINVAL;
342 }
343
/*
  fgets() wrapper that strips one trailing '\n' from the line read.

  @return buf on success, NULL when fgets() returns NULL (EOF or error).
*/
static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
{
  char* const line= fgets (buf, buf_len, stream);
  if (line == NULL)
  {
    return NULL;
  }

  size_t const n = strlen(line);
  if (n != 0 && line[n - 1] == '\n')
  {
    line[n - 1] = '\0';
  }

  return line;
}
357
358 /*
359 Generate opt_binlog_opt_val for sst_donate_other(), sst_prepare_other().
360
361 Returns zero on success, negative error code otherwise.
362
363 String containing binlog name is stored in param ret if binlog is enabled
364 and GTID mode is on, otherwise empty string. Returned string should be
365 freed with my_free().
366 */
generate_binlog_opt_val(char ** ret)367 static int generate_binlog_opt_val(char** ret)
368 {
369 assert(ret);
370 *ret= NULL;
371 enum_gtid_mode gtid_mode = get_gtid_mode(GTID_MODE_LOCK_NONE);
372 if (opt_bin_log && gtid_mode > 0)
373 {
374 assert(opt_bin_logname);
375 *ret= my_strdup(key_memory_wsrep, opt_bin_logname, MYF(0));
376 }
377 else
378 {
379 *ret= my_strdup(key_memory_wsrep, "", MYF(0));
380 }
381 if (!*ret) return -ENOMEM;
382 return 0;
383 }
384
sst_joiner_thread(void * a)385 static void* sst_joiner_thread (void* a)
386 {
387 sst_thread_arg* arg= (sst_thread_arg*) a;
388 int err= 1;
389
390 {
391 const char magic[] = "ready";
392 const size_t magic_len = sizeof(magic) - 1;
393 const size_t out_len = 512;
394 char out[out_len];
395
396 WSREP_INFO("Running: '%s'", arg->cmd);
397
398 wsp::process proc (arg->cmd, "r", arg->env);
399
400 if (proc.pipe() && !proc.error())
401 {
402 const char* tmp= my_fgets (out, out_len, proc.pipe());
403
404 if (!tmp || strlen(tmp) < (magic_len + 2) ||
405 strncasecmp (tmp, magic, magic_len))
406 {
407 WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
408 magic, arg->cmd, tmp);
409 proc.wait();
410 if (proc.error()) err = proc.error();
411 }
412 else
413 {
414 err = 0;
415 }
416 }
417 else
418 {
419 err = proc.error();
420 WSREP_ERROR("Failed to execute: %s : %d (%s)",
421 arg->cmd, err, strerror(err));
422 }
423
424 // signal sst_prepare thread with ret code,
425 // it will go on sending SST request
426 mysql_mutex_lock (&arg->lock);
427 if (!err)
428 {
429 arg->ret_str = strdup (out + magic_len + 1);
430 if (!arg->ret_str) err = ENOMEM;
431 }
432 arg->err = -err;
433 mysql_cond_signal (&arg->cond);
434 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
435
436 if (err) return NULL; /* lp:808417 - return immediately, don't signal
437 * initializer thread to ensure single thread of
438 * shutdown. */
439
440 wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
441 wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
442
443 // in case of successfull receiver start, wait for SST completion/end
444 char* tmp = my_fgets (out, out_len, proc.pipe());
445
446 proc.wait();
447 err= EINVAL;
448
449 if (!tmp)
450 {
451 WSREP_ERROR("Failed to read uuid:seqno from joiner script.");
452 if (proc.error()) err = proc.error();
453 }
454 else
455 {
456 err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
457 }
458
459 if (err)
460 {
461 ret_uuid= WSREP_UUID_UNDEFINED;
462 ret_seqno= -err;
463 }
464
465 // Tell initializer thread that SST is complete
466 wsrep_sst_complete (&ret_uuid, ret_seqno, true);
467 }
468
469 return NULL;
470 }
471
/* Names of environment variables handed to the SST scripts. */
#define WSREP_SST_AUTH_ENV        "WSREP_SST_OPT_AUTH"
#define WSREP_SST_REMOTE_AUTH_ENV "WSREP_SST_OPT_REMOTE_AUTH"
#define DATA_HOME_DIR_ENV         "INNODB_DATA_HOME_DIR"
475
sst_append_env_var(wsp::env & env,const char * const var,const char * const val)476 static int sst_append_env_var(wsp::env& env,
477 const char* const var,
478 const char* const val)
479 {
480 int const env_str_size= strlen(var) + 1 /* = */
481 + (val ? strlen(val) : 0) + 1 /* \0 */;
482
483 wsp::string env_str(env_str_size); // for automatic cleanup on return
484 if (!env_str()) return -ENOMEM;
485
486 int ret= snprintf(env_str(), env_str_size, "%s=%s", var, val ? val : "");
487
488 if (ret < 0 || ret >= env_str_size)
489 {
490 WSREP_ERROR("sst_append_env_var(): snprintf(%s=%s) failed: %d",
491 var, val, ret);
492 return (ret < 0 ? ret : -EMSGSIZE);
493 }
494
495 env.append(env_str());
496 return -env.error();
497 }
498
sst_prepare_other(const char * method,const char * sst_auth,const char * addr_in,const char ** addr_out)499 static ssize_t sst_prepare_other (const char* method,
500 const char* sst_auth,
501 const char* addr_in,
502 const char** addr_out)
503 {
504 int const cmd_len= 4096;
505 wsp::string cmd_str(cmd_len);
506
507 if (!cmd_str())
508 {
509 WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %d bytes",
510 cmd_len);
511 return -ENOMEM;
512 }
513
514 const char* binlog_opt= "";
515 char* binlog_opt_val= NULL;
516
517 int ret;
518 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
519 {
520 WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
521 ret);
522 return ret;
523 }
524 if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
525
526 ret= snprintf (cmd_str(), cmd_len,
527 "wsrep_sst_%s "
528 WSREP_SST_OPT_ROLE" 'joiner' "
529 WSREP_SST_OPT_ADDR" '%s' "
530 WSREP_SST_OPT_DATA" '%s' "
531 WSREP_SST_OPT_CONF" '%s' "
532 WSREP_SST_OPT_CONF_SUFFIX" '%s' "
533 WSREP_SST_OPT_PARENT" '%d'"
534 " %s '%s' ",
535 method, addr_in, mysql_real_data_home,
536 wsrep_defaults_file, wsrep_defaults_group_suffix,
537 (int)getpid(), binlog_opt, binlog_opt_val);
538 my_free(binlog_opt_val);
539
540 if (ret < 0 || ret >= cmd_len)
541 {
542 WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
543 return (ret < 0 ? ret : -EMSGSIZE);
544 }
545
546 wsp::env env(NULL);
547 if (env.error())
548 {
549 WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
550 return -env.error();
551 }
552
553 if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth)))
554 {
555 WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
556 return ret;
557 }
558
559 pthread_t tmp;
560 sst_thread_arg arg(cmd_str(), env());
561 mysql_mutex_lock (&arg.lock);
562 ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg);
563 if (ret)
564 {
565 WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)",
566 ret, strerror(ret));
567 return -ret;
568 }
569 mysql_cond_wait (&arg.cond, &arg.lock);
570
571 *addr_out= arg.ret_str;
572
573 if (!arg.err)
574 ret = strlen(*addr_out);
575 else
576 {
577 assert (arg.err < 0);
578 ret = arg.err;
579 }
580
581 pthread_detach (tmp);
582
583 return ret;
584 }
585
586 extern uint mysqld_port;
587
588 /*! Just tells donor where to send mysqldump */
sst_prepare_mysqldump(const char * addr_in,const char ** addr_out)589 static ssize_t sst_prepare_mysqldump (const char* addr_in,
590 const char** addr_out)
591 {
592 ssize_t ret = strlen (addr_in);
593
594 if (!strrchr(addr_in, ':'))
595 {
596 ssize_t s = ret + 7;
597 char* tmp = (char*) malloc (s);
598
599 if (tmp)
600 {
601 ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
602
603 if (ret > 0 && ret < s)
604 {
605 *addr_out= tmp;
606 return ret;
607 }
608 if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
609 free (tmp);
610 }
611 else {
612 ret= -ENOMEM;
613 }
614
615 WSREP_ERROR ("Could not prepare state transfer request: "
616 "adding default port failed: %zd.", ret);
617 }
618 else {
619 *addr_out= addr_in;
620 }
621
622 return ret;
623 }
624
625 static enum Wsrep_SE_init_result SE_initialized= WSREP_SE_INIT_RESULT_NONE;
626
wsrep_sst_prepare(void ** msg,THD * thd)627 ssize_t wsrep_sst_prepare (void** msg, THD *thd)
628 {
629 const ssize_t ip_max= 256;
630 char ip_buf[ip_max];
631 const char* addr_in= NULL;
632 const char* addr_out= NULL;
633
634 if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
635 {
636 ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1;
637 *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL);
638 if (!msg)
639 {
640 WSREP_ERROR("Could not allocate %zd bytes for state request", ret);
641 unireg_abort(1);
642 }
643 return ret;
644 }
645
646 // Figure out SST address. Common for all SST methods
647 if (wsrep_sst_receive_address &&
648 strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
649 {
650 addr_in= wsrep_sst_receive_address;
651 }
652 else if (wsrep_node_address && strlen(wsrep_node_address))
653 {
654 size_t const addr_len= strlen(wsrep_node_address);
655 size_t const host_len= wsrep_host_len(wsrep_node_address, addr_len);
656
657 if (host_len < addr_len)
658 {
659 strncpy (ip_buf, wsrep_node_address, host_len);
660 ip_buf[host_len]= '\0';
661 addr_in= ip_buf;
662 }
663 else
664 {
665 addr_in= wsrep_node_address;
666 }
667 }
668 else
669 {
670 ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
671
672 if (ret && ret < ip_max)
673 {
674 addr_in= ip_buf;
675 }
676 else
677 {
678 WSREP_ERROR("Could not prepare state transfer request: "
679 "failed to guess address to accept state transfer at. "
680 "wsrep_sst_receive_address must be set manually.");
681 if (thd) delete thd;
682 unireg_abort(1);
683 }
684 }
685
686 ssize_t addr_len= -ENOSYS;
687 if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP))
688 {
689 addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
690 if (addr_len < 0) unireg_abort(1);
691 }
692 else
693 {
694 /*! A heuristic workaround until we learn how to stop and start engines */
695 if (SE_initialized)
696 {
697 // we already did SST at initializaiton, now engines are running
698 // sql_print_information() is here because the message is too long
699 // for WSREP_INFO.
700 sql_print_information ("WSREP: "
701 "You have configured '%s' state snapshot transfer method "
702 "which cannot be performed on a running server. "
703 "Wsrep provider won't be able to fall back to it "
704 "if other means of state transfer are unavailable. "
705 "In that case you will need to restart the server.",
706 wsrep_sst_method);
707 *msg = 0;
708 return 0;
709 }
710
711 addr_len = sst_prepare_other (wsrep_sst_method, sst_auth_real,
712 addr_in, &addr_out);
713 if (addr_len < 0)
714 {
715 WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
716 wsrep_sst_method);
717 unireg_abort(1);
718 }
719 }
720
721 size_t const method_len(strlen(wsrep_sst_method));
722 size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/);
723
724 *msg = malloc (msg_len);
725 if (NULL != *msg) {
726 char* const method_ptr(static_cast<char*>(*msg));
727 strcpy (method_ptr, wsrep_sst_method);
728 char* const addr_ptr(method_ptr + method_len + 1);
729 strcpy (addr_ptr, addr_out);
730
731 WSREP_DEBUG("Prepared SST request: %s|%s", method_ptr, addr_ptr);
732 }
733 else {
734 WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
735 msg_len);
736 unireg_abort(1);
737 }
738
739 if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
740
741 return msg_len;
742 }
743
744 // helper method for donors
sst_run_shell(const char * cmd_str,char ** env,int max_tries)745 static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
746 {
747 int ret = 0;
748
749 for (int tries=1; tries <= max_tries; tries++)
750 {
751 wsp::process proc (cmd_str, "r", env);
752
753 if (NULL != proc.pipe())
754 {
755 proc.wait();
756 }
757
758 if ((ret = proc.error()))
759 {
760 WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
761 tries, max_tries, proc.cmd(), ret, strerror(ret));
762 sleep (1);
763 }
764 else
765 {
766 WSREP_DEBUG("SST script successfully completed.");
767 break;
768 }
769 }
770
771 return -ret;
772 }
773
sst_reject_queries(my_bool close_conn)774 static void sst_reject_queries(my_bool close_conn)
775 {
776 wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced
777 WSREP_INFO("Rejecting client queries for the duration of SST.");
778 if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
779 }
780
sst_donate_mysqldump(const char * addr,const wsrep_uuid_t * uuid,const char * uuid_str,wsrep_seqno_t seqno,bool bypass,char ** env)781 static int sst_donate_mysqldump (const char* addr,
782 const wsrep_uuid_t* uuid,
783 const char* uuid_str,
784 wsrep_seqno_t seqno,
785 bool bypass,
786 char** env) // carries auth info
787 {
788 int const cmd_len= 4096;
789 wsp::string cmd_str(cmd_len);
790
791 if (!cmd_str())
792 {
793 WSREP_ERROR("sst_donate_mysqldump(): "
794 "could not allocate cmd buffer of %d bytes", cmd_len);
795 return -ENOMEM;
796 }
797
798 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
799
800 int ret= snprintf (cmd_str(), cmd_len,
801 "wsrep_sst_mysqldump "
802 WSREP_SST_OPT_ADDR" '%s' "
803 WSREP_SST_OPT_LPORT" '%u' "
804 WSREP_SST_OPT_SOCKET" '%s' "
805 WSREP_SST_OPT_CONF" '%s' "
806 WSREP_SST_OPT_GTID" '%s:%lld'"
807 "%s",
808 addr, mysqld_port, mysqld_unix_port,
809 wsrep_defaults_file, uuid_str,
810 (long long)seqno, bypass ? " " WSREP_SST_OPT_BYPASS : "");
811
812 if (ret < 0 || ret >= cmd_len)
813 {
814 WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
815 return (ret < 0 ? ret : -EMSGSIZE);
816 }
817
818 WSREP_DEBUG("Running: '%s'", cmd_str());
819
820 ret= sst_run_shell (cmd_str(), env, 3);
821
822 wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
823
824 wsrep->sst_sent (wsrep, &state_id, ret);
825
826 return ret;
827 }
828
829 wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
830
run_sql_command(THD * thd,const char * query)831 static int run_sql_command(THD *thd, const char *query)
832 {
833 thd->set_query((char *)query, strlen(query));
834
835 Parser_state ps;
836 if (ps.init(thd, thd->query().str, thd->query().length))
837 {
838 WSREP_ERROR("SST query: %s failed", query);
839 return -1;
840 }
841
842 mysql_parse(thd, &ps);
843 if (thd->is_error())
844 {
845 int const err= thd->get_stmt_da()->mysql_errno();
846 WSREP_WARN ("error executing '%s': %d (%s)%s",
847 query, err, thd->get_stmt_da()->message_text(),
848 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
849 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
850 thd->clear_error();
851 return -1;
852 }
853 return 0;
854 }
855
sst_flush_tables(THD * thd)856 static int sst_flush_tables(THD* thd)
857 {
858 WSREP_INFO("Flushing tables for SST...");
859
860 int err;
861 int not_used;
862 if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
863 {
864 WSREP_ERROR("Failed to flush and lock tables");
865 err = -1;
866 }
867 else
868 {
869 /* make sure logs are flushed after global read lock acquired */
870 err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
871 (TABLE_LIST*) 0, ¬_used);
872 }
873
874 if (err)
875 {
876 WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err));
877 }
878 else
879 {
880 WSREP_INFO("Tables flushed.");
881 const char base_name[]= "tables_flushed";
882 ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
883 char *real_name = (char*) malloc(full_len);
884 sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
885 char *tmp_name = (char*) malloc(full_len + 4);
886 sprintf(tmp_name, "%s.tmp", real_name);
887
888 FILE* file= fopen(tmp_name, "w+");
889 if (0 == file)
890 {
891 err= errno;
892 WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err));
893 }
894 else
895 {
896 fprintf(file, "%s:%lld\n",
897 wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno);
898 fsync(fileno(file));
899 fclose(file);
900 if (rename(tmp_name, real_name) == -1)
901 {
902 err= errno;
903 WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)",
904 tmp_name, real_name, err,strerror(err));
905 }
906 }
907 free(real_name);
908 free(tmp_name);
909 }
910
911 return err;
912 }
913
sst_disallow_writes(THD * thd,bool yes)914 static void sst_disallow_writes (THD* thd, bool yes)
915 {
916 char query_str[64] = { 0, };
917 ssize_t const query_max = sizeof(query_str) - 1;
918 snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
919 yes ? 1 : 0);
920
921 if (run_sql_command(thd, query_str))
922 {
923 WSREP_ERROR("Failed to disallow InnoDB writes");
924 }
925 }
926
sst_donor_thread(void * a)927 static void* sst_donor_thread (void* a)
928 {
929 sst_thread_arg* arg= (sst_thread_arg*)a;
930
931 WSREP_INFO("Running: '%s'", arg->cmd);
932
933 int err= 1;
934 bool locked= false;
935
936 const char* out= NULL;
937 const size_t out_len= 128;
938 char out_buf[out_len];
939
940 wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED;
941 wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST
942
943 wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
944 // operate with wsrep_ready == OFF
945 wsp::process proc(arg->cmd, "r", arg->env);
946
947 err= proc.error();
948
949 /* Inform server about SST script startup and release TO isolation */
950 mysql_mutex_lock (&arg->lock);
951 arg->err = -err;
952 mysql_cond_signal (&arg->cond);
953 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
954
955 if (proc.pipe() && !err)
956 {
957 wait_signal:
958 out= my_fgets (out_buf, out_len, proc.pipe());
959
960 if (out)
961 {
962 const char magic_flush[]= "flush tables";
963 const char magic_cont[]= "continue";
964 const char magic_done[]= "done";
965
966 if (!strcasecmp (out, magic_flush))
967 {
968 err= sst_flush_tables (thd.ptr);
969 if (!err)
970 {
971 sst_disallow_writes (thd.ptr, true);
972 locked= true;
973 goto wait_signal;
974 }
975 }
976 else if (!strcasecmp (out, magic_cont))
977 {
978 if (locked)
979 {
980 sst_disallow_writes (thd.ptr, false);
981 thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
982 locked= false;
983 }
984 err= 0;
985 goto wait_signal;
986 }
987 else if (!strncasecmp (out, magic_done, strlen(magic_done)))
988 {
989 err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
990 &ret_uuid, &ret_seqno);
991 }
992 else
993 {
994 WSREP_WARN("Received unknown signal: '%s'", out);
995 }
996 }
997 else
998 {
999 WSREP_ERROR("Failed to read from: %s", proc.cmd());
1000 proc.wait();
1001 }
1002 if (!err && proc.error()) err= proc.error();
1003 }
1004 else
1005 {
1006 WSREP_ERROR("Failed to execute: %s : %d (%s)",
1007 proc.cmd(), err, strerror(err));
1008 }
1009
1010 if (locked) // don't forget to unlock server before return
1011 {
1012 sst_disallow_writes (thd.ptr, false);
1013 thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
1014 }
1015
1016 // signal to donor that SST is over
1017 struct wsrep_gtid const state_id = {
1018 ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
1019 };
1020 wsrep->sst_sent (wsrep, &state_id, -err);
1021 proc.wait();
1022
1023 return NULL;
1024 }
1025
1026
1027
sst_donate_other(const char * method,const char * addr,const char * uuid,wsrep_seqno_t seqno,bool bypass,char ** env)1028 static int sst_donate_other (const char* method,
1029 const char* addr,
1030 const char* uuid,
1031 wsrep_seqno_t seqno,
1032 bool bypass,
1033 char** env) // carries auth info
1034 {
1035 int const cmd_len= 4096;
1036 wsp::string cmd_str(cmd_len);
1037
1038 if (!cmd_str())
1039 {
1040 WSREP_ERROR("sst_donate_other(): "
1041 "could not allocate cmd buffer of %d bytes", cmd_len);
1042 return -ENOMEM;
1043 }
1044
1045 const char* binlog_opt= "";
1046 char* binlog_opt_val= NULL;
1047
1048 int ret;
1049 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1050 {
1051 WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1052 return ret;
1053 }
1054 if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
1055
1056 ret= snprintf (cmd_str(), cmd_len,
1057 "wsrep_sst_%s "
1058 WSREP_SST_OPT_ROLE" 'donor' "
1059 WSREP_SST_OPT_ADDR" '%s' "
1060 WSREP_SST_OPT_LPORT " '%u' "
1061 WSREP_SST_OPT_SOCKET" '%s' "
1062 WSREP_SST_OPT_DATA" '%s' "
1063 WSREP_SST_OPT_CONF" '%s' "
1064 WSREP_SST_OPT_CONF_SUFFIX" '%s' "
1065 " %s '%s' "
1066 WSREP_SST_OPT_GTID" '%s:%lld'"
1067 "%s",
1068 method, addr, mysqld_port, mysqld_unix_port,
1069 mysql_real_data_home,
1070 wsrep_defaults_file, wsrep_defaults_group_suffix,
1071 binlog_opt, binlog_opt_val,
1072 uuid, (long long) seqno,
1073 bypass ? " " WSREP_SST_OPT_BYPASS : "");
1074 my_free(binlog_opt_val);
1075
1076 if (ret < 0 || ret >= cmd_len)
1077 {
1078 WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1079 return (ret < 0 ? ret : -EMSGSIZE);
1080 }
1081
1082 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1083
1084 pthread_t tmp;
1085 sst_thread_arg arg(cmd_str(), env);
1086 mysql_mutex_lock (&arg.lock);
1087 ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg);
1088 if (ret)
1089 {
1090 WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)",
1091 ret, strerror(ret));
1092 return ret;
1093 }
1094 mysql_cond_wait (&arg.cond, &arg.lock);
1095
1096 WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1097 return arg.err;
1098 }
1099
/*
  return true if character can be a part of a filename:
  alphanumeric, '-', '_' or '.'.

  Negative values are rejected up front. They arise when a plain char with
  the high bit set is promoted to int, and passing them to isalnum() is
  undefined behaviour.
*/
static bool filename_char(int const c)
{
  if (c < 0) return false;

  return isalnum(c) || (c == '-') || (c == '_') || (c == '.');
}
1105
1106 /* return true if character can be a part of an address string */
address_char(int const c)1107 static bool address_char(int const c)
1108 {
1109 return filename_char(c) ||
1110 (c == ':') || (c == '[') || (c == ']') || (c == '/');
1111 }
1112
check_request_str(const char * const str,bool (* check)(int c))1113 static bool check_request_str(const char* const str,
1114 bool (*check) (int c))
1115 {
1116 for (size_t i(0); str[i] != '\0'; ++i)
1117 {
1118 if (!check(str[i]))
1119 {
1120 WSREP_WARN("Illegal character in state transfer request: %i (%c).",
1121 str[i], str[i]);
1122 return true;
1123 }
1124 }
1125
1126 return false;
1127 }
1128
wsrep_sst_donate_cb(void * app_ctx,void * recv_ctx,const void * msg,size_t msg_len,const wsrep_gtid_t * current_gtid,const char * state,size_t state_len,bool bypass)1129 wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
1130 const void* msg, size_t msg_len,
1131 const wsrep_gtid_t* current_gtid,
1132 const char* state, size_t state_len,
1133 bool bypass)
1134 {
1135 const char* method = (char*)msg;
1136 size_t method_len = strlen (method);
1137
1138 if (check_request_str(method, filename_char))
1139 {
1140 WSREP_ERROR("Bad SST method name. SST canceled.");
1141 return WSREP_CB_FAILURE;
1142 }
1143
1144 const char* data = method + method_len + 1;
1145
1146 /* check for auth@addr separator */
1147 const char* addr= strrchr(data, '@');
1148 wsp::string remote_auth;
1149 if (addr)
1150 {
1151 remote_auth.set(strndup(data, addr - data));
1152 addr++;
1153 }
1154 else
1155 {
1156 // no auth part
1157 addr= data;
1158 }
1159
1160 if (check_request_str(addr, address_char))
1161 {
1162 WSREP_ERROR("Bad SST address string. SST canceled.");
1163 return WSREP_CB_FAILURE;
1164 }
1165
1166 char uuid_str[37];
1167 wsrep_uuid_print (¤t_gtid->uuid, uuid_str, sizeof(uuid_str));
1168
1169 /* This will be reset when sync callback is called.
1170 * Should we set wsrep_ready to FALSE here too? */
1171 local_status.set(WSREP_MEMBER_DONOR);
1172
1173 wsp::env env(NULL);
1174 if (env.error())
1175 {
1176 WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1177 return WSREP_CB_FAILURE;
1178 }
1179
1180 int ret;
1181 if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth_real)))
1182 {
1183 WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1184 return WSREP_CB_FAILURE;
1185 }
1186
1187 if (remote_auth())
1188 {
1189 if ((ret= sst_append_env_var(env, WSREP_SST_REMOTE_AUTH_ENV,remote_auth())))
1190 {
1191 WSREP_ERROR("wsrep_sst_donate_cb(): appending remote auth env failed: "
1192 "%d", ret);
1193 return WSREP_CB_FAILURE;
1194 }
1195 }
1196
1197 if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1198 {
1199 ret = sst_donate_mysqldump(addr, ¤t_gtid->uuid, uuid_str,
1200 current_gtid->seqno, bypass, env());
1201 }
1202 else
1203 {
1204 ret = sst_donate_other(method, addr, uuid_str,
1205 current_gtid->seqno, bypass, env());
1206 }
1207
1208 return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
1209 }
1210
wsrep_SE_init_wait(THD * thd)1211 enum Wsrep_SE_init_result wsrep_SE_init_wait(THD* thd)
1212 {
1213 mysql_mutex_lock (&LOCK_wsrep_sst_init);
1214 while (SE_initialized == WSREP_SE_INIT_RESULT_NONE &&
1215 thd->killed == THD::NOT_KILLED)
1216 {
1217 mysql_mutex_lock(&thd->LOCK_thd_data);
1218 thd->current_cond= &COND_wsrep_sst_init;
1219 thd->current_mutex= &LOCK_wsrep_sst_init;
1220 mysql_mutex_unlock(&thd->LOCK_thd_data);
1221
1222 mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init);
1223
1224 if (thd->killed != THD::NOT_KILLED)
1225 {
1226 WSREP_DEBUG("SE init waiting canceled");
1227 break;
1228 }
1229 mysql_mutex_lock(&thd->LOCK_thd_data);
1230 thd->current_cond= NULL;
1231 thd->current_mutex= NULL;
1232 mysql_mutex_unlock(&thd->LOCK_thd_data);
1233 }
1234 enum Wsrep_SE_init_result ret= SE_initialized;
1235 mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1236
1237 mysql_mutex_lock(&thd->LOCK_thd_data);
1238 thd->current_cond= NULL;
1239 thd->current_mutex= NULL;
1240 mysql_mutex_unlock(&thd->LOCK_thd_data);
1241 return ret;
1242 }
1243
wsrep_SE_initialized(enum Wsrep_SE_init_result result)1244 void wsrep_SE_initialized(enum Wsrep_SE_init_result result)
1245 {
1246 mysql_mutex_lock (&LOCK_wsrep_sst_init);
1247 if (SE_initialized == WSREP_SE_INIT_RESULT_NONE)
1248 {
1249 SE_initialized= result;
1250 }
1251 mysql_cond_signal (&COND_wsrep_sst_init);
1252 mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1253 }
1254