1 /* Copyright (C) 2013-2021 Codership Oy <info@codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License along
13    with this program; if not, write to the Free Software Foundation, Inc.,
14    51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
15 
16 #include "mariadb.h"
17 #include "wsrep_thd.h"
18 #include "wsrep_trans_observer.h"
19 #include "wsrep_high_priority_service.h"
20 #include "wsrep_storage_service.h"
21 #include "transaction.h"
22 #include "rpl_rli.h"
23 #include "log_event.h"
24 #include "sql_parse.h"
25 #include "mysqld.h"   // start_wsrep_THD();
26 #include "wsrep_applier.h"   // start_wsrep_THD();
27 #include "mysql/service_wsrep.h"
28 #include "debug_sync.h"
29 #include "slave.h"
30 #include "rpl_rli.h"
31 #include "rpl_mi.h"
32 
/* Thread-specific-data key holding the calling thread's st_my_thread_var.
   The threadvars helpers at the end of this file read and write it
   directly through pthread_getspecific()/pthread_setspecific(). */
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);

/* Queue of THDs waiting to be rolled back. Allocated and freed by
   wsrep_rollback_process(); filled by wsrep_fire_rollbacker(). */
static Wsrep_thd_queue* wsrep_rollback_queue= 0;
/* Count of successful BF aborts: incremented in wsrep_bf_abort() and
   published by wsrep_show_bf_aborts(). */
static Atomic_counter<uint64_t> wsrep_bf_aborts_counter;
37 
38 
wsrep_show_bf_aborts(THD * thd,SHOW_VAR * var,char * buff,enum enum_var_type scope)39 int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
40                           enum enum_var_type scope)
41 {
42   wsrep_local_bf_aborts= wsrep_bf_aborts_counter;
43   var->type= SHOW_LONGLONG;
44   var->value= (char*)&wsrep_local_bf_aborts;
45   return 0;
46 }
47 
wsrep_replication_process(THD * thd,void * arg)48 static void wsrep_replication_process(THD *thd,
49                                       void* arg __attribute__((unused)))
50 {
51   DBUG_ENTER("wsrep_replication_process");
52 
53   Wsrep_applier_service applier_service(thd);
54 
55   WSREP_INFO("Starting applier thread %llu", thd->thread_id);
56   enum wsrep::provider::status
57     ret= Wsrep_server_state::get_provider().run_applier(&applier_service);
58 
59   WSREP_INFO("Applier thread exiting ret: %d thd: %llu", ret, thd->thread_id);
60   mysql_mutex_lock(&LOCK_wsrep_slave_threads);
61   wsrep_close_applier(thd);
62   mysql_cond_broadcast(&COND_wsrep_slave_threads);
63   mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
64 
65   delete thd->wsrep_rgi->rli->mi;
66   delete thd->wsrep_rgi->rli;
67 
68   thd->wsrep_rgi->cleanup_after_session();
69   delete thd->wsrep_rgi;
70   thd->wsrep_rgi= NULL;
71 
72 
73   if(thd->has_thd_temporary_tables())
74   {
75     WSREP_WARN("Applier %lld has temporary tables at exit.",
76                thd->thread_id);
77   }
78   DBUG_VOID_RETURN;
79 }
80 
create_wsrep_THD(Wsrep_thd_args * args,bool mutex_protected)81 static bool create_wsrep_THD(Wsrep_thd_args* args, bool mutex_protected)
82 {
83   if (!mutex_protected)
84     mysql_mutex_lock(&LOCK_wsrep_slave_threads);
85 
86   ulong old_wsrep_running_threads= wsrep_running_threads;
87 
88   DBUG_ASSERT(args->thread_type() == WSREP_APPLIER_THREAD ||
89               args->thread_type() == WSREP_ROLLBACKER_THREAD);
90 
91   bool res= mysql_thread_create(args->thread_type() == WSREP_APPLIER_THREAD
92                                 ? key_wsrep_applier : key_wsrep_rollbacker,
93                                 args->thread_id(), &connection_attrib,
94                                 start_wsrep_THD, (void*)args);
95 
96   if (res)
97     WSREP_ERROR("Can't create wsrep thread");
98 
99   /*
100     if starting a thread on server startup, wait until the this thread's THD
101     is fully initialized (otherwise a THD initialization code might
102     try to access a partially initialized server data structure - MDEV-8208).
103   */
104   if (!mysqld_server_initialized)
105   {
106     while (old_wsrep_running_threads == wsrep_running_threads)
107     {
108       mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
109     }
110   }
111 
112   if (!mutex_protected)
113     mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
114 
115   return res;
116 }
117 
wsrep_create_appliers(long threads,bool mutex_protected)118 bool wsrep_create_appliers(long threads, bool mutex_protected)
119 {
120   /*  Dont' start slave threads if wsrep-provider or wsrep-cluster-address
121       is not set.
122   */
123   if (!WSREP_PROVIDER_EXISTS)
124   {
125     return false;
126   }
127 
128   DBUG_ASSERT(wsrep_cluster_address[0]);
129 
130   long wsrep_threads=0;
131 
132   while (wsrep_threads++ < threads)
133   {
134     Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_replication_process,
135                                             WSREP_APPLIER_THREAD,
136                                             pthread_self()));
137     if (create_wsrep_THD(args, mutex_protected))
138     {
139       WSREP_WARN("Can't create thread to manage wsrep replication");
140       return true;
141     }
142   }
143 
144   return false;
145 }
146 
wsrep_remove_streaming_fragments(THD * thd,const char * ctx)147 static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx)
148 {
149   wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
150   Wsrep_storage_service* storage_service= wsrep_create_storage_service(thd, ctx);
151   storage_service->store_globals();
152   storage_service->adopt_transaction(thd->wsrep_trx());
153   storage_service->remove_fragments();
154   storage_service->commit(wsrep::ws_handle(transaction_id, 0),
155                           wsrep::ws_meta());
156   Wsrep_server_state::instance().server_service()
157     .release_storage_service(storage_service);
158   wsrep_store_threadvars(thd);
159 }
160 
/**
  Roll back a streaming applier (high priority) transaction on behalf of
  the rollbacker thread.

  Removes the transaction's stored fragments, rolls the transaction back
  through the THD's applier service and finally releases that service.

  @param thd         applier THD to roll back. The final release frees it
                     (see "Will free THD" below), so the caller must not
                     touch it after this function returns.
  @param rollbacker  rollbacker THD; its thread_stack is borrowed by thd
                     while the rollback runs
*/
static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker)
{
  WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)",
              thd->thread_id, thd->real_id);
  /* Run with the rollbacker's stack bounds; restored before release. */
  char* orig_thread_stack= thd->thread_stack;
  thd->thread_stack= rollbacker->thread_stack;
  DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
  /* Must be streaming and must have been removed from the
     server state streaming appliers map. */
  DBUG_ASSERT(thd->wsrep_trx().is_streaming());
  DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
                thd->wsrep_trx().server_id(),
                thd->wsrep_trx().id()));
  DBUG_ASSERT(thd->wsrep_applier_service);

  /* Fragment removal should happen before rollback to make
     the transaction non-observable in SR table after the rollback
     completes. For correctness the order does not matter here,
     but currently it is mandated by checks in some MTR tests. */
  wsrep_remove_streaming_fragments(thd, "high priority");
  thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
                                       wsrep::ws_meta());
  thd->wsrep_applier_service->after_apply();
  thd->thread_stack= orig_thread_stack;
  WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
              thd->thread_id, thd->real_id);
  /* Will free THD */
  Wsrep_server_state::instance().server_service()
    .release_high_priority_service(thd->wsrep_applier_service);
}
191 
/**
  Roll back a local (client) transaction on behalf of the rollbacker
  thread.

  Removes streaming fragments if the transaction is streaming, prepares
  the THD as if it were executing ROLLBACK, performs the BF rollback via
  the client service and finally signals that the rollback is complete.

  @param thd         victim THD to roll back
  @param rollbacker  rollbacker THD; its thread_stack is borrowed by thd
                     while the rollback runs
*/
static void wsrep_rollback_local(THD *thd, THD *rollbacker)
{
  WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)",
              thd->thread_id, thd->real_id);
  /* Run with the rollbacker's stack bounds; restored further down. */
  char* orig_thread_stack= thd->thread_stack;
  thd->thread_stack= rollbacker->thread_stack;
  if (thd->wsrep_trx().is_streaming())
  {
    wsrep_remove_streaming_fragments(thd, "local");
  }
  /* Set thd->event_scheduler.data temporarily to NULL to avoid
     callbacks to threadpool wait_begin() during rollback. */
  auto saved_esd= thd->event_scheduler.data;
  thd->event_scheduler.data= 0;
  mysql_mutex_lock(&thd->LOCK_thd_data);
  /* prepare THD for rollback processing */
  thd->reset_for_next_command();
  thd->lex->sql_command= SQLCOM_ROLLBACK;
  mysql_mutex_unlock(&thd->LOCK_thd_data);
  /* Perform a client rollback, restore globals and signal
     the victim only when all the resources have been
     released */
  thd->wsrep_cs().client_service().bf_rollback();
  wsrep_reset_threadvars(thd);
  /* Assign saved event_scheduler.data back before letting
     client to continue. */
  thd->event_scheduler.data= saved_esd;
  thd->thread_stack= orig_thread_stack;
  /* Last step: from here on the victim may continue. */
  thd->wsrep_cs().sync_rollback_complete();
  WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
              thd->thread_id, thd->real_id);
}
224 
/**
  Body of the wsrep rollbacker thread.

  Creates the global rollback queue and then serves it: every THD popped
  from the queue is rolled back either as a high priority (applier)
  transaction or as a local one. The loop ends when pop_front() returns
  NULL, after which the queue is destroyed.

  @param rollbacker  THD of the rollbacker thread itself
  @param arg         not used
*/
static void wsrep_rollback_process(THD *rollbacker,
                                   void *arg __attribute__((unused)))
{
  DBUG_ENTER("wsrep_rollback_process");

  THD* thd= NULL;
  /* Only one rollbacker may own the queue at a time. */
  DBUG_ASSERT(!wsrep_rollback_queue);
  wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker);
  WSREP_INFO("Starting rollbacker thread %llu", rollbacker->thread_id);

  thd_proc_info(rollbacker, "wsrep aborter idle");
  while ((thd= wsrep_rollback_queue->pop_front()) != NULL)
  {
    /* Inspect the victim's transaction state under LOCK_thd_data and
       skip victims that have already been aborted. */
    mysql_mutex_lock(&thd->LOCK_thd_data);
    wsrep::client_state& cs(thd->wsrep_cs());
    const wsrep::transaction& tx(cs.transaction());
    if (tx.state() == wsrep::transaction::s_aborted)
    {
      WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
                  (long long)thd->real_id,
                  tx.state());
      mysql_mutex_unlock(&thd->LOCK_thd_data);
      continue;
    }
    mysql_mutex_unlock(&thd->LOCK_thd_data);

    /* Switch this thread's context from the rollbacker to the victim. */
    wsrep_reset_threadvars(rollbacker);
    wsrep_store_threadvars(thd);
    thd->wsrep_cs().acquire_ownership();

    thd_proc_info(rollbacker, "wsrep aborter active");

    /* Rollback methods below may free thd pointer. Do not try
       to access it after method returns. */
    if (wsrep_thd_is_applying(thd))
    {
      wsrep_rollback_high_priority(thd, rollbacker);
    }
    else
    {
      wsrep_rollback_local(thd, rollbacker);
    }
    /* Switch the thread context back to the rollbacker. */
    wsrep_store_threadvars(rollbacker);
    thd_proc_info(rollbacker, "wsrep aborter idle");
  }

  delete wsrep_rollback_queue;
  wsrep_rollback_queue= NULL;

  WSREP_INFO("rollbacker thread exiting %llu", rollbacker->thread_id);

  /* The loop is expected to end only after the rollbacker was killed. */
  DBUG_ASSERT(rollbacker->killed != NOT_KILLED);
  DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
  DBUG_VOID_RETURN;
}
280 
wsrep_create_rollbacker()281 void wsrep_create_rollbacker()
282 {
283   DBUG_ASSERT(wsrep_cluster_address[0]);
284   Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_rollback_process,
285                                           WSREP_ROLLBACKER_THREAD,
286                                           pthread_self()));
287 
288   /* create rollbacker */
289   if (create_wsrep_THD(args, false))
290     WSREP_WARN("Can't create thread to manage wsrep rollback");
291 }
292 
293 /*
294   Start async rollback process
295 
296   Asserts thd->LOCK_thd_data ownership
297  */
wsrep_fire_rollbacker(THD * thd)298 void wsrep_fire_rollbacker(THD *thd)
299 {
300   DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
301   DBUG_PRINT("wsrep",("enqueuing trx abort for %llu", thd->thread_id));
302   WSREP_DEBUG("enqueuing trx abort for (%llu)", thd->thread_id);
303   if (wsrep_rollback_queue->push_back(thd))
304   {
305     WSREP_WARN("duplicate thd %llu for rollbacker",
306                thd->thread_id);
307   }
308 }
309 
310 
wsrep_abort_thd(THD * bf_thd_ptr,THD * victim_thd_ptr,my_bool signal)311 int wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal)
312 {
313   DBUG_ENTER("wsrep_abort_thd");
314   THD *victim_thd= (THD *) victim_thd_ptr;
315   THD *bf_thd= (THD *) bf_thd_ptr;
316 
317   mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
318   mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill);
319 
320   /* Note that when you use RSU node is desynced from cluster, thus WSREP(thd)
321   might not be true.
322   */
323   if ((WSREP(bf_thd) ||
324        ((WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
325 	 wsrep_thd_is_toi(bf_thd))) &&
326        victim_thd &&
327        !wsrep_thd_is_aborting(victim_thd))
328   {
329       WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
330                   (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
331       ha_abort_transaction(bf_thd, victim_thd, signal);
332   }
333   else
334   {
335     WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
336     wsrep_thd_UNLOCK(victim_thd);
337   }
338 
339   DBUG_RETURN(1);
340 }
341 
/**
  BF abort the victim's transaction through wsrep-lib.

  The caller must hold both LOCK_thd_data and LOCK_thd_kill of the
  victim (asserted in debug builds). The locks are released temporarily
  around the calls into wsrep-lib and re-acquired before returning.

  If the victim has no active transaction, one is started first so that
  it can be aborted, unless its transaction is already aborting or
  aborted, in which case nothing is done.

  @param bf_thd      THD performing the abort
  @param victim_thd  THD whose transaction is aborted

  @return result of the wsrep-lib abort call; false if the victim was
          already aborting or aborted. Each true result increments
          wsrep_bf_aborts_counter.
*/
bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd)
{
  WSREP_LOG_THD(bf_thd, "BF aborter before");
  WSREP_LOG_THD(victim_thd, "victim before");

  mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
  mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill);

  /* Debug-only sync point: signal that the BF abort was reached and
     wait for the test to let it continue. */
  DBUG_EXECUTE_IF("sync.wsrep_bf_abort",
                  {
                    const char act[]=
                      "now "
                      "SIGNAL sync.wsrep_bf_abort_reached "
                      "WAIT_FOR signal.wsrep_bf_abort";
                    DBUG_ASSERT(!debug_sync_set_action(bf_thd,
                                                       STRING_WITH_LEN(act)));
                  };);

  if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active())
  {
    WSREP_DEBUG("wsrep_bf_abort, BF abort for non active transaction");
    switch (victim_thd->wsrep_trx().state())
    {
    case wsrep::transaction::s_aborting: /* fall through */
    case wsrep::transaction::s_aborted:
      WSREP_DEBUG("victim thd is already aborted or in aborting state.");
      return false;
    default:
      break;
    }
    /* Test: galera_create_table_as_select. Here we enter wsrep-lib
    where LOCK_thd_data will be acquired, thus we need to release it.
    However, we can still hold LOCK_thd_kill to protect from
    disconnect or delete. */
    mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
    wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id());
    mysql_mutex_lock(&victim_thd->LOCK_thd_data);
  }

  bool ret;
  wsrep::seqno bf_seqno(bf_thd->wsrep_trx().ws_meta().seqno());

  if (wsrep_thd_is_toi(bf_thd))
  {
    /* Here we enter wsrep-lib where LOCK_thd_data will be acquired,
    thus we need to release it. However, we can still hold
    LOCK_thd_kill to protect from disconnect or delete. */
    mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
    ret= victim_thd->wsrep_cs().total_order_bf_abort(bf_seqno);
    mysql_mutex_lock(&victim_thd->LOCK_thd_data);
  }
  else
  {
    /* Test: mysql-wsrep-features#165. Here we enter wsrep-lib
    where LOCK_thd_data will be acquired and later LOCK_thd_kill
    thus we need to release them. */
    wsrep_thd_UNLOCK(victim_thd);
    ret= victim_thd->wsrep_cs().bf_abort(bf_seqno);
    wsrep_thd_LOCK(victim_thd);
  }
  if (ret)
  {
    wsrep_bf_aborts_counter++;
  }
  return ret;
}
408 
wsrep_create_threadvars()409 int wsrep_create_threadvars()
410 {
411   int ret= 0;
412   if (thread_handling == SCHEDULER_TYPES_COUNT)
413   {
414     /* Caller should have called wsrep_reset_threadvars() before this
415        method. */
416     DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys));
417     pthread_setspecific(THR_KEY_mysys, 0);
418     ret= my_thread_init();
419   }
420   return ret;
421 }
422 
wsrep_delete_threadvars()423 void wsrep_delete_threadvars()
424 {
425   if (thread_handling == SCHEDULER_TYPES_COUNT)
426   {
427     /* The caller should have called wsrep_store_threadvars() before
428        this method. */
429     DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys));
430     /* Reset psi state to avoid deallocating applier thread
431        psi_thread. */
432 #ifdef HAVE_PSI_INTERFACE
433     PSI_thread *psi_thread= PSI_CALL_get_thread();
434     if (PSI_server)
435     {
436       PSI_server->set_thread(0);
437     }
438 #endif /* HAVE_PSI_INTERFACE */
439     my_thread_end();
440     PSI_CALL_set_thread(psi_thread);
441     pthread_setspecific(THR_KEY_mysys, 0);
442   }
443 }
444 
wsrep_assign_from_threadvars(THD * thd)445 void wsrep_assign_from_threadvars(THD *thd)
446 {
447   if (thread_handling == SCHEDULER_TYPES_COUNT)
448   {
449     st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
450     DBUG_ASSERT(mysys_var);
451     thd->set_mysys_var(mysys_var);
452   }
453 }
454 
wsrep_save_threadvars()455 Wsrep_threadvars wsrep_save_threadvars()
456 {
457   return Wsrep_threadvars{
458     current_thd,
459     (st_my_thread_var*) pthread_getspecific(THR_KEY_mysys)
460   };
461 }
462 
/**
  Reinstall a snapshot taken by wsrep_save_threadvars() for the calling
  thread: the current THD and the THR_KEY_mysys slot.

  @param globals  previously saved thread variables
*/
void wsrep_restore_threadvars(const Wsrep_threadvars& globals)
{
  set_current_thd(globals.cur_thd);
  pthread_setspecific(THR_KEY_mysys, globals.mysys_var);
}
468 
/**
  Make thd the calling thread's current context.

  When thread_handling equals SCHEDULER_TYPES_COUNT, thd's mysys_var is
  installed in the THR_KEY_mysys slot first; thd->store_globals() is
  called in every case.

  @param thd  THD to install

  @return result of thd->store_globals()
*/
int wsrep_store_threadvars(THD *thd)
{
  if (thread_handling ==  SCHEDULER_TYPES_COUNT)
  {
    pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
  }
  return thd->store_globals();
}
477 
wsrep_reset_threadvars(THD * thd)478 void wsrep_reset_threadvars(THD *thd)
479 {
480   if (thread_handling == SCHEDULER_TYPES_COUNT)
481   {
482     pthread_setspecific(THR_KEY_mysys, 0);
483   }
484   else
485   {
486     thd->reset_globals();
487   }
488 }
489