1 /* Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <list>
24
25 #include <assert.h>
26 #include <errno.h>
27 #include <mysql/group_replication_priv.h>
28 #include <signal.h>
29 #include <time.h>
30
31 #include <mutex_lock.h>
32 #include <mysql/components/services/log_builtins.h>
33 #include "my_byteorder.h"
34 #include "my_dbug.h"
35 #include "my_systime.h"
36 #include "plugin/group_replication/include/applier.h"
37 #include "plugin/group_replication/include/leave_group_on_failure.h"
38 #include "plugin/group_replication/include/plugin.h"
39 #include "plugin/group_replication/include/plugin_messages/single_primary_message.h"
40 #include "plugin/group_replication/include/plugin_server_include.h"
41 #include "plugin/group_replication/include/services/notification/notification.h"
42 #include "plugin/group_replication/libmysqlgcs/include/mysql/gcs/gcs_member_identifier.h"
43 #include "sql/protocol_classic.h"
44
/*
  Name of the replication channel handled by the applier module. It is handed
  to the applier configuration action and compared against the channel name
  received in Applier_module::inform_of_applier_stop().
*/
char applier_module_channel_name[] = "group_replication_applier";

/*
  Set to true by the applier thread right before it calls my_thread_exit() and
  polled by Applier_module::terminate_applier_thread(). It is a plain bool that
  is read and written from different threads.
*/
bool applier_thread_is_exiting = false;
47
launch_handler_thread(void * arg)48 static void *launch_handler_thread(void *arg) {
49 Applier_module *handler = (Applier_module *)arg;
50 handler->applier_thread_handle();
51 return nullptr;
52 }
53
Applier_module()54 Applier_module::Applier_module()
55 : applier_thd_state(),
56 applier_aborted(false),
57 applier_error(0),
58 suspended(false),
59 waiting_for_applier_suspension(false),
60 shared_stop_write_lock(nullptr),
61 incoming(nullptr),
62 pipeline(nullptr),
63 stop_wait_timeout(LONG_TIMEOUT),
64 applier_channel_observer(nullptr) {
65 mysql_mutex_init(key_GR_LOCK_applier_module_run, &run_lock,
66 MY_MUTEX_INIT_FAST);
67 mysql_cond_init(key_GR_COND_applier_module_run, &run_cond);
68 mysql_mutex_init(key_GR_LOCK_applier_module_suspend, &suspend_lock,
69 MY_MUTEX_INIT_FAST);
70 mysql_cond_init(key_GR_COND_applier_module_suspend, &suspend_cond);
71 mysql_cond_init(key_GR_COND_applier_module_wait,
72 &suspension_waiting_condition);
73 }
74
~Applier_module()75 Applier_module::~Applier_module() {
76 if (this->incoming) {
77 while (!this->incoming->empty()) {
78 Packet *packet = nullptr;
79 this->incoming->pop(&packet);
80 delete packet;
81 }
82 delete incoming;
83 }
84 delete applier_channel_observer;
85
86 mysql_mutex_destroy(&run_lock);
87 mysql_cond_destroy(&run_cond);
88 mysql_mutex_destroy(&suspend_lock);
89 mysql_cond_destroy(&suspend_cond);
90 mysql_cond_destroy(&suspension_waiting_condition);
91 }
92
setup_applier_module(Handler_pipeline_type pipeline_type,bool reset_logs,ulong stop_timeout,rpl_sidno group_sidno,ulonglong gtid_assignment_block_size,Shared_writelock * shared_stop_lock)93 int Applier_module::setup_applier_module(Handler_pipeline_type pipeline_type,
94 bool reset_logs, ulong stop_timeout,
95 rpl_sidno group_sidno,
96 ulonglong gtid_assignment_block_size,
97 Shared_writelock *shared_stop_lock) {
98 DBUG_TRACE;
99
100 int error = 0;
101
102 // create the receiver queue
103 this->incoming = new Synchronized_queue<Packet *>();
104
105 stop_wait_timeout = stop_timeout;
106
107 pipeline = nullptr;
108
109 if ((error = get_pipeline(pipeline_type, &pipeline))) {
110 return error;
111 }
112
113 reset_applier_logs = reset_logs;
114 group_replication_sidno = group_sidno;
115 this->gtid_assignment_block_size = gtid_assignment_block_size;
116
117 shared_stop_write_lock = shared_stop_lock;
118
119 return error;
120 }
121
purge_applier_queue_and_restart_applier_module()122 int Applier_module::purge_applier_queue_and_restart_applier_module() {
123 DBUG_TRACE;
124 int error = 0;
125
126 /*
127 Here we are stopping applier thread intentionally and we will be starting
128 the applier thread after purging the relay logs. So we should ignore any
129 errors during the stop (eg: error due to stopping the applier thread in the
130 middle of applying the group of events). Hence unregister the applier
131 channel observer temporarily till the required work is done.
132 */
133 channel_observation_manager_list
134 ->get_channel_observation_manager(GROUP_CHANNEL_OBSERVATION_MANAGER_POS)
135 ->unregister_channel_observer(applier_channel_observer);
136
137 /* Stop the applier thread */
138 Pipeline_action *stop_action = new Handler_stop_action();
139 error = pipeline->handle_action(stop_action);
140 delete stop_action;
141 if (error) return error; /* purecov: inspected */
142
143 /* Purge the relay logs and initialize the channel*/
144 Handler_applier_configuration_action *applier_conf_action =
145 new Handler_applier_configuration_action(
146 applier_module_channel_name, true, /* purge relay logs always*/
147 stop_wait_timeout, group_replication_sidno);
148
149 error = pipeline->handle_action(applier_conf_action);
150 delete applier_conf_action;
151 if (error) return error; /* purecov: inspected */
152
153 channel_observation_manager_list
154 ->get_channel_observation_manager(GROUP_CHANNEL_OBSERVATION_MANAGER_POS)
155 ->register_channel_observer(applier_channel_observer);
156
157 /* Start the applier thread */
158 Pipeline_action *start_action = new Handler_start_action();
159 error = pipeline->handle_action(start_action);
160 delete start_action;
161
162 return error;
163 }
164
setup_pipeline_handlers()165 int Applier_module::setup_pipeline_handlers() {
166 DBUG_TRACE;
167
168 int error = 0;
169
170 // Configure the applier handler trough a configuration action
171 Handler_applier_configuration_action *applier_conf_action =
172 new Handler_applier_configuration_action(
173 applier_module_channel_name, reset_applier_logs, stop_wait_timeout,
174 group_replication_sidno);
175
176 error = pipeline->handle_action(applier_conf_action);
177 delete applier_conf_action;
178 if (error) return error; /* purecov: inspected */
179
180 Handler_certifier_configuration_action *cert_conf_action =
181 new Handler_certifier_configuration_action(group_replication_sidno,
182 gtid_assignment_block_size);
183
184 error = pipeline->handle_action(cert_conf_action);
185
186 delete cert_conf_action;
187
188 return error;
189 }
190
set_applier_thread_context()191 void Applier_module::set_applier_thread_context() {
192 THD *thd = new THD;
193 my_thread_init();
194 thd->set_new_thread_id();
195 thd->thread_stack = (char *)&thd;
196 thd->store_globals();
197 // Protocol is only initiated because of process list status
198 thd->get_protocol_classic()->init_net(nullptr);
199 /*
200 We only set the thread type so the applier thread shows up
201 in the process list.
202 */
203 thd->system_thread = SYSTEM_THREAD_SLAVE_IO;
204 // Make the thread have a better description on process list
205 thd->set_query(STRING_WITH_LEN("Group replication applier module"));
206 thd->set_query_for_display(
207 STRING_WITH_LEN("Group replication applier module"));
208
209 // Needed to start replication threads
210 thd->security_context()->skip_grants();
211
212 global_thd_manager_add_thd(thd);
213
214 DBUG_EXECUTE_IF("group_replication_applier_thread_init_wait", {
215 const char act[] = "now wait_for signal.gr_applier_init_signal";
216 DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
217 });
218
219 applier_thd = thd;
220 }
221
clean_applier_thread_context()222 void Applier_module::clean_applier_thread_context() {
223 applier_thd->get_protocol_classic()->end_net();
224 applier_thd->release_resources();
225 global_thd_manager_remove_thd(applier_thd);
226 }
227
inject_event_into_pipeline(Pipeline_event * pevent,Continuation * cont)228 int Applier_module::inject_event_into_pipeline(Pipeline_event *pevent,
229 Continuation *cont) {
230 int error = 0;
231 pipeline->handle_event(pevent, cont);
232
233 if ((error = cont->wait()))
234 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_EVENT_HANDLING_ERROR, error);
235
236 return error;
237 }
238
apply_action_packet(Action_packet * action_packet)239 bool Applier_module::apply_action_packet(Action_packet *action_packet) {
240 enum_packet_action action = action_packet->packet_action;
241
242 // packet used to break the queue blocking wait
243 if (action == TERMINATION_PACKET) {
244 return true;
245 }
246 // packet to signal the applier to suspend
247 if (action == SUSPENSION_PACKET) {
248 suspend_applier_module();
249 return false;
250 }
251
252 if (action == CHECKPOINT_PACKET) {
253 Queue_checkpoint_packet *packet = (Queue_checkpoint_packet *)action_packet;
254 packet->signal_checkpoint_reached();
255 return false;
256 }
257
258 return false; /* purecov: inspected */
259 }
260
apply_view_change_packet(View_change_packet * view_change_packet,Format_description_log_event * fde_evt,Continuation * cont)261 int Applier_module::apply_view_change_packet(
262 View_change_packet *view_change_packet,
263 Format_description_log_event *fde_evt, Continuation *cont) {
264 int error = 0;
265
266 Gtid_set *group_executed_set = nullptr;
267 Sid_map *sid_map = nullptr;
268 if (!view_change_packet->group_executed_set.empty()) {
269 sid_map = new Sid_map(nullptr);
270 group_executed_set = new Gtid_set(sid_map, nullptr);
271 if (intersect_group_executed_sets(view_change_packet->group_executed_set,
272 group_executed_set)) {
273 LogPluginErr(
274 WARNING_LEVEL,
275 ER_GRP_RPL_ERROR_GTID_EXECUTION_INFO); /* purecov: inspected */
276 delete sid_map; /* purecov: inspected */
277 delete group_executed_set; /* purecov: inspected */
278 group_executed_set = nullptr; /* purecov: inspected */
279 }
280 }
281
282 if (group_executed_set != nullptr) {
283 if (get_certification_handler()
284 ->get_certifier()
285 ->set_group_stable_transactions_set(group_executed_set)) {
286 LogPluginErr(WARNING_LEVEL,
287 ER_GRP_RPL_CERTIFICATE_SIZE_ERROR); /* purecov: inspected */
288 }
289 delete sid_map;
290 delete group_executed_set;
291 }
292
293 View_change_log_event *view_change_event =
294 new View_change_log_event(view_change_packet->view_id.c_str());
295
296 Pipeline_event *pevent = new Pipeline_event(view_change_event, fde_evt);
297 pevent->mark_event(SINGLE_VIEW_EVENT);
298
299 /*
300 If there are prepared consistent transactions waiting for the
301 prepare acknowledge, the View_change_log_event must be delayed
302 to after those transactions are committed, since they belong to
303 the previous view.
304 */
305 if (transaction_consistency_manager->has_local_prepared_transactions()) {
306 DBUG_PRINT("info", ("Delaying the log of the view '%s' to after local "
307 "prepared transactions",
308 view_change_packet->view_id.c_str()));
309 transaction_consistency_manager->schedule_view_change_event(pevent);
310 return error;
311 }
312
313 error = inject_event_into_pipeline(pevent, cont);
314 if (!cont->is_transaction_discarded()) delete pevent;
315
316 return error;
317 }
318
apply_data_packet(Data_packet * data_packet,Format_description_log_event * fde_evt,Continuation * cont)319 int Applier_module::apply_data_packet(Data_packet *data_packet,
320 Format_description_log_event *fde_evt,
321 Continuation *cont) {
322 int error = 0;
323 uchar *payload = data_packet->payload;
324 uchar *payload_end = data_packet->payload + data_packet->len;
325
326 DBUG_EXECUTE_IF("group_replication_before_apply_data_packet", {
327 const char act[] = "now wait_for continue_apply";
328 DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
329 });
330
331 while ((payload != payload_end) && !error) {
332 uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);
333
334 Data_packet *new_packet = new Data_packet(payload, event_len);
335 payload = payload + event_len;
336
337 std::list<Gcs_member_identifier> *online_members = nullptr;
338 if (nullptr != data_packet->m_online_members) {
339 online_members =
340 new std::list<Gcs_member_identifier>(*data_packet->m_online_members);
341 }
342
343 Pipeline_event *pevent =
344 new Pipeline_event(new_packet, fde_evt, UNDEFINED_EVENT_MODIFIER,
345 data_packet->m_consistency_level, online_members);
346 error = inject_event_into_pipeline(pevent, cont);
347
348 delete pevent;
349 DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
350 if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
351 error = 1;
352 }
353 });
354 }
355
356 return error;
357 }
358
apply_single_primary_action_packet(Single_primary_action_packet * packet)359 int Applier_module::apply_single_primary_action_packet(
360 Single_primary_action_packet *packet) {
361 int error = 0;
362 Certifier_interface *certifier = get_certification_handler()->get_certifier();
363
364 switch (packet->action) {
365 case Single_primary_action_packet::NEW_PRIMARY:
366 certifier->enable_conflict_detection();
367 break;
368 case Single_primary_action_packet::QUEUE_APPLIED:
369 certifier->disable_conflict_detection();
370 break;
371 default:
372 DBUG_ASSERT(0); /* purecov: inspected */
373 }
374
375 return error;
376 }
377
apply_transaction_prepared_action_packet(Transaction_prepared_action_packet * packet)378 int Applier_module::apply_transaction_prepared_action_packet(
379 Transaction_prepared_action_packet *packet) {
380 return transaction_consistency_manager->handle_remote_prepare(
381 packet->get_sid(), packet->m_gno, packet->m_gcs_member_id);
382 }
383
apply_sync_before_execution_action_packet(Sync_before_execution_action_packet * packet)384 int Applier_module::apply_sync_before_execution_action_packet(
385 Sync_before_execution_action_packet *packet) {
386 return transaction_consistency_manager->handle_sync_before_execution_message(
387 packet->m_thread_id, packet->m_gcs_member_id);
388 }
389
apply_leaving_members_action_packet(Leaving_members_action_packet * packet)390 int Applier_module::apply_leaving_members_action_packet(
391 Leaving_members_action_packet *packet) {
392 return transaction_consistency_manager->handle_member_leave(
393 packet->m_leaving_members);
394 }
395
applier_thread_handle()396 int Applier_module::applier_thread_handle() {
397 DBUG_TRACE;
398
399 // set the thread context
400 set_applier_thread_context();
401 mysql_mutex_lock(&run_lock);
402 applier_thd_state.set_initialized();
403 mysql_mutex_unlock(&run_lock);
404
405 Handler_THD_setup_action *thd_conf_action = nullptr;
406 Format_description_log_event *fde_evt = nullptr;
407 Continuation *cont = nullptr;
408 Packet *packet = nullptr;
409 bool loop_termination = false;
410 int packet_application_error = 0;
411
412 applier_error = setup_pipeline_handlers();
413
414 applier_channel_observer = new Applier_channel_state_observer();
415 channel_observation_manager_list
416 ->get_channel_observation_manager(GROUP_CHANNEL_OBSERVATION_MANAGER_POS)
417 ->register_channel_observer(applier_channel_observer);
418
419 if (!applier_error) {
420 Pipeline_action *start_action = new Handler_start_action();
421 applier_error += pipeline->handle_action(start_action);
422 delete start_action;
423 }
424
425 if (applier_error) {
426 goto end;
427 }
428
429 mysql_mutex_lock(&run_lock);
430 applier_thread_is_exiting = false;
431 applier_thd_state.set_running();
432 if (stage_handler.initialize_stage_monitor())
433 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_NO_STAGE_SERVICE);
434 stage_handler.set_stage(info_GR_STAGE_module_executing.m_key, __FILE__,
435 __LINE__, 0, 0);
436 mysql_cond_broadcast(&run_cond);
437 mysql_mutex_unlock(&run_lock);
438
439 fde_evt = new Format_description_log_event();
440 /*
441 Group replication does not use binary log checksumming on contents arriving
442 from certification. So, group replication applier channel shall behave as a
443 replication channel connected to a master that does not add checksum to its
444 binary log files.
445 */
446 fde_evt->common_footer->checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
447 cont = new Continuation();
448
449 // Give the handlers access to the applier THD
450 thd_conf_action = new Handler_THD_setup_action(applier_thd);
451 // To prevent overwrite last error method
452 applier_error += pipeline->handle_action(thd_conf_action);
453 delete thd_conf_action;
454
455 // applier main loop
456 while (!applier_error && !packet_application_error && !loop_termination) {
457 if (is_applier_thread_aborted()) break;
458
459 this->incoming->front(&packet); // blocking
460
461 switch (packet->get_packet_type()) {
462 case ACTION_PACKET_TYPE:
463 this->incoming->pop();
464 loop_termination = apply_action_packet((Action_packet *)packet);
465 break;
466 case VIEW_CHANGE_PACKET_TYPE:
467 packet_application_error = apply_view_change_packet(
468 (View_change_packet *)packet, fde_evt, cont);
469 this->incoming->pop();
470 break;
471 case DATA_PACKET_TYPE:
472 packet_application_error =
473 apply_data_packet((Data_packet *)packet, fde_evt, cont);
474 // Remove from queue here, so the size only decreases after packet
475 // handling
476 this->incoming->pop();
477 break;
478 case SINGLE_PRIMARY_PACKET_TYPE:
479 packet_application_error = apply_single_primary_action_packet(
480 (Single_primary_action_packet *)packet);
481 this->incoming->pop();
482 break;
483 case TRANSACTION_PREPARED_PACKET_TYPE:
484 packet_application_error = apply_transaction_prepared_action_packet(
485 static_cast<Transaction_prepared_action_packet *>(packet));
486 this->incoming->pop();
487 break;
488 case SYNC_BEFORE_EXECUTION_PACKET_TYPE:
489 packet_application_error = apply_sync_before_execution_action_packet(
490 static_cast<Sync_before_execution_action_packet *>(packet));
491 this->incoming->pop();
492 break;
493 case LEAVING_MEMBERS_PACKET_TYPE:
494 packet_application_error = apply_leaving_members_action_packet(
495 static_cast<Leaving_members_action_packet *>(packet));
496 this->incoming->pop();
497 break;
498 default:
499 DBUG_ASSERT(0); /* purecov: inspected */
500 }
501
502 delete packet;
503 }
504 if (packet_application_error) applier_error = packet_application_error;
505 delete fde_evt;
506 delete cont;
507
508 end:
509
510 // always remove the observer even the thread is no longer running
511 channel_observation_manager_list
512 ->get_channel_observation_manager(GROUP_CHANNEL_OBSERVATION_MANAGER_POS)
513 ->unregister_channel_observer(applier_channel_observer);
514
515 // only try to leave if the applier managed to start
516 if (applier_error && applier_thd_state.is_running()) {
517 const char *exit_state_action_abort_log_message =
518 "Fatal error during execution on the Applier module of Group "
519 "Replication.";
520 leave_group_on_failure::mask leave_actions;
521 /*
522 Only follow exit_state_action if we were already inside a group. We may
523 happen to come across an applier error during the startup of GR (i.e.
524 during the execution of the START GROUP_REPLICATION command). We must not
525 follow exit_state_action on that situation.
526 */
527 leave_actions.set(leave_group_on_failure::HANDLE_EXIT_STATE_ACTION,
528 gcs_module->belongs_to_group());
529 leave_group_on_failure::leave(
530 leave_actions, ER_GRP_RPL_APPLIER_EXECUTION_FATAL_ERROR,
531 PSESSION_USE_THREAD, nullptr, exit_state_action_abort_log_message);
532 }
533
534 // Even on error cases, send a stop signal to all handlers that could be
535 // active
536 Pipeline_action *stop_action = new Handler_stop_action();
537 int local_applier_error = pipeline->handle_action(stop_action);
538 delete stop_action;
539
540 Gcs_interface_factory::cleanup_thread_communication_resources(
541 Gcs_operations::get_gcs_engine());
542
543 LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_APPLIER_THD_KILLED);
544
545 DBUG_EXECUTE_IF("applier_thd_timeout", {
546 const char act[] = "now wait_for signal.applier_continue";
547 DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
548 });
549
550 stage_handler.end_stage();
551 stage_handler.terminate_stage_monitor();
552
553 clean_applier_thread_context();
554
555 mysql_mutex_lock(&run_lock);
556
557 /*
558 Don't overwrite applier_error when stop_applier_thread() doesn't return
559 error. So applier_error which is also referred by main thread
560 doesn't return true from initialize_applier_thread() when
561 start_applier_thread() fails and stop_applier_thread() succeeds.
562 Also use local var - local_applier_error, as the applier can be deleted
563 before the thread returns.
564 */
565 if (local_applier_error)
566 applier_error = local_applier_error; /* purecov: inspected */
567 else
568 local_applier_error = applier_error;
569
570 applier_killed_status = false;
571 delete applier_thd;
572 applier_thd_state.set_terminated();
573 mysql_cond_broadcast(&run_cond);
574 mysql_mutex_unlock(&run_lock);
575
576 my_thread_end();
577 applier_thread_is_exiting = true;
578 my_thread_exit(nullptr);
579
580 return local_applier_error; /* purecov: inspected */
581 }
582
initialize_applier_thread()583 int Applier_module::initialize_applier_thread() {
584 DBUG_TRACE;
585
586 // avoid concurrency calls against stop invocations
587 mysql_mutex_lock(&run_lock);
588
589 applier_thread_is_exiting = false;
590 applier_killed_status = false;
591 applier_error = 0;
592
593 applier_thd_state.set_created();
594 if ((mysql_thread_create(key_GR_THD_applier_module_receiver, &applier_pthd,
595 get_connection_attrib(), launch_handler_thread,
596 (void *)this))) {
597 applier_thd_state.set_terminated();
598 mysql_mutex_unlock(&run_lock); /* purecov: inspected */
599 return 1; /* purecov: inspected */
600 }
601
602 while (applier_thd_state.is_alive_not_running() && !applier_error) {
603 DBUG_PRINT("sleep", ("Waiting for applier thread to start"));
604 if (current_thd != nullptr && current_thd->is_killed()) {
605 applier_error = 1;
606 applier_killed_status = true;
607 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_UNBLOCK_WAITING_THD);
608 break;
609 }
610
611 struct timespec abstime;
612 set_timespec(&abstime, 1);
613
614 mysql_cond_timedwait(&run_cond, &run_lock, &abstime);
615 }
616
617 mysql_mutex_unlock(&run_lock);
618 return applier_error;
619 }
620
terminate_applier_pipeline()621 int Applier_module::terminate_applier_pipeline() {
622 int error = 0;
623 if (pipeline != nullptr) {
624 if ((error = pipeline->terminate_pipeline())) {
625 LogPluginErr(
626 WARNING_LEVEL,
627 ER_GRP_RPL_APPLIER_PIPELINE_NOT_DISPOSED); /* purecov: inspected */
628 }
629 // delete anyway, as we can't do much on error cases
630 delete pipeline;
631 pipeline = nullptr;
632 }
633 return error;
634 }
635
terminate_applier_thread()636 int Applier_module::terminate_applier_thread() {
637 DBUG_TRACE;
638
639 /* This lock code needs to be re-written from scratch*/
640 mysql_mutex_lock(&run_lock);
641
642 applier_aborted = true;
643
644 if (applier_thd_state.is_thread_dead()) {
645 goto delete_pipeline;
646 }
647
648 while (applier_thd_state.is_thread_alive()) {
649 DBUG_PRINT("loop", ("killing group replication applier thread"));
650
651 if (applier_thd_state.is_initialized()) {
652 mysql_mutex_lock(&applier_thd->LOCK_thd_data);
653
654 if (applier_killed_status)
655 applier_thd->awake(THD::KILL_CONNECTION);
656 else
657 applier_thd->awake(THD::NOT_KILLED);
658
659 mysql_mutex_unlock(&applier_thd->LOCK_thd_data);
660
661 // before waiting for termination, signal the queue to unlock.
662 add_termination_packet();
663
664 // also awake the applier in case it is suspended
665 awake_applier_module();
666 }
667
668 /*
669 There is a small chance that thread might miss the first
670 alarm. To protect against it, resend the signal until it reacts
671 */
672 struct timespec abstime;
673 set_timespec(&abstime, (stop_wait_timeout == 1 ? 1 : 2));
674 #ifndef DBUG_OFF
675 int error =
676 #endif
677 mysql_cond_timedwait(&run_cond, &run_lock, &abstime);
678
679 if (stop_wait_timeout >= 1) {
680 stop_wait_timeout = stop_wait_timeout - (stop_wait_timeout == 1 ? 1 : 2);
681 }
682 if (applier_thd_state.is_thread_alive() &&
683 stop_wait_timeout <= 0) // quit waiting
684 {
685 mysql_mutex_unlock(&run_lock);
686 return 1;
687 }
688 DBUG_ASSERT(error == ETIMEDOUT || error == 0);
689 }
690
691 DBUG_ASSERT(!applier_thd_state.is_running());
692
693 delete_pipeline:
694
695 // The thread ended properly so we can terminate the pipeline
696 terminate_applier_pipeline();
697
698 while (!applier_thread_is_exiting) {
699 /* Check if applier thread is exiting per microsecond. */
700 my_sleep(1);
701 }
702
703 /*
704 Give applier thread one microsecond to exit completely after
705 it set applier_thread_is_exiting to true.
706 */
707 my_sleep(1);
708
709 mysql_mutex_unlock(&run_lock);
710
711 return 0;
712 }
713
inform_of_applier_stop(char * channel_name,bool aborted)714 void Applier_module::inform_of_applier_stop(char *channel_name, bool aborted) {
715 DBUG_TRACE;
716
717 if (!strcmp(channel_name, applier_module_channel_name) && aborted &&
718 applier_thd_state.is_thread_alive()) {
719 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_APPLIER_THD_EXECUTION_ABORTED);
720
721 applier_error = 1;
722
723 // before waiting for termination, signal the queue to unlock.
724 add_termination_packet();
725
726 // also awake the applier in case it is suspended
727 awake_applier_module();
728 }
729 }
730
wait_for_applier_complete_suspension(bool * abort_flag,bool wait_for_execution)731 int Applier_module::wait_for_applier_complete_suspension(
732 bool *abort_flag, bool wait_for_execution) {
733 int error = 0;
734
735 mysql_mutex_lock(&suspend_lock);
736
737 /*
738 We use an external flag to avoid race conditions.
739 A local flag could always lead to the scenario of
740 wait_for_applier_complete_suspension()
741
742 >> thread switch
743
744 break_applier_suspension_wait()
745 we_are_waiting = false;
746 awake
747
748 thread switch <<
749
750 we_are_waiting = true;
751 wait();
752 */
753 while (!suspended && !(*abort_flag) && !applier_aborted && !applier_error) {
754 mysql_cond_wait(&suspension_waiting_condition, &suspend_lock);
755 }
756
757 mysql_mutex_unlock(&suspend_lock);
758
759 if (applier_aborted || applier_error)
760 return APPLIER_THREAD_ABORTED; /* purecov: inspected */
761
762 /**
763 Wait for the applier execution of pre suspension events (blocking method)
764 while(the wait method times out)
765 wait()
766 */
767 if (wait_for_execution) {
768 error = APPLIER_GTID_CHECK_TIMEOUT_ERROR; // timeout error
769 while (error == APPLIER_GTID_CHECK_TIMEOUT_ERROR && !(*abort_flag))
770 error = wait_for_applier_event_execution(1, true); // blocking
771 }
772
773 return (error == APPLIER_RELAY_LOG_NOT_INITED);
774 }
775
interrupt_applier_suspension_wait()776 void Applier_module::interrupt_applier_suspension_wait() {
777 mysql_mutex_lock(&suspend_lock);
778 mysql_cond_broadcast(&suspension_waiting_condition);
779 mysql_mutex_unlock(&suspend_lock);
780 }
781
is_applier_thread_waiting()782 bool Applier_module::is_applier_thread_waiting() {
783 DBUG_TRACE;
784 Event_handler *event_applier = nullptr;
785 Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
786
787 if (event_applier == nullptr) return false; /* purecov: inspected */
788
789 bool result = ((Applier_handler *)event_applier)->is_applier_thread_waiting();
790
791 return result;
792 }
793
wait_for_applier_event_execution(double timeout,bool check_and_purge_partial_transactions)794 int Applier_module::wait_for_applier_event_execution(
795 double timeout, bool check_and_purge_partial_transactions) {
796 DBUG_TRACE;
797 int error = 0;
798 Event_handler *event_applier = nullptr;
799 Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
800
801 if (event_applier && !(error = ((Applier_handler *)event_applier)
802 ->wait_for_gtid_execution(timeout))) {
803 /*
804 After applier thread is done, check if there is partial transaction
805 in the relay log. If so, applier thread must be holding the lock on it
806 and will never release it because there will not be any more events
807 coming into this channel. In this case, purge the relaylogs and restart
808 the applier thread will release the lock and update the applier thread
809 execution position correctly and safely.
810 */
811 if (check_and_purge_partial_transactions &&
812 ((Applier_handler *)event_applier)
813 ->is_partial_transaction_on_relay_log()) {
814 error = purge_applier_queue_and_restart_applier_module();
815 }
816 }
817 return error;
818 }
819
get_retrieved_gtid_set(std::string & retrieved_set)820 bool Applier_module::get_retrieved_gtid_set(std::string &retrieved_set) {
821 Replication_thread_api applier_channel(applier_module_channel_name);
822 if (applier_channel.get_retrieved_gtid_set(retrieved_set)) {
823 /* purecov: begin inspected */
824 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_ERROR_GTID_SET_EXTRACTION,
825 " cannot extract the applier module's retrieved set.");
826 return true;
827 /* purecov: end */
828 }
829 return false;
830 }
831
wait_for_applier_event_execution(std::string & retrieved_set,double timeout,bool update_THD_status)832 int Applier_module::wait_for_applier_event_execution(std::string &retrieved_set,
833 double timeout,
834 bool update_THD_status) {
835 DBUG_TRACE;
836 int error = 0;
837 Event_handler *event_applier = nullptr;
838 Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
839
840 if (event_applier) {
841 error = ((Applier_handler *)event_applier)
842 ->wait_for_gtid_execution(retrieved_set, timeout,
843 update_THD_status);
844 }
845
846 return error;
847 }
848
wait_for_current_events_execution(std::shared_ptr<Continuation> checkpoint_condition,bool * abort_flag,bool update_THD_status)849 bool Applier_module::wait_for_current_events_execution(
850 std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
851 bool update_THD_status) {
852 DBUG_TRACE;
853 applier_module->queue_and_wait_on_queue_checkpoint(checkpoint_condition);
854 std::string current_retrieve_set;
855 if (applier_module->get_retrieved_gtid_set(current_retrieve_set)) return true;
856
857 int error = 1;
858 while (!*abort_flag && error != 0) {
859 error = applier_module->wait_for_applier_event_execution(
860 current_retrieve_set, 1, update_THD_status);
861
862 /* purecov: begin inspected */
863 if (error == -2) { // error when waiting
864 return true;
865 }
866 /* purecov: end */
867 }
868 return false;
869 }
870
get_certification_handler()871 Certification_handler *Applier_module::get_certification_handler() {
872 Event_handler *event_applier = nullptr;
873 Event_handler::get_handler_by_role(pipeline, CERTIFIER, &event_applier);
874
875 // The only certification handler for now
876 return (Certification_handler *)event_applier;
877 }
878
intersect_group_executed_sets(std::vector<std::string> & gtid_sets,Gtid_set * output_set)879 int Applier_module::intersect_group_executed_sets(
880 std::vector<std::string> >id_sets, Gtid_set *output_set) {
881 Sid_map *sid_map = output_set->get_sid_map();
882
883 std::vector<std::string>::iterator set_iterator;
884 for (set_iterator = gtid_sets.begin(); set_iterator != gtid_sets.end();
885 set_iterator++) {
886 Gtid_set member_set(sid_map, nullptr);
887 Gtid_set intersection_result(sid_map, nullptr);
888
889 std::string exec_set_str = (*set_iterator);
890
891 if (member_set.add_gtid_text(exec_set_str.c_str()) != RETURN_STATUS_OK) {
892 return 1; /* purecov: inspected */
893 }
894
895 if (output_set->is_empty()) {
896 if (output_set->add_gtid_set(&member_set)) {
897 return 1; /* purecov: inspected */
898 }
899 } else {
900 /*
901 We have three sets:
902 member_set: the one sent from a given member;
903 output_set: the one that contains the intersection of
904 the computed sets until now;
905 intersection_result: the intersection between set and
906 intersection_result.
907 So we compute the intersection between member_set and output_set, and
908 set that value to output_set to be used on the next intersection.
909 */
910 if (member_set.intersection(output_set, &intersection_result) !=
911 RETURN_STATUS_OK) {
912 return 1; /* purecov: inspected */
913 }
914
915 output_set->clear();
916 if (output_set->add_gtid_set(&intersection_result) != RETURN_STATUS_OK) {
917 return 1; /* purecov: inspected */
918 }
919 }
920 }
921
922 #if !defined(DBUG_OFF)
923 char *executed_set_string;
924 output_set->to_string(&executed_set_string);
925 DBUG_PRINT("info", ("View change GTID information: output_set: %s",
926 executed_set_string));
927 my_free(executed_set_string);
928 #endif
929
930 return 0;
931 }
932
queue_certification_enabling_packet()933 void Applier_module::queue_certification_enabling_packet() {
934 incoming->push(new Single_primary_action_packet(
935 Single_primary_action_packet::NEW_PRIMARY));
936 }
937
queue_and_wait_on_queue_checkpoint(std::shared_ptr<Continuation> checkpoint_condition)938 bool Applier_module::queue_and_wait_on_queue_checkpoint(
939 std::shared_ptr<Continuation> checkpoint_condition) {
940 incoming->push(new Queue_checkpoint_packet(checkpoint_condition));
941 return checkpoint_condition->wait() != 0;
942 }
943
get_local_pipeline_stats()944 Pipeline_member_stats *Applier_module::get_local_pipeline_stats() {
945 // We need run_lock to get protection against STOP GR command.
946
947 MUTEX_LOCK(guard, &run_lock);
948 Pipeline_member_stats *stats = nullptr;
949 auto cert = applier_module->get_certification_handler();
950 auto cert_module = (cert ? cert->get_certifier() : nullptr);
951 if (cert_module) {
952 stats = new Pipeline_member_stats(
953 get_pipeline_stats_member_collector(), get_message_queue_size(),
954 cert_module->get_negative_certified(),
955 cert_module->get_certification_info_size());
956 {
957 char *committed_transactions_buf = nullptr;
958 size_t committed_transactions_buf_length = 0;
959 int outcome = cert_module->get_group_stable_transactions_set_string(
960 &committed_transactions_buf, &committed_transactions_buf_length);
961 if (!outcome && committed_transactions_buf_length > 0)
962 stats->set_transaction_committed_all_members(
963 committed_transactions_buf, committed_transactions_buf_length);
964 my_free(committed_transactions_buf);
965 }
966 {
967 std::string last_conflict_free_transaction;
968 cert_module->get_last_conflict_free_transaction(
969 &last_conflict_free_transaction);
970 stats->set_transaction_last_conflict_free(last_conflict_free_transaction);
971 }
972
973 } else {
974 stats = new Pipeline_member_stats(get_pipeline_stats_member_collector(),
975 get_message_queue_size(), 0, 0);
976 }
977 return stats;
978 }
979