1 /* Copyright (C) 2013-2021 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
15
16 #include "mariadb.h"
17 #include "wsrep_thd.h"
18 #include "transaction.h"
19 #include "rpl_rli.h"
20 #include "log_event.h"
21 #include "sql_parse.h"
22 //#include "global_threads.h" // LOCK_thread_count, etc.
23 #include "sql_base.h" // close_thread_tables()
24 #include "mysqld.h" // start_wsrep_THD();
25 #include "debug_sync.h"
26
27 #include "slave.h" // opt_log_slave_updates
28 #include "rpl_filter.h"
29 #include "rpl_rli.h"
30 #include "rpl_mi.h"
31
/*
  File-local counter of brute-force (BF) aborts. It is incremented in
  wsrep_client_rollback() and read in wsrep_show_bf_aborts(), in both cases
  through the WSREP_ATOMIC_* macros defined here.

  The counter width and the matching my_atomic_* accessors are chosen by
  whether the build defines __LP64__: 64-bit when it does, 32-bit otherwise.
*/
#if (__LP64__)
static volatile int64 wsrep_bf_aborts_counter(0);
#define WSREP_ATOMIC_LOAD_LONG my_atomic_load64
#define WSREP_ATOMIC_ADD_LONG my_atomic_add64
#else
static volatile int32 wsrep_bf_aborts_counter(0);
#define WSREP_ATOMIC_LOAD_LONG my_atomic_load32
#define WSREP_ATOMIC_ADD_LONG my_atomic_add32
#endif
41
wsrep_show_bf_aborts(THD * thd,SHOW_VAR * var,char * buff,enum enum_var_type scope)42 int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
43 enum enum_var_type scope)
44 {
45 wsrep_local_bf_aborts = WSREP_ATOMIC_LOAD_LONG(&wsrep_bf_aborts_counter);
46 var->type = SHOW_LONGLONG;
47 var->value = (char*)&wsrep_local_bf_aborts;
48 return 0;
49 }
50
51 /* must have (&thd->LOCK_thd_data) */
wsrep_client_rollback(THD * thd)52 void wsrep_client_rollback(THD *thd)
53 {
54 WSREP_DEBUG("client rollback due to BF abort for (%lld), query: %s",
55 (longlong) thd->thread_id, thd->query());
56
57 WSREP_ATOMIC_ADD_LONG(&wsrep_bf_aborts_counter, 1);
58
59 thd->wsrep_conflict_state= ABORTING;
60 mysql_mutex_unlock(&thd->LOCK_thd_data);
61 trans_rollback(thd);
62
63 if (thd->locked_tables_mode && thd->lock)
64 {
65 WSREP_DEBUG("unlocking tables for BF abort (%lld)",
66 (longlong) thd->thread_id);
67 thd->locked_tables_list.unlock_locked_tables(thd);
68 thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
69 }
70
71 if (thd->global_read_lock.is_acquired())
72 {
73 WSREP_DEBUG("unlocking GRL for BF abort (%lld)",
74 (longlong) thd->thread_id);
75 thd->global_read_lock.unlock_global_read_lock(thd);
76 }
77
78 /* Release transactional metadata locks. */
79 thd->release_transactional_locks();
80
81 /* release explicit MDL locks */
82 thd->mdl_context.release_explicit_locks();
83
84 if (thd->get_binlog_table_maps())
85 {
86 WSREP_DEBUG("clearing binlog table map for BF abort (%lld)",
87 (longlong) thd->thread_id);
88 thd->clear_binlog_table_maps();
89 }
90 mysql_mutex_lock(&thd->LOCK_thd_data);
91 thd->wsrep_conflict_state= ABORTED;
92 }
93
/*
  NOTE(review): neither of these macros is referenced anywhere in the visible
  part of this file. Confirm whether they are still needed before relying on
  them.
*/
#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
96
wsrep_relay_group_init(THD * thd,const char * log_fname)97 static rpl_group_info* wsrep_relay_group_init(THD *thd, const char* log_fname)
98 {
99 Relay_log_info* rli= new Relay_log_info(false);
100
101 WSREP_DEBUG("wsrep_relay_group_init %s", log_fname);
102
103 if (!rli->relay_log.description_event_for_exec)
104 {
105 rli->relay_log.description_event_for_exec=
106 new Format_description_log_event(4);
107 }
108
109 static LEX_CSTRING connection_name= { STRING_WITH_LEN("wsrep") };
110
111 /*
112 Master_info's constructor initializes rpl_filter by either an already
113 constructed Rpl_filter object from global 'rpl_filters' list if the
114 specified connection name is same, or it constructs a new Rpl_filter
115 object and adds it to rpl_filters. This object is later destructed by
116 Mater_info's destructor by looking it up based on connection name in
117 rpl_filters list.
118
119 However, since all Master_info objects created here would share same
120 connection name ("wsrep"), destruction of any of the existing Master_info
121 objects (in wsrep_return_from_bf_mode()) would free rpl_filter referenced
122 by any/all existing Master_info objects.
123
124 In order to avoid that, we have added a check in Master_info's destructor
125 to not free the "wsrep" rpl_filter. It will eventually be freed by
126 free_all_rpl_filters() when server terminates.
127 */
128 rli->mi = new Master_info(&connection_name, false);
129
130 struct rpl_group_info *rgi= new rpl_group_info(rli);
131 rgi->thd= rli->sql_driver_thd= thd;
132
133 if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
134 {
135 rgi->deferred_events= new Deferred_log_events(rli);
136 }
137
138 return rgi;
139 }
140
wsrep_prepare_bf_thd(THD * thd,struct wsrep_thd_shadow * shadow)141 static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
142 {
143 shadow->options = thd->variables.option_bits;
144 shadow->server_status = thd->server_status;
145 shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
146 shadow->vio = thd->net.vio;
147
148 // Disable general logging on applier threads
149 thd->variables.option_bits |= OPTION_LOG_OFF;
150
151 /* enable binlogging regardless of log_slave_updates setting
152 this is for ensuring that both local and applier transaction go through
153 same commit ordering algorithm in group commit control
154 */
155 thd->variables.option_bits|= OPTION_BIN_LOG;
156
157 if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init(thd, "wsrep_relay");
158
159 /* thd->system_thread_info.rpl_sql_info isn't initialized. */
160 if (!thd->slave_thread)
161 thd->system_thread_info.rpl_sql_info=
162 new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);
163
164 thd->wsrep_exec_mode= REPL_RECV;
165 thd->net.vio= 0;
166 thd->clear_error();
167
168 shadow->tx_isolation = thd->variables.tx_isolation;
169 thd->variables.tx_isolation = ISO_READ_COMMITTED;
170 thd->tx_isolation = ISO_READ_COMMITTED;
171
172 shadow->db = thd->db.str;
173 shadow->db_length = thd->db.length;
174 shadow->user_time = thd->user_time;
175 shadow->row_count_func= thd->get_row_count_func();
176 thd->reset_db(&null_clex_str);
177 }
178
wsrep_return_from_bf_mode(THD * thd,struct wsrep_thd_shadow * shadow)179 static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
180 {
181 LEX_CSTRING db= {shadow->db, shadow->db_length };
182 thd->variables.option_bits = shadow->options;
183 thd->server_status = shadow->server_status;
184 thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
185 thd->net.vio = shadow->vio;
186 thd->variables.tx_isolation = shadow->tx_isolation;
187 thd->user_time = shadow->user_time;
188 thd->reset_db(&db);
189
190 if (!thd->slave_thread)
191 delete thd->system_thread_info.rpl_sql_info;
192 delete thd->wsrep_rgi->rli->mi;
193 delete thd->wsrep_rgi->rli;
194
195 thd->wsrep_rgi->cleanup_after_session();
196 delete thd->wsrep_rgi;
197 thd->wsrep_rgi = NULL;
198 thd->set_row_count_func(shadow->row_count_func);
199 }
200
wsrep_replay_sp_transaction(THD * thd)201 void wsrep_replay_sp_transaction(THD* thd)
202 {
203 DBUG_ENTER("wsrep_replay_sp_transaction");
204 mysql_mutex_assert_owner(&thd->LOCK_thd_data);
205 DBUG_ASSERT(thd->wsrep_conflict_state == MUST_REPLAY);
206 DBUG_ASSERT(wsrep_thd_trx_seqno(thd) > 0);
207
208 WSREP_DEBUG("replaying SP transaction %llu", thd->thread_id);
209 close_thread_tables(thd);
210 if (thd->locked_tables_mode && thd->lock)
211 {
212 WSREP_DEBUG("releasing table lock for replaying (%u)",
213 thd->thread_id);
214 thd->locked_tables_list.unlock_locked_tables(thd);
215 thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
216 }
217 thd->release_transactional_locks();
218
219 mysql_mutex_unlock(&thd->LOCK_thd_data);
220 THD *replay_thd= new THD(true);
221 replay_thd->thread_stack= thd->thread_stack;
222
223 struct wsrep_thd_shadow shadow;
224 wsrep_prepare_bf_thd(replay_thd, &shadow);
225 WSREP_DEBUG("replaying set for %p rgi %p", replay_thd, replay_thd->wsrep_rgi); replay_thd->wsrep_trx_meta= thd->wsrep_trx_meta;
226 replay_thd->wsrep_ws_handle= thd->wsrep_ws_handle;
227 replay_thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
228 replay_thd->wsrep_conflict_state= REPLAYING;
229
230 replay_thd->variables.option_bits|= OPTION_BEGIN;
231 replay_thd->server_status|= SERVER_STATUS_IN_TRANS;
232
233 thd->reset_globals();
234 replay_thd->store_globals();
235 wsrep_status_t rcode= wsrep->replay_trx(wsrep,
236 &replay_thd->wsrep_ws_handle,
237 (void*) replay_thd);
238
239 wsrep_return_from_bf_mode(replay_thd, &shadow);
240 replay_thd->reset_globals();
241 delete replay_thd;
242
243 mysql_mutex_lock(&thd->LOCK_thd_data);
244
245 thd->store_globals();
246
247 switch (rcode)
248 {
249 case WSREP_OK:
250 {
251 thd->wsrep_conflict_state= NO_CONFLICT;
252 thd->killed= NOT_KILLED;
253 wsrep_status_t rcode= wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
254 if (rcode != WSREP_OK)
255 {
256 WSREP_WARN("Post commit failed for SP replay: thd: %u error: %d",
257 thd->thread_id, rcode);
258 }
259 /* As replaying the transaction was successful, an error must not
260 be returned to client, so we need to reset the error state of
261 the diagnostics area */
262 thd->get_stmt_da()->reset_diagnostics_area();
263 break;
264 }
265 case WSREP_TRX_FAIL:
266 {
267 thd->wsrep_conflict_state= ABORTED;
268 wsrep_status_t rcode= wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
269 if (rcode != WSREP_OK)
270 {
271 WSREP_WARN("Post rollback failed for SP replay: thd: %u error: %d",
272 thd->thread_id, rcode);
273 }
274 if (thd->get_stmt_da()->is_set())
275 {
276 thd->get_stmt_da()->reset_diagnostics_area();
277 }
278 my_error(ER_LOCK_DEADLOCK, MYF(0));
279 break;
280 }
281 default:
282 WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
283 rcode,
284 (thd->db.str ? thd->db.str : "(null)"),
285 WSREP_QUERY(thd));
286 /* we're now in inconsistent state, must abort */
287 mysql_mutex_unlock(&thd->LOCK_thd_data);
288 unireg_abort(1);
289 break;
290 }
291
292 wsrep_cleanup_transaction(thd);
293
294 mysql_mutex_lock(&LOCK_wsrep_replaying);
295 wsrep_replaying--;
296 WSREP_DEBUG("replaying decreased: %d, thd: %u",
297 wsrep_replaying, thd->thread_id);
298 mysql_cond_broadcast(&COND_wsrep_replaying);
299 mysql_mutex_unlock(&LOCK_wsrep_replaying);
300
301 DBUG_VOID_RETURN;
302 }
303
wsrep_replay_transaction(THD * thd)304 void wsrep_replay_transaction(THD *thd)
305 {
306 DBUG_ENTER("wsrep_replay_transaction");
307 /* checking if BF trx must be replayed */
308 if (thd->wsrep_conflict_state== MUST_REPLAY) {
309 DBUG_ASSERT(wsrep_thd_trx_seqno(thd));
310 if (thd->wsrep_exec_mode!= REPL_RECV) {
311 if (thd->get_stmt_da()->is_sent())
312 {
313 WSREP_ERROR("replay issue, thd has reported status already");
314 }
315
316
317 /*
318 PS reprepare observer should have been removed already.
319 open_table() will fail if we have dangling observer here.
320 */
321 DBUG_ASSERT(thd->m_reprepare_observer == NULL);
322
323 struct da_shadow
324 {
325 enum Diagnostics_area::enum_diagnostics_status status;
326 ulonglong affected_rows;
327 ulonglong last_insert_id;
328 char message[MYSQL_ERRMSG_SIZE];
329 };
330 struct da_shadow da_status;
331 da_status.status= thd->get_stmt_da()->status();
332 if (da_status.status == Diagnostics_area::DA_OK)
333 {
334 da_status.affected_rows= thd->get_stmt_da()->affected_rows();
335 da_status.last_insert_id= thd->get_stmt_da()->last_insert_id();
336 strmake(da_status.message,
337 thd->get_stmt_da()->message(),
338 sizeof(da_status.message)-1);
339 }
340
341 thd->get_stmt_da()->reset_diagnostics_area();
342
343 thd->wsrep_conflict_state= REPLAYING;
344 mysql_mutex_unlock(&thd->LOCK_thd_data);
345
346 thd->reset_for_next_command();
347 thd->reset_killed();
348 close_thread_tables(thd);
349 if (thd->locked_tables_mode && thd->lock)
350 {
351 WSREP_DEBUG("releasing table lock for replaying (%lld)",
352 (longlong) thd->thread_id);
353 thd->locked_tables_list.unlock_locked_tables(thd);
354 thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
355 }
356 thd->release_transactional_locks();
357 /*
358 Replaying will call MYSQL_START_STATEMENT when handling
359 BEGIN Query_log_event so end statement must be called before
360 replaying.
361 */
362 MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
363 thd->m_statement_psi= NULL;
364 thd->m_digest= NULL;
365 thd_proc_info(thd, "WSREP replaying trx");
366 WSREP_DEBUG("replay trx: %s %lld",
367 thd->query() ? thd->query() : "void",
368 (long long)wsrep_thd_trx_seqno(thd));
369 struct wsrep_thd_shadow shadow;
370 wsrep_prepare_bf_thd(thd, &shadow);
371
372 /* From trans_begin() */
373 thd->variables.option_bits|= OPTION_BEGIN;
374 thd->server_status|= SERVER_STATUS_IN_TRANS;
375
376 /* Allow tests to block the replayer thread using the DBUG facilities */
377 #ifdef ENABLED_DEBUG_SYNC
378 DBUG_EXECUTE_IF("sync.wsrep_replay_cb",
379 {
380 const char act[]=
381 "now "
382 "SIGNAL sync.wsrep_replay_cb_reached "
383 "WAIT_FOR signal.wsrep_replay_cb";
384 DBUG_ASSERT(!debug_sync_set_action(thd,
385 STRING_WITH_LEN(act)));
386 };);
387 #endif /* ENABLED_DEBUG_SYNC */
388
389 int rcode = wsrep->replay_trx(wsrep,
390 &thd->wsrep_ws_handle,
391 (void *)thd);
392
393 wsrep_return_from_bf_mode(thd, &shadow);
394 if (thd->wsrep_conflict_state!= REPLAYING)
395 WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
396
397 mysql_mutex_lock(&thd->LOCK_thd_data);
398
399 switch (rcode)
400 {
401 case WSREP_OK:
402 thd->wsrep_conflict_state= NO_CONFLICT;
403 wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
404 WSREP_DEBUG("trx_replay successful for: %lld %lld",
405 (longlong) thd->thread_id, (longlong) thd->real_id);
406 if (thd->get_stmt_da()->is_sent())
407 {
408 WSREP_WARN("replay ok, thd has reported status");
409 }
410 else if (thd->get_stmt_da()->is_set())
411 {
412 if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK &&
413 thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK)
414 {
415 WSREP_WARN("replay ok, thd has error status %d",
416 thd->get_stmt_da()->status());
417 }
418 }
419 else
420 {
421 if (da_status.status == Diagnostics_area::DA_OK)
422 {
423 my_ok(thd,
424 da_status.affected_rows,
425 da_status.last_insert_id,
426 da_status.message);
427 }
428 else
429 {
430 my_ok(thd);
431 }
432 }
433 break;
434 case WSREP_TRX_FAIL:
435 if (thd->get_stmt_da()->is_sent())
436 {
437 WSREP_ERROR("replay failed, thd has reported status");
438 }
439 else
440 {
441 WSREP_DEBUG("replay failed, rolling back");
442 }
443 thd->wsrep_conflict_state= ABORTED;
444 wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
445 break;
446 default:
447 WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
448 rcode, thd->get_db(),
449 thd->query() ? thd->query() : "void");
450 /* we're now in inconsistent state, must abort */
451
452 /* http://bazaar.launchpad.net/~codership/codership-mysql/5.6/revision/3962#sql/wsrep_thd.cc */
453 mysql_mutex_unlock(&thd->LOCK_thd_data);
454
455 unireg_abort(1);
456 break;
457 }
458
459 wsrep_cleanup_transaction(thd);
460
461 mysql_mutex_lock(&LOCK_wsrep_replaying);
462 wsrep_replaying--;
463 WSREP_DEBUG("replaying decreased: %d, thd: %lld",
464 wsrep_replaying, (longlong) thd->thread_id);
465 mysql_cond_broadcast(&COND_wsrep_replaying);
466 mysql_mutex_unlock(&LOCK_wsrep_replaying);
467 }
468 }
469 DBUG_VOID_RETURN;
470 }
471
wsrep_replication_process(THD * thd)472 static void wsrep_replication_process(THD *thd)
473 {
474 int rcode;
475 DBUG_ENTER("wsrep_replication_process");
476
477 struct wsrep_thd_shadow shadow;
478 wsrep_prepare_bf_thd(thd, &shadow);
479
480 /* From trans_begin() */
481 thd->variables.option_bits|= OPTION_BEGIN;
482 thd->server_status|= SERVER_STATUS_IN_TRANS;
483
484 thd_proc_info(thd, "wsrep applier idle");
485 rcode = wsrep->recv(wsrep, (void *)thd);
486 DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
487
488 WSREP_INFO("applier thread exiting (code:%d)", rcode);
489
490 switch (rcode) {
491 case WSREP_OK:
492 case WSREP_NOT_IMPLEMENTED:
493 case WSREP_CONN_FAIL:
494 /* provider does not support slave operations / disconnected from group,
495 * just close applier thread */
496 break;
497 case WSREP_NODE_FAIL:
498 /* data inconsistency => SST is needed */
499 /* Note: we cannot just blindly restart replication here,
500 * SST might require server restart if storage engines must be
501 * initialized after SST */
502 WSREP_ERROR("node consistency compromised, aborting");
503 wsrep_kill_mysql(thd);
504 break;
505 case WSREP_WARNING:
506 case WSREP_TRX_FAIL:
507 case WSREP_TRX_MISSING:
508 /* these suggests a bug in provider code */
509 WSREP_WARN("bad return from recv() call: %d", rcode);
510 /* Shut down this node. */
511 /* fall through */
512 case WSREP_FATAL:
513 /* Cluster connectivity is lost.
514 *
515 * If applier was killed on purpose (KILL_CONNECTION), we
516 * avoid mysql shutdown. This is because the killer will then handle
517 * shutdown processing (or replication restarting)
518 */
519 if (thd->killed != KILL_CONNECTION)
520 {
521 wsrep_kill_mysql(thd);
522 }
523 break;
524 }
525
526 mysql_mutex_lock(&LOCK_thread_count);
527 wsrep_close_applier(thd);
528 mysql_cond_broadcast(&COND_thread_count);
529 mysql_mutex_unlock(&LOCK_thread_count);
530
531 if(thd->has_thd_temporary_tables())
532 {
533 WSREP_WARN("Applier %lld has temporary tables at exit.",
534 thd->thread_id);
535 }
536 wsrep_return_from_bf_mode(thd, &shadow);
537 DBUG_VOID_RETURN;
538 }
539
create_wsrep_THD(wsrep_thread_args * args,bool thread_count_lock)540 static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
541 {
542 if (!thread_count_lock)
543 mysql_mutex_lock(&LOCK_thread_count);
544
545 ulong old_wsrep_running_threads= wsrep_running_threads;
546
547 DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD ||
548 args->thread_type == WSREP_ROLLBACKER_THREAD);
549
550 bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD
551 ? key_wsrep_applier : key_wsrep_rollbacker,
552 &args->thread_id, &connection_attrib,
553 start_wsrep_THD, (void*)args);
554
555 if (res)
556 {
557 WSREP_ERROR("Can't create wsrep thread");
558 }
559
560 /*
561 if starting a thread on server startup, wait until the this thread's THD
562 is fully initialized (otherwise a THD initialization code might
563 try to access a partially initialized server data structure - MDEV-8208).
564 */
565 if (!mysqld_server_initialized)
566 {
567 while (old_wsrep_running_threads == wsrep_running_threads)
568 {
569 mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
570 }
571 }
572
573 if (!thread_count_lock)
574 mysql_mutex_unlock(&LOCK_thread_count);
575
576 return res;
577 }
578
wsrep_create_appliers(long threads,bool thread_count_lock)579 bool wsrep_create_appliers(long threads, bool thread_count_lock)
580 {
581 if (!wsrep_connected)
582 {
583 /* see wsrep_replication_start() for the logic */
584 if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
585 wsrep_provider && strcasecmp(wsrep_provider, "none"))
586 {
587 WSREP_ERROR("Trying to launch slave threads before creating "
588 "connection at '%s'", wsrep_cluster_address);
589 }
590 return true;
591 }
592
593 long wsrep_threads= 0;
594
595 while (wsrep_threads++ < threads) {
596 wsrep_thread_args* arg;
597
598 if((arg= (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL)
599 {
600 WSREP_ERROR("Can't allocate memory for wsrep replication thread %ld\n", wsrep_threads);
601 assert(0);
602 }
603
604 arg->thread_type= WSREP_APPLIER_THREAD;
605 arg->processor= wsrep_replication_process;
606
607 if (create_wsrep_THD(arg, thread_count_lock))
608 {
609 WSREP_ERROR("Can't create thread to manage wsrep replication");
610 my_free(arg);
611 return true;
612 }
613 }
614
615 return false;
616 }
617
wsrep_rollback_process(THD * thd)618 static void wsrep_rollback_process(THD *thd)
619 {
620 DBUG_ENTER("wsrep_rollback_process");
621
622 mysql_mutex_lock(&LOCK_wsrep_rollback);
623 wsrep_aborting_thd= NULL;
624
625 while (thd->killed == NOT_KILLED) {
626 thd_proc_info(thd, "WSREP aborter idle");
627 thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
628 thd->mysys_var->current_cond= &COND_wsrep_rollback;
629
630 mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
631
632 WSREP_DEBUG("WSREP rollback thread wakes for signal");
633
634 mysql_mutex_lock(&thd->mysys_var->mutex);
635 thd_proc_info(thd, "WSREP aborter active");
636 thd->mysys_var->current_mutex= 0;
637 thd->mysys_var->current_cond= 0;
638 mysql_mutex_unlock(&thd->mysys_var->mutex);
639
640 /* check for false alarms */
641 if (!wsrep_aborting_thd)
642 {
643 WSREP_DEBUG("WSREP rollback thread has empty abort queue");
644 }
645 /* process all entries in the queue */
646 while (wsrep_aborting_thd) {
647 THD *aborting;
648 wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
649 aborting = wsrep_aborting_thd->aborting_thd;
650 my_free(wsrep_aborting_thd);
651 wsrep_aborting_thd= next;
652 /*
653 * must release mutex, appliers my want to add more
654 * aborting thds in our work queue, while we rollback
655 */
656 mysql_mutex_unlock(&LOCK_wsrep_rollback);
657
658 mysql_mutex_lock(&aborting->LOCK_thd_data);
659 if (aborting->wsrep_conflict_state== ABORTED)
660 {
661 WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
662 (long long)aborting->real_id,
663 aborting->wsrep_conflict_state);
664
665 mysql_mutex_unlock(&aborting->LOCK_thd_data);
666 mysql_mutex_lock(&LOCK_wsrep_rollback);
667 continue;
668 }
669 aborting->wsrep_conflict_state= ABORTING;
670
671 mysql_mutex_unlock(&aborting->LOCK_thd_data);
672
673 set_current_thd(aborting);
674 aborting->store_globals();
675
676 mysql_mutex_lock(&aborting->LOCK_thd_data);
677 wsrep_client_rollback(aborting);
678 WSREP_DEBUG("WSREP rollbacker aborted thd: (%lld %lld)",
679 (longlong) aborting->thread_id,
680 (longlong) aborting->real_id);
681 mysql_mutex_unlock(&aborting->LOCK_thd_data);
682
683 set_current_thd(thd);
684 thd->store_globals();
685
686 mysql_mutex_lock(&LOCK_wsrep_rollback);
687 }
688 }
689
690 mysql_mutex_unlock(&LOCK_wsrep_rollback);
691 sql_print_information("WSREP: rollbacker thread exiting");
692
693 DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
694 DBUG_VOID_RETURN;
695 }
696
wsrep_create_rollbacker()697 void wsrep_create_rollbacker()
698 {
699 if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
700 {
701 wsrep_thread_args* arg;
702 if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) {
703 WSREP_ERROR("Can't allocate memory for wsrep rollbacker thread\n");
704 assert(0);
705 }
706
707 arg->thread_type = WSREP_ROLLBACKER_THREAD;
708 arg->processor = wsrep_rollback_process;
709
710 /* create rollbacker */
711 if (create_wsrep_THD(arg, false)) {
712 WSREP_WARN("Can't create thread to manage wsrep rollback");
713 my_free(arg);
714 return;
715 }
716 }
717 }
718
wsrep_thd_set_PA_safe(void * thd_ptr,my_bool safe)719 void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe)
720 {
721 if (thd_ptr)
722 {
723 THD* thd = (THD*)thd_ptr;
724 thd->wsrep_PA_safe = safe;
725 }
726 }
727
wsrep_thd_conflict_state(THD * thd,my_bool sync)728 enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd, my_bool sync)
729 {
730 enum wsrep_conflict_state state = NO_CONFLICT;
731 if (thd)
732 {
733 if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
734
735 state = thd->wsrep_conflict_state;
736 if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
737 }
738 return state;
739 }
740
wsrep_thd_is_wsrep(THD * thd)741 my_bool wsrep_thd_is_wsrep(THD *thd)
742 {
743 my_bool status = FALSE;
744 if (thd)
745 {
746 status = (WSREP(thd) && WSREP_PROVIDER_EXISTS);
747 }
748 return status;
749 }
750
wsrep_thd_is_BF(THD * thd,my_bool sync)751 my_bool wsrep_thd_is_BF(THD *thd, my_bool sync)
752 {
753 my_bool status = FALSE;
754 if (thd)
755 {
756 // THD can be BF only if provider exists
757 if (wsrep_thd_is_wsrep(thd))
758 {
759 if (sync)
760 mysql_mutex_lock(&thd->LOCK_thd_data);
761
762 status = ((thd->wsrep_exec_mode == REPL_RECV) ||
763 (thd->wsrep_exec_mode == TOTAL_ORDER));
764 if (sync)
765 mysql_mutex_unlock(&thd->LOCK_thd_data);
766 }
767 }
768 return status;
769 }
770
771 extern "C"
wsrep_thd_is_BF_or_commit(void * thd_ptr,my_bool sync)772 my_bool wsrep_thd_is_BF_or_commit(void *thd_ptr, my_bool sync)
773 {
774 bool status = FALSE;
775 if (thd_ptr)
776 {
777 THD* thd = (THD*)thd_ptr;
778 if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
779
780 status = ((thd->wsrep_exec_mode == REPL_RECV) ||
781 (thd->wsrep_exec_mode == TOTAL_ORDER) ||
782 (thd->wsrep_exec_mode == LOCAL_COMMIT));
783 if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
784 }
785 return status;
786 }
787
788 extern "C"
wsrep_thd_is_local(void * thd_ptr,my_bool sync)789 my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync)
790 {
791 bool status = FALSE;
792 if (thd_ptr)
793 {
794 THD* thd = (THD*)thd_ptr;
795 if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
796
797 status = (thd->wsrep_exec_mode == LOCAL_STATE);
798 if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
799 }
800 return status;
801 }
802
wsrep_abort_thd(void * bf_thd_ptr,void * victim_thd_ptr,my_bool signal)803 int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
804 {
805 THD *victim_thd= (THD *) victim_thd_ptr;
806 THD *bf_thd= (THD *) bf_thd_ptr;
807 DBUG_ENTER("wsrep_abort_thd");
808
809 mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
810 mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill);
811
812 if ( (WSREP(bf_thd) ||
813 ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
814 bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
815 victim_thd)
816 {
817 if ((victim_thd->wsrep_conflict_state == MUST_ABORT) ||
818 (victim_thd->wsrep_conflict_state == ABORTED) ||
819 (victim_thd->wsrep_conflict_state == ABORTING))
820 {
821 WSREP_DEBUG("wsrep_abort_thd called by %llu with victim %llu already "
822 "aborted. Ignoring.",
823 (bf_thd) ? (long long)bf_thd->real_id : 0,
824 (long long)victim_thd->real_id);
825 wsrep_thd_UNLOCK(victim_thd);
826 DBUG_RETURN(1);
827 }
828
829 WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
830 (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
831 ha_abort_transaction(bf_thd, victim_thd, signal);
832 }
833 else
834 {
835 WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
836 wsrep_thd_UNLOCK(victim_thd);
837 }
838
839 DBUG_RETURN(1);
840 }
841
842 extern "C"
wsrep_thd_in_locking_session(void * thd_ptr)843 int wsrep_thd_in_locking_session(void *thd_ptr)
844 {
845 if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
846 return 1;
847 }
848 return 0;
849 }
850
wsrep_thd_has_explicit_locks(THD * thd)851 bool wsrep_thd_has_explicit_locks(THD *thd)
852 {
853 assert(thd);
854 return thd->mdl_context.has_explicit_locks();
855 }
856
wsrep_thd_is_applier(MYSQL_THD thd)857 my_bool wsrep_thd_is_applier(MYSQL_THD thd)
858 {
859 my_bool is_applier= false;
860
861 if (thd && thd->wsrep_applier)
862 is_applier= true;
863
864 return (is_applier);
865 }
866
wsrep_set_load_multi_commit(THD * thd,bool split)867 void wsrep_set_load_multi_commit(THD *thd, bool split)
868 {
869 thd->wsrep_split_flag= split;
870 }
871
wsrep_is_load_multi_commit(THD * thd)872 bool wsrep_is_load_multi_commit(THD *thd)
873 {
874 return thd->wsrep_split_flag;
875 }
876
wsrep_report_bf_lock_wait(THD * thd,unsigned long long trx_id)877 void wsrep_report_bf_lock_wait(THD *thd,
878 unsigned long long trx_id)
879 {
880 if (thd)
881 {
882 WSREP_ERROR("Thread %s trx_id: %llu thread: %ld "
883 "seqno: %lld query_state: %s conf_state: %s exec_mode: %s "
884 "applier: %d query: %s",
885 wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
886 trx_id,
887 thd_get_thread_id(thd),
888 wsrep_thd_trx_seqno(thd),
889 wsrep_thd_query_state_str(thd),
890 wsrep_thd_conflict_state_str(thd),
891 wsrep_thd_exec_mode_str(thd),
892 thd->wsrep_applier,
893 wsrep_thd_query(thd));
894 }
895 }
896