1 /* Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include <list>
24 
25 #include <assert.h>
26 #include <errno.h>
27 #include <mysql/group_replication_priv.h>
28 #include <signal.h>
29 #include <time.h>
30 
31 #include <mutex_lock.h>
32 #include <mysql/components/services/log_builtins.h>
33 #include "my_byteorder.h"
34 #include "my_dbug.h"
35 #include "my_systime.h"
36 #include "plugin/group_replication/include/applier.h"
37 #include "plugin/group_replication/include/leave_group_on_failure.h"
38 #include "plugin/group_replication/include/plugin.h"
39 #include "plugin/group_replication/include/plugin_messages/single_primary_message.h"
40 #include "plugin/group_replication/include/plugin_server_include.h"
41 #include "plugin/group_replication/include/services/notification/notification.h"
42 #include "plugin/group_replication/libmysqlgcs/include/mysql/gcs/gcs_member_identifier.h"
43 #include "sql/protocol_classic.h"
44 
/*
  Name of the replication channel used by the Group Replication applier.
  Handed to Handler_applier_configuration_action and Replication_thread_api,
  and compared against in Applier_module::inform_of_applier_stop().
*/
char applier_module_channel_name[] = "group_replication_applier";
/*
  Reset to false before an applier thread is started and set to true by the
  applier thread as its very last step before my_thread_exit().
  Applier_module::terminate_applier_thread() spins on it to make sure the
  thread has left this module's code before returning.
  NOTE(review): plain bool written and read by different threads without a
  common lock; consider std::atomic<bool> if its declaration elsewhere allows.
*/
bool applier_thread_is_exiting = false;
47 
launch_handler_thread(void * arg)48 static void *launch_handler_thread(void *arg) {
49   Applier_module *handler = (Applier_module *)arg;
50   handler->applier_thread_handle();
51   return nullptr;
52 }
53 
Applier_module()54 Applier_module::Applier_module()
55     : applier_thd_state(),
56       applier_aborted(false),
57       applier_error(0),
58       suspended(false),
59       waiting_for_applier_suspension(false),
60       shared_stop_write_lock(nullptr),
61       incoming(nullptr),
62       pipeline(nullptr),
63       stop_wait_timeout(LONG_TIMEOUT),
64       applier_channel_observer(nullptr) {
65   mysql_mutex_init(key_GR_LOCK_applier_module_run, &run_lock,
66                    MY_MUTEX_INIT_FAST);
67   mysql_cond_init(key_GR_COND_applier_module_run, &run_cond);
68   mysql_mutex_init(key_GR_LOCK_applier_module_suspend, &suspend_lock,
69                    MY_MUTEX_INIT_FAST);
70   mysql_cond_init(key_GR_COND_applier_module_suspend, &suspend_cond);
71   mysql_cond_init(key_GR_COND_applier_module_wait,
72                   &suspension_waiting_condition);
73 }
74 
~Applier_module()75 Applier_module::~Applier_module() {
76   if (this->incoming) {
77     while (!this->incoming->empty()) {
78       Packet *packet = nullptr;
79       this->incoming->pop(&packet);
80       delete packet;
81     }
82     delete incoming;
83   }
84   delete applier_channel_observer;
85 
86   mysql_mutex_destroy(&run_lock);
87   mysql_cond_destroy(&run_cond);
88   mysql_mutex_destroy(&suspend_lock);
89   mysql_cond_destroy(&suspend_cond);
90   mysql_cond_destroy(&suspension_waiting_condition);
91 }
92 
setup_applier_module(Handler_pipeline_type pipeline_type,bool reset_logs,ulong stop_timeout,rpl_sidno group_sidno,ulonglong gtid_assignment_block_size,Shared_writelock * shared_stop_lock)93 int Applier_module::setup_applier_module(Handler_pipeline_type pipeline_type,
94                                          bool reset_logs, ulong stop_timeout,
95                                          rpl_sidno group_sidno,
96                                          ulonglong gtid_assignment_block_size,
97                                          Shared_writelock *shared_stop_lock) {
98   DBUG_TRACE;
99 
100   int error = 0;
101 
102   // create the receiver queue
103   this->incoming = new Synchronized_queue<Packet *>();
104 
105   stop_wait_timeout = stop_timeout;
106 
107   pipeline = nullptr;
108 
109   if ((error = get_pipeline(pipeline_type, &pipeline))) {
110     return error;
111   }
112 
113   reset_applier_logs = reset_logs;
114   group_replication_sidno = group_sidno;
115   this->gtid_assignment_block_size = gtid_assignment_block_size;
116 
117   shared_stop_write_lock = shared_stop_lock;
118 
119   return error;
120 }
121 
purge_applier_queue_and_restart_applier_module()122 int Applier_module::purge_applier_queue_and_restart_applier_module() {
123   DBUG_TRACE;
124   int error = 0;
125 
126   /*
127     Here we are stopping applier thread intentionally and we will be starting
128     the applier thread after purging the relay logs. So we should ignore any
129     errors during the stop (eg: error due to stopping the applier thread in the
130     middle of applying the group of events). Hence unregister the applier
131     channel observer temporarily till the required work is done.
132   */
133   channel_observation_manager_list
134       ->get_channel_observation_manager(GROUP_CHANNEL_OBSERVATION_MANAGER_POS)
135       ->unregister_channel_observer(applier_channel_observer);
136 
137   /* Stop the applier thread */
138   Pipeline_action *stop_action = new Handler_stop_action();
139   error = pipeline->handle_action(stop_action);
140   delete stop_action;
141   if (error) return error; /* purecov: inspected */
142 
143   /* Purge the relay logs and initialize the channel*/
144   Handler_applier_configuration_action *applier_conf_action =
145       new Handler_applier_configuration_action(
146           applier_module_channel_name, true, /* purge relay logs always*/
147           stop_wait_timeout, group_replication_sidno);
148 
149   error = pipeline->handle_action(applier_conf_action);
150   delete applier_conf_action;
151   if (error) return error; /* purecov: inspected */
152 
153   channel_observation_manager_list
154       ->get_channel_observation_manager(GROUP_CHANNEL_OBSERVATION_MANAGER_POS)
155       ->register_channel_observer(applier_channel_observer);
156 
157   /* Start the applier thread */
158   Pipeline_action *start_action = new Handler_start_action();
159   error = pipeline->handle_action(start_action);
160   delete start_action;
161 
162   return error;
163 }
164 
setup_pipeline_handlers()165 int Applier_module::setup_pipeline_handlers() {
166   DBUG_TRACE;
167 
168   int error = 0;
169 
170   // Configure the applier handler trough a configuration action
171   Handler_applier_configuration_action *applier_conf_action =
172       new Handler_applier_configuration_action(
173           applier_module_channel_name, reset_applier_logs, stop_wait_timeout,
174           group_replication_sidno);
175 
176   error = pipeline->handle_action(applier_conf_action);
177   delete applier_conf_action;
178   if (error) return error; /* purecov: inspected */
179 
180   Handler_certifier_configuration_action *cert_conf_action =
181       new Handler_certifier_configuration_action(group_replication_sidno,
182                                                  gtid_assignment_block_size);
183 
184   error = pipeline->handle_action(cert_conf_action);
185 
186   delete cert_conf_action;
187 
188   return error;
189 }
190 
/**
  Creates, initializes and registers the THD used by the applier thread.

  Called at the start of applier_thread_handle(), i.e. on the applier thread
  itself. The new THD is published in applier_thd. It is released by
  clean_applier_thread_context() and deleted at the end of
  applier_thread_handle().
*/
void Applier_module::set_applier_thread_context() {
  THD *thd = new THD;
  // Per-thread mysys initialization for the calling (applier) thread.
  my_thread_init();
  thd->set_new_thread_id();
  // Use the address of a local variable as this thread's stack marker.
  thd->thread_stack = (char *)&thd;
  // Bind the new THD to the calling thread.
  thd->store_globals();
  // Protocol is only initiated because of process list status
  thd->get_protocol_classic()->init_net(nullptr);
  /*
    We only set the thread type so the applier thread shows up
    in the process list.
  */
  thd->system_thread = SYSTEM_THREAD_SLAVE_IO;
  // Make the thread have a better description on process list
  thd->set_query(STRING_WITH_LEN("Group replication applier module"));
  thd->set_query_for_display(
      STRING_WITH_LEN("Group replication applier module"));

  // Needed to start replication threads
  thd->security_context()->skip_grants();

  global_thd_manager_add_thd(thd);

  // Debug-only sync point letting tests pause the applier during its init.
  DBUG_EXECUTE_IF("group_replication_applier_thread_init_wait", {
    const char act[] = "now wait_for signal.gr_applier_init_signal";
    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
  });

  /*
    Publish the THD. Once the thread state is set to initialized (right after
    this returns), terminate_applier_thread() may use it to awake the thread.
  */
  applier_thd = thd;
}
221 
clean_applier_thread_context()222 void Applier_module::clean_applier_thread_context() {
223   applier_thd->get_protocol_classic()->end_net();
224   applier_thd->release_resources();
225   global_thd_manager_remove_thd(applier_thd);
226 }
227 
inject_event_into_pipeline(Pipeline_event * pevent,Continuation * cont)228 int Applier_module::inject_event_into_pipeline(Pipeline_event *pevent,
229                                                Continuation *cont) {
230   int error = 0;
231   pipeline->handle_event(pevent, cont);
232 
233   if ((error = cont->wait()))
234     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_EVENT_HANDLING_ERROR, error);
235 
236   return error;
237 }
238 
apply_action_packet(Action_packet * action_packet)239 bool Applier_module::apply_action_packet(Action_packet *action_packet) {
240   enum_packet_action action = action_packet->packet_action;
241 
242   // packet used to break the queue blocking wait
243   if (action == TERMINATION_PACKET) {
244     return true;
245   }
246   // packet to signal the applier to suspend
247   if (action == SUSPENSION_PACKET) {
248     suspend_applier_module();
249     return false;
250   }
251 
252   if (action == CHECKPOINT_PACKET) {
253     Queue_checkpoint_packet *packet = (Queue_checkpoint_packet *)action_packet;
254     packet->signal_checkpoint_reached();
255     return false;
256   }
257 
258   return false; /* purecov: inspected */
259 }
260 
apply_view_change_packet(View_change_packet * view_change_packet,Format_description_log_event * fde_evt,Continuation * cont)261 int Applier_module::apply_view_change_packet(
262     View_change_packet *view_change_packet,
263     Format_description_log_event *fde_evt, Continuation *cont) {
264   int error = 0;
265 
266   Gtid_set *group_executed_set = nullptr;
267   Sid_map *sid_map = nullptr;
268   if (!view_change_packet->group_executed_set.empty()) {
269     sid_map = new Sid_map(nullptr);
270     group_executed_set = new Gtid_set(sid_map, nullptr);
271     if (intersect_group_executed_sets(view_change_packet->group_executed_set,
272                                       group_executed_set)) {
273       LogPluginErr(
274           WARNING_LEVEL,
275           ER_GRP_RPL_ERROR_GTID_EXECUTION_INFO); /* purecov: inspected */
276       delete sid_map;                            /* purecov: inspected */
277       delete group_executed_set;                 /* purecov: inspected */
278       group_executed_set = nullptr;              /* purecov: inspected */
279     }
280   }
281 
282   if (group_executed_set != nullptr) {
283     if (get_certification_handler()
284             ->get_certifier()
285             ->set_group_stable_transactions_set(group_executed_set)) {
286       LogPluginErr(WARNING_LEVEL,
287                    ER_GRP_RPL_CERTIFICATE_SIZE_ERROR); /* purecov: inspected */
288     }
289     delete sid_map;
290     delete group_executed_set;
291   }
292 
293   View_change_log_event *view_change_event =
294       new View_change_log_event(view_change_packet->view_id.c_str());
295 
296   Pipeline_event *pevent = new Pipeline_event(view_change_event, fde_evt);
297   pevent->mark_event(SINGLE_VIEW_EVENT);
298 
299   /*
300     If there are prepared consistent transactions waiting for the
301     prepare acknowledge, the View_change_log_event must be delayed
302     to after those transactions are committed, since they belong to
303     the previous view.
304   */
305   if (transaction_consistency_manager->has_local_prepared_transactions()) {
306     DBUG_PRINT("info", ("Delaying the log of the view '%s' to after local "
307                         "prepared transactions",
308                         view_change_packet->view_id.c_str()));
309     transaction_consistency_manager->schedule_view_change_event(pevent);
310     return error;
311   }
312 
313   error = inject_event_into_pipeline(pevent, cont);
314   if (!cont->is_transaction_discarded()) delete pevent;
315 
316   return error;
317 }
318 
apply_data_packet(Data_packet * data_packet,Format_description_log_event * fde_evt,Continuation * cont)319 int Applier_module::apply_data_packet(Data_packet *data_packet,
320                                       Format_description_log_event *fde_evt,
321                                       Continuation *cont) {
322   int error = 0;
323   uchar *payload = data_packet->payload;
324   uchar *payload_end = data_packet->payload + data_packet->len;
325 
326   DBUG_EXECUTE_IF("group_replication_before_apply_data_packet", {
327     const char act[] = "now wait_for continue_apply";
328     DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
329   });
330 
331   while ((payload != payload_end) && !error) {
332     uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);
333 
334     Data_packet *new_packet = new Data_packet(payload, event_len);
335     payload = payload + event_len;
336 
337     std::list<Gcs_member_identifier> *online_members = nullptr;
338     if (nullptr != data_packet->m_online_members) {
339       online_members =
340           new std::list<Gcs_member_identifier>(*data_packet->m_online_members);
341     }
342 
343     Pipeline_event *pevent =
344         new Pipeline_event(new_packet, fde_evt, UNDEFINED_EVENT_MODIFIER,
345                            data_packet->m_consistency_level, online_members);
346     error = inject_event_into_pipeline(pevent, cont);
347 
348     delete pevent;
349     DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
350       if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
351         error = 1;
352       }
353     });
354   }
355 
356   return error;
357 }
358 
apply_single_primary_action_packet(Single_primary_action_packet * packet)359 int Applier_module::apply_single_primary_action_packet(
360     Single_primary_action_packet *packet) {
361   int error = 0;
362   Certifier_interface *certifier = get_certification_handler()->get_certifier();
363 
364   switch (packet->action) {
365     case Single_primary_action_packet::NEW_PRIMARY:
366       certifier->enable_conflict_detection();
367       break;
368     case Single_primary_action_packet::QUEUE_APPLIED:
369       certifier->disable_conflict_detection();
370       break;
371     default:
372       DBUG_ASSERT(0); /* purecov: inspected */
373   }
374 
375   return error;
376 }
377 
apply_transaction_prepared_action_packet(Transaction_prepared_action_packet * packet)378 int Applier_module::apply_transaction_prepared_action_packet(
379     Transaction_prepared_action_packet *packet) {
380   return transaction_consistency_manager->handle_remote_prepare(
381       packet->get_sid(), packet->m_gno, packet->m_gcs_member_id);
382 }
383 
apply_sync_before_execution_action_packet(Sync_before_execution_action_packet * packet)384 int Applier_module::apply_sync_before_execution_action_packet(
385     Sync_before_execution_action_packet *packet) {
386   return transaction_consistency_manager->handle_sync_before_execution_message(
387       packet->m_thread_id, packet->m_gcs_member_id);
388 }
389 
apply_leaving_members_action_packet(Leaving_members_action_packet * packet)390 int Applier_module::apply_leaving_members_action_packet(
391     Leaving_members_action_packet *packet) {
392   return transaction_consistency_manager->handle_member_leave(
393       packet->m_leaving_members);
394 }
395 
applier_thread_handle()396 int Applier_module::applier_thread_handle() {
397   DBUG_TRACE;
398 
399   // set the thread context
400   set_applier_thread_context();
401   mysql_mutex_lock(&run_lock);
402   applier_thd_state.set_initialized();
403   mysql_mutex_unlock(&run_lock);
404 
405   Handler_THD_setup_action *thd_conf_action = nullptr;
406   Format_description_log_event *fde_evt = nullptr;
407   Continuation *cont = nullptr;
408   Packet *packet = nullptr;
409   bool loop_termination = false;
410   int packet_application_error = 0;
411 
412   applier_error = setup_pipeline_handlers();
413 
414   applier_channel_observer = new Applier_channel_state_observer();
415   channel_observation_manager_list
416       ->get_channel_observation_manager(GROUP_CHANNEL_OBSERVATION_MANAGER_POS)
417       ->register_channel_observer(applier_channel_observer);
418 
419   if (!applier_error) {
420     Pipeline_action *start_action = new Handler_start_action();
421     applier_error += pipeline->handle_action(start_action);
422     delete start_action;
423   }
424 
425   if (applier_error) {
426     goto end;
427   }
428 
429   mysql_mutex_lock(&run_lock);
430   applier_thread_is_exiting = false;
431   applier_thd_state.set_running();
432   if (stage_handler.initialize_stage_monitor())
433     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_NO_STAGE_SERVICE);
434   stage_handler.set_stage(info_GR_STAGE_module_executing.m_key, __FILE__,
435                           __LINE__, 0, 0);
436   mysql_cond_broadcast(&run_cond);
437   mysql_mutex_unlock(&run_lock);
438 
439   fde_evt = new Format_description_log_event();
440   /*
441     Group replication does not use binary log checksumming on contents arriving
442     from certification. So, group replication applier channel shall behave as a
443     replication channel connected to a master that does not add checksum to its
444     binary log files.
445   */
446   fde_evt->common_footer->checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
447   cont = new Continuation();
448 
449   // Give the handlers access to the applier THD
450   thd_conf_action = new Handler_THD_setup_action(applier_thd);
451   // To prevent overwrite last error method
452   applier_error += pipeline->handle_action(thd_conf_action);
453   delete thd_conf_action;
454 
455   // applier main loop
456   while (!applier_error && !packet_application_error && !loop_termination) {
457     if (is_applier_thread_aborted()) break;
458 
459     this->incoming->front(&packet);  // blocking
460 
461     switch (packet->get_packet_type()) {
462       case ACTION_PACKET_TYPE:
463         this->incoming->pop();
464         loop_termination = apply_action_packet((Action_packet *)packet);
465         break;
466       case VIEW_CHANGE_PACKET_TYPE:
467         packet_application_error = apply_view_change_packet(
468             (View_change_packet *)packet, fde_evt, cont);
469         this->incoming->pop();
470         break;
471       case DATA_PACKET_TYPE:
472         packet_application_error =
473             apply_data_packet((Data_packet *)packet, fde_evt, cont);
474         // Remove from queue here, so the size only decreases after packet
475         // handling
476         this->incoming->pop();
477         break;
478       case SINGLE_PRIMARY_PACKET_TYPE:
479         packet_application_error = apply_single_primary_action_packet(
480             (Single_primary_action_packet *)packet);
481         this->incoming->pop();
482         break;
483       case TRANSACTION_PREPARED_PACKET_TYPE:
484         packet_application_error = apply_transaction_prepared_action_packet(
485             static_cast<Transaction_prepared_action_packet *>(packet));
486         this->incoming->pop();
487         break;
488       case SYNC_BEFORE_EXECUTION_PACKET_TYPE:
489         packet_application_error = apply_sync_before_execution_action_packet(
490             static_cast<Sync_before_execution_action_packet *>(packet));
491         this->incoming->pop();
492         break;
493       case LEAVING_MEMBERS_PACKET_TYPE:
494         packet_application_error = apply_leaving_members_action_packet(
495             static_cast<Leaving_members_action_packet *>(packet));
496         this->incoming->pop();
497         break;
498       default:
499         DBUG_ASSERT(0); /* purecov: inspected */
500     }
501 
502     delete packet;
503   }
504   if (packet_application_error) applier_error = packet_application_error;
505   delete fde_evt;
506   delete cont;
507 
508 end:
509 
510   // always remove the observer even the thread is no longer running
511   channel_observation_manager_list
512       ->get_channel_observation_manager(GROUP_CHANNEL_OBSERVATION_MANAGER_POS)
513       ->unregister_channel_observer(applier_channel_observer);
514 
515   // only try to leave if the applier managed to start
516   if (applier_error && applier_thd_state.is_running()) {
517     const char *exit_state_action_abort_log_message =
518         "Fatal error during execution on the Applier module of Group "
519         "Replication.";
520     leave_group_on_failure::mask leave_actions;
521     /*
522       Only follow exit_state_action if we were already inside a group. We may
523       happen to come across an applier error during the startup of GR (i.e.
524       during the execution of the START GROUP_REPLICATION command). We must not
525       follow exit_state_action on that situation.
526     */
527     leave_actions.set(leave_group_on_failure::HANDLE_EXIT_STATE_ACTION,
528                       gcs_module->belongs_to_group());
529     leave_group_on_failure::leave(
530         leave_actions, ER_GRP_RPL_APPLIER_EXECUTION_FATAL_ERROR,
531         PSESSION_USE_THREAD, nullptr, exit_state_action_abort_log_message);
532   }
533 
534   // Even on error cases, send a stop signal to all handlers that could be
535   // active
536   Pipeline_action *stop_action = new Handler_stop_action();
537   int local_applier_error = pipeline->handle_action(stop_action);
538   delete stop_action;
539 
540   Gcs_interface_factory::cleanup_thread_communication_resources(
541       Gcs_operations::get_gcs_engine());
542 
543   LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_APPLIER_THD_KILLED);
544 
545   DBUG_EXECUTE_IF("applier_thd_timeout", {
546     const char act[] = "now wait_for signal.applier_continue";
547     DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
548   });
549 
550   stage_handler.end_stage();
551   stage_handler.terminate_stage_monitor();
552 
553   clean_applier_thread_context();
554 
555   mysql_mutex_lock(&run_lock);
556 
557   /*
558     Don't overwrite applier_error when stop_applier_thread() doesn't return
559     error. So applier_error which is also referred by main thread
560     doesn't return true from initialize_applier_thread() when
561     start_applier_thread() fails and stop_applier_thread() succeeds.
562     Also use local var - local_applier_error, as the applier can be deleted
563     before the thread returns.
564   */
565   if (local_applier_error)
566     applier_error = local_applier_error; /* purecov: inspected */
567   else
568     local_applier_error = applier_error;
569 
570   applier_killed_status = false;
571   delete applier_thd;
572   applier_thd_state.set_terminated();
573   mysql_cond_broadcast(&run_cond);
574   mysql_mutex_unlock(&run_lock);
575 
576   my_thread_end();
577   applier_thread_is_exiting = true;
578   my_thread_exit(nullptr);
579 
580   return local_applier_error; /* purecov: inspected */
581 }
582 
initialize_applier_thread()583 int Applier_module::initialize_applier_thread() {
584   DBUG_TRACE;
585 
586   // avoid concurrency calls against stop invocations
587   mysql_mutex_lock(&run_lock);
588 
589   applier_thread_is_exiting = false;
590   applier_killed_status = false;
591   applier_error = 0;
592 
593   applier_thd_state.set_created();
594   if ((mysql_thread_create(key_GR_THD_applier_module_receiver, &applier_pthd,
595                            get_connection_attrib(), launch_handler_thread,
596                            (void *)this))) {
597     applier_thd_state.set_terminated();
598     mysql_mutex_unlock(&run_lock); /* purecov: inspected */
599     return 1;                      /* purecov: inspected */
600   }
601 
602   while (applier_thd_state.is_alive_not_running() && !applier_error) {
603     DBUG_PRINT("sleep", ("Waiting for applier thread to start"));
604     if (current_thd != nullptr && current_thd->is_killed()) {
605       applier_error = 1;
606       applier_killed_status = true;
607       LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_UNBLOCK_WAITING_THD);
608       break;
609     }
610 
611     struct timespec abstime;
612     set_timespec(&abstime, 1);
613 
614     mysql_cond_timedwait(&run_cond, &run_lock, &abstime);
615   }
616 
617   mysql_mutex_unlock(&run_lock);
618   return applier_error;
619 }
620 
terminate_applier_pipeline()621 int Applier_module::terminate_applier_pipeline() {
622   int error = 0;
623   if (pipeline != nullptr) {
624     if ((error = pipeline->terminate_pipeline())) {
625       LogPluginErr(
626           WARNING_LEVEL,
627           ER_GRP_RPL_APPLIER_PIPELINE_NOT_DISPOSED); /* purecov: inspected */
628     }
629     // delete anyway, as we can't do much on error cases
630     delete pipeline;
631     pipeline = nullptr;
632   }
633   return error;
634 }
635 
/**
  Stops the applier thread (if alive) and disposes of the pipeline.

  While the thread is alive, each iteration does the following:
  - awakes the thread's THD (with KILL_CONNECTION if the starter was killed);
  - queues a termination packet and awakes a suspended applier;
  - waits up to 1 or 2 seconds on run_cond.

  Each iteration consumes the elapsed time from stop_wait_timeout (the member
  is modified by this call). Once the thread is gone, the pipeline is
  terminated. The function then spins until the thread has set
  applier_thread_is_exiting.

  @return 0 on success, 1 if the thread was still alive when
          stop_wait_timeout ran out (the pipeline is then left untouched)
*/
int Applier_module::terminate_applier_thread() {
  DBUG_TRACE;

  /* This lock code needs to be re-written from scratch*/
  mysql_mutex_lock(&run_lock);

  applier_aborted = true;

  if (applier_thd_state.is_thread_dead()) {
    goto delete_pipeline;
  }

  while (applier_thd_state.is_thread_alive()) {
    DBUG_PRINT("loop", ("killing group replication applier thread"));

    // applier_thd is only guaranteed to exist once the thread is initialized.
    if (applier_thd_state.is_initialized()) {
      mysql_mutex_lock(&applier_thd->LOCK_thd_data);

      if (applier_killed_status)
        applier_thd->awake(THD::KILL_CONNECTION);
      else
        applier_thd->awake(THD::NOT_KILLED);

      mysql_mutex_unlock(&applier_thd->LOCK_thd_data);

      // before waiting for termination, signal the queue to unlock.
      add_termination_packet();

      // also awake the applier in case it is suspended
      awake_applier_module();
    }

    /*
      There is a small chance that thread might miss the first
      alarm. To protect against it, resend the signal until it reacts
    */
    struct timespec abstime;
    set_timespec(&abstime, (stop_wait_timeout == 1 ? 1 : 2));
#ifndef DBUG_OFF
    int error =
#endif
        mysql_cond_timedwait(&run_cond, &run_lock, &abstime);

    // Consume the slice just waited; stop_wait_timeout is unsigned, so it is
    // decremented by 1 or 2 only while it can not underflow.
    if (stop_wait_timeout >= 1) {
      stop_wait_timeout = stop_wait_timeout - (stop_wait_timeout == 1 ? 1 : 2);
    }
    if (applier_thd_state.is_thread_alive() &&
        stop_wait_timeout <= 0)  // quit waiting
    {
      mysql_mutex_unlock(&run_lock);
      return 1;
    }
    DBUG_ASSERT(error == ETIMEDOUT || error == 0);
  }

  DBUG_ASSERT(!applier_thd_state.is_running());

delete_pipeline:

  // The thread ended properly so we can terminate the pipeline
  terminate_applier_pipeline();

  /*
    Wait (still holding run_lock) until the applier thread has executed its
    final statements, so that the module can be safely deleted afterwards.
    NOTE(review): this relies on applier_thread_is_exiting becoming true; if
    it is false and no applier thread exists to set it, this loop never ends.
    Confirm every path reaching here with a dead thread leaves it true.
  */
  while (!applier_thread_is_exiting) {
    /* Check if applier thread is exiting per microsecond. */
    my_sleep(1);
  }

  /*
    Give applier thread one microsecond to exit completely after
    it set applier_thread_is_exiting to true.
  */
  my_sleep(1);

  mysql_mutex_unlock(&run_lock);

  return 0;
}
713 
inform_of_applier_stop(char * channel_name,bool aborted)714 void Applier_module::inform_of_applier_stop(char *channel_name, bool aborted) {
715   DBUG_TRACE;
716 
717   if (!strcmp(channel_name, applier_module_channel_name) && aborted &&
718       applier_thd_state.is_thread_alive()) {
719     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_APPLIER_THD_EXECUTION_ABORTED);
720 
721     applier_error = 1;
722 
723     // before waiting for termination, signal the queue to unlock.
724     add_termination_packet();
725 
726     // also awake the applier in case it is suspended
727     awake_applier_module();
728   }
729 }
730 
wait_for_applier_complete_suspension(bool * abort_flag,bool wait_for_execution)731 int Applier_module::wait_for_applier_complete_suspension(
732     bool *abort_flag, bool wait_for_execution) {
733   int error = 0;
734 
735   mysql_mutex_lock(&suspend_lock);
736 
737   /*
738    We use an external flag to avoid race conditions.
739    A local flag could always lead to the scenario of
740      wait_for_applier_complete_suspension()
741 
742    >> thread switch
743 
744      break_applier_suspension_wait()
745        we_are_waiting = false;
746        awake
747 
748    thread switch <<
749 
750       we_are_waiting = true;
751       wait();
752   */
753   while (!suspended && !(*abort_flag) && !applier_aborted && !applier_error) {
754     mysql_cond_wait(&suspension_waiting_condition, &suspend_lock);
755   }
756 
757   mysql_mutex_unlock(&suspend_lock);
758 
759   if (applier_aborted || applier_error)
760     return APPLIER_THREAD_ABORTED; /* purecov: inspected */
761 
762   /**
763     Wait for the applier execution of pre suspension events (blocking method)
764     while(the wait method times out)
765       wait()
766   */
767   if (wait_for_execution) {
768     error = APPLIER_GTID_CHECK_TIMEOUT_ERROR;  // timeout error
769     while (error == APPLIER_GTID_CHECK_TIMEOUT_ERROR && !(*abort_flag))
770       error = wait_for_applier_event_execution(1, true);  // blocking
771   }
772 
773   return (error == APPLIER_RELAY_LOG_NOT_INITED);
774 }
775 
interrupt_applier_suspension_wait()776 void Applier_module::interrupt_applier_suspension_wait() {
777   mysql_mutex_lock(&suspend_lock);
778   mysql_cond_broadcast(&suspension_waiting_condition);
779   mysql_mutex_unlock(&suspend_lock);
780 }
781 
is_applier_thread_waiting()782 bool Applier_module::is_applier_thread_waiting() {
783   DBUG_TRACE;
784   Event_handler *event_applier = nullptr;
785   Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
786 
787   if (event_applier == nullptr) return false; /* purecov: inspected */
788 
789   bool result = ((Applier_handler *)event_applier)->is_applier_thread_waiting();
790 
791   return result;
792 }
793 
wait_for_applier_event_execution(double timeout,bool check_and_purge_partial_transactions)794 int Applier_module::wait_for_applier_event_execution(
795     double timeout, bool check_and_purge_partial_transactions) {
796   DBUG_TRACE;
797   int error = 0;
798   Event_handler *event_applier = nullptr;
799   Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
800 
801   if (event_applier && !(error = ((Applier_handler *)event_applier)
802                                      ->wait_for_gtid_execution(timeout))) {
803     /*
804       After applier thread is done, check if there is partial transaction
805       in the relay log. If so, applier thread must be holding the lock on it
806       and will never release it because there will not be any more events
807       coming into this channel. In this case, purge the relaylogs and restart
808       the applier thread will release the lock and update the applier thread
809       execution position correctly and safely.
810     */
811     if (check_and_purge_partial_transactions &&
812         ((Applier_handler *)event_applier)
813             ->is_partial_transaction_on_relay_log()) {
814       error = purge_applier_queue_and_restart_applier_module();
815     }
816   }
817   return error;
818 }
819 
get_retrieved_gtid_set(std::string & retrieved_set)820 bool Applier_module::get_retrieved_gtid_set(std::string &retrieved_set) {
821   Replication_thread_api applier_channel(applier_module_channel_name);
822   if (applier_channel.get_retrieved_gtid_set(retrieved_set)) {
823     /* purecov: begin inspected */
824     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_ERROR_GTID_SET_EXTRACTION,
825                  " cannot extract the applier module's retrieved set.");
826     return true;
827     /* purecov: end */
828   }
829   return false;
830 }
831 
wait_for_applier_event_execution(std::string & retrieved_set,double timeout,bool update_THD_status)832 int Applier_module::wait_for_applier_event_execution(std::string &retrieved_set,
833                                                      double timeout,
834                                                      bool update_THD_status) {
835   DBUG_TRACE;
836   int error = 0;
837   Event_handler *event_applier = nullptr;
838   Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
839 
840   if (event_applier) {
841     error = ((Applier_handler *)event_applier)
842                 ->wait_for_gtid_execution(retrieved_set, timeout,
843                                           update_THD_status);
844   }
845 
846   return error;
847 }
848 
wait_for_current_events_execution(std::shared_ptr<Continuation> checkpoint_condition,bool * abort_flag,bool update_THD_status)849 bool Applier_module::wait_for_current_events_execution(
850     std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
851     bool update_THD_status) {
852   DBUG_TRACE;
853   applier_module->queue_and_wait_on_queue_checkpoint(checkpoint_condition);
854   std::string current_retrieve_set;
855   if (applier_module->get_retrieved_gtid_set(current_retrieve_set)) return true;
856 
857   int error = 1;
858   while (!*abort_flag && error != 0) {
859     error = applier_module->wait_for_applier_event_execution(
860         current_retrieve_set, 1, update_THD_status);
861 
862     /* purecov: begin inspected */
863     if (error == -2) {  // error when waiting
864       return true;
865     }
866     /* purecov: end */
867   }
868   return false;
869 }
870 
get_certification_handler()871 Certification_handler *Applier_module::get_certification_handler() {
872   Event_handler *event_applier = nullptr;
873   Event_handler::get_handler_by_role(pipeline, CERTIFIER, &event_applier);
874 
875   // The only certification handler for now
876   return (Certification_handler *)event_applier;
877 }
878 
intersect_group_executed_sets(std::vector<std::string> & gtid_sets,Gtid_set * output_set)879 int Applier_module::intersect_group_executed_sets(
880     std::vector<std::string> &gtid_sets, Gtid_set *output_set) {
881   Sid_map *sid_map = output_set->get_sid_map();
882 
883   std::vector<std::string>::iterator set_iterator;
884   for (set_iterator = gtid_sets.begin(); set_iterator != gtid_sets.end();
885        set_iterator++) {
886     Gtid_set member_set(sid_map, nullptr);
887     Gtid_set intersection_result(sid_map, nullptr);
888 
889     std::string exec_set_str = (*set_iterator);
890 
891     if (member_set.add_gtid_text(exec_set_str.c_str()) != RETURN_STATUS_OK) {
892       return 1; /* purecov: inspected */
893     }
894 
895     if (output_set->is_empty()) {
896       if (output_set->add_gtid_set(&member_set)) {
897         return 1; /* purecov: inspected */
898       }
899     } else {
900       /*
901         We have three sets:
902           member_set:          the one sent from a given member;
903           output_set:        the one that contains the intersection of
904                                the computed sets until now;
905           intersection_result: the intersection between set and
906                                intersection_result.
907         So we compute the intersection between member_set and output_set, and
908         set that value to output_set to be used on the next intersection.
909       */
910       if (member_set.intersection(output_set, &intersection_result) !=
911           RETURN_STATUS_OK) {
912         return 1; /* purecov: inspected */
913       }
914 
915       output_set->clear();
916       if (output_set->add_gtid_set(&intersection_result) != RETURN_STATUS_OK) {
917         return 1; /* purecov: inspected */
918       }
919     }
920   }
921 
922 #if !defined(DBUG_OFF)
923   char *executed_set_string;
924   output_set->to_string(&executed_set_string);
925   DBUG_PRINT("info", ("View change GTID information: output_set: %s",
926                       executed_set_string));
927   my_free(executed_set_string);
928 #endif
929 
930   return 0;
931 }
932 
queue_certification_enabling_packet()933 void Applier_module::queue_certification_enabling_packet() {
934   incoming->push(new Single_primary_action_packet(
935       Single_primary_action_packet::NEW_PRIMARY));
936 }
937 
queue_and_wait_on_queue_checkpoint(std::shared_ptr<Continuation> checkpoint_condition)938 bool Applier_module::queue_and_wait_on_queue_checkpoint(
939     std::shared_ptr<Continuation> checkpoint_condition) {
940   incoming->push(new Queue_checkpoint_packet(checkpoint_condition));
941   return checkpoint_condition->wait() != 0;
942 }
943 
get_local_pipeline_stats()944 Pipeline_member_stats *Applier_module::get_local_pipeline_stats() {
945   // We need run_lock to get protection against STOP GR command.
946 
947   MUTEX_LOCK(guard, &run_lock);
948   Pipeline_member_stats *stats = nullptr;
949   auto cert = applier_module->get_certification_handler();
950   auto cert_module = (cert ? cert->get_certifier() : nullptr);
951   if (cert_module) {
952     stats = new Pipeline_member_stats(
953         get_pipeline_stats_member_collector(), get_message_queue_size(),
954         cert_module->get_negative_certified(),
955         cert_module->get_certification_info_size());
956     {
957       char *committed_transactions_buf = nullptr;
958       size_t committed_transactions_buf_length = 0;
959       int outcome = cert_module->get_group_stable_transactions_set_string(
960           &committed_transactions_buf, &committed_transactions_buf_length);
961       if (!outcome && committed_transactions_buf_length > 0)
962         stats->set_transaction_committed_all_members(
963             committed_transactions_buf, committed_transactions_buf_length);
964       my_free(committed_transactions_buf);
965     }
966     {
967       std::string last_conflict_free_transaction;
968       cert_module->get_last_conflict_free_transaction(
969           &last_conflict_free_transaction);
970       stats->set_transaction_last_conflict_free(last_conflict_free_transaction);
971     }
972 
973   } else {
974     stats = new Pipeline_member_stats(get_pipeline_stats_member_collector(),
975                                       get_message_queue_size(), 0, 0);
976   }
977   return stats;
978 }
979