1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "plugin_server_include.h"
24 #include "recovery_state_transfer.h"
25 #include "plugin_log.h"
26 #include "recovery_channel_state_observer.h"
27 #include "plugin_psi.h"
28 #include "plugin.h"
29 #include <mysql/group_replication_priv.h>
30 
31 using std::string;
32 
33 Recovery_state_transfer::
Recovery_state_transfer(char * recovery_channel_name,const string & member_uuid,Channel_observation_manager * channel_obsr_mngr)34 Recovery_state_transfer(char* recovery_channel_name,
35                         const string& member_uuid,
36                         Channel_observation_manager *channel_obsr_mngr)
37   : selected_donor(NULL), group_members(NULL),
38     donor_connection_retry_count(0),
39     recovery_aborted(false), donor_transfer_finished(false),
40     connected_to_donor(false), on_failover(false),
41     donor_connection_interface(recovery_channel_name),
42     channel_observation_manager(channel_obsr_mngr),
43     recovery_channel_observer(NULL),
44     recovery_use_ssl(false), recovery_ssl_verify_server_cert(false),
45     max_connection_attempts_to_donors(0), donor_reconnect_interval(0)
46 {
47   //set the recovery SSL options to 0
48   (void) strncpy(recovery_ssl_ca, "", 1);
49   (void) strncpy(recovery_ssl_capath, "", 1);
50   (void) strncpy(recovery_ssl_cert, "", 1);
51   (void) strncpy(recovery_ssl_cipher, "", 1);
52   (void) strncpy(recovery_ssl_key, "", 1);
53   (void) strncpy(recovery_ssl_crl, "", 1);
54   (void) strncpy(recovery_ssl_crlpath, "", 1);
55 
56   this->member_uuid= member_uuid;
57 
58   mysql_mutex_init(key_GR_LOCK_recovery, &recovery_lock, MY_MUTEX_INIT_FAST);
59   mysql_cond_init(key_GR_COND_recovery, &recovery_condition);
60   mysql_mutex_init(key_GR_LOCK_recovery_donor_selection,
61                    &donor_selection_lock,
62                    MY_MUTEX_INIT_FAST);
63 
64   recovery_channel_observer= new Recovery_channel_state_observer(this);
65 }
66 
~Recovery_state_transfer()67 Recovery_state_transfer::~Recovery_state_transfer()
68 {
69   if (group_members != NULL)
70   {
71     std::vector<Group_member_info*>::iterator member_it= group_members->begin();
72     while (member_it != group_members->end())
73     {
74         delete (*member_it);
75         ++member_it;
76     }
77   }
78   delete group_members;
79   delete recovery_channel_observer;
80   mysql_mutex_destroy(&recovery_lock);
81   mysql_cond_destroy(&recovery_condition);
82   mysql_mutex_destroy(&donor_selection_lock);
83 }
84 
initialize(const string & rec_view_id)85 void Recovery_state_transfer::initialize(const string& rec_view_id)
86 {
87   DBUG_ENTER("Recovery_state_transfer::initialize");
88 
89   //reset the recovery aborted flag
90   recovery_aborted= false;
91   //reset the donor transfer ending flag
92   donor_transfer_finished= false;
93   //reset the failover flag
94   on_failover= false;
95   //reset the donor channel thread error flag
96   donor_channel_thread_error= false;
97   //reset the retry count
98   donor_connection_retry_count= 0;
99 
100   this->view_id.clear();
101   this->view_id.append(rec_view_id);
102 
103   DBUG_VOID_RETURN;
104 }
105 
inform_of_applier_stop(my_thread_id thread_id,bool aborted)106 void Recovery_state_transfer::inform_of_applier_stop(my_thread_id thread_id,
107                                                      bool aborted)
108 {
109   DBUG_ENTER("Recovery_state_transfer::inform_of_applier_stop");
110 
111   /*
112     This method doesn't take any locks as it could lead to dead locks between
113     the connection process and this method that can be invoked in that context.
114     Since this only affects the recovery loop and the flag is reset at each
115     connection, no major concurrency issues should exist.
116   */
117 
118   //Act if:
119   if (
120     // we don't have all the data yet
121       !donor_transfer_finished &&
122       // recovery was not aborted
123       !recovery_aborted &&
124       // the signal belongs to the recovery donor channel thread
125       donor_connection_interface.is_own_event_applier(thread_id))
126   {
127     mysql_mutex_lock(&recovery_lock);
128     donor_channel_thread_error = true;
129     mysql_cond_broadcast(&recovery_condition);
130     mysql_mutex_unlock(&recovery_lock);
131   }
132 
133   DBUG_VOID_RETURN;
134 }
135 
inform_of_receiver_stop(my_thread_id thread_id)136 void Recovery_state_transfer::inform_of_receiver_stop(my_thread_id thread_id)
137 {
138   DBUG_ENTER("Recovery_state_transfer::inform_of_receiver_stop");
139 
140   /*
141     This method doesn't take any locks as it could lead to dead locks between
142     the connection process and this method that can be invoked in that context.
143     Since this only affects the recovery loop and the flag is reset at each
144     connection, no major concurrency issues should exist.
145   */
146 
147   //Act if:
148   if (!donor_transfer_finished && // we don't have all the data yet
149       !recovery_aborted  &&// recovery was not aborted
150       // the signal belongs to the recovery donor channel thread
151       donor_connection_interface.is_own_event_receiver(thread_id))
152   {
153     mysql_mutex_lock(&recovery_lock);
154     donor_channel_thread_error = true;
155     mysql_cond_broadcast(&recovery_condition);
156     mysql_mutex_unlock(&recovery_lock);
157   }
158 
159   DBUG_VOID_RETURN;
160 }
161 
initialize_group_info()162 void Recovery_state_transfer::initialize_group_info()
163 {
164   DBUG_ENTER("Recovery_state_transfer::initialize_group_info");
165 
166   selected_donor= NULL;
167   //Update the group member info
168   mysql_mutex_lock(&donor_selection_lock);
169   update_group_membership(false);
170   mysql_mutex_unlock(&donor_selection_lock);
171 
172   DBUG_VOID_RETURN;
173 }
174 
update_group_membership(bool update_donor)175 void Recovery_state_transfer::update_group_membership(bool update_donor)
176 {
177   DBUG_ENTER("Recovery_state_transfer::update_group_membership");
178 
179 #ifndef NDEBUG
180   mysql_mutex_assert_owner(&donor_selection_lock);
181 #endif
182 
183   // if needed update the reference to the donor member
184   string donor_uuid;
185   if (selected_donor != NULL && update_donor)
186   {
187     donor_uuid.assign(selected_donor->get_uuid());
188   }
189 
190   if (group_members != NULL)
191   {
192     std::vector<Group_member_info*>::iterator member_it= group_members->begin();
193     while (member_it != group_members->end())
194     {
195       delete (*member_it);
196       ++member_it;
197     }
198   }
199   delete group_members;
200 
201   group_members= group_member_mgr->get_all_members();
202 
203   //When updating the member list, also rebuild the suitable donor list
204   build_donor_list(&donor_uuid);
205 
206   DBUG_VOID_RETURN;
207 }
208 
abort_state_transfer()209 void Recovery_state_transfer::abort_state_transfer()
210 {
211   DBUG_ENTER("Recovery_state_transfer::abort_state_transfer");
212 
213   //Break the wait for view change event
214   mysql_mutex_lock(&recovery_lock);
215   recovery_aborted= true;
216   mysql_cond_broadcast(&recovery_condition);
217   mysql_mutex_unlock(&recovery_lock);
218 
219   DBUG_VOID_RETURN;
220 }
221 
update_recovery_process(bool did_members_left)222 int Recovery_state_transfer::update_recovery_process(bool did_members_left)
223 {
224   DBUG_ENTER("Recovery_state_transfer::update_recovery_process");
225   int error= 0;
226 
227   /*
228     Lock to avoid concurrency between this code that handles failover and
229     the establish_donor_connection method. We either:
230     1) lock first and see that the method did not run yet, updating the list
231        of group members that will be used there.
232     2) lock after the method executed, and if the selected donor is leaving
233        we stop the connection thread and select a new one.
234   */
235   mysql_mutex_lock(&donor_selection_lock);
236 
237   bool donor_left= false;
238   string current_donor_uuid;
239   string current_donor_hostname;
240   uint current_donor_port= 0;
241   /*
242     The selected donor can be NULL if:
243     * The donor was not yet chosen
244      or
245     * Was deleted in a previous group updated, but there was no need to
246       select a new one since as the data transfer is finished
247   */
248   if (selected_donor != NULL && did_members_left)
249   {
250     current_donor_uuid.assign(selected_donor->get_uuid());
251     current_donor_hostname.assign(selected_donor->get_hostname());
252     current_donor_port = selected_donor->get_port();
253     Group_member_info* current_donor=
254         group_member_mgr->get_group_member_info(current_donor_uuid);
255     donor_left= (current_donor == NULL);
256     delete current_donor;
257   }
258 
259   /*
260     Get updated information about the new group members.
261   */
262   update_group_membership(!donor_left);
263 
264   /*
265     It makes sense to cut our connection to the donor if:
266     1) The donor has left the building
267     and
268     2) We are already connected to him.
269   */
270   if (donor_left)
271   {
272     //The selected donor no longer holds a meaning after deleting the group
273     selected_donor= NULL;
274     if (connected_to_donor)
275     {
276       /*
277        The donor_transfer_finished flag is not lock protected on the recovery
278        thread so we have the scenarios.
279        1) The flag is true and we do nothing
280        2) The flag is false and remains false so we restart the connection, and
281        that new connection will deliver the rest of the data
282        3) The flag turns true while we are restarting the connection. In this
283        case we will probably create a new connection that won't be needed and
284        will be terminated the instant the lock is freed.
285       */
286       if (!donor_transfer_finished)
287       {
288         log_message(MY_INFORMATION_LEVEL,
289                     "The member with address %s:%u has unexpectedly disappeared,"
290                     " killing the current group replication recovery connection",
291                     current_donor_hostname.c_str(), current_donor_port);
292 
293         //Awake the recovery loop to connect to another donor
294         donor_failover();
295       }//else do nothing
296     }
297   }
298   mysql_mutex_unlock(&donor_selection_lock);
299 
300   DBUG_RETURN(error);
301 }
302 
303 void
end_state_transfer()304 Recovery_state_transfer::end_state_transfer()
305 {
306   DBUG_ENTER("Recovery_state_transfer::end_state_transfer");
307 
308   mysql_mutex_lock(&recovery_lock);
309   donor_transfer_finished= true;
310   mysql_cond_broadcast(&recovery_condition);
311   mysql_mutex_unlock(&recovery_lock);
312 
313   DBUG_VOID_RETURN;
314 }
315 
donor_failover()316 void Recovery_state_transfer::donor_failover()
317 {
318   DBUG_ENTER("Recovery_state_transfer::donor_failover");
319 
320   //Awake the recovery process so it can loop again to connect to another donor
321   mysql_mutex_lock(&recovery_lock);
322   on_failover= true;
323   mysql_cond_broadcast(&recovery_condition);
324   mysql_mutex_unlock(&recovery_lock);
325 
326   DBUG_VOID_RETURN;
327 }
328 
329 int
check_recovery_thread_status()330 Recovery_state_transfer::check_recovery_thread_status()
331 {
332   DBUG_ENTER("Recovery_state_transfer::check_recovery_thread_status");
333 
334   //if some of the threads are running
335   if (donor_connection_interface.is_receiver_thread_running() ||
336       donor_connection_interface.is_applier_thread_running())
337   {
338     return terminate_recovery_slave_threads(); /* purecov: inspected */
339   }
340   DBUG_RETURN(0);
341 }
342 
is_own_event_channel(my_thread_id id)343 bool Recovery_state_transfer::is_own_event_channel(my_thread_id id)
344 {
345   DBUG_ENTER("Recovery_state_transfer::is_own_event_channel");
346   DBUG_RETURN(donor_connection_interface.is_own_event_applier(id));
347 }
348 
build_donor_list(string * selected_donor_uuid)349 void Recovery_state_transfer::build_donor_list(string* selected_donor_uuid)
350 {
351   DBUG_ENTER("Recovery_state_transfer::build_donor_list");
352 
353   suitable_donors.clear();
354 
355   std::vector<Group_member_info*>::iterator member_it= group_members->begin();
356 
357   while (member_it != group_members->end())
358   {
359     Group_member_info* member= *member_it;
360     //is online and it's not me
361     string m_uuid= member->get_uuid();
362     bool is_online= member->get_recovery_status() ==
363         Group_member_info::MEMBER_ONLINE;
364     bool not_self= m_uuid.compare(member_uuid);
365 
366     if (is_online && not_self)
367     {
368       suitable_donors.push_back(member);
369     }
370 
371     //if requested, and if the donor is still in the group, update its reference
372     if (selected_donor_uuid != NULL && !m_uuid.compare(*selected_donor_uuid))
373     {
374       selected_donor= member;
375     }
376 
377     ++member_it;
378   }
379 
380   if (suitable_donors.size() > 1)
381   {
382     std::random_shuffle(suitable_donors.begin(), suitable_donors.end());
383   }
384 
385   //no need for errors if no donors exist, we thrown it in the connection method.
386   DBUG_VOID_RETURN;
387 }
388 
establish_donor_connection()389 int Recovery_state_transfer::establish_donor_connection()
390 {
391   DBUG_ENTER("Recovery_state_transfer::establish_donor_connection");
392 
393   int error= -1;
394   connected_to_donor= false;
395 
396   while (error != 0 && !recovery_aborted)
397   {
398     mysql_mutex_lock(&donor_selection_lock);
399 
400     DBUG_EXECUTE_IF("gr_reset_max_connection_attempts_to_donors", {
401       if (donor_connection_retry_count == 3) {
402         const char act[] =
403             "now signal signal.connection_attempt_3 wait_for "
404             "signal.reset_recovery_retry_count_done";
405         assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
406       }
407     };);
408     // max number of retries reached, abort
409     if (donor_connection_retry_count >= max_connection_attempts_to_donors)
410     {
411       log_message(MY_ERROR_LEVEL,
412                   "Maximum number of retries when trying to "
413                   "connect to a donor reached. "
414                   "Aborting group replication recovery.");
415       mysql_mutex_unlock(&donor_selection_lock);
416       DBUG_RETURN(error);
417     }
418 
419     if (group_member_mgr->get_number_of_members() == 1)
420     {
421       log_message(MY_ERROR_LEVEL,
422                   "All donors left. Aborting group replication recovery.");
423       mysql_mutex_unlock(&donor_selection_lock);
424       DBUG_RETURN(error);
425     }
426 
427     if(donor_connection_retry_count == 0)
428     {
429       log_message(MY_INFORMATION_LEVEL,
430                   "Establishing group recovery connection with a possible donor."
431                   " Attempt %d/%d",
432                   donor_connection_retry_count + 1,
433                   max_connection_attempts_to_donors);
434     }
435     else
436     {
437       log_message(MY_INFORMATION_LEVEL,
438                   "Retrying group recovery connection with another donor. "
439                   "Attempt %d/%d",
440                   donor_connection_retry_count + 1,
441                   max_connection_attempts_to_donors);
442     }
443 
444     //Rebuild the list, if empty
445     if (suitable_donors.empty())
446     {
447       mysql_mutex_unlock(&donor_selection_lock);
448 
449       struct timespec abstime;
450       set_timespec(&abstime, donor_reconnect_interval);
451 
452       mysql_mutex_lock(&recovery_lock);
453       mysql_cond_timedwait(&recovery_condition,
454                            &recovery_lock, &abstime);
455       mysql_mutex_unlock(&recovery_lock);
456 
457       mysql_mutex_lock(&donor_selection_lock);
458 
459       build_donor_list(NULL);
460       if (suitable_donors.empty())
461       {
462         log_message(MY_INFORMATION_LEVEL,
463                   "No valid donors exist in the group, retrying");
464         donor_connection_retry_count++;
465         mysql_mutex_unlock(&donor_selection_lock);
466         continue;
467       }
468     }
469 
470     donor_channel_thread_error= false;
471 
472     //Get the last element and delete it
473     selected_donor= suitable_donors.back();
474     suitable_donors.pop_back();
475     //increment the number of tries
476     donor_connection_retry_count++;
477 
478     if ((error= initialize_donor_connection()))
479     {
480       log_message(MY_ERROR_LEVEL,
481                   "Error when configuring the group recovery"
482                   " connection to the donor."); /* purecov: inspected */
483     }
484 
485     if (!error && !recovery_aborted)
486     {
487       error= start_recovery_donor_threads();
488     }
489 
490     if (!error)
491     {
492       connected_to_donor = true;
493       //if were on failover, now we are again connected to a valid server.
494       on_failover= false;
495     }
496 
497     mysql_mutex_unlock(&donor_selection_lock);
498 
499     /*
500       sleep so other method (recovery) can get some time
501       to grab the lock and update the group.
502     */
503     my_sleep(100);
504   }
505 
506   DBUG_RETURN(error);
507 }
508 
initialize_donor_connection()509 int Recovery_state_transfer::initialize_donor_connection()
510 {
511   DBUG_ENTER("Recovery_state_transfer::initialize_donor_connection");
512 
513   int error= 0;
514 
515   donor_connection_interface.purge_logs(false);
516 
517   char* hostname= const_cast<char*>(selected_donor->get_hostname().c_str());
518   uint port= selected_donor->get_port();
519 
520   error= donor_connection_interface.initialize_channel(hostname, port,
521                                                        NULL, NULL,
522                                                        recovery_use_ssl,
523                                                        recovery_ssl_ca,
524                                                        recovery_ssl_capath,
525                                                        recovery_ssl_cert,
526                                                        recovery_ssl_cipher,
527                                                        recovery_ssl_key,
528                                                        recovery_ssl_crl,
529                                                        recovery_ssl_crlpath,
530                                                        recovery_ssl_verify_server_cert,
531                                                        DEFAULT_THREAD_PRIORITY,
532                                                        1, false, true, true);
533 
534   if (!error)
535   {
536     log_message(MY_INFORMATION_LEVEL,
537                 "Establishing connection to a group replication recovery donor"
538                 " %s at %s port: %d.",
539                 selected_donor->get_uuid().c_str(),
540                 hostname,
541                 port);
542   }
543   else
544   {
545     log_message(MY_ERROR_LEVEL,
546                 "Error while creating the group replication recovery channel "
547                 "with donor %s at %s port: %d.",
548                 selected_donor->get_uuid().c_str(),
549                 hostname,
550                 port); /* purecov: inspected */
551   }
552 
553   DBUG_RETURN(error);
554 }
555 
start_recovery_donor_threads()556 int Recovery_state_transfer::start_recovery_donor_threads()
557 {
558   DBUG_ENTER("Recovery_state_transfer::start_recovery_donor_threads");
559 
560   int error= donor_connection_interface.start_threads(true, true,
561                                                       &view_id, true);
562 
563   if(!error)
564   {
565     DBUG_EXECUTE_IF("pause_after_io_thread_stop_hook",
566                     {
567                       const char act[]= "now "
568                                         "WAIT_FOR reached_stopping_io_thread";
569                       assert(!debug_sync_set_action(current_thd,
570                                                     STRING_WITH_LEN(act)));
571                     };);
572     DBUG_EXECUTE_IF("pause_after_sql_thread_stop_hook",
573                     {
574                       const char act[]= "now "
575                                         "WAIT_FOR reached_stopping_sql_thread";
576                       assert(!debug_sync_set_action(current_thd,
577                                                     STRING_WITH_LEN(act)));
578                     };);
579 
580     /*
581       Register a channel observer to detect SQL/IO thread stops
582       This is not done before the start as the hooks in place verify the
583       stopping thread id and that can lead to deadlocks with start itself.
584     */
585     channel_observation_manager
586       ->register_channel_observer(recovery_channel_observer);
587   }
588 
589   /*
590     We should unregister the observer and error out if the threads are stopping
591     or have stopped while the observer was being registered and the state
592     transfer is not yet completed.
593   */
594   bool is_receiver_stopping=
595          donor_connection_interface.is_receiver_thread_stopping();
596   bool is_receiver_stopped=
597          !donor_connection_interface.is_receiver_thread_running();
598   bool is_applier_stopping=
599          donor_connection_interface.is_applier_thread_stopping();
600   bool is_applier_stopped=
601          !donor_connection_interface.is_applier_thread_running();
602 
603   if (!error && !donor_transfer_finished &&
604       (is_receiver_stopping || is_receiver_stopped ||
605        is_applier_stopping || is_applier_stopped))
606   {
607     error= 1;
608     channel_observation_manager
609       ->unregister_channel_observer(recovery_channel_observer);
610     /*
611       At this point, at least one of the threads are about to stop (if it
612       didn't stopped yet).
613 
614       During retry attempts, we will:
615         a) reconfigure the receiver thread to point to a new donor;
616         b) start all thread channels;
617 
618       In order to not fail while doing (a) we must forcefully stop the
619       receiver thread if it didn't stopped yet, or else the reconfiguration
620       process will fail.
621     */
622     if ((is_applier_stopping || is_applier_stopped) &&
623         !(is_receiver_stopping || is_receiver_stopped))
624       donor_connection_interface.stop_threads(true /* receiver */,
625                                               false /* applier */);
626   }
627 
628   DBUG_EXECUTE_IF("pause_after_io_thread_stop_hook",
629                   {
630                     const char act[]= "now SIGNAL continue_to_stop_io_thread";
631                     assert(!debug_sync_set_action(current_thd,
632                                                   STRING_WITH_LEN(act)));
633                   };);
634   DBUG_EXECUTE_IF("pause_after_sql_thread_stop_hook",
635                   {
636                     const char act[]= "now SIGNAL continue_to_stop_sql_thread";
637                     assert(!debug_sync_set_action(current_thd,
638                                                   STRING_WITH_LEN(act)));
639                   };);
640 
641   if (error)
642   {
643     if (error == RPL_CHANNEL_SERVICE_RECEIVER_CONNECTION_ERROR)
644     {
645       log_message(MY_ERROR_LEVEL,
646                   "There was an error when connecting to the donor server. "
647                   "Please check that group_replication_recovery channel "
648                   "credentials and all MEMBER_HOST column values of "
649                   "performance_schema.replication_group_members table are "
650                   "correct and DNS resolvable.");
651       log_message(MY_ERROR_LEVEL,
652                   "For details please check "
653                   "performance_schema.replication_connection_status table "
654                   "and error log messages of Slave I/O for channel "
655                   "group_replication_recovery.");
656     }
657     else
658     {
659       log_message(MY_ERROR_LEVEL,
660                   "Error while starting the group replication recovery "
661                   "receiver/applier threads");
662     }
663   }
664 
665   DBUG_RETURN(error);
666 }
667 
terminate_recovery_slave_threads(bool purge_logs)668 int Recovery_state_transfer::terminate_recovery_slave_threads(bool purge_logs)
669 {
670   DBUG_ENTER("Recovery_state_transfer::terminate_recovery_slave_threads");
671 
672   log_message(MY_INFORMATION_LEVEL,
673               "Terminating existing group replication donor connection "
674               "and purging the corresponding logs.");
675 
676   int error= 0;
677 
678   //If the threads never started, the method just returns
679   if ((error= donor_connection_interface.stop_threads(true, true)))
680   {
681     log_message(MY_ERROR_LEVEL,
682                 "Error when stopping the group replication recovery's donor"
683                 " connection"); /* purecov: inspected */
684   }
685   else
686   {
687     if (purge_logs)
688     {
689       //If there is no repository in place nothing happens
690       error= purge_recovery_slave_threads_repos();
691     }
692   }
693 
694   DBUG_RETURN(error);
695 }
696 
purge_recovery_slave_threads_repos()697 int Recovery_state_transfer::purge_recovery_slave_threads_repos()
698 {
699   DBUG_ENTER("Recovery_state_transfer::purge_recovery_slave_threads_repos");
700 
701   int error= 0;
702   if ((error = donor_connection_interface.purge_logs(false)))
703   {
704     /* purecov: begin inspected */
705     log_message(MY_ERROR_LEVEL,
706                 "Error when purging the group replication recovery's relay logs");
707     DBUG_RETURN(error);
708     /* purecov: end */
709   }
710   error=
711     donor_connection_interface.initialize_channel(const_cast<char*>("<NULL>"),
712                                                   0,
713                                                   NULL, NULL,
714                                                   NULL,
715                                                   NULL,
716                                                   NULL,
717                                                   NULL,
718                                                   NULL,
719                                                   NULL,
720                                                   NULL,
721                                                   NULL,
722                                                   NULL,
723                                                   DEFAULT_THREAD_PRIORITY,
724                                                   1, false, true, true);
725 
726   DBUG_RETURN(error);
727 }
728 
729 
state_transfer(THD * recovery_thd)730 int Recovery_state_transfer::state_transfer(THD *recovery_thd)
731 {
732   DBUG_ENTER("Recovery_state_transfer::state_transfer");
733 
734   int error= 0;
735 
736   while (!donor_transfer_finished && !recovery_aborted)
737   {
738     /*
739       If an applier error happened: stop the slave threads.
740       We do not purge logs or reset channel configuration to
741       preserve the error information on performance schema
742       tables until the next recovery attempt.
743       Recovery_state_transfer::initialize_donor_connection() will
744       take care of that.
745     */
746     if (donor_channel_thread_error)
747     {
748       //Unsubscribe the listener until it connects again.
749       channel_observation_manager
750           ->unregister_channel_observer(recovery_channel_observer);
751 
752       if ((error= terminate_recovery_slave_threads(false)))
753       {
754         /* purecov: begin inspected */
755         log_message(MY_ERROR_LEVEL,
756                     "Can't kill the current group replication recovery donor"
757                     " connection after an applier error."
758                     " Recovery will shutdown.");
759         //if we can't stop, abort recovery
760        DBUG_RETURN(error);
761         /* purecov: end */
762       }
763     }
764 
765     //If the donor left, just terminate the threads with no log purging
766     if (on_failover)
767     {
768       //Unsubscribe the listener until it connects again.
769       channel_observation_manager
770           ->unregister_channel_observer(recovery_channel_observer);
771 
772       //Stop the threads before reconfiguring the connection
773       if ((error= donor_connection_interface.stop_threads(true, true)))
774       {
775         /* purecov: begin inspected */
776         log_message(MY_ERROR_LEVEL,
777                     "Can't kill the current group replication recovery donor"
778                     " connection during failover. Recovery will shutdown.");
779         //if we can't stop, abort recovery
780         DBUG_RETURN(error);
781         /* purecov: end */
782       }
783     }
784 
785 #ifndef _WIN32
786     THD_STAGE_INFO(recovery_thd, stage_connecting_to_master);
787 #endif
788 
789     if (!recovery_aborted)
790     {
791       //if the connection to the donor failed, abort recovery
792       if ((error = establish_donor_connection()))
793       {
794         break;
795       }
796     }
797 
798 #ifndef _WIN32
799     THD_STAGE_INFO(recovery_thd, stage_executing);
800 #endif
801 
802     /*
803       donor_transfer_finished    -> set by the set_retrieved_cert_info method.
804                                  lock: recovery_lock
805       recovery_aborted           -> set when stopping recovery
806                                  lock: run_lock
807       on_failover                -> set to true on update_recovery_process.
808                                  set to false when connected to a valid donor
809                                  lock: donor_selection_lock
810       donor_channel_thread_error -> set to true on inform_of_applier_stop or
811                                  inform_of_receiver_stop.
812                                  set to false before connecting to any donor
813                                  lock: donor_selection_lock
814     */
815     mysql_mutex_lock(&recovery_lock);
816     while (!donor_transfer_finished && !recovery_aborted &&
817            !on_failover && !donor_channel_thread_error)
818     {
819       mysql_cond_wait(&recovery_condition, &recovery_lock);
820     }
821     mysql_mutex_unlock(&recovery_lock);
822   }//if the current connection was terminated, connect again
823 
824   channel_observation_manager
825       ->unregister_channel_observer(recovery_channel_observer);
826   // do not purge logs if an error occur, keep the diagnose on SLAVE STATUS
827   terminate_recovery_slave_threads(!error);
828   connected_to_donor= false;
829 
830   DBUG_RETURN(error);
831 }
832