1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include <algorithm>
24 #include <string>
25 #include <sstream>
26 #include <vector>
27 
28 #include "gcs_event_handlers.h"
29 #include "plugin.h"
30 #include "pipeline_stats.h"
31 #include "single_primary_message.h"
32 
33 using std::vector;
34 
35 
36 Plugin_gcs_events_handler::
Plugin_gcs_events_handler(Applier_module_interface * applier_module,Recovery_module * recovery_module,Plugin_gcs_view_modification_notifier * vc_notifier,Compatibility_module * compatibility_module,ulong components_stop_timeout)37 Plugin_gcs_events_handler(Applier_module_interface* applier_module,
38                           Recovery_module* recovery_module,
39                           Plugin_gcs_view_modification_notifier* vc_notifier,
40                           Compatibility_module* compatibility_module,
41                           ulong components_stop_timeout)
42 : applier_module(applier_module), recovery_module(recovery_module),
43   view_change_notifier(vc_notifier),
44   compatibility_manager(compatibility_module),
45   stop_wait_timeout(components_stop_timeout)
46 {
47   this->temporary_states= new std::set<Group_member_info*,
48                                        Group_member_info_pointer_comparator>();
49   this->joiner_compatibility_status= new st_compatibility_types(INCOMPATIBLE);
50 
51 #ifndef NDEBUG
52     set_number_of_members_on_view_changed_to_10= false;
53     DBUG_EXECUTE_IF("group_replication_set_number_of_members_on_view_changed_to_10",
54                     { set_number_of_members_on_view_changed_to_10= true; };);
55 #endif
56 }
57 
~Plugin_gcs_events_handler()58 Plugin_gcs_events_handler::~Plugin_gcs_events_handler()
59 {
60   delete temporary_states;
61   delete joiner_compatibility_status;
62 }
63 
64 void
on_message_received(const Gcs_message & message) const65 Plugin_gcs_events_handler::on_message_received(const Gcs_message& message) const
66 {
67   Plugin_gcs_message::enum_cargo_type message_type=
68       Plugin_gcs_message::get_cargo_type(
69           message.get_message_data().get_payload());
70 
71   switch (message_type)
72   {
73   case Plugin_gcs_message::CT_TRANSACTION_MESSAGE:
74     handle_transactional_message(message);
75     break;
76 
77   case Plugin_gcs_message::CT_CERTIFICATION_MESSAGE:
78     handle_certifier_message(message);
79     break;
80 
81   case Plugin_gcs_message::CT_RECOVERY_MESSAGE:
82     handle_recovery_message(message);
83     break;
84 
85   case Plugin_gcs_message::CT_PIPELINE_STATS_MEMBER_MESSAGE:
86     handle_stats_message(message);
87     break;
88 
89   case Plugin_gcs_message::CT_SINGLE_PRIMARY_MESSAGE:
90     handle_single_primary_message(message);
91     break;
92 
93   default:
94     break; /* purecov: inspected */
95   }
96 }
97 
98 void
99 Plugin_gcs_events_handler::
handle_transactional_message(const Gcs_message & message) const100 handle_transactional_message(const Gcs_message& message) const
101 {
102   if ( (local_member_info->get_recovery_status() == Group_member_info::MEMBER_IN_RECOVERY ||
103         local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE) &&
104         this->applier_module)
105   {
106     const unsigned char* payload_data= NULL;
107     uint64 payload_size= 0;
108     Plugin_gcs_message::get_first_payload_item_raw_data(
109         message.get_message_data().get_payload(),
110         &payload_data, &payload_size);
111 
112     this->applier_module->handle(payload_data, static_cast<ulong>(payload_size));
113   }
114   else
115   {
116     log_message(MY_ERROR_LEVEL,
117                 "Message received while the plugin is not ready,"
118                 " message discarded"); /* purecov: inspected */
119   }
120 }
121 
122 void
handle_certifier_message(const Gcs_message & message) const123 Plugin_gcs_events_handler::handle_certifier_message(const Gcs_message& message) const
124 {
125   if (this->applier_module == NULL)
126   {
127     log_message(MY_ERROR_LEVEL,
128                 "Message received without a proper group replication applier"); /* purecov: inspected */
129     return; /* purecov: inspected */
130   }
131 
132   Certifier_interface *certifier=
133       this->applier_module->get_certification_handler()->get_certifier();
134 
135   const unsigned char* payload_data= NULL;
136   uint64 payload_size= 0;
137   Plugin_gcs_message::get_first_payload_item_raw_data(
138       message.get_message_data().get_payload(),
139       &payload_data, &payload_size);
140 
141   if (certifier->handle_certifier_data(payload_data,
142                                        static_cast<ulong>(payload_size),
143                                        message.get_origin()))
144   {
145     log_message(MY_ERROR_LEVEL, "Error processing message in Certifier"); /* purecov: inspected */
146   }
147 }
148 
149 void
handle_recovery_message(const Gcs_message & message) const150 Plugin_gcs_events_handler::handle_recovery_message(const Gcs_message& message) const
151 {
152   Recovery_message recovery_message(message.get_message_data().get_payload(),
153                                     message.get_message_data().get_payload_length());
154 
155   std::string member_uuid= recovery_message.get_member_uuid();
156 
157   bool is_local= !member_uuid.compare(local_member_info->get_uuid());
158   if(is_local)
159   {
160     // Only change member status if member is still on recovery.
161     Group_member_info::Group_member_status member_status=
162         local_member_info->get_recovery_status();
163     if (member_status != Group_member_info::MEMBER_IN_RECOVERY)
164     {
165       log_message(MY_INFORMATION_LEVEL,
166                   "This server was not declared online since it is on status %s",
167                   Group_member_info::get_member_status_string(member_status)); /* purecov: inspected */
168       return; /* purecov: inspected */
169     }
170 
171     log_message(MY_INFORMATION_LEVEL,
172                 "This server was declared online within the replication group");
173 
174     /**
175     Disable the read mode in the server if the member is:
176     - joining
177     - doesn't have a higher possible incompatible version
178     - We are not on Primary mode.
179     */
180     if (*joiner_compatibility_status != READ_COMPATIBLE &&
181         (local_member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY ||
182          !local_member_info->in_primary_mode()))
183     {
184       if (disable_server_read_mode(PSESSION_INIT_THREAD))
185       {
186         log_message(MY_WARNING_LEVEL,
187                     "When declaring the plugin online it was not possible to "
188                     "disable the server read mode. "
189                     "Try to disable it manually."); /* purecov: inspected */
190       }
191     }
192 
193     // The member is declared as online upon receiving this message
194     group_member_mgr->update_member_status(member_uuid,
195                                            Group_member_info::MEMBER_ONLINE);
196   }
197   else
198   {
199     Group_member_info* member_info= group_member_mgr->get_group_member_info(member_uuid);
200     if (member_info != NULL)
201     {
202       log_message(MY_INFORMATION_LEVEL,
203                   "The member with address %s:%u was declared online within the "
204                   "replication group",
205                   member_info->get_hostname().c_str(), member_info->get_port());
206       delete member_info;
207 
208       // The member is declared as online upon receiving this message
209       // We need to run this before running update_recovery_process
210       group_member_mgr->update_member_status(member_uuid,
211                                              Group_member_info::MEMBER_ONLINE);
212 
213       if (local_member_info->get_recovery_status() ==
214               Group_member_info::MEMBER_IN_RECOVERY)
215       {
216         /*
217           Inform recovery of a possible new donor
218         */
219         recovery_module->update_recovery_process(false, false);
220       }
221     }
222   }
223 
224   /*
225    Check if we were waiting for some server to recover to
226    elect a new leader.
227 
228    Following line protects against servers joining the group
229    while the bootstrapped node has not yet finished recovery.
230    Therefore, it is going to become primary when it finishes recovery.
231    */
232   this->handle_leader_election_if_needed();
233 }
234 
235 void
handle_stats_message(const Gcs_message & message) const236 Plugin_gcs_events_handler::handle_stats_message(const Gcs_message& message) const
237 {
238   if (this->applier_module == NULL)
239   {
240     log_message(MY_ERROR_LEVEL,
241                 "Message received without a proper group replication applier"); /* purecov: inspected */
242     return; /* purecov: inspected */
243   }
244 
245   this->applier_module->get_flow_control_module()
246       ->handle_stats_data(message.get_message_data().get_payload(),
247                           message.get_message_data().get_payload_length(),
248                           message.get_origin().get_member_id());
249 }
250 
251 void
handle_single_primary_message(const Gcs_message & message) const252 Plugin_gcs_events_handler::handle_single_primary_message(const Gcs_message& message) const
253 {
254   if (this->applier_module == NULL)
255   {
256     log_message(MY_ERROR_LEVEL,
257                 "Message received without a proper group replication applier"); /* purecov: inspected */
258     return; /* purecov: inspected */
259   }
260 
261   Single_primary_message
262       single_primary_message(message.get_message_data().get_payload(),
263                              message.get_message_data().get_payload_length());
264 
265   if (single_primary_message.get_single_primary_message_type() ==
266       Single_primary_message::SINGLE_PRIMARY_QUEUE_APPLIED_MESSAGE)
267   {
268     Single_primary_action_packet *single_primary_action=
269         new Single_primary_action_packet(Single_primary_action_packet::QUEUE_APPLIED);
270     this->applier_module->add_single_primary_action_packet(single_primary_action);
271   }
272 }
273 
274 void
on_suspicions(const std::vector<Gcs_member_identifier> & members,const std::vector<Gcs_member_identifier> & unreachable) const275 Plugin_gcs_events_handler::on_suspicions(const std::vector<Gcs_member_identifier>& members,
276                                          const std::vector<Gcs_member_identifier>& unreachable) const
277 {
278   if (members.empty() && unreachable.empty()) // nothing to do
279     return; /* purecov: inspected */
280 
281   assert(members.size() >= unreachable.size());
282 
283   std::vector<Gcs_member_identifier> tmp_unreachable(unreachable);
284   std::vector<Gcs_member_identifier>::const_iterator mit;
285   std::vector<Gcs_member_identifier>::iterator uit;
286 
287   if (!members.empty())
288   {
289     for (mit= members.begin(); mit != members.end(); mit ++)
290     {
291       Gcs_member_identifier member= *mit;
292       Group_member_info* member_info=
293         group_member_mgr->get_group_member_info_by_member_id(member);
294 
295       if (member_info == NULL) //Trying to update a non-existing member
296         continue; /* purecov: inspected */
297 
298       uit= std::find(tmp_unreachable.begin(), tmp_unreachable.end(), member);
299       if (uit != tmp_unreachable.end())
300       {
301         if (!member_info->is_unreachable())
302           log_message(MY_WARNING_LEVEL,
303                       "Member with address %s:%u has become unreachable.",
304                       member_info->get_hostname().c_str(), member_info->get_port());
305 
306         group_member_mgr->set_member_unreachable(member_info->get_uuid());
307 
308         // remove to not check again against this one
309         tmp_unreachable.erase(uit);
310       }
311       else
312       {
313         if (member_info->is_unreachable())
314           log_message(MY_WARNING_LEVEL,
315                       "Member with address %s:%u is reachable again.",
316                       member_info->get_hostname().c_str(), member_info->get_port());
317 
318         group_member_mgr->set_member_reachable(member_info->get_uuid());
319       }
320 
321       delete member_info;
322     }
323   }
324 
325   if ((members.size() - unreachable.size()) <= (members.size() / 2))
326   {
327     if (!group_partition_handler->get_timeout_on_unreachable())
328       log_message(MY_ERROR_LEVEL,
329                   "This server is not able to reach a majority of members "
330                   "in the group. This server will now block all updates. "
331                   "The server will remain blocked until contact with the "
332                   "majority is restored. "
333                   "It is possible to use group_replication_force_members "
334                   "to force a new group membership.");
335     else
336       log_message(MY_ERROR_LEVEL,
337                   "This server is not able to reach a majority of members "
338                   "in the group. This server will now block all updates. "
339                   "The server will remain blocked for the next %lu seconds. "
340                   "Unless contact with the majority is restored, after this "
341                   "time the member will error out and leave the group. "
342                   "It is possible to use group_replication_force_members "
343                   "to force a new group membership.",
344                   group_partition_handler->get_timeout_on_unreachable());
345 
346     if (!group_partition_handler->is_partition_handler_running() &&
347         !group_partition_handler->is_partition_handling_terminated())
348       group_partition_handler->launch_partition_handler_thread();
349   }
350   else
351   {
352     /*
353       This code is present on on_view_changed and on_suspicions as no assumption
354       can be made about the order in which these methods are invoked.
355     */
356     if (group_partition_handler->is_member_on_partition())
357     {
358       if (group_partition_handler->abort_partition_handler_if_running())
359       {
360         log_message(MY_WARNING_LEVEL,
361                     "A group membership change was received but the plugin is "
362                     "already leaving due to the configured timeout on "
363                     "group_replication_unreachable_majority_timeout option.");
364       }
365       else
366       {
367         /* If it was not running or we canceled it in time */
368         log_message(MY_WARNING_LEVEL,
369                     "The member has resumed contact with a majority of the "
370                     "members in the group. Regular operation is restored and "
371                     "transactions are unblocked.");
372       }
373     }
374   }
375 }
376 
377 void
log_members_leaving_message(const Gcs_view & new_view) const378 Plugin_gcs_events_handler::log_members_leaving_message(const Gcs_view& new_view) const
379 {
380   std::string members_leaving;
381   std::string primary_member_host;
382 
383   get_hosts_from_view(new_view.get_leaving_members(), members_leaving, primary_member_host);
384 
385   log_message(MY_WARNING_LEVEL,
386               "Members removed from the group: %s",
387               members_leaving.c_str());
388 
389   if (!primary_member_host.empty())
390     log_message(MY_INFORMATION_LEVEL,
391                 "Primary server with address %s left the group. "
392                 "Electing new Primary.",
393                 primary_member_host.c_str());
394 }
395 
396 void
log_members_joining_message(const Gcs_view & new_view) const397 Plugin_gcs_events_handler::log_members_joining_message(const Gcs_view& new_view) const
398 {
399   std::string members_joining;
400   std::string primary_member_host;
401 
402   get_hosts_from_view(new_view.get_joined_members(), members_joining, primary_member_host);
403 
404   log_message(MY_INFORMATION_LEVEL,
405               "Members joined the group: %s",
406               members_joining.c_str());
407 }
408 
409 void
get_hosts_from_view(const std::vector<Gcs_member_identifier> & members,std::string & all_hosts,std::string & primary_host) const410 Plugin_gcs_events_handler::get_hosts_from_view(const std::vector<Gcs_member_identifier> &members,
411                          std::string& all_hosts, std::string& primary_host) const
412 {
413   std::stringstream hosts_string;
414   std::stringstream primary_string;
415   std::vector<Gcs_member_identifier>::const_iterator all_members_it= members.begin();
416 
417   while (all_members_it != members.end())
418   {
419     Group_member_info* member_info= group_member_mgr->
420                                      get_group_member_info_by_member_id((*all_members_it));
421     all_members_it++;
422 
423     if (member_info == NULL)
424       continue;
425 
426     hosts_string << member_info->get_hostname() << ":" << member_info->get_port();
427 
428     /**
429      Check in_primary_mode has been added for safety.
430      Since primary role is in single-primary mode.
431     */
432     if (member_info->in_primary_mode() &&
433         member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY)
434     {
435       if (primary_string.rdbuf()->in_avail() != 0)
436         primary_string << ", ";
437       primary_string << member_info->get_hostname() << ":" << member_info->get_port();
438     }
439 
440     if (all_members_it != members.end())
441     {
442       hosts_string << ", ";
443     }
444 
445     delete member_info;
446   }
447   all_hosts.assign (hosts_string.str());
448   primary_host.assign (primary_string.str());
449 }
450 
451 void
on_view_changed(const Gcs_view & new_view,const Exchanged_data & exchanged_data) const452 Plugin_gcs_events_handler::on_view_changed(const Gcs_view& new_view,
453                                            const Exchanged_data &exchanged_data)
454                                            const
455 {
456   bool is_leaving= is_member_on_vector(new_view.get_leaving_members(),
457                                        local_member_info->get_gcs_member_id());
458 
459   bool is_joining= is_member_on_vector(new_view.get_joined_members(),
460                                        local_member_info->get_gcs_member_id());
461 
462   // Was member expelled from the group due to network failures?
463   if (this->was_member_expelled_from_group(new_view))
464     return;
465 
466   //An early error on the applier can render the join invalid
467   if (is_joining &&
468       local_member_info->get_recovery_status() == Group_member_info::MEMBER_ERROR)
469   {
470     log_message(MY_ERROR_LEVEL,
471                 "There was a previous plugin error while the member joined the group. "
472                 "The member will now exit the group.");
473     view_change_notifier->cancel_view_modification(GROUP_REPLICATION_CONFIGURATION_ERROR);
474     return;
475   }
476 
477   /*
478     This code is present on on_view_changed and on_suspicions as no assumption
479     can be made about the order in which these methods are invoked.
480   */
481   if (!is_leaving && group_partition_handler->is_member_on_partition())
482   {
483     if (group_partition_handler->abort_partition_handler_if_running())
484     {
485       log_message(MY_WARNING_LEVEL,
486                   "A group membership change was received but the plugin is "
487                   "already leaving due to the configured timeout on "
488                   "group_replication_unreachable_majority_timeout option.");
489       return;
490     }
491     else
492     {
493       /* If it was not running or we canceled it in time */
494       log_message(MY_WARNING_LEVEL,
495                   "The member has resumed contact with a majority of the "
496                   "members in the group. Regular operation is restored and "
497                   "transactions are unblocked.");
498     }
499   }
500 
501   /*
502     Maybe on_suspicions we already executed the above block but it was too late.
503     No point in repeating the message, but we need to break the view install.
504   */
505   if (!is_leaving && group_partition_handler->is_partition_handling_terminated())
506     return;
507 
508   if (!is_leaving && new_view.get_leaving_members().size() > 0)
509     log_members_leaving_message(new_view);
510 
511   //update the Group Manager with all the received states
512   if (update_group_info_manager(new_view, exchanged_data, is_joining, is_leaving) &&
513       is_joining)
514   {
515     view_change_notifier->cancel_view_modification();
516     return;
517   }
518 
519   if (!is_joining && new_view.get_joined_members().size() > 0)
520     log_members_joining_message(new_view);
521 
522   //enable conflict detection if someone on group have it enabled
523   if (local_member_info->in_primary_mode() &&
524       group_member_mgr->is_conflict_detection_enabled())
525   {
526     Certifier_interface *certifier=
527         this->applier_module->get_certification_handler()->get_certifier();
528     certifier->enable_conflict_detection();
529   }
530 
531   //Inform any interested handler that the view changed
532   View_change_pipeline_action *vc_action=
533     new View_change_pipeline_action(is_leaving);
534 
535   applier_module->handle_pipeline_action(vc_action);
536   delete vc_action;
537 
538   //Update any recovery running process and handle state changes
539   this->handle_leaving_members(new_view, is_joining, is_leaving);
540 
541   //Handle joining members
542   this->handle_joining_members(new_view, is_joining, is_leaving);
543 
544   if (is_leaving)
545     gcs_module->leave_coordination_member_left();
546 
547   // Handle leader election if needed
548   this->handle_leader_election_if_needed();
549 
550   //Signal that the injected view was delivered
551   if (view_change_notifier->is_injected_view_modification())
552     view_change_notifier->end_view_modification();
553 
554   if (!is_leaving)
555   {
556     log_message(MY_INFORMATION_LEVEL,
557                 "Group membership changed to %s on view %s.",
558                 group_member_mgr->get_string_current_view_active_hosts().c_str(),
559                 new_view.get_view_id().get_representation().c_str());
560   }
561   else
562   {
563     log_message(MY_INFORMATION_LEVEL,
564                 "Group membership changed: This member has left the group.");
565   }
566 }
567 
568 bool
was_member_expelled_from_group(const Gcs_view & view) const569 Plugin_gcs_events_handler::was_member_expelled_from_group(const Gcs_view& view) const
570 {
571   DBUG_ENTER("Plugin_gcs_events_handler::was_member_expelled_from_group");
572   bool result= false;
573 
574   if (view.get_error_code() == Gcs_view::MEMBER_EXPELLED)
575   {
576     result= true;
577     log_message(MY_ERROR_LEVEL,
578                 "Member was expelled from the group due to network failures, "
579                 "changing member status to ERROR.");
580 
581     // Delete all members from group info except the local one.
582     std::vector<Group_member_info*> to_update;
583     group_member_mgr->update(&to_update);
584     group_member_mgr->update_member_status(local_member_info->get_uuid(),
585                                            Group_member_info::MEMBER_ERROR);
586     group_member_mgr->update_member_role(local_member_info->get_uuid(),
587                                          Group_member_info::MEMBER_ROLE_SECONDARY);
588 
589     bool aborted= false;
590     applier_module->add_suspension_packet();
591     int error= applier_module->wait_for_applier_complete_suspension(&aborted, false);
592     /*
593       We do not need to kill ongoing transactions when the applier
594       is already stopping.
595     */
596     if (!error)
597       applier_module->kill_pending_transactions(true, true);
598   }
599 
600   DBUG_RETURN(result);
601 }
602 
603 std::vector<Group_member_info*>::iterator
sort_and_get_lowest_version_member_position(std::vector<Group_member_info * > * all_members_info) const604 Plugin_gcs_events_handler::sort_and_get_lowest_version_member_position(
605   std::vector<Group_member_info*>* all_members_info) const
606 {
607   std::vector<Group_member_info*>::iterator it;
608 
609   // sort in ascending order of lower member version
610   std::sort(all_members_info->begin(), all_members_info->end(),
611             Group_member_info::comparator_group_member_version);
612 
613   /* if vector contains only single version then leader should be picked from
614      all members
615    */
616   std::vector<Group_member_info*>::iterator lowest_version_end=
617     all_members_info->end();
618 
619   /* first member will have lowest version as members are already
620      sorted above using member_version.
621    */
622   it= all_members_info->begin();
623   Group_member_info* first_member= *it;
624   uint32 lowest_major_version=
625     first_member->get_member_version().get_major_version();
626 
627   /* to avoid read compatibility issue leader should be picked only from lowest
628      version members so save position where member version differs.
629 
630      set lowest_version_end when major version changes
631 
632      eg: for a list: 5.7.18, 5.7.18, 5.7.19, 5.7.20, 5.7.21, 8.0.2
633          the members to be considered for election will be:
634             5.7.18, 5.7.18, 5.7.19, 5.7.20, 5.7.21
635          and server_uuid based algorithm will be used to elect primary
636 
637      eg: for a list: 5.7.20, 5.7.21, 8.0.2, 8.0.2
638          the members to be considered for election will be:
639             5.7.20, 5.7.21
640          and member weight based algorithm will be used to elect primary
641   */
642   for(it= all_members_info->begin() + 1; it != all_members_info->end(); it++)
643   {
644     if (lowest_major_version != (*it)->get_member_version().get_major_version())
645     {
646       lowest_version_end= it;
647       break;
648     }
649   }
650 
651   return lowest_version_end;
652 }
653 
sort_members_for_election(std::vector<Group_member_info * > * all_members_info,std::vector<Group_member_info * >::iterator lowest_version_end) const654 void Plugin_gcs_events_handler::sort_members_for_election(
655        std::vector<Group_member_info*>* all_members_info,
656        std::vector<Group_member_info*>::iterator lowest_version_end) const
657 {
658   Group_member_info* first_member= *(all_members_info->begin());
659   Member_version lowest_version= first_member->get_member_version();
660 
661   // sort only lower version members as they only will be needed to pick leader
662   if (lowest_version >= PRIMARY_ELECTION_MEMBER_WEIGHT_VERSION)
663     std::sort(all_members_info->begin(), lowest_version_end,
664               Group_member_info::comparator_group_member_weight);
665   else
666     std::sort(all_members_info->begin(), lowest_version_end,
667               Group_member_info::comparator_group_member_uuid);
668 }
669 
handle_leader_election_if_needed() const670 void Plugin_gcs_events_handler::handle_leader_election_if_needed() const
671 {
672   // take action if in single leader mode
673   if (!local_member_info->in_primary_mode())
674     return;
675 
676   bool am_i_leaving= true;
677 #ifndef NDEBUG
678   int n=0;
679 #endif
680   Group_member_info* the_primary= NULL;
681   std::vector<Group_member_info*>* all_members_info=
682     group_member_mgr->get_all_members();
683 
684   std::vector<Group_member_info*>::iterator it;
685   std::vector<Group_member_info*>::iterator lowest_version_end;
686 
687   /* sort members based on member_version and get first iterator position
688      where member version differs
689    */
690   lowest_version_end=
691     sort_and_get_lowest_version_member_position(all_members_info);
692 
693   /*  Sort lower version members based on member weight if member version
694       is greater than equal to PRIMARY_ELECTION_MEMBER_WEIGHT_VERSION or uuid.
695    */
696   sort_members_for_election(all_members_info, lowest_version_end);
697 
698   /*
699    1. Iterate over the list of all members and check if there is a primary
700       defined already.
701    2. Check if I am leaving the group or not.
702    */
703   for(it= all_members_info->begin(); it != all_members_info->end(); it++)
704   {
705 #ifndef NDEBUG
706     assert(!(n > 1));
707 #endif
708 
709     Group_member_info* member= *it;
710     if (the_primary == NULL &&
711         member->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY)
712     {
713       the_primary= member;
714 #ifndef NDEBUG
715       n++;
716 #endif
717     }
718 
719     /* Found the primary and it is me. Check that I am not offline. */
720     if (!member->get_uuid().compare(local_member_info->get_uuid()))
721     {
722       am_i_leaving= member->get_recovery_status() == Group_member_info::MEMBER_OFFLINE;
723     }
724   }
725 
726   /* If I am not leaving, then run election. Otherwise do nothing. */
727   if (!am_i_leaving)
728   {
729     Sql_service_command_interface *sql_command_interface=
730         new Sql_service_command_interface();
731     bool skip_set_super_readonly= false;
732     if (sql_command_interface == NULL ||
733         sql_command_interface->
734             establish_session_connection(PSESSION_INIT_THREAD,
735                                          get_plugin_pointer()) ||
736         sql_command_interface->set_interface_user(GROUPREPL_USER))
737     {
738       log_message(MY_WARNING_LEVEL,
739                   "Unable to open session to (re)set read only mode. Skipping."); /* purecov: inspected */
740       /*
741        Unable to open session to (re)set read only mode.
742        Mark that we should skipping that part code.
743        */
744       skip_set_super_readonly= true; /* purecov: inspected */
745     }
746 
747 
748     /*
749      There is no primary in the member list. Pick one from
750      the list of ONLINE members. The picked one is the first
751      viable on in the list that was sorted at the beginning
752      of this function.
753 
754      The assumption is that std::sort(...) is deterministic
755      on all members.
756 
757      To pick leaders from only lowest version members loop
758      till lowest_version_end.
759     */
760     if (the_primary == NULL)
761     {
762       for (it= all_members_info->begin();
763            it != lowest_version_end && the_primary == NULL;
764            it++)
765       {
766         Group_member_info* mi= *it;
767 
768         assert(mi);
769         if (mi &&
770             mi->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
771           the_primary= mi;
772       }
773     }
774 
775     // take actions on the primary
776     if (the_primary != NULL)
777     {
778       std::string primary_uuid= the_primary->get_uuid();
779       const bool is_primary_local= !primary_uuid.compare(local_member_info->get_uuid());
780       const bool has_primary_changed=
781           Group_member_info::MEMBER_ROLE_PRIMARY != the_primary->get_role();
782 
783       if (has_primary_changed)
784       {
785         /*
786           A new primary was elected, inform certifier to enable conflict
787           detection until the new primary apply all relay logs.
788         */
789         Single_primary_action_packet *single_primary_action=
790             new Single_primary_action_packet(Single_primary_action_packet::NEW_PRIMARY);
791         applier_module->add_single_primary_action_packet(single_primary_action);
792 
793         // declare this as the new primary
794         group_member_mgr->update_member_role(primary_uuid,
795                                              Group_member_info::MEMBER_ROLE_PRIMARY);
796 
797         log_message(MY_INFORMATION_LEVEL, "A new primary with address %s:%u "
798                     "was elected, enabling conflict detection until the new "
799                     "primary applies all relay logs.",
800                     the_primary->get_hostname().c_str(),
801                     the_primary->get_port());
802 
803         // Check if the session was established, it can (re)set read only mode.
804         if (!skip_set_super_readonly)
805         {
806           if (is_primary_local)
807           {
808             if (disable_super_read_only_mode(sql_command_interface))
809             {
810               log_message(MY_WARNING_LEVEL,
811                           "Unable to disable super read only flag. "
812                           "Try to disable it manually."); /* purecov: inspected */
813             }
814           }
815           else
816           {
817             if (enable_super_read_only_mode(sql_command_interface))
818             {
819               log_message(MY_WARNING_LEVEL,
820                           "Unable to set super read only flag. "
821                           "Try to set it manually."); /* purecov: inspected */
822             }
823           }
824         }
825         /* code position limits messaging to primary change */
826         if (is_primary_local)
827           log_message(MY_INFORMATION_LEVEL,
828                       "This server is working as primary member.");
829         else
830           log_message(MY_INFORMATION_LEVEL,
831                       "This server is working as secondary member with primary "
832                       "member address %s:%u.",
833                       the_primary->get_hostname().c_str(),
834                       the_primary->get_port());
835       }
836     }
837     else if (!skip_set_super_readonly)
838     {
839       /*
840        If there is only one server in the group, no need to pollute the error log with
841        an entry about no suitable candidate while (quick) recovery is running for the first member.
842       */
843       if (all_members_info->size() != 1)
844       {
845         // There are no servers in the group or they are all
846         // recoverying WARN to the user
847         log_message(MY_WARNING_LEVEL,
848                     "Unable to set any member as primary. No suitable candidate."); /* purecov: inspected */
849       }
850 
851       if(enable_super_read_only_mode(sql_command_interface))
852       {
853         log_message(MY_WARNING_LEVEL,
854                     "Unable to set super read only flag. "
855                     "Try to set it manually."); /* purecov: inspected */
856       }
857     }
858     delete sql_command_interface;
859   }
860 
861   //clean the members
862   for (it= all_members_info->begin(); it!= all_members_info->end(); it++)
863   {
864     delete (*it);
865   }
866   delete all_members_info;
867 }
868 
869 int Plugin_gcs_events_handler::
update_group_info_manager(const Gcs_view & new_view,const Exchanged_data & exchanged_data,bool is_joining,bool is_leaving) const870 update_group_info_manager(const Gcs_view& new_view,
871                           const Exchanged_data &exchanged_data,
872                           bool is_joining,
873                           bool is_leaving) const
874 {
875   int error= 0;
876 
877   //update the Group Manager with all the received states
878   vector<Group_member_info*> to_update;
879 
880   if(!is_leaving)
881   {
882     //Process local state of exchanged data.
883     if ((error= process_local_exchanged_data(exchanged_data, is_joining)))
884       goto err;
885 
886     to_update.insert(to_update.end(),
887                      temporary_states->begin(),
888                      temporary_states->end());
889 
890     //Clean-up members that are leaving
891     vector<Gcs_member_identifier> leaving= new_view.get_leaving_members();
892     vector<Gcs_member_identifier>::iterator left_it;
893     vector<Group_member_info*>::iterator to_update_it;
894     for(left_it= leaving.begin(); left_it != leaving.end(); left_it++)
895     {
896       for(to_update_it= to_update.begin();
897           to_update_it != to_update.end();
898           to_update_it++)
899       {
900         if( (*left_it) == (*to_update_it)->get_gcs_member_id() )
901         {
902           /* purecov: begin inspected */
903           delete (*to_update_it);
904           to_update.erase(to_update_it);
905           break;
906           /* purecov: end */
907         }
908       }
909     }
910   }
911   group_member_mgr->update(&to_update);
912   temporary_states->clear();
913 
914 err:
915   assert(temporary_states->size() == 0);
916   return error;
917 }
918 
handle_joining_members(const Gcs_view & new_view,bool is_joining,bool is_leaving) const919 void Plugin_gcs_events_handler::handle_joining_members(const Gcs_view& new_view,
920                                                        bool is_joining,
921                                                        bool is_leaving)
922                                                        const
923 {
924   //nothing to do here
925   size_t number_of_members= new_view.get_members().size();
926   if (number_of_members == 0 || is_leaving)
927   {
928     return;
929   }
930   size_t number_of_joining_members= new_view.get_joined_members().size();
931   size_t number_of_leaving_members= new_view.get_leaving_members().size();
932 
933   /*
934    If we are joining, 3 scenarios exist:
935    1) We are incompatible with the group so we leave
936    2) We are alone so we declare ourselves online
937    3) We are in a group and recovery must happen
938   */
939   if (is_joining)
940   {
941     int error= 0;
942     if ((error= check_group_compatibility(number_of_members)))
943     {
944       view_change_notifier->cancel_view_modification(error);
945       return;
946     }
947     view_change_notifier->end_view_modification();
948 
949     /**
950      On the joining list there can be 2 types of members: online/recovering
951      members coming from old views where this member was not present and new
952      joining members that still have their status as offline.
953 
954      As so, for offline members, their state is changed to member_in_recovery
955      after member compatibility with group is checked.
956     */
957     update_member_status(new_view.get_joined_members(),
958                          Group_member_info::MEMBER_IN_RECOVERY,
959                          Group_member_info::MEMBER_OFFLINE,
960                          Group_member_info::MEMBER_END);
961     /**
962       Set the read mode if not set during start (auto-start)
963     */
964     if (enable_server_read_mode(PSESSION_INIT_THREAD))
965     {
966       log_message(MY_ERROR_LEVEL,
967                   "Error when activating super_read_only mode on start. "
968                   "The member will now exit the group.");
969       group_member_mgr->update_member_status(local_member_info->get_uuid(),
970                                              Group_member_info::MEMBER_ERROR);
971       this->leave_group_on_error();
972       return;
973     }
974 
975     /**
976       On the joining member log an error when group contains more members than
977       auto_increment_increment variable.
978     */
979     ulong auto_increment_increment= get_auto_increment_increment();
980 
981     if (new_view.get_members().size() > auto_increment_increment)
982     {
983       log_message(MY_ERROR_LEVEL,
984                   "Group contains %lu members which is greater than"
985                   " auto_increment_increment value of %lu."
986                   " This can lead to an higher rate of transactional aborts.",
987                   new_view.get_members().size(), auto_increment_increment);
988     }
989 
990     /*
991      During the view change, a suspension packet is sent to the applier module
992      so all posterior transactions inbound are not applied, but queued, until
993      the member finishes recovery.
994     */
995     applier_module->add_suspension_packet();
996 
997     /*
998      Marking the view in the joiner since the incoming event from the donor
999      is discarded in the Recovery process.
1000      */
1001 
1002     std::string view_id= new_view.get_view_id().get_representation();
1003     View_change_packet * view_change_packet= new View_change_packet(view_id);
1004     applier_module->add_view_change_packet(view_change_packet);
1005 
1006     /*
1007      Launch the recovery thread so we can receive missing data and the
1008      certification information needed to apply the transactions queued after
1009      this view change.
1010 
1011      Recovery receives a view id, as a means to identify logically on joiners
1012      and donors alike where this view change happened in the data. With that
1013      info we can then ask for the donor to give the member all the data until
1014      this point in the data, and the certification information for all the data
1015      that comes next.
1016 
1017      When alone, the server will go through Recovery to wait for the consumption
1018      of his applier relay log that may contain transactions from previous
1019      executions.
1020     */
1021     recovery_module->start_recovery(new_view.get_group_id().get_group_id(),
1022                                     new_view.get_view_id()
1023                                                       .get_representation());
1024   }
1025   /*
1026     The condition
1027       number_of_joining_members == 0 && number_of_leaving_members == 0
1028     is needed due to the following scenario:
1029     We have a group with 2 members, one does crash (M2), and the group
1030     blocks with M1 ONLINE and M2 UNREACHABLE.
1031     Then M2 rejoins and the group unblocks.
1032     When M2 rejoins the group, from M2 perspective it is joining
1033     the group, that is, it does receive a view (V3) on which it is
1034     marked as a joining member.
1035     But from M1 perspective, M2 may never left, so the view delivered
1036     (V3) has the same members as V2, that is, M1 and M2, without joining
1037     members, thence we need to consider that condition and log that view.
1038   */
1039   else if (number_of_joining_members > 0 ||
1040            (number_of_joining_members == 0 && number_of_leaving_members == 0))
1041   {
1042     /**
1043      On the joining list there can be 2 types of members: online/recovering
1044      members coming from old views where this member was not present and new
1045      joining members that still have their status as offline.
1046 
1047      As so, for offline members, their state is changed to member_in_recovery.
1048     */
1049     update_member_status(new_view.get_joined_members(),
1050                          Group_member_info::MEMBER_IN_RECOVERY,
1051                          Group_member_info::MEMBER_OFFLINE,
1052                          Group_member_info::MEMBER_END);
1053     /**
1054      If not a joining member, all members should record on their own binlogs a
1055      marking event that identifies the frontier between the data the joining
1056      member was to receive and the data it should queue.
1057      The joining member can then wait for this event to know it was all the
1058      needed data.
1059 
1060      This packet will also pass in the certification process at this exact
1061      frontier giving us the opportunity to gather the necessary certification
1062      information to certify the transactions that will come after this view
1063      change. If selected as a donor, this info will also be sent to the joiner.
1064 
1065      Associated to this process, we collect and intersect the executed GTID sets
1066      of all ONLINE members so we can cut the certification info to gather and
1067      transmit to the minimum.
1068     */
1069 
1070     std::string view_id= new_view.get_view_id().get_representation();
1071     View_change_packet * view_change_packet= new View_change_packet(view_id);
1072     collect_members_executed_sets(new_view.get_joined_members(), view_change_packet);
1073     applier_module->add_view_change_packet(view_change_packet);
1074   }
1075 }
1076 
1077 void
handle_leaving_members(const Gcs_view & new_view,bool is_joining,bool is_leaving) const1078 Plugin_gcs_events_handler::handle_leaving_members(const Gcs_view& new_view,
1079                                                   bool is_joining,
1080                                                   bool is_leaving)
1081                                                   const
1082 {
1083   Group_member_info::Group_member_status member_status=
1084       local_member_info->get_recovery_status();
1085 
1086   bool members_left= (new_view.get_leaving_members().size() > 0);
1087 
1088   //if the member is joining or not in recovery, no need to update the process
1089   if (!is_joining && member_status == Group_member_info::MEMBER_IN_RECOVERY)
1090   {
1091     /*
1092      This method has 2 purposes:
1093      If a donor leaves, recovery needs to switch donor
1094      If this member leaves, recovery needs to shutdown.
1095     */
1096     recovery_module->update_recovery_process(members_left, is_leaving);
1097   }
1098 
1099   if (members_left)
1100   {
1101     update_member_status(new_view.get_leaving_members(),
1102                          Group_member_info::MEMBER_OFFLINE,
1103                          Group_member_info::MEMBER_END,
1104                          Group_member_info::MEMBER_ERROR);
1105   }
1106 
1107   if (is_leaving)
1108   {
1109     view_change_notifier->end_view_modification();
1110   }
1111 }
1112 
1113 bool
1114 Plugin_gcs_events_handler::
is_member_on_vector(const vector<Gcs_member_identifier> & members,const Gcs_member_identifier & member_id) const1115 is_member_on_vector(const vector<Gcs_member_identifier>& members,
1116                     const Gcs_member_identifier& member_id)
1117                     const
1118 {
1119   vector<Gcs_member_identifier>::const_iterator it;
1120 
1121   it= std::find(members.begin(), members.end(), member_id);
1122 
1123   return it != members.end();
1124 }
1125 
1126 int
1127 Plugin_gcs_events_handler::
process_local_exchanged_data(const Exchanged_data & exchanged_data,bool is_joining) const1128 process_local_exchanged_data(const Exchanged_data &exchanged_data,
1129                              bool is_joining)
1130                              const
1131 {
1132   uint local_uuid_found= 0;
1133 
1134   /*
1135   For now, we are only carrying Group Member Info on Exchangeable data
1136   Since we are receiving the state from all Group members, one shall
1137   store it in a set to ensure that we don't have repetitions.
1138 
1139   All collected data will be given to Group Member Manager at view install
1140   time.
1141   */
1142   for (Exchanged_data::const_iterator exchanged_data_it= exchanged_data.begin();
1143        exchanged_data_it != exchanged_data.end();
1144        exchanged_data_it++)
1145   {
1146     const uchar* data= exchanged_data_it->second->get_payload();
1147     uint64 length= exchanged_data_it->second->get_payload_length();
1148     Gcs_member_identifier* member_id= exchanged_data_it->first;
1149     if (data == NULL)
1150     {
1151       /* purecov: begin inspected */
1152       Group_member_info * member_info= group_member_mgr->get_group_member_info_by_member_id(*member_id);
1153       if (member_info != NULL)
1154       {
1155         log_message(MY_ERROR_LEVEL, "Member with address '%s:%u' didn't provide any data"
1156                                     " during the last group change. Group"
1157                                     " information can be outdated and lead to"
1158                                     " errors on recovery",
1159                                     member_info->get_hostname().c_str(), member_info->get_port());
1160         delete member_info;
1161       }
1162       continue;
1163       /* purecov: end */
1164     }
1165 
1166     //Process data provided by member.
1167     vector<Group_member_info*>* member_infos=
1168         group_member_mgr->decode(data, length);
1169 
1170     //This construct is here in order to deallocate memory of duplicates
1171     vector<Group_member_info*>::iterator member_infos_it;
1172     for(member_infos_it= member_infos->begin();
1173         member_infos_it != member_infos->end();
1174         member_infos_it++)
1175     {
1176       if (local_member_info->get_uuid() == (*member_infos_it)->get_uuid())
1177       {
1178         local_uuid_found++;
1179       }
1180 
1181       /*
1182         Accept only the information the member has about himself
1183         Information received about other members is probably outdated
1184       */
1185       if (local_uuid_found < 2 &&
1186           (*member_infos_it)->get_gcs_member_id() == *member_id)
1187       {
1188         this->temporary_states->insert((*member_infos_it));
1189       }
1190       else
1191       {
1192         delete (*member_infos_it); /* purecov: inspected */
1193       }
1194     }
1195 
1196     member_infos->clear();
1197     delete member_infos;
1198 
1199     if (local_uuid_found > 1)
1200     {
1201       if (is_joining)
1202       {
1203         log_message(MY_ERROR_LEVEL,
1204                     "There is already a member with server_uuid %s. "
1205                     "The member will now exit the group.",
1206                     local_member_info->get_uuid().c_str());
1207       }
1208 
1209       // Clean up temporary states.
1210       std::set<Group_member_info*,Group_member_info_pointer_comparator>::iterator
1211           temporary_states_it;
1212       for (temporary_states_it= temporary_states->begin();
1213            temporary_states_it != temporary_states->end();
1214            temporary_states_it++)
1215       {
1216         delete (*temporary_states_it);
1217       }
1218       temporary_states->clear();
1219 
1220       return 1;
1221     }
1222   }
1223 
1224   return 0;
1225 }
1226 
1227 Gcs_message_data*
get_exchangeable_data() const1228 Plugin_gcs_events_handler::get_exchangeable_data() const
1229 {
1230   std::string server_executed_gtids;
1231   std::string applier_retrieved_gtids;
1232   Replication_thread_api applier_channel("group_replication_applier");
1233 
1234   Sql_service_command_interface *sql_command_interface=
1235       new Sql_service_command_interface();
1236 
1237   if (sql_command_interface->
1238           establish_session_connection(PSESSION_INIT_THREAD,
1239                                        get_plugin_pointer()) ||
1240       sql_command_interface->set_interface_user(GROUPREPL_USER)
1241      )
1242   {
1243     log_message(MY_WARNING_LEVEL,
1244                 "Error when extracting information for group change. "
1245                 "Operations and checks made to group joiners may be incomplete"); /* purecov: inspected */
1246     goto sending; /* purecov: inspected */
1247   }
1248 
1249   if (sql_command_interface->get_server_gtid_executed(server_executed_gtids))
1250   {
1251     log_message(MY_WARNING_LEVEL,
1252                 "Error when extracting this member GTID executed set. "
1253                 "Operations and checks made to group joiners may be incomplete"); /* purecov: inspected */
1254     goto sending; /* purecov: inspected */
1255   }
1256   if (applier_channel.get_retrieved_gtid_set(applier_retrieved_gtids))
1257   {
1258     log_message(MY_WARNING_LEVEL,
1259                 "Error when extracting this member retrieved set for its applier. "
1260                 "Operations and checks made to group joiners may be incomplete"); /* purecov: inspected */
1261   }
1262 
1263   group_member_mgr->update_gtid_sets(local_member_info->get_uuid(),
1264                                      server_executed_gtids,
1265                                      applier_retrieved_gtids);
1266 sending:
1267 
1268   delete sql_command_interface;
1269 
1270   std::vector<uchar> data;
1271 
1272   Group_member_info* local_member_copy= new Group_member_info(*local_member_info);
1273   Group_member_info_manager_message *group_info_message=
1274     new Group_member_info_manager_message(local_member_copy);
1275   group_info_message->encode(&data);
1276   delete group_info_message;
1277 
1278   Gcs_message_data* msg_data= new Gcs_message_data(0, data.size());
1279   msg_data->append_to_payload(&data.front(), data.size());
1280 
1281   return msg_data;
1282 }
1283 
1284 void
1285 Plugin_gcs_events_handler::
update_member_status(const vector<Gcs_member_identifier> & members,Group_member_info::Group_member_status status,Group_member_info::Group_member_status old_status_equal_to,Group_member_info::Group_member_status old_status_different_from) const1286 update_member_status(const vector<Gcs_member_identifier>& members,
1287                      Group_member_info::Group_member_status status,
1288                      Group_member_info::Group_member_status old_status_equal_to,
1289                      Group_member_info::Group_member_status old_status_different_from)
1290                      const
1291 {
1292   for (vector<Gcs_member_identifier>::const_iterator it= members.begin();
1293        it != members.end();
1294        ++it)
1295   {
1296     Gcs_member_identifier member = *it;
1297     Group_member_info* member_info=
1298         group_member_mgr->get_group_member_info_by_member_id(member);
1299 
1300     if (member_info == NULL)
1301     {
1302       //Trying to update a non-existing member
1303       continue;
1304     }
1305 
1306     // if  (the old_status_equal_to is not defined or
1307     //      the previous status is equal to old_status_equal_to)
1308     //    and
1309     //     (the old_status_different_from is not defined or
1310     //      the previous status is different from old_status_different_from)
1311     if ((old_status_equal_to == Group_member_info::MEMBER_END ||
1312         member_info->get_recovery_status() == old_status_equal_to) &&
1313        (old_status_different_from == Group_member_info::MEMBER_END ||
1314         member_info->get_recovery_status() != old_status_different_from))
1315     {
1316       group_member_mgr->update_member_status(member_info->get_uuid(), status);
1317     }
1318 
1319     delete member_info;
1320   }
1321 }
1322 
1323 /**
1324   Here we check:
1325   1) If the number of members was exceeded
1326   2) If member version is compatible with the group
1327   3) If the gtid_assignment_block_size is equal to the group
1328   4) If the hash algorithm used is equal to the group
1329   5) If the member has more known transactions than the group
1330   6) If the member has the same configuration flags that the group has
1331 */
1332 int
check_group_compatibility(size_t number_of_members) const1333 Plugin_gcs_events_handler::check_group_compatibility(size_t number_of_members) const
1334 {
1335   /*
1336     Check if group size did reach the maximum number of members.
1337   */
1338 #ifndef NDEBUG
1339   if (set_number_of_members_on_view_changed_to_10)
1340     number_of_members= 10;
1341 #endif
1342   if (number_of_members > 9)
1343   {
1344     log_message(MY_ERROR_LEVEL,
1345                 "The START GROUP_REPLICATION command failed since the group "
1346                 "already has 9 members");
1347     return GROUP_REPLICATION_MAX_GROUP_SIZE;
1348   }
1349 
1350   /*
1351     Check if the member is compatible with the group.
1352     It can be incompatible because its major version is lower or a rule says it.
1353     If incompatible notify whoever is waiting for the view with an error, so
1354     the plugin exits the group.
1355   */
1356   *joiner_compatibility_status= COMPATIBLE;
1357   int group_data_compatibility= 0;
1358   if (number_of_members > 1)
1359   {
1360     *joiner_compatibility_status= check_version_compatibility_with_group();
1361     group_data_compatibility= compare_member_transaction_sets();
1362   }
1363 
1364   if (*joiner_compatibility_status == INCOMPATIBLE)
1365   {
1366     log_message(MY_ERROR_LEVEL,
1367                 "Member version is incompatible with the group");
1368     return GROUP_REPLICATION_CONFIGURATION_ERROR;
1369   }
1370 
1371   /*
1372     All group members must have the same gtid_assignment_block_size
1373     and transaction-write-set-extraction value, if joiner has a
1374     different value it is not allowed to join.
1375   */
1376   if (number_of_members > 1 &&
1377       compare_member_option_compatibility())
1378   {
1379     return GROUP_REPLICATION_CONFIGURATION_ERROR;
1380   }
1381 
1382   /*
1383     Check that the joiner doesn't has more GTIDs than the rest of the group.
1384     All the executed and received transactions in the group are collected and
1385     merged into a GTID set and all joiner transactions must be contained in it.
1386   */
1387   if (group_data_compatibility)
1388   {
1389     if (group_data_compatibility > 0)
1390     {
1391       if(get_allow_local_disjoint_gtids_join())
1392       {
1393         log_message(MY_WARNING_LEVEL,
1394                     "The member contains transactions not present in the group. "
1395                     "It is only allowed to join due to "
1396                     "group_replication_allow_local_disjoint_gtids_join option");
1397       }
1398       else {
1399         log_message(MY_ERROR_LEVEL,
1400                     "The member contains transactions not present in the group. "
1401                     "The member will now exit the group.");
1402         log_message(MY_INFORMATION_LEVEL,
1403                     "To force this member into the group you can use the "
1404                     "group_replication_allow_local_disjoint_gtids_join option");
1405         return GROUP_REPLICATION_CONFIGURATION_ERROR;
1406       }
1407     }
1408     else //error
1409     {
1410       /* purecov: begin inspected */
1411       if(get_allow_local_disjoint_gtids_join())
1412       {
1413         log_message(MY_WARNING_LEVEL,
1414                     "It was not possible to assess if the member has more "
1415                     "transactions than the group. "
1416                     "It is only allowed to join due to the "
1417                     "group_replication_allow_local_disjoint_gtids_join option");
1418       }
1419       else {
1420         log_message(MY_ERROR_LEVEL,
1421                     "It was not possible to assess if the member has more "
1422                     "transactions than the group. "
1423                     "The member will now exit the group.");
1424         log_message(MY_INFORMATION_LEVEL,
1425                     "To force this member into the group you can use the "
1426                     "group_replication_allow_local_disjoint_gtids_join option");
1427         return GROUP_REPLICATION_CONFIGURATION_ERROR;
1428       }
1429       /* purecov: end */
1430     }
1431   }
1432 
1433   return 0;
1434 }
1435 
1436 Compatibility_type
check_version_compatibility_with_group() const1437 Plugin_gcs_events_handler::check_version_compatibility_with_group() const
1438 {
1439   bool override_lower_incompatibility= false;
1440   Compatibility_type compatibility_type= INCOMPATIBLE;
1441   bool read_compatible= false;
1442 
1443   std::vector<Group_member_info*> *all_members= group_member_mgr->get_all_members();
1444   std::vector<Group_member_info*>::iterator all_members_it;
1445   Member_version lowest_version(0xFFFFFF);
1446 
1447   for (all_members_it = all_members->begin();
1448        all_members_it != all_members->end(); all_members_it++)
1449   {
1450     if ((*all_members_it)->get_uuid() != local_member_info->get_uuid() &&
1451         (*all_members_it)->get_member_version() < lowest_version)
1452     {
1453       lowest_version = (*all_members_it)->get_member_version();
1454     }
1455   }
1456 
1457   for (all_members_it= all_members->begin();
1458        all_members_it!= all_members->end();
1459        all_members_it++)
1460   {
1461     Member_version member_version= (*all_members_it)->get_member_version();
1462     compatibility_type=
1463       compatibility_manager->check_local_incompatibility(member_version,
1464                                          member_version == lowest_version);
1465 
1466     if (compatibility_type == READ_COMPATIBLE)
1467     {
1468       read_compatible= true;
1469     }
1470 
1471     if (compatibility_type == INCOMPATIBLE)
1472     {
1473       break;
1474     }
1475 
1476     if (compatibility_type == INCOMPATIBLE_LOWER_VERSION)
1477     {
1478       if (get_allow_local_lower_version_join())
1479       {
1480         /*
1481           Despite between these two members the compatibility type
1482           is INCOMPATIBLE_LOWER_VERSION, when compared with others
1483           group members this server may be INCOMPATIBLE, so we need
1484           to test with all group members.
1485         */
1486         override_lower_incompatibility= true;
1487         compatibility_type= COMPATIBLE;
1488       }
1489       else
1490       {
1491         compatibility_type= INCOMPATIBLE;
1492         break;
1493       }
1494     }
1495   }
1496 
1497   if (compatibility_type != INCOMPATIBLE && override_lower_incompatibility)
1498   {
1499     log_message(MY_INFORMATION_LEVEL,
1500                 "Member version is lower than some group member, but since "
1501                 "option 'group_replication_allow_local_lower_version_join' "
1502                 "is enabled, member will be allowed to join");
1503   }
1504 
1505   if (read_compatible && compatibility_type != INCOMPATIBLE)
1506   {
1507     compatibility_type= READ_COMPATIBLE;
1508   }
1509 
1510   //clean the members
1511   for (all_members_it= all_members->begin();
1512        all_members_it!= all_members->end();
1513        all_members_it++)
1514   {
1515     delete (*all_members_it);
1516   }
1517   delete all_members;
1518 
1519   return compatibility_type;
1520 }
1521 
compare_member_transaction_sets() const1522 int Plugin_gcs_events_handler::compare_member_transaction_sets() const
1523 {
1524   int result= 0;
1525 
1526   Sid_map local_sid_map(NULL);
1527   Sid_map group_sid_map(NULL);
1528   Gtid_set local_member_set(&local_sid_map, NULL);
1529   Gtid_set group_set(&group_sid_map, NULL);
1530 
1531   std::vector<Group_member_info*> *all_members= group_member_mgr->get_all_members();
1532   std::vector<Group_member_info*>::iterator all_members_it;
1533   for (all_members_it= all_members->begin();
1534        all_members_it!= all_members->end();
1535        all_members_it++) {
1536 
1537     std::string member_exec_set_str= (*all_members_it)->get_gtid_executed();
1538     std::string applier_ret_set_str= (*all_members_it)->get_gtid_retrieved();
1539     if ((*all_members_it)->get_gcs_member_id() ==
1540             local_member_info->get_gcs_member_id())
1541     {
1542       if (local_member_set.
1543               add_gtid_text(member_exec_set_str.c_str()) != RETURN_STATUS_OK ||
1544           local_member_set.
1545               add_gtid_text(applier_ret_set_str.c_str()) != RETURN_STATUS_OK)
1546       {
1547         /* purecov: begin inspected */
1548         log_message(MY_ERROR_LEVEL,
1549                     "Error processing local GTID sets when comparing this member"
1550                     " transactions against the group");
1551         result= -1;
1552         goto cleaning;
1553         /* purecov: end */
1554       }
1555     }
1556     else
1557     {
1558       if (group_set.
1559               add_gtid_text(member_exec_set_str.c_str()) != RETURN_STATUS_OK ||
1560           group_set.
1561               add_gtid_text(applier_ret_set_str.c_str()) != RETURN_STATUS_OK)
1562       {
1563         /* purecov: begin inspected */
1564         log_message(MY_ERROR_LEVEL,
1565                     "Error processing group GTID sets when comparing this member"
1566                     " transactions with the group");
1567         result= -1;
1568         goto cleaning;
1569         /* purecov: end */
1570       }
1571     }
1572 
1573   }
1574 
1575   /*
1576     Here we only error out if the joiner set is bigger, i.e, if they are equal
1577     no error is returned.
1578     One could argue that if a joiner has the same transaction set as the group
1579     then something is wrong as the group also has transaction associated to
1580     previous view changes.
1581     To reject this cases cause however false negatives when members leave and
1582     quickly rejoin the group or when groups are started by add several nodes at
1583     once.
1584   */
1585   if (!local_member_set.is_subset(&group_set))
1586   {
1587     char *local_gtid_set_buf;
1588     local_member_set.to_string(&local_gtid_set_buf);
1589     char *group_gtid_set_buf;
1590     group_set.to_string(&group_gtid_set_buf);
1591     log_message(MY_ERROR_LEVEL,
1592                 "This member has more executed transactions than those present"
1593                 " in the group. Local transactions: %s > Group transactions: %s",
1594                 local_gtid_set_buf, group_gtid_set_buf);
1595     my_free(local_gtid_set_buf);
1596     my_free(group_gtid_set_buf);
1597     result= 1;
1598   }
1599 
1600 cleaning:
1601 
1602   //clean the members
1603   for (all_members_it= all_members->begin();
1604        all_members_it!= all_members->end();
1605        all_members_it++)
1606   {
1607     delete (*all_members_it);
1608   }
1609   delete all_members;
1610 
1611   return result;
1612 }
1613 
1614 void Plugin_gcs_events_handler::
collect_members_executed_sets(const vector<Gcs_member_identifier> & joining_members,View_change_packet * view_packet) const1615 collect_members_executed_sets(const vector<Gcs_member_identifier> &joining_members,
1616                               View_change_packet *view_packet) const
1617 {
1618   std::vector<Group_member_info*> *all_members= group_member_mgr->get_all_members();
1619   std::vector<Group_member_info*>::iterator all_members_it;
1620   for (all_members_it= all_members->begin();
1621        all_members_it!= all_members->end();
1622        all_members_it++)
1623   {
1624 
1625     // Joining/Recovering members don't have valid GTID executed information
1626     if ((*all_members_it)->get_recovery_status() ==
1627             Group_member_info::MEMBER_IN_RECOVERY)
1628     {
1629       continue;
1630     }
1631 
1632     std::string exec_set_str= (*all_members_it)->get_gtid_executed();
1633     view_packet->group_executed_set.push_back(exec_set_str);
1634   }
1635 
1636   //clean the members
1637   for (all_members_it= all_members->begin();
1638        all_members_it!= all_members->end();
1639        all_members_it++)
1640   {
1641     delete (*all_members_it);
1642   }
1643   delete all_members;
1644 }
1645 
1646 int
compare_member_option_compatibility() const1647 Plugin_gcs_events_handler::compare_member_option_compatibility() const
1648 {
1649   int result= 0;
1650 
1651   std::vector<Group_member_info*> *all_members= group_member_mgr->get_all_members();
1652   std::vector<Group_member_info*>::iterator all_members_it;
1653   for (all_members_it= all_members->begin();
1654        all_members_it!= all_members->end();
1655        all_members_it++)
1656   {
1657     if (local_member_info->get_gtid_assignment_block_size() !=
1658         (*all_members_it)->get_gtid_assignment_block_size())
1659     {
1660       result= 1;
1661       log_message(MY_ERROR_LEVEL,
1662                   "The member is configured with a "
1663                   "group_replication_gtid_assignment_block_size option "
1664                   "value '%llu' different from the group '%llu'. "
1665                   "The member will now exit the group.",
1666                   local_member_info->get_gtid_assignment_block_size(),
1667                   (*all_members_it)->get_gtid_assignment_block_size());
1668       goto cleaning;
1669     }
1670 
1671     if (local_member_info->get_write_set_extraction_algorithm() !=
1672        (*all_members_it)->get_write_set_extraction_algorithm())
1673     {
1674       result= 1;
1675       log_message(MY_ERROR_LEVEL,
1676                   "The member is configured with a "
1677                   "transaction-write-set-extraction option "
1678                   "value '%s' different from the group '%s'. "
1679                   "The member will now exit the group.",
1680                   get_write_set_algorithm_string(
1681                       local_member_info->get_write_set_extraction_algorithm()),
1682                   get_write_set_algorithm_string(
1683                       (*all_members_it)->get_write_set_extraction_algorithm()));
1684       goto cleaning;
1685     }
1686 
1687     if (local_member_info->get_configuration_flags() !=
1688         (*all_members_it)->get_configuration_flags())
1689     {
1690       const uint32 member_configuration_flags = (*all_members_it)->get_configuration_flags();
1691       const uint32 local_configuration_flags = local_member_info->get_configuration_flags();
1692 
1693       result= 1;
1694       log_message(MY_ERROR_LEVEL,
1695                   "The member configuration is not compatible with "
1696                   "the group configuration. Variables such as "
1697                   "single_primary_mode or enforce_update_everywhere_checks "
1698                   "must have the same value on every server in the group. "
1699                   "(member configuration option: [%s], group configuration "
1700                   "option: [%s]).",
1701                   Group_member_info::get_configuration_flags_string(local_configuration_flags).c_str(),
1702                   Group_member_info::get_configuration_flags_string(member_configuration_flags).c_str());
1703       goto cleaning;
1704     }
1705 
1706     if ((*all_members_it)->get_lower_case_table_names() !=
1707          DEFAULT_NOT_RECEIVED_LOWER_CASE_TABLE_NAMES &&
1708         local_member_info->get_lower_case_table_names() !=
1709         (*all_members_it)->get_lower_case_table_names())
1710     {
1711       result= 1;
1712       log_message(MY_ERROR_LEVEL,
1713                   "The member is configured with a lower_case_table_names "
1714                   "option value '%lu' different from the group '%lu'. "
1715                   "The member will now exit the group. If there is existing "
1716                   "data on member, it may be incompatible with group if "
1717                   "created with a lower_case_table_names value different from "
1718                   "the group.",
1719                   local_member_info->get_lower_case_table_names(),
1720                   (*all_members_it)->get_lower_case_table_names());
1721       goto cleaning;
1722     }
1723   }
1724 
1725 cleaning:
1726   for (all_members_it= all_members->begin();
1727        all_members_it!= all_members->end();
1728        all_members_it++)
1729     delete (*all_members_it);
1730   delete all_members;
1731 
1732   return result;
1733 }
1734 
1735 void
leave_group_on_error() const1736 Plugin_gcs_events_handler::leave_group_on_error() const
1737 {
1738   Gcs_operations::enum_leave_state state= gcs_module->leave();
1739   int error= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
1740                               stop_wait_timeout);
1741   if (error)
1742   {
1743     log_message(MY_ERROR_LEVEL,
1744                 "Error stopping all replication channels while server was"
1745                 " leaving the group. Please check the error log for additional"
1746                 " details. Got error: %d", error);
1747   }
1748 
1749   std::stringstream ss;
1750   plugin_log_level log_severity= MY_WARNING_LEVEL;
1751   switch (state)
1752   {
1753     case Gcs_operations::ERROR_WHEN_LEAVING:
1754       /* purecov: begin inspected */
1755       ss << "Unable to confirm whether the server has left the group or not. "
1756             "Check performance_schema.replication_group_members to check group membership information.";
1757       log_severity= MY_ERROR_LEVEL;
1758       break;
1759       /* purecov: end */
1760     case Gcs_operations::ALREADY_LEAVING:
1761       /* purecov: begin inspected */
1762       ss << "Skipping leave operation: concurrent attempt to leave the group is on-going.";
1763       break;
1764       /* purecov: end */
1765     case Gcs_operations::ALREADY_LEFT:
1766       /* purecov: begin inspected */
1767       ss << "Skipping leave operation: member already left the group.";
1768       break;
1769       /* purecov: end */
1770     case Gcs_operations::NOW_LEAVING:
1771       return;
1772   }
1773   log_message(log_severity, ss.str().c_str()); /* purecov: inspected */
1774 }
1775