1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include <signal.h>
24 #include "recovery.h"
25 #include "recovery_message.h"
26 #include "member_info.h"
27 #include "plugin_log.h"
28 #include "recovery_channel_state_observer.h"
29 #include "plugin.h"
30
31 using std::list;
32 using std::string;
33 using std::vector;
34
35 /** The number of queued transactions below which we declare the member online */
36 static uint RECOVERY_TRANSACTION_THRESHOLD= 0;
37
/** The name of the replication channel used by recovery */
39 static char recovery_channel_name[]= "group_replication_recovery";
40
launch_handler_thread(void * arg)41 static void *launch_handler_thread(void* arg)
42 {
43 Recovery_module *handler= (Recovery_module*) arg;
44 handler->recovery_thread_handle();
45 return 0;
46 }
47
48 Recovery_module::
Recovery_module(Applier_module_interface * applier,Channel_observation_manager * channel_obsr_mngr,ulong components_stop_timeout)49 Recovery_module(Applier_module_interface *applier,
50 Channel_observation_manager *channel_obsr_mngr,
51 ulong components_stop_timeout)
52 : applier_module(applier), recovery_state_transfer(recovery_channel_name,
53 local_member_info->get_uuid(), channel_obsr_mngr),
54 recovery_running(false), recovery_starting(false),
55 recovery_completion_policy(RECOVERY_POLICY_WAIT_CERTIFIED),
56 stop_wait_timeout(components_stop_timeout)
57 {
58 mysql_mutex_init(key_GR_LOCK_recovery_module_run, &run_lock, MY_MUTEX_INIT_FAST);
59 mysql_cond_init(key_GR_COND_recovery_module_run, &run_cond);
60 }
61
~Recovery_module()62 Recovery_module::~Recovery_module()
63 {
64 mysql_mutex_destroy(&run_lock);
65 mysql_cond_destroy(&run_cond);
66 }
67
68 int
start_recovery(const string & group_name,const string & rec_view_id)69 Recovery_module::start_recovery(const string& group_name,
70 const string& rec_view_id)
71 {
72 DBUG_ENTER("Recovery_module::start_recovery");
73
74 mysql_mutex_lock(&run_lock);
75
76 if (recovery_state_transfer.check_recovery_thread_status())
77 {
78 /* purecov: begin inspected */
79 log_message(MY_ERROR_LEVEL,
80 "A previous recovery session is still running. "
81 "Please stop the group replication plugin and"
82 " wait for it to stop.");
83 DBUG_RETURN(1);
84 /* purecov: end */
85 }
86
87 this->group_name= group_name;
88 recovery_state_transfer.initialize(rec_view_id);
89
90 //reset the recovery aborted status here to avoid concurrency
91 recovery_aborted= false;
92
93 recovery_starting= true;
94
95 if (mysql_thread_create(key_GR_THD_recovery,
96 &recovery_pthd,
97 get_connection_attrib(),
98 launch_handler_thread,
99 (void*)this))
100 {
101 /* purecov: begin inspected */
102 mysql_mutex_unlock(&run_lock);
103 DBUG_RETURN(1);
104 /* purecov: end */
105 }
106
107 while (!recovery_running && !recovery_aborted)
108 {
109 DBUG_PRINT("sleep",("Waiting for recovery thread to start"));
110 mysql_cond_wait(&run_cond, &run_lock);
111 }
112 mysql_mutex_unlock(&run_lock);
113
114 DBUG_RETURN(0);
115 }
116
117 int
stop_recovery()118 Recovery_module::stop_recovery()
119 {
120 DBUG_ENTER("Recovery_module::stop_recovery");
121
122 mysql_mutex_lock(&run_lock);
123
124 if (!recovery_running)
125 {
126 if (!recovery_starting)
127 {
128 mysql_mutex_unlock(&run_lock);
129 DBUG_RETURN(0);
130 }
131 }
132
133 recovery_aborted= true;
134
135 while (recovery_running || recovery_starting)
136 {
137 DBUG_PRINT("loop", ("killing group replication recovery thread"));
138
139 mysql_mutex_lock(&recovery_thd->LOCK_thd_data);
140
141 recovery_thd->awake(THD::NOT_KILLED);
142 mysql_mutex_unlock(&recovery_thd->LOCK_thd_data);
143
144 //Break the wait for the applier suspension
145 applier_module->interrupt_applier_suspension_wait();
146 //Break the state transfer process
147 recovery_state_transfer.abort_state_transfer();
148
149 /*
150 There is a small chance that thread might miss the first
151 alarm. To protect against it, resend the signal until it reacts
152 */
153 struct timespec abstime;
154 set_timespec(&abstime, 2);
155 #ifndef NDEBUG
156 int error=
157 #endif
158 mysql_cond_timedwait(&run_cond, &run_lock, &abstime);
159 if (stop_wait_timeout >= 2)
160 {
161 stop_wait_timeout= stop_wait_timeout - 2;
162 }
163 /* purecov: begin inspected */
164 else if (recovery_running) // quit waiting
165 {
166 mysql_mutex_unlock(&run_lock);
167 DBUG_RETURN(1);
168 }
169 /* purecov: inspected */
170 assert(error == ETIMEDOUT || error == 0);
171 }
172
173 assert(!recovery_running);
174
175 mysql_mutex_unlock(&run_lock);
176
177 DBUG_RETURN(0);
178 }
179
180 /*
181 If recovery failed, it's no use to continue in the group as the member cannot
182 take an active part in it, so it must leave.
183 */
leave_group_on_recovery_failure()184 void Recovery_module::leave_group_on_recovery_failure()
185 {
186 log_message(MY_ERROR_LEVEL, "Fatal error during the Recovery process of "
187 "Group Replication. The server will leave the group.");
188 //tell the update process that we are already stopping
189 recovery_aborted= true;
190
191 //If you can't leave at least force the Error state.
192 group_member_mgr->update_member_status(local_member_info->get_uuid(),
193 Group_member_info::MEMBER_ERROR);
194
195 if (view_change_notifier != NULL &&
196 !view_change_notifier->is_view_modification_ongoing())
197 {
198 view_change_notifier->start_view_modification();
199 }
200
201 Gcs_operations::enum_leave_state state= gcs_module->leave();
202
203 int error= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
204 stop_wait_timeout);
205 if (error)
206 {
207 log_message(MY_ERROR_LEVEL,
208 "Error stopping all replication channels while server was"
209 " leaving the group. Please check the error log for additional"
210 " details. Got error: %d", error);
211 }
212
213 std::stringstream ss;
214 bool has_error= true;
215 plugin_log_level log_severity= MY_WARNING_LEVEL;
216 switch (state)
217 {
218 case Gcs_operations::ERROR_WHEN_LEAVING:
219 /* purecov: begin inspected */
220 ss << "Unable to confirm whether the server has left the group or not. "
221 "Check performance_schema.replication_group_members to check group membership information.";
222 log_severity= MY_ERROR_LEVEL;
223 break;
224 /* purecov: end */
225 case Gcs_operations::ALREADY_LEAVING:
226 ss << "Skipping leave operation: concurrent attempt to leave the group is on-going.";
227 break;
228 case Gcs_operations::ALREADY_LEFT:
229 /* purecov: begin inspected */
230 ss << "Skipping leave operation: member already left the group.";
231 break;
232 /* purecov: end */
233 case Gcs_operations::NOW_LEAVING:
234 has_error= false;
235 break;
236 }
237
238 if (has_error)
239 log_message(log_severity, ss.str().c_str());
240
241 if (view_change_notifier != NULL)
242 {
243 log_message(MY_INFORMATION_LEVEL, "Going to wait for view modification");
244 if (view_change_notifier->wait_for_view_modification())
245 {
246 log_message(MY_WARNING_LEVEL, "On shutdown there was a timeout receiving "
247 "a view change. This can lead to a possible"
248 " inconsistent state. Check the log for "
249 "more details");
250 }
251 }
252
253 if (exit_state_action_var == EXIT_STATE_ACTION_ABORT_SERVER)
254 abort_plugin_process("Fatal error during execution of Group Replication");
255 }
256
257 /*
258 Recovery core method:
259
260 * Step 0: Declare recovery running after extracting group information
261
262 * Step 1: Wait for the applier to execute pending transactions and suspend.
    Even if the joiner is alone, it goes through this phase so it is declared
    ONLINE only when it executed all pending local transactions.
265
266 * Step 2: Declare the node ONLINE if alone.
    This is done solely based on the number of members the group had when
    recovery started. No further group changes affect this decision.
269
270 * Step 3: State transfer.
271 This can be summarized as:
272 1) Connect to a donor
273 2) Wait until the data comes.
274 It can be interrupted/terminated by:
275 > recovery_aborted is set to true. This means recovery was aborted.
        The wait is woken up and the loop is broken. The thread shuts down.
277 > on_failover is set to true. This means the current donor left.
278 The loop cycles and another donor is selected.
279 The threads are stopped but the logs are not purged.
        A connection is established to another donor.
281 > donor_channel_applier_error is set to true. This means an error was
282 detected in the recovery applier.
283 When the loop cycles, kill the threads and purge the logs
        A connection is established to another donor.
285 > donor_transfer_finished. This means we received all the data.
286 The loop exits
287
288 * Step 4: Awake the applier and wait for the execution of cached transactions.
289
290 * Step 5: Notify the group that we are now online if no error occurred.
291 This is done even if the member is alone in the group.
292
293 * Step 6: If an error occurred and recovery is impossible leave the group.
294 We leave the group but the plugin is left running.
295
296 * Step 7: Terminate the recovery thread.
297 */
298 int
recovery_thread_handle()299 Recovery_module::recovery_thread_handle()
300 {
301 DBUG_ENTER("Recovery_module::recovery_thread_handle");
302
303 /* Step 0 */
304
305 int error= 0;
306
307 set_recovery_thread_context();
308
309 //take this before the start method returns
310 size_t number_of_members= group_member_mgr->get_number_of_members();
311 recovery_state_transfer.initialize_group_info();
312
313 mysql_mutex_lock(&run_lock);
314 recovery_running= true;
315 recovery_starting= false;
316 mysql_cond_broadcast(&run_cond);
317 mysql_mutex_unlock(&run_lock);
318
319 #ifndef _WIN32
320 THD_STAGE_INFO(recovery_thd, stage_executing);
321 #endif
322
323 /* Step 1 */
324
325 //wait for the appliers suspension
326 error= applier_module->wait_for_applier_complete_suspension(&recovery_aborted);
327
328 //If the applier is already stopped then something went wrong and we are
329 //already leaving the group
330 if (error == APPLIER_THREAD_ABORTED)
331 {
332 /* purecov: begin inspected */
333 error= 0;
334 recovery_aborted= true;
335 goto cleanup;
336 /* purecov: end */
337 }
338
339 if (!recovery_aborted && error)
340 {
341 /* purecov: begin inspected */
342 log_message(MY_ERROR_LEVEL,
343 "Can't evaluate the group replication applier execution status. "
344 "Group replication recovery will shutdown to avoid data "
345 "corruption.");
346 goto cleanup;
347 /* purecov: end */
348 }
349
350 #ifndef NDEBUG
351 DBUG_EXECUTE_IF("recovery_thread_start_wait_num_of_members",
352 {
353 assert(number_of_members != 1);
354 DBUG_SET("d,recovery_thread_start_wait");
355 });
356 DBUG_EXECUTE_IF("recovery_thread_start_wait",
357 {
358 const char act[]= "now wait_for signal.recovery_continue";
359 assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
360 });
361 #endif // NDEBUG
362
363 /* Step 2 */
364
365 if (number_of_members == 1)
366 {
367 if (!recovery_aborted)
368 {
369 log_message(MY_INFORMATION_LEVEL,
370 "Only one server alive."
371 " Declaring this server as online within the replication group");
372 }
373 goto single_member_online;
374 }
375
376 /* Step 3 */
377
378 error= recovery_state_transfer.state_transfer(recovery_thd);
379
380 #ifndef NDEBUG
381 DBUG_EXECUTE_IF("recovery_thread_wait_before_finish",
382 {
383 const char act[]= "now wait_for signal.recovery_end";
384 assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
385 });
386 #endif // NDEBUG
387
388 if (error)
389 {
390 goto cleanup;
391 }
392
393 single_member_online:
394
395 /* Step 4 */
396
397 /**
398 If recovery fails or is aborted, it never makes sense to awake the applier,
399 as that would lead to the certification and execution of transactions on
400 the wrong context.
401 */
402 if (!recovery_aborted)
403 applier_module->awake_applier_module();
404
405 error= wait_for_applier_module_recovery();
406
407 cleanup:
408
409 /* Step 5 */
410
411 //if finished, declare the member online
412 if (!recovery_aborted && !error)
413 {
414 notify_group_recovery_end();
415 }
416
417 /* Step 6 */
418
419 /*
420 If recovery failed, it's no use to continue in the group as the member cannot
421 take an active part in it, so it must leave.
422 */
423 if (error)
424 {
425 leave_group_on_recovery_failure();
426 }
427
428 #ifndef NDEBUG
429 DBUG_EXECUTE_IF("recovery_thread_wait_before_cleanup",
430 {
431 const char act[]= "now wait_for signal.recovery_end_end";
432 debug_sync_set_action(current_thd, STRING_WITH_LEN(act));
433 });
434 #endif // NDEBUG
435
436 /* Step 7 */
437
438 clean_recovery_thread_context();
439
440 mysql_mutex_lock(&run_lock);
441 delete recovery_thd;
442
443 recovery_aborted= true; // to avoid the start missing signals
444 recovery_running= false;
445 mysql_cond_broadcast(&run_cond);
446 mysql_mutex_unlock(&run_lock);
447
448 Gcs_interface_factory::cleanup(Gcs_operations::get_gcs_engine());
449
450 my_thread_end();
451 my_thread_exit(0);
452
453 DBUG_RETURN(error); /* purecov: inspected */
454 }
455
456 int
update_recovery_process(bool did_members_left,bool is_leaving)457 Recovery_module::update_recovery_process(bool did_members_left, bool is_leaving)
458 {
459 DBUG_ENTER("Recovery_module::update_recovery_process");
460
461 int error= 0;
462
463 if (recovery_running)
464 {
465 /*
466 If I left the Group... the group manager will only have me so recovery
467 should stop.
468 But if it was the plugin that chose to leave the group then it will stop
469 by recovery in the process.
470 */
471 if (is_leaving && !recovery_aborted)
472 {
473 stop_recovery();
474 }
475 else if (!recovery_aborted)
476 {
477 recovery_state_transfer.update_recovery_process(did_members_left);
478 }
479 }
480
481 DBUG_RETURN(error);
482 }
483
484 int
set_retrieved_cert_info(void * info)485 Recovery_module::set_retrieved_cert_info(void* info)
486 {
487 DBUG_ENTER("Recovery_module::set_retrieved_cert_info");
488
489 View_change_log_event* view_change_event= static_cast<View_change_log_event*>(info);
490 // Transmit the certification info into the pipeline
491 Handler_certifier_information_action *cert_action=
492 new Handler_certifier_information_action(view_change_event->get_certification_info());
493
494 int error= applier_module->handle_pipeline_action(cert_action);
495 delete cert_action;
496 if (error)
497 {
498 /* purecov: begin inspected */
499 log_message(MY_ERROR_LEVEL, "Error when processing Certification "
500 "information in the Recovery process");
501 leave_group_on_recovery_failure();
502 DBUG_RETURN(1);
503 /* purecov: end */
504 }
505
506 recovery_state_transfer.end_state_transfer();
507
508 DBUG_RETURN(0);
509 }
510
511 void
set_recovery_thread_context()512 Recovery_module::set_recovery_thread_context()
513 {
514 THD* thd= new THD;
515 my_thread_init();
516 thd->set_new_thread_id();
517 thd->thread_stack= (char*) &thd;
518 mysql_thread_set_psi_id(thd->thread_id());
519 thd->store_globals();
520
521 global_thd_manager_add_thd(thd);
522 thd->security_context()->skip_grants();
523
524 thd->slave_thread= 1;
525 recovery_thd= thd;
526 }
527
528 void
clean_recovery_thread_context()529 Recovery_module::clean_recovery_thread_context()
530 {
531 recovery_thd->release_resources();
532 THD_CHECK_SENTRY(recovery_thd);
533 global_thd_manager_remove_thd(recovery_thd);
534 }
535
wait_for_applier_module_recovery()536 int Recovery_module::wait_for_applier_module_recovery()
537 {
538 DBUG_ENTER("Recovery_module::wait_for_applier_module_recovery");
539
540 bool applier_monitoring= true;
541 while (!recovery_aborted && applier_monitoring)
542 {
543 size_t queue_size = applier_module->get_message_queue_size();
544 if (queue_size <= RECOVERY_TRANSACTION_THRESHOLD)
545 {
546 if (recovery_completion_policy == RECOVERY_POLICY_WAIT_EXECUTED)
547 {
548 int error= applier_module->wait_for_applier_event_execution(1, false);
549 if (!error)
550 applier_monitoring= false;
551 /* purecov: begin inspected */
552 if (error == -2) //error when waiting
553 {
554 applier_monitoring= false;
555 log_message(MY_ERROR_LEVEL,
556 "It is not possible to ensure the execution of group"
557 " transactions received during recovery.");
558 DBUG_RETURN(1);
559 }
560 /* purecov: end */
561 }
562 else
563 {
564 applier_monitoring= false;
565 }
566 }
567 else
568 {
569 my_sleep(100 * queue_size);
570 }
571 }
572
573 if (applier_module->get_applier_status() == APPLIER_ERROR &&
574 !recovery_aborted)
575 DBUG_RETURN(1); /* purecov: inspected */
576
577 DBUG_RETURN(0);
578 }
579
notify_group_recovery_end()580 void Recovery_module::notify_group_recovery_end()
581 {
582 DBUG_ENTER("Recovery_module::notify_group_recovery_end");
583
584 Recovery_message recovery_msg(Recovery_message::RECOVERY_END_MESSAGE,
585 local_member_info->get_uuid());
586 enum_gcs_error msg_error= gcs_module->send_message(recovery_msg);
587 if (msg_error != GCS_OK)
588 {
589 log_message(MY_ERROR_LEVEL,
590 "Error while sending message for group replication recovery"); /* purecov: inspected */
591 }
592
593 DBUG_VOID_RETURN;
594 }
595
is_own_event_channel(my_thread_id id)596 bool Recovery_module::is_own_event_channel(my_thread_id id)
597 {
598 DBUG_ENTER("Recovery_module::is_own_event_channel");
599 DBUG_RETURN(recovery_state_transfer.is_own_event_channel(id));
600 }
601