1 /* Copyright 2008-2020 Codership Oy <http://www.codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #include "mariadb.h"
17 #include "wsrep_sst.h"
18 #include <inttypes.h>
19 #include <ctype.h>
20 #include <mysqld.h>
21 #include <m_ctype.h>
22 #include <strfunc.h>
23 #include <sql_class.h>
24 #include <set_var.h>
25 #include <sql_acl.h>
26 #include <sql_reload.h>
27 #include <sql_parse.h>
28 #include "wsrep_priv.h"
29 #include "wsrep_utils.h"
30 #include "wsrep_xid.h"
31 #include <cstdio>
32 #include <cstdlib>
33 
34 #include <my_service_manager.h>
35 
36 static char wsrep_defaults_file[FN_REFLEN * 2 + 10 + 30 +
37                                 sizeof(WSREP_SST_OPT_CONF) +
38                                 sizeof(WSREP_SST_OPT_CONF_SUFFIX) +
39                                 sizeof(WSREP_SST_OPT_CONF_EXTRA)] = {0};
40 
41 const char* wsrep_sst_method          = WSREP_SST_DEFAULT;
42 const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO;
43 const char* wsrep_sst_donor           = "";
44 const char* wsrep_sst_auth            = NULL;
45 
46 // container for real auth string
47 static const char* sst_auth_real      = NULL;
48 my_bool wsrep_sst_donor_rejects_queries = FALSE;
49 
wsrep_sst_method_check(sys_var * self,THD * thd,set_var * var)50 bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
51 {
52   if ((! var->save_result.string_value.str) ||
53       (var->save_result.string_value.length == 0 ))
54   {
55     my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
56              var->save_result.string_value.str ?
57              var->save_result.string_value.str : "NULL");
58     return 1;
59   }
60 
61   return 0;
62 }
63 
64 static const char* data_home_dir = NULL;
65 
wsrep_set_data_home_dir(const char * data_dir)66 void wsrep_set_data_home_dir(const char *data_dir)
67 {
68   data_home_dir= (data_dir && *data_dir) ? data_dir : NULL;
69 }
70 
make_wsrep_defaults_file()71 static void make_wsrep_defaults_file()
72 {
73   if (!wsrep_defaults_file[0])
74   {
75     char *ptr= wsrep_defaults_file;
76     char *end= ptr + sizeof(wsrep_defaults_file);
77     if (my_defaults_file)
78       ptr= strxnmov(ptr, end - ptr,
79                     WSREP_SST_OPT_CONF, " '", my_defaults_file, "' ", NULL);
80 
81     if (my_defaults_extra_file)
82       ptr= strxnmov(ptr, end - ptr,
83                     WSREP_SST_OPT_CONF_EXTRA, " '", my_defaults_extra_file, "' ", NULL);
84 
85     if (my_defaults_group_suffix)
86       ptr= strxnmov(ptr, end - ptr,
87                     WSREP_SST_OPT_CONF_SUFFIX, " '", my_defaults_group_suffix, "' ", NULL);
88   }
89 }
90 
91 
wsrep_sst_receive_address_check(sys_var * self,THD * thd,set_var * var)92 bool  wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
93 {
94   if ((! var->save_result.string_value.str) ||
95       (var->save_result.string_value.length > (FN_REFLEN - 1))) // safety
96   {
97     goto err;
98   }
99 
100   return 0;
101 
102 err:
103   my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
104            var->save_result.string_value.str ?
105            var->save_result.string_value.str : "NULL");
106   return 1;
107 }
108 
wsrep_sst_receive_address_update(sys_var * self,THD * thd,enum_var_type type)109 bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
110                                        enum_var_type type)
111 {
112     return 0;
113 }
114 
wsrep_sst_auth_check(sys_var * self,THD * thd,set_var * var)115 bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
116 {
117     return 0;
118 }
119 
sst_auth_real_set(const char * value)120 static bool sst_auth_real_set (const char* value)
121 {
122   const char* v= NULL;
123 
124   if (value)
125   {
126     v= my_strdup(value, MYF(0));
127   }
128   else                                          // its NULL
129   {
130     wsrep_sst_auth_free();
131     return 0;
132   }
133 
134   if (v)
135   {
136     // set sst_auth_real
137     if (sst_auth_real) { my_free((void *) sst_auth_real); }
138     sst_auth_real = v;
139 
140     // mask wsrep_sst_auth
141     if (strlen(sst_auth_real))
142     {
143       if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
144       wsrep_sst_auth= my_strdup(WSREP_SST_AUTH_MASK, MYF(0));
145     }
146     else
147     {
148       if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
149       wsrep_sst_auth= NULL;
150     }
151 
152     return 0;
153   }
154   return 1;
155 }
156 
wsrep_sst_auth_free()157 void wsrep_sst_auth_free()
158 {
159   if (wsrep_sst_auth) { my_free((void *) wsrep_sst_auth); }
160   if (sst_auth_real) { my_free((void *) sst_auth_real); }
161   wsrep_sst_auth= NULL;
162   sst_auth_real= NULL;
163 }
164 
wsrep_sst_auth_update(sys_var * self,THD * thd,enum_var_type type)165 bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
166 {
167   return sst_auth_real_set (wsrep_sst_auth);
168 }
169 
wsrep_sst_auth_init()170 void wsrep_sst_auth_init ()
171 {
172   sst_auth_real_set(wsrep_sst_auth);
173 }
174 
wsrep_sst_donor_check(sys_var * self,THD * thd,set_var * var)175 bool  wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
176 {
177   return 0;
178 }
179 
wsrep_sst_donor_update(sys_var * self,THD * thd,enum_var_type type)180 bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
181 {
182   return 0;
183 }
184 
wsrep_before_SE()185 bool wsrep_before_SE()
186 {
187   return (wsrep_provider != NULL
188           && strcmp (wsrep_provider,   WSREP_NONE)
189           && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
190           && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
191 }
192 
193 static bool            sst_complete = false;
194 static bool            sst_needed   = false;
195 
196 #define WSREP_EXTEND_TIMEOUT_INTERVAL 30
197 #define WSREP_TIMEDWAIT_SECONDS 10
198 
wsrep_sst_grab()199 void wsrep_sst_grab ()
200 {
201   WSREP_INFO("wsrep_sst_grab()");
202   if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
203   sst_complete = false;
204   mysql_mutex_unlock (&LOCK_wsrep_sst);
205 }
206 
207 // Wait for end of SST
wsrep_sst_wait()208 bool wsrep_sst_wait ()
209 {
210   double total_wtime = 0;
211 
212   if (mysql_mutex_lock (&LOCK_wsrep_sst))
213     abort();
214 
215   WSREP_INFO("Waiting for SST to complete.");
216 
217   while (!sst_complete)
218   {
219     struct timespec wtime;
220     set_timespec(wtime, WSREP_TIMEDWAIT_SECONDS);
221     time_t start_time = time(NULL);
222     mysql_cond_timedwait (&COND_wsrep_sst, &LOCK_wsrep_sst, &wtime);
223     time_t end_time = time(NULL);
224 
225     if (!sst_complete)
226     {
227       total_wtime += difftime(end_time, start_time);
228       WSREP_DEBUG("Waiting for SST to complete. current seqno: %" PRId64 " waited %f secs.", local_seqno, total_wtime);
229       service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
230         "WSREP state transfer ongoing, current seqno: %" PRId64 " waited %f secs", local_seqno, total_wtime);
231     }
232   }
233 
234   if (local_seqno >= 0)
235   {
236     WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
237   }
238   else
239   {
240     WSREP_ERROR("SST failed: %d (%s)",
241                 int(-local_seqno), strerror(-local_seqno));
242   }
243 
244   mysql_mutex_unlock (&LOCK_wsrep_sst);
245 
246   return (local_seqno >= 0);
247 }
248 
249 // Signal end of SST
wsrep_sst_complete(const wsrep_uuid_t * sst_uuid,wsrep_seqno_t sst_seqno,bool needed)250 void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
251                          wsrep_seqno_t       sst_seqno,
252                          bool                needed)
253 {
254   if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
255   if (!sst_complete)
256   {
257     sst_complete = true;
258     sst_needed   = needed;
259     local_uuid   = *sst_uuid;
260     local_seqno  = sst_seqno;
261     mysql_cond_signal (&COND_wsrep_sst);
262   }
263   else
264   {
265     /* This can happen when called from wsrep_synced_cb().
266        At the moment there is no way to check there
267        if main thread is still waiting for signal,
268        so wsrep_sst_complete() is called from there
269        each time wsrep_ready changes from FALSE -> TRUE.
270     */
271     WSREP_DEBUG("Nobody is waiting for SST.");
272   }
273   mysql_mutex_unlock (&LOCK_wsrep_sst);
274 }
275 
276 /*
277   If wsrep provider is loaded, inform that the new state snapshot
278   has been received. Also update the local checkpoint.
279 
280   @param wsrep     [IN]               wsrep handle
281   @param uuid      [IN]               Initial state UUID
282   @param seqno     [IN]               Initial state sequence number
283   @param state     [IN]               Always NULL, also ignored by wsrep provider (?)
284   @param state_len [IN]               Always 0, also ignored by wsrep provider (?)
285   @param implicit  [IN]               Whether invoked implicitly due to SST
286                                       (true) or explicitly because if change
287                                       in wsrep_start_position by user (false).
288   @return false                       Success
289           true                        Error
290 
291 */
wsrep_sst_received(wsrep_t * const wsrep,const wsrep_uuid_t & uuid,const wsrep_seqno_t seqno,const void * const state,const size_t state_len,const bool implicit)292 bool wsrep_sst_received (wsrep_t*            const wsrep,
293                          const wsrep_uuid_t&       uuid,
294                          const wsrep_seqno_t       seqno,
295                          const void*         const state,
296                          const size_t              state_len,
297                          const bool                implicit)
298 {
299   /*
300     To keep track of whether the local uuid:seqno should be updated. Also, note
301     that local state (uuid:seqno) is updated/checkpointed only after we get an
302     OK from wsrep provider. By doing so, the values remain consistent across
303     the server & wsrep provider.
304   */
305   bool do_update= false;
306 
307   // Get the locally stored uuid:seqno.
308   if (wsrep_get_SE_checkpoint(local_uuid, local_seqno))
309   {
310     return true;
311   }
312 
313   if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
314       local_seqno < seqno || seqno < 0)
315   {
316     do_update= true;
317   }
318   else if (local_seqno > seqno)
319   {
320     WSREP_WARN("SST position can't be set in past. Requested: %lld, Current: "
321                " %lld.", (long long)seqno, (long long)local_seqno);
322     /*
323       If we are here because of SET command, simply return true (error) instead of
324       aborting.
325     */
326     if (implicit)
327     {
328       WSREP_WARN("Can't continue.");
329       unireg_abort(1);
330     }
331     else
332     {
333       return true;
334     }
335   }
336 
337 #ifdef GTID_SUPPORT
338   wsrep_init_sidno(uuid);
339 #endif /* GTID_SUPPORT */
340 
341   if (wsrep)
342   {
343     int const rcode(seqno < 0 ? seqno : 0);
344     wsrep_gtid_t const state_id= {uuid,
345       (rcode ? WSREP_SEQNO_UNDEFINED : seqno)};
346 
347     wsrep_status_t ret= wsrep->sst_received(wsrep, &state_id, state,
348                                             state_len, rcode);
349 
350     if (ret != WSREP_OK)
351     {
352       return true;
353     }
354   }
355 
356   // Now is the good time to update the local state and checkpoint.
357   if (do_update)
358   {
359     if (wsrep_set_SE_checkpoint(uuid, seqno))
360     {
361       return true;
362     }
363 
364     local_uuid= uuid;
365     local_seqno= seqno;
366   }
367 
368   return false;
369 }
370 
371 // Let applier threads to continue
wsrep_sst_continue()372 bool wsrep_sst_continue ()
373 {
374   if (sst_needed)
375   {
376     WSREP_INFO("Signalling provider to continue.");
377     return wsrep_sst_received (wsrep, local_uuid, local_seqno, NULL, 0, true);
378   }
379   return false;
380 }
381 
382 struct sst_thread_arg
383 {
384   const char*     cmd;
385   char**          env;
386   char*           ret_str;
387   int             err;
388   mysql_mutex_t   lock;
389   mysql_cond_t    cond;
390 
sst_thread_argsst_thread_arg391   sst_thread_arg (const char* c, char** e)
392     : cmd(c), env(e), ret_str(0), err(-1)
393   {
394     mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
395     mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
396   }
397 
~sst_thread_argsst_thread_arg398   ~sst_thread_arg()
399   {
400     mysql_cond_destroy  (&cond);
401     mysql_mutex_unlock  (&lock);
402     mysql_mutex_destroy (&lock);
403   }
404 };
405 
sst_scan_uuid_seqno(const char * str,wsrep_uuid_t * uuid,wsrep_seqno_t * seqno)406 static int sst_scan_uuid_seqno (const char* str,
407                                 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
408 {
409   int offt = wsrep_uuid_scan (str, strlen(str), uuid);
410   errno= 0;                                     /* Reset the errno */
411   if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
412   {
413     *seqno = strtoll (str + offt + 1, NULL, 10);
414     if (*seqno != LLONG_MAX || errno != ERANGE)
415     {
416       return 0;
417     }
418   }
419 
420   WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
421   return EINVAL;
422 }
423 
424 // get rid of trailing \n
my_fgets(char * buf,size_t buf_len,FILE * stream)425 static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
426 {
427    char* ret= fgets (buf, buf_len, stream);
428 
429    if (ret)
430    {
431        size_t len = strlen(ret);
432        if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0';
433    }
434 
435    return ret;
436 }
437 
438 /*
439   Generate "name 'value'" string.
440 */
generate_name_value(const char * name,const char * value)441 static char* generate_name_value(const char* name, const char* value)
442 {
443   size_t name_len= strlen(name);
444   size_t value_len= strlen(value);
445   char* buf=
446     (char*) my_malloc((name_len + value_len + 5) * sizeof(char), MYF(0));
447   if (buf)
448   {
449     char* ref= buf;
450     *ref++ = ' ';
451     memcpy(ref, name, name_len * sizeof(char));
452     ref += name_len;
453     *ref++ = ' ';
454     *ref++ = '\'';
455     memcpy(ref, value, value_len * sizeof(char));
456     ref += value_len;
457     *ref++ = '\'';
458     *ref = 0;
459   }
460   return buf;
461 }
462 /*
463   Generate binlog option string for sst_donate_other(), sst_prepare_other().
464 
465   Returns zero on success, negative error code otherwise.
466 
467   String containing binlog name is stored in param ret if binlog is enabled
468   and GTID mode is on, otherwise empty string. Returned string should be
469   freed with my_free().
470  */
generate_binlog_opt_val(char ** ret)471 static int generate_binlog_opt_val(char** ret)
472 {
473   DBUG_ASSERT(ret);
474   *ret= NULL;
475   if (opt_bin_log)
476   {
477     assert(opt_bin_logname);
478     *ret= strcmp(opt_bin_logname, "0") ?
479       generate_name_value(WSREP_SST_OPT_BINLOG,
480                           opt_bin_logname) :
481       my_strdup("", MYF(0));
482   }
483   else
484   {
485     *ret= my_strdup("", MYF(0));
486   }
487   if (!*ret) return -ENOMEM;
488   return 0;
489 }
490 
generate_binlog_index_opt_val(char ** ret)491 static int generate_binlog_index_opt_val(char** ret)
492 {
493   DBUG_ASSERT(ret);
494   *ret= NULL;
495   if (opt_binlog_index_name) {
496     *ret= strcmp(opt_binlog_index_name, "0") ?
497       generate_name_value(WSREP_SST_OPT_BINLOG_INDEX,
498                           opt_binlog_index_name) :
499       my_strdup("", MYF(0));
500   }
501   else
502   {
503     *ret= my_strdup("", MYF(0));
504   }
505   if (!*ret) return -ENOMEM;
506   return 0;
507 }
508 
sst_joiner_thread(void * a)509 static void* sst_joiner_thread (void* a)
510 {
511   sst_thread_arg* arg= (sst_thread_arg*) a;
512   int err= 1;
513 
514   {
515     const char magic[] = "ready";
516     const size_t magic_len = sizeof(magic) - 1;
517     const size_t out_len = 512;
518     char out[out_len];
519 
520     WSREP_INFO("Running: '%s'", arg->cmd);
521 
522     wsp::process proc (arg->cmd, "r", arg->env);
523 
524     if (proc.pipe() && !proc.error())
525     {
526       const char* tmp= my_fgets (out, out_len, proc.pipe());
527 
528       if (!tmp || strlen(tmp) < (magic_len + 2) ||
529           strncasecmp (tmp, magic, magic_len))
530       {
531         WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
532                     magic, arg->cmd, tmp);
533         proc.wait();
534         if (proc.error()) err = proc.error();
535       }
536       else
537       {
538         err = 0;
539       }
540     }
541     else
542     {
543       err = proc.error();
544       WSREP_ERROR("Failed to execute: %s : %d (%s)",
545                   arg->cmd, err, strerror(err));
546     }
547 
548     // signal sst_prepare thread with ret code,
549     // it will go on sending SST request
550     mysql_mutex_lock   (&arg->lock);
551     if (!err)
552     {
553       arg->ret_str = strdup (out + magic_len + 1);
554       if (!arg->ret_str) err = ENOMEM;
555     }
556     arg->err = -err;
557     mysql_cond_signal  (&arg->cond);
558     mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
559 
560     if (err) return NULL; /* lp:808417 - return immediately, don't signal
561                            * initializer thread to ensure single thread of
562                            * shutdown. */
563 
564     wsrep_uuid_t  ret_uuid  = WSREP_UUID_UNDEFINED;
565     wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
566 
567     // in case of successfull receiver start, wait for SST completion/end
568     char* tmp = my_fgets (out, out_len, proc.pipe());
569 
570     proc.wait();
571     err= EINVAL;
572 
573     if (!tmp)
574     {
575       WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from "
576                   "joiner script.");
577       if (proc.error()) err = proc.error();
578     }
579     else
580     {
581       // Read state ID (UUID:SEQNO) followed by wsrep_gtid_domain_id (if any).
582       const char *pos= strchr(out, ' ');
583 
584       if (!pos) {
585         // There is no wsrep_gtid_domain_id (some older version SST script?).
586         err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
587 
588       } else {
589         // Scan state ID first followed by wsrep_gtid_domain_id.
590         unsigned long int domain_id;
591 
592         // Null-terminate the state-id.
593         out[pos - out]= 0;
594         err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
595 
596         if (err)
597         {
598           goto err;
599         }
600         else if (wsrep_gtid_mode)
601         {
602           errno= 0;                             /* Reset the errno */
603           domain_id= strtoul(pos + 1, NULL, 10);
604           err= errno;
605 
606           /* Check if we received a valid gtid_domain_id. */
607           if (err == EINVAL || err == ERANGE)
608           {
609             WSREP_ERROR("Failed to get donor wsrep_gtid_domain_id.");
610             err= EINVAL;
611             goto err;
612           } else {
613             wsrep_gtid_domain_id= (uint32) domain_id;
614           }
615         }
616       }
617     }
618 
619 err:
620 
621     if (err)
622     {
623       ret_uuid= WSREP_UUID_UNDEFINED;
624       ret_seqno= -err;
625     }
626 
627     // Tell initializer thread that SST is complete
628     wsrep_sst_complete (&ret_uuid, ret_seqno, true);
629   }
630 
631   return NULL;
632 }
633 
634 #define WSREP_SST_AUTH_ENV        "WSREP_SST_OPT_AUTH"
635 #define WSREP_SST_REMOTE_AUTH_ENV "WSREP_SST_OPT_REMOTE_AUTH"
636 #define DATA_HOME_DIR_ENV         "INNODB_DATA_HOME_DIR"
637 
sst_append_env_var(wsp::env & env,const char * const var,const char * const val)638 static int sst_append_env_var(wsp::env&   env,
639                               const char* const var,
640                               const char* const val)
641 {
642   int const env_str_size= strlen(var) + 1 /* = */
643                           + (val ? strlen(val) : 0) + 1 /* \0 */;
644 
645   wsp::string env_str(env_str_size); // for automatic cleanup on return
646   if (!env_str()) return -ENOMEM;
647 
648   int ret= snprintf(env_str(), env_str_size, "%s=%s", var, val ? val : "");
649 
650   if (ret < 0 || ret >= env_str_size)
651   {
652     WSREP_ERROR("sst_append_env_var(): snprintf(%s=%s) failed: %d",
653                 var, val, ret);
654     return (ret < 0 ? ret : -EMSGSIZE);
655   }
656 
657   env.append(env_str());
658   return -env.error();
659 }
660 
661 #ifdef __WIN__
662 /*
663   Space, single quote, ampersand, backquote, I/O redirection
664   characters, caret, all brackets, plus, exclamation and comma
665   characters require text to be enclosed in double quotes:
666 */
667 #define IS_SPECIAL(c) \
668   (isspace(c) || c == '\'' || c == '&' || c == '`' || c == '|' || \
669                  c ==  '>' || c == '<' || c == ';' || c == '^' || \
670                  c ==  '[' || c == ']' || c == '{' || c == '}' || \
671                  c ==  '(' || c == ')' || c == '+' || c == '!' || \
672                  c ==  ',')
673 /*
674   Inside values, equals character are interpreted as special
675   character and requires quotation:
676 */
677 #define IS_SPECIAL_V(c) (IS_SPECIAL(c) || c == '=')
678 /*
679   Double quotation mark and percent characters require escaping:
680 */
681 #define IS_REQ_ESCAPING(c) (c == '""' || c == '%')
682 #else
683 /*
684   Space, single quote, ampersand, backquote, and I/O redirection
685   characters require text to be enclosed in double quotes. The
686   semicolon is used to separate shell commands, so it must be
687   enclosed in double quotes as well:
688 */
689 #define IS_SPECIAL(c) \
690   (isspace(c) || c == '\'' || c == '&' || c == '`' || c == '|' || \
691                  c ==  '>' || c == '<' || c == ';')
692 /*
693   Inside values, characters are interpreted as in parameter names:
694 */
695 #define IS_SPECIAL_V(c) IS_SPECIAL(c)
696 /*
697   Double quotation mark and backslash characters require
698   backslash prefixing, the dollar symbol is used to substitute
699   a variable value, therefore it also requires escaping:
700 */
701 #define IS_REQ_ESCAPING(c) (c == '"' || c == '\\' || c == '$')
702 #endif
703 
estimate_cmd_len(bool * extra_args)704 static size_t estimate_cmd_len (bool* extra_args)
705 {
706   /*
707     The length of the area reserved for the control parameters
708     of the SST script (excluding the copying of the original
709     mysqld arguments):
710   */
711   size_t cmd_len= 4096;
712   bool extra= false;
713   /*
714     If mysqld was started with arguments, add them all:
715   */
716   if (orig_argc > 1)
717   {
718     for (int i = 1; i < orig_argc; i++)
719     {
720       const char* arg= orig_argv[i];
721       size_t n= strlen(arg);
722       if (n == 0) continue;
723       cmd_len += n;
724       bool quotation= false;
725       char c;
726       while ((c = *arg++) != 0)
727       {
728         if (IS_SPECIAL(c))
729         {
730           quotation= true;
731         }
732         else if (IS_REQ_ESCAPING(c))
733         {
734           cmd_len++;
735 #ifdef __WIN__
736           quotation= true;
737 #endif
738         }
739         /*
740           If the equals symbol is encountered, then we need to separately
741           process the right side:
742         */
743         else if (c == '=')
744         {
745           /* Perhaps we need to quote the left part of the argument: */
746           if (quotation)
747           {
748             cmd_len += 2;
749             /*
750               Reset the quotation flag, since now the status for
751               the right side of the expression will be saved here:
752             */
753             quotation= false;
754           }
755           while ((c = *arg++) != 0)
756           {
757             if (IS_SPECIAL_V(c))
758             {
759               quotation= true;
760             }
761             else if (IS_REQ_ESCAPING(c))
762             {
763               cmd_len++;
764 #ifdef __WIN__
765               quotation= true;
766 #endif
767             }
768           }
769           break;
770         }
771       }
772       /* Perhaps we need to quote the entire argument or its right part: */
773       if (quotation)
774       {
775         cmd_len += 2;
776       }
777     }
778     extra = true;
779     cmd_len += strlen(WSREP_SST_OPT_MYSQLD);
780     /*
781       Add the separating spaces between arguments,
782       and one additional space before "--mysqld-args":
783     */
784     cmd_len += orig_argc;
785   }
786   *extra_args= extra;
787   return cmd_len;
788 }
789 
copy_orig_argv(char * cmd_str)790 static void copy_orig_argv (char* cmd_str)
791 {
792   /*
793      If mysqld was started with arguments, copy them all:
794   */
795   if (orig_argc > 1)
796   {
797     size_t n = strlen(WSREP_SST_OPT_MYSQLD);
798     *cmd_str++ = ' ';
799     memcpy(cmd_str, WSREP_SST_OPT_MYSQLD, n * sizeof(char));
800     cmd_str += n;
801     for (int i = 1; i < orig_argc; i++)
802     {
803       char* arg= orig_argv[i];
804       n = strlen(arg);
805       if (n == 0) continue;
806       *cmd_str++ = ' ';
807       bool quotation= false;
808       bool plain= true;
809       char *arg_scan= arg;
810       char c;
811       while ((c = *arg_scan++) != 0)
812       {
813         if (IS_SPECIAL(c))
814         {
815           quotation= true;
816         }
817         else if (IS_REQ_ESCAPING(c))
818         {
819           plain= false;
820 #ifdef __WIN__
821           quotation= true;
822 #endif
823         }
824         /*
825           If the equals symbol is encountered, then we need to separately
826           process the right side:
827         */
828         else if (c == '=')
829         {
830           /* Calculate length of the Left part of the argument: */
831           size_t m = (size_t) (arg_scan - arg) - 1;
832           if (m)
833           {
834             /* Perhaps we need to quote the left part of the argument: */
835             if (quotation)
836             {
837               *cmd_str++ = '"';
838             }
839             /*
840               If there were special characters inside, then we can use
841               the fast memcpy function:
842             */
843             if (plain)
844             {
845               memcpy(cmd_str, arg, m * sizeof(char));
846               cmd_str += m;
847               /* Left part of the argument has already been processed: */
848               n -= m;
849               arg += m;
850             }
851             /* Otherwise we need to prefix individual characters: */
852             else
853             {
854               n -= m;
855               while (m)
856               {
857                 c = *arg++;
858                 if (IS_REQ_ESCAPING(c))
859                 {
860 #ifdef __WIN__
861                   *cmd_str++ = c;
862 #else
863                   *cmd_str++ = '\\';
864 #endif
865                 }
866                 *cmd_str++ = c;
867                 m--;
868               }
869               /*
870                 Reset the plain string flag, since now the status for
871                 the right side of the expression will be saved here:
872               */
873               plain= true;
874             }
875             /* Perhaps we need to quote the left part of the argument: */
876             if (quotation)
877             {
878               *cmd_str++ = '"';
879               /*
880                 Reset the quotation flag, since now the status for
881                 the right side of the expression will be saved here:
882               */
883               quotation= false;
884             }
885           }
886           /* Copy equals symbol: */
887           *cmd_str++ = '=';
888           arg++;
889           n--;
890           /* Let's deal with the left side of the expression: */
891           while ((c = *arg_scan++) != 0)
892           {
893             if (IS_SPECIAL_V(c))
894             {
895               quotation= true;
896             }
897             else if (IS_REQ_ESCAPING(c))
898             {
899               plain= false;
900 #ifdef __WIN__
901               quotation= true;
902 #endif
903             }
904           }
905           break;
906         }
907       }
908       if (n)
909       {
910         /* Perhaps we need to quote the entire argument or its right part: */
911         if (quotation)
912         {
913           *cmd_str++ = '"';
914         }
915         /*
916           If there were no special characters inside, then we can use
917           the fast memcpy function:
918         */
919         if (plain)
920         {
921           memcpy(cmd_str, arg, n * sizeof(char));
922           cmd_str += n;
923         }
924         /* Otherwise we need to prefix individual characters: */
925         else
926         {
927           while ((c = *arg++) != 0)
928           {
929             if (IS_REQ_ESCAPING(c))
930             {
931 #ifdef __WIN__
932               *cmd_str++ = c;
933 #else
934               *cmd_str++ = '\\';
935 #endif
936             }
937             *cmd_str++ = c;
938           }
939         }
940         /* Perhaps we need to quote the entire argument or its right part: */
941         if (quotation)
942         {
943           *cmd_str++ = '"';
944         }
945       }
946     }
947     /*
948       Add a terminating null character (not counted in the length,
949       since we've overwritten the original null character which
950       was previously added by snprintf:
951     */
952     *cmd_str = 0;
953   }
954 }
955 
sst_prepare_other(const char * method,const char * sst_auth,const char * addr_in,const char ** addr_out)956 static ssize_t sst_prepare_other (const char*  method,
957                                   const char*  sst_auth,
958                                   const char*  addr_in,
959                                   const char** addr_out)
960 {
961   bool extra_args;
962   size_t const cmd_len= estimate_cmd_len(&extra_args);
963   wsp::string cmd_str(cmd_len);
964 
965   if (!cmd_str())
966   {
967     WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %zd bytes",
968                 cmd_len);
969     return -ENOMEM;
970   }
971 
972   char* binlog_opt_val= NULL;
973   char* binlog_index_opt_val= NULL;
974 
975   int ret;
976   if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
977   {
978     WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
979                 ret);
980     return ret;
981   }
982 
983   if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val)))
984   {
985     WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d",
986                 ret);
987     if (binlog_opt_val) my_free(binlog_opt_val);
988     return ret;
989   }
990 
991   make_wsrep_defaults_file();
992 
993   ret= snprintf (cmd_str(), cmd_len,
994                  "wsrep_sst_%s "
995                  WSREP_SST_OPT_ROLE " 'joiner' "
996                  WSREP_SST_OPT_ADDR " '%s' "
997                  WSREP_SST_OPT_DATA " '%s' "
998                  "%s"
999                  WSREP_SST_OPT_PARENT " '%d'"
1000                  "%s"
1001                  "%s",
1002                  method, addr_in, mysql_real_data_home,
1003                  wsrep_defaults_file,
1004                  (int)getpid(),
1005                  binlog_opt_val, binlog_index_opt_val);
1006 
1007   my_free(binlog_opt_val);
1008   my_free(binlog_index_opt_val);
1009 
1010   if (ret < 0 || size_t(ret) >= cmd_len)
1011   {
1012     WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
1013     return (ret < 0 ? ret : -EMSGSIZE);
1014   }
1015 
1016   if (extra_args)
1017     copy_orig_argv(cmd_str() + ret);
1018 
1019   wsp::env env(NULL);
1020   if (env.error())
1021   {
1022     WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
1023     return -env.error();
1024   }
1025 
1026   if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth)))
1027   {
1028     WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
1029     return ret;
1030   }
1031 
1032   if (data_home_dir)
1033   {
1034     if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir)))
1035     {
1036       WSREP_ERROR("sst_prepare_other(): appending data "
1037                   "directory failed: %d", ret);
1038       return ret;
1039     }
1040   }
1041 
1042   pthread_t tmp;
1043   sst_thread_arg arg(cmd_str(), env());
1044   mysql_mutex_lock (&arg.lock);
1045   ret = mysql_thread_create (key_wsrep_sst_joiner, &tmp, NULL, sst_joiner_thread, &arg);
1046   if (ret)
1047   {
1048     WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
1049                 ret, strerror(ret));
1050     return -ret;
1051   }
1052   mysql_cond_wait (&arg.cond, &arg.lock);
1053 
1054   *addr_out= arg.ret_str;
1055 
1056   if (!arg.err)
1057     ret = strlen(*addr_out);
1058   else
1059   {
1060     assert (arg.err < 0);
1061     ret = arg.err;
1062   }
1063 
1064   pthread_detach (tmp);
1065 
1066   return ret;
1067 }
1068 
1069 extern uint  mysqld_port;
1070 
1071 /*! Just tells donor where to send mysqldump */
sst_prepare_mysqldump(const char * addr_in,const char ** addr_out)1072 static ssize_t sst_prepare_mysqldump (const char*  addr_in,
1073                                       const char** addr_out)
1074 {
1075   ssize_t ret = strlen (addr_in);
1076 
1077   if (!strrchr(addr_in, ':'))
1078   {
1079     ssize_t s = ret + 7;
1080     char* tmp = (char*) malloc (s);
1081 
1082     if (tmp)
1083     {
1084       ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
1085 
1086       if (ret > 0 && ret < s)
1087       {
1088         *addr_out= tmp;
1089         return ret;
1090       }
1091       if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
1092       free (tmp);
1093     }
1094     else {
1095       ret= -ENOMEM;
1096     }
1097 
1098     WSREP_ERROR ("Could not prepare state transfer request: "
1099                  "adding default port failed: %zd.", ret);
1100   }
1101   else {
1102     *addr_out= addr_in;
1103   }
1104 
1105   return ret;
1106 }
1107 
1108 static bool SE_initialized = false;
1109 
wsrep_sst_prepare(void ** msg)1110 ssize_t wsrep_sst_prepare (void** msg)
1111 {
1112   const char* addr_in=  NULL;
1113   const char* addr_out= NULL;
1114   const char* method;
1115 
1116   if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
1117   {
1118     ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1;
1119     *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL);
1120     if (!msg)
1121     {
1122       WSREP_ERROR("Could not allocate %zd bytes for state request", ret);
1123       unireg_abort(1);
1124     }
1125     return ret;
1126   }
1127 
1128   /*
1129     Figure out SST receive address. Common for all SST methods.
1130   */
1131   char ip_buf[256];
1132   const ssize_t ip_max= sizeof(ip_buf);
1133 
1134   // Attempt 1: wsrep_sst_receive_address
1135   if (wsrep_sst_receive_address &&
1136       strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
1137   {
1138     addr_in= wsrep_sst_receive_address;
1139   }
1140 
1141   //Attempt 2: wsrep_node_address
1142   else if (wsrep_node_address && strlen(wsrep_node_address))
1143   {
1144     wsp::Address addr(wsrep_node_address);
1145 
1146     if (!addr.is_valid())
1147     {
1148       WSREP_ERROR("Could not parse wsrep_node_address : %s",
1149                   wsrep_node_address);
1150       unireg_abort(1);
1151     }
1152     memcpy(ip_buf, addr.get_address(), addr.get_address_len());
1153     addr_in= ip_buf;
1154   }
1155   // Attempt 3: Try to get the IP from the list of available interfaces.
1156   else
1157   {
1158     ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
1159 
1160     if (ret && ret < ip_max)
1161     {
1162       addr_in= ip_buf;
1163     }
1164     else
1165     {
1166       WSREP_ERROR("Failed to guess address to accept state transfer. "
1167                   "wsrep_sst_receive_address must be set manually.");
1168       unireg_abort(1);
1169     }
1170   }
1171 
1172   ssize_t addr_len= -ENOSYS;
1173   method = wsrep_sst_method;
1174   if (!strcmp(method, WSREP_SST_MYSQLDUMP))
1175   {
1176     addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
1177     if (addr_len < 0) unireg_abort(1);
1178   }
1179   else
1180   {
1181     /*! A heuristic workaround until we learn how to stop and start engines */
1182     if (SE_initialized)
1183     {
1184       if (!strcmp(method, WSREP_SST_XTRABACKUP) ||
1185           !strcmp(method, WSREP_SST_XTRABACKUPV2))
1186       {
1187          WSREP_WARN("The %s SST method is deprecated, so it is automatically "
1188                     "replaced by %s", method, WSREP_SST_MARIABACKUP);
1189          method = WSREP_SST_MARIABACKUP;
1190       }
1191       // we already did SST at initializaiton, now engines are running
1192       // sql_print_information() is here because the message is too long
1193       // for WSREP_INFO.
1194       sql_print_information ("WSREP: "
1195                  "You have configured '%s' state snapshot transfer method "
1196                  "which cannot be performed on a running server. "
1197                  "Wsrep provider won't be able to fall back to it "
1198                  "if other means of state transfer are unavailable. "
1199                  "In that case you will need to restart the server.",
1200                  method);
1201       *msg = 0;
1202       return 0;
1203     }
1204 
1205     addr_len = sst_prepare_other (method, sst_auth_real,
1206                                   addr_in, &addr_out);
1207     if (addr_len < 0)
1208     {
1209       WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
1210                    method);
1211       unireg_abort(1);
1212     }
1213   }
1214 
1215   size_t const method_len(strlen(method));
1216   size_t const msg_len   (method_len + addr_len + 2 /* + auth_len + 1*/);
1217 
1218   *msg = malloc (msg_len);
1219   if (NULL != *msg) {
1220     char* const method_ptr(static_cast<char*>(*msg));
1221     strcpy (method_ptr, method);
1222     char* const addr_ptr(method_ptr + method_len + 1);
1223     strcpy (addr_ptr, addr_out);
1224 
1225     WSREP_DEBUG("Prepared SST request: %s|%s", method_ptr, addr_ptr);
1226   }
1227   else {
1228     WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
1229                 msg_len);
1230     unireg_abort(1);
1231   }
1232 
1233   if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
1234 
1235   return msg_len;
1236 }
1237 
1238 // helper method for donors
sst_run_shell(const char * cmd_str,char ** env,int max_tries)1239 static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
1240 {
1241   int ret = 0;
1242 
1243   for (int tries=1; tries <= max_tries; tries++)
1244   {
1245     wsp::process proc (cmd_str, "r", env);
1246 
1247     if (NULL != proc.pipe())
1248     {
1249       proc.wait();
1250     }
1251 
1252     if ((ret = proc.error()))
1253     {
1254       WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
1255                   tries, max_tries, proc.cmd(), ret, strerror(ret));
1256       sleep (1);
1257     }
1258     else
1259     {
1260       WSREP_DEBUG("SST script successfully completed.");
1261       break;
1262     }
1263   }
1264 
1265   return -ret;
1266 }
1267 
sst_reject_queries(my_bool close_conn)1268 static void sst_reject_queries(my_bool close_conn)
1269 {
1270     wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced
1271     WSREP_INFO("Rejecting client queries for the duration of SST.");
1272     if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
1273 }
1274 
sst_donate_mysqldump(const char * addr,const wsrep_uuid_t * uuid,const char * uuid_str,wsrep_seqno_t seqno,bool bypass,char ** env)1275 static int sst_donate_mysqldump (const char*         addr,
1276                                  const wsrep_uuid_t* uuid,
1277                                  const char*         uuid_str,
1278                                  wsrep_seqno_t       seqno,
1279                                  bool                bypass,
1280                                  char**              env) // carries auth info
1281 {
1282   char host[256];
1283   wsp::Address address(addr);
1284   if (!address.is_valid())
1285   {
1286     WSREP_ERROR("Could not parse SST address : %s", addr);
1287     return 0;
1288   }
1289   memcpy(host, address.get_address(), address.get_address_len());
1290   int port= address.get_port();
1291   bool extra_args;
1292   size_t const cmd_len= estimate_cmd_len(&extra_args);
1293   wsp::string cmd_str(cmd_len);
1294 
1295   if (!cmd_str())
1296   {
1297     WSREP_ERROR("sst_donate_mysqldump(): "
1298                 "could not allocate cmd buffer of %zd bytes", cmd_len);
1299     return -ENOMEM;
1300   }
1301 
1302   if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
1303 
1304   make_wsrep_defaults_file();
1305 
1306   int ret= snprintf (cmd_str(), cmd_len,
1307                      "wsrep_sst_mysqldump "
1308                      WSREP_SST_OPT_ADDR " '%s' "
1309                      WSREP_SST_OPT_PORT " '%d' "
1310                      WSREP_SST_OPT_LPORT " '%u' "
1311                      WSREP_SST_OPT_SOCKET " '%s' "
1312                      "%s"
1313                      WSREP_SST_OPT_GTID " '%s:%lld' "
1314                      WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
1315                      "%s",
1316                      addr, port, mysqld_port, mysqld_unix_port,
1317                      wsrep_defaults_file, uuid_str,
1318                      (long long)seqno, wsrep_gtid_domain_id,
1319                      bypass ? " " WSREP_SST_OPT_BYPASS : "");
1320 
1321   if (ret < 0 || size_t(ret) >= cmd_len)
1322   {
1323     WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
1324     return (ret < 0 ? ret : -EMSGSIZE);
1325   }
1326 
1327   if (extra_args)
1328     copy_orig_argv(cmd_str() + ret);
1329 
1330   WSREP_DEBUG("Running: '%s'", cmd_str());
1331 
1332   ret= sst_run_shell (cmd_str(), env, 3);
1333 
1334   wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
1335 
1336   wsrep->sst_sent (wsrep, &state_id, ret);
1337 
1338   return ret;
1339 }
1340 
1341 wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
1342 
1343 
1344 /*
1345   Create a file under data directory.
1346 */
sst_create_file(const char * name,const char * content)1347 static int sst_create_file(const char *name, const char *content)
1348 {
1349   int err= 0;
1350   char *real_name;
1351   char *tmp_name;
1352   ssize_t len;
1353   FILE *file;
1354 
1355   len= strlen(mysql_real_data_home) + strlen(name) + 2;
1356   real_name= (char *) alloca(len);
1357 
1358   snprintf(real_name, (size_t) len, "%s/%s", mysql_real_data_home, name);
1359 
1360   tmp_name= (char *) alloca(len + 4);
1361   snprintf(tmp_name, (size_t) len + 4, "%s.tmp", real_name);
1362 
1363   file= fopen(tmp_name, "w+");
1364 
1365   if (0 == file)
1366   {
1367     err= errno;
1368     WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err, strerror(err));
1369   }
1370   else
1371   {
1372     // Write the specified content into the file.
1373     if (content != NULL)
1374     {
1375       fprintf(file, "%s\n", content);
1376       fsync(fileno(file));
1377     }
1378 
1379     fclose(file);
1380 
1381     if (rename(tmp_name, real_name) == -1)
1382     {
1383       err= errno;
1384       WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", tmp_name,
1385                   real_name, err, strerror(err));
1386     }
1387   }
1388 
1389   return err;
1390 }
1391 
1392 
run_sql_command(THD * thd,const char * query)1393 static int run_sql_command(THD *thd, const char *query)
1394 {
1395   thd->set_query((char *)query, strlen(query));
1396 
1397   Parser_state ps;
1398   if (ps.init(thd, thd->query(), thd->query_length()))
1399   {
1400     WSREP_ERROR("SST query: %s failed", query);
1401     return -1;
1402   }
1403 
1404   mysql_parse(thd, thd->query(), thd->query_length(), &ps, FALSE, FALSE);
1405   if (thd->is_error())
1406   {
1407     int const err= thd->get_stmt_da()->sql_errno();
1408     WSREP_WARN ("Error executing '%s': %d (%s)%s",
1409                 query, err, thd->get_stmt_da()->message(),
1410                 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
1411                 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
1412     thd->clear_error();
1413     return -1;
1414   }
1415   return 0;
1416 }
1417 
1418 
sst_flush_tables(THD * thd)1419 static int sst_flush_tables(THD* thd)
1420 {
1421   WSREP_INFO("Flushing tables for SST...");
1422 
1423   int err= 0;
1424   int not_used;
1425   /*
1426     Files created to notify the SST script about the outcome of table flush
1427     operation.
1428   */
1429   const char *flush_success= "tables_flushed";
1430   const char *flush_error= "sst_error";
1431 
1432   CHARSET_INFO *current_charset= thd->variables.character_set_client;
1433 
1434   if (!is_supported_parser_charset(current_charset))
1435   {
1436       /* Do not use non-supported parser character sets */
1437       WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1438       thd->variables.character_set_client = &my_charset_latin1;
1439       WSREP_WARN("For SST temporally setting character set to : %s",
1440               my_charset_latin1.csname);
1441   }
1442 
1443   if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
1444   {
1445     err= -1;
1446   }
1447   else
1448   {
1449     /*
1450       Make sure logs are flushed after global read lock acquired. In case
1451       reload fails, we must also release the acquired FTWRL.
1452     */
1453     if (reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
1454                              (TABLE_LIST*) 0, &not_used))
1455     {
1456       thd->global_read_lock.unlock_global_read_lock(thd);
1457       err= -1;
1458     }
1459   }
1460 
1461   thd->variables.character_set_client = current_charset;
1462 
1463   if (err)
1464   {
1465     WSREP_ERROR("Failed to flush and lock tables");
1466 
1467     /*
1468       The SST must be aborted as the flush tables failed. Notify this to SST
1469       script by creating the error file.
1470     */
1471     int tmp;
1472     if ((tmp= sst_create_file(flush_error, NULL))) {
1473       err= tmp;
1474     }
1475   }
1476   else
1477   {
1478     WSREP_INFO("Tables flushed.");
1479 
1480     /*
1481       Tables have been flushed. Create a file with cluster state ID and
1482       wsrep_gtid_domain_id.
1483     */
1484     char content[100];
1485     snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid,
1486              (long long)wsrep_locked_seqno, wsrep_gtid_domain_id);
1487     err= sst_create_file(flush_success, content);
1488   }
1489 
1490   return err;
1491 }
1492 
1493 
sst_disallow_writes(THD * thd,bool yes)1494 static void sst_disallow_writes (THD* thd, bool yes)
1495 {
1496   char query_str[64] = { 0, };
1497   ssize_t const query_max = sizeof(query_str) - 1;
1498   CHARSET_INFO *current_charset;
1499 
1500   current_charset = thd->variables.character_set_client;
1501 
1502   if (!is_supported_parser_charset(current_charset))
1503   {
1504       /* Do not use non-supported parser character sets */
1505       WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1506       thd->variables.character_set_client = &my_charset_latin1;
1507       WSREP_WARN("For SST temporally setting character set to : %s",
1508               my_charset_latin1.csname);
1509   }
1510 
1511   snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
1512             yes ? 1 : 0);
1513 
1514   if (run_sql_command(thd, query_str))
1515   {
1516     WSREP_ERROR("Failed to disallow InnoDB writes");
1517   }
1518   thd->variables.character_set_client = current_charset;
1519 }
1520 
sst_donor_thread(void * a)1521 static void* sst_donor_thread (void* a)
1522 {
1523   sst_thread_arg* arg= (sst_thread_arg*)a;
1524 
1525   WSREP_INFO("Running: '%s'", arg->cmd);
1526 
1527   int  err= 1;
1528   bool locked= false;
1529 
1530   const char*  out= NULL;
1531   const size_t out_len= 128;
1532   char         out_buf[out_len];
1533 
1534   wsrep_uuid_t  ret_uuid= WSREP_UUID_UNDEFINED;
1535   // seqno of complete SST
1536   wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;
1537 
1538   // We turn off wsrep_on for this THD so that it can
1539   // operate with wsrep_ready == OFF
1540   // We also set this SST thread THD as system thread
1541   wsp::thd thd(FALSE, true);
1542   wsp::process proc(arg->cmd, "r", arg->env);
1543 
1544   err= proc.error();
1545 
1546 /* Inform server about SST script startup and release TO isolation */
1547   mysql_mutex_lock   (&arg->lock);
1548   arg->err = -err;
1549   mysql_cond_signal  (&arg->cond);
1550   mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
1551 
1552   if (proc.pipe() && !err)
1553   {
1554 wait_signal:
1555     out= my_fgets (out_buf, out_len, proc.pipe());
1556 
1557     if (out)
1558     {
1559       const char magic_flush[]= "flush tables";
1560       const char magic_cont[]= "continue";
1561       const char magic_done[]= "done";
1562 
1563       if (!strcasecmp (out, magic_flush))
1564       {
1565         err= sst_flush_tables (thd.ptr);
1566         if (!err)
1567         {
1568           sst_disallow_writes (thd.ptr, true);
1569           /*
1570             Lets also keep statements that modify binary logs (like RESET LOGS,
1571             RESET MASTER) from proceeding until the files have been transferred
1572             to the joiner node.
1573           */
1574           if (mysql_bin_log.is_open())
1575           {
1576             mysql_mutex_lock(mysql_bin_log.get_log_lock());
1577           }
1578 
1579           locked= true;
1580           goto wait_signal;
1581         }
1582       }
1583       else if (!strcasecmp (out, magic_cont))
1584       {
1585         if (locked)
1586         {
1587           if (mysql_bin_log.is_open())
1588           {
1589             mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
1590             mysql_mutex_unlock(mysql_bin_log.get_log_lock());
1591           }
1592           sst_disallow_writes (thd.ptr, false);
1593           thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
1594           locked= false;
1595         }
1596         err=  0;
1597         goto wait_signal;
1598       }
1599       else if (!strncasecmp (out, magic_done, strlen(magic_done)))
1600       {
1601         err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
1602                                   &ret_uuid, &ret_seqno);
1603       }
1604       else
1605       {
1606         WSREP_WARN("Received unknown signal: '%s'", out);
1607       }
1608     }
1609     else
1610     {
1611       WSREP_ERROR("Failed to read from: %s", proc.cmd());
1612       proc.wait();
1613     }
1614     if (!err && proc.error()) err= proc.error();
1615   }
1616   else
1617   {
1618     WSREP_ERROR("Failed to execute: %s : %d (%s)",
1619                 proc.cmd(), err, strerror(err));
1620   }
1621 
1622   if (locked) // don't forget to unlock server before return
1623   {
1624     if (mysql_bin_log.is_open())
1625     {
1626       mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
1627       mysql_mutex_unlock(mysql_bin_log.get_log_lock());
1628     }
1629     sst_disallow_writes (thd.ptr, false);
1630     thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
1631   }
1632 
1633   // signal to donor that SST is over
1634   struct wsrep_gtid const state_id = {
1635       ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
1636   };
1637   wsrep->sst_sent (wsrep, &state_id, -err);
1638   proc.wait();
1639 
1640   return NULL;
1641 }
1642 
1643 
1644 
sst_donate_other(const char * method,const char * addr,const char * uuid,wsrep_seqno_t seqno,bool bypass,char ** env)1645 static int sst_donate_other (const char*   method,
1646                              const char*   addr,
1647                              const char*   uuid,
1648                              wsrep_seqno_t seqno,
1649                              bool          bypass,
1650                              char**        env) // carries auth info
1651 {
1652   bool extra_args;
1653   size_t const cmd_len= estimate_cmd_len(&extra_args);
1654   wsp::string cmd_str(cmd_len);
1655 
1656   if (!cmd_str())
1657   {
1658     WSREP_ERROR("sst_donate_other(): "
1659                 "could not allocate cmd buffer of %zd bytes", cmd_len);
1660     return -ENOMEM;
1661   }
1662 
1663   char* binlog_opt_val= NULL;
1664   char* binlog_index_opt_val= NULL;
1665 
1666   int ret;
1667   if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1668   {
1669     WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1670     return ret;
1671   }
1672 
1673   if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val)))
1674   {
1675     WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d",
1676                 ret);
1677     if (binlog_opt_val) my_free(binlog_opt_val);
1678     return ret;
1679   }
1680 
1681   make_wsrep_defaults_file();
1682 
1683   ret= snprintf (cmd_str(), cmd_len,
1684                  "wsrep_sst_%s "
1685                  WSREP_SST_OPT_ROLE " 'donor' "
1686                  WSREP_SST_OPT_ADDR " '%s' "
1687                  WSREP_SST_OPT_LPORT " '%u' "
1688                  WSREP_SST_OPT_SOCKET " '%s' "
1689                  WSREP_SST_OPT_DATA " '%s' "
1690                  "%s"
1691                  WSREP_SST_OPT_GTID " '%s:%lld' "
1692                  WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
1693                  "%s"
1694                  "%s"
1695                  "%s",
1696                  method, addr, mysqld_port, mysqld_unix_port,
1697                  mysql_real_data_home,
1698                  wsrep_defaults_file,
1699                  uuid, (long long) seqno, wsrep_gtid_domain_id,
1700                  binlog_opt_val, binlog_index_opt_val,
1701                  bypass ? " " WSREP_SST_OPT_BYPASS : "");
1702 
1703   my_free(binlog_opt_val);
1704   my_free(binlog_index_opt_val);
1705 
1706   if (ret < 0 || size_t(ret) >= cmd_len)
1707   {
1708     WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1709     return (ret < 0 ? ret : -EMSGSIZE);
1710   }
1711 
1712   if (extra_args)
1713     copy_orig_argv(cmd_str() + ret);
1714 
1715   if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1716 
1717   pthread_t tmp;
1718   sst_thread_arg arg(cmd_str(), env);
1719   mysql_mutex_lock (&arg.lock);
1720   ret = mysql_thread_create (key_wsrep_sst_donor, &tmp, NULL, sst_donor_thread, &arg);
1721   if (ret)
1722   {
1723     WSREP_ERROR("sst_donate_other(): mysql_thread_create() failed: %d (%s)",
1724                 ret, strerror(ret));
1725     return ret;
1726   }
1727   mysql_cond_wait (&arg.cond, &arg.lock);
1728 
1729   WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1730   return arg.err;
1731 }
1732 
1733 /* return true if character can be a part of a filename */
filename_char(int const c)1734 static bool filename_char(int const c)
1735 {
1736   return isalnum(c) || (c == '-') || (c == '_') || (c == '.');
1737 }
1738 
1739 /* return true if character can be a part of an address string */
address_char(int const c)1740 static bool address_char(int const c)
1741 {
1742   return filename_char(c) ||
1743          (c == ':') || (c == '[') || (c == ']') || (c == '/');
1744 }
1745 
check_request_str(const char * const str,bool (* check)(int c))1746 static bool check_request_str(const char* const str,
1747                               bool (*check) (int c))
1748 {
1749   for (size_t i(0); str[i] != '\0'; ++i)
1750   {
1751     if (!check(str[i]))
1752     {
1753       WSREP_WARN("Illegal character in state transfer request: %i (%c).",
1754                  str[i], str[i]);
1755       return true;
1756     }
1757   }
1758 
1759   return false;
1760 }
1761 
wsrep_sst_donate_cb(void * app_ctx,void * recv_ctx,const void * msg,size_t msg_len,const wsrep_gtid_t * current_gtid,const char * state,size_t state_len,bool bypass)1762 wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
1763                                        const void* msg, size_t msg_len,
1764                                        const wsrep_gtid_t* current_gtid,
1765                                        const char* state, size_t state_len,
1766                                        bool bypass)
1767 {
1768   const char* method = (char*)msg;
1769   size_t method_len  = strlen (method);
1770 
1771   if (check_request_str(method, filename_char))
1772   {
1773     WSREP_ERROR("Bad SST method name. SST canceled.");
1774     return WSREP_CB_FAILURE;
1775   }
1776 
1777   const char* data   = method + method_len + 1;
1778 
1779   /* check for auth@addr separator */
1780   const char* addr= strrchr(data, '@');
1781   wsp::string remote_auth;
1782   if (addr)
1783   {
1784     remote_auth.set(strndup(data, addr - data));
1785     addr++;
1786   }
1787   else
1788   {
1789     // no auth part
1790     addr= data;
1791   }
1792 
1793   if (check_request_str(addr, address_char))
1794   {
1795     WSREP_ERROR("Bad SST address string. SST canceled.");
1796     return WSREP_CB_FAILURE;
1797   }
1798 
1799   char uuid_str[37];
1800   wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str));
1801 
1802   /* This will be reset when sync callback is called.
1803    * Should we set wsrep_ready to FALSE here too? */
1804   wsrep_config_state->set(WSREP_MEMBER_DONOR);
1805 
1806   wsp::env env(NULL);
1807   if (env.error())
1808   {
1809     WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1810     return WSREP_CB_FAILURE;
1811   }
1812 
1813   int ret;
1814   if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth_real)))
1815   {
1816     WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1817     return WSREP_CB_FAILURE;
1818   }
1819 
1820   if (remote_auth())
1821   {
1822     if ((ret= sst_append_env_var(env, WSREP_SST_REMOTE_AUTH_ENV,remote_auth())))
1823     {
1824       WSREP_ERROR("wsrep_sst_donate_cb(): appending remote auth env failed: "
1825                   "%d", ret);
1826       return WSREP_CB_FAILURE;
1827     }
1828   }
1829 
1830   if (data_home_dir)
1831   {
1832     if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir)))
1833     {
1834       WSREP_ERROR("wsrep_sst_donate_cb(): appending data "
1835                   "directory failed: %d", ret);
1836       return WSREP_CB_FAILURE;
1837     }
1838   }
1839 
1840   if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1841   {
1842     ret = sst_donate_mysqldump(addr, &current_gtid->uuid, uuid_str,
1843                                current_gtid->seqno, bypass, env());
1844   }
1845   else
1846   {
1847     ret = sst_donate_other(method, addr, uuid_str,
1848                            current_gtid->seqno, bypass, env());
1849   }
1850 
1851   return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
1852 }
1853 
wsrep_SE_init_grab()1854 void wsrep_SE_init_grab()
1855 {
1856   if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort();
1857 }
1858 
wsrep_SE_init_wait()1859 void wsrep_SE_init_wait()
1860 {
1861   double total_wtime=0;
1862 
1863   while (SE_initialized == false)
1864   {
1865     struct timespec wtime;
1866     set_timespec(wtime, WSREP_TIMEDWAIT_SECONDS);
1867     time_t start_time = time(NULL);
1868     mysql_cond_timedwait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init, &wtime);
1869     time_t end_time = time(NULL);
1870 
1871     if (!SE_initialized)
1872     {
1873       total_wtime += difftime(end_time, start_time);
1874       WSREP_DEBUG("Waiting for SST to complete. current seqno: %" PRId64 " waited %f secs.", local_seqno, total_wtime);
1875       service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
1876         "WSREP state transfer ongoing, current seqno: %" PRId64 " waited %f secs", local_seqno, total_wtime);
1877     }
1878   }
1879 
1880   mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1881 }
1882 
wsrep_SE_init_done()1883 void wsrep_SE_init_done()
1884 {
1885   mysql_cond_signal (&COND_wsrep_sst_init);
1886   mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1887 }
1888 
wsrep_SE_initialized()1889 void wsrep_SE_initialized()
1890 {
1891   SE_initialized = true;
1892 }
1893