1 /* Copyright 2008-2020 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 #include "mariadb.h"
17 #include "wsrep_sst.h"
18 #include <inttypes.h>
19 #include <ctype.h>
20 #include <mysqld.h>
21 #include <m_ctype.h>
22 #include <strfunc.h>
23 #include <sql_class.h>
24 #include <set_var.h>
25 #include <sql_acl.h>
26 #include <sql_reload.h>
27 #include <sql_parse.h>
28 #include "wsrep_priv.h"
29 #include "wsrep_utils.h"
30 #include "wsrep_xid.h"
31 #include "wsrep_thd.h"
32 #include "wsrep_mysqld.h"
33
34 #include <cstdio>
35 #include <cstdlib>
36
37 #include <my_service_manager.h>
38
/*
  Cached command-line fragment carrying the defaults-file related options
  that are handed to the SST script. It is empty until
  make_wsrep_defaults_file() fills it in on first use.
*/
static char wsrep_defaults_file[FN_REFLEN * 2 + 10 + 30 +
                                sizeof(WSREP_SST_OPT_CONF) +
                                sizeof(WSREP_SST_OPT_CONF_SUFFIX) +
                                sizeof(WSREP_SST_OPT_CONF_EXTRA)]= {0};

/* SST related settings (defaults shown on the right-hand side). */
const char* wsrep_sst_method = WSREP_SST_DEFAULT;
const char* wsrep_sst_receive_address= WSREP_SST_ADDRESS_AUTO;
const char* wsrep_sst_donor = "";
/*
  Visible auth value; sst_auth_real_set() replaces a non-empty value with
  WSREP_SST_AUTH_MASK and keeps the real string in sst_auth_real.
*/
const char* wsrep_sst_auth = NULL;

// container for real auth string
static const char* sst_auth_real = NULL;
my_bool wsrep_sst_donor_rejects_queries= FALSE;

/* Value passed to service_manager_extend_timeout() by the monitor threads. */
#define WSREP_EXTEND_TIMEOUT_INTERVAL 60
/* Timeout handed to set_timespec() for each monitor condition wait. */
#define WSREP_TIMEDWAIT_SECONDS 30

/*
  Completion flags polled by the joiner/donor monitor threads; they are set
  by wsrep_joiner_monitor_end() and wsrep_donor_monitor_end() respectively.
*/
bool sst_joiner_completed = false;
bool sst_donor_completed = false;
58
59 struct sst_thread_arg
60 {
61 const char* cmd;
62 char** env;
63 char* ret_str;
64 int err;
65 mysql_mutex_t lock;
66 mysql_cond_t cond;
67
sst_thread_argsst_thread_arg68 sst_thread_arg (const char* c, char** e)
69 : cmd(c), env(e), ret_str(0), err(-1)
70 {
71 mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
72 mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
73 }
74
~sst_thread_argsst_thread_arg75 ~sst_thread_arg()
76 {
77 mysql_cond_destroy (&cond);
78 mysql_mutex_unlock (&lock);
79 mysql_mutex_destroy (&lock);
80 }
81 };
82
wsrep_donor_monitor_end(void)83 static void wsrep_donor_monitor_end(void)
84 {
85 mysql_mutex_lock(&LOCK_wsrep_donor_monitor);
86 sst_donor_completed= true;
87 mysql_cond_signal(&COND_wsrep_donor_monitor);
88 mysql_mutex_unlock(&LOCK_wsrep_donor_monitor);
89 }
90
wsrep_joiner_monitor_end(void)91 static void wsrep_joiner_monitor_end(void)
92 {
93 mysql_mutex_lock(&LOCK_wsrep_joiner_monitor);
94 sst_joiner_completed= true;
95 mysql_cond_signal(&COND_wsrep_joiner_monitor);
96 mysql_mutex_unlock(&LOCK_wsrep_joiner_monitor);
97 }
98
wsrep_sst_donor_monitor_thread(void * arg)99 static void* wsrep_sst_donor_monitor_thread(void *arg __attribute__((unused)))
100 {
101 int ret= 0;
102 unsigned long time_waited= 0;
103
104 mysql_mutex_lock(&LOCK_wsrep_donor_monitor);
105
106 WSREP_INFO("Donor monitor thread started to monitor");
107
108 wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
109 // operate with wsrep_ready == OFF
110
111 while (!sst_donor_completed)
112 {
113 timespec ts;
114 set_timespec(ts, WSREP_TIMEDWAIT_SECONDS);
115 time_t start_time= time(NULL);
116 ret= mysql_cond_timedwait(&COND_wsrep_donor_monitor, &LOCK_wsrep_donor_monitor, &ts);
117 time_t end_time= time(NULL);
118 time_waited+= difftime(end_time, start_time);
119
120 if (ret == ETIMEDOUT && !sst_donor_completed)
121 {
122 WSREP_DEBUG("Donor waited %lu sec, extending systemd startup timeout as SST"
123 "is not completed",
124 time_waited);
125 service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
126 "WSREP state transfer ongoing...");
127 }
128 }
129
130 WSREP_INFO("Donor monitor thread ended with total time %lu sec", time_waited);
131 mysql_mutex_unlock(&LOCK_wsrep_donor_monitor);
132
133 return NULL;
134 }
135
wsrep_sst_joiner_monitor_thread(void * arg)136 static void* wsrep_sst_joiner_monitor_thread(void *arg __attribute__((unused)))
137 {
138 int ret= 0;
139 unsigned long time_waited= 0;
140
141 mysql_mutex_lock(&LOCK_wsrep_joiner_monitor);
142
143 WSREP_INFO("Joiner monitor thread started to monitor");
144
145 wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
146 // operate with wsrep_ready == OFF
147
148 while (!sst_joiner_completed)
149 {
150 timespec ts;
151 set_timespec(ts, WSREP_TIMEDWAIT_SECONDS);
152 time_t start_time= time(NULL);
153 ret= mysql_cond_timedwait(&COND_wsrep_joiner_monitor, &LOCK_wsrep_joiner_monitor, &ts);
154 time_t end_time= time(NULL);
155 time_waited+= difftime(end_time, start_time);
156
157 if (ret == ETIMEDOUT && !sst_joiner_completed)
158 {
159 WSREP_DEBUG("Joiner waited %lu sec, extending systemd startup timeout as SST"
160 "is not completed",
161 time_waited);
162 service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
163 "WSREP state transfer ongoing...");
164 }
165 }
166
167 WSREP_INFO("Joiner monitor thread ended with total time %lu sec", time_waited);
168 mysql_mutex_unlock(&LOCK_wsrep_joiner_monitor);
169
170 return NULL;
171 }
172
wsrep_sst_method_check(sys_var * self,THD * thd,set_var * var)173 bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
174 {
175 if ((! var->save_result.string_value.str) ||
176 (var->save_result.string_value.length == 0 ))
177 {
178 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
179 var->save_result.string_value.str ?
180 var->save_result.string_value.str : "NULL");
181 return 1;
182 }
183
184 return 0;
185 }
186
187 static const char* data_home_dir;
188
wsrep_set_data_home_dir(const char * data_dir)189 void wsrep_set_data_home_dir(const char *data_dir)
190 {
191 data_home_dir= (data_dir && *data_dir) ? data_dir : NULL;
192 }
193
make_wsrep_defaults_file()194 static void make_wsrep_defaults_file()
195 {
196 if (!wsrep_defaults_file[0])
197 {
198 char *ptr= wsrep_defaults_file;
199 char *end= ptr + sizeof(wsrep_defaults_file);
200 if (my_defaults_file)
201 ptr= strxnmov(ptr, end - ptr,
202 WSREP_SST_OPT_CONF, " '", my_defaults_file, "' ", NULL);
203
204 if (my_defaults_extra_file)
205 ptr= strxnmov(ptr, end - ptr,
206 WSREP_SST_OPT_CONF_EXTRA, " '", my_defaults_extra_file, "' ", NULL);
207
208 if (my_defaults_group_suffix)
209 ptr= strxnmov(ptr, end - ptr,
210 WSREP_SST_OPT_CONF_SUFFIX, " '", my_defaults_group_suffix, "' ", NULL);
211 }
212 }
213
214
wsrep_sst_receive_address_check(sys_var * self,THD * thd,set_var * var)215 bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
216 {
217 if ((! var->save_result.string_value.str) ||
218 (var->save_result.string_value.length > (FN_REFLEN - 1))) // safety
219 {
220 goto err;
221 }
222
223 return 0;
224
225 err:
226 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
227 var->save_result.string_value.str ?
228 var->save_result.string_value.str : "NULL");
229 return 1;
230 }
231
wsrep_sst_receive_address_update(sys_var * self,THD * thd,enum_var_type type)232 bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
233 enum_var_type type)
234 {
235 return 0;
236 }
237
wsrep_sst_auth_check(sys_var * self,THD * thd,set_var * var)238 bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
239 {
240 return 0;
241 }
242
sst_auth_real_set(const char * value)243 static bool sst_auth_real_set (const char* value)
244 {
245 const char* v= NULL;
246
247 if (value)
248 {
249 v= my_strdup(PSI_INSTRUMENT_ME, value, MYF(0));
250 }
251 else // its NULL
252 {
253 wsrep_sst_auth_free();
254 return 0;
255 }
256
257 if (v)
258 {
259 // set sst_auth_real
260 if (sst_auth_real) { my_free((void *) sst_auth_real); }
261 sst_auth_real= v;
262
263 // mask wsrep_sst_auth
264 if (strlen(sst_auth_real))
265 {
266 if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
267 wsrep_sst_auth= my_strdup(PSI_INSTRUMENT_ME, WSREP_SST_AUTH_MASK, MYF(0));
268 }
269 else
270 {
271 if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
272 wsrep_sst_auth= NULL;
273 }
274
275 return 0;
276 }
277 return 1;
278 }
279
wsrep_sst_auth_free()280 void wsrep_sst_auth_free()
281 {
282 if (wsrep_sst_auth) { my_free((void *) wsrep_sst_auth); }
283 if (sst_auth_real) { my_free((void *) sst_auth_real); }
284 wsrep_sst_auth= NULL;
285 sst_auth_real= NULL;
286 }
287
wsrep_sst_auth_update(sys_var * self,THD * thd,enum_var_type type)288 bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
289 {
290 return sst_auth_real_set (wsrep_sst_auth);
291 }
292
wsrep_sst_auth_init()293 void wsrep_sst_auth_init ()
294 {
295 sst_auth_real_set(wsrep_sst_auth);
296 }
297
wsrep_sst_donor_check(sys_var * self,THD * thd,set_var * var)298 bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
299 {
300 return 0;
301 }
302
wsrep_sst_donor_update(sys_var * self,THD * thd,enum_var_type type)303 bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
304 {
305 return 0;
306 }
307
308
wsrep_before_SE()309 bool wsrep_before_SE()
310 {
311 return (wsrep_provider != NULL
312 && strcmp (wsrep_provider, WSREP_NONE)
313 && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
314 && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
315 }
316
317 // Signal end of SST
wsrep_sst_complete(THD * thd,int const rcode,wsrep::gtid const sst_gtid)318 static bool wsrep_sst_complete (THD* thd,
319 int const rcode,
320 wsrep::gtid const sst_gtid)
321 {
322 Wsrep_client_service client_service(thd, thd->wsrep_cs());
323 Wsrep_server_state& server_state= Wsrep_server_state::instance();
324 enum wsrep::server_state::state state= server_state.state();
325 bool failed= false;
326 char start_pos_buf[FN_REFLEN];
327 ssize_t len= wsrep::print_to_c_str(sst_gtid, start_pos_buf, FN_REFLEN-1);
328 start_pos_buf[len]='\0';
329
330 // Do not call sst_received if we are not in joiner or
331 // initialized state on server. This is because it
332 // assumes we are on those states. Give error if we are
333 // in incorrect state.
334 if ((state == Wsrep_server_state::s_joiner ||
335 state == Wsrep_server_state::s_initialized))
336 {
337 Wsrep_server_state::instance().sst_received(client_service,
338 rcode);
339 WSREP_INFO("SST succeeded for position %s", start_pos_buf);
340 }
341 else
342 {
343 WSREP_ERROR("SST failed for position %s initialized %d server_state %s",
344 start_pos_buf,
345 server_state.is_initialized(),
346 wsrep::to_c_string(state));
347 failed= true;
348 }
349
350 wsrep_joiner_monitor_end();
351 return failed;
352 }
353
/*
  If wsrep provider is loaded, inform that the new state snapshot
  has been received. Also update the local checkpoint.

  @param thd       [IN]
  @param uuid      [IN] Initial state UUID
  @param seqno     [IN] Initial state sequence number
  @param state     [IN] Always NULL, also ignored by wsrep provider (?)
  @param state_len [IN] Always 0, also ignored by wsrep provider (?)
  @return false when successful, true if error
*/
wsrep_sst_received(THD * thd,const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno,const void * const state,size_t const state_len)365 bool wsrep_sst_received (THD* thd,
366 const wsrep_uuid_t& uuid,
367 wsrep_seqno_t const seqno,
368 const void* const state,
369 size_t const state_len)
370 {
371 bool error= false;
372 /*
373 To keep track of whether the local uuid:seqno should be updated. Also, note
374 that local state (uuid:seqno) is updated/checkpointed only after we get an
375 OK from wsrep provider. By doing so, the values remain consistent across
376 the server & wsrep provider.
377 */
378 /*
379 TODO: Handle backwards compatibility. WSREP API v25 does not have
380 wsrep schema.
381 */
382 /*
383 Logical SST methods (mysqldump etc) don't update InnoDB sys header.
384 Reset the SE checkpoint before recovering view in order to avoid
385 sanity check failure.
386 */
387 wsrep::gtid const sst_gtid(wsrep::id(uuid.data, sizeof(uuid.data)),
388 wsrep::seqno(seqno));
389
390 if (!wsrep_before_SE()) {
391 wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined());
392 wsrep_set_SE_checkpoint(sst_gtid, wsrep_gtid_server.gtid());
393 }
394 wsrep_verify_SE_checkpoint(uuid, seqno);
395
396 /*
397 Both wsrep_init_SR() and wsrep_recover_view() may use
398 wsrep thread pool. Restore original thd context before returning.
399 */
400 if (thd) {
401 wsrep_store_threadvars(thd);
402 }
403 else {
404 set_current_thd(nullptr);
405 }
406
407 /* During sst WSREP(thd) is not yet set for joiner. */
408 if (WSREP_ON)
409 {
410 int const rcode(seqno < 0 ? seqno : 0);
411 error= wsrep_sst_complete(thd,rcode, sst_gtid);
412 }
413
414 return error;
415 }
416
sst_scan_uuid_seqno(const char * str,wsrep_uuid_t * uuid,wsrep_seqno_t * seqno)417 static int sst_scan_uuid_seqno (const char* str,
418 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
419 {
420 int offt= wsrep_uuid_scan (str, strlen(str), uuid);
421 errno= 0; /* Reset the errno */
422 if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
423 {
424 *seqno= strtoll (str + offt + 1, NULL, 10);
425 if (*seqno != LLONG_MAX || errno != ERANGE)
426 {
427 return 0;
428 }
429 }
430
431 WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
432 return -EINVAL;
433 }
434
/*
  fgets() wrapper that strips a single trailing '\n' from the line read.

  @return buf on success, NULL when fgets() returns NULL
*/
static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
{
  char* line= fgets (buf, buf_len, stream);
  if (line == NULL)
    return NULL;

  size_t const length= strlen(line);
  if (length == 0)
    return line;

  char* const last= line + (length - 1);
  if (*last == '\n')
    *last= '\0';

  return line;
}
448
449 /*
450 Generate "name 'value'" string.
451 */
generate_name_value(const char * name,const char * value)452 static char* generate_name_value(const char* name, const char* value)
453 {
454 size_t name_len= strlen(name);
455 size_t value_len= strlen(value);
456 char* buf=
457 (char*) my_malloc(PSI_INSTRUMENT_ME, (name_len + value_len + 5), MYF(0));
458 if (buf)
459 {
460 char* ref= buf;
461 *ref++ = ' ';
462 memcpy(ref, name, name_len * sizeof(char));
463 ref += name_len;
464 *ref++ = ' ';
465 *ref++ = '\'';
466 memcpy(ref, value, value_len * sizeof(char));
467 ref += value_len;
468 *ref++ = '\'';
469 *ref = 0;
470 }
471 return buf;
472 }
473 /*
474 Generate binlog option string for sst_donate_other(), sst_prepare_other().
475
476 Returns zero on success, negative error code otherwise.
477
478 String containing binlog name is stored in param ret if binlog is enabled
479 and GTID mode is on, otherwise empty string. Returned string should be
480 freed with my_free().
481 */
generate_binlog_opt_val(char ** ret)482 static int generate_binlog_opt_val(char** ret)
483 {
484 DBUG_ASSERT(ret);
485 *ret= NULL;
486 if (opt_bin_log)
487 {
488 assert(opt_bin_logname);
489 *ret= strcmp(opt_bin_logname, "0") ?
490 generate_name_value(WSREP_SST_OPT_BINLOG,
491 opt_bin_logname) :
492 my_strdup(PSI_INSTRUMENT_ME, "", MYF(0));
493 }
494 else
495 {
496 *ret= my_strdup(PSI_INSTRUMENT_ME, "", MYF(0));
497 }
498 if (!*ret) return -ENOMEM;
499 return 0;
500 }
501
generate_binlog_index_opt_val(char ** ret)502 static int generate_binlog_index_opt_val(char** ret)
503 {
504 DBUG_ASSERT(ret);
505 *ret= NULL;
506 if (opt_binlog_index_name)
507 {
508 *ret= strcmp(opt_binlog_index_name, "0") ?
509 generate_name_value(WSREP_SST_OPT_BINLOG_INDEX,
510 opt_binlog_index_name) :
511 my_strdup(PSI_INSTRUMENT_ME, "", MYF(0));
512 }
513 else
514 {
515 *ret= my_strdup(PSI_INSTRUMENT_ME, "", MYF(0));
516 }
517 if (!*ret) return -ENOMEM;
518 return 0;
519 }
520
sst_joiner_thread(void * a)521 static void* sst_joiner_thread (void* a)
522 {
523 sst_thread_arg* arg= (sst_thread_arg*) a;
524 int err= 1;
525
526 {
527 THD* thd;
528 const char magic[]= "ready";
529 const size_t magic_len= sizeof(magic) - 1;
530 const size_t out_len= 512;
531 char out[out_len];
532
533 WSREP_INFO("Running: '%s'", arg->cmd);
534
535 wsp::process proc (arg->cmd, "r", arg->env);
536
537 if (proc.pipe() && !proc.error())
538 {
539 const char* tmp= my_fgets (out, out_len, proc.pipe());
540
541 if (!tmp || strlen(tmp) < (magic_len + 2) ||
542 strncasecmp (tmp, magic, magic_len))
543 {
544 WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
545 magic, arg->cmd, tmp);
546 proc.wait();
547 if (proc.error()) err= proc.error();
548 }
549 else
550 {
551 err= 0;
552 }
553 }
554 else
555 {
556 err= proc.error();
557 WSREP_ERROR("Failed to execute: %s : %d (%s)",
558 arg->cmd, err, strerror(err));
559 }
560
561 /*
562 signal sst_prepare thread with ret code,
563 it will go on sending SST request
564 */
565 mysql_mutex_lock (&arg->lock);
566 if (!err)
567 {
568 arg->ret_str= strdup (out + magic_len + 1);
569 if (!arg->ret_str) err= ENOMEM;
570 }
571 arg->err= -err;
572 mysql_cond_signal (&arg->cond);
573 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
574
575 if (err) return NULL; /* lp:808417 - return immediately, don't signal
576 * initializer thread to ensure single thread of
577 * shutdown. */
578
579 wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
580 wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;
581
582 // in case of successfull receiver start, wait for SST
583 // completion/end
584 char* tmp= my_fgets (out, out_len, proc.pipe());
585
586 proc.wait();
587
588 err= EINVAL;
589
590 if (!tmp)
591 {
592 WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from "
593 "joiner script.");
594 if (proc.error()) err= proc.error();
595 }
596 else
597 {
598 // Read state ID (UUID:SEQNO) followed by wsrep_gtid_domain_id (if any).
599 const char *pos= strchr(out, ' ');
600
601 if (!pos) {
602
603 if (wsrep_gtid_mode)
604 {
605 // There is no wsrep_gtid_domain_id (some older version SST script?).
606 WSREP_WARN("Did not find domain ID from SST script output '%s'. "
607 "Domain ID must be set manually to keep binlog consistent",
608 out);
609 }
610 err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
611
612 } else {
613 // Scan state ID first followed by wsrep_gtid_domain_id.
614 unsigned long int domain_id;
615
616 // Null-terminate the state-id.
617 out[pos - out]= 0;
618 err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
619
620 if (err)
621 {
622 goto err;
623 }
624 else if (wsrep_gtid_mode)
625 {
626 errno= 0; /* Reset the errno */
627 domain_id= strtoul(pos + 1, NULL, 10);
628 err= errno;
629
630 /* Check if we received a valid gtid_domain_id. */
631 if (err == EINVAL || err == ERANGE)
632 {
633 WSREP_ERROR("Failed to get donor wsrep_gtid_domain_id.");
634 err= EINVAL;
635 goto err;
636 } else {
637 wsrep_gtid_server.domain_id= (uint32) domain_id;
638 wsrep_gtid_domain_id= (uint32)domain_id;
639 }
640 }
641 }
642 }
643
644 err:
645
646 wsrep::gtid ret_gtid;
647
648 if (err)
649 {
650 ret_gtid= wsrep::gtid::undefined();
651 }
652 else
653 {
654 ret_gtid= wsrep::gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)),
655 wsrep::seqno(ret_seqno));
656 }
657
658 /*
659 Tell initializer thread that SST is complete
660 For that initialize a THD
661 */
662 if (my_thread_init())
663 {
664 WSREP_ERROR("my_thread_init() failed, can't signal end of SST. "
665 "Aborting.");
666 unireg_abort(1);
667 }
668
669 thd= new THD(next_thread_id());
670
671 if (!thd)
672 {
673 WSREP_ERROR("Failed to allocate THD to restore view from local state, "
674 "can't signal end of SST. Aborting.");
675 unireg_abort(1);
676 }
677
678 thd->thread_stack= (char*) &thd;
679 thd->security_ctx->skip_grants();
680 thd->system_thread= SYSTEM_THREAD_GENERIC;
681 thd->real_id= pthread_self();
682
683 wsrep_assign_from_threadvars(thd);
684 wsrep_store_threadvars(thd);
685
686 /* */
687 thd->variables.wsrep_on = 0;
688 /* No binlogging */
689 thd->variables.sql_log_bin = 0;
690 thd->variables.option_bits &= ~OPTION_BIN_LOG;
691 /* No general log */
692 thd->variables.option_bits |= OPTION_LOG_OFF;
693 /* Read committed isolation to avoid gap locking */
694 thd->variables.tx_isolation= ISO_READ_COMMITTED;
695
696 wsrep_sst_complete (thd, -err, ret_gtid);
697
698 delete thd;
699 my_thread_end();
700 }
701
702 return NULL;
703 }
704
705 #define WSREP_SST_AUTH_ENV "WSREP_SST_OPT_AUTH"
706 #define WSREP_SST_REMOTE_AUTH_ENV "WSREP_SST_OPT_REMOTE_AUTH"
707 #define DATA_HOME_DIR_ENV "INNODB_DATA_HOME_DIR"
708
sst_append_env_var(wsp::env & env,const char * const var,const char * const val)709 static int sst_append_env_var(wsp::env& env,
710 const char* const var,
711 const char* const val)
712 {
713 int const env_str_size= strlen(var) + 1 /* = */
714 + (val ? strlen(val) : 0) + 1 /* \0 */;
715
716 wsp::string env_str(env_str_size); // for automatic cleanup on return
717 if (!env_str()) return -ENOMEM;
718
719 int ret= snprintf(env_str(), env_str_size, "%s=%s", var, val ? val : "");
720
721 if (ret < 0 || ret >= env_str_size)
722 {
723 WSREP_ERROR("sst_append_env_var(): snprintf(%s=%s) failed: %d",
724 var, val, ret);
725 return (ret < 0 ? ret : -EMSGSIZE);
726 }
727
728 env.append(env_str());
729 return -env.error();
730 }
731
/*
  Character classification helpers used when copying the original mysqld
  arguments onto the SST script command line. The argument of each macro
  is evaluated more than once, so pass a plain variable only.
*/
#ifdef __WIN__
/*
  Space, single quote, ampersand, backquote, I/O redirection
  characters, caret, all brackets, plus, exclamation and comma
  characters require text to be enclosed in double quotes:
*/
#define IS_SPECIAL(c) \
  (isspace((unsigned char) (c)) || (c) == '\'' || (c) == '&' || \
   (c) == '`' || (c) == '|' || \
   (c) == '>' || (c) == '<' || (c) == ';' || (c) == '^' || \
   (c) == '[' || (c) == ']' || (c) == '{' || (c) == '}' || \
   (c) == '(' || (c) == ')' || (c) == '+' || (c) == '!' || \
   (c) == ',')
/*
  Inside values, equals character are interpreted as special
  character and requires quotation:
*/
#define IS_SPECIAL_V(c) (IS_SPECIAL(c) || (c) == '=')
/*
  Double quotation mark and percent characters require escaping
  (a single '"' character, not a multi-character constant):
*/
#define IS_REQ_ESCAPING(c) ((c) == '"' || (c) == '%')
#else
/*
  Space, single quote, ampersand, backquote, and I/O redirection
  characters require text to be enclosed in double quotes. The
  semicolon is used to separate shell commands, so it must be
  enclosed in double quotes as well:
*/
#define IS_SPECIAL(c) \
  (isspace((unsigned char) (c)) || (c) == '\'' || (c) == '&' || \
   (c) == '`' || (c) == '|' || \
   (c) == '>' || (c) == '<' || (c) == ';')
/*
  Inside values, characters are interpreted as in parameter names:
*/
#define IS_SPECIAL_V(c) IS_SPECIAL(c)
/*
  Double quotation mark and backslash characters require
  backslash prefixing, the dollar symbol is used to substitute
  a variable value, therefore it also requires escaping:
*/
#define IS_REQ_ESCAPING(c) ((c) == '"' || (c) == '\\' || (c) == '$')
#endif
774
estimate_cmd_len(bool * extra_args)775 static size_t estimate_cmd_len (bool* extra_args)
776 {
777 /*
778 The length of the area reserved for the control parameters
779 of the SST script (excluding the copying of the original
780 mysqld arguments):
781 */
782 size_t cmd_len= 4096;
783 bool extra= false;
784 /*
785 If mysqld was started with arguments, add them all:
786 */
787 if (orig_argc > 1)
788 {
789 for (int i = 1; i < orig_argc; i++)
790 {
791 const char* arg= orig_argv[i];
792 size_t n= strlen(arg);
793 if (n == 0) continue;
794 cmd_len += n;
795 bool quotation= false;
796 char c;
797 while ((c = *arg++) != 0)
798 {
799 if (IS_SPECIAL(c))
800 {
801 quotation= true;
802 }
803 else if (IS_REQ_ESCAPING(c))
804 {
805 cmd_len++;
806 #ifdef __WIN__
807 quotation= true;
808 #endif
809 }
810 /*
811 If the equals symbol is encountered, then we need to separately
812 process the right side:
813 */
814 else if (c == '=')
815 {
816 /* Perhaps we need to quote the left part of the argument: */
817 if (quotation)
818 {
819 cmd_len += 2;
820 /*
821 Reset the quotation flag, since now the status for
822 the right side of the expression will be saved here:
823 */
824 quotation= false;
825 }
826 while ((c = *arg++) != 0)
827 {
828 if (IS_SPECIAL_V(c))
829 {
830 quotation= true;
831 }
832 else if (IS_REQ_ESCAPING(c))
833 {
834 cmd_len++;
835 #ifdef __WIN__
836 quotation= true;
837 #endif
838 }
839 }
840 break;
841 }
842 }
843 /* Perhaps we need to quote the entire argument or its right part: */
844 if (quotation)
845 {
846 cmd_len += 2;
847 }
848 }
849 extra = true;
850 cmd_len += strlen(WSREP_SST_OPT_MYSQLD);
851 /*
852 Add the separating spaces between arguments,
853 and one additional space before "--mysqld-args":
854 */
855 cmd_len += orig_argc;
856 }
857 *extra_args= extra;
858 return cmd_len;
859 }
860
/*
  Append the original mysqld arguments to the SST script command line.

  Writes " " WSREP_SST_OPT_MYSQLD followed by every non-empty original
  argument, each preceded by a space. An argument of the form name=value is
  split at the first '=': each side is wrapped in double quotes if it
  contains characters matched by IS_SPECIAL / IS_SPECIAL_V, and characters
  matched by IS_REQ_ESCAPING are prefixed with an escape character.

  @param cmd_str  write position inside the command buffer; the caller must
                  have sized the buffer with estimate_cmd_len()
*/
static void copy_orig_argv (char* cmd_str)
{
  /*
    If mysqld was started with arguments, copy them all:
  */
  if (orig_argc > 1)
  {
    size_t n = strlen(WSREP_SST_OPT_MYSQLD);
    *cmd_str++ = ' ';
    memcpy(cmd_str, WSREP_SST_OPT_MYSQLD, n * sizeof(char));
    cmd_str += n;
    for (int i = 1; i < orig_argc; i++)
    {
      char* arg= orig_argv[i];
      n = strlen(arg);
      if (n == 0) continue;
      *cmd_str++ = ' ';
      /* quotation: current part needs enclosing double quotes */
      bool quotation= false;
      /* plain: current part contains no characters that need escaping */
      bool plain= true;
      /* arg_scan runs ahead of arg; arg marks what is still to be copied */
      char *arg_scan= arg;
      char c;
      while ((c = *arg_scan++) != 0)
      {
        if (IS_SPECIAL(c))
        {
          quotation= true;
        }
        else if (IS_REQ_ESCAPING(c))
        {
          plain= false;
#ifdef __WIN__
          quotation= true;
#endif
        }
        /*
          If the equals symbol is encountered, then we need to separately
          process the right side:
        */
        else if (c == '=')
        {
          /* Calculate length of the Left part of the argument: */
          size_t m = (size_t) (arg_scan - arg) - 1;
          if (m)
          {
            /* Perhaps we need to quote the left part of the argument: */
            if (quotation)
            {
              *cmd_str++ = '"';
            }
            /*
              If there were no characters requiring escaping inside,
              then we can use the fast memcpy function:
            */
            if (plain)
            {
              memcpy(cmd_str, arg, m * sizeof(char));
              cmd_str += m;
              /* Left part of the argument has already been processed: */
              n -= m;
              arg += m;
            }
            /* Otherwise we need to prefix individual characters: */
            else
            {
              n -= m;
              while (m)
              {
                c = *arg++;
                if (IS_REQ_ESCAPING(c))
                {
#ifdef __WIN__
                  *cmd_str++ = c;
#else
                  *cmd_str++ = '\\';
#endif
                }
                *cmd_str++ = c;
                m--;
              }
              /*
                Reset the plain string flag, since now the status for
                the right side of the expression will be saved here:
              */
              plain= true;
            }
            /* Perhaps we need to close the quotes around the left part: */
            if (quotation)
            {
              *cmd_str++ = '"';
              /*
                Reset the quotation flag, since now the status for
                the right side of the expression will be saved here:
              */
              quotation= false;
            }
          }
          /* Copy equals symbol: */
          *cmd_str++ = '=';
          arg++;
          n--;
          /* Let's deal with the right side of the expression: */
          while ((c = *arg_scan++) != 0)
          {
            if (IS_SPECIAL_V(c))
            {
              quotation= true;
            }
            else if (IS_REQ_ESCAPING(c))
            {
              plain= false;
#ifdef __WIN__
              quotation= true;
#endif
            }
          }
          break;
        }
      }
      /* n is the number of characters of this argument still to be copied */
      if (n)
      {
        /* Perhaps we need to quote the entire argument or its right part: */
        if (quotation)
        {
          *cmd_str++ = '"';
        }
        /*
          If there were no special characters inside, then we can use
          the fast memcpy function:
        */
        if (plain)
        {
          memcpy(cmd_str, arg, n * sizeof(char));
          cmd_str += n;
        }
        /* Otherwise we need to prefix individual characters: */
        else
        {
          while ((c = *arg++) != 0)
          {
            if (IS_REQ_ESCAPING(c))
            {
#ifdef __WIN__
              *cmd_str++ = c;
#else
              *cmd_str++ = '\\';
#endif
            }
            *cmd_str++ = c;
          }
        }
        /* Perhaps we need to quote the entire argument or its right part: */
        if (quotation)
        {
          *cmd_str++ = '"';
        }
      }
    }
    /*
      Add a terminating null character (not counted in the length,
      since we've overwritten the original null character which
      was previously added by snprintf):
    */
    *cmd_str = 0;
  }
}
1026
sst_prepare_other(const char * method,const char * sst_auth,const char * addr_in,const char ** addr_out)1027 static ssize_t sst_prepare_other (const char* method,
1028 const char* sst_auth,
1029 const char* addr_in,
1030 const char** addr_out)
1031 {
1032 bool extra_args;
1033 size_t const cmd_len= estimate_cmd_len(&extra_args);
1034 wsp::string cmd_str(cmd_len);
1035
1036 if (!cmd_str())
1037 {
1038 WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %zd bytes",
1039 cmd_len);
1040 return -ENOMEM;
1041 }
1042
1043 char* binlog_opt_val= NULL;
1044 char* binlog_index_opt_val= NULL;
1045
1046 int ret;
1047 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1048 {
1049 WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
1050 ret);
1051 return ret;
1052 }
1053
1054 if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val)))
1055 {
1056 WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d",
1057 ret);
1058 if (binlog_opt_val) my_free(binlog_opt_val);
1059 return ret;
1060 }
1061
1062 make_wsrep_defaults_file();
1063
1064 ret= snprintf (cmd_str(), cmd_len,
1065 "wsrep_sst_%s "
1066 WSREP_SST_OPT_ROLE " 'joiner' "
1067 WSREP_SST_OPT_ADDR " '%s' "
1068 WSREP_SST_OPT_DATA " '%s' "
1069 "%s"
1070 WSREP_SST_OPT_PARENT " '%d'"
1071 "%s"
1072 "%s",
1073 method, addr_in, mysql_real_data_home,
1074 wsrep_defaults_file,
1075 (int)getpid(),
1076 binlog_opt_val, binlog_index_opt_val);
1077
1078 my_free(binlog_opt_val);
1079 my_free(binlog_index_opt_val);
1080
1081 if (ret < 0 || size_t(ret) >= cmd_len)
1082 {
1083 WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
1084 return (ret < 0 ? ret : -EMSGSIZE);
1085 }
1086
1087 if (extra_args)
1088 copy_orig_argv(cmd_str() + ret);
1089
1090 wsp::env env(NULL);
1091 if (env.error())
1092 {
1093 WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
1094 return -env.error();
1095 }
1096
1097 if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth)))
1098 {
1099 WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
1100 return ret;
1101 }
1102
1103 if (data_home_dir)
1104 {
1105 if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir)))
1106 {
1107 WSREP_ERROR("sst_prepare_other(): appending data "
1108 "directory failed: %d", ret);
1109 return ret;
1110 }
1111 }
1112
1113 pthread_t tmp, monitor;
1114 sst_thread_arg arg(cmd_str(), env());
1115
1116 mysql_mutex_lock (&arg.lock);
1117
1118 ret = mysql_thread_create (key_wsrep_sst_joiner_monitor, &monitor, NULL, wsrep_sst_joiner_monitor_thread, NULL);
1119
1120 if (ret)
1121 {
1122 WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
1123 ret, strerror(ret));
1124 return -ret;
1125 }
1126
1127 sst_joiner_completed= false;
1128
1129 ret= mysql_thread_create (key_wsrep_sst_joiner, &tmp, NULL, sst_joiner_thread, &arg);
1130
1131 if (ret)
1132 {
1133 WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
1134 ret, strerror(ret));
1135
1136 pthread_detach(monitor);
1137 return -ret;
1138 }
1139
1140 mysql_cond_wait (&arg.cond, &arg.lock);
1141
1142 *addr_out= arg.ret_str;
1143
1144 if (!arg.err)
1145 ret= strlen(*addr_out);
1146 else
1147 {
1148 assert (arg.err < 0);
1149 ret= arg.err;
1150 }
1151
1152 pthread_detach (tmp);
1153 pthread_detach (monitor);
1154
1155 return ret;
1156 }
1157
1158 extern uint mysqld_port;
1159
1160 /*! Just tells donor where to send mysqldump */
sst_prepare_mysqldump(const char * addr_in,const char ** addr_out)1161 static ssize_t sst_prepare_mysqldump (const char* addr_in,
1162 const char** addr_out)
1163 {
1164 ssize_t ret= strlen (addr_in);
1165
1166 if (!strrchr(addr_in, ':'))
1167 {
1168 ssize_t s= ret + 7;
1169 char* tmp= (char*) malloc (s);
1170
1171 if (tmp)
1172 {
1173 ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
1174
1175 if (ret > 0 && ret < s)
1176 {
1177 *addr_out= tmp;
1178 return ret;
1179 }
1180 if (ret > 0) /* buffer too short */ ret= -EMSGSIZE;
1181 free (tmp);
1182 }
1183 else {
1184 ret= -ENOMEM;
1185 }
1186
1187 WSREP_ERROR ("Could not prepare state transfer request: "
1188 "adding default port failed: %zd.", ret);
1189 }
1190 else {
1191 *addr_out= addr_in;
1192 }
1193
1194 pthread_t monitor;
1195 ret = mysql_thread_create (key_wsrep_sst_joiner_monitor, &monitor, NULL, wsrep_sst_joiner_monitor_thread, NULL);
1196
1197 if (ret)
1198 {
1199 WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
1200 ret, strerror(ret));
1201 return -ret;
1202 }
1203
1204 sst_joiner_completed= false;
1205 pthread_detach (monitor);
1206
1207 return ret;
1208 }
1209
wsrep_sst_prepare()1210 std::string wsrep_sst_prepare()
1211 {
1212 const ssize_t ip_max= 256;
1213 char ip_buf[ip_max];
1214 const char* addr_in= NULL;
1215 const char* addr_out= NULL;
1216 const char* method;
1217
1218 if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
1219 {
1220 return WSREP_STATE_TRANSFER_TRIVIAL;
1221 }
1222
1223 /*
1224 Figure out SST receive address. Common for all SST methods.
1225 */
1226 // Attempt 1: wsrep_sst_receive_address
1227 if (wsrep_sst_receive_address &&
1228 strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
1229 {
1230 addr_in= wsrep_sst_receive_address;
1231 }
1232
1233 //Attempt 2: wsrep_node_address
1234 else if (wsrep_node_address && strlen(wsrep_node_address))
1235 {
1236 wsp::Address addr(wsrep_node_address);
1237
1238 if (!addr.is_valid())
1239 {
1240 WSREP_ERROR("Could not parse wsrep_node_address : %s",
1241 wsrep_node_address);
1242 throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable");
1243 }
1244 memcpy(ip_buf, addr.get_address(), addr.get_address_len());
1245 addr_in= ip_buf;
1246 }
1247 // Attempt 3: Try to get the IP from the list of available interfaces.
1248 else
1249 {
1250 ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
1251
1252 if (ret && ret < ip_max)
1253 {
1254 addr_in= ip_buf;
1255 }
1256 else
1257 {
1258 WSREP_ERROR("Failed to guess address to accept state transfer. "
1259 "wsrep_sst_receive_address must be set manually.");
1260 throw wsrep::runtime_error("Could not prepare state transfer request");
1261 }
1262 }
1263
1264 ssize_t addr_len= -ENOSYS;
1265 method = wsrep_sst_method;
1266 if (!strcmp(method, WSREP_SST_MYSQLDUMP))
1267 {
1268 addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
1269 if (addr_len < 0)
1270 {
1271 throw wsrep::runtime_error("Could not prepare mysqldimp address");
1272 }
1273 }
1274 else
1275 {
1276 /*! A heuristic workaround until we learn how to stop and start engines */
1277 if (Wsrep_server_state::instance().is_initialized() &&
1278 Wsrep_server_state::instance().state() == Wsrep_server_state::s_joiner)
1279 {
1280 if (!strcmp(method, WSREP_SST_XTRABACKUP) ||
1281 !strcmp(method, WSREP_SST_XTRABACKUPV2))
1282 {
1283 WSREP_WARN("The %s SST method is deprecated, so it is automatically "
1284 "replaced by %s", method, WSREP_SST_MARIABACKUP);
1285 method = WSREP_SST_MARIABACKUP;
1286 }
1287 // we already did SST at initializaiton, now engines are running
1288 // sql_print_information() is here because the message is too long
1289 // for WSREP_INFO.
1290 sql_print_information ("WSREP: "
1291 "You have configured '%s' state snapshot transfer method "
1292 "which cannot be performed on a running server. "
1293 "Wsrep provider won't be able to fall back to it "
1294 "if other means of state transfer are unavailable. "
1295 "In that case you will need to restart the server.",
1296 method);
1297 return "";
1298 }
1299
1300 addr_len = sst_prepare_other (method, sst_auth_real,
1301 addr_in, &addr_out);
1302 if (addr_len < 0)
1303 {
1304 WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
1305 method);
1306 throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable");
1307 }
1308 }
1309
1310 std::string ret;
1311 ret += method;
1312 ret.push_back('\0');
1313 ret += addr_out;
1314
1315 const char* method_ptr(ret.data());
1316 const char* addr_ptr(ret.data() + strlen(method_ptr) + 1);
1317 WSREP_DEBUG("Prepared SST request: %s|%s", method_ptr, addr_ptr);
1318
1319 if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
1320
1321 return ret;
1322 }
1323
1324 // helper method for donors
sst_run_shell(const char * cmd_str,char ** env,int max_tries)1325 static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
1326 {
1327 int ret= 0;
1328
1329 for (int tries=1; tries <= max_tries; tries++)
1330 {
1331 wsp::process proc (cmd_str, "r", env);
1332
1333 if (NULL != proc.pipe())
1334 {
1335 proc.wait();
1336 }
1337
1338 if ((ret= proc.error()))
1339 {
1340 WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
1341 tries, max_tries, proc.cmd(), ret, strerror(ret));
1342 sleep (1);
1343 }
1344 else
1345 {
1346 WSREP_DEBUG("SST script successfully completed.");
1347 break;
1348 }
1349 }
1350
1351 return -ret;
1352 }
1353
sst_reject_queries(my_bool close_conn)1354 static void sst_reject_queries(my_bool close_conn)
1355 {
1356 WSREP_INFO("Rejecting client queries for the duration of SST.");
1357 if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
1358 }
1359
sst_donate_mysqldump(const char * addr,const wsrep::gtid & gtid,bool bypass,char ** env)1360 static int sst_donate_mysqldump (const char* addr,
1361 const wsrep::gtid& gtid,
1362 bool bypass,
1363 char** env) // carries auth info
1364 {
1365 char host[256];
1366 wsp::Address address(addr);
1367 if (!address.is_valid())
1368 {
1369 WSREP_ERROR("Could not parse SST address : %s", addr);
1370 return 0;
1371 }
1372 memcpy(host, address.get_address(), address.get_address_len());
1373 int port= address.get_port();
1374 bool extra_args;
1375 size_t const cmd_len= estimate_cmd_len(&extra_args);
1376 wsp::string cmd_str(cmd_len);
1377
1378 if (!cmd_str())
1379 {
1380 WSREP_ERROR("sst_donate_mysqldump(): "
1381 "could not allocate cmd buffer of %zd bytes", cmd_len);
1382 return -ENOMEM;
1383 }
1384
1385 /*
1386 we enable new client connections so that mysqldump donation can connect in,
1387 but we reject local connections from modifyingcdata during SST, to keep
1388 data intact
1389 */
1390 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
1391
1392 make_wsrep_defaults_file();
1393
1394 std::ostringstream uuid_oss;
1395 uuid_oss << gtid.id();
1396 int ret= snprintf (cmd_str(), cmd_len,
1397 "wsrep_sst_mysqldump "
1398 WSREP_SST_OPT_ADDR " '%s' "
1399 WSREP_SST_OPT_PORT " '%u' "
1400 WSREP_SST_OPT_LPORT " '%u' "
1401 WSREP_SST_OPT_SOCKET " '%s' "
1402 "%s"
1403 WSREP_SST_OPT_GTID " '%s:%lld,%d-%d-%llu' "
1404 WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
1405 "%s",
1406 addr, port, mysqld_port, mysqld_unix_port,
1407 wsrep_defaults_file,
1408 uuid_oss.str().c_str(), gtid.seqno().get(),
1409 wsrep_gtid_server.domain_id, wsrep_gtid_server.server_id,
1410 wsrep_gtid_server.seqno(),
1411 wsrep_gtid_server.domain_id,
1412 bypass ? " " WSREP_SST_OPT_BYPASS : "");
1413
1414 if (ret < 0 || size_t(ret) >= cmd_len)
1415 {
1416 WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
1417 return (ret < 0 ? ret : -EMSGSIZE);
1418 }
1419
1420 if (extra_args)
1421 copy_orig_argv(cmd_str() + ret);
1422
1423 WSREP_DEBUG("Running: '%s'", cmd_str());
1424
1425 ret= sst_run_shell (cmd_str(), env, 3);
1426
1427 wsrep::gtid sst_sent_gtid(ret == 0 ?
1428 gtid :
1429 wsrep::gtid(gtid.id(),
1430 wsrep::seqno::undefined()));
1431 Wsrep_server_state::instance().sst_sent(sst_sent_gtid, ret);
1432
1433 return ret;
1434 }
1435
1436 wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
1437
1438 /*
1439 Create a file under data directory.
1440 */
sst_create_file(const char * name,const char * content)1441 static int sst_create_file(const char *name, const char *content)
1442 {
1443 int err= 0;
1444 char *real_name;
1445 char *tmp_name;
1446 ssize_t len;
1447 FILE *file;
1448
1449 len= strlen(mysql_real_data_home) + strlen(name) + 2;
1450 real_name= (char *) alloca(len);
1451
1452 snprintf(real_name, (size_t) len, "%s/%s", mysql_real_data_home, name);
1453
1454 tmp_name= (char *) alloca(len + 4);
1455 snprintf(tmp_name, (size_t) len + 4, "%s.tmp", real_name);
1456
1457 file= fopen(tmp_name, "w+");
1458
1459 if (0 == file)
1460 {
1461 err= errno;
1462 WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err, strerror(err));
1463 }
1464 else
1465 {
1466 // Write the specified content into the file.
1467 if (content != NULL)
1468 {
1469 fprintf(file, "%s\n", content);
1470 fsync(fileno(file));
1471 }
1472
1473 fclose(file);
1474
1475 if (rename(tmp_name, real_name) == -1)
1476 {
1477 err= errno;
1478 WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", tmp_name,
1479 real_name, err, strerror(err));
1480 }
1481 }
1482
1483 return err;
1484 }
1485
run_sql_command(THD * thd,const char * query)1486 static int run_sql_command(THD *thd, const char *query)
1487 {
1488 thd->set_query((char *)query, strlen(query));
1489
1490 Parser_state ps;
1491 if (ps.init(thd, thd->query(), thd->query_length()))
1492 {
1493 WSREP_ERROR("SST query: %s failed", query);
1494 return -1;
1495 }
1496
1497 mysql_parse(thd, thd->query(), thd->query_length(), &ps, FALSE, FALSE);
1498 if (thd->is_error())
1499 {
1500 int const err= thd->get_stmt_da()->sql_errno();
1501 WSREP_WARN ("Error executing '%s': %d (%s)%s",
1502 query, err, thd->get_stmt_da()->message(),
1503 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
1504 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
1505 thd->clear_error();
1506 return -1;
1507 }
1508 return 0;
1509 }
1510
1511
sst_flush_tables(THD * thd)1512 static int sst_flush_tables(THD* thd)
1513 {
1514 WSREP_INFO("Flushing tables for SST...");
1515
1516 int err= 0;
1517 int not_used;
1518 /*
1519 Files created to notify the SST script about the outcome of table flush
1520 operation.
1521 */
1522 const char *flush_success= "tables_flushed";
1523 const char *flush_error= "sst_error";
1524
1525 CHARSET_INFO *current_charset= thd->variables.character_set_client;
1526
1527 if (!is_supported_parser_charset(current_charset))
1528 {
1529 /* Do not use non-supported parser character sets */
1530 WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1531 thd->variables.character_set_client= &my_charset_latin1;
1532 WSREP_WARN("For SST temporally setting character set to : %s",
1533 my_charset_latin1.csname);
1534 }
1535
1536 if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
1537 {
1538 err= -1;
1539 }
1540 else
1541 {
1542 /*
1543 Make sure logs are flushed after global read lock acquired. In case
1544 reload fails, we must also release the acquired FTWRL.
1545 */
1546 if (reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
1547 (TABLE_LIST*) 0, ¬_used))
1548 {
1549 thd->global_read_lock.unlock_global_read_lock(thd);
1550 err= -1;
1551 }
1552 }
1553
1554 thd->variables.character_set_client= current_charset;
1555
1556 if (err)
1557 {
1558 WSREP_ERROR("Failed to flush and lock tables");
1559
1560 /*
1561 The SST must be aborted as the flush tables failed. Notify this to SST
1562 script by creating the error file.
1563 */
1564 int tmp;
1565 if ((tmp= sst_create_file(flush_error, NULL))) {
1566 err= tmp;
1567 }
1568 }
1569 else
1570 {
1571 WSREP_INFO("Tables flushed.");
1572 /*
1573 Tables have been flushed. Create a file with cluster state ID and
1574 wsrep_gtid_domain_id.
1575 */
1576 char content[100];
1577 snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid,
1578 (long long)wsrep_locked_seqno, wsrep_gtid_server.domain_id);
1579 err= sst_create_file(flush_success, content);
1580
1581 const char base_name[]= "tables_flushed";
1582 ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
1583 char *real_name= (char*) malloc(full_len);
1584 sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
1585 char *tmp_name= (char*) malloc(full_len + 4);
1586 sprintf(tmp_name, "%s.tmp", real_name);
1587
1588 FILE* file= fopen(tmp_name, "w+");
1589 if (0 == file)
1590 {
1591 err= errno;
1592 WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err));
1593 }
1594 else
1595 {
1596 Wsrep_server_state& server_state= Wsrep_server_state::instance();
1597 std::ostringstream uuid_oss;
1598
1599 uuid_oss << server_state.current_view().state_id().id();
1600
1601 fprintf(file, "%s:%lld %u\n",
1602 uuid_oss.str().c_str(), server_state.pause_seqno().get(),
1603 wsrep_gtid_server.domain_id);
1604 fsync(fileno(file));
1605 fclose(file);
1606 if (rename(tmp_name, real_name) == -1)
1607 {
1608 err= errno;
1609 WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)",
1610 tmp_name, real_name, err,strerror(err));
1611 }
1612 }
1613 free(real_name);
1614 free(tmp_name);
1615 }
1616
1617 return err;
1618 }
1619
1620
sst_disallow_writes(THD * thd,bool yes)1621 static void sst_disallow_writes (THD* thd, bool yes)
1622 {
1623 char query_str[64]= { 0, };
1624 ssize_t const query_max= sizeof(query_str) - 1;
1625 CHARSET_INFO *current_charset;
1626
1627 current_charset= thd->variables.character_set_client;
1628
1629 if (!is_supported_parser_charset(current_charset))
1630 {
1631 /* Do not use non-supported parser character sets */
1632 WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1633 thd->variables.character_set_client= &my_charset_latin1;
1634 WSREP_WARN("For SST temporally setting character set to : %s",
1635 my_charset_latin1.csname);
1636 }
1637
1638 snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
1639 yes ? 1 : 0);
1640
1641 if (run_sql_command(thd, query_str))
1642 {
1643 WSREP_ERROR("Failed to disallow InnoDB writes");
1644 }
1645 thd->variables.character_set_client= current_charset;
1646 }
1647
sst_donor_thread(void * a)1648 static void* sst_donor_thread (void* a)
1649 {
1650 sst_thread_arg* arg= (sst_thread_arg*)a;
1651
1652 WSREP_INFO("Running: '%s'", arg->cmd);
1653
1654 int err= 1;
1655 bool locked= false;
1656
1657 const char* out= NULL;
1658 const size_t out_len= 128;
1659 char out_buf[out_len];
1660
1661 wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED;
1662 // seqno of complete SST
1663 wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;
1664
1665 // We turn off wsrep_on for this THD so that it can
1666 // operate with wsrep_ready == OFF
1667 // We also set this SST thread THD as system thread
1668 wsp::thd thd(FALSE, true);
1669 wsp::process proc(arg->cmd, "r", arg->env);
1670
1671 err= -proc.error();
1672
1673 /* Inform server about SST script startup and release TO isolation */
1674 mysql_mutex_lock (&arg->lock);
1675 arg->err= -err;
1676 mysql_cond_signal (&arg->cond);
1677 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
1678
1679 if (proc.pipe() && !err)
1680 {
1681 wait_signal:
1682 out= my_fgets (out_buf, out_len, proc.pipe());
1683
1684 if (out)
1685 {
1686 const char magic_flush[]= "flush tables";
1687 const char magic_cont[]= "continue";
1688 const char magic_done[]= "done";
1689
1690 if (!strcasecmp (out, magic_flush))
1691 {
1692 err= sst_flush_tables (thd.ptr);
1693 if (!err)
1694 {
1695 sst_disallow_writes (thd.ptr, true);
1696 /*
1697 Lets also keep statements that modify binary logs (like RESET LOGS,
1698 RESET MASTER) from proceeding until the files have been transferred
1699 to the joiner node.
1700 */
1701 if (mysql_bin_log.is_open())
1702 {
1703 mysql_mutex_lock(mysql_bin_log.get_log_lock());
1704 }
1705
1706 locked= true;
1707 goto wait_signal;
1708 }
1709 }
1710 else if (!strcasecmp (out, magic_cont))
1711 {
1712 if (locked)
1713 {
1714 if (mysql_bin_log.is_open())
1715 {
1716 mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
1717 mysql_mutex_unlock(mysql_bin_log.get_log_lock());
1718 }
1719 sst_disallow_writes (thd.ptr, false);
1720 thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr);
1721 locked= false;
1722 }
1723 err= 0;
1724 goto wait_signal;
1725 }
1726 else if (!strncasecmp (out, magic_done, strlen(magic_done)))
1727 {
1728 err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
1729 &ret_uuid, &ret_seqno);
1730 }
1731 else
1732 {
1733 WSREP_WARN("Received unknown signal: '%s'", out);
1734 proc.wait();
1735 }
1736 }
1737 else
1738 {
1739 WSREP_ERROR("Failed to read from: %s", proc.cmd());
1740 proc.wait();
1741 }
1742 if (!err && proc.error()) err= -proc.error();
1743 }
1744 else
1745 {
1746 WSREP_ERROR("Failed to execute: %s : %d (%s)",
1747 proc.cmd(), err, strerror(err));
1748 }
1749
1750 if (locked) // don't forget to unlock server before return
1751 {
1752 if (mysql_bin_log.is_open())
1753 {
1754 mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
1755 mysql_mutex_unlock(mysql_bin_log.get_log_lock());
1756 }
1757 sst_disallow_writes (thd.ptr, false);
1758 thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr);
1759 }
1760
1761 wsrep::gtid gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)),
1762 wsrep::seqno(err ? wsrep::seqno::undefined() :
1763 wsrep::seqno(ret_seqno)));
1764
1765 Wsrep_server_state::instance().sst_sent(gtid, err);
1766
1767 proc.wait();
1768
1769 wsrep_donor_monitor_end();
1770 return nullptr;
1771 }
1772
sst_donate_other(const char * method,const char * addr,const wsrep::gtid & gtid,bool bypass,char ** env)1773 static int sst_donate_other (const char* method,
1774 const char* addr,
1775 const wsrep::gtid& gtid,
1776 bool bypass,
1777 char** env) // carries auth info
1778 {
1779 bool extra_args;
1780 size_t const cmd_len= estimate_cmd_len(&extra_args);
1781 wsp::string cmd_str(cmd_len);
1782
1783 if (!cmd_str())
1784 {
1785 WSREP_ERROR("sst_donate_other(): "
1786 "could not allocate cmd buffer of %zd bytes", cmd_len);
1787 return -ENOMEM;
1788 }
1789
1790 char* binlog_opt_val= NULL;
1791 char* binlog_index_opt_val= NULL;
1792
1793 int ret;
1794 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1795 {
1796 WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1797 return ret;
1798 }
1799
1800 if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val)))
1801 {
1802 WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d",
1803 ret);
1804 if (binlog_opt_val) my_free(binlog_opt_val);
1805 return ret;
1806 }
1807
1808 make_wsrep_defaults_file();
1809
1810 std::ostringstream uuid_oss;
1811 uuid_oss << gtid.id();
1812 ret= snprintf (cmd_str(), cmd_len,
1813 "wsrep_sst_%s "
1814 WSREP_SST_OPT_ROLE " 'donor' "
1815 WSREP_SST_OPT_ADDR " '%s' "
1816 WSREP_SST_OPT_LPORT " '%u' "
1817 WSREP_SST_OPT_SOCKET " '%s' "
1818 WSREP_SST_OPT_DATA " '%s' "
1819 "%s"
1820 WSREP_SST_OPT_GTID " '%s:%lld' "
1821 WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
1822 "%s"
1823 "%s"
1824 "%s",
1825 method, addr, mysqld_port, mysqld_unix_port,
1826 mysql_real_data_home,
1827 wsrep_defaults_file,
1828 uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_server.domain_id,
1829 binlog_opt_val, binlog_index_opt_val,
1830 bypass ? " " WSREP_SST_OPT_BYPASS : "");
1831
1832 my_free(binlog_opt_val);
1833 my_free(binlog_index_opt_val);
1834
1835 if (ret < 0 || size_t(ret) >= cmd_len)
1836 {
1837 WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1838 return (ret < 0 ? ret : -EMSGSIZE);
1839 }
1840
1841 if (extra_args)
1842 copy_orig_argv(cmd_str() + ret);
1843
1844 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1845
1846 pthread_t tmp;
1847 sst_thread_arg arg(cmd_str(), env);
1848
1849 mysql_mutex_lock (&arg.lock);
1850
1851 ret= mysql_thread_create (key_wsrep_sst_donor, &tmp, NULL, sst_donor_thread, &arg);
1852
1853 if (ret)
1854 {
1855 WSREP_ERROR("sst_donate_other(): mysql_thread_create() failed: %d (%s)",
1856 ret, strerror(ret));
1857 return ret;
1858 }
1859
1860 mysql_cond_wait (&arg.cond, &arg.lock);
1861
1862 WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1863 return arg.err;
1864 }
1865
/*
  return true if character can be a part of a filename

  Accepted are alphanumeric characters, '-', '_' and '.'. Values that are
  not representable as unsigned char (e.g. a negative plain char) are
  rejected.
*/
static bool filename_char(int const c)
{
  /*
    isalnum() is undefined for arguments that are neither EOF nor
    representable as unsigned char, so filter those out first.
  */
  if (c != (int)(unsigned char) c)
    return false;

  return isalnum(c) || (c == '-') || (c == '_') || (c == '.');
}
1871
1872 /* return true if character can be a part of an address string */
address_char(int const c)1873 static bool address_char(int const c)
1874 {
1875 return filename_char(c) ||
1876 (c == ':') || (c == '[') || (c == ']') || (c == '/');
1877 }
1878
check_request_str(const char * const str,bool (* check)(int c))1879 static bool check_request_str(const char* const str,
1880 bool (*check) (int c))
1881 {
1882 for (size_t i(0); str[i] != '\0'; ++i)
1883 {
1884 if (!check(str[i]))
1885 {
1886 WSREP_WARN("Illegal character in state transfer request: %i (%c).",
1887 str[i], str[i]);
1888 return true;
1889 }
1890 }
1891
1892 return false;
1893 }
1894
wsrep_sst_donate(const std::string & msg,const wsrep::gtid & current_gtid,const bool bypass)1895 int wsrep_sst_donate(const std::string& msg,
1896 const wsrep::gtid& current_gtid,
1897 const bool bypass)
1898 {
1899 const char* method= msg.data();
1900 size_t method_len= strlen (method);
1901
1902 if (check_request_str(method, filename_char))
1903 {
1904 WSREP_ERROR("Bad SST method name. SST canceled.");
1905 return WSREP_CB_FAILURE;
1906 }
1907
1908 const char* data= method + method_len + 1;
1909
1910 /* check for auth@addr separator */
1911 const char* addr= strrchr(data, '@');
1912 wsp::string remote_auth;
1913 if (addr)
1914 {
1915 remote_auth.set(strndup(data, addr - data));
1916 addr++;
1917 }
1918 else
1919 {
1920 // no auth part
1921 addr= data;
1922 }
1923
1924 if (check_request_str(addr, address_char))
1925 {
1926 WSREP_ERROR("Bad SST address string. SST canceled.");
1927 return WSREP_CB_FAILURE;
1928 }
1929
1930 wsp::env env(NULL);
1931 if (env.error())
1932 {
1933 WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1934 return WSREP_CB_FAILURE;
1935 }
1936
1937 int ret;
1938 if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth_real)))
1939 {
1940 WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1941 return WSREP_CB_FAILURE;
1942 }
1943
1944 if (remote_auth())
1945 {
1946 if ((ret= sst_append_env_var(env, WSREP_SST_REMOTE_AUTH_ENV,remote_auth())))
1947 {
1948 WSREP_ERROR("wsrep_sst_donate_cb(): appending remote auth env failed: "
1949 "%d", ret);
1950 return WSREP_CB_FAILURE;
1951 }
1952 }
1953
1954 if (data_home_dir)
1955 {
1956 if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir)))
1957 {
1958 WSREP_ERROR("wsrep_sst_donate_cb(): appending data "
1959 "directory failed: %d", ret);
1960 return WSREP_CB_FAILURE;
1961 }
1962 }
1963
1964 sst_donor_completed= false;
1965 pthread_t monitor;
1966
1967 ret= mysql_thread_create (key_wsrep_sst_donor_monitor, &monitor, NULL, wsrep_sst_donor_monitor_thread, NULL);
1968
1969 if (ret)
1970 {
1971 WSREP_ERROR("sst_donate: mysql_thread_create() failed: %d (%s)",
1972 ret, strerror(ret));
1973 return WSREP_CB_FAILURE;
1974 }
1975
1976 if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1977 {
1978 ret= sst_donate_mysqldump(addr, current_gtid, bypass, env());
1979 }
1980 else
1981 {
1982 ret= sst_donate_other(method, addr, current_gtid, bypass, env());
1983 }
1984
1985 return (ret >= 0 ? 0 : 1);
1986 }
1987