1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include <algorithm>
24 #include <string>
25 #include <sstream>
26 #include <vector>
27
28 #include "gcs_event_handlers.h"
29 #include "plugin.h"
30 #include "pipeline_stats.h"
31 #include "single_primary_message.h"
32
33 using std::vector;
34
35
36 Plugin_gcs_events_handler::
Plugin_gcs_events_handler(Applier_module_interface * applier_module,Recovery_module * recovery_module,Plugin_gcs_view_modification_notifier * vc_notifier,Compatibility_module * compatibility_module,ulong components_stop_timeout)37 Plugin_gcs_events_handler(Applier_module_interface* applier_module,
38 Recovery_module* recovery_module,
39 Plugin_gcs_view_modification_notifier* vc_notifier,
40 Compatibility_module* compatibility_module,
41 ulong components_stop_timeout)
42 : applier_module(applier_module), recovery_module(recovery_module),
43 view_change_notifier(vc_notifier),
44 compatibility_manager(compatibility_module),
45 stop_wait_timeout(components_stop_timeout)
46 {
47 this->temporary_states= new std::set<Group_member_info*,
48 Group_member_info_pointer_comparator>();
49 this->joiner_compatibility_status= new st_compatibility_types(INCOMPATIBLE);
50
51 #ifndef NDEBUG
52 set_number_of_members_on_view_changed_to_10= false;
53 DBUG_EXECUTE_IF("group_replication_set_number_of_members_on_view_changed_to_10",
54 { set_number_of_members_on_view_changed_to_10= true; };);
55 #endif
56 }
57
~Plugin_gcs_events_handler()58 Plugin_gcs_events_handler::~Plugin_gcs_events_handler()
59 {
60 delete temporary_states;
61 delete joiner_compatibility_status;
62 }
63
64 void
on_message_received(const Gcs_message & message) const65 Plugin_gcs_events_handler::on_message_received(const Gcs_message& message) const
66 {
67 Plugin_gcs_message::enum_cargo_type message_type=
68 Plugin_gcs_message::get_cargo_type(
69 message.get_message_data().get_payload());
70
71 switch (message_type)
72 {
73 case Plugin_gcs_message::CT_TRANSACTION_MESSAGE:
74 handle_transactional_message(message);
75 break;
76
77 case Plugin_gcs_message::CT_CERTIFICATION_MESSAGE:
78 handle_certifier_message(message);
79 break;
80
81 case Plugin_gcs_message::CT_RECOVERY_MESSAGE:
82 handle_recovery_message(message);
83 break;
84
85 case Plugin_gcs_message::CT_PIPELINE_STATS_MEMBER_MESSAGE:
86 handle_stats_message(message);
87 break;
88
89 case Plugin_gcs_message::CT_SINGLE_PRIMARY_MESSAGE:
90 handle_single_primary_message(message);
91 break;
92
93 default:
94 break; /* purecov: inspected */
95 }
96 }
97
98 void
99 Plugin_gcs_events_handler::
handle_transactional_message(const Gcs_message & message) const100 handle_transactional_message(const Gcs_message& message) const
101 {
102 if ( (local_member_info->get_recovery_status() == Group_member_info::MEMBER_IN_RECOVERY ||
103 local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE) &&
104 this->applier_module)
105 {
106 const unsigned char* payload_data= NULL;
107 uint64 payload_size= 0;
108 Plugin_gcs_message::get_first_payload_item_raw_data(
109 message.get_message_data().get_payload(),
110 &payload_data, &payload_size);
111
112 this->applier_module->handle(payload_data, static_cast<ulong>(payload_size));
113 }
114 else
115 {
116 log_message(MY_ERROR_LEVEL,
117 "Message received while the plugin is not ready,"
118 " message discarded"); /* purecov: inspected */
119 }
120 }
121
122 void
handle_certifier_message(const Gcs_message & message) const123 Plugin_gcs_events_handler::handle_certifier_message(const Gcs_message& message) const
124 {
125 if (this->applier_module == NULL)
126 {
127 log_message(MY_ERROR_LEVEL,
128 "Message received without a proper group replication applier"); /* purecov: inspected */
129 return; /* purecov: inspected */
130 }
131
132 Certifier_interface *certifier=
133 this->applier_module->get_certification_handler()->get_certifier();
134
135 const unsigned char* payload_data= NULL;
136 uint64 payload_size= 0;
137 Plugin_gcs_message::get_first_payload_item_raw_data(
138 message.get_message_data().get_payload(),
139 &payload_data, &payload_size);
140
141 if (certifier->handle_certifier_data(payload_data,
142 static_cast<ulong>(payload_size),
143 message.get_origin()))
144 {
145 log_message(MY_ERROR_LEVEL, "Error processing message in Certifier"); /* purecov: inspected */
146 }
147 }
148
149 void
handle_recovery_message(const Gcs_message & message) const150 Plugin_gcs_events_handler::handle_recovery_message(const Gcs_message& message) const
151 {
152 Recovery_message recovery_message(message.get_message_data().get_payload(),
153 message.get_message_data().get_payload_length());
154
155 std::string member_uuid= recovery_message.get_member_uuid();
156
157 bool is_local= !member_uuid.compare(local_member_info->get_uuid());
158 if(is_local)
159 {
160 // Only change member status if member is still on recovery.
161 Group_member_info::Group_member_status member_status=
162 local_member_info->get_recovery_status();
163 if (member_status != Group_member_info::MEMBER_IN_RECOVERY)
164 {
165 log_message(MY_INFORMATION_LEVEL,
166 "This server was not declared online since it is on status %s",
167 Group_member_info::get_member_status_string(member_status)); /* purecov: inspected */
168 return; /* purecov: inspected */
169 }
170
171 log_message(MY_INFORMATION_LEVEL,
172 "This server was declared online within the replication group");
173
174 /**
175 Disable the read mode in the server if the member is:
176 - joining
177 - doesn't have a higher possible incompatible version
178 - We are not on Primary mode.
179 */
180 if (*joiner_compatibility_status != READ_COMPATIBLE &&
181 (local_member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY ||
182 !local_member_info->in_primary_mode()))
183 {
184 if (disable_server_read_mode(PSESSION_INIT_THREAD))
185 {
186 log_message(MY_WARNING_LEVEL,
187 "When declaring the plugin online it was not possible to "
188 "disable the server read mode. "
189 "Try to disable it manually."); /* purecov: inspected */
190 }
191 }
192
193 // The member is declared as online upon receiving this message
194 group_member_mgr->update_member_status(member_uuid,
195 Group_member_info::MEMBER_ONLINE);
196 }
197 else
198 {
199 Group_member_info* member_info= group_member_mgr->get_group_member_info(member_uuid);
200 if (member_info != NULL)
201 {
202 log_message(MY_INFORMATION_LEVEL,
203 "The member with address %s:%u was declared online within the "
204 "replication group",
205 member_info->get_hostname().c_str(), member_info->get_port());
206 delete member_info;
207
208 // The member is declared as online upon receiving this message
209 // We need to run this before running update_recovery_process
210 group_member_mgr->update_member_status(member_uuid,
211 Group_member_info::MEMBER_ONLINE);
212
213 if (local_member_info->get_recovery_status() ==
214 Group_member_info::MEMBER_IN_RECOVERY)
215 {
216 /*
217 Inform recovery of a possible new donor
218 */
219 recovery_module->update_recovery_process(false, false);
220 }
221 }
222 }
223
224 /*
225 Check if we were waiting for some server to recover to
226 elect a new leader.
227
228 Following line protects against servers joining the group
229 while the bootstrapped node has not yet finished recovery.
230 Therefore, it is going to become primary when it finishes recovery.
231 */
232 this->handle_leader_election_if_needed();
233 }
234
235 void
handle_stats_message(const Gcs_message & message) const236 Plugin_gcs_events_handler::handle_stats_message(const Gcs_message& message) const
237 {
238 if (this->applier_module == NULL)
239 {
240 log_message(MY_ERROR_LEVEL,
241 "Message received without a proper group replication applier"); /* purecov: inspected */
242 return; /* purecov: inspected */
243 }
244
245 this->applier_module->get_flow_control_module()
246 ->handle_stats_data(message.get_message_data().get_payload(),
247 message.get_message_data().get_payload_length(),
248 message.get_origin().get_member_id());
249 }
250
251 void
handle_single_primary_message(const Gcs_message & message) const252 Plugin_gcs_events_handler::handle_single_primary_message(const Gcs_message& message) const
253 {
254 if (this->applier_module == NULL)
255 {
256 log_message(MY_ERROR_LEVEL,
257 "Message received without a proper group replication applier"); /* purecov: inspected */
258 return; /* purecov: inspected */
259 }
260
261 Single_primary_message
262 single_primary_message(message.get_message_data().get_payload(),
263 message.get_message_data().get_payload_length());
264
265 if (single_primary_message.get_single_primary_message_type() ==
266 Single_primary_message::SINGLE_PRIMARY_QUEUE_APPLIED_MESSAGE)
267 {
268 Single_primary_action_packet *single_primary_action=
269 new Single_primary_action_packet(Single_primary_action_packet::QUEUE_APPLIED);
270 this->applier_module->add_single_primary_action_packet(single_primary_action);
271 }
272 }
273
274 void
on_suspicions(const std::vector<Gcs_member_identifier> & members,const std::vector<Gcs_member_identifier> & unreachable) const275 Plugin_gcs_events_handler::on_suspicions(const std::vector<Gcs_member_identifier>& members,
276 const std::vector<Gcs_member_identifier>& unreachable) const
277 {
278 if (members.empty() && unreachable.empty()) // nothing to do
279 return; /* purecov: inspected */
280
281 assert(members.size() >= unreachable.size());
282
283 std::vector<Gcs_member_identifier> tmp_unreachable(unreachable);
284 std::vector<Gcs_member_identifier>::const_iterator mit;
285 std::vector<Gcs_member_identifier>::iterator uit;
286
287 if (!members.empty())
288 {
289 for (mit= members.begin(); mit != members.end(); mit ++)
290 {
291 Gcs_member_identifier member= *mit;
292 Group_member_info* member_info=
293 group_member_mgr->get_group_member_info_by_member_id(member);
294
295 if (member_info == NULL) //Trying to update a non-existing member
296 continue; /* purecov: inspected */
297
298 uit= std::find(tmp_unreachable.begin(), tmp_unreachable.end(), member);
299 if (uit != tmp_unreachable.end())
300 {
301 if (!member_info->is_unreachable())
302 log_message(MY_WARNING_LEVEL,
303 "Member with address %s:%u has become unreachable.",
304 member_info->get_hostname().c_str(), member_info->get_port());
305
306 group_member_mgr->set_member_unreachable(member_info->get_uuid());
307
308 // remove to not check again against this one
309 tmp_unreachable.erase(uit);
310 }
311 else
312 {
313 if (member_info->is_unreachable())
314 log_message(MY_WARNING_LEVEL,
315 "Member with address %s:%u is reachable again.",
316 member_info->get_hostname().c_str(), member_info->get_port());
317
318 group_member_mgr->set_member_reachable(member_info->get_uuid());
319 }
320
321 delete member_info;
322 }
323 }
324
325 if ((members.size() - unreachable.size()) <= (members.size() / 2))
326 {
327 if (!group_partition_handler->get_timeout_on_unreachable())
328 log_message(MY_ERROR_LEVEL,
329 "This server is not able to reach a majority of members "
330 "in the group. This server will now block all updates. "
331 "The server will remain blocked until contact with the "
332 "majority is restored. "
333 "It is possible to use group_replication_force_members "
334 "to force a new group membership.");
335 else
336 log_message(MY_ERROR_LEVEL,
337 "This server is not able to reach a majority of members "
338 "in the group. This server will now block all updates. "
339 "The server will remain blocked for the next %lu seconds. "
340 "Unless contact with the majority is restored, after this "
341 "time the member will error out and leave the group. "
342 "It is possible to use group_replication_force_members "
343 "to force a new group membership.",
344 group_partition_handler->get_timeout_on_unreachable());
345
346 if (!group_partition_handler->is_partition_handler_running() &&
347 !group_partition_handler->is_partition_handling_terminated())
348 group_partition_handler->launch_partition_handler_thread();
349 }
350 else
351 {
352 /*
353 This code is present on on_view_changed and on_suspicions as no assumption
354 can be made about the order in which these methods are invoked.
355 */
356 if (group_partition_handler->is_member_on_partition())
357 {
358 if (group_partition_handler->abort_partition_handler_if_running())
359 {
360 log_message(MY_WARNING_LEVEL,
361 "A group membership change was received but the plugin is "
362 "already leaving due to the configured timeout on "
363 "group_replication_unreachable_majority_timeout option.");
364 }
365 else
366 {
367 /* If it was not running or we canceled it in time */
368 log_message(MY_WARNING_LEVEL,
369 "The member has resumed contact with a majority of the "
370 "members in the group. Regular operation is restored and "
371 "transactions are unblocked.");
372 }
373 }
374 }
375 }
376
377 void
log_members_leaving_message(const Gcs_view & new_view) const378 Plugin_gcs_events_handler::log_members_leaving_message(const Gcs_view& new_view) const
379 {
380 std::string members_leaving;
381 std::string primary_member_host;
382
383 get_hosts_from_view(new_view.get_leaving_members(), members_leaving, primary_member_host);
384
385 log_message(MY_WARNING_LEVEL,
386 "Members removed from the group: %s",
387 members_leaving.c_str());
388
389 if (!primary_member_host.empty())
390 log_message(MY_INFORMATION_LEVEL,
391 "Primary server with address %s left the group. "
392 "Electing new Primary.",
393 primary_member_host.c_str());
394 }
395
396 void
log_members_joining_message(const Gcs_view & new_view) const397 Plugin_gcs_events_handler::log_members_joining_message(const Gcs_view& new_view) const
398 {
399 std::string members_joining;
400 std::string primary_member_host;
401
402 get_hosts_from_view(new_view.get_joined_members(), members_joining, primary_member_host);
403
404 log_message(MY_INFORMATION_LEVEL,
405 "Members joined the group: %s",
406 members_joining.c_str());
407 }
408
409 void
get_hosts_from_view(const std::vector<Gcs_member_identifier> & members,std::string & all_hosts,std::string & primary_host) const410 Plugin_gcs_events_handler::get_hosts_from_view(const std::vector<Gcs_member_identifier> &members,
411 std::string& all_hosts, std::string& primary_host) const
412 {
413 std::stringstream hosts_string;
414 std::stringstream primary_string;
415 std::vector<Gcs_member_identifier>::const_iterator all_members_it= members.begin();
416
417 while (all_members_it != members.end())
418 {
419 Group_member_info* member_info= group_member_mgr->
420 get_group_member_info_by_member_id((*all_members_it));
421 all_members_it++;
422
423 if (member_info == NULL)
424 continue;
425
426 hosts_string << member_info->get_hostname() << ":" << member_info->get_port();
427
428 /**
429 Check in_primary_mode has been added for safety.
430 Since primary role is in single-primary mode.
431 */
432 if (member_info->in_primary_mode() &&
433 member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY)
434 {
435 if (primary_string.rdbuf()->in_avail() != 0)
436 primary_string << ", ";
437 primary_string << member_info->get_hostname() << ":" << member_info->get_port();
438 }
439
440 if (all_members_it != members.end())
441 {
442 hosts_string << ", ";
443 }
444
445 delete member_info;
446 }
447 all_hosts.assign (hosts_string.str());
448 primary_host.assign (primary_string.str());
449 }
450
451 void
on_view_changed(const Gcs_view & new_view,const Exchanged_data & exchanged_data) const452 Plugin_gcs_events_handler::on_view_changed(const Gcs_view& new_view,
453 const Exchanged_data &exchanged_data)
454 const
455 {
456 bool is_leaving= is_member_on_vector(new_view.get_leaving_members(),
457 local_member_info->get_gcs_member_id());
458
459 bool is_joining= is_member_on_vector(new_view.get_joined_members(),
460 local_member_info->get_gcs_member_id());
461
462 // Was member expelled from the group due to network failures?
463 if (this->was_member_expelled_from_group(new_view))
464 return;
465
466 //An early error on the applier can render the join invalid
467 if (is_joining &&
468 local_member_info->get_recovery_status() == Group_member_info::MEMBER_ERROR)
469 {
470 log_message(MY_ERROR_LEVEL,
471 "There was a previous plugin error while the member joined the group. "
472 "The member will now exit the group.");
473 view_change_notifier->cancel_view_modification(GROUP_REPLICATION_CONFIGURATION_ERROR);
474 return;
475 }
476
477 /*
478 This code is present on on_view_changed and on_suspicions as no assumption
479 can be made about the order in which these methods are invoked.
480 */
481 if (!is_leaving && group_partition_handler->is_member_on_partition())
482 {
483 if (group_partition_handler->abort_partition_handler_if_running())
484 {
485 log_message(MY_WARNING_LEVEL,
486 "A group membership change was received but the plugin is "
487 "already leaving due to the configured timeout on "
488 "group_replication_unreachable_majority_timeout option.");
489 return;
490 }
491 else
492 {
493 /* If it was not running or we canceled it in time */
494 log_message(MY_WARNING_LEVEL,
495 "The member has resumed contact with a majority of the "
496 "members in the group. Regular operation is restored and "
497 "transactions are unblocked.");
498 }
499 }
500
501 /*
502 Maybe on_suspicions we already executed the above block but it was too late.
503 No point in repeating the message, but we need to break the view install.
504 */
505 if (!is_leaving && group_partition_handler->is_partition_handling_terminated())
506 return;
507
508 if (!is_leaving && new_view.get_leaving_members().size() > 0)
509 log_members_leaving_message(new_view);
510
511 //update the Group Manager with all the received states
512 if (update_group_info_manager(new_view, exchanged_data, is_joining, is_leaving) &&
513 is_joining)
514 {
515 view_change_notifier->cancel_view_modification();
516 return;
517 }
518
519 if (!is_joining && new_view.get_joined_members().size() > 0)
520 log_members_joining_message(new_view);
521
522 //enable conflict detection if someone on group have it enabled
523 if (local_member_info->in_primary_mode() &&
524 group_member_mgr->is_conflict_detection_enabled())
525 {
526 Certifier_interface *certifier=
527 this->applier_module->get_certification_handler()->get_certifier();
528 certifier->enable_conflict_detection();
529 }
530
531 //Inform any interested handler that the view changed
532 View_change_pipeline_action *vc_action=
533 new View_change_pipeline_action(is_leaving);
534
535 applier_module->handle_pipeline_action(vc_action);
536 delete vc_action;
537
538 //Update any recovery running process and handle state changes
539 this->handle_leaving_members(new_view, is_joining, is_leaving);
540
541 //Handle joining members
542 this->handle_joining_members(new_view, is_joining, is_leaving);
543
544 if (is_leaving)
545 gcs_module->leave_coordination_member_left();
546
547 // Handle leader election if needed
548 this->handle_leader_election_if_needed();
549
550 //Signal that the injected view was delivered
551 if (view_change_notifier->is_injected_view_modification())
552 view_change_notifier->end_view_modification();
553
554 if (!is_leaving)
555 {
556 log_message(MY_INFORMATION_LEVEL,
557 "Group membership changed to %s on view %s.",
558 group_member_mgr->get_string_current_view_active_hosts().c_str(),
559 new_view.get_view_id().get_representation().c_str());
560 }
561 else
562 {
563 log_message(MY_INFORMATION_LEVEL,
564 "Group membership changed: This member has left the group.");
565 }
566 }
567
568 bool
was_member_expelled_from_group(const Gcs_view & view) const569 Plugin_gcs_events_handler::was_member_expelled_from_group(const Gcs_view& view) const
570 {
571 DBUG_ENTER("Plugin_gcs_events_handler::was_member_expelled_from_group");
572 bool result= false;
573
574 if (view.get_error_code() == Gcs_view::MEMBER_EXPELLED)
575 {
576 result= true;
577 log_message(MY_ERROR_LEVEL,
578 "Member was expelled from the group due to network failures, "
579 "changing member status to ERROR.");
580
581 // Delete all members from group info except the local one.
582 std::vector<Group_member_info*> to_update;
583 group_member_mgr->update(&to_update);
584 group_member_mgr->update_member_status(local_member_info->get_uuid(),
585 Group_member_info::MEMBER_ERROR);
586 group_member_mgr->update_member_role(local_member_info->get_uuid(),
587 Group_member_info::MEMBER_ROLE_SECONDARY);
588
589 bool aborted= false;
590 applier_module->add_suspension_packet();
591 int error= applier_module->wait_for_applier_complete_suspension(&aborted, false);
592 /*
593 We do not need to kill ongoing transactions when the applier
594 is already stopping.
595 */
596 if (!error)
597 applier_module->kill_pending_transactions(true, true);
598 }
599
600 DBUG_RETURN(result);
601 }
602
603 std::vector<Group_member_info*>::iterator
sort_and_get_lowest_version_member_position(std::vector<Group_member_info * > * all_members_info) const604 Plugin_gcs_events_handler::sort_and_get_lowest_version_member_position(
605 std::vector<Group_member_info*>* all_members_info) const
606 {
607 std::vector<Group_member_info*>::iterator it;
608
609 // sort in ascending order of lower member version
610 std::sort(all_members_info->begin(), all_members_info->end(),
611 Group_member_info::comparator_group_member_version);
612
613 /* if vector contains only single version then leader should be picked from
614 all members
615 */
616 std::vector<Group_member_info*>::iterator lowest_version_end=
617 all_members_info->end();
618
619 /* first member will have lowest version as members are already
620 sorted above using member_version.
621 */
622 it= all_members_info->begin();
623 Group_member_info* first_member= *it;
624 uint32 lowest_major_version=
625 first_member->get_member_version().get_major_version();
626
627 /* to avoid read compatibility issue leader should be picked only from lowest
628 version members so save position where member version differs.
629
630 set lowest_version_end when major version changes
631
632 eg: for a list: 5.7.18, 5.7.18, 5.7.19, 5.7.20, 5.7.21, 8.0.2
633 the members to be considered for election will be:
634 5.7.18, 5.7.18, 5.7.19, 5.7.20, 5.7.21
635 and server_uuid based algorithm will be used to elect primary
636
637 eg: for a list: 5.7.20, 5.7.21, 8.0.2, 8.0.2
638 the members to be considered for election will be:
639 5.7.20, 5.7.21
640 and member weight based algorithm will be used to elect primary
641 */
642 for(it= all_members_info->begin() + 1; it != all_members_info->end(); it++)
643 {
644 if (lowest_major_version != (*it)->get_member_version().get_major_version())
645 {
646 lowest_version_end= it;
647 break;
648 }
649 }
650
651 return lowest_version_end;
652 }
653
sort_members_for_election(std::vector<Group_member_info * > * all_members_info,std::vector<Group_member_info * >::iterator lowest_version_end) const654 void Plugin_gcs_events_handler::sort_members_for_election(
655 std::vector<Group_member_info*>* all_members_info,
656 std::vector<Group_member_info*>::iterator lowest_version_end) const
657 {
658 Group_member_info* first_member= *(all_members_info->begin());
659 Member_version lowest_version= first_member->get_member_version();
660
661 // sort only lower version members as they only will be needed to pick leader
662 if (lowest_version >= PRIMARY_ELECTION_MEMBER_WEIGHT_VERSION)
663 std::sort(all_members_info->begin(), lowest_version_end,
664 Group_member_info::comparator_group_member_weight);
665 else
666 std::sort(all_members_info->begin(), lowest_version_end,
667 Group_member_info::comparator_group_member_uuid);
668 }
669
handle_leader_election_if_needed() const670 void Plugin_gcs_events_handler::handle_leader_election_if_needed() const
671 {
672 // take action if in single leader mode
673 if (!local_member_info->in_primary_mode())
674 return;
675
676 bool am_i_leaving= true;
677 #ifndef NDEBUG
678 int n=0;
679 #endif
680 Group_member_info* the_primary= NULL;
681 std::vector<Group_member_info*>* all_members_info=
682 group_member_mgr->get_all_members();
683
684 std::vector<Group_member_info*>::iterator it;
685 std::vector<Group_member_info*>::iterator lowest_version_end;
686
687 /* sort members based on member_version and get first iterator position
688 where member version differs
689 */
690 lowest_version_end=
691 sort_and_get_lowest_version_member_position(all_members_info);
692
693 /* Sort lower version members based on member weight if member version
694 is greater than equal to PRIMARY_ELECTION_MEMBER_WEIGHT_VERSION or uuid.
695 */
696 sort_members_for_election(all_members_info, lowest_version_end);
697
698 /*
699 1. Iterate over the list of all members and check if there is a primary
700 defined already.
701 2. Check if I am leaving the group or not.
702 */
703 for(it= all_members_info->begin(); it != all_members_info->end(); it++)
704 {
705 #ifndef NDEBUG
706 assert(!(n > 1));
707 #endif
708
709 Group_member_info* member= *it;
710 if (the_primary == NULL &&
711 member->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY)
712 {
713 the_primary= member;
714 #ifndef NDEBUG
715 n++;
716 #endif
717 }
718
719 /* Found the primary and it is me. Check that I am not offline. */
720 if (!member->get_uuid().compare(local_member_info->get_uuid()))
721 {
722 am_i_leaving= member->get_recovery_status() == Group_member_info::MEMBER_OFFLINE;
723 }
724 }
725
726 /* If I am not leaving, then run election. Otherwise do nothing. */
727 if (!am_i_leaving)
728 {
729 Sql_service_command_interface *sql_command_interface=
730 new Sql_service_command_interface();
731 bool skip_set_super_readonly= false;
732 if (sql_command_interface == NULL ||
733 sql_command_interface->
734 establish_session_connection(PSESSION_INIT_THREAD,
735 get_plugin_pointer()) ||
736 sql_command_interface->set_interface_user(GROUPREPL_USER))
737 {
738 log_message(MY_WARNING_LEVEL,
739 "Unable to open session to (re)set read only mode. Skipping."); /* purecov: inspected */
740 /*
741 Unable to open session to (re)set read only mode.
742 Mark that we should skipping that part code.
743 */
744 skip_set_super_readonly= true; /* purecov: inspected */
745 }
746
747
748 /*
749 There is no primary in the member list. Pick one from
750 the list of ONLINE members. The picked one is the first
751 viable on in the list that was sorted at the beginning
752 of this function.
753
754 The assumption is that std::sort(...) is deterministic
755 on all members.
756
757 To pick leaders from only lowest version members loop
758 till lowest_version_end.
759 */
760 if (the_primary == NULL)
761 {
762 for (it= all_members_info->begin();
763 it != lowest_version_end && the_primary == NULL;
764 it++)
765 {
766 Group_member_info* mi= *it;
767
768 assert(mi);
769 if (mi &&
770 mi->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
771 the_primary= mi;
772 }
773 }
774
775 // take actions on the primary
776 if (the_primary != NULL)
777 {
778 std::string primary_uuid= the_primary->get_uuid();
779 const bool is_primary_local= !primary_uuid.compare(local_member_info->get_uuid());
780 const bool has_primary_changed=
781 Group_member_info::MEMBER_ROLE_PRIMARY != the_primary->get_role();
782
783 if (has_primary_changed)
784 {
785 /*
786 A new primary was elected, inform certifier to enable conflict
787 detection until the new primary apply all relay logs.
788 */
789 Single_primary_action_packet *single_primary_action=
790 new Single_primary_action_packet(Single_primary_action_packet::NEW_PRIMARY);
791 applier_module->add_single_primary_action_packet(single_primary_action);
792
793 // declare this as the new primary
794 group_member_mgr->update_member_role(primary_uuid,
795 Group_member_info::MEMBER_ROLE_PRIMARY);
796
797 log_message(MY_INFORMATION_LEVEL, "A new primary with address %s:%u "
798 "was elected, enabling conflict detection until the new "
799 "primary applies all relay logs.",
800 the_primary->get_hostname().c_str(),
801 the_primary->get_port());
802
803 // Check if the session was established, it can (re)set read only mode.
804 if (!skip_set_super_readonly)
805 {
806 if (is_primary_local)
807 {
808 if (disable_super_read_only_mode(sql_command_interface))
809 {
810 log_message(MY_WARNING_LEVEL,
811 "Unable to disable super read only flag. "
812 "Try to disable it manually."); /* purecov: inspected */
813 }
814 }
815 else
816 {
817 if (enable_super_read_only_mode(sql_command_interface))
818 {
819 log_message(MY_WARNING_LEVEL,
820 "Unable to set super read only flag. "
821 "Try to set it manually."); /* purecov: inspected */
822 }
823 }
824 }
825 /* code position limits messaging to primary change */
826 if (is_primary_local)
827 log_message(MY_INFORMATION_LEVEL,
828 "This server is working as primary member.");
829 else
830 log_message(MY_INFORMATION_LEVEL,
831 "This server is working as secondary member with primary "
832 "member address %s:%u.",
833 the_primary->get_hostname().c_str(),
834 the_primary->get_port());
835 }
836 }
837 else if (!skip_set_super_readonly)
838 {
839 /*
840 If there is only one server in the group, no need to pollute the error log with
841 an entry about no suitable candidate while (quick) recovery is running for the first member.
842 */
843 if (all_members_info->size() != 1)
844 {
845 // There are no servers in the group or they are all
846 // recoverying WARN to the user
847 log_message(MY_WARNING_LEVEL,
848 "Unable to set any member as primary. No suitable candidate."); /* purecov: inspected */
849 }
850
851 if(enable_super_read_only_mode(sql_command_interface))
852 {
853 log_message(MY_WARNING_LEVEL,
854 "Unable to set super read only flag. "
855 "Try to set it manually."); /* purecov: inspected */
856 }
857 }
858 delete sql_command_interface;
859 }
860
861 //clean the members
862 for (it= all_members_info->begin(); it!= all_members_info->end(); it++)
863 {
864 delete (*it);
865 }
866 delete all_members_info;
867 }
868
869 int Plugin_gcs_events_handler::
update_group_info_manager(const Gcs_view & new_view,const Exchanged_data & exchanged_data,bool is_joining,bool is_leaving) const870 update_group_info_manager(const Gcs_view& new_view,
871 const Exchanged_data &exchanged_data,
872 bool is_joining,
873 bool is_leaving) const
874 {
875 int error= 0;
876
877 //update the Group Manager with all the received states
878 vector<Group_member_info*> to_update;
879
880 if(!is_leaving)
881 {
882 //Process local state of exchanged data.
883 if ((error= process_local_exchanged_data(exchanged_data, is_joining)))
884 goto err;
885
886 to_update.insert(to_update.end(),
887 temporary_states->begin(),
888 temporary_states->end());
889
890 //Clean-up members that are leaving
891 vector<Gcs_member_identifier> leaving= new_view.get_leaving_members();
892 vector<Gcs_member_identifier>::iterator left_it;
893 vector<Group_member_info*>::iterator to_update_it;
894 for(left_it= leaving.begin(); left_it != leaving.end(); left_it++)
895 {
896 for(to_update_it= to_update.begin();
897 to_update_it != to_update.end();
898 to_update_it++)
899 {
900 if( (*left_it) == (*to_update_it)->get_gcs_member_id() )
901 {
902 /* purecov: begin inspected */
903 delete (*to_update_it);
904 to_update.erase(to_update_it);
905 break;
906 /* purecov: end */
907 }
908 }
909 }
910 }
911 group_member_mgr->update(&to_update);
912 temporary_states->clear();
913
914 err:
915 assert(temporary_states->size() == 0);
916 return error;
917 }
918
handle_joining_members(const Gcs_view & new_view,bool is_joining,bool is_leaving) const919 void Plugin_gcs_events_handler::handle_joining_members(const Gcs_view& new_view,
920 bool is_joining,
921 bool is_leaving)
922 const
923 {
924 //nothing to do here
925 size_t number_of_members= new_view.get_members().size();
926 if (number_of_members == 0 || is_leaving)
927 {
928 return;
929 }
930 size_t number_of_joining_members= new_view.get_joined_members().size();
931 size_t number_of_leaving_members= new_view.get_leaving_members().size();
932
933 /*
934 If we are joining, 3 scenarios exist:
935 1) We are incompatible with the group so we leave
936 2) We are alone so we declare ourselves online
937 3) We are in a group and recovery must happen
938 */
939 if (is_joining)
940 {
941 int error= 0;
942 if ((error= check_group_compatibility(number_of_members)))
943 {
944 view_change_notifier->cancel_view_modification(error);
945 return;
946 }
947 view_change_notifier->end_view_modification();
948
949 /**
950 On the joining list there can be 2 types of members: online/recovering
951 members coming from old views where this member was not present and new
952 joining members that still have their status as offline.
953
954 As so, for offline members, their state is changed to member_in_recovery
955 after member compatibility with group is checked.
956 */
957 update_member_status(new_view.get_joined_members(),
958 Group_member_info::MEMBER_IN_RECOVERY,
959 Group_member_info::MEMBER_OFFLINE,
960 Group_member_info::MEMBER_END);
961 /**
962 Set the read mode if not set during start (auto-start)
963 */
964 if (enable_server_read_mode(PSESSION_INIT_THREAD))
965 {
966 log_message(MY_ERROR_LEVEL,
967 "Error when activating super_read_only mode on start. "
968 "The member will now exit the group.");
969 group_member_mgr->update_member_status(local_member_info->get_uuid(),
970 Group_member_info::MEMBER_ERROR);
971 this->leave_group_on_error();
972 return;
973 }
974
975 /**
976 On the joining member log an error when group contains more members than
977 auto_increment_increment variable.
978 */
979 ulong auto_increment_increment= get_auto_increment_increment();
980
981 if (new_view.get_members().size() > auto_increment_increment)
982 {
983 log_message(MY_ERROR_LEVEL,
984 "Group contains %lu members which is greater than"
985 " auto_increment_increment value of %lu."
986 " This can lead to an higher rate of transactional aborts.",
987 new_view.get_members().size(), auto_increment_increment);
988 }
989
990 /*
991 During the view change, a suspension packet is sent to the applier module
992 so all posterior transactions inbound are not applied, but queued, until
993 the member finishes recovery.
994 */
995 applier_module->add_suspension_packet();
996
997 /*
998 Marking the view in the joiner since the incoming event from the donor
999 is discarded in the Recovery process.
1000 */
1001
1002 std::string view_id= new_view.get_view_id().get_representation();
1003 View_change_packet * view_change_packet= new View_change_packet(view_id);
1004 applier_module->add_view_change_packet(view_change_packet);
1005
1006 /*
1007 Launch the recovery thread so we can receive missing data and the
1008 certification information needed to apply the transactions queued after
1009 this view change.
1010
1011 Recovery receives a view id, as a means to identify logically on joiners
1012 and donors alike where this view change happened in the data. With that
1013 info we can then ask for the donor to give the member all the data until
1014 this point in the data, and the certification information for all the data
1015 that comes next.
1016
1017 When alone, the server will go through Recovery to wait for the consumption
1018 of his applier relay log that may contain transactions from previous
1019 executions.
1020 */
1021 recovery_module->start_recovery(new_view.get_group_id().get_group_id(),
1022 new_view.get_view_id()
1023 .get_representation());
1024 }
1025 /*
1026 The condition
1027 number_of_joining_members == 0 && number_of_leaving_members == 0
1028 is needed due to the following scenario:
1029 We have a group with 2 members, one does crash (M2), and the group
1030 blocks with M1 ONLINE and M2 UNREACHABLE.
1031 Then M2 rejoins and the group unblocks.
1032 When M2 rejoins the group, from M2 perspective it is joining
1033 the group, that is, it does receive a view (V3) on which it is
1034 marked as a joining member.
1035 But from M1 perspective, M2 may never left, so the view delivered
1036 (V3) has the same members as V2, that is, M1 and M2, without joining
1037 members, thence we need to consider that condition and log that view.
1038 */
1039 else if (number_of_joining_members > 0 ||
1040 (number_of_joining_members == 0 && number_of_leaving_members == 0))
1041 {
1042 /**
1043 On the joining list there can be 2 types of members: online/recovering
1044 members coming from old views where this member was not present and new
1045 joining members that still have their status as offline.
1046
1047 As so, for offline members, their state is changed to member_in_recovery.
1048 */
1049 update_member_status(new_view.get_joined_members(),
1050 Group_member_info::MEMBER_IN_RECOVERY,
1051 Group_member_info::MEMBER_OFFLINE,
1052 Group_member_info::MEMBER_END);
1053 /**
1054 If not a joining member, all members should record on their own binlogs a
1055 marking event that identifies the frontier between the data the joining
1056 member was to receive and the data it should queue.
1057 The joining member can then wait for this event to know it was all the
1058 needed data.
1059
1060 This packet will also pass in the certification process at this exact
1061 frontier giving us the opportunity to gather the necessary certification
1062 information to certify the transactions that will come after this view
1063 change. If selected as a donor, this info will also be sent to the joiner.
1064
1065 Associated to this process, we collect and intersect the executed GTID sets
1066 of all ONLINE members so we can cut the certification info to gather and
1067 transmit to the minimum.
1068 */
1069
1070 std::string view_id= new_view.get_view_id().get_representation();
1071 View_change_packet * view_change_packet= new View_change_packet(view_id);
1072 collect_members_executed_sets(new_view.get_joined_members(), view_change_packet);
1073 applier_module->add_view_change_packet(view_change_packet);
1074 }
1075 }
1076
1077 void
handle_leaving_members(const Gcs_view & new_view,bool is_joining,bool is_leaving) const1078 Plugin_gcs_events_handler::handle_leaving_members(const Gcs_view& new_view,
1079 bool is_joining,
1080 bool is_leaving)
1081 const
1082 {
1083 Group_member_info::Group_member_status member_status=
1084 local_member_info->get_recovery_status();
1085
1086 bool members_left= (new_view.get_leaving_members().size() > 0);
1087
1088 //if the member is joining or not in recovery, no need to update the process
1089 if (!is_joining && member_status == Group_member_info::MEMBER_IN_RECOVERY)
1090 {
1091 /*
1092 This method has 2 purposes:
1093 If a donor leaves, recovery needs to switch donor
1094 If this member leaves, recovery needs to shutdown.
1095 */
1096 recovery_module->update_recovery_process(members_left, is_leaving);
1097 }
1098
1099 if (members_left)
1100 {
1101 update_member_status(new_view.get_leaving_members(),
1102 Group_member_info::MEMBER_OFFLINE,
1103 Group_member_info::MEMBER_END,
1104 Group_member_info::MEMBER_ERROR);
1105 }
1106
1107 if (is_leaving)
1108 {
1109 view_change_notifier->end_view_modification();
1110 }
1111 }
1112
1113 bool
1114 Plugin_gcs_events_handler::
is_member_on_vector(const vector<Gcs_member_identifier> & members,const Gcs_member_identifier & member_id) const1115 is_member_on_vector(const vector<Gcs_member_identifier>& members,
1116 const Gcs_member_identifier& member_id)
1117 const
1118 {
1119 vector<Gcs_member_identifier>::const_iterator it;
1120
1121 it= std::find(members.begin(), members.end(), member_id);
1122
1123 return it != members.end();
1124 }
1125
1126 int
1127 Plugin_gcs_events_handler::
process_local_exchanged_data(const Exchanged_data & exchanged_data,bool is_joining) const1128 process_local_exchanged_data(const Exchanged_data &exchanged_data,
1129 bool is_joining)
1130 const
1131 {
1132 uint local_uuid_found= 0;
1133
1134 /*
1135 For now, we are only carrying Group Member Info on Exchangeable data
1136 Since we are receiving the state from all Group members, one shall
1137 store it in a set to ensure that we don't have repetitions.
1138
1139 All collected data will be given to Group Member Manager at view install
1140 time.
1141 */
1142 for (Exchanged_data::const_iterator exchanged_data_it= exchanged_data.begin();
1143 exchanged_data_it != exchanged_data.end();
1144 exchanged_data_it++)
1145 {
1146 const uchar* data= exchanged_data_it->second->get_payload();
1147 uint64 length= exchanged_data_it->second->get_payload_length();
1148 Gcs_member_identifier* member_id= exchanged_data_it->first;
1149 if (data == NULL)
1150 {
1151 /* purecov: begin inspected */
1152 Group_member_info * member_info= group_member_mgr->get_group_member_info_by_member_id(*member_id);
1153 if (member_info != NULL)
1154 {
1155 log_message(MY_ERROR_LEVEL, "Member with address '%s:%u' didn't provide any data"
1156 " during the last group change. Group"
1157 " information can be outdated and lead to"
1158 " errors on recovery",
1159 member_info->get_hostname().c_str(), member_info->get_port());
1160 delete member_info;
1161 }
1162 continue;
1163 /* purecov: end */
1164 }
1165
1166 //Process data provided by member.
1167 vector<Group_member_info*>* member_infos=
1168 group_member_mgr->decode(data, length);
1169
1170 //This construct is here in order to deallocate memory of duplicates
1171 vector<Group_member_info*>::iterator member_infos_it;
1172 for(member_infos_it= member_infos->begin();
1173 member_infos_it != member_infos->end();
1174 member_infos_it++)
1175 {
1176 if (local_member_info->get_uuid() == (*member_infos_it)->get_uuid())
1177 {
1178 local_uuid_found++;
1179 }
1180
1181 /*
1182 Accept only the information the member has about himself
1183 Information received about other members is probably outdated
1184 */
1185 if (local_uuid_found < 2 &&
1186 (*member_infos_it)->get_gcs_member_id() == *member_id)
1187 {
1188 this->temporary_states->insert((*member_infos_it));
1189 }
1190 else
1191 {
1192 delete (*member_infos_it); /* purecov: inspected */
1193 }
1194 }
1195
1196 member_infos->clear();
1197 delete member_infos;
1198
1199 if (local_uuid_found > 1)
1200 {
1201 if (is_joining)
1202 {
1203 log_message(MY_ERROR_LEVEL,
1204 "There is already a member with server_uuid %s. "
1205 "The member will now exit the group.",
1206 local_member_info->get_uuid().c_str());
1207 }
1208
1209 // Clean up temporary states.
1210 std::set<Group_member_info*,Group_member_info_pointer_comparator>::iterator
1211 temporary_states_it;
1212 for (temporary_states_it= temporary_states->begin();
1213 temporary_states_it != temporary_states->end();
1214 temporary_states_it++)
1215 {
1216 delete (*temporary_states_it);
1217 }
1218 temporary_states->clear();
1219
1220 return 1;
1221 }
1222 }
1223
1224 return 0;
1225 }
1226
1227 Gcs_message_data*
get_exchangeable_data() const1228 Plugin_gcs_events_handler::get_exchangeable_data() const
1229 {
1230 std::string server_executed_gtids;
1231 std::string applier_retrieved_gtids;
1232 Replication_thread_api applier_channel("group_replication_applier");
1233
1234 Sql_service_command_interface *sql_command_interface=
1235 new Sql_service_command_interface();
1236
1237 if (sql_command_interface->
1238 establish_session_connection(PSESSION_INIT_THREAD,
1239 get_plugin_pointer()) ||
1240 sql_command_interface->set_interface_user(GROUPREPL_USER)
1241 )
1242 {
1243 log_message(MY_WARNING_LEVEL,
1244 "Error when extracting information for group change. "
1245 "Operations and checks made to group joiners may be incomplete"); /* purecov: inspected */
1246 goto sending; /* purecov: inspected */
1247 }
1248
1249 if (sql_command_interface->get_server_gtid_executed(server_executed_gtids))
1250 {
1251 log_message(MY_WARNING_LEVEL,
1252 "Error when extracting this member GTID executed set. "
1253 "Operations and checks made to group joiners may be incomplete"); /* purecov: inspected */
1254 goto sending; /* purecov: inspected */
1255 }
1256 if (applier_channel.get_retrieved_gtid_set(applier_retrieved_gtids))
1257 {
1258 log_message(MY_WARNING_LEVEL,
1259 "Error when extracting this member retrieved set for its applier. "
1260 "Operations and checks made to group joiners may be incomplete"); /* purecov: inspected */
1261 }
1262
1263 group_member_mgr->update_gtid_sets(local_member_info->get_uuid(),
1264 server_executed_gtids,
1265 applier_retrieved_gtids);
1266 sending:
1267
1268 delete sql_command_interface;
1269
1270 std::vector<uchar> data;
1271
1272 Group_member_info* local_member_copy= new Group_member_info(*local_member_info);
1273 Group_member_info_manager_message *group_info_message=
1274 new Group_member_info_manager_message(local_member_copy);
1275 group_info_message->encode(&data);
1276 delete group_info_message;
1277
1278 Gcs_message_data* msg_data= new Gcs_message_data(0, data.size());
1279 msg_data->append_to_payload(&data.front(), data.size());
1280
1281 return msg_data;
1282 }
1283
1284 void
1285 Plugin_gcs_events_handler::
update_member_status(const vector<Gcs_member_identifier> & members,Group_member_info::Group_member_status status,Group_member_info::Group_member_status old_status_equal_to,Group_member_info::Group_member_status old_status_different_from) const1286 update_member_status(const vector<Gcs_member_identifier>& members,
1287 Group_member_info::Group_member_status status,
1288 Group_member_info::Group_member_status old_status_equal_to,
1289 Group_member_info::Group_member_status old_status_different_from)
1290 const
1291 {
1292 for (vector<Gcs_member_identifier>::const_iterator it= members.begin();
1293 it != members.end();
1294 ++it)
1295 {
1296 Gcs_member_identifier member = *it;
1297 Group_member_info* member_info=
1298 group_member_mgr->get_group_member_info_by_member_id(member);
1299
1300 if (member_info == NULL)
1301 {
1302 //Trying to update a non-existing member
1303 continue;
1304 }
1305
1306 // if (the old_status_equal_to is not defined or
1307 // the previous status is equal to old_status_equal_to)
1308 // and
1309 // (the old_status_different_from is not defined or
1310 // the previous status is different from old_status_different_from)
1311 if ((old_status_equal_to == Group_member_info::MEMBER_END ||
1312 member_info->get_recovery_status() == old_status_equal_to) &&
1313 (old_status_different_from == Group_member_info::MEMBER_END ||
1314 member_info->get_recovery_status() != old_status_different_from))
1315 {
1316 group_member_mgr->update_member_status(member_info->get_uuid(), status);
1317 }
1318
1319 delete member_info;
1320 }
1321 }
1322
1323 /**
1324 Here we check:
1325 1) If the number of members was exceeded
1326 2) If member version is compatible with the group
1327 3) If the gtid_assignment_block_size is equal to the group
1328 4) If the hash algorithm used is equal to the group
1329 5) If the member has more known transactions than the group
1330 6) If the member has the same configuration flags that the group has
1331 */
1332 int
check_group_compatibility(size_t number_of_members) const1333 Plugin_gcs_events_handler::check_group_compatibility(size_t number_of_members) const
1334 {
1335 /*
1336 Check if group size did reach the maximum number of members.
1337 */
1338 #ifndef NDEBUG
1339 if (set_number_of_members_on_view_changed_to_10)
1340 number_of_members= 10;
1341 #endif
1342 if (number_of_members > 9)
1343 {
1344 log_message(MY_ERROR_LEVEL,
1345 "The START GROUP_REPLICATION command failed since the group "
1346 "already has 9 members");
1347 return GROUP_REPLICATION_MAX_GROUP_SIZE;
1348 }
1349
1350 /*
1351 Check if the member is compatible with the group.
1352 It can be incompatible because its major version is lower or a rule says it.
1353 If incompatible notify whoever is waiting for the view with an error, so
1354 the plugin exits the group.
1355 */
1356 *joiner_compatibility_status= COMPATIBLE;
1357 int group_data_compatibility= 0;
1358 if (number_of_members > 1)
1359 {
1360 *joiner_compatibility_status= check_version_compatibility_with_group();
1361 group_data_compatibility= compare_member_transaction_sets();
1362 }
1363
1364 if (*joiner_compatibility_status == INCOMPATIBLE)
1365 {
1366 log_message(MY_ERROR_LEVEL,
1367 "Member version is incompatible with the group");
1368 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1369 }
1370
1371 /*
1372 All group members must have the same gtid_assignment_block_size
1373 and transaction-write-set-extraction value, if joiner has a
1374 different value it is not allowed to join.
1375 */
1376 if (number_of_members > 1 &&
1377 compare_member_option_compatibility())
1378 {
1379 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1380 }
1381
1382 /*
1383 Check that the joiner doesn't has more GTIDs than the rest of the group.
1384 All the executed and received transactions in the group are collected and
1385 merged into a GTID set and all joiner transactions must be contained in it.
1386 */
1387 if (group_data_compatibility)
1388 {
1389 if (group_data_compatibility > 0)
1390 {
1391 if(get_allow_local_disjoint_gtids_join())
1392 {
1393 log_message(MY_WARNING_LEVEL,
1394 "The member contains transactions not present in the group. "
1395 "It is only allowed to join due to "
1396 "group_replication_allow_local_disjoint_gtids_join option");
1397 }
1398 else {
1399 log_message(MY_ERROR_LEVEL,
1400 "The member contains transactions not present in the group. "
1401 "The member will now exit the group.");
1402 log_message(MY_INFORMATION_LEVEL,
1403 "To force this member into the group you can use the "
1404 "group_replication_allow_local_disjoint_gtids_join option");
1405 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1406 }
1407 }
1408 else //error
1409 {
1410 /* purecov: begin inspected */
1411 if(get_allow_local_disjoint_gtids_join())
1412 {
1413 log_message(MY_WARNING_LEVEL,
1414 "It was not possible to assess if the member has more "
1415 "transactions than the group. "
1416 "It is only allowed to join due to the "
1417 "group_replication_allow_local_disjoint_gtids_join option");
1418 }
1419 else {
1420 log_message(MY_ERROR_LEVEL,
1421 "It was not possible to assess if the member has more "
1422 "transactions than the group. "
1423 "The member will now exit the group.");
1424 log_message(MY_INFORMATION_LEVEL,
1425 "To force this member into the group you can use the "
1426 "group_replication_allow_local_disjoint_gtids_join option");
1427 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1428 }
1429 /* purecov: end */
1430 }
1431 }
1432
1433 return 0;
1434 }
1435
1436 Compatibility_type
check_version_compatibility_with_group() const1437 Plugin_gcs_events_handler::check_version_compatibility_with_group() const
1438 {
1439 bool override_lower_incompatibility= false;
1440 Compatibility_type compatibility_type= INCOMPATIBLE;
1441 bool read_compatible= false;
1442
1443 std::vector<Group_member_info*> *all_members= group_member_mgr->get_all_members();
1444 std::vector<Group_member_info*>::iterator all_members_it;
1445 Member_version lowest_version(0xFFFFFF);
1446
1447 for (all_members_it = all_members->begin();
1448 all_members_it != all_members->end(); all_members_it++)
1449 {
1450 if ((*all_members_it)->get_uuid() != local_member_info->get_uuid() &&
1451 (*all_members_it)->get_member_version() < lowest_version)
1452 {
1453 lowest_version = (*all_members_it)->get_member_version();
1454 }
1455 }
1456
1457 for (all_members_it= all_members->begin();
1458 all_members_it!= all_members->end();
1459 all_members_it++)
1460 {
1461 Member_version member_version= (*all_members_it)->get_member_version();
1462 compatibility_type=
1463 compatibility_manager->check_local_incompatibility(member_version,
1464 member_version == lowest_version);
1465
1466 if (compatibility_type == READ_COMPATIBLE)
1467 {
1468 read_compatible= true;
1469 }
1470
1471 if (compatibility_type == INCOMPATIBLE)
1472 {
1473 break;
1474 }
1475
1476 if (compatibility_type == INCOMPATIBLE_LOWER_VERSION)
1477 {
1478 if (get_allow_local_lower_version_join())
1479 {
1480 /*
1481 Despite between these two members the compatibility type
1482 is INCOMPATIBLE_LOWER_VERSION, when compared with others
1483 group members this server may be INCOMPATIBLE, so we need
1484 to test with all group members.
1485 */
1486 override_lower_incompatibility= true;
1487 compatibility_type= COMPATIBLE;
1488 }
1489 else
1490 {
1491 compatibility_type= INCOMPATIBLE;
1492 break;
1493 }
1494 }
1495 }
1496
1497 if (compatibility_type != INCOMPATIBLE && override_lower_incompatibility)
1498 {
1499 log_message(MY_INFORMATION_LEVEL,
1500 "Member version is lower than some group member, but since "
1501 "option 'group_replication_allow_local_lower_version_join' "
1502 "is enabled, member will be allowed to join");
1503 }
1504
1505 if (read_compatible && compatibility_type != INCOMPATIBLE)
1506 {
1507 compatibility_type= READ_COMPATIBLE;
1508 }
1509
1510 //clean the members
1511 for (all_members_it= all_members->begin();
1512 all_members_it!= all_members->end();
1513 all_members_it++)
1514 {
1515 delete (*all_members_it);
1516 }
1517 delete all_members;
1518
1519 return compatibility_type;
1520 }
1521
compare_member_transaction_sets() const1522 int Plugin_gcs_events_handler::compare_member_transaction_sets() const
1523 {
1524 int result= 0;
1525
1526 Sid_map local_sid_map(NULL);
1527 Sid_map group_sid_map(NULL);
1528 Gtid_set local_member_set(&local_sid_map, NULL);
1529 Gtid_set group_set(&group_sid_map, NULL);
1530
1531 std::vector<Group_member_info*> *all_members= group_member_mgr->get_all_members();
1532 std::vector<Group_member_info*>::iterator all_members_it;
1533 for (all_members_it= all_members->begin();
1534 all_members_it!= all_members->end();
1535 all_members_it++) {
1536
1537 std::string member_exec_set_str= (*all_members_it)->get_gtid_executed();
1538 std::string applier_ret_set_str= (*all_members_it)->get_gtid_retrieved();
1539 if ((*all_members_it)->get_gcs_member_id() ==
1540 local_member_info->get_gcs_member_id())
1541 {
1542 if (local_member_set.
1543 add_gtid_text(member_exec_set_str.c_str()) != RETURN_STATUS_OK ||
1544 local_member_set.
1545 add_gtid_text(applier_ret_set_str.c_str()) != RETURN_STATUS_OK)
1546 {
1547 /* purecov: begin inspected */
1548 log_message(MY_ERROR_LEVEL,
1549 "Error processing local GTID sets when comparing this member"
1550 " transactions against the group");
1551 result= -1;
1552 goto cleaning;
1553 /* purecov: end */
1554 }
1555 }
1556 else
1557 {
1558 if (group_set.
1559 add_gtid_text(member_exec_set_str.c_str()) != RETURN_STATUS_OK ||
1560 group_set.
1561 add_gtid_text(applier_ret_set_str.c_str()) != RETURN_STATUS_OK)
1562 {
1563 /* purecov: begin inspected */
1564 log_message(MY_ERROR_LEVEL,
1565 "Error processing group GTID sets when comparing this member"
1566 " transactions with the group");
1567 result= -1;
1568 goto cleaning;
1569 /* purecov: end */
1570 }
1571 }
1572
1573 }
1574
1575 /*
1576 Here we only error out if the joiner set is bigger, i.e, if they are equal
1577 no error is returned.
1578 One could argue that if a joiner has the same transaction set as the group
1579 then something is wrong as the group also has transaction associated to
1580 previous view changes.
1581 To reject this cases cause however false negatives when members leave and
1582 quickly rejoin the group or when groups are started by add several nodes at
1583 once.
1584 */
1585 if (!local_member_set.is_subset(&group_set))
1586 {
1587 char *local_gtid_set_buf;
1588 local_member_set.to_string(&local_gtid_set_buf);
1589 char *group_gtid_set_buf;
1590 group_set.to_string(&group_gtid_set_buf);
1591 log_message(MY_ERROR_LEVEL,
1592 "This member has more executed transactions than those present"
1593 " in the group. Local transactions: %s > Group transactions: %s",
1594 local_gtid_set_buf, group_gtid_set_buf);
1595 my_free(local_gtid_set_buf);
1596 my_free(group_gtid_set_buf);
1597 result= 1;
1598 }
1599
1600 cleaning:
1601
1602 //clean the members
1603 for (all_members_it= all_members->begin();
1604 all_members_it!= all_members->end();
1605 all_members_it++)
1606 {
1607 delete (*all_members_it);
1608 }
1609 delete all_members;
1610
1611 return result;
1612 }
1613
1614 void Plugin_gcs_events_handler::
collect_members_executed_sets(const vector<Gcs_member_identifier> & joining_members,View_change_packet * view_packet) const1615 collect_members_executed_sets(const vector<Gcs_member_identifier> &joining_members,
1616 View_change_packet *view_packet) const
1617 {
1618 std::vector<Group_member_info*> *all_members= group_member_mgr->get_all_members();
1619 std::vector<Group_member_info*>::iterator all_members_it;
1620 for (all_members_it= all_members->begin();
1621 all_members_it!= all_members->end();
1622 all_members_it++)
1623 {
1624
1625 // Joining/Recovering members don't have valid GTID executed information
1626 if ((*all_members_it)->get_recovery_status() ==
1627 Group_member_info::MEMBER_IN_RECOVERY)
1628 {
1629 continue;
1630 }
1631
1632 std::string exec_set_str= (*all_members_it)->get_gtid_executed();
1633 view_packet->group_executed_set.push_back(exec_set_str);
1634 }
1635
1636 //clean the members
1637 for (all_members_it= all_members->begin();
1638 all_members_it!= all_members->end();
1639 all_members_it++)
1640 {
1641 delete (*all_members_it);
1642 }
1643 delete all_members;
1644 }
1645
1646 int
compare_member_option_compatibility() const1647 Plugin_gcs_events_handler::compare_member_option_compatibility() const
1648 {
1649 int result= 0;
1650
1651 std::vector<Group_member_info*> *all_members= group_member_mgr->get_all_members();
1652 std::vector<Group_member_info*>::iterator all_members_it;
1653 for (all_members_it= all_members->begin();
1654 all_members_it!= all_members->end();
1655 all_members_it++)
1656 {
1657 if (local_member_info->get_gtid_assignment_block_size() !=
1658 (*all_members_it)->get_gtid_assignment_block_size())
1659 {
1660 result= 1;
1661 log_message(MY_ERROR_LEVEL,
1662 "The member is configured with a "
1663 "group_replication_gtid_assignment_block_size option "
1664 "value '%llu' different from the group '%llu'. "
1665 "The member will now exit the group.",
1666 local_member_info->get_gtid_assignment_block_size(),
1667 (*all_members_it)->get_gtid_assignment_block_size());
1668 goto cleaning;
1669 }
1670
1671 if (local_member_info->get_write_set_extraction_algorithm() !=
1672 (*all_members_it)->get_write_set_extraction_algorithm())
1673 {
1674 result= 1;
1675 log_message(MY_ERROR_LEVEL,
1676 "The member is configured with a "
1677 "transaction-write-set-extraction option "
1678 "value '%s' different from the group '%s'. "
1679 "The member will now exit the group.",
1680 get_write_set_algorithm_string(
1681 local_member_info->get_write_set_extraction_algorithm()),
1682 get_write_set_algorithm_string(
1683 (*all_members_it)->get_write_set_extraction_algorithm()));
1684 goto cleaning;
1685 }
1686
1687 if (local_member_info->get_configuration_flags() !=
1688 (*all_members_it)->get_configuration_flags())
1689 {
1690 const uint32 member_configuration_flags = (*all_members_it)->get_configuration_flags();
1691 const uint32 local_configuration_flags = local_member_info->get_configuration_flags();
1692
1693 result= 1;
1694 log_message(MY_ERROR_LEVEL,
1695 "The member configuration is not compatible with "
1696 "the group configuration. Variables such as "
1697 "single_primary_mode or enforce_update_everywhere_checks "
1698 "must have the same value on every server in the group. "
1699 "(member configuration option: [%s], group configuration "
1700 "option: [%s]).",
1701 Group_member_info::get_configuration_flags_string(local_configuration_flags).c_str(),
1702 Group_member_info::get_configuration_flags_string(member_configuration_flags).c_str());
1703 goto cleaning;
1704 }
1705
1706 if ((*all_members_it)->get_lower_case_table_names() !=
1707 DEFAULT_NOT_RECEIVED_LOWER_CASE_TABLE_NAMES &&
1708 local_member_info->get_lower_case_table_names() !=
1709 (*all_members_it)->get_lower_case_table_names())
1710 {
1711 result= 1;
1712 log_message(MY_ERROR_LEVEL,
1713 "The member is configured with a lower_case_table_names "
1714 "option value '%lu' different from the group '%lu'. "
1715 "The member will now exit the group. If there is existing "
1716 "data on member, it may be incompatible with group if "
1717 "created with a lower_case_table_names value different from "
1718 "the group.",
1719 local_member_info->get_lower_case_table_names(),
1720 (*all_members_it)->get_lower_case_table_names());
1721 goto cleaning;
1722 }
1723 }
1724
1725 cleaning:
1726 for (all_members_it= all_members->begin();
1727 all_members_it!= all_members->end();
1728 all_members_it++)
1729 delete (*all_members_it);
1730 delete all_members;
1731
1732 return result;
1733 }
1734
1735 void
leave_group_on_error() const1736 Plugin_gcs_events_handler::leave_group_on_error() const
1737 {
1738 Gcs_operations::enum_leave_state state= gcs_module->leave();
1739 int error= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
1740 stop_wait_timeout);
1741 if (error)
1742 {
1743 log_message(MY_ERROR_LEVEL,
1744 "Error stopping all replication channels while server was"
1745 " leaving the group. Please check the error log for additional"
1746 " details. Got error: %d", error);
1747 }
1748
1749 std::stringstream ss;
1750 plugin_log_level log_severity= MY_WARNING_LEVEL;
1751 switch (state)
1752 {
1753 case Gcs_operations::ERROR_WHEN_LEAVING:
1754 /* purecov: begin inspected */
1755 ss << "Unable to confirm whether the server has left the group or not. "
1756 "Check performance_schema.replication_group_members to check group membership information.";
1757 log_severity= MY_ERROR_LEVEL;
1758 break;
1759 /* purecov: end */
1760 case Gcs_operations::ALREADY_LEAVING:
1761 /* purecov: begin inspected */
1762 ss << "Skipping leave operation: concurrent attempt to leave the group is on-going.";
1763 break;
1764 /* purecov: end */
1765 case Gcs_operations::ALREADY_LEFT:
1766 /* purecov: begin inspected */
1767 ss << "Skipping leave operation: member already left the group.";
1768 break;
1769 /* purecov: end */
1770 case Gcs_operations::NOW_LEAVING:
1771 return;
1772 }
1773 log_message(log_severity, ss.str().c_str()); /* purecov: inspected */
1774 }
1775