1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "plugin_server_include.h"
24 #include "recovery_state_transfer.h"
25 #include "plugin_log.h"
26 #include "recovery_channel_state_observer.h"
27 #include "plugin_psi.h"
28 #include "plugin.h"
29 #include <mysql/group_replication_priv.h>
30
31 using std::string;
32
33 Recovery_state_transfer::
Recovery_state_transfer(char * recovery_channel_name,const string & member_uuid,Channel_observation_manager * channel_obsr_mngr)34 Recovery_state_transfer(char* recovery_channel_name,
35 const string& member_uuid,
36 Channel_observation_manager *channel_obsr_mngr)
37 : selected_donor(NULL), group_members(NULL),
38 donor_connection_retry_count(0),
39 recovery_aborted(false), donor_transfer_finished(false),
40 connected_to_donor(false), on_failover(false),
41 donor_connection_interface(recovery_channel_name),
42 channel_observation_manager(channel_obsr_mngr),
43 recovery_channel_observer(NULL),
44 recovery_use_ssl(false), recovery_ssl_verify_server_cert(false),
45 max_connection_attempts_to_donors(0), donor_reconnect_interval(0)
46 {
47 //set the recovery SSL options to 0
48 (void) strncpy(recovery_ssl_ca, "", 1);
49 (void) strncpy(recovery_ssl_capath, "", 1);
50 (void) strncpy(recovery_ssl_cert, "", 1);
51 (void) strncpy(recovery_ssl_cipher, "", 1);
52 (void) strncpy(recovery_ssl_key, "", 1);
53 (void) strncpy(recovery_ssl_crl, "", 1);
54 (void) strncpy(recovery_ssl_crlpath, "", 1);
55
56 this->member_uuid= member_uuid;
57
58 mysql_mutex_init(key_GR_LOCK_recovery, &recovery_lock, MY_MUTEX_INIT_FAST);
59 mysql_cond_init(key_GR_COND_recovery, &recovery_condition);
60 mysql_mutex_init(key_GR_LOCK_recovery_donor_selection,
61 &donor_selection_lock,
62 MY_MUTEX_INIT_FAST);
63
64 recovery_channel_observer= new Recovery_channel_state_observer(this);
65 }
66
~Recovery_state_transfer()67 Recovery_state_transfer::~Recovery_state_transfer()
68 {
69 if (group_members != NULL)
70 {
71 std::vector<Group_member_info*>::iterator member_it= group_members->begin();
72 while (member_it != group_members->end())
73 {
74 delete (*member_it);
75 ++member_it;
76 }
77 }
78 delete group_members;
79 delete recovery_channel_observer;
80 mysql_mutex_destroy(&recovery_lock);
81 mysql_cond_destroy(&recovery_condition);
82 mysql_mutex_destroy(&donor_selection_lock);
83 }
84
initialize(const string & rec_view_id)85 void Recovery_state_transfer::initialize(const string& rec_view_id)
86 {
87 DBUG_ENTER("Recovery_state_transfer::initialize");
88
89 //reset the recovery aborted flag
90 recovery_aborted= false;
91 //reset the donor transfer ending flag
92 donor_transfer_finished= false;
93 //reset the failover flag
94 on_failover= false;
95 //reset the donor channel thread error flag
96 donor_channel_thread_error= false;
97 //reset the retry count
98 donor_connection_retry_count= 0;
99
100 this->view_id.clear();
101 this->view_id.append(rec_view_id);
102
103 DBUG_VOID_RETURN;
104 }
105
inform_of_applier_stop(my_thread_id thread_id,bool aborted)106 void Recovery_state_transfer::inform_of_applier_stop(my_thread_id thread_id,
107 bool aborted)
108 {
109 DBUG_ENTER("Recovery_state_transfer::inform_of_applier_stop");
110
111 /*
112 This method doesn't take any locks as it could lead to dead locks between
113 the connection process and this method that can be invoked in that context.
114 Since this only affects the recovery loop and the flag is reset at each
115 connection, no major concurrency issues should exist.
116 */
117
118 //Act if:
119 if (
120 // we don't have all the data yet
121 !donor_transfer_finished &&
122 // recovery was not aborted
123 !recovery_aborted &&
124 // the signal belongs to the recovery donor channel thread
125 donor_connection_interface.is_own_event_applier(thread_id))
126 {
127 mysql_mutex_lock(&recovery_lock);
128 donor_channel_thread_error = true;
129 mysql_cond_broadcast(&recovery_condition);
130 mysql_mutex_unlock(&recovery_lock);
131 }
132
133 DBUG_VOID_RETURN;
134 }
135
inform_of_receiver_stop(my_thread_id thread_id)136 void Recovery_state_transfer::inform_of_receiver_stop(my_thread_id thread_id)
137 {
138 DBUG_ENTER("Recovery_state_transfer::inform_of_receiver_stop");
139
140 /*
141 This method doesn't take any locks as it could lead to dead locks between
142 the connection process and this method that can be invoked in that context.
143 Since this only affects the recovery loop and the flag is reset at each
144 connection, no major concurrency issues should exist.
145 */
146
147 //Act if:
148 if (!donor_transfer_finished && // we don't have all the data yet
149 !recovery_aborted &&// recovery was not aborted
150 // the signal belongs to the recovery donor channel thread
151 donor_connection_interface.is_own_event_receiver(thread_id))
152 {
153 mysql_mutex_lock(&recovery_lock);
154 donor_channel_thread_error = true;
155 mysql_cond_broadcast(&recovery_condition);
156 mysql_mutex_unlock(&recovery_lock);
157 }
158
159 DBUG_VOID_RETURN;
160 }
161
initialize_group_info()162 void Recovery_state_transfer::initialize_group_info()
163 {
164 DBUG_ENTER("Recovery_state_transfer::initialize_group_info");
165
166 selected_donor= NULL;
167 //Update the group member info
168 mysql_mutex_lock(&donor_selection_lock);
169 update_group_membership(false);
170 mysql_mutex_unlock(&donor_selection_lock);
171
172 DBUG_VOID_RETURN;
173 }
174
update_group_membership(bool update_donor)175 void Recovery_state_transfer::update_group_membership(bool update_donor)
176 {
177 DBUG_ENTER("Recovery_state_transfer::update_group_membership");
178
179 #ifndef NDEBUG
180 mysql_mutex_assert_owner(&donor_selection_lock);
181 #endif
182
183 // if needed update the reference to the donor member
184 string donor_uuid;
185 if (selected_donor != NULL && update_donor)
186 {
187 donor_uuid.assign(selected_donor->get_uuid());
188 }
189
190 if (group_members != NULL)
191 {
192 std::vector<Group_member_info*>::iterator member_it= group_members->begin();
193 while (member_it != group_members->end())
194 {
195 delete (*member_it);
196 ++member_it;
197 }
198 }
199 delete group_members;
200
201 group_members= group_member_mgr->get_all_members();
202
203 //When updating the member list, also rebuild the suitable donor list
204 build_donor_list(&donor_uuid);
205
206 DBUG_VOID_RETURN;
207 }
208
abort_state_transfer()209 void Recovery_state_transfer::abort_state_transfer()
210 {
211 DBUG_ENTER("Recovery_state_transfer::abort_state_transfer");
212
213 //Break the wait for view change event
214 mysql_mutex_lock(&recovery_lock);
215 recovery_aborted= true;
216 mysql_cond_broadcast(&recovery_condition);
217 mysql_mutex_unlock(&recovery_lock);
218
219 DBUG_VOID_RETURN;
220 }
221
/*
  React to a group membership change. If the donor this member is currently
  reading from has left the group, kill the current recovery connection and
  wake the recovery loop so it fails over to another donor.

  @param did_members_left  true when the view change removed members from
                           the group.

  @return always 0 (kept as int for symmetry with other recovery methods).
*/
int Recovery_state_transfer::update_recovery_process(bool did_members_left)
{
  DBUG_ENTER("Recovery_state_transfer::update_recovery_process");
  int error= 0;

  /*
    Lock to avoid concurrency between this code that handles failover and
    the establish_donor_connection method. We either:
    1) lock first and see that the method did not run yet, updating the list
       of group members that will be used there.
    2) lock after the method executed, and if the selected donor is leaving
       we stop the connection thread and select a new one.
  */
  mysql_mutex_lock(&donor_selection_lock);

  bool donor_left= false;
  string current_donor_uuid;
  string current_donor_hostname;
  uint current_donor_port= 0;
  /*
    The selected donor can be NULL if:
    * The donor was not yet chosen
     or
    * Was deleted in a previous group updated, but there was no need to
      select a new one since as the data transfer is finished
  */
  if (selected_donor != NULL && did_members_left)
  {
    /*
      Copy the donor's coordinates before probing the member manager: if the
      donor did leave, selected_donor soon points to freed data.
    */
    current_donor_uuid.assign(selected_donor->get_uuid());
    current_donor_hostname.assign(selected_donor->get_hostname());
    current_donor_port = selected_donor->get_port();
    // A NULL lookup result means the donor is gone from the group.
    Group_member_info* current_donor=
      group_member_mgr->get_group_member_info(current_donor_uuid);
    donor_left= (current_donor == NULL);
    delete current_donor;
  }

  /*
    Get updated information about the new group members.
  */
  update_group_membership(!donor_left);

  /*
    It makes sense to cut our connection to the donor if:
    1) The donor has left the building
    and
    2) We are already connected to him.
  */
  if (donor_left)
  {
    //The selected donor no longer holds a meaning after deleting the group
    selected_donor= NULL;
    if (connected_to_donor)
    {
      /*
        The donor_transfer_finished flag is not lock protected on the recovery
        thread so we have the scenarios.
        1) The flag is true and we do nothing
        2) The flag is false and remains false so we restart the connection, and
        that new connection will deliver the rest of the data
        3) The flag turns true while we are restarting the connection. In this
        case we will probably create a new connection that won't be needed and
        will be terminated the instant the lock is freed.
      */
      if (!donor_transfer_finished)
      {
        log_message(MY_INFORMATION_LEVEL,
                    "The member with address %s:%u has unexpectedly disappeared,"
                    " killing the current group replication recovery connection",
                    current_donor_hostname.c_str(), current_donor_port);

        //Awake the recovery loop to connect to another donor
        donor_failover();
      }//else do nothing
    }
  }
  mysql_mutex_unlock(&donor_selection_lock);

  DBUG_RETURN(error);
}
302
303 void
end_state_transfer()304 Recovery_state_transfer::end_state_transfer()
305 {
306 DBUG_ENTER("Recovery_state_transfer::end_state_transfer");
307
308 mysql_mutex_lock(&recovery_lock);
309 donor_transfer_finished= true;
310 mysql_cond_broadcast(&recovery_condition);
311 mysql_mutex_unlock(&recovery_lock);
312
313 DBUG_VOID_RETURN;
314 }
315
donor_failover()316 void Recovery_state_transfer::donor_failover()
317 {
318 DBUG_ENTER("Recovery_state_transfer::donor_failover");
319
320 //Awake the recovery process so it can loop again to connect to another donor
321 mysql_mutex_lock(&recovery_lock);
322 on_failover= true;
323 mysql_cond_broadcast(&recovery_condition);
324 mysql_mutex_unlock(&recovery_lock);
325
326 DBUG_VOID_RETURN;
327 }
328
329 int
check_recovery_thread_status()330 Recovery_state_transfer::check_recovery_thread_status()
331 {
332 DBUG_ENTER("Recovery_state_transfer::check_recovery_thread_status");
333
334 //if some of the threads are running
335 if (donor_connection_interface.is_receiver_thread_running() ||
336 donor_connection_interface.is_applier_thread_running())
337 {
338 return terminate_recovery_slave_threads(); /* purecov: inspected */
339 }
340 DBUG_RETURN(0);
341 }
342
/*
  Check if a given thread id belongs to the applier of this member's
  recovery donor channel.

  @param id  the thread id to check.

  @return true when the id is the recovery channel's event applier.
*/
bool Recovery_state_transfer::is_own_event_channel(my_thread_id id)
{
  DBUG_ENTER("Recovery_state_transfer::is_own_event_channel");
  DBUG_RETURN(donor_connection_interface.is_own_event_applier(id));
}
348
build_donor_list(string * selected_donor_uuid)349 void Recovery_state_transfer::build_donor_list(string* selected_donor_uuid)
350 {
351 DBUG_ENTER("Recovery_state_transfer::build_donor_list");
352
353 suitable_donors.clear();
354
355 std::vector<Group_member_info*>::iterator member_it= group_members->begin();
356
357 while (member_it != group_members->end())
358 {
359 Group_member_info* member= *member_it;
360 //is online and it's not me
361 string m_uuid= member->get_uuid();
362 bool is_online= member->get_recovery_status() ==
363 Group_member_info::MEMBER_ONLINE;
364 bool not_self= m_uuid.compare(member_uuid);
365
366 if (is_online && not_self)
367 {
368 suitable_donors.push_back(member);
369 }
370
371 //if requested, and if the donor is still in the group, update its reference
372 if (selected_donor_uuid != NULL && !m_uuid.compare(*selected_donor_uuid))
373 {
374 selected_donor= member;
375 }
376
377 ++member_it;
378 }
379
380 if (suitable_donors.size() > 1)
381 {
382 std::random_shuffle(suitable_donors.begin(), suitable_donors.end());
383 }
384
385 //no need for errors if no donors exist, we thrown it in the connection method.
386 DBUG_VOID_RETURN;
387 }
388
/*
  Connection retry loop: pick a donor from the suitable donor list and try
  to configure and start the recovery channel against it, retrying (up to
  max_connection_attempts_to_donors, sleeping donor_reconnect_interval
  between rebuilds of an empty donor list) until a connection succeeds,
  the retry budget is exhausted, or recovery is aborted.

  @return 0 when connected to a donor, non-zero otherwise.
*/
int Recovery_state_transfer::establish_donor_connection()
{
  DBUG_ENTER("Recovery_state_transfer::establish_donor_connection");

  int error= -1;
  connected_to_donor= false;

  while (error != 0 && !recovery_aborted)
  {
    /*
      Held while selecting/configuring a donor so update_recovery_process
      cannot concurrently swap the member list under us.
    */
    mysql_mutex_lock(&donor_selection_lock);

    DBUG_EXECUTE_IF("gr_reset_max_connection_attempts_to_donors", {
      if (donor_connection_retry_count == 3) {
        const char act[] =
            "now signal signal.connection_attempt_3 wait_for "
            "signal.reset_recovery_retry_count_done";
        assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
      }
    };);
    // max number of retries reached, abort
    if (donor_connection_retry_count >= max_connection_attempts_to_donors)
    {
      log_message(MY_ERROR_LEVEL,
                  "Maximum number of retries when trying to "
                  "connect to a donor reached. "
                  "Aborting group replication recovery.");
      mysql_mutex_unlock(&donor_selection_lock);
      DBUG_RETURN(error);
    }

    // Alone in the group: nobody left to serve as a donor.
    if (group_member_mgr->get_number_of_members() == 1)
    {
      log_message(MY_ERROR_LEVEL,
                  "All donors left. Aborting group replication recovery.");
      mysql_mutex_unlock(&donor_selection_lock);
      DBUG_RETURN(error);
    }

    if(donor_connection_retry_count == 0)
    {
      log_message(MY_INFORMATION_LEVEL,
                  "Establishing group recovery connection with a possible donor."
                  " Attempt %d/%d",
                  donor_connection_retry_count + 1,
                  max_connection_attempts_to_donors);
    }
    else
    {
      log_message(MY_INFORMATION_LEVEL,
                  "Retrying group recovery connection with another donor. "
                  "Attempt %d/%d",
                  donor_connection_retry_count + 1,
                  max_connection_attempts_to_donors);
    }

    //Rebuild the list, if empty
    if (suitable_donors.empty())
    {
      /*
        Drop the selection lock while sleeping so membership updates can
        proceed; the timed wait doubles as the reconnect interval and is
        interruptible via recovery_condition (e.g. on abort).
      */
      mysql_mutex_unlock(&donor_selection_lock);

      struct timespec abstime;
      set_timespec(&abstime, donor_reconnect_interval);

      mysql_mutex_lock(&recovery_lock);
      mysql_cond_timedwait(&recovery_condition,
                           &recovery_lock, &abstime);
      mysql_mutex_unlock(&recovery_lock);

      mysql_mutex_lock(&donor_selection_lock);

      build_donor_list(NULL);
      if (suitable_donors.empty())
      {
        log_message(MY_INFORMATION_LEVEL,
                    "No valid donors exist in the group, retrying");
        donor_connection_retry_count++;
        mysql_mutex_unlock(&donor_selection_lock);
        continue;
      }
    }

    // A fresh attempt starts with a clean channel-thread error flag.
    donor_channel_thread_error= false;

    //Get the last element and delete it
    selected_donor= suitable_donors.back();
    suitable_donors.pop_back();
    //increment the number of tries
    donor_connection_retry_count++;

    if ((error= initialize_donor_connection()))
    {
      log_message(MY_ERROR_LEVEL,
                  "Error when configuring the group recovery"
                  " connection to the donor."); /* purecov: inspected */
    }

    if (!error && !recovery_aborted)
    {
      error= start_recovery_donor_threads();
    }

    if (!error)
    {
      connected_to_donor = true;
      //if were on failover, now we are again connected to a valid server.
      on_failover= false;
    }

    mysql_mutex_unlock(&donor_selection_lock);

    /*
      sleep so other method (recovery) can get some time
      to grab the lock and update the group.
    */
    my_sleep(100);
  }

  DBUG_RETURN(error);
}
508
initialize_donor_connection()509 int Recovery_state_transfer::initialize_donor_connection()
510 {
511 DBUG_ENTER("Recovery_state_transfer::initialize_donor_connection");
512
513 int error= 0;
514
515 donor_connection_interface.purge_logs(false);
516
517 char* hostname= const_cast<char*>(selected_donor->get_hostname().c_str());
518 uint port= selected_donor->get_port();
519
520 error= donor_connection_interface.initialize_channel(hostname, port,
521 NULL, NULL,
522 recovery_use_ssl,
523 recovery_ssl_ca,
524 recovery_ssl_capath,
525 recovery_ssl_cert,
526 recovery_ssl_cipher,
527 recovery_ssl_key,
528 recovery_ssl_crl,
529 recovery_ssl_crlpath,
530 recovery_ssl_verify_server_cert,
531 DEFAULT_THREAD_PRIORITY,
532 1, false, true, true);
533
534 if (!error)
535 {
536 log_message(MY_INFORMATION_LEVEL,
537 "Establishing connection to a group replication recovery donor"
538 " %s at %s port: %d.",
539 selected_donor->get_uuid().c_str(),
540 hostname,
541 port);
542 }
543 else
544 {
545 log_message(MY_ERROR_LEVEL,
546 "Error while creating the group replication recovery channel "
547 "with donor %s at %s port: %d.",
548 selected_donor->get_uuid().c_str(),
549 hostname,
550 port); /* purecov: inspected */
551 }
552
553 DBUG_RETURN(error);
554 }
555
/*
  Start the recovery channel receiver/applier threads against the selected
  donor and register the channel observer, handling the race where a thread
  stops (or starts stopping) before the observer is in place.

  @return 0 on success, a channel service error otherwise.
*/
int Recovery_state_transfer::start_recovery_donor_threads()
{
  DBUG_ENTER("Recovery_state_transfer::start_recovery_donor_threads");

  // Start both threads; the receiver starts reading at view_id.
  int error= donor_connection_interface.start_threads(true, true,
                                                      &view_id, true);

  if(!error)
  {
    DBUG_EXECUTE_IF("pause_after_io_thread_stop_hook",
                    {
                      const char act[]= "now "
                                        "WAIT_FOR reached_stopping_io_thread";
                      assert(!debug_sync_set_action(current_thd,
                                                    STRING_WITH_LEN(act)));
                    };);
    DBUG_EXECUTE_IF("pause_after_sql_thread_stop_hook",
                    {
                      const char act[]= "now "
                                        "WAIT_FOR reached_stopping_sql_thread";
                      assert(!debug_sync_set_action(current_thd,
                                                    STRING_WITH_LEN(act)));
                    };);

    /*
      Register a channel observer to detect SQL/IO thread stops
      This is not done before the start as the hooks in place verify the
      stopping thread id and that can lead to deadlocks with start itself.
    */
    channel_observation_manager
        ->register_channel_observer(recovery_channel_observer);
  }

  /*
    We should unregister the observer and error out if the threads are stopping
    or have stopped while the observer was being registered and the state
    transfer is not yet completed.
  */
  bool is_receiver_stopping=
    donor_connection_interface.is_receiver_thread_stopping();
  bool is_receiver_stopped=
    !donor_connection_interface.is_receiver_thread_running();
  bool is_applier_stopping=
    donor_connection_interface.is_applier_thread_stopping();
  bool is_applier_stopped=
    !donor_connection_interface.is_applier_thread_running();

  if (!error && !donor_transfer_finished &&
      (is_receiver_stopping || is_receiver_stopped ||
       is_applier_stopping || is_applier_stopped))
  {
    error= 1;
    channel_observation_manager
        ->unregister_channel_observer(recovery_channel_observer);
    /*
      At this point, at least one of the threads are about to stop (if it
      didn't stopped yet).

      During retry attempts, we will:
      a) reconfigure the receiver thread to point to a new donor;
      b) start all thread channels;

      In order to not fail while doing (a) we must forcefully stop the
      receiver thread if it didn't stopped yet, or else the reconfiguration
      process will fail.
    */
    if ((is_applier_stopping || is_applier_stopped) &&
        !(is_receiver_stopping || is_receiver_stopped))
      donor_connection_interface.stop_threads(true /* receiver */,
                                              false /* applier */);
  }

  DBUG_EXECUTE_IF("pause_after_io_thread_stop_hook",
                  {
                    const char act[]= "now SIGNAL continue_to_stop_io_thread";
                    assert(!debug_sync_set_action(current_thd,
                                                  STRING_WITH_LEN(act)));
                  };);
  DBUG_EXECUTE_IF("pause_after_sql_thread_stop_hook",
                  {
                    const char act[]= "now SIGNAL continue_to_stop_sql_thread";
                    assert(!debug_sync_set_action(current_thd,
                                                  STRING_WITH_LEN(act)));
                  };);

  if (error)
  {
    // A connection failure gets a more actionable message than other errors.
    if (error == RPL_CHANNEL_SERVICE_RECEIVER_CONNECTION_ERROR)
    {
      log_message(MY_ERROR_LEVEL,
                  "There was an error when connecting to the donor server. "
                  "Please check that group_replication_recovery channel "
                  "credentials and all MEMBER_HOST column values of "
                  "performance_schema.replication_group_members table are "
                  "correct and DNS resolvable.");
      log_message(MY_ERROR_LEVEL,
                  "For details please check "
                  "performance_schema.replication_connection_status table "
                  "and error log messages of Slave I/O for channel "
                  "group_replication_recovery.");
    }
    else
    {
      log_message(MY_ERROR_LEVEL,
                  "Error while starting the group replication recovery "
                  "receiver/applier threads");
    }
  }

  DBUG_RETURN(error);
}
667
terminate_recovery_slave_threads(bool purge_logs)668 int Recovery_state_transfer::terminate_recovery_slave_threads(bool purge_logs)
669 {
670 DBUG_ENTER("Recovery_state_transfer::terminate_recovery_slave_threads");
671
672 log_message(MY_INFORMATION_LEVEL,
673 "Terminating existing group replication donor connection "
674 "and purging the corresponding logs.");
675
676 int error= 0;
677
678 //If the threads never started, the method just returns
679 if ((error= donor_connection_interface.stop_threads(true, true)))
680 {
681 log_message(MY_ERROR_LEVEL,
682 "Error when stopping the group replication recovery's donor"
683 " connection"); /* purecov: inspected */
684 }
685 else
686 {
687 if (purge_logs)
688 {
689 //If there is no repository in place nothing happens
690 error= purge_recovery_slave_threads_repos();
691 }
692 }
693
694 DBUG_RETURN(error);
695 }
696
/*
  Purge the recovery channel's relay logs and reset its configuration to a
  neutral "<NULL>" host so no stale donor coordinates remain.

  @return 0 on success, the purge/initialization error otherwise.
*/
int Recovery_state_transfer::purge_recovery_slave_threads_repos()
{
  DBUG_ENTER("Recovery_state_transfer::purge_recovery_slave_threads_repos");

  int error= 0;
  if ((error = donor_connection_interface.purge_logs(false)))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Error when purging the group replication recovery's relay logs");
    DBUG_RETURN(error);
    /* purecov: end */
  }
  /*
    Re-initialize the channel with placeholder values; the NULL arguments
    cover the credential/SSL slots (presumably including the bool SSL flags,
    for which NULL evaluates to false — confirm against the
    initialize_channel() declaration).
  */
  error=
    donor_connection_interface.initialize_channel(const_cast<char*>("<NULL>"),
                                                  0,
                                                  NULL, NULL,
                                                  NULL,
                                                  NULL,
                                                  NULL,
                                                  NULL,
                                                  NULL,
                                                  NULL,
                                                  NULL,
                                                  NULL,
                                                  NULL,
                                                  DEFAULT_THREAD_PRIORITY,
                                                  1, false, true, true);

  DBUG_RETURN(error);
}
728
729
/*
  Main state transfer loop: connect to a donor and wait until either all
  the missing data arrived, recovery was aborted, the donor failed over,
  or a channel thread error forced a reconnect.

  @param recovery_thd  the recovery thread's THD, used for stage reporting.

  @return 0 on success, non-zero when the donor connection failed or the
          channel threads could not be stopped.
*/
int Recovery_state_transfer::state_transfer(THD *recovery_thd)
{
  DBUG_ENTER("Recovery_state_transfer::state_transfer");

  int error= 0;

  while (!donor_transfer_finished && !recovery_aborted)
  {
    /*
      If an applier error happened: stop the slave threads.
      We do not purge logs or reset channel configuration to
      preserve the error information on performance schema
      tables until the next recovery attempt.
      Recovery_state_transfer::initialize_donor_connection() will
      take care of that.
    */
    if (donor_channel_thread_error)
    {
      //Unsubscribe the listener until it connects again.
      channel_observation_manager
          ->unregister_channel_observer(recovery_channel_observer);

      if ((error= terminate_recovery_slave_threads(false)))
      {
        /* purecov: begin inspected */
        log_message(MY_ERROR_LEVEL,
                    "Can't kill the current group replication recovery donor"
                    " connection after an applier error."
                    " Recovery will shutdown.");
        //if we can't stop, abort recovery
        DBUG_RETURN(error);
        /* purecov: end */
      }
    }

    //If the donor left, just terminate the threads with no log purging
    if (on_failover)
    {
      //Unsubscribe the listener until it connects again.
      channel_observation_manager
          ->unregister_channel_observer(recovery_channel_observer);

      //Stop the threads before reconfiguring the connection
      if ((error= donor_connection_interface.stop_threads(true, true)))
      {
        /* purecov: begin inspected */
        log_message(MY_ERROR_LEVEL,
                    "Can't kill the current group replication recovery donor"
                    " connection during failover. Recovery will shutdown.");
        //if we can't stop, abort recovery
        DBUG_RETURN(error);
        /* purecov: end */
      }
    }

#ifndef _WIN32
    THD_STAGE_INFO(recovery_thd, stage_connecting_to_master);
#endif

    if (!recovery_aborted)
    {
      //if the connection to the donor failed, abort recovery
      if ((error = establish_donor_connection()))
      {
        break;
      }
    }

#ifndef _WIN32
    THD_STAGE_INFO(recovery_thd, stage_executing);
#endif

    /*
      Wait (under recovery_lock) until one of the four wake-up conditions
      below flips; each flag is set elsewhere with the lock noted:

      donor_transfer_finished -> set by the set_retrieved_cert_info method.
                                 lock: recovery_lock
      recovery_aborted -> set when stopping recovery
                          lock: run_lock
      on_failover -> set to true on update_recovery_process.
                     set to false when connected to a valid donor
                     lock: donor_selection_lock
      donor_channel_thread_error -> set to true on inform_of_applier_stop or
                                    inform_of_receiver_stop.
                                    set to false before connecting to any donor
                                    lock: donor_selection_lock
    */
    mysql_mutex_lock(&recovery_lock);
    while (!donor_transfer_finished && !recovery_aborted &&
           !on_failover && !donor_channel_thread_error)
    {
      mysql_cond_wait(&recovery_condition, &recovery_lock);
    }
    mysql_mutex_unlock(&recovery_lock);
  }//if the current connection was terminated, connect again

  // Loop exit: tear down the observer and the channel threads.
  channel_observation_manager
      ->unregister_channel_observer(recovery_channel_observer);
  // do not purge logs if an error occur, keep the diagnose on SLAVE STATUS
  terminate_recovery_slave_threads(!error);
  connected_to_donor= false;

  DBUG_RETURN(error);
}
832