1 /* Copyright (C) 2013 Codership Oy <info@codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License along
13    with this program; if not, write to the Free Software Foundation, Inc.,
14    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
15 
16 #include "wsrep_thd.h"
17 
18 #include "transaction.h"
19 #include "rpl_rli.h"
20 #include "log_event.h"
21 #include "sql_parse.h"
22 #include "sql_base.h" // close_thread_tables()
23 #include "mysqld.h"   // start_wsrep_THD();
24 #include "debug_sync.h"
25 
/* Number of client rollbacks caused by BF aborts. Incremented atomically in
   wsrep_client_rollback() and read atomically in wsrep_show_bf_aborts(). */
static long long wsrep_bf_aborts_counter = 0;
27 
wsrep_show_bf_aborts(THD * thd,SHOW_VAR * var,char * buff)28 int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff)
29 {
30     wsrep_local_bf_aborts = my_atomic_load64(&wsrep_bf_aborts_counter);
31     var->type = SHOW_LONGLONG;
32     var->value = (char*)&wsrep_local_bf_aborts;
33     return 0;
34 }
35 
36 /* must have (&thd->LOCK_wsrep_thd) */
wsrep_client_rollback(THD * thd)37 void wsrep_client_rollback(THD *thd)
38 {
39   WSREP_DEBUG("client rollback due to BF abort for (%u), query: %s",
40               thd->thread_id(), WSREP_QUERY(thd));
41   my_atomic_add64(&wsrep_bf_aborts_counter, 1);
42 
43   thd->wsrep_conflict_state= ABORTING;
44   mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
45   trans_rollback(thd);
46 
47   if (thd->locked_tables_mode && thd->lock)
48   {
49     WSREP_DEBUG("unlocking tables for BF abort (%u)", thd->thread_id());
50     thd->locked_tables_list.unlock_locked_tables(thd);
51     thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
52   }
53 
54   if (thd->global_read_lock.is_acquired())
55   {
56     WSREP_DEBUG("unlocking GRL for BF abort (%u)", thd->thread_id());
57     thd->global_read_lock.unlock_global_read_lock(thd);
58   }
59 
60   /* Release transactional metadata locks. */
61   thd->mdl_context.release_transactional_locks();
62 
63   /* release explicit MDL locks */
64   thd->mdl_context.release_explicit_locks();
65 
66   if (thd->get_binlog_table_maps())
67   {
68     WSREP_DEBUG("clearing binlog table map for BF abort (%u)", thd->thread_id());
69     thd->clear_binlog_table_maps();
70   }
71   mysql_mutex_lock(&thd->LOCK_wsrep_thd);
72   thd->wsrep_conflict_state= ABORTED;
73 }
74 
/* NOTE(review): these two macros are defined right before rpl_info_factory.h
   is included, presumably because that header refers to them - confirm. */
#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
#include "rpl_info_factory.h"
78 
wsrep_relay_log_init(const char * log_fname)79 static Relay_log_info* wsrep_relay_log_init(const char* log_fname)
80 {
81   uint rli_option = INFO_REPOSITORY_DUMMY;
82   Relay_log_info *rli= NULL;
83   rli = Rpl_info_factory::create_rli(rli_option, false, "wsrep", true);
84   if (!rli)
85   {
86     WSREP_ERROR("Failed to create RLI for wsrep thread, aborting");
87     unireg_abort(MYSQLD_ABORT_EXIT);
88   }
89   rli->set_rli_description_event(
90       new Format_description_log_event(BINLOG_VERSION));
91 
92   rli->current_mts_submode= new Mts_submode_wsrep();
93   return (rli);
94 }
95 
wsrep_prepare_bf_thd(THD * thd,struct wsrep_thd_shadow * shadow)96 static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
97 {
98   shadow->options       = thd->variables.option_bits;
99   shadow->server_status = thd->server_status;
100   shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
101   shadow->vio           = thd->active_vio;
102   shadow->rli_slave     = thd->rli_slave;
103   // Disable general logging on applier threads
104   thd->variables.option_bits |= OPTION_LOG_OFF;
105   // Enable binlogging if opt_log_slave_updates is set
106   if (opt_log_slave_updates)
107     thd->variables.option_bits|= OPTION_BIN_LOG;
108   else
109     thd->variables.option_bits&= ~(OPTION_BIN_LOG);
110 
111   /*
112     in 5.7, we declare applying to happen as with slave threads
113     this set here is for replaying, when local threads will operate
114     as slave for the duration of replaying
115   */
116   thd->slave_thread = TRUE;
117   if (!thd->wsrep_rli)
118   {
119     thd->wsrep_rli = wsrep_relay_log_init("wsrep_relay");
120     thd->rli_slave = thd->wsrep_rli;
121     thd->wsrep_rli->info_thd= thd;
122     thd->init_for_queries(thd->wsrep_rli);
123   }
124   thd->wsrep_rli->info_thd = thd;
125 
126   thd->wsrep_exec_mode= REPL_RECV;
127   thd->set_active_vio(0);
128   thd->clear_error();
129 
130   shadow->tx_isolation        = thd->variables.tx_isolation;
131   thd->variables.tx_isolation = ISO_READ_COMMITTED;
132   thd->tx_isolation           = ISO_READ_COMMITTED;
133 
134   shadow->db = thd->db();
135   thd->reset_db(NULL_CSTR);
136 
137   shadow->user_time = thd->user_time;
138   shadow->row_count_func= thd->get_row_count_func();
139 }
140 
wsrep_return_from_bf_mode(THD * thd,struct wsrep_thd_shadow * shadow)141 static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
142 {
143   thd->variables.option_bits  = shadow->options;
144   thd->server_status          = shadow->server_status;
145   thd->wsrep_exec_mode        = shadow->wsrep_exec_mode;
146   thd->set_active_vio(shadow->vio);
147   thd->variables.tx_isolation = shadow->tx_isolation;
148   thd->reset_db(shadow->db);
149   thd->user_time              = shadow->user_time;
150   assert(thd->rli_slave == thd->wsrep_rli);
151   thd->rli_slave              = shadow->rli_slave;
152 
153   delete thd->wsrep_rli->current_mts_submode;
154   thd->wsrep_rli->current_mts_submode = 0;
155   thd->wsrep_rli->cleanup_after_session();
156   delete thd->wsrep_rli;
157   thd->wsrep_rli = 0;
158   thd->slave_thread = (thd->rli_slave) ? TRUE : FALSE;
159   thd->set_row_count_func(shadow->row_count_func);
160 }
161 
wsrep_replay_sp_transaction(THD * thd)162 void wsrep_replay_sp_transaction(THD* thd)
163 {
164   DBUG_ENTER("wsrep_replay_sp_transaction");
165   mysql_mutex_assert_owner(&thd->LOCK_wsrep_thd);
166   assert(thd->wsrep_conflict_state == MUST_REPLAY);
167   assert(thd->sp_runtime_ctx);
168   assert(wsrep_thd_trx_seqno(thd) > 0);
169 
170   close_thread_tables(thd);
171   if (thd->locked_tables_mode && thd->lock)
172   {
173     WSREP_DEBUG("releasing table lock for replaying (%u)",
174                 thd->thread_id());
175     thd->locked_tables_list.unlock_locked_tables(thd);
176     thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
177   }
178   thd->mdl_context.release_transactional_locks();
179 
180   THD *replay_thd= new THD(true, true);
181   replay_thd->thread_stack= thd->thread_stack;
182 
183   struct wsrep_thd_shadow shadow;
184   wsrep_prepare_bf_thd(replay_thd, &shadow);
185   replay_thd->wsrep_trx_meta= thd->wsrep_trx_meta;
186   replay_thd->wsrep_ws_handle= thd->wsrep_ws_handle;
187   replay_thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
188   replay_thd->wsrep_conflict_state= REPLAYING;
189 
190   replay_thd->variables.option_bits|= OPTION_BEGIN;
191   replay_thd->server_status|= SERVER_STATUS_IN_TRANS;
192 
193   mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
194 
195   thd->restore_globals();
196   replay_thd->store_globals();
197   wsrep_status_t rcode= wsrep->replay_trx(wsrep,
198                                           &replay_thd->wsrep_ws_handle,
199                                           (void*) replay_thd);
200 
201   wsrep_return_from_bf_mode(replay_thd, &shadow);
202   replay_thd->restore_globals();
203   delete replay_thd;
204 
205   mysql_mutex_lock(&thd->LOCK_wsrep_thd);
206 
207   thd->store_globals();
208 
209   switch (rcode)
210   {
211   case WSREP_OK:
212     {
213       thd->wsrep_conflict_state= NO_CONFLICT;
214       thd->killed= THD::NOT_KILLED;
215       wsrep_status_t rcode= wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
216       if (rcode != WSREP_OK)
217       {
218         WSREP_WARN("Post commit failed for SP replay: thd: %u error: %d",
219                    thd->thread_id(), rcode);
220       }
221       /* As replaying the transaction was successful, an error must not
222          be returned to client, so we need to reset the error state of
223          the diagnostics area */
224       thd->get_stmt_da()->reset_diagnostics_area();
225       break;
226     }
227   case WSREP_TRX_FAIL:
228     {
229       thd->wsrep_conflict_state= ABORTED;
230       wsrep_status_t rcode= wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
231       if (rcode != WSREP_OK)
232       {
233         WSREP_WARN("Post rollback failed for SP replay: thd: %u error: %d",
234                    thd->thread_id(), rcode);
235       }
236       if (thd->get_stmt_da()->is_set())
237       {
238         thd->get_stmt_da()->reset_diagnostics_area();
239       }
240       my_error(ER_LOCK_DEADLOCK, MYF(0));
241       break;
242     }
243   default:
244     WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
245                 rcode,
246                 (thd->db().str ? thd->db().str : "(null)"),
247                 WSREP_QUERY(thd));
248     /* we're now in inconsistent state, must abort */
249     mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
250     unireg_abort(1);
251     break;
252   }
253 
254   wsrep_cleanup_transaction(thd);
255 
256   mysql_mutex_lock(&LOCK_wsrep_replaying);
257   wsrep_replaying--;
258   WSREP_DEBUG("replaying decreased: %d, thd: %u",
259               wsrep_replaying, thd->thread_id());
260   mysql_cond_broadcast(&COND_wsrep_replaying);
261   mysql_mutex_unlock(&LOCK_wsrep_replaying);
262 
263   DBUG_VOID_RETURN;
264 }
265 
wsrep_replay_transaction(THD * thd)266 void wsrep_replay_transaction(THD *thd)
267 {
268   DBUG_ENTER("wsrep_replay_transaction");
269   /* checking if BF trx must be replayed */
270   if (thd->wsrep_conflict_state== MUST_REPLAY) {
271     assert(wsrep_thd_trx_seqno(thd));
272     if (thd->wsrep_exec_mode!= REPL_RECV) {
273       if (thd->get_stmt_da()->is_sent())
274       {
275         WSREP_ERROR("replay issue, thd has reported status already");
276       }
277 
278       /* PS reprepare observer should have been removed already
279          open_table() will fail if we have dangling observer here
280        */
281       if (thd->get_reprepare_observer() && wsrep_log_conflicts)
282       {
283         WSREP_WARN("dangling observer in replay transaction: (thr %u %lld %s)",
284                    thd->thread_id(), thd->query_id, thd->query().str);
285       }
286 
287       struct da_shadow
288       {
289           enum Diagnostics_area::enum_diagnostics_status status;
290           ulonglong affected_rows;
291           ulonglong last_insert_id;
292           char message[MYSQL_ERRMSG_SIZE];
293       };
294       struct da_shadow da_status;
295       da_status.status= thd->get_stmt_da()->status();
296       if (da_status.status == Diagnostics_area::DA_OK)
297       {
298         da_status.affected_rows= thd->get_stmt_da()->affected_rows();
299         da_status.last_insert_id= thd->get_stmt_da()->last_insert_id();
300         strmake(da_status.message,
301                 thd->get_stmt_da()->message_text(),
302                 sizeof(da_status.message)-1);
303       }
304 
305       thd->get_stmt_da()->reset_diagnostics_area();
306 
307       thd->wsrep_conflict_state= REPLAYING;
308       mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
309 
310       mysql_reset_thd_for_next_command(thd);
311       thd->killed= THD::NOT_KILLED;
312       close_thread_tables(thd);
313       if (thd->locked_tables_mode && thd->lock)
314       {
315         WSREP_DEBUG("releasing table lock for replaying (%u)",
316                     thd->thread_id());
317         thd->locked_tables_list.unlock_locked_tables(thd);
318         thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
319       }
320       thd->mdl_context.release_transactional_locks();
321       /*
322         Replaying will call MYSQL_START_STATEMENT when handling
323         BEGIN Query_log_event so end statement must be called before
324         replaying.
325       */
326       MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
327       thd->m_statement_psi= NULL;
328       thd->m_digest= NULL;
329       thd_proc_info(thd, "wsrep replaying trx");
330       WSREP_DEBUG("replay trx: %s %lld",
331                   WSREP_QUERY(thd),
332                   (long long)wsrep_thd_trx_seqno(thd));
333       struct wsrep_thd_shadow shadow;
334       wsrep_prepare_bf_thd(thd, &shadow);
335 
336       /* From trans_begin() */
337       thd->variables.option_bits|= OPTION_BEGIN;
338       thd->server_status|= SERVER_STATUS_IN_TRANS;
339 
340       // Allow tests to block the applier thread using the DBUG facilities.
341       DBUG_EXECUTE_IF("sync.wsrep_replay_cb",
342                       {
343                         const char act[]=
344                           "now "
345                           "SIGNAL sync.wsrep_replay_cb_reached "
346                           "WAIT_FOR signal.wsrep_replay_cb";
347                         assert(!debug_sync_set_action(thd,
348                                                            STRING_WITH_LEN(act)));
349                       };);
350       int rcode = wsrep->replay_trx(wsrep,
351                                     &thd->wsrep_ws_handle,
352                                     (void *)thd);
353 
354       wsrep_return_from_bf_mode(thd, &shadow);
355       if (thd->wsrep_conflict_state!= REPLAYING)
356         WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
357 
358       mysql_mutex_lock(&thd->LOCK_wsrep_thd);
359 
360       switch (rcode)
361       {
362       case WSREP_OK:
363         thd->wsrep_conflict_state= NO_CONFLICT;
364         wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
365         WSREP_DEBUG("trx_replay successful for: %u %llu",
366                     thd->thread_id(), (long long)thd->real_id);
367         if (thd->get_stmt_da()->is_sent())
368         {
369           WSREP_WARN("replay ok, thd has reported status");
370         }
371         else if (thd->get_stmt_da()->is_set())
372         {
373           if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK)
374           {
375             WSREP_WARN("replay ok, thd has error status %d",
376                        thd->get_stmt_da()->status());
377           }
378         }
379         else
380         {
381           if (da_status.status == Diagnostics_area::DA_OK)
382           {
383             my_ok(thd,
384                   da_status.affected_rows,
385                   da_status.last_insert_id,
386                   da_status.message);
387           }
388           else
389           {
390             my_ok(thd);
391           }
392         }
393         break;
394       case WSREP_TRX_FAIL:
395         if (thd->get_stmt_da()->is_sent())
396         {
397           WSREP_ERROR("replay failed, thd has reported status");
398         }
399         else
400         {
401           WSREP_DEBUG("replay failed, rolling back");
402           //my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
403         }
404         thd->wsrep_conflict_state= ABORTED;
405         wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
406         break;
407       default:
408         WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
409                     rcode,
410                     (thd->db().str ? thd->db().str : "(null)"),
411                     WSREP_QUERY(thd));
412         /* we're now in inconsistent state, must abort */
413 	mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
414         unireg_abort(1);
415         break;
416       }
417 
418       wsrep_cleanup_transaction(thd);
419 
420       mysql_mutex_lock(&LOCK_wsrep_replaying);
421       wsrep_replaying--;
422       WSREP_DEBUG("replaying decreased: %d, thd: %u",
423                   wsrep_replaying, thd->thread_id());
424       mysql_cond_broadcast(&COND_wsrep_replaying);
425       mysql_mutex_unlock(&LOCK_wsrep_replaying);
426     }
427   }
428   DBUG_VOID_RETURN;
429 }
430 
wsrep_replication_process(THD * thd)431 static void wsrep_replication_process(THD *thd)
432 {
433   int rcode;
434   DBUG_ENTER("wsrep_replication_process");
435 
436   struct wsrep_thd_shadow shadow;
437 
438   wsrep_prepare_bf_thd(thd, &shadow);
439 
440   /* From trans_begin() */
441   thd->variables.option_bits|= OPTION_BEGIN;
442   thd->server_status|= SERVER_STATUS_IN_TRANS;
443 
444   rcode = wsrep->recv(wsrep, (void *)thd);
445   DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
446 
447   WSREP_INFO("applier thread exiting (code:%d)", rcode);
448 
449   switch (rcode) {
450   case WSREP_OK:
451   case WSREP_NOT_IMPLEMENTED:
452   case WSREP_CONN_FAIL:
453     /* provider does not support slave operations / disconnected from group,
454      * just close applier thread */
455     break;
456   case WSREP_NODE_FAIL:
457     /* data inconsistency => SST is needed */
458     /* Note: we cannot just blindly restart replication here,
459      * SST might require server restart if storage engines must be
460      * initialized after SST */
461     WSREP_ERROR("node consistency compromised, aborting");
462     wsrep_kill_mysql(thd);
463     break;
464   case WSREP_WARNING:
465   case WSREP_TRX_FAIL:
466   case WSREP_TRX_MISSING:
467     /* these suggests a bug in provider code */
468     WSREP_WARN("bad return from recv() call: %d", rcode);
469     /* fall through to node shutdown */
470   case WSREP_FATAL:
471     /* Cluster connectivity is lost.
472      *
473      * If applier was killed on purpose (KILL_CONNECTION), we
474      * avoid mysql shutdown. This is because the killer will then handle
475      * shutdown processing (or replication restarting)
476      */
477     if (thd->killed != THD::KILL_CONNECTION)
478     {
479       wsrep_kill_mysql(thd);
480     }
481     break;
482   }
483 
484   wsrep_close_applier(thd);
485 
486   TABLE *tmp;
487   while ((tmp = thd->temporary_tables))
488   {
489     WSREP_WARN("Applier %u, has temporary tables at exit: %s.%s",
490                thd->thread_id(),
491                (tmp->s) ? tmp->s->db.str : "void",
492                (tmp->s) ? tmp->s->table_name.str : "void");
493   }
494   wsrep_return_from_bf_mode(thd, &shadow);
495   DBUG_VOID_RETURN;
496 }
497 
create_wsrep_THD(wsrep_thd_processor_fun processor)498 static bool create_wsrep_THD(wsrep_thd_processor_fun processor)
499 {
500   my_thread_handle hThread;
501   bool res= mysql_thread_create(
502                             key_thread_handle_wsrep,
503                             &hThread, &connection_attrib,
504                             start_wsrep_THD, (void*)processor);
505 #ifdef WITH_WSREP_OUT
506   /*
507     MariaDB bug https://jira.mariadb.org/browse/MDEV-8208 has not been observed
508     in this server version. However, if it surfaces, the server startup must be
509     should be delayed here until wsrep_running_threads count ascends
510   */
511 #endif /* WITH_WSREP_OUT */
512   return res;
513 }
514 
wsrep_create_appliers(long threads)515 void wsrep_create_appliers(long threads)
516 {
517   if (!wsrep_connected)
518   {
519     /* see wsrep_replication_start() for the logic */
520     if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
521         wsrep_provider && strcasecmp(wsrep_provider, "none"))
522     {
523       WSREP_ERROR("Trying to launch slave threads before creating "
524                   "connection at '%s'", wsrep_cluster_address);
525       assert(0);
526     }
527     return;
528   }
529 
530   long wsrep_threads=0;
531   while (wsrep_threads++ < threads) {
532     if (create_wsrep_THD(wsrep_replication_process))
533       WSREP_WARN("Can't create thread to manage wsrep replication");
534   }
535 }
536 
wsrep_rollback_process(THD * thd)537 static void wsrep_rollback_process(THD *thd)
538 {
539   DBUG_ENTER("wsrep_rollback_process");
540 
541   mysql_mutex_lock(&LOCK_wsrep_rollback);
542   wsrep_aborting_thd= NULL;
543   while (thd->killed == THD::NOT_KILLED) {
544     thd_proc_info(thd, "wsrep aborter idle");
545     mysql_mutex_lock(&thd->LOCK_thd_data);
546     thd->current_mutex= &LOCK_wsrep_rollback;
547     thd->current_cond=  &COND_wsrep_rollback;
548     mysql_mutex_unlock(&thd->LOCK_thd_data);
549 
550     mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
551 
552     if (thd->killed != THD::NOT_KILLED)
553     {
554       WSREP_DEBUG("rollbacker thread canceled");
555       break;
556     }
557     WSREP_DEBUG("WSREP rollback thread wakes for signal");
558 
559     mysql_mutex_lock(&thd->LOCK_thd_data);
560     thd_proc_info(thd, "wsrep aborter active");
561     thd->current_mutex= 0;
562     thd->current_cond=  0;
563     mysql_mutex_unlock(&thd->LOCK_thd_data);
564 
565     /* check for false alarms */
566     if (!wsrep_aborting_thd)
567     {
568       WSREP_DEBUG("WSREP rollback thread has empty abort queue");
569     }
570     /* process all entries in the queue */
571     while (wsrep_aborting_thd) {
572       THD *aborting;
573       wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
574       aborting = wsrep_aborting_thd->aborting_thd;
575       my_free(wsrep_aborting_thd);
576       wsrep_aborting_thd= next;
577       /*
578        * must release mutex, appliers my want to add more
579        * aborting thds in our work queue, while we rollback
580        */
581       mysql_mutex_unlock(&LOCK_wsrep_rollback);
582 
583       mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
584       if (aborting->wsrep_conflict_state== ABORTED)
585       {
586         WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
587                     (long long)aborting->real_id,
588                     aborting->wsrep_conflict_state);
589 
590         mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
591         mysql_mutex_lock(&LOCK_wsrep_rollback);
592         continue;
593       }
594       aborting->wsrep_conflict_state= ABORTING;
595 
596       mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
597 
598       aborting->store_globals();
599 
600       mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
601       wsrep_client_rollback(aborting);
602       WSREP_DEBUG("WSREP rollbacker aborted thd: (%u %llu)",
603                   aborting->thread_id(), (long long)aborting->real_id);
604       mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
605 
606       thd->store_globals();
607       mysql_mutex_lock(&LOCK_wsrep_rollback);
608     }
609   }
610 
611   mysql_mutex_unlock(&LOCK_wsrep_rollback);
612   mysql_mutex_lock(&thd->LOCK_thd_data);
613   thd_proc_info(thd, "wsrep aborter shutting down");
614   thd->current_mutex= 0;
615   thd->current_cond=  0;
616   mysql_mutex_unlock(&thd->LOCK_thd_data);
617 
618   sql_print_information("WSREP: rollbacker thread exiting");
619   DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
620   DBUG_VOID_RETURN;
621 }
622 
wsrep_create_rollbacker()623 void wsrep_create_rollbacker()
624 {
625   if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
626   {
627     /* create rollbacker */
628     if (create_wsrep_THD(wsrep_rollback_process))
629       WSREP_WARN("Can't create thread to manage wsrep rollback");
630   }
631 }
632 
wsrep_thd_set_PA_safe(void * thd_ptr,my_bool safe)633 void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe)
634 {
635   if (thd_ptr)
636   {
637     THD* thd = (THD*)thd_ptr;
638     thd->wsrep_PA_safe = safe;
639   }
640 }
641 
wsrep_thd_conflict_state(void * thd_ptr,my_bool sync)642 int wsrep_thd_conflict_state(void *thd_ptr, my_bool sync)
643 {
644   int state = -1;
645   if (thd_ptr)
646   {
647     THD* thd = (THD*)thd_ptr;
648     if (sync) mysql_mutex_lock(&thd->LOCK_wsrep_thd);
649 
650     state = thd->wsrep_conflict_state;
651     if (sync) mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
652   }
653   return state;
654 }
655 
wsrep_thd_is_BF(void * thd_ptr,my_bool sync)656 my_bool wsrep_thd_is_BF(void *thd_ptr, my_bool sync)
657 {
658   my_bool status = FALSE;
659   if (thd_ptr)
660   {
661     THD* thd = (THD*)thd_ptr;
662     if (sync) mysql_mutex_lock(&thd->LOCK_wsrep_thd);
663 
664     status = ((thd->wsrep_exec_mode == REPL_RECV)    ||
665               (thd->wsrep_exec_mode == TOTAL_ORDER));
666     if (sync) mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
667   }
668   return status;
669 }
670 
671 extern "C"
wsrep_thd_is_BF_or_commit(void * thd_ptr,my_bool sync)672 my_bool wsrep_thd_is_BF_or_commit(void *thd_ptr, my_bool sync)
673 {
674   bool status = FALSE;
675   if (thd_ptr)
676   {
677     THD* thd = (THD*)thd_ptr;
678     if (sync) mysql_mutex_lock(&thd->LOCK_wsrep_thd);
679 
680     status = ((thd->wsrep_exec_mode == REPL_RECV)    ||
681               (thd->wsrep_exec_mode == TOTAL_ORDER)  ||
682               (thd->wsrep_exec_mode == LOCAL_COMMIT));
683     if (sync) mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
684   }
685   return status;
686 }
687 
688 extern "C"
wsrep_thd_is_local(void * thd_ptr,my_bool sync)689 my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync)
690 {
691   bool status = FALSE;
692   if (thd_ptr)
693   {
694     THD* thd = (THD*)thd_ptr;
695     if (sync) mysql_mutex_lock(&thd->LOCK_wsrep_thd);
696 
697     status = (thd->wsrep_exec_mode == LOCAL_STATE);
698     if (sync) mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
699   }
700   return status;
701 }
702 
wsrep_abort_thd(void * bf_thd_ptr,void * victim_thd_ptr,my_bool signal)703 int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
704 {
705   THD *victim_thd = (THD *) victim_thd_ptr;
706   THD *bf_thd     = (THD *) bf_thd_ptr;
707   DBUG_ENTER("wsrep_abort_thd");
708 
709   if ( (WSREP(bf_thd) ||
710          ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
711            bf_thd->wsrep_exec_mode == TOTAL_ORDER) )                         &&
712        victim_thd)
713   {
714     WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
715                 (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
716     ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
717   }
718   else
719   {
720     WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
721   }
722 
723   DBUG_RETURN(1);
724 }
725 
wsrep_thd_in_locking_session(void * thd_ptr)726 int wsrep_thd_in_locking_session(void *thd_ptr)
727 {
728   if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
729     return 1;
730   }
731   return 0;
732 }
733 
wsrep_thd_has_explicit_locks(THD * thd)734 bool wsrep_thd_has_explicit_locks(THD *thd)
735 {
736   assert(thd);
737   return (thd->mdl_context.wsrep_has_explicit_locks());
738 }
739 
740 /*
741   Get auto increment variables for THD. Use global settings for
742   applier threads.
743  */
744 extern "C"
wsrep_thd_auto_increment_variables(THD * thd,unsigned long long * offset,unsigned long long * increment)745 void wsrep_thd_auto_increment_variables(THD* thd,
746                                         unsigned long long* offset,
747                                         unsigned long long* increment)
748 {
749   if (thd->wsrep_exec_mode == REPL_RECV &&
750       thd->wsrep_conflict_state != REPLAYING)
751   {
752     *offset= global_system_variables.auto_increment_offset;
753     *increment= global_system_variables.auto_increment_increment;
754   }
755   else
756   {
757     *offset= thd->variables.auto_increment_offset;
758     *increment= thd->variables.auto_increment_increment;
759   }
760 }
761