1 /* Copyright 2008-2015 Codership Oy <http://www.codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15 
16 
17 #include <mysqld.h>
18 #include "wsrep_sst.h"
19 #include <sql_class.h>
20 #include <set_var.h>
21 #include <sql_acl.h>
22 #include <sql_reload.h>
23 #include <sql_parse.h>
24 #include "wsrep_priv.h"
25 #include "wsrep_utils.h"
26 #include "wsrep_xid.h"
27 #include <cstdio>
28 #include <cstdlib>
29 #include <cctype>
30 
// Defined elsewhere; passed to SST scripts via the --defaults-file and
// --defaults-group-suffix options below.
extern const char wsrep_defaults_file[];
extern const char wsrep_defaults_group_suffix[];

// Option names used when composing SST script command lines
#define WSREP_SST_OPT_ROLE     "--role"
#define WSREP_SST_OPT_ADDR     "--address"
#define WSREP_SST_OPT_AUTH     "--auth"
#define WSREP_SST_OPT_DATA     "--datadir"
#define WSREP_SST_OPT_CONF     "--defaults-file"
#define WSREP_SST_OPT_CONF_SUFFIX "--defaults-group-suffix"
#define WSREP_SST_OPT_PARENT   "--parent"
#define WSREP_SST_OPT_BINLOG   "--binlog"

// mysqldump-specific options
#define WSREP_SST_OPT_USER     "--user"
#define WSREP_SST_OPT_PSWD     "--password"
#define WSREP_SST_OPT_HOST     "--host"
#define WSREP_SST_OPT_PORT     "--port"
#define WSREP_SST_OPT_LPORT    "--local-port"

// donor-specific
#define WSREP_SST_OPT_SOCKET   "--socket"
#define WSREP_SST_OPT_GTID     "--gtid"
#define WSREP_SST_OPT_BYPASS   "--bypass"

// Recognized SST method names; the default method is rsync
#define WSREP_SST_MYSQLDUMP       "mysqldump"
#define WSREP_SST_RSYNC           "rsync"
#define WSREP_SST_SKIP            "skip"
#define WSREP_SST_XTRABACKUP      "xtrabackup"
#define WSREP_SST_XTRABACKUP_V2   "xtrabackup-v2"
#define WSREP_SST_DEFAULT      WSREP_SST_RSYNC
// Receive-address value meaning "derive the address automatically"
// (see wsrep_sst_prepare())
#define WSREP_SST_ADDRESS_AUTO "AUTO"
// Placeholder stored in wsrep_sst_auth in place of the real auth string
#define WSREP_SST_AUTH_MASK    "********"

// Current values of the SST-related variables
const char* wsrep_sst_method          = WSREP_SST_DEFAULT;
const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO;
const char* wsrep_sst_donor           = "";
      char* wsrep_sst_auth            = NULL;

// container for real auth string
static const char* sst_auth_real      = NULL;
// When set, a non-bypass mysqldump donation first calls sst_reject_queries()
my_bool wsrep_sst_donor_rejects_queries = FALSE;
72 
wsrep_sst_method_check(sys_var * self,THD * thd,set_var * var)73 bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
74 {
75     char   buff[FN_REFLEN];
76     String str(buff, sizeof(buff), system_charset_info), *res;
77     const char* c_str = NULL;
78 
79     if ((res   = var->value->val_str(&str)) &&
80         (c_str = res->c_ptr()) &&
81         strlen(c_str) > 0)
82         return 0;
83 
84     my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL");
85     return 1;
86 }
87 
wsrep_sst_method_update(sys_var * self,THD * thd,enum_var_type type)88 bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type)
89 {
90     return 0;
91 }
92 
// Tell whether str begins (ignoring case) with "127.0.0.1" or "localhost".
// Returns true for such a loopback-looking address, false otherwise.
// str must not be NULL.
static bool sst_receive_address_check (const char* str)
{
    static const char loopback_ip[]   = "127.0.0.1";
    static const char loopback_name[] = "localhost";

    const bool is_ip   =
        (0 == strncasecmp(str, loopback_ip,   sizeof(loopback_ip)   - 1));
    const bool is_name =
        (0 == strncasecmp(str, loopback_name, sizeof(loopback_name) - 1));

    return is_ip || is_name;
}
103 
wsrep_sst_receive_address_check(sys_var * self,THD * thd,set_var * var)104 bool  wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
105 {
106     const char* c_str = var->value->str_value.c_ptr();
107 
108     if (sst_receive_address_check (c_str))
109     {
110         my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL");
111         return 1;
112     }
113 
114     return 0;
115 }
116 
wsrep_sst_receive_address_update(sys_var * self,THD * thd,enum_var_type type)117 bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
118                                        enum_var_type type)
119 {
120     return 0;
121 }
122 
wsrep_sst_auth_check(sys_var * self,THD * thd,set_var * var)123 bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
124 {
125     return 0;
126 }
sst_auth_real_set(const char * value)127 static bool sst_auth_real_set (const char* value)
128 {
129     const char* v = strdup (value);
130 
131     if (v)
132     {
133         if (sst_auth_real) free (const_cast<char*>(sst_auth_real));
134         sst_auth_real = v;
135 
136         if (strlen(sst_auth_real))
137         {
138           if (wsrep_sst_auth)
139           {
140             my_free ((void*)wsrep_sst_auth);
141             wsrep_sst_auth = my_strdup(key_memory_wsrep,
142                                        WSREP_SST_AUTH_MASK, MYF(0));
143           }
144           else
145             wsrep_sst_auth = my_strdup (key_memory_wsrep,
146                                         WSREP_SST_AUTH_MASK, MYF(0));
147         }
148         return 0;
149     }
150 
151     return 1;
152 }
153 
wsrep_sst_auth_update(sys_var * self,THD * thd,enum_var_type type)154 bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
155 {
156     return sst_auth_real_set (wsrep_sst_auth);
157 }
158 
wsrep_sst_auth_init(const char * value)159 void wsrep_sst_auth_init (const char* value)
160 {
161     if (wsrep_sst_auth == value) wsrep_sst_auth = NULL;
162     if (value) sst_auth_real_set (value);
163 }
164 
wsrep_sst_auth_free()165 void wsrep_sst_auth_free()
166 {
167   if (wsrep_sst_auth) { my_free ((void*)wsrep_sst_auth); }
168   if (sst_auth_real) { free (const_cast<char*>(sst_auth_real)); }
169   wsrep_sst_auth= NULL;
170   sst_auth_real= NULL;
171 }
172 
wsrep_sst_donor_check(sys_var * self,THD * thd,set_var * var)173 bool  wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
174 {
175   return 0;
176 }
177 
wsrep_sst_donor_update(sys_var * self,THD * thd,enum_var_type type)178 bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
179 {
180     return 0;
181 }
182 
wsrep_before_SE()183 bool wsrep_before_SE()
184 {
185   return (wsrep_provider != NULL
186           && strcmp (wsrep_provider,   WSREP_NONE)
187           && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
188           && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
189 }
190 
// true once wsrep_sst_complete() has recorded an SST result; cleared by
// wsrep_sst_grab(). Both accesses happen under LOCK_wsrep_sst.
static bool            sst_complete = false;
// 'needed' argument of the wsrep_sst_complete() call that recorded the
// result; consulted by wsrep_sst_continue().
static bool            sst_needed   = false;
193 
wsrep_sst_grab()194 void wsrep_sst_grab ()
195 {
196   WSREP_INFO("wsrep_sst_grab()");
197   if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
198   sst_complete = false;
199   mysql_mutex_unlock (&LOCK_wsrep_sst);
200 }
201 
202 // Wait for end of SST
wsrep_sst_wait()203 bool wsrep_sst_wait ()
204 {
205   if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
206   while (!sst_complete)
207   {
208     WSREP_INFO("Waiting for SST to complete.");
209     mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst);
210   }
211 
212   if (local_seqno >= 0)
213   {
214     WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
215   }
216   else
217   {
218     WSREP_ERROR("SST failed: %d (%s)",
219                 int(-local_seqno), strerror(-local_seqno));
220   }
221 
222   mysql_mutex_unlock (&LOCK_wsrep_sst);
223 
224   return (local_seqno >= 0);
225 }
226 
227 // Signal end of SST
wsrep_sst_complete(const wsrep_uuid_t * sst_uuid,wsrep_seqno_t sst_seqno,bool needed)228 void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
229                          wsrep_seqno_t       sst_seqno,
230                          bool                needed)
231 {
232   if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
233   if (!sst_complete)
234   {
235     sst_complete = true;
236     sst_needed   = needed;
237     local_uuid   = *sst_uuid;
238     local_seqno  = sst_seqno;
239     mysql_cond_signal (&COND_wsrep_sst);
240   }
241   else
242   {
243     /* This can happen when called from wsrep_synced_cb().
244        At the moment there is no way to check there
245        if main thread is still waiting for signal,
246        so wsrep_sst_complete() is called from there
247        each time wsrep_ready changes from FALSE -> TRUE.
248     */
249     WSREP_DEBUG("Nobody is waiting for SST.");
250   }
251   mysql_mutex_unlock (&LOCK_wsrep_sst);
252 }
253 
wsrep_sst_received(wsrep_t * const wsrep,const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno,const void * const state,size_t const state_len)254 void wsrep_sst_received (wsrep_t* const      wsrep,
255                          const wsrep_uuid_t& uuid,
256                          wsrep_seqno_t const seqno,
257                          const void* const   state,
258                          size_t const        state_len)
259 {
260     wsrep_get_SE_checkpoint(local_uuid, local_seqno);
261 
262     if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
263         local_seqno < seqno || seqno < 0)
264     {
265         wsrep_set_SE_checkpoint(uuid, seqno);
266         local_uuid = uuid;
267         local_seqno = seqno;
268     }
269     else if (local_seqno > seqno)
270     {
271         WSREP_WARN("SST postion is in the past: %lld, current: %lld. "
272                    "Can't continue.",
273                    (long long)seqno, (long long)local_seqno);
274         unireg_abort(1);
275     }
276 
277     wsrep_init_sidno(uuid);
278 
279     if (wsrep)
280     {
281         int const rcode(seqno < 0 ? seqno : 0);
282         wsrep_gtid_t const state_id = {
283             uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
284         };
285 
286         wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
287     }
288 }
289 
290 // Let applier threads to continue
wsrep_sst_continue()291 void wsrep_sst_continue ()
292 {
293   if (sst_needed)
294   {
295     WSREP_INFO("Signalling provider to continue.");
296     // local_uuid and local_seqno are global variables and are volatile
297     wsrep_uuid_t  const sst_uuid  = local_uuid;
298     wsrep_seqno_t const sst_seqno = local_seqno;
299     wsrep_sst_received (wsrep, sst_uuid, sst_seqno, NULL, 0);
300   }
301 }
302 
303 struct sst_thread_arg
304 {
305   const char*     cmd;
306   char**          env;
307   char*           ret_str;
308   int             err;
309   mysql_mutex_t   lock;
310   mysql_cond_t    cond;
311 
sst_thread_argsst_thread_arg312   sst_thread_arg (const char* c, char** e)
313     : cmd(c), env(e), ret_str(0), err(-1)
314   {
315     mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
316     mysql_cond_init(key_COND_wsrep_sst_thread, &cond);
317   }
318 
~sst_thread_argsst_thread_arg319   ~sst_thread_arg()
320   {
321     mysql_cond_destroy  (&cond);
322     mysql_mutex_unlock  (&lock);
323     mysql_mutex_destroy (&lock);
324   }
325 };
326 
sst_scan_uuid_seqno(const char * str,wsrep_uuid_t * uuid,wsrep_seqno_t * seqno)327 static int sst_scan_uuid_seqno (const char* str,
328                                 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
329 {
330   int offt = wsrep_uuid_scan (str, strlen(str), uuid);
331   if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
332   {
333     *seqno = strtoll (str + offt + 1, NULL, 10);
334     if (*seqno != LLONG_MAX || errno != ERANGE)
335     {
336       return 0;
337     }
338   }
339 
340   WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
341   return EINVAL;
342 }
343 
// fgets() wrapper that gets rid of the trailing \n.
// Reads at most buf_len - 1 characters of one line from stream into buf and,
// if the line ends with '\n', replaces it with '\0'.
// Returns buf, or NULL when fgets() returns NULL.
static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
{
   char* const line = fgets (buf, buf_len, stream);

   if (!line) return NULL;

   size_t const len = strlen(line);
   if (len == 0) return line;

   char* const last = line + (len - 1);
   if (*last == '\n') *last = '\0';

   return line;
}
357 
358 /*
359   Generate opt_binlog_opt_val for sst_donate_other(), sst_prepare_other().
360 
361   Returns zero on success, negative error code otherwise.
362 
363   String containing binlog name is stored in param ret if binlog is enabled
364   and GTID mode is on, otherwise empty string. Returned string should be
365   freed with my_free().
366  */
generate_binlog_opt_val(char ** ret)367 static int generate_binlog_opt_val(char** ret)
368 {
369   assert(ret);
370   *ret= NULL;
371   enum_gtid_mode gtid_mode =   get_gtid_mode(GTID_MODE_LOCK_NONE);
372   if (opt_bin_log && gtid_mode > 0)
373   {
374     assert(opt_bin_logname);
375     *ret=  my_strdup(key_memory_wsrep, opt_bin_logname, MYF(0));
376   }
377   else
378   {
379     *ret= my_strdup(key_memory_wsrep, "", MYF(0));
380   }
381   if (!*ret) return -ENOMEM;
382   return 0;
383 }
384 
sst_joiner_thread(void * a)385 static void* sst_joiner_thread (void* a)
386 {
387   sst_thread_arg* arg= (sst_thread_arg*) a;
388   int err= 1;
389 
390   {
391     const char magic[] = "ready";
392     const size_t magic_len = sizeof(magic) - 1;
393     const size_t out_len = 512;
394     char out[out_len];
395 
396     WSREP_INFO("Running: '%s'", arg->cmd);
397 
398     wsp::process proc (arg->cmd, "r", arg->env);
399 
400     if (proc.pipe() && !proc.error())
401     {
402       const char* tmp= my_fgets (out, out_len, proc.pipe());
403 
404       if (!tmp || strlen(tmp) < (magic_len + 2) ||
405           strncasecmp (tmp, magic, magic_len))
406       {
407         WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
408                     magic, arg->cmd, tmp);
409         proc.wait();
410         if (proc.error()) err = proc.error();
411       }
412       else
413       {
414         err = 0;
415       }
416     }
417     else
418     {
419       err = proc.error();
420       WSREP_ERROR("Failed to execute: %s : %d (%s)",
421                   arg->cmd, err, strerror(err));
422     }
423 
424     // signal sst_prepare thread with ret code,
425     // it will go on sending SST request
426     mysql_mutex_lock   (&arg->lock);
427     if (!err)
428     {
429       arg->ret_str = strdup (out + magic_len + 1);
430       if (!arg->ret_str) err = ENOMEM;
431     }
432     arg->err = -err;
433     mysql_cond_signal  (&arg->cond);
434     mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
435 
436     if (err) return NULL; /* lp:808417 - return immediately, don't signal
437                            * initializer thread to ensure single thread of
438                            * shutdown. */
439 
440     wsrep_uuid_t  ret_uuid  = WSREP_UUID_UNDEFINED;
441     wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
442 
443     // in case of successfull receiver start, wait for SST completion/end
444     char* tmp = my_fgets (out, out_len, proc.pipe());
445 
446     proc.wait();
447     err= EINVAL;
448 
449     if (!tmp)
450     {
451       WSREP_ERROR("Failed to read uuid:seqno from joiner script.");
452       if (proc.error()) err = proc.error();
453     }
454     else
455     {
456       err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
457     }
458 
459     if (err)
460     {
461       ret_uuid=  WSREP_UUID_UNDEFINED;
462       ret_seqno= -err;
463     }
464 
465     // Tell initializer thread that SST is complete
466     wsrep_sst_complete (&ret_uuid, ret_seqno, true);
467   }
468 
469   return NULL;
470 }
471 
// Names of environment variables set for SST scripts.
// WSREP_SST_AUTH_ENV carries the auth string (see sst_prepare_other()).
#define WSREP_SST_AUTH_ENV        "WSREP_SST_OPT_AUTH"
#define WSREP_SST_REMOTE_AUTH_ENV "WSREP_SST_OPT_REMOTE_AUTH"
#define DATA_HOME_DIR_ENV         "INNODB_DATA_HOME_DIR"
475 
sst_append_env_var(wsp::env & env,const char * const var,const char * const val)476 static int sst_append_env_var(wsp::env&   env,
477                               const char* const var,
478                               const char* const val)
479 {
480   int const env_str_size= strlen(var) + 1 /* = */
481                           + (val ? strlen(val) : 0) + 1 /* \0 */;
482 
483   wsp::string env_str(env_str_size); // for automatic cleanup on return
484   if (!env_str()) return -ENOMEM;
485 
486   int ret= snprintf(env_str(), env_str_size, "%s=%s", var, val ? val : "");
487 
488   if (ret < 0 || ret >= env_str_size)
489   {
490     WSREP_ERROR("sst_append_env_var(): snprintf(%s=%s) failed: %d",
491                 var, val, ret);
492     return (ret < 0 ? ret : -EMSGSIZE);
493   }
494 
495   env.append(env_str());
496   return -env.error();
497 }
498 
sst_prepare_other(const char * method,const char * sst_auth,const char * addr_in,const char ** addr_out)499 static ssize_t sst_prepare_other (const char*  method,
500                                   const char*  sst_auth,
501                                   const char*  addr_in,
502                                   const char** addr_out)
503 {
504   int const cmd_len= 4096;
505   wsp::string cmd_str(cmd_len);
506 
507   if (!cmd_str())
508   {
509     WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %d bytes",
510                 cmd_len);
511     return -ENOMEM;
512   }
513 
514   const char* binlog_opt= "";
515   char* binlog_opt_val= NULL;
516 
517   int ret;
518   if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
519   {
520     WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
521                 ret);
522     return ret;
523   }
524   if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
525 
526   ret= snprintf (cmd_str(), cmd_len,
527                  "wsrep_sst_%s "
528                  WSREP_SST_OPT_ROLE" 'joiner' "
529                  WSREP_SST_OPT_ADDR" '%s' "
530                  WSREP_SST_OPT_DATA" '%s' "
531                  WSREP_SST_OPT_CONF" '%s' "
532                  WSREP_SST_OPT_CONF_SUFFIX" '%s' "
533                  WSREP_SST_OPT_PARENT" '%d'"
534                  " %s '%s' ",
535                  method, addr_in, mysql_real_data_home,
536                  wsrep_defaults_file, wsrep_defaults_group_suffix,
537                  (int)getpid(), binlog_opt, binlog_opt_val);
538   my_free(binlog_opt_val);
539 
540   if (ret < 0 || ret >= cmd_len)
541   {
542     WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
543     return (ret < 0 ? ret : -EMSGSIZE);
544   }
545 
546   wsp::env env(NULL);
547   if (env.error())
548   {
549     WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
550     return -env.error();
551   }
552 
553   if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth)))
554   {
555     WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
556     return ret;
557   }
558 
559   pthread_t tmp;
560   sst_thread_arg arg(cmd_str(), env());
561   mysql_mutex_lock (&arg.lock);
562   ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg);
563   if (ret)
564   {
565     WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)",
566                 ret, strerror(ret));
567     return -ret;
568   }
569   mysql_cond_wait (&arg.cond, &arg.lock);
570 
571   *addr_out= arg.ret_str;
572 
573   if (!arg.err)
574     ret = strlen(*addr_out);
575   else
576   {
577     assert (arg.err < 0);
578     ret = arg.err;
579   }
580 
581   pthread_detach (tmp);
582 
583   return ret;
584 }
585 
// Defined elsewhere; used below as the default port for mysqldump SST
// addresses and as the donor's --local-port value.
extern uint  mysqld_port;
587 
588 /*! Just tells donor where to send mysqldump */
sst_prepare_mysqldump(const char * addr_in,const char ** addr_out)589 static ssize_t sst_prepare_mysqldump (const char*  addr_in,
590                                       const char** addr_out)
591 {
592   ssize_t ret = strlen (addr_in);
593 
594   if (!strrchr(addr_in, ':'))
595   {
596     ssize_t s = ret + 7;
597     char* tmp = (char*) malloc (s);
598 
599     if (tmp)
600     {
601       ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
602 
603       if (ret > 0 && ret < s)
604       {
605         *addr_out= tmp;
606         return ret;
607       }
608       if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
609       free (tmp);
610     }
611     else {
612       ret= -ENOMEM;
613     }
614 
615     WSREP_ERROR ("Could not prepare state transfer request: "
616                  "adding default port failed: %zd.", ret);
617   }
618   else {
619     *addr_out= addr_in;
620   }
621 
622   return ret;
623 }
624 
// Consulted by wsrep_sst_prepare(): when non-zero, script-based SST methods
// are declined because they cannot be performed on a running server.
static enum Wsrep_SE_init_result SE_initialized= WSREP_SE_INIT_RESULT_NONE;
626 
wsrep_sst_prepare(void ** msg,THD * thd)627 ssize_t wsrep_sst_prepare (void** msg, THD *thd)
628 {
629   const ssize_t ip_max= 256;
630   char ip_buf[ip_max];
631   const char* addr_in=  NULL;
632   const char* addr_out= NULL;
633 
634   if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
635   {
636     ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1;
637     *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL);
638     if (!msg)
639     {
640       WSREP_ERROR("Could not allocate %zd bytes for state request", ret);
641       unireg_abort(1);
642     }
643     return ret;
644   }
645 
646   // Figure out SST address. Common for all SST methods
647   if (wsrep_sst_receive_address &&
648       strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
649   {
650     addr_in= wsrep_sst_receive_address;
651   }
652   else if (wsrep_node_address && strlen(wsrep_node_address))
653   {
654     size_t const addr_len= strlen(wsrep_node_address);
655     size_t const host_len= wsrep_host_len(wsrep_node_address, addr_len);
656 
657     if (host_len < addr_len)
658     {
659       strncpy (ip_buf, wsrep_node_address, host_len);
660       ip_buf[host_len]= '\0';
661       addr_in= ip_buf;
662     }
663     else
664     {
665       addr_in= wsrep_node_address;
666     }
667   }
668   else
669   {
670     ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
671 
672     if (ret && ret < ip_max)
673     {
674       addr_in= ip_buf;
675     }
676     else
677     {
678       WSREP_ERROR("Could not prepare state transfer request: "
679                   "failed to guess address to accept state transfer at. "
680                   "wsrep_sst_receive_address must be set manually.");
681       if (thd) delete thd;
682       unireg_abort(1);
683     }
684   }
685 
686   ssize_t addr_len= -ENOSYS;
687   if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP))
688   {
689     addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
690     if (addr_len < 0) unireg_abort(1);
691   }
692   else
693   {
694     /*! A heuristic workaround until we learn how to stop and start engines */
695     if (SE_initialized)
696     {
697       // we already did SST at initializaiton, now engines are running
698       // sql_print_information() is here because the message is too long
699       // for WSREP_INFO.
700       sql_print_information ("WSREP: "
701                  "You have configured '%s' state snapshot transfer method "
702                  "which cannot be performed on a running server. "
703                  "Wsrep provider won't be able to fall back to it "
704                  "if other means of state transfer are unavailable. "
705                  "In that case you will need to restart the server.",
706                  wsrep_sst_method);
707       *msg = 0;
708       return 0;
709     }
710 
711     addr_len = sst_prepare_other (wsrep_sst_method, sst_auth_real,
712                                   addr_in, &addr_out);
713     if (addr_len < 0)
714     {
715       WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
716                    wsrep_sst_method);
717       unireg_abort(1);
718     }
719   }
720 
721   size_t const method_len(strlen(wsrep_sst_method));
722   size_t const msg_len   (method_len + addr_len + 2 /* + auth_len + 1*/);
723 
724   *msg = malloc (msg_len);
725   if (NULL != *msg) {
726     char* const method_ptr(static_cast<char*>(*msg));
727     strcpy (method_ptr, wsrep_sst_method);
728     char* const addr_ptr(method_ptr + method_len + 1);
729     strcpy (addr_ptr, addr_out);
730 
731     WSREP_DEBUG("Prepared SST request: %s|%s", method_ptr, addr_ptr);
732   }
733   else {
734     WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
735                 msg_len);
736     unireg_abort(1);
737   }
738 
739   if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
740 
741   return msg_len;
742 }
743 
744 // helper method for donors
sst_run_shell(const char * cmd_str,char ** env,int max_tries)745 static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
746 {
747   int ret = 0;
748 
749   for (int tries=1; tries <= max_tries; tries++)
750   {
751     wsp::process proc (cmd_str, "r", env);
752 
753     if (NULL != proc.pipe())
754     {
755       proc.wait();
756     }
757 
758     if ((ret = proc.error()))
759     {
760       WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
761                   tries, max_tries, proc.cmd(), ret, strerror(ret));
762       sleep (1);
763     }
764     else
765     {
766       WSREP_DEBUG("SST script successfully completed.");
767       break;
768     }
769   }
770 
771   return -ret;
772 }
773 
sst_reject_queries(my_bool close_conn)774 static void sst_reject_queries(my_bool close_conn)
775 {
776     wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced
777     WSREP_INFO("Rejecting client queries for the duration of SST.");
778     if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
779 }
780 
sst_donate_mysqldump(const char * addr,const wsrep_uuid_t * uuid,const char * uuid_str,wsrep_seqno_t seqno,bool bypass,char ** env)781 static int sst_donate_mysqldump (const char*         addr,
782                                  const wsrep_uuid_t* uuid,
783                                  const char*         uuid_str,
784                                  wsrep_seqno_t       seqno,
785                                  bool                bypass,
786                                  char**              env) // carries auth info
787 {
788   int const cmd_len= 4096;
789   wsp::string  cmd_str(cmd_len);
790 
791   if (!cmd_str())
792   {
793     WSREP_ERROR("sst_donate_mysqldump(): "
794                 "could not allocate cmd buffer of %d bytes", cmd_len);
795     return -ENOMEM;
796   }
797 
798   if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
799 
800   int ret= snprintf (cmd_str(), cmd_len,
801                      "wsrep_sst_mysqldump "
802                      WSREP_SST_OPT_ADDR" '%s' "
803                      WSREP_SST_OPT_LPORT" '%u' "
804                      WSREP_SST_OPT_SOCKET" '%s' "
805                      WSREP_SST_OPT_CONF" '%s' "
806                      WSREP_SST_OPT_GTID" '%s:%lld'"
807                      "%s",
808                      addr, mysqld_port, mysqld_unix_port,
809                      wsrep_defaults_file, uuid_str,
810                      (long long)seqno, bypass ? " " WSREP_SST_OPT_BYPASS : "");
811 
812   if (ret < 0 || ret >= cmd_len)
813   {
814     WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
815     return (ret < 0 ? ret : -EMSGSIZE);
816   }
817 
818   WSREP_DEBUG("Running: '%s'", cmd_str());
819 
820   ret= sst_run_shell (cmd_str(), env, 3);
821 
822   wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
823 
824   wsrep->sst_sent (wsrep, &state_id, ret);
825 
826   return ret;
827 }
828 
// Sequence number written by sst_flush_tables() into the "tables_flushed"
// marker file. Not assigned anywhere in the visible part of this file.
wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
830 
run_sql_command(THD * thd,const char * query)831 static int run_sql_command(THD *thd, const char *query)
832 {
833   thd->set_query((char *)query, strlen(query));
834 
835   Parser_state ps;
836   if (ps.init(thd, thd->query().str, thd->query().length))
837   {
838     WSREP_ERROR("SST query: %s failed", query);
839     return -1;
840   }
841 
842   mysql_parse(thd, &ps);
843   if (thd->is_error())
844   {
845     int const err= thd->get_stmt_da()->mysql_errno();
846     WSREP_WARN ("error executing '%s': %d (%s)%s",
847                 query, err, thd->get_stmt_da()->message_text(),
848                 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
849                 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
850     thd->clear_error();
851     return -1;
852   }
853   return 0;
854 }
855 
sst_flush_tables(THD * thd)856 static int sst_flush_tables(THD* thd)
857 {
858   WSREP_INFO("Flushing tables for SST...");
859 
860   int err;
861   int not_used;
862   if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
863   {
864     WSREP_ERROR("Failed to flush and lock tables");
865     err = -1;
866   }
867   else
868   {
869     /* make sure logs are flushed after global read lock acquired */
870     err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
871 			      (TABLE_LIST*) 0, &not_used);
872   }
873 
874   if (err)
875   {
876     WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err));
877   }
878   else
879   {
880     WSREP_INFO("Tables flushed.");
881     const char base_name[]= "tables_flushed";
882     ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
883     char *real_name = (char*) malloc(full_len);
884     sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
885     char *tmp_name = (char*) malloc(full_len + 4);
886     sprintf(tmp_name, "%s.tmp", real_name);
887 
888     FILE* file= fopen(tmp_name, "w+");
889     if (0 == file)
890     {
891       err= errno;
892       WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err));
893     }
894     else
895     {
896       fprintf(file, "%s:%lld\n",
897               wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno);
898       fsync(fileno(file));
899       fclose(file);
900       if (rename(tmp_name, real_name) == -1)
901       {
902         err= errno;
903         WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)",
904                      tmp_name, real_name, err,strerror(err));
905       }
906     }
907     free(real_name);
908     free(tmp_name);
909   }
910 
911   return err;
912 }
913 
sst_disallow_writes(THD * thd,bool yes)914 static void sst_disallow_writes (THD* thd, bool yes)
915 {
916   char query_str[64] = { 0, };
917   ssize_t const query_max = sizeof(query_str) - 1;
918   snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
919             yes ? 1 : 0);
920 
921   if (run_sql_command(thd, query_str))
922   {
923     WSREP_ERROR("Failed to disallow InnoDB writes");
924   }
925 }
926 
/*
 * Thread body that runs the donor-side SST command given in arg->cmd and
 * reacts to the lines the command prints on its output pipe.
 *
 * @param a  pointer to the sst_thread_arg prepared by the spawning thread
 *           (sst_donate_other() keeps it on its stack and waits on arg->cond).
 *           It is only touched up to the startup handshake below.
 * @return   always NULL; the outcome of the transfer is handed to the
 *           provider through wsrep->sst_sent().
 *
 * Lines understood on the pipe (compared case-insensitively):
 *   "flush tables" - call sst_flush_tables(); on success disallow InnoDB
 *                    writes, remember that the server is locked and read
 *                    the next line.
 *   "continue"     - if locked, re-allow writes and release the global read
 *                    lock; clear err and read the next line.
 *   "done..."      - prefix match; the text after it is parsed by
 *                    sst_scan_uuid_seqno() into ret_uuid/ret_seqno and
 *                    reading stops.
 * Any other line is logged as a warning and reading stops.
 */
static void* sst_donor_thread (void* a)
{
  sst_thread_arg* arg= (sst_thread_arg*)a;

  WSREP_INFO("Running: '%s'", arg->cmd);

  int  err= 1;          // placeholder, overwritten by proc.error() below
  bool locked= false;   // true between "flush tables" and "continue"

  const char*  out= NULL;
  const size_t out_len= 128;
  char         out_buf[out_len];

  wsrep_uuid_t  ret_uuid= WSREP_UUID_UNDEFINED;
  wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST

  wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
                       // operate with wsrep_ready == OFF
  wsp::process proc(arg->cmd, "r", arg->env);

  err= proc.error();

/* Inform server about SST script startup and release TO isolation */
  // The startup status is handed over negated (arg->err = -err).
  mysql_mutex_lock   (&arg->lock);
  arg->err = -err;
  mysql_cond_signal  (&arg->cond);
  mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.

  if (proc.pipe() && !err)
  {
wait_signal:
    // Read one line from the script; the branches below jump back here for
    // as long as the script is expected to send more.
    out= my_fgets (out_buf, out_len, proc.pipe());

    if (out)
    {
      const char magic_flush[]= "flush tables";
      const char magic_cont[]= "continue";
      const char magic_done[]= "done";

      if (!strcasecmp (out, magic_flush))
      {
        err= sst_flush_tables (thd.ptr);
        if (!err)
        {
          sst_disallow_writes (thd.ptr, true);
          locked= true;
          goto wait_signal;
        }
        // on failure err stays non-zero and no further lines are read
      }
      else if (!strcasecmp (out, magic_cont))
      {
        if (locked)
        {
          sst_disallow_writes (thd.ptr, false);
          thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
          locked= false;
        }
        err=  0;
        goto wait_signal;
      }
      else if (!strncasecmp (out, magic_done, strlen(magic_done)))
      {
        // Skip "done" plus one separator character and parse the rest.
        // NOTE(review): if the line is exactly "done" with nothing after it,
        // this pointer is one past the string terminator - confirm that
        // my_fgets()/sst_scan_uuid_seqno() make this harmless.
        err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
                                  &ret_uuid, &ret_seqno);
      }
      else
      {
        // err is left as it was; only a warning is logged.
        // NOTE(review): if err is 0 here the SST is reported below without
        // error although no "done" line was seen - confirm this is intended.
        WSREP_WARN("Received unknown signal: '%s'", out);
      }
    }
    else
    {
      // Nothing could be read: wait for the process so that the check just
      // below can pick up its error status.
      WSREP_ERROR("Failed to read from: %s", proc.cmd());
      proc.wait();
    }
    // An error recorded by the process wrapper takes over only when no
    // error has been seen so far.
    if (!err && proc.error()) err= proc.error();
  }
  else
  {
    WSREP_ERROR("Failed to execute: %s : %d (%s)",
                proc.cmd(), err, strerror(err));
  }

  if (locked) // don't forget to unlock server before return
  {
    sst_disallow_writes (thd.ptr, false);
    thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
  }

  // signal to donor that SST is over
  // On error the seqno is reported as undefined; err is passed negated.
  struct wsrep_gtid const state_id = {
      ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
  };
  wsrep->sst_sent (wsrep, &state_id, -err);
  proc.wait();

  return NULL;
}
1025 
1026 
1027 
sst_donate_other(const char * method,const char * addr,const char * uuid,wsrep_seqno_t seqno,bool bypass,char ** env)1028 static int sst_donate_other (const char*   method,
1029                              const char*   addr,
1030                              const char*   uuid,
1031                              wsrep_seqno_t seqno,
1032                              bool          bypass,
1033                              char**        env) // carries auth info
1034 {
1035   int const cmd_len= 4096;
1036   wsp::string  cmd_str(cmd_len);
1037 
1038   if (!cmd_str())
1039   {
1040     WSREP_ERROR("sst_donate_other(): "
1041                 "could not allocate cmd buffer of %d bytes", cmd_len);
1042     return -ENOMEM;
1043   }
1044 
1045   const char* binlog_opt= "";
1046   char* binlog_opt_val= NULL;
1047 
1048   int ret;
1049   if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1050   {
1051     WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1052     return ret;
1053   }
1054   if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
1055 
1056   ret= snprintf (cmd_str(), cmd_len,
1057                  "wsrep_sst_%s "
1058                  WSREP_SST_OPT_ROLE" 'donor' "
1059                  WSREP_SST_OPT_ADDR" '%s' "
1060                  WSREP_SST_OPT_LPORT " '%u' "
1061                  WSREP_SST_OPT_SOCKET" '%s' "
1062                  WSREP_SST_OPT_DATA" '%s' "
1063                  WSREP_SST_OPT_CONF" '%s' "
1064                  WSREP_SST_OPT_CONF_SUFFIX" '%s' "
1065                  " %s '%s' "
1066                  WSREP_SST_OPT_GTID" '%s:%lld'"
1067                  "%s",
1068                  method, addr, mysqld_port, mysqld_unix_port,
1069                  mysql_real_data_home,
1070                  wsrep_defaults_file, wsrep_defaults_group_suffix,
1071                  binlog_opt, binlog_opt_val,
1072                  uuid, (long long) seqno,
1073                  bypass ? " " WSREP_SST_OPT_BYPASS : "");
1074   my_free(binlog_opt_val);
1075 
1076   if (ret < 0 || ret >= cmd_len)
1077   {
1078     WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1079     return (ret < 0 ? ret : -EMSGSIZE);
1080   }
1081 
1082   if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1083 
1084   pthread_t tmp;
1085   sst_thread_arg arg(cmd_str(), env);
1086   mysql_mutex_lock (&arg.lock);
1087   ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg);
1088   if (ret)
1089   {
1090     WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)",
1091                 ret, strerror(ret));
1092     return ret;
1093   }
1094   mysql_cond_wait (&arg.cond, &arg.lock);
1095 
1096   WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1097   return arg.err;
1098 }
1099 
/*
 * Return true if the character can be a part of a filename: an ASCII letter
 * or digit, '-', '_' or '.'.
 *
 * Values outside the ASCII range are rejected before isalnum() is consulted.
 * check_request_str() passes plain char values, which may be negative, and
 * isalnum() is only defined for EOF and for values representable as
 * unsigned char.
 */
static bool filename_char(int const c)
{
  if (c < 0 || c > 127) return false;

  return isalnum(c) || (c == '-') || (c == '_') || (c == '.');
}
1105 
1106 /* return true if character can be a part of an address string */
address_char(int const c)1107 static bool address_char(int const c)
1108 {
1109   return filename_char(c) ||
1110          (c == ':') || (c == '[') || (c == ']') || (c == '/');
1111 }
1112 
check_request_str(const char * const str,bool (* check)(int c))1113 static bool check_request_str(const char* const str,
1114                               bool (*check) (int c))
1115 {
1116   for (size_t i(0); str[i] != '\0'; ++i)
1117   {
1118     if (!check(str[i]))
1119     {
1120       WSREP_WARN("Illegal character in state transfer request: %i (%c).",
1121                  str[i], str[i]);
1122       return true;
1123     }
1124   }
1125 
1126   return false;
1127 }
1128 
wsrep_sst_donate_cb(void * app_ctx,void * recv_ctx,const void * msg,size_t msg_len,const wsrep_gtid_t * current_gtid,const char * state,size_t state_len,bool bypass)1129 wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
1130                                        const void* msg, size_t msg_len,
1131                                        const wsrep_gtid_t* current_gtid,
1132                                        const char* state, size_t state_len,
1133                                        bool bypass)
1134 {
1135   const char* method = (char*)msg;
1136   size_t method_len  = strlen (method);
1137 
1138   if (check_request_str(method, filename_char))
1139   {
1140     WSREP_ERROR("Bad SST method name. SST canceled.");
1141     return WSREP_CB_FAILURE;
1142   }
1143 
1144   const char* data   = method + method_len + 1;
1145 
1146   /* check for auth@addr separator */
1147   const char* addr= strrchr(data, '@');
1148   wsp::string remote_auth;
1149   if (addr)
1150   {
1151     remote_auth.set(strndup(data, addr - data));
1152     addr++;
1153   }
1154   else
1155   {
1156     // no auth part
1157     addr= data;
1158   }
1159 
1160   if (check_request_str(addr, address_char))
1161   {
1162     WSREP_ERROR("Bad SST address string. SST canceled.");
1163     return WSREP_CB_FAILURE;
1164   }
1165 
1166   char uuid_str[37];
1167   wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str));
1168 
1169   /* This will be reset when sync callback is called.
1170    * Should we set wsrep_ready to FALSE here too? */
1171   local_status.set(WSREP_MEMBER_DONOR);
1172 
1173   wsp::env env(NULL);
1174   if (env.error())
1175   {
1176     WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1177     return WSREP_CB_FAILURE;
1178   }
1179 
1180   int ret;
1181   if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth_real)))
1182   {
1183     WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1184     return WSREP_CB_FAILURE;
1185   }
1186 
1187   if (remote_auth())
1188   {
1189     if ((ret= sst_append_env_var(env, WSREP_SST_REMOTE_AUTH_ENV,remote_auth())))
1190     {
1191       WSREP_ERROR("wsrep_sst_donate_cb(): appending remote auth env failed: "
1192                   "%d", ret);
1193       return WSREP_CB_FAILURE;
1194     }
1195   }
1196 
1197   if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1198   {
1199     ret = sst_donate_mysqldump(addr, &current_gtid->uuid, uuid_str,
1200                                current_gtid->seqno, bypass, env());
1201   }
1202   else
1203   {
1204     ret = sst_donate_other(method, addr, uuid_str,
1205                            current_gtid->seqno, bypass, env());
1206   }
1207 
1208   return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
1209 }
1210 
wsrep_SE_init_wait(THD * thd)1211 enum Wsrep_SE_init_result wsrep_SE_init_wait(THD* thd)
1212 {
1213   mysql_mutex_lock (&LOCK_wsrep_sst_init);
1214   while (SE_initialized == WSREP_SE_INIT_RESULT_NONE &&
1215          thd->killed == THD::NOT_KILLED)
1216   {
1217     mysql_mutex_lock(&thd->LOCK_thd_data);
1218     thd->current_cond= &COND_wsrep_sst_init;
1219     thd->current_mutex= &LOCK_wsrep_sst_init;
1220     mysql_mutex_unlock(&thd->LOCK_thd_data);
1221 
1222     mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init);
1223 
1224     if (thd->killed != THD::NOT_KILLED)
1225     {
1226       WSREP_DEBUG("SE init waiting canceled");
1227       break;
1228     }
1229     mysql_mutex_lock(&thd->LOCK_thd_data);
1230     thd->current_cond= NULL;
1231     thd->current_mutex= NULL;
1232     mysql_mutex_unlock(&thd->LOCK_thd_data);
1233   }
1234   enum Wsrep_SE_init_result ret= SE_initialized;
1235   mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1236 
1237   mysql_mutex_lock(&thd->LOCK_thd_data);
1238   thd->current_cond= NULL;
1239   thd->current_mutex= NULL;
1240   mysql_mutex_unlock(&thd->LOCK_thd_data);
1241   return ret;
1242 }
1243 
wsrep_SE_initialized(enum Wsrep_SE_init_result result)1244 void wsrep_SE_initialized(enum Wsrep_SE_init_result result)
1245 {
1246   mysql_mutex_lock (&LOCK_wsrep_sst_init);
1247   if (SE_initialized == WSREP_SE_INIT_RESULT_NONE)
1248   {
1249     SE_initialized= result;
1250   }
1251   mysql_cond_signal (&COND_wsrep_sst_init);
1252   mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1253 }
1254