1 /* Copyright 2008-2021 Codership Oy <http://www.codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.x1
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #include "sql_plugin.h"                         /* wsrep_plugins_pre_init() */
17 #include "my_global.h"
18 #include "wsrep_server_state.h"
19 
20 #include "mariadb.h"
21 #include <mysqld.h>
22 #include <transaction.h>
23 #include <sql_class.h>
24 #include <sql_parse.h>
25 #include <sql_base.h> /* find_temporary_table() */
26 #include "slave.h"
27 #include "rpl_mi.h"
28 #include "sql_repl.h"
29 #include "rpl_filter.h"
30 #include "sql_callback.h"
31 #include "sp_head.h"
32 #include "sql_show.h"
33 #include "sp.h"
34 #include "wsrep_priv.h"
35 #include "wsrep_thd.h"
36 #include "wsrep_sst.h"
37 #include "wsrep_utils.h"
38 #include "wsrep_var.h"
39 #include "wsrep_binlog.h"
40 #include "wsrep_applier.h"
41 #include "wsrep_schema.h"
42 #include "wsrep_xid.h"
43 #include "wsrep_trans_observer.h"
44 #include "mysql/service_wsrep.h"
45 #include <cstdio>
46 #include <cstdlib>
47 #include <string>
48 #include "log_event.h"
49 #include "sql_connect.h"
50 
51 #include <sstream>
52 
53 /* wsrep-lib */
54 Wsrep_server_state* Wsrep_server_state::m_instance;
55 
56 my_bool wsrep_emulate_bin_log  = FALSE; // activating parts of binlog interface
57 #ifdef GTID_SUPPORT
58 /* Sidno in global_sid_map corresponding to group uuid */
59 rpl_sidno wsrep_sidno= -1;
60 #endif /* GTID_SUPPORT */
61 my_bool wsrep_preordered_opt= FALSE;
62 
63 /* Streaming Replication */
64 const char *wsrep_fragment_units[]= { "bytes", "rows", "statements", NullS };
65 const char *wsrep_SR_store_types[]= { "none", "table", NullS };
66 
67 /*
68  * Begin configuration options
69  */
70 
71 extern my_bool plugins_are_initialized;
72 extern uint kill_cached_threads;
73 extern mysql_cond_t COND_thread_cache;
74 
75 /* System variables. */
76 const char *wsrep_provider;
77 const char *wsrep_provider_options;
78 const char *wsrep_cluster_address;
79 const char *wsrep_cluster_name;
80 const char *wsrep_node_name;
81 const char *wsrep_node_address;
82 const char *wsrep_node_incoming_address;
83 const char *wsrep_start_position;
84 const char *wsrep_data_home_dir;
85 const char *wsrep_dbug_option;
86 const char *wsrep_notify_cmd;
87 
88 ulong   wsrep_debug;                            // Debug level logging
89 my_bool wsrep_convert_LOCK_to_trx;              // Convert locking sessions to trx
90 my_bool wsrep_auto_increment_control;           // Control auto increment variables
91 my_bool wsrep_drupal_282555_workaround;         // Retry autoinc insert after dupkey
92 my_bool wsrep_certify_nonPK;                    // Certify, even when no primary key
93 ulong   wsrep_certification_rules      = WSREP_CERTIFICATION_RULES_STRICT;
94 my_bool wsrep_recovery;                         // Recovery
95 my_bool wsrep_replicate_myisam;                 // Enable MyISAM replication
96 my_bool wsrep_log_conflicts;
97 my_bool wsrep_load_data_splitting= 0;           // Commit load data every 10K intervals
98 my_bool wsrep_slave_UK_checks;                  // Slave thread does UK checks
99 my_bool wsrep_slave_FK_checks;                  // Slave thread does FK checks
100 my_bool wsrep_restart_slave;                    // Should mysql slave thread be
101                                                 // restarted, when node joins back?
102 my_bool wsrep_desync;                           // De(re)synchronize the node from the
103                                                 // cluster
104 long wsrep_slave_threads;                       // No. of slave appliers threads
105 ulong wsrep_retry_autocommit;                   // Retry aborted autocommit trx
106 ulong wsrep_max_ws_size;                        // Max allowed ws (RBR buffer) size
107 ulong wsrep_max_ws_rows;                        // Max number of rows in ws
108 ulong wsrep_forced_binlog_format= BINLOG_FORMAT_UNSPEC;
109 ulong wsrep_mysql_replication_bundle;
110 bool wsrep_gtid_mode;                           // Use wsrep_gtid_domain_id
111                                                 // for galera transactions?
112 uint32 wsrep_gtid_domain_id;                    // gtid_domain_id for galera
113                                                 // transactions
114 
115 /* Other configuration variables and their default values. */
116 my_bool wsrep_incremental_data_collection= 0;   // Incremental data collection
117 my_bool wsrep_restart_slave_activated= 0;       // Node has dropped, and slave
118                                                 // restart will be needed
119 bool wsrep_new_cluster= false;                  // Bootstrap the cluster?
120 int wsrep_slave_count_change= 0;                // No. of appliers to stop/start
121 int wsrep_to_isolation= 0;                      // No. of active TO isolation threads
122 long wsrep_max_protocol_version= 4;             // Maximum protocol version to use
123 long int  wsrep_protocol_version= wsrep_max_protocol_version;
124 ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES;
125                                                 // unit for fragment size
126 ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE;
127 uint  wsrep_ignore_apply_errors= 0;
128 
129 
130 /*
131  * End configuration options
132  */
133 
134 /*
135  * Cached variables
136  */
137 
138 // Whether the Galera write-set replication provider is set
139 // wsrep_provider && strcmp(wsrep_provider, WSREP_NONE)
140 bool WSREP_PROVIDER_EXISTS_;
141 
142 // Whether the Galera write-set replication is enabled
143 // global_system_variables.wsrep_on && WSREP_PROVIDER_EXISTS_
144 bool WSREP_ON_;
145 
146 /*
147  * Other wsrep global variables.
148  */
149 
150 mysql_mutex_t LOCK_wsrep_ready;
151 mysql_cond_t  COND_wsrep_ready;
152 mysql_mutex_t LOCK_wsrep_sst;
153 mysql_cond_t  COND_wsrep_sst;
154 mysql_mutex_t LOCK_wsrep_sst_init;
155 mysql_cond_t  COND_wsrep_sst_init;
156 mysql_mutex_t LOCK_wsrep_replaying;
157 mysql_cond_t  COND_wsrep_replaying;
158 mysql_mutex_t LOCK_wsrep_slave_threads;
159 mysql_cond_t  COND_wsrep_slave_threads;
160 mysql_mutex_t LOCK_wsrep_cluster_config;
161 mysql_mutex_t LOCK_wsrep_desync;
162 mysql_mutex_t LOCK_wsrep_config_state;
163 mysql_mutex_t LOCK_wsrep_group_commit;
164 mysql_mutex_t LOCK_wsrep_SR_pool;
165 mysql_mutex_t LOCK_wsrep_SR_store;
166 mysql_mutex_t LOCK_wsrep_joiner_monitor;
167 mysql_mutex_t LOCK_wsrep_donor_monitor;
168 mysql_cond_t  COND_wsrep_joiner_monitor;
169 mysql_cond_t  COND_wsrep_donor_monitor;
170 
171 int wsrep_replaying= 0;
172 ulong  wsrep_running_threads = 0; // # of currently running wsrep
173 				  // # threads
174 ulong  wsrep_running_applier_threads = 0; // # of running applier threads
175 ulong  wsrep_running_rollbacker_threads = 0; // # of running
176 					     // # rollbacker threads
177 ulong  my_bind_addr;
178 
179 #ifdef HAVE_PSI_INTERFACE
180 PSI_mutex_key
181   key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
182   key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
183   key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
184   key_LOCK_wsrep_config_state, key_LOCK_wsrep_cluster_config,
185   key_LOCK_wsrep_group_commit,
186   key_LOCK_wsrep_SR_pool,
187   key_LOCK_wsrep_SR_store,
188   key_LOCK_wsrep_thd_queue,
189   key_LOCK_wsrep_joiner_monitor,
190   key_LOCK_wsrep_donor_monitor;
191 
192 PSI_cond_key key_COND_wsrep_thd,
193   key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
194   key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
195   key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads,
196   key_COND_wsrep_joiner_monitor, key_COND_wsrep_donor_monitor;
197 
198 PSI_file_key key_file_wsrep_gra_log;
199 
200 static PSI_mutex_info wsrep_mutexes[]=
201 {
202   { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL},
203   { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
204   { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0},
205   { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL},
206   { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
207   { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
208   { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
209   { &key_LOCK_wsrep_cluster_config, "LOCK_wsrep_cluster_config", PSI_FLAG_GLOBAL},
210   { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
211   { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
212   { &key_LOCK_wsrep_group_commit, "LOCK_wsrep_group_commit", PSI_FLAG_GLOBAL},
213   { &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL},
214   { &key_LOCK_wsrep_SR_store, "LOCK_wsrep_SR_store", PSI_FLAG_GLOBAL},
215   { &key_LOCK_wsrep_joiner_monitor, "LOCK_wsrep_joiner_monitor", PSI_FLAG_GLOBAL},
216   { &key_LOCK_wsrep_donor_monitor, "LOCK_wsrep_donor_monitor", PSI_FLAG_GLOBAL}
217 };
218 
219 static PSI_cond_info wsrep_conds[]=
220 {
221   { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL},
222   { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
223   { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
224   { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
225   { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
226   { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
227   { &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL},
228   { &key_COND_wsrep_joiner_monitor, "COND_wsrep_joiner_monitor", PSI_FLAG_GLOBAL},
229   { &key_COND_wsrep_donor_monitor, "COND_wsrep_donor_monitor", PSI_FLAG_GLOBAL}
230 };
231 
232 static PSI_file_info wsrep_files[]=
233 {
234   { &key_file_wsrep_gra_log, "wsrep_gra_log", 0}
235 };
236 
237 PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
238   key_wsrep_rollbacker, key_wsrep_applier,
239   key_wsrep_sst_joiner_monitor, key_wsrep_sst_donor_monitor;
240 
241 static PSI_thread_info wsrep_threads[]=
242 {
243  {&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
244  {&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
245  {&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
246  {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL},
247  {&key_wsrep_sst_joiner_monitor, "wsrep_sst_joiner_monitor", PSI_FLAG_GLOBAL},
248  {&key_wsrep_sst_donor_monitor, "wsrep_sst_donor_monitor", PSI_FLAG_GLOBAL}
249 };
250 
251 #endif /* HAVE_PSI_INTERFACE */
252 
253 my_bool wsrep_inited= 0; // initialized ?
254 
255 static wsrep_uuid_t node_uuid= WSREP_UUID_UNDEFINED;
256 static char         cluster_uuid_str[40]= { 0, };
257 
258 static char provider_name[256]= { 0, };
259 static char provider_version[256]= { 0, };
260 static char provider_vendor[256]= { 0, };
261 
262 /*
263  * Wsrep status variables. LOCK_status must be locked When modifying
264  * these variables,
265  */
266 my_bool     wsrep_connected         = FALSE;
267 my_bool     wsrep_ready             = FALSE;
268 const char* wsrep_cluster_state_uuid= cluster_uuid_str;
269 long long   wsrep_cluster_conf_id   = WSREP_SEQNO_UNDEFINED;
270 const char* wsrep_cluster_status    = "Disconnected";
271 long        wsrep_cluster_size      = 0;
272 long        wsrep_local_index       = -1;
273 long long   wsrep_local_bf_aborts   = 0;
274 const char* wsrep_provider_name     = provider_name;
275 const char* wsrep_provider_version  = provider_version;
276 const char* wsrep_provider_vendor   = provider_vendor;
277 char* wsrep_provider_capabilities   = NULL;
278 char* wsrep_cluster_capabilities    = NULL;
279 /* End wsrep status variables */
280 
281 wsp::Config_state *wsrep_config_state;
282 
WSREP_LOG(void (* fun)(const char * fmt,...),const char * fmt,...)283 void WSREP_LOG(void (*fun)(const char* fmt, ...), const char* fmt, ...)
284 {
285   /* Allocate short buffer from stack. If the vsnprintf() return value
286      indicates that the message was truncated, a new buffer will be allocated
287      dynamically and the message will be reprinted. */
288   char msg[128] = {'\0'};
289   va_list arglist;
290   va_start(arglist, fmt);
291   int n= vsnprintf(msg, sizeof(msg), fmt, arglist);
292   va_end(arglist);
293   if (n < 0)
294   {
295     sql_print_warning("WSREP: Printing message failed");
296   }
297   else if (n < (int)sizeof(msg))
298   {
299     fun("WSREP: %s", msg);
300   }
301   else
302   {
303     size_t dynbuf_size= std::max(n, 4096);
304     char* dynbuf= (char*) my_malloc(dynbuf_size, MYF(0));
305     if (dynbuf)
306     {
307       va_start(arglist, fmt);
308       (void)vsnprintf(&dynbuf[0], dynbuf_size - 1, fmt, arglist);
309       va_end(arglist);
310       dynbuf[dynbuf_size - 1] = '\0';
311       fun("WSREP: %s", &dynbuf[0]);
312       my_free(dynbuf);
313     }
314     else
315     {
316       /* Memory allocation for vector failed, print truncated message. */
317       fun("WSREP: %s", msg);
318     }
319   }
320 }
321 
322 
323 wsrep_uuid_t               local_uuid       = WSREP_UUID_UNDEFINED;
324 wsrep_seqno_t              local_seqno      = WSREP_SEQNO_UNDEFINED;
325 wsp::node_status           local_status;
326 
327 /*
328  */
329 Wsrep_schema *wsrep_schema= 0;
330 
wsrep_log_cb(wsrep::log::level level,const char *,const char * msg)331 static void wsrep_log_cb(wsrep::log::level level,
332                          const char*, const char *msg)
333 {
334   /*
335     Silence all wsrep related logging from lib and provider if
336     wsrep is not enabled.
337   */
338   if (!WSREP_ON) return;
339 
340   switch (level) {
341   case wsrep::log::info:
342     WSREP_INFO("%s", msg);
343     break;
344   case wsrep::log::warning:
345     WSREP_WARN("%s", msg);
346     break;
347   case wsrep::log::error:
348     WSREP_ERROR("%s", msg);
349     break;
350   case wsrep::log::debug:
351     WSREP_DEBUG("%s", msg);
352     break;
353   case wsrep::log::unknown:
354     WSREP_UNKNOWN("%s", msg);
355     break;
356   }
357 }
358 
wsrep_init_sidno(const wsrep::id & uuid)359 void wsrep_init_sidno(const wsrep::id& uuid)
360 {
361   /*
362     Protocol versions starting from 4 use group gtid as it is.
363     For lesser protocol versions generate new Sid map entry from inverted
364     uuid.
365   */
366   rpl_gtid sid;
367   if (wsrep_protocol_version >= 4)
368   {
369     memcpy((void*)&sid, (const uchar*)uuid.data(),16);
370   }
371   else
372   {
373     wsrep_uuid_t ltid_uuid;
374     for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i)
375     {
376       ltid_uuid.data[i]= ~((const uchar*)uuid.data())[i];
377     }
378     memcpy((void*)&sid, (const uchar*)ltid_uuid.data,16);
379   }
380 #ifdef GTID_SUPPORT
381   global_sid_lock->wrlock();
382   wsrep_sidno= global_sid_map->add_sid(sid);
383   WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno);
384   global_sid_lock->unlock();
385 #endif
386 }
387 
wsrep_init_schema()388 void wsrep_init_schema()
389 {
390   DBUG_ASSERT(!wsrep_schema);
391 
392   WSREP_INFO("wsrep_init_schema_and_SR %p", wsrep_schema);
393   if (!wsrep_schema)
394   {
395     wsrep_schema= new Wsrep_schema();
396     if (wsrep_schema->init())
397     {
398       WSREP_ERROR("Failed to init wsrep schema");
399       unireg_abort(1);
400     }
401   }
402 }
403 
wsrep_deinit_schema()404 void wsrep_deinit_schema()
405 {
406   delete wsrep_schema;
407   wsrep_schema= 0;
408 }
409 
wsrep_recover_sr_from_storage(THD * orig_thd)410 void wsrep_recover_sr_from_storage(THD *orig_thd)
411 {
412   switch (wsrep_SR_store_type)
413   {
414   case WSREP_SR_STORE_TABLE:
415     if (!wsrep_schema)
416     {
417       WSREP_ERROR("Wsrep schema not initialized when trying to recover "
418                   "streaming transactions");
419       unireg_abort(1);
420     }
421     if (wsrep_schema->recover_sr_transactions(orig_thd))
422     {
423       WSREP_ERROR("Failed to recover SR transactions from schema");
424       unireg_abort(1);
425     }
426     break;
427   default:
428     /* */
429     WSREP_ERROR("Unsupported wsrep SR store type: %lu", wsrep_SR_store_type);
430     unireg_abort(1);
431     break;
432   }
433 }
434 
435 /** Export the WSREP provider's capabilities as a human readable string.
436  * The result is saved in a dynamically allocated string of the form:
437  * :cap1:cap2:cap3:
438  */
wsrep_capabilities_export(wsrep_cap_t const cap,char ** str)439 static void wsrep_capabilities_export(wsrep_cap_t const cap, char** str)
440 {
441   static const char* names[] =
442   {
443     /* Keep in sync with wsrep/wsrep_api.h WSREP_CAP_* macros. */
444     "MULTI_MASTER",
445     "CERTIFICATION",
446     "PARALLEL_APPLYING",
447     "TRX_REPLAY",
448     "ISOLATION",
449     "PAUSE",
450     "CAUSAL_READS",
451     "CAUSAL_TRX",
452     "INCREMENTAL_WRITESET",
453     "SESSION_LOCKS",
454     "DISTRIBUTED_LOCKS",
455     "CONSISTENCY_CHECK",
456     "UNORDERED",
457     "ANNOTATION",
458     "PREORDERED",
459     "STREAMING",
460     "SNAPSHOT",
461     "NBO",
462   };
463 
464   std::string s;
465   for (size_t i= 0; i < sizeof(names) / sizeof(names[0]); ++i)
466   {
467     if (cap & (1ULL << i))
468     {
469       if (s.empty())
470       {
471         s= ":";
472       }
473       s += names[i];
474       s += ":";
475     }
476   }
477 
478   /* A read from the string pointed to by *str may be started at any time,
479    * so it must never point to free(3)d memory or non '\0' terminated string. */
480 
481   char* const previous= *str;
482 
483   *str= strdup(s.c_str());
484 
485   if (previous != NULL)
486   {
487     free(previous);
488   }
489 }
490 
491 /* Verifies that SE position is consistent with the group position
492  * and initializes other variables */
wsrep_verify_SE_checkpoint(const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno)493 void wsrep_verify_SE_checkpoint(const wsrep_uuid_t& uuid,
494                                 wsrep_seqno_t const seqno)
495 {
496 }
497 
498 /*
499   Wsrep is considered ready if
500   1) Provider is not loaded (native mode)
501   2) Server has reached synced state
502   3) Server is in joiner mode and mysqldump SST method has been
503      specified
504   See Wsrep_server_service::log_state_change() for further details.
505  */
wsrep_ready_get(void)506 my_bool wsrep_ready_get (void)
507 {
508   if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
509   my_bool ret= wsrep_ready;
510   mysql_mutex_unlock (&LOCK_wsrep_ready);
511   return ret;
512 }
513 
wsrep_show_ready(THD * thd,SHOW_VAR * var,char * buff)514 int wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff)
515 {
516   var->type= SHOW_MY_BOOL;
517   var->value= buff;
518   *((my_bool *)buff)= wsrep_ready_get();
519   return 0;
520 }
521 
wsrep_update_cluster_state_uuid(const char * uuid)522 void wsrep_update_cluster_state_uuid(const char* uuid)
523 {
524   strncpy(cluster_uuid_str, uuid, sizeof(cluster_uuid_str) - 1);
525 }
526 
wsrep_init_position()527 static void wsrep_init_position()
528 {
529 }
530 
531 /****************************************************************************
532                          Helpers for wsrep_init()
533  ****************************************************************************/
wsrep_server_name()534 static std::string wsrep_server_name()
535 {
536   std::string ret(wsrep_node_name ? wsrep_node_name : "");
537   return ret;
538 }
539 
wsrep_server_id()540 static std::string wsrep_server_id()
541 {
542   /* using empty server_id, which enables view change handler to
543      set final server_id later on
544   */
545   std::string ret("");
546   return ret;
547 }
548 
wsrep_server_node_address()549 static std::string wsrep_server_node_address()
550 {
551 
552   std::string ret;
553   if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
554     wsrep_data_home_dir= mysql_real_data_home;
555 
556   /* Initialize node address */
557   if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
558   {
559     char node_addr[512]= {0, };
560     const size_t node_addr_max= sizeof(node_addr) - 1;
561     size_t guess_ip_ret= wsrep_guess_ip(node_addr, node_addr_max);
562     if (!(guess_ip_ret > 0 && guess_ip_ret < node_addr_max))
563     {
564       WSREP_WARN("Failed to guess base node address. Set it explicitly via "
565                  "wsrep_node_address.");
566     }
567     else
568     {
569       ret= node_addr;
570     }
571   }
572   else
573   {
574     ret= wsrep_node_address;
575   }
576   return ret;
577 }
578 
wsrep_server_incoming_address()579 static std::string wsrep_server_incoming_address()
580 {
581   std::string ret;
582   const std::string node_addr(wsrep_server_node_address());
583   char inc_addr[512]= { 0, };
584   size_t const inc_addr_max= sizeof (inc_addr);
585 
586   /*
587     In case wsrep_node_incoming_address is either not set or set to AUTO,
588     we need to use mysqld's my_bind_addr_str:mysqld_port, lastly fallback
589     to wsrep_node_address' value if mysqld's bind-address is not set either.
590   */
591   if ((!wsrep_node_incoming_address ||
592        !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
593   {
594     bool is_ipv6= false;
595     unsigned int my_bind_ip= INADDR_ANY; // default if not set
596 
597     if (my_bind_addr_str && strlen(my_bind_addr_str) &&
598         strcmp(my_bind_addr_str, "*") != 0)
599     {
600       my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6);
601     }
602 
603     if (INADDR_ANY != my_bind_ip)
604     {
605       /*
606         If its a not a valid address, leave inc_addr as empty string. mysqld
607         is not listening for client connections on network interfaces.
608       */
609       if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
610       {
611         const char *fmt= (is_ipv6) ? "[%s]:%u" : "%s:%u";
612         snprintf(inc_addr, inc_addr_max, fmt, my_bind_addr_str, mysqld_port);
613       }
614     }
615     else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */
616     {
617       if (node_addr.size())
618       {
619         size_t const ip_len_mdb= wsrep_host_len(node_addr.c_str(), node_addr.size());
620         if (ip_len_mdb + 7 /* :55555\0 */ < inc_addr_max)
621         {
622           memcpy (inc_addr, node_addr.c_str(), ip_len_mdb);
623           snprintf(inc_addr + ip_len_mdb, inc_addr_max - ip_len_mdb, ":%u",
624                    (int)mysqld_port);
625         }
626         else
627         {
628           WSREP_WARN("Guessing address for incoming client connections: "
629                      "address too long.");
630           inc_addr[0]= '\0';
631         }
632       }
633 
634       if (!strlen(inc_addr))
635       {
636         WSREP_WARN("Guessing address for incoming client connections failed. "
637                    "Try setting wsrep_node_incoming_address explicitly.");
638         WSREP_INFO("Node addr: %s", node_addr.c_str());
639       }
640     }
641   }
642   else
643   {
644     wsp::Address addr(wsrep_node_incoming_address);
645 
646     if (!addr.is_valid())
647     {
648       WSREP_WARN("Could not parse wsrep_node_incoming_address : %s",
649                  wsrep_node_incoming_address);
650       goto done;
651     }
652 
653     /*
654       In case port is not specified in wsrep_node_incoming_address, we use
655       mysqld_port.
656     */
657     int port= (addr.get_port() > 0) ? addr.get_port() : (int) mysqld_port;
658     const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
659 
660     snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port);
661   }
662 
663  done:
664   ret= wsrep_node_incoming_address;
665   return ret;
666 }
667 
wsrep_server_working_dir()668 static std::string wsrep_server_working_dir()
669 {
670   std::string ret;
671   if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
672   {
673     ret= mysql_real_data_home;
674   }
675   else
676   {
677     ret= wsrep_data_home_dir;
678   }
679   return ret;
680 }
681 
wsrep_server_initial_position()682 static wsrep::gtid wsrep_server_initial_position()
683 {
684   wsrep::gtid ret;
685   WSREP_DEBUG("Server initial position: %s", wsrep_start_position);
686   std::istringstream is(wsrep_start_position);
687   is >> ret;
688   return ret;
689 }
690 
691 /*
692   Intitialize provider specific status variables
693  */
wsrep_init_provider_status_variables()694 static void wsrep_init_provider_status_variables()
695 {
696   wsrep_inited= 1;
697   const wsrep::provider& provider=
698     Wsrep_server_state::instance().provider();
699   strncpy(provider_name,
700           provider.name().c_str(),    sizeof(provider_name) - 1);
701   strncpy(provider_version,
702           provider.version().c_str(), sizeof(provider_version) - 1);
703   strncpy(provider_vendor,
704           provider.vendor().c_str(),  sizeof(provider_vendor) - 1);
705 }
706 
wsrep_init_server()707 int wsrep_init_server()
708 {
709   wsrep::log::logger_fn(wsrep_log_cb);
710   try
711   {
712     std::string server_name;
713     std::string server_id;
714     std::string node_address;
715     std::string incoming_address;
716     std::string working_dir;
717     wsrep::gtid initial_position;
718 
719     server_name= wsrep_server_name();
720     server_id= wsrep_server_id();
721     node_address= wsrep_server_node_address();
722     incoming_address= wsrep_server_incoming_address();
723     working_dir= wsrep_server_working_dir();
724     initial_position= wsrep_server_initial_position();
725 
726     Wsrep_server_state::init_once(server_name,
727                                   incoming_address,
728                                   node_address,
729                                   working_dir,
730                                   initial_position,
731                                   wsrep_max_protocol_version);
732     Wsrep_server_state::instance().debug_log_level(wsrep_debug);
733   }
734   catch (const wsrep::runtime_error& e)
735   {
736     WSREP_ERROR("Failed to init wsrep server %s", e.what());
737     return 1;
738   }
739   catch (const std::exception& e)
740   {
741     WSREP_ERROR("Failed to init wsrep server %s", e.what());
742   }
743   return 0;
744 }
745 
wsrep_init_globals()746 void wsrep_init_globals()
747 {
748   wsrep_init_sidno(Wsrep_server_state::instance().connected_gtid().id());
749   wsrep_init_schema();
750 
751   if (WSREP_ON)
752   {
753     Wsrep_server_state::instance().initialized();
754   }
755 }
756 
wsrep_deinit_server()757 void wsrep_deinit_server()
758 {
759   wsrep_deinit_schema();
760   Wsrep_server_state::destroy();
761 }
762 
wsrep_init()763 int wsrep_init()
764 {
765   assert(wsrep_provider);
766 
767   wsrep_init_position();
768   wsrep_sst_auth_init();
769 
770   if (strlen(wsrep_provider)== 0 ||
771       !strcmp(wsrep_provider, WSREP_NONE))
772   {
773     // enable normal operation in case no provider is specified
774     global_system_variables.wsrep_on= 0;
775     int err= Wsrep_server_state::instance().load_provider(wsrep_provider, wsrep_provider_options ? wsrep_provider_options : "");
776     if (err)
777     {
778       DBUG_PRINT("wsrep",("wsrep::init() failed: %d", err));
779       WSREP_ERROR("wsrep::init() failed: %d, must shutdown", err);
780     }
781     else
782       wsrep_init_provider_status_variables();
783     return err;
784   }
785 
786   global_system_variables.wsrep_on= 1;
787 
788   WSREP_ON_= wsrep_provider && strcmp(wsrep_provider, WSREP_NONE);
789 
790   if (wsrep_gtid_mode && opt_bin_log && !opt_log_slave_updates)
791   {
792     WSREP_ERROR("Option --log-slave-updates is required if "
793                 "binlog is enabled, GTID mode is on and wsrep provider "
794                 "is specified");
795     return 1;
796   }
797 
798   if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
799     wsrep_data_home_dir= mysql_real_data_home;
800 
801   if (Wsrep_server_state::instance().load_provider(wsrep_provider,
802                                                    wsrep_provider_options))
803   {
804     WSREP_ERROR("Failed to load provider");
805     return 1;
806   }
807 
808   if (!wsrep_provider_is_SR_capable() &&
809       global_system_variables.wsrep_trx_fragment_size > 0)
810   {
811     WSREP_ERROR("The WSREP provider (%s) does not support streaming "
812                 "replication but wsrep_trx_fragment_size is set to a "
813                 "value other than 0 (%llu). Cannot continue. Either set "
814                 "wsrep_trx_fragment_size to 0 or use wsrep_provider that "
815                 "supports streaming replication.",
816                 wsrep_provider, global_system_variables.wsrep_trx_fragment_size);
817     Wsrep_server_state::instance().unload_provider();
818     return 1;
819   }
820 
821   wsrep_init_provider_status_variables();
822   wsrep_capabilities_export(Wsrep_server_state::instance().provider().capabilities(),
823                             &wsrep_provider_capabilities);
824 
825   WSREP_DEBUG("SR storage init for: %s",
826               (wsrep_SR_store_type == WSREP_SR_STORE_TABLE) ? "table" : "void");
827 
828   return 0;
829 }
830 
831 /* Initialize wsrep thread LOCKs and CONDs */
wsrep_thr_init()832 void wsrep_thr_init()
833 {
834   DBUG_ENTER("wsrep_thr_init");
835   wsrep_config_state= new wsp::Config_state;
836 #ifdef HAVE_PSI_INTERFACE
837   mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes));
838   mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds));
839   mysql_file_register("sql", wsrep_files, array_elements(wsrep_files));
840   mysql_thread_register("sql", wsrep_threads, array_elements(wsrep_threads));
841 #endif
842 
843   mysql_mutex_init(key_LOCK_wsrep_ready, &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
844   mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
845   mysql_mutex_init(key_LOCK_wsrep_sst, &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
846   mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
847   mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
848   mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
849   mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
850   mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
851   mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
852   mysql_cond_init(key_COND_wsrep_slave_threads, &COND_wsrep_slave_threads, NULL);
853   mysql_mutex_init(key_LOCK_wsrep_cluster_config, &LOCK_wsrep_cluster_config, MY_MUTEX_INIT_FAST);
854   mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
855   mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
856   mysql_mutex_init(key_LOCK_wsrep_group_commit, &LOCK_wsrep_group_commit, MY_MUTEX_INIT_FAST);
857   mysql_mutex_init(key_LOCK_wsrep_SR_pool,
858                    &LOCK_wsrep_SR_pool, MY_MUTEX_INIT_FAST);
859   mysql_mutex_init(key_LOCK_wsrep_SR_store,
860                    &LOCK_wsrep_SR_store, MY_MUTEX_INIT_FAST);
861   mysql_mutex_init(key_LOCK_wsrep_joiner_monitor,
862                    &LOCK_wsrep_joiner_monitor, MY_MUTEX_INIT_FAST);
863   mysql_mutex_init(key_LOCK_wsrep_donor_monitor,
864                    &LOCK_wsrep_donor_monitor, MY_MUTEX_INIT_FAST);
865   mysql_cond_init(key_COND_wsrep_joiner_monitor, &COND_wsrep_joiner_monitor, NULL);
866   mysql_cond_init(key_COND_wsrep_donor_monitor, &COND_wsrep_donor_monitor, NULL);
867 
868   DBUG_VOID_RETURN;
869 }
870 
wsrep_init_startup(bool sst_first)871 void wsrep_init_startup (bool sst_first)
872 {
873   if (wsrep_init()) unireg_abort(1);
874 
875   /*
876     Pre-initialize global_system_variables.table_plugin with a dummy engine
877     (placeholder) required during the initialization of wsrep threads (THDs).
878     (see: plugin_thdvar_init())
879     Note: This only needs to be done for rsync & mariabackup based SST methods.
880     In case of mysqldump SST method, the wsrep threads are created after the
881     server plugins & global system variables are initialized.
882   */
883   if (wsrep_before_SE())
884     wsrep_plugins_pre_init();
885 
886   /* Skip replication start if dummy wsrep provider is loaded */
887   if (!strcmp(wsrep_provider, WSREP_NONE)) return;
888 
889   /* Skip replication start if no cluster address */
890   if (!wsrep_cluster_address_exists()) return;
891 
892   /*
893     Read value of wsrep_new_cluster before wsrep_start_replication(),
894     the value is reset to FALSE inside wsrep_start_replication.
895   */
896   if (!wsrep_start_replication(wsrep_cluster_address)) unireg_abort(1);
897 
898   wsrep_create_rollbacker();
899   wsrep_create_appliers(1);
900 
901   Wsrep_server_state& server_state= Wsrep_server_state::instance();
902   /*
903     If the SST happens before server initialization, wait until the server
904     state reaches initializing. This indicates that
905     either SST was not necessary or SST has been delivered.
906 
907     With mysqldump SST (!sst_first) wait until the server reaches
908     joiner state and procedd to accepting connections.
909   */
910   if (sst_first)
911   {
912     server_state.wait_until_state(Wsrep_server_state::s_initializing);
913   }
914   else
915   {
916     server_state.wait_until_state(Wsrep_server_state::s_joiner);
917   }
918 }
919 
920 
wsrep_deinit(bool free_options)921 void wsrep_deinit(bool free_options)
922 {
923   DBUG_ASSERT(wsrep_inited == 1);
924   WSREP_DEBUG("wsrep_deinit");
925 
926   Wsrep_server_state::instance().unload_provider();
927   provider_name[0]=    '\0';
928   provider_version[0]= '\0';
929   provider_vendor[0]=  '\0';
930 
931   wsrep_inited= 0;
932 
933   if (wsrep_provider_capabilities != NULL)
934   {
935     char* p= wsrep_provider_capabilities;
936     wsrep_provider_capabilities= NULL;
937     free(p);
938   }
939 
940   if (free_options)
941   {
942     wsrep_sst_auth_free();
943   }
944 }
945 
946 /* Destroy wsrep thread LOCKs and CONDs */
wsrep_thr_deinit()947 void wsrep_thr_deinit()
948 {
949   if (!wsrep_config_state)
950     return;                                     // Never initialized
951   WSREP_DEBUG("wsrep_thr_deinit");
952   mysql_mutex_destroy(&LOCK_wsrep_ready);
953   mysql_cond_destroy(&COND_wsrep_ready);
954   mysql_mutex_destroy(&LOCK_wsrep_sst);
955   mysql_cond_destroy(&COND_wsrep_sst);
956   mysql_mutex_destroy(&LOCK_wsrep_sst_init);
957   mysql_cond_destroy(&COND_wsrep_sst_init);
958   mysql_mutex_destroy(&LOCK_wsrep_replaying);
959   mysql_cond_destroy(&COND_wsrep_replaying);
960   mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
961   mysql_cond_destroy(&COND_wsrep_slave_threads);
962   mysql_mutex_destroy(&LOCK_wsrep_cluster_config);
963   mysql_mutex_destroy(&LOCK_wsrep_desync);
964   mysql_mutex_destroy(&LOCK_wsrep_config_state);
965   mysql_mutex_destroy(&LOCK_wsrep_group_commit);
966   mysql_mutex_destroy(&LOCK_wsrep_SR_pool);
967   mysql_mutex_destroy(&LOCK_wsrep_SR_store);
968   mysql_mutex_destroy(&LOCK_wsrep_joiner_monitor);
969   mysql_mutex_destroy(&LOCK_wsrep_donor_monitor);
970   mysql_cond_destroy(&COND_wsrep_joiner_monitor);
971   mysql_cond_destroy(&COND_wsrep_donor_monitor);
972 
973   delete wsrep_config_state;
974   wsrep_config_state= 0;                        // Safety
975 
976   if (wsrep_cluster_capabilities != NULL)
977   {
978     char* p= wsrep_cluster_capabilities;
979     wsrep_cluster_capabilities= NULL;
980     free(p);
981   }
982 }
983 
wsrep_recover()984 void wsrep_recover()
985 {
986   char uuid_str[40];
987 
988   if (wsrep_uuid_compare(&local_uuid, &WSREP_UUID_UNDEFINED) == 0 &&
989       local_seqno == -2)
990   {
991     wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
992     WSREP_INFO("Position %s:%lld given at startup, skipping position recovery",
993                uuid_str, (long long)local_seqno);
994     return;
995   }
996   wsrep::gtid gtid= wsrep_get_SE_checkpoint();
997   std::ostringstream oss;
998   oss << gtid;
999   WSREP_INFO("Recovered position: %s", oss.str().c_str());
1000 }
1001 
1002 
wsrep_stop_replication(THD * thd)1003 void wsrep_stop_replication(THD *thd)
1004 {
1005   WSREP_INFO("Stop replication by %llu", (thd) ? thd->thread_id : 0);
1006   if (Wsrep_server_state::instance().state() !=
1007       Wsrep_server_state::s_disconnected)
1008   {
1009     WSREP_DEBUG("Disconnect provider");
1010     Wsrep_server_state::instance().disconnect();
1011     Wsrep_server_state::instance().wait_until_state(Wsrep_server_state::s_disconnected);
1012   }
1013 
1014   /* my connection, should not terminate with wsrep_close_client_connection(),
1015      make transaction to rollback
1016   */
1017   if (thd && !thd->wsrep_applier) trans_rollback(thd);
1018   wsrep_close_client_connections(TRUE, thd);
1019 
1020   /* wait until appliers have stopped */
1021   wsrep_wait_appliers_close(thd);
1022 
1023   node_uuid= WSREP_UUID_UNDEFINED;
1024 }
1025 
wsrep_shutdown_replication()1026 void wsrep_shutdown_replication()
1027 {
1028   WSREP_INFO("Shutdown replication");
1029   if (Wsrep_server_state::instance().state() != wsrep::server_state::s_disconnected)
1030   {
1031     WSREP_DEBUG("Disconnect provider");
1032     Wsrep_server_state::instance().disconnect();
1033     Wsrep_server_state::instance().wait_until_state(Wsrep_server_state::s_disconnected);
1034   }
1035 
1036   wsrep_close_client_connections(TRUE);
1037 
1038   /* wait until appliers have stopped */
1039   wsrep_wait_appliers_close(NULL);
1040   node_uuid= WSREP_UUID_UNDEFINED;
1041 
1042   /* Undocking the thread specific data. */
1043   my_pthread_setspecific_ptr(THR_THD, NULL);
1044 }
1045 
wsrep_start_replication(const char * wsrep_cluster_address)1046 bool wsrep_start_replication(const char *wsrep_cluster_address)
1047 {
1048   int rcode;
1049   WSREP_DEBUG("wsrep_start_replication");
1050 
1051   /*
1052     if provider is trivial, don't even try to connect,
1053     but resume local node operation
1054   */
1055   if (!WSREP_PROVIDER_EXISTS)
1056   {
1057     // enable normal operation in case no provider is specified
1058     return true;
1059   }
1060 
1061   DBUG_ASSERT(wsrep_cluster_address[0]);
1062 
1063   bool const bootstrap(TRUE == wsrep_new_cluster);
1064   wsrep_new_cluster= FALSE;
1065 
1066   WSREP_INFO("Start replication");
1067 
1068   if ((rcode= Wsrep_server_state::instance().connect(
1069         wsrep_cluster_name,
1070         wsrep_cluster_address,
1071         wsrep_sst_donor,
1072         bootstrap)))
1073   {
1074     DBUG_PRINT("wsrep",("wsrep_ptr->connect(%s) failed: %d",
1075                         wsrep_cluster_address, rcode));
1076     WSREP_ERROR("wsrep::connect(%s) failed: %d",
1077                 wsrep_cluster_address, rcode);
1078     return false;
1079   }
1080   else
1081   {
1082     try
1083     {
1084       std::string opts= Wsrep_server_state::instance().provider().options();
1085       wsrep_provider_options_init(opts.c_str());
1086     }
1087     catch (const wsrep::runtime_error&)
1088     {
1089       WSREP_WARN("Failed to get wsrep options");
1090     }
1091   }
1092 
1093   return true;
1094 }
1095 
wsrep_must_sync_wait(THD * thd,uint mask)1096 bool wsrep_must_sync_wait (THD* thd, uint mask)
1097 {
1098   bool ret;
1099   mysql_mutex_lock(&thd->LOCK_thd_data);
1100   ret= (thd->variables.wsrep_sync_wait & mask) &&
1101     thd->wsrep_client_thread &&
1102     WSREP_ON && thd->variables.wsrep_on &&
1103     !(thd->variables.wsrep_dirty_reads &&
1104       !is_update_query(thd->lex->sql_command)) &&
1105     !thd->in_active_multi_stmt_transaction() &&
1106     thd->wsrep_trx().state() !=
1107     wsrep::transaction::s_replaying &&
1108     thd->wsrep_cs().sync_wait_gtid().is_undefined();
1109   mysql_mutex_unlock(&thd->LOCK_thd_data);
1110   return ret;
1111 }
1112 
wsrep_sync_wait(THD * thd,uint mask)1113 bool wsrep_sync_wait (THD* thd, uint mask)
1114 {
1115   if (wsrep_must_sync_wait(thd, mask))
1116   {
1117     WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait= %u, "
1118                 "mask= %u, thd->variables.wsrep_on= %d",
1119                 thd->variables.wsrep_sync_wait, mask,
1120                 thd->variables.wsrep_on);
1121     /*
1122       This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
1123       TODO: modify to check if thd has locked any rows.
1124     */
1125     if (thd->wsrep_cs().sync_wait(-1))
1126     {
1127       const char* msg;
1128       int err;
1129 
1130       /*
1131         Possibly relevant error codes:
1132         ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
1133         ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
1134         ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
1135       */
1136 
1137       switch (thd->wsrep_cs().current_error())
1138       {
1139       case wsrep::e_not_supported_error:
1140         msg= "synchronous reads by wsrep backend. "
1141           "Please unset wsrep_causal_reads variable.";
1142         err= ER_NOT_SUPPORTED_YET;
1143         break;
1144       default:
1145         msg= "Synchronous wait failed.";
1146         err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
1147                                    //       with ER_LOCK_WAIT_TIMEOUT
1148       }
1149 
1150       my_error(err, MYF(0), msg);
1151 
1152       return true;
1153     }
1154   }
1155 
1156   return false;
1157 }
1158 
1159 enum wsrep::provider::status
wsrep_sync_wait_upto(THD * thd,wsrep_gtid_t * upto,int timeout)1160 wsrep_sync_wait_upto (THD*          thd,
1161                       wsrep_gtid_t* upto,
1162                       int           timeout)
1163 {
1164   DBUG_ASSERT(upto);
1165   enum wsrep::provider::status ret;
1166   if (upto)
1167   {
1168     wsrep::gtid upto_gtid(wsrep::id(upto->uuid.data, sizeof(upto->uuid.data)),
1169                           wsrep::seqno(upto->seqno));
1170     ret= Wsrep_server_state::instance().wait_for_gtid(upto_gtid, timeout);
1171   }
1172   else
1173   {
1174     ret= Wsrep_server_state::instance().causal_read(timeout).second;
1175   }
1176   WSREP_DEBUG("wsrep_sync_wait_upto: %d", ret);
1177   return ret;
1178 }
1179 
wsrep_keys_free(wsrep_key_arr_t * key_arr)1180 void wsrep_keys_free(wsrep_key_arr_t* key_arr)
1181 {
1182     for (size_t i= 0; i < key_arr->keys_len; ++i)
1183     {
1184         my_free((void*)key_arr->keys[i].key_parts);
1185     }
1186     my_free(key_arr->keys);
1187     key_arr->keys= 0;
1188     key_arr->keys_len= 0;
1189 }
1190 
1191 /*!
1192  * @param thd    thread
1193  * @param tables list of tables
1194  * @param keys   prepared keys
1195 
1196  * @return true if parent table append was successfull, otherwise false.
1197 */
1198 bool
wsrep_append_fk_parent_table(THD * thd,TABLE_LIST * tables,wsrep::key_array * keys)1199 wsrep_append_fk_parent_table(THD* thd, TABLE_LIST* tables, wsrep::key_array* keys)
1200 {
1201     bool fail= false;
1202     TABLE_LIST *table;
1203 
1204     for (table= tables; table; table= table->next_local)
1205     {
1206       if (is_temporary_table(table))
1207       {
1208         WSREP_DEBUG("Temporary table %s.%s already opened query=%s", table->db.str,
1209                     table->table_name.str, wsrep_thd_query(thd));
1210 	return false;
1211       }
1212     }
1213 
1214     thd->release_transactional_locks();
1215     uint counter;
1216     MDL_savepoint mdl_savepoint= thd->mdl_context.mdl_savepoint();
1217 
1218     if (open_tables(thd, &tables, &counter, MYSQL_OPEN_FORCE_SHARED_HIGH_PRIO_MDL))
1219     {
1220       WSREP_DEBUG("Unable to open table for FK checks for %s", wsrep_thd_query(thd));
1221       fail= true;
1222       goto exit;
1223     }
1224 
1225     for (table= tables; table; table= table->next_local)
1226     {
1227       if (!is_temporary_table(table) && table->table)
1228       {
1229         FOREIGN_KEY_INFO *f_key_info;
1230         List<FOREIGN_KEY_INFO> f_key_list;
1231 
1232         table->table->file->get_foreign_key_list(thd, &f_key_list);
1233         List_iterator_fast<FOREIGN_KEY_INFO> it(f_key_list);
1234         while ((f_key_info=it++))
1235         {
1236           WSREP_DEBUG("appended fkey %s", f_key_info->referenced_table->str);
1237           keys->push_back(wsrep_prepare_key_for_toi(f_key_info->referenced_db->str,
1238                                                     f_key_info->referenced_table->str,
1239                                                     wsrep::key::shared));
1240         }
1241       }
1242     }
1243 
1244 exit:
1245     /* close the table and release MDL locks */
1246     close_thread_tables(thd);
1247     thd->mdl_context.rollback_to_savepoint(mdl_savepoint);
1248     for (table= tables; table; table= table->next_local)
1249     {
1250       table->table= NULL;
1251       table->next_global= NULL;
1252       table->mdl_request.ticket= NULL;
1253     }
1254 
1255     return fail;
1256 }
1257 
wsrep_reload_ssl()1258 bool wsrep_reload_ssl()
1259 {
1260   try
1261   {
1262     std::string opts= Wsrep_server_state::instance().provider().options();
1263     if (opts.find("socket.ssl_reload") == std::string::npos)
1264     {
1265       WSREP_DEBUG("Option `socket.ssl_reload` not found in parameters.");
1266       return false;
1267     }
1268     const std::string reload_ssl_param("socket.ssl_reload=1");
1269     enum wsrep::provider::status ret= Wsrep_server_state::instance().provider().options(reload_ssl_param);
1270     if (ret)
1271     {
1272       WSREP_ERROR("Set options returned %d", ret);
1273       return true;
1274     }
1275     return false;
1276   }
1277   catch (...)
1278   {
1279     WSREP_ERROR("Failed to get provider options");
1280     return true;
1281   }
1282 }
1283 
1284 /*!
1285  * @param db      Database string
1286  * @param table   Table string
1287  * @param key     Array of wsrep_key_t
1288  * @param key_len In: number of elements in key array, Out: number of
1289  *                elements populated
1290  *
1291  * @return true if preparation was successful, otherwise false.
1292  */
1293 
wsrep_prepare_key_for_isolation(const char * db,const char * table,wsrep_buf_t * key,size_t * key_len)1294 static bool wsrep_prepare_key_for_isolation(const char* db,
1295                                            const char* table,
1296                                            wsrep_buf_t* key,
1297                                            size_t* key_len)
1298 {
1299   if (*key_len < 2) return false;
1300 
1301   switch (wsrep_protocol_version)
1302   {
1303   case 0:
1304     *key_len= 0;
1305     break;
1306   case 1:
1307   case 2:
1308   case 3:
1309   case 4:
1310   {
1311     *key_len= 0;
1312     if (db)
1313     {
1314       key[*key_len].ptr= db;
1315       key[*key_len].len= strlen(db);
1316       ++(*key_len);
1317       if (table)
1318       {
1319         key[*key_len].ptr= table;
1320         key[*key_len].len= strlen(table);
1321         ++(*key_len);
1322       }
1323     }
1324     break;
1325   }
1326   default:
1327     assert(0);
1328     WSREP_ERROR("Unsupported protocol version: %ld", wsrep_protocol_version);
1329     unireg_abort(1);
1330     return false;
1331   }
1332 
1333     return true;
1334 }
1335 
wsrep_prepare_key_for_isolation(const char * db,const char * table,wsrep_key_arr_t * ka)1336 static bool wsrep_prepare_key_for_isolation(const char* db,
1337                                             const char* table,
1338                                             wsrep_key_arr_t* ka)
1339 {
1340   wsrep_key_t* tmp;
1341   tmp= (wsrep_key_t*)my_realloc(ka->keys,
1342                                 (ka->keys_len + 1) * sizeof(wsrep_key_t),
1343                                 MYF(MY_ALLOW_ZERO_PTR));
1344   if (!tmp)
1345   {
1346     WSREP_ERROR("Can't allocate memory for key_array");
1347     return false;
1348   }
1349   ka->keys= tmp;
1350   if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
1351         my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
1352   {
1353     WSREP_ERROR("Can't allocate memory for key_parts");
1354     return false;
1355   }
1356   ka->keys[ka->keys_len].key_parts_num= 2;
1357   ++ka->keys_len;
1358   if (!wsrep_prepare_key_for_isolation(db, table,
1359                                        (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
1360                                        &ka->keys[ka->keys_len - 1].key_parts_num))
1361   {
1362     WSREP_ERROR("Preparing keys for isolation failed");
1363     return false;
1364   }
1365 
1366   return true;
1367 }
1368 
wsrep_prepare_keys_for_alter_add_fk(const char * child_table_db,Alter_info * alter_info,wsrep_key_arr_t * ka)1369 static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
1370                                                 Alter_info* alter_info,
1371                                                 wsrep_key_arr_t* ka)
1372 {
1373   Key *key;
1374   List_iterator<Key> key_iterator(alter_info->key_list);
1375   while ((key= key_iterator++))
1376   {
1377     if (key->type == Key::FOREIGN_KEY)
1378     {
1379       Foreign_key *fk_key= (Foreign_key *)key;
1380       const char *db_name= fk_key->ref_db.str;
1381       const char *table_name= fk_key->ref_table.str;
1382       if (!db_name)
1383       {
1384         db_name= child_table_db;
1385       }
1386       if (!wsrep_prepare_key_for_isolation(db_name, table_name, ka))
1387       {
1388         return false;
1389       }
1390     }
1391   }
1392   return true;
1393 }
1394 
wsrep_prepare_keys_for_isolation(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep_key_arr_t * ka)1395 static bool wsrep_prepare_keys_for_isolation(THD*              thd,
1396                                              const char*       db,
1397                                              const char*       table,
1398                                              const TABLE_LIST* table_list,
1399                                              Alter_info*       alter_info,
1400                                              wsrep_key_arr_t*  ka)
1401 {
1402   ka->keys= 0;
1403   ka->keys_len= 0;
1404 
1405   if (db || table)
1406   {
1407     if (!wsrep_prepare_key_for_isolation(db, table, ka))
1408       goto err;
1409   }
1410 
1411   for (const TABLE_LIST* table= table_list; table; table= table->next_global)
1412   {
1413     if (!wsrep_prepare_key_for_isolation(table->db.str, table->table_name.str, ka))
1414       goto err;
1415   }
1416 
1417   if (alter_info)
1418   {
1419     if (!wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info, ka))
1420       goto err;
1421   }
1422   return false;
1423 
1424 err:
1425     wsrep_keys_free(ka);
1426     return true;
1427 }
1428 
1429 /*
1430  * Prepare key list from db/table and table_list
1431  *
1432  * Return zero in case of success, 1 in case of failure.
1433  */
1434 
wsrep_prepare_keys_for_isolation(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,wsrep_key_arr_t * ka)1435 bool wsrep_prepare_keys_for_isolation(THD*              thd,
1436                                       const char*       db,
1437                                       const char*       table,
1438                                       const TABLE_LIST* table_list,
1439                                       wsrep_key_arr_t*  ka)
1440 {
1441   return wsrep_prepare_keys_for_isolation(thd, db, table, table_list, NULL, ka);
1442 }
1443 
wsrep_prepare_key(const uchar * cache_key,size_t cache_key_len,const uchar * row_id,size_t row_id_len,wsrep_buf_t * key,size_t * key_len)1444 bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len,
1445                        const uchar* row_id, size_t row_id_len,
1446                        wsrep_buf_t* key, size_t* key_len)
1447 {
1448     if (*key_len < 3) return false;
1449 
1450     *key_len= 0;
1451     switch (wsrep_protocol_version)
1452     {
1453     case 0:
1454     {
1455         key[0].ptr= cache_key;
1456         key[0].len= cache_key_len;
1457 
1458         *key_len= 1;
1459         break;
1460     }
1461     case 1:
1462     case 2:
1463     case 3:
1464     case 4:
1465     {
1466         key[0].ptr= cache_key;
1467         key[0].len= strlen( (char*)cache_key );
1468 
1469         key[1].ptr= cache_key + strlen( (char*)cache_key ) + 1;
1470         key[1].len= strlen( (char*)(key[1].ptr) );
1471 
1472         *key_len= 2;
1473         break;
1474     }
1475     default:
1476         return false;
1477     }
1478 
1479     key[*key_len].ptr= row_id;
1480     key[*key_len].len= row_id_len;
1481     ++(*key_len);
1482 
1483     return true;
1484 }
1485 
wsrep_prepare_key_for_innodb(THD * thd,const uchar * cache_key,size_t cache_key_len,const uchar * row_id,size_t row_id_len,wsrep_buf_t * key,size_t * key_len)1486 bool wsrep_prepare_key_for_innodb(THD* thd,
1487                                   const uchar* cache_key,
1488                                   size_t cache_key_len,
1489                                   const uchar* row_id,
1490                                   size_t row_id_len,
1491                                   wsrep_buf_t* key,
1492                                   size_t* key_len)
1493 {
1494 
1495   return wsrep_prepare_key(cache_key, cache_key_len, row_id, row_id_len, key, key_len);
1496 }
1497 
wsrep_prepare_key_for_toi(const char * db,const char * table,enum wsrep::key::type type)1498 wsrep::key wsrep_prepare_key_for_toi(const char* db, const char* table,
1499                                      enum wsrep::key::type type)
1500 {
1501   wsrep::key ret(type);
1502   DBUG_ASSERT(db);
1503   ret.append_key_part(db, strlen(db));
1504   if (table) ret.append_key_part(table, strlen(table));
1505   return ret;
1506 }
1507 
1508 wsrep::key_array
wsrep_prepare_keys_for_alter_add_fk(const char * child_table_db,Alter_info * alter_info)1509 wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
1510                                     Alter_info* alter_info)
1511 
1512 {
1513   wsrep::key_array ret;
1514   Key *key;
1515   List_iterator<Key> key_iterator(alter_info->key_list);
1516   while ((key= key_iterator++))
1517   {
1518     if (key->type == Key::FOREIGN_KEY)
1519     {
1520       Foreign_key *fk_key= (Foreign_key *)key;
1521       const char *db_name= fk_key->ref_db.str;
1522       const char *table_name= fk_key->ref_table.str;
1523       if (!db_name)
1524       {
1525         db_name= child_table_db;
1526       }
1527       ret.push_back(wsrep_prepare_key_for_toi(db_name, table_name,
1528                                               wsrep::key::exclusive));
1529     }
1530   }
1531   return ret;
1532 }
1533 
wsrep_prepare_keys_for_toi(const char * db,const char * table,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep::key_array * fk_tables)1534 wsrep::key_array wsrep_prepare_keys_for_toi(const char* db,
1535                                             const char* table,
1536                                             const TABLE_LIST* table_list,
1537                                             Alter_info* alter_info,
1538                                             wsrep::key_array* fk_tables)
1539 {
1540   wsrep::key_array ret;
1541   if (db || table)
1542   {
1543     ret.push_back(wsrep_prepare_key_for_toi(db, table, wsrep::key::exclusive));
1544   }
1545   for (const TABLE_LIST* table= table_list; table; table= table->next_global)
1546   {
1547     ret.push_back(wsrep_prepare_key_for_toi(table->db.str, table->table_name.str,
1548                                             wsrep::key::exclusive));
1549   }
1550   if (alter_info)
1551   {
1552     wsrep::key_array fk(wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info));
1553     if (!fk.empty())
1554     {
1555       ret.insert(ret.end(), fk.begin(), fk.end());
1556     }
1557   }
1558   if (fk_tables && !fk_tables->empty())
1559   {
1560     ret.insert(ret.end(), fk_tables->begin(), fk_tables->end());
1561   }
1562   return ret;
1563 }
1564 
1565 /*
1566  * Construct Query_log_Event from thd query and serialize it
1567  * into buffer.
1568  *
1569  * Return 0 in case of success, 1 in case of error.
1570  */
wsrep_to_buf_helper(THD * thd,const char * query,uint query_len,uchar ** buf,size_t * buf_len)1571 int wsrep_to_buf_helper(
1572     THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
1573 {
1574   IO_CACHE tmp_io_cache;
1575   Log_event_writer writer(&tmp_io_cache, 0);
1576   if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
1577                        65536, MYF(MY_WME)))
1578     return 1;
1579   int ret(0);
1580   enum enum_binlog_checksum_alg current_binlog_check_alg=
1581     (enum_binlog_checksum_alg) binlog_checksum_options;
1582 
1583   Format_description_log_event *tmp_fd= new Format_description_log_event(4);
1584   tmp_fd->checksum_alg= current_binlog_check_alg;
1585   writer.write(tmp_fd);
1586   delete tmp_fd;
1587 
1588 #ifdef GTID_SUPPORT
1589   if (thd->variables.gtid_next.type == GTID_GROUP)
1590   {
1591       Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
1592       if (!gtid_ev.is_valid()) ret= 0;
1593       if (!ret && writer.write(&gtid_ev)) ret= 1;
1594   }
1595 #endif /* GTID_SUPPORT */
1596   if (wsrep_gtid_mode && thd->variables.gtid_seq_no)
1597   {
1598     Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no,
1599                           thd->variables.gtid_domain_id,
1600                           true, LOG_EVENT_SUPPRESS_USE_F,
1601                           true, 0);
1602     gtid_event.server_id= thd->variables.server_id;
1603     if (!gtid_event.is_valid()) ret= 0;
1604     ret= writer.write(&gtid_event);
1605   }
1606 
1607   /* if there is prepare query, add event for it */
1608   if (!ret && thd->wsrep_TOI_pre_query)
1609   {
1610     Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
1611 		       thd->wsrep_TOI_pre_query_len,
1612 		       FALSE, FALSE, FALSE, 0);
1613     ev.checksum_alg= current_binlog_check_alg;
1614     if (writer.write(&ev)) ret= 1;
1615   }
1616 
1617   /* continue to append the actual query */
1618   Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
1619   ev.checksum_alg= current_binlog_check_alg;
1620   if (!ret && writer.write(&ev)) ret= 1;
1621   if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
1622   close_cached_file(&tmp_io_cache);
1623   return ret;
1624 }
1625 
1626 static int
wsrep_alter_query_string(THD * thd,String * buf)1627 wsrep_alter_query_string(THD *thd, String *buf)
1628 {
1629   /* Append the "ALTER" part of the query */
1630   if (buf->append(STRING_WITH_LEN("ALTER ")))
1631     return 1;
1632   /* Append definer */
1633   append_definer(thd, buf, &(thd->lex->definer->user), &(thd->lex->definer->host));
1634   /* Append the left part of thd->query after event name part */
1635   if (buf->append(thd->lex->stmt_definition_begin,
1636                   thd->lex->stmt_definition_end -
1637                   thd->lex->stmt_definition_begin))
1638     return 1;
1639 
1640   return 0;
1641 }
1642 
wsrep_alter_event_query(THD * thd,uchar ** buf,size_t * buf_len)1643 static int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len)
1644 {
1645   String log_query;
1646 
1647   if (wsrep_alter_query_string(thd, &log_query))
1648   {
1649     WSREP_WARN("events alter string failed: schema: %s, query: %s",
1650                thd->get_db(), thd->query());
1651     return 1;
1652   }
1653   return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
1654 }
1655 
1656 #include "sql_show.h"
1657 static int
create_view_query(THD * thd,uchar ** buf,size_t * buf_len)1658 create_view_query(THD *thd, uchar** buf, size_t* buf_len)
1659 {
1660     LEX *lex= thd->lex;
1661     SELECT_LEX *select_lex= lex->first_select_lex();
1662     TABLE_LIST *first_table= select_lex->table_list.first;
1663     TABLE_LIST *views= first_table;
1664     LEX_USER *definer;
1665     String buff;
1666     const LEX_CSTRING command[3]=
1667       {{ STRING_WITH_LEN("CREATE ") },
1668        { STRING_WITH_LEN("ALTER ") },
1669        { STRING_WITH_LEN("CREATE OR REPLACE ") }};
1670 
1671     buff.append(&command[thd->lex->create_view->mode]);
1672 
1673     if (lex->definer)
1674       definer= get_current_user(thd, lex->definer);
1675     else
1676     {
1677       /*
1678         DEFINER-clause is missing; we have to create default definer in
1679         persistent arena to be PS/SP friendly.
1680         If this is an ALTER VIEW then the current user should be set as
1681         the definer.
1682       */
1683       definer= create_default_definer(thd, false);
1684     }
1685 
1686     if (definer)
1687     {
1688       views->definer.user= definer->user;
1689       views->definer.host= definer->host;
1690     } else {
1691       WSREP_ERROR("Failed to get DEFINER for VIEW.");
1692       return 1;
1693     }
1694 
1695     views->algorithm   = lex->create_view->algorithm;
1696     views->view_suid   = lex->create_view->suid;
1697     views->with_check  = lex->create_view->check;
1698 
1699     view_store_options(thd, views, &buff);
1700     buff.append(STRING_WITH_LEN("VIEW "));
1701     /* Test if user supplied a db (ie: we did not use thd->db) */
1702     if (views->db.str && views->db.str[0] &&
1703         (thd->db.str == NULL || cmp(&views->db, &thd->db)))
1704     {
1705       append_identifier(thd, &buff, &views->db);
1706       buff.append('.');
1707     }
1708     append_identifier(thd, &buff, &views->table_name);
1709     if (lex->view_list.elements)
1710     {
1711       List_iterator_fast<LEX_CSTRING> names(lex->view_list);
1712       LEX_CSTRING *name;
1713       int i;
1714 
1715       for (i= 0; (name= names++); i++)
1716       {
1717         buff.append(i ? ", " : "(");
1718         append_identifier(thd, &buff, name);
1719       }
1720       buff.append(')');
1721     }
1722     buff.append(STRING_WITH_LEN(" AS "));
1723     buff.append(thd->lex->create_view->select.str,
1724                 thd->lex->create_view->select.length);
1725     return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
1726 }
1727 
1728 /*
1729   Rewrite DROP TABLE for TOI. Temporary tables are eliminated from
1730   the query as they are visible only to client connection.
1731 
1732   TODO: See comments for sql_base.cc:drop_temporary_table() and refine
1733   the function to deal with transactional locked tables.
1734  */
wsrep_drop_table_query(THD * thd,uchar ** buf,size_t * buf_len)1735 static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len)
1736 {
1737 
1738   LEX* lex= thd->lex;
1739   SELECT_LEX* select_lex= lex->first_select_lex();
1740   TABLE_LIST* first_table= select_lex->table_list.first;
1741   String buff;
1742 
1743   DBUG_ASSERT(!lex->create_info.tmp_table());
1744 
1745   bool found_temp_table= false;
1746   for (TABLE_LIST* table= first_table; table; table= table->next_global)
1747   {
1748     if (thd->find_temporary_table(table->db.str, table->table_name.str))
1749     {
1750       found_temp_table= true;
1751       break;
1752     }
1753   }
1754 
1755   if (found_temp_table)
1756   {
1757     buff.append("DROP TABLE ");
1758     if (lex->check_exists)
1759       buff.append("IF EXISTS ");
1760 
1761     for (TABLE_LIST* table= first_table; table; table= table->next_global)
1762     {
1763       if (!thd->find_temporary_table(table->db.str, table->table_name.str))
1764       {
1765         append_identifier(thd, &buff, table->db.str, table->db.length);
1766         buff.append(".");
1767         append_identifier(thd, &buff,
1768                           table->table_name.str, table->table_name.length);
1769         buff.append(",");
1770       }
1771     }
1772 
1773     /* Chop the last comma */
1774     buff.chop();
1775     buff.append(" /* generated by wsrep */");
1776 
1777     WSREP_DEBUG("Rewrote '%s' as '%s'", thd->query(), buff.ptr());
1778 
1779     return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
1780   }
1781   else
1782   {
1783     return wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1784                                buf, buf_len);
1785   }
1786 }
1787 
1788 
1789 /* Forward declarations. */
1790 int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
1791 
1792 /*
1793   Decide if statement should run in TOI.
1794 
1795   Look if table or table_list contain temporary tables. If the
1796   statement affects only temporary tables,   statement should not run
1797   in TOI. If the table list contains mix of regular and temporary tables
1798   (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but
1799   should be rewritten at later time for replication to contain only
1800   non-temporary tables.
1801  */
wsrep_can_run_in_toi(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list)1802 static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
1803                                  const TABLE_LIST *table_list)
1804 {
1805   DBUG_ASSERT(!table || db);
1806   DBUG_ASSERT(table_list || db);
1807 
1808   LEX* lex= thd->lex;
1809   SELECT_LEX* select_lex= lex->first_select_lex();
1810   TABLE_LIST* first_table= select_lex->table_list.first;
1811 
1812   switch (lex->sql_command)
1813   {
1814   case SQLCOM_CREATE_TABLE:
1815     if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)
1816     {
1817       return false;
1818     }
1819     /*
1820       If mariadb master has replicated a CTAS, we should not replicate the create table
1821       part separately as TOI, but to replicate both create table and following inserts
1822       as one write set.
1823       Howver, if CTAS creates empty table, we should replicate the create table alone
1824       as TOI. We have to do relay log event lookup to see if row events follow the
1825       create table event.
1826     */
1827     if (thd->slave_thread && !(thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_STANDALONE))
1828     {
1829       /* this is CTAS, either empty or populated table */
1830       ulonglong event_size = 0;
1831       enum Log_event_type ev_type= wsrep_peak_event(thd->rgi_slave, &event_size);
1832       switch (ev_type)
1833       {
1834       case QUERY_EVENT:
1835         /* CTAS with empty table, we replicate create table as TOI */
1836         break;
1837 
1838       case TABLE_MAP_EVENT:
1839         WSREP_DEBUG("replicating CTAS of empty table as TOI");
1840         // fall through
1841       case WRITE_ROWS_EVENT:
1842         /* CTAS with populated table, we replicate later at commit time */
1843         WSREP_DEBUG("skipping create table of CTAS replication");
1844         return false;
1845 
1846       default:
1847         WSREP_WARN("unexpected async replication event: %d", ev_type);
1848       }
1849       return true;
1850     }
1851     /* no next async replication event */
1852     return true;
1853 
1854   case SQLCOM_CREATE_VIEW:
1855 
1856     DBUG_ASSERT(!table_list);
1857     DBUG_ASSERT(first_table); /* First table is view name */
1858     /*
1859       If any of the remaining tables refer to temporary table error
1860       is returned to client, so TOI can be skipped
1861     */
1862     for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global)
1863     {
1864       if (thd->find_temporary_table(it))
1865       {
1866         return false;
1867       }
1868     }
1869     return true;
1870 
1871   case SQLCOM_CREATE_TRIGGER:
1872 
1873     DBUG_ASSERT(first_table);
1874 
1875     if (thd->find_temporary_table(first_table))
1876     {
1877       return false;
1878     }
1879     return true;
1880 
1881   case SQLCOM_DROP_TRIGGER:
1882     DBUG_ASSERT(table_list);
1883     if (thd->find_temporary_table(table_list))
1884     {
1885       return false;
1886     }
1887     return true;
1888 
1889   default:
1890     if (table && !thd->find_temporary_table(db, table))
1891     {
1892       return true;
1893     }
1894 
1895     if (table_list)
1896     {
1897       for (TABLE_LIST* table= first_table; table; table= table->next_global)
1898       {
1899         if (!thd->find_temporary_table(table->db.str, table->table_name.str))
1900         {
1901           return true;
1902         }
1903       }
1904     }
1905     return !(table || table_list);
1906   }
1907 }
1908 
1909 
wsrep_create_sp(THD * thd,uchar ** buf,size_t * buf_len)1910 static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
1911 {
1912   String log_query;
1913   sp_head *sp= thd->lex->sphead;
1914   sql_mode_t saved_mode= thd->variables.sql_mode;
1915   String retstr(64);
1916   LEX_CSTRING returns= empty_clex_str;
1917   retstr.set_charset(system_charset_info);
1918 
1919   log_query.set_charset(system_charset_info);
1920 
1921   if (sp->m_handler->type() == TYPE_ENUM_FUNCTION)
1922   {
1923     sp_returns_type(thd, retstr, sp);
1924     returns= retstr.lex_cstring();
1925   }
1926   if (sp->m_handler->
1927       show_create_sp(thd, &log_query,
1928                      sp->m_explicit_name ? sp->m_db : null_clex_str,
1929                      sp->m_name, sp->m_params, returns,
1930                      sp->m_body, sp->chistics(),
1931                      thd->lex->definer[0],
1932                      thd->lex->create_info,
1933                      saved_mode))
1934   {
1935     WSREP_WARN("SP create string failed: schema: %s, query: %s",
1936                thd->get_db(), thd->query());
1937     return 1;
1938   }
1939 
1940   return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
1941 }
1942 
wsrep_TOI_event_buf(THD * thd,uchar ** buf,size_t * buf_len)1943 static int wsrep_TOI_event_buf(THD* thd, uchar** buf, size_t* buf_len)
1944 {
1945   int err;
1946   switch (thd->lex->sql_command)
1947   {
1948   case SQLCOM_CREATE_VIEW:
1949     err= create_view_query(thd, buf, buf_len);
1950     break;
1951   case SQLCOM_CREATE_PROCEDURE:
1952   case SQLCOM_CREATE_SPFUNCTION:
1953     err= wsrep_create_sp(thd, buf, buf_len);
1954     break;
1955   case SQLCOM_CREATE_TRIGGER:
1956     err= wsrep_create_trigger_query(thd, buf, buf_len);
1957     break;
1958   case SQLCOM_CREATE_EVENT:
1959     err= wsrep_create_event_query(thd, buf, buf_len);
1960     break;
1961   case SQLCOM_ALTER_EVENT:
1962     err= wsrep_alter_event_query(thd, buf, buf_len);
1963     break;
1964   case SQLCOM_DROP_TABLE:
1965     err= wsrep_drop_table_query(thd, buf, buf_len);
1966     break;
1967   case SQLCOM_KILL:
1968     WSREP_DEBUG("KILL as TOI: %s", thd->query());
1969     err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1970                              buf, buf_len);
1971     break;
1972   case SQLCOM_CREATE_ROLE:
1973     if (sp_process_definer(thd))
1974     {
1975       WSREP_WARN("Failed to set CREATE ROLE definer for TOI.");
1976     }
1977     /* fallthrough */
1978   default:
1979     err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), buf,
1980                              buf_len);
1981     break;
1982   }
1983 
1984   return err;
1985 }
1986 
wsrep_TOI_begin_failed(THD * thd,const wsrep_buf_t *)1987 static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */)
1988 {
1989   if (wsrep_thd_trx_seqno(thd) > 0)
1990   {
1991     /* GTID was granted and TO acquired - need to log event and release TO */
1992     if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);
1993     if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; }
1994     wsrep::client_state& cs(thd->wsrep_cs());
1995     int const ret= cs.leave_toi_local(wsrep::mutable_buffer());
1996     if (ret)
1997     {
1998       WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, "
1999                   "schema: %s, SQL: %s, rcode: %d wsrep_error: %s",
2000                   (long long)thd->real_id, thd->db.str,
2001                   thd->query(), ret, wsrep::to_c_string(cs.current_error()));
2002       goto fail;
2003     }
2004   }
2005   return;
2006 fail:
2007   WSREP_ERROR("Failed to release TOI resources. Need to abort.");
2008   unireg_abort(1);
2009 }
2010 
2011 
2012 /*
2013   returns:
2014    0: statement was replicated as TOI
2015    1: TOI replication was skipped
2016   -1: TOI replication failed
2017  */
wsrep_TOI_begin(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep::key_array * fk_tables)2018 static int wsrep_TOI_begin(THD *thd, const char *db, const char *table,
2019                            const TABLE_LIST* table_list,
2020                            Alter_info* alter_info, wsrep::key_array* fk_tables)
2021 {
2022   DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_TOI);
2023 
2024   WSREP_DEBUG("TOI Begin for %s", wsrep_thd_query(thd));
2025   if (wsrep_can_run_in_toi(thd, db, table, table_list) == false)
2026   {
2027     WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd));
2028     return 1;
2029   }
2030 
2031   uchar* buf= 0;
2032   size_t buf_len(0);
2033   int buf_err;
2034   int rc;
2035 
2036   buf_err= wsrep_TOI_event_buf(thd, &buf, &buf_len);
2037   if (buf_err) {
2038     WSREP_ERROR("Failed to create TOI event buf: %d", buf_err);
2039     my_message(ER_UNKNOWN_ERROR,
2040                "WSREP replication failed to prepare TOI event buffer. "
2041                "Check your query.",
2042                MYF(0));
2043     return -1;
2044   }
2045   struct wsrep_buf buff= { buf, buf_len };
2046 
2047   wsrep::key_array key_array=
2048     wsrep_prepare_keys_for_toi(db, table, table_list, alter_info, fk_tables);
2049 
2050   if (thd->has_read_only_protection())
2051   {
2052     /* non replicated DDL, affecting temporary tables only */
2053     WSREP_DEBUG("TO isolation skipped, sql: %s."
2054                 "Only temporary tables affected.",
2055                 wsrep_thd_query(thd));
2056     if (buf) my_free(buf);
2057     return -1;
2058   }
2059 
2060   thd_proc_info(thd, "acquiring total order isolation");
2061 
2062   wsrep::client_state& cs(thd->wsrep_cs());
2063   int ret= cs.enter_toi_local(key_array,
2064                               wsrep::const_buffer(buff.ptr, buff.len));
2065 
2066   if (ret)
2067   {
2068     DBUG_ASSERT(cs.current_error());
2069     WSREP_DEBUG("to_execute_start() failed for %llu: %s, seqno: %lld",
2070                 thd->thread_id, wsrep_thd_query(thd),
2071                 (long long)wsrep_thd_trx_seqno(thd));
2072 
2073     /* jump to error handler in mysql_execute_command() */
2074     switch (cs.current_error())
2075     {
2076     case wsrep::e_size_exceeded_error:
2077       WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
2078                  "Maximum size exceeded.",
2079                  ret,
2080                  (thd->db.str ? thd->db.str : "(null)"),
2081                  wsrep_thd_query(thd));
2082       my_error(ER_ERROR_DURING_COMMIT, MYF(0), WSREP_SIZE_EXCEEDED);
2083       break;
2084     case wsrep::e_deadlock_error:
2085       WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
2086                  "Deadlock error.",
2087                  ret,
2088                  (thd->db.str ? thd->db.str : "(null)"),
2089                  wsrep_thd_query(thd));
2090       my_error(ER_LOCK_DEADLOCK, MYF(0));
2091       break;
2092     case wsrep::e_timeout_error:
2093       WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
2094                  "Operation timed out.",
2095                  ret,
2096                  (thd->db.str ? thd->db.str : "(null)"),
2097                  wsrep_thd_query(thd));
2098       my_error(ER_LOCK_WAIT_TIMEOUT, MYF(0));
2099       break;
2100     default:
2101       WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
2102                  "Check your wsrep connection state and retry the query.",
2103                  ret,
2104                  (thd->db.str ? thd->db.str : "(null)"),
2105                  wsrep_thd_query(thd));
2106 
2107       if (!thd->is_error())
2108       {
2109         my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check "
2110                  "your wsrep connection state and retry the query.");
2111       }
2112     }
2113     rc= -1;
2114   }
2115   else {
2116     ++wsrep_to_isolation;
2117     rc= 0;
2118   }
2119 
2120   if (buf) my_free(buf);
2121 
2122   if (rc) wsrep_TOI_begin_failed(thd, NULL);
2123 
2124   return rc;
2125 }
2126 
wsrep_TOI_end(THD * thd)2127 static void wsrep_TOI_end(THD *thd) {
2128   wsrep_to_isolation--;
2129   wsrep::client_state& client_state(thd->wsrep_cs());
2130   DBUG_ASSERT(wsrep_thd_is_local_toi(thd));
2131 
2132   wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());
2133 
2134   int ret= client_state.leave_toi_local(wsrep::mutable_buffer());
2135 
2136   if (!ret)
2137   {
2138     WSREP_DEBUG("TO END: %lld: %s",
2139                 client_state.toi_meta().seqno().get(), wsrep_thd_query(thd));
2140   }
2141   else
2142   {
2143     WSREP_WARN("TO isolation end failed for: %d, sql: %s",
2144                 ret, wsrep_thd_query(thd));
2145   }
2146 }
2147 
wsrep_RSU_begin(THD * thd,const char * db_,const char * table_)2148 static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_)
2149 {
2150   WSREP_DEBUG("RSU BEGIN: %lld, : %s", wsrep_thd_trx_seqno(thd),
2151               wsrep_thd_query(thd));
2152   if (thd->wsrep_cs().begin_rsu(5000))
2153   {
2154     WSREP_WARN("RSU begin failed");
2155   }
2156   else
2157   {
2158     thd->variables.wsrep_on= 0;
2159   }
2160   return 0;
2161 }
2162 
wsrep_RSU_end(THD * thd)2163 static void wsrep_RSU_end(THD *thd)
2164 {
2165   WSREP_DEBUG("RSU END: %lld : %s", wsrep_thd_trx_seqno(thd),
2166               wsrep_thd_query(thd));
2167   if (thd->wsrep_cs().end_rsu())
2168   {
2169     WSREP_WARN("Failed to end RSU, server may need to be restarted");
2170   }
2171   thd->variables.wsrep_on= 1;
2172 }
2173 
wsrep_to_isolation_begin(THD * thd,const char * db_,const char * table_,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep::key_array * fk_tables)2174 int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
2175                              const TABLE_LIST* table_list,
2176                              Alter_info* alter_info, wsrep::key_array* fk_tables)
2177 {
2178   /*
2179     No isolation for applier or replaying threads.
2180   */
2181   if (!wsrep_thd_is_local(thd))
2182     return 0;
2183 
2184   int ret= 0;
2185   mysql_mutex_lock(&thd->LOCK_thd_data);
2186 
2187   if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
2188   {
2189     WSREP_INFO("thread: %lld  schema: %s  query: %s has been aborted due to multi-master conflict",
2190                (longlong) thd->thread_id, thd->get_db(), thd->query());
2191     mysql_mutex_unlock(&thd->LOCK_thd_data);
2192     return WSREP_TRX_FAIL;
2193   }
2194   mysql_mutex_unlock(&thd->LOCK_thd_data);
2195 
2196   DBUG_ASSERT(wsrep_thd_is_local(thd));
2197   DBUG_ASSERT(thd->wsrep_trx().ws_meta().seqno().is_undefined());
2198 
2199   if (Wsrep_server_state::instance().desynced_on_pause())
2200   {
2201     my_message(ER_UNKNOWN_COM_ERROR,
2202                "Aborting TOI: Replication paused on node for FTWRL/BACKUP STAGE.", MYF(0));
2203     return -1;
2204   }
2205 
2206   if (wsrep_debug && thd->mdl_context.has_locks())
2207   {
2208     WSREP_DEBUG("thread holds MDL locks at TI begin: %s %llu",
2209                 wsrep_thd_query(thd), thd->thread_id);
2210   }
2211 
2212   /*
2213     It makes sense to set auto_increment_* to defaults in TOI operations.
2214     Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
2215     TOI statement and auto inc variables for wsrep replication is constructed
2216     there. Variables are reset back in THD::reset_for_next_command() before
2217     processing of next command.
2218    */
2219   if (wsrep_auto_increment_control)
2220   {
2221     thd->variables.auto_increment_offset= 1;
2222     thd->variables.auto_increment_increment= 1;
2223   }
2224 
2225   if (thd->variables.wsrep_on && wsrep_thd_is_local(thd))
2226   {
2227     switch (thd->variables.wsrep_OSU_method) {
2228     case WSREP_OSU_TOI:
2229       ret= wsrep_TOI_begin(thd, db_, table_, table_list, alter_info, fk_tables);
2230       break;
2231     case WSREP_OSU_RSU:
2232       ret= wsrep_RSU_begin(thd, db_, table_);
2233       break;
2234     default:
2235       WSREP_ERROR("Unsupported OSU method: %lu",
2236                   thd->variables.wsrep_OSU_method);
2237       ret= -1;
2238       break;
2239     }
2240 
2241     switch (ret) {
2242     case 0: /* wsrep_TOI_begin sould set toi mode */
2243       if (thd->variables.wsrep_OSU_method == WSREP_OSU_TOI)
2244       {
2245         /*
2246           TOI operations ignore the provided lock_wait_timeout once replicated,
2247           and restore it after operation is done.
2248          */
2249         thd->variables.saved_lock_wait_timeout= thd->variables.lock_wait_timeout;
2250         thd->variables.lock_wait_timeout= LONG_TIMEOUT;
2251       }
2252       break;
2253     case 1:
2254       /* TOI replication skipped, treat as success */
2255       ret= 0;
2256       break;
2257     case -1:
2258       /* TOI replication failed, treat as error */
2259       break;
2260     }
2261   }
2262 
2263   return ret;
2264 }
2265 
wsrep_to_isolation_end(THD * thd)2266 void wsrep_to_isolation_end(THD *thd)
2267 {
2268   DBUG_ASSERT(wsrep_thd_is_local_toi(thd) ||
2269               wsrep_thd_is_in_rsu(thd));
2270 
2271   if (wsrep_thd_is_local_toi(thd))
2272   {
2273     thd->variables.lock_wait_timeout= thd->variables.saved_lock_wait_timeout;
2274     DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_TOI);
2275     wsrep_TOI_end(thd);
2276   }
2277   else if (wsrep_thd_is_in_rsu(thd))
2278   {
2279     DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_RSU);
2280     wsrep_RSU_end(thd);
2281   }
2282   else
2283   {
2284     DBUG_ASSERT(0);
2285   }
2286   if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);
2287 }
2288 
2289 #define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra)             \
2290     WSREP_##severity(                                                          \
2291       "%s\n"                                                                   \
2292       "schema:  %.*s\n"                                                        \
2293       "request: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)\n"     \
2294       "granted: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)",      \
2295       msg, schema_len, schema,                                                 \
2296       req->thread_id, (long long)wsrep_thd_trx_seqno(req),                     \
2297       wsrep_thd_client_mode_str(req), wsrep_thd_client_state_str(req), wsrep_thd_transaction_state_str(req), \
2298       req->get_command(), req->lex->sql_command, req->query(),                 \
2299       gra->thread_id, (long long)wsrep_thd_trx_seqno(gra),                     \
2300       wsrep_thd_client_mode_str(gra), wsrep_thd_client_state_str(gra), wsrep_thd_transaction_state_str(gra), \
2301       gra->get_command(), gra->lex->sql_command, gra->query());
2302 
2303 /**
2304   Check if request for the metadata lock should be granted to the requester.
2305 
2306   @param  requestor_ctx        The MDL context of the requestor
2307   @param  ticket               MDL ticket for the requested lock
2308 
2309   @retval TRUE   Lock request can be granted
2310   @retval FALSE  Lock request cannot be granted
2311 */
2312 
wsrep_handle_mdl_conflict(MDL_context * requestor_ctx,MDL_ticket * ticket,const MDL_key * key)2313 void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
2314                                MDL_ticket *ticket,
2315                                const MDL_key *key)
2316 {
2317   /* Fallback to the non-wsrep behaviour */
2318   if (!WSREP_ON) return;
2319 
2320   THD *request_thd= requestor_ctx->get_thd();
2321   THD *granted_thd= ticket->get_ctx()->get_thd();
2322 
2323   const char* schema= key->db_name();
2324   int schema_len= key->db_name_length();
2325 
2326   mysql_mutex_lock(&request_thd->LOCK_thd_data);
2327   if (wsrep_thd_is_toi(request_thd) ||
2328       wsrep_thd_is_applying(request_thd)) {
2329 
2330     mysql_mutex_unlock(&request_thd->LOCK_thd_data);
2331     WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
2332                   request_thd, granted_thd);
2333     ticket->wsrep_report(wsrep_debug);
2334 
2335     /* Here we will call wsrep_abort_transaction so we should hold
2336     THD::LOCK_thd_data to protect victim from concurrent usage
2337     and THD::LOCK_thd_kill to protect from disconnect or delete. */
2338     wsrep_thd_LOCK(granted_thd);
2339 
2340     if (wsrep_thd_is_toi(granted_thd) ||
2341         wsrep_thd_is_applying(granted_thd))
2342     {
2343       if (wsrep_thd_is_aborting(granted_thd))
2344       {
2345         WSREP_DEBUG("BF thread waiting for SR in aborting state");
2346         ticket->wsrep_report(wsrep_debug);
2347         wsrep_thd_UNLOCK(granted_thd);
2348       }
2349       else if (wsrep_thd_is_SR(granted_thd) && !wsrep_thd_is_SR(request_thd))
2350       {
2351         WSREP_MDL_LOG(INFO, "MDL conflict, DDL vs SR",
2352                       schema, schema_len, request_thd, granted_thd);
2353         wsrep_abort_thd(request_thd, granted_thd, 1);
2354         mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_data);
2355         mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_kill);
2356       }
2357       else
2358       {
2359         WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
2360                       request_thd, granted_thd);
2361         ticket->wsrep_report(true);
2362         wsrep_thd_UNLOCK(granted_thd);
2363         unireg_abort(1);
2364       }
2365     }
2366     else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
2367              granted_thd->mdl_context.has_explicit_locks())
2368     {
2369       WSREP_DEBUG("BF thread waiting for FLUSH");
2370       ticket->wsrep_report(wsrep_debug);
2371       wsrep_thd_UNLOCK(granted_thd);
2372     }
2373     else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
2374     {
2375       WSREP_DEBUG("DROP caused BF abort, conf %s",
2376                   wsrep_thd_transaction_state_str(granted_thd));
2377       ticket->wsrep_report(wsrep_debug);
2378       wsrep_abort_thd(request_thd, granted_thd, 1);
2379       mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_data);
2380       mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_kill);
2381     }
2382     else
2383     {
2384       WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
2385                     request_thd, granted_thd);
2386       ticket->wsrep_report(wsrep_debug);
2387       if (granted_thd->wsrep_trx().active())
2388       {
2389         wsrep_abort_thd(request_thd, granted_thd, true);
2390         mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_data);
2391         mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_kill);
2392       }
2393       else
2394       {
2395         /*
2396           Granted_thd is likely executing with wsrep_on=0. If the requesting
2397           thd is BF, BF abort and wait.
2398         */
2399         if (wsrep_thd_is_BF(request_thd, FALSE))
2400         {
2401           ha_abort_transaction(request_thd, granted_thd, TRUE);
2402           mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_data);
2403           mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_kill);
2404         }
2405         else
2406         {
2407 	  WSREP_MDL_LOG(INFO, "MDL unknown BF-BF conflict", schema, schema_len,
2408                       request_thd, granted_thd);
2409 	  ticket->wsrep_report(true);
2410 	  unireg_abort(1);
2411         }
2412       }
2413     }
2414   }
2415   else
2416   {
2417     mysql_mutex_unlock(&request_thd->LOCK_thd_data);
2418   }
2419 }
2420 
2421 /**/
abort_replicated(THD * thd)2422 static bool abort_replicated(THD *thd)
2423 {
2424   bool ret_code= false;
2425   wsrep_thd_LOCK(thd);
2426   if (thd->wsrep_trx().state() == wsrep::transaction::s_committing)
2427   {
2428     WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id));
2429 
2430     (void)wsrep_abort_thd(thd, thd, TRUE);
2431     ret_code= true;
2432   }
2433   else
2434     wsrep_thd_UNLOCK(thd);
2435 
2436   return ret_code;
2437 }
2438 
2439 /**/
is_client_connection(THD * thd)2440 static inline bool is_client_connection(THD *thd)
2441 {
2442   return (thd->wsrep_client_thread && thd->variables.wsrep_on);
2443 }
2444 
is_replaying_connection(THD * thd)2445 static inline bool is_replaying_connection(THD *thd)
2446 {
2447   bool ret;
2448 
2449   mysql_mutex_lock(&thd->LOCK_thd_data);
2450   ret=  (thd->wsrep_trx().state() == wsrep::transaction::s_replaying) ? true : false;
2451   mysql_mutex_unlock(&thd->LOCK_thd_data);
2452 
2453   return ret;
2454 }
2455 
is_committing_connection(THD * thd)2456 static inline bool is_committing_connection(THD *thd)
2457 {
2458   bool ret;
2459 
2460   mysql_mutex_lock(&thd->LOCK_thd_data);
2461   ret=  (thd->wsrep_trx().state() == wsrep::transaction::s_committing) ? true : false;
2462   mysql_mutex_unlock(&thd->LOCK_thd_data);
2463 
2464   return ret;
2465 }
2466 
have_client_connections(THD * thd,void *)2467 static my_bool have_client_connections(THD *thd, void*)
2468 {
2469   DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2470                      (longlong) thd->thread_id));
2471   if (is_client_connection(thd) && thd->killed == KILL_CONNECTION)
2472   {
2473     WSREP_DEBUG("Informing thread %lld that it's time to die",
2474                 thd->thread_id);
2475     (void)abort_replicated(thd);
2476     return true;
2477   }
2478   return 0;
2479 }
2480 
wsrep_close_thread(THD * thd)2481 static void wsrep_close_thread(THD *thd)
2482 {
2483   thd->set_killed(KILL_CONNECTION);
2484   MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
2485   mysql_mutex_lock(&thd->LOCK_thd_kill);
2486   thd->abort_current_cond_wait(true);
2487   mysql_mutex_unlock(&thd->LOCK_thd_kill);
2488 }
2489 
have_committing_connections(THD * thd,void *)2490 static my_bool have_committing_connections(THD *thd, void *)
2491 {
2492   return is_client_connection(thd) && is_committing_connection(thd) ? 1 : 0;
2493 }
2494 
wsrep_wait_committing_connections_close(int wait_time)2495 int wsrep_wait_committing_connections_close(int wait_time)
2496 {
2497   int sleep_time= 100;
2498 
2499   WSREP_DEBUG("wait for committing transaction to close: %d sleep: %d", wait_time, sleep_time);
2500   while (server_threads.iterate(have_committing_connections) && wait_time > 0)
2501   {
2502     WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
2503     my_sleep(sleep_time);
2504     wait_time -= sleep_time;
2505   }
2506   return server_threads.iterate(have_committing_connections);
2507 }
2508 
kill_all_threads(THD * thd,THD * caller_thd)2509 static my_bool kill_all_threads(THD *thd, THD *caller_thd)
2510 {
2511   DBUG_PRINT("quit", ("Informing thread %lld that it's time to die",
2512                       (longlong) thd->thread_id));
2513   WSREP_DEBUG("Informing thread %lld that it's time to die",
2514 	      thd->thread_id);
2515   /* We skip slave threads & scheduler on this first loop through. */
2516   if (is_client_connection(thd) && thd != caller_thd)
2517   {
2518     if (is_replaying_connection(thd))
2519       thd->set_killed(KILL_CONNECTION);
2520     else if (!abort_replicated(thd))
2521     {
2522       /* replicated transactions must be skipped */
2523       WSREP_DEBUG("closing connection %lld", (longlong) thd->thread_id);
2524       /* instead of wsrep_close_thread() we do now  soft kill by THD::awake */
2525       thd->awake(KILL_CONNECTION);
2526     }
2527   }
2528   return 0;
2529 }
2530 
kill_remaining_threads(THD * thd,THD * caller_thd)2531 static my_bool kill_remaining_threads(THD *thd, THD *caller_thd)
2532 {
2533 #ifndef __bsdi__				// Bug in BSDI kernel
2534   if (is_client_connection(thd) &&
2535       !abort_replicated(thd)    &&
2536       !is_replaying_connection(thd) &&
2537       thd_is_connection_alive(thd) &&
2538       thd != caller_thd)
2539   {
2540 
2541     WSREP_INFO("killing local connection: %lld", (longlong) thd->thread_id);
2542     close_connection(thd);
2543   }
2544 #endif
2545   return 0;
2546 }
2547 
wsrep_close_client_connections(my_bool wait_to_end,THD * except_caller_thd)2548 void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
2549 {
2550   /* Clear thread cache */
2551   kill_cached_threads++;
2552   flush_thread_cache();
2553 
2554   /*
2555     First signal all threads that it's time to die
2556   */
2557   server_threads.iterate(kill_all_threads, except_caller_thd);
2558 
2559   /*
2560     Force remaining threads to die by closing the connection to the client
2561   */
2562   server_threads.iterate(kill_remaining_threads, except_caller_thd);
2563 
2564   DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)", THD_count::value()));
2565   WSREP_DEBUG("waiting for client connections to close: %u", THD_count::value());
2566 
2567   while (wait_to_end && server_threads.iterate(have_client_connections))
2568   {
2569     sleep(1);
2570     DBUG_PRINT("quit",("One thread died (count=%u)", THD_count::value()));
2571   }
2572 
2573   /* All client connection threads have now been aborted */
2574 }
2575 
2576 
wsrep_close_applier(THD * thd)2577 void wsrep_close_applier(THD *thd)
2578 {
2579   WSREP_DEBUG("closing applier %lld", (longlong) thd->thread_id);
2580   wsrep_close_thread(thd);
2581 }
2582 
wsrep_close_threads_callback(THD * thd,THD * caller_thd)2583 static my_bool wsrep_close_threads_callback(THD *thd, THD *caller_thd)
2584 {
2585   DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2586                      (longlong) thd->thread_id));
2587   /* We skip slave threads & scheduler on this first loop through. */
2588   if (thd->wsrep_applier && thd != caller_thd)
2589   {
2590     WSREP_DEBUG("closing wsrep thread %lld", (longlong) thd->thread_id);
2591     wsrep_close_thread(thd);
2592   }
2593   return 0;
2594 }
2595 
wsrep_close_threads(THD * thd)2596 void wsrep_close_threads(THD *thd)
2597 {
2598   server_threads.iterate(wsrep_close_threads_callback, thd);
2599 }
2600 
wsrep_wait_appliers_close(THD * thd)2601 void wsrep_wait_appliers_close(THD *thd)
2602 {
2603   /* Wait for wsrep appliers to gracefully exit */
2604   mysql_mutex_lock(&LOCK_wsrep_slave_threads);
2605   while (wsrep_running_threads > 2)
2606     /*
2607       2 is for rollbacker thread which needs to be killed explicitly.
2608       This gotta be fixed in a more elegant manner if we gonna have arbitrary
2609       number of non-applier wsrep threads.
2610     */
2611   {
2612     mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
2613   }
2614   mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
2615   DBUG_PRINT("quit",("applier threads have died (count=%u)",
2616                      uint32_t(wsrep_running_threads)));
2617 
2618   /* Now kill remaining wsrep threads: rollbacker */
2619   wsrep_close_threads (thd);
2620   /* and wait for them to die */
2621   mysql_mutex_lock(&LOCK_wsrep_slave_threads);
2622   while (wsrep_running_threads > 0)
2623   {
2624     mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
2625   }
2626   mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
2627   DBUG_PRINT("quit",("all wsrep system threads have died"));
2628 
2629   /* All wsrep applier threads have now been aborted. However, if this thread
2630      is also applier, we are still running...
2631   */
2632 }
2633 
wsrep_must_ignore_error(THD * thd)2634 int wsrep_must_ignore_error(THD* thd)
2635 {
2636   const int error= thd->get_stmt_da()->sql_errno();
2637   const uint flags= sql_command_flags[thd->lex->sql_command];
2638 
2639   DBUG_ASSERT(error);
2640   DBUG_ASSERT(wsrep_thd_is_toi(thd) || wsrep_thd_is_applying(thd));
2641 
2642   if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL))
2643     goto ignore_error;
2644 
2645   if ((flags & CF_WSREP_MAY_IGNORE_ERRORS) &&
2646       (wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DDL))
2647   {
2648     switch (error)
2649     {
2650     case ER_DB_DROP_EXISTS:
2651     case ER_BAD_TABLE_ERROR:
2652     case ER_CANT_DROP_FIELD_OR_KEY:
2653       goto ignore_error;
2654     }
2655   }
2656 
2657   return 0;
2658 
2659 ignore_error:
2660   WSREP_WARN("Ignoring error '%s' on query. "
2661              "Default database: '%s'. Query: '%s', Error_code: %d",
2662              thd->get_stmt_da()->message(),
2663              print_slave_db_safe(thd->db.str),
2664              thd->query(),
2665              error);
2666   return 1;
2667 }
2668 
wsrep_ignored_error_code(Log_event * ev,int error)2669 int wsrep_ignored_error_code(Log_event* ev, int error)
2670 {
2671   const THD* thd= ev->thd;
2672 
2673   DBUG_ASSERT(error);
2674   DBUG_ASSERT(wsrep_thd_is_applying(thd) &&
2675               !wsrep_thd_is_local_toi(thd));
2676 
2677   if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DML))
2678   {
2679     const int ev_type= ev->get_type_code();
2680     if ((ev_type == DELETE_ROWS_EVENT || ev_type == DELETE_ROWS_EVENT_V1)
2681         && error == ER_KEY_NOT_FOUND)
2682       goto ignore_error;
2683   }
2684 
2685   return 0;
2686 
2687 ignore_error:
2688   WSREP_WARN("Ignoring error '%s' on %s event. Error_code: %d",
2689              thd->get_stmt_da()->message(),
2690              ev->get_type_str(),
2691              error);
2692   return 1;
2693 }
2694 
wsrep_provider_is_SR_capable()2695 bool wsrep_provider_is_SR_capable()
2696 {
2697   return Wsrep_server_state::has_capability(wsrep::provider::capability::streaming);
2698 }
2699 
wsrep_thd_retry_counter(const THD * thd)2700 int wsrep_thd_retry_counter(const THD *thd)
2701 {
2702   return thd->wsrep_retry_counter;
2703 }
2704 
wsrep_thd_ignore_table(THD * thd)2705 extern  bool wsrep_thd_ignore_table(THD *thd)
2706 {
2707   return thd->wsrep_ignore_table;
2708 }
2709 
wsrep_is_show_query(enum enum_sql_command command)2710 bool wsrep_is_show_query(enum enum_sql_command command)
2711 {
2712   DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
2713   return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
2714 }
wsrep_create_like_table(THD * thd,TABLE_LIST * table,TABLE_LIST * src_table,HA_CREATE_INFO * create_info)2715 bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
2716                              TABLE_LIST* src_table,
2717                              HA_CREATE_INFO *create_info)
2718 {
2719   if (create_info->tmp_table())
2720   {
2721     /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */
2722     WSREP_DEBUG("CREATE TEMPORARY TABLE LIKE... skipped replication\n %s",
2723                 thd->query());
2724   }
2725   else if (!(thd->find_temporary_table(src_table)))
2726   {
2727     /* this is straight CREATE TABLE LIKE... with no tmp tables */
2728     WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2729   }
2730   else
2731   {
2732     /* Non-MERGE tables ignore this call. */
2733     if (src_table->table->file->extra(HA_EXTRA_ADD_CHILDREN_LIST))
2734       return (true);
2735 
2736     char buf[2048];
2737     String query(buf, sizeof(buf), system_charset_info);
2738     query.length(0);  // Have to zero it since constructor doesn't
2739 
2740     int result __attribute__((unused))=
2741       show_create_table(thd, src_table, &query, NULL, WITH_DB_NAME);
2742     WSREP_DEBUG("TMP TABLE: %s ret_code %d", query.ptr(), result);
2743 
2744     thd->wsrep_TOI_pre_query=     query.ptr();
2745     thd->wsrep_TOI_pre_query_len= query.length();
2746 
2747     WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2748 
2749     thd->wsrep_TOI_pre_query=      NULL;
2750     thd->wsrep_TOI_pre_query_len= 0;
2751 
2752     /* Non-MERGE tables ignore this call. */
2753     src_table->table->file->extra(HA_EXTRA_DETACH_CHILDREN);
2754   }
2755 
2756   return(false);
2757 #ifdef WITH_WSREP
2758 wsrep_error_label:
2759   thd->wsrep_TOI_pre_query= NULL;
2760   return (true);
2761 #endif
2762 }
2763 
wsrep_create_trigger_query(THD * thd,uchar ** buf,size_t * buf_len)2764 int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
2765 {
2766   LEX *lex= thd->lex;
2767   String stmt_query;
2768 
2769   LEX_CSTRING definer_user;
2770   LEX_CSTRING definer_host;
2771 
2772   if (!lex->definer)
2773   {
2774     if (!thd->slave_thread)
2775     {
2776       if (!(lex->definer= create_default_definer(thd, false)))
2777         return 1;
2778     }
2779   }
2780 
2781   if (lex->definer)
2782   {
2783     /* SUID trigger. */
2784     LEX_USER *d= get_current_user(thd, lex->definer);
2785 
2786     if (!d)
2787       return 1;
2788 
2789     definer_user= d->user;
2790     definer_host= d->host;
2791   }
2792   else
2793   {
2794     /* non-SUID trigger. */
2795 
2796     definer_user.str= 0;
2797     definer_user.length= 0;
2798 
2799     definer_host.str= 0;
2800     definer_host.length= 0;
2801   }
2802 
2803   const LEX_CSTRING command[2]=
2804       {{ C_STRING_WITH_LEN("CREATE ") },
2805        { C_STRING_WITH_LEN("CREATE OR REPLACE ") }};
2806 
2807   if (thd->lex->create_info.or_replace())
2808     stmt_query.append(command[1]);
2809   else
2810     stmt_query.append(command[0]);
2811 
2812   append_definer(thd, &stmt_query, &definer_user, &definer_host);
2813 
2814   LEX_CSTRING stmt_definition;
2815   stmt_definition.str= (char*) thd->lex->stmt_definition_begin;
2816   stmt_definition.length= thd->lex->stmt_definition_end
2817     - thd->lex->stmt_definition_begin;
2818   trim_whitespace(thd->charset(), &stmt_definition);
2819 
2820   stmt_query.append(stmt_definition.str, stmt_definition.length);
2821 
2822   return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
2823                              buf, buf_len);
2824 }
2825 
start_wsrep_THD(void * arg)2826 void* start_wsrep_THD(void *arg)
2827 {
2828   THD *thd;
2829 
2830   Wsrep_thd_args* thd_args= (Wsrep_thd_args*) arg;
2831 
2832   if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
2833   {
2834     goto error;
2835   }
2836 
2837   statistic_increment(thread_created, &LOCK_status);
2838 
2839   if (wsrep_gtid_mode)
2840   {
2841     /* Adjust domain_id. */
2842     thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
2843   }
2844 
2845   thd->real_id=pthread_self(); // Keep purify happy
2846 
2847   my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));
2848 
2849   DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
2850   thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
2851 
2852   server_threads.insert(thd);
2853 
2854   /* from bootstrap()... */
2855   thd->bootstrap=1;
2856   thd->max_client_packet_length= thd->net.max_packet;
2857   thd->security_ctx->master_access= ~(ulong)0;
2858 
2859   /* from handle_one_connection... */
2860   pthread_detach_this_thread();
2861 
2862   mysql_thread_set_psi_id(thd->thread_id);
2863   thd->thr_create_utime=  microsecond_interval_timer();
2864   if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
2865   {
2866     close_connection(thd, ER_OUT_OF_RESOURCES);
2867     statistic_increment(aborted_connects,&LOCK_status);
2868     MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
2869     goto error;
2870   }
2871 
2872 // </5.1.17>
2873   /*
2874     handle_one_connection() is normally the only way a thread would
2875     start and would always be on the very high end of the stack ,
2876     therefore, the thread stack always starts at the address of the
2877     first local variable of handle_one_connection, which is thd. We
2878     need to know the start of the stack so that we could check for
2879     stack overruns.
2880   */
2881   DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld",
2882                        (long long)thd->thread_id));
2883   /* now that we've called my_thread_init(), it is safe to call DBUG_* */
2884 
2885   thd->thread_stack= (char*) &thd;
2886   wsrep_assign_from_threadvars(thd);
2887   if (wsrep_store_threadvars(thd))
2888   {
2889     close_connection(thd, ER_OUT_OF_RESOURCES);
2890     statistic_increment(aborted_connects,&LOCK_status);
2891     MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
2892     delete thd;
2893     delete thd_args;
2894     goto error;
2895   }
2896 
2897   thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
2898   thd->security_ctx->skip_grants();
2899 
2900   /* handle_one_connection() again... */
2901   thd->proc_info= 0;
2902   thd->set_command(COM_SLEEP);
2903   thd->init_for_queries();
2904   mysql_mutex_lock(&LOCK_wsrep_slave_threads);
2905 
2906   wsrep_running_threads++;
2907 
2908   switch (thd_args->thread_type()) {
2909     case WSREP_APPLIER_THREAD:
2910       wsrep_running_applier_threads++;
2911       break;
2912     case WSREP_ROLLBACKER_THREAD:
2913       wsrep_running_rollbacker_threads++;
2914       break;
2915     default:
2916       WSREP_ERROR("Incorrect wsrep thread type: %d", thd_args->thread_type());
2917       break;
2918   }
2919 
2920   mysql_cond_broadcast(&COND_wsrep_slave_threads);
2921   mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
2922 
2923   WSREP_DEBUG("wsrep system thread %llu, %p starting",
2924               thd->thread_id, thd);
2925   thd_args->fun()(thd, static_cast<void *>(thd_args));
2926 
2927   WSREP_DEBUG("wsrep system thread: %llu, %p closing",
2928               thd->thread_id, thd);
2929 
2930   /* Wsrep may reset globals during thread context switches, store globals
2931      before cleanup. */
2932   wsrep_store_threadvars(thd);
2933 
2934   close_connection(thd, 0);
2935 
2936   mysql_mutex_lock(&LOCK_wsrep_slave_threads);
2937   DBUG_ASSERT(wsrep_running_threads > 0);
2938   wsrep_running_threads--;
2939 
2940   switch (thd_args->thread_type()) {
2941     case WSREP_APPLIER_THREAD:
2942       DBUG_ASSERT(wsrep_running_applier_threads > 0);
2943       wsrep_running_applier_threads--;
2944       break;
2945     case WSREP_ROLLBACKER_THREAD:
2946       DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
2947       wsrep_running_rollbacker_threads--;
2948       break;
2949     default:
2950       WSREP_ERROR("Incorrect wsrep thread type: %d", thd_args->thread_type());
2951       break;
2952   }
2953 
2954   delete thd_args;
2955   WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
2956   mysql_cond_broadcast(&COND_wsrep_slave_threads);
2957   mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
2958   /*
2959     Note: We can't call THD destructor without crashing
2960     if plugins have not been initialized. However, in most of the
2961     cases this means that pre SE initialization SST failed and
2962     we are going to exit anyway.
2963   */
2964   if (plugins_are_initialized)
2965   {
2966     net_end(&thd->net);
2967     MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
2968   }
2969   else
2970   {
2971     /*
2972       TODO: lightweight cleanup to get rid of:
2973       'Error in my_thread_global_end(): 2 threads didn't exit'
2974       at server shutdown
2975     */
2976   }
2977 
2978   server_threads.erase(thd);
2979   delete thd;
2980   my_thread_end();
2981   return(NULL);
2982 
2983 error:
2984   WSREP_ERROR("Failed to create/initialize system thread");
2985 
2986   /* Abort if its the first applier/rollbacker thread. */
2987   if (!mysqld_server_initialized)
2988     unireg_abort(1);
2989   else
2990     return NULL;
2991 }
2992 
wsrep_fragment_unit(ulong unit)2993 enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit)
2994 {
2995   switch (unit)
2996   {
2997   case WSREP_FRAG_BYTES: return wsrep::streaming_context::bytes;
2998   case WSREP_FRAG_ROWS: return wsrep::streaming_context::row;
2999   case WSREP_FRAG_STATEMENTS: return wsrep::streaming_context::statement;
3000   default:
3001     DBUG_ASSERT(0);
3002     return wsrep::streaming_context::bytes;
3003   }
3004 }
3005 
3006 /***** callbacks for wsrep service ************/
3007 
get_wsrep_recovery()3008 my_bool get_wsrep_recovery()
3009 {
3010   return wsrep_recovery;
3011 }
3012 
wsrep_consistency_check(THD * thd)3013 bool wsrep_consistency_check(THD *thd)
3014 {
3015   return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
3016 }
3017