1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include <signal.h>
24 #include "applier.h"
25 #include <mysql/group_replication_priv.h>
26 #include "plugin_log.h"
27 #include "plugin.h"
28 #include "single_primary_message.h"
29
/* Name of the replication channel used by the group replication applier. */
char applier_module_channel_name[] = "group_replication_applier";

/*
  Cleared by the applier thread once it is up and running and raised right
  before that thread exits; terminate_applier_thread() polls it.
*/
bool applier_thread_is_exiting = false;
32
launch_handler_thread(void * arg)33 static void *launch_handler_thread(void* arg)
34 {
35 Applier_module *handler= (Applier_module*) arg;
36 handler->applier_thread_handle();
37 return 0;
38 }
39
Applier_module()40 Applier_module::Applier_module()
41 :applier_running(false), applier_aborted(false), applier_error(0),
42 suspended(false), waiting_for_applier_suspension(false),
43 shared_stop_write_lock(NULL), incoming(NULL), pipeline(NULL),
44 fde_evt(BINLOG_VERSION), stop_wait_timeout(LONG_TIMEOUT),
45 applier_channel_observer(NULL)
46 {
47 mysql_mutex_init(key_GR_LOCK_applier_module_run, &run_lock, MY_MUTEX_INIT_FAST);
48 mysql_cond_init(key_GR_COND_applier_module_run, &run_cond);
49 mysql_mutex_init(key_GR_LOCK_applier_module_suspend, &suspend_lock, MY_MUTEX_INIT_FAST);
50 mysql_cond_init(key_GR_COND_applier_module_suspend, &suspend_cond);
51 mysql_cond_init(key_GR_COND_applier_module_wait, &suspension_waiting_condition);
52 }
53
~Applier_module()54 Applier_module::~Applier_module(){
55 if (this->incoming)
56 {
57 while (!this->incoming->empty())
58 {
59 Packet *packet= NULL;
60 this->incoming->pop(&packet);
61 delete packet;
62 }
63 delete incoming;
64 }
65 delete applier_channel_observer;
66
67 mysql_mutex_destroy(&run_lock);
68 mysql_cond_destroy(&run_cond);
69 mysql_mutex_destroy(&suspend_lock);
70 mysql_cond_destroy(&suspend_cond);
71 mysql_cond_destroy(&suspension_waiting_condition);
72 }
73
74 int
setup_applier_module(Handler_pipeline_type pipeline_type,bool reset_logs,ulong stop_timeout,rpl_sidno group_sidno,ulonglong gtid_assignment_block_size,Shared_writelock * shared_stop_lock)75 Applier_module::setup_applier_module(Handler_pipeline_type pipeline_type,
76 bool reset_logs,
77 ulong stop_timeout,
78 rpl_sidno group_sidno,
79 ulonglong gtid_assignment_block_size,
80 Shared_writelock *shared_stop_lock)
81 {
82 DBUG_ENTER("Applier_module::setup_applier_module");
83
84 int error= 0;
85
86 //create the receiver queue
87 this->incoming= new Synchronized_queue<Packet*>();
88
89 stop_wait_timeout= stop_timeout;
90
91 pipeline= NULL;
92
93 if ( (error= get_pipeline(pipeline_type, &pipeline)) )
94 {
95 DBUG_RETURN(error);
96 }
97
98 reset_applier_logs= reset_logs;
99 group_replication_sidno= group_sidno;
100 this->gtid_assignment_block_size= gtid_assignment_block_size;
101
102 shared_stop_write_lock= shared_stop_lock;
103
104 DBUG_RETURN(error);
105 }
106
107
108 int
purge_applier_queue_and_restart_applier_module()109 Applier_module::purge_applier_queue_and_restart_applier_module()
110 {
111 DBUG_ENTER("Applier_module::purge_applier_queue_and_restart_applier_module");
112 int error= 0;
113
114 /*
115 Here we are stopping applier thread intentionally and we will be starting
116 the applier thread after purging the relay logs. So we should ignore any
117 errors during the stop (eg: error due to stopping the applier thread in the
118 middle of applying the group of events). Hence unregister the applier channel
119 observer temporarily till the required work is done.
120 */
121 channel_observation_manager->unregister_channel_observer(applier_channel_observer);
122
123 /* Stop the applier thread */
124 Pipeline_action *stop_action= new Handler_stop_action();
125 error= pipeline->handle_action(stop_action);
126 delete stop_action;
127 if (error)
128 DBUG_RETURN(error); /* purecov: inspected */
129
130 /* Purge the relay logs and initialize the channel*/
131 Handler_applier_configuration_action *applier_conf_action=
132 new Handler_applier_configuration_action(applier_module_channel_name,
133 true, /* purge relay logs always*/
134 stop_wait_timeout,
135 group_replication_sidno);
136
137 error= pipeline->handle_action(applier_conf_action);
138 delete applier_conf_action;
139 if (error)
140 DBUG_RETURN(error); /* purecov: inspected */
141
142 channel_observation_manager->register_channel_observer(applier_channel_observer);
143
144 /* Start the applier thread */
145 Pipeline_action *start_action = new Handler_start_action();
146 error= pipeline->handle_action(start_action);
147 delete start_action;
148
149 DBUG_RETURN(error);
150 }
151
152
153 int
setup_pipeline_handlers()154 Applier_module::setup_pipeline_handlers()
155 {
156 DBUG_ENTER("Applier_module::setup_pipeline_handlers");
157
158 int error= 0;
159
160 //Configure the applier handler trough a configuration action
161 Handler_applier_configuration_action *applier_conf_action=
162 new Handler_applier_configuration_action(applier_module_channel_name,
163 reset_applier_logs,
164 stop_wait_timeout,
165 group_replication_sidno);
166
167 error= pipeline->handle_action(applier_conf_action);
168 delete applier_conf_action;
169 if (error)
170 DBUG_RETURN(error); /* purecov: inspected */
171
172 Handler_certifier_configuration_action *cert_conf_action=
173 new Handler_certifier_configuration_action(group_replication_sidno,
174 gtid_assignment_block_size);
175
176 error = pipeline->handle_action(cert_conf_action);
177
178 delete cert_conf_action;
179
180 DBUG_RETURN(error);
181 }
182
183 void
set_applier_thread_context()184 Applier_module::set_applier_thread_context()
185 {
186 my_thread_init();
187 THD *thd= new THD;
188 thd->set_new_thread_id();
189 thd->thread_stack= (char*) &thd;
190 thd->store_globals();
191
192 thd->get_protocol_classic()->init_net(0);
193 thd->slave_thread= true;
194 //TODO: See of the creation of a new type is desirable.
195 thd->system_thread= SYSTEM_THREAD_SLAVE_IO;
196 thd->security_context()->skip_grants();
197
198 global_thd_manager_add_thd(thd);
199
200 thd->init_for_queries();
201 set_slave_thread_options(thd);
202 #ifndef _WIN32
203 THD_STAGE_INFO(thd, stage_executing);
204 #endif
205 applier_thd= thd;
206 }
207
208 void
clean_applier_thread_context()209 Applier_module::clean_applier_thread_context()
210 {
211 applier_thd->get_protocol_classic()->end_net();
212 applier_thd->release_resources();
213 THD_CHECK_SENTRY(applier_thd);
214 global_thd_manager_remove_thd(applier_thd);
215 }
216
217 int
inject_event_into_pipeline(Pipeline_event * pevent,Continuation * cont)218 Applier_module::inject_event_into_pipeline(Pipeline_event* pevent,
219 Continuation* cont)
220 {
221 int error= 0;
222 pipeline->handle_event(pevent, cont);
223
224 if ((error= cont->wait()))
225 log_message(MY_ERROR_LEVEL, "Error at event handling! Got error: %d", error);
226
227 return error;
228 }
229
230
231
apply_action_packet(Action_packet * action_packet)232 bool Applier_module::apply_action_packet(Action_packet *action_packet)
233 {
234 enum_packet_action action= action_packet->packet_action;
235
236 //packet used to break the queue blocking wait
237 if (action == TERMINATION_PACKET)
238 {
239 return true;
240 }
241 //packet to signal the applier to suspend
242 if (action == SUSPENSION_PACKET)
243 {
244 suspend_applier_module();
245 return false;
246 }
247 return false; /* purecov: inspected */
248 }
249
250 int
apply_view_change_packet(View_change_packet * view_change_packet,Format_description_log_event * fde_evt,IO_CACHE * cache,Continuation * cont)251 Applier_module::apply_view_change_packet(View_change_packet *view_change_packet,
252 Format_description_log_event *fde_evt,
253 IO_CACHE *cache,
254 Continuation *cont)
255 {
256 int error= 0;
257
258 Gtid_set *group_executed_set= NULL;
259 Sid_map *sid_map= NULL;
260 if (!view_change_packet->group_executed_set.empty())
261 {
262 sid_map= new Sid_map(NULL);
263 group_executed_set= new Gtid_set(sid_map, NULL);
264 if (intersect_group_executed_sets(view_change_packet->group_executed_set,
265 group_executed_set))
266 {
267 log_message(MY_WARNING_LEVEL,
268 "Error when extracting group GTID execution information, "
269 "some recovery operations may face future issues"); /* purecov: inspected */
270 delete sid_map; /* purecov: inspected */
271 delete group_executed_set; /* purecov: inspected */
272 group_executed_set= NULL; /* purecov: inspected */
273 }
274 }
275
276 if (group_executed_set != NULL)
277 {
278 if (get_certification_handler()->get_certifier()->
279 set_group_stable_transactions_set(group_executed_set))
280 {
281 log_message(MY_WARNING_LEVEL,
282 "An error happened when trying to reduce the Certification "
283 " information size for transmission"); /* purecov: inspected */
284 }
285 delete sid_map;
286 delete group_executed_set;
287 }
288
289 View_change_log_event* view_change_event
290 = new View_change_log_event((char*)view_change_packet->view_id.c_str());
291
292 Pipeline_event* pevent= new Pipeline_event(view_change_event, fde_evt, cache);
293 pevent->mark_event(SINGLE_VIEW_EVENT);
294 error= inject_event_into_pipeline(pevent, cont);
295 //When discarded, the VCLE logging was delayed, so don't delete it
296 if (!cont->is_transaction_discarded())
297 delete pevent;
298
299 return error;
300 }
301
apply_data_packet(Data_packet * data_packet,Format_description_log_event * fde_evt,IO_CACHE * cache,Continuation * cont)302 int Applier_module::apply_data_packet(Data_packet *data_packet,
303 Format_description_log_event *fde_evt,
304 IO_CACHE *cache,
305 Continuation *cont)
306 {
307 int error= 0;
308 uchar* payload= data_packet->payload;
309 uchar* payload_end= data_packet->payload + data_packet->len;
310
311 DBUG_EXECUTE_IF("group_replication_before_apply_data_packet", {
312 const char act[] = "now wait_for continue_apply";
313 assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
314 });
315
316 if (check_single_primary_queue_status())
317 return 1; /* purecov: inspected */
318
319 while ((payload != payload_end) && !error)
320 {
321 uint event_len= uint4korr(((uchar*)payload) + EVENT_LEN_OFFSET);
322
323 Data_packet* new_packet= new Data_packet(payload, event_len);
324 payload= payload + event_len;
325
326 Pipeline_event* pevent= new Pipeline_event(new_packet, fde_evt, cache);
327 error= inject_event_into_pipeline(pevent, cont);
328
329 delete pevent;
330 DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event",
331 {
332 if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT)
333 {
334 error=1;
335 }
336 }
337 );
338 }
339
340 return error;
341 }
342
343 int
apply_single_primary_action_packet(Single_primary_action_packet * packet)344 Applier_module::apply_single_primary_action_packet(Single_primary_action_packet *packet)
345 {
346 int error= 0;
347 Certifier_interface *certifier= get_certification_handler()->get_certifier();
348
349 switch (packet->action)
350 {
351 case Single_primary_action_packet::NEW_PRIMARY:
352 certifier->enable_conflict_detection();
353 break;
354 case Single_primary_action_packet::QUEUE_APPLIED:
355 certifier->disable_conflict_detection();
356 break;
357 default:
358 assert(0); /* purecov: inspected */
359 }
360
361 return error;
362 }
363
364
365 int
applier_thread_handle()366 Applier_module::applier_thread_handle()
367 {
368 DBUG_ENTER("ApplierModule::applier_thread_handle()");
369
370 //set the thread context
371 set_applier_thread_context();
372
373 Handler_THD_setup_action *thd_conf_action= NULL;
374 Format_description_log_event* fde_evt= NULL;
375 Continuation* cont= NULL;
376 Packet *packet= NULL;
377 bool loop_termination = false;
378 int packet_application_error= 0;
379
380 IO_CACHE *cache= (IO_CACHE*) my_malloc(PSI_NOT_INSTRUMENTED,
381 sizeof(IO_CACHE),
382 MYF(MY_ZEROFILL));
383 if (!cache || (!my_b_inited(cache) &&
384 open_cached_file(cache, mysql_tmpdir,
385 "group_replication_pipeline_applier_cache",
386 SHARED_EVENT_IO_CACHE_SIZE,
387 MYF(MY_WME))))
388 {
389 my_free(cache); /* purecov: inspected */
390 cache= NULL; /* purecov: inspected */
391 log_message(MY_ERROR_LEVEL,
392 "Failed to create group replication pipeline applier cache!"); /* purecov: inspected */
393 applier_error= 1; /* purecov: inspected */
394 goto end; /* purecov: inspected */
395 }
396
397 applier_error= setup_pipeline_handlers();
398
399 applier_channel_observer= new Applier_channel_state_observer();
400 channel_observation_manager
401 ->register_channel_observer(applier_channel_observer);
402
403 if (!applier_error)
404 {
405 Pipeline_action *start_action = new Handler_start_action();
406 applier_error= pipeline->handle_action(start_action);
407 delete start_action;
408 }
409
410 if (applier_error)
411 {
412 goto end;
413 }
414
415 mysql_mutex_lock(&run_lock);
416 applier_thread_is_exiting= false;
417 applier_running= true;
418 mysql_cond_broadcast(&run_cond);
419 mysql_mutex_unlock(&run_lock);
420
421 fde_evt= new Format_description_log_event(BINLOG_VERSION);
422 cont= new Continuation();
423
424 //Give the handlers access to the applier THD
425 thd_conf_action= new Handler_THD_setup_action(applier_thd);
426 // To prevent overwrite last error method
427 applier_error+= pipeline->handle_action(thd_conf_action);
428 delete thd_conf_action;
429
430 //applier main loop
431 while (!applier_error && !packet_application_error && !loop_termination)
432 {
433 if (is_applier_thread_aborted())
434 break;
435
436 this->incoming->front(&packet); // blocking
437
438 switch (packet->get_packet_type())
439 {
440 case ACTION_PACKET_TYPE:
441 this->incoming->pop();
442 loop_termination= apply_action_packet((Action_packet*)packet);
443 break;
444 case VIEW_CHANGE_PACKET_TYPE:
445 packet_application_error=
446 apply_view_change_packet((View_change_packet*)packet,
447 fde_evt, cache, cont);
448 this->incoming->pop();
449 break;
450 case DATA_PACKET_TYPE:
451 packet_application_error= apply_data_packet((Data_packet*)packet,
452 fde_evt, cache, cont);
453 //Remove from queue here, so the size only decreases after packet handling
454 this->incoming->pop();
455 break;
456 case SINGLE_PRIMARY_PACKET_TYPE:
457 packet_application_error=
458 apply_single_primary_action_packet((Single_primary_action_packet*)packet);
459 this->incoming->pop();
460 break;
461 default:
462 assert(0); /* purecov: inspected */
463 }
464
465 delete packet;
466 }
467 if (packet_application_error)
468 applier_error= packet_application_error;
469 delete fde_evt;
470 delete cont;
471
472 end:
473
474 //always remove the observer even the thread is no longer running
475 channel_observation_manager
476 ->unregister_channel_observer(applier_channel_observer);
477
478 //only try to leave if the applier managed to start
479 if (applier_error && applier_running)
480 leave_group_on_failure();
481
482 //Even on error cases, send a stop signal to all handlers that could be active
483 Pipeline_action *stop_action= new Handler_stop_action();
484 int local_applier_error= pipeline->handle_action(stop_action);
485 delete stop_action;
486
487 Gcs_interface_factory::cleanup(Gcs_operations::get_gcs_engine());
488
489 log_message(MY_INFORMATION_LEVEL, "The group replication applier thread"
490 " was killed");
491
492 DBUG_EXECUTE_IF("applier_thd_timeout",
493 {
494 const char act[]= "now wait_for signal.applier_continue";
495 assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
496 });
497
498 if (cache != NULL)
499 {
500 close_cached_file(cache);
501 my_free(cache);
502 }
503
504 clean_applier_thread_context();
505
506 mysql_mutex_lock(&run_lock);
507 delete applier_thd;
508
509 /*
510 Don't overwrite applier_error when stop_applier_thread() doesn't return
511 error. So applier_error which is also referred by main thread
512 doesn't return true from initialize_applier_thread() when
513 start_applier_thread() fails and stop_applier_thread() succeeds.
514 Also use local var - local_applier_error, as the applier can be deleted
515 before the thread returns.
516 */
517 if (local_applier_error)
518 applier_error= local_applier_error; /* purecov: inspected */
519 else
520 local_applier_error= applier_error;
521
522 applier_running= false;
523 mysql_cond_broadcast(&run_cond);
524 mysql_mutex_unlock(&run_lock);
525
526 my_thread_end();
527 applier_thread_is_exiting= true;
528 my_thread_exit(0);
529
530 DBUG_RETURN(local_applier_error); /* purecov: inspected */
531 }
532
533 int
initialize_applier_thread()534 Applier_module::initialize_applier_thread()
535 {
536 DBUG_ENTER("Applier_module::initialize_applier_thd");
537
538 //avoid concurrency calls against stop invocations
539 mysql_mutex_lock(&run_lock);
540
541 applier_error= 0;
542
543 if ((mysql_thread_create(key_GR_THD_applier_module_receiver,
544 &applier_pthd,
545 get_connection_attrib(),
546 launch_handler_thread,
547 (void*)this)))
548 {
549 mysql_mutex_unlock(&run_lock); /* purecov: inspected */
550 DBUG_RETURN(1); /* purecov: inspected */
551 }
552
553 while (!applier_running && !applier_error)
554 {
555 DBUG_PRINT("sleep",("Waiting for applier thread to start"));
556 mysql_cond_wait(&run_cond, &run_lock);
557 }
558
559 mysql_mutex_unlock(&run_lock);
560 DBUG_RETURN(applier_error);
561 }
562
563 int
terminate_applier_pipeline()564 Applier_module::terminate_applier_pipeline()
565 {
566 int error= 0;
567 if (pipeline != NULL)
568 {
569 if ((error= pipeline->terminate_pipeline()))
570 {
571 log_message(MY_WARNING_LEVEL,
572 "The group replication applier pipeline was not properly"
573 " disposed. Check the error log for further info."); /* purecov: inspected */
574 }
575 //delete anyway, as we can't do much on error cases
576 delete pipeline;
577 pipeline= NULL;
578 }
579 return error;
580 }
581
582 int
terminate_applier_thread()583 Applier_module::terminate_applier_thread()
584 {
585 DBUG_ENTER("Applier_module::terminate_applier_thread");
586
587 /* This lock code needs to be re-written from scratch*/
588 mysql_mutex_lock(&run_lock);
589
590 applier_aborted= true;
591
592 if (!applier_running)
593 {
594 goto delete_pipeline;
595 }
596
597 while (applier_running)
598 {
599 DBUG_PRINT("loop", ("killing group replication applier thread"));
600
601 mysql_mutex_lock(&applier_thd->LOCK_thd_data);
602
603 applier_thd->awake(THD::NOT_KILLED);
604 mysql_mutex_unlock(&applier_thd->LOCK_thd_data);
605
606 //before waiting for termination, signal the queue to unlock.
607 add_termination_packet();
608
609 //also awake the applier in case it is suspended
610 awake_applier_module();
611
612 /*
613 There is a small chance that thread might miss the first
614 alarm. To protect against it, resend the signal until it reacts
615 */
616 struct timespec abstime;
617 set_timespec(&abstime, 2);
618 #ifndef NDEBUG
619 int error=
620 #endif
621 mysql_cond_timedwait(&run_cond, &run_lock, &abstime);
622 if (stop_wait_timeout >= 2)
623 {
624 stop_wait_timeout= stop_wait_timeout - 2;
625 }
626 else if (applier_running) // quit waiting
627 {
628 mysql_mutex_unlock(&run_lock);
629 DBUG_RETURN(1);
630 }
631 assert(error == ETIMEDOUT || error == 0);
632 }
633
634 assert(!applier_running);
635
636 delete_pipeline:
637
638 //The thread ended properly so we can terminate the pipeline
639 terminate_applier_pipeline();
640
641 while (!applier_thread_is_exiting)
642 {
643 /* Check if applier thread is exiting per microsecond. */
644 my_sleep(1);
645 }
646
647 /*
648 Give applier thread one microsecond to exit completely after
649 it set applier_thread_is_exiting to true.
650 */
651 my_sleep(1);
652
653 mysql_mutex_unlock(&run_lock);
654
655 DBUG_RETURN(0);
656 }
657
inform_of_applier_stop(char * channel_name,bool aborted)658 void Applier_module::inform_of_applier_stop(char* channel_name,
659 bool aborted)
660 {
661 DBUG_ENTER("Applier_module::inform_of_applier_stop");
662
663 if (!strcmp(channel_name, applier_module_channel_name) &&
664 aborted && applier_running )
665 {
666 log_message(MY_ERROR_LEVEL,
667 "The applier thread execution was aborted."
668 " Unable to process more transactions,"
669 " this member will now leave the group.");
670
671 applier_error= 1;
672
673 //before waiting for termination, signal the queue to unlock.
674 add_termination_packet();
675
676 //also awake the applier in case it is suspended
677 awake_applier_module();
678 }
679
680 DBUG_VOID_RETURN;
681 }
682
leave_group_on_failure()683 void Applier_module::leave_group_on_failure()
684 {
685 DBUG_ENTER("Applier_module::leave_group_on_failure");
686
687 log_message(MY_ERROR_LEVEL,
688 "Fatal error during execution on the Applier process of "
689 "Group Replication. The server will now leave the group.");
690
691 group_member_mgr->update_member_status(local_member_info->get_uuid(),
692 Group_member_info::MEMBER_ERROR);
693
694 bool set_read_mode= false;
695 if (view_change_notifier != NULL &&
696 !view_change_notifier->is_view_modification_ongoing())
697 {
698 view_change_notifier->start_view_modification();
699 }
700 Gcs_operations::enum_leave_state state= gcs_module->leave();
701
702 int error= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
703 stop_wait_timeout);
704 if (error)
705 {
706 log_message(MY_ERROR_LEVEL,
707 "Error stopping all replication channels while server was"
708 " leaving the group. Please check the error log for additional"
709 " details. Got error: %d", error);
710 }
711
712 std::stringstream ss;
713 plugin_log_level log_severity= MY_WARNING_LEVEL;
714 switch (state)
715 {
716 case Gcs_operations::ERROR_WHEN_LEAVING:
717 ss << "Unable to confirm whether the server has left the group or not. "
718 "Check performance_schema.replication_group_members to check group membership information.";
719 log_severity= MY_ERROR_LEVEL;
720 break;
721 case Gcs_operations::ALREADY_LEAVING:
722 ss << "Skipping leave operation: concurrent attempt to leave the group is on-going."; /* purecov: inspected */
723 break; /* purecov: inspected */
724 case Gcs_operations::ALREADY_LEFT:
725 ss << "Skipping leave operation: member already left the group."; /* purecov: inspected */
726 break; /* purecov: inspected */
727 case Gcs_operations::NOW_LEAVING:
728 set_read_mode= true;
729 ss << "The server was automatically set into read only mode after an error was detected.";
730 log_severity= MY_ERROR_LEVEL;
731 break;
732 }
733 log_message(log_severity, ss.str().c_str());
734
735 kill_pending_transactions(set_read_mode, false);
736
737 DBUG_VOID_RETURN;
738 }
739
kill_pending_transactions(bool set_read_mode,bool threaded_sql_session)740 void Applier_module::kill_pending_transactions(bool set_read_mode,
741 bool threaded_sql_session)
742 {
743 DBUG_ENTER("Applier_module::kill_pending_transactions");
744
745 //Stop any more transactions from waiting
746 bool already_locked= shared_stop_write_lock->try_grab_write_lock();
747
748 //kill pending transactions
749 blocked_transaction_handler->unblock_waiting_transactions();
750
751 if (!already_locked)
752 shared_stop_write_lock->release_write_lock();
753
754 if (set_read_mode)
755 {
756 if (threaded_sql_session)
757 enable_server_read_mode(PSESSION_INIT_THREAD);
758 else
759 enable_server_read_mode(PSESSION_USE_THREAD);
760 }
761
762 if (view_change_notifier != NULL)
763 {
764 log_message(MY_INFORMATION_LEVEL, "Going to wait for view modification");
765 if (view_change_notifier->wait_for_view_modification())
766 {
767 log_message(MY_ERROR_LEVEL, "On shutdown there was a timeout receiving a "
768 "view change. This can lead to a possible "
769 "inconsistent state. Check the log for more "
770 "details");
771 }
772 }
773
774 /*
775 Only abort() if we successfully asked to leave() the group (and we have
776 group_replication_exit_state_action set to ABORT_SERVER).
777 We don't want to abort() during the execution of START GROUP_REPLICATION or
778 STOP GROUP_REPLICATION.
779 */
780 if (set_read_mode &&
781 exit_state_action_var == EXIT_STATE_ACTION_ABORT_SERVER)
782 {
783 abort_plugin_process("Fatal error during execution of Group Replication");
784 }
785
786 DBUG_VOID_RETURN;
787 }
788
789 int
wait_for_applier_complete_suspension(bool * abort_flag,bool wait_for_execution)790 Applier_module::wait_for_applier_complete_suspension(bool *abort_flag,
791 bool wait_for_execution)
792 {
793 int error= 0;
794
795 mysql_mutex_lock(&suspend_lock);
796
797 /*
798 We use an external flag to avoid race conditions.
799 A local flag could always lead to the scenario of
800 wait_for_applier_complete_suspension()
801
802 >> thread switch
803
804 break_applier_suspension_wait()
805 we_are_waiting = false;
806 awake
807
808 thread switch <<
809
810 we_are_waiting = true;
811 wait();
812 */
813 while (!suspended && !(*abort_flag) && !applier_aborted && !applier_error)
814 {
815 mysql_cond_wait(&suspension_waiting_condition, &suspend_lock);
816 }
817
818 mysql_mutex_unlock(&suspend_lock);
819
820 if (applier_aborted || applier_error)
821 return APPLIER_THREAD_ABORTED; /* purecov: inspected */
822
823 /**
824 Wait for the applier execution of pre suspension events (blocking method)
825 while(the wait method times out)
826 wait()
827 */
828 if (wait_for_execution)
829 {
830 error= APPLIER_GTID_CHECK_TIMEOUT_ERROR; //timeout error
831 while (error == APPLIER_GTID_CHECK_TIMEOUT_ERROR && !(*abort_flag))
832 error= wait_for_applier_event_execution(1, true); //blocking
833 }
834
835 return (error == APPLIER_RELAY_LOG_NOT_INITED);
836 }
837
838 void
interrupt_applier_suspension_wait()839 Applier_module::interrupt_applier_suspension_wait()
840 {
841 mysql_mutex_lock(&suspend_lock);
842 mysql_cond_broadcast(&suspension_waiting_condition);
843 mysql_mutex_unlock(&suspend_lock);
844 }
845
846 bool
is_applier_thread_waiting()847 Applier_module::is_applier_thread_waiting()
848 {
849 DBUG_ENTER("Applier_module::is_applier_thread_waiting");
850 Event_handler* event_applier= NULL;
851 Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
852
853 if (event_applier == NULL)
854 return false; /* purecov: inspected */
855
856 bool result= ((Applier_handler*)event_applier)->is_applier_thread_waiting();
857
858 DBUG_RETURN(result);
859 }
860
861 int
wait_for_applier_event_execution(double timeout,bool check_and_purge_partial_transactions)862 Applier_module::wait_for_applier_event_execution(double timeout,
863 bool check_and_purge_partial_transactions)
864 {
865 DBUG_ENTER("Applier_module::wait_for_applier_event_execution");
866 int error= 0;
867 Event_handler* event_applier= NULL;
868 Event_handler::get_handler_by_role(pipeline, APPLIER, &event_applier);
869
870 if (event_applier &&
871 !(error= ((Applier_handler*)event_applier)->wait_for_gtid_execution(timeout)))
872 {
873 /*
874 After applier thread is done, check if there is partial transaction
875 in the relay log. If so, applier thread must be holding the lock on it
876 and will never release it because there will not be any more events
877 coming into this channel. In this case, purge the relaylogs and restart
878 the applier thread will release the lock and update the applier thread
879 execution position correctly and safely.
880 */
881 if (check_and_purge_partial_transactions &&
882 ((Applier_handler*)event_applier)->is_partial_transaction_on_relay_log())
883 {
884 error= purge_applier_queue_and_restart_applier_module();
885 }
886 }
887 DBUG_RETURN(error);
888 }
889
890
get_certification_handler()891 Certification_handler* Applier_module::get_certification_handler(){
892
893 Event_handler* event_applier= NULL;
894 Event_handler::get_handler_by_role(pipeline, CERTIFIER, &event_applier);
895
896 //The only certification handler for now
897 return (Certification_handler*) event_applier;
898 }
899
900 int
intersect_group_executed_sets(std::vector<std::string> & gtid_sets,Gtid_set * output_set)901 Applier_module::intersect_group_executed_sets(std::vector<std::string>& gtid_sets,
902 Gtid_set* output_set)
903 {
904 Sid_map* sid_map= output_set->get_sid_map();
905
906 std::vector<std::string>::iterator set_iterator;
907 for (set_iterator= gtid_sets.begin();
908 set_iterator!= gtid_sets.end();
909 set_iterator++)
910 {
911
912 Gtid_set member_set(sid_map, NULL);
913 Gtid_set intersection_result(sid_map, NULL);
914
915 std::string exec_set_str= (*set_iterator);
916
917 if (member_set.add_gtid_text(exec_set_str.c_str()) != RETURN_STATUS_OK)
918 {
919 return 1; /* purecov: inspected */
920 }
921
922 if (output_set->is_empty())
923 {
924 if (output_set->add_gtid_set(&member_set))
925 {
926 return 1; /* purecov: inspected */
927 }
928 }
929 else
930 {
931 /*
932 We have three sets:
933 member_set: the one sent from a given member;
934 output_set: the one that contains the intersection of
935 the computed sets until now;
936 intersection_result: the intersection between set and
937 intersection_result.
938 So we compute the intersection between member_set and output_set, and
939 set that value to output_set to be used on the next intersection.
940 */
941 if (member_set.intersection(output_set, &intersection_result) != RETURN_STATUS_OK)
942 {
943 return 1; /* purecov: inspected */
944 }
945
946 output_set->clear();
947 if (output_set->add_gtid_set(&intersection_result) != RETURN_STATUS_OK)
948 {
949 return 1; /* purecov: inspected */
950 }
951 }
952 }
953
954 #if !defined(NDEBUG)
955 char *executed_set_string;
956 output_set->to_string(&executed_set_string);
957 DBUG_PRINT("info", ("View change GTID information: output_set: %s",
958 executed_set_string));
959 my_free(executed_set_string);
960 #endif
961
962 return 0;
963 }
964
check_single_primary_queue_status()965 int Applier_module::check_single_primary_queue_status()
966 {
967 /*
968 If the 1) group is on single primary mode, 2) this member is the
969 primary one, and 3) the group replication applier did apply all
970 previous primary transactions, we can switch off conflict
971 detection since all transactions will originate from the same
972 primary.
973 */
974 if (get_certification_handler()->get_certifier()->is_conflict_detection_enable() &&
975 local_member_info->in_primary_mode() &&
976 local_member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY &&
977 is_applier_thread_waiting())
978 {
979 Single_primary_message
980 single_primary_message(Single_primary_message::SINGLE_PRIMARY_QUEUE_APPLIED_MESSAGE);
981 if (gcs_module->send_message(single_primary_message))
982 {
983 log_message(MY_ERROR_LEVEL,
984 "Error sending single primary message informing "
985 "that primary did apply relay logs"); /* purecov: inspected */
986 return 1; /* purecov: inspected */
987 }
988 }
989
990 return 0;
991 }
992
get_local_pipeline_stats()993 Pipeline_member_stats *Applier_module::get_local_pipeline_stats()
994 {
995 // We need run_lock to get protection against STOP GR command.
996
997 Mutex_autolock auto_lock_mutex(&run_lock);
998 Pipeline_member_stats *stats= NULL;
999 Certification_handler *cert= this->get_certification_handler();
1000 Certifier_interface *cert_module= (cert ? cert->get_certifier() : NULL);
1001 if (cert_module)
1002 {
1003 stats= new Pipeline_member_stats(
1004 get_pipeline_stats_member_collector(), get_message_queue_size(),
1005 cert_module->get_negative_certified(),
1006 cert_module->get_certification_info_size());
1007 {
1008 char *committed_transactions_buf= NULL;
1009 size_t committed_transactions_buf_length= 0;
1010 int outcome= cert_module->get_group_stable_transactions_set_string(
1011 &committed_transactions_buf, &committed_transactions_buf_length);
1012
1013 if (!outcome && committed_transactions_buf_length > 0)
1014 stats->set_transaction_committed_all_members(committed_transactions_buf,
1015 committed_transactions_buf_length);
1016 my_free(committed_transactions_buf);
1017 }
1018 {
1019 std::string last_conflict_free_transaction;
1020 cert_module->get_last_conflict_free_transaction(
1021 &last_conflict_free_transaction);
1022 stats->set_transaction_last_conflict_free(last_conflict_free_transaction);
1023 }
1024 }
1025 else
1026 {
1027 stats= new Pipeline_member_stats(get_pipeline_stats_member_collector(),
1028 get_message_queue_size(), 0, 0);
1029 }
1030 return stats;
1031 }
1032