1 /* Copyright 2008-2021 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 #include "sql_plugin.h" /* wsrep_plugins_pre_init() */
17 #include <mysqld.h>
18 #include <sql_class.h>
19 #include <sql_parse.h>
20 #include <sql_base.h> /* find_temporary_table() */
21 #include "slave.h"
22 #include "rpl_mi.h"
23 #include "sql_repl.h"
24 #include "rpl_filter.h"
25 #include "sql_callback.h"
26 #include "sp_head.h"
27 #include "sql_show.h"
28 #include "sp.h"
29 #include "wsrep_priv.h"
30 #include "wsrep_thd.h"
31 #include "wsrep_sst.h"
32 #include "wsrep_utils.h"
33 #include "wsrep_var.h"
34 #include "wsrep_binlog.h"
35 #include "wsrep_applier.h"
36 #include "wsrep_xid.h"
37 #include <cstdio>
38 #include <cstdlib>
39 #include "log_event.h"
40
/*
  Handle of the loaded wsrep provider. Filled in by wsrep_load() from
  wsrep_init(); reset to NULL when provider init fails and in wsrep_deinit().
*/
wsrep_t *wsrep = NULL;
/*
  wsrep_emulate_bin_log is a flag to tell that binlog has not been configured.
  wsrep needs to get binlog events from transaction cache even when binlog is
  not enabled, wsrep_emulate_bin_log opens needed code paths to make this
  possible
*/
my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
#ifdef GTID_SUPPORT
/* Sidno in global_sid_map corresponding to group uuid */
rpl_sidno wsrep_sidno= -1;
#endif /* GTID_SUPPORT */
/* NOTE(review): not referenced in this part of the file; presumably backs the
   wsrep_preordered option — confirm. */
my_bool wsrep_preordered_opt= FALSE;
54
/*
 * Begin configuration options
 *
 * Globals backing wsrep server options / system variables. Presumably they
 * are assigned by option parsing and sysvar code elsewhere — confirm.
 */

/* Server globals defined in other translation units. */
extern my_bool plugins_are_initialized;
extern uint kill_cached_threads;
extern mysql_cond_t COND_thread_cache;

/* System variables. */
const char *wsrep_provider;
const char *wsrep_provider_options;
const char *wsrep_cluster_address;
const char *wsrep_cluster_name;
const char *wsrep_node_name;
const char *wsrep_node_address;
const char *wsrep_node_incoming_address;
const char *wsrep_start_position;
const char *wsrep_data_home_dir;
const char *wsrep_dbug_option;
const char *wsrep_notify_cmd;

my_bool wsrep_debug;                            // Enable debug level logging
my_bool wsrep_convert_LOCK_to_trx;              // Convert locking sessions to trx
my_bool wsrep_auto_increment_control;           // Control auto increment variables
my_bool wsrep_drupal_282555_workaround;         // Retry autoinc insert after dupkey
my_bool wsrep_certify_nonPK;                    // Certify, even when no primary key
ulong   wsrep_certification_rules      = WSREP_CERTIFICATION_RULES_STRICT;
my_bool wsrep_recovery;                         // Recovery
my_bool wsrep_replicate_myisam;                 // Enable MyISAM replication
my_bool wsrep_log_conflicts;
my_bool wsrep_load_data_splitting;              // Commit load data every 10K intervals
my_bool wsrep_slave_UK_checks;                  // Slave thread does UK checks
my_bool wsrep_slave_FK_checks;                  // Slave thread does FK checks
my_bool wsrep_restart_slave;                    // Should mysql slave thread be
                                                // restarted, when node joins back?
my_bool wsrep_desync;                           // De(re)synchronize the node from the
                                                // cluster
long    wsrep_slave_threads;                    // No. of slave appliers threads
ulong   wsrep_retry_autocommit;                 // Retry aborted autocommit trx
ulong   wsrep_max_ws_size;                      // Max allowed ws (RBR buffer) size
ulong   wsrep_max_ws_rows;                      // Max number of rows in ws
ulong   wsrep_forced_binlog_format;
ulong   wsrep_mysql_replication_bundle;
bool    wsrep_gtid_mode;                        // Use wsrep_gtid_domain_id
                                                // for galera transactions?
uint32  wsrep_gtid_domain_id;                   // gtid_domain_id for galera
                                                // transactions

/* Other configuration variables and their default values. */
// Refreshed from provider capabilities on every primary view.
my_bool wsrep_incremental_data_collection= 0;   // Incremental data collection
// Checked and cleared by wsrep_synced_cb(), which then restarts the slave.
my_bool wsrep_restart_slave_activated= 0;       // Node has dropped, and slave
                                                // restart will be needed
// Consumed (reset to false) by wsrep_start_replication().
bool wsrep_new_cluster= false;                  // Bootstrap the cluster?
int wsrep_slave_count_change= 0;                // No. of appliers to stop/start
int wsrep_to_isolation= 0;                      // No. of active TO isolation threads
// Passed to the provider as wsrep_init_args::proto_ver in wsrep_init().
long wsrep_max_protocol_version= 3;             // Maximum protocol version to use

/*
 * End configuration options
 */
115
/*
 * Other wsrep global variables.
 *
 * The mutexes and condition variables below are created in wsrep_thr_init()
 * and destroyed in wsrep_thr_deinit().
 */

// LOCK_wsrep_ready protects wsrep_ready; COND_wsrep_ready is signalled
// whenever wsrep_ready changes (see wsrep_ready_set()).
mysql_mutex_t LOCK_wsrep_ready;
mysql_cond_t  COND_wsrep_ready;
mysql_mutex_t LOCK_wsrep_sst;
mysql_cond_t  COND_wsrep_sst;
mysql_mutex_t LOCK_wsrep_sst_init;
mysql_cond_t  COND_wsrep_sst_init;
mysql_mutex_t LOCK_wsrep_rollback;
mysql_cond_t  COND_wsrep_rollback;
wsrep_aborting_thd_t wsrep_aborting_thd= NULL;
mysql_mutex_t LOCK_wsrep_replaying;
mysql_cond_t  COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;

int wsrep_replaying= 0;
ulong  wsrep_running_threads = 0; // # of currently running wsrep
                                  // # threads
ulong  wsrep_running_applier_threads = 0; // # of running applier threads
ulong  wsrep_running_rollbacker_threads = 0; // # of running
                                             // # rollbacker threads
// NOTE(review): not referenced in this part of the file — confirm its users.
ulong  my_bind_addr;
142
#ifdef HAVE_PSI_INTERFACE
/*
  Performance-schema instrumentation keys for the wsrep synchronization
  objects. They are filled in by the mysql_*_register() calls in
  wsrep_thr_init() and then passed to mysql_mutex_init()/mysql_cond_init().
*/
PSI_mutex_key key_LOCK_wsrep_rollback,
  key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
  key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
  key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
  key_LOCK_wsrep_config_state;

PSI_cond_key key_COND_wsrep_rollback,
  key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
  key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;

PSI_file_key key_file_wsrep_gra_log;

/* Mutex classes to register; every key must be listed exactly once. */
static PSI_mutex_info wsrep_mutexes[]=
{
  { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0},
  { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
};

/* Condition-variable classes to register. */
static PSI_cond_info wsrep_conds[]=
{
  { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
  { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
};

/* File classes to register. */
static PSI_file_info wsrep_files[]=
{
  { &key_file_wsrep_gra_log, "wsrep_gra_log", 0}
};

PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
  key_wsrep_rollbacker, key_wsrep_applier;

/* Thread classes to register. */
static PSI_thread_info wsrep_threads[]=
{
 {&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
 {&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
 {&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
 {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}
};

#endif /* HAVE_PSI_INTERFACE */
197
// Set to 1 by a successful wsrep_init(), back to 0 by wsrep_deinit().
my_bool wsrep_inited                   = 0; // initialized ?

// Last cluster state UUID seen in a view, and its printable form; both are
// refreshed by wsrep_view_handler_cb() when the UUID changes.
static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
static char         cluster_uuid_str[40]= { 0, };
// Indexed by view->status (wsrep_view_status_t) in wsrep_view_handler_cb().
static const char*  cluster_status_str[WSREP_VIEW_MAX] =
{
    "Primary",
    "non-Primary",
    "Disconnected"
};

// Copied from the provider handle in wsrep_init(); cleared in wsrep_deinit().
static char provider_name[256]= { 0, };
static char provider_version[256]= { 0, };
static char provider_vendor[256]= { 0, };

/*
 * wsrep status variables
 *
 * The string variables point into the static buffers above, so they reflect
 * later updates to those buffers without being reassigned.
 */
my_bool     wsrep_connected          = FALSE;
my_bool     wsrep_ready              = FALSE; // node can accept queries
const char* wsrep_cluster_state_uuid = cluster_uuid_str;
long long   wsrep_cluster_conf_id    = WSREP_SEQNO_UNDEFINED;
const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
long        wsrep_cluster_size       = 0;
long        wsrep_local_index        = -1;
long long   wsrep_local_bf_aborts    = 0;
const char* wsrep_provider_name      = provider_name;
const char* wsrep_provider_version   = provider_version;
const char* wsrep_provider_vendor    = provider_vendor;
/* End wsrep status variables */

// Local node position (state UUID and seqno); see wsrep_init_position() and
// wsrep_view_handler_cb() for where it is established.
wsrep_uuid_t     local_uuid   = WSREP_UUID_UNDEFINED;
wsrep_seqno_t    local_seqno  = WSREP_SEQNO_UNDEFINED;
// Application protocol version currently in use; updated on protocol change
// in wsrep_view_handler_cb().
long             wsrep_protocol_version = 3;

// Allocated in wsrep_thr_init(), freed in wsrep_thr_deinit().
wsp::Config_state *wsrep_config_state;

// Boolean denoting if server is in initial startup phase. This is needed
// to make sure that main thread waiting in wsrep_sst_wait() is signaled
// if there was no state gap on receiving first view event.
static my_bool   wsrep_startup = TRUE;
239
240
wsrep_log_cb(wsrep_log_level_t level,const char * msg)241 static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
242 switch (level) {
243 case WSREP_LOG_INFO:
244 sql_print_information("WSREP: %s", msg);
245 break;
246 case WSREP_LOG_WARN:
247 sql_print_warning("WSREP: %s", msg);
248 break;
249 case WSREP_LOG_ERROR:
250 case WSREP_LOG_FATAL:
251 sql_print_error("WSREP: %s", msg);
252 break;
253 case WSREP_LOG_DEBUG:
254 if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
255 default:
256 break;
257 }
258 }
259
wsrep_log(void (* fun)(const char *,...),const char * format,...)260 void wsrep_log(void (*fun)(const char *, ...), const char *format, ...)
261 {
262 va_list args;
263 char msg[1024];
264 va_start(args, format);
265 vsnprintf(msg, sizeof(msg) - 1, format, args);
266 va_end(args);
267 (fun)("WSREP: %s", msg);
268 }
269
270
wsrep_log_states(wsrep_log_level_t const level,const wsrep_uuid_t * const group_uuid,wsrep_seqno_t const group_seqno,const wsrep_uuid_t * const node_uuid,wsrep_seqno_t const node_seqno)271 static void wsrep_log_states (wsrep_log_level_t const level,
272 const wsrep_uuid_t* const group_uuid,
273 wsrep_seqno_t const group_seqno,
274 const wsrep_uuid_t* const node_uuid,
275 wsrep_seqno_t const node_seqno)
276 {
277 char uuid_str[37];
278 char msg[256];
279
280 wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str));
281 snprintf (msg, 255, "WSREP: Group state: %s:%lld",
282 uuid_str, (long long)group_seqno);
283 wsrep_log_cb (level, msg);
284
285 wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str));
286 snprintf (msg, 255, "WSREP: Local state: %s:%lld",
287 uuid_str, (long long)node_seqno);
288 wsrep_log_cb (level, msg);
289 }
290
#ifdef GTID_SUPPORT
/*
  Derive a SID from the cluster UUID by inverting every byte, add it to
  global_sid_map and store the resulting sidno in wsrep_sidno.
  global_sid_lock is held for writing while the map is modified and the
  result is logged.

  @param wsrep_uuid  cluster state UUID
*/
void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid)
{
  rpl_sid      sid;
  wsrep_uuid_t inverted;
  size_t const len= sizeof(inverted.data);

  for (size_t pos= 0; pos != len; pos++)
    inverted.data[pos] = ~wsrep_uuid.data[pos];

  sid.copy_from(inverted.data);

  global_sid_lock->wrlock();
  wsrep_sidno= global_sid_map->add_sid(sid);
  WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno);
  global_sid_lock->unlock();
}
#endif /* GTID_SUPPORT */
310
/*
  View-change callback registered with the provider (view_handler_cb).

  Publishes the new view in the wsrep status variables (cluster state UUID,
  conf id, status, size, local index) and logs it.

  Non-primary view:
    flushes the query cache (when compiled in), clears wsrep_ready and
    records WSREP_MEMBER_UNDEFINED.

  Primary view:
    - handles an application protocol version change; versions 0..3 are
      accepted and anything else aborts the server;
    - on a state gap, prepares an SST request (*sst_req / *sst_req_len) and
      records WSREP_MEMBER_JOINER, or WSREP_MEMBER_UNDEFINED if preparation
      fails;
    - without a gap, on first startup sets the storage-engine checkpoint
      from the local position and records WSREP_MEMBER_JOINED. It then
      aborts the server if local_uuid differs from the cluster UUID;
    - updates auto_increment_offset/increment when
      wsrep_auto_increment_control is on;
    - refreshes wsrep_incremental_data_collection from provider capabilities.

  In every case the resulting member status and the view are stored in
  wsrep_config_state, and WSREP_CB_SUCCESS is returned.

  @param app_ctx      not used here
  @param recv_ctx     not used here
  @param view         new view information
  @param state        not used here
  @param state_len    not used here
  @param sst_req      [out] SST request buffer; NULL unless an SST is requested
  @param sst_req_len  [out] length of *sst_req; 0 unless an SST is requested
*/
static wsrep_cb_status_t
wsrep_view_handler_cb (void*                    app_ctx,
                       void*                    recv_ctx,
                       const wsrep_view_info_t* view,
                       const char*              state,
                       size_t                   state_len,
                       void**                   sst_req,
                       size_t*                  sst_req_len)
{
  *sst_req     = NULL;
  *sst_req_len = 0;

  // Start from the currently recorded member status; branches below
  // overwrite it as the view dictates.
  wsrep_member_status_t memb_status= wsrep_config_state->get_status();

  // Refresh the cached cluster UUID (and its printable form, which
  // wsrep_cluster_state_uuid points to) only when it changed.
  if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
  {
    memcpy(&cluster_uuid, &view->state_id.uuid, sizeof(cluster_uuid));

    wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
                      sizeof(cluster_uuid_str));
  }

  wsrep_cluster_conf_id= view->view;
  wsrep_cluster_status= cluster_status_str[view->status];
  wsrep_cluster_size= view->memb_num;
  wsrep_local_index= view->my_idx;

  WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
             "number of nodes: %ld, my index: %ld, protocol version %d",
             wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
             (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
             wsrep_cluster_size, wsrep_local_index, view->proto_ver);

  /* Proceed further only if view is PRIMARY */
  if (WSREP_VIEW_PRIMARY != view->status)
  {
#ifdef HAVE_QUERY_CACHE
    // query cache must be initialised by now
    query_cache.flush();
#endif /* HAVE_QUERY_CACHE */

    wsrep_ready_set(FALSE);
    memb_status= WSREP_MEMBER_UNDEFINED;
    /* Always record local_uuid and local_seqno in non-prim since this
     * may lead to re-initializing provider and start position is
     * determined according to these variables */
    // WRONG! local_uuid should be the last primary configuration uuid we were
    // a member of. local_seqno should be updated in commit calls.
    // local_uuid= cluster_uuid;
    // local_seqno= view->first - 1;
    goto out;
  }

  switch (view->proto_ver)
  {
  case 0:
  case 1:
  case 2:
  case 3:
      // version change
      if (view->proto_ver != wsrep_protocol_version)
      {
          // Keep clients out while the protocol switches, then restore the
          // previous wsrep_ready value.
          my_bool wsrep_ready_saved= wsrep_ready_get();
          wsrep_ready_set(FALSE);
          WSREP_INFO("closing client connections for "
                     "protocol change %ld -> %d",
                     wsrep_protocol_version, view->proto_ver);
          wsrep_close_client_connections(TRUE);
          wsrep_protocol_version= view->proto_ver;
          wsrep_ready_set(wsrep_ready_saved);
      }
      break;
  default:
      // Unknown application protocol: cannot continue.
      WSREP_ERROR("Unsupported application protocol version: %d",
                  view->proto_ver);
      unireg_abort(1);
  }

  if (view->state_gap)
  {
    WSREP_WARN("Gap in state sequence. Need state transfer.");

    /* After that wsrep will call wsrep_sst_prepare. */
    /* keep ready flag 0 until we receive the snapshot */
    wsrep_ready_set(FALSE);

    /* Close client connections to ensure that they don't interfere
     * with SST. Necessary only if storage engines are initialized
     * before SST.
     * TODO: Just killing all ongoing transactions should be enough
     * since wsrep_ready is OFF and no new transactions can start.
     */
    if (!wsrep_before_SE())
    {
        WSREP_DEBUG("[debug]: closing client connections for PRIM");
        wsrep_close_client_connections(FALSE);
    }

    // A negative result is a negated errno value.
    ssize_t const req_len= wsrep_sst_prepare (sst_req);

    if (req_len < 0)
    {
      WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
                  strerror(-req_len));
      memb_status= WSREP_MEMBER_UNDEFINED;
    }
    else
    {
      // NOTE(review): this checks the out-parameter pointer itself, which
      // was already dereferenced at function entry; the intent may have been
      // *sst_req — confirm.
      assert(sst_req != NULL);
      *sst_req_len= req_len;
      memb_status= WSREP_MEMBER_JOINER;
    }
  }
  else
  {
    /*
     *  NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
     *  before - OR - it was reinitialized on startup (lp:992840)
     */
    if (wsrep_startup)
    {
      if (wsrep_before_SE())
      {
        wsrep_SE_init_grab();
        // Signal mysqld init thread to continue
        wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
        // and wait for SE initialization
        wsrep_SE_init_wait();
      }
      else
      {
        local_uuid=  cluster_uuid;
        local_seqno= view->state_id.seqno;
      }
      /* Init storage engine XIDs from first view */
      wsrep_set_SE_checkpoint(local_uuid, local_seqno);
#ifdef GTID_SUPPORT
      wsrep_init_sidno(local_uuid);
#endif /* GTID_SUPPORT */
      memb_status= WSREP_MEMBER_JOINED;
    }

    // just some sanity check
    // Without a reported gap the local state must belong to this cluster.
    if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
    {
      WSREP_ERROR("Undetected state gap. Can't continue.");
      wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
                       &local_uuid, -1);
      unireg_abort(1);
    }
  }

  // Give each member a distinct auto-increment offset within the cluster.
  if (wsrep_auto_increment_control)
  {
    global_system_variables.auto_increment_offset= view->my_idx + 1;
    global_system_variables.auto_increment_increment= view->memb_num;
  }

  { /* capabilities may be updated on new configuration */
    uint64_t const caps(wsrep->capabilities (wsrep));

    my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
    if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
    {
      WSREP_WARN("Unsupported protocol downgrade: "
                 "incremental data collection disabled. Expect abort.");
    }
    wsrep_incremental_data_collection = idc;
  }

out:
  // The startup phase ends with the first primary view.
  if (view->status == WSREP_VIEW_PRIMARY) wsrep_startup= FALSE;
  wsrep_config_state->set(memb_status, view);

  return WSREP_CB_SUCCESS;
}
487
wsrep_ready_set(my_bool x)488 my_bool wsrep_ready_set (my_bool x)
489 {
490 WSREP_DEBUG("Setting wsrep_ready to %d", x);
491 if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
492 my_bool ret= (wsrep_ready != x);
493 if (ret)
494 {
495 wsrep_ready= x;
496 mysql_cond_signal (&COND_wsrep_ready);
497 }
498 mysql_mutex_unlock (&LOCK_wsrep_ready);
499 return ret;
500 }
501
wsrep_ready_get(void)502 my_bool wsrep_ready_get (void)
503 {
504 if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
505 my_bool ret= wsrep_ready;
506 mysql_mutex_unlock (&LOCK_wsrep_ready);
507 return ret;
508 }
509
wsrep_show_ready(THD * thd,SHOW_VAR * var,char * buff)510 int wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff)
511 {
512 var->type= SHOW_MY_BOOL;
513 var->value= buff;
514 *((my_bool *)buff)= wsrep_ready_get();
515 return 0;
516 }
517
518 // Wait until wsrep has reached ready state
wsrep_ready_wait()519 void wsrep_ready_wait ()
520 {
521 if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
522 while (!wsrep_ready)
523 {
524 WSREP_INFO("Waiting to reach ready state");
525 mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready);
526 }
527 WSREP_INFO("ready state reached");
528 mysql_mutex_unlock (&LOCK_wsrep_ready);
529 }
530
wsrep_synced_cb(void * app_ctx)531 static void wsrep_synced_cb(void* app_ctx)
532 {
533 WSREP_INFO("Synchronized with group, ready for connections");
534 my_bool signal_main= wsrep_ready_set(TRUE);
535 wsrep_config_state->set(WSREP_MEMBER_SYNCED);
536
537 if (signal_main)
538 {
539 wsrep_SE_init_grab();
540 // Signal mysqld init thread to continue
541 wsrep_sst_complete (&local_uuid, local_seqno, false);
542 // and wait for SE initialization
543 wsrep_SE_init_wait();
544 }
545 if (wsrep_restart_slave_activated)
546 {
547 int rcode;
548 WSREP_INFO("MariaDB slave restart");
549 wsrep_restart_slave_activated= FALSE;
550
551 mysql_mutex_lock(&LOCK_active_mi);
552 if ((rcode = start_slave_threads(0,
553 1 /* need mutex */,
554 0 /* no wait for start*/,
555 active_mi,
556 master_info_file,
557 relay_log_info_file,
558 SLAVE_SQL)))
559 {
560 WSREP_WARN("Failed to create slave threads: %d", rcode);
561 }
562 mysql_mutex_unlock(&LOCK_active_mi);
563
564 }
565 }
566
wsrep_init_position()567 static void wsrep_init_position()
568 {
569 /* read XIDs from storage engines */
570 wsrep_uuid_t uuid;
571 wsrep_seqno_t seqno;
572 wsrep_get_SE_checkpoint(uuid, seqno);
573
574 if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)))
575 {
576 WSREP_INFO("Read nil XID from storage engines, skipping position init");
577 return;
578 }
579
580 char uuid_str[40] = {0, };
581 wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
582 WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno);
583
584 if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) &&
585 local_seqno == WSREP_SEQNO_UNDEFINED)
586 {
587 // Initial state
588 local_uuid= uuid;
589 local_seqno= seqno;
590 }
591 else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) ||
592 local_seqno != seqno)
593 {
594 WSREP_WARN("Initial position was provided by configuration or SST, "
595 "avoiding override");
596 }
597 }
598
599 extern char* my_bind_addr_str;
600
wsrep_init()601 int wsrep_init()
602 {
603 int rcode= -1;
604 DBUG_ASSERT(wsrep_inited == 0);
605
606 if (strcmp(wsrep_start_position, WSREP_START_POSITION_ZERO) &&
607 wsrep_start_position_init(wsrep_start_position))
608 {
609 return 1;
610 }
611
612 wsrep_sst_auth_init();
613
614 wsrep_ready_set(FALSE);
615 assert(wsrep_provider);
616
617 wsrep_init_position();
618
619 if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK)
620 {
621 if (strcasecmp(wsrep_provider, WSREP_NONE))
622 {
623 WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.",
624 wsrep_provider, strerror(rcode), rcode);
625 strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack
626 return wsrep_init();
627 }
628 else /* this is for recursive call above */
629 {
630 WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.",
631 strerror(rcode), rcode);
632 unireg_abort(1);
633 }
634 }
635
636 if (!WSREP_PROVIDER_EXISTS)
637 {
638 // enable normal operation in case no provider is specified
639 wsrep_ready_set(TRUE);
640 global_system_variables.wsrep_on = 0;
641 wsrep_init_args args;
642 args.logger_cb = wsrep_log_cb;
643 args.options = (wsrep_provider_options) ?
644 wsrep_provider_options : "";
645 rcode = wsrep->init(wsrep, &args);
646 if (rcode)
647 {
648 DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
649 WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
650 wsrep_ready_set(FALSE);
651 wsrep->free(wsrep);
652 free(wsrep);
653 wsrep = NULL;
654 }
655 else
656 {
657 wsrep_inited= 1;
658 }
659 return rcode;
660 }
661 else
662 {
663 global_system_variables.wsrep_on = 1;
664 strncpy(provider_name,
665 wsrep->provider_name, sizeof(provider_name) - 1);
666 strncpy(provider_version,
667 wsrep->provider_version, sizeof(provider_version) - 1);
668 strncpy(provider_vendor,
669 wsrep->provider_vendor, sizeof(provider_vendor) - 1);
670 }
671
672 if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
673 wsrep_data_home_dir = mysql_real_data_home;
674
675 /* Initialize node address */
676 char node_addr[512]= { 0, };
677 size_t const node_addr_max= sizeof(node_addr) - 1;
678 if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
679 {
680 size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
681 if (!(ret > 0 && ret < node_addr_max))
682 {
683 WSREP_WARN("Failed to guess base node address. Set it explicitly via "
684 "wsrep_node_address.");
685 node_addr[0]= '\0';
686 }
687 }
688 else
689 {
690 strncpy(node_addr, wsrep_node_address, node_addr_max);
691 }
692
693 /* Initialize node's incoming address */
694 char inc_addr[512]= { 0, };
695 size_t const inc_addr_max= sizeof (inc_addr);
696
697 /*
698 In case wsrep_node_incoming_address is either not set or set to AUTO,
699 we need to use mysqld's my_bind_addr_str:mysqld_port, lastly fallback
700 to wsrep_node_address' value if mysqld's bind-address is not set either.
701 */
702 if ((!wsrep_node_incoming_address ||
703 !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
704 {
705 bool is_ipv6= false;
706 unsigned int my_bind_ip= INADDR_ANY; // default if not set
707
708 if (my_bind_addr_str && strlen(my_bind_addr_str))
709 {
710 my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6);
711 }
712
713 if (INADDR_ANY != my_bind_ip)
714 {
715 /*
716 If its a not a valid address, leave inc_addr as empty string. mysqld
717 is not listening for client connections on network interfaces.
718 */
719 if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
720 {
721 const char *fmt= (is_ipv6) ? "[%s]:%u" : "%s:%u";
722 snprintf(inc_addr, inc_addr_max, fmt, my_bind_addr_str, mysqld_port);
723 }
724 }
725 else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */
726 {
727 size_t const node_addr_len= strlen(node_addr);
728 if (node_addr_len > 0)
729 {
730 wsp::Address addr(node_addr);
731
732 if (!addr.is_valid())
733 {
734 WSREP_DEBUG("Could not parse node address : %s", node_addr);
735 WSREP_WARN("Guessing address for incoming client connections failed. "
736 "Try setting wsrep_node_incoming_address explicitly.");
737 goto done;
738 }
739
740 const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
741 snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(),
742 (int) mysqld_port);
743 }
744 }
745 }
746 else
747 {
748 wsp::Address addr(wsrep_node_incoming_address);
749
750 if (!addr.is_valid())
751 {
752 WSREP_WARN("Could not parse wsrep_node_incoming_address : %s",
753 wsrep_node_incoming_address);
754 goto done;
755 }
756
757 /*
758 In case port is not specified in wsrep_node_incoming_address, we use
759 mysqld_port.
760 */
761 int port= (addr.get_port() > 0) ? addr.get_port() : (int) mysqld_port;
762 const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
763
764 snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port);
765 }
766
767 done:
768 struct wsrep_init_args wsrep_args;
769
770 struct wsrep_gtid const state_id = { local_uuid, local_seqno };
771
772 wsrep_args.data_dir = wsrep_data_home_dir;
773 wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
774 wsrep_args.node_address = node_addr;
775 wsrep_args.node_incoming = inc_addr;
776 wsrep_args.options = (wsrep_provider_options) ?
777 wsrep_provider_options : "";
778 wsrep_args.proto_ver = wsrep_max_protocol_version;
779
780 wsrep_args.state_id = &state_id;
781
782 wsrep_args.logger_cb = wsrep_log_cb;
783 wsrep_args.view_handler_cb = wsrep_view_handler_cb;
784 wsrep_args.apply_cb = wsrep_apply_cb;
785 wsrep_args.commit_cb = wsrep_commit_cb;
786 wsrep_args.unordered_cb = wsrep_unordered_cb;
787 wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
788 wsrep_args.synced_cb = wsrep_synced_cb;
789
790 rcode = wsrep->init(wsrep, &wsrep_args);
791
792 if (rcode)
793 {
794 DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
795 WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
796 wsrep->free(wsrep);
797 free(wsrep);
798 wsrep = NULL;
799 } else {
800 wsrep_inited= 1;
801 }
802
803 return rcode;
804 }
805
806
/*
  Initialize wsrep thread LOCKs and CONDs.

  Also allocates wsrep_config_state. When built with the performance-schema
  interface, it registers the wsrep mutex, cond, file and thread classes
  before the objects are created, so the keys passed to mysql_*_init() are
  valid. Counterpart: wsrep_thr_deinit().
*/
void wsrep_thr_init()
{
  DBUG_ENTER("wsrep_thr_init");
  wsrep_config_state = new wsp::Config_state;
#ifdef HAVE_PSI_INTERFACE
  mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes));
  mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds));
  mysql_file_register("sql", wsrep_files, array_elements(wsrep_files));
  mysql_thread_register("sql", wsrep_threads, array_elements(wsrep_threads));
#endif

  mysql_mutex_init(key_LOCK_wsrep_ready, &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
  mysql_mutex_init(key_LOCK_wsrep_sst, &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
  mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
  mysql_mutex_init(key_LOCK_wsrep_rollback, &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
  mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
  mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
  mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
  mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);

  DBUG_VOID_RETURN;
}
835
836 /* This is wrapper for wsrep_break_lock in thr_lock.c */
wsrep_thr_abort_thd(void * bf_thd_ptr,void * victim_thd_ptr,my_bool signal)837 static int wsrep_thr_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
838 {
839 THD* victim_thd= (THD *) victim_thd_ptr;
840 /* We need to lock THD::LOCK_thd_data to protect victim
841 from concurrent usage or disconnect or delete. */
842 wsrep_thd_LOCK(victim_thd);
843 int res= wsrep_abort_thd(bf_thd_ptr, victim_thd_ptr, signal);
844 return res;
845 }
846
847
wsrep_init_startup(bool first)848 void wsrep_init_startup (bool first)
849 {
850 if (wsrep_init()) unireg_abort(1);
851
852 wsrep_thr_lock_init(
853 (wsrep_thd_is_brute_force_fun)wsrep_thd_is_BF,
854 (wsrep_abort_thd_fun)wsrep_thr_abort_thd,
855 wsrep_debug, wsrep_convert_LOCK_to_trx,
856 (wsrep_on_fun)wsrep_on);
857
858 /*
859 Pre-initialize global_system_variables.table_plugin with a dummy engine
860 (placeholder) required during the initialization of wsrep threads (THDs).
861 (see: plugin_thdvar_init())
862 Note: This only needs to be done for rsync & mariabackup based SST methods.
863 In case of mysqldump SST method, the wsrep threads are created after the
864 server plugins & global system variables are initialized.
865 */
866 if (wsrep_before_SE())
867 wsrep_plugins_pre_init();
868
869 /* Skip replication start if dummy wsrep provider is loaded */
870 if (!strcmp(wsrep_provider, WSREP_NONE)) return;
871
872 /* Skip replication start if no cluster address */
873 if (!wsrep_cluster_address || wsrep_cluster_address[0] == 0) return;
874
875 if (first) wsrep_sst_grab(); // do it so we can wait for SST below
876
877 if (!wsrep_start_replication()) unireg_abort(1);
878
879 wsrep_create_rollbacker();
880 wsrep_create_appliers(1);
881
882 if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
883 }
884
885
wsrep_deinit(bool free_options)886 void wsrep_deinit(bool free_options)
887 {
888 DBUG_ASSERT(wsrep_inited == 1);
889 wsrep_unload(wsrep);
890 wsrep= 0;
891 provider_name[0]= '\0';
892 provider_version[0]= '\0';
893 provider_vendor[0]= '\0';
894
895 wsrep_inited= 0;
896
897 if (free_options)
898 {
899 wsrep_sst_auth_free();
900 }
901 }
902
903 /* Destroy wsrep thread LOCKs and CONDs */
wsrep_thr_deinit()904 void wsrep_thr_deinit()
905 {
906 if (!wsrep_config_state)
907 return; // Never initialized
908 mysql_mutex_destroy(&LOCK_wsrep_ready);
909 mysql_cond_destroy(&COND_wsrep_ready);
910 mysql_mutex_destroy(&LOCK_wsrep_sst);
911 mysql_cond_destroy(&COND_wsrep_sst);
912 mysql_mutex_destroy(&LOCK_wsrep_sst_init);
913 mysql_cond_destroy(&COND_wsrep_sst_init);
914 mysql_mutex_destroy(&LOCK_wsrep_rollback);
915 mysql_cond_destroy(&COND_wsrep_rollback);
916 mysql_mutex_destroy(&LOCK_wsrep_replaying);
917 mysql_cond_destroy(&COND_wsrep_replaying);
918 mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
919 mysql_mutex_destroy(&LOCK_wsrep_desync);
920 mysql_mutex_destroy(&LOCK_wsrep_config_state);
921 delete wsrep_config_state;
922 wsrep_config_state= 0; // Safety
923 }
924
wsrep_recover()925 void wsrep_recover()
926 {
927 char uuid_str[40];
928
929 if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) &&
930 local_seqno == -2)
931 {
932 wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
933 WSREP_INFO("Position %s:%lld given at startup, skipping position recovery",
934 uuid_str, (long long)local_seqno);
935 return;
936 }
937 wsrep_uuid_t uuid;
938 wsrep_seqno_t seqno;
939 wsrep_get_SE_checkpoint(uuid, seqno);
940 wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
941 WSREP_INFO("Recovered position: %s:%lld", uuid_str, (long long)seqno);
942 }
943
944
wsrep_stop_replication(THD * thd)945 void wsrep_stop_replication(THD *thd)
946 {
947 WSREP_INFO("Stop replication");
948 if (!wsrep)
949 {
950 WSREP_INFO("Provider was not loaded, in stop replication");
951 return;
952 }
953
954 /* disconnect from group first to get wsrep_ready == FALSE */
955 WSREP_DEBUG("Provider disconnect");
956 wsrep->disconnect(wsrep);
957
958 wsrep_connected= FALSE;
959
960 wsrep_close_client_connections(TRUE);
961
962 /* wait until appliers have stopped */
963 wsrep_wait_appliers_close(thd);
964
965 return;
966 }
967
wsrep_start_replication()968 bool wsrep_start_replication()
969 {
970 wsrep_status_t rcode;
971
972 /* wsrep provider must be loaded. */
973 DBUG_ASSERT(wsrep);
974
975 /*
976 if provider is trivial, don't even try to connect,
977 but resume local node operation
978 */
979 if (!WSREP_PROVIDER_EXISTS)
980 {
981 // enable normal operation in case no provider is specified
982 wsrep_ready_set(TRUE);
983 return true;
984 }
985
986 if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
987 {
988 // if provider is non-trivial, but no address is specified, wait for address
989 wsrep_ready_set(FALSE);
990 return true;
991 }
992
993 bool const bootstrap= wsrep_new_cluster;
994
995 WSREP_INFO("Start replication");
996
997 if (wsrep_new_cluster)
998 {
999 WSREP_INFO("'wsrep-new-cluster' option used, bootstrapping the cluster");
1000 wsrep_new_cluster= false;
1001 }
1002
1003 if ((rcode = wsrep->connect(wsrep,
1004 wsrep_cluster_name,
1005 wsrep_cluster_address,
1006 wsrep_sst_donor,
1007 bootstrap)))
1008 {
1009 DBUG_PRINT("wsrep",("wsrep->connect(%s) failed: %d",
1010 wsrep_cluster_address, rcode));
1011 WSREP_ERROR("wsrep::connect(%s) failed: %d",
1012 wsrep_cluster_address, rcode);
1013 return false;
1014 }
1015 else
1016 {
1017 wsrep_connected= TRUE;
1018
1019 char* opts= wsrep->options_get(wsrep);
1020 if (opts)
1021 {
1022 wsrep_provider_options_init(opts);
1023 free(opts);
1024 }
1025 else
1026 {
1027 WSREP_WARN("Failed to get wsrep options");
1028 }
1029 }
1030
1031 return true;
1032 }
1033
wsrep_must_sync_wait(THD * thd,uint mask)1034 bool wsrep_must_sync_wait (THD* thd, uint mask)
1035 {
1036 return (thd->variables.wsrep_sync_wait & mask) &&
1037 thd->variables.wsrep_on &&
1038 !(thd->variables.wsrep_dirty_reads &&
1039 !is_update_query(thd->lex->sql_command)) &&
1040 !thd->in_active_multi_stmt_transaction() &&
1041 thd->wsrep_conflict_state != REPLAYING &&
1042 thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED;
1043 }
1044
wsrep_sync_wait(THD * thd,uint mask)1045 bool wsrep_sync_wait (THD* thd, uint mask)
1046 {
1047 if (wsrep_must_sync_wait(thd, mask))
1048 {
1049 WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u",
1050 thd->variables.wsrep_sync_wait, mask);
1051 // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
1052 // TODO: modify to check if thd has locked any rows.
1053 wsrep_status_t ret= wsrep->causal_read (wsrep, &thd->wsrep_sync_wait_gtid);
1054
1055 if (unlikely(WSREP_OK != ret))
1056 {
1057 const char* msg;
1058 int err;
1059
1060 // Possibly relevant error codes:
1061 // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
1062 // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
1063 // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
1064
1065 switch (ret)
1066 {
1067 case WSREP_NOT_IMPLEMENTED:
1068 msg= "synchronous reads by wsrep backend. "
1069 "Please unset wsrep_causal_reads variable.";
1070 err= ER_NOT_SUPPORTED_YET;
1071 break;
1072 default:
1073 msg= "Synchronous wait failed.";
1074 err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
1075 // with ER_LOCK_WAIT_TIMEOUT
1076 }
1077
1078 my_error(err, MYF(0), msg);
1079
1080 return true;
1081 }
1082 }
1083
1084 return false;
1085 }
1086
wsrep_keys_free(wsrep_key_arr_t * key_arr)1087 void wsrep_keys_free(wsrep_key_arr_t* key_arr)
1088 {
1089 for (size_t i= 0; i < key_arr->keys_len; ++i)
1090 {
1091 my_free((void*)key_arr->keys[i].key_parts);
1092 }
1093 my_free(key_arr->keys);
1094 key_arr->keys= 0;
1095 key_arr->keys_len= 0;
1096 }
1097
1098
1099 /*!
1100 * @param db Database string
1101 * @param table Table string
1102 * @param key Array of wsrep_key_t
1103 * @param key_len In: number of elements in key array, Out: number of
1104 * elements populated
1105 *
1106 * @return true if preparation was successful, otherwise false.
1107 */
1108
wsrep_prepare_key_for_isolation(const char * db,const char * table,wsrep_buf_t * key,size_t * key_len)1109 static bool wsrep_prepare_key_for_isolation(const char* db,
1110 const char* table,
1111 wsrep_buf_t* key,
1112 size_t* key_len)
1113 {
1114 if (*key_len < 2) return false;
1115
1116 switch (wsrep_protocol_version)
1117 {
1118 case 0:
1119 *key_len= 0;
1120 break;
1121 case 1:
1122 case 2:
1123 case 3:
1124 {
1125 *key_len= 0;
1126 if (db)
1127 {
1128 // sql_print_information("%s.%s", db, table);
1129 key[*key_len].ptr= db;
1130 key[*key_len].len= strlen(db);
1131 ++(*key_len);
1132 if (table)
1133 {
1134 key[*key_len].ptr= table;
1135 key[*key_len].len= strlen(table);
1136 ++(*key_len);
1137 }
1138 }
1139 break;
1140 }
1141 default:
1142 return false;
1143 }
1144 return true;
1145 }
1146
1147
wsrep_prepare_key_for_isolation(const char * db,const char * table,wsrep_key_arr_t * ka)1148 static bool wsrep_prepare_key_for_isolation(const char* db,
1149 const char* table,
1150 wsrep_key_arr_t* ka)
1151 {
1152 wsrep_key_t* tmp;
1153
1154 if (!ka->keys)
1155 tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t),
1156 MYF(0));
1157 else
1158 tmp= (wsrep_key_t*)my_realloc(ka->keys,
1159 (ka->keys_len + 1) * sizeof(wsrep_key_t),
1160 MYF(0));
1161
1162 if (!tmp)
1163 {
1164 WSREP_ERROR("Can't allocate memory for key_array");
1165 return false;
1166 }
1167 ka->keys= tmp;
1168 if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
1169 my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
1170 {
1171 WSREP_ERROR("Can't allocate memory for key_parts");
1172 return false;
1173 }
1174 ka->keys[ka->keys_len].key_parts_num= 2;
1175 ++ka->keys_len;
1176 if (!wsrep_prepare_key_for_isolation(db, table,
1177 (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
1178 &ka->keys[ka->keys_len - 1].key_parts_num))
1179 {
1180 WSREP_ERROR("Preparing keys for isolation failed");
1181 return false;
1182 }
1183
1184 return true;
1185 }
1186
1187
wsrep_prepare_keys_for_alter_add_fk(const char * child_table_db,Alter_info * alter_info,wsrep_key_arr_t * ka)1188 static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
1189 Alter_info* alter_info,
1190 wsrep_key_arr_t* ka)
1191 {
1192 Key *key;
1193 List_iterator<Key> key_iterator(alter_info->key_list);
1194 while ((key= key_iterator++))
1195 {
1196 if (key->type == Key::FOREIGN_KEY)
1197 {
1198 Foreign_key *fk_key= (Foreign_key *)key;
1199 const char *db_name= fk_key->ref_db.str;
1200 const char *table_name= fk_key->ref_table.str;
1201 if (!db_name)
1202 {
1203 db_name= child_table_db;
1204 }
1205 if (!wsrep_prepare_key_for_isolation(db_name, table_name, ka))
1206 {
1207 return false;
1208 }
1209 }
1210 }
1211 return true;
1212 }
1213
1214
wsrep_prepare_keys_for_isolation(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep_key_arr_t * ka)1215 static bool wsrep_prepare_keys_for_isolation(THD* thd,
1216 const char* db,
1217 const char* table,
1218 const TABLE_LIST* table_list,
1219 Alter_info* alter_info,
1220 wsrep_key_arr_t* ka)
1221 {
1222 ka->keys= 0;
1223 ka->keys_len= 0;
1224
1225 if (db || table)
1226 {
1227 if (!wsrep_prepare_key_for_isolation(db, table, ka))
1228 goto err;
1229 }
1230
1231 for (const TABLE_LIST* table= table_list; table; table= table->next_global)
1232 {
1233 if (!wsrep_prepare_key_for_isolation(table->db.str, table->table_name.str, ka))
1234 goto err;
1235 }
1236
1237 if (alter_info && (alter_info->flags & (ALTER_ADD_FOREIGN_KEY)))
1238 {
1239 if (!wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info, ka))
1240 goto err;
1241 }
1242
1243 return false;
1244
1245 err:
1246 wsrep_keys_free(ka);
1247 return true;
1248 }
1249
1250
1251 /* Prepare key list from db/table and table_list */
wsrep_prepare_keys_for_isolation(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,wsrep_key_arr_t * ka)1252 bool wsrep_prepare_keys_for_isolation(THD* thd,
1253 const char* db,
1254 const char* table,
1255 const TABLE_LIST* table_list,
1256 wsrep_key_arr_t* ka)
1257 {
1258 return wsrep_prepare_keys_for_isolation(thd, db, table, table_list, NULL, ka);
1259 }
1260
1261
wsrep_prepare_key(const uchar * cache_key,size_t cache_key_len,const uchar * row_id,size_t row_id_len,wsrep_buf_t * key,size_t * key_len)1262 bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len,
1263 const uchar* row_id, size_t row_id_len,
1264 wsrep_buf_t* key, size_t* key_len)
1265 {
1266 if (*key_len < 3) return false;
1267
1268 *key_len= 0;
1269 switch (wsrep_protocol_version)
1270 {
1271 case 0:
1272 {
1273 key[0].ptr = cache_key;
1274 key[0].len = cache_key_len;
1275
1276 *key_len = 1;
1277 break;
1278 }
1279 case 1:
1280 case 2:
1281 case 3:
1282 {
1283 key[0].ptr = cache_key;
1284 key[0].len = strlen( (char*)cache_key );
1285
1286 key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
1287 key[1].len = strlen( (char*)(key[1].ptr) );
1288
1289 *key_len = 2;
1290 break;
1291 }
1292 default:
1293 return false;
1294 }
1295
1296 key[*key_len].ptr = row_id;
1297 key[*key_len].len = row_id_len;
1298 ++(*key_len);
1299
1300 return true;
1301 }
1302
1303
1304 /*
1305 * Construct Query_log_Event from thd query and serialize it
1306 * into buffer.
1307 *
1308 * Return 0 in case of success, 1 in case of error.
1309 */
wsrep_to_buf_helper(THD * thd,const char * query,uint query_len,uchar ** buf,size_t * buf_len)1310 int wsrep_to_buf_helper(
1311 THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
1312 {
1313 IO_CACHE tmp_io_cache;
1314 Log_event_writer writer(&tmp_io_cache,0);
1315 if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
1316 65536, MYF(MY_WME)))
1317 return 1;
1318 int ret(0);
1319 enum enum_binlog_checksum_alg current_binlog_check_alg=
1320 (enum_binlog_checksum_alg) binlog_checksum_options;
1321
1322 Format_description_log_event *tmp_fd= new Format_description_log_event(4);
1323 tmp_fd->checksum_alg= current_binlog_check_alg;
1324 writer.write(tmp_fd);
1325 delete tmp_fd;
1326
1327 #ifdef GTID_SUPPORT
1328 if (thd->variables.gtid_next.type == GTID_GROUP)
1329 {
1330 Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
1331 if (!gtid_ev.is_valid()) ret= 0;
1332 if (!ret && writer.write(>id_ev)) ret= 1;
1333 }
1334 #endif /* GTID_SUPPORT */
1335 if (wsrep_gtid_mode && thd->variables.gtid_seq_no)
1336 {
1337 Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no,
1338 thd->variables.gtid_domain_id,
1339 true, LOG_EVENT_SUPPRESS_USE_F,
1340 true, 0);
1341 gtid_event.server_id= thd->variables.server_id;
1342 if (!gtid_event.is_valid()) ret= 0;
1343 ret= writer.write(>id_event);
1344 }
1345
1346 /* if there is prepare query, add event for it */
1347 if (!ret && thd->wsrep_TOI_pre_query)
1348 {
1349 Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
1350 thd->wsrep_TOI_pre_query_len,
1351 FALSE, FALSE, FALSE, 0);
1352 ev.checksum_alg= current_binlog_check_alg;
1353 if (writer.write(&ev)) ret= 1;
1354 }
1355
1356 /* continue to append the actual query */
1357 Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
1358 ev.checksum_alg= current_binlog_check_alg;
1359 if (!ret && writer.write(&ev)) ret= 1;
1360 if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
1361 close_cached_file(&tmp_io_cache);
1362 return ret;
1363 }
1364
1365 static int
wsrep_alter_query_string(THD * thd,String * buf)1366 wsrep_alter_query_string(THD *thd, String *buf)
1367 {
1368 /* Append the "ALTER" part of the query */
1369 if (buf->append(STRING_WITH_LEN("ALTER ")))
1370 return 1;
1371 /* Append definer */
1372 append_definer(thd, buf, &(thd->lex->definer->user), &(thd->lex->definer->host));
1373 /* Append the left part of thd->query after event name part */
1374 if (buf->append(thd->lex->stmt_definition_begin,
1375 thd->lex->stmt_definition_end -
1376 thd->lex->stmt_definition_begin))
1377 return 1;
1378
1379 return 0;
1380 }
1381
wsrep_alter_event_query(THD * thd,uchar ** buf,size_t * buf_len)1382 static int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len)
1383 {
1384 String log_query;
1385
1386 if (wsrep_alter_query_string(thd, &log_query))
1387 {
1388 WSREP_WARN("events alter string failed: schema: %s, query: %s",
1389 thd->get_db(), thd->query());
1390 return 1;
1391 }
1392 return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
1393 }
1394
1395 #include "sql_show.h"
1396 static int
create_view_query(THD * thd,uchar ** buf,size_t * buf_len)1397 create_view_query(THD *thd, uchar** buf, size_t* buf_len)
1398 {
1399 LEX *lex= thd->lex;
1400 SELECT_LEX *select_lex= &lex->select_lex;
1401 TABLE_LIST *first_table= select_lex->table_list.first;
1402 TABLE_LIST *views = first_table;
1403 LEX_USER *definer;
1404 String buff;
1405 const LEX_CSTRING command[3]=
1406 {{ STRING_WITH_LEN("CREATE ") },
1407 { STRING_WITH_LEN("ALTER ") },
1408 { STRING_WITH_LEN("CREATE OR REPLACE ") }};
1409
1410 buff.append(&command[thd->lex->create_view->mode]);
1411
1412 if (lex->definer)
1413 definer= get_current_user(thd, lex->definer);
1414 else
1415 {
1416 /*
1417 DEFINER-clause is missing; we have to create default definer in
1418 persistent arena to be PS/SP friendly.
1419 If this is an ALTER VIEW then the current user should be set as
1420 the definer.
1421 */
1422 definer= create_default_definer(thd, false);
1423 }
1424
1425 if (definer)
1426 {
1427 views->definer.user = definer->user;
1428 views->definer.host = definer->host;
1429 } else {
1430 WSREP_ERROR("Failed to get DEFINER for VIEW.");
1431 return 1;
1432 }
1433
1434 views->algorithm = lex->create_view->algorithm;
1435 views->view_suid = lex->create_view->suid;
1436 views->with_check = lex->create_view->check;
1437
1438 view_store_options(thd, views, &buff);
1439 buff.append(STRING_WITH_LEN("VIEW "));
1440 /* Test if user supplied a db (ie: we did not use thd->db) */
1441 if (views->db.str && views->db.str[0] &&
1442 (thd->db.str == NULL || cmp(&views->db, &thd->db)))
1443 {
1444 append_identifier(thd, &buff, &views->db);
1445 buff.append('.');
1446 }
1447 append_identifier(thd, &buff, &views->table_name);
1448 if (lex->view_list.elements)
1449 {
1450 List_iterator_fast<LEX_CSTRING> names(lex->view_list);
1451 LEX_CSTRING *name;
1452 int i;
1453
1454 for (i= 0; (name= names++); i++)
1455 {
1456 buff.append(i ? ", " : "(");
1457 append_identifier(thd, &buff, name);
1458 }
1459 buff.append(')');
1460 }
1461 buff.append(STRING_WITH_LEN(" AS "));
1462 //buff.append(views->source.str, views->source.length);
1463 buff.append(thd->lex->create_view->select.str,
1464 thd->lex->create_view->select.length);
1465 //int errcode= query_error_code(thd, TRUE);
1466 //if (thd->binlog_query(THD::STMT_QUERY_TYPE,
1467 // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
1468 return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
1469 }
1470
1471 /*
1472 Rewrite DROP TABLE for TOI. Temporary tables are eliminated from
1473 the query as they are visible only to client connection.
1474
1475 TODO: See comments for sql_base.cc:drop_temporary_table() and refine
1476 the function to deal with transactional locked tables.
1477 */
wsrep_drop_table_query(THD * thd,uchar ** buf,size_t * buf_len)1478 static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len)
1479 {
1480
1481 LEX* lex= thd->lex;
1482 SELECT_LEX* select_lex= &lex->select_lex;
1483 TABLE_LIST* first_table= select_lex->table_list.first;
1484 String buff;
1485
1486 DBUG_ASSERT(!lex->create_info.tmp_table());
1487
1488 bool found_temp_table= false;
1489 for (TABLE_LIST* table= first_table; table; table= table->next_global)
1490 {
1491 if (thd->find_temporary_table(table->db.str, table->table_name.str))
1492 {
1493 found_temp_table= true;
1494 break;
1495 }
1496 }
1497
1498 if (found_temp_table)
1499 {
1500 buff.append("DROP TABLE ");
1501 if (lex->check_exists)
1502 buff.append("IF EXISTS ");
1503
1504 for (TABLE_LIST* table= first_table; table; table= table->next_global)
1505 {
1506 if (!thd->find_temporary_table(table->db.str, table->table_name.str))
1507 {
1508 append_identifier(thd, &buff, table->db.str, table->db.length);
1509 buff.append(".");
1510 append_identifier(thd, &buff,
1511 table->table_name.str, table->table_name.length);
1512 buff.append(",");
1513 }
1514 }
1515
1516 /* Chop the last comma */
1517 buff.chop();
1518 buff.append(" /* generated by wsrep */");
1519
1520 WSREP_DEBUG("Rewrote '%s' as '%s'", thd->query(), buff.ptr());
1521
1522 return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
1523 }
1524 else
1525 {
1526 return wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1527 buf, buf_len);
1528 }
1529 }
1530
1531
/*
  Forward declarations of statement-rewriting helpers defined elsewhere
  in this file. wsrep_TOI_begin() below uses them to serialize
  CREATE PROCEDURE/FUNCTION and CREATE TRIGGER statements.
*/
static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len);
static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
1535
1536 /*
1537 Decide if statement should run in TOI.
1538
1539 Look if table or table_list contain temporary tables. If the
1540 statement affects only temporary tables, statement should not run
1541 in TOI. If the table list contains mix of regular and temporary tables
1542 (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but
1543 should be rewritten at later time for replication to contain only
1544 non-temporary tables.
1545 */
wsrep_can_run_in_toi(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list)1546 static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
1547 const TABLE_LIST *table_list)
1548 {
1549 DBUG_ASSERT(!table || db);
1550 DBUG_ASSERT(table_list || db);
1551
1552 LEX* lex= thd->lex;
1553 SELECT_LEX* select_lex= &lex->select_lex;
1554 TABLE_LIST* first_table= select_lex->table_list.first;
1555
1556 switch (lex->sql_command)
1557 {
1558 case SQLCOM_CREATE_TABLE:
1559 DBUG_ASSERT(!table_list);
1560 if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)
1561 {
1562 return false;
1563 }
1564 /*
1565 If mariadb master has replicated a CTAS, we should not replicate the create table
1566 part separately as TOI, but to replicate both create table and following inserts
1567 as one write set.
1568 Howver, if CTAS creates empty table, we should replicate the create table alone
1569 as TOI. We have to do relay log event lookup to see if row events follow the
1570 create table event.
1571 */
1572 if (thd->slave_thread && !(thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_STANDALONE))
1573 {
1574 /* this is CTAS, either empty or populated table */
1575 ulonglong event_size = 0;
1576 enum Log_event_type ev_type= wsrep_peak_event(thd->rgi_slave, &event_size);
1577 switch (ev_type)
1578 {
1579 case QUERY_EVENT:
1580 /* CTAS with empty table, we replicate create table as TOI */
1581 break;
1582
1583 case TABLE_MAP_EVENT:
1584 WSREP_DEBUG("replicating CTAS of empty table as TOI");
1585 // fall through
1586 case WRITE_ROWS_EVENT:
1587 /* CTAS with populated table, we replicate later at commit time */
1588 WSREP_DEBUG("skipping create table of CTAS replication");
1589 return false;
1590
1591 default:
1592 WSREP_WARN("unexpected async replication event: %d", ev_type);
1593 }
1594 return true;
1595 }
1596 /* no next async replication event */
1597 return true;
1598
1599 case SQLCOM_CREATE_VIEW:
1600
1601 DBUG_ASSERT(!table_list);
1602 DBUG_ASSERT(first_table); /* First table is view name */
1603 /*
1604 If any of the remaining tables refer to temporary table error
1605 is returned to client, so TOI can be skipped
1606 */
1607 for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global)
1608 {
1609 if (thd->find_temporary_table(it))
1610 {
1611 return false;
1612 }
1613 }
1614 return true;
1615
1616 case SQLCOM_CREATE_TRIGGER:
1617
1618 DBUG_ASSERT(first_table);
1619
1620 if (thd->find_temporary_table(first_table))
1621 {
1622 return false;
1623 }
1624 return true;
1625
1626 case SQLCOM_DROP_TRIGGER:
1627 DBUG_ASSERT(table_list);
1628 if (thd->find_temporary_table(table_list))
1629 {
1630 return false;
1631 }
1632 return true;
1633
1634 default:
1635 if (table && !thd->find_temporary_table(db, table))
1636 {
1637 return true;
1638 }
1639
1640 if (table_list)
1641 {
1642 for (TABLE_LIST* table= first_table; table; table= table->next_global)
1643 {
1644 if (!thd->find_temporary_table(table->db.str, table->table_name.str))
1645 {
1646 return true;
1647 }
1648 }
1649 }
1650 return !(table || table_list);
1651 }
1652 }
1653
1654 /*
1655 returns:
1656 0: statement was replicated as TOI
1657 1: TOI replication was skipped
1658 -1: TOI replication failed
1659 */
wsrep_TOI_begin(THD * thd,const char * db_,const char * table_,const TABLE_LIST * table_list,Alter_info * alter_info)1660 static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_,
1661 const TABLE_LIST* table_list,
1662 Alter_info* alter_info)
1663 {
1664 wsrep_status_t ret(WSREP_WARNING);
1665 uchar* buf(0);
1666 size_t buf_len(0);
1667 int buf_err;
1668 int rc= 0;
1669
1670 if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
1671 {
1672 WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
1673 return 1;
1674 }
1675
1676 WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1677 thd->wsrep_exec_mode, wsrep_thd_query(thd));
1678
1679 switch (thd->lex->sql_command)
1680 {
1681 case SQLCOM_CREATE_VIEW:
1682 buf_err= create_view_query(thd, &buf, &buf_len);
1683 break;
1684 case SQLCOM_CREATE_PROCEDURE:
1685 case SQLCOM_CREATE_SPFUNCTION:
1686 buf_err= wsrep_create_sp(thd, &buf, &buf_len);
1687 break;
1688 case SQLCOM_CREATE_TRIGGER:
1689 buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len);
1690 break;
1691 case SQLCOM_CREATE_EVENT:
1692 buf_err= wsrep_create_event_query(thd, &buf, &buf_len);
1693 break;
1694 case SQLCOM_ALTER_EVENT:
1695 buf_err= wsrep_alter_event_query(thd, &buf, &buf_len);
1696 break;
1697 case SQLCOM_DROP_TABLE:
1698 buf_err= wsrep_drop_table_query(thd, &buf, &buf_len);
1699 break;
1700 case SQLCOM_KILL:
1701 WSREP_DEBUG("KILL as TOI: %s", thd->query());
1702 buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1703 &buf, &buf_len);
1704 break;
1705 case SQLCOM_CREATE_ROLE:
1706 if (sp_process_definer(thd))
1707 {
1708 WSREP_WARN("Failed to set CREATE ROLE definer for TOI.");
1709 }
1710 /* fallthrough */
1711 default:
1712 buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1713 &buf, &buf_len);
1714 break;
1715 }
1716
1717 wsrep_key_arr_t key_arr= {0, 0};
1718 struct wsrep_buf buff = { buf, buf_len };
1719 if (!buf_err &&
1720 !wsrep_prepare_keys_for_isolation(thd, db_, table_,
1721 table_list, alter_info, &key_arr) &&
1722 key_arr.keys_len > 0 &&
1723 WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
1724 key_arr.keys, key_arr.keys_len,
1725 &buff, 1,
1726 &thd->wsrep_trx_meta)))
1727 {
1728 thd->wsrep_exec_mode= TOTAL_ORDER;
1729 wsrep_to_isolation++;
1730 wsrep_keys_free(&key_arr);
1731 WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
1732 thd->wsrep_exec_mode);
1733 }
1734 else if (key_arr.keys_len > 0) {
1735 /* jump to error handler in mysql_execute_command() */
1736 WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. Check wsrep "
1737 "connection state and retry the query.",
1738 ret,
1739 thd->get_db(),
1740 (thd->query()) ? thd->query() : "void");
1741 my_message(ER_LOCK_DEADLOCK, "WSREP replication failed. Check "
1742 "your wsrep connection state and retry the query.", MYF(0));
1743 wsrep_keys_free(&key_arr);
1744 rc= -1;
1745 }
1746 else {
1747 /* non replicated DDL, affecting temporary tables only */
1748 WSREP_DEBUG("TO isolation skipped for: %d, sql: %s."
1749 "Only temporary tables affected.",
1750 ret, (thd->query()) ? thd->query() : "void");
1751 rc= 1;
1752 }
1753 if (buf) my_free(buf);
1754 return rc;
1755 }
1756
wsrep_TOI_end(THD * thd)1757 static void wsrep_TOI_end(THD *thd) {
1758 wsrep_status_t ret;
1759 wsrep_to_isolation--;
1760
1761 WSREP_DEBUG("TO END: %lld, %d: %s", (long long)wsrep_thd_trx_seqno(thd),
1762 thd->wsrep_exec_mode, wsrep_thd_query(thd));
1763
1764 wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid,
1765 thd->wsrep_trx_meta.gtid.seqno);
1766 WSREP_DEBUG("TO END: %lld, update seqno",
1767 (long long)wsrep_thd_trx_seqno(thd));
1768
1769 if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
1770 WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
1771 }
1772 else {
1773 WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s",
1774 ret,
1775 thd->get_db(),
1776 (thd->query()) ? thd->query() : "void");
1777 }
1778 }
1779
wsrep_RSU_begin(THD * thd,const char * db_,const char * table_)1780 static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_)
1781 {
1782 wsrep_status_t ret(WSREP_WARNING);
1783 WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1784 thd->wsrep_exec_mode, thd->query() );
1785
1786 ret = wsrep->desync(wsrep);
1787 if (ret != WSREP_OK)
1788 {
1789 WSREP_WARN("RSU desync failed %d for schema: %s, query: %s",
1790 ret, thd->get_db(), thd->query());
1791 my_error(ER_LOCK_DEADLOCK, MYF(0));
1792 return(ret);
1793 }
1794
1795 mysql_mutex_lock(&LOCK_wsrep_replaying);
1796 wsrep_replaying++;
1797 mysql_mutex_unlock(&LOCK_wsrep_replaying);
1798
1799 if (wsrep_wait_committing_connections_close(5000))
1800 {
1801 /* no can do, bail out from DDL */
1802 WSREP_WARN("RSU failed due to pending transactions, schema: %s, query %s",
1803 thd->get_db(), thd->query());
1804 mysql_mutex_lock(&LOCK_wsrep_replaying);
1805 wsrep_replaying--;
1806 mysql_mutex_unlock(&LOCK_wsrep_replaying);
1807
1808 ret = wsrep->resync(wsrep);
1809 if (ret != WSREP_OK)
1810 {
1811 WSREP_WARN("resync failed %d for schema: %s, query: %s",
1812 ret, thd->get_db(), thd->query());
1813 }
1814
1815 my_error(ER_LOCK_DEADLOCK, MYF(0));
1816 return(-1);
1817 }
1818
1819 wsrep_seqno_t seqno = wsrep->pause(wsrep);
1820 if (seqno == WSREP_SEQNO_UNDEFINED)
1821 {
1822 WSREP_WARN("pause failed %lld for schema: %s, query: %s", (long long)seqno,
1823 thd->get_db(), thd->query());
1824 return(1);
1825 }
1826 WSREP_DEBUG("paused at %lld", (long long)seqno);
1827 thd->variables.wsrep_on = 0;
1828 return 0;
1829 }
1830
wsrep_RSU_end(THD * thd)1831 static void wsrep_RSU_end(THD *thd)
1832 {
1833 wsrep_status_t ret(WSREP_WARNING);
1834 WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1835 thd->wsrep_exec_mode, thd->query() );
1836
1837
1838 mysql_mutex_lock(&LOCK_wsrep_replaying);
1839 wsrep_replaying--;
1840 mysql_mutex_unlock(&LOCK_wsrep_replaying);
1841
1842 ret = wsrep->resume(wsrep);
1843 if (ret != WSREP_OK)
1844 {
1845 WSREP_WARN("resume failed %d for schema: %s, query: %s", ret,
1846 thd->get_db(), thd->query());
1847 }
1848
1849 ret = wsrep->resync(wsrep);
1850 if (ret != WSREP_OK)
1851 {
1852 WSREP_WARN("resync failed %d for schema: %s, query: %s", ret,
1853 thd->get_db(), thd->query());
1854 return;
1855 }
1856
1857 thd->variables.wsrep_on = 1;
1858 }
1859
wsrep_to_isolation_begin(THD * thd,const char * db_,const char * table_,const TABLE_LIST * table_list,Alter_info * alter_info)1860 int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
1861 const TABLE_LIST* table_list,
1862 Alter_info* alter_info)
1863 {
1864 int ret= 0;
1865
1866 /*
1867 No isolation for applier or replaying threads.
1868 */
1869 if (thd->wsrep_exec_mode == REPL_RECV)
1870 return 0;
1871
1872 mysql_mutex_lock(&thd->LOCK_thd_data);
1873
1874 if (thd->wsrep_conflict_state == MUST_ABORT)
1875 {
1876 WSREP_INFO("thread: %lld schema: %s query: %s has been aborted due to multi-master conflict",
1877 (longlong) thd->thread_id, thd->get_db(), thd->query());
1878 mysql_mutex_unlock(&thd->LOCK_thd_data);
1879 return WSREP_TRX_FAIL;
1880 }
1881 mysql_mutex_unlock(&thd->LOCK_thd_data);
1882
1883 DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
1884 DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
1885
1886 if (thd->global_read_lock.can_acquire_protection())
1887 {
1888 WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %lld",
1889 thd->query(), (longlong) thd->thread_id);
1890 return -1;
1891 }
1892
1893 if (wsrep_debug && thd->mdl_context.has_locks())
1894 {
1895 WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lld",
1896 thd->query(), (longlong) thd->thread_id);
1897 }
1898
1899 /*
1900 It makes sense to set auto_increment_* to defaults in TOI operations.
1901 Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
1902 TOI statement and auto inc variables for wsrep replication is constructed
1903 there. Variables are reset back in THD::reset_for_next_command() before
1904 processing of next command.
1905 */
1906 if (wsrep_auto_increment_control)
1907 {
1908 thd->variables.auto_increment_offset = 1;
1909 thd->variables.auto_increment_increment = 1;
1910 }
1911
1912 if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
1913 {
1914 switch (thd->variables.wsrep_OSU_method) {
1915 case WSREP_OSU_TOI:
1916 ret= wsrep_TOI_begin(thd, db_, table_, table_list, alter_info);
1917 break;
1918 case WSREP_OSU_RSU:
1919 ret= wsrep_RSU_begin(thd, db_, table_);
1920 break;
1921 default:
1922 WSREP_ERROR("Unsupported OSU method: %lu",
1923 thd->variables.wsrep_OSU_method);
1924 ret= -1;
1925 break;
1926 }
1927 switch (ret) {
1928 case 0: thd->wsrep_exec_mode= TOTAL_ORDER; break;
1929 case 1:
1930 /* TOI replication skipped, treat as success */
1931 ret = 0;
1932 break;
1933 case -1:
1934 /* TOI replication failed, treat as error */
1935 break;
1936 }
1937 }
1938 return ret;
1939 }
1940
wsrep_to_isolation_end(THD * thd)1941 void wsrep_to_isolation_end(THD *thd)
1942 {
1943 if (thd->wsrep_exec_mode == TOTAL_ORDER)
1944 {
1945 switch(thd->variables.wsrep_OSU_method)
1946 {
1947 case WSREP_OSU_TOI: wsrep_TOI_end(thd); break;
1948 case WSREP_OSU_RSU: wsrep_RSU_end(thd); break;
1949 default:
1950 WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu",
1951 thd->variables.wsrep_OSU_method);
1952 break;
1953 }
1954 wsrep_cleanup_transaction(thd);
1955 }
1956 }
1957
/*
  Log an MDL conflict between a requesting THD (req) and the THD holding
  the granted lock (gra) via WSREP_<severity>.

  schema/schema_len are printed with "%.*s". For each THD the line shows
  the thread id, wsrep seqno, wsrep exec mode, query state, conflict
  state, command, sql_command and query text.

  NOTE(review): the expansion already ends with ';', so call sites that
  add their own ';' produce an empty statement. Avoid using it as the
  sole body of an unbraced if/else.
*/
#define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra)             \
    WSREP_##severity(                                                          \
      "%s\n"                                                                   \
      "schema:  %.*s\n"                                                        \
      "request: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n"     \
      "granted: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)",      \
      msg, schema_len, schema,                                                 \
      (longlong) req->thread_id, (long long)wsrep_thd_trx_seqno(req),          \
      req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
      req->get_command(), req->lex->sql_command, req->query(),                 \
      (longlong) gra->thread_id, (long long)wsrep_thd_trx_seqno(gra),          \
      gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
      gra->get_command(), gra->lex->sql_command, gra->query());
1971
/**
  Check if request for the metadata lock should be granted to the requester.

  Only requesters running as BF (wsrep_exec_mode TOTAL_ORDER or REPL_RECV)
  are considered. For any other requester, or when WSREP_ON is false, this
  returns FALSE and the regular (non-wsrep) MDL behaviour applies.

  Locking: request_thd->LOCK_thd_data is held only while the requester's
  exec mode is inspected. granted_thd is then locked with wsrep_thd_LOCK().
  The first two branches unlock it explicitly. The BF-abort branch passes it
  to wsrep_abort_thd() without unlocking here; presumably wsrep_abort_thd()
  releases it (confirm against its definition).

  @param requestor_ctx The MDL context of the requestor
  @param ticket        MDL ticket of the conflicting, already granted lock;
                       its context's THD is the owner (granted_thd)
  @param key           MDL key of the lock; only its schema name is used,
                       for logging

  @retval TRUE  Lock request can be granted
  @retval FALSE Lock request cannot be granted
*/
bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
                               MDL_ticket *ticket,
                               const MDL_key *key)
{
  /* Fallback to the non-wsrep behaviour */
  if (!WSREP_ON) return FALSE;

  THD *request_thd= requestor_ctx->get_thd();
  THD *granted_thd= ticket->get_ctx()->get_thd();
  bool ret= false;

  const char* schema= key->db_name();
  int schema_len= key->db_name_length();

  mysql_mutex_lock(&request_thd->LOCK_thd_data);

  /*
    We consider granting MDL exceptions only for appliers (BF THD) and ones
    executing under TOI mode.

    Rules:
    1. If granted/owner THD is also an applier (BF THD) or one executing
       under TOI mode, then we grant the requested lock to the requester
       THD.
       @return true

    2. If granted/owner THD is executing a FLUSH command or already has an
       explicit lock, then do not grant the requested lock to the requester
       THD and it has to wait.
       @return false

    3. In all other cases the granted/owner THD is aborted and the requested
       lock is not granted to the requester THD, thus it has to wait.
       @return false
  */
  if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
      request_thd->wsrep_exec_mode == REPL_RECV)
  {
    mysql_mutex_unlock(&request_thd->LOCK_thd_data);
    WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
                  request_thd, granted_thd);
    ticket->wsrep_report(wsrep_debug);

    /* Hold the owner's LOCK_thd_data + LOCK_thd_kill while inspecting it. */
    wsrep_thd_LOCK(granted_thd);
    if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
        granted_thd->wsrep_exec_mode == REPL_RECV)
    {
      /* Rule 1: BF vs BF, grant. */
      WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
                    request_thd, granted_thd);
      ticket->wsrep_report(true);
      wsrep_thd_UNLOCK(granted_thd);
      ret= true;
    }
    else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
             granted_thd->mdl_context.has_explicit_locks())
    {
      /* Rule 2: owner runs FLUSH or holds explicit locks, requester waits. */
      WSREP_DEBUG("BF thread waiting for FLUSH");
      ticket->wsrep_report(wsrep_debug);
      wsrep_thd_UNLOCK(granted_thd);
      ret= false;
    }
    else
    {
      /* Rule 3: BF-abort the owner; the requester waits. */
      /* Print some debug information. */
      if (wsrep_debug)
      {
        if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE ||
            request_thd->lex->sql_command == SQLCOM_DROP_SEQUENCE)
        {
          WSREP_DEBUG("DROP caused BF abort, conf %d", granted_thd->wsrep_conflict_state);
        }
        else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
        {
          WSREP_DEBUG("MDL granted, but committing thd abort scheduled");
        }
        else
        {
          WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
                        request_thd, granted_thd);
        }
        ticket->wsrep_report(true);
      }

      /* This will call wsrep_abort_transaction so we should hold
         THD::LOCK_thd_data to protect victim from concurrent usage
         or disconnect or delete. No wsrep_thd_UNLOCK() follows here. */
      wsrep_abort_thd((void *) request_thd, (void *) granted_thd, true);
      ret= false;
    }
  }
  else
  {
    /* Requester is not BF: no exception. */
    mysql_mutex_unlock(&request_thd->LOCK_thd_data);
  }

  return ret;
}
2079
2080
/*
  Thread entry point for wsrep system threads (applier / rollbacker).

  `arg` is a wsrep_thread_args; it is released with my_free() on both the
  normal path and the error path. On success the function:
    - creates a THD and appends it to the global `threads` list,
    - prepares it like a bootstrap thread running as SYSTEM_THREAD_SLAVE_SQL
      with all privileges (master_access= ~0, skip_grants()),
    - increments wsrep_running_threads and the per-type counter selected by
      args->thread_type,
    - runs args->processor(thd),
    - then decrements the counters and tears the THD down.

  If initialization fails, it logs an error. It then calls unireg_abort(1)
  when mysqld_server_initialized is false, and otherwise returns NULL.

  NOTE(review): on the init_new_connection_thread() / store_globals() failure
  paths, thd has already been appended to `threads`, but it is neither
  unlinked nor deleted here. Cleanup there is presumably left to
  thread_scheduler->end_thread(); confirm.
*/
pthread_handler_t start_wsrep_THD(void *arg)
{
  THD *thd;
  wsrep_thread_args* args= (wsrep_thread_args*)arg;
  wsrep_thd_processor_fun processor= args->processor;

  if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
  {
    goto error;
  }

  mysql_mutex_lock(&LOCK_thread_count);

  if (wsrep_gtid_mode)
  {
    /* Adjust domain_id. */
    thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
  }

  thd->real_id=pthread_self(); // Keep purify happy
  thread_created++;
  threads.append(thd);

  my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));

  DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
  thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
  (void) mysql_mutex_unlock(&LOCK_thread_count);

  /* from bootstrap()... */
  thd->bootstrap=1;
  thd->max_client_packet_length= thd->net.max_packet;
  thd->security_ctx->master_access= ~(ulong)0;

  /* from handle_one_connection... */
  pthread_detach_this_thread();

  mysql_thread_set_psi_id(thd->thread_id);
  thd->thr_create_utime=  microsecond_interval_timer();
  if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
  {
    close_connection(thd, ER_OUT_OF_RESOURCES);
    statistic_increment(aborted_connects,&LOCK_status);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
    goto error;
  }

// </5.1.17>
  /*
    handle_one_connection() is normally the only way a thread would
    start and would always be on the very high end of the stack ,
    therefore, the thread stack always starts at the address of the
    first local variable of handle_one_connection, which is thd. We
    need to know the start of the stack so that we could check for
    stack overruns.
  */
  DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld",
                       (long long)thd->thread_id));
  /* now that we've called my_thread_init(), it is safe to call DBUG_* */

  thd->thread_stack= (char*) &thd;
  if (thd->store_globals())
  {
    close_connection(thd, ER_OUT_OF_RESOURCES);
    statistic_increment(aborted_connects,&LOCK_status);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
    goto error;
  }

  thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
  thd->security_ctx->skip_grants();

  /* handle_one_connection() again... */
  //thd->version= refresh_version;
  thd->proc_info= 0;
  thd->set_command(COM_SLEEP);
  thd->init_for_queries();

  /* Publish the new thread in the wsrep thread counters. */
  mysql_mutex_lock(&LOCK_thread_count);
  wsrep_running_threads++;

  switch (args->thread_type) {
    case WSREP_APPLIER_THREAD:
      wsrep_running_applier_threads++;
      break;
    case WSREP_ROLLBACKER_THREAD:
      wsrep_running_rollbacker_threads++;
      break;
    default:
      WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
      break;
  }

  mysql_cond_broadcast(&COND_thread_count);
  mysql_mutex_unlock(&LOCK_thread_count);

  /* Main work of the thread. */
  processor(thd);

  close_connection(thd, 0);

  /* Withdraw the thread from the wsrep thread counters. */
  mysql_mutex_lock(&LOCK_thread_count);
  DBUG_ASSERT(wsrep_running_threads > 0);
  wsrep_running_threads--;

  switch (args->thread_type) {
    case WSREP_APPLIER_THREAD:
      DBUG_ASSERT(wsrep_running_applier_threads > 0);
      wsrep_running_applier_threads--;
      break;
    case WSREP_ROLLBACKER_THREAD:
      DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
      wsrep_running_rollbacker_threads--;
      break;
    default:
      WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
      break;
  }

  my_free(args);

  WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
  mysql_cond_broadcast(&COND_thread_count);
  mysql_mutex_unlock(&LOCK_thread_count);

  // Note: We can't call THD destructor without crashing
  // if plugins have not been initialized. However, in most of the
  // cases this means that pre SE initialization SST failed and
  // we are going to exit anyway.
  if (plugins_are_initialized)
  {
    net_end(&thd->net);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
  }
  else
  {
    // TODO: lightweight cleanup to get rid of:
    // 'Error in my_thread_global_end(): 2 threads didn't exit'
    // at server shutdown
  }

  unlink_not_visible_thd(thd);
  delete thd;
  my_thread_end();
  return(NULL);

error:
  WSREP_ERROR("Failed to create/initialize system thread");

  my_free(args);

  /* Abort if its the first applier/rollbacker thread. */
  if (!mysqld_server_initialized)
    unireg_abort(1);
  else
    return NULL;
}
2237
2238
2239 /**/
abort_replicated(THD * thd)2240 static bool abort_replicated(THD *thd)
2241 {
2242 bool ret_code= false;
2243 wsrep_thd_LOCK(thd);
2244 if (thd->wsrep_query_state== QUERY_COMMITTING)
2245 {
2246 WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id));
2247
2248 (void)wsrep_abort_thd(thd, thd, TRUE);
2249 ret_code= true;
2250 }
2251 else
2252 wsrep_thd_UNLOCK(thd);
2253 return ret_code;
2254 }
2255
2256
2257 /**/
is_client_connection(THD * thd)2258 static inline bool is_client_connection(THD *thd)
2259 {
2260 return (thd->wsrep_client_thread && thd->variables.wsrep_on);
2261 }
2262
2263
is_replaying_connection(THD * thd)2264 static inline bool is_replaying_connection(THD *thd)
2265 {
2266 bool ret;
2267
2268 mysql_mutex_lock(&thd->LOCK_thd_data);
2269 ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false;
2270 mysql_mutex_unlock(&thd->LOCK_thd_data);
2271
2272 return ret;
2273 }
2274
2275
is_committing_connection(THD * thd)2276 static inline bool is_committing_connection(THD *thd)
2277 {
2278 bool ret;
2279
2280 mysql_mutex_lock(&thd->LOCK_thd_data);
2281 ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
2282 mysql_mutex_unlock(&thd->LOCK_thd_data);
2283
2284 return ret;
2285 }
2286
2287
have_client_connections()2288 static bool have_client_connections()
2289 {
2290 THD *tmp;
2291
2292 I_List_iterator<THD> it(threads);
2293 while ((tmp=it++))
2294 {
2295 DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2296 (longlong) tmp->thread_id));
2297 if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
2298 {
2299 WSREP_DEBUG("Informing thread %lld that it's time to die",
2300 (longlong)tmp->thread_id);
2301 (void)abort_replicated(tmp);
2302 return true;
2303 }
2304 }
2305 return false;
2306 }
2307
wsrep_close_thread(THD * thd)2308 static void wsrep_close_thread(THD *thd)
2309 {
2310 thd->set_killed(KILL_CONNECTION);
2311 MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
2312 if (thd->mysys_var)
2313 {
2314 thd->mysys_var->abort=1;
2315 mysql_mutex_lock(&thd->mysys_var->mutex);
2316 if (thd->mysys_var->current_cond)
2317 {
2318 mysql_mutex_lock(thd->mysys_var->current_mutex);
2319 mysql_cond_broadcast(thd->mysys_var->current_cond);
2320 mysql_mutex_unlock(thd->mysys_var->current_mutex);
2321 }
2322 mysql_mutex_unlock(&thd->mysys_var->mutex);
2323 }
2324 }
2325
2326
have_committing_connections()2327 static my_bool have_committing_connections()
2328 {
2329 THD *tmp;
2330 mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2331
2332 I_List_iterator<THD> it(threads);
2333 while ((tmp=it++))
2334 {
2335 if (!is_client_connection(tmp))
2336 continue;
2337
2338 if (is_committing_connection(tmp))
2339 {
2340 mysql_mutex_unlock(&LOCK_thread_count);
2341 return TRUE;
2342 }
2343 }
2344 mysql_mutex_unlock(&LOCK_thread_count);
2345 return FALSE;
2346 }
2347
2348
wsrep_wait_committing_connections_close(int wait_time)2349 int wsrep_wait_committing_connections_close(int wait_time)
2350 {
2351 int sleep_time= 100;
2352
2353 while (have_committing_connections() && wait_time > 0)
2354 {
2355 WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
2356 my_sleep(sleep_time);
2357 wait_time -= sleep_time;
2358 }
2359 if (have_committing_connections())
2360 {
2361 return 1;
2362 }
2363 return 0;
2364 }
2365
2366
wsrep_close_client_connections(my_bool wait_to_end,THD * except_caller_thd)2367 void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
2368 {
2369 /*
2370 First signal all threads that it's time to die
2371 */
2372
2373 THD *tmp;
2374 mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2375
2376 bool kill_cached_threads_saved= kill_cached_threads;
2377 kill_cached_threads= true; // prevent future threads caching
2378 mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
2379
2380 I_List_iterator<THD> it(threads);
2381 while ((tmp=it++))
2382 {
2383 DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2384 (longlong) tmp->thread_id));
2385 WSREP_DEBUG("Informing thread %lld that it's time to die",
2386 (longlong)tmp->thread_id);
2387 /* We skip slave threads & scheduler on this first loop through. */
2388 if (!is_client_connection(tmp))
2389 continue;
2390
2391 if (tmp == except_caller_thd)
2392 {
2393 DBUG_ASSERT(is_client_connection(tmp));
2394 continue;
2395 }
2396
2397 if (is_replaying_connection(tmp))
2398 {
2399 tmp->set_killed(KILL_CONNECTION);
2400 continue;
2401 }
2402
2403 /* replicated transactions must be skipped and aborted
2404 with wsrep_abort_thd. */
2405 if (abort_replicated(tmp))
2406 continue;
2407
2408 WSREP_DEBUG("closing connection %lld", (longlong) tmp->thread_id);
2409
2410 /*
2411 instead of wsrep_close_thread() we do now soft kill by
2412 THD::awake(). Here also victim needs to be protected from
2413 concurrent usage or disconnect or delete.
2414 */
2415 tmp->awake(KILL_CONNECTION);
2416 }
2417 mysql_mutex_unlock(&LOCK_thread_count);
2418
2419 if (thread_count)
2420 sleep(2); // Give threads time to die
2421
2422 mysql_mutex_lock(&LOCK_thread_count);
2423 /*
2424 Force remaining threads to die by closing the connection to the client
2425 */
2426
2427 I_List_iterator<THD> it2(threads);
2428 while ((tmp=it2++))
2429 {
2430 if (is_client_connection(tmp) &&
2431 !abort_replicated(tmp) &&
2432 !is_replaying_connection(tmp) &&
2433 tmp != except_caller_thd)
2434 {
2435 WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id);
2436 close_connection(tmp,0);
2437 }
2438 }
2439
2440 DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
2441 WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
2442
2443 while (wait_to_end && have_client_connections())
2444 {
2445 mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
2446 DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
2447 }
2448
2449 kill_cached_threads= kill_cached_threads_saved;
2450
2451 mysql_mutex_unlock(&LOCK_thread_count);
2452
2453 /* All client connection threads have now been aborted */
2454 }
2455
2456
wsrep_close_applier(THD * thd)2457 void wsrep_close_applier(THD *thd)
2458 {
2459 WSREP_DEBUG("closing applier %lld", (longlong) thd->thread_id);
2460 wsrep_close_thread(thd);
2461 }
2462
2463
wsrep_close_threads(THD * thd)2464 void wsrep_close_threads(THD *thd)
2465 {
2466 THD *tmp;
2467 mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2468
2469 I_List_iterator<THD> it(threads);
2470 while ((tmp=it++))
2471 {
2472 DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2473 (longlong) tmp->thread_id));
2474 /* We skip slave threads & scheduler on this first loop through. */
2475 if (tmp->wsrep_applier && tmp != thd)
2476 {
2477 WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
2478 wsrep_close_thread (tmp);
2479 }
2480 }
2481
2482 mysql_mutex_unlock(&LOCK_thread_count);
2483 }
2484
wsrep_wait_appliers_close(THD * thd)2485 void wsrep_wait_appliers_close(THD *thd)
2486 {
2487 /* Wait for wsrep appliers to gracefully exit */
2488 mysql_mutex_lock(&LOCK_thread_count);
2489 while (wsrep_running_threads > 1)
2490 // 1 is for rollbacker thread which needs to be killed explicitly.
2491 // This gotta be fixed in a more elegant manner if we gonna have arbitrary
2492 // number of non-applier wsrep threads.
2493 {
2494 if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
2495 {
2496 mysql_mutex_unlock(&LOCK_thread_count);
2497 my_sleep(100);
2498 mysql_mutex_lock(&LOCK_thread_count);
2499 }
2500 else
2501 mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
2502 DBUG_PRINT("quit",("One applier died (count=%u)",thread_count));
2503 }
2504 mysql_mutex_unlock(&LOCK_thread_count);
2505 /* Now kill remaining wsrep threads: rollbacker */
2506 wsrep_close_threads (thd);
2507 /* and wait for them to die */
2508 mysql_mutex_lock(&LOCK_thread_count);
2509 while (wsrep_running_threads > 0)
2510 {
2511 if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
2512 {
2513 mysql_mutex_unlock(&LOCK_thread_count);
2514 my_sleep(100);
2515 mysql_mutex_lock(&LOCK_thread_count);
2516 }
2517 else
2518 mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
2519 DBUG_PRINT("quit",("One thread died (count=%u)",thread_count));
2520 }
2521 mysql_mutex_unlock(&LOCK_thread_count);
2522
2523 /* All wsrep applier threads have now been aborted. However, if this thread
2524 is also applier, we are still running...
2525 */
2526 }
2527
2528
wsrep_kill_mysql(THD * thd)2529 void wsrep_kill_mysql(THD *thd)
2530 {
2531 if (mysqld_server_started)
2532 {
2533 if (!shutdown_in_progress)
2534 {
2535 WSREP_INFO("starting shutdown");
2536 kill_mysql();
2537 }
2538 }
2539 else
2540 {
2541 unireg_abort(1);
2542 }
2543 }
2544
2545
wsrep_create_sp(THD * thd,uchar ** buf,size_t * buf_len)2546 static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
2547 {
2548 String log_query;
2549 sp_head *sp = thd->lex->sphead;
2550 sql_mode_t saved_mode= thd->variables.sql_mode;
2551 String retstr(64);
2552 LEX_CSTRING returns= empty_clex_str;
2553 retstr.set_charset(system_charset_info);
2554
2555 log_query.set_charset(system_charset_info);
2556
2557 if (sp->m_handler->type() == TYPE_ENUM_FUNCTION)
2558 {
2559 sp_returns_type(thd, retstr, sp);
2560 returns= retstr.lex_cstring();
2561 }
2562 if (sp->m_handler->
2563 show_create_sp(thd, &log_query,
2564 sp->m_explicit_name ? sp->m_db : null_clex_str,
2565 sp->m_name, sp->m_params, returns,
2566 sp->m_body, sp->chistics(),
2567 thd->lex->definer[0],
2568 thd->lex->create_info,
2569 saved_mode))
2570 {
2571 WSREP_WARN("SP create string failed: schema: %s, query: %s",
2572 thd->get_db(), thd->query());
2573 return 1;
2574 }
2575
2576 return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
2577 }
2578
2579
wsrep_on(THD * thd)2580 extern int wsrep_on(THD *thd)
2581 {
2582 return (int)(WSREP(thd));
2583 }
2584
2585
wsrep_thd_is_wsrep_on(THD * thd)2586 extern "C" bool wsrep_thd_is_wsrep_on(THD *thd)
2587 {
2588 return thd->variables.wsrep_on;
2589 }
2590
2591
wsrep_consistency_check(THD * thd)2592 bool wsrep_consistency_check(THD *thd)
2593 {
2594 return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
2595 }
2596
2597
wsrep_thd_set_exec_mode(THD * thd,enum wsrep_exec_mode mode)2598 extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode)
2599 {
2600 thd->wsrep_exec_mode= mode;
2601 }
2602
2603
wsrep_thd_set_query_state(THD * thd,enum wsrep_query_state state)2604 extern "C" void wsrep_thd_set_query_state(
2605 THD *thd, enum wsrep_query_state state)
2606 {
2607 /* async slave thread should never flag IDLE state, as it may
2608 give rollbacker thread chance to interfere and rollback async slave
2609 transaction.
2610 in fact, async slave thread is never idle as it reads complete
2611 transactions from relay log and applies them, as a whole.
2612 BF abort happens voluntarily by async slave thread.
2613 */
2614 if (thd->slave_thread && state == QUERY_IDLE) {
2615 WSREP_DEBUG("Skipping IDLE state change for slave SQL");
2616 return;
2617 }
2618 thd->wsrep_query_state= state;
2619 }
2620
2621
wsrep_thd_set_conflict_state(THD * thd,enum wsrep_conflict_state state)2622 void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state)
2623 {
2624 mysql_mutex_assert_owner(&thd->LOCK_thd_data);
2625 thd->wsrep_conflict_state= state;
2626 }
2627
2628
wsrep_thd_exec_mode(THD * thd)2629 enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd)
2630 {
2631 return thd->wsrep_exec_mode;
2632 }
2633
2634
wsrep_thd_exec_mode_str(THD * thd)2635 const char *wsrep_thd_exec_mode_str(THD *thd)
2636 {
2637 return
2638 (!thd) ? "void" :
2639 (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" :
2640 (thd->wsrep_exec_mode == REPL_RECV) ? "applier" :
2641 (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" :
2642 (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void";
2643 }
2644
2645
wsrep_thd_query_state(THD * thd)2646 enum wsrep_query_state wsrep_thd_query_state(THD *thd)
2647 {
2648 return thd->wsrep_query_state;
2649 }
2650
2651
wsrep_thd_query_state_str(THD * thd)2652 const char *wsrep_thd_query_state_str(THD *thd)
2653 {
2654 return
2655 (!thd) ? "void" :
2656 (thd->wsrep_query_state == QUERY_IDLE) ? "idle" :
2657 (thd->wsrep_query_state == QUERY_EXEC) ? "executing" :
2658 (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" :
2659 (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" :
2660 (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void";
2661 }
2662
2663
wsrep_thd_get_conflict_state(THD * thd)2664 enum wsrep_conflict_state wsrep_thd_get_conflict_state(THD *thd)
2665 {
2666 return thd->wsrep_conflict_state;
2667 }
2668
2669
wsrep_thd_conflict_state_str(THD * thd)2670 const char *wsrep_thd_conflict_state_str(THD *thd)
2671 {
2672 return
2673 (!thd) ? "void" :
2674 (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" :
2675 (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" :
2676 (thd->wsrep_conflict_state == ABORTING) ? "aborting" :
2677 (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" :
2678 (thd->wsrep_conflict_state == REPLAYING) ? "replaying" :
2679 (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" :
2680 (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void";
2681 }
2682
2683
wsrep_thd_ws_handle(THD * thd)2684 wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
2685 {
2686 return &thd->wsrep_ws_handle;
2687 }
2688
2689
wsrep_thd_LOCK(THD * thd)2690 void wsrep_thd_LOCK(THD *thd)
2691 {
2692 mysql_mutex_lock(&thd->LOCK_thd_data);
2693 mysql_mutex_lock(&thd->LOCK_thd_kill);
2694 }
2695
2696
wsrep_thd_UNLOCK(THD * thd)2697 void wsrep_thd_UNLOCK(THD *thd)
2698 {
2699 mysql_mutex_unlock(&thd->LOCK_thd_kill);
2700 mysql_mutex_unlock(&thd->LOCK_thd_data);
2701 }
2702
2703
wsrep_thd_kill_LOCK(THD * thd)2704 void wsrep_thd_kill_LOCK(THD *thd)
2705 {
2706 mysql_mutex_lock(&thd->LOCK_thd_kill);
2707 }
2708
2709
wsrep_thd_kill_UNLOCK(THD * thd)2710 void wsrep_thd_kill_UNLOCK(THD *thd)
2711 {
2712 mysql_mutex_unlock(&thd->LOCK_thd_kill);
2713 }
2714
2715
wsrep_thd_query_start(THD * thd)2716 extern "C" time_t wsrep_thd_query_start(THD *thd)
2717 {
2718 return thd->query_start();
2719 }
2720
2721
wsrep_thd_wsrep_rand(THD * thd)2722 extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd)
2723 {
2724 return thd->wsrep_rand;
2725 }
2726
wsrep_thd_trx_seqno(THD * thd)2727 longlong wsrep_thd_trx_seqno(THD *thd)
2728 {
2729 return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED;
2730 }
2731
2732
wsrep_thd_query_id(THD * thd)2733 extern "C" query_id_t wsrep_thd_query_id(THD *thd)
2734 {
2735 return thd->query_id;
2736 }
2737
2738
wsrep_thd_query(THD * thd)2739 const char *wsrep_thd_query(THD *thd)
2740 {
2741 if (thd)
2742 {
2743 switch(thd->lex->sql_command)
2744 {
2745 case SQLCOM_CREATE_USER:
2746 return "CREATE USER";
2747 case SQLCOM_GRANT:
2748 return "GRANT";
2749 case SQLCOM_REVOKE:
2750 return "REVOKE";
2751 case SQLCOM_SET_OPTION:
2752 if (thd->lex->definer)
2753 return "SET PASSWORD";
2754 /* fallthrough */
2755 default:
2756 if (thd->query())
2757 return thd->query();
2758 }
2759 }
2760 return "NULL";
2761 }
2762
2763
wsrep_thd_wsrep_last_query_id(THD * thd)2764 extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd)
2765 {
2766 return thd->wsrep_last_query_id;
2767 }
2768
2769
wsrep_thd_set_wsrep_last_query_id(THD * thd,query_id_t id)2770 extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
2771 {
2772 thd->wsrep_last_query_id= id;
2773 }
2774
2775
wsrep_thd_awake(THD * thd,my_bool signal)2776 extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
2777 {
2778 if (signal)
2779 {
2780 /* Here we should hold THD::LOCK_thd_data to
2781 protect from concurrent usage and
2782 THD::LOCK_thd_kill from disconnect or delete */
2783 mysql_mutex_assert_owner(&thd->LOCK_thd_data);
2784 mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
2785 thd->awake_no_mutex(KILL_QUERY);
2786 }
2787 else
2788 {
2789 mysql_mutex_lock(&LOCK_wsrep_replaying);
2790 mysql_cond_broadcast(&COND_wsrep_replaying);
2791 mysql_mutex_unlock(&LOCK_wsrep_replaying);
2792 }
2793 }
2794
2795
wsrep_thd_retry_counter(THD * thd)2796 int wsrep_thd_retry_counter(THD *thd)
2797 {
2798 return(thd->wsrep_retry_counter);
2799 }
2800
2801
wsrep_thd_ignore_table(THD * thd)2802 extern "C" bool wsrep_thd_ignore_table(THD *thd)
2803 {
2804 return thd->wsrep_ignore_table;
2805 }
2806
2807
2808 extern int
wsrep_trx_order_before(THD * thd1,THD * thd2)2809 wsrep_trx_order_before(THD *thd1, THD *thd2)
2810 {
2811 const longlong trx1_seqno= wsrep_thd_trx_seqno(thd1);
2812 const longlong trx2_seqno= wsrep_thd_trx_seqno(thd2);
2813 WSREP_DEBUG("BF conflict, order: %lld %lld\n",
2814 trx1_seqno, trx2_seqno);
2815
2816 if (trx1_seqno == WSREP_SEQNO_UNDEFINED ||
2817 trx2_seqno == WSREP_SEQNO_UNDEFINED)
2818 return 1; /* trx is not yet replicated */
2819 else if (trx1_seqno < trx2_seqno)
2820 return 1;
2821
2822 return 0;
2823 }
2824
2825
wsrep_trx_is_aborting(THD * thd_ptr)2826 int wsrep_trx_is_aborting(THD *thd_ptr)
2827 {
2828 if (thd_ptr) {
2829 if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) ||
2830 (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) {
2831 return 1;
2832 }
2833 }
2834 return 0;
2835 }
2836
2837
wsrep_copy_query(THD * thd)2838 void wsrep_copy_query(THD *thd)
2839 {
2840 thd->wsrep_retry_command = thd->get_command();
2841 thd->wsrep_retry_query_len = thd->query_length();
2842 if (thd->wsrep_retry_query) {
2843 my_free(thd->wsrep_retry_query);
2844 }
2845 thd->wsrep_retry_query = (char *)my_malloc(
2846 thd->wsrep_retry_query_len + 1, MYF(0));
2847 strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len);
2848 thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0';
2849 }
2850
2851
wsrep_is_show_query(enum enum_sql_command command)2852 bool wsrep_is_show_query(enum enum_sql_command command)
2853 {
2854 DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
2855 return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
2856 }
2857
wsrep_create_like_table(THD * thd,TABLE_LIST * table,TABLE_LIST * src_table,HA_CREATE_INFO * create_info)2858 bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
2859 TABLE_LIST* src_table,
2860 HA_CREATE_INFO *create_info)
2861 {
2862 if (create_info->tmp_table())
2863 {
2864 /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */
2865 WSREP_DEBUG("CREATE TEMPORARY TABLE LIKE... skipped replication\n %s",
2866 thd->query());
2867 }
2868 else if (!(thd->find_temporary_table(src_table)))
2869 {
2870 /* this is straight CREATE TABLE LIKE... with no tmp tables */
2871 WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2872 }
2873 else
2874 {
2875 /* Non-MERGE tables ignore this call. */
2876 if (src_table->table->file->extra(HA_EXTRA_ADD_CHILDREN_LIST))
2877 return (true);
2878
2879 char buf[2048];
2880 String query(buf, sizeof(buf), system_charset_info);
2881 query.length(0); // Have to zero it since constructor doesn't
2882
2883 int result __attribute__((unused))=
2884 show_create_table(thd, src_table, &query, NULL, WITH_DB_NAME);
2885 WSREP_DEBUG("TMP TABLE: %s ret_code %d", query.ptr(), result);
2886
2887 thd->wsrep_TOI_pre_query= query.ptr();
2888 thd->wsrep_TOI_pre_query_len= query.length();
2889
2890 WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2891
2892 thd->wsrep_TOI_pre_query= NULL;
2893 thd->wsrep_TOI_pre_query_len= 0;
2894
2895 /* Non-MERGE tables ignore this call. */
2896 src_table->table->file->extra(HA_EXTRA_DETACH_CHILDREN);
2897 }
2898
2899 return(false);
2900
2901 wsrep_error_label:
2902 thd->wsrep_TOI_pre_query= NULL;
2903 return (true);
2904 }
2905
2906
wsrep_create_trigger_query(THD * thd,uchar ** buf,size_t * buf_len)2907 static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
2908 {
2909 LEX *lex= thd->lex;
2910 String stmt_query;
2911
2912 LEX_CSTRING definer_user;
2913 LEX_CSTRING definer_host;
2914
2915 if (!lex->definer)
2916 {
2917 if (!thd->slave_thread)
2918 {
2919 if (!(lex->definer= create_default_definer(thd, false)))
2920 return 1;
2921 }
2922 }
2923
2924 if (lex->definer)
2925 {
2926 /* SUID trigger. */
2927 LEX_USER *d= get_current_user(thd, lex->definer);
2928
2929 if (!d)
2930 return 1;
2931
2932 definer_user= d->user;
2933 definer_host= d->host;
2934 }
2935 else
2936 {
2937 /* non-SUID trigger. */
2938
2939 definer_user.str= 0;
2940 definer_user.length= 0;
2941
2942 definer_host.str= 0;
2943 definer_host.length= 0;
2944 }
2945
2946 const LEX_CSTRING command[2]=
2947 {{ C_STRING_WITH_LEN("CREATE ") },
2948 { C_STRING_WITH_LEN("CREATE OR REPLACE ") }};
2949
2950 if (thd->lex->create_info.or_replace())
2951 stmt_query.append(command[1]);
2952 else
2953 stmt_query.append(command[0]);
2954
2955 append_definer(thd, &stmt_query, &definer_user, &definer_host);
2956
2957 LEX_CSTRING stmt_definition;
2958 stmt_definition.str= (char*) thd->lex->stmt_definition_begin;
2959 stmt_definition.length= thd->lex->stmt_definition_end
2960 - thd->lex->stmt_definition_begin;
2961 trim_whitespace(thd->charset(), &stmt_definition);
2962
2963 stmt_query.append(stmt_definition.str, stmt_definition.length);
2964
2965 return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
2966 buf, buf_len);
2967 }
2968
2969 /***** callbacks for wsrep service ************/
2970
get_wsrep_debug()2971 my_bool get_wsrep_debug()
2972 {
2973 return wsrep_debug;
2974 }
2975
get_wsrep_load_data_splitting()2976 my_bool get_wsrep_load_data_splitting()
2977 {
2978 return wsrep_load_data_splitting;
2979 }
2980
get_wsrep_protocol_version()2981 long get_wsrep_protocol_version()
2982 {
2983 return wsrep_protocol_version;
2984 }
2985
get_wsrep_drupal_282555_workaround()2986 my_bool get_wsrep_drupal_282555_workaround()
2987 {
2988 return wsrep_drupal_282555_workaround;
2989 }
2990
get_wsrep_recovery()2991 my_bool get_wsrep_recovery()
2992 {
2993 return wsrep_recovery;
2994 }
2995
get_wsrep_log_conflicts()2996 my_bool get_wsrep_log_conflicts()
2997 {
2998 return wsrep_log_conflicts;
2999 }
3000
get_wsrep()3001 wsrep_t *get_wsrep()
3002 {
3003 return wsrep;
3004 }
3005
get_wsrep_certify_nonPK()3006 my_bool get_wsrep_certify_nonPK()
3007 {
3008 return wsrep_certify_nonPK;
3009 }
3010
/* Acquire LOCK_wsrep_rollback; release it with wsrep_unlock_rollback(). */
void wsrep_lock_rollback()
{
  mysql_mutex_lock(&LOCK_wsrep_rollback);
}

/*
  Signal COND_wsrep_rollback, then release LOCK_wsrep_rollback.
  The signal is sent while the mutex is still held, so keep this order.
  Presumably this wakes the rollbacker waiting for queued aborts (confirm).
*/
void wsrep_unlock_rollback()
{
  mysql_cond_signal(&COND_wsrep_rollback);
  mysql_mutex_unlock(&LOCK_wsrep_rollback);
}
3021
wsrep_aborting_thd_contains(THD * thd)3022 my_bool wsrep_aborting_thd_contains(THD *thd)
3023 {
3024 mysql_mutex_assert_owner(&LOCK_wsrep_rollback);
3025 wsrep_aborting_thd_t abortees = wsrep_aborting_thd;
3026 while (abortees)
3027 {
3028 if (abortees->aborting_thd == thd)
3029 return true;
3030 abortees = abortees->next;
3031 }
3032 return false;
3033 }
3034
wsrep_aborting_thd_enqueue(THD * thd)3035 void wsrep_aborting_thd_enqueue(THD *thd)
3036 {
3037 mysql_mutex_assert_owner(&LOCK_wsrep_rollback);
3038 wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
3039 my_malloc(sizeof(struct wsrep_aborting_thd), MYF(0));
3040 aborting->aborting_thd = thd;
3041 aborting->next = wsrep_aborting_thd;
3042 wsrep_aborting_thd = aborting;
3043 }
3044
wsrep_node_is_donor()3045 bool wsrep_node_is_donor()
3046 {
3047 return (WSREP_ON) ? (wsrep_config_state->get_status() == 2) : false;
3048 }
3049
wsrep_node_is_synced()3050 bool wsrep_node_is_synced()
3051 {
3052 return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
3053 }
3054