1 /* Copyright 2008-2021 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 #include "sql_plugin.h" /* wsrep_plugins_pre_init() */
17 #include "my_global.h"
18 #include "wsrep_server_state.h"
19
20 #include "mariadb.h"
21 #include <mysqld.h>
22 #include <transaction.h>
23 #include <sql_class.h>
24 #include <sql_parse.h>
25 #include <sql_base.h> /* find_temporary_table() */
26 #include "slave.h"
27 #include "rpl_mi.h"
28 #include "sql_repl.h"
29 #include "rpl_filter.h"
30 #include "sql_callback.h"
31 #include "sp_head.h"
32 #include "sql_show.h"
33 #include "sp.h"
34 #include "wsrep_priv.h"
35 #include "wsrep_thd.h"
36 #include "wsrep_sst.h"
37 #include "wsrep_utils.h"
38 #include "wsrep_var.h"
39 #include "wsrep_binlog.h"
40 #include "wsrep_applier.h"
41 #include "wsrep_schema.h"
42 #include "wsrep_xid.h"
43 #include "wsrep_trans_observer.h"
44 #include "mysql/service_wsrep.h"
45 #include <cstdio>
46 #include <cstdlib>
47 #include <string>
48 #include "log_event.h"
49 #include "sql_connect.h"
50
51 #include <sstream>
52
53 /* wsrep-lib */
54 Wsrep_server_state* Wsrep_server_state::m_instance;
55
56 my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
57 #ifdef GTID_SUPPORT
58 /* Sidno in global_sid_map corresponding to group uuid */
59 rpl_sidno wsrep_sidno= -1;
60 #endif /* GTID_SUPPORT */
61 my_bool wsrep_preordered_opt= FALSE;
62
63 /* Streaming Replication */
64 const char *wsrep_fragment_units[]= { "bytes", "rows", "statements", NullS };
65 const char *wsrep_SR_store_types[]= { "none", "table", NullS };
66
67 /*
68 * Begin configuration options
69 */
70
71 extern my_bool plugins_are_initialized;
72 extern uint kill_cached_threads;
73 extern mysql_cond_t COND_thread_cache;
74
75 /* System variables. */
76 const char *wsrep_provider;
77 const char *wsrep_provider_options;
78 const char *wsrep_cluster_address;
79 const char *wsrep_cluster_name;
80 const char *wsrep_node_name;
81 const char *wsrep_node_address;
82 const char *wsrep_node_incoming_address;
83 const char *wsrep_start_position;
84 const char *wsrep_data_home_dir;
85 const char *wsrep_dbug_option;
86 const char *wsrep_notify_cmd;
87
88 ulong wsrep_debug; // Debug level logging
89 my_bool wsrep_convert_LOCK_to_trx; // Convert locking sessions to trx
90 my_bool wsrep_auto_increment_control; // Control auto increment variables
91 my_bool wsrep_drupal_282555_workaround; // Retry autoinc insert after dupkey
92 my_bool wsrep_certify_nonPK; // Certify, even when no primary key
93 ulong wsrep_certification_rules = WSREP_CERTIFICATION_RULES_STRICT;
94 my_bool wsrep_recovery; // Recovery
95 my_bool wsrep_replicate_myisam; // Enable MyISAM replication
96 my_bool wsrep_log_conflicts;
97 my_bool wsrep_load_data_splitting= 0; // Commit load data every 10K intervals
98 my_bool wsrep_slave_UK_checks; // Slave thread does UK checks
99 my_bool wsrep_slave_FK_checks; // Slave thread does FK checks
100 my_bool wsrep_restart_slave; // Should mysql slave thread be
101 // restarted, when node joins back?
102 my_bool wsrep_desync; // De(re)synchronize the node from the
103 // cluster
104 long wsrep_slave_threads; // No. of slave appliers threads
105 ulong wsrep_retry_autocommit; // Retry aborted autocommit trx
106 ulong wsrep_max_ws_size; // Max allowed ws (RBR buffer) size
107 ulong wsrep_max_ws_rows; // Max number of rows in ws
108 ulong wsrep_forced_binlog_format= BINLOG_FORMAT_UNSPEC;
109 ulong wsrep_mysql_replication_bundle;
110 bool wsrep_gtid_mode; // Use wsrep_gtid_domain_id
111 // for galera transactions?
112 uint32 wsrep_gtid_domain_id; // gtid_domain_id for galera
113 // transactions
114
115 /* Other configuration variables and their default values. */
116 my_bool wsrep_incremental_data_collection= 0; // Incremental data collection
117 my_bool wsrep_restart_slave_activated= 0; // Node has dropped, and slave
118 // restart will be needed
119 bool wsrep_new_cluster= false; // Bootstrap the cluster?
120 int wsrep_slave_count_change= 0; // No. of appliers to stop/start
121 int wsrep_to_isolation= 0; // No. of active TO isolation threads
122 long wsrep_max_protocol_version= 4; // Maximum protocol version to use
123 long int wsrep_protocol_version= wsrep_max_protocol_version;
124 ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES;
125 // unit for fragment size
126 ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE;
127 uint wsrep_ignore_apply_errors= 0;
128
129
130 /*
131 * End configuration options
132 */
133
134 /*
135 * Cached variables
136 */
137
/*
  Cached values of:

  WSREP_PROVIDER_EXISTS_ - whether a Galera write-set replication provider
                           is set: wsrep_provider &&
                           strcmp(wsrep_provider, WSREP_NONE)
  WSREP_ON_              - whether Galera write-set replication is enabled:
                           global_system_variables.wsrep_on &&
                           WSREP_PROVIDER_EXISTS_
*/
bool WSREP_PROVIDER_EXISTS_;
bool WSREP_ON_;
145
146 /*
147 * Other wsrep global variables.
148 */
149
150 mysql_mutex_t LOCK_wsrep_ready;
151 mysql_cond_t COND_wsrep_ready;
152 mysql_mutex_t LOCK_wsrep_sst;
153 mysql_cond_t COND_wsrep_sst;
154 mysql_mutex_t LOCK_wsrep_sst_init;
155 mysql_cond_t COND_wsrep_sst_init;
156 mysql_mutex_t LOCK_wsrep_replaying;
157 mysql_cond_t COND_wsrep_replaying;
158 mysql_mutex_t LOCK_wsrep_slave_threads;
159 mysql_cond_t COND_wsrep_slave_threads;
160 mysql_mutex_t LOCK_wsrep_cluster_config;
161 mysql_mutex_t LOCK_wsrep_desync;
162 mysql_mutex_t LOCK_wsrep_config_state;
163 mysql_mutex_t LOCK_wsrep_group_commit;
164 mysql_mutex_t LOCK_wsrep_SR_pool;
165 mysql_mutex_t LOCK_wsrep_SR_store;
166 mysql_mutex_t LOCK_wsrep_joiner_monitor;
167 mysql_mutex_t LOCK_wsrep_donor_monitor;
168 mysql_cond_t COND_wsrep_joiner_monitor;
169 mysql_cond_t COND_wsrep_donor_monitor;
170
171 int wsrep_replaying= 0;
172 ulong wsrep_running_threads = 0; // # of currently running wsrep
173 // # threads
174 ulong wsrep_running_applier_threads = 0; // # of running applier threads
175 ulong wsrep_running_rollbacker_threads = 0; // # of running
176 // # rollbacker threads
177 ulong my_bind_addr;
178
#ifdef HAVE_PSI_INTERFACE
/* Performance schema instrumentation keys for wsrep mutexes. */
PSI_mutex_key
  key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
  key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
  key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
  key_LOCK_wsrep_config_state, key_LOCK_wsrep_cluster_config,
  key_LOCK_wsrep_group_commit,
  key_LOCK_wsrep_SR_pool,
  key_LOCK_wsrep_SR_store,
  key_LOCK_wsrep_thd_queue,
  key_LOCK_wsrep_joiner_monitor,
  key_LOCK_wsrep_donor_monitor;

/* Performance schema instrumentation keys for wsrep condition variables. */
PSI_cond_key key_COND_wsrep_thd,
  key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
  key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
  key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads,
  key_COND_wsrep_joiner_monitor, key_COND_wsrep_donor_monitor;

PSI_file_key key_file_wsrep_gra_log;

/*
  Mutex instruments registered by wsrep_thr_init(). Each key is listed
  exactly once.
  NOTE(review): key_LOCK_wsrep_thd_queue and key_COND_wsrep_thd_queue are
  declared above but not registered in these arrays; confirm whether that
  is intended.
*/
static PSI_mutex_info wsrep_mutexes[]=
{
  { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0},
  { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_cluster_config, "LOCK_wsrep_cluster_config", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_group_commit, "LOCK_wsrep_group_commit", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_SR_store, "LOCK_wsrep_SR_store", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_joiner_monitor, "LOCK_wsrep_joiner_monitor", PSI_FLAG_GLOBAL},
  { &key_LOCK_wsrep_donor_monitor, "LOCK_wsrep_donor_monitor", PSI_FLAG_GLOBAL}
};

/*
  Condition variable instruments registered by wsrep_thr_init().
  NOTE(review): the doubled "wsrep_" in "COND_wsrep_wsrep_slave_threads"
  looks like a typo, but the name is visible in performance_schema, so it
  is left unchanged.
*/
static PSI_cond_info wsrep_conds[]=
{
  { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
  { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
  { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_joiner_monitor, "COND_wsrep_joiner_monitor", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_donor_monitor, "COND_wsrep_donor_monitor", PSI_FLAG_GLOBAL}
};

static PSI_file_info wsrep_files[]=
{
  { &key_file_wsrep_gra_log, "wsrep_gra_log", 0}
};

PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
  key_wsrep_rollbacker, key_wsrep_applier,
  key_wsrep_sst_joiner_monitor, key_wsrep_sst_donor_monitor;

static PSI_thread_info wsrep_threads[]=
{
  {&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
  {&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
  {&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
  {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL},
  {&key_wsrep_sst_joiner_monitor, "wsrep_sst_joiner_monitor", PSI_FLAG_GLOBAL},
  {&key_wsrep_sst_donor_monitor, "wsrep_sst_donor_monitor", PSI_FLAG_GLOBAL}
};

#endif /* HAVE_PSI_INTERFACE */
252
253 my_bool wsrep_inited= 0; // initialized ?
254
255 static wsrep_uuid_t node_uuid= WSREP_UUID_UNDEFINED;
256 static char cluster_uuid_str[40]= { 0, };
257
258 static char provider_name[256]= { 0, };
259 static char provider_version[256]= { 0, };
260 static char provider_vendor[256]= { 0, };
261
262 /*
263 * Wsrep status variables. LOCK_status must be locked When modifying
264 * these variables,
265 */
266 my_bool wsrep_connected = FALSE;
267 my_bool wsrep_ready = FALSE;
268 const char* wsrep_cluster_state_uuid= cluster_uuid_str;
269 long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
270 const char* wsrep_cluster_status = "Disconnected";
271 long wsrep_cluster_size = 0;
272 long wsrep_local_index = -1;
273 long long wsrep_local_bf_aborts = 0;
274 const char* wsrep_provider_name = provider_name;
275 const char* wsrep_provider_version = provider_version;
276 const char* wsrep_provider_vendor = provider_vendor;
277 char* wsrep_provider_capabilities = NULL;
278 char* wsrep_cluster_capabilities = NULL;
279 /* End wsrep status variables */
280
281 wsp::Config_state *wsrep_config_state;
282
WSREP_LOG(void (* fun)(const char * fmt,...),const char * fmt,...)283 void WSREP_LOG(void (*fun)(const char* fmt, ...), const char* fmt, ...)
284 {
285 /* Allocate short buffer from stack. If the vsnprintf() return value
286 indicates that the message was truncated, a new buffer will be allocated
287 dynamically and the message will be reprinted. */
288 char msg[128] = {'\0'};
289 va_list arglist;
290 va_start(arglist, fmt);
291 int n= vsnprintf(msg, sizeof(msg), fmt, arglist);
292 va_end(arglist);
293 if (n < 0)
294 {
295 sql_print_warning("WSREP: Printing message failed");
296 }
297 else if (n < (int)sizeof(msg))
298 {
299 fun("WSREP: %s", msg);
300 }
301 else
302 {
303 size_t dynbuf_size= std::max(n, 4096);
304 char* dynbuf= (char*) my_malloc(dynbuf_size, MYF(0));
305 if (dynbuf)
306 {
307 va_start(arglist, fmt);
308 (void)vsnprintf(&dynbuf[0], dynbuf_size - 1, fmt, arglist);
309 va_end(arglist);
310 dynbuf[dynbuf_size - 1] = '\0';
311 fun("WSREP: %s", &dynbuf[0]);
312 my_free(dynbuf);
313 }
314 else
315 {
316 /* Memory allocation for vector failed, print truncated message. */
317 fun("WSREP: %s", msg);
318 }
319 }
320 }
321
322
323 wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
324 wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
325 wsp::node_status local_status;
326
327 /*
328 */
329 Wsrep_schema *wsrep_schema= 0;
330
wsrep_log_cb(wsrep::log::level level,const char *,const char * msg)331 static void wsrep_log_cb(wsrep::log::level level,
332 const char*, const char *msg)
333 {
334 /*
335 Silence all wsrep related logging from lib and provider if
336 wsrep is not enabled.
337 */
338 if (!WSREP_ON) return;
339
340 switch (level) {
341 case wsrep::log::info:
342 WSREP_INFO("%s", msg);
343 break;
344 case wsrep::log::warning:
345 WSREP_WARN("%s", msg);
346 break;
347 case wsrep::log::error:
348 WSREP_ERROR("%s", msg);
349 break;
350 case wsrep::log::debug:
351 WSREP_DEBUG("%s", msg);
352 break;
353 case wsrep::log::unknown:
354 WSREP_UNKNOWN("%s", msg);
355 break;
356 }
357 }
358
wsrep_init_sidno(const wsrep::id & uuid)359 void wsrep_init_sidno(const wsrep::id& uuid)
360 {
361 /*
362 Protocol versions starting from 4 use group gtid as it is.
363 For lesser protocol versions generate new Sid map entry from inverted
364 uuid.
365 */
366 rpl_gtid sid;
367 if (wsrep_protocol_version >= 4)
368 {
369 memcpy((void*)&sid, (const uchar*)uuid.data(),16);
370 }
371 else
372 {
373 wsrep_uuid_t ltid_uuid;
374 for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i)
375 {
376 ltid_uuid.data[i]= ~((const uchar*)uuid.data())[i];
377 }
378 memcpy((void*)&sid, (const uchar*)ltid_uuid.data,16);
379 }
380 #ifdef GTID_SUPPORT
381 global_sid_lock->wrlock();
382 wsrep_sidno= global_sid_map->add_sid(sid);
383 WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno);
384 global_sid_lock->unlock();
385 #endif
386 }
387
wsrep_init_schema()388 void wsrep_init_schema()
389 {
390 DBUG_ASSERT(!wsrep_schema);
391
392 WSREP_INFO("wsrep_init_schema_and_SR %p", wsrep_schema);
393 if (!wsrep_schema)
394 {
395 wsrep_schema= new Wsrep_schema();
396 if (wsrep_schema->init())
397 {
398 WSREP_ERROR("Failed to init wsrep schema");
399 unireg_abort(1);
400 }
401 }
402 }
403
wsrep_deinit_schema()404 void wsrep_deinit_schema()
405 {
406 delete wsrep_schema;
407 wsrep_schema= 0;
408 }
409
wsrep_recover_sr_from_storage(THD * orig_thd)410 void wsrep_recover_sr_from_storage(THD *orig_thd)
411 {
412 switch (wsrep_SR_store_type)
413 {
414 case WSREP_SR_STORE_TABLE:
415 if (!wsrep_schema)
416 {
417 WSREP_ERROR("Wsrep schema not initialized when trying to recover "
418 "streaming transactions");
419 unireg_abort(1);
420 }
421 if (wsrep_schema->recover_sr_transactions(orig_thd))
422 {
423 WSREP_ERROR("Failed to recover SR transactions from schema");
424 unireg_abort(1);
425 }
426 break;
427 default:
428 /* */
429 WSREP_ERROR("Unsupported wsrep SR store type: %lu", wsrep_SR_store_type);
430 unireg_abort(1);
431 break;
432 }
433 }
434
435 /** Export the WSREP provider's capabilities as a human readable string.
436 * The result is saved in a dynamically allocated string of the form:
437 * :cap1:cap2:cap3:
438 */
wsrep_capabilities_export(wsrep_cap_t const cap,char ** str)439 static void wsrep_capabilities_export(wsrep_cap_t const cap, char** str)
440 {
441 static const char* names[] =
442 {
443 /* Keep in sync with wsrep/wsrep_api.h WSREP_CAP_* macros. */
444 "MULTI_MASTER",
445 "CERTIFICATION",
446 "PARALLEL_APPLYING",
447 "TRX_REPLAY",
448 "ISOLATION",
449 "PAUSE",
450 "CAUSAL_READS",
451 "CAUSAL_TRX",
452 "INCREMENTAL_WRITESET",
453 "SESSION_LOCKS",
454 "DISTRIBUTED_LOCKS",
455 "CONSISTENCY_CHECK",
456 "UNORDERED",
457 "ANNOTATION",
458 "PREORDERED",
459 "STREAMING",
460 "SNAPSHOT",
461 "NBO",
462 };
463
464 std::string s;
465 for (size_t i= 0; i < sizeof(names) / sizeof(names[0]); ++i)
466 {
467 if (cap & (1ULL << i))
468 {
469 if (s.empty())
470 {
471 s= ":";
472 }
473 s += names[i];
474 s += ":";
475 }
476 }
477
478 /* A read from the string pointed to by *str may be started at any time,
479 * so it must never point to free(3)d memory or non '\0' terminated string. */
480
481 char* const previous= *str;
482
483 *str= strdup(s.c_str());
484
485 if (previous != NULL)
486 {
487 free(previous);
488 }
489 }
490
491 /* Verifies that SE position is consistent with the group position
492 * and initializes other variables */
wsrep_verify_SE_checkpoint(const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno)493 void wsrep_verify_SE_checkpoint(const wsrep_uuid_t& uuid,
494 wsrep_seqno_t const seqno)
495 {
496 }
497
498 /*
499 Wsrep is considered ready if
500 1) Provider is not loaded (native mode)
501 2) Server has reached synced state
502 3) Server is in joiner mode and mysqldump SST method has been
503 specified
504 See Wsrep_server_service::log_state_change() for further details.
505 */
wsrep_ready_get(void)506 my_bool wsrep_ready_get (void)
507 {
508 if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
509 my_bool ret= wsrep_ready;
510 mysql_mutex_unlock (&LOCK_wsrep_ready);
511 return ret;
512 }
513
wsrep_show_ready(THD * thd,SHOW_VAR * var,char * buff)514 int wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff)
515 {
516 var->type= SHOW_MY_BOOL;
517 var->value= buff;
518 *((my_bool *)buff)= wsrep_ready_get();
519 return 0;
520 }
521
wsrep_update_cluster_state_uuid(const char * uuid)522 void wsrep_update_cluster_state_uuid(const char* uuid)
523 {
524 strncpy(cluster_uuid_str, uuid, sizeof(cluster_uuid_str) - 1);
525 }
526
/* Called at the start of wsrep_init(); currently performs no work. */
static void wsrep_init_position()
{
  /* intentionally empty */
}
530
531 /****************************************************************************
532 Helpers for wsrep_init()
533 ****************************************************************************/
wsrep_server_name()534 static std::string wsrep_server_name()
535 {
536 std::string ret(wsrep_node_name ? wsrep_node_name : "");
537 return ret;
538 }
539
/*
  Server id for wsrep-lib. It is deliberately empty so that the view change
  handler can set the final server id later on.
*/
static std::string wsrep_server_id()
{
  return std::string();
}
548
wsrep_server_node_address()549 static std::string wsrep_server_node_address()
550 {
551
552 std::string ret;
553 if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
554 wsrep_data_home_dir= mysql_real_data_home;
555
556 /* Initialize node address */
557 if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
558 {
559 char node_addr[512]= {0, };
560 const size_t node_addr_max= sizeof(node_addr) - 1;
561 size_t guess_ip_ret= wsrep_guess_ip(node_addr, node_addr_max);
562 if (!(guess_ip_ret > 0 && guess_ip_ret < node_addr_max))
563 {
564 WSREP_WARN("Failed to guess base node address. Set it explicitly via "
565 "wsrep_node_address.");
566 }
567 else
568 {
569 ret= node_addr;
570 }
571 }
572 else
573 {
574 ret= wsrep_node_address;
575 }
576 return ret;
577 }
578
wsrep_server_incoming_address()579 static std::string wsrep_server_incoming_address()
580 {
581 std::string ret;
582 const std::string node_addr(wsrep_server_node_address());
583 char inc_addr[512]= { 0, };
584 size_t const inc_addr_max= sizeof (inc_addr);
585
586 /*
587 In case wsrep_node_incoming_address is either not set or set to AUTO,
588 we need to use mysqld's my_bind_addr_str:mysqld_port, lastly fallback
589 to wsrep_node_address' value if mysqld's bind-address is not set either.
590 */
591 if ((!wsrep_node_incoming_address ||
592 !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
593 {
594 bool is_ipv6= false;
595 unsigned int my_bind_ip= INADDR_ANY; // default if not set
596
597 if (my_bind_addr_str && strlen(my_bind_addr_str) &&
598 strcmp(my_bind_addr_str, "*") != 0)
599 {
600 my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6);
601 }
602
603 if (INADDR_ANY != my_bind_ip)
604 {
605 /*
606 If its a not a valid address, leave inc_addr as empty string. mysqld
607 is not listening for client connections on network interfaces.
608 */
609 if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
610 {
611 const char *fmt= (is_ipv6) ? "[%s]:%u" : "%s:%u";
612 snprintf(inc_addr, inc_addr_max, fmt, my_bind_addr_str, mysqld_port);
613 }
614 }
615 else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */
616 {
617 if (node_addr.size())
618 {
619 size_t const ip_len_mdb= wsrep_host_len(node_addr.c_str(), node_addr.size());
620 if (ip_len_mdb + 7 /* :55555\0 */ < inc_addr_max)
621 {
622 memcpy (inc_addr, node_addr.c_str(), ip_len_mdb);
623 snprintf(inc_addr + ip_len_mdb, inc_addr_max - ip_len_mdb, ":%u",
624 (int)mysqld_port);
625 }
626 else
627 {
628 WSREP_WARN("Guessing address for incoming client connections: "
629 "address too long.");
630 inc_addr[0]= '\0';
631 }
632 }
633
634 if (!strlen(inc_addr))
635 {
636 WSREP_WARN("Guessing address for incoming client connections failed. "
637 "Try setting wsrep_node_incoming_address explicitly.");
638 WSREP_INFO("Node addr: %s", node_addr.c_str());
639 }
640 }
641 }
642 else
643 {
644 wsp::Address addr(wsrep_node_incoming_address);
645
646 if (!addr.is_valid())
647 {
648 WSREP_WARN("Could not parse wsrep_node_incoming_address : %s",
649 wsrep_node_incoming_address);
650 goto done;
651 }
652
653 /*
654 In case port is not specified in wsrep_node_incoming_address, we use
655 mysqld_port.
656 */
657 int port= (addr.get_port() > 0) ? addr.get_port() : (int) mysqld_port;
658 const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
659
660 snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port);
661 }
662
663 done:
664 ret= wsrep_node_incoming_address;
665 return ret;
666 }
667
wsrep_server_working_dir()668 static std::string wsrep_server_working_dir()
669 {
670 std::string ret;
671 if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
672 {
673 ret= mysql_real_data_home;
674 }
675 else
676 {
677 ret= wsrep_data_home_dir;
678 }
679 return ret;
680 }
681
wsrep_server_initial_position()682 static wsrep::gtid wsrep_server_initial_position()
683 {
684 wsrep::gtid ret;
685 WSREP_DEBUG("Server initial position: %s", wsrep_start_position);
686 std::istringstream is(wsrep_start_position);
687 is >> ret;
688 return ret;
689 }
690
691 /*
692 Intitialize provider specific status variables
693 */
wsrep_init_provider_status_variables()694 static void wsrep_init_provider_status_variables()
695 {
696 wsrep_inited= 1;
697 const wsrep::provider& provider=
698 Wsrep_server_state::instance().provider();
699 strncpy(provider_name,
700 provider.name().c_str(), sizeof(provider_name) - 1);
701 strncpy(provider_version,
702 provider.version().c_str(), sizeof(provider_version) - 1);
703 strncpy(provider_vendor,
704 provider.vendor().c_str(), sizeof(provider_vendor) - 1);
705 }
706
wsrep_init_server()707 int wsrep_init_server()
708 {
709 wsrep::log::logger_fn(wsrep_log_cb);
710 try
711 {
712 std::string server_name;
713 std::string server_id;
714 std::string node_address;
715 std::string incoming_address;
716 std::string working_dir;
717 wsrep::gtid initial_position;
718
719 server_name= wsrep_server_name();
720 server_id= wsrep_server_id();
721 node_address= wsrep_server_node_address();
722 incoming_address= wsrep_server_incoming_address();
723 working_dir= wsrep_server_working_dir();
724 initial_position= wsrep_server_initial_position();
725
726 Wsrep_server_state::init_once(server_name,
727 incoming_address,
728 node_address,
729 working_dir,
730 initial_position,
731 wsrep_max_protocol_version);
732 Wsrep_server_state::instance().debug_log_level(wsrep_debug);
733 }
734 catch (const wsrep::runtime_error& e)
735 {
736 WSREP_ERROR("Failed to init wsrep server %s", e.what());
737 return 1;
738 }
739 catch (const std::exception& e)
740 {
741 WSREP_ERROR("Failed to init wsrep server %s", e.what());
742 }
743 return 0;
744 }
745
wsrep_init_globals()746 void wsrep_init_globals()
747 {
748 wsrep_init_sidno(Wsrep_server_state::instance().connected_gtid().id());
749 wsrep_init_schema();
750
751 if (WSREP_ON)
752 {
753 Wsrep_server_state::instance().initialized();
754 }
755 }
756
wsrep_deinit_server()757 void wsrep_deinit_server()
758 {
759 wsrep_deinit_schema();
760 Wsrep_server_state::destroy();
761 }
762
wsrep_init()763 int wsrep_init()
764 {
765 assert(wsrep_provider);
766
767 wsrep_init_position();
768 wsrep_sst_auth_init();
769
770 if (strlen(wsrep_provider)== 0 ||
771 !strcmp(wsrep_provider, WSREP_NONE))
772 {
773 // enable normal operation in case no provider is specified
774 global_system_variables.wsrep_on= 0;
775 int err= Wsrep_server_state::instance().load_provider(wsrep_provider, wsrep_provider_options ? wsrep_provider_options : "");
776 if (err)
777 {
778 DBUG_PRINT("wsrep",("wsrep::init() failed: %d", err));
779 WSREP_ERROR("wsrep::init() failed: %d, must shutdown", err);
780 }
781 else
782 wsrep_init_provider_status_variables();
783 return err;
784 }
785
786 global_system_variables.wsrep_on= 1;
787
788 WSREP_ON_= wsrep_provider && strcmp(wsrep_provider, WSREP_NONE);
789
790 if (wsrep_gtid_mode && opt_bin_log && !opt_log_slave_updates)
791 {
792 WSREP_ERROR("Option --log-slave-updates is required if "
793 "binlog is enabled, GTID mode is on and wsrep provider "
794 "is specified");
795 return 1;
796 }
797
798 if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
799 wsrep_data_home_dir= mysql_real_data_home;
800
801 if (Wsrep_server_state::instance().load_provider(wsrep_provider,
802 wsrep_provider_options))
803 {
804 WSREP_ERROR("Failed to load provider");
805 return 1;
806 }
807
808 if (!wsrep_provider_is_SR_capable() &&
809 global_system_variables.wsrep_trx_fragment_size > 0)
810 {
811 WSREP_ERROR("The WSREP provider (%s) does not support streaming "
812 "replication but wsrep_trx_fragment_size is set to a "
813 "value other than 0 (%llu). Cannot continue. Either set "
814 "wsrep_trx_fragment_size to 0 or use wsrep_provider that "
815 "supports streaming replication.",
816 wsrep_provider, global_system_variables.wsrep_trx_fragment_size);
817 Wsrep_server_state::instance().unload_provider();
818 return 1;
819 }
820
821 wsrep_init_provider_status_variables();
822 wsrep_capabilities_export(Wsrep_server_state::instance().provider().capabilities(),
823 &wsrep_provider_capabilities);
824
825 WSREP_DEBUG("SR storage init for: %s",
826 (wsrep_SR_store_type == WSREP_SR_STORE_TABLE) ? "table" : "void");
827
828 return 0;
829 }
830
831 /* Initialize wsrep thread LOCKs and CONDs */
wsrep_thr_init()832 void wsrep_thr_init()
833 {
834 DBUG_ENTER("wsrep_thr_init");
835 wsrep_config_state= new wsp::Config_state;
836 #ifdef HAVE_PSI_INTERFACE
837 mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes));
838 mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds));
839 mysql_file_register("sql", wsrep_files, array_elements(wsrep_files));
840 mysql_thread_register("sql", wsrep_threads, array_elements(wsrep_threads));
841 #endif
842
843 mysql_mutex_init(key_LOCK_wsrep_ready, &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
844 mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
845 mysql_mutex_init(key_LOCK_wsrep_sst, &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
846 mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
847 mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
848 mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
849 mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
850 mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
851 mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
852 mysql_cond_init(key_COND_wsrep_slave_threads, &COND_wsrep_slave_threads, NULL);
853 mysql_mutex_init(key_LOCK_wsrep_cluster_config, &LOCK_wsrep_cluster_config, MY_MUTEX_INIT_FAST);
854 mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
855 mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
856 mysql_mutex_init(key_LOCK_wsrep_group_commit, &LOCK_wsrep_group_commit, MY_MUTEX_INIT_FAST);
857 mysql_mutex_init(key_LOCK_wsrep_SR_pool,
858 &LOCK_wsrep_SR_pool, MY_MUTEX_INIT_FAST);
859 mysql_mutex_init(key_LOCK_wsrep_SR_store,
860 &LOCK_wsrep_SR_store, MY_MUTEX_INIT_FAST);
861 mysql_mutex_init(key_LOCK_wsrep_joiner_monitor,
862 &LOCK_wsrep_joiner_monitor, MY_MUTEX_INIT_FAST);
863 mysql_mutex_init(key_LOCK_wsrep_donor_monitor,
864 &LOCK_wsrep_donor_monitor, MY_MUTEX_INIT_FAST);
865 mysql_cond_init(key_COND_wsrep_joiner_monitor, &COND_wsrep_joiner_monitor, NULL);
866 mysql_cond_init(key_COND_wsrep_donor_monitor, &COND_wsrep_donor_monitor, NULL);
867
868 DBUG_VOID_RETURN;
869 }
870
wsrep_init_startup(bool sst_first)871 void wsrep_init_startup (bool sst_first)
872 {
873 if (wsrep_init()) unireg_abort(1);
874
875 /*
876 Pre-initialize global_system_variables.table_plugin with a dummy engine
877 (placeholder) required during the initialization of wsrep threads (THDs).
878 (see: plugin_thdvar_init())
879 Note: This only needs to be done for rsync & mariabackup based SST methods.
880 In case of mysqldump SST method, the wsrep threads are created after the
881 server plugins & global system variables are initialized.
882 */
883 if (wsrep_before_SE())
884 wsrep_plugins_pre_init();
885
886 /* Skip replication start if dummy wsrep provider is loaded */
887 if (!strcmp(wsrep_provider, WSREP_NONE)) return;
888
889 /* Skip replication start if no cluster address */
890 if (!wsrep_cluster_address_exists()) return;
891
892 /*
893 Read value of wsrep_new_cluster before wsrep_start_replication(),
894 the value is reset to FALSE inside wsrep_start_replication.
895 */
896 if (!wsrep_start_replication(wsrep_cluster_address)) unireg_abort(1);
897
898 wsrep_create_rollbacker();
899 wsrep_create_appliers(1);
900
901 Wsrep_server_state& server_state= Wsrep_server_state::instance();
902 /*
903 If the SST happens before server initialization, wait until the server
904 state reaches initializing. This indicates that
905 either SST was not necessary or SST has been delivered.
906
907 With mysqldump SST (!sst_first) wait until the server reaches
908 joiner state and procedd to accepting connections.
909 */
910 if (sst_first)
911 {
912 server_state.wait_until_state(Wsrep_server_state::s_initializing);
913 }
914 else
915 {
916 server_state.wait_until_state(Wsrep_server_state::s_joiner);
917 }
918 }
919
920
wsrep_deinit(bool free_options)921 void wsrep_deinit(bool free_options)
922 {
923 DBUG_ASSERT(wsrep_inited == 1);
924 WSREP_DEBUG("wsrep_deinit");
925
926 Wsrep_server_state::instance().unload_provider();
927 provider_name[0]= '\0';
928 provider_version[0]= '\0';
929 provider_vendor[0]= '\0';
930
931 wsrep_inited= 0;
932
933 if (wsrep_provider_capabilities != NULL)
934 {
935 char* p= wsrep_provider_capabilities;
936 wsrep_provider_capabilities= NULL;
937 free(p);
938 }
939
940 if (free_options)
941 {
942 wsrep_sst_auth_free();
943 }
944 }
945
946 /* Destroy wsrep thread LOCKs and CONDs */
wsrep_thr_deinit()947 void wsrep_thr_deinit()
948 {
949 if (!wsrep_config_state)
950 return; // Never initialized
951 WSREP_DEBUG("wsrep_thr_deinit");
952 mysql_mutex_destroy(&LOCK_wsrep_ready);
953 mysql_cond_destroy(&COND_wsrep_ready);
954 mysql_mutex_destroy(&LOCK_wsrep_sst);
955 mysql_cond_destroy(&COND_wsrep_sst);
956 mysql_mutex_destroy(&LOCK_wsrep_sst_init);
957 mysql_cond_destroy(&COND_wsrep_sst_init);
958 mysql_mutex_destroy(&LOCK_wsrep_replaying);
959 mysql_cond_destroy(&COND_wsrep_replaying);
960 mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
961 mysql_cond_destroy(&COND_wsrep_slave_threads);
962 mysql_mutex_destroy(&LOCK_wsrep_cluster_config);
963 mysql_mutex_destroy(&LOCK_wsrep_desync);
964 mysql_mutex_destroy(&LOCK_wsrep_config_state);
965 mysql_mutex_destroy(&LOCK_wsrep_group_commit);
966 mysql_mutex_destroy(&LOCK_wsrep_SR_pool);
967 mysql_mutex_destroy(&LOCK_wsrep_SR_store);
968 mysql_mutex_destroy(&LOCK_wsrep_joiner_monitor);
969 mysql_mutex_destroy(&LOCK_wsrep_donor_monitor);
970 mysql_cond_destroy(&COND_wsrep_joiner_monitor);
971 mysql_cond_destroy(&COND_wsrep_donor_monitor);
972
973 delete wsrep_config_state;
974 wsrep_config_state= 0; // Safety
975
976 if (wsrep_cluster_capabilities != NULL)
977 {
978 char* p= wsrep_cluster_capabilities;
979 wsrep_cluster_capabilities= NULL;
980 free(p);
981 }
982 }
983
wsrep_recover()984 void wsrep_recover()
985 {
986 char uuid_str[40];
987
988 if (wsrep_uuid_compare(&local_uuid, &WSREP_UUID_UNDEFINED) == 0 &&
989 local_seqno == -2)
990 {
991 wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
992 WSREP_INFO("Position %s:%lld given at startup, skipping position recovery",
993 uuid_str, (long long)local_seqno);
994 return;
995 }
996 wsrep::gtid gtid= wsrep_get_SE_checkpoint();
997 std::ostringstream oss;
998 oss << gtid;
999 WSREP_INFO("Recovered position: %s", oss.str().c_str());
1000 }
1001
1002
wsrep_stop_replication(THD * thd)1003 void wsrep_stop_replication(THD *thd)
1004 {
1005 WSREP_INFO("Stop replication by %llu", (thd) ? thd->thread_id : 0);
1006 if (Wsrep_server_state::instance().state() !=
1007 Wsrep_server_state::s_disconnected)
1008 {
1009 WSREP_DEBUG("Disconnect provider");
1010 Wsrep_server_state::instance().disconnect();
1011 Wsrep_server_state::instance().wait_until_state(Wsrep_server_state::s_disconnected);
1012 }
1013
1014 /* my connection, should not terminate with wsrep_close_client_connection(),
1015 make transaction to rollback
1016 */
1017 if (thd && !thd->wsrep_applier) trans_rollback(thd);
1018 wsrep_close_client_connections(TRUE, thd);
1019
1020 /* wait until appliers have stopped */
1021 wsrep_wait_appliers_close(thd);
1022
1023 node_uuid= WSREP_UUID_UNDEFINED;
1024 }
1025
wsrep_shutdown_replication()1026 void wsrep_shutdown_replication()
1027 {
1028 WSREP_INFO("Shutdown replication");
1029 if (Wsrep_server_state::instance().state() != wsrep::server_state::s_disconnected)
1030 {
1031 WSREP_DEBUG("Disconnect provider");
1032 Wsrep_server_state::instance().disconnect();
1033 Wsrep_server_state::instance().wait_until_state(Wsrep_server_state::s_disconnected);
1034 }
1035
1036 wsrep_close_client_connections(TRUE);
1037
1038 /* wait until appliers have stopped */
1039 wsrep_wait_appliers_close(NULL);
1040 node_uuid= WSREP_UUID_UNDEFINED;
1041
1042 /* Undocking the thread specific data. */
1043 my_pthread_setspecific_ptr(THR_THD, NULL);
1044 }
1045
wsrep_start_replication(const char * wsrep_cluster_address)1046 bool wsrep_start_replication(const char *wsrep_cluster_address)
1047 {
1048 int rcode;
1049 WSREP_DEBUG("wsrep_start_replication");
1050
1051 /*
1052 if provider is trivial, don't even try to connect,
1053 but resume local node operation
1054 */
1055 if (!WSREP_PROVIDER_EXISTS)
1056 {
1057 // enable normal operation in case no provider is specified
1058 return true;
1059 }
1060
1061 DBUG_ASSERT(wsrep_cluster_address[0]);
1062
1063 bool const bootstrap(TRUE == wsrep_new_cluster);
1064 wsrep_new_cluster= FALSE;
1065
1066 WSREP_INFO("Start replication");
1067
1068 if ((rcode= Wsrep_server_state::instance().connect(
1069 wsrep_cluster_name,
1070 wsrep_cluster_address,
1071 wsrep_sst_donor,
1072 bootstrap)))
1073 {
1074 DBUG_PRINT("wsrep",("wsrep_ptr->connect(%s) failed: %d",
1075 wsrep_cluster_address, rcode));
1076 WSREP_ERROR("wsrep::connect(%s) failed: %d",
1077 wsrep_cluster_address, rcode);
1078 return false;
1079 }
1080 else
1081 {
1082 try
1083 {
1084 std::string opts= Wsrep_server_state::instance().provider().options();
1085 wsrep_provider_options_init(opts.c_str());
1086 }
1087 catch (const wsrep::runtime_error&)
1088 {
1089 WSREP_WARN("Failed to get wsrep options");
1090 }
1091 }
1092
1093 return true;
1094 }
1095
wsrep_must_sync_wait(THD * thd,uint mask)1096 bool wsrep_must_sync_wait (THD* thd, uint mask)
1097 {
1098 bool ret;
1099 mysql_mutex_lock(&thd->LOCK_thd_data);
1100 ret= (thd->variables.wsrep_sync_wait & mask) &&
1101 thd->wsrep_client_thread &&
1102 WSREP_ON && thd->variables.wsrep_on &&
1103 !(thd->variables.wsrep_dirty_reads &&
1104 !is_update_query(thd->lex->sql_command)) &&
1105 !thd->in_active_multi_stmt_transaction() &&
1106 thd->wsrep_trx().state() !=
1107 wsrep::transaction::s_replaying &&
1108 thd->wsrep_cs().sync_wait_gtid().is_undefined();
1109 mysql_mutex_unlock(&thd->LOCK_thd_data);
1110 return ret;
1111 }
1112
wsrep_sync_wait(THD * thd,uint mask)1113 bool wsrep_sync_wait (THD* thd, uint mask)
1114 {
1115 if (wsrep_must_sync_wait(thd, mask))
1116 {
1117 WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait= %u, "
1118 "mask= %u, thd->variables.wsrep_on= %d",
1119 thd->variables.wsrep_sync_wait, mask,
1120 thd->variables.wsrep_on);
1121 /*
1122 This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
1123 TODO: modify to check if thd has locked any rows.
1124 */
1125 if (thd->wsrep_cs().sync_wait(-1))
1126 {
1127 const char* msg;
1128 int err;
1129
1130 /*
1131 Possibly relevant error codes:
1132 ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
1133 ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
1134 ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
1135 */
1136
1137 switch (thd->wsrep_cs().current_error())
1138 {
1139 case wsrep::e_not_supported_error:
1140 msg= "synchronous reads by wsrep backend. "
1141 "Please unset wsrep_causal_reads variable.";
1142 err= ER_NOT_SUPPORTED_YET;
1143 break;
1144 default:
1145 msg= "Synchronous wait failed.";
1146 err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
1147 // with ER_LOCK_WAIT_TIMEOUT
1148 }
1149
1150 my_error(err, MYF(0), msg);
1151
1152 return true;
1153 }
1154 }
1155
1156 return false;
1157 }
1158
1159 enum wsrep::provider::status
wsrep_sync_wait_upto(THD * thd,wsrep_gtid_t * upto,int timeout)1160 wsrep_sync_wait_upto (THD* thd,
1161 wsrep_gtid_t* upto,
1162 int timeout)
1163 {
1164 DBUG_ASSERT(upto);
1165 enum wsrep::provider::status ret;
1166 if (upto)
1167 {
1168 wsrep::gtid upto_gtid(wsrep::id(upto->uuid.data, sizeof(upto->uuid.data)),
1169 wsrep::seqno(upto->seqno));
1170 ret= Wsrep_server_state::instance().wait_for_gtid(upto_gtid, timeout);
1171 }
1172 else
1173 {
1174 ret= Wsrep_server_state::instance().causal_read(timeout).second;
1175 }
1176 WSREP_DEBUG("wsrep_sync_wait_upto: %d", ret);
1177 return ret;
1178 }
1179
wsrep_keys_free(wsrep_key_arr_t * key_arr)1180 void wsrep_keys_free(wsrep_key_arr_t* key_arr)
1181 {
1182 for (size_t i= 0; i < key_arr->keys_len; ++i)
1183 {
1184 my_free((void*)key_arr->keys[i].key_parts);
1185 }
1186 my_free(key_arr->keys);
1187 key_arr->keys= 0;
1188 key_arr->keys_len= 0;
1189 }
1190
1191 /*!
1192 * @param thd thread
1193 * @param tables list of tables
1194 * @param keys prepared keys
1195
1196 * @return true if parent table append was successfull, otherwise false.
1197 */
1198 bool
wsrep_append_fk_parent_table(THD * thd,TABLE_LIST * tables,wsrep::key_array * keys)1199 wsrep_append_fk_parent_table(THD* thd, TABLE_LIST* tables, wsrep::key_array* keys)
1200 {
1201 bool fail= false;
1202 TABLE_LIST *table;
1203
1204 for (table= tables; table; table= table->next_local)
1205 {
1206 if (is_temporary_table(table))
1207 {
1208 WSREP_DEBUG("Temporary table %s.%s already opened query=%s", table->db.str,
1209 table->table_name.str, wsrep_thd_query(thd));
1210 return false;
1211 }
1212 }
1213
1214 thd->release_transactional_locks();
1215 uint counter;
1216 MDL_savepoint mdl_savepoint= thd->mdl_context.mdl_savepoint();
1217
1218 if (open_tables(thd, &tables, &counter, MYSQL_OPEN_FORCE_SHARED_HIGH_PRIO_MDL))
1219 {
1220 WSREP_DEBUG("Unable to open table for FK checks for %s", wsrep_thd_query(thd));
1221 fail= true;
1222 goto exit;
1223 }
1224
1225 for (table= tables; table; table= table->next_local)
1226 {
1227 if (!is_temporary_table(table) && table->table)
1228 {
1229 FOREIGN_KEY_INFO *f_key_info;
1230 List<FOREIGN_KEY_INFO> f_key_list;
1231
1232 table->table->file->get_foreign_key_list(thd, &f_key_list);
1233 List_iterator_fast<FOREIGN_KEY_INFO> it(f_key_list);
1234 while ((f_key_info=it++))
1235 {
1236 WSREP_DEBUG("appended fkey %s", f_key_info->referenced_table->str);
1237 keys->push_back(wsrep_prepare_key_for_toi(f_key_info->referenced_db->str,
1238 f_key_info->referenced_table->str,
1239 wsrep::key::shared));
1240 }
1241 }
1242 }
1243
1244 exit:
1245 /* close the table and release MDL locks */
1246 close_thread_tables(thd);
1247 thd->mdl_context.rollback_to_savepoint(mdl_savepoint);
1248 for (table= tables; table; table= table->next_local)
1249 {
1250 table->table= NULL;
1251 table->next_global= NULL;
1252 table->mdl_request.ticket= NULL;
1253 }
1254
1255 return fail;
1256 }
1257
wsrep_reload_ssl()1258 bool wsrep_reload_ssl()
1259 {
1260 try
1261 {
1262 std::string opts= Wsrep_server_state::instance().provider().options();
1263 if (opts.find("socket.ssl_reload") == std::string::npos)
1264 {
1265 WSREP_DEBUG("Option `socket.ssl_reload` not found in parameters.");
1266 return false;
1267 }
1268 const std::string reload_ssl_param("socket.ssl_reload=1");
1269 enum wsrep::provider::status ret= Wsrep_server_state::instance().provider().options(reload_ssl_param);
1270 if (ret)
1271 {
1272 WSREP_ERROR("Set options returned %d", ret);
1273 return true;
1274 }
1275 return false;
1276 }
1277 catch (...)
1278 {
1279 WSREP_ERROR("Failed to get provider options");
1280 return true;
1281 }
1282 }
1283
1284 /*!
1285 * @param db Database string
1286 * @param table Table string
1287 * @param key Array of wsrep_key_t
1288 * @param key_len In: number of elements in key array, Out: number of
1289 * elements populated
1290 *
1291 * @return true if preparation was successful, otherwise false.
1292 */
1293
wsrep_prepare_key_for_isolation(const char * db,const char * table,wsrep_buf_t * key,size_t * key_len)1294 static bool wsrep_prepare_key_for_isolation(const char* db,
1295 const char* table,
1296 wsrep_buf_t* key,
1297 size_t* key_len)
1298 {
1299 if (*key_len < 2) return false;
1300
1301 switch (wsrep_protocol_version)
1302 {
1303 case 0:
1304 *key_len= 0;
1305 break;
1306 case 1:
1307 case 2:
1308 case 3:
1309 case 4:
1310 {
1311 *key_len= 0;
1312 if (db)
1313 {
1314 key[*key_len].ptr= db;
1315 key[*key_len].len= strlen(db);
1316 ++(*key_len);
1317 if (table)
1318 {
1319 key[*key_len].ptr= table;
1320 key[*key_len].len= strlen(table);
1321 ++(*key_len);
1322 }
1323 }
1324 break;
1325 }
1326 default:
1327 assert(0);
1328 WSREP_ERROR("Unsupported protocol version: %ld", wsrep_protocol_version);
1329 unireg_abort(1);
1330 return false;
1331 }
1332
1333 return true;
1334 }
1335
wsrep_prepare_key_for_isolation(const char * db,const char * table,wsrep_key_arr_t * ka)1336 static bool wsrep_prepare_key_for_isolation(const char* db,
1337 const char* table,
1338 wsrep_key_arr_t* ka)
1339 {
1340 wsrep_key_t* tmp;
1341 tmp= (wsrep_key_t*)my_realloc(ka->keys,
1342 (ka->keys_len + 1) * sizeof(wsrep_key_t),
1343 MYF(MY_ALLOW_ZERO_PTR));
1344 if (!tmp)
1345 {
1346 WSREP_ERROR("Can't allocate memory for key_array");
1347 return false;
1348 }
1349 ka->keys= tmp;
1350 if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
1351 my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
1352 {
1353 WSREP_ERROR("Can't allocate memory for key_parts");
1354 return false;
1355 }
1356 ka->keys[ka->keys_len].key_parts_num= 2;
1357 ++ka->keys_len;
1358 if (!wsrep_prepare_key_for_isolation(db, table,
1359 (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
1360 &ka->keys[ka->keys_len - 1].key_parts_num))
1361 {
1362 WSREP_ERROR("Preparing keys for isolation failed");
1363 return false;
1364 }
1365
1366 return true;
1367 }
1368
wsrep_prepare_keys_for_alter_add_fk(const char * child_table_db,Alter_info * alter_info,wsrep_key_arr_t * ka)1369 static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
1370 Alter_info* alter_info,
1371 wsrep_key_arr_t* ka)
1372 {
1373 Key *key;
1374 List_iterator<Key> key_iterator(alter_info->key_list);
1375 while ((key= key_iterator++))
1376 {
1377 if (key->type == Key::FOREIGN_KEY)
1378 {
1379 Foreign_key *fk_key= (Foreign_key *)key;
1380 const char *db_name= fk_key->ref_db.str;
1381 const char *table_name= fk_key->ref_table.str;
1382 if (!db_name)
1383 {
1384 db_name= child_table_db;
1385 }
1386 if (!wsrep_prepare_key_for_isolation(db_name, table_name, ka))
1387 {
1388 return false;
1389 }
1390 }
1391 }
1392 return true;
1393 }
1394
wsrep_prepare_keys_for_isolation(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep_key_arr_t * ka)1395 static bool wsrep_prepare_keys_for_isolation(THD* thd,
1396 const char* db,
1397 const char* table,
1398 const TABLE_LIST* table_list,
1399 Alter_info* alter_info,
1400 wsrep_key_arr_t* ka)
1401 {
1402 ka->keys= 0;
1403 ka->keys_len= 0;
1404
1405 if (db || table)
1406 {
1407 if (!wsrep_prepare_key_for_isolation(db, table, ka))
1408 goto err;
1409 }
1410
1411 for (const TABLE_LIST* table= table_list; table; table= table->next_global)
1412 {
1413 if (!wsrep_prepare_key_for_isolation(table->db.str, table->table_name.str, ka))
1414 goto err;
1415 }
1416
1417 if (alter_info)
1418 {
1419 if (!wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info, ka))
1420 goto err;
1421 }
1422 return false;
1423
1424 err:
1425 wsrep_keys_free(ka);
1426 return true;
1427 }
1428
1429 /*
1430 * Prepare key list from db/table and table_list
1431 *
1432 * Return zero in case of success, 1 in case of failure.
1433 */
1434
wsrep_prepare_keys_for_isolation(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,wsrep_key_arr_t * ka)1435 bool wsrep_prepare_keys_for_isolation(THD* thd,
1436 const char* db,
1437 const char* table,
1438 const TABLE_LIST* table_list,
1439 wsrep_key_arr_t* ka)
1440 {
1441 return wsrep_prepare_keys_for_isolation(thd, db, table, table_list, NULL, ka);
1442 }
1443
wsrep_prepare_key(const uchar * cache_key,size_t cache_key_len,const uchar * row_id,size_t row_id_len,wsrep_buf_t * key,size_t * key_len)1444 bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len,
1445 const uchar* row_id, size_t row_id_len,
1446 wsrep_buf_t* key, size_t* key_len)
1447 {
1448 if (*key_len < 3) return false;
1449
1450 *key_len= 0;
1451 switch (wsrep_protocol_version)
1452 {
1453 case 0:
1454 {
1455 key[0].ptr= cache_key;
1456 key[0].len= cache_key_len;
1457
1458 *key_len= 1;
1459 break;
1460 }
1461 case 1:
1462 case 2:
1463 case 3:
1464 case 4:
1465 {
1466 key[0].ptr= cache_key;
1467 key[0].len= strlen( (char*)cache_key );
1468
1469 key[1].ptr= cache_key + strlen( (char*)cache_key ) + 1;
1470 key[1].len= strlen( (char*)(key[1].ptr) );
1471
1472 *key_len= 2;
1473 break;
1474 }
1475 default:
1476 return false;
1477 }
1478
1479 key[*key_len].ptr= row_id;
1480 key[*key_len].len= row_id_len;
1481 ++(*key_len);
1482
1483 return true;
1484 }
1485
wsrep_prepare_key_for_innodb(THD * thd,const uchar * cache_key,size_t cache_key_len,const uchar * row_id,size_t row_id_len,wsrep_buf_t * key,size_t * key_len)1486 bool wsrep_prepare_key_for_innodb(THD* thd,
1487 const uchar* cache_key,
1488 size_t cache_key_len,
1489 const uchar* row_id,
1490 size_t row_id_len,
1491 wsrep_buf_t* key,
1492 size_t* key_len)
1493 {
1494
1495 return wsrep_prepare_key(cache_key, cache_key_len, row_id, row_id_len, key, key_len);
1496 }
1497
wsrep_prepare_key_for_toi(const char * db,const char * table,enum wsrep::key::type type)1498 wsrep::key wsrep_prepare_key_for_toi(const char* db, const char* table,
1499 enum wsrep::key::type type)
1500 {
1501 wsrep::key ret(type);
1502 DBUG_ASSERT(db);
1503 ret.append_key_part(db, strlen(db));
1504 if (table) ret.append_key_part(table, strlen(table));
1505 return ret;
1506 }
1507
1508 wsrep::key_array
wsrep_prepare_keys_for_alter_add_fk(const char * child_table_db,Alter_info * alter_info)1509 wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
1510 Alter_info* alter_info)
1511
1512 {
1513 wsrep::key_array ret;
1514 Key *key;
1515 List_iterator<Key> key_iterator(alter_info->key_list);
1516 while ((key= key_iterator++))
1517 {
1518 if (key->type == Key::FOREIGN_KEY)
1519 {
1520 Foreign_key *fk_key= (Foreign_key *)key;
1521 const char *db_name= fk_key->ref_db.str;
1522 const char *table_name= fk_key->ref_table.str;
1523 if (!db_name)
1524 {
1525 db_name= child_table_db;
1526 }
1527 ret.push_back(wsrep_prepare_key_for_toi(db_name, table_name,
1528 wsrep::key::exclusive));
1529 }
1530 }
1531 return ret;
1532 }
1533
wsrep_prepare_keys_for_toi(const char * db,const char * table,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep::key_array * fk_tables)1534 wsrep::key_array wsrep_prepare_keys_for_toi(const char* db,
1535 const char* table,
1536 const TABLE_LIST* table_list,
1537 Alter_info* alter_info,
1538 wsrep::key_array* fk_tables)
1539 {
1540 wsrep::key_array ret;
1541 if (db || table)
1542 {
1543 ret.push_back(wsrep_prepare_key_for_toi(db, table, wsrep::key::exclusive));
1544 }
1545 for (const TABLE_LIST* table= table_list; table; table= table->next_global)
1546 {
1547 ret.push_back(wsrep_prepare_key_for_toi(table->db.str, table->table_name.str,
1548 wsrep::key::exclusive));
1549 }
1550 if (alter_info)
1551 {
1552 wsrep::key_array fk(wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info));
1553 if (!fk.empty())
1554 {
1555 ret.insert(ret.end(), fk.begin(), fk.end());
1556 }
1557 }
1558 if (fk_tables && !fk_tables->empty())
1559 {
1560 ret.insert(ret.end(), fk_tables->begin(), fk_tables->end());
1561 }
1562 return ret;
1563 }
1564
1565 /*
1566 * Construct Query_log_Event from thd query and serialize it
1567 * into buffer.
1568 *
1569 * Return 0 in case of success, 1 in case of error.
1570 */
wsrep_to_buf_helper(THD * thd,const char * query,uint query_len,uchar ** buf,size_t * buf_len)1571 int wsrep_to_buf_helper(
1572 THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
1573 {
1574 IO_CACHE tmp_io_cache;
1575 Log_event_writer writer(&tmp_io_cache, 0);
1576 if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
1577 65536, MYF(MY_WME)))
1578 return 1;
1579 int ret(0);
1580 enum enum_binlog_checksum_alg current_binlog_check_alg=
1581 (enum_binlog_checksum_alg) binlog_checksum_options;
1582
1583 Format_description_log_event *tmp_fd= new Format_description_log_event(4);
1584 tmp_fd->checksum_alg= current_binlog_check_alg;
1585 writer.write(tmp_fd);
1586 delete tmp_fd;
1587
1588 #ifdef GTID_SUPPORT
1589 if (thd->variables.gtid_next.type == GTID_GROUP)
1590 {
1591 Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
1592 if (!gtid_ev.is_valid()) ret= 0;
1593 if (!ret && writer.write(>id_ev)) ret= 1;
1594 }
1595 #endif /* GTID_SUPPORT */
1596 if (wsrep_gtid_mode && thd->variables.gtid_seq_no)
1597 {
1598 Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no,
1599 thd->variables.gtid_domain_id,
1600 true, LOG_EVENT_SUPPRESS_USE_F,
1601 true, 0);
1602 gtid_event.server_id= thd->variables.server_id;
1603 if (!gtid_event.is_valid()) ret= 0;
1604 ret= writer.write(>id_event);
1605 }
1606
1607 /* if there is prepare query, add event for it */
1608 if (!ret && thd->wsrep_TOI_pre_query)
1609 {
1610 Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
1611 thd->wsrep_TOI_pre_query_len,
1612 FALSE, FALSE, FALSE, 0);
1613 ev.checksum_alg= current_binlog_check_alg;
1614 if (writer.write(&ev)) ret= 1;
1615 }
1616
1617 /* continue to append the actual query */
1618 Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
1619 ev.checksum_alg= current_binlog_check_alg;
1620 if (!ret && writer.write(&ev)) ret= 1;
1621 if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
1622 close_cached_file(&tmp_io_cache);
1623 return ret;
1624 }
1625
1626 static int
wsrep_alter_query_string(THD * thd,String * buf)1627 wsrep_alter_query_string(THD *thd, String *buf)
1628 {
1629 /* Append the "ALTER" part of the query */
1630 if (buf->append(STRING_WITH_LEN("ALTER ")))
1631 return 1;
1632 /* Append definer */
1633 append_definer(thd, buf, &(thd->lex->definer->user), &(thd->lex->definer->host));
1634 /* Append the left part of thd->query after event name part */
1635 if (buf->append(thd->lex->stmt_definition_begin,
1636 thd->lex->stmt_definition_end -
1637 thd->lex->stmt_definition_begin))
1638 return 1;
1639
1640 return 0;
1641 }
1642
wsrep_alter_event_query(THD * thd,uchar ** buf,size_t * buf_len)1643 static int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len)
1644 {
1645 String log_query;
1646
1647 if (wsrep_alter_query_string(thd, &log_query))
1648 {
1649 WSREP_WARN("events alter string failed: schema: %s, query: %s",
1650 thd->get_db(), thd->query());
1651 return 1;
1652 }
1653 return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
1654 }
1655
1656 #include "sql_show.h"
1657 static int
create_view_query(THD * thd,uchar ** buf,size_t * buf_len)1658 create_view_query(THD *thd, uchar** buf, size_t* buf_len)
1659 {
1660 LEX *lex= thd->lex;
1661 SELECT_LEX *select_lex= lex->first_select_lex();
1662 TABLE_LIST *first_table= select_lex->table_list.first;
1663 TABLE_LIST *views= first_table;
1664 LEX_USER *definer;
1665 String buff;
1666 const LEX_CSTRING command[3]=
1667 {{ STRING_WITH_LEN("CREATE ") },
1668 { STRING_WITH_LEN("ALTER ") },
1669 { STRING_WITH_LEN("CREATE OR REPLACE ") }};
1670
1671 buff.append(&command[thd->lex->create_view->mode]);
1672
1673 if (lex->definer)
1674 definer= get_current_user(thd, lex->definer);
1675 else
1676 {
1677 /*
1678 DEFINER-clause is missing; we have to create default definer in
1679 persistent arena to be PS/SP friendly.
1680 If this is an ALTER VIEW then the current user should be set as
1681 the definer.
1682 */
1683 definer= create_default_definer(thd, false);
1684 }
1685
1686 if (definer)
1687 {
1688 views->definer.user= definer->user;
1689 views->definer.host= definer->host;
1690 } else {
1691 WSREP_ERROR("Failed to get DEFINER for VIEW.");
1692 return 1;
1693 }
1694
1695 views->algorithm = lex->create_view->algorithm;
1696 views->view_suid = lex->create_view->suid;
1697 views->with_check = lex->create_view->check;
1698
1699 view_store_options(thd, views, &buff);
1700 buff.append(STRING_WITH_LEN("VIEW "));
1701 /* Test if user supplied a db (ie: we did not use thd->db) */
1702 if (views->db.str && views->db.str[0] &&
1703 (thd->db.str == NULL || cmp(&views->db, &thd->db)))
1704 {
1705 append_identifier(thd, &buff, &views->db);
1706 buff.append('.');
1707 }
1708 append_identifier(thd, &buff, &views->table_name);
1709 if (lex->view_list.elements)
1710 {
1711 List_iterator_fast<LEX_CSTRING> names(lex->view_list);
1712 LEX_CSTRING *name;
1713 int i;
1714
1715 for (i= 0; (name= names++); i++)
1716 {
1717 buff.append(i ? ", " : "(");
1718 append_identifier(thd, &buff, name);
1719 }
1720 buff.append(')');
1721 }
1722 buff.append(STRING_WITH_LEN(" AS "));
1723 buff.append(thd->lex->create_view->select.str,
1724 thd->lex->create_view->select.length);
1725 return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
1726 }
1727
1728 /*
1729 Rewrite DROP TABLE for TOI. Temporary tables are eliminated from
1730 the query as they are visible only to client connection.
1731
1732 TODO: See comments for sql_base.cc:drop_temporary_table() and refine
1733 the function to deal with transactional locked tables.
1734 */
wsrep_drop_table_query(THD * thd,uchar ** buf,size_t * buf_len)1735 static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len)
1736 {
1737
1738 LEX* lex= thd->lex;
1739 SELECT_LEX* select_lex= lex->first_select_lex();
1740 TABLE_LIST* first_table= select_lex->table_list.first;
1741 String buff;
1742
1743 DBUG_ASSERT(!lex->create_info.tmp_table());
1744
1745 bool found_temp_table= false;
1746 for (TABLE_LIST* table= first_table; table; table= table->next_global)
1747 {
1748 if (thd->find_temporary_table(table->db.str, table->table_name.str))
1749 {
1750 found_temp_table= true;
1751 break;
1752 }
1753 }
1754
1755 if (found_temp_table)
1756 {
1757 buff.append("DROP TABLE ");
1758 if (lex->check_exists)
1759 buff.append("IF EXISTS ");
1760
1761 for (TABLE_LIST* table= first_table; table; table= table->next_global)
1762 {
1763 if (!thd->find_temporary_table(table->db.str, table->table_name.str))
1764 {
1765 append_identifier(thd, &buff, table->db.str, table->db.length);
1766 buff.append(".");
1767 append_identifier(thd, &buff,
1768 table->table_name.str, table->table_name.length);
1769 buff.append(",");
1770 }
1771 }
1772
1773 /* Chop the last comma */
1774 buff.chop();
1775 buff.append(" /* generated by wsrep */");
1776
1777 WSREP_DEBUG("Rewrote '%s' as '%s'", thd->query(), buff.ptr());
1778
1779 return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
1780 }
1781 else
1782 {
1783 return wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1784 buf, buf_len);
1785 }
1786 }
1787
1788
1789 /* Forward declarations. */
1790 int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
1791
1792 /*
1793 Decide if statement should run in TOI.
1794
1795 Look if table or table_list contain temporary tables. If the
1796 statement affects only temporary tables, statement should not run
1797 in TOI. If the table list contains mix of regular and temporary tables
1798 (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but
1799 should be rewritten at later time for replication to contain only
1800 non-temporary tables.
1801 */
wsrep_can_run_in_toi(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list)1802 static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
1803 const TABLE_LIST *table_list)
1804 {
1805 DBUG_ASSERT(!table || db);
1806 DBUG_ASSERT(table_list || db);
1807
1808 LEX* lex= thd->lex;
1809 SELECT_LEX* select_lex= lex->first_select_lex();
1810 TABLE_LIST* first_table= select_lex->table_list.first;
1811
1812 switch (lex->sql_command)
1813 {
1814 case SQLCOM_CREATE_TABLE:
1815 if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)
1816 {
1817 return false;
1818 }
1819 /*
1820 If mariadb master has replicated a CTAS, we should not replicate the create table
1821 part separately as TOI, but to replicate both create table and following inserts
1822 as one write set.
1823 Howver, if CTAS creates empty table, we should replicate the create table alone
1824 as TOI. We have to do relay log event lookup to see if row events follow the
1825 create table event.
1826 */
1827 if (thd->slave_thread && !(thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_STANDALONE))
1828 {
1829 /* this is CTAS, either empty or populated table */
1830 ulonglong event_size = 0;
1831 enum Log_event_type ev_type= wsrep_peak_event(thd->rgi_slave, &event_size);
1832 switch (ev_type)
1833 {
1834 case QUERY_EVENT:
1835 /* CTAS with empty table, we replicate create table as TOI */
1836 break;
1837
1838 case TABLE_MAP_EVENT:
1839 WSREP_DEBUG("replicating CTAS of empty table as TOI");
1840 // fall through
1841 case WRITE_ROWS_EVENT:
1842 /* CTAS with populated table, we replicate later at commit time */
1843 WSREP_DEBUG("skipping create table of CTAS replication");
1844 return false;
1845
1846 default:
1847 WSREP_WARN("unexpected async replication event: %d", ev_type);
1848 }
1849 return true;
1850 }
1851 /* no next async replication event */
1852 return true;
1853
1854 case SQLCOM_CREATE_VIEW:
1855
1856 DBUG_ASSERT(!table_list);
1857 DBUG_ASSERT(first_table); /* First table is view name */
1858 /*
1859 If any of the remaining tables refer to temporary table error
1860 is returned to client, so TOI can be skipped
1861 */
1862 for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global)
1863 {
1864 if (thd->find_temporary_table(it))
1865 {
1866 return false;
1867 }
1868 }
1869 return true;
1870
1871 case SQLCOM_CREATE_TRIGGER:
1872
1873 DBUG_ASSERT(first_table);
1874
1875 if (thd->find_temporary_table(first_table))
1876 {
1877 return false;
1878 }
1879 return true;
1880
1881 case SQLCOM_DROP_TRIGGER:
1882 DBUG_ASSERT(table_list);
1883 if (thd->find_temporary_table(table_list))
1884 {
1885 return false;
1886 }
1887 return true;
1888
1889 default:
1890 if (table && !thd->find_temporary_table(db, table))
1891 {
1892 return true;
1893 }
1894
1895 if (table_list)
1896 {
1897 for (TABLE_LIST* table= first_table; table; table= table->next_global)
1898 {
1899 if (!thd->find_temporary_table(table->db.str, table->table_name.str))
1900 {
1901 return true;
1902 }
1903 }
1904 }
1905 return !(table || table_list);
1906 }
1907 }
1908
1909
wsrep_create_sp(THD * thd,uchar ** buf,size_t * buf_len)1910 static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
1911 {
1912 String log_query;
1913 sp_head *sp= thd->lex->sphead;
1914 sql_mode_t saved_mode= thd->variables.sql_mode;
1915 String retstr(64);
1916 LEX_CSTRING returns= empty_clex_str;
1917 retstr.set_charset(system_charset_info);
1918
1919 log_query.set_charset(system_charset_info);
1920
1921 if (sp->m_handler->type() == TYPE_ENUM_FUNCTION)
1922 {
1923 sp_returns_type(thd, retstr, sp);
1924 returns= retstr.lex_cstring();
1925 }
1926 if (sp->m_handler->
1927 show_create_sp(thd, &log_query,
1928 sp->m_explicit_name ? sp->m_db : null_clex_str,
1929 sp->m_name, sp->m_params, returns,
1930 sp->m_body, sp->chistics(),
1931 thd->lex->definer[0],
1932 thd->lex->create_info,
1933 saved_mode))
1934 {
1935 WSREP_WARN("SP create string failed: schema: %s, query: %s",
1936 thd->get_db(), thd->query());
1937 return 1;
1938 }
1939
1940 return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
1941 }
1942
wsrep_TOI_event_buf(THD * thd,uchar ** buf,size_t * buf_len)1943 static int wsrep_TOI_event_buf(THD* thd, uchar** buf, size_t* buf_len)
1944 {
1945 int err;
1946 switch (thd->lex->sql_command)
1947 {
1948 case SQLCOM_CREATE_VIEW:
1949 err= create_view_query(thd, buf, buf_len);
1950 break;
1951 case SQLCOM_CREATE_PROCEDURE:
1952 case SQLCOM_CREATE_SPFUNCTION:
1953 err= wsrep_create_sp(thd, buf, buf_len);
1954 break;
1955 case SQLCOM_CREATE_TRIGGER:
1956 err= wsrep_create_trigger_query(thd, buf, buf_len);
1957 break;
1958 case SQLCOM_CREATE_EVENT:
1959 err= wsrep_create_event_query(thd, buf, buf_len);
1960 break;
1961 case SQLCOM_ALTER_EVENT:
1962 err= wsrep_alter_event_query(thd, buf, buf_len);
1963 break;
1964 case SQLCOM_DROP_TABLE:
1965 err= wsrep_drop_table_query(thd, buf, buf_len);
1966 break;
1967 case SQLCOM_KILL:
1968 WSREP_DEBUG("KILL as TOI: %s", thd->query());
1969 err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1970 buf, buf_len);
1971 break;
1972 case SQLCOM_CREATE_ROLE:
1973 if (sp_process_definer(thd))
1974 {
1975 WSREP_WARN("Failed to set CREATE ROLE definer for TOI.");
1976 }
1977 /* fallthrough */
1978 default:
1979 err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), buf,
1980 buf_len);
1981 break;
1982 }
1983
1984 return err;
1985 }
1986
wsrep_TOI_begin_failed(THD * thd,const wsrep_buf_t *)1987 static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */)
1988 {
1989 if (wsrep_thd_trx_seqno(thd) > 0)
1990 {
1991 /* GTID was granted and TO acquired - need to log event and release TO */
1992 if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);
1993 if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; }
1994 wsrep::client_state& cs(thd->wsrep_cs());
1995 int const ret= cs.leave_toi_local(wsrep::mutable_buffer());
1996 if (ret)
1997 {
1998 WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, "
1999 "schema: %s, SQL: %s, rcode: %d wsrep_error: %s",
2000 (long long)thd->real_id, thd->db.str,
2001 thd->query(), ret, wsrep::to_c_string(cs.current_error()));
2002 goto fail;
2003 }
2004 }
2005 return;
2006 fail:
2007 WSREP_ERROR("Failed to release TOI resources. Need to abort.");
2008 unireg_abort(1);
2009 }
2010
2011
2012 /*
2013 returns:
2014 0: statement was replicated as TOI
2015 1: TOI replication was skipped
2016 -1: TOI replication failed
2017 */
wsrep_TOI_begin(THD * thd,const char * db,const char * table,const TABLE_LIST * table_list,Alter_info * alter_info,wsrep::key_array * fk_tables)2018 static int wsrep_TOI_begin(THD *thd, const char *db, const char *table,
2019 const TABLE_LIST* table_list,
2020 Alter_info* alter_info, wsrep::key_array* fk_tables)
2021 {
2022 DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_TOI);
2023
2024 WSREP_DEBUG("TOI Begin for %s", wsrep_thd_query(thd));
2025 if (wsrep_can_run_in_toi(thd, db, table, table_list) == false)
2026 {
2027 WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd));
2028 return 1;
2029 }
2030
2031 uchar* buf= 0;
2032 size_t buf_len(0);
2033 int buf_err;
2034 int rc;
2035
2036 buf_err= wsrep_TOI_event_buf(thd, &buf, &buf_len);
2037 if (buf_err) {
2038 WSREP_ERROR("Failed to create TOI event buf: %d", buf_err);
2039 my_message(ER_UNKNOWN_ERROR,
2040 "WSREP replication failed to prepare TOI event buffer. "
2041 "Check your query.",
2042 MYF(0));
2043 return -1;
2044 }
2045 struct wsrep_buf buff= { buf, buf_len };
2046
2047 wsrep::key_array key_array=
2048 wsrep_prepare_keys_for_toi(db, table, table_list, alter_info, fk_tables);
2049
2050 if (thd->has_read_only_protection())
2051 {
2052 /* non replicated DDL, affecting temporary tables only */
2053 WSREP_DEBUG("TO isolation skipped, sql: %s."
2054 "Only temporary tables affected.",
2055 wsrep_thd_query(thd));
2056 if (buf) my_free(buf);
2057 return -1;
2058 }
2059
2060 thd_proc_info(thd, "acquiring total order isolation");
2061
2062 wsrep::client_state& cs(thd->wsrep_cs());
2063 int ret= cs.enter_toi_local(key_array,
2064 wsrep::const_buffer(buff.ptr, buff.len));
2065
2066 if (ret)
2067 {
2068 DBUG_ASSERT(cs.current_error());
2069 WSREP_DEBUG("to_execute_start() failed for %llu: %s, seqno: %lld",
2070 thd->thread_id, wsrep_thd_query(thd),
2071 (long long)wsrep_thd_trx_seqno(thd));
2072
2073 /* jump to error handler in mysql_execute_command() */
2074 switch (cs.current_error())
2075 {
2076 case wsrep::e_size_exceeded_error:
2077 WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
2078 "Maximum size exceeded.",
2079 ret,
2080 (thd->db.str ? thd->db.str : "(null)"),
2081 wsrep_thd_query(thd));
2082 my_error(ER_ERROR_DURING_COMMIT, MYF(0), WSREP_SIZE_EXCEEDED);
2083 break;
2084 case wsrep::e_deadlock_error:
2085 WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
2086 "Deadlock error.",
2087 ret,
2088 (thd->db.str ? thd->db.str : "(null)"),
2089 wsrep_thd_query(thd));
2090 my_error(ER_LOCK_DEADLOCK, MYF(0));
2091 break;
2092 case wsrep::e_timeout_error:
2093 WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
2094 "Operation timed out.",
2095 ret,
2096 (thd->db.str ? thd->db.str : "(null)"),
2097 wsrep_thd_query(thd));
2098 my_error(ER_LOCK_WAIT_TIMEOUT, MYF(0));
2099 break;
2100 default:
2101 WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
2102 "Check your wsrep connection state and retry the query.",
2103 ret,
2104 (thd->db.str ? thd->db.str : "(null)"),
2105 wsrep_thd_query(thd));
2106
2107 if (!thd->is_error())
2108 {
2109 my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check "
2110 "your wsrep connection state and retry the query.");
2111 }
2112 }
2113 rc= -1;
2114 }
2115 else {
2116 ++wsrep_to_isolation;
2117 rc= 0;
2118 }
2119
2120 if (buf) my_free(buf);
2121
2122 if (rc) wsrep_TOI_begin_failed(thd, NULL);
2123
2124 return rc;
2125 }
2126
wsrep_TOI_end(THD * thd)2127 static void wsrep_TOI_end(THD *thd) {
2128 wsrep_to_isolation--;
2129 wsrep::client_state& client_state(thd->wsrep_cs());
2130 DBUG_ASSERT(wsrep_thd_is_local_toi(thd));
2131
2132 wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());
2133
2134 int ret= client_state.leave_toi_local(wsrep::mutable_buffer());
2135
2136 if (!ret)
2137 {
2138 WSREP_DEBUG("TO END: %lld: %s",
2139 client_state.toi_meta().seqno().get(), wsrep_thd_query(thd));
2140 }
2141 else
2142 {
2143 WSREP_WARN("TO isolation end failed for: %d, sql: %s",
2144 ret, wsrep_thd_query(thd));
2145 }
2146 }
2147
wsrep_RSU_begin(THD * thd,const char * db_,const char * table_)2148 static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_)
2149 {
2150 WSREP_DEBUG("RSU BEGIN: %lld, : %s", wsrep_thd_trx_seqno(thd),
2151 wsrep_thd_query(thd));
2152 if (thd->wsrep_cs().begin_rsu(5000))
2153 {
2154 WSREP_WARN("RSU begin failed");
2155 }
2156 else
2157 {
2158 thd->variables.wsrep_on= 0;
2159 }
2160 return 0;
2161 }
2162
wsrep_RSU_end(THD * thd)2163 static void wsrep_RSU_end(THD *thd)
2164 {
2165 WSREP_DEBUG("RSU END: %lld : %s", wsrep_thd_trx_seqno(thd),
2166 wsrep_thd_query(thd));
2167 if (thd->wsrep_cs().end_rsu())
2168 {
2169 WSREP_WARN("Failed to end RSU, server may need to be restarted");
2170 }
2171 thd->variables.wsrep_on= 1;
2172 }
2173
/*
  Enter the Online Schema Upgrade (OSU) mode selected by
  thd->variables.wsrep_OSU_method for the current statement:
  WSREP_OSU_TOI goes through wsrep_TOI_begin(), WSREP_OSU_RSU through
  wsrep_RSU_begin().

  Non-local threads (appliers, replaying threads) and sessions with
  wsrep_on=0 are not isolated and get 0.

  @param db_, table_  schema/table of the statement
  @param table_list   tables of the statement, passed on to wsrep_TOI_begin()
  @param alter_info   ALTER info, passed on to wsrep_TOI_begin()
  @param fk_tables    foreign-key table keys, passed on to wsrep_TOI_begin()

  @return 0              isolation entered, not needed, or TOI skipped
  @return WSREP_TRX_FAIL the transaction is already in s_must_abort state
  @return -1             replication paused (FTWRL/BACKUP STAGE), TOI
                         failure, or unsupported OSU method
*/
int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
                             const TABLE_LIST* table_list,
                             Alter_info* alter_info, wsrep::key_array* fk_tables)
{
  /*
    No isolation for applier or replaying threads.
  */
  if (!wsrep_thd_is_local(thd))
    return 0;

  int ret= 0;
  mysql_mutex_lock(&thd->LOCK_thd_data);

  /* The transaction state is read under LOCK_thd_data; bail out early if
     the transaction has already been marked for abort. */
  if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
  {
    WSREP_INFO("thread: %lld schema: %s query: %s has been aborted due to multi-master conflict",
               (longlong) thd->thread_id, thd->get_db(), thd->query());
    mysql_mutex_unlock(&thd->LOCK_thd_data);
    return WSREP_TRX_FAIL;
  }
  mysql_mutex_unlock(&thd->LOCK_thd_data);

  DBUG_ASSERT(wsrep_thd_is_local(thd));
  DBUG_ASSERT(thd->wsrep_trx().ws_meta().seqno().is_undefined());

  if (Wsrep_server_state::instance().desynced_on_pause())
  {
    my_message(ER_UNKNOWN_COM_ERROR,
               "Aborting TOI: Replication paused on node for FTWRL/BACKUP STAGE.", MYF(0));
    return -1;
  }

  if (wsrep_debug && thd->mdl_context.has_locks())
  {
    WSREP_DEBUG("thread holds MDL locks at TI begin: %s %llu",
                wsrep_thd_query(thd), thd->thread_id);
  }

  /*
    It makes sense to set auto_increment_* to defaults in TOI operations.
    Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
    TOI statement and auto inc variables for wsrep replication is constructed
    there. Variables are reset back in THD::reset_for_next_command() before
    processing of next command.
  */
  if (wsrep_auto_increment_control)
  {
    thd->variables.auto_increment_offset= 1;
    thd->variables.auto_increment_increment= 1;
  }

  if (thd->variables.wsrep_on && wsrep_thd_is_local(thd))
  {
    switch (thd->variables.wsrep_OSU_method) {
    case WSREP_OSU_TOI:
      ret= wsrep_TOI_begin(thd, db_, table_, table_list, alter_info, fk_tables);
      break;
    case WSREP_OSU_RSU:
      ret= wsrep_RSU_begin(thd, db_, table_);
      break;
    default:
      WSREP_ERROR("Unsupported OSU method: %lu",
                  thd->variables.wsrep_OSU_method);
      ret= -1;
      break;
    }

    switch (ret) {
    case 0: /* wsrep_TOI_begin should set toi mode */
      if (thd->variables.wsrep_OSU_method == WSREP_OSU_TOI)
      {
        /*
          TOI operations ignore the provided lock_wait_timeout once replicated,
          and restore it after operation is done.
        */
        thd->variables.saved_lock_wait_timeout= thd->variables.lock_wait_timeout;
        thd->variables.lock_wait_timeout= LONG_TIMEOUT;
      }
      break;
    case 1:
      /* TOI replication skipped, treat as success */
      ret= 0;
      break;
    case -1:
      /* TOI replication failed, treat as error */
      break;
    }
  }

  return ret;
}
2265
/*
  Leave the OSU mode entered by wsrep_to_isolation_begin().

  For local TOI, the session lock_wait_timeout saved at TOI begin is
  restored and TOI is left. For RSU, RSU is left. In both cases the emulated
  binlog transaction state is reset when wsrep_emulate_bin_log is set.
  Calling this outside TOI/RSU is a bug (asserted in debug builds).
*/
void wsrep_to_isolation_end(THD *thd)
{
  DBUG_ASSERT(wsrep_thd_is_local_toi(thd) ||
              wsrep_thd_is_in_rsu(thd));

  if (wsrep_thd_is_local_toi(thd))
  {
    /* Undo the LONG_TIMEOUT override set when TOI was entered. */
    thd->variables.lock_wait_timeout= thd->variables.saved_lock_wait_timeout;
    DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_TOI);
    wsrep_TOI_end(thd);
  }
  else if (wsrep_thd_is_in_rsu(thd))
  {
    DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_RSU);
    wsrep_RSU_end(thd);
  }
  else
  {
    DBUG_ASSERT(0);
  }
  if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);
}
2288
/*
  Log an MDL conflict at level WSREP_<severity> (e.g. DEBUG, INFO).
  For both the requesting THD (req) and the THD holding the granted lock
  (gra), it logs the thread id, seqno, wsrep client mode/state,
  transaction state, command, sql_command and query text.
  schema/schema_len are printed with "%.*s", so schema_len must be an int.

  NOTE(review): the expansion already ends with ';' and the arguments are
  not parenthesized; pass plain identifiers and use it only as a full
  statement (not as the body of an unbraced if/else).
*/
#define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra) \
    WSREP_##severity( \
      "%s\n" \
      "schema: %.*s\n" \
      "request: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)\n" \
      "granted: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)", \
      msg, schema_len, schema, \
      req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
      wsrep_thd_client_mode_str(req), wsrep_thd_client_state_str(req), wsrep_thd_transaction_state_str(req), \
      req->get_command(), req->lex->sql_command, req->query(), \
      gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
      wsrep_thd_client_mode_str(gra), wsrep_thd_client_state_str(gra), wsrep_thd_transaction_state_str(gra), \
      gra->get_command(), gra->lex->sql_command, gra->query());
2302
2303 /**
2304 Check if request for the metadata lock should be granted to the requester.
2305
2306 @param requestor_ctx The MDL context of the requestor
2307 @param ticket MDL ticket for the requested lock
2308
2309 @retval TRUE Lock request can be granted
2310 @retval FALSE Lock request cannot be granted
2311 */
2312
wsrep_handle_mdl_conflict(MDL_context * requestor_ctx,MDL_ticket * ticket,const MDL_key * key)2313 void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
2314 MDL_ticket *ticket,
2315 const MDL_key *key)
2316 {
2317 /* Fallback to the non-wsrep behaviour */
2318 if (!WSREP_ON) return;
2319
2320 THD *request_thd= requestor_ctx->get_thd();
2321 THD *granted_thd= ticket->get_ctx()->get_thd();
2322
2323 const char* schema= key->db_name();
2324 int schema_len= key->db_name_length();
2325
2326 mysql_mutex_lock(&request_thd->LOCK_thd_data);
2327 if (wsrep_thd_is_toi(request_thd) ||
2328 wsrep_thd_is_applying(request_thd)) {
2329
2330 mysql_mutex_unlock(&request_thd->LOCK_thd_data);
2331 WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
2332 request_thd, granted_thd);
2333 ticket->wsrep_report(wsrep_debug);
2334
2335 /* Here we will call wsrep_abort_transaction so we should hold
2336 THD::LOCK_thd_data to protect victim from concurrent usage
2337 and THD::LOCK_thd_kill to protect from disconnect or delete. */
2338 wsrep_thd_LOCK(granted_thd);
2339
2340 if (wsrep_thd_is_toi(granted_thd) ||
2341 wsrep_thd_is_applying(granted_thd))
2342 {
2343 if (wsrep_thd_is_aborting(granted_thd))
2344 {
2345 WSREP_DEBUG("BF thread waiting for SR in aborting state");
2346 ticket->wsrep_report(wsrep_debug);
2347 wsrep_thd_UNLOCK(granted_thd);
2348 }
2349 else if (wsrep_thd_is_SR(granted_thd) && !wsrep_thd_is_SR(request_thd))
2350 {
2351 WSREP_MDL_LOG(INFO, "MDL conflict, DDL vs SR",
2352 schema, schema_len, request_thd, granted_thd);
2353 wsrep_abort_thd(request_thd, granted_thd, 1);
2354 mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_data);
2355 mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_kill);
2356 }
2357 else
2358 {
2359 WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
2360 request_thd, granted_thd);
2361 ticket->wsrep_report(true);
2362 wsrep_thd_UNLOCK(granted_thd);
2363 unireg_abort(1);
2364 }
2365 }
2366 else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
2367 granted_thd->mdl_context.has_explicit_locks())
2368 {
2369 WSREP_DEBUG("BF thread waiting for FLUSH");
2370 ticket->wsrep_report(wsrep_debug);
2371 wsrep_thd_UNLOCK(granted_thd);
2372 }
2373 else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
2374 {
2375 WSREP_DEBUG("DROP caused BF abort, conf %s",
2376 wsrep_thd_transaction_state_str(granted_thd));
2377 ticket->wsrep_report(wsrep_debug);
2378 wsrep_abort_thd(request_thd, granted_thd, 1);
2379 mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_data);
2380 mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_kill);
2381 }
2382 else
2383 {
2384 WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
2385 request_thd, granted_thd);
2386 ticket->wsrep_report(wsrep_debug);
2387 if (granted_thd->wsrep_trx().active())
2388 {
2389 wsrep_abort_thd(request_thd, granted_thd, true);
2390 mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_data);
2391 mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_kill);
2392 }
2393 else
2394 {
2395 /*
2396 Granted_thd is likely executing with wsrep_on=0. If the requesting
2397 thd is BF, BF abort and wait.
2398 */
2399 if (wsrep_thd_is_BF(request_thd, FALSE))
2400 {
2401 ha_abort_transaction(request_thd, granted_thd, TRUE);
2402 mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_data);
2403 mysql_mutex_assert_not_owner(&granted_thd->LOCK_thd_kill);
2404 }
2405 else
2406 {
2407 WSREP_MDL_LOG(INFO, "MDL unknown BF-BF conflict", schema, schema_len,
2408 request_thd, granted_thd);
2409 ticket->wsrep_report(true);
2410 unireg_abort(1);
2411 }
2412 }
2413 }
2414 }
2415 else
2416 {
2417 mysql_mutex_unlock(&request_thd->LOCK_thd_data);
2418 }
2419 }
2420
2421 /**/
abort_replicated(THD * thd)2422 static bool abort_replicated(THD *thd)
2423 {
2424 bool ret_code= false;
2425 wsrep_thd_LOCK(thd);
2426 if (thd->wsrep_trx().state() == wsrep::transaction::s_committing)
2427 {
2428 WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id));
2429
2430 (void)wsrep_abort_thd(thd, thd, TRUE);
2431 ret_code= true;
2432 }
2433 else
2434 wsrep_thd_UNLOCK(thd);
2435
2436 return ret_code;
2437 }
2438
2439 /**/
is_client_connection(THD * thd)2440 static inline bool is_client_connection(THD *thd)
2441 {
2442 return (thd->wsrep_client_thread && thd->variables.wsrep_on);
2443 }
2444
is_replaying_connection(THD * thd)2445 static inline bool is_replaying_connection(THD *thd)
2446 {
2447 bool ret;
2448
2449 mysql_mutex_lock(&thd->LOCK_thd_data);
2450 ret= (thd->wsrep_trx().state() == wsrep::transaction::s_replaying) ? true : false;
2451 mysql_mutex_unlock(&thd->LOCK_thd_data);
2452
2453 return ret;
2454 }
2455
is_committing_connection(THD * thd)2456 static inline bool is_committing_connection(THD *thd)
2457 {
2458 bool ret;
2459
2460 mysql_mutex_lock(&thd->LOCK_thd_data);
2461 ret= (thd->wsrep_trx().state() == wsrep::transaction::s_committing) ? true : false;
2462 mysql_mutex_unlock(&thd->LOCK_thd_data);
2463
2464 return ret;
2465 }
2466
have_client_connections(THD * thd,void *)2467 static my_bool have_client_connections(THD *thd, void*)
2468 {
2469 DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2470 (longlong) thd->thread_id));
2471 if (is_client_connection(thd) && thd->killed == KILL_CONNECTION)
2472 {
2473 WSREP_DEBUG("Informing thread %lld that it's time to die",
2474 thd->thread_id);
2475 (void)abort_replicated(thd);
2476 return true;
2477 }
2478 return 0;
2479 }
2480
/*
  Ask thd to terminate:
  - mark it KILL_CONNECTION;
  - notify the thread scheduler (post_kill_notification);
  - abort its current condition wait, if any, under LOCK_thd_kill.
*/
static void wsrep_close_thread(THD *thd)
{
  thd->set_killed(KILL_CONNECTION);
  MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
  mysql_mutex_lock(&thd->LOCK_thd_kill);
  thd->abort_current_cond_wait(true);
  mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
2489
have_committing_connections(THD * thd,void *)2490 static my_bool have_committing_connections(THD *thd, void *)
2491 {
2492 return is_client_connection(thd) && is_committing_connection(thd) ? 1 : 0;
2493 }
2494
/*
  Wait for committing client transactions to finish.

  While some client connection is committing and wait_time > 0, it sleeps
  my_sleep(sleep_time) and subtracts sleep_time from wait_time.

  NOTE(review): mysys documents my_sleep()'s argument as microseconds, so
  each iteration sleeps 100 us while subtracting 100 from wait_time.
  Confirm that callers pass wait_time in microseconds, not milliseconds.

  @return non-zero if committing client connections still exist afterwards
*/
int wsrep_wait_committing_connections_close(int wait_time)
{
  int sleep_time= 100;

  WSREP_DEBUG("wait for committing transaction to close: %d sleep: %d", wait_time, sleep_time);
  while (server_threads.iterate(have_committing_connections) && wait_time > 0)
  {
    WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
    my_sleep(sleep_time);
    wait_time -= sleep_time;
  }
  return server_threads.iterate(have_committing_connections);
}
2508
kill_all_threads(THD * thd,THD * caller_thd)2509 static my_bool kill_all_threads(THD *thd, THD *caller_thd)
2510 {
2511 DBUG_PRINT("quit", ("Informing thread %lld that it's time to die",
2512 (longlong) thd->thread_id));
2513 WSREP_DEBUG("Informing thread %lld that it's time to die",
2514 thd->thread_id);
2515 /* We skip slave threads & scheduler on this first loop through. */
2516 if (is_client_connection(thd) && thd != caller_thd)
2517 {
2518 if (is_replaying_connection(thd))
2519 thd->set_killed(KILL_CONNECTION);
2520 else if (!abort_replicated(thd))
2521 {
2522 /* replicated transactions must be skipped */
2523 WSREP_DEBUG("closing connection %lld", (longlong) thd->thread_id);
2524 /* instead of wsrep_close_thread() we do now soft kill by THD::awake */
2525 thd->awake(KILL_CONNECTION);
2526 }
2527 }
2528 return 0;
2529 }
2530
kill_remaining_threads(THD * thd,THD * caller_thd)2531 static my_bool kill_remaining_threads(THD *thd, THD *caller_thd)
2532 {
2533 #ifndef __bsdi__ // Bug in BSDI kernel
2534 if (is_client_connection(thd) &&
2535 !abort_replicated(thd) &&
2536 !is_replaying_connection(thd) &&
2537 thd_is_connection_alive(thd) &&
2538 thd != caller_thd)
2539 {
2540
2541 WSREP_INFO("killing local connection: %lld", (longlong) thd->thread_id);
2542 close_connection(thd);
2543 }
2544 #endif
2545 return 0;
2546 }
2547
/*
  Terminate all wsrep client connections except except_caller_thd.

  The thread cache is flushed first. Every client connection is then asked
  to terminate (kill_all_threads), and connections that are still alive are
  force-closed (kill_remaining_threads). If wait_to_end is set, the function
  then polls once per second until
  server_threads.iterate(have_client_connections) returns false.
*/
void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
{
  /* Clear thread cache */
  kill_cached_threads++;
  flush_thread_cache();

  /*
    First signal all threads that it's time to die
  */
  server_threads.iterate(kill_all_threads, except_caller_thd);

  /*
    Force remaining threads to die by closing the connection to the client
  */
  server_threads.iterate(kill_remaining_threads, except_caller_thd);

  DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)", THD_count::value()));
  WSREP_DEBUG("waiting for client connections to close: %u", THD_count::value());

  while (wait_to_end && server_threads.iterate(have_client_connections))
  {
    sleep(1);
    /* NOTE(review): printed on every poll, not only when a thread exited. */
    DBUG_PRINT("quit",("One thread died (count=%u)", THD_count::value()));
  }

  /* All client connection threads have now been aborted */
}
2575
2576
/* Ask the wsrep applier thread thd to terminate (via wsrep_close_thread()). */
void wsrep_close_applier(THD *thd)
{
  WSREP_DEBUG("closing applier %lld", (longlong) thd->thread_id);
  wsrep_close_thread(thd);
}
2582
wsrep_close_threads_callback(THD * thd,THD * caller_thd)2583 static my_bool wsrep_close_threads_callback(THD *thd, THD *caller_thd)
2584 {
2585 DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2586 (longlong) thd->thread_id));
2587 /* We skip slave threads & scheduler on this first loop through. */
2588 if (thd->wsrep_applier && thd != caller_thd)
2589 {
2590 WSREP_DEBUG("closing wsrep thread %lld", (longlong) thd->thread_id);
2591 wsrep_close_thread(thd);
2592 }
2593 return 0;
2594 }
2595
/*
  Ask all threads flagged wsrep_applier to terminate, except thd (the
  caller). This does not wait for them to exit.
*/
void wsrep_close_threads(THD *thd)
{
  server_threads.iterate(wsrep_close_threads_callback, thd);
}
2600
/*
  Shut down the wsrep system threads.

  1. Wait on COND_wsrep_slave_threads until wsrep_running_threads drops to
     2 or fewer (the remaining ones are expected to be the rollbacker
     threads, which do not exit on their own).
  2. Close the remaining wsrep threads with wsrep_close_threads(), skipping
     thd.
  3. Wait until wsrep_running_threads reaches 0.

  NOTE(review): step 3 cannot complete if thd itself is counted in
  wsrep_running_threads, since it is skipped in step 2 — confirm callers
  are never wsrep system threads.
*/
void wsrep_wait_appliers_close(THD *thd)
{
  /* Wait for wsrep appliers to gracefully exit */
  mysql_mutex_lock(&LOCK_wsrep_slave_threads);
  while (wsrep_running_threads > 2)
  /*
    2 is for rollbacker thread which needs to be killed explicitly.
    This gotta be fixed in a more elegant manner if we gonna have arbitrary
    number of non-applier wsrep threads.
  */
  {
    mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
  }
  mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
  DBUG_PRINT("quit",("applier threads have died (count=%u)",
                     uint32_t(wsrep_running_threads)));

  /* Now kill remaining wsrep threads: rollbacker */
  wsrep_close_threads (thd);
  /* and wait for them to die */
  mysql_mutex_lock(&LOCK_wsrep_slave_threads);
  while (wsrep_running_threads > 0)
  {
    mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
  }
  mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
  DBUG_PRINT("quit",("all wsrep system threads have died"));

  /* All wsrep applier threads have now been aborted. However, if this thread
     is also applier, we are still running...
  */
}
2633
wsrep_must_ignore_error(THD * thd)2634 int wsrep_must_ignore_error(THD* thd)
2635 {
2636 const int error= thd->get_stmt_da()->sql_errno();
2637 const uint flags= sql_command_flags[thd->lex->sql_command];
2638
2639 DBUG_ASSERT(error);
2640 DBUG_ASSERT(wsrep_thd_is_toi(thd) || wsrep_thd_is_applying(thd));
2641
2642 if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL))
2643 goto ignore_error;
2644
2645 if ((flags & CF_WSREP_MAY_IGNORE_ERRORS) &&
2646 (wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DDL))
2647 {
2648 switch (error)
2649 {
2650 case ER_DB_DROP_EXISTS:
2651 case ER_BAD_TABLE_ERROR:
2652 case ER_CANT_DROP_FIELD_OR_KEY:
2653 goto ignore_error;
2654 }
2655 }
2656
2657 return 0;
2658
2659 ignore_error:
2660 WSREP_WARN("Ignoring error '%s' on query. "
2661 "Default database: '%s'. Query: '%s', Error_code: %d",
2662 thd->get_stmt_da()->message(),
2663 print_slave_db_safe(thd->db.str),
2664 thd->query(),
2665 error);
2666 return 1;
2667 }
2668
wsrep_ignored_error_code(Log_event * ev,int error)2669 int wsrep_ignored_error_code(Log_event* ev, int error)
2670 {
2671 const THD* thd= ev->thd;
2672
2673 DBUG_ASSERT(error);
2674 DBUG_ASSERT(wsrep_thd_is_applying(thd) &&
2675 !wsrep_thd_is_local_toi(thd));
2676
2677 if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DML))
2678 {
2679 const int ev_type= ev->get_type_code();
2680 if ((ev_type == DELETE_ROWS_EVENT || ev_type == DELETE_ROWS_EVENT_V1)
2681 && error == ER_KEY_NOT_FOUND)
2682 goto ignore_error;
2683 }
2684
2685 return 0;
2686
2687 ignore_error:
2688 WSREP_WARN("Ignoring error '%s' on %s event. Error_code: %d",
2689 thd->get_stmt_da()->message(),
2690 ev->get_type_str(),
2691 error);
2692 return 1;
2693 }
2694
/* True if the wsrep provider reports the streaming replication capability. */
bool wsrep_provider_is_SR_capable()
{
  return Wsrep_server_state::has_capability(wsrep::provider::capability::streaming);
}
2699
/* Accessor for thd->wsrep_retry_counter. */
int wsrep_thd_retry_counter(const THD *thd)
{
  return thd->wsrep_retry_counter;
}
2704
/* Accessor for thd->wsrep_ignore_table. */
extern bool wsrep_thd_ignore_table(THD *thd)
{
  return thd->wsrep_ignore_table;
}
2709
/*
  True if command is flagged CF_STATUS_COMMAND in sql_command_flags
  (presumably the SHOW-style status commands).
*/
bool wsrep_is_show_query(enum enum_sql_command command)
{
  DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
  return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
}
wsrep_create_like_table(THD * thd,TABLE_LIST * table,TABLE_LIST * src_table,HA_CREATE_INFO * create_info)2715 bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
2716 TABLE_LIST* src_table,
2717 HA_CREATE_INFO *create_info)
2718 {
2719 if (create_info->tmp_table())
2720 {
2721 /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */
2722 WSREP_DEBUG("CREATE TEMPORARY TABLE LIKE... skipped replication\n %s",
2723 thd->query());
2724 }
2725 else if (!(thd->find_temporary_table(src_table)))
2726 {
2727 /* this is straight CREATE TABLE LIKE... with no tmp tables */
2728 WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2729 }
2730 else
2731 {
2732 /* Non-MERGE tables ignore this call. */
2733 if (src_table->table->file->extra(HA_EXTRA_ADD_CHILDREN_LIST))
2734 return (true);
2735
2736 char buf[2048];
2737 String query(buf, sizeof(buf), system_charset_info);
2738 query.length(0); // Have to zero it since constructor doesn't
2739
2740 int result __attribute__((unused))=
2741 show_create_table(thd, src_table, &query, NULL, WITH_DB_NAME);
2742 WSREP_DEBUG("TMP TABLE: %s ret_code %d", query.ptr(), result);
2743
2744 thd->wsrep_TOI_pre_query= query.ptr();
2745 thd->wsrep_TOI_pre_query_len= query.length();
2746
2747 WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2748
2749 thd->wsrep_TOI_pre_query= NULL;
2750 thd->wsrep_TOI_pre_query_len= 0;
2751
2752 /* Non-MERGE tables ignore this call. */
2753 src_table->table->file->extra(HA_EXTRA_DETACH_CHILDREN);
2754 }
2755
2756 return(false);
2757 #ifdef WITH_WSREP
2758 wsrep_error_label:
2759 thd->wsrep_TOI_pre_query= NULL;
2760 return (true);
2761 #endif
2762 }
2763
wsrep_create_trigger_query(THD * thd,uchar ** buf,size_t * buf_len)2764 int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
2765 {
2766 LEX *lex= thd->lex;
2767 String stmt_query;
2768
2769 LEX_CSTRING definer_user;
2770 LEX_CSTRING definer_host;
2771
2772 if (!lex->definer)
2773 {
2774 if (!thd->slave_thread)
2775 {
2776 if (!(lex->definer= create_default_definer(thd, false)))
2777 return 1;
2778 }
2779 }
2780
2781 if (lex->definer)
2782 {
2783 /* SUID trigger. */
2784 LEX_USER *d= get_current_user(thd, lex->definer);
2785
2786 if (!d)
2787 return 1;
2788
2789 definer_user= d->user;
2790 definer_host= d->host;
2791 }
2792 else
2793 {
2794 /* non-SUID trigger. */
2795
2796 definer_user.str= 0;
2797 definer_user.length= 0;
2798
2799 definer_host.str= 0;
2800 definer_host.length= 0;
2801 }
2802
2803 const LEX_CSTRING command[2]=
2804 {{ C_STRING_WITH_LEN("CREATE ") },
2805 { C_STRING_WITH_LEN("CREATE OR REPLACE ") }};
2806
2807 if (thd->lex->create_info.or_replace())
2808 stmt_query.append(command[1]);
2809 else
2810 stmt_query.append(command[0]);
2811
2812 append_definer(thd, &stmt_query, &definer_user, &definer_host);
2813
2814 LEX_CSTRING stmt_definition;
2815 stmt_definition.str= (char*) thd->lex->stmt_definition_begin;
2816 stmt_definition.length= thd->lex->stmt_definition_end
2817 - thd->lex->stmt_definition_begin;
2818 trim_whitespace(thd->charset(), &stmt_definition);
2819
2820 stmt_query.append(stmt_definition.str, stmt_definition.length);
2821
2822 return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
2823 buf, buf_len);
2824 }
2825
/*
  Thread entry point for wsrep system threads (applier or rollbacker, as
  given by thd_args->thread_type()).

  1. Creates and initializes a THD and registers it in server_threads.
  2. Increments the wsrep running-thread counters under
     LOCK_wsrep_slave_threads.
  3. Runs thd_args->fun().
  4. Decrements the counters, deletes arg (a Wsrep_thd_args*) and destroys
     the THD.

  If initialization fails before the server has finished starting
  (!mysqld_server_initialized), the server is aborted with unireg_abort(1).

  @param arg  Wsrep_thd_args* describing the thread; deleted on normal exit
  @return     NULL
*/
void* start_wsrep_THD(void *arg)
{
  THD *thd;

  Wsrep_thd_args* thd_args= (Wsrep_thd_args*) arg;

  if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
  {
    /* NOTE(review): thd_args is not deleted on this path — possible leak,
       since this thread normally owns it. Confirm with the creator. */
    goto error;
  }

  statistic_increment(thread_created, &LOCK_status);

  if (wsrep_gtid_mode)
  {
    /* Adjust domain_id. */
    thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
  }

  thd->real_id=pthread_self(); // Keep purify happy

  my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));

  DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
  thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();

  /* From here on thd is visible to server_threads iterations. */
  server_threads.insert(thd);

  /* from bootstrap()... */
  thd->bootstrap=1;
  thd->max_client_packet_length= thd->net.max_packet;
  thd->security_ctx->master_access= ~(ulong)0;

  /* from handle_one_connection... */
  pthread_detach_this_thread();

  mysql_thread_set_psi_id(thd->thread_id);
  thd->thr_create_utime= microsecond_interval_timer();
  if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
  {
    /* NOTE(review): thd is neither deleted nor erased from server_threads
       here, and thd_args is not freed — confirm whether end_thread()
       covers this. */
    close_connection(thd, ER_OUT_OF_RESOURCES);
    statistic_increment(aborted_connects,&LOCK_status);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
    goto error;
  }

  // </5.1.17>
  /*
    handle_one_connection() is normally the only way a thread would
    start and would always be on the very high end of the stack ,
    therefore, the thread stack always starts at the address of the
    first local variable of handle_one_connection, which is thd. We
    need to know the start of the stack so that we could check for
    stack overruns.
  */
  DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld",
                       (long long)thd->thread_id));
  /* now that we've called my_thread_init(), it is safe to call DBUG_* */

  thd->thread_stack= (char*) &thd;
  wsrep_assign_from_threadvars(thd);
  if (wsrep_store_threadvars(thd))
  {
    /* NOTE(review): thd was inserted into server_threads above but is
       deleted here without server_threads.erase(thd) — unless end_thread()
       removes it, server_threads is left with a dangling pointer. */
    close_connection(thd, ER_OUT_OF_RESOURCES);
    statistic_increment(aborted_connects,&LOCK_status);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
    delete thd;
    delete thd_args;
    goto error;
  }

  thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
  thd->security_ctx->skip_grants();

  /* handle_one_connection() again... */
  thd->proc_info= 0;
  thd->set_command(COM_SLEEP);
  thd->init_for_queries();
  mysql_mutex_lock(&LOCK_wsrep_slave_threads);

  /* Register the running thread; waiters on COND_wsrep_slave_threads are
     woken below. */
  wsrep_running_threads++;

  switch (thd_args->thread_type()) {
    case WSREP_APPLIER_THREAD:
      wsrep_running_applier_threads++;
      break;
    case WSREP_ROLLBACKER_THREAD:
      wsrep_running_rollbacker_threads++;
      break;
    default:
      WSREP_ERROR("Incorrect wsrep thread type: %d", thd_args->thread_type());
      break;
  }

  mysql_cond_broadcast(&COND_wsrep_slave_threads);
  mysql_mutex_unlock(&LOCK_wsrep_slave_threads);

  WSREP_DEBUG("wsrep system thread %llu, %p starting",
              thd->thread_id, thd);
  /* Thread body; returns when the thread is asked to stop. */
  thd_args->fun()(thd, static_cast<void *>(thd_args));

  WSREP_DEBUG("wsrep system thread: %llu, %p closing",
              thd->thread_id, thd);

  /* Wsrep may reset globals during thread context switches, store globals
     before cleanup. */
  wsrep_store_threadvars(thd);

  close_connection(thd, 0);

  mysql_mutex_lock(&LOCK_wsrep_slave_threads);
  DBUG_ASSERT(wsrep_running_threads > 0);
  wsrep_running_threads--;

  switch (thd_args->thread_type()) {
    case WSREP_APPLIER_THREAD:
      DBUG_ASSERT(wsrep_running_applier_threads > 0);
      wsrep_running_applier_threads--;
      break;
    case WSREP_ROLLBACKER_THREAD:
      DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
      wsrep_running_rollbacker_threads--;
      break;
    default:
      WSREP_ERROR("Incorrect wsrep thread type: %d", thd_args->thread_type());
      break;
  }

  delete thd_args;
  WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
  mysql_cond_broadcast(&COND_wsrep_slave_threads);
  mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
  /*
    Note: We can't call THD destructor without crashing
    if plugins have not been initialized. However, in most of the
    cases this means that pre SE initialization SST failed and
    we are going to exit anyway.
  */
  if (plugins_are_initialized)
  {
    net_end(&thd->net);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
  }
  else
  {
    /*
      TODO: lightweight cleanup to get rid of:
      'Error in my_thread_global_end(): 2 threads didn't exit'
      at server shutdown
    */
  }

  server_threads.erase(thd);
  delete thd;
  my_thread_end();
  return(NULL);

error:
  WSREP_ERROR("Failed to create/initialize system thread");

  /* Abort if its the first applier/rollbacker thread. */
  if (!mysqld_server_initialized)
    unireg_abort(1);
  else
    return NULL;
}
2992
wsrep_fragment_unit(ulong unit)2993 enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit)
2994 {
2995 switch (unit)
2996 {
2997 case WSREP_FRAG_BYTES: return wsrep::streaming_context::bytes;
2998 case WSREP_FRAG_ROWS: return wsrep::streaming_context::row;
2999 case WSREP_FRAG_STATEMENTS: return wsrep::streaming_context::statement;
3000 default:
3001 DBUG_ASSERT(0);
3002 return wsrep::streaming_context::bytes;
3003 }
3004 }
3005
3006 /***** callbacks for wsrep service ************/
3007
/* wsrep service callback: returns the wsrep_recovery global. */
my_bool get_wsrep_recovery()
{
  return wsrep_recovery;
}
3012
/* True while thd's consistency check is in state CONSISTENCY_CHECK_RUNNING. */
bool wsrep_consistency_check(THD *thd)
{
  return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
}
3017