1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include <signal.h>
24 #include "applier.h"
25 #include <mysql/group_replication_priv.h>
26 #include "plugin_log.h"
27 #include "plugin.h"
28 #include "single_primary_message.h"
29 
/*
  Name of the replication channel driven by the group replication applier.
  It is passed to the applier configuration actions and compared against the
  channel name reported in inform_of_applier_stop().
*/
char applier_module_channel_name[] = "group_replication_applier";
/*
  Reset to false (under run_lock) once the applier thread has started its
  pipeline, and set to true as the very last step of applier_thread_handle().
  terminate_applier_thread() polls it to know the thread is about to exit.
  NOTE(review): the final write happens without holding run_lock while
  another thread polls the flag; consider an atomic type if the declaration
  used by other translation units can be changed accordingly.
*/
bool applier_thread_is_exiting= false;
32 
launch_handler_thread(void * arg)33 static void *launch_handler_thread(void* arg)
34 {
35   Applier_module *handler= (Applier_module*) arg;
36   handler->applier_thread_handle();
37   return 0;
38 }
39 
Applier_module()40 Applier_module::Applier_module()
41   :applier_running(false), applier_aborted(false), applier_error(0),
42    suspended(false), waiting_for_applier_suspension(false),
43    shared_stop_write_lock(NULL), incoming(NULL), pipeline(NULL),
44    fde_evt(BINLOG_VERSION), stop_wait_timeout(LONG_TIMEOUT),
45    applier_channel_observer(NULL)
46 {
47   mysql_mutex_init(key_GR_LOCK_applier_module_run, &run_lock, MY_MUTEX_INIT_FAST);
48   mysql_cond_init(key_GR_COND_applier_module_run, &run_cond);
49   mysql_mutex_init(key_GR_LOCK_applier_module_suspend, &suspend_lock, MY_MUTEX_INIT_FAST);
50   mysql_cond_init(key_GR_COND_applier_module_suspend, &suspend_cond);
51   mysql_cond_init(key_GR_COND_applier_module_wait, &suspension_waiting_condition);
52 }
53 
~Applier_module()54 Applier_module::~Applier_module(){
55   if (this->incoming)
56   {
57     while (!this->incoming->empty())
58     {
59       Packet *packet= NULL;
60       this->incoming->pop(&packet);
61       delete packet;
62     }
63     delete incoming;
64   }
65   delete applier_channel_observer;
66 
67   mysql_mutex_destroy(&run_lock);
68   mysql_cond_destroy(&run_cond);
69   mysql_mutex_destroy(&suspend_lock);
70   mysql_cond_destroy(&suspend_cond);
71   mysql_cond_destroy(&suspension_waiting_condition);
72 }
73 
74 int
setup_applier_module(Handler_pipeline_type pipeline_type,bool reset_logs,ulong stop_timeout,rpl_sidno group_sidno,ulonglong gtid_assignment_block_size,Shared_writelock * shared_stop_lock)75 Applier_module::setup_applier_module(Handler_pipeline_type pipeline_type,
76                                      bool reset_logs,
77                                      ulong stop_timeout,
78                                      rpl_sidno group_sidno,
79                                      ulonglong gtid_assignment_block_size,
80                                      Shared_writelock *shared_stop_lock)
81 {
82   DBUG_ENTER("Applier_module::setup_applier_module");
83 
84   int error= 0;
85 
86   //create the receiver queue
87   this->incoming= new Synchronized_queue<Packet*>();
88 
89   stop_wait_timeout= stop_timeout;
90 
91   pipeline= NULL;
92 
93   if ( (error= get_pipeline(pipeline_type, &pipeline)) )
94   {
95     DBUG_RETURN(error);
96   }
97 
98   reset_applier_logs= reset_logs;
99   group_replication_sidno= group_sidno;
100   this->gtid_assignment_block_size= gtid_assignment_block_size;
101 
102   shared_stop_write_lock= shared_stop_lock;
103 
104   DBUG_RETURN(error);
105 }
106 
107 
108 int
purge_applier_queue_and_restart_applier_module()109 Applier_module::purge_applier_queue_and_restart_applier_module()
110 {
111   DBUG_ENTER("Applier_module::purge_applier_queue_and_restart_applier_module");
112   int error= 0;
113 
114   /*
115     Here we are stopping applier thread intentionally and we will be starting
116     the applier thread after purging the relay logs. So we should ignore any
117     errors during the stop (eg: error due to stopping the applier thread in the
118     middle of applying the group of events). Hence unregister the applier channel
119     observer temporarily till the required work is done.
120   */
121   channel_observation_manager->unregister_channel_observer(applier_channel_observer);
122 
123   /* Stop the applier thread */
124   Pipeline_action *stop_action= new Handler_stop_action();
125   error= pipeline->handle_action(stop_action);
126   delete stop_action;
127   if (error)
128     DBUG_RETURN(error); /* purecov: inspected */
129 
130   /* Purge the relay logs and initialize the channel*/
131   Handler_applier_configuration_action *applier_conf_action=
132     new Handler_applier_configuration_action(applier_module_channel_name,
133                                              true, /* purge relay logs always*/
134                                              stop_wait_timeout,
135                                              group_replication_sidno);
136 
137   error= pipeline->handle_action(applier_conf_action);
138   delete applier_conf_action;
139   if (error)
140     DBUG_RETURN(error); /* purecov: inspected */
141 
142   channel_observation_manager->register_channel_observer(applier_channel_observer);
143 
144   /* Start the applier thread */
145   Pipeline_action *start_action = new Handler_start_action();
146   error= pipeline->handle_action(start_action);
147   delete start_action;
148 
149   DBUG_RETURN(error);
150 }
151 
152 
153 int
setup_pipeline_handlers()154 Applier_module::setup_pipeline_handlers()
155 {
156   DBUG_ENTER("Applier_module::setup_pipeline_handlers");
157 
158   int error= 0;
159 
160   //Configure the applier handler trough a configuration action
161   Handler_applier_configuration_action *applier_conf_action=
162     new Handler_applier_configuration_action(applier_module_channel_name,
163                                              reset_applier_logs,
164                                              stop_wait_timeout,
165                                              group_replication_sidno);
166 
167   error= pipeline->handle_action(applier_conf_action);
168   delete applier_conf_action;
169   if (error)
170     DBUG_RETURN(error); /* purecov: inspected */
171 
172   Handler_certifier_configuration_action *cert_conf_action=
173     new Handler_certifier_configuration_action(group_replication_sidno,
174                                                gtid_assignment_block_size);
175 
176   error = pipeline->handle_action(cert_conf_action);
177 
178   delete cert_conf_action;
179 
180   DBUG_RETURN(error);
181 }
182 
/*
  Creates, configures and registers the THD used by the applier thread, and
  stores it in applier_thd.

  It is called at the start of applier_thread_handle(), so it runs on the
  applier thread itself: my_thread_init() and store_globals() act on the
  calling thread. The statement order matters and mirrors the usual server
  thread bootstrap.
*/
void
Applier_module::set_applier_thread_context()
{
  my_thread_init();
  THD *thd= new THD;
  thd->set_new_thread_id();
  // Stack reference for this thread: the address of a local variable.
  thd->thread_stack= (char*) &thd;
  thd->store_globals();

  thd->get_protocol_classic()->init_net(0);
  thd->slave_thread= true;
  //TODO: See if the creation of a new type is desirable.
  thd->system_thread= SYSTEM_THREAD_SLAVE_IO;
  thd->security_context()->skip_grants();

  global_thd_manager_add_thd(thd);

  thd->init_for_queries();
  set_slave_thread_options(thd);
#ifndef _WIN32
  THD_STAGE_INFO(thd, stage_executing);
#endif
  applier_thd= thd;
}
207 
208 void
clean_applier_thread_context()209 Applier_module::clean_applier_thread_context()
210 {
211   applier_thd->get_protocol_classic()->end_net();
212   applier_thd->release_resources();
213   THD_CHECK_SENTRY(applier_thd);
214   global_thd_manager_remove_thd(applier_thd);
215 }
216 
217 int
inject_event_into_pipeline(Pipeline_event * pevent,Continuation * cont)218 Applier_module::inject_event_into_pipeline(Pipeline_event* pevent,
219                                            Continuation* cont)
220 {
221   int error= 0;
222   pipeline->handle_event(pevent, cont);
223 
224   if ((error= cont->wait()))
225     log_message(MY_ERROR_LEVEL, "Error at event handling! Got error: %d", error);
226 
227   return error;
228 }
229 
230 
231 
apply_action_packet(Action_packet * action_packet)232 bool Applier_module::apply_action_packet(Action_packet *action_packet)
233 {
234   enum_packet_action action= action_packet->packet_action;
235 
236   //packet used to break the queue blocking wait
237   if (action == TERMINATION_PACKET)
238   {
239      return true;
240   }
241   //packet to signal the applier to suspend
242   if (action == SUSPENSION_PACKET)
243   {
244     suspend_applier_module();
245     return false;
246   }
247   return false; /* purecov: inspected */
248 }
249 
250 int
apply_view_change_packet(View_change_packet * view_change_packet,Format_description_log_event * fde_evt,IO_CACHE * cache,Continuation * cont)251 Applier_module::apply_view_change_packet(View_change_packet *view_change_packet,
252                                          Format_description_log_event *fde_evt,
253                                          IO_CACHE *cache,
254                                          Continuation *cont)
255 {
256   int error= 0;
257 
258   Gtid_set *group_executed_set= NULL;
259   Sid_map *sid_map= NULL;
260   if (!view_change_packet->group_executed_set.empty())
261   {
262     sid_map= new Sid_map(NULL);
263     group_executed_set= new Gtid_set(sid_map, NULL);
264     if (intersect_group_executed_sets(view_change_packet->group_executed_set,
265                                       group_executed_set))
266     {
267        log_message(MY_WARNING_LEVEL,
268                    "Error when extracting group GTID execution information, "
269                    "some recovery operations may face future issues"); /* purecov: inspected */
270        delete sid_map;            /* purecov: inspected */
271        delete group_executed_set; /* purecov: inspected */
272        group_executed_set= NULL;  /* purecov: inspected */
273     }
274   }
275 
276   if (group_executed_set != NULL)
277   {
278     if (get_certification_handler()->get_certifier()->
279         set_group_stable_transactions_set(group_executed_set))
280     {
281       log_message(MY_WARNING_LEVEL,
282                   "An error happened when trying to reduce the Certification "
283                   " information size for transmission"); /* purecov: inspected */
284     }
285     delete sid_map;
286     delete group_executed_set;
287   }
288 
289   View_change_log_event* view_change_event
290       = new View_change_log_event((char*)view_change_packet->view_id.c_str());
291 
292   Pipeline_event* pevent= new Pipeline_event(view_change_event, fde_evt, cache);
293   pevent->mark_event(SINGLE_VIEW_EVENT);
294   error= inject_event_into_pipeline(pevent, cont);
295   //When discarded, the VCLE logging was delayed, so don't delete it
296   if (!cont->is_transaction_discarded())
297     delete pevent;
298 
299   return error;
300 }
301 
apply_data_packet(Data_packet * data_packet,Format_description_log_event * fde_evt,IO_CACHE * cache,Continuation * cont)302 int Applier_module::apply_data_packet(Data_packet *data_packet,
303                                       Format_description_log_event *fde_evt,
304                                       IO_CACHE *cache,
305                                       Continuation *cont)
306 {
307   int error= 0;
308   uchar* payload= data_packet->payload;
309   uchar* payload_end= data_packet->payload + data_packet->len;
310 
311   DBUG_EXECUTE_IF("group_replication_before_apply_data_packet", {
312     const char act[] = "now wait_for continue_apply";
313     assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
314   });
315 
316   if (check_single_primary_queue_status())
317     return 1; /* purecov: inspected */
318 
319   while ((payload != payload_end) && !error)
320   {
321     uint event_len= uint4korr(((uchar*)payload) + EVENT_LEN_OFFSET);
322 
323     Data_packet* new_packet= new Data_packet(payload, event_len);
324     payload= payload + event_len;
325 
326     Pipeline_event* pevent= new Pipeline_event(new_packet, fde_evt, cache);
327     error= inject_event_into_pipeline(pevent, cont);
328 
329     delete pevent;
330     DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event",
331                     {
332                       if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT)
333                       {
334                         error=1;
335                       }
336                     }
337                    );
338   }
339 
340   return error;
341 }
342 
343 int
apply_single_primary_action_packet(Single_primary_action_packet * packet)344 Applier_module::apply_single_primary_action_packet(Single_primary_action_packet *packet)
345 {
346   int error= 0;
347   Certifier_interface *certifier= get_certification_handler()->get_certifier();
348 
349   switch (packet->action)
350   {
351     case Single_primary_action_packet::NEW_PRIMARY:
352       certifier->enable_conflict_detection();
353       break;
354     case Single_primary_action_packet::QUEUE_APPLIED:
355       certifier->disable_conflict_detection();
356       break;
357     default:
358       assert(0); /* purecov: inspected */
359   }
360 
361   return error;
362 }
363 
364 
/*
  Body of the applier thread, entered through launch_handler_thread().

  Outline:
    1. Set up the THD and an IO_CACHE shared by the events injected into
       the pipeline.
    2. Configure the pipeline handlers, register the applier channel
       observer and start the applier handler.
    3. Publish applier_running= true under run_lock and broadcast run_cond,
       which releases initialize_applier_thread().
    4. Main loop: take packets from the incoming queue until an error, a
       termination packet or an abort request.
    5. Cleanup:
       - unregister the observer;
       - leave the group if an error happened after step 3;
       - stop the handlers and release the cache and THD;
       - publish applier_running= false under run_lock;
       - exit the thread through my_thread_exit().

  Because the thread ends in my_thread_exit(), the final DBUG_RETURN is
  presumably not reached in practice.
*/
int
Applier_module::applier_thread_handle()
{
  DBUG_ENTER("ApplierModule::applier_thread_handle()");

  //set the thread context
  set_applier_thread_context();

  Handler_THD_setup_action *thd_conf_action= NULL;
  // Local description event; it shadows the fde_evt member set up in the constructor.
  Format_description_log_event* fde_evt= NULL;
  Continuation* cont= NULL;
  Packet *packet= NULL;
  bool loop_termination = false;
  int packet_application_error= 0;

  // Zero-filled allocation, so my_b_inited() is presumably false and the file is opened.
  IO_CACHE *cache= (IO_CACHE*) my_malloc(PSI_NOT_INSTRUMENTED,
                                         sizeof(IO_CACHE),
                                         MYF(MY_ZEROFILL));
  if (!cache || (!my_b_inited(cache) &&
                 open_cached_file(cache, mysql_tmpdir,
                                  "group_replication_pipeline_applier_cache",
                                  SHARED_EVENT_IO_CACHE_SIZE,
                                  MYF(MY_WME))))
  {
    my_free(cache);   /* purecov: inspected */
    cache= NULL;      /* purecov: inspected */
    log_message(MY_ERROR_LEVEL,
                "Failed to create group replication pipeline applier cache!"); /* purecov: inspected */
    applier_error= 1; /* purecov: inspected */
    goto end;         /* purecov: inspected */
  }

  applier_error= setup_pipeline_handlers();

  // Registered even if setup failed; it is always unregistered at 'end'.
  applier_channel_observer= new Applier_channel_state_observer();
  channel_observation_manager
      ->register_channel_observer(applier_channel_observer);

  if (!applier_error)
  {
    Pipeline_action *start_action = new Handler_start_action();
    applier_error= pipeline->handle_action(start_action);
    delete start_action;
  }

  if (applier_error)
  {
    goto end;
  }

  // Announce the start; initialize_applier_thread() waits on run_cond.
  mysql_mutex_lock(&run_lock);
  applier_thread_is_exiting= false;
  applier_running= true;
  mysql_cond_broadcast(&run_cond);
  mysql_mutex_unlock(&run_lock);

  fde_evt= new Format_description_log_event(BINLOG_VERSION);
  cont= new Continuation();

  //Give the handlers access to the applier THD
  thd_conf_action= new Handler_THD_setup_action(applier_thd);
  // To prevent overwrite last error method
  applier_error+= pipeline->handle_action(thd_conf_action);
  delete thd_conf_action;

  //applier main loop
  while (!applier_error && !packet_application_error && !loop_termination)
  {
    if (is_applier_thread_aborted())
      break;

    // Peek only: the packet stays queued until the matching pop() below.
    this->incoming->front(&packet); // blocking

    switch (packet->get_packet_type())
    {
      case ACTION_PACKET_TYPE:
          this->incoming->pop();
          loop_termination= apply_action_packet((Action_packet*)packet);
          break;
      case VIEW_CHANGE_PACKET_TYPE:
          packet_application_error=
            apply_view_change_packet((View_change_packet*)packet,
                                     fde_evt, cache, cont);
          this->incoming->pop();
          break;
      case DATA_PACKET_TYPE:
          packet_application_error= apply_data_packet((Data_packet*)packet,
                                                      fde_evt, cache, cont);
          //Remove from queue here, so the size only decreases after packet handling
          this->incoming->pop();
          break;
      case SINGLE_PRIMARY_PACKET_TYPE:
          packet_application_error=
              apply_single_primary_action_packet((Single_primary_action_packet*)packet);
          this->incoming->pop();
          break;
      default:
        /*
          NOTE(review): when assert() is compiled out, an unknown packet is
          deleted below without being popped, so the next front() would
          return the freed pointer.
        */
        assert(0); /* purecov: inspected */
    }

    delete packet;
  }
  if (packet_application_error)
    applier_error= packet_application_error;
  delete fde_evt;
  delete cont;

end:

  //always remove the observer even the thread is no longer running
  channel_observation_manager
      ->unregister_channel_observer(applier_channel_observer);

  //only try to leave if the applier managed to start
  if (applier_error && applier_running)
    leave_group_on_failure();

  //Even on error cases, send a stop signal to all handlers that could be active
  Pipeline_action *stop_action= new Handler_stop_action();
  int local_applier_error= pipeline->handle_action(stop_action);
  delete stop_action;

  Gcs_interface_factory::cleanup(Gcs_operations::get_gcs_engine());

  log_message(MY_INFORMATION_LEVEL, "The group replication applier thread"
                                    " was killed");

  DBUG_EXECUTE_IF("applier_thd_timeout",
                  {
                    const char act[]= "now wait_for signal.applier_continue";
                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });

  if (cache != NULL)
  {
    close_cached_file(cache);
    my_free(cache);
  }

  clean_applier_thread_context();

  /*
    Deleted under run_lock: terminate_applier_thread() dereferences
    applier_thd only while it observes applier_running under this lock.
  */
  mysql_mutex_lock(&run_lock);
  delete applier_thd;

  /*
    Don't overwrite applier_error when stop_applier_thread() doesn't return
    error. So applier_error which is also referred by main thread
    doesn't return true from initialize_applier_thread() when
    start_applier_thread() fails and stop_applier_thread() succeeds.
    Also use local var - local_applier_error, as the applier can be deleted
    before the thread returns.
  */
  if (local_applier_error)
    applier_error= local_applier_error; /* purecov: inspected */
  else
    local_applier_error= applier_error;

  applier_running= false;
  mysql_cond_broadcast(&run_cond);
  mysql_mutex_unlock(&run_lock);

  my_thread_end();
  // NOTE(review): written without run_lock while terminate_applier_thread() polls it.
  applier_thread_is_exiting= true;
  my_thread_exit(0);

  DBUG_RETURN(local_applier_error); /* purecov: inspected */
}
532 
533 int
initialize_applier_thread()534 Applier_module::initialize_applier_thread()
535 {
536   DBUG_ENTER("Applier_module::initialize_applier_thd");
537 
538   //avoid concurrency calls against stop invocations
539   mysql_mutex_lock(&run_lock);
540 
541   applier_error= 0;
542 
543   if ((mysql_thread_create(key_GR_THD_applier_module_receiver,
544                            &applier_pthd,
545                            get_connection_attrib(),
546                            launch_handler_thread,
547                            (void*)this)))
548   {
549     mysql_mutex_unlock(&run_lock); /* purecov: inspected */
550     DBUG_RETURN(1);                /* purecov: inspected */
551   }
552 
553   while (!applier_running && !applier_error)
554   {
555     DBUG_PRINT("sleep",("Waiting for applier thread to start"));
556     mysql_cond_wait(&run_cond, &run_lock);
557   }
558 
559   mysql_mutex_unlock(&run_lock);
560   DBUG_RETURN(applier_error);
561 }
562 
563 int
terminate_applier_pipeline()564 Applier_module::terminate_applier_pipeline()
565 {
566   int error= 0;
567   if (pipeline != NULL)
568   {
569     if ((error= pipeline->terminate_pipeline()))
570     {
571       log_message(MY_WARNING_LEVEL,
572                   "The group replication applier pipeline was not properly"
573                   " disposed. Check the error log for further info."); /* purecov: inspected */
574     }
575     //delete anyway, as we can't do much on error cases
576     delete pipeline;
577     pipeline= NULL;
578   }
579   return error;
580 }
581 
/*
  Stops the applier thread, then terminates the handler pipeline.

  Sets applier_aborted and, while the thread still reports applier_running,
  repeats one round:
    - awake the applier THD;
    - queue a termination packet;
    - wake a suspended applier;
    - wait up to 2 seconds on run_cond.
  Each round subtracts 2 from the stop_wait_timeout member itself. Once less
  than 2 remains and the thread is still running, the function gives up.

  @return 0 once the thread stopped and the pipeline was terminated; 1 on
          timeout, in which case the pipeline is left untouched.
*/
int
Applier_module::terminate_applier_thread()
{
  DBUG_ENTER("Applier_module::terminate_applier_thread");

  /* This lock code needs to be re-written from scratch*/
  mysql_mutex_lock(&run_lock);

  applier_aborted= true;

  if (!applier_running)
  {
    goto delete_pipeline;
  }

  while (applier_running)
  {
    DBUG_PRINT("loop", ("killing group replication applier thread"));

    mysql_mutex_lock(&applier_thd->LOCK_thd_data);

    applier_thd->awake(THD::NOT_KILLED);
    mysql_mutex_unlock(&applier_thd->LOCK_thd_data);

    //before waiting for termination, signal the queue to unlock.
    // (one termination packet is queued per round)
    add_termination_packet();

    //also awake the applier in case it is suspended
    awake_applier_module();

    /*
      There is a small chance that thread might miss the first
      alarm. To protect against it, resend the signal until it reacts
    */
    struct timespec abstime;
    set_timespec(&abstime, 2);
#ifndef NDEBUG
    int error=
#endif
      mysql_cond_timedwait(&run_cond, &run_lock, &abstime);
    if (stop_wait_timeout >= 2)
    {
      stop_wait_timeout= stop_wait_timeout - 2;
    }
    else if (applier_running) // quit waiting
    {
      mysql_mutex_unlock(&run_lock);
      DBUG_RETURN(1);
    }
    assert(error == ETIMEDOUT || error == 0);
  }

  assert(!applier_running);

delete_pipeline:

  //The thread ended properly so we can terminate the pipeline
  terminate_applier_pipeline();

  /*
    NOTE(review): this busy-waits while still holding run_lock.
    applier_thread_is_exiting starts as false and is set to true only at the
    end of applier_thread_handle(). If this path is reached without an
    applier thread ever having run, the loop would never end; confirm that
    callers rule this out.
  */
  while (!applier_thread_is_exiting)
  {
    /* Check if applier thread is exiting per microsecond. */
    my_sleep(1);
  }

  /*
    Give applier thread one microsecond to exit completely after
    it set applier_thread_is_exiting to true.
  */
  my_sleep(1);

  mysql_mutex_unlock(&run_lock);

  DBUG_RETURN(0);
}
657 
inform_of_applier_stop(char * channel_name,bool aborted)658 void Applier_module::inform_of_applier_stop(char* channel_name,
659                                             bool aborted)
660 {
661   DBUG_ENTER("Applier_module::inform_of_applier_stop");
662 
663   if (!strcmp(channel_name, applier_module_channel_name) &&
664       aborted && applier_running )
665   {
666     log_message(MY_ERROR_LEVEL,
667                 "The applier thread execution was aborted."
668                 " Unable to process more transactions,"
669                 " this member will now leave the group.");
670 
671     applier_error= 1;
672 
673     //before waiting for termination, signal the queue to unlock.
674     add_termination_packet();
675 
676     //also awake the applier in case it is suspended
677     awake_applier_module();
678   }
679 
680   DBUG_VOID_RETURN;
681 }
682 
leave_group_on_failure()683 void Applier_module::leave_group_on_failure()
684 {
685   DBUG_ENTER("Applier_module::leave_group_on_failure");
686 
687   log_message(MY_ERROR_LEVEL,
688               "Fatal error during execution on the Applier process of "
689               "Group Replication. The server will now leave the group.");
690 
691   group_member_mgr->update_member_status(local_member_info->get_uuid(),
692                                          Group_member_info::MEMBER_ERROR);
693 
694   bool set_read_mode= false;
695   if (view_change_notifier != NULL &&
696       !view_change_notifier->is_view_modification_ongoing())
697   {
698     view_change_notifier->start_view_modification();
699   }
700   Gcs_operations::enum_leave_state state= gcs_module->leave();
701 
702   int error= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
703                               stop_wait_timeout);
704   if (error)
705   {
706     log_message(MY_ERROR_LEVEL,
707                 "Error stopping all replication channels while server was"
708                 " leaving the group. Please check the error log for additional"
709                 " details. Got error: %d", error);
710   }
711 
712   std::stringstream ss;
713   plugin_log_level log_severity= MY_WARNING_LEVEL;
714   switch (state)
715   {
716     case Gcs_operations::ERROR_WHEN_LEAVING:
717       ss << "Unable to confirm whether the server has left the group or not. "
718             "Check performance_schema.replication_group_members to check group membership information.";
719       log_severity= MY_ERROR_LEVEL;
720       break;
721     case Gcs_operations::ALREADY_LEAVING:
722       ss << "Skipping leave operation: concurrent attempt to leave the group is on-going."; /* purecov: inspected */
723       break; /* purecov: inspected */
724     case Gcs_operations::ALREADY_LEFT:
725       ss << "Skipping leave operation: member already left the group."; /* purecov: inspected */
726       break; /* purecov: inspected */
727     case Gcs_operations::NOW_LEAVING:
728       set_read_mode= true;
729       ss << "The server was automatically set into read only mode after an error was detected.";
730       log_severity= MY_ERROR_LEVEL;
731       break;
732   }
733   log_message(log_severity, ss.str().c_str());
734 
735   kill_pending_transactions(set_read_mode, false);
736 
737   DBUG_VOID_RETURN;
738 }
739 
kill_pending_transactions(bool set_read_mode,bool threaded_sql_session)740 void Applier_module::kill_pending_transactions(bool set_read_mode,
741                                                bool threaded_sql_session)
742 {
743   DBUG_ENTER("Applier_module::kill_pending_transactions");
744 
745   //Stop any more transactions from waiting
746   bool already_locked= shared_stop_write_lock->try_grab_write_lock();
747 
748   //kill pending transactions
749   blocked_transaction_handler->unblock_waiting_transactions();
750 
751   if (!already_locked)
752     shared_stop_write_lock->release_write_lock();
753 
754   if (set_read_mode)
755   {
756     if (threaded_sql_session)
757       enable_server_read_mode(PSESSION_INIT_THREAD);
758     else
759       enable_server_read_mode(PSESSION_USE_THREAD);
760   }
761 
762   if (view_change_notifier != NULL)
763   {
764     log_message(MY_INFORMATION_LEVEL, "Going to wait for view modification");
765     if (view_change_notifier->wait_for_view_modification())
766     {
767       log_message(MY_ERROR_LEVEL, "On shutdown there was a timeout receiving a "
768                                   "view change. This can lead to a possible "
769                                   "inconsistent state. Check the log for more "
770                                   "details");
771     }
772   }
773 
774   /*
775     Only abort() if we successfully asked to leave() the group (and we have
776     group_replication_exit_state_action set to ABORT_SERVER).
777     We don't want to abort() during the execution of START GROUP_REPLICATION or
778     STOP GROUP_REPLICATION.
779   */
780   if (set_read_mode &&
781       exit_state_action_var == EXIT_STATE_ACTION_ABORT_SERVER)
782   {
783     abort_plugin_process("Fatal error during execution of Group Replication");
784   }
785 
786   DBUG_VOID_RETURN;
787 }
788 
789 int
wait_for_applier_complete_suspension(bool * abort_flag,bool wait_for_execution)790 Applier_module::wait_for_applier_complete_suspension(bool *abort_flag,
791                                                      bool wait_for_execution)
792 {
793   int error= 0;
794 
795   mysql_mutex_lock(&suspend_lock);
796 
797   /*
798    We use an external flag to avoid race conditions.
799    A local flag could always lead to the scenario of
800      wait_for_applier_complete_suspension()
801 
802    >> thread switch
803 
804      break_applier_suspension_wait()
805        we_are_waiting = false;
806        awake
807 
808    thread switch <<
809 
810       we_are_waiting = true;
811       wait();
812   */
813   while (!suspended && !(*abort_flag) && !applier_aborted && !applier_error)
814   {
815     mysql_cond_wait(&suspension_waiting_condition, &suspend_lock);
816   }
817 
818   mysql_mutex_unlock(&suspend_lock);
819 
820   if (applier_aborted || applier_error)
821       return APPLIER_THREAD_ABORTED; /* purecov: inspected */
822 
823   /**
824     Wait for the applier execution of pre suspension events (blocking method)
825     while(the wait method times out)
826       wait()
827   */
828   if (wait_for_execution)
829   {
830     error= APPLIER_GTID_CHECK_TIMEOUT_ERROR; //timeout error
831     while (error == APPLIER_GTID_CHECK_TIMEOUT_ERROR && !(*abort_flag))
832       error= wait_for_applier_event_execution(1, true); //blocking
833   }
834 
835   return (error == APPLIER_RELAY_LOG_NOT_INITED);
836 }
837 
838 void
interrupt_applier_suspension_wait()839 Applier_module::interrupt_applier_suspension_wait()
840 {
841   mysql_mutex_lock(&suspend_lock);
842   mysql_cond_broadcast(&suspension_waiting_condition);
843   mysql_mutex_unlock(&suspend_lock);
844 }
845 
846 bool
is_applier_thread_waiting()847 Applier_module::is_applier_thread_waiting()
848 {
849   DBUG_ENTER("Applier_module::is_applier_thread_waiting");
850   Event_handler* event_applier= NULL;
851   Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
852 
853   if (event_applier == NULL)
854     return false; /* purecov: inspected */
855 
856   bool result= ((Applier_handler*)event_applier)->is_applier_thread_waiting();
857 
858   DBUG_RETURN(result);
859 }
860 
861 int
wait_for_applier_event_execution(double timeout,bool check_and_purge_partial_transactions)862 Applier_module::wait_for_applier_event_execution(double timeout,
863                                                  bool check_and_purge_partial_transactions)
864 {
865   DBUG_ENTER("Applier_module::wait_for_applier_event_execution");
866   int error= 0;
867   Event_handler* event_applier= NULL;
868   Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
869 
870   if (event_applier &&
871       !(error= ((Applier_handler*)event_applier)->wait_for_gtid_execution(timeout)))
872   {
873     /*
874       After applier thread is done, check if there is partial transaction
875       in the relay log. If so, applier thread must be holding the lock on it
876       and will never release it because there will not be any more events
877       coming into this channel. In this case, purge the relaylogs and restart
878       the applier thread will release the lock and update the applier thread
879       execution position correctly and safely.
880     */
881     if (check_and_purge_partial_transactions &&
882         ((Applier_handler*)event_applier)->is_partial_transaction_on_relay_log())
883     {
884         error= purge_applier_queue_and_restart_applier_module();
885     }
886   }
887   DBUG_RETURN(error);
888 }
889 
890 
get_certification_handler()891 Certification_handler* Applier_module::get_certification_handler(){
892 
893   Event_handler* event_applier= NULL;
894   Event_handler::get_handler_by_role(pipeline, CERTIFIER, &event_applier);
895 
896   //The only certification handler for now
897   return (Certification_handler*) event_applier;
898 }
899 
900 int
intersect_group_executed_sets(std::vector<std::string> & gtid_sets,Gtid_set * output_set)901 Applier_module::intersect_group_executed_sets(std::vector<std::string>& gtid_sets,
902                                               Gtid_set* output_set)
903 {
904   Sid_map* sid_map= output_set->get_sid_map();
905 
906   std::vector<std::string>::iterator set_iterator;
907   for (set_iterator= gtid_sets.begin();
908        set_iterator!= gtid_sets.end();
909        set_iterator++)
910   {
911 
912     Gtid_set member_set(sid_map, NULL);
913     Gtid_set intersection_result(sid_map, NULL);
914 
915     std::string exec_set_str= (*set_iterator);
916 
917     if (member_set.add_gtid_text(exec_set_str.c_str()) != RETURN_STATUS_OK)
918     {
919       return 1; /* purecov: inspected */
920     }
921 
922     if (output_set->is_empty())
923     {
924       if (output_set->add_gtid_set(&member_set))
925       {
926       return 1; /* purecov: inspected */
927       }
928     }
929     else
930     {
931       /*
932         We have three sets:
933           member_set:          the one sent from a given member;
934           output_set:        the one that contains the intersection of
935                                the computed sets until now;
936           intersection_result: the intersection between set and
937                                intersection_result.
938         So we compute the intersection between member_set and output_set, and
939         set that value to output_set to be used on the next intersection.
940       */
941       if (member_set.intersection(output_set, &intersection_result) != RETURN_STATUS_OK)
942       {
943         return 1; /* purecov: inspected */
944       }
945 
946       output_set->clear();
947       if (output_set->add_gtid_set(&intersection_result) != RETURN_STATUS_OK)
948       {
949         return 1; /* purecov: inspected */
950       }
951     }
952   }
953 
954 #if !defined(NDEBUG)
955   char *executed_set_string;
956   output_set->to_string(&executed_set_string);
957   DBUG_PRINT("info", ("View change GTID information: output_set: %s",
958              executed_set_string));
959   my_free(executed_set_string);
960 #endif
961 
962   return 0;
963 }
964 
check_single_primary_queue_status()965 int Applier_module::check_single_primary_queue_status()
966 {
967   /*
968     If the 1) group is on single primary mode, 2) this member is the
969     primary one, and 3) the group replication applier did apply all
970     previous primary transactions, we can switch off conflict
971     detection since all transactions will originate from the same
972     primary.
973   */
974   if (get_certification_handler()->get_certifier()->is_conflict_detection_enable() &&
975       local_member_info->in_primary_mode() &&
976       local_member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY &&
977       is_applier_thread_waiting())
978   {
979     Single_primary_message
980         single_primary_message(Single_primary_message::SINGLE_PRIMARY_QUEUE_APPLIED_MESSAGE);
981     if (gcs_module->send_message(single_primary_message))
982     {
983       log_message(MY_ERROR_LEVEL,
984                   "Error sending single primary message informing "
985                   "that primary did apply relay logs"); /* purecov: inspected */
986       return 1; /* purecov: inspected */
987     }
988   }
989 
990   return 0;
991 }
992 
get_local_pipeline_stats()993 Pipeline_member_stats *Applier_module::get_local_pipeline_stats()
994 {
995   // We need run_lock to get protection against STOP GR command.
996 
997   Mutex_autolock auto_lock_mutex(&run_lock);
998   Pipeline_member_stats *stats= NULL;
999   Certification_handler *cert= this->get_certification_handler();
1000   Certifier_interface *cert_module= (cert ? cert->get_certifier() : NULL);
1001   if (cert_module)
1002   {
1003     stats= new Pipeline_member_stats(
1004         get_pipeline_stats_member_collector(), get_message_queue_size(),
1005         cert_module->get_negative_certified(),
1006         cert_module->get_certification_info_size());
1007     {
1008       char *committed_transactions_buf= NULL;
1009       size_t committed_transactions_buf_length= 0;
1010       int outcome= cert_module->get_group_stable_transactions_set_string(
1011           &committed_transactions_buf, &committed_transactions_buf_length);
1012 
1013       if (!outcome && committed_transactions_buf_length > 0)
1014         stats->set_transaction_committed_all_members(committed_transactions_buf,
1015                 committed_transactions_buf_length);
1016       my_free(committed_transactions_buf);
1017     }
1018     {
1019       std::string last_conflict_free_transaction;
1020       cert_module->get_last_conflict_free_transaction(
1021           &last_conflict_free_transaction);
1022       stats->set_transaction_last_conflict_free(last_conflict_free_transaction);
1023     }
1024   }
1025   else
1026   {
1027     stats= new Pipeline_member_stats(get_pipeline_stats_member_collector(),
1028                                      get_message_queue_size(), 0, 0);
1029   }
1030   return stats;
1031 }
1032