1 /* Copyright (C) 2013-2021 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
15
16 #include "mariadb.h"
17 #include "wsrep_thd.h"
18 #include "wsrep_trans_observer.h"
19 #include "wsrep_high_priority_service.h"
20 #include "wsrep_storage_service.h"
21 #include "transaction.h"
22 #include "rpl_rli.h"
23 #include "log_event.h"
24 #include "sql_parse.h"
25 #include "mysqld.h" // start_wsrep_THD();
26 #include "wsrep_applier.h" // start_wsrep_THD();
27 #include "mysql/service_wsrep.h"
28 #include "debug_sync.h"
29 #include "slave.h"
30 #include "rpl_rli.h"
31 #include "rpl_mi.h"
32
/* Thread-specific-data key under which the calling thread's
   st_my_thread_var pointer is kept. The threadvars helpers in this file
   read it with pthread_getspecific() and overwrite it with
   pthread_setspecific(). */
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);

/* Queue of THDs waiting to be rolled back. It is allocated and destroyed
   by wsrep_rollback_process() and filled by wsrep_fire_rollbacker(). */
static Wsrep_thd_queue* wsrep_rollback_queue= 0;
/* Incremented by wsrep_bf_abort() each time a BF abort succeeds and
   reported through wsrep_show_bf_aborts(). */
static Atomic_counter<uint64_t> wsrep_bf_aborts_counter;
37
38
wsrep_show_bf_aborts(THD * thd,SHOW_VAR * var,char * buff,enum enum_var_type scope)39 int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
40 enum enum_var_type scope)
41 {
42 wsrep_local_bf_aborts= wsrep_bf_aborts_counter;
43 var->type= SHOW_LONGLONG;
44 var->value= (char*)&wsrep_local_bf_aborts;
45 return 0;
46 }
47
wsrep_replication_process(THD * thd,void * arg)48 static void wsrep_replication_process(THD *thd,
49 void* arg __attribute__((unused)))
50 {
51 DBUG_ENTER("wsrep_replication_process");
52
53 Wsrep_applier_service applier_service(thd);
54
55 WSREP_INFO("Starting applier thread %llu", thd->thread_id);
56 enum wsrep::provider::status
57 ret= Wsrep_server_state::get_provider().run_applier(&applier_service);
58
59 WSREP_INFO("Applier thread exiting ret: %d thd: %llu", ret, thd->thread_id);
60 mysql_mutex_lock(&LOCK_wsrep_slave_threads);
61 wsrep_close_applier(thd);
62 mysql_cond_broadcast(&COND_wsrep_slave_threads);
63 mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
64
65 delete thd->wsrep_rgi->rli->mi;
66 delete thd->wsrep_rgi->rli;
67
68 thd->wsrep_rgi->cleanup_after_session();
69 delete thd->wsrep_rgi;
70 thd->wsrep_rgi= NULL;
71
72
73 if(thd->has_thd_temporary_tables())
74 {
75 WSREP_WARN("Applier %lld has temporary tables at exit.",
76 thd->thread_id);
77 }
78 DBUG_VOID_RETURN;
79 }
80
create_wsrep_THD(Wsrep_thd_args * args,bool mutex_protected)81 static bool create_wsrep_THD(Wsrep_thd_args* args, bool mutex_protected)
82 {
83 if (!mutex_protected)
84 mysql_mutex_lock(&LOCK_wsrep_slave_threads);
85
86 ulong old_wsrep_running_threads= wsrep_running_threads;
87
88 DBUG_ASSERT(args->thread_type() == WSREP_APPLIER_THREAD ||
89 args->thread_type() == WSREP_ROLLBACKER_THREAD);
90
91 bool res= mysql_thread_create(args->thread_type() == WSREP_APPLIER_THREAD
92 ? key_wsrep_applier : key_wsrep_rollbacker,
93 args->thread_id(), &connection_attrib,
94 start_wsrep_THD, (void*)args);
95
96 if (res)
97 WSREP_ERROR("Can't create wsrep thread");
98
99 /*
100 if starting a thread on server startup, wait until the this thread's THD
101 is fully initialized (otherwise a THD initialization code might
102 try to access a partially initialized server data structure - MDEV-8208).
103 */
104 if (!mysqld_server_initialized)
105 {
106 while (old_wsrep_running_threads == wsrep_running_threads)
107 {
108 mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
109 }
110 }
111
112 if (!mutex_protected)
113 mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
114
115 return res;
116 }
117
wsrep_create_appliers(long threads,bool mutex_protected)118 bool wsrep_create_appliers(long threads, bool mutex_protected)
119 {
120 /* Dont' start slave threads if wsrep-provider or wsrep-cluster-address
121 is not set.
122 */
123 if (!WSREP_PROVIDER_EXISTS)
124 {
125 return false;
126 }
127
128 DBUG_ASSERT(wsrep_cluster_address[0]);
129
130 long wsrep_threads=0;
131
132 while (wsrep_threads++ < threads)
133 {
134 Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_replication_process,
135 WSREP_APPLIER_THREAD,
136 pthread_self()));
137 if (create_wsrep_THD(args, mutex_protected))
138 {
139 WSREP_WARN("Can't create thread to manage wsrep replication");
140 return true;
141 }
142 }
143
144 return false;
145 }
146
wsrep_remove_streaming_fragments(THD * thd,const char * ctx)147 static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx)
148 {
149 wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
150 Wsrep_storage_service* storage_service= wsrep_create_storage_service(thd, ctx);
151 storage_service->store_globals();
152 storage_service->adopt_transaction(thd->wsrep_trx());
153 storage_service->remove_fragments();
154 storage_service->commit(wsrep::ws_handle(transaction_id, 0),
155 wsrep::ws_meta());
156 Wsrep_server_state::instance().server_service()
157 .release_storage_service(storage_service);
158 wsrep_store_threadvars(thd);
159 }
160
wsrep_rollback_high_priority(THD * thd,THD * rollbacker)161 static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker)
162 {
163 WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)",
164 thd->thread_id, thd->real_id);
165 char* orig_thread_stack= thd->thread_stack;
166 thd->thread_stack= rollbacker->thread_stack;
167 DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
168 /* Must be streaming and must have been removed from the
169 server state streaming appliers map. */
170 DBUG_ASSERT(thd->wsrep_trx().is_streaming());
171 DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
172 thd->wsrep_trx().server_id(),
173 thd->wsrep_trx().id()));
174 DBUG_ASSERT(thd->wsrep_applier_service);
175
176 /* Fragment removal should happen before rollback to make
177 the transaction non-observable in SR table after the rollback
178 completes. For correctness the order does not matter here,
179 but currently it is mandated by checks in some MTR tests. */
180 wsrep_remove_streaming_fragments(thd, "high priority");
181 thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
182 wsrep::ws_meta());
183 thd->wsrep_applier_service->after_apply();
184 thd->thread_stack= orig_thread_stack;
185 WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
186 thd->thread_id, thd->real_id);
187 /* Will free THD */
188 Wsrep_server_state::instance().server_service()
189 .release_high_priority_service(thd->wsrep_applier_service);
190 }
191
wsrep_rollback_local(THD * thd,THD * rollbacker)192 static void wsrep_rollback_local(THD *thd, THD *rollbacker)
193 {
194 WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)",
195 thd->thread_id, thd->real_id);
196 char* orig_thread_stack= thd->thread_stack;
197 thd->thread_stack= rollbacker->thread_stack;
198 if (thd->wsrep_trx().is_streaming())
199 {
200 wsrep_remove_streaming_fragments(thd, "local");
201 }
202 /* Set thd->event_scheduler.data temporarily to NULL to avoid
203 callbacks to threadpool wait_begin() during rollback. */
204 auto saved_esd= thd->event_scheduler.data;
205 thd->event_scheduler.data= 0;
206 mysql_mutex_lock(&thd->LOCK_thd_data);
207 /* prepare THD for rollback processing */
208 thd->reset_for_next_command();
209 thd->lex->sql_command= SQLCOM_ROLLBACK;
210 mysql_mutex_unlock(&thd->LOCK_thd_data);
211 /* Perform a client rollback, restore globals and signal
212 the victim only when all the resources have been
213 released */
214 thd->wsrep_cs().client_service().bf_rollback();
215 wsrep_reset_threadvars(thd);
216 /* Assign saved event_scheduler.data back before letting
217 client to continue. */
218 thd->event_scheduler.data= saved_esd;
219 thd->thread_stack= orig_thread_stack;
220 thd->wsrep_cs().sync_rollback_complete();
221 WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
222 thd->thread_id, thd->real_id);
223 }
224
wsrep_rollback_process(THD * rollbacker,void * arg)225 static void wsrep_rollback_process(THD *rollbacker,
226 void *arg __attribute__((unused)))
227 {
228 DBUG_ENTER("wsrep_rollback_process");
229
230 THD* thd= NULL;
231 DBUG_ASSERT(!wsrep_rollback_queue);
232 wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker);
233 WSREP_INFO("Starting rollbacker thread %llu", rollbacker->thread_id);
234
235 thd_proc_info(rollbacker, "wsrep aborter idle");
236 while ((thd= wsrep_rollback_queue->pop_front()) != NULL)
237 {
238 mysql_mutex_lock(&thd->LOCK_thd_data);
239 wsrep::client_state& cs(thd->wsrep_cs());
240 const wsrep::transaction& tx(cs.transaction());
241 if (tx.state() == wsrep::transaction::s_aborted)
242 {
243 WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
244 (long long)thd->real_id,
245 tx.state());
246 mysql_mutex_unlock(&thd->LOCK_thd_data);
247 continue;
248 }
249 mysql_mutex_unlock(&thd->LOCK_thd_data);
250
251 wsrep_reset_threadvars(rollbacker);
252 wsrep_store_threadvars(thd);
253 thd->wsrep_cs().acquire_ownership();
254
255 thd_proc_info(rollbacker, "wsrep aborter active");
256
257 /* Rollback methods below may free thd pointer. Do not try
258 to access it after method returns. */
259 if (wsrep_thd_is_applying(thd))
260 {
261 wsrep_rollback_high_priority(thd, rollbacker);
262 }
263 else
264 {
265 wsrep_rollback_local(thd, rollbacker);
266 }
267 wsrep_store_threadvars(rollbacker);
268 thd_proc_info(rollbacker, "wsrep aborter idle");
269 }
270
271 delete wsrep_rollback_queue;
272 wsrep_rollback_queue= NULL;
273
274 WSREP_INFO("rollbacker thread exiting %llu", rollbacker->thread_id);
275
276 DBUG_ASSERT(rollbacker->killed != NOT_KILLED);
277 DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
278 DBUG_VOID_RETURN;
279 }
280
wsrep_create_rollbacker()281 void wsrep_create_rollbacker()
282 {
283 DBUG_ASSERT(wsrep_cluster_address[0]);
284 Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_rollback_process,
285 WSREP_ROLLBACKER_THREAD,
286 pthread_self()));
287
288 /* create rollbacker */
289 if (create_wsrep_THD(args, false))
290 WSREP_WARN("Can't create thread to manage wsrep rollback");
291 }
292
293 /*
294 Start async rollback process
295
296 Asserts thd->LOCK_thd_data ownership
297 */
wsrep_fire_rollbacker(THD * thd)298 void wsrep_fire_rollbacker(THD *thd)
299 {
300 DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
301 DBUG_PRINT("wsrep",("enqueuing trx abort for %llu", thd->thread_id));
302 WSREP_DEBUG("enqueuing trx abort for (%llu)", thd->thread_id);
303 if (wsrep_rollback_queue->push_back(thd))
304 {
305 WSREP_WARN("duplicate thd %llu for rollbacker",
306 thd->thread_id);
307 }
308 }
309
310
wsrep_abort_thd(THD * bf_thd_ptr,THD * victim_thd_ptr,my_bool signal)311 int wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal)
312 {
313 DBUG_ENTER("wsrep_abort_thd");
314 THD *victim_thd= (THD *) victim_thd_ptr;
315 THD *bf_thd= (THD *) bf_thd_ptr;
316
317 mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
318 mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill);
319
320 /* Note that when you use RSU node is desynced from cluster, thus WSREP(thd)
321 might not be true.
322 */
323 if ((WSREP(bf_thd) ||
324 ((WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
325 wsrep_thd_is_toi(bf_thd))) &&
326 victim_thd &&
327 !wsrep_thd_is_aborting(victim_thd))
328 {
329 WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
330 (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
331 ha_abort_transaction(bf_thd, victim_thd, signal);
332 }
333 else
334 {
335 WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
336 wsrep_thd_UNLOCK(victim_thd);
337 }
338
339 DBUG_RETURN(1);
340 }
341
wsrep_bf_abort(THD * bf_thd,THD * victim_thd)342 bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd)
343 {
344 WSREP_LOG_THD(bf_thd, "BF aborter before");
345 WSREP_LOG_THD(victim_thd, "victim before");
346
347 mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
348 mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill);
349
350 DBUG_EXECUTE_IF("sync.wsrep_bf_abort",
351 {
352 const char act[]=
353 "now "
354 "SIGNAL sync.wsrep_bf_abort_reached "
355 "WAIT_FOR signal.wsrep_bf_abort";
356 DBUG_ASSERT(!debug_sync_set_action(bf_thd,
357 STRING_WITH_LEN(act)));
358 };);
359
360 if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active())
361 {
362 WSREP_DEBUG("wsrep_bf_abort, BF abort for non active transaction");
363 switch (victim_thd->wsrep_trx().state())
364 {
365 case wsrep::transaction::s_aborting: /* fall through */
366 case wsrep::transaction::s_aborted:
367 WSREP_DEBUG("victim thd is already aborted or in aborting state.");
368 return false;
369 default:
370 break;
371 }
372 /* Test: galera_create_table_as_select. Here we enter wsrep-lib
373 were LOCK_thd_data will be acquired, thus we need to release it.
374 However, we can still hold LOCK_thd_kill to protect from
375 disconnect or delete. */
376 mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
377 wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id());
378 mysql_mutex_lock(&victim_thd->LOCK_thd_data);
379 }
380
381 bool ret;
382 wsrep::seqno bf_seqno(bf_thd->wsrep_trx().ws_meta().seqno());
383
384 if (wsrep_thd_is_toi(bf_thd))
385 {
386 /* Here we enter wsrep-lib were LOCK_thd_data will be acquired,
387 thus we need to release it. However, we can still hold
388 LOCK_thd_kill to protect from disconnect or delete. */
389 mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
390 ret= victim_thd->wsrep_cs().total_order_bf_abort(bf_seqno);
391 mysql_mutex_lock(&victim_thd->LOCK_thd_data);
392 }
393 else
394 {
395 /* Test: mysql-wsrep-features#165. Here we enter wsrep-lib
396 were LOCK_thd_data will be acquired and later LOCK_thd_kill
397 thus we need to release them. */
398 wsrep_thd_UNLOCK(victim_thd);
399 ret= victim_thd->wsrep_cs().bf_abort(bf_seqno);
400 wsrep_thd_LOCK(victim_thd);
401 }
402 if (ret)
403 {
404 wsrep_bf_aborts_counter++;
405 }
406 return ret;
407 }
408
wsrep_create_threadvars()409 int wsrep_create_threadvars()
410 {
411 int ret= 0;
412 if (thread_handling == SCHEDULER_TYPES_COUNT)
413 {
414 /* Caller should have called wsrep_reset_threadvars() before this
415 method. */
416 DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys));
417 pthread_setspecific(THR_KEY_mysys, 0);
418 ret= my_thread_init();
419 }
420 return ret;
421 }
422
wsrep_delete_threadvars()423 void wsrep_delete_threadvars()
424 {
425 if (thread_handling == SCHEDULER_TYPES_COUNT)
426 {
427 /* The caller should have called wsrep_store_threadvars() before
428 this method. */
429 DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys));
430 /* Reset psi state to avoid deallocating applier thread
431 psi_thread. */
432 #ifdef HAVE_PSI_INTERFACE
433 PSI_thread *psi_thread= PSI_CALL_get_thread();
434 if (PSI_server)
435 {
436 PSI_server->set_thread(0);
437 }
438 #endif /* HAVE_PSI_INTERFACE */
439 my_thread_end();
440 PSI_CALL_set_thread(psi_thread);
441 pthread_setspecific(THR_KEY_mysys, 0);
442 }
443 }
444
wsrep_assign_from_threadvars(THD * thd)445 void wsrep_assign_from_threadvars(THD *thd)
446 {
447 if (thread_handling == SCHEDULER_TYPES_COUNT)
448 {
449 st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
450 DBUG_ASSERT(mysys_var);
451 thd->set_mysys_var(mysys_var);
452 }
453 }
454
wsrep_save_threadvars()455 Wsrep_threadvars wsrep_save_threadvars()
456 {
457 return Wsrep_threadvars{
458 current_thd,
459 (st_my_thread_var*) pthread_getspecific(THR_KEY_mysys)
460 };
461 }
462
wsrep_restore_threadvars(const Wsrep_threadvars & globals)463 void wsrep_restore_threadvars(const Wsrep_threadvars& globals)
464 {
465 set_current_thd(globals.cur_thd);
466 pthread_setspecific(THR_KEY_mysys, globals.mysys_var);
467 }
468
wsrep_store_threadvars(THD * thd)469 void wsrep_store_threadvars(THD *thd)
470 {
471 if (thread_handling == SCHEDULER_TYPES_COUNT)
472 {
473 pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
474 }
475 thd->store_globals();
476 }
477
wsrep_reset_threadvars(THD * thd)478 void wsrep_reset_threadvars(THD *thd)
479 {
480 if (thread_handling == SCHEDULER_TYPES_COUNT)
481 {
482 pthread_setspecific(THR_KEY_mysys, 0);
483 }
484 else
485 {
486 thd->reset_globals();
487 }
488 }
489