1 /* Copyright (C) 2013 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
15
16 #include "wsrep_thd.h"
17
18 #include "transaction.h"
19 #include "rpl_rli.h"
20 #include "log_event.h"
21 #include "sql_parse.h"
22 #include "sql_base.h" // close_thread_tables()
23 #include "mysqld.h" // start_wsrep_THD();
24 #include "debug_sync.h"
25
26 static long long wsrep_bf_aborts_counter = 0;
27
wsrep_show_bf_aborts(THD * thd,SHOW_VAR * var,char * buff)28 int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff)
29 {
30 wsrep_local_bf_aborts = my_atomic_load64(&wsrep_bf_aborts_counter);
31 var->type = SHOW_LONGLONG;
32 var->value = (char*)&wsrep_local_bf_aborts;
33 return 0;
34 }
35
36 /* must have (&thd->LOCK_wsrep_thd) */
wsrep_client_rollback(THD * thd)37 void wsrep_client_rollback(THD *thd)
38 {
39 WSREP_DEBUG("client rollback due to BF abort for (%u), query: %s",
40 thd->thread_id(), WSREP_QUERY(thd));
41 my_atomic_add64(&wsrep_bf_aborts_counter, 1);
42
43 thd->wsrep_conflict_state= ABORTING;
44 mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
45 trans_rollback(thd);
46
47 if (thd->locked_tables_mode && thd->lock)
48 {
49 WSREP_DEBUG("unlocking tables for BF abort (%u)", thd->thread_id());
50 thd->locked_tables_list.unlock_locked_tables(thd);
51 thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
52 }
53
54 if (thd->global_read_lock.is_acquired())
55 {
56 WSREP_DEBUG("unlocking GRL for BF abort (%u)", thd->thread_id());
57 thd->global_read_lock.unlock_global_read_lock(thd);
58 }
59
60 /* Release transactional metadata locks. */
61 thd->mdl_context.release_transactional_locks();
62
63 /* release explicit MDL locks */
64 thd->mdl_context.release_explicit_locks();
65
66 if (thd->get_binlog_table_maps())
67 {
68 WSREP_DEBUG("clearing binlog table map for BF abort (%u)", thd->thread_id());
69 thd->clear_binlog_table_maps();
70 }
71 mysql_mutex_lock(&thd->LOCK_wsrep_thd);
72 thd->wsrep_conflict_state= ABORTED;
73 }
74
75 #define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
76 #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
77 #include "rpl_info_factory.h"
78
wsrep_relay_log_init(const char * log_fname)79 static Relay_log_info* wsrep_relay_log_init(const char* log_fname)
80 {
81 uint rli_option = INFO_REPOSITORY_DUMMY;
82 Relay_log_info *rli= NULL;
83 rli = Rpl_info_factory::create_rli(rli_option, false, "wsrep", true);
84 if (!rli)
85 {
86 WSREP_ERROR("Failed to create RLI for wsrep thread, aborting");
87 unireg_abort(MYSQLD_ABORT_EXIT);
88 }
89 rli->set_rli_description_event(
90 new Format_description_log_event(BINLOG_VERSION));
91
92 rli->current_mts_submode= new Mts_submode_wsrep();
93 return (rli);
94 }
95
wsrep_prepare_bf_thd(THD * thd,struct wsrep_thd_shadow * shadow)96 static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
97 {
98 shadow->options = thd->variables.option_bits;
99 shadow->server_status = thd->server_status;
100 shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
101 shadow->vio = thd->active_vio;
102 shadow->rli_slave = thd->rli_slave;
103 // Disable general logging on applier threads
104 thd->variables.option_bits |= OPTION_LOG_OFF;
105 // Enable binlogging if opt_log_slave_updates is set
106 if (opt_log_slave_updates)
107 thd->variables.option_bits|= OPTION_BIN_LOG;
108 else
109 thd->variables.option_bits&= ~(OPTION_BIN_LOG);
110
111 /*
112 in 5.7, we declare applying to happen as with slave threads
113 this set here is for replaying, when local threads will operate
114 as slave for the duration of replaying
115 */
116 thd->slave_thread = TRUE;
117 if (!thd->wsrep_rli)
118 {
119 thd->wsrep_rli = wsrep_relay_log_init("wsrep_relay");
120 thd->rli_slave = thd->wsrep_rli;
121 thd->wsrep_rli->info_thd= thd;
122 thd->init_for_queries(thd->wsrep_rli);
123 }
124 thd->wsrep_rli->info_thd = thd;
125
126 thd->wsrep_exec_mode= REPL_RECV;
127 thd->set_active_vio(0);
128 thd->clear_error();
129
130 shadow->tx_isolation = thd->variables.tx_isolation;
131 thd->variables.tx_isolation = ISO_READ_COMMITTED;
132 thd->tx_isolation = ISO_READ_COMMITTED;
133
134 shadow->db = thd->db();
135 thd->reset_db(NULL_CSTR);
136
137 shadow->user_time = thd->user_time;
138 shadow->row_count_func= thd->get_row_count_func();
139 }
140
wsrep_return_from_bf_mode(THD * thd,struct wsrep_thd_shadow * shadow)141 static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
142 {
143 thd->variables.option_bits = shadow->options;
144 thd->server_status = shadow->server_status;
145 thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
146 thd->set_active_vio(shadow->vio);
147 thd->variables.tx_isolation = shadow->tx_isolation;
148 thd->reset_db(shadow->db);
149 thd->user_time = shadow->user_time;
150 assert(thd->rli_slave == thd->wsrep_rli);
151 thd->rli_slave = shadow->rli_slave;
152
153 delete thd->wsrep_rli->current_mts_submode;
154 thd->wsrep_rli->current_mts_submode = 0;
155 thd->wsrep_rli->cleanup_after_session();
156 delete thd->wsrep_rli;
157 thd->wsrep_rli = 0;
158 thd->slave_thread = (thd->rli_slave) ? TRUE : FALSE;
159 thd->set_row_count_func(shadow->row_count_func);
160 }
161
wsrep_replay_sp_transaction(THD * thd)162 void wsrep_replay_sp_transaction(THD* thd)
163 {
164 DBUG_ENTER("wsrep_replay_sp_transaction");
165 mysql_mutex_assert_owner(&thd->LOCK_wsrep_thd);
166 assert(thd->wsrep_conflict_state == MUST_REPLAY);
167 assert(thd->sp_runtime_ctx);
168 assert(wsrep_thd_trx_seqno(thd) > 0);
169
170 close_thread_tables(thd);
171 if (thd->locked_tables_mode && thd->lock)
172 {
173 WSREP_DEBUG("releasing table lock for replaying (%u)",
174 thd->thread_id());
175 thd->locked_tables_list.unlock_locked_tables(thd);
176 thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
177 }
178 thd->mdl_context.release_transactional_locks();
179
180 THD *replay_thd= new THD(true, true);
181 replay_thd->thread_stack= thd->thread_stack;
182
183 struct wsrep_thd_shadow shadow;
184 wsrep_prepare_bf_thd(replay_thd, &shadow);
185 replay_thd->wsrep_trx_meta= thd->wsrep_trx_meta;
186 replay_thd->wsrep_ws_handle= thd->wsrep_ws_handle;
187 replay_thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
188 replay_thd->wsrep_conflict_state= REPLAYING;
189
190 replay_thd->variables.option_bits|= OPTION_BEGIN;
191 replay_thd->server_status|= SERVER_STATUS_IN_TRANS;
192
193 mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
194
195 thd->restore_globals();
196 replay_thd->store_globals();
197 wsrep_status_t rcode= wsrep->replay_trx(wsrep,
198 &replay_thd->wsrep_ws_handle,
199 (void*) replay_thd);
200
201 wsrep_return_from_bf_mode(replay_thd, &shadow);
202 replay_thd->restore_globals();
203 delete replay_thd;
204
205 mysql_mutex_lock(&thd->LOCK_wsrep_thd);
206
207 thd->store_globals();
208
209 switch (rcode)
210 {
211 case WSREP_OK:
212 {
213 thd->wsrep_conflict_state= NO_CONFLICT;
214 thd->killed= THD::NOT_KILLED;
215 wsrep_status_t rcode= wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
216 if (rcode != WSREP_OK)
217 {
218 WSREP_WARN("Post commit failed for SP replay: thd: %u error: %d",
219 thd->thread_id(), rcode);
220 }
221 /* As replaying the transaction was successful, an error must not
222 be returned to client, so we need to reset the error state of
223 the diagnostics area */
224 thd->get_stmt_da()->reset_diagnostics_area();
225 break;
226 }
227 case WSREP_TRX_FAIL:
228 {
229 thd->wsrep_conflict_state= ABORTED;
230 wsrep_status_t rcode= wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
231 if (rcode != WSREP_OK)
232 {
233 WSREP_WARN("Post rollback failed for SP replay: thd: %u error: %d",
234 thd->thread_id(), rcode);
235 }
236 if (thd->get_stmt_da()->is_set())
237 {
238 thd->get_stmt_da()->reset_diagnostics_area();
239 }
240 my_error(ER_LOCK_DEADLOCK, MYF(0));
241 break;
242 }
243 default:
244 WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
245 rcode,
246 (thd->db().str ? thd->db().str : "(null)"),
247 WSREP_QUERY(thd));
248 /* we're now in inconsistent state, must abort */
249 mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
250 unireg_abort(1);
251 break;
252 }
253
254 wsrep_cleanup_transaction(thd);
255
256 mysql_mutex_lock(&LOCK_wsrep_replaying);
257 wsrep_replaying--;
258 WSREP_DEBUG("replaying decreased: %d, thd: %u",
259 wsrep_replaying, thd->thread_id());
260 mysql_cond_broadcast(&COND_wsrep_replaying);
261 mysql_mutex_unlock(&LOCK_wsrep_replaying);
262
263 DBUG_VOID_RETURN;
264 }
265
wsrep_replay_transaction(THD * thd)266 void wsrep_replay_transaction(THD *thd)
267 {
268 DBUG_ENTER("wsrep_replay_transaction");
269 /* checking if BF trx must be replayed */
270 if (thd->wsrep_conflict_state== MUST_REPLAY) {
271 assert(wsrep_thd_trx_seqno(thd));
272 if (thd->wsrep_exec_mode!= REPL_RECV) {
273 if (thd->get_stmt_da()->is_sent())
274 {
275 WSREP_ERROR("replay issue, thd has reported status already");
276 }
277
278 /* PS reprepare observer should have been removed already
279 open_table() will fail if we have dangling observer here
280 */
281 if (thd->get_reprepare_observer() && wsrep_log_conflicts)
282 {
283 WSREP_WARN("dangling observer in replay transaction: (thr %u %lld %s)",
284 thd->thread_id(), thd->query_id, thd->query().str);
285 }
286
287 struct da_shadow
288 {
289 enum Diagnostics_area::enum_diagnostics_status status;
290 ulonglong affected_rows;
291 ulonglong last_insert_id;
292 char message[MYSQL_ERRMSG_SIZE];
293 };
294 struct da_shadow da_status;
295 da_status.status= thd->get_stmt_da()->status();
296 if (da_status.status == Diagnostics_area::DA_OK)
297 {
298 da_status.affected_rows= thd->get_stmt_da()->affected_rows();
299 da_status.last_insert_id= thd->get_stmt_da()->last_insert_id();
300 strmake(da_status.message,
301 thd->get_stmt_da()->message_text(),
302 sizeof(da_status.message)-1);
303 }
304
305 thd->get_stmt_da()->reset_diagnostics_area();
306
307 thd->wsrep_conflict_state= REPLAYING;
308 mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
309
310 mysql_reset_thd_for_next_command(thd);
311 thd->killed= THD::NOT_KILLED;
312 close_thread_tables(thd);
313 if (thd->locked_tables_mode && thd->lock)
314 {
315 WSREP_DEBUG("releasing table lock for replaying (%u)",
316 thd->thread_id());
317 thd->locked_tables_list.unlock_locked_tables(thd);
318 thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
319 }
320 thd->mdl_context.release_transactional_locks();
321 /*
322 Replaying will call MYSQL_START_STATEMENT when handling
323 BEGIN Query_log_event so end statement must be called before
324 replaying.
325 */
326 MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
327 thd->m_statement_psi= NULL;
328 thd->m_digest= NULL;
329 thd_proc_info(thd, "wsrep replaying trx");
330 WSREP_DEBUG("replay trx: %s %lld",
331 WSREP_QUERY(thd),
332 (long long)wsrep_thd_trx_seqno(thd));
333 struct wsrep_thd_shadow shadow;
334 wsrep_prepare_bf_thd(thd, &shadow);
335
336 /* From trans_begin() */
337 thd->variables.option_bits|= OPTION_BEGIN;
338 thd->server_status|= SERVER_STATUS_IN_TRANS;
339
340 // Allow tests to block the applier thread using the DBUG facilities.
341 DBUG_EXECUTE_IF("sync.wsrep_replay_cb",
342 {
343 const char act[]=
344 "now "
345 "SIGNAL sync.wsrep_replay_cb_reached "
346 "WAIT_FOR signal.wsrep_replay_cb";
347 assert(!debug_sync_set_action(thd,
348 STRING_WITH_LEN(act)));
349 };);
350 int rcode = wsrep->replay_trx(wsrep,
351 &thd->wsrep_ws_handle,
352 (void *)thd);
353
354 wsrep_return_from_bf_mode(thd, &shadow);
355 if (thd->wsrep_conflict_state!= REPLAYING)
356 WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
357
358 mysql_mutex_lock(&thd->LOCK_wsrep_thd);
359
360 switch (rcode)
361 {
362 case WSREP_OK:
363 thd->wsrep_conflict_state= NO_CONFLICT;
364 wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
365 WSREP_DEBUG("trx_replay successful for: %u %llu",
366 thd->thread_id(), (long long)thd->real_id);
367 if (thd->get_stmt_da()->is_sent())
368 {
369 WSREP_WARN("replay ok, thd has reported status");
370 }
371 else if (thd->get_stmt_da()->is_set())
372 {
373 if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK)
374 {
375 WSREP_WARN("replay ok, thd has error status %d",
376 thd->get_stmt_da()->status());
377 }
378 }
379 else
380 {
381 if (da_status.status == Diagnostics_area::DA_OK)
382 {
383 my_ok(thd,
384 da_status.affected_rows,
385 da_status.last_insert_id,
386 da_status.message);
387 }
388 else
389 {
390 my_ok(thd);
391 }
392 }
393 break;
394 case WSREP_TRX_FAIL:
395 if (thd->get_stmt_da()->is_sent())
396 {
397 WSREP_ERROR("replay failed, thd has reported status");
398 }
399 else
400 {
401 WSREP_DEBUG("replay failed, rolling back");
402 //my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
403 }
404 thd->wsrep_conflict_state= ABORTED;
405 wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
406 break;
407 default:
408 WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
409 rcode,
410 (thd->db().str ? thd->db().str : "(null)"),
411 WSREP_QUERY(thd));
412 /* we're now in inconsistent state, must abort */
413 mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
414 unireg_abort(1);
415 break;
416 }
417
418 wsrep_cleanup_transaction(thd);
419
420 mysql_mutex_lock(&LOCK_wsrep_replaying);
421 wsrep_replaying--;
422 WSREP_DEBUG("replaying decreased: %d, thd: %u",
423 wsrep_replaying, thd->thread_id());
424 mysql_cond_broadcast(&COND_wsrep_replaying);
425 mysql_mutex_unlock(&LOCK_wsrep_replaying);
426 }
427 }
428 DBUG_VOID_RETURN;
429 }
430
wsrep_replication_process(THD * thd)431 static void wsrep_replication_process(THD *thd)
432 {
433 int rcode;
434 DBUG_ENTER("wsrep_replication_process");
435
436 struct wsrep_thd_shadow shadow;
437
438 wsrep_prepare_bf_thd(thd, &shadow);
439
440 /* From trans_begin() */
441 thd->variables.option_bits|= OPTION_BEGIN;
442 thd->server_status|= SERVER_STATUS_IN_TRANS;
443
444 rcode = wsrep->recv(wsrep, (void *)thd);
445 DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
446
447 WSREP_INFO("applier thread exiting (code:%d)", rcode);
448
449 switch (rcode) {
450 case WSREP_OK:
451 case WSREP_NOT_IMPLEMENTED:
452 case WSREP_CONN_FAIL:
453 /* provider does not support slave operations / disconnected from group,
454 * just close applier thread */
455 break;
456 case WSREP_NODE_FAIL:
457 /* data inconsistency => SST is needed */
458 /* Note: we cannot just blindly restart replication here,
459 * SST might require server restart if storage engines must be
460 * initialized after SST */
461 WSREP_ERROR("node consistency compromised, aborting");
462 wsrep_kill_mysql(thd);
463 break;
464 case WSREP_WARNING:
465 case WSREP_TRX_FAIL:
466 case WSREP_TRX_MISSING:
467 /* these suggests a bug in provider code */
468 WSREP_WARN("bad return from recv() call: %d", rcode);
469 /* fall through to node shutdown */
470 case WSREP_FATAL:
471 /* Cluster connectivity is lost.
472 *
473 * If applier was killed on purpose (KILL_CONNECTION), we
474 * avoid mysql shutdown. This is because the killer will then handle
475 * shutdown processing (or replication restarting)
476 */
477 if (thd->killed != THD::KILL_CONNECTION)
478 {
479 wsrep_kill_mysql(thd);
480 }
481 break;
482 }
483
484 wsrep_close_applier(thd);
485
486 TABLE *tmp;
487 while ((tmp = thd->temporary_tables))
488 {
489 WSREP_WARN("Applier %u, has temporary tables at exit: %s.%s",
490 thd->thread_id(),
491 (tmp->s) ? tmp->s->db.str : "void",
492 (tmp->s) ? tmp->s->table_name.str : "void");
493 }
494 wsrep_return_from_bf_mode(thd, &shadow);
495 DBUG_VOID_RETURN;
496 }
497
create_wsrep_THD(wsrep_thd_processor_fun processor)498 static bool create_wsrep_THD(wsrep_thd_processor_fun processor)
499 {
500 my_thread_handle hThread;
501 bool res= mysql_thread_create(
502 key_thread_handle_wsrep,
503 &hThread, &connection_attrib,
504 start_wsrep_THD, (void*)processor);
505 #ifdef WITH_WSREP_OUT
506 /*
507 MariaDB bug https://jira.mariadb.org/browse/MDEV-8208 has not been observed
508 in this server version. However, if it surfaces, the server startup must be
509 should be delayed here until wsrep_running_threads count ascends
510 */
511 #endif /* WITH_WSREP_OUT */
512 return res;
513 }
514
wsrep_create_appliers(long threads)515 void wsrep_create_appliers(long threads)
516 {
517 if (!wsrep_connected)
518 {
519 /* see wsrep_replication_start() for the logic */
520 if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
521 wsrep_provider && strcasecmp(wsrep_provider, "none"))
522 {
523 WSREP_ERROR("Trying to launch slave threads before creating "
524 "connection at '%s'", wsrep_cluster_address);
525 assert(0);
526 }
527 return;
528 }
529
530 long wsrep_threads=0;
531 while (wsrep_threads++ < threads) {
532 if (create_wsrep_THD(wsrep_replication_process))
533 WSREP_WARN("Can't create thread to manage wsrep replication");
534 }
535 }
536
wsrep_rollback_process(THD * thd)537 static void wsrep_rollback_process(THD *thd)
538 {
539 DBUG_ENTER("wsrep_rollback_process");
540
541 mysql_mutex_lock(&LOCK_wsrep_rollback);
542 wsrep_aborting_thd= NULL;
543 while (thd->killed == THD::NOT_KILLED) {
544 thd_proc_info(thd, "wsrep aborter idle");
545 mysql_mutex_lock(&thd->LOCK_thd_data);
546 thd->current_mutex= &LOCK_wsrep_rollback;
547 thd->current_cond= &COND_wsrep_rollback;
548 mysql_mutex_unlock(&thd->LOCK_thd_data);
549
550 mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
551
552 if (thd->killed != THD::NOT_KILLED)
553 {
554 WSREP_DEBUG("rollbacker thread canceled");
555 break;
556 }
557 WSREP_DEBUG("WSREP rollback thread wakes for signal");
558
559 mysql_mutex_lock(&thd->LOCK_thd_data);
560 thd_proc_info(thd, "wsrep aborter active");
561 thd->current_mutex= 0;
562 thd->current_cond= 0;
563 mysql_mutex_unlock(&thd->LOCK_thd_data);
564
565 /* check for false alarms */
566 if (!wsrep_aborting_thd)
567 {
568 WSREP_DEBUG("WSREP rollback thread has empty abort queue");
569 }
570 /* process all entries in the queue */
571 while (wsrep_aborting_thd) {
572 THD *aborting;
573 wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
574 aborting = wsrep_aborting_thd->aborting_thd;
575 my_free(wsrep_aborting_thd);
576 wsrep_aborting_thd= next;
577 /*
578 * must release mutex, appliers my want to add more
579 * aborting thds in our work queue, while we rollback
580 */
581 mysql_mutex_unlock(&LOCK_wsrep_rollback);
582
583 mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
584 if (aborting->wsrep_conflict_state== ABORTED)
585 {
586 WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
587 (long long)aborting->real_id,
588 aborting->wsrep_conflict_state);
589
590 mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
591 mysql_mutex_lock(&LOCK_wsrep_rollback);
592 continue;
593 }
594 aborting->wsrep_conflict_state= ABORTING;
595
596 mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
597
598 aborting->store_globals();
599
600 mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
601 wsrep_client_rollback(aborting);
602 WSREP_DEBUG("WSREP rollbacker aborted thd: (%u %llu)",
603 aborting->thread_id(), (long long)aborting->real_id);
604 mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
605
606 thd->store_globals();
607 mysql_mutex_lock(&LOCK_wsrep_rollback);
608 }
609 }
610
611 mysql_mutex_unlock(&LOCK_wsrep_rollback);
612 mysql_mutex_lock(&thd->LOCK_thd_data);
613 thd_proc_info(thd, "wsrep aborter shutting down");
614 thd->current_mutex= 0;
615 thd->current_cond= 0;
616 mysql_mutex_unlock(&thd->LOCK_thd_data);
617
618 sql_print_information("WSREP: rollbacker thread exiting");
619 DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
620 DBUG_VOID_RETURN;
621 }
622
wsrep_create_rollbacker()623 void wsrep_create_rollbacker()
624 {
625 if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
626 {
627 /* create rollbacker */
628 if (create_wsrep_THD(wsrep_rollback_process))
629 WSREP_WARN("Can't create thread to manage wsrep rollback");
630 }
631 }
632
wsrep_thd_set_PA_safe(void * thd_ptr,my_bool safe)633 void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe)
634 {
635 if (thd_ptr)
636 {
637 THD* thd = (THD*)thd_ptr;
638 thd->wsrep_PA_safe = safe;
639 }
640 }
641
wsrep_thd_conflict_state(void * thd_ptr,my_bool sync)642 int wsrep_thd_conflict_state(void *thd_ptr, my_bool sync)
643 {
644 int state = -1;
645 if (thd_ptr)
646 {
647 THD* thd = (THD*)thd_ptr;
648 if (sync) mysql_mutex_lock(&thd->LOCK_wsrep_thd);
649
650 state = thd->wsrep_conflict_state;
651 if (sync) mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
652 }
653 return state;
654 }
655
wsrep_thd_is_BF(void * thd_ptr,my_bool sync)656 my_bool wsrep_thd_is_BF(void *thd_ptr, my_bool sync)
657 {
658 my_bool status = FALSE;
659 if (thd_ptr)
660 {
661 THD* thd = (THD*)thd_ptr;
662 if (sync) mysql_mutex_lock(&thd->LOCK_wsrep_thd);
663
664 status = ((thd->wsrep_exec_mode == REPL_RECV) ||
665 (thd->wsrep_exec_mode == TOTAL_ORDER));
666 if (sync) mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
667 }
668 return status;
669 }
670
671 extern "C"
wsrep_thd_is_BF_or_commit(void * thd_ptr,my_bool sync)672 my_bool wsrep_thd_is_BF_or_commit(void *thd_ptr, my_bool sync)
673 {
674 bool status = FALSE;
675 if (thd_ptr)
676 {
677 THD* thd = (THD*)thd_ptr;
678 if (sync) mysql_mutex_lock(&thd->LOCK_wsrep_thd);
679
680 status = ((thd->wsrep_exec_mode == REPL_RECV) ||
681 (thd->wsrep_exec_mode == TOTAL_ORDER) ||
682 (thd->wsrep_exec_mode == LOCAL_COMMIT));
683 if (sync) mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
684 }
685 return status;
686 }
687
688 extern "C"
wsrep_thd_is_local(void * thd_ptr,my_bool sync)689 my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync)
690 {
691 bool status = FALSE;
692 if (thd_ptr)
693 {
694 THD* thd = (THD*)thd_ptr;
695 if (sync) mysql_mutex_lock(&thd->LOCK_wsrep_thd);
696
697 status = (thd->wsrep_exec_mode == LOCAL_STATE);
698 if (sync) mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
699 }
700 return status;
701 }
702
wsrep_abort_thd(void * bf_thd_ptr,void * victim_thd_ptr,my_bool signal)703 int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
704 {
705 THD *victim_thd = (THD *) victim_thd_ptr;
706 THD *bf_thd = (THD *) bf_thd_ptr;
707 DBUG_ENTER("wsrep_abort_thd");
708
709 if ( (WSREP(bf_thd) ||
710 ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
711 bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
712 victim_thd)
713 {
714 WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
715 (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
716 ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
717 }
718 else
719 {
720 WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
721 }
722
723 DBUG_RETURN(1);
724 }
725
wsrep_thd_in_locking_session(void * thd_ptr)726 int wsrep_thd_in_locking_session(void *thd_ptr)
727 {
728 if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
729 return 1;
730 }
731 return 0;
732 }
733
wsrep_thd_has_explicit_locks(THD * thd)734 bool wsrep_thd_has_explicit_locks(THD *thd)
735 {
736 assert(thd);
737 return (thd->mdl_context.wsrep_has_explicit_locks());
738 }
739
740 /*
741 Get auto increment variables for THD. Use global settings for
742 applier threads.
743 */
744 extern "C"
wsrep_thd_auto_increment_variables(THD * thd,unsigned long long * offset,unsigned long long * increment)745 void wsrep_thd_auto_increment_variables(THD* thd,
746 unsigned long long* offset,
747 unsigned long long* increment)
748 {
749 if (thd->wsrep_exec_mode == REPL_RECV &&
750 thd->wsrep_conflict_state != REPLAYING)
751 {
752 *offset= global_system_variables.auto_increment_offset;
753 *increment= global_system_variables.auto_increment_increment;
754 }
755 else
756 {
757 *offset= thd->variables.auto_increment_offset;
758 *increment= thd->variables.auto_increment_increment;
759 }
760 }
761