1 /* Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include <stddef.h>
24 #include <algorithm>
25 #include <list>
26 #include <set>
27 #include <sstream>
28 #include <string>
29 #include <vector>
30 
31 #include <mysql/components/services/log_builtins.h>
32 #include "mutex_lock.h"
33 #include "my_dbug.h"
34 #include "plugin/group_replication/include/autorejoin.h"
35 #include "plugin/group_replication/include/gcs_event_handlers.h"
36 #include "plugin/group_replication/include/leave_group_on_failure.h"
37 #include "plugin/group_replication/include/observer_trans.h"
38 #include "plugin/group_replication/include/pipeline_stats.h"
39 #include "plugin/group_replication/include/plugin.h"
40 #include "plugin/group_replication/include/plugin_handlers/primary_election_invocation_handler.h"
41 #include "plugin/group_replication/include/plugin_handlers/remote_clone_handler.h"
42 #include "plugin/group_replication/include/plugin_messages/group_action_message.h"
43 #include "plugin/group_replication/include/plugin_messages/group_service_message.h"
44 #include "plugin/group_replication/include/plugin_messages/group_validation_message.h"
45 #include "plugin/group_replication/include/plugin_messages/sync_before_execution_message.h"
46 #include "plugin/group_replication/include/plugin_messages/transaction_prepared_message.h"
47 #include "plugin/group_replication/include/plugin_messages/transaction_with_guarantee_message.h"
48 
49 using std::vector;
50 
Plugin_gcs_events_handler(Applier_module_interface * applier_module,Recovery_module * recovery_module,Compatibility_module * compatibility_module,ulong components_stop_timeout)51 Plugin_gcs_events_handler::Plugin_gcs_events_handler(
52     Applier_module_interface *applier_module, Recovery_module *recovery_module,
53     Compatibility_module *compatibility_module, ulong components_stop_timeout)
54     : applier_module(applier_module),
55       recovery_module(recovery_module),
56       compatibility_manager(compatibility_module),
57       stop_wait_timeout(components_stop_timeout) {
58   this->temporary_states =
59       new std::set<Group_member_info *, Group_member_info_pointer_comparator>();
60   this->joiner_compatibility_status = new st_compatibility_types(INCOMPATIBLE);
61 
62 #ifndef DBUG_OFF
63   set_number_of_members_on_view_changed_to_10 = false;
64   DBUG_EXECUTE_IF(
65       "group_replication_set_number_of_members_on_view_changed_to_10",
66       { set_number_of_members_on_view_changed_to_10 = true; };);
67 #endif
68 }
69 
~Plugin_gcs_events_handler()70 Plugin_gcs_events_handler::~Plugin_gcs_events_handler() {
71   delete temporary_states;
72   delete joiner_compatibility_status;
73 }
74 
on_message_received(const Gcs_message & message) const75 void Plugin_gcs_events_handler::on_message_received(
76     const Gcs_message &message) const {
77   Plugin_gcs_message::enum_cargo_type message_type =
78       Plugin_gcs_message::get_cargo_type(
79           message.get_message_data().get_payload());
80 
81   const std::string message_origin = message.get_origin().get_member_id();
82   Plugin_gcs_message *processed_message = nullptr;
83 
84   switch (message_type) {
85     case Plugin_gcs_message::CT_TRANSACTION_MESSAGE:
86       handle_transactional_message(message);
87       break;
88 
89     case Plugin_gcs_message::CT_TRANSACTION_WITH_GUARANTEE_MESSAGE:
90       handle_transactional_with_guarantee_message(message);
91       break;
92 
93     case Plugin_gcs_message::CT_TRANSACTION_PREPARED_MESSAGE:
94       handle_transaction_prepared_message(message);
95       break;
96 
97     case Plugin_gcs_message::CT_SYNC_BEFORE_EXECUTION_MESSAGE:
98       handle_sync_before_execution_message(message);
99       break;
100 
101     case Plugin_gcs_message::CT_CERTIFICATION_MESSAGE:
102       handle_certifier_message(message);
103       break;
104 
105     case Plugin_gcs_message::CT_PIPELINE_STATS_MEMBER_MESSAGE:
106       handle_stats_message(message);
107       break;
108 
109     case Plugin_gcs_message::CT_MESSAGE_SERVICE_MESSAGE: {
110       Group_service_message *service_message = new Group_service_message(
111           message.get_message_data().get_payload(),
112           message.get_message_data().get_payload_length());
113 
114       message_service_handler->add(service_message);
115     } break;
116 
117       /**
118         From this point messages are sent to message listeners and may be
119         skipped Messages above are directly processed and/or for performance we
120         do not want to add that extra weight.
121       */
122 
123     case Plugin_gcs_message::CT_RECOVERY_MESSAGE:
124       processed_message =
125           new Recovery_message(message.get_message_data().get_payload(),
126                                message.get_message_data().get_payload_length());
127       if (!pre_process_message(processed_message, message_origin))
128         handle_recovery_message(processed_message);
129       delete processed_message;
130       break;
131 
132     case Plugin_gcs_message::CT_SINGLE_PRIMARY_MESSAGE:
133       processed_message = new Single_primary_message(
134           message.get_message_data().get_payload(),
135           message.get_message_data().get_payload_length());
136       if (!pre_process_message(processed_message, message_origin))
137         handle_single_primary_message(processed_message);
138       delete processed_message;
139       break;
140 
141     case Plugin_gcs_message::CT_GROUP_ACTION_MESSAGE:
142       handle_group_action_message(message);
143       break;
144 
145     case Plugin_gcs_message::CT_GROUP_VALIDATION_MESSAGE:
146       processed_message = new Group_validation_message(
147           message.get_message_data().get_payload(),
148           message.get_message_data().get_payload_length());
149       pre_process_message(processed_message, message_origin);
150       delete processed_message;
151       break;
152     default:
153       break; /* purecov: inspected */
154   }
155 
156   /*
157    We need to see if a notification should be sent at this
158    point in time because we may have received a recovery
159    message that has updated our state.
160   */
161   notify_and_reset_ctx(m_notification_ctx);
162 }
163 
pre_process_message(Plugin_gcs_message * plugin_message,const std::string & message_origin) const164 bool Plugin_gcs_events_handler::pre_process_message(
165     Plugin_gcs_message *plugin_message,
166     const std::string &message_origin) const {
167   bool skip_message = false;
168   int error = group_events_observation_manager->before_message_handling(
169       *plugin_message, message_origin, &skip_message);
170   return (error || skip_message);
171 }
172 
handle_transactional_message(const Gcs_message & message) const173 void Plugin_gcs_events_handler::handle_transactional_message(
174     const Gcs_message &message) const {
175   const Group_member_info::Group_member_status member_status =
176       local_member_info->get_recovery_status();
177   if ((member_status == Group_member_info::MEMBER_IN_RECOVERY ||
178        member_status == Group_member_info::MEMBER_ONLINE) &&
179       this->applier_module) {
180     if (member_status == Group_member_info::MEMBER_IN_RECOVERY) {
181       applier_module->get_pipeline_stats_member_collector()
182           ->increment_transactions_delivered_during_recovery();
183     }
184 
185     const unsigned char *payload_data = nullptr;
186     size_t payload_size = 0;
187     Plugin_gcs_message::get_first_payload_item_raw_data(
188         message.get_message_data().get_payload(), &payload_data, &payload_size);
189 
190     this->applier_module->handle(payload_data, static_cast<ulong>(payload_size),
191                                  GROUP_REPLICATION_CONSISTENCY_EVENTUAL,
192                                  nullptr);
193   } else {
194     /* purecov: begin inspected */
195     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MSG_DISCARDED);
196     /* purecov: end */
197   }
198 }
199 
handle_transactional_with_guarantee_message(const Gcs_message & message) const200 void Plugin_gcs_events_handler::handle_transactional_with_guarantee_message(
201     const Gcs_message &message) const {
202   const Group_member_info::Group_member_status member_status =
203       local_member_info->get_recovery_status();
204   if ((member_status == Group_member_info::MEMBER_IN_RECOVERY ||
205        member_status == Group_member_info::MEMBER_ONLINE) &&
206       this->applier_module) {
207     if (member_status == Group_member_info::MEMBER_IN_RECOVERY) {
208       applier_module->get_pipeline_stats_member_collector()
209           ->increment_transactions_delivered_during_recovery();
210     }
211 
212     const unsigned char *payload_data = nullptr;
213     size_t payload_size = 0;
214     Plugin_gcs_message::get_first_payload_item_raw_data(
215         message.get_message_data().get_payload(), &payload_data, &payload_size);
216 
217     enum_group_replication_consistency_level consistency_level =
218         Transaction_with_guarantee_message::decode_and_get_consistency_level(
219             message.get_message_data().get_payload(),
220             message.get_message_data().get_payload_length());
221 
222     // Get ONLINE members that did receive this message.
223     std::list<Gcs_member_identifier> *online_members =
224         group_member_mgr->get_online_members_with_guarantees(
225             message.get_origin());
226 
227     this->applier_module->handle(payload_data, static_cast<ulong>(payload_size),
228                                  consistency_level, online_members);
229   } else {
230     /* purecov: begin inspected */
231     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MSG_DISCARDED);
232     /* purecov: end */
233   }
234 }
235 
handle_transaction_prepared_message(const Gcs_message & message) const236 void Plugin_gcs_events_handler::handle_transaction_prepared_message(
237     const Gcs_message &message) const {
238   if (this->applier_module == nullptr) {
239     /* purecov: begin inspected */
240     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MISSING_GRP_RPL_APPLIER);
241     return;
242     /* purecov: end */
243   }
244 
245   Transaction_prepared_message transaction_prepared_message(
246       message.get_message_data().get_payload(),
247       message.get_message_data().get_payload_length());
248 
249   Transaction_prepared_action_packet *transaction_prepared_action =
250       new Transaction_prepared_action_packet(
251           transaction_prepared_message.get_sid(),
252           transaction_prepared_message.get_gno(), message.get_origin());
253   this->applier_module->add_transaction_prepared_action_packet(
254       transaction_prepared_action);
255 }
256 
handle_sync_before_execution_message(const Gcs_message & message) const257 void Plugin_gcs_events_handler::handle_sync_before_execution_message(
258     const Gcs_message &message) const {
259   if (this->applier_module == nullptr) {
260     /* purecov: begin inspected */
261     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MISSING_GRP_RPL_APPLIER);
262     return;
263     /* purecov: end */
264   }
265 
266   Sync_before_execution_message sync_before_execution_message(
267       message.get_message_data().get_payload(),
268       message.get_message_data().get_payload_length());
269 
270   Sync_before_execution_action_packet *sync_before_execution_action =
271       new Sync_before_execution_action_packet(
272           sync_before_execution_message.get_thread_id(), message.get_origin());
273   this->applier_module->add_sync_before_execution_action_packet(
274       sync_before_execution_action);
275 }
276 
handle_certifier_message(const Gcs_message & message) const277 void Plugin_gcs_events_handler::handle_certifier_message(
278     const Gcs_message &message) const {
279   if (this->applier_module == nullptr) {
280     LogPluginErr(ERROR_LEVEL,
281                  ER_GRP_RPL_MISSING_GRP_RPL_APPLIER); /* purecov: inspected */
282     return;                                           /* purecov: inspected */
283   }
284 
285   Certifier_interface *certifier =
286       this->applier_module->get_certification_handler()->get_certifier();
287 
288   const unsigned char *payload_data = nullptr;
289   size_t payload_size = 0;
290   Plugin_gcs_message::get_first_payload_item_raw_data(
291       message.get_message_data().get_payload(), &payload_data, &payload_size);
292 
293   if (certifier->handle_certifier_data(payload_data,
294                                        static_cast<ulong>(payload_size),
295                                        message.get_origin())) {
296     LogPluginErr(
297         ERROR_LEVEL,
298         ER_GRP_RPL_CERTIFIER_MSSG_PROCESS_ERROR); /* purecov: inspected */
299   }
300 }
301 
handle_recovery_message(Plugin_gcs_message * processed_message) const302 void Plugin_gcs_events_handler::handle_recovery_message(
303     Plugin_gcs_message *processed_message) const {
304   Recovery_message *recovery_message = (Recovery_message *)processed_message;
305 
306   std::string member_uuid = recovery_message->get_member_uuid();
307 
308   bool is_local = !member_uuid.compare(local_member_info->get_uuid());
309   if (is_local) {
310     // Only change member status if member is still on recovery.
311     Group_member_info::Group_member_status member_status =
312         local_member_info->get_recovery_status();
313     if (member_status != Group_member_info::MEMBER_IN_RECOVERY) {
314       LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_SRV_NOT_ONLINE,
315                    Group_member_info::get_member_status_string(
316                        member_status)); /* purecov: inspected */
317       return;                           /* purecov: inspected */
318     }
319 
320     LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_SRV_ONLINE);
321 
322     /*
323      The member is declared as online upon receiving this message
324 
325      A notification may be flagged and eventually triggered when
326      the on_message handle is finished.
327     */
328     group_member_mgr->update_member_status(
329         member_uuid, Group_member_info::MEMBER_ONLINE, m_notification_ctx);
330 
331     /*
332       unblock threads waiting for the member to become ONLINE
333     */
334     terminate_wait_on_start_process();
335 
336     /**
337       Re-check compatibility, members may leave during recovery.
338       Disable the read mode in the server if the member is:
339       - joining
340       - doesn't have a higher possible incompatible version
341       - We are not on Primary mode.
342     */
343     disable_read_mode_for_compatible_members(true);
344   } else {
345     Group_member_info *member_info =
346         group_member_mgr->get_group_member_info(member_uuid);
347     if (member_info != nullptr) {
348       LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_MEM_ONLINE,
349                    member_info->get_hostname().c_str(),
350                    member_info->get_port());
351       delete member_info;
352 
353       /*
354        The member is declared as online upon receiving this message
355        We need to run this before running update_recovery_process
356 
357        A notification may be flagged and eventually triggered when
358        the on_message handle is finished.
359       */
360       group_member_mgr->update_member_status(
361           member_uuid, Group_member_info::MEMBER_ONLINE, m_notification_ctx);
362 
363       if (local_member_info->get_recovery_status() ==
364           Group_member_info::MEMBER_IN_RECOVERY) {
365         /*
366           Inform recovery of a possible new donor
367         */
368         recovery_module->update_recovery_process(false, false);
369       }
370     }
371   }
372 
373   /*
374    Check if we were waiting for some server to recover to
375    elect a new leader.
376 
377    Following line protects against servers joining the group
378    while the bootstrapped node has not yet finished recovery.
379    Therefore, it is going to become primary when it finishes recovery.
380    */
381   std::string no_primary("");
382   this->handle_leader_election_if_needed(DEAD_OLD_PRIMARY, no_primary);
383 }
384 
handle_stats_message(const Gcs_message & message) const385 void Plugin_gcs_events_handler::handle_stats_message(
386     const Gcs_message &message) const {
387   if (this->applier_module == nullptr) {
388     LogPluginErr(ERROR_LEVEL,
389                  ER_GRP_RPL_MISSING_GRP_RPL_APPLIER); /* purecov: inspected */
390     return;                                           /* purecov: inspected */
391   }
392 
393   this->applier_module->get_flow_control_module()->handle_stats_data(
394       message.get_message_data().get_payload(),
395       message.get_message_data().get_payload_length(),
396       message.get_origin().get_member_id());
397 }
398 
handle_single_primary_message(Plugin_gcs_message * processed_message) const399 void Plugin_gcs_events_handler::handle_single_primary_message(
400     Plugin_gcs_message *processed_message) const {
401   if (this->applier_module == nullptr) {
402     LogPluginErr(ERROR_LEVEL,
403                  ER_GRP_RPL_MISSING_GRP_RPL_APPLIER); /* purecov: inspected */
404     return;                                           /* purecov: inspected */
405   }
406 
407   Single_primary_message *single_primary_message =
408       (Single_primary_message *)processed_message;
409 
410   if (single_primary_message->get_single_primary_message_type() ==
411       Single_primary_message::SINGLE_PRIMARY_QUEUE_APPLIED_MESSAGE) {
412     Single_primary_action_packet *single_primary_action =
413         new Single_primary_action_packet(
414             Single_primary_action_packet::QUEUE_APPLIED);
415     primary_election_handler->set_election_running(false);
416     this->applier_module->add_single_primary_action_packet(
417         single_primary_action);
418   }
419   if (single_primary_message->get_single_primary_message_type() ==
420       Single_primary_message::SINGLE_PRIMARY_PRIMARY_ELECTION) {
421     primary_election_handler->handle_primary_election_message(
422         single_primary_message, &m_notification_ctx);
423   }
424 }
425 
handle_group_action_message(const Gcs_message & message) const426 void Plugin_gcs_events_handler::handle_group_action_message(
427     const Gcs_message &message) const {
428   if (group_action_coordinator == nullptr) {
429     LogPluginErr(
430         ERROR_LEVEL,
431         ER_GRP_RPL_MISSING_GRP_RPL_ACTION_COORDINATOR); /* purecov: inspected */
432     return;                                             /* purecov: inspected */
433   }
434 
435   Group_action_message::enum_action_message_type action_message_type =
436       Group_action_message::get_action_type(
437           message.get_message_data().get_payload());
438 
439   Group_action_message *group_action_message = nullptr;
440   switch (action_message_type) {
441     case Group_action_message::ACTION_MULTI_PRIMARY_MESSAGE:
442     case Group_action_message::ACTION_PRIMARY_ELECTION_MESSAGE:
443     case Group_action_message::ACTION_SET_COMMUNICATION_PROTOCOL_MESSAGE:
444       group_action_message = new Group_action_message(
445           message.get_message_data().get_payload(),
446           message.get_message_data().get_payload_length());
447       break;
448     default:
449       break; /* purecov: inspected */
450   }
451 
452   if (!pre_process_message(group_action_message,
453                            message.get_origin().get_member_id())) {
454     group_action_coordinator->handle_action_message(
455         group_action_message, message.get_origin().get_member_id());
456   }
457   delete group_action_message;
458 }
459 
on_suspicions(const std::vector<Gcs_member_identifier> & members,const std::vector<Gcs_member_identifier> & unreachable) const460 void Plugin_gcs_events_handler::on_suspicions(
461     const std::vector<Gcs_member_identifier> &members,
462     const std::vector<Gcs_member_identifier> &unreachable) const {
463   if (members.empty() && unreachable.empty())  // nothing to do
464     return;                                    /* purecov: inspected */
465 
466   DBUG_ASSERT(members.size() >= unreachable.size());
467 
468   std::vector<Gcs_member_identifier> tmp_unreachable(unreachable);
469   std::vector<Gcs_member_identifier>::const_iterator mit;
470   std::vector<Gcs_member_identifier>::iterator uit;
471 
472   if (!members.empty()) {
473     for (mit = members.begin(); mit != members.end(); mit++) {
474       Gcs_member_identifier member = *mit;
475       Group_member_info *member_info =
476           group_member_mgr->get_group_member_info_by_member_id(member);
477 
478       if (member_info == nullptr)  // Trying to update a non-existing member
479         continue;                  /* purecov: inspected */
480 
481       uit = std::find(tmp_unreachable.begin(), tmp_unreachable.end(), member);
482       if (uit != tmp_unreachable.end()) {
483         if (!member_info->is_unreachable()) {
484           LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEM_UNREACHABLE,
485                        member_info->get_hostname().c_str(),
486                        member_info->get_port());
487           // flag as a member having changed state
488           m_notification_ctx.set_member_state_changed();
489           member_info->set_unreachable();
490         }
491         // remove to not check again against this one
492         tmp_unreachable.erase(uit);
493       } else {
494         if (member_info->is_unreachable()) {
495           LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEM_REACHABLE,
496                        member_info->get_hostname().c_str(),
497                        member_info->get_port());
498           /* purecov: begin inspected */
499           // flag as a member having changed state
500           m_notification_ctx.set_member_state_changed();
501           member_info->set_reachable();
502           /* purecov: end */
503         }
504       }
505     }
506   }
507 
508   if ((members.size() - unreachable.size()) <= (members.size() / 2)) {
509     if (!group_partition_handler->get_timeout_on_unreachable())
510       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_SRV_BLOCKED);
511     else
512       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_SRV_BLOCKED_FOR_SECS,
513                    group_partition_handler->get_timeout_on_unreachable());
514 
515     if (!group_partition_handler->is_partition_handler_running() &&
516         !group_partition_handler->is_partition_handling_terminated())
517       group_partition_handler->launch_partition_handler_thread();
518 
519     // flag as having lost quorum
520     m_notification_ctx.set_quorum_lost();
521   } else {
522     /*
523       This code is present on on_view_changed and on_suspicions as no assumption
524       can be made about the order in which these methods are invoked.
525     */
526     if (group_partition_handler->is_member_on_partition()) {
527       if (group_partition_handler->abort_partition_handler_if_running()) {
528         LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_CHANGE_GRP_MEM_NOT_PROCESSED);
529       } else {
530         /* If it was not running or we canceled it in time */
531         LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEMBER_CONTACT_RESTORED);
532       }
533     }
534   }
535   notify_and_reset_ctx(m_notification_ctx);
536 }
537 
log_members_leaving_message(const Gcs_view & new_view) const538 void Plugin_gcs_events_handler::log_members_leaving_message(
539     const Gcs_view &new_view) const {
540   std::string members_leaving;
541   std::string primary_member_host;
542 
543   get_hosts_from_view(new_view.get_leaving_members(), members_leaving,
544                       primary_member_host);
545 
546   LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEMBER_REMOVED,
547                members_leaving.c_str());
548 
549   if (!primary_member_host.empty())
550     LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_PRIMARY_MEMBER_LEFT_GRP,
551                  primary_member_host.c_str());
552 }
553 
log_members_joining_message(const Gcs_view & new_view) const554 void Plugin_gcs_events_handler::log_members_joining_message(
555     const Gcs_view &new_view) const {
556   std::string members_joining;
557   std::string primary_member_host;
558 
559   get_hosts_from_view(new_view.get_joined_members(), members_joining,
560                       primary_member_host);
561 
562   LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_MEMBER_ADDED,
563                members_joining.c_str());
564 }
565 
get_hosts_from_view(const std::vector<Gcs_member_identifier> & members,std::string & all_hosts,std::string & primary_host) const566 void Plugin_gcs_events_handler::get_hosts_from_view(
567     const std::vector<Gcs_member_identifier> &members, std::string &all_hosts,
568     std::string &primary_host) const {
569   std::stringstream hosts_string;
570   std::stringstream primary_string;
571   std::vector<Gcs_member_identifier>::const_iterator all_members_it =
572       members.begin();
573 
574   while (all_members_it != members.end()) {
575     Group_member_info *member_info =
576         group_member_mgr->get_group_member_info_by_member_id((*all_members_it));
577     all_members_it++;
578 
579     if (member_info == nullptr) continue;
580 
581     hosts_string << member_info->get_hostname() << ":"
582                  << member_info->get_port();
583 
584     /**
585      Check in_primary_mode has been added for safety.
586      Since primary role is in single-primary mode.
587     */
588     if (member_info->in_primary_mode() &&
589         member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY) {
590       if (primary_string.rdbuf()->in_avail() != 0) primary_string << ", ";
591       primary_string << member_info->get_hostname() << ":"
592                      << member_info->get_port();
593     }
594 
595     if (all_members_it != members.end()) {
596       hosts_string << ", ";
597     }
598   }
599   all_hosts.assign(hosts_string.str());
600   primary_host.assign(primary_string.str());
601 }
602 
on_view_changed(const Gcs_view & new_view,const Exchanged_data & exchanged_data) const603 void Plugin_gcs_events_handler::on_view_changed(
604     const Gcs_view &new_view, const Exchanged_data &exchanged_data) const {
605   bool is_leaving = is_member_on_vector(new_view.get_leaving_members(),
606                                         local_member_info->get_gcs_member_id());
607 
608   bool is_primary =
609       (local_member_info->in_primary_mode() &&
610        local_member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY);
611 
612   bool is_joining = is_member_on_vector(new_view.get_joined_members(),
613                                         local_member_info->get_gcs_member_id());
614 
615   bool skip_election = false;
616   enum_primary_election_mode election_mode = DEAD_OLD_PRIMARY;
617   std::string suggested_primary("");
618   // Was member expelled from the group due to network failures?
619   if (this->was_member_expelled_from_group(new_view)) {
620     DBUG_ASSERT(is_leaving);
621     group_events_observation_manager->after_view_change(
622         new_view.get_joined_members(), new_view.get_leaving_members(),
623         new_view.get_members(), is_leaving, &skip_election, &election_mode,
624         suggested_primary);
625     goto end;
626   }
627 
628   // An early error on the applier can render the join invalid
629   if (is_joining &&
630       local_member_info->get_recovery_status() ==
631           Group_member_info::MEMBER_ERROR &&
632       !autorejoin_module->is_autorejoin_ongoing()) {
633     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_EXIT_PLUGIN_ERROR);
634     gcs_module->notify_of_view_change_cancellation(
635         GROUP_REPLICATION_CONFIGURATION_ERROR);
636   } else {
637     /*
638       This code is present on on_view_changed and on_suspicions as no assumption
639       can be made about the order in which these methods are invoked.
640     */
641     if (!is_leaving && group_partition_handler->is_member_on_partition()) {
642       if (group_partition_handler->abort_partition_handler_if_running()) {
643         LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_CHANGE_GRP_MEM_NOT_PROCESSED);
644         goto end;
645       } else {
646         /* If it was not running or we canceled it in time */
647         LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEMBER_CONTACT_RESTORED);
648       }
649     }
650 
651     /*
652       Maybe on_suspicions we already executed the above block but it was too
653       late. No point in repeating the message, but we need to break the view
654       install.
655     */
656     if (!is_leaving &&
657         group_partition_handler->is_partition_handling_terminated())
658       goto end;
659 
660     if (!is_leaving && new_view.get_leaving_members().size() > 0)
661       log_members_leaving_message(new_view);
662 
663     // update the Group Manager with all the received states
664     if (update_group_info_manager(new_view, exchanged_data, is_joining,
665                                   is_leaving) &&
666         is_joining) {
667       gcs_module->notify_of_view_change_cancellation();
668       return;
669     }
670 
671     if (!is_joining && new_view.get_joined_members().size() > 0)
672       log_members_joining_message(new_view);
673 
674     // enable conflict detection if someone on group have it enabled
675     if (local_member_info->in_primary_mode() &&
676         group_member_mgr->is_conflict_detection_enabled()) {
677       Certifier_interface *certifier =
678           this->applier_module->get_certification_handler()->get_certifier();
679       certifier->enable_conflict_detection();
680     }
681 
682     // Inform any interested handler that the view changed
683     View_change_pipeline_action *vc_action =
684         new View_change_pipeline_action(is_leaving);
685 
686     applier_module->handle_pipeline_action(vc_action);
687     delete vc_action;
688 
689     // Update any recovery running process and handle state changes
690     this->handle_leaving_members(new_view, is_joining, is_leaving);
691 
692     // Handle joining members
693     this->handle_joining_members(new_view, is_joining, is_leaving);
694 
695     if (is_leaving) gcs_module->leave_coordination_member_left();
696 
697     // Signal that the injected view was delivered
698     if (gcs_module->is_injected_view_modification())
699       gcs_module->notify_of_view_change_end();
700 
701     group_events_observation_manager->after_view_change(
702         new_view.get_joined_members(), new_view.get_leaving_members(),
703         new_view.get_members(), is_leaving, &skip_election, &election_mode,
704         suggested_primary);
705 
706     // Handle leader election if needed
707     if (!skip_election && !is_leaving) {
708       this->handle_leader_election_if_needed(election_mode, suggested_primary);
709     }
710   }
711 
712   if (!is_leaving) {
713     std::string view_id_representation = "";
714     Gcs_view *view = gcs_module->get_current_view();
715     if (view != nullptr) {
716       view_id_representation = view->get_view_id().get_representation();
717       delete view;
718     }
719     disable_read_mode_for_compatible_members();
720     LogPluginErr(
721         SYSTEM_LEVEL, ER_GRP_RPL_MEMBER_CHANGE,
722         group_member_mgr->get_string_current_view_active_hosts().c_str(),
723         view_id_representation.c_str());
724   } else {
725     LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_MEMBER_LEFT_GRP);
726   }
727 
728 end:
729   /* if I am the primary and I am leaving, notify about role change */
730   if (is_leaving && is_primary) {
731     group_member_mgr->update_member_role(
732         local_member_info->get_uuid(), Group_member_info::MEMBER_ROLE_SECONDARY,
733         m_notification_ctx);
734   }
735 
736   /* flag view change */
737   m_notification_ctx.set_view_changed();
738   if (is_leaving)
739     /*
740       The leave view is an optimistic and local view.
741       Therefore its ID is not meaningful, since it is not
742       a global one.
743      */
744     m_notification_ctx.set_view_id("");
745   else
746     m_notification_ctx.set_view_id(new_view.get_view_id().get_representation());
747 
748   /* trigger notification */
749   notify_and_reset_ctx(m_notification_ctx);
750 }
751 
was_member_expelled_from_group(const Gcs_view & view) const752 bool Plugin_gcs_events_handler::was_member_expelled_from_group(
753     const Gcs_view &view) const {
754   DBUG_TRACE;
755   bool result = false;
756 
757   if (view.get_error_code() == Gcs_view::MEMBER_EXPELLED) {
758     result = true;
759     const char *exit_state_action_abort_log_message =
760         "Member was expelled from the group due to network failures.";
761     leave_group_on_failure::mask leave_actions;
762     leave_actions.set(leave_group_on_failure::ALREADY_LEFT_GROUP, true);
763     leave_actions.set(leave_group_on_failure::CLEAN_GROUP_MEMBERSHIP, true);
764     leave_actions.set(leave_group_on_failure::STOP_APPLIER, true);
765     leave_actions.set(leave_group_on_failure::HANDLE_EXIT_STATE_ACTION, true);
766     leave_actions.set(leave_group_on_failure::HANDLE_AUTO_REJOIN, true);
767     leave_group_on_failure::leave(leave_actions, ER_GRP_RPL_MEMBER_EXPELLED,
768                                   PSESSION_INIT_THREAD, &m_notification_ctx,
769                                   exit_state_action_abort_log_message);
770   }
771 
772   return result;
773 }
774 
handle_leader_election_if_needed(enum_primary_election_mode election_mode,std::string & suggested_primary) const775 void Plugin_gcs_events_handler::handle_leader_election_if_needed(
776     enum_primary_election_mode election_mode,
777     std::string &suggested_primary) const {
778   /*
779     Can we get here when a change to multi master is cancelled and is being
780     undone? Yes but only on situations where the action was killed or the member
781     is stopping that will always result in a plugin restart.
782   */
783   if (election_mode == DEAD_OLD_PRIMARY &&
784       !local_member_info->in_primary_mode())
785     return;
786 
787   primary_election_handler->execute_primary_election(
788       suggested_primary, election_mode, &m_notification_ctx);
789 }
790 
update_group_info_manager(const Gcs_view & new_view,const Exchanged_data & exchanged_data,bool is_joining,bool is_leaving) const791 int Plugin_gcs_events_handler::update_group_info_manager(
792     const Gcs_view &new_view, const Exchanged_data &exchanged_data,
793     bool is_joining, bool is_leaving) const {
794   int error = 0;
795 
796   // update the Group Manager with all the received states
797   vector<Group_member_info *> to_update;
798 
799   if (!is_leaving) {
800     // Process local state of exchanged data.
801     if ((error = process_local_exchanged_data(exchanged_data, is_joining)))
802       goto err;
803 
804     to_update.insert(to_update.end(), temporary_states->begin(),
805                      temporary_states->end());
806 
807     // Clean-up members that are leaving
808     vector<Gcs_member_identifier> leaving = new_view.get_leaving_members();
809     vector<Gcs_member_identifier>::iterator left_it;
810     vector<Group_member_info *>::iterator to_update_it;
811     for (left_it = leaving.begin(); left_it != leaving.end(); left_it++) {
812       for (to_update_it = to_update.begin(); to_update_it != to_update.end();
813            to_update_it++) {
814         if ((*left_it) == (*to_update_it)->get_gcs_member_id()) {
815           /* purecov: begin inspected */
816           delete (*to_update_it);
817           to_update.erase(to_update_it);
818           break;
819           /* purecov: end */
820         }
821       }
822     }
823   }
824   group_member_mgr->update(&to_update);
825   temporary_states->clear();
826 
827 err:
828   DBUG_ASSERT(temporary_states->size() == 0);
829   return error;
830 }
831 
/**
  Handles the joining side of a view change.

  When this member is the one joining, it checks compatibility with the
  group, signals the end (or cancellation) of the view change, moves offline
  joiners to the recovery status, enables the read mode, queues the
  suspension and view change packets on the applier and finally selects and
  launches the recovery strategy (clone or incremental recovery).

  When this member is not joining and the view has joining members (or has
  neither joining nor leaving members), it moves offline joiners to the
  recovery status and queues a view change packet carrying the collected
  executed sets.

  Nothing is done when the view has no members or this member is leaving.

  @param new_view    the view being installed
  @param is_joining  true if this member is joining in this view
  @param is_leaving  true if this member is leaving in this view
*/
void Plugin_gcs_events_handler::handle_joining_members(const Gcs_view &new_view,
                                                       bool is_joining,
                                                       bool is_leaving) const {
  // Nothing to do when the view is empty or this member is leaving
  size_t number_of_members = new_view.get_members().size();
  if (number_of_members == 0 || is_leaving) {
    return;
  }
  size_t number_of_joining_members = new_view.get_joined_members().size();
  size_t number_of_leaving_members = new_view.get_leaving_members().size();

  /*
   If we are joining, 3 scenarios exist:
   1) We are incompatible with the group so we leave
   2) We are alone so we declare ourselves online
   3) We are in a group and recovery must happen
  */
  if (is_joining) {
    int error = 0;
    if ((error = check_group_compatibility(number_of_members))) {
      // Hand the compatibility error to the view change cancellation
      gcs_module->notify_of_view_change_cancellation(error);
      return;
    }
    gcs_module->notify_of_view_change_end();

    /**
     On the joining list there can be 2 types of members: online/recovering
     members coming from old views where this member was not present and new
     joining members that still have their status as offline.

     As so, for offline members, their state is changed to member_in_recovery
     after member compatibility with group is checked.
    */
    update_member_status(
        new_view.get_joined_members(), Group_member_info::MEMBER_IN_RECOVERY,
        Group_member_info::MEMBER_OFFLINE, Group_member_info::MEMBER_END);

    /** Is an election running while I'm joining? */
    primary_election_handler->set_election_running(
        is_group_running_a_primary_election());

    /**
      Set the read mode if not set during start (auto-start)
    */
    if (enable_server_read_mode(PSESSION_DEDICATED_THREAD)) {
      /*
        Enabling the read mode failed, so leave the group.

        The notification will be triggered in the top level handle function
        that calls this one. In this case, the on_view_changed handle.
      */
      leave_group_on_failure::mask leave_actions;
      leave_actions.set(leave_group_on_failure::SKIP_SET_READ_ONLY, true);
      leave_actions.set(leave_group_on_failure::SKIP_LEAVE_VIEW_WAIT, true);
      leave_group_on_failure::leave(
          leave_actions, ER_GRP_RPL_SUPER_READ_ONLY_ACTIVATE_ERROR,
          PSESSION_DEDICATED_THREAD, &m_notification_ctx, "");
      set_plugin_is_setting_read_mode(false);

      return;
    } else {
      set_plugin_is_setting_read_mode(false);
    }

    /**
      On the joining member log an error when the group contains more members
      than the auto_increment_increment variable.
    */
    ulong auto_increment_increment = get_auto_increment_increment();

    if (!local_member_info->in_primary_mode() &&
        new_view.get_members().size() > auto_increment_increment) {
      LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_EXCEEDS_AUTO_INC_VALUE,
                   new_view.get_members().size(), auto_increment_increment);
    }

    /*
     During the view change, a suspension packet is sent to the applier module
     so all posterior transactions inbound are not applied, but queued, until
     the member finishes recovery.
    */
    applier_module->add_suspension_packet();

    /*
     Marking the view in the joiner since the incoming event from the donor
     is discarded in the Recovery process.
     */

    std::string view_id = new_view.get_view_id().get_representation();
    View_change_packet *view_change_packet = new View_change_packet(view_id);
    applier_module->add_view_change_packet(view_change_packet);

    /*
     Choose the strategy for recovery.
     Note that even if clone is chosen, if an error occurs on its launch,
     incremental recovery is again selected as the default choice.
    */
    Remote_clone_handler::enum_clone_check_result recovery_strategy =
        Remote_clone_handler::DO_RECOVERY;

    // The check is not needed if the member is alone
    if (number_of_members > 1)
      recovery_strategy = remote_clone_handler->check_clone_preconditions();

    if (Remote_clone_handler::DO_CLONE == recovery_strategy) {
      LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_RECOVERY_STRAT_CHOICE,
                   "Cloning from a remote group donor.");
      /*
       Launch the clone process. It will configure SSL options and the list
       of allowed donors.
       When terminated, the clone process will restart the server.
       The whole start join process is still done as an error on cloning can
       mean we fall back to incremental recovery.
      */
      if (remote_clone_handler->clone_server(
              new_view.get_group_id().get_group_id(),
              new_view.get_view_id().get_representation())) {
        /* purecov: begin inspected */
        LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_RECOVERY_STRAT_FALLBACK,
                     "Incremental Recovery.");
        recovery_strategy = Remote_clone_handler::DO_RECOVERY;
        /* purecov: end */
      }
    }

    if (Remote_clone_handler::DO_RECOVERY == recovery_strategy) {
      LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_RECOVERY_STRAT_CHOICE,
                   "Incremental recovery from a group donor");
      /*
       Launch the recovery thread so we can receive missing data and the
       certification information needed to apply the transactions queued after
       this view change.

       Recovery receives a view id, as a means to identify logically on joiners
       and donors alike where this view change happened in the data. With that
       info we can then ask for the donor to give the member all the data until
       this point in the data, and the certification information for all the
       data that comes next.

       When alone, the server will go through Recovery to wait for the
       consumption of his applier relay log that may contain transactions from
       previous executions.
      */
      recovery_module->start_recovery(
          new_view.get_group_id().get_group_id(),
          new_view.get_view_id().get_representation());
    } else if (Remote_clone_handler::CHECK_ERROR == recovery_strategy ||
               Remote_clone_handler::NO_RECOVERY_POSSIBLE ==
                   recovery_strategy) {
      // No usable recovery strategy: log the reason and leave the group
      if (Remote_clone_handler::NO_RECOVERY_POSSIBLE == recovery_strategy)
        LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_NO_POSSIBLE_RECOVERY);
      else {
        /* purecov: begin inspected */
        LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_RECOVERY_EVAL_ERROR, "");
        /* purecov: end */
      }

      /*
        The notification will be triggered in the top level handle function
        that calls this one. In this case, the on_view_changed handle.
      */
      leave_group_on_failure::mask leave_actions;
      leave_actions.set(leave_group_on_failure::SKIP_LEAVE_VIEW_WAIT, true);
      leave_group_on_failure::leave(leave_actions, 0, PSESSION_DEDICATED_THREAD,
                                    &m_notification_ctx, "");
      return;
    }
  }
  /*
    The condition
      number_of_joining_members == 0 && number_of_leaving_members == 0
    is needed due to the following scenario:
    We have a group with 2 members, one does crash (M2), and the group
    blocks with M1 ONLINE and M2 UNREACHABLE.
    Then M2 rejoins and the group unblocks.
    When M2 rejoins the group, from M2 perspective it is joining
    the group, that is, it does receive a view (V3) on which it is
    marked as a joining member.
    But from M1 perspective, M2 may never have left, so the view delivered
    (V3) has the same members as V2, that is, M1 and M2, without joining
    members, hence we need to consider that condition and log that view.
  */
  else if (number_of_joining_members > 0 ||
           (number_of_joining_members == 0 && number_of_leaving_members == 0)) {
    /**
     On the joining list there can be 2 types of members: online/recovering
     members coming from old views where this member was not present and new
     joining members that still have their status as offline.

     As so, for offline members, their state is changed to member_in_recovery.
    */
    update_member_status(
        new_view.get_joined_members(), Group_member_info::MEMBER_IN_RECOVERY,
        Group_member_info::MEMBER_OFFLINE, Group_member_info::MEMBER_END);
    /**
     If not a joining member, all members should record on their own binlogs a
     marking event that identifies the frontier between the data the joining
     member was to receive and the data it should queue.
     The joining member can then wait for this event to know it was all the
     needed data.

     This packet will also pass in the certification process at this exact
     frontier giving us the opportunity to gather the necessary certification
     information to certify the transactions that will come after this view
     change. If selected as a donor, this info will also be sent to the joiner.

     Associated to this process, we collect and intersect the executed GTID sets
     of all ONLINE members so we can cut the certification info to gather and
     transmit to the minimum.
    */

    std::string view_id = new_view.get_view_id().get_representation();
    View_change_packet *view_change_packet = new View_change_packet(view_id);
    collect_members_executed_sets(view_change_packet);
    applier_module->add_view_change_packet(view_change_packet);

    // Warn when this view is installed while a group action is running
    if (group_action_coordinator->is_group_action_running()) {
      LogPluginErr(WARNING_LEVEL,
                   ER_GRP_RPL_JOINER_EXIT_WHEN_GROUP_ACTION_RUNNING);
    }
  }
}
1052 
handle_leaving_members(const Gcs_view & new_view,bool is_joining,bool is_leaving) const1053 void Plugin_gcs_events_handler::handle_leaving_members(const Gcs_view &new_view,
1054                                                        bool is_joining,
1055                                                        bool is_leaving) const {
1056   Group_member_info::Group_member_status member_status =
1057       local_member_info->get_recovery_status();
1058 
1059   bool members_left = (new_view.get_leaving_members().size() > 0);
1060 
1061   // if the member is joining or not in recovery, no need to update the process
1062   if (!is_joining && member_status == Group_member_info::MEMBER_IN_RECOVERY) {
1063     /*
1064      This method has 2 purposes:
1065      If a donor leaves, recovery needs to switch donor
1066      If this member leaves, recovery needs to shutdown.
1067     */
1068     recovery_module->update_recovery_process(members_left, is_leaving);
1069   }
1070 
1071   if (members_left) {
1072     update_member_status(
1073         new_view.get_leaving_members(), Group_member_info::MEMBER_OFFLINE,
1074         Group_member_info::MEMBER_END, Group_member_info::MEMBER_ERROR);
1075 
1076     if (!is_leaving) {
1077       Leaving_members_action_packet *leaving_members_action =
1078           new Leaving_members_action_packet(new_view.get_leaving_members());
1079       this->applier_module->add_leaving_members_action_packet(
1080           leaving_members_action);
1081     }
1082   }
1083 
1084   if (is_leaving) {
1085     gcs_module->notify_of_view_change_end();
1086   }
1087 }
1088 
is_member_on_vector(const vector<Gcs_member_identifier> & members,const Gcs_member_identifier & member_id) const1089 bool Plugin_gcs_events_handler::is_member_on_vector(
1090     const vector<Gcs_member_identifier> &members,
1091     const Gcs_member_identifier &member_id) const {
1092   vector<Gcs_member_identifier>::const_iterator it;
1093 
1094   it = std::find(members.begin(), members.end(), member_id);
1095 
1096   return it != members.end();
1097 }
1098 
process_local_exchanged_data(const Exchanged_data & exchanged_data,bool is_joining) const1099 int Plugin_gcs_events_handler::process_local_exchanged_data(
1100     const Exchanged_data &exchanged_data, bool is_joining) const {
1101   uint local_uuid_found = 0;
1102 
1103   /*
1104   For now, we are only carrying Group Member Info on Exchangeable data
1105   Since we are receiving the state from all Group members, one shall
1106   store it in a set to ensure that we don't have repetitions.
1107 
1108   All collected data will be given to Group Member Manager at view install
1109   time.
1110   */
1111   for (Exchanged_data::const_iterator exchanged_data_it =
1112            exchanged_data.begin();
1113        exchanged_data_it != exchanged_data.end(); exchanged_data_it++) {
1114     const uchar *data = exchanged_data_it->second->get_payload();
1115     size_t length = exchanged_data_it->second->get_payload_length();
1116     Gcs_member_identifier *member_id = exchanged_data_it->first;
1117     if (data == nullptr) {
1118       /* purecov: begin inspected */
1119       Group_member_info *member_info =
1120           group_member_mgr->get_group_member_info_by_member_id(*member_id);
1121       if (member_info != nullptr) {
1122         LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_DATA_NOT_PROVIDED_BY_MEM,
1123                      member_info->get_hostname().c_str(),
1124                      member_info->get_port());
1125       }
1126       continue;
1127       /* purecov: end */
1128     }
1129 
1130     // Process data provided by member.
1131     vector<Group_member_info *> *member_infos =
1132         group_member_mgr->decode(data, length);
1133 
1134     // This construct is here in order to deallocate memory of duplicates
1135     vector<Group_member_info *>::iterator member_infos_it;
1136     for (member_infos_it = member_infos->begin();
1137          member_infos_it != member_infos->end(); member_infos_it++) {
1138       if (local_member_info->get_uuid() == (*member_infos_it)->get_uuid()) {
1139         local_uuid_found++;
1140       }
1141 
1142       /*
1143         Accept only the information the member has about himself
1144         Information received about other members is probably outdated
1145       */
1146       if (local_uuid_found < 2 &&
1147           (*member_infos_it)->get_gcs_member_id() == *member_id) {
1148         this->temporary_states->insert((*member_infos_it));
1149       } else {
1150         delete (*member_infos_it); /* purecov: inspected */
1151       }
1152     }
1153 
1154     member_infos->clear();
1155     delete member_infos;
1156 
1157     if (local_uuid_found > 1) {
1158       if (is_joining) {
1159         LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_ALREADY_EXISTS,
1160                      local_member_info->get_uuid().c_str());
1161       }
1162 
1163       // Clean up temporary states.
1164       std::set<Group_member_info *,
1165                Group_member_info_pointer_comparator>::iterator
1166           temporary_states_it;
1167       for (temporary_states_it = temporary_states->begin();
1168            temporary_states_it != temporary_states->end();
1169            temporary_states_it++) {
1170         delete (*temporary_states_it);
1171       }
1172       temporary_states->clear();
1173 
1174       return 1;
1175     }
1176   }
1177 
1178   return 0;
1179 }
1180 
get_exchangeable_data() const1181 Gcs_message_data *Plugin_gcs_events_handler::get_exchangeable_data() const {
1182   std::string server_executed_gtids;
1183   std::string server_purged_gtids;
1184   std::string applier_retrieved_gtids;
1185   Replication_thread_api applier_channel("group_replication_applier");
1186 
1187   Sql_service_command_interface *sql_command_interface =
1188       new Sql_service_command_interface();
1189 
1190   if (sql_command_interface->establish_session_connection(
1191           PSESSION_DEDICATED_THREAD, GROUPREPL_USER, get_plugin_pointer())) {
1192     /* purecov: begin inspected */
1193     LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_GRP_CHANGE_INFO_EXTRACT_ERROR);
1194     goto sending;
1195     /* purecov: end */
1196   }
1197 
1198   if (sql_command_interface->get_server_gtid_executed(server_executed_gtids)) {
1199     /* purecov: begin inspected */
1200     LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_GTID_EXECUTED_EXTRACT_ERROR);
1201     goto sending;
1202     /* purecov: inspected */
1203   }
1204   if (sql_command_interface->get_server_gtid_purged(server_purged_gtids)) {
1205     /* purecov: begin inspected */
1206     LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_GTID_PURGED_EXTRACT_ERROR);
1207     goto sending;
1208     /* purecov: end */
1209   }
1210   if (applier_channel.get_retrieved_gtid_set(applier_retrieved_gtids)) {
1211     LogPluginErr(WARNING_LEVEL,
1212                  ER_GRP_RPL_GTID_SET_EXTRACT_ERROR); /* purecov: inspected */
1213   }
1214 
1215   group_member_mgr->update_gtid_sets(local_member_info->get_uuid(),
1216                                      server_executed_gtids, server_purged_gtids,
1217                                      applier_retrieved_gtids);
1218 sending:
1219 
1220   delete sql_command_interface;
1221 
1222   std::vector<uchar> data;
1223 
1224   /*
1225     When a member is auto-rejoining, it starts in the ERROR state. Normally
1226     the member would only change to the RECOVERY state when it was OFFLINE,
1227     but we want the member to be in ERROR state when an auto-rejoin occurs,
1228     so we force the change from ERROR to RECOVERY when the member is
1229     undergoing an auto-rejoin procedure.
1230     We do that change on the data exchange just before the view install, so
1231     that when all members do receive the view on which this member joins
1232     all do see the correct RECOVERY state.
1233   */
1234   if (autorejoin_module->is_autorejoin_ongoing() &&
1235       !get_error_state_due_to_error_during_autorejoin()) {
1236     group_member_mgr->update_member_status(
1237         local_member_info->get_uuid(), Group_member_info::MEMBER_IN_RECOVERY,
1238         m_notification_ctx);
1239   }
1240 
1241   // alert joiners that an action or election is running
1242   local_member_info->set_is_group_action_running(
1243       group_action_coordinator->is_group_action_running());
1244   local_member_info->set_is_primary_election_running(
1245       primary_election_handler->is_an_election_running());
1246   Group_member_info *local_member_copy =
1247       new Group_member_info(*local_member_info);
1248   Group_member_info_manager_message *group_info_message =
1249       new Group_member_info_manager_message(local_member_copy);
1250   group_info_message->encode(&data);
1251   delete group_info_message;
1252 
1253   Gcs_message_data *msg_data = new Gcs_message_data(0, data.size());
1254   msg_data->append_to_payload(&data.front(), data.size());
1255 
1256   return msg_data;
1257 }
1258 
update_member_status(const vector<Gcs_member_identifier> & members,Group_member_info::Group_member_status status,Group_member_info::Group_member_status old_status_equal_to,Group_member_info::Group_member_status old_status_different_from) const1259 void Plugin_gcs_events_handler::update_member_status(
1260     const vector<Gcs_member_identifier> &members,
1261     Group_member_info::Group_member_status status,
1262     Group_member_info::Group_member_status old_status_equal_to,
1263     Group_member_info::Group_member_status old_status_different_from) const {
1264   for (vector<Gcs_member_identifier>::const_iterator it = members.begin();
1265        it != members.end(); ++it) {
1266     Gcs_member_identifier member = *it;
1267     Group_member_info *member_info =
1268         group_member_mgr->get_group_member_info_by_member_id(member);
1269 
1270     if (member_info == nullptr) {
1271       // Trying to update a non-existing member
1272       continue;
1273     }
1274 
1275     // if  (the old_status_equal_to is not defined or
1276     //      the previous status is equal to old_status_equal_to)
1277     //    and
1278     //     (the old_status_different_from is not defined or
1279     //      the previous status is different from old_status_different_from)
1280     if ((old_status_equal_to == Group_member_info::MEMBER_END ||
1281          member_info->get_recovery_status() == old_status_equal_to) &&
1282         (old_status_different_from == Group_member_info::MEMBER_END ||
1283          member_info->get_recovery_status() != old_status_different_from)) {
1284       /*
1285         The notification will be handled on the top level handle
1286         function that calls this one down the stack.
1287       */
1288       group_member_mgr->update_member_status(member_info->get_uuid(), status,
1289                                              m_notification_ctx);
1290     }
1291   }
1292 }
1293 
1294 /**
1295   Here we check:
1296   1) If the number of members was exceeded
1297   2) If member version is compatible with the group
1298   3) If the gtid_assignment_block_size is equal to the group
1299   4) If the hash algorithm used is equal to the group
1300   5) If the member has more known transactions than the group
1301   6) If the member has the same configuration flags that the group has
1302 */
check_group_compatibility(size_t number_of_members) const1303 int Plugin_gcs_events_handler::check_group_compatibility(
1304     size_t number_of_members) const {
1305 /*
1306   Check if group size did reach the maximum number of members.
1307 */
1308 #ifndef DBUG_OFF
1309   if (set_number_of_members_on_view_changed_to_10) number_of_members = 10;
1310 #endif
1311   if (number_of_members > 9) {
1312     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_START_FAILED);
1313     return GROUP_REPLICATION_MAX_GROUP_SIZE;
1314   }
1315 
1316   /*
1317     Check if the member is compatible with the group.
1318     It can be incompatible because its major version is lower or a rule says it.
1319     If incompatible notify whoever is waiting for the view with an error, so
1320     the plugin exits the group.
1321   */
1322   *joiner_compatibility_status = COMPATIBLE;
1323   int group_data_compatibility = 0;
1324   if (number_of_members > 1) {
1325     *joiner_compatibility_status = check_version_compatibility_with_group();
1326     group_data_compatibility = compare_member_transaction_sets();
1327   }
1328 
1329   if (*joiner_compatibility_status == INCOMPATIBLE) {
1330     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_VER_INCOMPATIBLE);
1331     return GROUP_REPLICATION_CONFIGURATION_ERROR;
1332   }
1333   if (*joiner_compatibility_status == READ_COMPATIBLE) {
1334     LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEMBER_VER_READ_COMPATIBLE);
1335   }
1336 
1337   /*
1338     All group members must have the same gtid_assignment_block_size
1339     and transaction-write-set-extraction value, if joiner has a
1340     different value it is not allowed to join.
1341   */
1342   if (number_of_members > 1 && compare_member_option_compatibility()) {
1343     return GROUP_REPLICATION_CONFIGURATION_ERROR;
1344   }
1345 
1346   /*
1347     Check that the joiner doesn't has more GTIDs than the rest of the group.
1348     All the executed and received transactions in the group are collected and
1349     merged into a GTID set and all joiner transactions must be contained in it.
1350   */
1351   if (group_data_compatibility) {
1352     if (group_data_compatibility > 0) {
1353       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_TRANS_NOT_PRESENT_IN_GRP);
1354       return GROUP_REPLICATION_CONFIGURATION_ERROR;
1355     } else  // error
1356     {
1357       /* purecov: begin inspected */
1358       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_TRANS_GREATER_THAN_GRP);
1359       return GROUP_REPLICATION_CONFIGURATION_ERROR;
1360       /* purecov: end */
1361     }
1362   }
1363 
1364   if (is_group_running_a_configuration_change()) {
1365     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_JOIN_WHEN_GROUP_ACTION_RUNNING);
1366     return GROUP_REPLICATION_CONFIGURATION_ERROR;
1367   }
1368 
1369   return 0;
1370 }
1371 
1372 Compatibility_type
check_version_compatibility_with_group() const1373 Plugin_gcs_events_handler::check_version_compatibility_with_group() const {
1374   bool override_lower_incompatibility = false;
1375   Compatibility_type compatibility_type = COMPATIBLE;
1376   bool read_compatible = false;
1377 
1378   std::vector<Group_member_info *> *all_members =
1379       group_member_mgr->get_all_members();
1380   std::vector<Group_member_info *>::iterator all_members_it;
1381 
1382   Member_version lowest_version(0xFFFFFF);
1383   std::set<Member_version> unique_version_set;
1384   /* Find lowest member version and unique versions of the group for
1385    * comparison. */
1386   for (all_members_it = all_members->begin();
1387        all_members_it != all_members->end(); all_members_it++) {
1388     /* Skip self */
1389     if ((*all_members_it)->get_uuid() != local_member_info->get_uuid()) {
1390       if ((*all_members_it)->get_member_version() < lowest_version)
1391         lowest_version = (*all_members_it)->get_member_version();
1392       unique_version_set.insert((*all_members_it)->get_member_version());
1393     }
1394   }
1395   for (auto it = unique_version_set.begin();
1396        it != unique_version_set.end() && compatibility_type != INCOMPATIBLE;
1397        ++it) {
1398     Member_version ver(*it);
1399     compatibility_type = compatibility_manager->check_local_incompatibility(
1400         ver, (ver == lowest_version));
1401 
1402     if (compatibility_type == READ_COMPATIBLE) {
1403       read_compatible = true;
1404     }
1405 
1406     if (compatibility_type == INCOMPATIBLE_LOWER_VERSION) {
1407       if (get_allow_local_lower_version_join()) {
1408         /*
1409           Despite between these two members the compatibility type
1410           is INCOMPATIBLE_LOWER_VERSION, when compared with others
1411           group members this server may be INCOMPATIBLE, so we need
1412           to test with all group members.
1413         */
1414         override_lower_incompatibility = true;
1415         compatibility_type = COMPATIBLE;
1416       } else {
1417         compatibility_type = INCOMPATIBLE;
1418       }
1419     }
1420   }
1421 
1422   if (compatibility_type != INCOMPATIBLE && override_lower_incompatibility) {
1423     LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_MEMBER_VERSION_LOWER_THAN_GRP);
1424   }
1425 
1426   if (read_compatible && compatibility_type != INCOMPATIBLE) {
1427     compatibility_type = READ_COMPATIBLE;
1428   }
1429 
1430   // clean the members
1431   for (all_members_it = all_members->begin();
1432        all_members_it != all_members->end(); all_members_it++) {
1433     delete (*all_members_it);
1434   }
1435   delete all_members;
1436 
1437   return compatibility_type;
1438 }
1439 
compare_member_transaction_sets() const1440 int Plugin_gcs_events_handler::compare_member_transaction_sets() const {
1441   int result = 0;
1442 
1443   Sid_map local_sid_map(nullptr);
1444   Sid_map group_sid_map(nullptr);
1445   Gtid_set local_member_set(&local_sid_map, nullptr);
1446   Gtid_set group_set(&group_sid_map, nullptr);
1447 
1448   std::vector<Group_member_info *> *all_members =
1449       group_member_mgr->get_all_members();
1450   std::vector<Group_member_info *>::iterator all_members_it;
1451   for (all_members_it = all_members->begin();
1452        all_members_it != all_members->end(); all_members_it++) {
1453     std::string member_exec_set_str = (*all_members_it)->get_gtid_executed();
1454     std::string applier_ret_set_str = (*all_members_it)->get_gtid_retrieved();
1455     if ((*all_members_it)->get_gcs_member_id() ==
1456         local_member_info->get_gcs_member_id()) {
1457       if (local_member_set.add_gtid_text(member_exec_set_str.c_str()) !=
1458               RETURN_STATUS_OK ||
1459           local_member_set.add_gtid_text(applier_ret_set_str.c_str()) !=
1460               RETURN_STATUS_OK) {
1461         /* purecov: begin inspected */
1462         LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_LOCAL_GTID_SETS_PROCESS_ERROR);
1463         result = -1;
1464         goto cleaning;
1465         /* purecov: end */
1466       }
1467     } else {
1468       if (group_set.add_gtid_text(member_exec_set_str.c_str()) !=
1469               RETURN_STATUS_OK ||
1470           group_set.add_gtid_text(applier_ret_set_str.c_str()) !=
1471               RETURN_STATUS_OK) {
1472         /* purecov: begin inspected */
1473         LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_LOCAL_GTID_SETS_PROCESS_ERROR);
1474         result = -1;
1475         goto cleaning;
1476         /* purecov: end */
1477       }
1478     }
1479   }
1480 
1481   /*
1482     Here we only error out if the joiner set is bigger, i.e, if they are equal
1483     no error is returned.
1484     One could argue that if a joiner has the same transaction set as the group
1485     then something is wrong as the group also has transaction associated to
1486     previous view changes.
1487     To reject this cases cause however false negatives when members leave and
1488     quickly rejoin the group or when groups are started by add several nodes at
1489     once.
1490   */
1491   if (!local_member_set.is_subset(&group_set)) {
1492     char *local_gtid_set_buf;
1493     local_member_set.to_string(&local_gtid_set_buf);
1494     char *group_gtid_set_buf;
1495     group_set.to_string(&group_gtid_set_buf);
1496     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_TRANS_GREATER_THAN_GRP,
1497                  local_gtid_set_buf, group_gtid_set_buf);
1498     my_free(local_gtid_set_buf);
1499     my_free(group_gtid_set_buf);
1500     result = 1;
1501   }
1502 
1503 cleaning:
1504 
1505   // clean the members
1506   for (all_members_it = all_members->begin();
1507        all_members_it != all_members->end(); all_members_it++) {
1508     delete (*all_members_it);
1509   }
1510   delete all_members;
1511 
1512   return result;
1513 }
1514 
collect_members_executed_sets(View_change_packet * view_packet) const1515 void Plugin_gcs_events_handler::collect_members_executed_sets(
1516     View_change_packet *view_packet) const {
1517   std::vector<Group_member_info *> *all_members =
1518       group_member_mgr->get_all_members();
1519   std::vector<Group_member_info *>::iterator all_members_it;
1520   for (all_members_it = all_members->begin();
1521        all_members_it != all_members->end(); all_members_it++) {
1522     // Joining/Recovering members don't have valid GTID executed information
1523     if ((*all_members_it)->get_recovery_status() ==
1524         Group_member_info::MEMBER_IN_RECOVERY) {
1525       continue;
1526     }
1527 
1528     std::string exec_set_str = (*all_members_it)->get_gtid_executed();
1529     view_packet->group_executed_set.push_back(exec_set_str);
1530   }
1531 
1532   // clean the members
1533   for (all_members_it = all_members->begin();
1534        all_members_it != all_members->end(); all_members_it++) {
1535     delete (*all_members_it);
1536   }
1537   delete all_members;
1538 }
1539 
compare_member_option_compatibility() const1540 int Plugin_gcs_events_handler::compare_member_option_compatibility() const {
1541   int result = 0;
1542 
1543   std::vector<Group_member_info *> *all_members =
1544       group_member_mgr->get_all_members();
1545   std::vector<Group_member_info *>::iterator all_members_it;
1546   for (all_members_it = all_members->begin();
1547        all_members_it != all_members->end(); all_members_it++) {
1548     if (local_member_info->get_gtid_assignment_block_size() !=
1549         (*all_members_it)->get_gtid_assignment_block_size()) {
1550       result = 1;
1551       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_BLOCK_SIZE_DIFF_FROM_GRP,
1552                    local_member_info->get_gtid_assignment_block_size(),
1553                    (*all_members_it)->get_gtid_assignment_block_size());
1554       goto cleaning;
1555     }
1556 
1557     if (local_member_info->get_write_set_extraction_algorithm() !=
1558         (*all_members_it)->get_write_set_extraction_algorithm()) {
1559       result = 1;
1560       LogPluginErr(
1561           ERROR_LEVEL, ER_GRP_RPL_TRANS_WRITE_SET_EXTRACT_DIFF_FROM_GRP,
1562           get_write_set_algorithm_string(
1563               local_member_info->get_write_set_extraction_algorithm()),
1564           get_write_set_algorithm_string(
1565               (*all_members_it)->get_write_set_extraction_algorithm()));
1566       goto cleaning;
1567     }
1568 
1569     if (local_member_info->get_configuration_flags() !=
1570         (*all_members_it)->get_configuration_flags()) {
1571       const uint32 member_configuration_flags =
1572           (*all_members_it)->get_configuration_flags();
1573       const uint32 local_configuration_flags =
1574           local_member_info->get_configuration_flags();
1575 
1576       result = 1;
1577       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_CFG_INCOMPATIBLE_WITH_GRP_CFG,
1578                    Group_member_info::get_configuration_flags_string(
1579                        local_configuration_flags)
1580                        .c_str(),
1581                    Group_member_info::get_configuration_flags_string(
1582                        member_configuration_flags)
1583                        .c_str());
1584       goto cleaning;
1585     }
1586 
1587     if ((*all_members_it)->get_lower_case_table_names() !=
1588             DEFAULT_NOT_RECEIVED_LOWER_CASE_TABLE_NAMES &&
1589         local_member_info->get_lower_case_table_names() !=
1590             (*all_members_it)->get_lower_case_table_names()) {
1591       result = 1;
1592       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_LOWER_CASE_TABLE_NAMES_DIFF_FROM_GRP,
1593                    local_member_info->get_lower_case_table_names(),
1594                    (*all_members_it)->get_lower_case_table_names());
1595       goto cleaning;
1596     }
1597 
1598     if (local_member_info->get_default_table_encryption() !=
1599         (*all_members_it)->get_default_table_encryption()) {
1600       result = 1;
1601       LogPluginErr(ERROR_LEVEL,
1602                    ER_GRP_RPL_DEFAULT_TABLE_ENCRYPTION_DIFF_FROM_GRP,
1603                    local_member_info->get_default_table_encryption(),
1604                    (*all_members_it)->get_default_table_encryption());
1605       goto cleaning;
1606     }
1607   }
1608 
1609 cleaning:
1610   for (all_members_it = all_members->begin();
1611        all_members_it != all_members->end(); all_members_it++)
1612     delete (*all_members_it);
1613   delete all_members;
1614 
1615   return result;
1616 }
1617 
is_group_running_a_configuration_change() const1618 bool Plugin_gcs_events_handler::is_group_running_a_configuration_change()
1619     const {
1620   bool is_action_running = false;
1621   std::vector<Group_member_info *> *all_members =
1622       group_member_mgr->get_all_members();
1623   for (Group_member_info *member_info : *all_members) {
1624     if (member_info->is_group_action_running()) {
1625       is_action_running = true;
1626       break;
1627     }
1628   }
1629   for (Group_member_info *member_info : *all_members) delete member_info;
1630   delete all_members;
1631 
1632   return is_action_running;
1633 }
1634 
is_group_running_a_primary_election() const1635 bool Plugin_gcs_events_handler::is_group_running_a_primary_election() const {
1636   bool is_election_running = false;
1637   std::vector<Group_member_info *> *all_members =
1638       group_member_mgr->get_all_members();
1639   for (Group_member_info *member_info : *all_members) {
1640     if (member_info->is_primary_election_running()) {
1641       is_election_running = true;
1642       break;
1643     }
1644   }
1645   for (Group_member_info *member_info : *all_members) delete member_info;
1646   delete all_members;
1647 
1648   return is_election_running;
1649 }
1650 
disable_read_mode_for_compatible_members(bool force_check) const1651 void Plugin_gcs_events_handler::disable_read_mode_for_compatible_members(
1652     bool force_check) const {
1653   Member_version lowest_version =
1654       group_member_mgr->get_group_lowest_online_version();
1655   /* We need to lock the operations of group_member_mgr so that member does not
1656    * changes it state to ERROR and enables read only mode after we check its
1657    * state here. If we read old ONLINE value and continue to disable read mode,
1658    * member will continue to be writable even in ERROR state. So lock protects
1659    * from this situation. */
1660   MUTEX_LOCK(lock, group_member_mgr->get_update_lock());
1661   if (local_member_info->get_recovery_status() ==
1662           Group_member_info::MEMBER_ONLINE &&
1663       (force_check || *joiner_compatibility_status != COMPATIBLE)) {
1664     *joiner_compatibility_status =
1665         Compatibility_module::check_version_incompatibility(
1666             local_member_info->get_member_version(), lowest_version);
1667     /* Some lower version left the group, now this member is new lowest
1668      * version. */
1669     if ((!local_member_info->in_primary_mode() &&
1670          *joiner_compatibility_status == COMPATIBLE) ||
1671         (local_member_info->in_primary_mode() &&
1672          local_member_info->get_role() ==
1673              Group_member_info::MEMBER_ROLE_PRIMARY)) {
1674       if (disable_server_read_mode(PSESSION_DEDICATED_THREAD)) {
1675         LogPluginErr(WARNING_LEVEL,
1676                      ER_GRP_RPL_DISABLE_SRV_READ_MODE_RESTRICTED);
1677       }
1678     }
1679   }
1680 }
1681