1 /* Copyright 2008-2021 Codership Oy <http://www.codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #include "sql_plugin.h"                         /* wsrep_plugins_pre_init() */
17 #include <mysqld.h>
18 #include <sql_class.h>
19 #include <sql_parse.h>
20 #include <sql_base.h> /* find_temporary_table() */
21 #include "slave.h"
22 #include "rpl_mi.h"
23 #include "sql_repl.h"
24 #include "rpl_filter.h"
25 #include "sql_callback.h"
26 #include "sp_head.h"
27 #include "sql_show.h"
28 #include "sp.h"
29 #include "wsrep_priv.h"
30 #include "wsrep_thd.h"
31 #include "wsrep_sst.h"
32 #include "wsrep_utils.h"
33 #include "wsrep_var.h"
34 #include "wsrep_binlog.h"
35 #include "wsrep_applier.h"
36 #include "wsrep_xid.h"
37 #include <cstdio>
38 #include <cstdlib>
39 #include "log_event.h"
40 
/*
  Handle of the wsrep provider. It is filled in by wsrep_load() from
  wsrep_init() and reset to NULL when provider initialization fails.
*/
wsrep_t *wsrep                  = NULL;
/*
  wsrep_emulate_bin_log is a flag to tell that binlog has not been configured.
  wsrep needs to get binlog events from transaction cache even when binlog is
  not enabled, wsrep_emulate_bin_log opens needed code paths to make this
  possible
*/
my_bool wsrep_emulate_bin_log   = FALSE; // activating parts of binlog interface
#ifdef GTID_SUPPORT
/*
  Sidno in global_sid_map corresponding to group uuid.
  Stays at -1 until wsrep_init_sidno() assigns it.
*/
rpl_sidno wsrep_sidno= -1;
#endif /* GTID_SUPPORT */
/* NOTE(review): not referenced in the visible part of this file. */
my_bool wsrep_preordered_opt= FALSE;
54 
55 /*
56  * Begin configuration options
57  */
58 
/* Server globals defined outside of this file. */
extern my_bool plugins_are_initialized;
extern uint kill_cached_threads;
extern mysql_cond_t COND_thread_cache;

/* System variables. */
const char *wsrep_provider;                     // Provider handed to wsrep_load()
const char *wsrep_provider_options;             // Option string passed to
                                                // wsrep->init()
const char *wsrep_cluster_address;              // Replication is not started
                                                // when this is empty
const char *wsrep_cluster_name;
const char *wsrep_node_name;
const char *wsrep_node_address;                 // Guessed in wsrep_init() when
                                                // empty
const char *wsrep_node_incoming_address;        // Address for client
                                                // connections, may be AUTO
const char *wsrep_start_position;
const char *wsrep_data_home_dir;                // Falls back to
                                                // mysql_real_data_home
const char *wsrep_dbug_option;
const char *wsrep_notify_cmd;

my_bool wsrep_debug;                            // Enable debug level logging
my_bool wsrep_convert_LOCK_to_trx;              // Convert locking sessions to trx
my_bool wsrep_auto_increment_control;           // Control auto increment variables
my_bool wsrep_drupal_282555_workaround;         // Retry autoinc insert after dupkey
my_bool wsrep_certify_nonPK;                    // Certify, even when no primary key
ulong   wsrep_certification_rules      = WSREP_CERTIFICATION_RULES_STRICT;
my_bool wsrep_recovery;                         // Recovery
my_bool wsrep_replicate_myisam;                 // Enable MyISAM replication
my_bool wsrep_log_conflicts;
my_bool wsrep_load_data_splitting;              // Commit load data every 10K intervals
my_bool wsrep_slave_UK_checks;                  // Slave thread does UK checks
my_bool wsrep_slave_FK_checks;                  // Slave thread does FK checks
my_bool wsrep_restart_slave;                    // Should mysql slave thread be
                                                // restarted, when node joins back?
my_bool wsrep_desync;                           // De(re)synchronize the node from the
                                                // cluster
long wsrep_slave_threads;                       // No. of slave appliers threads
ulong wsrep_retry_autocommit;                   // Retry aborted autocommit trx
ulong wsrep_max_ws_size;                        // Max allowed ws (RBR buffer) size
ulong wsrep_max_ws_rows;                        // Max number of rows in ws
ulong wsrep_forced_binlog_format;
ulong wsrep_mysql_replication_bundle;
bool wsrep_gtid_mode;                           // Use wsrep_gtid_domain_id
                                                // for galera transactions?
uint32 wsrep_gtid_domain_id;                    // gtid_domain_id for galera
                                                // transactions

/* Other configuration variables and their default values. */
my_bool wsrep_incremental_data_collection= 0;   // Incremental data collection
my_bool wsrep_restart_slave_activated= 0;       // Node has dropped, and slave
                                                // restart will be needed
bool wsrep_new_cluster= false;                  // Bootstrap the cluster?
int wsrep_slave_count_change= 0;                // No. of appliers to stop/start
int wsrep_to_isolation= 0;                      // No. of active TO isolation threads
long wsrep_max_protocol_version= 3;             // Maximum protocol version to use
111 
112 /*
113  * End configuration options
114  */
115 
116 /*
117  * Other wsrep global variables.
118  */
119 
/* Synchronization objects; all of them are initialized in wsrep_thr_init(). */
mysql_mutex_t LOCK_wsrep_ready;                 // Guards wsrep_ready
mysql_cond_t  COND_wsrep_ready;                 // Signalled when wsrep_ready
                                                // changes
mysql_mutex_t LOCK_wsrep_sst;
mysql_cond_t  COND_wsrep_sst;
mysql_mutex_t LOCK_wsrep_sst_init;
mysql_cond_t  COND_wsrep_sst_init;
mysql_mutex_t LOCK_wsrep_rollback;
mysql_cond_t  COND_wsrep_rollback;
wsrep_aborting_thd_t wsrep_aborting_thd= NULL;
mysql_mutex_t LOCK_wsrep_replaying;
mysql_cond_t  COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;

int wsrep_replaying= 0;
ulong  wsrep_running_threads = 0; // Number of currently running wsrep
                                  // threads
ulong  wsrep_running_applier_threads = 0; // Number of running applier threads
ulong  wsrep_running_rollbacker_threads = 0; // Number of running
                                             // rollbacker threads
ulong  my_bind_addr;
142 
143 #ifdef HAVE_PSI_INTERFACE
/*
  Performance schema keys for the wsrep mutexes, conditions and files.
  The arrays below associate each key with an instrument name, and
  wsrep_thr_init() registers those arrays.
*/
PSI_mutex_key key_LOCK_wsrep_rollback,
  key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
  key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
  key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
  key_LOCK_wsrep_config_state;

PSI_cond_key key_COND_wsrep_rollback,
  key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
  key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;

PSI_file_key key_file_wsrep_gra_log;
155 
156 static PSI_mutex_info wsrep_mutexes[]=
157 {
158   { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL},
159   { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
160   { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0},
161   { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL},
162   { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
163   { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback", PSI_FLAG_GLOBAL},
164   { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
165   { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
166   { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
167   { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
168 };
169 
/*
  Condition variable instruments registered by wsrep_thr_init() through
  mysql_cond_register().
*/
static PSI_cond_info wsrep_conds[]=
{
  { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
  { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
};
179 
/*
  File instruments registered by wsrep_thr_init() through
  mysql_file_register().
*/
static PSI_file_info wsrep_files[]=
{
  { &key_file_wsrep_gra_log, "wsrep_gra_log", 0}
};
184 
/* Performance schema keys for the wsrep thread types. */
PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
  key_wsrep_rollbacker, key_wsrep_applier;

/*
  Thread instruments registered by wsrep_thr_init() through
  mysql_thread_register().
*/
static PSI_thread_info wsrep_threads[]=
{
 {&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
 {&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
 {&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
 {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}
};
195 
196 #endif /* HAVE_PSI_INTERFACE */
197 
/* Set to 1 by wsrep_init() once wsrep->init() has succeeded. */
my_bool wsrep_inited                   = 0; // initialized ?

/*
  Cluster identity as seen in the latest view; both are refreshed by
  wsrep_view_handler_cb() when the view carries a different uuid.
*/
static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
static char         cluster_uuid_str[40]= { 0, };
/* Printable names, indexed by the view status value. */
static const char*  cluster_status_str[WSREP_VIEW_MAX] =
{
    "Primary",
    "non-Primary",
    "Disconnected"
};

/* Copies of the provider's identification strings, filled in wsrep_init(). */
static char provider_name[256]= { 0, };
static char provider_version[256]= { 0, };
static char provider_vendor[256]= { 0, };
212 
213 /*
214  * wsrep status variables
215  */
my_bool     wsrep_connected          = FALSE;
/* Read and written under LOCK_wsrep_ready by wsrep_ready_get()/_set(). */
my_bool     wsrep_ready              = FALSE; // node can accept queries
/* Points at the static buffer that wsrep_view_handler_cb() rewrites. */
const char* wsrep_cluster_state_uuid = cluster_uuid_str;
long long   wsrep_cluster_conf_id    = WSREP_SEQNO_UNDEFINED;
const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
long        wsrep_cluster_size       = 0;
long        wsrep_local_index        = -1;
long long   wsrep_local_bf_aborts    = 0;
/* These point at the static buffers that wsrep_init() fills. */
const char* wsrep_provider_name      = provider_name;
const char* wsrep_provider_version   = provider_version;
const char* wsrep_provider_vendor    = provider_vendor;
/* End wsrep status variables */

/*
  Local replication position. wsrep_init() passes it to the provider as the
  initial state id.
*/
wsrep_uuid_t     local_uuid   = WSREP_UUID_UNDEFINED;
wsrep_seqno_t    local_seqno  = WSREP_SEQNO_UNDEFINED;
/* Updated by wsrep_view_handler_cb() when a view reports another version. */
long             wsrep_protocol_version = 3;

/* Allocated in wsrep_thr_init(). */
wsp::Config_state *wsrep_config_state;

// Boolean denoting if server is in initial startup phase. This is needed
// to make sure that main thread waiting in wsrep_sst_wait() is signaled
// if there was no state gap on receiving first view event.
// Cleared by wsrep_view_handler_cb() on the first primary view.
static my_bool   wsrep_startup = TRUE;
239 
240 
wsrep_log_cb(wsrep_log_level_t level,const char * msg)241 static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
242   switch (level) {
243   case WSREP_LOG_INFO:
244     sql_print_information("WSREP: %s", msg);
245     break;
246   case WSREP_LOG_WARN:
247     sql_print_warning("WSREP: %s", msg);
248     break;
249   case WSREP_LOG_ERROR:
250   case WSREP_LOG_FATAL:
251     sql_print_error("WSREP: %s", msg);
252     break;
253   case WSREP_LOG_DEBUG:
254     if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
255   default:
256     break;
257   }
258 }
259 
/*
  Format a printf-style message and pass it to a log function with a
  "WSREP: " prefix.

  @param fun     printf-style sink; it is called as fun("WSREP: %s", text)
  @param format  printf-style format for the message itself
  @param ...     arguments for format

  The message is rendered into a 1024 byte stack buffer; longer messages
  are truncated.
*/
void wsrep_log(void (*fun)(const char *, ...), const char *format, ...)
{
  char text[1024];
  va_list ap;

  va_start(ap, format);
  vsnprintf(text, sizeof(text) - 1, format, ap);
  va_end(ap);

  fun("WSREP: %s", text);
}
269 
270 
wsrep_log_states(wsrep_log_level_t const level,const wsrep_uuid_t * const group_uuid,wsrep_seqno_t const group_seqno,const wsrep_uuid_t * const node_uuid,wsrep_seqno_t const node_seqno)271 static void wsrep_log_states (wsrep_log_level_t   const level,
272                               const wsrep_uuid_t* const group_uuid,
273                               wsrep_seqno_t       const group_seqno,
274                               const wsrep_uuid_t* const node_uuid,
275                               wsrep_seqno_t       const node_seqno)
276 {
277   char uuid_str[37];
278   char msg[256];
279 
280   wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str));
281   snprintf (msg, 255, "WSREP: Group state: %s:%lld",
282             uuid_str, (long long)group_seqno);
283   wsrep_log_cb (level, msg);
284 
285   wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str));
286   snprintf (msg, 255, "WSREP: Local state: %s:%lld",
287             uuid_str, (long long)node_seqno);
288   wsrep_log_cb (level, msg);
289 }
290 
291 #ifdef GTID_SUPPORT
wsrep_init_sidno(const wsrep_uuid_t & wsrep_uuid)292 void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid)
293 {
294   /* generate new Sid map entry from inverted uuid */
295   rpl_sid sid;
296   wsrep_uuid_t ltid_uuid;
297 
298   for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i)
299   {
300       ltid_uuid.data[i] = ~wsrep_uuid.data[i];
301   }
302 
303   sid.copy_from(ltid_uuid.data);
304   global_sid_lock->wrlock();
305   wsrep_sidno= global_sid_map->add_sid(sid);
306   WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno);
307   global_sid_lock->unlock();
308 }
309 #endif /* GTID_SUPPORT */
310 
/*
  View handler callback, installed as view_handler_cb in wsrep_init().

  Publishes the new view in the wsrep status variables and then, for a
  primary view only:
    - handles a change of the application protocol version,
    - prepares a state transfer request when the view reports a state gap,
    - otherwise completes startup initialization on the first primary view,
    - adjusts the auto increment settings and re-reads provider
      capabilities.
  Finally the member status and the view are stored in wsrep_config_state.

  @param app_ctx      not used here
  @param recv_ctx     not used here
  @param view         the new view
  @param state        not used here
  @param state_len    not used here
  @param sst_req      [out] set to NULL, or to the request produced by
                      wsrep_sst_prepare() when a state transfer is needed
  @param sst_req_len  [out] set to 0, or to the length of *sst_req

  @return WSREP_CB_SUCCESS in every case that returns; an unsupported
          protocol version or an undetected state gap ends in
          unireg_abort(1).
*/
static wsrep_cb_status_t
wsrep_view_handler_cb (void*                    app_ctx,
                       void*                    recv_ctx,
                       const wsrep_view_info_t* view,
                       const char*              state,
                       size_t                   state_len,
                       void**                   sst_req,
                       size_t*                  sst_req_len)
{
  /* No state transfer request unless a state gap is found below. */
  *sst_req     = NULL;
  *sst_req_len = 0;

  wsrep_member_status_t memb_status= wsrep_config_state->get_status();

  /* Refresh the cached cluster uuid and its printable form on change. */
  if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
  {
    memcpy(&cluster_uuid, &view->state_id.uuid, sizeof(cluster_uuid));

    wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
                      sizeof(cluster_uuid_str));
  }

  /* Publish the view in the status variables. */
  wsrep_cluster_conf_id= view->view;
  wsrep_cluster_status= cluster_status_str[view->status];
  wsrep_cluster_size= view->memb_num;
  wsrep_local_index= view->my_idx;

  WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
             "number of nodes: %ld, my index: %ld, protocol version %d",
             wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
             (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
             wsrep_cluster_size, wsrep_local_index, view->proto_ver);

  /* Proceed further only if view is PRIMARY */
  if (WSREP_VIEW_PRIMARY != view->status)
  {
#ifdef HAVE_QUERY_CACHE
    // query cache must be initialised by now
    query_cache.flush();
#endif /* HAVE_QUERY_CACHE */

    wsrep_ready_set(FALSE);
    memb_status= WSREP_MEMBER_UNDEFINED;
    /* Always record local_uuid and local_seqno in non-prim since this
     * may lead to re-initializing provider and start position is
     * determined according to these variables */
    // WRONG! local_uuid should be the last primary configuration uuid we were
    // a member of. local_seqno should be updated in commit calls.
    // local_uuid= cluster_uuid;
    // local_seqno= view->first - 1;
    goto out;
  }

  /* Protocol versions 0..3 are accepted; anything else aborts the server. */
  switch (view->proto_ver)
  {
  case 0:
  case 1:
  case 2:
  case 3:
      // version change
      if (view->proto_ver != wsrep_protocol_version)
      {
          /* Client connections are closed while the node is marked not
             ready; the previous ready state is restored afterwards. */
          my_bool wsrep_ready_saved= wsrep_ready_get();
          wsrep_ready_set(FALSE);
          WSREP_INFO("closing client connections for "
                     "protocol change %ld -> %d",
                     wsrep_protocol_version, view->proto_ver);
          wsrep_close_client_connections(TRUE);
          wsrep_protocol_version= view->proto_ver;
          wsrep_ready_set(wsrep_ready_saved);
      }
      break;
  default:
      WSREP_ERROR("Unsupported application protocol version: %d",
                  view->proto_ver);
      unireg_abort(1);
  }

  if (view->state_gap)
  {
    WSREP_WARN("Gap in state sequence. Need state transfer.");

    /* After that wsrep will call wsrep_sst_prepare. */
    /* keep ready flag 0 until we receive the snapshot */
    wsrep_ready_set(FALSE);

    /* Close client connections to ensure that they don't interfere
     * with SST. Necessary only if storage engines are initialized
     * before SST.
     * TODO: Just killing all ongoing transactions should be enough
     * since wsrep_ready is OFF and no new transactions can start.
     */
    if (!wsrep_before_SE())
    {
        WSREP_DEBUG("[debug]: closing client connections for PRIM");
        wsrep_close_client_connections(FALSE);
    }

    /* A negative return value is treated as a negated error code. */
    ssize_t const req_len= wsrep_sst_prepare (sst_req);

    if (req_len < 0)
    {
      WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
                  strerror(-req_len));
      memb_status= WSREP_MEMBER_UNDEFINED;
    }
    else
    {
      assert(sst_req != NULL);
      *sst_req_len= req_len;
      memb_status= WSREP_MEMBER_JOINER;
    }
  }
  else
  {
    /*
     *  NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
     *  before - OR - it was reinitialized on startup (lp:992840)
     */
    if (wsrep_startup)
    {
      if (wsrep_before_SE())
      {
        wsrep_SE_init_grab();
        // Signal mysqld init thread to continue
        wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
        // and wait for SE initialization
        wsrep_SE_init_wait();
      }
      else
      {
        local_uuid=  cluster_uuid;
        local_seqno= view->state_id.seqno;
      }
      /* Init storage engine XIDs from first view */
      wsrep_set_SE_checkpoint(local_uuid, local_seqno);
#ifdef GTID_SUPPORT
      wsrep_init_sidno(local_uuid);
#endif /* GTID_SUPPORT */
      memb_status= WSREP_MEMBER_JOINED;
    }

    // just some sanity check
    if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
    {
      WSREP_ERROR("Undetected state gap. Can't continue.");
      wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
                       &local_uuid, -1);
      unireg_abort(1);
    }
  }

  /* Offset is this node's index + 1, increment is the number of members. */
  if (wsrep_auto_increment_control)
  {
    global_system_variables.auto_increment_offset= view->my_idx + 1;
    global_system_variables.auto_increment_increment= view->memb_num;
  }

  { /* capabilities may be updated on new configuration */
    uint64_t const caps(wsrep->capabilities (wsrep));

    my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
    if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
    {
      WSREP_WARN("Unsupported protocol downgrade: "
                 "incremental data collection disabled. Expect abort.");
    }
    wsrep_incremental_data_collection = idc;
  }

out:
  /* The startup phase ends with the first primary view. */
  if (view->status == WSREP_VIEW_PRIMARY) wsrep_startup= FALSE;
  wsrep_config_state->set(memb_status, view);

  return WSREP_CB_SUCCESS;
}
487 
wsrep_ready_set(my_bool x)488 my_bool wsrep_ready_set (my_bool x)
489 {
490   WSREP_DEBUG("Setting wsrep_ready to %d", x);
491   if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
492   my_bool ret= (wsrep_ready != x);
493   if (ret)
494   {
495     wsrep_ready= x;
496     mysql_cond_signal (&COND_wsrep_ready);
497   }
498   mysql_mutex_unlock (&LOCK_wsrep_ready);
499   return ret;
500 }
501 
wsrep_ready_get(void)502 my_bool wsrep_ready_get (void)
503 {
504   if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
505   my_bool ret= wsrep_ready;
506   mysql_mutex_unlock (&LOCK_wsrep_ready);
507   return ret;
508 }
509 
wsrep_show_ready(THD * thd,SHOW_VAR * var,char * buff)510 int wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff)
511 {
512   var->type= SHOW_MY_BOOL;
513   var->value= buff;
514   *((my_bool *)buff)= wsrep_ready_get();
515   return 0;
516 }
517 
518 // Wait until wsrep has reached ready state
wsrep_ready_wait()519 void wsrep_ready_wait ()
520 {
521   if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
522   while (!wsrep_ready)
523   {
524     WSREP_INFO("Waiting to reach ready state");
525     mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready);
526   }
527   WSREP_INFO("ready state reached");
528   mysql_mutex_unlock (&LOCK_wsrep_ready);
529 }
530 
wsrep_synced_cb(void * app_ctx)531 static void wsrep_synced_cb(void* app_ctx)
532 {
533   WSREP_INFO("Synchronized with group, ready for connections");
534   my_bool signal_main= wsrep_ready_set(TRUE);
535   wsrep_config_state->set(WSREP_MEMBER_SYNCED);
536 
537   if (signal_main)
538   {
539       wsrep_SE_init_grab();
540       // Signal mysqld init thread to continue
541       wsrep_sst_complete (&local_uuid, local_seqno, false);
542       // and wait for SE initialization
543       wsrep_SE_init_wait();
544   }
545   if (wsrep_restart_slave_activated)
546   {
547     int rcode;
548     WSREP_INFO("MariaDB slave restart");
549     wsrep_restart_slave_activated= FALSE;
550 
551     mysql_mutex_lock(&LOCK_active_mi);
552     if ((rcode = start_slave_threads(0,
553                                      1 /* need mutex */,
554                                      0 /* no wait for start*/,
555                                      active_mi,
556                                      master_info_file,
557                                      relay_log_info_file,
558                                      SLAVE_SQL)))
559     {
560       WSREP_WARN("Failed to create slave threads: %d", rcode);
561     }
562     mysql_mutex_unlock(&LOCK_active_mi);
563 
564   }
565 }
566 
wsrep_init_position()567 static void wsrep_init_position()
568 {
569   /* read XIDs from storage engines */
570   wsrep_uuid_t uuid;
571   wsrep_seqno_t seqno;
572   wsrep_get_SE_checkpoint(uuid, seqno);
573 
574   if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)))
575   {
576     WSREP_INFO("Read nil XID from storage engines, skipping position init");
577     return;
578   }
579 
580   char uuid_str[40] = {0, };
581   wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
582   WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno);
583 
584   if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) &&
585       local_seqno == WSREP_SEQNO_UNDEFINED)
586   {
587     // Initial state
588     local_uuid= uuid;
589     local_seqno= seqno;
590   }
591   else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) ||
592            local_seqno != seqno)
593   {
594     WSREP_WARN("Initial position was provided by configuration or SST, "
595                "avoiding override");
596   }
597 }
598 
599 extern char* my_bind_addr_str;
600 
wsrep_init()601 int wsrep_init()
602 {
603   int rcode= -1;
604   DBUG_ASSERT(wsrep_inited == 0);
605 
606   if (strcmp(wsrep_start_position, WSREP_START_POSITION_ZERO) &&
607       wsrep_start_position_init(wsrep_start_position))
608   {
609     return 1;
610   }
611 
612   wsrep_sst_auth_init();
613 
614   wsrep_ready_set(FALSE);
615   assert(wsrep_provider);
616 
617   wsrep_init_position();
618 
619   if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK)
620   {
621     if (strcasecmp(wsrep_provider, WSREP_NONE))
622     {
623       WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.",
624                   wsrep_provider, strerror(rcode), rcode);
625       strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack
626       return wsrep_init();
627     }
628     else /* this is for recursive call above */
629     {
630       WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.",
631                   strerror(rcode), rcode);
632       unireg_abort(1);
633     }
634   }
635 
636   if (!WSREP_PROVIDER_EXISTS)
637   {
638     // enable normal operation in case no provider is specified
639     wsrep_ready_set(TRUE);
640     global_system_variables.wsrep_on = 0;
641     wsrep_init_args args;
642     args.logger_cb = wsrep_log_cb;
643     args.options = (wsrep_provider_options) ?
644             wsrep_provider_options : "";
645     rcode = wsrep->init(wsrep, &args);
646     if (rcode)
647     {
648       DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
649       WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
650       wsrep_ready_set(FALSE);
651       wsrep->free(wsrep);
652       free(wsrep);
653       wsrep = NULL;
654     }
655     else
656     {
657       wsrep_inited= 1;
658     }
659     return rcode;
660   }
661   else
662   {
663     global_system_variables.wsrep_on = 1;
664     strncpy(provider_name,
665             wsrep->provider_name,    sizeof(provider_name) - 1);
666     strncpy(provider_version,
667             wsrep->provider_version, sizeof(provider_version) - 1);
668     strncpy(provider_vendor,
669             wsrep->provider_vendor,  sizeof(provider_vendor) - 1);
670   }
671 
672   if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
673     wsrep_data_home_dir = mysql_real_data_home;
674 
675   /* Initialize node address */
676   char node_addr[512]= { 0, };
677   size_t const node_addr_max= sizeof(node_addr) - 1;
678   if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
679   {
680     size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
681     if (!(ret > 0 && ret < node_addr_max))
682     {
683       WSREP_WARN("Failed to guess base node address. Set it explicitly via "
684                  "wsrep_node_address.");
685       node_addr[0]= '\0';
686     }
687   }
688   else
689   {
690     strncpy(node_addr, wsrep_node_address, node_addr_max);
691   }
692 
693   /* Initialize node's incoming address */
694   char inc_addr[512]= { 0, };
695   size_t const inc_addr_max= sizeof (inc_addr);
696 
697   /*
698     In case wsrep_node_incoming_address is either not set or set to AUTO,
699     we need to use mysqld's my_bind_addr_str:mysqld_port, lastly fallback
700     to wsrep_node_address' value if mysqld's bind-address is not set either.
701   */
702   if ((!wsrep_node_incoming_address ||
703        !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
704   {
705     bool is_ipv6= false;
706     unsigned int my_bind_ip= INADDR_ANY; // default if not set
707 
708     if (my_bind_addr_str && strlen(my_bind_addr_str))
709     {
710       my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6);
711     }
712 
713     if (INADDR_ANY != my_bind_ip)
714     {
715       /*
716         If its a not a valid address, leave inc_addr as empty string. mysqld
717         is not listening for client connections on network interfaces.
718       */
719       if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
720       {
721         const char *fmt= (is_ipv6) ? "[%s]:%u" : "%s:%u";
722         snprintf(inc_addr, inc_addr_max, fmt, my_bind_addr_str, mysqld_port);
723       }
724     }
725     else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */
726     {
727       size_t const node_addr_len= strlen(node_addr);
728       if (node_addr_len > 0)
729       {
730         wsp::Address addr(node_addr);
731 
732         if (!addr.is_valid())
733         {
734           WSREP_DEBUG("Could not parse node address : %s", node_addr);
735           WSREP_WARN("Guessing address for incoming client connections failed. "
736                      "Try setting wsrep_node_incoming_address explicitly.");
737           goto done;
738         }
739 
740         const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
741         snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(),
742                  (int) mysqld_port);
743       }
744     }
745   }
746   else
747   {
748     wsp::Address addr(wsrep_node_incoming_address);
749 
750     if (!addr.is_valid())
751     {
752       WSREP_WARN("Could not parse wsrep_node_incoming_address : %s",
753                  wsrep_node_incoming_address);
754       goto done;
755     }
756 
757     /*
758       In case port is not specified in wsrep_node_incoming_address, we use
759       mysqld_port.
760     */
761     int port= (addr.get_port() > 0) ? addr.get_port() : (int) mysqld_port;
762     const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
763 
764     snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port);
765   }
766 
767 done:
768   struct wsrep_init_args wsrep_args;
769 
770   struct wsrep_gtid const state_id = { local_uuid, local_seqno };
771 
772   wsrep_args.data_dir        = wsrep_data_home_dir;
773   wsrep_args.node_name       = (wsrep_node_name) ? wsrep_node_name : "";
774   wsrep_args.node_address    = node_addr;
775   wsrep_args.node_incoming   = inc_addr;
776   wsrep_args.options         = (wsrep_provider_options) ?
777                                 wsrep_provider_options : "";
778   wsrep_args.proto_ver       = wsrep_max_protocol_version;
779 
780   wsrep_args.state_id        = &state_id;
781 
782   wsrep_args.logger_cb       = wsrep_log_cb;
783   wsrep_args.view_handler_cb = wsrep_view_handler_cb;
784   wsrep_args.apply_cb        = wsrep_apply_cb;
785   wsrep_args.commit_cb       = wsrep_commit_cb;
786   wsrep_args.unordered_cb    = wsrep_unordered_cb;
787   wsrep_args.sst_donate_cb   = wsrep_sst_donate_cb;
788   wsrep_args.synced_cb       = wsrep_synced_cb;
789 
790   rcode = wsrep->init(wsrep, &wsrep_args);
791 
792   if (rcode)
793   {
794     DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
795     WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
796     wsrep->free(wsrep);
797     free(wsrep);
798     wsrep = NULL;
799   } else {
800     wsrep_inited= 1;
801   }
802 
803   return rcode;
804 }
805 
806 
807 /* Initialize wsrep thread LOCKs and CONDs */
wsrep_thr_init()808 void wsrep_thr_init()
809 {
810   DBUG_ENTER("wsrep_thr_init");
811   wsrep_config_state = new wsp::Config_state;
812 #ifdef HAVE_PSI_INTERFACE
813   mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes));
814   mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds));
815   mysql_file_register("sql", wsrep_files, array_elements(wsrep_files));
816   mysql_thread_register("sql", wsrep_threads, array_elements(wsrep_threads));
817 #endif
818 
819   mysql_mutex_init(key_LOCK_wsrep_ready, &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
820   mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
821   mysql_mutex_init(key_LOCK_wsrep_sst, &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
822   mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
823   mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
824   mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
825   mysql_mutex_init(key_LOCK_wsrep_rollback, &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
826   mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
827   mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
828   mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
829   mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
830   mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
831   mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
832 
833   DBUG_VOID_RETURN;
834 }
835 
836 /* This is wrapper for wsrep_break_lock in thr_lock.c */
wsrep_thr_abort_thd(void * bf_thd_ptr,void * victim_thd_ptr,my_bool signal)837 static int wsrep_thr_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
838 {
839   THD* victim_thd= (THD *) victim_thd_ptr;
840   /* We need to lock THD::LOCK_thd_data to protect victim
841   from concurrent usage or disconnect or delete. */
842   wsrep_thd_LOCK(victim_thd);
843   int res= wsrep_abort_thd(bf_thd_ptr, victim_thd_ptr, signal);
844   return res;
845 }
846 
847 
wsrep_init_startup(bool first)848 void wsrep_init_startup (bool first)
849 {
850   if (wsrep_init()) unireg_abort(1);
851 
852   wsrep_thr_lock_init(
853      (wsrep_thd_is_brute_force_fun)wsrep_thd_is_BF,
854      (wsrep_abort_thd_fun)wsrep_thr_abort_thd,
855      wsrep_debug, wsrep_convert_LOCK_to_trx,
856      (wsrep_on_fun)wsrep_on);
857 
858   /*
859     Pre-initialize global_system_variables.table_plugin with a dummy engine
860     (placeholder) required during the initialization of wsrep threads (THDs).
861     (see: plugin_thdvar_init())
862     Note: This only needs to be done for rsync & mariabackup based SST methods.
863     In case of mysqldump SST method, the wsrep threads are created after the
864     server plugins & global system variables are initialized.
865   */
866   if (wsrep_before_SE())
867     wsrep_plugins_pre_init();
868 
869   /* Skip replication start if dummy wsrep provider is loaded */
870   if (!strcmp(wsrep_provider, WSREP_NONE)) return;
871 
872   /* Skip replication start if no cluster address */
873   if (!wsrep_cluster_address || wsrep_cluster_address[0] == 0) return;
874 
875   if (first) wsrep_sst_grab(); // do it so we can wait for SST below
876 
877   if (!wsrep_start_replication()) unireg_abort(1);
878 
879   wsrep_create_rollbacker();
880   wsrep_create_appliers(1);
881 
882   if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
883 }
884 
885 
wsrep_deinit(bool free_options)886 void wsrep_deinit(bool free_options)
887 {
888   DBUG_ASSERT(wsrep_inited == 1);
889   wsrep_unload(wsrep);
890   wsrep= 0;
891   provider_name[0]=    '\0';
892   provider_version[0]= '\0';
893   provider_vendor[0]=  '\0';
894 
895   wsrep_inited= 0;
896 
897   if (free_options)
898   {
899     wsrep_sst_auth_free();
900   }
901 }
902 
903 /* Destroy wsrep thread LOCKs and CONDs */
wsrep_thr_deinit()904 void wsrep_thr_deinit()
905 {
906   if (!wsrep_config_state)
907     return;                                     // Never initialized
908   mysql_mutex_destroy(&LOCK_wsrep_ready);
909   mysql_cond_destroy(&COND_wsrep_ready);
910   mysql_mutex_destroy(&LOCK_wsrep_sst);
911   mysql_cond_destroy(&COND_wsrep_sst);
912   mysql_mutex_destroy(&LOCK_wsrep_sst_init);
913   mysql_cond_destroy(&COND_wsrep_sst_init);
914   mysql_mutex_destroy(&LOCK_wsrep_rollback);
915   mysql_cond_destroy(&COND_wsrep_rollback);
916   mysql_mutex_destroy(&LOCK_wsrep_replaying);
917   mysql_cond_destroy(&COND_wsrep_replaying);
918   mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
919   mysql_mutex_destroy(&LOCK_wsrep_desync);
920   mysql_mutex_destroy(&LOCK_wsrep_config_state);
921   delete wsrep_config_state;
922   wsrep_config_state= 0;                        // Safety
923 }
924 
wsrep_recover()925 void wsrep_recover()
926 {
927   char uuid_str[40];
928 
929   if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) &&
930       local_seqno == -2)
931   {
932     wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
933     WSREP_INFO("Position %s:%lld given at startup, skipping position recovery",
934                uuid_str, (long long)local_seqno);
935     return;
936   }
937   wsrep_uuid_t uuid;
938   wsrep_seqno_t seqno;
939   wsrep_get_SE_checkpoint(uuid, seqno);
940   wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
941   WSREP_INFO("Recovered position: %s:%lld", uuid_str, (long long)seqno);
942 }
943 
944 
wsrep_stop_replication(THD * thd)945 void wsrep_stop_replication(THD *thd)
946 {
947   WSREP_INFO("Stop replication");
948   if (!wsrep)
949   {
950     WSREP_INFO("Provider was not loaded, in stop replication");
951     return;
952   }
953 
954   /* disconnect from group first to get wsrep_ready == FALSE */
955   WSREP_DEBUG("Provider disconnect");
956   wsrep->disconnect(wsrep);
957 
958   wsrep_connected= FALSE;
959 
960   wsrep_close_client_connections(TRUE);
961 
962   /* wait until appliers have stopped */
963   wsrep_wait_appliers_close(thd);
964 
965   return;
966 }
967 
wsrep_start_replication()968 bool wsrep_start_replication()
969 {
970   wsrep_status_t rcode;
971 
972   /* wsrep provider must be loaded. */
973   DBUG_ASSERT(wsrep);
974 
975   /*
976     if provider is trivial, don't even try to connect,
977     but resume local node operation
978   */
979   if (!WSREP_PROVIDER_EXISTS)
980   {
981     // enable normal operation in case no provider is specified
982     wsrep_ready_set(TRUE);
983     return true;
984   }
985 
986   if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
987   {
988     // if provider is non-trivial, but no address is specified, wait for address
989     wsrep_ready_set(FALSE);
990     return true;
991   }
992 
993   bool const bootstrap= wsrep_new_cluster;
994 
995   WSREP_INFO("Start replication");
996 
997   if (wsrep_new_cluster)
998   {
999     WSREP_INFO("'wsrep-new-cluster' option used, bootstrapping the cluster");
1000     wsrep_new_cluster= false;
1001   }
1002 
1003   if ((rcode = wsrep->connect(wsrep,
1004                               wsrep_cluster_name,
1005                               wsrep_cluster_address,
1006                               wsrep_sst_donor,
1007                               bootstrap)))
1008   {
1009     DBUG_PRINT("wsrep",("wsrep->connect(%s) failed: %d",
1010                         wsrep_cluster_address, rcode));
1011     WSREP_ERROR("wsrep::connect(%s) failed: %d",
1012                 wsrep_cluster_address, rcode);
1013     return false;
1014   }
1015   else
1016   {
1017     wsrep_connected= TRUE;
1018 
1019     char* opts= wsrep->options_get(wsrep);
1020     if (opts)
1021     {
1022       wsrep_provider_options_init(opts);
1023       free(opts);
1024     }
1025     else
1026     {
1027       WSREP_WARN("Failed to get wsrep options");
1028     }
1029   }
1030 
1031   return true;
1032 }
1033 
wsrep_must_sync_wait(THD * thd,uint mask)1034 bool wsrep_must_sync_wait (THD* thd, uint mask)
1035 {
1036   return (thd->variables.wsrep_sync_wait & mask) &&
1037     thd->variables.wsrep_on &&
1038     !(thd->variables.wsrep_dirty_reads &&
1039       !is_update_query(thd->lex->sql_command)) &&
1040     !thd->in_active_multi_stmt_transaction() &&
1041     thd->wsrep_conflict_state != REPLAYING &&
1042     thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED;
1043 }
1044 
wsrep_sync_wait(THD * thd,uint mask)1045 bool wsrep_sync_wait (THD* thd, uint mask)
1046 {
1047   if (wsrep_must_sync_wait(thd, mask))
1048   {
1049     WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u",
1050                 thd->variables.wsrep_sync_wait, mask);
1051     // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
1052     // TODO: modify to check if thd has locked any rows.
1053     wsrep_status_t ret= wsrep->causal_read (wsrep, &thd->wsrep_sync_wait_gtid);
1054 
1055     if (unlikely(WSREP_OK != ret))
1056     {
1057       const char* msg;
1058       int err;
1059 
1060       // Possibly relevant error codes:
1061       // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
1062       // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
1063       // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
1064 
1065       switch (ret)
1066       {
1067       case WSREP_NOT_IMPLEMENTED:
1068         msg= "synchronous reads by wsrep backend. "
1069              "Please unset wsrep_causal_reads variable.";
1070         err= ER_NOT_SUPPORTED_YET;
1071         break;
1072       default:
1073         msg= "Synchronous wait failed.";
1074         err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
1075                                    //       with ER_LOCK_WAIT_TIMEOUT
1076       }
1077 
1078       my_error(err, MYF(0), msg);
1079 
1080       return true;
1081     }
1082   }
1083 
1084   return false;
1085 }
1086 
wsrep_keys_free(wsrep_key_arr_t * key_arr)1087 void wsrep_keys_free(wsrep_key_arr_t* key_arr)
1088 {
1089     for (size_t i= 0; i < key_arr->keys_len; ++i)
1090     {
1091         my_free((void*)key_arr->keys[i].key_parts);
1092     }
1093     my_free(key_arr->keys);
1094     key_arr->keys= 0;
1095     key_arr->keys_len= 0;
1096 }
1097 
1098 
1099 /*!
1100  * @param db      Database string
1101  * @param table   Table string
1102  * @param key     Array of wsrep_key_t
1103  * @param key_len In: number of elements in key array, Out: number of
1104  *                elements populated
1105  *
1106  * @return true if preparation was successful, otherwise false.
1107  */
1108 
wsrep_prepare_key_for_isolation(const char * db,const char * table,wsrep_buf_t * key,size_t * key_len)1109 static bool wsrep_prepare_key_for_isolation(const char* db,
1110                                             const char* table,
1111                                             wsrep_buf_t* key,
1112                                             size_t* key_len)
1113 {
1114   if (*key_len < 2) return false;
1115 
1116   switch (wsrep_protocol_version)
1117   {
1118   case 0:
1119     *key_len= 0;
1120     break;
1121   case 1:
1122   case 2:
1123   case 3:
1124   {
1125     *key_len= 0;
1126     if (db)
1127     {
1128       // sql_print_information("%s.%s", db, table);
1129       key[*key_len].ptr= db;
1130       key[*key_len].len= strlen(db);
1131       ++(*key_len);
1132       if (table)
1133       {
1134         key[*key_len].ptr= table;
1135         key[*key_len].len= strlen(table);
1136         ++(*key_len);
1137       }
1138     }
1139     break;
1140   }
1141   default:
1142     return false;
1143   }
1144   return true;
1145 }
1146 
1147 
wsrep_prepare_key_for_isolation(const char * db,const char * table,wsrep_key_arr_t * ka)1148 static bool wsrep_prepare_key_for_isolation(const char* db,
1149                                             const char* table,
1150                                             wsrep_key_arr_t* ka)
1151 {
1152   wsrep_key_t* tmp;
1153 
1154   if (!ka->keys)
1155     tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t),
1156                                  MYF(0));
1157   else
1158     tmp= (wsrep_key_t*)my_realloc(ka->keys,
1159                                  (ka->keys_len + 1) * sizeof(wsrep_key_t),
1160                                  MYF(0));
1161 
1162   if (!tmp)
1163   {
1164     WSREP_ERROR("Can't allocate memory for key_array");
1165     return false;
1166   }
1167   ka->keys= tmp;
1168   if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
1169         my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
1170   {
1171     WSREP_ERROR("Can't allocate memory for key_parts");
1172     return false;
1173   }
1174   ka->keys[ka->keys_len].key_parts_num= 2;
1175   ++ka->keys_len;
1176   if (!wsrep_prepare_key_for_isolation(db, table,
1177                                        (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
1178                                        &ka->keys[ka->keys_len - 1].key_parts_num))
1179   {
1180     WSREP_ERROR("Preparing keys for isolation failed");
1181     return false;
1182   }
1183 
1184   return true;
1185 }
1186 
1187 
wsrep_prepare_keys_for_alter_add_fk(const char * child_table_db,Alter_info * alter_info,wsrep_key_arr_t * ka)1188 static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
1189                                                 Alter_info* alter_info,
1190                                                 wsrep_key_arr_t* ka)
1191 {
1192   Key *key;
1193   List_iterator<Key> key_iterator(alter_info->key_list);
1194   while ((key= key_iterator++))
1195   {
1196     if (key->type == Key::FOREIGN_KEY)
1197     {
1198       Foreign_key *fk_key= (Foreign_key *)key;
1199       const char *db_name= fk_key->ref_db.str;
1200       const char *table_name= fk_key->ref_table.str;
1201       if (!db_name)
1202       {
1203         db_name= child_table_db;
1204       }
1205       if (!wsrep_prepare_key_for_isolation(db_name, table_name, ka))
1206       {
1207         return false;
1208       }
1209     }
1210   }
1211   return true;
1212 }
1213 
1214 
wsrep_prepare_keys_for_isolation(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep_key_arr_t * ka)1215 static bool wsrep_prepare_keys_for_isolation(THD*              thd,
1216                                              const char*       db,
1217                                              const char*       table,
1218                                              const TABLE_LIST* table_list,
1219                                              Alter_info*       alter_info,
1220                                              wsrep_key_arr_t*  ka)
1221 {
1222   ka->keys= 0;
1223   ka->keys_len= 0;
1224 
1225   if (db || table)
1226   {
1227     if (!wsrep_prepare_key_for_isolation(db, table, ka))
1228       goto err;
1229   }
1230 
1231   for (const TABLE_LIST* table= table_list; table; table= table->next_global)
1232   {
1233     if (!wsrep_prepare_key_for_isolation(table->db.str, table->table_name.str, ka))
1234       goto err;
1235   }
1236 
1237   if (alter_info && (alter_info->flags & (ALTER_ADD_FOREIGN_KEY)))
1238   {
1239     if (!wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info, ka))
1240       goto err;
1241   }
1242 
1243   return false;
1244 
1245 err:
1246   wsrep_keys_free(ka);
1247   return true;
1248 }
1249 
1250 
1251 /* Prepare key list from db/table and table_list */
wsrep_prepare_keys_for_isolation(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,wsrep_key_arr_t * ka)1252 bool wsrep_prepare_keys_for_isolation(THD*              thd,
1253                                       const char*       db,
1254                                       const char*       table,
1255                                       const TABLE_LIST* table_list,
1256                                       wsrep_key_arr_t*  ka)
1257 {
1258   return wsrep_prepare_keys_for_isolation(thd, db, table, table_list, NULL, ka);
1259 }
1260 
1261 
wsrep_prepare_key(const uchar * cache_key,size_t cache_key_len,const uchar * row_id,size_t row_id_len,wsrep_buf_t * key,size_t * key_len)1262 bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len,
1263                        const uchar* row_id, size_t row_id_len,
1264                        wsrep_buf_t* key, size_t* key_len)
1265 {
1266     if (*key_len < 3) return false;
1267 
1268     *key_len= 0;
1269     switch (wsrep_protocol_version)
1270     {
1271     case 0:
1272     {
1273         key[0].ptr = cache_key;
1274         key[0].len = cache_key_len;
1275 
1276         *key_len = 1;
1277         break;
1278     }
1279     case 1:
1280     case 2:
1281     case 3:
1282     {
1283         key[0].ptr = cache_key;
1284         key[0].len = strlen( (char*)cache_key );
1285 
1286         key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
1287         key[1].len = strlen( (char*)(key[1].ptr) );
1288 
1289         *key_len = 2;
1290         break;
1291     }
1292     default:
1293         return false;
1294     }
1295 
1296     key[*key_len].ptr = row_id;
1297     key[*key_len].len = row_id_len;
1298     ++(*key_len);
1299 
1300     return true;
1301 }
1302 
1303 
/*
 * Construct Query_log_event from thd query and serialize it
 * into buffer.
 *
 * Events are first written to a temporary IO_CACHE in this order: a
 * Format_description_log_event, an optional GTID event, an optional
 * Query_log_event for thd->wsrep_TOI_pre_query, and finally the
 * Query_log_event for `query`. The cache is then passed to
 * wsrep_write_cache_buf() together with buf and buf_len.
 *
 * Return 0 in case of success, non-zero in case of error.
 */
int wsrep_to_buf_helper(
    THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
{
  IO_CACHE tmp_io_cache;
  Log_event_writer writer(&tmp_io_cache,0);
  if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
                       65536, MYF(MY_WME)))
    return 1;
  int ret(0);
  enum enum_binlog_checksum_alg current_binlog_check_alg=
    (enum_binlog_checksum_alg) binlog_checksum_options;

  /* The format description event goes first. */
  Format_description_log_event *tmp_fd= new Format_description_log_event(4);
  tmp_fd->checksum_alg= current_binlog_check_alg;
  /* NOTE(review): the result of this write is not checked. */
  writer.write(tmp_fd);
  delete tmp_fd;

#ifdef GTID_SUPPORT
  if (thd->variables.gtid_next.type == GTID_GROUP)
  {
      Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
      if (!gtid_ev.is_valid()) ret= 0;
      if (!ret && writer.write(&gtid_ev)) ret= 1;
  }
#endif /* GTID_SUPPORT */
  if (wsrep_gtid_mode && thd->variables.gtid_seq_no)
  {
    Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no,
                          thd->variables.gtid_domain_id,
                          true, LOG_EVENT_SUPPRESS_USE_F,
                          true, 0);
    gtid_event.server_id= thd->variables.server_id;
    /*
      NOTE(review): ret is assigned unconditionally on the next line, so
      this validity check has no lasting effect.
    */
    if (!gtid_event.is_valid()) ret= 0;
    ret= writer.write(&gtid_event);
  }

  /* if there is prepare query, add event for it */
  if (!ret && thd->wsrep_TOI_pre_query)
  {
    Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
		       thd->wsrep_TOI_pre_query_len,
		       FALSE, FALSE, FALSE, 0);
    ev.checksum_alg= current_binlog_check_alg;
    if (writer.write(&ev)) ret= 1;
  }

  /* continue to append the actual query */
  Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
  ev.checksum_alg= current_binlog_check_alg;
  /* Each remaining step runs only while no earlier step has failed. */
  if (!ret && writer.write(&ev)) ret= 1;
  if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
  /* The temporary cache is closed on both the success and error paths. */
  close_cached_file(&tmp_io_cache);
  return ret;
}
1364 
1365 static int
wsrep_alter_query_string(THD * thd,String * buf)1366 wsrep_alter_query_string(THD *thd, String *buf)
1367 {
1368   /* Append the "ALTER" part of the query */
1369   if (buf->append(STRING_WITH_LEN("ALTER ")))
1370     return 1;
1371   /* Append definer */
1372   append_definer(thd, buf, &(thd->lex->definer->user), &(thd->lex->definer->host));
1373   /* Append the left part of thd->query after event name part */
1374   if (buf->append(thd->lex->stmt_definition_begin,
1375                   thd->lex->stmt_definition_end -
1376                   thd->lex->stmt_definition_begin))
1377     return 1;
1378 
1379   return 0;
1380 }
1381 
wsrep_alter_event_query(THD * thd,uchar ** buf,size_t * buf_len)1382 static int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len)
1383 {
1384   String log_query;
1385 
1386   if (wsrep_alter_query_string(thd, &log_query))
1387   {
1388     WSREP_WARN("events alter string failed: schema: %s, query: %s",
1389                thd->get_db(), thd->query());
1390     return 1;
1391   }
1392   return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
1393 }
1394 
1395 #include "sql_show.h"
/*
  Rebuild the text of a CREATE / ALTER / CREATE OR REPLACE VIEW statement
  from the parsed LEX and serialize it with wsrep_to_buf_helper().

  As a side effect the definer, algorithm, view_suid and with_check fields
  of the first element of the select_lex table list are filled in.

  Returns 1 when no definer could be obtained, otherwise the result of
  wsrep_to_buf_helper().
*/
static int
create_view_query(THD *thd, uchar** buf, size_t* buf_len)
{
    LEX *lex= thd->lex;
    SELECT_LEX *select_lex= &lex->select_lex;
    TABLE_LIST *first_table= select_lex->table_list.first;
    TABLE_LIST *views = first_table;
    LEX_USER *definer;
    String buff;
    /* Statement prefixes, indexed by lex->create_view->mode below. */
    const LEX_CSTRING command[3]=
      {{ STRING_WITH_LEN("CREATE ") },
       { STRING_WITH_LEN("ALTER ") },
       { STRING_WITH_LEN("CREATE OR REPLACE ") }};

    buff.append(&command[thd->lex->create_view->mode]);

    if (lex->definer)
      definer= get_current_user(thd, lex->definer);
    else
    {
      /*
        DEFINER-clause is missing; we have to create default definer in
        persistent arena to be PS/SP friendly.
        If this is an ALTER VIEW then the current user should be set as
        the definer.
      */
      definer= create_default_definer(thd, false);
    }

    if (definer)
    {
      views->definer.user = definer->user;
      views->definer.host = definer->host;
    } else {
      WSREP_ERROR("Failed to get DEFINER for VIEW.");
      return 1;
    }

    /* Copy the view attributes that view_store_options() is given below. */
    views->algorithm    = lex->create_view->algorithm;
    views->view_suid    = lex->create_view->suid;
    views->with_check   = lex->create_view->check;

    view_store_options(thd, views, &buff);
    buff.append(STRING_WITH_LEN("VIEW "));
    /* Test if user supplied a db (ie: we did not use thd->db) */
    if (views->db.str && views->db.str[0] &&
        (thd->db.str == NULL || cmp(&views->db, &thd->db)))
    {
      append_identifier(thd, &buff, &views->db);
      buff.append('.');
    }
    append_identifier(thd, &buff, &views->table_name);
    /* Optional column name list: "(name1, name2, ...)" */
    if (lex->view_list.elements)
    {
      List_iterator_fast<LEX_CSTRING> names(lex->view_list);
      LEX_CSTRING *name;
      int i;

      for (i= 0; (name= names++); i++)
      {
        /* Opening parenthesis before the first name, comma before the rest. */
        buff.append(i ? ", " : "(");
        append_identifier(thd, &buff, name);
      }
      buff.append(')');
    }
    buff.append(STRING_WITH_LEN(" AS "));
    //buff.append(views->source.str, views->source.length);
    buff.append(thd->lex->create_view->select.str,
                thd->lex->create_view->select.length);
    //int errcode= query_error_code(thd, TRUE);
    //if (thd->binlog_query(THD::STMT_QUERY_TYPE,
    //                      buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
    return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
}
1470 
1471 /*
1472   Rewrite DROP TABLE for TOI. Temporary tables are eliminated from
1473   the query as they are visible only to client connection.
1474 
1475   TODO: See comments for sql_base.cc:drop_temporary_table() and refine
1476   the function to deal with transactional locked tables.
1477  */
wsrep_drop_table_query(THD * thd,uchar ** buf,size_t * buf_len)1478 static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len)
1479 {
1480 
1481   LEX* lex= thd->lex;
1482   SELECT_LEX* select_lex= &lex->select_lex;
1483   TABLE_LIST* first_table= select_lex->table_list.first;
1484   String buff;
1485 
1486   DBUG_ASSERT(!lex->create_info.tmp_table());
1487 
1488   bool found_temp_table= false;
1489   for (TABLE_LIST* table= first_table; table; table= table->next_global)
1490   {
1491     if (thd->find_temporary_table(table->db.str, table->table_name.str))
1492     {
1493       found_temp_table= true;
1494       break;
1495     }
1496   }
1497 
1498   if (found_temp_table)
1499   {
1500     buff.append("DROP TABLE ");
1501     if (lex->check_exists)
1502       buff.append("IF EXISTS ");
1503 
1504     for (TABLE_LIST* table= first_table; table; table= table->next_global)
1505     {
1506       if (!thd->find_temporary_table(table->db.str, table->table_name.str))
1507       {
1508         append_identifier(thd, &buff, table->db.str, table->db.length);
1509         buff.append(".");
1510         append_identifier(thd, &buff,
1511                           table->table_name.str, table->table_name.length);
1512         buff.append(",");
1513       }
1514     }
1515 
1516     /* Chop the last comma */
1517     buff.chop();
1518     buff.append(" /* generated by wsrep */");
1519 
1520     WSREP_DEBUG("Rewrote '%s' as '%s'", thd->query(), buff.ptr());
1521 
1522     return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
1523   }
1524   else
1525   {
1526     return wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1527                                buf, buf_len);
1528   }
1529 }
1530 
1531 
1532 /* Forward declarations. */
1533 static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len);
1534 static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
1535 
/*
  Decide if statement should run in TOI.

  Look if table or table_list contain temporary tables. If the
  statement affects only temporary tables, statement should not run
  in TOI. If the table list contains mix of regular and temporary tables
  (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but
  should be rewritten at later time for replication to contain only
  non-temporary tables.

  Returns true when the statement should run in TOI, false otherwise.
 */
static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
                                 const TABLE_LIST *table_list)
{
  DBUG_ASSERT(!table || db);
  DBUG_ASSERT(table_list || db);

  LEX* lex= thd->lex;
  SELECT_LEX* select_lex= &lex->select_lex;
  TABLE_LIST* first_table= select_lex->table_list.first;

  switch (lex->sql_command)
  {
  case SQLCOM_CREATE_TABLE:
    DBUG_ASSERT(!table_list);
    /* CREATE TEMPORARY TABLE is never run in TOI. */
    if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)
    {
      return false;
    }
    /*
      If mariadb master has replicated a CTAS, we should not replicate the create table
      part separately as TOI, but to replicate both create table and following inserts
      as one write set.
      However, if CTAS creates empty table, we should replicate the create table alone
      as TOI. We have to do relay log event lookup to see if row events follow the
      create table event.
    */
    if (thd->slave_thread && !(thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_STANDALONE))
    {
      /* this is CTAS, either empty or populated table */
      ulonglong event_size = 0;
      enum Log_event_type ev_type= wsrep_peak_event(thd->rgi_slave, &event_size);
      switch (ev_type)
      {
      case QUERY_EVENT:
        /* CTAS with empty table, we replicate create table as TOI */
        break;

      case TABLE_MAP_EVENT:
        /*
          NOTE(review): the message below says "as TOI", but control falls
          through to the WRITE_ROWS_EVENT case, which returns false.
        */
        WSREP_DEBUG("replicating CTAS of empty table as TOI");
        // fall through
      case WRITE_ROWS_EVENT:
        /* CTAS with populated table, we replicate later at commit time */
        WSREP_DEBUG("skipping create table of CTAS replication");
        return false;

      default:
        WSREP_WARN("unexpected async replication event: %d", ev_type);
      }
      /* Reached for QUERY_EVENT and for unexpected event types. */
      return true;
    }
    /* no next async replication event */
    return true;

  case SQLCOM_CREATE_VIEW:

    DBUG_ASSERT(!table_list);
    DBUG_ASSERT(first_table); /* First table is view name */
    /*
      If any of the remaining tables refer to temporary table error
      is returned to client, so TOI can be skipped
    */
    for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global)
    {
      if (thd->find_temporary_table(it))
      {
        return false;
      }
    }
    return true;

  case SQLCOM_CREATE_TRIGGER:

    DBUG_ASSERT(first_table);

    /* No TOI when the first table of the statement is temporary. */
    if (thd->find_temporary_table(first_table))
    {
      return false;
    }
    return true;

  case SQLCOM_DROP_TRIGGER:
    DBUG_ASSERT(table_list);
    /* No TOI when the first element of table_list is temporary. */
    if (thd->find_temporary_table(table_list))
    {
      return false;
    }
    return true;

  default:
    /* An explicitly named non-temporary table is enough for TOI. */
    if (table && !thd->find_temporary_table(db, table))
    {
      return true;
    }

    if (table_list)
    {
      /*
        Any non-temporary table qualifies the statement for TOI. Note that
        the walk starts from the statement's first_table, not from the
        table_list argument.
      */
      for (TABLE_LIST* table= first_table; table; table= table->next_global)
      {
        if (!thd->find_temporary_table(table->db.str, table->table_name.str))
        {
          return true;
        }
      }
    }
    /* Only temporary tables were found, or no table was given at all. */
    return !(table || table_list);
  }
}
1653 
1654 /*
1655   returns:
1656    0: statement was replicated as TOI
1657    1: TOI replication was skipped
1658   -1: TOI replication failed
1659  */
wsrep_TOI_begin(THD * thd,const char * db_,const char * table_,const TABLE_LIST * table_list,Alter_info * alter_info)1660 static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_,
1661                            const TABLE_LIST* table_list,
1662                            Alter_info* alter_info)
1663 {
1664   wsrep_status_t ret(WSREP_WARNING);
1665   uchar* buf(0);
1666   size_t buf_len(0);
1667   int buf_err;
1668   int rc= 0;
1669 
1670   if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
1671   {
1672     WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
1673     return 1;
1674   }
1675 
1676   WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1677               thd->wsrep_exec_mode, wsrep_thd_query(thd));
1678 
1679   switch (thd->lex->sql_command)
1680   {
1681   case SQLCOM_CREATE_VIEW:
1682     buf_err= create_view_query(thd, &buf, &buf_len);
1683     break;
1684   case SQLCOM_CREATE_PROCEDURE:
1685   case SQLCOM_CREATE_SPFUNCTION:
1686     buf_err= wsrep_create_sp(thd, &buf, &buf_len);
1687     break;
1688   case SQLCOM_CREATE_TRIGGER:
1689     buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len);
1690     break;
1691   case SQLCOM_CREATE_EVENT:
1692     buf_err= wsrep_create_event_query(thd, &buf, &buf_len);
1693     break;
1694   case SQLCOM_ALTER_EVENT:
1695     buf_err= wsrep_alter_event_query(thd, &buf, &buf_len);
1696     break;
1697   case SQLCOM_DROP_TABLE:
1698     buf_err= wsrep_drop_table_query(thd, &buf, &buf_len);
1699     break;
1700   case SQLCOM_KILL:
1701     WSREP_DEBUG("KILL as TOI: %s", thd->query());
1702     buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1703                                  &buf, &buf_len);
1704     break;
1705   case SQLCOM_CREATE_ROLE:
1706     if (sp_process_definer(thd))
1707     {
1708       WSREP_WARN("Failed to set CREATE ROLE definer for TOI.");
1709     }
1710     /* fallthrough */
1711   default:
1712     buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1713                                  &buf, &buf_len);
1714     break;
1715   }
1716 
1717   wsrep_key_arr_t key_arr= {0, 0};
1718   struct wsrep_buf buff = { buf, buf_len };
1719   if (!buf_err                                                                  &&
1720       !wsrep_prepare_keys_for_isolation(thd, db_, table_,
1721                                         table_list, alter_info, &key_arr)       &&
1722       key_arr.keys_len > 0                                                      &&
1723       WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
1724 						 key_arr.keys, key_arr.keys_len,
1725 						 &buff, 1,
1726 						 &thd->wsrep_trx_meta)))
1727   {
1728     thd->wsrep_exec_mode= TOTAL_ORDER;
1729     wsrep_to_isolation++;
1730     wsrep_keys_free(&key_arr);
1731     WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
1732 		thd->wsrep_exec_mode);
1733   }
1734   else if (key_arr.keys_len > 0) {
1735     /* jump to error handler in mysql_execute_command() */
1736     WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. Check wsrep "
1737                "connection state and retry the query.",
1738                ret,
1739                thd->get_db(),
1740                (thd->query()) ? thd->query() : "void");
1741     my_message(ER_LOCK_DEADLOCK, "WSREP replication failed. Check "
1742                "your wsrep connection state and retry the query.", MYF(0));
1743     wsrep_keys_free(&key_arr);
1744     rc= -1;
1745   }
1746   else {
1747     /* non replicated DDL, affecting temporary tables only */
1748     WSREP_DEBUG("TO isolation skipped for: %d, sql: %s."
1749 		"Only temporary tables affected.",
1750 		ret, (thd->query()) ? thd->query() : "void");
1751     rc= 1;
1752   }
1753   if (buf) my_free(buf);
1754   return rc;
1755 }
1756 
wsrep_TOI_end(THD * thd)1757 static void wsrep_TOI_end(THD *thd) {
1758   wsrep_status_t ret;
1759   wsrep_to_isolation--;
1760 
1761   WSREP_DEBUG("TO END: %lld, %d: %s", (long long)wsrep_thd_trx_seqno(thd),
1762               thd->wsrep_exec_mode, wsrep_thd_query(thd));
1763 
1764   wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid,
1765                           thd->wsrep_trx_meta.gtid.seqno);
1766   WSREP_DEBUG("TO END: %lld, update seqno",
1767               (long long)wsrep_thd_trx_seqno(thd));
1768 
1769   if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
1770     WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
1771   }
1772   else {
1773     WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s",
1774                ret,
1775                thd->get_db(),
1776                (thd->query()) ? thd->query() : "void");
1777   }
1778 }
1779 
wsrep_RSU_begin(THD * thd,const char * db_,const char * table_)1780 static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_)
1781 {
1782   wsrep_status_t ret(WSREP_WARNING);
1783   WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1784                thd->wsrep_exec_mode, thd->query() );
1785 
1786   ret = wsrep->desync(wsrep);
1787   if (ret != WSREP_OK)
1788   {
1789     WSREP_WARN("RSU desync failed %d for schema: %s, query: %s",
1790                ret, thd->get_db(), thd->query());
1791     my_error(ER_LOCK_DEADLOCK, MYF(0));
1792     return(ret);
1793   }
1794 
1795   mysql_mutex_lock(&LOCK_wsrep_replaying);
1796   wsrep_replaying++;
1797   mysql_mutex_unlock(&LOCK_wsrep_replaying);
1798 
1799   if (wsrep_wait_committing_connections_close(5000))
1800   {
1801     /* no can do, bail out from DDL */
1802     WSREP_WARN("RSU failed due to pending transactions, schema: %s, query %s",
1803                thd->get_db(), thd->query());
1804     mysql_mutex_lock(&LOCK_wsrep_replaying);
1805     wsrep_replaying--;
1806     mysql_mutex_unlock(&LOCK_wsrep_replaying);
1807 
1808     ret = wsrep->resync(wsrep);
1809     if (ret != WSREP_OK)
1810     {
1811       WSREP_WARN("resync failed %d for schema: %s, query: %s",
1812                  ret, thd->get_db(), thd->query());
1813     }
1814 
1815     my_error(ER_LOCK_DEADLOCK, MYF(0));
1816     return(-1);
1817   }
1818 
1819   wsrep_seqno_t seqno = wsrep->pause(wsrep);
1820   if (seqno == WSREP_SEQNO_UNDEFINED)
1821   {
1822     WSREP_WARN("pause failed %lld for schema: %s, query: %s", (long long)seqno,
1823                thd->get_db(), thd->query());
1824     return(1);
1825   }
1826   WSREP_DEBUG("paused at %lld", (long long)seqno);
1827   thd->variables.wsrep_on = 0;
1828   return 0;
1829 }
1830 
wsrep_RSU_end(THD * thd)1831 static void wsrep_RSU_end(THD *thd)
1832 {
1833   wsrep_status_t ret(WSREP_WARNING);
1834   WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1835                thd->wsrep_exec_mode, thd->query() );
1836 
1837 
1838   mysql_mutex_lock(&LOCK_wsrep_replaying);
1839   wsrep_replaying--;
1840   mysql_mutex_unlock(&LOCK_wsrep_replaying);
1841 
1842   ret = wsrep->resume(wsrep);
1843   if (ret != WSREP_OK)
1844   {
1845     WSREP_WARN("resume failed %d for schema: %s, query: %s", ret,
1846                thd->get_db(), thd->query());
1847   }
1848 
1849   ret = wsrep->resync(wsrep);
1850   if (ret != WSREP_OK)
1851   {
1852     WSREP_WARN("resync failed %d for schema: %s, query: %s", ret,
1853                thd->get_db(), thd->query());
1854     return;
1855   }
1856 
1857   thd->variables.wsrep_on = 1;
1858 }
1859 
wsrep_to_isolation_begin(THD * thd,const char * db_,const char * table_,const TABLE_LIST * table_list,Alter_info * alter_info)1860 int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
1861                              const TABLE_LIST* table_list,
1862                              Alter_info* alter_info)
1863 {
1864   int ret= 0;
1865 
1866   /*
1867     No isolation for applier or replaying threads.
1868    */
1869   if (thd->wsrep_exec_mode == REPL_RECV)
1870     return 0;
1871 
1872   mysql_mutex_lock(&thd->LOCK_thd_data);
1873 
1874   if (thd->wsrep_conflict_state == MUST_ABORT)
1875   {
1876     WSREP_INFO("thread: %lld  schema: %s  query: %s has been aborted due to multi-master conflict",
1877                (longlong) thd->thread_id, thd->get_db(), thd->query());
1878     mysql_mutex_unlock(&thd->LOCK_thd_data);
1879     return WSREP_TRX_FAIL;
1880   }
1881   mysql_mutex_unlock(&thd->LOCK_thd_data);
1882 
1883   DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
1884   DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
1885 
1886   if (thd->global_read_lock.can_acquire_protection())
1887   {
1888     WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %lld",
1889                 thd->query(), (longlong) thd->thread_id);
1890     return -1;
1891   }
1892 
1893   if (wsrep_debug && thd->mdl_context.has_locks())
1894   {
1895     WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lld",
1896                 thd->query(), (longlong) thd->thread_id);
1897   }
1898 
1899   /*
1900     It makes sense to set auto_increment_* to defaults in TOI operations.
1901     Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
1902     TOI statement and auto inc variables for wsrep replication is constructed
1903     there. Variables are reset back in THD::reset_for_next_command() before
1904     processing of next command.
1905    */
1906   if (wsrep_auto_increment_control)
1907   {
1908     thd->variables.auto_increment_offset = 1;
1909     thd->variables.auto_increment_increment = 1;
1910   }
1911 
1912   if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
1913   {
1914     switch (thd->variables.wsrep_OSU_method) {
1915     case WSREP_OSU_TOI:
1916       ret= wsrep_TOI_begin(thd, db_, table_, table_list, alter_info);
1917       break;
1918     case WSREP_OSU_RSU:
1919       ret= wsrep_RSU_begin(thd, db_, table_);
1920       break;
1921     default:
1922       WSREP_ERROR("Unsupported OSU method: %lu",
1923                   thd->variables.wsrep_OSU_method);
1924       ret= -1;
1925       break;
1926     }
1927     switch (ret) {
1928     case 0:  thd->wsrep_exec_mode= TOTAL_ORDER; break;
1929     case 1:
1930       /* TOI replication skipped, treat as success */
1931       ret = 0;
1932       break;
1933     case -1:
1934       /* TOI replication failed, treat as error */
1935       break;
1936     }
1937   }
1938   return ret;
1939 }
1940 
wsrep_to_isolation_end(THD * thd)1941 void wsrep_to_isolation_end(THD *thd)
1942 {
1943   if (thd->wsrep_exec_mode == TOTAL_ORDER)
1944   {
1945     switch(thd->variables.wsrep_OSU_method)
1946     {
1947     case WSREP_OSU_TOI: wsrep_TOI_end(thd); break;
1948     case WSREP_OSU_RSU: wsrep_RSU_end(thd); break;
1949     default:
1950       WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu",
1951                  thd->variables.wsrep_OSU_method);
1952       break;
1953     }
1954     wsrep_cleanup_transaction(thd);
1955   }
1956 }
1957 
/*
  Log an MDL conflict between two THDs through WSREP_<severity>().

  severity    suffix of the logging macro to use (e.g. DEBUG, INFO)
  msg         headline printed on the first line
  schema      schema name; need not be NUL-terminated, since it is
              printed with "%.*s"
  schema_len  number of characters of schema to print (int)
  req, gra    THD pointers of the requesting and the granted thread

  Every parameter is parenthesised in the expansion so that arbitrary
  expressions can be passed. The trailing semicolon is kept for
  compatibility with existing call sites.
*/
#define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra)             \
    WSREP_##severity(                                                          \
      "%s\n"                                                                   \
      "schema:  %.*s\n"                                                        \
      "request: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n"      \
      "granted: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)",       \
      (msg), (schema_len), (schema),                                           \
      (longlong) (req)->thread_id, (long long)wsrep_thd_trx_seqno(req),        \
      (req)->wsrep_exec_mode, (req)->wsrep_query_state,                        \
      (req)->wsrep_conflict_state,                                             \
      (req)->get_command(), (req)->lex->sql_command, (req)->query(),           \
      (longlong) (gra)->thread_id, (long long)wsrep_thd_trx_seqno(gra),        \
      (gra)->wsrep_exec_mode, (gra)->wsrep_query_state,                        \
      (gra)->wsrep_conflict_state,                                             \
      (gra)->get_command(), (gra)->lex->sql_command, (gra)->query());
1971 
1972 /**
1973   Check if request for the metadata lock should be granted to the requester.
1974 
1975   @param  requestor_ctx        The MDL context of the requestor
1976   @param  ticket               MDL ticket for the requested lock
1977 
1978   @retval TRUE   Lock request can be granted
1979   @retval FALSE  Lock request cannot be granted
1980 */
1981 
wsrep_grant_mdl_exception(MDL_context * requestor_ctx,MDL_ticket * ticket,const MDL_key * key)1982 bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
1983                                MDL_ticket *ticket,
1984                                const MDL_key *key)
1985 {
1986   /* Fallback to the non-wsrep behaviour */
1987   if (!WSREP_ON) return FALSE;
1988 
1989   THD *request_thd= requestor_ctx->get_thd();
1990   THD *granted_thd= ticket->get_ctx()->get_thd();
1991   bool ret= false;
1992 
1993   const char* schema= key->db_name();
1994   int schema_len= key->db_name_length();
1995 
1996   mysql_mutex_lock(&request_thd->LOCK_thd_data);
1997 
1998   /*
1999     We consider granting MDL exceptions only for appliers (BF THD) and ones
2000     executing under TOI mode.
2001 
2002     Rules:
2003     1. If granted/owner THD is also an applier (BF THD) or one executing
2004        under TOI mode, then we grant the requested lock to the requester
2005        THD.
2006        @return true
2007 
2008     2. If granted/owner THD is executing a FLUSH command or already has an
2009        explicit lock, then do not grant the requested lock to the requester
2010        THD and it has to wait.
2011        @return false
2012 
2013     3. In all other cases the granted/owner THD is aborted and the requested
2014        lock is not granted to the requester THD, thus it has to wait.
2015        @return false
2016   */
2017   if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
2018       request_thd->wsrep_exec_mode == REPL_RECV)
2019   {
2020     mysql_mutex_unlock(&request_thd->LOCK_thd_data);
2021     WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
2022                   request_thd, granted_thd);
2023     ticket->wsrep_report(wsrep_debug);
2024 
2025     wsrep_thd_LOCK(granted_thd);
2026     if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
2027         granted_thd->wsrep_exec_mode == REPL_RECV)
2028     {
2029       WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
2030                     request_thd, granted_thd);
2031       ticket->wsrep_report(true);
2032       wsrep_thd_UNLOCK(granted_thd);
2033       ret= true;
2034     }
2035     else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
2036              granted_thd->mdl_context.has_explicit_locks())
2037     {
2038       WSREP_DEBUG("BF thread waiting for FLUSH");
2039       ticket->wsrep_report(wsrep_debug);
2040       wsrep_thd_UNLOCK(granted_thd);
2041       ret= false;
2042     }
2043     else
2044     {
2045       /* Print some debug information. */
2046       if (wsrep_debug)
2047       {
2048         if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE ||
2049             request_thd->lex->sql_command == SQLCOM_DROP_SEQUENCE)
2050         {
2051           WSREP_DEBUG("DROP caused BF abort, conf %d", granted_thd->wsrep_conflict_state);
2052         }
2053         else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
2054         {
2055           WSREP_DEBUG("MDL granted, but committing thd abort scheduled");
2056         }
2057         else
2058         {
2059           WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
2060                         request_thd, granted_thd);
2061         }
2062         ticket->wsrep_report(true);
2063       }
2064 
2065       /* This will call wsrep_abort_transaction so we should hold
2066       THD::LOCK_thd_data to protect victim from concurrent usage
2067       or disconnect or delete. */
2068       wsrep_abort_thd((void *) request_thd, (void *) granted_thd, true);
2069       ret= false;
2070     }
2071   }
2072   else
2073   {
2074     mysql_mutex_unlock(&request_thd->LOCK_thd_data);
2075   }
2076 
2077   return ret;
2078 }
2079 
2080 
start_wsrep_THD(void * arg)2081 pthread_handler_t start_wsrep_THD(void *arg)
2082 {
2083   THD *thd;
2084   wsrep_thread_args* args= (wsrep_thread_args*)arg;
2085   wsrep_thd_processor_fun processor= args->processor;
2086 
2087   if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
2088   {
2089     goto error;
2090   }
2091 
2092   mysql_mutex_lock(&LOCK_thread_count);
2093 
2094   if (wsrep_gtid_mode)
2095   {
2096     /* Adjust domain_id. */
2097     thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
2098   }
2099 
2100   thd->real_id=pthread_self(); // Keep purify happy
2101   thread_created++;
2102   threads.append(thd);
2103 
2104   my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));
2105 
2106   DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
2107   thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
2108   (void) mysql_mutex_unlock(&LOCK_thread_count);
2109 
2110   /* from bootstrap()... */
2111   thd->bootstrap=1;
2112   thd->max_client_packet_length= thd->net.max_packet;
2113   thd->security_ctx->master_access= ~(ulong)0;
2114 
2115   /* from handle_one_connection... */
2116   pthread_detach_this_thread();
2117 
2118   mysql_thread_set_psi_id(thd->thread_id);
2119   thd->thr_create_utime=  microsecond_interval_timer();
2120   if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
2121   {
2122     close_connection(thd, ER_OUT_OF_RESOURCES);
2123     statistic_increment(aborted_connects,&LOCK_status);
2124     MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
2125     goto error;
2126   }
2127 
2128 // </5.1.17>
2129   /*
2130     handle_one_connection() is normally the only way a thread would
2131     start and would always be on the very high end of the stack ,
2132     therefore, the thread stack always starts at the address of the
2133     first local variable of handle_one_connection, which is thd. We
2134     need to know the start of the stack so that we could check for
2135     stack overruns.
2136   */
2137   DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld",
2138                        (long long)thd->thread_id));
2139   /* now that we've called my_thread_init(), it is safe to call DBUG_* */
2140 
2141   thd->thread_stack= (char*) &thd;
2142   if (thd->store_globals())
2143   {
2144     close_connection(thd, ER_OUT_OF_RESOURCES);
2145     statistic_increment(aborted_connects,&LOCK_status);
2146     MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
2147     goto error;
2148   }
2149 
2150   thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
2151   thd->security_ctx->skip_grants();
2152 
2153   /* handle_one_connection() again... */
2154   //thd->version= refresh_version;
2155   thd->proc_info= 0;
2156   thd->set_command(COM_SLEEP);
2157   thd->init_for_queries();
2158 
2159   mysql_mutex_lock(&LOCK_thread_count);
2160   wsrep_running_threads++;
2161 
2162   switch (args->thread_type) {
2163     case WSREP_APPLIER_THREAD:
2164       wsrep_running_applier_threads++;
2165       break;
2166     case WSREP_ROLLBACKER_THREAD:
2167       wsrep_running_rollbacker_threads++;
2168       break;
2169     default:
2170       WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
2171       break;
2172   }
2173 
2174   mysql_cond_broadcast(&COND_thread_count);
2175   mysql_mutex_unlock(&LOCK_thread_count);
2176 
2177   processor(thd);
2178 
2179   close_connection(thd, 0);
2180 
2181   mysql_mutex_lock(&LOCK_thread_count);
2182   DBUG_ASSERT(wsrep_running_threads > 0);
2183   wsrep_running_threads--;
2184 
2185   switch (args->thread_type) {
2186     case WSREP_APPLIER_THREAD:
2187       DBUG_ASSERT(wsrep_running_applier_threads > 0);
2188       wsrep_running_applier_threads--;
2189       break;
2190     case WSREP_ROLLBACKER_THREAD:
2191       DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
2192       wsrep_running_rollbacker_threads--;
2193       break;
2194     default:
2195       WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
2196       break;
2197   }
2198 
2199   my_free(args);
2200 
2201   WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
2202   mysql_cond_broadcast(&COND_thread_count);
2203   mysql_mutex_unlock(&LOCK_thread_count);
2204 
2205   // Note: We can't call THD destructor without crashing
2206   // if plugins have not been initialized. However, in most of the
2207   // cases this means that pre SE initialization SST failed and
2208   // we are going to exit anyway.
2209   if (plugins_are_initialized)
2210   {
2211     net_end(&thd->net);
2212     MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
2213   }
2214   else
2215   {
2216     // TODO: lightweight cleanup to get rid of:
2217     // 'Error in my_thread_global_end(): 2 threads didn't exit'
2218     // at server shutdown
2219   }
2220 
2221   unlink_not_visible_thd(thd);
2222   delete thd;
2223   my_thread_end();
2224   return(NULL);
2225 
2226 error:
2227   WSREP_ERROR("Failed to create/initialize system thread");
2228 
2229   my_free(args);
2230 
2231   /* Abort if its the first applier/rollbacker thread. */
2232   if (!mysqld_server_initialized)
2233     unireg_abort(1);
2234   else
2235     return NULL;
2236 }
2237 
2238 
2239 /**/
abort_replicated(THD * thd)2240 static bool abort_replicated(THD *thd)
2241 {
2242   bool ret_code= false;
2243   wsrep_thd_LOCK(thd);
2244   if (thd->wsrep_query_state== QUERY_COMMITTING)
2245   {
2246     WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id));
2247 
2248     (void)wsrep_abort_thd(thd, thd, TRUE);
2249     ret_code= true;
2250   }
2251   else
2252     wsrep_thd_UNLOCK(thd);
2253   return ret_code;
2254 }
2255 
2256 
2257 /**/
is_client_connection(THD * thd)2258 static inline bool is_client_connection(THD *thd)
2259 {
2260   return (thd->wsrep_client_thread && thd->variables.wsrep_on);
2261 }
2262 
2263 
is_replaying_connection(THD * thd)2264 static inline bool is_replaying_connection(THD *thd)
2265 {
2266   bool ret;
2267 
2268   mysql_mutex_lock(&thd->LOCK_thd_data);
2269   ret=  (thd->wsrep_conflict_state == REPLAYING) ? true : false;
2270   mysql_mutex_unlock(&thd->LOCK_thd_data);
2271 
2272   return ret;
2273 }
2274 
2275 
is_committing_connection(THD * thd)2276 static inline bool is_committing_connection(THD *thd)
2277 {
2278   bool ret;
2279 
2280   mysql_mutex_lock(&thd->LOCK_thd_data);
2281   ret=  (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
2282   mysql_mutex_unlock(&thd->LOCK_thd_data);
2283 
2284   return ret;
2285 }
2286 
2287 
have_client_connections()2288 static bool have_client_connections()
2289 {
2290   THD *tmp;
2291 
2292   I_List_iterator<THD> it(threads);
2293   while ((tmp=it++))
2294   {
2295     DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2296                        (longlong) tmp->thread_id));
2297     if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
2298     {
2299       WSREP_DEBUG("Informing thread %lld that it's time to die",
2300                   (longlong)tmp->thread_id);
2301       (void)abort_replicated(tmp);
2302       return true;
2303     }
2304   }
2305   return false;
2306 }
2307 
wsrep_close_thread(THD * thd)2308 static void wsrep_close_thread(THD *thd)
2309 {
2310   thd->set_killed(KILL_CONNECTION);
2311   MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
2312   if (thd->mysys_var)
2313   {
2314     thd->mysys_var->abort=1;
2315     mysql_mutex_lock(&thd->mysys_var->mutex);
2316     if (thd->mysys_var->current_cond)
2317     {
2318       mysql_mutex_lock(thd->mysys_var->current_mutex);
2319       mysql_cond_broadcast(thd->mysys_var->current_cond);
2320       mysql_mutex_unlock(thd->mysys_var->current_mutex);
2321     }
2322     mysql_mutex_unlock(&thd->mysys_var->mutex);
2323   }
2324 }
2325 
2326 
have_committing_connections()2327 static my_bool have_committing_connections()
2328 {
2329   THD *tmp;
2330   mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2331 
2332   I_List_iterator<THD> it(threads);
2333   while ((tmp=it++))
2334   {
2335     if (!is_client_connection(tmp))
2336       continue;
2337 
2338     if (is_committing_connection(tmp))
2339     {
2340       mysql_mutex_unlock(&LOCK_thread_count);
2341       return TRUE;
2342     }
2343   }
2344   mysql_mutex_unlock(&LOCK_thread_count);
2345   return FALSE;
2346 }
2347 
2348 
wsrep_wait_committing_connections_close(int wait_time)2349 int wsrep_wait_committing_connections_close(int wait_time)
2350 {
2351   int sleep_time= 100;
2352 
2353   while (have_committing_connections() && wait_time > 0)
2354   {
2355     WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
2356     my_sleep(sleep_time);
2357     wait_time -= sleep_time;
2358   }
2359   if (have_committing_connections())
2360   {
2361     return 1;
2362   }
2363   return 0;
2364 }
2365 
2366 
wsrep_close_client_connections(my_bool wait_to_end,THD * except_caller_thd)2367 void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
2368 {
2369   /*
2370     First signal all threads that it's time to die
2371   */
2372 
2373   THD *tmp;
2374   mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2375 
2376   bool kill_cached_threads_saved= kill_cached_threads;
2377   kill_cached_threads= true; // prevent future threads caching
2378   mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
2379 
2380   I_List_iterator<THD> it(threads);
2381   while ((tmp=it++))
2382   {
2383     DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2384                        (longlong) tmp->thread_id));
2385     WSREP_DEBUG("Informing thread %lld that it's time to die",
2386                 (longlong)tmp->thread_id);
2387     /* We skip slave threads & scheduler on this first loop through. */
2388     if (!is_client_connection(tmp))
2389       continue;
2390 
2391     if (tmp == except_caller_thd)
2392     {
2393       DBUG_ASSERT(is_client_connection(tmp));
2394       continue;
2395     }
2396 
2397     if (is_replaying_connection(tmp))
2398     {
2399       tmp->set_killed(KILL_CONNECTION);
2400       continue;
2401     }
2402 
2403     /* replicated transactions must be skipped and aborted
2404     with wsrep_abort_thd. */
2405     if (abort_replicated(tmp))
2406       continue;
2407 
2408     WSREP_DEBUG("closing connection %lld", (longlong) tmp->thread_id);
2409 
2410     /*
2411       instead of wsrep_close_thread() we do now  soft kill by
2412       THD::awake(). Here also victim needs to be protected from
2413       concurrent usage or disconnect or delete.
2414     */
2415     tmp->awake(KILL_CONNECTION);
2416   }
2417   mysql_mutex_unlock(&LOCK_thread_count);
2418 
2419   if (thread_count)
2420     sleep(2);                               // Give threads time to die
2421 
2422   mysql_mutex_lock(&LOCK_thread_count);
2423   /*
2424     Force remaining threads to die by closing the connection to the client
2425   */
2426 
2427   I_List_iterator<THD> it2(threads);
2428   while ((tmp=it2++))
2429   {
2430     if (is_client_connection(tmp) &&
2431         !abort_replicated(tmp)    &&
2432         !is_replaying_connection(tmp) &&
2433         tmp != except_caller_thd)
2434     {
2435       WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id);
2436       close_connection(tmp,0);
2437     }
2438   }
2439 
2440   DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
2441   WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
2442 
2443   while (wait_to_end && have_client_connections())
2444   {
2445     mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
2446     DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
2447   }
2448 
2449   kill_cached_threads= kill_cached_threads_saved;
2450 
2451   mysql_mutex_unlock(&LOCK_thread_count);
2452 
2453   /* All client connection threads have now been aborted */
2454 }
2455 
2456 
wsrep_close_applier(THD * thd)2457 void wsrep_close_applier(THD *thd)
2458 {
2459   WSREP_DEBUG("closing applier %lld", (longlong) thd->thread_id);
2460   wsrep_close_thread(thd);
2461 }
2462 
2463 
wsrep_close_threads(THD * thd)2464 void wsrep_close_threads(THD *thd)
2465 {
2466   THD *tmp;
2467   mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2468 
2469   I_List_iterator<THD> it(threads);
2470   while ((tmp=it++))
2471   {
2472     DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2473                        (longlong) tmp->thread_id));
2474     /* We skip slave threads & scheduler on this first loop through. */
2475     if (tmp->wsrep_applier && tmp != thd)
2476     {
2477       WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
2478       wsrep_close_thread (tmp);
2479     }
2480   }
2481 
2482   mysql_mutex_unlock(&LOCK_thread_count);
2483 }
2484 
wsrep_wait_appliers_close(THD * thd)2485 void wsrep_wait_appliers_close(THD *thd)
2486 {
2487   /* Wait for wsrep appliers to gracefully exit */
2488   mysql_mutex_lock(&LOCK_thread_count);
2489   while (wsrep_running_threads > 1)
2490   // 1 is for rollbacker thread which needs to be killed explicitly.
2491   // This gotta be fixed in a more elegant manner if we gonna have arbitrary
2492   // number of non-applier wsrep threads.
2493   {
2494     if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
2495     {
2496       mysql_mutex_unlock(&LOCK_thread_count);
2497       my_sleep(100);
2498       mysql_mutex_lock(&LOCK_thread_count);
2499     }
2500     else
2501       mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
2502     DBUG_PRINT("quit",("One applier died (count=%u)",thread_count));
2503   }
2504   mysql_mutex_unlock(&LOCK_thread_count);
2505   /* Now kill remaining wsrep threads: rollbacker */
2506   wsrep_close_threads (thd);
2507   /* and wait for them to die */
2508   mysql_mutex_lock(&LOCK_thread_count);
2509   while (wsrep_running_threads > 0)
2510   {
2511    if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
2512     {
2513       mysql_mutex_unlock(&LOCK_thread_count);
2514       my_sleep(100);
2515       mysql_mutex_lock(&LOCK_thread_count);
2516     }
2517     else
2518       mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
2519     DBUG_PRINT("quit",("One thread died (count=%u)",thread_count));
2520   }
2521   mysql_mutex_unlock(&LOCK_thread_count);
2522 
2523   /* All wsrep applier threads have now been aborted. However, if this thread
2524      is also applier, we are still running...
2525   */
2526 }
2527 
2528 
wsrep_kill_mysql(THD * thd)2529 void wsrep_kill_mysql(THD *thd)
2530 {
2531   if (mysqld_server_started)
2532   {
2533     if (!shutdown_in_progress)
2534     {
2535       WSREP_INFO("starting shutdown");
2536       kill_mysql();
2537     }
2538   }
2539   else
2540   {
2541     unireg_abort(1);
2542   }
2543 }
2544 
2545 
wsrep_create_sp(THD * thd,uchar ** buf,size_t * buf_len)2546 static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
2547 {
2548   String log_query;
2549   sp_head *sp = thd->lex->sphead;
2550   sql_mode_t saved_mode= thd->variables.sql_mode;
2551   String retstr(64);
2552   LEX_CSTRING returns= empty_clex_str;
2553   retstr.set_charset(system_charset_info);
2554 
2555   log_query.set_charset(system_charset_info);
2556 
2557   if (sp->m_handler->type() == TYPE_ENUM_FUNCTION)
2558   {
2559     sp_returns_type(thd, retstr, sp);
2560     returns= retstr.lex_cstring();
2561   }
2562   if (sp->m_handler->
2563       show_create_sp(thd, &log_query,
2564                      sp->m_explicit_name ? sp->m_db : null_clex_str,
2565                      sp->m_name, sp->m_params, returns,
2566                      sp->m_body, sp->chistics(),
2567                      thd->lex->definer[0],
2568                      thd->lex->create_info,
2569                      saved_mode))
2570   {
2571     WSREP_WARN("SP create string failed: schema: %s, query: %s",
2572                thd->get_db(), thd->query());
2573     return 1;
2574   }
2575 
2576   return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
2577 }
2578 
2579 
wsrep_on(THD * thd)2580 extern int wsrep_on(THD *thd)
2581 {
2582   return (int)(WSREP(thd));
2583 }
2584 
2585 
wsrep_thd_is_wsrep_on(THD * thd)2586 extern "C" bool wsrep_thd_is_wsrep_on(THD *thd)
2587 {
2588   return thd->variables.wsrep_on;
2589 }
2590 
2591 
wsrep_consistency_check(THD * thd)2592 bool wsrep_consistency_check(THD *thd)
2593 {
2594   return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
2595 }
2596 
2597 
wsrep_thd_set_exec_mode(THD * thd,enum wsrep_exec_mode mode)2598 extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode)
2599 {
2600   thd->wsrep_exec_mode= mode;
2601 }
2602 
2603 
wsrep_thd_set_query_state(THD * thd,enum wsrep_query_state state)2604 extern "C" void wsrep_thd_set_query_state(
2605 	THD *thd, enum wsrep_query_state state)
2606 {
2607   /* async slave thread should never flag IDLE state, as it may
2608      give rollbacker thread chance to interfere and rollback async slave
2609      transaction.
2610      in fact, async slave thread is never idle as it reads complete
2611      transactions from relay log and applies them, as a whole.
2612      BF abort happens voluntarily by async slave thread.
2613   */
2614   if (thd->slave_thread && state == QUERY_IDLE) {
2615     WSREP_DEBUG("Skipping IDLE state change for slave SQL");
2616     return;
2617   }
2618   thd->wsrep_query_state= state;
2619 }
2620 
2621 
wsrep_thd_set_conflict_state(THD * thd,enum wsrep_conflict_state state)2622 void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state)
2623 {
2624   mysql_mutex_assert_owner(&thd->LOCK_thd_data);
2625   thd->wsrep_conflict_state= state;
2626 }
2627 
2628 
wsrep_thd_exec_mode(THD * thd)2629 enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd)
2630 {
2631   return thd->wsrep_exec_mode;
2632 }
2633 
2634 
wsrep_thd_exec_mode_str(THD * thd)2635 const char *wsrep_thd_exec_mode_str(THD *thd)
2636 {
2637   return
2638     (!thd) ? "void" :
2639     (thd->wsrep_exec_mode == LOCAL_STATE)  ? "local"         :
2640     (thd->wsrep_exec_mode == REPL_RECV)    ? "applier"       :
2641     (thd->wsrep_exec_mode == TOTAL_ORDER)  ? "total order"   :
2642     (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit"  : "void";
2643 }
2644 
2645 
wsrep_thd_query_state(THD * thd)2646 enum wsrep_query_state wsrep_thd_query_state(THD *thd)
2647 {
2648   return thd->wsrep_query_state;
2649 }
2650 
2651 
wsrep_thd_query_state_str(THD * thd)2652 const char *wsrep_thd_query_state_str(THD *thd)
2653 {
2654   return
2655     (!thd) ? "void" :
2656     (thd->wsrep_query_state == QUERY_IDLE)        ? "idle"          :
2657     (thd->wsrep_query_state == QUERY_EXEC)        ? "executing"     :
2658     (thd->wsrep_query_state == QUERY_COMMITTING)  ? "committing"    :
2659     (thd->wsrep_query_state == QUERY_EXITING)     ? "exiting"       :
2660     (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back"  : "void";
2661 }
2662 
2663 
wsrep_thd_get_conflict_state(THD * thd)2664 enum wsrep_conflict_state wsrep_thd_get_conflict_state(THD *thd)
2665 {
2666   return thd->wsrep_conflict_state;
2667 }
2668 
2669 
wsrep_thd_conflict_state_str(THD * thd)2670 const char *wsrep_thd_conflict_state_str(THD *thd)
2671 {
2672   return
2673     (!thd) ? "void" :
2674     (thd->wsrep_conflict_state == NO_CONFLICT)      ? "no conflict"  :
2675     (thd->wsrep_conflict_state == MUST_ABORT)       ? "must abort"   :
2676     (thd->wsrep_conflict_state == ABORTING)         ? "aborting"     :
2677     (thd->wsrep_conflict_state == MUST_REPLAY)      ? "must replay"  :
2678     (thd->wsrep_conflict_state == REPLAYING)        ? "replaying"    :
2679     (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying"     :
2680     (thd->wsrep_conflict_state == CERT_FAILURE)     ? "cert failure" : "void";
2681 }
2682 
2683 
wsrep_thd_ws_handle(THD * thd)2684 wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
2685 {
2686   return &thd->wsrep_ws_handle;
2687 }
2688 
2689 
wsrep_thd_LOCK(THD * thd)2690 void wsrep_thd_LOCK(THD *thd)
2691 {
2692   mysql_mutex_lock(&thd->LOCK_thd_data);
2693   mysql_mutex_lock(&thd->LOCK_thd_kill);
2694 }
2695 
2696 
wsrep_thd_UNLOCK(THD * thd)2697 void wsrep_thd_UNLOCK(THD *thd)
2698 {
2699   mysql_mutex_unlock(&thd->LOCK_thd_kill);
2700   mysql_mutex_unlock(&thd->LOCK_thd_data);
2701 }
2702 
2703 
wsrep_thd_kill_LOCK(THD * thd)2704 void wsrep_thd_kill_LOCK(THD *thd)
2705 {
2706   mysql_mutex_lock(&thd->LOCK_thd_kill);
2707 }
2708 
2709 
wsrep_thd_kill_UNLOCK(THD * thd)2710 void wsrep_thd_kill_UNLOCK(THD *thd)
2711 {
2712   mysql_mutex_unlock(&thd->LOCK_thd_kill);
2713 }
2714 
2715 
wsrep_thd_query_start(THD * thd)2716 extern "C" time_t wsrep_thd_query_start(THD *thd)
2717 {
2718   return thd->query_start();
2719 }
2720 
2721 
wsrep_thd_wsrep_rand(THD * thd)2722 extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd)
2723 {
2724   return thd->wsrep_rand;
2725 }
2726 
wsrep_thd_trx_seqno(THD * thd)2727 longlong wsrep_thd_trx_seqno(THD *thd)
2728 {
2729   return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED;
2730 }
2731 
2732 
wsrep_thd_query_id(THD * thd)2733 extern "C" query_id_t wsrep_thd_query_id(THD *thd)
2734 {
2735   return thd->query_id;
2736 }
2737 
2738 
wsrep_thd_query(THD * thd)2739 const char *wsrep_thd_query(THD *thd)
2740 {
2741   if (thd)
2742   {
2743     switch(thd->lex->sql_command)
2744     {
2745     case SQLCOM_CREATE_USER:
2746       return "CREATE USER";
2747     case SQLCOM_GRANT:
2748       return "GRANT";
2749     case SQLCOM_REVOKE:
2750       return "REVOKE";
2751     case SQLCOM_SET_OPTION:
2752       if (thd->lex->definer)
2753 	return "SET PASSWORD";
2754       /* fallthrough */
2755     default:
2756       if (thd->query())
2757         return thd->query();
2758     }
2759   }
2760   return "NULL";
2761 }
2762 
2763 
wsrep_thd_wsrep_last_query_id(THD * thd)2764 extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd)
2765 {
2766   return thd->wsrep_last_query_id;
2767 }
2768 
2769 
wsrep_thd_set_wsrep_last_query_id(THD * thd,query_id_t id)2770 extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
2771 {
2772   thd->wsrep_last_query_id= id;
2773 }
2774 
2775 
wsrep_thd_awake(THD * thd,my_bool signal)2776 extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
2777 {
2778   if (signal)
2779   {
2780     /* Here we should hold THD::LOCK_thd_data to
2781     protect from concurrent usage and
2782     THD::LOCK_thd_kill from disconnect or delete */
2783     mysql_mutex_assert_owner(&thd->LOCK_thd_data);
2784     mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
2785     thd->awake_no_mutex(KILL_QUERY);
2786   }
2787   else
2788   {
2789     mysql_mutex_lock(&LOCK_wsrep_replaying);
2790     mysql_cond_broadcast(&COND_wsrep_replaying);
2791     mysql_mutex_unlock(&LOCK_wsrep_replaying);
2792   }
2793 }
2794 
2795 
wsrep_thd_retry_counter(THD * thd)2796 int wsrep_thd_retry_counter(THD *thd)
2797 {
2798   return(thd->wsrep_retry_counter);
2799 }
2800 
2801 
wsrep_thd_ignore_table(THD * thd)2802 extern "C" bool wsrep_thd_ignore_table(THD *thd)
2803 {
2804   return thd->wsrep_ignore_table;
2805 }
2806 
2807 
2808 extern int
wsrep_trx_order_before(THD * thd1,THD * thd2)2809 wsrep_trx_order_before(THD *thd1, THD *thd2)
2810 {
2811   const longlong trx1_seqno= wsrep_thd_trx_seqno(thd1);
2812   const longlong trx2_seqno= wsrep_thd_trx_seqno(thd2);
2813   WSREP_DEBUG("BF conflict, order: %lld %lld\n",
2814               trx1_seqno, trx2_seqno);
2815 
2816   if (trx1_seqno == WSREP_SEQNO_UNDEFINED ||
2817       trx2_seqno == WSREP_SEQNO_UNDEFINED)
2818     return 1; /* trx is not yet replicated */
2819   else if (trx1_seqno < trx2_seqno)
2820     return 1;
2821 
2822   return 0;
2823 }
2824 
2825 
wsrep_trx_is_aborting(THD * thd_ptr)2826 int wsrep_trx_is_aborting(THD *thd_ptr)
2827 {
2828 	if (thd_ptr) {
2829 		if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) ||
2830 		    (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) {
2831 		  return 1;
2832 		}
2833 	}
2834 	return 0;
2835 }
2836 
2837 
wsrep_copy_query(THD * thd)2838 void wsrep_copy_query(THD *thd)
2839 {
2840   thd->wsrep_retry_command   = thd->get_command();
2841   thd->wsrep_retry_query_len = thd->query_length();
2842   if (thd->wsrep_retry_query) {
2843 	  my_free(thd->wsrep_retry_query);
2844   }
2845   thd->wsrep_retry_query     = (char *)my_malloc(
2846                                  thd->wsrep_retry_query_len + 1, MYF(0));
2847   strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len);
2848   thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0';
2849 }
2850 
2851 
wsrep_is_show_query(enum enum_sql_command command)2852 bool wsrep_is_show_query(enum enum_sql_command command)
2853 {
2854   DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
2855   return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
2856 }
2857 
wsrep_create_like_table(THD * thd,TABLE_LIST * table,TABLE_LIST * src_table,HA_CREATE_INFO * create_info)2858 bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
2859                              TABLE_LIST* src_table,
2860                              HA_CREATE_INFO *create_info)
2861 {
2862   if (create_info->tmp_table())
2863   {
2864     /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */
2865     WSREP_DEBUG("CREATE TEMPORARY TABLE LIKE... skipped replication\n %s",
2866                 thd->query());
2867   }
2868   else if (!(thd->find_temporary_table(src_table)))
2869   {
2870     /* this is straight CREATE TABLE LIKE... with no tmp tables */
2871     WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2872   }
2873   else
2874   {
2875     /* Non-MERGE tables ignore this call. */
2876     if (src_table->table->file->extra(HA_EXTRA_ADD_CHILDREN_LIST))
2877       return (true);
2878 
2879     char buf[2048];
2880     String query(buf, sizeof(buf), system_charset_info);
2881     query.length(0);  // Have to zero it since constructor doesn't
2882 
2883     int result __attribute__((unused))=
2884       show_create_table(thd, src_table, &query, NULL, WITH_DB_NAME);
2885     WSREP_DEBUG("TMP TABLE: %s ret_code %d", query.ptr(), result);
2886 
2887     thd->wsrep_TOI_pre_query=     query.ptr();
2888     thd->wsrep_TOI_pre_query_len= query.length();
2889 
2890     WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2891 
2892     thd->wsrep_TOI_pre_query=      NULL;
2893     thd->wsrep_TOI_pre_query_len= 0;
2894 
2895     /* Non-MERGE tables ignore this call. */
2896     src_table->table->file->extra(HA_EXTRA_DETACH_CHILDREN);
2897   }
2898 
2899   return(false);
2900 
2901 wsrep_error_label:
2902   thd->wsrep_TOI_pre_query= NULL;
2903   return (true);
2904 }
2905 
2906 
wsrep_create_trigger_query(THD * thd,uchar ** buf,size_t * buf_len)2907 static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
2908 {
2909   LEX *lex= thd->lex;
2910   String stmt_query;
2911 
2912   LEX_CSTRING definer_user;
2913   LEX_CSTRING definer_host;
2914 
2915   if (!lex->definer)
2916   {
2917     if (!thd->slave_thread)
2918     {
2919       if (!(lex->definer= create_default_definer(thd, false)))
2920         return 1;
2921     }
2922   }
2923 
2924   if (lex->definer)
2925   {
2926     /* SUID trigger. */
2927     LEX_USER *d= get_current_user(thd, lex->definer);
2928 
2929     if (!d)
2930       return 1;
2931 
2932     definer_user= d->user;
2933     definer_host= d->host;
2934   }
2935   else
2936   {
2937     /* non-SUID trigger. */
2938 
2939     definer_user.str= 0;
2940     definer_user.length= 0;
2941 
2942     definer_host.str= 0;
2943     definer_host.length= 0;
2944   }
2945 
2946   const LEX_CSTRING command[2]=
2947       {{ C_STRING_WITH_LEN("CREATE ") },
2948        { C_STRING_WITH_LEN("CREATE OR REPLACE ") }};
2949 
2950   if (thd->lex->create_info.or_replace())
2951     stmt_query.append(command[1]);
2952   else
2953     stmt_query.append(command[0]);
2954 
2955   append_definer(thd, &stmt_query, &definer_user, &definer_host);
2956 
2957   LEX_CSTRING stmt_definition;
2958   stmt_definition.str= (char*) thd->lex->stmt_definition_begin;
2959   stmt_definition.length= thd->lex->stmt_definition_end
2960     - thd->lex->stmt_definition_begin;
2961   trim_whitespace(thd->charset(), &stmt_definition);
2962 
2963   stmt_query.append(stmt_definition.str, stmt_definition.length);
2964 
2965   return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
2966                              buf, buf_len);
2967 }
2968 
2969 /***** callbacks for wsrep service ************/
2970 
get_wsrep_debug()2971 my_bool get_wsrep_debug()
2972 {
2973   return wsrep_debug;
2974 }
2975 
get_wsrep_load_data_splitting()2976 my_bool get_wsrep_load_data_splitting()
2977 {
2978   return wsrep_load_data_splitting;
2979 }
2980 
get_wsrep_protocol_version()2981 long get_wsrep_protocol_version()
2982 {
2983   return wsrep_protocol_version;
2984 }
2985 
get_wsrep_drupal_282555_workaround()2986 my_bool get_wsrep_drupal_282555_workaround()
2987 {
2988   return wsrep_drupal_282555_workaround;
2989 }
2990 
get_wsrep_recovery()2991 my_bool get_wsrep_recovery()
2992 {
2993   return wsrep_recovery;
2994 }
2995 
get_wsrep_log_conflicts()2996 my_bool get_wsrep_log_conflicts()
2997 {
2998   return wsrep_log_conflicts;
2999 }
3000 
get_wsrep()3001 wsrep_t *get_wsrep()
3002 {
3003   return wsrep;
3004 }
3005 
get_wsrep_certify_nonPK()3006 my_bool get_wsrep_certify_nonPK()
3007 {
3008   return wsrep_certify_nonPK;
3009 }
3010 
wsrep_lock_rollback()3011 void wsrep_lock_rollback()
3012 {
3013   mysql_mutex_lock(&LOCK_wsrep_rollback);
3014 }
3015 
wsrep_unlock_rollback()3016 void wsrep_unlock_rollback()
3017 {
3018   mysql_cond_signal(&COND_wsrep_rollback);
3019   mysql_mutex_unlock(&LOCK_wsrep_rollback);
3020 }
3021 
wsrep_aborting_thd_contains(THD * thd)3022 my_bool wsrep_aborting_thd_contains(THD *thd)
3023 {
3024   mysql_mutex_assert_owner(&LOCK_wsrep_rollback);
3025   wsrep_aborting_thd_t abortees = wsrep_aborting_thd;
3026   while (abortees)
3027   {
3028     if (abortees->aborting_thd == thd)
3029       return true;
3030     abortees = abortees->next;
3031   }
3032   return false;
3033 }
3034 
wsrep_aborting_thd_enqueue(THD * thd)3035 void wsrep_aborting_thd_enqueue(THD *thd)
3036 {
3037   mysql_mutex_assert_owner(&LOCK_wsrep_rollback);
3038   wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
3039           my_malloc(sizeof(struct wsrep_aborting_thd), MYF(0));
3040   aborting->aborting_thd  = thd;
3041   aborting->next          = wsrep_aborting_thd;
3042   wsrep_aborting_thd      = aborting;
3043 }
3044 
wsrep_node_is_donor()3045 bool wsrep_node_is_donor()
3046 {
3047   return (WSREP_ON) ? (wsrep_config_state->get_status() == 2) : false;
3048 }
3049 
wsrep_node_is_synced()3050 bool wsrep_node_is_synced()
3051 {
3052   return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
3053 }
3054