1 /* Copyright 2008-2020 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 #include "mariadb.h"
17 #include "wsrep_sst.h"
18 #include <inttypes.h>
19 #include <ctype.h>
20 #include <mysqld.h>
21 #include <m_ctype.h>
22 #include <strfunc.h>
23 #include <sql_class.h>
24 #include <set_var.h>
25 #include <sql_acl.h>
26 #include <sql_reload.h>
27 #include <sql_parse.h>
28 #include "wsrep_priv.h"
29 #include "wsrep_utils.h"
30 #include "wsrep_xid.h"
31 #include <cstdio>
32 #include <cstdlib>
33
34 #include <my_service_manager.h>
35
36 static char wsrep_defaults_file[FN_REFLEN * 2 + 10 + 30 +
37 sizeof(WSREP_SST_OPT_CONF) +
38 sizeof(WSREP_SST_OPT_CONF_SUFFIX) +
39 sizeof(WSREP_SST_OPT_CONF_EXTRA)] = {0};
40
41 const char* wsrep_sst_method = WSREP_SST_DEFAULT;
42 const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO;
43 const char* wsrep_sst_donor = "";
44 const char* wsrep_sst_auth = NULL;
45
46 // container for real auth string
47 static const char* sst_auth_real = NULL;
48 my_bool wsrep_sst_donor_rejects_queries = FALSE;
49
wsrep_sst_method_check(sys_var * self,THD * thd,set_var * var)50 bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
51 {
52 if ((! var->save_result.string_value.str) ||
53 (var->save_result.string_value.length == 0 ))
54 {
55 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
56 var->save_result.string_value.str ?
57 var->save_result.string_value.str : "NULL");
58 return 1;
59 }
60
61 return 0;
62 }
63
/* Data home directory; NULL when it was not provided or was empty. */
static const char* data_home_dir = NULL;

/*
  Remember the data home directory.

  The pointer is stored as is (no copy is made); NULL and the empty
  string both reset the stored value to NULL.
*/
void wsrep_set_data_home_dir(const char *data_dir)
{
  if (data_dir == NULL || *data_dir == '\0')
  {
    data_home_dir= NULL;
    return;
  }
  data_home_dir= data_dir;
}
70
make_wsrep_defaults_file()71 static void make_wsrep_defaults_file()
72 {
73 if (!wsrep_defaults_file[0])
74 {
75 char *ptr= wsrep_defaults_file;
76 char *end= ptr + sizeof(wsrep_defaults_file);
77 if (my_defaults_file)
78 ptr= strxnmov(ptr, end - ptr,
79 WSREP_SST_OPT_CONF, " '", my_defaults_file, "' ", NULL);
80
81 if (my_defaults_extra_file)
82 ptr= strxnmov(ptr, end - ptr,
83 WSREP_SST_OPT_CONF_EXTRA, " '", my_defaults_extra_file, "' ", NULL);
84
85 if (my_defaults_group_suffix)
86 ptr= strxnmov(ptr, end - ptr,
87 WSREP_SST_OPT_CONF_SUFFIX, " '", my_defaults_group_suffix, "' ", NULL);
88 }
89 }
90
91
wsrep_sst_receive_address_check(sys_var * self,THD * thd,set_var * var)92 bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
93 {
94 if ((! var->save_result.string_value.str) ||
95 (var->save_result.string_value.length > (FN_REFLEN - 1))) // safety
96 {
97 goto err;
98 }
99
100 return 0;
101
102 err:
103 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
104 var->save_result.string_value.str ?
105 var->save_result.string_value.str : "NULL");
106 return 1;
107 }
108
wsrep_sst_receive_address_update(sys_var * self,THD * thd,enum_var_type type)109 bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
110 enum_var_type type)
111 {
112 return 0;
113 }
114
wsrep_sst_auth_check(sys_var * self,THD * thd,set_var * var)115 bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
116 {
117 return 0;
118 }
119
sst_auth_real_set(const char * value)120 static bool sst_auth_real_set (const char* value)
121 {
122 const char* v= NULL;
123
124 if (value)
125 {
126 v= my_strdup(value, MYF(0));
127 }
128 else // its NULL
129 {
130 wsrep_sst_auth_free();
131 return 0;
132 }
133
134 if (v)
135 {
136 // set sst_auth_real
137 if (sst_auth_real) { my_free((void *) sst_auth_real); }
138 sst_auth_real = v;
139
140 // mask wsrep_sst_auth
141 if (strlen(sst_auth_real))
142 {
143 if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
144 wsrep_sst_auth= my_strdup(WSREP_SST_AUTH_MASK, MYF(0));
145 }
146 else
147 {
148 if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
149 wsrep_sst_auth= NULL;
150 }
151
152 return 0;
153 }
154 return 1;
155 }
156
wsrep_sst_auth_free()157 void wsrep_sst_auth_free()
158 {
159 if (wsrep_sst_auth) { my_free((void *) wsrep_sst_auth); }
160 if (sst_auth_real) { my_free((void *) sst_auth_real); }
161 wsrep_sst_auth= NULL;
162 sst_auth_real= NULL;
163 }
164
wsrep_sst_auth_update(sys_var * self,THD * thd,enum_var_type type)165 bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
166 {
167 return sst_auth_real_set (wsrep_sst_auth);
168 }
169
wsrep_sst_auth_init()170 void wsrep_sst_auth_init ()
171 {
172 sst_auth_real_set(wsrep_sst_auth);
173 }
174
wsrep_sst_donor_check(sys_var * self,THD * thd,set_var * var)175 bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
176 {
177 return 0;
178 }
179
wsrep_sst_donor_update(sys_var * self,THD * thd,enum_var_type type)180 bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
181 {
182 return 0;
183 }
184
wsrep_before_SE()185 bool wsrep_before_SE()
186 {
187 return (wsrep_provider != NULL
188 && strcmp (wsrep_provider, WSREP_NONE)
189 && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
190 && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
191 }
192
193 static bool sst_complete = false;
194 static bool sst_needed = false;
195
196 #define WSREP_EXTEND_TIMEOUT_INTERVAL 30
197 #define WSREP_TIMEDWAIT_SECONDS 10
198
wsrep_sst_grab()199 void wsrep_sst_grab ()
200 {
201 WSREP_INFO("wsrep_sst_grab()");
202 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
203 sst_complete = false;
204 mysql_mutex_unlock (&LOCK_wsrep_sst);
205 }
206
207 // Wait for end of SST
wsrep_sst_wait()208 bool wsrep_sst_wait ()
209 {
210 double total_wtime = 0;
211
212 if (mysql_mutex_lock (&LOCK_wsrep_sst))
213 abort();
214
215 WSREP_INFO("Waiting for SST to complete.");
216
217 while (!sst_complete)
218 {
219 struct timespec wtime;
220 set_timespec(wtime, WSREP_TIMEDWAIT_SECONDS);
221 time_t start_time = time(NULL);
222 mysql_cond_timedwait (&COND_wsrep_sst, &LOCK_wsrep_sst, &wtime);
223 time_t end_time = time(NULL);
224
225 if (!sst_complete)
226 {
227 total_wtime += difftime(end_time, start_time);
228 WSREP_DEBUG("Waiting for SST to complete. current seqno: %" PRId64 " waited %f secs.", local_seqno, total_wtime);
229 service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
230 "WSREP state transfer ongoing, current seqno: %" PRId64 " waited %f secs", local_seqno, total_wtime);
231 }
232 }
233
234 if (local_seqno >= 0)
235 {
236 WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
237 }
238 else
239 {
240 WSREP_ERROR("SST failed: %d (%s)",
241 int(-local_seqno), strerror(-local_seqno));
242 }
243
244 mysql_mutex_unlock (&LOCK_wsrep_sst);
245
246 return (local_seqno >= 0);
247 }
248
249 // Signal end of SST
wsrep_sst_complete(const wsrep_uuid_t * sst_uuid,wsrep_seqno_t sst_seqno,bool needed)250 void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
251 wsrep_seqno_t sst_seqno,
252 bool needed)
253 {
254 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
255 if (!sst_complete)
256 {
257 sst_complete = true;
258 sst_needed = needed;
259 local_uuid = *sst_uuid;
260 local_seqno = sst_seqno;
261 mysql_cond_signal (&COND_wsrep_sst);
262 }
263 else
264 {
265 /* This can happen when called from wsrep_synced_cb().
266 At the moment there is no way to check there
267 if main thread is still waiting for signal,
268 so wsrep_sst_complete() is called from there
269 each time wsrep_ready changes from FALSE -> TRUE.
270 */
271 WSREP_DEBUG("Nobody is waiting for SST.");
272 }
273 mysql_mutex_unlock (&LOCK_wsrep_sst);
274 }
275
276 /*
277 If wsrep provider is loaded, inform that the new state snapshot
278 has been received. Also update the local checkpoint.
279
280 @param wsrep [IN] wsrep handle
281 @param uuid [IN] Initial state UUID
282 @param seqno [IN] Initial state sequence number
283 @param state [IN] Always NULL, also ignored by wsrep provider (?)
284 @param state_len [IN] Always 0, also ignored by wsrep provider (?)
285 @param implicit [IN] Whether invoked implicitly due to SST
286 (true) or explicitly because if change
287 in wsrep_start_position by user (false).
288 @return false Success
289 true Error
290
291 */
wsrep_sst_received(wsrep_t * const wsrep,const wsrep_uuid_t & uuid,const wsrep_seqno_t seqno,const void * const state,const size_t state_len,const bool implicit)292 bool wsrep_sst_received (wsrep_t* const wsrep,
293 const wsrep_uuid_t& uuid,
294 const wsrep_seqno_t seqno,
295 const void* const state,
296 const size_t state_len,
297 const bool implicit)
298 {
299 /*
300 To keep track of whether the local uuid:seqno should be updated. Also, note
301 that local state (uuid:seqno) is updated/checkpointed only after we get an
302 OK from wsrep provider. By doing so, the values remain consistent across
303 the server & wsrep provider.
304 */
305 bool do_update= false;
306
307 // Get the locally stored uuid:seqno.
308 if (wsrep_get_SE_checkpoint(local_uuid, local_seqno))
309 {
310 return true;
311 }
312
313 if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
314 local_seqno < seqno || seqno < 0)
315 {
316 do_update= true;
317 }
318 else if (local_seqno > seqno)
319 {
320 WSREP_WARN("SST position can't be set in past. Requested: %lld, Current: "
321 " %lld.", (long long)seqno, (long long)local_seqno);
322 /*
323 If we are here because of SET command, simply return true (error) instead of
324 aborting.
325 */
326 if (implicit)
327 {
328 WSREP_WARN("Can't continue.");
329 unireg_abort(1);
330 }
331 else
332 {
333 return true;
334 }
335 }
336
337 #ifdef GTID_SUPPORT
338 wsrep_init_sidno(uuid);
339 #endif /* GTID_SUPPORT */
340
341 if (wsrep)
342 {
343 int const rcode(seqno < 0 ? seqno : 0);
344 wsrep_gtid_t const state_id= {uuid,
345 (rcode ? WSREP_SEQNO_UNDEFINED : seqno)};
346
347 wsrep_status_t ret= wsrep->sst_received(wsrep, &state_id, state,
348 state_len, rcode);
349
350 if (ret != WSREP_OK)
351 {
352 return true;
353 }
354 }
355
356 // Now is the good time to update the local state and checkpoint.
357 if (do_update)
358 {
359 if (wsrep_set_SE_checkpoint(uuid, seqno))
360 {
361 return true;
362 }
363
364 local_uuid= uuid;
365 local_seqno= seqno;
366 }
367
368 return false;
369 }
370
371 // Let applier threads to continue
wsrep_sst_continue()372 bool wsrep_sst_continue ()
373 {
374 if (sst_needed)
375 {
376 WSREP_INFO("Signalling provider to continue.");
377 return wsrep_sst_received (wsrep, local_uuid, local_seqno, NULL, 0, true);
378 }
379 return false;
380 }
381
382 struct sst_thread_arg
383 {
384 const char* cmd;
385 char** env;
386 char* ret_str;
387 int err;
388 mysql_mutex_t lock;
389 mysql_cond_t cond;
390
sst_thread_argsst_thread_arg391 sst_thread_arg (const char* c, char** e)
392 : cmd(c), env(e), ret_str(0), err(-1)
393 {
394 mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
395 mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
396 }
397
~sst_thread_argsst_thread_arg398 ~sst_thread_arg()
399 {
400 mysql_cond_destroy (&cond);
401 mysql_mutex_unlock (&lock);
402 mysql_mutex_destroy (&lock);
403 }
404 };
405
sst_scan_uuid_seqno(const char * str,wsrep_uuid_t * uuid,wsrep_seqno_t * seqno)406 static int sst_scan_uuid_seqno (const char* str,
407 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
408 {
409 int offt = wsrep_uuid_scan (str, strlen(str), uuid);
410 errno= 0; /* Reset the errno */
411 if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
412 {
413 *seqno = strtoll (str + offt + 1, NULL, 10);
414 if (*seqno != LLONG_MAX || errno != ERANGE)
415 {
416 return 0;
417 }
418 }
419
420 WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
421 return EINVAL;
422 }
423
/*
  fgets() wrapper that strips a single trailing '\n' from the line read.

  @return buf on success, NULL if fgets() returned NULL.
*/
static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
{
  char* const line= fgets (buf, buf_len, stream);

  if (!line)
    return NULL;

  const size_t line_len= strlen(line);
  if (line_len != 0 && line[line_len - 1] == '\n')
    line[line_len - 1]= '\0';

  return line;
}
437
438 /*
439 Generate "name 'value'" string.
440 */
generate_name_value(const char * name,const char * value)441 static char* generate_name_value(const char* name, const char* value)
442 {
443 size_t name_len= strlen(name);
444 size_t value_len= strlen(value);
445 char* buf=
446 (char*) my_malloc((name_len + value_len + 5) * sizeof(char), MYF(0));
447 if (buf)
448 {
449 char* ref= buf;
450 *ref++ = ' ';
451 memcpy(ref, name, name_len * sizeof(char));
452 ref += name_len;
453 *ref++ = ' ';
454 *ref++ = '\'';
455 memcpy(ref, value, value_len * sizeof(char));
456 ref += value_len;
457 *ref++ = '\'';
458 *ref = 0;
459 }
460 return buf;
461 }
462 /*
463 Generate binlog option string for sst_donate_other(), sst_prepare_other().
464
465 Returns zero on success, negative error code otherwise.
466
467 String containing binlog name is stored in param ret if binlog is enabled
468 and GTID mode is on, otherwise empty string. Returned string should be
469 freed with my_free().
470 */
generate_binlog_opt_val(char ** ret)471 static int generate_binlog_opt_val(char** ret)
472 {
473 DBUG_ASSERT(ret);
474 *ret= NULL;
475 if (opt_bin_log)
476 {
477 assert(opt_bin_logname);
478 *ret= strcmp(opt_bin_logname, "0") ?
479 generate_name_value(WSREP_SST_OPT_BINLOG,
480 opt_bin_logname) :
481 my_strdup("", MYF(0));
482 }
483 else
484 {
485 *ret= my_strdup("", MYF(0));
486 }
487 if (!*ret) return -ENOMEM;
488 return 0;
489 }
490
generate_binlog_index_opt_val(char ** ret)491 static int generate_binlog_index_opt_val(char** ret)
492 {
493 DBUG_ASSERT(ret);
494 *ret= NULL;
495 if (opt_binlog_index_name) {
496 *ret= strcmp(opt_binlog_index_name, "0") ?
497 generate_name_value(WSREP_SST_OPT_BINLOG_INDEX,
498 opt_binlog_index_name) :
499 my_strdup("", MYF(0));
500 }
501 else
502 {
503 *ret= my_strdup("", MYF(0));
504 }
505 if (!*ret) return -ENOMEM;
506 return 0;
507 }
508
sst_joiner_thread(void * a)509 static void* sst_joiner_thread (void* a)
510 {
511 sst_thread_arg* arg= (sst_thread_arg*) a;
512 int err= 1;
513
514 {
515 const char magic[] = "ready";
516 const size_t magic_len = sizeof(magic) - 1;
517 const size_t out_len = 512;
518 char out[out_len];
519
520 WSREP_INFO("Running: '%s'", arg->cmd);
521
522 wsp::process proc (arg->cmd, "r", arg->env);
523
524 if (proc.pipe() && !proc.error())
525 {
526 const char* tmp= my_fgets (out, out_len, proc.pipe());
527
528 if (!tmp || strlen(tmp) < (magic_len + 2) ||
529 strncasecmp (tmp, magic, magic_len))
530 {
531 WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
532 magic, arg->cmd, tmp);
533 proc.wait();
534 if (proc.error()) err = proc.error();
535 }
536 else
537 {
538 err = 0;
539 }
540 }
541 else
542 {
543 err = proc.error();
544 WSREP_ERROR("Failed to execute: %s : %d (%s)",
545 arg->cmd, err, strerror(err));
546 }
547
548 // signal sst_prepare thread with ret code,
549 // it will go on sending SST request
550 mysql_mutex_lock (&arg->lock);
551 if (!err)
552 {
553 arg->ret_str = strdup (out + magic_len + 1);
554 if (!arg->ret_str) err = ENOMEM;
555 }
556 arg->err = -err;
557 mysql_cond_signal (&arg->cond);
558 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
559
560 if (err) return NULL; /* lp:808417 - return immediately, don't signal
561 * initializer thread to ensure single thread of
562 * shutdown. */
563
564 wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
565 wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
566
567 // in case of successfull receiver start, wait for SST completion/end
568 char* tmp = my_fgets (out, out_len, proc.pipe());
569
570 proc.wait();
571 err= EINVAL;
572
573 if (!tmp)
574 {
575 WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from "
576 "joiner script.");
577 if (proc.error()) err = proc.error();
578 }
579 else
580 {
581 // Read state ID (UUID:SEQNO) followed by wsrep_gtid_domain_id (if any).
582 const char *pos= strchr(out, ' ');
583
584 if (!pos) {
585 // There is no wsrep_gtid_domain_id (some older version SST script?).
586 err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
587
588 } else {
589 // Scan state ID first followed by wsrep_gtid_domain_id.
590 unsigned long int domain_id;
591
592 // Null-terminate the state-id.
593 out[pos - out]= 0;
594 err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
595
596 if (err)
597 {
598 goto err;
599 }
600 else if (wsrep_gtid_mode)
601 {
602 errno= 0; /* Reset the errno */
603 domain_id= strtoul(pos + 1, NULL, 10);
604 err= errno;
605
606 /* Check if we received a valid gtid_domain_id. */
607 if (err == EINVAL || err == ERANGE)
608 {
609 WSREP_ERROR("Failed to get donor wsrep_gtid_domain_id.");
610 err= EINVAL;
611 goto err;
612 } else {
613 wsrep_gtid_domain_id= (uint32) domain_id;
614 }
615 }
616 }
617 }
618
619 err:
620
621 if (err)
622 {
623 ret_uuid= WSREP_UUID_UNDEFINED;
624 ret_seqno= -err;
625 }
626
627 // Tell initializer thread that SST is complete
628 wsrep_sst_complete (&ret_uuid, ret_seqno, true);
629 }
630
631 return NULL;
632 }
633
634 #define WSREP_SST_AUTH_ENV "WSREP_SST_OPT_AUTH"
635 #define WSREP_SST_REMOTE_AUTH_ENV "WSREP_SST_OPT_REMOTE_AUTH"
636 #define DATA_HOME_DIR_ENV "INNODB_DATA_HOME_DIR"
637
sst_append_env_var(wsp::env & env,const char * const var,const char * const val)638 static int sst_append_env_var(wsp::env& env,
639 const char* const var,
640 const char* const val)
641 {
642 int const env_str_size= strlen(var) + 1 /* = */
643 + (val ? strlen(val) : 0) + 1 /* \0 */;
644
645 wsp::string env_str(env_str_size); // for automatic cleanup on return
646 if (!env_str()) return -ENOMEM;
647
648 int ret= snprintf(env_str(), env_str_size, "%s=%s", var, val ? val : "");
649
650 if (ret < 0 || ret >= env_str_size)
651 {
652 WSREP_ERROR("sst_append_env_var(): snprintf(%s=%s) failed: %d",
653 var, val, ret);
654 return (ret < 0 ? ret : -EMSGSIZE);
655 }
656
657 env.append(env_str());
658 return -env.error();
659 }
660
/*
  Character classification used when the original mysqld arguments are
  copied to the SST script command line. The argument of each macro is
  evaluated more than once, so it must not have side effects.
*/
#ifdef __WIN__
/*
  Space, single quote, ampersand, backquote, I/O redirection
  characters, caret, all brackets, plus, exclamation and comma
  characters require text to be enclosed in double quotes:
*/
#define IS_SPECIAL(c) \
  (isspace((unsigned char) (c)) || (c) == '\'' || (c) == '&' || \
   (c) == '`' || (c) == '|' || \
   (c) == '>' || (c) == '<' || (c) == ';' || (c) == '^' || \
   (c) == '[' || (c) == ']' || (c) == '{' || (c) == '}' || \
   (c) == '(' || (c) == ')' || (c) == '+' || (c) == '!' || \
   (c) == ',')
/*
  Inside values, the equals character is interpreted as a special
  character and requires quotation:
*/
#define IS_SPECIAL_V(c) (IS_SPECIAL(c) || (c) == '=')
/*
  Double quotation mark and percent characters require escaping.
  The quotation mark must be written as the single character '"':
  a multi-character constant would never compare equal to a char.
*/
#define IS_REQ_ESCAPING(c) ((c) == '"' || (c) == '%')
#else
/*
  Space, single quote, ampersand, backquote, and I/O redirection
  characters require text to be enclosed in double quotes. The
  semicolon is used to separate shell commands, so it must be
  enclosed in double quotes as well:
*/
#define IS_SPECIAL(c) \
  (isspace((unsigned char) (c)) || (c) == '\'' || (c) == '&' || \
   (c) == '`' || (c) == '|' || \
   (c) == '>' || (c) == '<' || (c) == ';')
/*
  Inside values, characters are interpreted as in parameter names:
*/
#define IS_SPECIAL_V(c) IS_SPECIAL(c)
/*
  Double quotation mark and backslash characters require
  backslash prefixing, the dollar symbol is used to substitute
  a variable value, therefore it also requires escaping:
*/
#define IS_REQ_ESCAPING(c) ((c) == '"' || (c) == '\\' || (c) == '$')
#endif
703
estimate_cmd_len(bool * extra_args)704 static size_t estimate_cmd_len (bool* extra_args)
705 {
706 /*
707 The length of the area reserved for the control parameters
708 of the SST script (excluding the copying of the original
709 mysqld arguments):
710 */
711 size_t cmd_len= 4096;
712 bool extra= false;
713 /*
714 If mysqld was started with arguments, add them all:
715 */
716 if (orig_argc > 1)
717 {
718 for (int i = 1; i < orig_argc; i++)
719 {
720 const char* arg= orig_argv[i];
721 size_t n= strlen(arg);
722 if (n == 0) continue;
723 cmd_len += n;
724 bool quotation= false;
725 char c;
726 while ((c = *arg++) != 0)
727 {
728 if (IS_SPECIAL(c))
729 {
730 quotation= true;
731 }
732 else if (IS_REQ_ESCAPING(c))
733 {
734 cmd_len++;
735 #ifdef __WIN__
736 quotation= true;
737 #endif
738 }
739 /*
740 If the equals symbol is encountered, then we need to separately
741 process the right side:
742 */
743 else if (c == '=')
744 {
745 /* Perhaps we need to quote the left part of the argument: */
746 if (quotation)
747 {
748 cmd_len += 2;
749 /*
750 Reset the quotation flag, since now the status for
751 the right side of the expression will be saved here:
752 */
753 quotation= false;
754 }
755 while ((c = *arg++) != 0)
756 {
757 if (IS_SPECIAL_V(c))
758 {
759 quotation= true;
760 }
761 else if (IS_REQ_ESCAPING(c))
762 {
763 cmd_len++;
764 #ifdef __WIN__
765 quotation= true;
766 #endif
767 }
768 }
769 break;
770 }
771 }
772 /* Perhaps we need to quote the entire argument or its right part: */
773 if (quotation)
774 {
775 cmd_len += 2;
776 }
777 }
778 extra = true;
779 cmd_len += strlen(WSREP_SST_OPT_MYSQLD);
780 /*
781 Add the separating spaces between arguments,
782 and one additional space before "--mysqld-args":
783 */
784 cmd_len += orig_argc;
785 }
786 *extra_args= extra;
787 return cmd_len;
788 }
789
copy_orig_argv(char * cmd_str)790 static void copy_orig_argv (char* cmd_str)
791 {
792 /*
793 If mysqld was started with arguments, copy them all:
794 */
795 if (orig_argc > 1)
796 {
797 size_t n = strlen(WSREP_SST_OPT_MYSQLD);
798 *cmd_str++ = ' ';
799 memcpy(cmd_str, WSREP_SST_OPT_MYSQLD, n * sizeof(char));
800 cmd_str += n;
801 for (int i = 1; i < orig_argc; i++)
802 {
803 char* arg= orig_argv[i];
804 n = strlen(arg);
805 if (n == 0) continue;
806 *cmd_str++ = ' ';
807 bool quotation= false;
808 bool plain= true;
809 char *arg_scan= arg;
810 char c;
811 while ((c = *arg_scan++) != 0)
812 {
813 if (IS_SPECIAL(c))
814 {
815 quotation= true;
816 }
817 else if (IS_REQ_ESCAPING(c))
818 {
819 plain= false;
820 #ifdef __WIN__
821 quotation= true;
822 #endif
823 }
824 /*
825 If the equals symbol is encountered, then we need to separately
826 process the right side:
827 */
828 else if (c == '=')
829 {
830 /* Calculate length of the Left part of the argument: */
831 size_t m = (size_t) (arg_scan - arg) - 1;
832 if (m)
833 {
834 /* Perhaps we need to quote the left part of the argument: */
835 if (quotation)
836 {
837 *cmd_str++ = '"';
838 }
839 /*
840 If there were special characters inside, then we can use
841 the fast memcpy function:
842 */
843 if (plain)
844 {
845 memcpy(cmd_str, arg, m * sizeof(char));
846 cmd_str += m;
847 /* Left part of the argument has already been processed: */
848 n -= m;
849 arg += m;
850 }
851 /* Otherwise we need to prefix individual characters: */
852 else
853 {
854 n -= m;
855 while (m)
856 {
857 c = *arg++;
858 if (IS_REQ_ESCAPING(c))
859 {
860 #ifdef __WIN__
861 *cmd_str++ = c;
862 #else
863 *cmd_str++ = '\\';
864 #endif
865 }
866 *cmd_str++ = c;
867 m--;
868 }
869 /*
870 Reset the plain string flag, since now the status for
871 the right side of the expression will be saved here:
872 */
873 plain= true;
874 }
875 /* Perhaps we need to quote the left part of the argument: */
876 if (quotation)
877 {
878 *cmd_str++ = '"';
879 /*
880 Reset the quotation flag, since now the status for
881 the right side of the expression will be saved here:
882 */
883 quotation= false;
884 }
885 }
886 /* Copy equals symbol: */
887 *cmd_str++ = '=';
888 arg++;
889 n--;
890 /* Let's deal with the left side of the expression: */
891 while ((c = *arg_scan++) != 0)
892 {
893 if (IS_SPECIAL_V(c))
894 {
895 quotation= true;
896 }
897 else if (IS_REQ_ESCAPING(c))
898 {
899 plain= false;
900 #ifdef __WIN__
901 quotation= true;
902 #endif
903 }
904 }
905 break;
906 }
907 }
908 if (n)
909 {
910 /* Perhaps we need to quote the entire argument or its right part: */
911 if (quotation)
912 {
913 *cmd_str++ = '"';
914 }
915 /*
916 If there were no special characters inside, then we can use
917 the fast memcpy function:
918 */
919 if (plain)
920 {
921 memcpy(cmd_str, arg, n * sizeof(char));
922 cmd_str += n;
923 }
924 /* Otherwise we need to prefix individual characters: */
925 else
926 {
927 while ((c = *arg++) != 0)
928 {
929 if (IS_REQ_ESCAPING(c))
930 {
931 #ifdef __WIN__
932 *cmd_str++ = c;
933 #else
934 *cmd_str++ = '\\';
935 #endif
936 }
937 *cmd_str++ = c;
938 }
939 }
940 /* Perhaps we need to quote the entire argument or its right part: */
941 if (quotation)
942 {
943 *cmd_str++ = '"';
944 }
945 }
946 }
947 /*
948 Add a terminating null character (not counted in the length,
949 since we've overwritten the original null character which
950 was previously added by snprintf:
951 */
952 *cmd_str = 0;
953 }
954 }
955
sst_prepare_other(const char * method,const char * sst_auth,const char * addr_in,const char ** addr_out)956 static ssize_t sst_prepare_other (const char* method,
957 const char* sst_auth,
958 const char* addr_in,
959 const char** addr_out)
960 {
961 bool extra_args;
962 size_t const cmd_len= estimate_cmd_len(&extra_args);
963 wsp::string cmd_str(cmd_len);
964
965 if (!cmd_str())
966 {
967 WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %zd bytes",
968 cmd_len);
969 return -ENOMEM;
970 }
971
972 char* binlog_opt_val= NULL;
973 char* binlog_index_opt_val= NULL;
974
975 int ret;
976 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
977 {
978 WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
979 ret);
980 return ret;
981 }
982
983 if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val)))
984 {
985 WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d",
986 ret);
987 if (binlog_opt_val) my_free(binlog_opt_val);
988 return ret;
989 }
990
991 make_wsrep_defaults_file();
992
993 ret= snprintf (cmd_str(), cmd_len,
994 "wsrep_sst_%s "
995 WSREP_SST_OPT_ROLE " 'joiner' "
996 WSREP_SST_OPT_ADDR " '%s' "
997 WSREP_SST_OPT_DATA " '%s' "
998 "%s"
999 WSREP_SST_OPT_PARENT " '%d'"
1000 "%s"
1001 "%s",
1002 method, addr_in, mysql_real_data_home,
1003 wsrep_defaults_file,
1004 (int)getpid(),
1005 binlog_opt_val, binlog_index_opt_val);
1006
1007 my_free(binlog_opt_val);
1008 my_free(binlog_index_opt_val);
1009
1010 if (ret < 0 || size_t(ret) >= cmd_len)
1011 {
1012 WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
1013 return (ret < 0 ? ret : -EMSGSIZE);
1014 }
1015
1016 if (extra_args)
1017 copy_orig_argv(cmd_str() + ret);
1018
1019 wsp::env env(NULL);
1020 if (env.error())
1021 {
1022 WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
1023 return -env.error();
1024 }
1025
1026 if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth)))
1027 {
1028 WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
1029 return ret;
1030 }
1031
1032 if (data_home_dir)
1033 {
1034 if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir)))
1035 {
1036 WSREP_ERROR("sst_prepare_other(): appending data "
1037 "directory failed: %d", ret);
1038 return ret;
1039 }
1040 }
1041
1042 pthread_t tmp;
1043 sst_thread_arg arg(cmd_str(), env());
1044 mysql_mutex_lock (&arg.lock);
1045 ret = mysql_thread_create (key_wsrep_sst_joiner, &tmp, NULL, sst_joiner_thread, &arg);
1046 if (ret)
1047 {
1048 WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
1049 ret, strerror(ret));
1050 return -ret;
1051 }
1052 mysql_cond_wait (&arg.cond, &arg.lock);
1053
1054 *addr_out= arg.ret_str;
1055
1056 if (!arg.err)
1057 ret = strlen(*addr_out);
1058 else
1059 {
1060 assert (arg.err < 0);
1061 ret = arg.err;
1062 }
1063
1064 pthread_detach (tmp);
1065
1066 return ret;
1067 }
1068
1069 extern uint mysqld_port;
1070
1071 /*! Just tells donor where to send mysqldump */
sst_prepare_mysqldump(const char * addr_in,const char ** addr_out)1072 static ssize_t sst_prepare_mysqldump (const char* addr_in,
1073 const char** addr_out)
1074 {
1075 ssize_t ret = strlen (addr_in);
1076
1077 if (!strrchr(addr_in, ':'))
1078 {
1079 ssize_t s = ret + 7;
1080 char* tmp = (char*) malloc (s);
1081
1082 if (tmp)
1083 {
1084 ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
1085
1086 if (ret > 0 && ret < s)
1087 {
1088 *addr_out= tmp;
1089 return ret;
1090 }
1091 if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
1092 free (tmp);
1093 }
1094 else {
1095 ret= -ENOMEM;
1096 }
1097
1098 WSREP_ERROR ("Could not prepare state transfer request: "
1099 "adding default port failed: %zd.", ret);
1100 }
1101 else {
1102 *addr_out= addr_in;
1103 }
1104
1105 return ret;
1106 }
1107
1108 static bool SE_initialized = false;
1109
wsrep_sst_prepare(void ** msg)1110 ssize_t wsrep_sst_prepare (void** msg)
1111 {
1112 const char* addr_in= NULL;
1113 const char* addr_out= NULL;
1114 const char* method;
1115
1116 if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
1117 {
1118 ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1;
1119 *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL);
1120 if (!msg)
1121 {
1122 WSREP_ERROR("Could not allocate %zd bytes for state request", ret);
1123 unireg_abort(1);
1124 }
1125 return ret;
1126 }
1127
1128 /*
1129 Figure out SST receive address. Common for all SST methods.
1130 */
1131 char ip_buf[256];
1132 const ssize_t ip_max= sizeof(ip_buf);
1133
1134 // Attempt 1: wsrep_sst_receive_address
1135 if (wsrep_sst_receive_address &&
1136 strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
1137 {
1138 addr_in= wsrep_sst_receive_address;
1139 }
1140
1141 //Attempt 2: wsrep_node_address
1142 else if (wsrep_node_address && strlen(wsrep_node_address))
1143 {
1144 wsp::Address addr(wsrep_node_address);
1145
1146 if (!addr.is_valid())
1147 {
1148 WSREP_ERROR("Could not parse wsrep_node_address : %s",
1149 wsrep_node_address);
1150 unireg_abort(1);
1151 }
1152 memcpy(ip_buf, addr.get_address(), addr.get_address_len());
1153 addr_in= ip_buf;
1154 }
1155 // Attempt 3: Try to get the IP from the list of available interfaces.
1156 else
1157 {
1158 ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
1159
1160 if (ret && ret < ip_max)
1161 {
1162 addr_in= ip_buf;
1163 }
1164 else
1165 {
1166 WSREP_ERROR("Failed to guess address to accept state transfer. "
1167 "wsrep_sst_receive_address must be set manually.");
1168 unireg_abort(1);
1169 }
1170 }
1171
1172 ssize_t addr_len= -ENOSYS;
1173 method = wsrep_sst_method;
1174 if (!strcmp(method, WSREP_SST_MYSQLDUMP))
1175 {
1176 addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
1177 if (addr_len < 0) unireg_abort(1);
1178 }
1179 else
1180 {
1181 /*! A heuristic workaround until we learn how to stop and start engines */
1182 if (SE_initialized)
1183 {
1184 if (!strcmp(method, WSREP_SST_XTRABACKUP) ||
1185 !strcmp(method, WSREP_SST_XTRABACKUPV2))
1186 {
1187 WSREP_WARN("The %s SST method is deprecated, so it is automatically "
1188 "replaced by %s", method, WSREP_SST_MARIABACKUP);
1189 method = WSREP_SST_MARIABACKUP;
1190 }
1191 // we already did SST at initializaiton, now engines are running
1192 // sql_print_information() is here because the message is too long
1193 // for WSREP_INFO.
1194 sql_print_information ("WSREP: "
1195 "You have configured '%s' state snapshot transfer method "
1196 "which cannot be performed on a running server. "
1197 "Wsrep provider won't be able to fall back to it "
1198 "if other means of state transfer are unavailable. "
1199 "In that case you will need to restart the server.",
1200 method);
1201 *msg = 0;
1202 return 0;
1203 }
1204
1205 addr_len = sst_prepare_other (method, sst_auth_real,
1206 addr_in, &addr_out);
1207 if (addr_len < 0)
1208 {
1209 WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
1210 method);
1211 unireg_abort(1);
1212 }
1213 }
1214
1215 size_t const method_len(strlen(method));
1216 size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/);
1217
1218 *msg = malloc (msg_len);
1219 if (NULL != *msg) {
1220 char* const method_ptr(static_cast<char*>(*msg));
1221 strcpy (method_ptr, method);
1222 char* const addr_ptr(method_ptr + method_len + 1);
1223 strcpy (addr_ptr, addr_out);
1224
1225 WSREP_DEBUG("Prepared SST request: %s|%s", method_ptr, addr_ptr);
1226 }
1227 else {
1228 WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
1229 msg_len);
1230 unireg_abort(1);
1231 }
1232
1233 if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
1234
1235 return msg_len;
1236 }
1237
1238 // helper method for donors
sst_run_shell(const char * cmd_str,char ** env,int max_tries)1239 static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
1240 {
1241 int ret = 0;
1242
1243 for (int tries=1; tries <= max_tries; tries++)
1244 {
1245 wsp::process proc (cmd_str, "r", env);
1246
1247 if (NULL != proc.pipe())
1248 {
1249 proc.wait();
1250 }
1251
1252 if ((ret = proc.error()))
1253 {
1254 WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
1255 tries, max_tries, proc.cmd(), ret, strerror(ret));
1256 sleep (1);
1257 }
1258 else
1259 {
1260 WSREP_DEBUG("SST script successfully completed.");
1261 break;
1262 }
1263 }
1264
1265 return -ret;
1266 }
1267
sst_reject_queries(my_bool close_conn)1268 static void sst_reject_queries(my_bool close_conn)
1269 {
1270 wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced
1271 WSREP_INFO("Rejecting client queries for the duration of SST.");
1272 if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
1273 }
1274
sst_donate_mysqldump(const char * addr,const wsrep_uuid_t * uuid,const char * uuid_str,wsrep_seqno_t seqno,bool bypass,char ** env)1275 static int sst_donate_mysqldump (const char* addr,
1276 const wsrep_uuid_t* uuid,
1277 const char* uuid_str,
1278 wsrep_seqno_t seqno,
1279 bool bypass,
1280 char** env) // carries auth info
1281 {
1282 char host[256];
1283 wsp::Address address(addr);
1284 if (!address.is_valid())
1285 {
1286 WSREP_ERROR("Could not parse SST address : %s", addr);
1287 return 0;
1288 }
1289 memcpy(host, address.get_address(), address.get_address_len());
1290 int port= address.get_port();
1291 bool extra_args;
1292 size_t const cmd_len= estimate_cmd_len(&extra_args);
1293 wsp::string cmd_str(cmd_len);
1294
1295 if (!cmd_str())
1296 {
1297 WSREP_ERROR("sst_donate_mysqldump(): "
1298 "could not allocate cmd buffer of %zd bytes", cmd_len);
1299 return -ENOMEM;
1300 }
1301
1302 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
1303
1304 make_wsrep_defaults_file();
1305
1306 int ret= snprintf (cmd_str(), cmd_len,
1307 "wsrep_sst_mysqldump "
1308 WSREP_SST_OPT_ADDR " '%s' "
1309 WSREP_SST_OPT_PORT " '%d' "
1310 WSREP_SST_OPT_LPORT " '%u' "
1311 WSREP_SST_OPT_SOCKET " '%s' "
1312 "%s"
1313 WSREP_SST_OPT_GTID " '%s:%lld' "
1314 WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
1315 "%s",
1316 addr, port, mysqld_port, mysqld_unix_port,
1317 wsrep_defaults_file, uuid_str,
1318 (long long)seqno, wsrep_gtid_domain_id,
1319 bypass ? " " WSREP_SST_OPT_BYPASS : "");
1320
1321 if (ret < 0 || size_t(ret) >= cmd_len)
1322 {
1323 WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
1324 return (ret < 0 ? ret : -EMSGSIZE);
1325 }
1326
1327 if (extra_args)
1328 copy_orig_argv(cmd_str() + ret);
1329
1330 WSREP_DEBUG("Running: '%s'", cmd_str());
1331
1332 ret= sst_run_shell (cmd_str(), env, 3);
1333
1334 wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
1335
1336 wsrep->sst_sent (wsrep, &state_id, ret);
1337
1338 return ret;
1339 }
1340
/*
  Seqno that sst_flush_tables() writes, together with the cluster state
  UUID, into the "tables_flushed" file. Starts out undefined; it is not
  assigned anywhere in this part of the file - presumably set by the code
  that pauses the provider for SST (TODO confirm).
*/
wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
1342
1343
1344 /*
1345 Create a file under data directory.
1346 */
sst_create_file(const char * name,const char * content)1347 static int sst_create_file(const char *name, const char *content)
1348 {
1349 int err= 0;
1350 char *real_name;
1351 char *tmp_name;
1352 ssize_t len;
1353 FILE *file;
1354
1355 len= strlen(mysql_real_data_home) + strlen(name) + 2;
1356 real_name= (char *) alloca(len);
1357
1358 snprintf(real_name, (size_t) len, "%s/%s", mysql_real_data_home, name);
1359
1360 tmp_name= (char *) alloca(len + 4);
1361 snprintf(tmp_name, (size_t) len + 4, "%s.tmp", real_name);
1362
1363 file= fopen(tmp_name, "w+");
1364
1365 if (0 == file)
1366 {
1367 err= errno;
1368 WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err, strerror(err));
1369 }
1370 else
1371 {
1372 // Write the specified content into the file.
1373 if (content != NULL)
1374 {
1375 fprintf(file, "%s\n", content);
1376 fsync(fileno(file));
1377 }
1378
1379 fclose(file);
1380
1381 if (rename(tmp_name, real_name) == -1)
1382 {
1383 err= errno;
1384 WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", tmp_name,
1385 real_name, err, strerror(err));
1386 }
1387 }
1388
1389 return err;
1390 }
1391
1392
run_sql_command(THD * thd,const char * query)1393 static int run_sql_command(THD *thd, const char *query)
1394 {
1395 thd->set_query((char *)query, strlen(query));
1396
1397 Parser_state ps;
1398 if (ps.init(thd, thd->query(), thd->query_length()))
1399 {
1400 WSREP_ERROR("SST query: %s failed", query);
1401 return -1;
1402 }
1403
1404 mysql_parse(thd, thd->query(), thd->query_length(), &ps, FALSE, FALSE);
1405 if (thd->is_error())
1406 {
1407 int const err= thd->get_stmt_da()->sql_errno();
1408 WSREP_WARN ("Error executing '%s': %d (%s)%s",
1409 query, err, thd->get_stmt_da()->message(),
1410 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
1411 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
1412 thd->clear_error();
1413 return -1;
1414 }
1415 return 0;
1416 }
1417
1418
sst_flush_tables(THD * thd)1419 static int sst_flush_tables(THD* thd)
1420 {
1421 WSREP_INFO("Flushing tables for SST...");
1422
1423 int err= 0;
1424 int not_used;
1425 /*
1426 Files created to notify the SST script about the outcome of table flush
1427 operation.
1428 */
1429 const char *flush_success= "tables_flushed";
1430 const char *flush_error= "sst_error";
1431
1432 CHARSET_INFO *current_charset= thd->variables.character_set_client;
1433
1434 if (!is_supported_parser_charset(current_charset))
1435 {
1436 /* Do not use non-supported parser character sets */
1437 WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1438 thd->variables.character_set_client = &my_charset_latin1;
1439 WSREP_WARN("For SST temporally setting character set to : %s",
1440 my_charset_latin1.csname);
1441 }
1442
1443 if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
1444 {
1445 err= -1;
1446 }
1447 else
1448 {
1449 /*
1450 Make sure logs are flushed after global read lock acquired. In case
1451 reload fails, we must also release the acquired FTWRL.
1452 */
1453 if (reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
1454 (TABLE_LIST*) 0, ¬_used))
1455 {
1456 thd->global_read_lock.unlock_global_read_lock(thd);
1457 err= -1;
1458 }
1459 }
1460
1461 thd->variables.character_set_client = current_charset;
1462
1463 if (err)
1464 {
1465 WSREP_ERROR("Failed to flush and lock tables");
1466
1467 /*
1468 The SST must be aborted as the flush tables failed. Notify this to SST
1469 script by creating the error file.
1470 */
1471 int tmp;
1472 if ((tmp= sst_create_file(flush_error, NULL))) {
1473 err= tmp;
1474 }
1475 }
1476 else
1477 {
1478 WSREP_INFO("Tables flushed.");
1479
1480 /*
1481 Tables have been flushed. Create a file with cluster state ID and
1482 wsrep_gtid_domain_id.
1483 */
1484 char content[100];
1485 snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid,
1486 (long long)wsrep_locked_seqno, wsrep_gtid_domain_id);
1487 err= sst_create_file(flush_success, content);
1488 }
1489
1490 return err;
1491 }
1492
1493
sst_disallow_writes(THD * thd,bool yes)1494 static void sst_disallow_writes (THD* thd, bool yes)
1495 {
1496 char query_str[64] = { 0, };
1497 ssize_t const query_max = sizeof(query_str) - 1;
1498 CHARSET_INFO *current_charset;
1499
1500 current_charset = thd->variables.character_set_client;
1501
1502 if (!is_supported_parser_charset(current_charset))
1503 {
1504 /* Do not use non-supported parser character sets */
1505 WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1506 thd->variables.character_set_client = &my_charset_latin1;
1507 WSREP_WARN("For SST temporally setting character set to : %s",
1508 my_charset_latin1.csname);
1509 }
1510
1511 snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
1512 yes ? 1 : 0);
1513
1514 if (run_sql_command(thd, query_str))
1515 {
1516 WSREP_ERROR("Failed to disallow InnoDB writes");
1517 }
1518 thd->variables.character_set_client = current_charset;
1519 }
1520
/*
  Donor-side SST thread: starts the SST script and serves the requests the
  script sends back over its pipe, one line at a time.

  Recognised lines (compared case-insensitively):
    "flush tables"  flush and lock tables with sst_flush_tables(), disallow
                    InnoDB writes and, if the binary log is open, take its
                    log lock; then read the next line;
    "continue"      undo the above if it is in effect, clear err and read
                    the next line;
    "done..."       parse the text following "done" with
                    sst_scan_uuid_seqno() and stop reading.
  Any other line is logged as unknown and also ends the reading.

  The final result is passed to the provider with wsrep->sst_sent().

  @param a  pointer to a sst_thread_arg; it supplies the command line and
            environment and receives the script startup status

  @return always NULL
*/
static void* sst_donor_thread (void* a)
{
  sst_thread_arg* arg= (sst_thread_arg*)a;

  WSREP_INFO("Running: '%s'", arg->cmd);

  int err= 1;
  // true while the lock / write block taken for "flush tables" is in effect
  bool locked= false;

  const char* out= NULL;
  const size_t out_len= 128;
  char out_buf[out_len];

  wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED;
  // seqno of complete SST
  wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;

  // We turn off wsrep_on for this THD so that it can
  // operate with wsrep_ready == OFF
  // We also set this SST thread THD as system thread
  wsp::thd thd(FALSE, true);
  wsp::process proc(arg->cmd, "r", arg->env);

  err= proc.error();

  /* Inform server about SST script startup and release TO isolation */
  mysql_mutex_lock (&arg->lock);
  arg->err = -err;
  mysql_cond_signal (&arg->cond);
  mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.

  if (proc.pipe() && !err)
  {
    // Re-entered with goto after "flush tables" and "continue".
wait_signal:
    // presumably my_fgets() strips the line terminator, since the first two
    // keywords are compared as whole strings - TODO confirm
    out= my_fgets (out_buf, out_len, proc.pipe());

    if (out)
    {
      const char magic_flush[]= "flush tables";
      const char magic_cont[]= "continue";
      const char magic_done[]= "done";

      if (!strcasecmp (out, magic_flush))
      {
        err= sst_flush_tables (thd.ptr);
        if (!err)
        {
          sst_disallow_writes (thd.ptr, true);
          /*
            Lets also keep statements that modify binary logs (like RESET LOGS,
            RESET MASTER) from proceeding until the files have been transferred
            to the joiner node.
          */
          if (mysql_bin_log.is_open())
          {
            mysql_mutex_lock(mysql_bin_log.get_log_lock());
          }

          locked= true;
          goto wait_signal;
        }
        // On flush failure no further line is read; err is reported below.
      }
      else if (!strcasecmp (out, magic_cont))
      {
        if (locked)
        {
          // Release in the reverse order of acquisition.
          if (mysql_bin_log.is_open())
          {
            mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
            mysql_mutex_unlock(mysql_bin_log.get_log_lock());
          }
          sst_disallow_writes (thd.ptr, false);
          thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
          locked= false;
        }
        err= 0;
        goto wait_signal;
      }
      else if (!strncasecmp (out, magic_done, strlen(magic_done)))
      {
        // Skips "done" plus one following character before parsing.
        // NOTE(review): if the line is exactly "done" this points past the
        // string terminator - confirm the script always appends a state ID.
        err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
                                  &ret_uuid, &ret_seqno);
      }
      else
      {
        // NOTE(review): err is left unchanged here, so an unknown line does
        // not by itself turn the result into a failure - confirm intended.
        WSREP_WARN("Received unknown signal: '%s'", out);
      }
    }
    else
    {
      WSREP_ERROR("Failed to read from: %s", proc.cmd());
      proc.wait();
    }
    // Pick up an error recorded by the process object if none is set yet.
    if (!err && proc.error()) err= proc.error();
  }
  else
  {
    WSREP_ERROR("Failed to execute: %s : %d (%s)",
                proc.cmd(), err, strerror(err));
  }

  if (locked) // don't forget to unlock server before return
  {
    if (mysql_bin_log.is_open())
    {
      mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
      mysql_mutex_unlock(mysql_bin_log.get_log_lock());
    }
    sst_disallow_writes (thd.ptr, false);
    thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
  }

  // signal to donor that SST is over
  // (on error the seqno is reported as undefined and the status is -err)
  struct wsrep_gtid const state_id = {
    ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
  };
  wsrep->sst_sent (wsrep, &state_id, -err);
  proc.wait();

  return NULL;
}
1642
1643
1644
sst_donate_other(const char * method,const char * addr,const char * uuid,wsrep_seqno_t seqno,bool bypass,char ** env)1645 static int sst_donate_other (const char* method,
1646 const char* addr,
1647 const char* uuid,
1648 wsrep_seqno_t seqno,
1649 bool bypass,
1650 char** env) // carries auth info
1651 {
1652 bool extra_args;
1653 size_t const cmd_len= estimate_cmd_len(&extra_args);
1654 wsp::string cmd_str(cmd_len);
1655
1656 if (!cmd_str())
1657 {
1658 WSREP_ERROR("sst_donate_other(): "
1659 "could not allocate cmd buffer of %zd bytes", cmd_len);
1660 return -ENOMEM;
1661 }
1662
1663 char* binlog_opt_val= NULL;
1664 char* binlog_index_opt_val= NULL;
1665
1666 int ret;
1667 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1668 {
1669 WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1670 return ret;
1671 }
1672
1673 if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val)))
1674 {
1675 WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d",
1676 ret);
1677 if (binlog_opt_val) my_free(binlog_opt_val);
1678 return ret;
1679 }
1680
1681 make_wsrep_defaults_file();
1682
1683 ret= snprintf (cmd_str(), cmd_len,
1684 "wsrep_sst_%s "
1685 WSREP_SST_OPT_ROLE " 'donor' "
1686 WSREP_SST_OPT_ADDR " '%s' "
1687 WSREP_SST_OPT_LPORT " '%u' "
1688 WSREP_SST_OPT_SOCKET " '%s' "
1689 WSREP_SST_OPT_DATA " '%s' "
1690 "%s"
1691 WSREP_SST_OPT_GTID " '%s:%lld' "
1692 WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
1693 "%s"
1694 "%s"
1695 "%s",
1696 method, addr, mysqld_port, mysqld_unix_port,
1697 mysql_real_data_home,
1698 wsrep_defaults_file,
1699 uuid, (long long) seqno, wsrep_gtid_domain_id,
1700 binlog_opt_val, binlog_index_opt_val,
1701 bypass ? " " WSREP_SST_OPT_BYPASS : "");
1702
1703 my_free(binlog_opt_val);
1704 my_free(binlog_index_opt_val);
1705
1706 if (ret < 0 || size_t(ret) >= cmd_len)
1707 {
1708 WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1709 return (ret < 0 ? ret : -EMSGSIZE);
1710 }
1711
1712 if (extra_args)
1713 copy_orig_argv(cmd_str() + ret);
1714
1715 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1716
1717 pthread_t tmp;
1718 sst_thread_arg arg(cmd_str(), env);
1719 mysql_mutex_lock (&arg.lock);
1720 ret = mysql_thread_create (key_wsrep_sst_donor, &tmp, NULL, sst_donor_thread, &arg);
1721 if (ret)
1722 {
1723 WSREP_ERROR("sst_donate_other(): mysql_thread_create() failed: %d (%s)",
1724 ret, strerror(ret));
1725 return ret;
1726 }
1727 mysql_cond_wait (&arg.cond, &arg.lock);
1728
1729 WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1730 return arg.err;
1731 }
1732
/*
  return true if character can be a part of a filename

  Accepted are alphanumeric characters and '-', '_', '.'.
  Values that are not representable as unsigned char (for instance a
  negative value obtained from a signed char) are never accepted.
*/
static bool filename_char(int const c)
{
  /*
    isalnum() is defined only for EOF and unsigned char values, so anything
    outside that range is rejected before it is called.
  */
  if (c != (int) (unsigned char) c)
    return false;

  return isalnum(c) || (c == '-') || (c == '_') || (c == '.');
}
1738
1739 /* return true if character can be a part of an address string */
address_char(int const c)1740 static bool address_char(int const c)
1741 {
1742 return filename_char(c) ||
1743 (c == ':') || (c == '[') || (c == ']') || (c == '/');
1744 }
1745
check_request_str(const char * const str,bool (* check)(int c))1746 static bool check_request_str(const char* const str,
1747 bool (*check) (int c))
1748 {
1749 for (size_t i(0); str[i] != '\0'; ++i)
1750 {
1751 if (!check(str[i]))
1752 {
1753 WSREP_WARN("Illegal character in state transfer request: %i (%c).",
1754 str[i], str[i]);
1755 return true;
1756 }
1757 }
1758
1759 return false;
1760 }
1761
wsrep_sst_donate_cb(void * app_ctx,void * recv_ctx,const void * msg,size_t msg_len,const wsrep_gtid_t * current_gtid,const char * state,size_t state_len,bool bypass)1762 wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
1763 const void* msg, size_t msg_len,
1764 const wsrep_gtid_t* current_gtid,
1765 const char* state, size_t state_len,
1766 bool bypass)
1767 {
1768 const char* method = (char*)msg;
1769 size_t method_len = strlen (method);
1770
1771 if (check_request_str(method, filename_char))
1772 {
1773 WSREP_ERROR("Bad SST method name. SST canceled.");
1774 return WSREP_CB_FAILURE;
1775 }
1776
1777 const char* data = method + method_len + 1;
1778
1779 /* check for auth@addr separator */
1780 const char* addr= strrchr(data, '@');
1781 wsp::string remote_auth;
1782 if (addr)
1783 {
1784 remote_auth.set(strndup(data, addr - data));
1785 addr++;
1786 }
1787 else
1788 {
1789 // no auth part
1790 addr= data;
1791 }
1792
1793 if (check_request_str(addr, address_char))
1794 {
1795 WSREP_ERROR("Bad SST address string. SST canceled.");
1796 return WSREP_CB_FAILURE;
1797 }
1798
1799 char uuid_str[37];
1800 wsrep_uuid_print (¤t_gtid->uuid, uuid_str, sizeof(uuid_str));
1801
1802 /* This will be reset when sync callback is called.
1803 * Should we set wsrep_ready to FALSE here too? */
1804 wsrep_config_state->set(WSREP_MEMBER_DONOR);
1805
1806 wsp::env env(NULL);
1807 if (env.error())
1808 {
1809 WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1810 return WSREP_CB_FAILURE;
1811 }
1812
1813 int ret;
1814 if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth_real)))
1815 {
1816 WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1817 return WSREP_CB_FAILURE;
1818 }
1819
1820 if (remote_auth())
1821 {
1822 if ((ret= sst_append_env_var(env, WSREP_SST_REMOTE_AUTH_ENV,remote_auth())))
1823 {
1824 WSREP_ERROR("wsrep_sst_donate_cb(): appending remote auth env failed: "
1825 "%d", ret);
1826 return WSREP_CB_FAILURE;
1827 }
1828 }
1829
1830 if (data_home_dir)
1831 {
1832 if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir)))
1833 {
1834 WSREP_ERROR("wsrep_sst_donate_cb(): appending data "
1835 "directory failed: %d", ret);
1836 return WSREP_CB_FAILURE;
1837 }
1838 }
1839
1840 if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1841 {
1842 ret = sst_donate_mysqldump(addr, ¤t_gtid->uuid, uuid_str,
1843 current_gtid->seqno, bypass, env());
1844 }
1845 else
1846 {
1847 ret = sst_donate_other(method, addr, uuid_str,
1848 current_gtid->seqno, bypass, env());
1849 }
1850
1851 return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
1852 }
1853
wsrep_SE_init_grab()1854 void wsrep_SE_init_grab()
1855 {
1856 if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort();
1857 }
1858
wsrep_SE_init_wait()1859 void wsrep_SE_init_wait()
1860 {
1861 double total_wtime=0;
1862
1863 while (SE_initialized == false)
1864 {
1865 struct timespec wtime;
1866 set_timespec(wtime, WSREP_TIMEDWAIT_SECONDS);
1867 time_t start_time = time(NULL);
1868 mysql_cond_timedwait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init, &wtime);
1869 time_t end_time = time(NULL);
1870
1871 if (!SE_initialized)
1872 {
1873 total_wtime += difftime(end_time, start_time);
1874 WSREP_DEBUG("Waiting for SST to complete. current seqno: %" PRId64 " waited %f secs.", local_seqno, total_wtime);
1875 service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
1876 "WSREP state transfer ongoing, current seqno: %" PRId64 " waited %f secs", local_seqno, total_wtime);
1877 }
1878 }
1879
1880 mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1881 }
1882
/*
  Signal COND_wsrep_sst_init (the condition wsrep_SE_init_wait() waits on)
  and release LOCK_wsrep_sst_init. The calling thread is expected to hold
  that mutex - presumably taken with wsrep_SE_init_grab() (TODO confirm).
*/
void wsrep_SE_init_done()
{
  mysql_cond_signal (&COND_wsrep_sst_init);
  mysql_mutex_unlock (&LOCK_wsrep_sst_init);
}
1888
/*
  Set the SE_initialized flag, the condition wsrep_SE_init_wait() loops on.
  Only the flag is set here; no mutex is taken and nothing is signalled.
*/
void wsrep_SE_initialized()
{
  SE_initialized = true;
}
1893