1 /* Copyright (C) 2013-2021 Codership Oy <info@codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License along
13    with this program; if not, write to the Free Software Foundation, Inc.,
14    51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
15 
16 #include "mariadb.h"
17 #include "wsrep_thd.h"
18 #include "transaction.h"
19 #include "rpl_rli.h"
20 #include "log_event.h"
21 #include "sql_parse.h"
22 //#include "global_threads.h" // LOCK_thread_count, etc.
23 #include "sql_base.h" // close_thread_tables()
24 #include "mysqld.h"   // start_wsrep_THD();
25 #include "debug_sync.h"
26 
27 #include "slave.h"    // opt_log_slave_updates
28 #include "rpl_filter.h"
29 #include "rpl_rli.h"
30 #include "rpl_mi.h"
31 
/*
  File-scope counter of brute-force (BF) aborts. It is incremented in
  wsrep_client_rollback() and copied out by wsrep_show_bf_aborts().
  The counter width and the matching atomic load/add wrappers are selected
  by __LP64__: 64-bit when it is set, 32-bit otherwise.
*/
#if (__LP64__)
static volatile int64 wsrep_bf_aborts_counter(0);
#define WSREP_ATOMIC_LOAD_LONG my_atomic_load64
#define WSREP_ATOMIC_ADD_LONG  my_atomic_add64
#else
static volatile int32 wsrep_bf_aborts_counter(0);
#define WSREP_ATOMIC_LOAD_LONG my_atomic_load32
#define WSREP_ATOMIC_ADD_LONG  my_atomic_add32
#endif
41 
wsrep_show_bf_aborts(THD * thd,SHOW_VAR * var,char * buff,enum enum_var_type scope)42 int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
43                           enum enum_var_type scope)
44 {
45   wsrep_local_bf_aborts = WSREP_ATOMIC_LOAD_LONG(&wsrep_bf_aborts_counter);
46   var->type = SHOW_LONGLONG;
47   var->value = (char*)&wsrep_local_bf_aborts;
48   return 0;
49 }
50 
/*
  Roll back a client transaction that lost a brute-force (BF) conflict.

  The caller must hold thd->LOCK_thd_data. The mutex is dropped while the
  rollback and the lock releases run and is re-acquired before returning,
  so the caller still owns it afterwards. wsrep_conflict_state is set to
  ABORTING on entry and to ABORTED once all cleanup has finished.
*/
void wsrep_client_rollback(THD *thd)
{
  WSREP_DEBUG("client rollback due to BF abort for (%lld), query: %s",
              (longlong) thd->thread_id, thd->query());

  /* Account for this abort in the file-scope BF abort counter. */
  WSREP_ATOMIC_ADD_LONG(&wsrep_bf_aborts_counter, 1);

  thd->wsrep_conflict_state= ABORTING;
  /* Everything up to the re-lock below runs without LOCK_thd_data. */
  mysql_mutex_unlock(&thd->LOCK_thd_data);
  trans_rollback(thd);

  /* Drop LOCK TABLES state, if any, and clear the matching option bit. */
  if (thd->locked_tables_mode && thd->lock)
  {
    WSREP_DEBUG("unlocking tables for BF abort (%lld)",
                (longlong) thd->thread_id);
    thd->locked_tables_list.unlock_locked_tables(thd);
    thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
  }

  /* Release the global read lock if this session holds it. */
  if (thd->global_read_lock.is_acquired())
  {
    WSREP_DEBUG("unlocking GRL for BF abort (%lld)",
                (longlong) thd->thread_id);
    thd->global_read_lock.unlock_global_read_lock(thd);
  }

  /* Release transactional metadata locks. */
  thd->release_transactional_locks();

  /* release explicit MDL locks */
  thd->mdl_context.release_explicit_locks();

  if (thd->get_binlog_table_maps())
  {
    WSREP_DEBUG("clearing binlog table map for BF abort (%lld)",
                (longlong) thd->thread_id);
    thd->clear_binlog_table_maps();
  }
  /* Re-take the mutex so the caller's locking expectation still holds. */
  mysql_mutex_lock(&thd->LOCK_thd_data);
  thd->wsrep_conflict_state= ABORTED;
}
93 
/*
  Field counts for identifying a coordinator / worker.
  NOTE(review): neither macro is referenced by the code visible in this
  section; confirm whether they are still needed.
*/
#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
96 
wsrep_relay_group_init(THD * thd,const char * log_fname)97 static rpl_group_info* wsrep_relay_group_init(THD *thd, const char* log_fname)
98 {
99   Relay_log_info* rli= new Relay_log_info(false);
100 
101   WSREP_DEBUG("wsrep_relay_group_init %s", log_fname);
102 
103   if (!rli->relay_log.description_event_for_exec)
104   {
105     rli->relay_log.description_event_for_exec=
106       new Format_description_log_event(4);
107   }
108 
109   static LEX_CSTRING connection_name= { STRING_WITH_LEN("wsrep") };
110 
111   /*
112     Master_info's constructor initializes rpl_filter by either an already
113     constructed Rpl_filter object from global 'rpl_filters' list if the
114     specified connection name is same, or it constructs a new Rpl_filter
115     object and adds it to rpl_filters. This object is later destructed by
116     Mater_info's destructor by looking it up based on connection name in
117     rpl_filters list.
118 
119     However, since all Master_info objects created here would share same
120     connection name ("wsrep"), destruction of any of the existing Master_info
121     objects (in wsrep_return_from_bf_mode()) would free rpl_filter referenced
122     by any/all existing Master_info objects.
123 
124     In order to avoid that, we have added a check in Master_info's destructor
125     to not free the "wsrep" rpl_filter. It will eventually be freed by
126     free_all_rpl_filters() when server terminates.
127   */
128   rli->mi = new Master_info(&connection_name, false);
129 
130   struct rpl_group_info *rgi= new rpl_group_info(rli);
131   rgi->thd= rli->sql_driver_thd= thd;
132 
133   if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
134   {
135     rgi->deferred_events= new Deferred_log_events(rli);
136   }
137 
138   return rgi;
139 }
140 
/**
  Switch a THD into applier (REPL_RECV) mode, saving what gets overwritten.

  The previous option bits, server status, exec mode, vio, transaction
  isolation, current database, user time and row count are stored in
  @a shadow so that wsrep_return_from_bf_mode() can restore them.

  @param thd     session to switch
  @param shadow  receives the saved session attributes
*/
static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
{
  /* Save these first: the statements below modify them. */
  shadow->options       = thd->variables.option_bits;
  shadow->server_status = thd->server_status;
  shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
  shadow->vio           = thd->net.vio;

  // Disable general logging on applier threads
  thd->variables.option_bits |= OPTION_LOG_OFF;

  /* enable binlogging regardless of log_slave_updates setting
     this is for ensuring that both local and applier transaction go through
     same commit ordering algorithm in group commit control
   */
  thd->variables.option_bits|= OPTION_BIN_LOG;

  /* Create the relay group info only if the THD does not have one yet. */
  if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init(thd, "wsrep_relay");

  /* thd->system_thread_info.rpl_sql_info isn't initialized. */
  if (!thd->slave_thread)
    thd->system_thread_info.rpl_sql_info=
      new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);

  thd->wsrep_exec_mode= REPL_RECV;
  /* Detach the network connection while in applier mode. */
  thd->net.vio= 0;
  thd->clear_error();

  /* Applying runs at READ COMMITTED; the previous level is kept in shadow. */
  shadow->tx_isolation        = thd->variables.tx_isolation;
  thd->variables.tx_isolation = ISO_READ_COMMITTED;
  thd->tx_isolation           = ISO_READ_COMMITTED;

  shadow->db            = thd->db.str;
  shadow->db_length     = thd->db.length;
  shadow->user_time     = thd->user_time;
  shadow->row_count_func= thd->get_row_count_func();
  /* Clear the current database; the saved one is restored on return. */
  thd->reset_db(&null_clex_str);
}
178 
wsrep_return_from_bf_mode(THD * thd,struct wsrep_thd_shadow * shadow)179 static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
180 {
181   LEX_CSTRING db= {shadow->db, shadow->db_length };
182   thd->variables.option_bits  = shadow->options;
183   thd->server_status          = shadow->server_status;
184   thd->wsrep_exec_mode        = shadow->wsrep_exec_mode;
185   thd->net.vio                = shadow->vio;
186   thd->variables.tx_isolation = shadow->tx_isolation;
187   thd->user_time              = shadow->user_time;
188   thd->reset_db(&db);
189 
190   if (!thd->slave_thread)
191     delete thd->system_thread_info.rpl_sql_info;
192   delete thd->wsrep_rgi->rli->mi;
193   delete thd->wsrep_rgi->rli;
194 
195   thd->wsrep_rgi->cleanup_after_session();
196   delete thd->wsrep_rgi;
197   thd->wsrep_rgi = NULL;
198   thd->set_row_count_func(shadow->row_count_func);
199 }
200 
wsrep_replay_sp_transaction(THD * thd)201 void wsrep_replay_sp_transaction(THD* thd)
202 {
203   DBUG_ENTER("wsrep_replay_sp_transaction");
204   mysql_mutex_assert_owner(&thd->LOCK_thd_data);
205   DBUG_ASSERT(thd->wsrep_conflict_state == MUST_REPLAY);
206   DBUG_ASSERT(wsrep_thd_trx_seqno(thd) > 0);
207 
208   WSREP_DEBUG("replaying SP transaction %llu", thd->thread_id);
209   close_thread_tables(thd);
210   if (thd->locked_tables_mode && thd->lock)
211   {
212     WSREP_DEBUG("releasing table lock for replaying (%u)",
213                 thd->thread_id);
214     thd->locked_tables_list.unlock_locked_tables(thd);
215     thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
216   }
217   thd->release_transactional_locks();
218 
219   mysql_mutex_unlock(&thd->LOCK_thd_data);
220   THD *replay_thd= new THD(true);
221   replay_thd->thread_stack= thd->thread_stack;
222 
223   struct wsrep_thd_shadow shadow;
224   wsrep_prepare_bf_thd(replay_thd, &shadow);
225   WSREP_DEBUG("replaying set for %p rgi %p", replay_thd, replay_thd->wsrep_rgi);  replay_thd->wsrep_trx_meta= thd->wsrep_trx_meta;
226   replay_thd->wsrep_ws_handle= thd->wsrep_ws_handle;
227   replay_thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
228   replay_thd->wsrep_conflict_state= REPLAYING;
229 
230   replay_thd->variables.option_bits|= OPTION_BEGIN;
231   replay_thd->server_status|= SERVER_STATUS_IN_TRANS;
232 
233   thd->reset_globals();
234   replay_thd->store_globals();
235   wsrep_status_t rcode= wsrep->replay_trx(wsrep,
236                                           &replay_thd->wsrep_ws_handle,
237                                           (void*) replay_thd);
238 
239   wsrep_return_from_bf_mode(replay_thd, &shadow);
240   replay_thd->reset_globals();
241   delete replay_thd;
242 
243   mysql_mutex_lock(&thd->LOCK_thd_data);
244 
245   thd->store_globals();
246 
247   switch (rcode)
248   {
249   case WSREP_OK:
250     {
251       thd->wsrep_conflict_state= NO_CONFLICT;
252       thd->killed= NOT_KILLED;
253       wsrep_status_t rcode= wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
254       if (rcode != WSREP_OK)
255       {
256         WSREP_WARN("Post commit failed for SP replay: thd: %u error: %d",
257                    thd->thread_id, rcode);
258       }
259       /* As replaying the transaction was successful, an error must not
260          be returned to client, so we need to reset the error state of
261          the diagnostics area */
262       thd->get_stmt_da()->reset_diagnostics_area();
263       break;
264     }
265   case WSREP_TRX_FAIL:
266     {
267       thd->wsrep_conflict_state= ABORTED;
268       wsrep_status_t rcode= wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
269       if (rcode != WSREP_OK)
270       {
271         WSREP_WARN("Post rollback failed for SP replay: thd: %u error: %d",
272                    thd->thread_id, rcode);
273       }
274       if (thd->get_stmt_da()->is_set())
275       {
276         thd->get_stmt_da()->reset_diagnostics_area();
277       }
278       my_error(ER_LOCK_DEADLOCK, MYF(0));
279       break;
280     }
281   default:
282     WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
283                 rcode,
284                 (thd->db.str ? thd->db.str : "(null)"),
285                 WSREP_QUERY(thd));
286     /* we're now in inconsistent state, must abort */
287     mysql_mutex_unlock(&thd->LOCK_thd_data);
288     unireg_abort(1);
289     break;
290   }
291 
292   wsrep_cleanup_transaction(thd);
293 
294   mysql_mutex_lock(&LOCK_wsrep_replaying);
295   wsrep_replaying--;
296   WSREP_DEBUG("replaying decreased: %d, thd: %u",
297               wsrep_replaying, thd->thread_id);
298   mysql_cond_broadcast(&COND_wsrep_replaying);
299   mysql_mutex_unlock(&LOCK_wsrep_replaying);
300 
301   DBUG_VOID_RETURN;
302 }
303 
/**
  Replay the current transaction of @a thd if it was BF-aborted and marked
  MUST_REPLAY.

  Nothing is done unless thd->wsrep_conflict_state is MUST_REPLAY and the
  THD is not already in REPL_RECV mode. The code unlocks
  thd->LOCK_thd_data before replaying and locks it again before the
  result is handled, so the caller is expected to hold it (not asserted
  here). On the fatal default branch the mutex is released before
  unireg_abort().

  The OK status that was in the diagnostics area before the replay
  (affected rows, last insert id, message) is saved and re-sent on
  success if nothing else has set the diagnostics area in the meantime.
*/
void wsrep_replay_transaction(THD *thd)
{
  DBUG_ENTER("wsrep_replay_transaction");
  /* checking if BF trx must be replayed */
  if (thd->wsrep_conflict_state== MUST_REPLAY) {
    DBUG_ASSERT(wsrep_thd_trx_seqno(thd));
    if (thd->wsrep_exec_mode!= REPL_RECV) {
      if (thd->get_stmt_da()->is_sent())
      {
        WSREP_ERROR("replay issue, thd has reported status already");
      }


      /*
        PS reprepare observer should have been removed already.
        open_table() will fail if we have dangling observer here.
      */
      DBUG_ASSERT(thd->m_reprepare_observer == NULL);

      /* Snapshot of the diagnostics area taken before it is reset below. */
      struct da_shadow
      {
          enum Diagnostics_area::enum_diagnostics_status status;
          ulonglong affected_rows;
          ulonglong last_insert_id;
          char message[MYSQL_ERRMSG_SIZE];
      };
      struct da_shadow da_status;
      da_status.status= thd->get_stmt_da()->status();
      /* The remaining fields are only filled in (and later read) for DA_OK. */
      if (da_status.status == Diagnostics_area::DA_OK)
      {
        da_status.affected_rows= thd->get_stmt_da()->affected_rows();
        da_status.last_insert_id= thd->get_stmt_da()->last_insert_id();
        strmake(da_status.message,
                thd->get_stmt_da()->message(),
                sizeof(da_status.message)-1);
      }

      thd->get_stmt_da()->reset_diagnostics_area();

      thd->wsrep_conflict_state= REPLAYING;
      /* The replay runs without LOCK_thd_data; it is re-taken afterwards. */
      mysql_mutex_unlock(&thd->LOCK_thd_data);

      thd->reset_for_next_command();
      thd->reset_killed();
      close_thread_tables(thd);
      if (thd->locked_tables_mode && thd->lock)
      {
        WSREP_DEBUG("releasing table lock for replaying (%lld)",
                    (longlong) thd->thread_id);
        thd->locked_tables_list.unlock_locked_tables(thd);
        thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
      }
      thd->release_transactional_locks();
      /*
        Replaying will call MYSQL_START_STATEMENT when handling
        BEGIN Query_log_event so end statement must be called before
        replaying.
      */
      MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
      thd->m_statement_psi= NULL;
      thd->m_digest= NULL;
      thd_proc_info(thd, "WSREP replaying trx");
      WSREP_DEBUG("replay trx: %s %lld",
                  thd->query() ? thd->query() : "void",
                  (long long)wsrep_thd_trx_seqno(thd));
      /* Switch this THD into applier mode for the duration of the replay. */
      struct wsrep_thd_shadow shadow;
      wsrep_prepare_bf_thd(thd, &shadow);

      /* From trans_begin() */
      thd->variables.option_bits|= OPTION_BEGIN;
      thd->server_status|= SERVER_STATUS_IN_TRANS;

      /* Allow tests to block the replayer thread using the DBUG facilities */
#ifdef ENABLED_DEBUG_SYNC
      DBUG_EXECUTE_IF("sync.wsrep_replay_cb",
      {
        const char act[]=
          "now "
          "SIGNAL sync.wsrep_replay_cb_reached "
          "WAIT_FOR signal.wsrep_replay_cb";
        DBUG_ASSERT(!debug_sync_set_action(thd,
                                           STRING_WITH_LEN(act)));
       };);
#endif /* ENABLED_DEBUG_SYNC */

      int rcode = wsrep->replay_trx(wsrep,
                                    &thd->wsrep_ws_handle,
                                    (void *)thd);

      /* Restore the session attributes saved before the replay. */
      wsrep_return_from_bf_mode(thd, &shadow);
      if (thd->wsrep_conflict_state!= REPLAYING)
        WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );

      mysql_mutex_lock(&thd->LOCK_thd_data);

      switch (rcode)
      {
      case WSREP_OK:
        thd->wsrep_conflict_state= NO_CONFLICT;
        wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
        WSREP_DEBUG("trx_replay successful for: %lld %lld",
                    (longlong) thd->thread_id, (longlong) thd->real_id);
        if (thd->get_stmt_da()->is_sent())
        {
          WSREP_WARN("replay ok, thd has reported status");
        }
        else if (thd->get_stmt_da()->is_set())
        {
          if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK &&
              thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK)
          {
            WSREP_WARN("replay ok, thd has error status %d",
                       thd->get_stmt_da()->status());
          }
        }
        else
        {
          /* Nothing set yet: re-send the saved OK, or a plain OK. */
          if (da_status.status == Diagnostics_area::DA_OK)
          {
            my_ok(thd,
                  da_status.affected_rows,
                  da_status.last_insert_id,
                  da_status.message);
          }
          else
          {
            my_ok(thd);
          }
        }
        break;
      case WSREP_TRX_FAIL:
        if (thd->get_stmt_da()->is_sent())
        {
          WSREP_ERROR("replay failed, thd has reported status");
        }
        else
        {
          WSREP_DEBUG("replay failed, rolling back");
        }
        thd->wsrep_conflict_state= ABORTED;
        wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
        break;
      default:
        WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
                    rcode, thd->get_db(),
                    thd->query() ? thd->query() : "void");
        /* we're now in inconsistent state, must abort */

        /* http://bazaar.launchpad.net/~codership/codership-mysql/5.6/revision/3962#sql/wsrep_thd.cc */
        mysql_mutex_unlock(&thd->LOCK_thd_data);

        unireg_abort(1);
        break;
      }

      wsrep_cleanup_transaction(thd);

      /* One replay fewer in flight; wake anyone waiting on the counter. */
      mysql_mutex_lock(&LOCK_wsrep_replaying);
      wsrep_replaying--;
      WSREP_DEBUG("replaying decreased: %d, thd: %lld",
                  wsrep_replaying, (longlong) thd->thread_id);
      mysql_cond_broadcast(&COND_wsrep_replaying);
      mysql_mutex_unlock(&LOCK_wsrep_replaying);
    }
  }
  DBUG_VOID_RETURN;
}
471 
wsrep_replication_process(THD * thd)472 static void wsrep_replication_process(THD *thd)
473 {
474   int rcode;
475   DBUG_ENTER("wsrep_replication_process");
476 
477   struct wsrep_thd_shadow shadow;
478   wsrep_prepare_bf_thd(thd, &shadow);
479 
480   /* From trans_begin() */
481   thd->variables.option_bits|= OPTION_BEGIN;
482   thd->server_status|= SERVER_STATUS_IN_TRANS;
483 
484   thd_proc_info(thd, "wsrep applier idle");
485   rcode = wsrep->recv(wsrep, (void *)thd);
486   DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
487 
488   WSREP_INFO("applier thread exiting (code:%d)", rcode);
489 
490   switch (rcode) {
491   case WSREP_OK:
492   case WSREP_NOT_IMPLEMENTED:
493   case WSREP_CONN_FAIL:
494     /* provider does not support slave operations / disconnected from group,
495      * just close applier thread */
496     break;
497   case WSREP_NODE_FAIL:
498     /* data inconsistency => SST is needed */
499     /* Note: we cannot just blindly restart replication here,
500      * SST might require server restart if storage engines must be
501      * initialized after SST */
502     WSREP_ERROR("node consistency compromised, aborting");
503     wsrep_kill_mysql(thd);
504     break;
505   case WSREP_WARNING:
506   case WSREP_TRX_FAIL:
507   case WSREP_TRX_MISSING:
508     /* these suggests a bug in provider code */
509     WSREP_WARN("bad return from recv() call: %d", rcode);
510     /* Shut down this node. */
511     /* fall through */
512   case WSREP_FATAL:
513     /* Cluster connectivity is lost.
514      *
515      * If applier was killed on purpose (KILL_CONNECTION), we
516      * avoid mysql shutdown. This is because the killer will then handle
517      * shutdown processing (or replication restarting)
518      */
519     if (thd->killed != KILL_CONNECTION)
520     {
521       wsrep_kill_mysql(thd);
522     }
523     break;
524   }
525 
526   mysql_mutex_lock(&LOCK_thread_count);
527   wsrep_close_applier(thd);
528   mysql_cond_broadcast(&COND_thread_count);
529   mysql_mutex_unlock(&LOCK_thread_count);
530 
531   if(thd->has_thd_temporary_tables())
532   {
533     WSREP_WARN("Applier %lld has temporary tables at exit.",
534                thd->thread_id);
535   }
536   wsrep_return_from_bf_mode(thd, &shadow);
537   DBUG_VOID_RETURN;
538 }
539 
create_wsrep_THD(wsrep_thread_args * args,bool thread_count_lock)540 static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
541 {
542   if (!thread_count_lock)
543     mysql_mutex_lock(&LOCK_thread_count);
544 
545   ulong old_wsrep_running_threads= wsrep_running_threads;
546 
547   DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD ||
548               args->thread_type == WSREP_ROLLBACKER_THREAD);
549 
550   bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD
551                                 ? key_wsrep_applier : key_wsrep_rollbacker,
552                                 &args->thread_id, &connection_attrib,
553                                 start_wsrep_THD, (void*)args);
554 
555   if (res)
556   {
557     WSREP_ERROR("Can't create wsrep thread");
558   }
559 
560   /*
561     if starting a thread on server startup, wait until the this thread's THD
562     is fully initialized (otherwise a THD initialization code might
563     try to access a partially initialized server data structure - MDEV-8208).
564   */
565   if (!mysqld_server_initialized)
566   {
567     while (old_wsrep_running_threads == wsrep_running_threads)
568     {
569       mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
570     }
571   }
572 
573   if (!thread_count_lock)
574     mysql_mutex_unlock(&LOCK_thread_count);
575 
576   return res;
577 }
578 
wsrep_create_appliers(long threads,bool thread_count_lock)579 bool wsrep_create_appliers(long threads, bool thread_count_lock)
580 {
581   if (!wsrep_connected)
582   {
583     /* see wsrep_replication_start() for the logic */
584     if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
585         wsrep_provider && strcasecmp(wsrep_provider, "none"))
586     {
587       WSREP_ERROR("Trying to launch slave threads before creating "
588                   "connection at '%s'", wsrep_cluster_address);
589     }
590     return true;
591   }
592 
593   long wsrep_threads= 0;
594 
595   while (wsrep_threads++ < threads) {
596     wsrep_thread_args* arg;
597 
598     if((arg= (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL)
599     {
600       WSREP_ERROR("Can't allocate memory for wsrep replication thread %ld\n", wsrep_threads);
601       assert(0);
602     }
603 
604     arg->thread_type= WSREP_APPLIER_THREAD;
605     arg->processor= wsrep_replication_process;
606 
607     if (create_wsrep_THD(arg, thread_count_lock))
608     {
609       WSREP_ERROR("Can't create thread to manage wsrep replication");
610       my_free(arg);
611       return true;
612     }
613   }
614 
615   return false;
616 }
617 
/**
  Body of the rollbacker thread.

  Until this THD is killed, the thread waits on COND_wsrep_rollback and,
  when woken, drains the wsrep_aborting_thd queue: each queued THD that is
  not already ABORTED is rolled back with wsrep_client_rollback().

  LOCK_wsrep_rollback is held while the queue is inspected and released
  while an individual victim is being rolled back.
*/
static void wsrep_rollback_process(THD *thd)
{
  DBUG_ENTER("wsrep_rollback_process");

  mysql_mutex_lock(&LOCK_wsrep_rollback);
  /* Start with an empty abort queue. */
  wsrep_aborting_thd= NULL;

  while (thd->killed == NOT_KILLED) {
    thd_proc_info(thd, "WSREP aborter idle");
    /* Record which mutex/condition this thread is about to wait on. */
    thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
    thd->mysys_var->current_cond=  &COND_wsrep_rollback;

    mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);

    WSREP_DEBUG("WSREP rollback thread wakes for signal");

    /* Awake again: clear the wait information under mysys_var->mutex. */
    mysql_mutex_lock(&thd->mysys_var->mutex);
    thd_proc_info(thd, "WSREP aborter active");
    thd->mysys_var->current_mutex= 0;
    thd->mysys_var->current_cond=  0;
    mysql_mutex_unlock(&thd->mysys_var->mutex);

    /* check for false alarms */
    if (!wsrep_aborting_thd)
    {
      WSREP_DEBUG("WSREP rollback thread has empty abort queue");
    }
    /* process all entries in the queue */
    while (wsrep_aborting_thd) {
      THD *aborting;
      /* Pop the head entry and free its queue node. */
      wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
      aborting = wsrep_aborting_thd->aborting_thd;
      my_free(wsrep_aborting_thd);
      wsrep_aborting_thd= next;
      /*
       * must release mutex, appliers may want to add more
       * aborting thds in our work queue, while we rollback
       */
      mysql_mutex_unlock(&LOCK_wsrep_rollback);

      mysql_mutex_lock(&aborting->LOCK_thd_data);
      if (aborting->wsrep_conflict_state== ABORTED)
      {
        WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
                    (long long)aborting->real_id,
                    aborting->wsrep_conflict_state);

        mysql_mutex_unlock(&aborting->LOCK_thd_data);
        /* Re-take the queue mutex before looking at the next entry. */
        mysql_mutex_lock(&LOCK_wsrep_rollback);
        continue;
      }
      aborting->wsrep_conflict_state= ABORTING;

      mysql_mutex_unlock(&aborting->LOCK_thd_data);

      /* Make the victim the current THD while rolling it back. */
      set_current_thd(aborting);
      aborting->store_globals();

      /* wsrep_client_rollback() requires LOCK_thd_data to be held. */
      mysql_mutex_lock(&aborting->LOCK_thd_data);
      wsrep_client_rollback(aborting);
      WSREP_DEBUG("WSREP rollbacker aborted thd: (%lld %lld)",
                  (longlong) aborting->thread_id,
                  (longlong) aborting->real_id);
      mysql_mutex_unlock(&aborting->LOCK_thd_data);

      /* Switch back to the rollbacker's own THD. */
      set_current_thd(thd);
      thd->store_globals();

      mysql_mutex_lock(&LOCK_wsrep_rollback);
    }
  }

  mysql_mutex_unlock(&LOCK_wsrep_rollback);
  sql_print_information("WSREP: rollbacker thread exiting");

  DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
  DBUG_VOID_RETURN;
}
696 
wsrep_create_rollbacker()697 void wsrep_create_rollbacker()
698 {
699   if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
700   {
701     wsrep_thread_args* arg;
702     if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) {
703       WSREP_ERROR("Can't allocate memory for wsrep rollbacker thread\n");
704       assert(0);
705     }
706 
707     arg->thread_type = WSREP_ROLLBACKER_THREAD;
708     arg->processor = wsrep_rollback_process;
709 
710     /* create rollbacker */
711     if (create_wsrep_THD(arg, false)) {
712       WSREP_WARN("Can't create thread to manage wsrep rollback");
713       my_free(arg);
714       return;
715     }
716   }
717 }
718 
wsrep_thd_set_PA_safe(void * thd_ptr,my_bool safe)719 void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe)
720 {
721   if (thd_ptr)
722   {
723     THD* thd = (THD*)thd_ptr;
724     thd->wsrep_PA_safe = safe;
725   }
726 }
727 
wsrep_thd_conflict_state(THD * thd,my_bool sync)728 enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd, my_bool sync)
729 {
730   enum wsrep_conflict_state state = NO_CONFLICT;
731   if (thd)
732   {
733     if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
734 
735     state = thd->wsrep_conflict_state;
736     if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
737   }
738   return state;
739 }
740 
wsrep_thd_is_wsrep(THD * thd)741 my_bool wsrep_thd_is_wsrep(THD *thd)
742 {
743   my_bool status = FALSE;
744   if (thd)
745   {
746     status = (WSREP(thd) && WSREP_PROVIDER_EXISTS);
747   }
748   return status;
749 }
750 
wsrep_thd_is_BF(THD * thd,my_bool sync)751 my_bool wsrep_thd_is_BF(THD *thd, my_bool sync)
752 {
753   my_bool status = FALSE;
754   if (thd)
755   {
756     // THD can be BF only if provider exists
757     if (wsrep_thd_is_wsrep(thd))
758     {
759       if (sync)
760 	mysql_mutex_lock(&thd->LOCK_thd_data);
761 
762       status = ((thd->wsrep_exec_mode == REPL_RECV)    ||
763       	        (thd->wsrep_exec_mode == TOTAL_ORDER));
764       if (sync)
765         mysql_mutex_unlock(&thd->LOCK_thd_data);
766     }
767   }
768   return status;
769 }
770 
771 extern "C"
wsrep_thd_is_BF_or_commit(void * thd_ptr,my_bool sync)772 my_bool wsrep_thd_is_BF_or_commit(void *thd_ptr, my_bool sync)
773 {
774   bool status = FALSE;
775   if (thd_ptr)
776   {
777     THD* thd = (THD*)thd_ptr;
778     if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
779 
780     status = ((thd->wsrep_exec_mode == REPL_RECV)    ||
781 	      (thd->wsrep_exec_mode == TOTAL_ORDER)  ||
782 	      (thd->wsrep_exec_mode == LOCAL_COMMIT));
783     if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
784   }
785   return status;
786 }
787 
788 extern "C"
wsrep_thd_is_local(void * thd_ptr,my_bool sync)789 my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync)
790 {
791   bool status = FALSE;
792   if (thd_ptr)
793   {
794     THD* thd = (THD*)thd_ptr;
795     if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
796 
797     status = (thd->wsrep_exec_mode == LOCAL_STATE);
798     if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
799   }
800   return status;
801 }
802 
wsrep_abort_thd(void * bf_thd_ptr,void * victim_thd_ptr,my_bool signal)803 int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
804 {
805   THD *victim_thd= (THD *) victim_thd_ptr;
806   THD *bf_thd= (THD *) bf_thd_ptr;
807   DBUG_ENTER("wsrep_abort_thd");
808 
809   mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
810   mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill);
811 
812   if ( (WSREP(bf_thd) ||
813          ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
814            bf_thd->wsrep_exec_mode == TOTAL_ORDER) )                         &&
815        victim_thd)
816   {
817     if ((victim_thd->wsrep_conflict_state == MUST_ABORT) ||
818         (victim_thd->wsrep_conflict_state == ABORTED) ||
819         (victim_thd->wsrep_conflict_state == ABORTING))
820     {
821       WSREP_DEBUG("wsrep_abort_thd called by %llu with victim %llu already "
822                   "aborted. Ignoring.",
823                   (bf_thd) ? (long long)bf_thd->real_id : 0,
824                   (long long)victim_thd->real_id);
825       wsrep_thd_UNLOCK(victim_thd);
826       DBUG_RETURN(1);
827     }
828 
829     WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
830                 (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
831     ha_abort_transaction(bf_thd, victim_thd, signal);
832   }
833   else
834   {
835     WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
836     wsrep_thd_UNLOCK(victim_thd);
837   }
838 
839   DBUG_RETURN(1);
840 }
841 
842 extern "C"
wsrep_thd_in_locking_session(void * thd_ptr)843 int wsrep_thd_in_locking_session(void *thd_ptr)
844 {
845   if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
846     return 1;
847   }
848   return 0;
849 }
850 
wsrep_thd_has_explicit_locks(THD * thd)851 bool wsrep_thd_has_explicit_locks(THD *thd)
852 {
853   assert(thd);
854   return thd->mdl_context.has_explicit_locks();
855 }
856 
wsrep_thd_is_applier(MYSQL_THD thd)857 my_bool wsrep_thd_is_applier(MYSQL_THD thd)
858 {
859   my_bool is_applier= false;
860 
861   if (thd && thd->wsrep_applier)
862     is_applier= true;
863 
864   return (is_applier);
865 }
866 
wsrep_set_load_multi_commit(THD * thd,bool split)867 void wsrep_set_load_multi_commit(THD *thd, bool split)
868 {
869    thd->wsrep_split_flag= split;
870 }
871 
wsrep_is_load_multi_commit(THD * thd)872 bool wsrep_is_load_multi_commit(THD *thd)
873 {
874    return thd->wsrep_split_flag;
875 }
876 
wsrep_report_bf_lock_wait(THD * thd,unsigned long long trx_id)877 void wsrep_report_bf_lock_wait(THD *thd,
878                                unsigned long long trx_id)
879 {
880   if (thd)
881   {
882     WSREP_ERROR("Thread %s trx_id: %llu thread: %ld "
883                 "seqno: %lld query_state: %s conf_state: %s exec_mode: %s "
884                 "applier: %d query: %s",
885                 wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
886                 trx_id,
887                 thd_get_thread_id(thd),
888                 wsrep_thd_trx_seqno(thd),
889                 wsrep_thd_query_state_str(thd),
890                 wsrep_thd_conflict_state_str(thd),
891                 wsrep_thd_exec_mode_str(thd),
892                 thd->wsrep_applier,
893                 wsrep_thd_query(thd));
894   }
895 }
896