1 /* Copyright 2008-2020 Codership Oy <http://www.codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #include "mariadb.h"
17 #include "wsrep_sst.h"
18 #include <inttypes.h>
19 #include <ctype.h>
20 #include <mysqld.h>
21 #include <m_ctype.h>
22 #include <strfunc.h>
23 #include <sql_class.h>
24 #include <set_var.h>
25 #include <sql_acl.h>
26 #include <sql_reload.h>
27 #include <sql_parse.h>
28 #include "wsrep_priv.h"
29 #include "wsrep_utils.h"
30 #include "wsrep_xid.h"
31 #include "wsrep_thd.h"
32 #include "wsrep_mysqld.h"
33 
34 #include <cstdio>
35 #include <cstdlib>
36 
37 #include <my_service_manager.h>
38 
/*
  Option string forwarding the server's defaults-file related options
  (WSREP_SST_OPT_CONF, WSREP_SST_OPT_CONF_EXTRA, WSREP_SST_OPT_CONF_SUFFIX,
  each followed by its single-quoted value).  Filled lazily by
  make_wsrep_defaults_file(); an empty string means it has not been built
  (or there was nothing to forward).
*/
static char wsrep_defaults_file[FN_REFLEN * 2 + 10 + 30 +
                                sizeof(WSREP_SST_OPT_CONF) +
                                sizeof(WSREP_SST_OPT_CONF_SUFFIX) +
                                sizeof(WSREP_SST_OPT_CONF_EXTRA)]= {0};

/*
  Backing storage for the wsrep_sst_* system variables; see the
  *_check / *_update hooks in this file.
*/
const char* wsrep_sst_method         = WSREP_SST_DEFAULT;
const char* wsrep_sst_receive_address= WSREP_SST_ADDRESS_AUTO;
const char* wsrep_sst_donor          = "";
/*
  Once set through sst_auth_real_set() this holds only WSREP_SST_AUTH_MASK
  (or NULL for an empty value), never the real credentials.
*/
const char* wsrep_sst_auth           = NULL;

// container for real auth string (the unmasked value of wsrep_sst_auth)
static const char* sst_auth_real     = NULL;
my_bool wsrep_sst_donor_rejects_queries= FALSE;

/*
  The SST monitor threads wake up every WSREP_TIMEDWAIT_SECONDS; each time
  SST is still running they extend the service manager startup timeout by
  WSREP_EXTEND_TIMEOUT_INTERVAL.
*/
#define WSREP_EXTEND_TIMEOUT_INTERVAL 60
#define WSREP_TIMEDWAIT_SECONDS 30

/*
  Completion flags polled by the joiner/donor monitor threads.  They are
  set (and the monitor woken) under LOCK_wsrep_joiner_monitor and
  LOCK_wsrep_donor_monitor respectively.
*/
bool sst_joiner_completed            = false;
bool sst_donor_completed             = false;
58 
/*
  Hand-off area between the thread that starts the joiner SST script and
  sst_joiner_thread().  The launching side fills in cmd/env; the joiner
  thread reports back through ret_str/err and signals cond while holding
  lock.
*/
struct sst_thread_arg
{
  // SST script command line to execute
  const char*     cmd;
  // environment passed to the script
  char**          env;
  // strdup()ed text following "ready " in the script output; NULL until set
  char*           ret_str;
  // negated error code reported by the joiner thread; -1 until it reports
  int             err;
  // protects ret_str/err and pairs with cond
  mysql_mutex_t   lock;
  // signalled by the joiner thread once ret_str/err are set
  mysql_cond_t    cond;

  sst_thread_arg (const char* c, char** e)
    : cmd(c), env(e), ret_str(0), err(-1)
  {
    mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
  }

  /*
    NOTE(review): the destructor unlocks `lock` before destroying it, so the
    owning thread is evidently expected to still hold `lock` when the object
    goes out of scope (e.g. right after waiting on `cond`).  Destroying an
    instance whose lock is not held would unlock a mutex the thread does
    not own - confirm against callers before changing this.
  */
  ~sst_thread_arg()
  {
    mysql_cond_destroy  (&cond);
    mysql_mutex_unlock  (&lock);
    mysql_mutex_destroy (&lock);
  }
};
82 
wsrep_donor_monitor_end(void)83 static void wsrep_donor_monitor_end(void)
84 {
85   mysql_mutex_lock(&LOCK_wsrep_donor_monitor);
86   sst_donor_completed= true;
87   mysql_cond_signal(&COND_wsrep_donor_monitor);
88   mysql_mutex_unlock(&LOCK_wsrep_donor_monitor);
89 }
90 
wsrep_joiner_monitor_end(void)91 static void wsrep_joiner_monitor_end(void)
92 {
93   mysql_mutex_lock(&LOCK_wsrep_joiner_monitor);
94   sst_joiner_completed= true;
95   mysql_cond_signal(&COND_wsrep_joiner_monitor);
96   mysql_mutex_unlock(&LOCK_wsrep_joiner_monitor);
97 }
98 
wsrep_sst_donor_monitor_thread(void * arg)99 static void* wsrep_sst_donor_monitor_thread(void *arg __attribute__((unused)))
100 {
101   int ret= 0;
102   unsigned long time_waited= 0;
103 
104   mysql_mutex_lock(&LOCK_wsrep_donor_monitor);
105 
106   WSREP_INFO("Donor monitor thread started to monitor");
107 
108   wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
109                        // operate with wsrep_ready == OFF
110 
111   while (!sst_donor_completed)
112   {
113     timespec ts;
114     set_timespec(ts, WSREP_TIMEDWAIT_SECONDS);
115     time_t start_time= time(NULL);
116     ret= mysql_cond_timedwait(&COND_wsrep_donor_monitor, &LOCK_wsrep_donor_monitor, &ts);
117     time_t end_time= time(NULL);
118     time_waited+= difftime(end_time, start_time);
119 
120     if (ret == ETIMEDOUT && !sst_donor_completed)
121     {
122       WSREP_DEBUG("Donor waited %lu sec, extending systemd startup timeout as SST"
123                  "is not completed",
124                  time_waited);
125       service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
126         "WSREP state transfer ongoing...");
127     }
128   }
129 
130   WSREP_INFO("Donor monitor thread ended with total time %lu sec", time_waited);
131   mysql_mutex_unlock(&LOCK_wsrep_donor_monitor);
132 
133   return NULL;
134 }
135 
wsrep_sst_joiner_monitor_thread(void * arg)136 static void* wsrep_sst_joiner_monitor_thread(void *arg __attribute__((unused)))
137 {
138   int ret= 0;
139   unsigned long time_waited= 0;
140 
141   mysql_mutex_lock(&LOCK_wsrep_joiner_monitor);
142 
143   WSREP_INFO("Joiner monitor thread started to monitor");
144 
145   wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
146                        // operate with wsrep_ready == OFF
147 
148   while (!sst_joiner_completed)
149   {
150     timespec ts;
151     set_timespec(ts, WSREP_TIMEDWAIT_SECONDS);
152     time_t start_time= time(NULL);
153     ret= mysql_cond_timedwait(&COND_wsrep_joiner_monitor, &LOCK_wsrep_joiner_monitor, &ts);
154     time_t end_time= time(NULL);
155     time_waited+= difftime(end_time, start_time);
156 
157     if (ret == ETIMEDOUT && !sst_joiner_completed)
158     {
159       WSREP_DEBUG("Joiner waited %lu sec, extending systemd startup timeout as SST"
160                  "is not completed",
161                  time_waited);
162       service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
163         "WSREP state transfer ongoing...");
164     }
165   }
166 
167   WSREP_INFO("Joiner monitor thread ended with total time %lu sec", time_waited);
168   mysql_mutex_unlock(&LOCK_wsrep_joiner_monitor);
169 
170   return NULL;
171 }
172 
wsrep_sst_method_check(sys_var * self,THD * thd,set_var * var)173 bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
174 {
175   if ((! var->save_result.string_value.str) ||
176       (var->save_result.string_value.length == 0 ))
177   {
178     my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
179              var->save_result.string_value.str ?
180              var->save_result.string_value.str : "NULL");
181     return 1;
182   }
183 
184   return 0;
185 }
186 
/*
  Data home directory recorded by wsrep_set_data_home_dir(), or NULL when
  none was given.  NOTE(review): presumably exported to the SST script via
  DATA_HOME_DIR_ENV - the consumer is not visible here.
*/
static const char* data_home_dir;

/* Remember data_dir; a NULL or empty string is stored as NULL ("unset"). */
void wsrep_set_data_home_dir(const char *data_dir)
{
  if (data_dir == NULL || data_dir[0] == '\0')
    data_home_dir= NULL;
  else
    data_home_dir= data_dir;
}
193 
make_wsrep_defaults_file()194 static void make_wsrep_defaults_file()
195 {
196   if (!wsrep_defaults_file[0])
197   {
198     char *ptr= wsrep_defaults_file;
199     char *end= ptr + sizeof(wsrep_defaults_file);
200     if (my_defaults_file)
201       ptr= strxnmov(ptr, end - ptr,
202                     WSREP_SST_OPT_CONF, " '", my_defaults_file, "' ", NULL);
203 
204     if (my_defaults_extra_file)
205       ptr= strxnmov(ptr, end - ptr,
206                     WSREP_SST_OPT_CONF_EXTRA, " '", my_defaults_extra_file, "' ", NULL);
207 
208     if (my_defaults_group_suffix)
209       ptr= strxnmov(ptr, end - ptr,
210                     WSREP_SST_OPT_CONF_SUFFIX, " '", my_defaults_group_suffix, "' ", NULL);
211   }
212 }
213 
214 
wsrep_sst_receive_address_check(sys_var * self,THD * thd,set_var * var)215 bool  wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
216 {
217   if ((! var->save_result.string_value.str) ||
218       (var->save_result.string_value.length > (FN_REFLEN - 1))) // safety
219   {
220     goto err;
221   }
222 
223   return 0;
224 
225 err:
226   my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
227            var->save_result.string_value.str ?
228            var->save_result.string_value.str : "NULL");
229   return 1;
230 }
231 
wsrep_sst_receive_address_update(sys_var * self,THD * thd,enum_var_type type)232 bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
233                                        enum_var_type type)
234 {
235     return 0;
236 }
237 
wsrep_sst_auth_check(sys_var * self,THD * thd,set_var * var)238 bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
239 {
240     return 0;
241 }
242 
sst_auth_real_set(const char * value)243 static bool sst_auth_real_set (const char* value)
244 {
245   const char* v= NULL;
246 
247   if (value)
248   {
249     v= my_strdup(PSI_INSTRUMENT_ME, value, MYF(0));
250   }
251   else                                          // its NULL
252   {
253     wsrep_sst_auth_free();
254     return 0;
255   }
256 
257   if (v)
258   {
259     // set sst_auth_real
260     if (sst_auth_real) { my_free((void *) sst_auth_real); }
261     sst_auth_real= v;
262 
263     // mask wsrep_sst_auth
264     if (strlen(sst_auth_real))
265     {
266       if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
267       wsrep_sst_auth= my_strdup(PSI_INSTRUMENT_ME, WSREP_SST_AUTH_MASK, MYF(0));
268     }
269     else
270     {
271       if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
272       wsrep_sst_auth= NULL;
273     }
274 
275     return 0;
276   }
277   return 1;
278 }
279 
wsrep_sst_auth_free()280 void wsrep_sst_auth_free()
281 {
282   if (wsrep_sst_auth) { my_free((void *) wsrep_sst_auth); }
283   if (sst_auth_real) { my_free((void *) sst_auth_real); }
284   wsrep_sst_auth= NULL;
285   sst_auth_real= NULL;
286 }
287 
wsrep_sst_auth_update(sys_var * self,THD * thd,enum_var_type type)288 bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
289 {
290   return sst_auth_real_set (wsrep_sst_auth);
291 }
292 
wsrep_sst_auth_init()293 void wsrep_sst_auth_init ()
294 {
295   sst_auth_real_set(wsrep_sst_auth);
296 }
297 
wsrep_sst_donor_check(sys_var * self,THD * thd,set_var * var)298 bool  wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
299 {
300   return 0;
301 }
302 
wsrep_sst_donor_update(sys_var * self,THD * thd,enum_var_type type)303 bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
304 {
305   return 0;
306 }
307 
308 
wsrep_before_SE()309 bool wsrep_before_SE()
310 {
311   return (wsrep_provider != NULL
312           && strcmp (wsrep_provider,   WSREP_NONE)
313           && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
314           && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
315 }
316 
317 // Signal end of SST
wsrep_sst_complete(THD * thd,int const rcode,wsrep::gtid const sst_gtid)318 static bool wsrep_sst_complete (THD*                thd,
319                                 int const           rcode,
320                                 wsrep::gtid const   sst_gtid)
321 {
322   Wsrep_client_service client_service(thd, thd->wsrep_cs());
323   Wsrep_server_state& server_state= Wsrep_server_state::instance();
324   enum wsrep::server_state::state state= server_state.state();
325   bool failed= false;
326   char start_pos_buf[FN_REFLEN];
327   ssize_t len= wsrep::print_to_c_str(sst_gtid, start_pos_buf, FN_REFLEN-1);
328   start_pos_buf[len]='\0';
329 
330   // Do not call sst_received if we are not in joiner or
331   // initialized state on server. This is because it
332   // assumes we are on those states. Give error if we are
333   // in incorrect state.
334   if ((state == Wsrep_server_state::s_joiner ||
335        state == Wsrep_server_state::s_initialized))
336   {
337     Wsrep_server_state::instance().sst_received(client_service,
338        rcode);
339     WSREP_INFO("SST succeeded for position %s", start_pos_buf);
340   }
341   else
342   {
343     WSREP_ERROR("SST failed for position %s initialized %d server_state %s",
344                 start_pos_buf,
345                 server_state.is_initialized(),
346                 wsrep::to_c_string(state));
347     failed= true;
348   }
349 
350   wsrep_joiner_monitor_end();
351   return failed;
352 }
353 
/*
  Report that a state snapshot has been received.

  When wsrep_before_SE() is false (no real provider, or an SST method such
  as mysqldump or skip), the storage-engine checkpoint is first reset and
  then set to the received uuid:seqno.  In all cases the SE checkpoint is
  then verified against uuid:seqno, the caller's thread context is
  restored, and - when WSREP_ON - the result is passed on to
  wsrep_sst_complete() with a negative seqno forwarded as the error code.

  @param thd       [IN]  thread context to restore afterwards; may be NULL
  @param uuid      [IN]  received state UUID
  @param seqno     [IN]  received state sequence number (negative = error)
  @param state     [IN]  not used by this function (documented as always NULL)
  @param state_len [IN]  not used by this function (documented as always 0)
  @return false when successful, true if wsrep_sst_complete() reported an
          error (always false when WSREP_ON is not set)
*/
bool wsrep_sst_received (THD*                thd,
                         const wsrep_uuid_t& uuid,
                         wsrep_seqno_t const seqno,
                         const void* const   state,
                         size_t const        state_len)
{
  bool error= false;
  /*
    NOTE(review): an older comment here said the local uuid:seqno is
    checkpointed only after an OK from the wsrep provider.  In the code
    below the SE checkpoint is written before wsrep_sst_complete() hands
    the result to the provider - confirm which ordering is intended.
  */
    /*
      TODO: Handle backwards compatibility. WSREP API v25 does not have
      wsrep schema.
    */
    /*
      Logical SST methods (mysqldump etc) don't update InnoDB sys header.
      Reset the SE checkpoint before setting it to the received position
      in order to avoid sanity check failure.
     */
    wsrep::gtid const sst_gtid(wsrep::id(uuid.data, sizeof(uuid.data)),
                               wsrep::seqno(seqno));

    if (!wsrep_before_SE()) {
      wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined());
      wsrep_set_SE_checkpoint(sst_gtid, wsrep_gtid_server.gtid());
    }
    wsrep_verify_SE_checkpoint(uuid, seqno);

    /*
      Both wsrep_init_SR() and wsrep_recover_view() may use
      wsrep thread pool. Restore original thd context before returning
      (or clear it when no thd was passed in).
    */
    if (thd) {
      wsrep_store_threadvars(thd);
    }
    else {
      set_current_thd(nullptr);
    }

    /* During sst WSREP(thd) is not yet set for joiner. */
    if (WSREP_ON)
    {
      /* A negative seqno signals failure and is passed on as rcode. */
      int const rcode(seqno < 0 ? seqno : 0);
      error= wsrep_sst_complete(thd,rcode, sst_gtid);
    }

    return error;
}
416 
sst_scan_uuid_seqno(const char * str,wsrep_uuid_t * uuid,wsrep_seqno_t * seqno)417 static int sst_scan_uuid_seqno (const char* str,
418                                 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
419 {
420   int offt= wsrep_uuid_scan (str, strlen(str), uuid);
421   errno= 0;                                     /* Reset the errno */
422   if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
423   {
424     *seqno= strtoll (str + offt + 1, NULL, 10);
425     if (*seqno != LLONG_MAX || errno != ERANGE)
426     {
427       return 0;
428     }
429   }
430 
431   WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
432   return -EINVAL;
433 }
434 
/*
  fgets() wrapper that strips the trailing newline, if any, from the line
  it read.  Returns buf on success and NULL on EOF/error, like fgets().
*/
static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
{
  char* const line= fgets (buf, buf_len, stream);
  if (line == NULL)
    return NULL;

  /* fgets() stores at most one '\n', and only as the last character. */
  line[strcspn(line, "\n")]= '\0';
  return line;
}
448 
449 /*
450   Generate "name 'value'" string.
451 */
generate_name_value(const char * name,const char * value)452 static char* generate_name_value(const char* name, const char* value)
453 {
454   size_t name_len= strlen(name);
455   size_t value_len= strlen(value);
456   char* buf=
457     (char*) my_malloc(PSI_INSTRUMENT_ME, (name_len + value_len + 5), MYF(0));
458   if (buf)
459   {
460     char* ref= buf;
461     *ref++ = ' ';
462     memcpy(ref, name, name_len * sizeof(char));
463     ref += name_len;
464     *ref++ = ' ';
465     *ref++ = '\'';
466     memcpy(ref, value, value_len * sizeof(char));
467     ref += value_len;
468     *ref++ = '\'';
469     *ref = 0;
470   }
471   return buf;
472 }
473 /*
474   Generate binlog option string for sst_donate_other(), sst_prepare_other().
475 
476   Returns zero on success, negative error code otherwise.
477 
478   String containing binlog name is stored in param ret if binlog is enabled
479   and GTID mode is on, otherwise empty string. Returned string should be
480   freed with my_free().
481  */
generate_binlog_opt_val(char ** ret)482 static int generate_binlog_opt_val(char** ret)
483 {
484   DBUG_ASSERT(ret);
485   *ret= NULL;
486   if (opt_bin_log)
487   {
488     assert(opt_bin_logname);
489     *ret= strcmp(opt_bin_logname, "0") ?
490       generate_name_value(WSREP_SST_OPT_BINLOG,
491                           opt_bin_logname) :
492       my_strdup(PSI_INSTRUMENT_ME, "", MYF(0));
493   }
494   else
495   {
496     *ret= my_strdup(PSI_INSTRUMENT_ME, "", MYF(0));
497   }
498   if (!*ret) return -ENOMEM;
499   return 0;
500 }
501 
generate_binlog_index_opt_val(char ** ret)502 static int generate_binlog_index_opt_val(char** ret)
503 {
504   DBUG_ASSERT(ret);
505   *ret= NULL;
506   if (opt_binlog_index_name)
507   {
508     *ret= strcmp(opt_binlog_index_name, "0") ?
509       generate_name_value(WSREP_SST_OPT_BINLOG_INDEX,
510                           opt_binlog_index_name) :
511       my_strdup(PSI_INSTRUMENT_ME, "", MYF(0));
512   }
513   else
514   {
515     *ret= my_strdup(PSI_INSTRUMENT_ME, "", MYF(0));
516   }
517   if (!*ret) return -ENOMEM;
518   return 0;
519 }
520 
sst_joiner_thread(void * a)521 static void* sst_joiner_thread (void* a)
522 {
523   sst_thread_arg* arg= (sst_thread_arg*) a;
524   int err= 1;
525 
526   {
527     THD* thd;
528     const char magic[]= "ready";
529     const size_t magic_len= sizeof(magic) - 1;
530     const size_t out_len= 512;
531     char out[out_len];
532 
533     WSREP_INFO("Running: '%s'", arg->cmd);
534 
535     wsp::process proc (arg->cmd, "r", arg->env);
536 
537     if (proc.pipe() && !proc.error())
538     {
539       const char* tmp= my_fgets (out, out_len, proc.pipe());
540 
541       if (!tmp || strlen(tmp) < (magic_len + 2) ||
542           strncasecmp (tmp, magic, magic_len))
543       {
544         WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
545                     magic, arg->cmd, tmp);
546         proc.wait();
547         if (proc.error()) err= proc.error();
548       }
549       else
550       {
551         err= 0;
552       }
553     }
554     else
555     {
556       err= proc.error();
557       WSREP_ERROR("Failed to execute: %s : %d (%s)",
558                   arg->cmd, err, strerror(err));
559     }
560 
561     /*
562       signal sst_prepare thread with ret code,
563       it will go on sending SST request
564     */
565     mysql_mutex_lock   (&arg->lock);
566     if (!err)
567     {
568       arg->ret_str= strdup (out + magic_len + 1);
569       if (!arg->ret_str) err= ENOMEM;
570     }
571     arg->err= -err;
572     mysql_cond_signal  (&arg->cond);
573     mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
574 
575     if (err) return NULL; /* lp:808417 - return immediately, don't signal
576                            * initializer thread to ensure single thread of
577                            * shutdown. */
578 
579     wsrep_uuid_t  ret_uuid = WSREP_UUID_UNDEFINED;
580     wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;
581 
582     // in case of successfull receiver start, wait for SST
583     // completion/end
584     char* tmp= my_fgets (out, out_len, proc.pipe());
585 
586     proc.wait();
587 
588     err= EINVAL;
589 
590     if (!tmp)
591     {
592       WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from "
593                   "joiner script.");
594       if (proc.error()) err= proc.error();
595     }
596     else
597     {
598       // Read state ID (UUID:SEQNO) followed by wsrep_gtid_domain_id (if any).
599       const char *pos= strchr(out, ' ');
600 
601       if (!pos) {
602 
603         if (wsrep_gtid_mode)
604         {
605           // There is no wsrep_gtid_domain_id (some older version SST script?).
606           WSREP_WARN("Did not find domain ID from SST script output '%s'. "
607                      "Domain ID must be set manually to keep binlog consistent",
608                      out);
609         }
610         err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
611 
612       } else {
613         // Scan state ID first followed by wsrep_gtid_domain_id.
614         unsigned long int domain_id;
615 
616         // Null-terminate the state-id.
617         out[pos - out]= 0;
618         err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
619 
620         if (err)
621         {
622           goto err;
623         }
624         else if (wsrep_gtid_mode)
625         {
626           errno= 0;                             /* Reset the errno */
627           domain_id= strtoul(pos + 1, NULL, 10);
628           err= errno;
629 
630           /* Check if we received a valid gtid_domain_id. */
631           if (err == EINVAL || err == ERANGE)
632           {
633             WSREP_ERROR("Failed to get donor wsrep_gtid_domain_id.");
634             err= EINVAL;
635             goto err;
636           } else {
637             wsrep_gtid_server.domain_id= (uint32) domain_id;
638             wsrep_gtid_domain_id= (uint32)domain_id;
639           }
640         }
641       }
642     }
643 
644 err:
645 
646     wsrep::gtid ret_gtid;
647 
648     if (err)
649     {
650       ret_gtid= wsrep::gtid::undefined();
651     }
652     else
653     {
654       ret_gtid= wsrep::gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)),
655                             wsrep::seqno(ret_seqno));
656     }
657 
658     /*
659       Tell initializer thread that SST is complete
660       For that initialize a THD
661     */
662     if (my_thread_init())
663     {
664       WSREP_ERROR("my_thread_init() failed, can't signal end of SST. "
665                   "Aborting.");
666       unireg_abort(1);
667     }
668 
669     thd= new THD(next_thread_id());
670 
671     if (!thd)
672     {
673       WSREP_ERROR("Failed to allocate THD to restore view from local state, "
674                   "can't signal end of SST. Aborting.");
675       unireg_abort(1);
676     }
677 
678     thd->thread_stack= (char*) &thd;
679     thd->security_ctx->skip_grants();
680     thd->system_thread= SYSTEM_THREAD_GENERIC;
681     thd->real_id= pthread_self();
682 
683     wsrep_assign_from_threadvars(thd);
684     wsrep_store_threadvars(thd);
685 
686     /* */
687     thd->variables.wsrep_on    = 0;
688     /* No binlogging */
689     thd->variables.sql_log_bin = 0;
690     thd->variables.option_bits &= ~OPTION_BIN_LOG;
691     /* No general log */
692     thd->variables.option_bits |= OPTION_LOG_OFF;
693     /* Read committed isolation to avoid gap locking */
694     thd->variables.tx_isolation= ISO_READ_COMMITTED;
695 
696     wsrep_sst_complete (thd, -err, ret_gtid);
697 
698     delete thd;
699     my_thread_end();
700   }
701 
702   return NULL;
703 }
704 
705 #define WSREP_SST_AUTH_ENV        "WSREP_SST_OPT_AUTH"
706 #define WSREP_SST_REMOTE_AUTH_ENV "WSREP_SST_OPT_REMOTE_AUTH"
707 #define DATA_HOME_DIR_ENV         "INNODB_DATA_HOME_DIR"
708 
sst_append_env_var(wsp::env & env,const char * const var,const char * const val)709 static int sst_append_env_var(wsp::env&   env,
710                               const char* const var,
711                               const char* const val)
712 {
713   int const env_str_size= strlen(var) + 1 /* = */
714                           + (val ? strlen(val) : 0) + 1 /* \0 */;
715 
716   wsp::string env_str(env_str_size); // for automatic cleanup on return
717   if (!env_str()) return -ENOMEM;
718 
719   int ret= snprintf(env_str(), env_str_size, "%s=%s", var, val ? val : "");
720 
721   if (ret < 0 || ret >= env_str_size)
722   {
723     WSREP_ERROR("sst_append_env_var(): snprintf(%s=%s) failed: %d",
724                 var, val, ret);
725     return (ret < 0 ? ret : -EMSGSIZE);
726   }
727 
728   env.append(env_str());
729   return -env.error();
730 }
731 
#ifdef __WIN__
/*
  Space, single quote, ampersand, backquote, I/O redirection
  characters, caret, all brackets, plus, exclamation and comma
  characters require text to be enclosed in double quotes.
  isspace() is given an unsigned char value: passing a negative char
  (possible for bytes >= 0x80) is undefined behaviour.
*/
#define IS_SPECIAL(c) \
  (isspace((unsigned char) (c)) || (c) == '\'' || (c) == '&' || \
   (c) == '`' || (c) == '|' || (c) == '>' || (c) == '<' || \
   (c) == ';' || (c) == '^' || (c) == '[' || (c) == ']' || \
   (c) == '{' || (c) == '}' || (c) == '(' || (c) == ')' || \
   (c) == '+' || (c) == '!' || (c) == ',')
/*
  Inside values, equals character are interpreted as special
  character and requires quotation:
*/
#define IS_SPECIAL_V(c) (IS_SPECIAL(c) || (c) == '=')
/*
  Double quotation mark and percent characters require escaping
  (they are doubled).  The double quote must be the single-character
  constant '"': a multi-character constant such as '""' has an
  implementation-defined int value that never equals a char.
*/
#define IS_REQ_ESCAPING(c) ((c) == '"' || (c) == '%')
#else
/*
  Space, single quote, ampersand, backquote, and I/O redirection
  characters require text to be enclosed in double quotes. The
  semicolon is used to separate shell commands, so it must be
  enclosed in double quotes as well.  isspace() is given an unsigned
  char value to avoid undefined behaviour for bytes >= 0x80.
*/
#define IS_SPECIAL(c) \
  (isspace((unsigned char) (c)) || (c) == '\'' || (c) == '&' || \
   (c) == '`' || (c) == '|' || (c) == '>' || (c) == '<' || (c) == ';')
/*
  Inside values, characters are interpreted as in parameter names:
*/
#define IS_SPECIAL_V(c) IS_SPECIAL(c)
/*
  Double quotation mark and backslash characters require
  backslash prefixing, the dollar symbol is used to substitute
  a variable value, therefore it also requires escaping:
*/
#define IS_REQ_ESCAPING(c) ((c) == '"' || (c) == '\\' || (c) == '$')
#endif
774 
/*
  Estimate an upper bound for the length of the SST script command line.

  The result is a fixed 4096-character allowance for the script's own
  control parameters.  When mysqld was started with arguments, it also
  includes the space copy_orig_argv() needs to append WSREP_SST_OPT_MYSQLD
  and every non-empty original argument.  That covers the quotes around a
  left part and/or a right part of "name=value", one extra character per
  character needing escaping, and the separating spaces.

  The per-character rules here must stay in lockstep with
  copy_orig_argv(); any change to one must be mirrored in the other.

  @param extra_args [OUT] true if original mysqld arguments will be appended
  @return the estimated length in characters
*/
static size_t estimate_cmd_len (bool* extra_args)
{
  /*
    The length of the area reserved for the control parameters
    of the SST script (excluding the copying of the original
    mysqld arguments):
  */
  size_t cmd_len= 4096;
  bool extra= false;
  /*
    If mysqld was started with arguments, add them all:
  */
  if (orig_argc > 1)
  {
    for (int i = 1; i < orig_argc; i++)
    {
      const char* arg= orig_argv[i];
      size_t n= strlen(arg);
      /* Empty arguments are skipped by copy_orig_argv() as well. */
      if (n == 0) continue;
      cmd_len += n;
      bool quotation= false;
      char c;
      while ((c = *arg++) != 0)
      {
        if (IS_SPECIAL(c))
        {
          quotation= true;
        }
        else if (IS_REQ_ESCAPING(c))
        {
          /* One extra character: escape prefix (or doubling on Windows). */
          cmd_len++;
#ifdef __WIN__
          quotation= true;
#endif
        }
        /*
          If the equals symbol is encountered, then we need to separately
          process the right side:
        */
        else if (c == '=')
        {
          /* Perhaps we need to quote the left part of the argument: */
          if (quotation)
          {
            cmd_len += 2;
            /*
              Reset the quotation flag, since now the status for
              the right side of the expression will be saved here:
            */
            quotation= false;
          }
          while ((c = *arg++) != 0)
          {
            if (IS_SPECIAL_V(c))
            {
              quotation= true;
            }
            else if (IS_REQ_ESCAPING(c))
            {
              cmd_len++;
#ifdef __WIN__
              quotation= true;
#endif
            }
          }
          break;
        }
      }
      /* Perhaps we need to quote the entire argument or its right part: */
      if (quotation)
      {
        cmd_len += 2;
      }
    }
    extra = true;
    cmd_len += strlen(WSREP_SST_OPT_MYSQLD);
    /*
      Add the separating spaces between arguments,
      and one additional space before "--mysqld-args":
    */
    cmd_len += orig_argc;
  }
  *extra_args= extra;
  return cmd_len;
}
860 
/*
  Append " <WSREP_SST_OPT_MYSQLD> <arg1> <arg2> ..." to cmd_str, built from
  the original mysqld arguments orig_argv[1..orig_argc-1].  Does nothing
  when mysqld was started without arguments.

  Each non-empty argument is quoted and escaped per IS_SPECIAL /
  IS_SPECIAL_V / IS_REQ_ESCAPING; empty arguments are skipped.  For a
  "name=value" argument the left and right parts are quoted independently.
  Escaping prefixes a backslash, except on Windows, where the character is
  doubled.  The output is NUL-terminated.

  NOTE(review): cmd_str presumably points into a buffer sized with
  estimate_cmd_len(); the two functions must apply identical rules -
  confirm at the call site.

  @param cmd_str  position in the command buffer to append at
*/
static void copy_orig_argv (char* cmd_str)
{
  /*
     If mysqld was started with arguments, copy them all:
  */
  if (orig_argc > 1)
  {
    size_t n = strlen(WSREP_SST_OPT_MYSQLD);
    *cmd_str++ = ' ';
    memcpy(cmd_str, WSREP_SST_OPT_MYSQLD, n * sizeof(char));
    cmd_str += n;
    for (int i = 1; i < orig_argc; i++)
    {
      char* arg= orig_argv[i];
      n = strlen(arg);
      if (n == 0) continue;
      *cmd_str++ = ' ';
      bool quotation= false;
      bool plain= true;
      char *arg_scan= arg;
      char c;
      while ((c = *arg_scan++) != 0)
      {
        if (IS_SPECIAL(c))
        {
          quotation= true;
        }
        else if (IS_REQ_ESCAPING(c))
        {
          plain= false;
#ifdef __WIN__
          quotation= true;
#endif
        }
        /*
          If the equals symbol is encountered, then we need to separately
          process the right side:
        */
        else if (c == '=')
        {
          /* Calculate length of the Left part of the argument: */
          size_t m = (size_t) (arg_scan - arg) - 1;
          if (m)
          {
            /* Perhaps we need to quote the left part of the argument: */
            if (quotation)
            {
              *cmd_str++ = '"';
            }
            /*
              If there were no characters requiring escaping inside, then
              we can use the fast memcpy function:
            */
            if (plain)
            {
              memcpy(cmd_str, arg, m * sizeof(char));
              cmd_str += m;
              /* Left part of the argument has already been processed: */
              n -= m;
              arg += m;
            }
            /* Otherwise we need to prefix individual characters: */
            else
            {
              n -= m;
              while (m)
              {
                c = *arg++;
                if (IS_REQ_ESCAPING(c))
                {
#ifdef __WIN__
                  *cmd_str++ = c;
#else
                  *cmd_str++ = '\\';
#endif
                }
                *cmd_str++ = c;
                m--;
              }
              /*
                Reset the plain string flag, since now the status for
                the right side of the expression will be saved here:
              */
              plain= true;
            }
            /* Perhaps we need to quote the left part of the argument: */
            if (quotation)
            {
              *cmd_str++ = '"';
              /*
                Reset the quotation flag, since now the status for
                the right side of the expression will be saved here:
              */
              quotation= false;
            }
          }
          /* Copy equals symbol: */
          *cmd_str++ = '=';
          arg++;
          n--;
          /*
            Scan the right side of the expression (the value) to decide
            whether it needs quoting and/or escaping:
          */
          while ((c = *arg_scan++) != 0)
          {
            if (IS_SPECIAL_V(c))
            {
              quotation= true;
            }
            else if (IS_REQ_ESCAPING(c))
            {
              plain= false;
#ifdef __WIN__
              quotation= true;
#endif
            }
          }
          break;
        }
      }
      if (n)
      {
        /* Perhaps we need to quote the entire argument or its right part: */
        if (quotation)
        {
          *cmd_str++ = '"';
        }
        /*
          If there were no characters requiring escaping inside, then we
          can use the fast memcpy function:
        */
        if (plain)
        {
          memcpy(cmd_str, arg, n * sizeof(char));
          cmd_str += n;
        }
        /* Otherwise we need to prefix individual characters: */
        else
        {
          while ((c = *arg++) != 0)
          {
            if (IS_REQ_ESCAPING(c))
            {
#ifdef __WIN__
              *cmd_str++ = c;
#else
              *cmd_str++ = '\\';
#endif
            }
            *cmd_str++ = c;
          }
        }
        /* Perhaps we need to quote the entire argument or its right part: */
        if (quotation)
        {
          *cmd_str++ = '"';
        }
      }
    }
    /*
      Add a terminating null character (not counted in the length,
      since we've overwritten the original null character which
      was previously added by snprintf:
    */
    *cmd_str = 0;
  }
}
1026 
sst_prepare_other(const char * method,const char * sst_auth,const char * addr_in,const char ** addr_out)1027 static ssize_t sst_prepare_other (const char*  method,
1028                                   const char*  sst_auth,
1029                                   const char*  addr_in,
1030                                   const char** addr_out)
1031 {
1032   bool extra_args;
1033   size_t const cmd_len= estimate_cmd_len(&extra_args);
1034   wsp::string cmd_str(cmd_len);
1035 
1036   if (!cmd_str())
1037   {
1038     WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %zd bytes",
1039                 cmd_len);
1040     return -ENOMEM;
1041   }
1042 
1043   char* binlog_opt_val= NULL;
1044   char* binlog_index_opt_val= NULL;
1045 
1046   int ret;
1047   if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1048   {
1049     WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
1050                 ret);
1051     return ret;
1052   }
1053 
1054   if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val)))
1055   {
1056     WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d",
1057                 ret);
1058     if (binlog_opt_val) my_free(binlog_opt_val);
1059     return ret;
1060   }
1061 
1062   make_wsrep_defaults_file();
1063 
1064   ret= snprintf (cmd_str(), cmd_len,
1065                  "wsrep_sst_%s "
1066                  WSREP_SST_OPT_ROLE " 'joiner' "
1067                  WSREP_SST_OPT_ADDR " '%s' "
1068                  WSREP_SST_OPT_DATA " '%s' "
1069                  "%s"
1070                  WSREP_SST_OPT_PARENT " '%d'"
1071                  "%s"
1072                  "%s",
1073                  method, addr_in, mysql_real_data_home,
1074                  wsrep_defaults_file,
1075                  (int)getpid(),
1076                  binlog_opt_val, binlog_index_opt_val);
1077 
1078   my_free(binlog_opt_val);
1079   my_free(binlog_index_opt_val);
1080 
1081   if (ret < 0 || size_t(ret) >= cmd_len)
1082   {
1083     WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
1084     return (ret < 0 ? ret : -EMSGSIZE);
1085   }
1086 
1087   if (extra_args)
1088     copy_orig_argv(cmd_str() + ret);
1089 
1090   wsp::env env(NULL);
1091   if (env.error())
1092   {
1093     WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
1094     return -env.error();
1095   }
1096 
1097   if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth)))
1098   {
1099     WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
1100     return ret;
1101   }
1102 
1103   if (data_home_dir)
1104   {
1105     if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir)))
1106     {
1107       WSREP_ERROR("sst_prepare_other(): appending data "
1108                   "directory failed: %d", ret);
1109       return ret;
1110     }
1111   }
1112 
1113   pthread_t tmp, monitor;
1114   sst_thread_arg arg(cmd_str(), env());
1115 
1116   mysql_mutex_lock (&arg.lock);
1117 
1118   ret = mysql_thread_create (key_wsrep_sst_joiner_monitor, &monitor, NULL, wsrep_sst_joiner_monitor_thread, NULL);
1119 
1120   if (ret)
1121   {
1122     WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
1123                 ret, strerror(ret));
1124     return -ret;
1125   }
1126 
1127   sst_joiner_completed= false;
1128 
1129   ret= mysql_thread_create (key_wsrep_sst_joiner, &tmp, NULL, sst_joiner_thread, &arg);
1130 
1131   if (ret)
1132   {
1133     WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
1134                 ret, strerror(ret));
1135 
1136     pthread_detach(monitor);
1137     return -ret;
1138   }
1139 
1140   mysql_cond_wait (&arg.cond, &arg.lock);
1141 
1142   *addr_out= arg.ret_str;
1143 
1144   if (!arg.err)
1145     ret= strlen(*addr_out);
1146   else
1147   {
1148     assert (arg.err < 0);
1149     ret= arg.err;
1150   }
1151 
1152   pthread_detach (tmp);
1153   pthread_detach (monitor);
1154 
1155   return ret;
1156 }
1157 
/*
  Server TCP port (defined elsewhere). Used below as the default port
  appended to a mysqldump joiner address, and passed to the SST scripts.
*/
extern uint  mysqld_port;
1159 
1160 /*! Just tells donor where to send mysqldump */
sst_prepare_mysqldump(const char * addr_in,const char ** addr_out)1161 static ssize_t sst_prepare_mysqldump (const char*  addr_in,
1162                                       const char** addr_out)
1163 {
1164   ssize_t ret= strlen (addr_in);
1165 
1166   if (!strrchr(addr_in, ':'))
1167   {
1168     ssize_t s= ret + 7;
1169     char* tmp= (char*) malloc (s);
1170 
1171     if (tmp)
1172     {
1173       ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
1174 
1175       if (ret > 0 && ret < s)
1176       {
1177         *addr_out= tmp;
1178         return ret;
1179       }
1180       if (ret > 0) /* buffer too short */ ret= -EMSGSIZE;
1181       free (tmp);
1182     }
1183     else {
1184       ret= -ENOMEM;
1185     }
1186 
1187     WSREP_ERROR ("Could not prepare state transfer request: "
1188                  "adding default port failed: %zd.", ret);
1189   }
1190   else {
1191     *addr_out= addr_in;
1192   }
1193 
1194   pthread_t monitor;
1195   ret = mysql_thread_create (key_wsrep_sst_joiner_monitor, &monitor, NULL, wsrep_sst_joiner_monitor_thread, NULL);
1196 
1197   if (ret)
1198   {
1199     WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
1200                 ret, strerror(ret));
1201     return -ret;
1202   }
1203 
1204   sst_joiner_completed= false;
1205   pthread_detach (monitor);
1206 
1207   return ret;
1208 }
1209 
wsrep_sst_prepare()1210 std::string wsrep_sst_prepare()
1211 {
1212   const ssize_t ip_max= 256;
1213   char ip_buf[ip_max];
1214   const char* addr_in=  NULL;
1215   const char* addr_out= NULL;
1216   const char* method;
1217 
1218   if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
1219   {
1220     return WSREP_STATE_TRANSFER_TRIVIAL;
1221   }
1222 
1223   /*
1224     Figure out SST receive address. Common for all SST methods.
1225   */
1226   // Attempt 1: wsrep_sst_receive_address
1227   if (wsrep_sst_receive_address &&
1228       strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
1229   {
1230     addr_in= wsrep_sst_receive_address;
1231   }
1232 
1233   //Attempt 2: wsrep_node_address
1234   else if (wsrep_node_address && strlen(wsrep_node_address))
1235   {
1236     wsp::Address addr(wsrep_node_address);
1237 
1238     if (!addr.is_valid())
1239     {
1240       WSREP_ERROR("Could not parse wsrep_node_address : %s",
1241                   wsrep_node_address);
1242       throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable");
1243     }
1244     memcpy(ip_buf, addr.get_address(), addr.get_address_len());
1245     addr_in= ip_buf;
1246   }
1247   // Attempt 3: Try to get the IP from the list of available interfaces.
1248   else
1249   {
1250     ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
1251 
1252     if (ret && ret < ip_max)
1253     {
1254       addr_in= ip_buf;
1255     }
1256     else
1257     {
1258       WSREP_ERROR("Failed to guess address to accept state transfer. "
1259                   "wsrep_sst_receive_address must be set manually.");
1260       throw wsrep::runtime_error("Could not prepare state transfer request");
1261     }
1262   }
1263 
1264   ssize_t addr_len= -ENOSYS;
1265   method = wsrep_sst_method;
1266   if (!strcmp(method, WSREP_SST_MYSQLDUMP))
1267   {
1268     addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
1269     if (addr_len < 0)
1270     {
1271       throw wsrep::runtime_error("Could not prepare mysqldimp address");
1272     }
1273   }
1274   else
1275   {
1276     /*! A heuristic workaround until we learn how to stop and start engines */
1277     if (Wsrep_server_state::instance().is_initialized() &&
1278         Wsrep_server_state::instance().state() == Wsrep_server_state::s_joiner)
1279     {
1280       if (!strcmp(method, WSREP_SST_XTRABACKUP) ||
1281           !strcmp(method, WSREP_SST_XTRABACKUPV2))
1282       {
1283          WSREP_WARN("The %s SST method is deprecated, so it is automatically "
1284                     "replaced by %s", method, WSREP_SST_MARIABACKUP);
1285          method = WSREP_SST_MARIABACKUP;
1286       }
1287       // we already did SST at initializaiton, now engines are running
1288       // sql_print_information() is here because the message is too long
1289       // for WSREP_INFO.
1290       sql_print_information ("WSREP: "
1291                  "You have configured '%s' state snapshot transfer method "
1292                  "which cannot be performed on a running server. "
1293                  "Wsrep provider won't be able to fall back to it "
1294                  "if other means of state transfer are unavailable. "
1295                  "In that case you will need to restart the server.",
1296                  method);
1297       return "";
1298     }
1299 
1300     addr_len = sst_prepare_other (method, sst_auth_real,
1301                                   addr_in, &addr_out);
1302     if (addr_len < 0)
1303     {
1304       WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
1305                    method);
1306       throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable");
1307     }
1308   }
1309 
1310   std::string ret;
1311   ret += method;
1312   ret.push_back('\0');
1313   ret += addr_out;
1314 
1315   const char* method_ptr(ret.data());
1316   const char* addr_ptr(ret.data() + strlen(method_ptr) + 1);
1317   WSREP_DEBUG("Prepared SST request: %s|%s", method_ptr, addr_ptr);
1318 
1319   if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
1320 
1321   return ret;
1322 }
1323 
1324 // helper method for donors
sst_run_shell(const char * cmd_str,char ** env,int max_tries)1325 static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
1326 {
1327   int ret= 0;
1328 
1329   for (int tries=1; tries <= max_tries; tries++)
1330   {
1331     wsp::process proc (cmd_str, "r", env);
1332 
1333     if (NULL != proc.pipe())
1334     {
1335       proc.wait();
1336     }
1337 
1338     if ((ret= proc.error()))
1339     {
1340       WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
1341                   tries, max_tries, proc.cmd(), ret, strerror(ret));
1342       sleep (1);
1343     }
1344     else
1345     {
1346       WSREP_DEBUG("SST script successfully completed.");
1347       break;
1348     }
1349   }
1350 
1351   return -ret;
1352 }
1353 
sst_reject_queries(my_bool close_conn)1354 static void sst_reject_queries(my_bool close_conn)
1355 {
1356   WSREP_INFO("Rejecting client queries for the duration of SST.");
1357   if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
1358 }
1359 
sst_donate_mysqldump(const char * addr,const wsrep::gtid & gtid,bool bypass,char ** env)1360 static int sst_donate_mysqldump (const char*         addr,
1361                                  const wsrep::gtid&  gtid,
1362                                  bool                bypass,
1363                                  char**              env) // carries auth info
1364 {
1365   char host[256];
1366   wsp::Address address(addr);
1367   if (!address.is_valid())
1368   {
1369     WSREP_ERROR("Could not parse SST address : %s", addr);
1370     return 0;
1371   }
1372   memcpy(host, address.get_address(), address.get_address_len());
1373   int port= address.get_port();
1374   bool extra_args;
1375   size_t const cmd_len= estimate_cmd_len(&extra_args);
1376   wsp::string cmd_str(cmd_len);
1377 
1378   if (!cmd_str())
1379   {
1380     WSREP_ERROR("sst_donate_mysqldump(): "
1381                 "could not allocate cmd buffer of %zd bytes", cmd_len);
1382     return -ENOMEM;
1383   }
1384 
1385   /*
1386     we enable new client connections so that mysqldump donation can connect in,
1387     but we reject local connections from modifyingcdata during SST, to keep
1388     data intact
1389   */
1390   if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
1391 
1392   make_wsrep_defaults_file();
1393 
1394   std::ostringstream uuid_oss;
1395   uuid_oss << gtid.id();
1396   int ret= snprintf (cmd_str(), cmd_len,
1397                      "wsrep_sst_mysqldump "
1398                      WSREP_SST_OPT_ADDR " '%s' "
1399                      WSREP_SST_OPT_PORT " '%u' "
1400                      WSREP_SST_OPT_LPORT " '%u' "
1401                      WSREP_SST_OPT_SOCKET " '%s' "
1402                      "%s"
1403                      WSREP_SST_OPT_GTID " '%s:%lld,%d-%d-%llu' "
1404                      WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
1405                      "%s",
1406                      addr, port, mysqld_port, mysqld_unix_port,
1407                      wsrep_defaults_file,
1408                      uuid_oss.str().c_str(), gtid.seqno().get(),
1409                      wsrep_gtid_server.domain_id, wsrep_gtid_server.server_id,
1410                      wsrep_gtid_server.seqno(),
1411                      wsrep_gtid_server.domain_id,
1412                      bypass ? " " WSREP_SST_OPT_BYPASS : "");
1413 
1414   if (ret < 0 || size_t(ret) >= cmd_len)
1415   {
1416     WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
1417     return (ret < 0 ? ret : -EMSGSIZE);
1418   }
1419 
1420   if (extra_args)
1421     copy_orig_argv(cmd_str() + ret);
1422 
1423   WSREP_DEBUG("Running: '%s'", cmd_str());
1424 
1425   ret= sst_run_shell (cmd_str(), env, 3);
1426 
1427   wsrep::gtid sst_sent_gtid(ret == 0 ?
1428                             gtid :
1429                             wsrep::gtid(gtid.id(),
1430                                         wsrep::seqno::undefined()));
1431   Wsrep_server_state::instance().sst_sent(sst_sent_gtid, ret);
1432 
1433   return ret;
1434 }
1435 
/*
  Global (non-static) seqno, initialized to WSREP_SEQNO_UNDEFINED.
  NOTE(review): nothing in this part of the file assigns it. Confirm where,
  if anywhere, it is updated before relying on its value.
*/
wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
1437 
1438 /*
1439   Create a file under data directory.
1440 */
sst_create_file(const char * name,const char * content)1441 static int sst_create_file(const char *name, const char *content)
1442 {
1443   int err= 0;
1444   char *real_name;
1445   char *tmp_name;
1446   ssize_t len;
1447   FILE *file;
1448 
1449   len= strlen(mysql_real_data_home) + strlen(name) + 2;
1450   real_name= (char *) alloca(len);
1451 
1452   snprintf(real_name, (size_t) len, "%s/%s", mysql_real_data_home, name);
1453 
1454   tmp_name= (char *) alloca(len + 4);
1455   snprintf(tmp_name, (size_t) len + 4, "%s.tmp", real_name);
1456 
1457   file= fopen(tmp_name, "w+");
1458 
1459   if (0 == file)
1460   {
1461     err= errno;
1462     WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err, strerror(err));
1463   }
1464   else
1465   {
1466     // Write the specified content into the file.
1467     if (content != NULL)
1468     {
1469       fprintf(file, "%s\n", content);
1470       fsync(fileno(file));
1471     }
1472 
1473     fclose(file);
1474 
1475     if (rename(tmp_name, real_name) == -1)
1476     {
1477       err= errno;
1478       WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", tmp_name,
1479                   real_name, err, strerror(err));
1480     }
1481   }
1482 
1483   return err;
1484 }
1485 
run_sql_command(THD * thd,const char * query)1486 static int run_sql_command(THD *thd, const char *query)
1487 {
1488   thd->set_query((char *)query, strlen(query));
1489 
1490   Parser_state ps;
1491   if (ps.init(thd, thd->query(), thd->query_length()))
1492   {
1493     WSREP_ERROR("SST query: %s failed", query);
1494     return -1;
1495   }
1496 
1497   mysql_parse(thd, thd->query(), thd->query_length(), &ps, FALSE, FALSE);
1498   if (thd->is_error())
1499   {
1500     int const err= thd->get_stmt_da()->sql_errno();
1501     WSREP_WARN ("Error executing '%s': %d (%s)%s",
1502                 query, err, thd->get_stmt_da()->message(),
1503                 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
1504                 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
1505     thd->clear_error();
1506     return -1;
1507   }
1508   return 0;
1509 }
1510 
1511 
sst_flush_tables(THD * thd)1512 static int sst_flush_tables(THD* thd)
1513 {
1514   WSREP_INFO("Flushing tables for SST...");
1515 
1516   int err= 0;
1517   int not_used;
1518   /*
1519     Files created to notify the SST script about the outcome of table flush
1520     operation.
1521   */
1522   const char *flush_success= "tables_flushed";
1523   const char *flush_error= "sst_error";
1524 
1525   CHARSET_INFO *current_charset= thd->variables.character_set_client;
1526 
1527   if (!is_supported_parser_charset(current_charset))
1528   {
1529       /* Do not use non-supported parser character sets */
1530       WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1531       thd->variables.character_set_client= &my_charset_latin1;
1532       WSREP_WARN("For SST temporally setting character set to : %s",
1533                  my_charset_latin1.csname);
1534   }
1535 
1536   if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
1537   {
1538     err= -1;
1539   }
1540   else
1541   {
1542     /*
1543       Make sure logs are flushed after global read lock acquired. In case
1544       reload fails, we must also release the acquired FTWRL.
1545     */
1546     if (reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
1547                              (TABLE_LIST*) 0, &not_used))
1548     {
1549       thd->global_read_lock.unlock_global_read_lock(thd);
1550       err= -1;
1551     }
1552   }
1553 
1554   thd->variables.character_set_client= current_charset;
1555 
1556   if (err)
1557   {
1558     WSREP_ERROR("Failed to flush and lock tables");
1559 
1560     /*
1561       The SST must be aborted as the flush tables failed. Notify this to SST
1562       script by creating the error file.
1563     */
1564     int tmp;
1565     if ((tmp= sst_create_file(flush_error, NULL))) {
1566       err= tmp;
1567     }
1568   }
1569   else
1570   {
1571     WSREP_INFO("Tables flushed.");
1572     /*
1573       Tables have been flushed. Create a file with cluster state ID and
1574       wsrep_gtid_domain_id.
1575     */
1576     char content[100];
1577     snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid,
1578              (long long)wsrep_locked_seqno, wsrep_gtid_server.domain_id);
1579     err= sst_create_file(flush_success, content);
1580 
1581     const char base_name[]= "tables_flushed";
1582     ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
1583     char *real_name= (char*) malloc(full_len);
1584     sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
1585     char *tmp_name= (char*) malloc(full_len + 4);
1586     sprintf(tmp_name, "%s.tmp", real_name);
1587 
1588     FILE* file= fopen(tmp_name, "w+");
1589     if (0 == file)
1590     {
1591       err= errno;
1592       WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err));
1593     }
1594     else
1595     {
1596       Wsrep_server_state& server_state= Wsrep_server_state::instance();
1597       std::ostringstream uuid_oss;
1598 
1599       uuid_oss << server_state.current_view().state_id().id();
1600 
1601       fprintf(file, "%s:%lld %u\n",
1602               uuid_oss.str().c_str(), server_state.pause_seqno().get(),
1603               wsrep_gtid_server.domain_id);
1604       fsync(fileno(file));
1605       fclose(file);
1606       if (rename(tmp_name, real_name) == -1)
1607       {
1608         err= errno;
1609         WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)",
1610                      tmp_name, real_name, err,strerror(err));
1611       }
1612     }
1613     free(real_name);
1614     free(tmp_name);
1615   }
1616 
1617   return err;
1618 }
1619 
1620 
sst_disallow_writes(THD * thd,bool yes)1621 static void sst_disallow_writes (THD* thd, bool yes)
1622 {
1623   char query_str[64]= { 0, };
1624   ssize_t const query_max= sizeof(query_str) - 1;
1625   CHARSET_INFO *current_charset;
1626 
1627   current_charset= thd->variables.character_set_client;
1628 
1629   if (!is_supported_parser_charset(current_charset))
1630   {
1631       /* Do not use non-supported parser character sets */
1632       WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1633       thd->variables.character_set_client= &my_charset_latin1;
1634       WSREP_WARN("For SST temporally setting character set to : %s",
1635                  my_charset_latin1.csname);
1636   }
1637 
1638   snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
1639             yes ? 1 : 0);
1640 
1641   if (run_sql_command(thd, query_str))
1642   {
1643     WSREP_ERROR("Failed to disallow InnoDB writes");
1644   }
1645   thd->variables.character_set_client= current_charset;
1646 }
1647 
sst_donor_thread(void * a)1648 static void* sst_donor_thread (void* a)
1649 {
1650   sst_thread_arg* arg= (sst_thread_arg*)a;
1651 
1652   WSREP_INFO("Running: '%s'", arg->cmd);
1653 
1654   int  err= 1;
1655   bool locked= false;
1656 
1657   const char*  out= NULL;
1658   const size_t out_len= 128;
1659   char         out_buf[out_len];
1660 
1661   wsrep_uuid_t  ret_uuid= WSREP_UUID_UNDEFINED;
1662   // seqno of complete SST
1663   wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;
1664 
1665   // We turn off wsrep_on for this THD so that it can
1666   // operate with wsrep_ready == OFF
1667   // We also set this SST thread THD as system thread
1668   wsp::thd thd(FALSE, true);
1669   wsp::process proc(arg->cmd, "r", arg->env);
1670 
1671   err= -proc.error();
1672 
1673 /* Inform server about SST script startup and release TO isolation */
1674   mysql_mutex_lock   (&arg->lock);
1675   arg->err= -err;
1676   mysql_cond_signal  (&arg->cond);
1677   mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
1678 
1679   if (proc.pipe() && !err)
1680   {
1681 wait_signal:
1682     out= my_fgets (out_buf, out_len, proc.pipe());
1683 
1684     if (out)
1685     {
1686       const char magic_flush[]= "flush tables";
1687       const char magic_cont[]= "continue";
1688       const char magic_done[]= "done";
1689 
1690       if (!strcasecmp (out, magic_flush))
1691       {
1692         err= sst_flush_tables (thd.ptr);
1693         if (!err)
1694         {
1695           sst_disallow_writes (thd.ptr, true);
1696           /*
1697             Lets also keep statements that modify binary logs (like RESET LOGS,
1698             RESET MASTER) from proceeding until the files have been transferred
1699             to the joiner node.
1700           */
1701           if (mysql_bin_log.is_open())
1702           {
1703             mysql_mutex_lock(mysql_bin_log.get_log_lock());
1704           }
1705 
1706           locked= true;
1707           goto wait_signal;
1708         }
1709       }
1710       else if (!strcasecmp (out, magic_cont))
1711       {
1712         if (locked)
1713         {
1714           if (mysql_bin_log.is_open())
1715           {
1716             mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
1717             mysql_mutex_unlock(mysql_bin_log.get_log_lock());
1718           }
1719           sst_disallow_writes (thd.ptr, false);
1720           thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr);
1721           locked= false;
1722         }
1723         err=  0;
1724         goto wait_signal;
1725       }
1726       else if (!strncasecmp (out, magic_done, strlen(magic_done)))
1727       {
1728         err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
1729                                   &ret_uuid, &ret_seqno);
1730       }
1731       else
1732       {
1733         WSREP_WARN("Received unknown signal: '%s'", out);
1734         proc.wait();
1735       }
1736     }
1737     else
1738     {
1739       WSREP_ERROR("Failed to read from: %s", proc.cmd());
1740       proc.wait();
1741     }
1742     if (!err && proc.error()) err= -proc.error();
1743   }
1744   else
1745   {
1746     WSREP_ERROR("Failed to execute: %s : %d (%s)",
1747                 proc.cmd(), err, strerror(err));
1748   }
1749 
1750   if (locked) // don't forget to unlock server before return
1751   {
1752     if (mysql_bin_log.is_open())
1753     {
1754       mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
1755       mysql_mutex_unlock(mysql_bin_log.get_log_lock());
1756     }
1757     sst_disallow_writes (thd.ptr, false);
1758     thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr);
1759   }
1760 
1761   wsrep::gtid gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)),
1762                    wsrep::seqno(err ? wsrep::seqno::undefined() :
1763                                 wsrep::seqno(ret_seqno)));
1764 
1765   Wsrep_server_state::instance().sst_sent(gtid, err);
1766 
1767   proc.wait();
1768 
1769   wsrep_donor_monitor_end();
1770   return nullptr;
1771 }
1772 
sst_donate_other(const char * method,const char * addr,const wsrep::gtid & gtid,bool bypass,char ** env)1773 static int sst_donate_other (const char*        method,
1774                              const char*        addr,
1775                              const wsrep::gtid& gtid,
1776                              bool               bypass,
1777                              char**             env) // carries auth info
1778 {
1779   bool extra_args;
1780   size_t const cmd_len= estimate_cmd_len(&extra_args);
1781   wsp::string cmd_str(cmd_len);
1782 
1783   if (!cmd_str())
1784   {
1785     WSREP_ERROR("sst_donate_other(): "
1786                 "could not allocate cmd buffer of %zd bytes", cmd_len);
1787     return -ENOMEM;
1788   }
1789 
1790   char* binlog_opt_val= NULL;
1791   char* binlog_index_opt_val= NULL;
1792 
1793   int ret;
1794   if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1795   {
1796     WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1797     return ret;
1798   }
1799 
1800   if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val)))
1801   {
1802     WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d",
1803                 ret);
1804     if (binlog_opt_val) my_free(binlog_opt_val);
1805     return ret;
1806   }
1807 
1808   make_wsrep_defaults_file();
1809 
1810   std::ostringstream uuid_oss;
1811   uuid_oss << gtid.id();
1812   ret= snprintf (cmd_str(), cmd_len,
1813                  "wsrep_sst_%s "
1814                  WSREP_SST_OPT_ROLE " 'donor' "
1815                  WSREP_SST_OPT_ADDR " '%s' "
1816                  WSREP_SST_OPT_LPORT " '%u' "
1817                  WSREP_SST_OPT_SOCKET " '%s' "
1818                  WSREP_SST_OPT_DATA " '%s' "
1819                  "%s"
1820                  WSREP_SST_OPT_GTID " '%s:%lld' "
1821                  WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
1822                  "%s"
1823                  "%s"
1824                  "%s",
1825                  method, addr, mysqld_port, mysqld_unix_port,
1826                  mysql_real_data_home,
1827                  wsrep_defaults_file,
1828                  uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_server.domain_id,
1829                  binlog_opt_val, binlog_index_opt_val,
1830                  bypass ? " " WSREP_SST_OPT_BYPASS : "");
1831 
1832   my_free(binlog_opt_val);
1833   my_free(binlog_index_opt_val);
1834 
1835   if (ret < 0 || size_t(ret) >= cmd_len)
1836   {
1837     WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1838     return (ret < 0 ? ret : -EMSGSIZE);
1839   }
1840 
1841   if (extra_args)
1842     copy_orig_argv(cmd_str() + ret);
1843 
1844   if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1845 
1846   pthread_t tmp;
1847   sst_thread_arg arg(cmd_str(), env);
1848 
1849   mysql_mutex_lock (&arg.lock);
1850 
1851   ret= mysql_thread_create (key_wsrep_sst_donor, &tmp, NULL, sst_donor_thread, &arg);
1852 
1853   if (ret)
1854   {
1855     WSREP_ERROR("sst_donate_other(): mysql_thread_create() failed: %d (%s)",
1856                 ret, strerror(ret));
1857     return ret;
1858   }
1859 
1860   mysql_cond_wait (&arg.cond, &arg.lock);
1861 
1862   WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1863   return arg.err;
1864 }
1865 
/*
  Return true if c may be part of a filename (used to validate SST method
  names): isalnum() characters, '-', '_' or '.'.
*/
static bool filename_char(int const c)
{
  /*
    isalnum() is only defined for values representable as unsigned char
    (or EOF). Callers pass plain, possibly signed, char values, so convert
    first to avoid undefined behaviour on bytes >= 0x80.
  */
  return isalnum((unsigned char) c) || (c == '-') || (c == '_') || (c == '.');
}
1871 
1872 /* return true if character can be a part of an address string */
address_char(int const c)1873 static bool address_char(int const c)
1874 {
1875   return filename_char(c) ||
1876          (c == ':') || (c == '[') || (c == ']') || (c == '/');
1877 }
1878 
check_request_str(const char * const str,bool (* check)(int c))1879 static bool check_request_str(const char* const str,
1880                               bool (*check) (int c))
1881 {
1882   for (size_t i(0); str[i] != '\0'; ++i)
1883   {
1884     if (!check(str[i]))
1885     {
1886       WSREP_WARN("Illegal character in state transfer request: %i (%c).",
1887                  str[i], str[i]);
1888       return true;
1889     }
1890   }
1891 
1892   return false;
1893 }
1894 
wsrep_sst_donate(const std::string & msg,const wsrep::gtid & current_gtid,const bool bypass)1895 int wsrep_sst_donate(const std::string& msg,
1896                      const wsrep::gtid& current_gtid,
1897                      const bool         bypass)
1898 {
1899   const char* method= msg.data();
1900   size_t method_len= strlen (method);
1901 
1902   if (check_request_str(method, filename_char))
1903   {
1904     WSREP_ERROR("Bad SST method name. SST canceled.");
1905     return WSREP_CB_FAILURE;
1906   }
1907 
1908   const char* data= method + method_len + 1;
1909 
1910   /* check for auth@addr separator */
1911   const char* addr= strrchr(data, '@');
1912   wsp::string remote_auth;
1913   if (addr)
1914   {
1915     remote_auth.set(strndup(data, addr - data));
1916     addr++;
1917   }
1918   else
1919   {
1920     // no auth part
1921     addr= data;
1922   }
1923 
1924   if (check_request_str(addr, address_char))
1925   {
1926     WSREP_ERROR("Bad SST address string. SST canceled.");
1927     return WSREP_CB_FAILURE;
1928   }
1929 
1930   wsp::env env(NULL);
1931   if (env.error())
1932   {
1933     WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1934     return WSREP_CB_FAILURE;
1935   }
1936 
1937   int ret;
1938   if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth_real)))
1939   {
1940     WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1941     return WSREP_CB_FAILURE;
1942   }
1943 
1944   if (remote_auth())
1945   {
1946     if ((ret= sst_append_env_var(env, WSREP_SST_REMOTE_AUTH_ENV,remote_auth())))
1947     {
1948       WSREP_ERROR("wsrep_sst_donate_cb(): appending remote auth env failed: "
1949                   "%d", ret);
1950       return WSREP_CB_FAILURE;
1951     }
1952   }
1953 
1954   if (data_home_dir)
1955   {
1956     if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir)))
1957     {
1958       WSREP_ERROR("wsrep_sst_donate_cb(): appending data "
1959                   "directory failed: %d", ret);
1960       return WSREP_CB_FAILURE;
1961     }
1962   }
1963 
1964   sst_donor_completed= false;
1965   pthread_t monitor;
1966 
1967   ret= mysql_thread_create (key_wsrep_sst_donor_monitor, &monitor, NULL, wsrep_sst_donor_monitor_thread, NULL);
1968 
1969   if (ret)
1970   {
1971     WSREP_ERROR("sst_donate: mysql_thread_create() failed: %d (%s)",
1972                 ret, strerror(ret));
1973     return WSREP_CB_FAILURE;
1974   }
1975 
1976   if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1977   {
1978     ret= sst_donate_mysqldump(addr, current_gtid, bypass, env());
1979   }
1980   else
1981   {
1982     ret= sst_donate_other(method, addr, current_gtid, bypass, env());
1983   }
1984 
1985   return (ret >= 0 ? 0 : 1);
1986 }
1987