/* Copyright (c) 2014, 2021, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software Foundation,
   51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */

#include <signal.h>
#include "recovery.h"
#include "recovery_message.h"
#include "member_info.h"
#include "plugin_log.h"
#include "recovery_channel_state_observer.h"
#include "plugin.h"

using std::list;
using std::string;
using std::vector;

/** The number of queued transactions below which we declare the member online */
static uint RECOVERY_TRANSACTION_THRESHOLD= 0;

/** The recovery channel name */
static char recovery_channel_name[]= "group_replication_recovery";

static void *launch_handler_thread(void* arg)
{
  Recovery_module *handler= (Recovery_module*) arg;
  handler->recovery_thread_handle();
  return 0;
}

Recovery_module::
Recovery_module(Applier_module_interface *applier,
                Channel_observation_manager *channel_obsr_mngr,
                ulong components_stop_timeout)
  : applier_module(applier), recovery_state_transfer(recovery_channel_name,
    local_member_info->get_uuid(), channel_obsr_mngr),
    recovery_running(false), recovery_starting(false),
    recovery_completion_policy(RECOVERY_POLICY_WAIT_CERTIFIED),
    stop_wait_timeout(components_stop_timeout)
{
  mysql_mutex_init(key_GR_LOCK_recovery_module_run, &run_lock, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_GR_COND_recovery_module_run, &run_cond);
}

Recovery_module::~Recovery_module()
{
  mysql_mutex_destroy(&run_lock);
  mysql_cond_destroy(&run_cond);
}

int
Recovery_module::start_recovery(const string& group_name,
                                const string& rec_view_id)
{
  DBUG_ENTER("Recovery_module::start_recovery");

  mysql_mutex_lock(&run_lock);

  if (recovery_state_transfer.check_recovery_thread_status())
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "A previous recovery session is still running. "
                "Please stop the group replication plugin and"
                " wait for it to stop.");
    mysql_mutex_unlock(&run_lock);
    DBUG_RETURN(1);
    /* purecov: end */
  }

  this->group_name= group_name;
  recovery_state_transfer.initialize(rec_view_id);

  //reset the recovery aborted flag here to avoid concurrency issues
  recovery_aborted= false;

  recovery_starting= true;

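  /*
    Spawn the recovery thread. It runs launch_handler_thread(), which in turn
    calls recovery_thread_handle() on this object.
  */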
  if (mysql_thread_create(key_GR_THD_recovery,
                          &recovery_pthd,
                          get_connection_attrib(),
                          launch_handler_thread,
                          (void*)this))
  {
    /* purecov: begin inspected */
    mysql_mutex_unlock(&run_lock);
    DBUG_RETURN(1);
    /* purecov: end */
  }

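  /*
    Block here until the recovery thread signals, under run_lock, that it is
    running, or until recovery is aborted.
  */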
  while (!recovery_running && !recovery_aborted)
  {
    DBUG_PRINT("sleep",("Waiting for recovery thread to start"));
    mysql_cond_wait(&run_cond, &run_lock);
  }
  mysql_mutex_unlock(&run_lock);

  DBUG_RETURN(0);
}

int
Recovery_module::stop_recovery()
{
  DBUG_ENTER("Recovery_module::stop_recovery");

  mysql_mutex_lock(&run_lock);

  if (!recovery_running)
  {
    if (!recovery_starting)
    {
      mysql_mutex_unlock(&run_lock);
      DBUG_RETURN(0);
    }
  }

  recovery_aborted= true;

  while (recovery_running || recovery_starting)
  {
    DBUG_PRINT("loop", ("killing group replication recovery thread"));

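    /*
      Wake up the recovery THD in case it is blocked in a wait inside the
      server. Passing THD::NOT_KILLED is meant to interrupt the wait without
      marking the THD as killed.
    */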
    mysql_mutex_lock(&recovery_thd->LOCK_thd_data);

    recovery_thd->awake(THD::NOT_KILLED);
    mysql_mutex_unlock(&recovery_thd->LOCK_thd_data);

    //Break the wait for the applier suspension
    applier_module->interrupt_applier_suspension_wait();
    //Break the state transfer process
    recovery_state_transfer.abort_state_transfer();

    /*
      There is a small chance that the thread might miss the first
      alarm. To protect against that, resend the signal until it reacts.
    */
    struct timespec abstime;
    set_timespec(&abstime, 2);
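    // The timed wait below gives the thread up to 2 seconds to react before
    // the wake-up signals are resent.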
#ifndef NDEBUG
    int error=
#endif
    mysql_cond_timedwait(&run_cond, &run_lock, &abstime);
    if (stop_wait_timeout >= 2)
    {
      stop_wait_timeout= stop_wait_timeout - 2;
    }
    /* purecov: begin inspected */
    else if (recovery_running) // quit waiting
    {
      mysql_mutex_unlock(&run_lock);
      DBUG_RETURN(1);
    }
    /* purecov: end */
    assert(error == ETIMEDOUT || error == 0);
  }

  assert(!recovery_running);

  mysql_mutex_unlock(&run_lock);

  DBUG_RETURN(0);
}

/*
  If recovery fails, there is no point in remaining in the group: the member
  cannot take an active part in it, so it must leave.
*/
void Recovery_module::leave_group_on_recovery_failure()
{
  log_message(MY_ERROR_LEVEL, "Fatal error during the Recovery process of "
              "Group Replication. The server will leave the group.");
  //tell the update process that we are already stopping
  recovery_aborted= true;

  //If the member cannot leave, at least force the ERROR state.
  group_member_mgr->update_member_status(local_member_info->get_uuid(),
                                         Group_member_info::MEMBER_ERROR);

  if (view_change_notifier != NULL &&
      !view_change_notifier->is_view_modification_ongoing())
  {
    view_change_notifier->start_view_modification();
  }

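  /*
    Ask GCS to leave the group. The returned state tells whether the leave
    was started now, was already in progress, had already happened, or failed;
    it is evaluated in the switch below.
  */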
  Gcs_operations::enum_leave_state state= gcs_module->leave();

  int error= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
                              stop_wait_timeout);
  if (error)
  {
    log_message(MY_ERROR_LEVEL,
                "Error stopping all replication channels while server was"
                " leaving the group. Please check the error log for additional"
                " details. Got error: %d", error);
  }

  std::stringstream ss;
  bool has_error= true;
  plugin_log_level log_severity= MY_WARNING_LEVEL;
  switch (state)
  {
    case Gcs_operations::ERROR_WHEN_LEAVING:
      /* purecov: begin inspected */
      ss << "Unable to confirm whether the server has left the group or not. "
            "Check the performance_schema.replication_group_members table for "
            "group membership information.";
      log_severity= MY_ERROR_LEVEL;
      break;
      /* purecov: end */
    case Gcs_operations::ALREADY_LEAVING:
      ss << "Skipping leave operation: a concurrent attempt to leave the group "
            "is ongoing.";
      break;
    case Gcs_operations::ALREADY_LEFT:
      /* purecov: begin inspected */
      ss << "Skipping leave operation: member already left the group.";
      break;
      /* purecov: end */
    case Gcs_operations::NOW_LEAVING:
      has_error= false;
      break;
  }

  if (has_error)
    log_message(log_severity, ss.str().c_str());

  if (view_change_notifier != NULL)
  {
    log_message(MY_INFORMATION_LEVEL, "Going to wait for view modification");
    if (view_change_notifier->wait_for_view_modification())
    {
      log_message(MY_WARNING_LEVEL, "On shutdown there was a timeout receiving "
                                    "a view change. This can lead to a possibly"
                                    " inconsistent state. Check the log for "
                                    "more details");
    }
  }

  if (exit_state_action_var == EXIT_STATE_ACTION_ABORT_SERVER)
    abort_plugin_process("Fatal error during execution of Group Replication");
}

/*
  Recovery core method:

  * Step 0: Declare recovery running after extracting group information.

  * Step 1: Wait for the applier to execute pending transactions and suspend.
    Even if the joiner is alone, it goes through this phase, so it is declared
    ONLINE only when it has executed all pending local transactions.

  * Step 2: Declare the member ONLINE if it is alone.
    This is based solely on the number of members the group had when recovery
    started. No further group changes affect this decision.

  * Step 3: State transfer.
    This can be summarized as:
      1) Connect to a donor.
      2) Wait until the data arrives.
    It can be interrupted/terminated by:
      > recovery_aborted set to true. Recovery was aborted: the wait is woken
        up, the loop is broken and the thread shuts down.
      > on_failover set to true. The current donor left: the loop cycles and
        another donor is selected. The threads are stopped but the logs are
        not purged, and a connection is established to another donor.
      > donor_channel_applier_error set to true. An error was detected in the
        recovery applier: when the loop cycles, the threads are killed and the
        logs purged, and a connection is established to another donor.
      > donor_transfer_finished set to true. All the data was received and the
        loop exits.

  * Step 4: Wake up the applier and wait for the execution of cached
    transactions.

  * Step 5: Notify the group that we are now online if no error occurred.
    This is done even if the member is alone in the group.

  * Step 6: If an error occurred and recovery is impossible, leave the group.
    The member leaves the group but the plugin keeps running.

  * Step 7: Terminate the recovery thread.
*/
int
Recovery_module::recovery_thread_handle()
{
  DBUG_ENTER("Recovery_module::recovery_thread_handle");

  /* Step 0 */

  int error= 0;

  set_recovery_thread_context();

  //take this before the start method returns
  size_t number_of_members= group_member_mgr->get_number_of_members();
  recovery_state_transfer.initialize_group_info();

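  /*
    Signal start_recovery(), which is waiting on run_cond, that the recovery
    thread is now running.
  */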
  mysql_mutex_lock(&run_lock);
  recovery_running= true;
  recovery_starting= false;
  mysql_cond_broadcast(&run_cond);
  mysql_mutex_unlock(&run_lock);

#ifndef _WIN32
  THD_STAGE_INFO(recovery_thd, stage_executing);
#endif

  /* Step 1 */

  //wait for the applier's suspension
  error= applier_module->wait_for_applier_complete_suspension(&recovery_aborted);

  //If the applier is already stopped then something went wrong and we are
  //already leaving the group
  if (error == APPLIER_THREAD_ABORTED)
  {
    /* purecov: begin inspected */
    error= 0;
    recovery_aborted= true;
    goto cleanup;
    /* purecov: end */
  }

  if (!recovery_aborted && error)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Can't evaluate the group replication applier execution status. "
                "Group replication recovery will shut down to avoid data "
                "corruption.");
    goto cleanup;
    /* purecov: end */
  }

#ifndef NDEBUG
  DBUG_EXECUTE_IF("recovery_thread_start_wait_num_of_members",
                  {
                    assert(number_of_members != 1);
                    DBUG_SET("d,recovery_thread_start_wait");
                  });
  DBUG_EXECUTE_IF("recovery_thread_start_wait",
                  {
                    const char act[]= "now wait_for signal.recovery_continue";
                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
#endif // NDEBUG

  /* Step 2 */

  if (number_of_members == 1)
  {
    if (!recovery_aborted)
    {
      log_message(MY_INFORMATION_LEVEL,
                  "Only one server alive."
                  " Declaring this server as online within the replication group");
    }
    goto single_member_online;
  }

  /* Step 3 */

  error= recovery_state_transfer.state_transfer(recovery_thd);

#ifndef NDEBUG
  DBUG_EXECUTE_IF("recovery_thread_wait_before_finish",
                  {
                    const char act[]= "now wait_for signal.recovery_end";
                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
#endif // NDEBUG

  if (error)
  {
    goto cleanup;
  }

single_member_online:

  /* Step 4 */

  /**
    If recovery fails or is aborted, it never makes sense to wake the applier,
    as that would lead to the certification and execution of transactions in
    the wrong context.
  */
  if (!recovery_aborted)
    applier_module->awake_applier_module();

  error= wait_for_applier_module_recovery();

cleanup:

  /* Step 5 */

  //if finished, declare the member online
  if (!recovery_aborted && !error)
  {
    notify_group_recovery_end();
  }

  /* Step 6 */

  /*
    If recovery failed, there is no point in remaining in the group: the member
    cannot take an active part in it, so it must leave.
  */
  if (error)
  {
    leave_group_on_recovery_failure();
  }

#ifndef NDEBUG
  DBUG_EXECUTE_IF("recovery_thread_wait_before_cleanup",
                  {
                    const char act[]= "now wait_for signal.recovery_end_end";
                    debug_sync_set_action(current_thd, STRING_WITH_LEN(act));
                  });
#endif // NDEBUG

  /* Step 7 */

  clean_recovery_thread_context();

  mysql_mutex_lock(&run_lock);
  delete recovery_thd;

  recovery_aborted= true;  // to avoid the start missing signals
  recovery_running= false;
  mysql_cond_broadcast(&run_cond);
  mysql_mutex_unlock(&run_lock);

  Gcs_interface_factory::cleanup(Gcs_operations::get_gcs_engine());

  my_thread_end();
  my_thread_exit(0);

  DBUG_RETURN(error); /* purecov: inspected */
}

int
Recovery_module::update_recovery_process(bool did_members_left, bool is_leaving)
{
  DBUG_ENTER("Recovery_module::update_recovery_process");

  int error= 0;

  if (recovery_running)
  {
    /*
      If this member left the group, the group manager will only contain this
      member, so recovery should stop.
      If it was the plugin itself that chose to leave the group, then recovery
      is stopped as part of that process.
    */
    if (is_leaving && !recovery_aborted)
    {
      stop_recovery();
    }
    else if (!recovery_aborted)
    {
      recovery_state_transfer.update_recovery_process(did_members_left);
    }
  }

  DBUG_RETURN(error);
}

int
Recovery_module::set_retrieved_cert_info(void* info)
{
  DBUG_ENTER("Recovery_module::set_retrieved_cert_info");

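  /*
    The View_change_log_event carries the certification information retrieved
    from the group. It is injected into the applier pipeline below so that
    certification can resume from that state before cached transactions are
    applied.
  */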
  View_change_log_event* view_change_event= static_cast<View_change_log_event*>(info);
  // Transmit the certification info into the pipeline
  Handler_certifier_information_action *cert_action=
    new Handler_certifier_information_action(view_change_event->get_certification_info());

  int error= applier_module->handle_pipeline_action(cert_action);
  delete cert_action;
  if (error)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error when processing Certification "
                "information in the Recovery process");
    leave_group_on_recovery_failure();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  recovery_state_transfer.end_state_transfer();

  DBUG_RETURN(0);
}

void
Recovery_module::set_recovery_thread_context()
{
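  /*
    Create and register a THD for the recovery thread so it can execute server
    code: add it to the global THD manager, skip privilege checks and mark it
    as a replication (slave) thread.
  */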
  THD* thd= new THD;
  my_thread_init();
  thd->set_new_thread_id();
  thd->thread_stack= (char*) &thd;
  mysql_thread_set_psi_id(thd->thread_id());
  thd->store_globals();

  global_thd_manager_add_thd(thd);
  thd->security_context()->skip_grants();

  thd->slave_thread= 1;
  recovery_thd= thd;
}

void
Recovery_module::clean_recovery_thread_context()
{
  recovery_thd->release_resources();
  THD_CHECK_SENTRY(recovery_thd);
  global_thd_manager_remove_thd(recovery_thd);
}

int Recovery_module::wait_for_applier_module_recovery()
{
  DBUG_ENTER("Recovery_module::wait_for_applier_module_recovery");

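  /*
    Poll the applier queue until it drains to RECOVERY_TRANSACTION_THRESHOLD
    or below. Depending on the completion policy, also wait for the already
    queued transactions to be executed.
  */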
  bool applier_monitoring= true;
  while (!recovery_aborted && applier_monitoring)
  {
    size_t queue_size = applier_module->get_message_queue_size();
    if (queue_size <= RECOVERY_TRANSACTION_THRESHOLD)
    {
      if (recovery_completion_policy == RECOVERY_POLICY_WAIT_EXECUTED)
      {
        int error= applier_module->wait_for_applier_event_execution(1, false);
        if (!error)
          applier_monitoring= false;
        /* purecov: begin inspected */
        if (error == -2) //error when waiting
        {
          applier_monitoring= false;
          log_message(MY_ERROR_LEVEL,
                      "It is not possible to ensure the execution of group"
                      " transactions received during recovery.");
          DBUG_RETURN(1);
        }
        /* purecov: end */
      }
      else
      {
        applier_monitoring= false;
      }
    }
    else
    {
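      // Back off before polling again; the sleep grows with the queue size
      // (my_sleep() takes the duration in microseconds).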
      my_sleep(100 * queue_size);
    }
  }

  if (applier_module->get_applier_status() == APPLIER_ERROR &&
      !recovery_aborted)
    DBUG_RETURN(1); /* purecov: inspected */

  DBUG_RETURN(0);
}

void Recovery_module::notify_group_recovery_end()
{
  DBUG_ENTER("Recovery_module::notify_group_recovery_end");

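  /*
    Broadcast a RECOVERY_END message to the group so the other members can
    mark this member as ONLINE.
  */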
  Recovery_message recovery_msg(Recovery_message::RECOVERY_END_MESSAGE,
                                local_member_info->get_uuid());
  enum_gcs_error msg_error= gcs_module->send_message(recovery_msg);
  if (msg_error != GCS_OK)
  {
    log_message(MY_ERROR_LEVEL,
                "Error while sending message for group replication recovery"); /* purecov: inspected */
  }

  DBUG_VOID_RETURN;
}

bool Recovery_module::is_own_event_channel(my_thread_id id)
{
  DBUG_ENTER("Recovery_module::is_own_event_channel");
  DBUG_RETURN(recovery_state_transfer.is_own_event_channel(id));
}