1 /* Copyright 2008-2015 Codership Oy <http://www.codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15 
16 #include "wsrep_sst.h"
17 
18 #include <mysqld.h>
19 #include <sql_class.h>
20 #include <set_var.h>
21 #include <sql_acl.h>
22 #include <sql_reload.h>
23 #include <sql_parse.h>
24 #include "wsrep_priv.h"
25 #include "wsrep_utils.h"
26 #include "wsrep_xid.h"
27 #include <cstdio>
28 #include <cstdlib>
29 #include <cctype>
30 
31 extern const char wsrep_defaults_file[];
32 extern const char wsrep_defaults_group_suffix[];
33 
34 #define WSREP_SST_OPT_ROLE     "--role"
35 #define WSREP_SST_OPT_ADDR     "--address"
36 #define WSREP_SST_OPT_AUTH     "--auth"
37 #define WSREP_SST_OPT_DATA     "--datadir"
38 #define WSREP_SST_OPT_CONF     "--defaults-file"
39 #define WSREP_SST_OPT_CONF_SUFFIX "--defaults-group-suffix"
40 #define WSREP_SST_OPT_PARENT   "--parent"
41 #define WSREP_SST_OPT_BINLOG   "--binlog"
42 
43 // mysqldump-specific options
44 #define WSREP_SST_OPT_USER     "--user"
45 #define WSREP_SST_OPT_PSWD     "--password"
46 #define WSREP_SST_OPT_HOST     "--host"
47 #define WSREP_SST_OPT_PORT     "--port"
48 #define WSREP_SST_OPT_LPORT    "--local-port"
49 
50 // donor-specific
51 #define WSREP_SST_OPT_SOCKET   "--socket"
52 #define WSREP_SST_OPT_GTID     "--gtid"
53 #define WSREP_SST_OPT_BYPASS   "--bypass"
54 
55 #define WSREP_SST_MYSQLDUMP       "mysqldump"
56 #define WSREP_SST_RSYNC           "rsync"
57 #define WSREP_SST_SKIP            "skip"
58 #define WSREP_SST_XTRABACKUP      "xtrabackup"
59 #define WSREP_SST_XTRABACKUP_V2   "xtrabackup-v2"
60 #define WSREP_SST_DEFAULT      WSREP_SST_RSYNC
61 #define WSREP_SST_ADDRESS_AUTO "AUTO"
62 #define WSREP_SST_AUTH_MASK    "********"
63 
64 const char* wsrep_sst_method          = WSREP_SST_DEFAULT;
65 const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO;
66 const char* wsrep_sst_donor           = "";
67       char* wsrep_sst_auth            = NULL;
68 
69 // container for real auth string
70 static const char* sst_auth_real      = NULL;
71 my_bool wsrep_sst_donor_rejects_queries = FALSE;
72 
wsrep_sst_method_check(sys_var * self,THD * thd,set_var * var)73 bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
74 {
75     char   buff[FN_REFLEN];
76     String str(buff, sizeof(buff), system_charset_info), *res;
77     const char* c_str = NULL;
78 
79     if ((res   = var->value->val_str(&str)) &&
80         (c_str = res->c_ptr()) &&
81         strlen(c_str) > 0)
82         return 0;
83 
84     my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL");
85     return 1;
86 }
87 
wsrep_sst_method_update(sys_var * self,THD * thd,enum_var_type type)88 bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type)
89 {
90     return 0;
91 }
92 
sst_receive_address_check(const char * str)93 static bool sst_receive_address_check (const char* str)
94 {
95     if (!strncasecmp(str, "127.0.0.1", strlen("127.0.0.1")) ||
96         !strncasecmp(str, "localhost", strlen("localhost")))
97     {
98         return 1;
99     }
100 
101     return 0;
102 }
103 
wsrep_sst_receive_address_check(sys_var * self,THD * thd,set_var * var)104 bool  wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
105 {
106     const char* c_str = var->value->str_value.c_ptr();
107 
108     if (sst_receive_address_check (c_str))
109     {
110         my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL");
111         return 1;
112     }
113 
114     return 0;
115 }
116 
wsrep_sst_receive_address_update(sys_var * self,THD * thd,enum_var_type type)117 bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
118                                        enum_var_type type)
119 {
120     return 0;
121 }
122 
wsrep_sst_auth_check(sys_var * self,THD * thd,set_var * var)123 bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
124 {
125     return 0;
126 }
sst_auth_real_set(const char * value)127 static bool sst_auth_real_set (const char* value)
128 {
129     const char* v = strdup (value);
130 
131     if (v)
132     {
133         if (sst_auth_real) free (const_cast<char*>(sst_auth_real));
134         sst_auth_real = v;
135 
136         if (strlen(sst_auth_real))
137         {
138           if (wsrep_sst_auth)
139           {
140             my_free ((void*)wsrep_sst_auth);
141             wsrep_sst_auth = my_strdup(WSREP_SST_AUTH_MASK, MYF(0));
142             //strncpy (wsrep_sst_auth, WSREP_SST_AUTH_MASK,
143             //     sizeof(wsrep_sst_auth) - 1);
144           }
145           else
146             wsrep_sst_auth = my_strdup (WSREP_SST_AUTH_MASK, MYF(0));
147         }
148         return 0;
149     }
150 
151     return 1;
152 }
153 
wsrep_sst_auth_update(sys_var * self,THD * thd,enum_var_type type)154 bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
155 {
156     return sst_auth_real_set (wsrep_sst_auth);
157 }
158 
wsrep_sst_auth_init(const char * value)159 void wsrep_sst_auth_init (const char* value)
160 {
161     if (wsrep_sst_auth == value) wsrep_sst_auth = NULL;
162     if (value) sst_auth_real_set (value);
163 }
164 
wsrep_sst_auth_free()165 void wsrep_sst_auth_free()
166 {
167   if (wsrep_sst_auth) { my_free ((void*)wsrep_sst_auth); }
168   if (sst_auth_real) { free (const_cast<char*>(sst_auth_real)); }
169   wsrep_sst_auth= NULL;
170   sst_auth_real= NULL;
171 }
172 
wsrep_sst_donor_check(sys_var * self,THD * thd,set_var * var)173 bool  wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
174 {
175   return 0;
176 }
177 
wsrep_sst_donor_update(sys_var * self,THD * thd,enum_var_type type)178 bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
179 {
180     return 0;
181 }
182 
183 static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
184 
wsrep_before_SE()185 bool wsrep_before_SE()
186 {
187   return (wsrep_provider != NULL
188           && strcmp (wsrep_provider,   WSREP_NONE)
189           && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
190           && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
191 }
192 
193 static bool            sst_complete = false;
194 static bool            sst_needed   = false;
195 
wsrep_sst_grab()196 void wsrep_sst_grab ()
197 {
198   WSREP_INFO("wsrep_sst_grab()");
199   if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
200   sst_complete = false;
201   mysql_mutex_unlock (&LOCK_wsrep_sst);
202 }
203 
204 // Wait for end of SST
wsrep_sst_wait()205 bool wsrep_sst_wait ()
206 {
207   if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
208   while (!sst_complete)
209   {
210     WSREP_INFO("Waiting for SST to complete.");
211     mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst);
212   }
213 
214   if (local_seqno >= 0)
215   {
216     WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
217   }
218   else
219   {
220     WSREP_ERROR("SST failed: %d (%s)",
221                 int(-local_seqno), strerror(-local_seqno));
222   }
223 
224   mysql_mutex_unlock (&LOCK_wsrep_sst);
225 
226   return (local_seqno >= 0);
227 }
228 
229 // Signal end of SST
wsrep_sst_complete(const wsrep_uuid_t * sst_uuid,wsrep_seqno_t sst_seqno,bool needed)230 void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
231                          wsrep_seqno_t       sst_seqno,
232                          bool                needed)
233 {
234   if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
235   if (!sst_complete)
236   {
237     sst_complete = true;
238     sst_needed   = needed;
239     local_uuid   = *sst_uuid;
240     local_seqno  = sst_seqno;
241     mysql_cond_signal (&COND_wsrep_sst);
242   }
243   else
244   {
245     /* This can happen when called from wsrep_synced_cb().
246        At the moment there is no way to check there
247        if main thread is still waiting for signal,
248        so wsrep_sst_complete() is called from there
249        each time wsrep_ready changes from FALSE -> TRUE.
250     */
251     WSREP_DEBUG("Nobody is waiting for SST.");
252   }
253   mysql_mutex_unlock (&LOCK_wsrep_sst);
254 }
255 
wsrep_sst_received(wsrep_t * const wsrep,const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno,const void * const state,size_t const state_len)256 void wsrep_sst_received (wsrep_t* const      wsrep,
257                          const wsrep_uuid_t& uuid,
258                          wsrep_seqno_t const seqno,
259                          const void* const   state,
260                          size_t const        state_len)
261 {
262     wsrep_get_SE_checkpoint(local_uuid, local_seqno);
263 
264     if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
265         local_seqno < seqno || seqno < 0)
266     {
267         wsrep_set_SE_checkpoint(uuid, seqno);
268         local_uuid = uuid;
269         local_seqno = seqno;
270     }
271     else if (local_seqno > seqno)
272     {
273         WSREP_WARN("SST postion is in the past: %lld, current: %lld. "
274                    "Can't continue.",
275                    (long long)seqno, (long long)local_seqno);
276         unireg_abort(1);
277     }
278 
279     wsrep_init_sidno(uuid);
280 
281     if (wsrep)
282     {
283         int const rcode(seqno < 0 ? seqno : 0);
284         wsrep_gtid_t const state_id = {
285             uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
286         };
287 
288         wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
289     }
290 }
291 
292 // Let applier threads to continue
wsrep_sst_continue()293 void wsrep_sst_continue ()
294 {
295   if (sst_needed)
296   {
297     WSREP_INFO("Signalling provider to continue.");
298     // local_uuid and local_seqno are global variables and are volatile
299     wsrep_uuid_t  const sst_uuid  = local_uuid;
300     wsrep_seqno_t const sst_seqno = local_seqno;
301     wsrep_sst_received (wsrep, sst_uuid, sst_seqno, NULL, 0);
302   }
303 }
304 
305 struct sst_thread_arg
306 {
307   const char*     cmd;
308   char**          env;
309   char*           ret_str;
310   int             err;
311   mysql_mutex_t   lock;
312   mysql_cond_t    cond;
313 
sst_thread_argsst_thread_arg314   sst_thread_arg (const char* c, char** e)
315     : cmd(c), env(e), ret_str(0), err(-1)
316   {
317     mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
318     mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
319   }
320 
~sst_thread_argsst_thread_arg321   ~sst_thread_arg()
322   {
323     mysql_cond_destroy  (&cond);
324     mysql_mutex_unlock  (&lock);
325     mysql_mutex_destroy (&lock);
326   }
327 };
328 
sst_scan_uuid_seqno(const char * str,wsrep_uuid_t * uuid,wsrep_seqno_t * seqno)329 static int sst_scan_uuid_seqno (const char* str,
330                                 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
331 {
332   int offt = wsrep_uuid_scan (str, strlen(str), uuid);
333   if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
334   {
335     *seqno = strtoll (str + offt + 1, NULL, 10);
336     if (*seqno != LLONG_MAX || errno != ERANGE)
337     {
338       return 0;
339     }
340   }
341 
342   WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
343   return EINVAL;
344 }
345 
346 // get rid of trailing \n
my_fgets(char * buf,size_t buf_len,FILE * stream)347 static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
348 {
349    char* ret= fgets (buf, buf_len, stream);
350 
351    if (ret)
352    {
353        size_t len = strlen(ret);
354        if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0';
355    }
356 
357    return ret;
358 }
359 
360 /*
361   Generate opt_binlog_opt_val for sst_donate_other(), sst_prepare_other().
362 
363   Returns zero on success, negative error code otherwise.
364 
365   String containing binlog name is stored in param ret if binlog is enabled
366   and GTID mode is on, otherwise empty string. Returned string should be
367   freed with my_free().
368  */
generate_binlog_opt_val(char ** ret)369 static int generate_binlog_opt_val(char** ret)
370 {
371   DBUG_ASSERT(ret);
372   *ret= NULL;
373   if (opt_bin_log && gtid_mode > 0)
374   {
375     assert(opt_bin_logname);
376     *ret= my_strdup(opt_bin_logname, MYF(0));
377   }
378   else
379   {
380     *ret= my_strdup("", MYF(0));
381   }
382   if (!*ret) return -ENOMEM;
383   return 0;
384 }
385 
sst_joiner_thread(void * a)386 static void* sst_joiner_thread (void* a)
387 {
388   sst_thread_arg* arg= (sst_thread_arg*) a;
389   int err= 1;
390 
391   {
392     const char magic[] = "ready";
393     const size_t magic_len = sizeof(magic) - 1;
394     const size_t out_len = 512;
395     char out[out_len];
396 
397     WSREP_INFO("Running: '%s'", arg->cmd);
398 
399     wsp::process proc (arg->cmd, "r", arg->env);
400 
401     if (proc.pipe() && !proc.error())
402     {
403       const char* tmp= my_fgets (out, out_len, proc.pipe());
404 
405       if (!tmp || strlen(tmp) < (magic_len + 2) ||
406           strncasecmp (tmp, magic, magic_len))
407       {
408         WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
409                     magic, arg->cmd, tmp);
410         proc.wait();
411         if (proc.error()) err = proc.error();
412       }
413       else
414       {
415         err = 0;
416       }
417     }
418     else
419     {
420       err = proc.error();
421       WSREP_ERROR("Failed to execute: %s : %d (%s)",
422                   arg->cmd, err, strerror(err));
423     }
424 
425     // signal sst_prepare thread with ret code,
426     // it will go on sending SST request
427     mysql_mutex_lock   (&arg->lock);
428     if (!err)
429     {
430       arg->ret_str = strdup (out + magic_len + 1);
431       if (!arg->ret_str) err = ENOMEM;
432     }
433     arg->err = -err;
434     mysql_cond_signal  (&arg->cond);
435     mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
436 
437     if (err) return NULL; /* lp:808417 - return immediately, don't signal
438                            * initializer thread to ensure single thread of
439                            * shutdown. */
440 
441     wsrep_uuid_t  ret_uuid  = WSREP_UUID_UNDEFINED;
442     wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
443 
444     // in case of successfull receiver start, wait for SST completion/end
445     char* tmp = my_fgets (out, out_len, proc.pipe());
446 
447     proc.wait();
448     err= EINVAL;
449 
450     if (!tmp)
451     {
452       WSREP_ERROR("Failed to read uuid:seqno from joiner script.");
453       if (proc.error()) err = proc.error();
454     }
455     else
456     {
457       err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
458     }
459 
460     if (err)
461     {
462       ret_uuid=  WSREP_UUID_UNDEFINED;
463       ret_seqno= -err;
464     }
465 
466     // Tell initializer thread that SST is complete
467     wsrep_sst_complete (&ret_uuid, ret_seqno, true);
468   }
469 
470   return NULL;
471 }
472 
473 #define WSREP_SST_AUTH_ENV "WSREP_SST_OPT_AUTH"
474 
sst_append_auth_env(wsp::env & env,const char * sst_auth)475 static int sst_append_auth_env(wsp::env& env, const char* sst_auth)
476 {
477   int const sst_auth_size= strlen(WSREP_SST_AUTH_ENV) + 1 /* = */
478     + (sst_auth ? strlen(sst_auth) : 0) + 1 /* \0 */;
479 
480   wsp::string sst_auth_str(sst_auth_size); // for automatic cleanup on return
481   if (!sst_auth_str()) return -ENOMEM;
482 
483   int ret= snprintf(sst_auth_str(), sst_auth_size, "%s=%s",
484                     WSREP_SST_AUTH_ENV, sst_auth ? sst_auth : "");
485 
486   if (ret < 0 || ret >= sst_auth_size)
487   {
488     WSREP_ERROR("sst_append_auth_env(): snprintf() failed: %d", ret);
489     return (ret < 0 ? ret : -EMSGSIZE);
490   }
491 
492   env.append(sst_auth_str());
493   return -env.error();
494 }
495 
sst_prepare_other(const char * method,const char * sst_auth,const char * addr_in,const char ** addr_out)496 static ssize_t sst_prepare_other (const char*  method,
497                                   const char*  sst_auth,
498                                   const char*  addr_in,
499                                   const char** addr_out)
500 {
501   int const cmd_len= 4096;
502   wsp::string cmd_str(cmd_len);
503 
504   if (!cmd_str())
505   {
506     WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %d bytes",
507                 cmd_len);
508     return -ENOMEM;
509   }
510 
511   const char* binlog_opt= "";
512   char* binlog_opt_val= NULL;
513 
514   int ret;
515   if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
516   {
517     WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
518                 ret);
519     return ret;
520   }
521   if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
522 
523   ret= snprintf (cmd_str(), cmd_len,
524                  "wsrep_sst_%s "
525                  WSREP_SST_OPT_ROLE" 'joiner' "
526                  WSREP_SST_OPT_ADDR" '%s' "
527                  WSREP_SST_OPT_DATA" '%s' "
528                  WSREP_SST_OPT_CONF" '%s' "
529                  WSREP_SST_OPT_CONF_SUFFIX" '%s' "
530                  WSREP_SST_OPT_PARENT" '%d'"
531                  " %s '%s' ",
532                  method, addr_in, mysql_real_data_home,
533                  wsrep_defaults_file, wsrep_defaults_group_suffix,
534                  (int)getpid(), binlog_opt, binlog_opt_val);
535   my_free(binlog_opt_val);
536 
537   if (ret < 0 || ret >= cmd_len)
538   {
539     WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
540     return (ret < 0 ? ret : -EMSGSIZE);
541   }
542 
543   wsp::env env(NULL);
544   if (env.error())
545   {
546     WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
547     return -env.error();
548   }
549 
550   if ((ret= sst_append_auth_env(env, sst_auth)))
551   {
552     WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
553     return ret;
554   }
555 
556   pthread_t tmp;
557   sst_thread_arg arg(cmd_str(), env());
558   mysql_mutex_lock (&arg.lock);
559   ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg);
560   if (ret)
561   {
562     WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)",
563                 ret, strerror(ret));
564     return -ret;
565   }
566   mysql_cond_wait (&arg.cond, &arg.lock);
567 
568   *addr_out= arg.ret_str;
569 
570   if (!arg.err)
571     ret = strlen(*addr_out);
572   else
573   {
574     assert (arg.err < 0);
575     ret = arg.err;
576   }
577 
578   pthread_detach (tmp);
579 
580   return ret;
581 }
582 
583 extern uint  mysqld_port;
584 
585 /*! Just tells donor where to send mysqldump */
sst_prepare_mysqldump(const char * addr_in,const char ** addr_out)586 static ssize_t sst_prepare_mysqldump (const char*  addr_in,
587                                       const char** addr_out)
588 {
589   ssize_t ret = strlen (addr_in);
590 
591   if (!strrchr(addr_in, ':'))
592   {
593     ssize_t s = ret + 7;
594     char* tmp = (char*) malloc (s);
595 
596     if (tmp)
597     {
598       ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
599 
600       if (ret > 0 && ret < s)
601       {
602         *addr_out= tmp;
603         return ret;
604       }
605       if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
606       free (tmp);
607     }
608     else {
609       ret= -ENOMEM;
610     }
611 
612     WSREP_ERROR ("Could not prepare state transfer request: "
613                  "adding default port failed: %zd.", ret);
614   }
615   else {
616     *addr_out= addr_in;
617   }
618 
619   return ret;
620 }
621 
622 static enum Wsrep_SE_init_result SE_initialized= WSREP_SE_INIT_RESULT_NONE;
623 
wsrep_sst_prepare(void ** msg)624 ssize_t wsrep_sst_prepare (void** msg)
625 {
626   const ssize_t ip_max= 256;
627   char ip_buf[ip_max];
628   const char* addr_in=  NULL;
629   const char* addr_out= NULL;
630 
631   if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
632   {
633     ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1;
634     *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL);
635     if (!msg)
636     {
637       WSREP_ERROR("Could not allocate %zd bytes for state request", ret);
638       unireg_abort(1);
639     }
640     return ret;
641   }
642 
643   // Figure out SST address. Common for all SST methods
644   if (wsrep_sst_receive_address &&
645       strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
646   {
647     addr_in= wsrep_sst_receive_address;
648   }
649   else if (wsrep_node_address && strlen(wsrep_node_address))
650   {
651     size_t const addr_len= strlen(wsrep_node_address);
652     size_t const host_len= wsrep_host_len(wsrep_node_address, addr_len);
653 
654     if (host_len < addr_len)
655     {
656       strncpy (ip_buf, wsrep_node_address, host_len);
657       ip_buf[host_len]= '\0';
658       addr_in= ip_buf;
659     }
660     else
661     {
662       addr_in= wsrep_node_address;
663     }
664   }
665   else
666   {
667     ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
668 
669     if (ret && ret < ip_max)
670     {
671       addr_in= ip_buf;
672     }
673     else
674     {
675       WSREP_ERROR("Could not prepare state transfer request: "
676                   "failed to guess address to accept state transfer at. "
677                   "wsrep_sst_receive_address must be set manually.");
678       unireg_abort(1);
679     }
680   }
681 
682   ssize_t addr_len= -ENOSYS;
683   if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP))
684   {
685     addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
686     if (addr_len < 0) unireg_abort(1);
687   }
688   else
689   {
690     /*! A heuristic workaround until we learn how to stop and start engines */
691     if (SE_initialized)
692     {
693       // we already did SST at initializaiton, now engines are running
694       // sql_print_information() is here because the message is too long
695       // for WSREP_INFO.
696       sql_print_information ("WSREP: "
697                  "You have configured '%s' state snapshot transfer method "
698                  "which cannot be performed on a running server. "
699                  "Wsrep provider won't be able to fall back to it "
700                  "if other means of state transfer are unavailable. "
701                  "In that case you will need to restart the server.",
702                  wsrep_sst_method);
703       *msg = 0;
704       return 0;
705     }
706 
707     addr_len = sst_prepare_other (wsrep_sst_method, sst_auth_real,
708                                   addr_in, &addr_out);
709     if (addr_len < 0)
710     {
711       WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
712                    wsrep_sst_method);
713       unireg_abort(1);
714     }
715   }
716 
717   size_t const method_len(strlen(wsrep_sst_method));
718   size_t const msg_len   (method_len + addr_len + 2 /* + auth_len + 1*/);
719 
720   *msg = malloc (msg_len);
721   if (NULL != *msg) {
722     char* const method_ptr(reinterpret_cast<char*>(*msg));
723     strcpy (method_ptr, wsrep_sst_method);
724     char* const addr_ptr(method_ptr + method_len + 1);
725     strcpy (addr_ptr, addr_out);
726 
727     WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr);
728   }
729   else {
730     WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
731                 msg_len);
732     unireg_abort(1);
733   }
734 
735   if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
736 
737   return msg_len;
738 }
739 
740 // helper method for donors
sst_run_shell(const char * cmd_str,char ** env,int max_tries)741 static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
742 {
743   int ret = 0;
744 
745   for (int tries=1; tries <= max_tries; tries++)
746   {
747     wsp::process proc (cmd_str, "r", env);
748 
749     if (NULL != proc.pipe())
750     {
751       proc.wait();
752     }
753 
754     if ((ret = proc.error()))
755     {
756       WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
757                   tries, max_tries, proc.cmd(), ret, strerror(ret));
758       sleep (1);
759     }
760     else
761     {
762       WSREP_DEBUG("SST script successfully completed.");
763       break;
764     }
765   }
766 
767   return -ret;
768 }
769 
sst_reject_queries(my_bool close_conn)770 static void sst_reject_queries(my_bool close_conn)
771 {
772     wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced
773     WSREP_INFO("Rejecting client queries for the duration of SST.");
774     if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
775 }
776 
sst_donate_mysqldump(const char * addr,const wsrep_uuid_t * uuid,const char * uuid_str,wsrep_seqno_t seqno,bool bypass,char ** env)777 static int sst_donate_mysqldump (const char*         addr,
778                                  const wsrep_uuid_t* uuid,
779                                  const char*         uuid_str,
780                                  wsrep_seqno_t       seqno,
781                                  bool                bypass,
782                                  char**              env) // carries auth info
783 {
784   int const cmd_len= 4096;
785   wsp::string  cmd_str(cmd_len);
786 
787   if (!cmd_str())
788   {
789     WSREP_ERROR("sst_donate_mysqldump(): "
790                 "could not allocate cmd buffer of %d bytes", cmd_len);
791     return -ENOMEM;
792   }
793 
794   if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
795 
796   int ret= snprintf (cmd_str(), cmd_len,
797                      "wsrep_sst_mysqldump "
798                      WSREP_SST_OPT_ADDR" '%s' "
799                      WSREP_SST_OPT_LPORT" '%u' "
800                      WSREP_SST_OPT_SOCKET" '%s' "
801                      WSREP_SST_OPT_CONF" '%s' "
802                      WSREP_SST_OPT_GTID" '%s:%lld'"
803                      "%s",
804                      addr, mysqld_port, mysqld_unix_port,
805                      wsrep_defaults_file, uuid_str,
806                      (long long)seqno, bypass ? " " WSREP_SST_OPT_BYPASS : "");
807 
808   if (ret < 0 || ret >= cmd_len)
809   {
810     WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
811     return (ret < 0 ? ret : -EMSGSIZE);
812   }
813 
814   WSREP_DEBUG("Running: '%s'", cmd_str());
815 
816   ret= sst_run_shell (cmd_str(), env, 3);
817 
818   wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
819 
820   wsrep->sst_sent (wsrep, &state_id, ret);
821 
822   return ret;
823 }
824 
825 wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
826 
run_sql_command(THD * thd,const char * query)827 static int run_sql_command(THD *thd, const char *query)
828 {
829   thd->set_query((char *)query, strlen(query));
830 
831   Parser_state ps;
832   if (ps.init(thd, thd->query(), thd->query_length()))
833   {
834     WSREP_ERROR("SST query: %s failed", query);
835     return -1;
836   }
837 
838   mysql_parse(thd, thd->query(), thd->query_length(), &ps);
839   if (thd->is_error())
840   {
841     int const err= thd->get_stmt_da()->sql_errno();
842     WSREP_WARN ("error executing '%s': %d (%s)%s",
843                 query, err, thd->get_stmt_da()->message(),
844                 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
845                 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
846     thd->clear_error();
847     return -1;
848   }
849   return 0;
850 }
851 
sst_flush_tables(THD * thd)852 static int sst_flush_tables(THD* thd)
853 {
854   WSREP_INFO("Flushing tables for SST...");
855 
856   int err;
857   int not_used;
858   if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
859   {
860     WSREP_ERROR("Failed to flush and lock tables");
861     err = -1;
862   }
863   else
864   {
865     /* make sure logs are flushed after global read lock acquired */
866     err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
867 			      (TABLE_LIST*) 0, &not_used);
868   }
869 
870   if (err)
871   {
872     WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err));
873   }
874   else
875   {
876     WSREP_INFO("Tables flushed.");
877     const char base_name[]= "tables_flushed";
878     ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
879     char *real_name = (char*) malloc(full_len);
880     sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
881     char *tmp_name = (char*) malloc(full_len + 4);
882     sprintf(tmp_name, "%s.tmp", real_name);
883 
884     FILE* file= fopen(tmp_name, "w+");
885     if (0 == file)
886     {
887       err= errno;
888       WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err));
889     }
890     else
891     {
892       fprintf(file, "%s:%lld\n",
893               wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno);
894       fsync(fileno(file));
895       fclose(file);
896       if (rename(tmp_name, real_name) == -1)
897       {
898         err= errno;
899         WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)",
900                      tmp_name, real_name, err,strerror(err));
901       }
902     }
903     free(real_name);
904     free(tmp_name);
905   }
906 
907   return err;
908 }
909 
sst_disallow_writes(THD * thd,bool yes)910 static void sst_disallow_writes (THD* thd, bool yes)
911 {
912   char query_str[64] = { 0, };
913   ssize_t const query_max = sizeof(query_str) - 1;
914   snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
915             yes ? 1 : 0);
916 
917   if (run_sql_command(thd, query_str))
918   {
919     WSREP_ERROR("Failed to disallow InnoDB writes");
920   }
921 }
922 
sst_donor_thread(void * a)923 static void* sst_donor_thread (void* a)
924 {
925   sst_thread_arg* arg= (sst_thread_arg*)a;
926 
927   WSREP_INFO("Running: '%s'", arg->cmd);
928 
929   int  err= 1;
930   bool locked= false;
931 
932   const char*  out= NULL;
933   const size_t out_len= 128;
934   char         out_buf[out_len];
935 
936   wsrep_uuid_t  ret_uuid= WSREP_UUID_UNDEFINED;
937   wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST
938 
939   wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
940                        // operate with wsrep_ready == OFF
941   wsp::process proc(arg->cmd, "r", arg->env);
942 
943   err= proc.error();
944 
945 /* Inform server about SST script startup and release TO isolation */
946   mysql_mutex_lock   (&arg->lock);
947   arg->err = -err;
948   mysql_cond_signal  (&arg->cond);
949   mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
950 
951   if (proc.pipe() && !err)
952   {
953 wait_signal:
954     out= my_fgets (out_buf, out_len, proc.pipe());
955 
956     if (out)
957     {
958       const char magic_flush[]= "flush tables";
959       const char magic_cont[]= "continue";
960       const char magic_done[]= "done";
961 
962       if (!strcasecmp (out, magic_flush))
963       {
964         err= sst_flush_tables (thd.ptr);
965         if (!err)
966         {
967           sst_disallow_writes (thd.ptr, true);
968           locked= true;
969           goto wait_signal;
970         }
971       }
972       else if (!strcasecmp (out, magic_cont))
973       {
974         if (locked)
975         {
976           sst_disallow_writes (thd.ptr, false);
977           thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
978           locked= false;
979         }
980         err=  0;
981         goto wait_signal;
982       }
983       else if (!strncasecmp (out, magic_done, strlen(magic_done)))
984       {
985         err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
986                                   &ret_uuid, &ret_seqno);
987       }
988       else
989       {
990         WSREP_WARN("Received unknown signal: '%s'", out);
991       }
992     }
993     else
994     {
995       WSREP_ERROR("Failed to read from: %s", proc.cmd());
996       proc.wait();
997     }
998     if (!err && proc.error()) err= proc.error();
999   }
1000   else
1001   {
1002     WSREP_ERROR("Failed to execute: %s : %d (%s)",
1003                 proc.cmd(), err, strerror(err));
1004   }
1005 
1006   if (locked) // don't forget to unlock server before return
1007   {
1008     sst_disallow_writes (thd.ptr, false);
1009     thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
1010   }
1011 
1012   // signal to donor that SST is over
1013   struct wsrep_gtid const state_id = {
1014       ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
1015   };
1016   wsrep->sst_sent (wsrep, &state_id, -err);
1017   proc.wait();
1018 
1019   return NULL;
1020 }
1021 
1022 
1023 
sst_donate_other(const char * method,const char * addr,const char * uuid,wsrep_seqno_t seqno,bool bypass,char ** env)1024 static int sst_donate_other (const char*   method,
1025                              const char*   addr,
1026                              const char*   uuid,
1027                              wsrep_seqno_t seqno,
1028                              bool          bypass,
1029                              char**        env) // carries auth info
1030 {
1031   int const cmd_len= 4096;
1032   wsp::string  cmd_str(cmd_len);
1033 
1034   if (!cmd_str())
1035   {
1036     WSREP_ERROR("sst_donate_other(): "
1037                 "could not allocate cmd buffer of %d bytes", cmd_len);
1038     return -ENOMEM;
1039   }
1040 
1041   const char* binlog_opt= "";
1042   char* binlog_opt_val= NULL;
1043 
1044   int ret;
1045   if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1046   {
1047     WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1048     return ret;
1049   }
1050   if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
1051 
1052   ret= snprintf (cmd_str(), cmd_len,
1053                  "wsrep_sst_%s "
1054                  WSREP_SST_OPT_ROLE" 'donor' "
1055                  WSREP_SST_OPT_ADDR" '%s' "
1056                  WSREP_SST_OPT_SOCKET" '%s' "
1057                  WSREP_SST_OPT_DATA" '%s' "
1058                  WSREP_SST_OPT_CONF" '%s' "
1059                  WSREP_SST_OPT_CONF_SUFFIX" '%s' "
1060                  " %s '%s' "
1061                  WSREP_SST_OPT_GTID" '%s:%lld'"
1062                  "%s",
1063                  method, addr, mysqld_unix_port, mysql_real_data_home,
1064                  wsrep_defaults_file, wsrep_defaults_group_suffix,
1065                  binlog_opt, binlog_opt_val,
1066                  uuid, (long long) seqno,
1067                  bypass ? " " WSREP_SST_OPT_BYPASS : "");
1068   my_free(binlog_opt_val);
1069 
1070   if (ret < 0 || ret >= cmd_len)
1071   {
1072     WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1073     return (ret < 0 ? ret : -EMSGSIZE);
1074   }
1075 
1076   if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1077 
1078   pthread_t tmp;
1079   sst_thread_arg arg(cmd_str(), env);
1080   mysql_mutex_lock (&arg.lock);
1081   ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg);
1082   if (ret)
1083   {
1084     WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)",
1085                 ret, strerror(ret));
1086     return ret;
1087   }
1088   mysql_cond_wait (&arg.cond, &arg.lock);
1089 
1090   WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1091   return arg.err;
1092 }
1093 
1094 /* return true if character can be a part of a filename */
filename_char(int const c)1095 static bool filename_char(int const c)
1096 {
1097   return isalnum(c) || (c == '-') || (c == '_') || (c == '.');
1098 }
1099 
1100 /* return true if character can be a part of an address string */
address_char(int const c)1101 static bool address_char(int const c)
1102 {
1103   return filename_char(c) ||
1104          (c == ':') || (c == '[') || (c == ']') || (c == '/');
1105 }
1106 
check_request_str(const char * const str,bool (* check)(int c))1107 static bool check_request_str(const char* const str,
1108                               bool (*check) (int c))
1109 {
1110   for (size_t i(0); str[i] != '\0'; ++i)
1111   {
1112     if (!check(str[i]))
1113     {
1114       WSREP_WARN("Illegal character in state transfer request: %i (%c).",
1115                  str[i], str[i]);
1116       return true;
1117     }
1118   }
1119 
1120   return false;
1121 }
1122 
wsrep_sst_donate_cb(void * app_ctx,void * recv_ctx,const void * msg,size_t msg_len,const wsrep_gtid_t * current_gtid,const char * state,size_t state_len,bool bypass)1123 wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
1124                                        const void* msg, size_t msg_len,
1125                                        const wsrep_gtid_t* current_gtid,
1126                                        const char* state, size_t state_len,
1127                                        bool bypass)
1128 {
1129   const char* method = (char*)msg;
1130   size_t method_len  = strlen (method);
1131 
1132   if (check_request_str(method, filename_char))
1133   {
1134     WSREP_ERROR("Bad SST method name. SST canceled.");
1135     return WSREP_CB_FAILURE;
1136   }
1137 
1138   const char* data   = method + method_len + 1;
1139 
1140   if (check_request_str(data, address_char))
1141   {
1142     WSREP_ERROR("Bad SST address string. SST canceled.");
1143     return WSREP_CB_FAILURE;
1144   }
1145 
1146   char uuid_str[37];
1147   wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str));
1148 
1149   /* This will be reset when sync callback is called.
1150    * Should we set wsrep_ready to FALSE here too? */
1151   local_status.set(WSREP_MEMBER_DONOR);
1152 
1153   wsp::env env(NULL);
1154   if (env.error())
1155   {
1156     WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1157     return WSREP_CB_FAILURE;
1158   }
1159 
1160   int ret;
1161   if ((ret= sst_append_auth_env(env, sst_auth_real)))
1162   {
1163     WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1164     return WSREP_CB_FAILURE;
1165   }
1166 
1167   if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1168   {
1169     ret = sst_donate_mysqldump(data, &current_gtid->uuid, uuid_str,
1170                                current_gtid->seqno, bypass, env());
1171   }
1172   else
1173   {
1174     ret = sst_donate_other(method, data, uuid_str,
1175                            current_gtid->seqno, bypass, env());
1176   }
1177 
1178   return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
1179 }
1180 
wsrep_SE_init_wait()1181 enum Wsrep_SE_init_result wsrep_SE_init_wait()
1182 {
1183   mysql_mutex_lock (&LOCK_wsrep_sst_init);
1184   while (SE_initialized == WSREP_SE_INIT_RESULT_NONE)
1185   {
1186     mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init);
1187   }
1188   enum Wsrep_SE_init_result ret= SE_initialized;
1189   mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1190   return ret;
1191 }
1192 
wsrep_SE_initialized(enum Wsrep_SE_init_result result)1193 void wsrep_SE_initialized(enum Wsrep_SE_init_result result)
1194 {
1195   mysql_mutex_lock (&LOCK_wsrep_sst_init);
1196   if (SE_initialized == WSREP_SE_INIT_RESULT_NONE)
1197   {
1198     SE_initialized= result;
1199   }
1200   mysql_cond_signal (&COND_wsrep_sst_init);
1201   mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1202 }
1203