1 /* Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <stddef.h>
24 #include <algorithm>
25 #include <list>
26 #include <set>
27 #include <sstream>
28 #include <string>
29 #include <vector>
30
31 #include <mysql/components/services/log_builtins.h>
32 #include "mutex_lock.h"
33 #include "my_dbug.h"
34 #include "plugin/group_replication/include/autorejoin.h"
35 #include "plugin/group_replication/include/gcs_event_handlers.h"
36 #include "plugin/group_replication/include/leave_group_on_failure.h"
37 #include "plugin/group_replication/include/observer_trans.h"
38 #include "plugin/group_replication/include/pipeline_stats.h"
39 #include "plugin/group_replication/include/plugin.h"
40 #include "plugin/group_replication/include/plugin_handlers/primary_election_invocation_handler.h"
41 #include "plugin/group_replication/include/plugin_handlers/remote_clone_handler.h"
42 #include "plugin/group_replication/include/plugin_messages/group_action_message.h"
43 #include "plugin/group_replication/include/plugin_messages/group_service_message.h"
44 #include "plugin/group_replication/include/plugin_messages/group_validation_message.h"
45 #include "plugin/group_replication/include/plugin_messages/sync_before_execution_message.h"
46 #include "plugin/group_replication/include/plugin_messages/transaction_prepared_message.h"
47 #include "plugin/group_replication/include/plugin_messages/transaction_with_guarantee_message.h"
48
49 using std::vector;
50
Plugin_gcs_events_handler(Applier_module_interface * applier_module,Recovery_module * recovery_module,Compatibility_module * compatibility_module,ulong components_stop_timeout)51 Plugin_gcs_events_handler::Plugin_gcs_events_handler(
52 Applier_module_interface *applier_module, Recovery_module *recovery_module,
53 Compatibility_module *compatibility_module, ulong components_stop_timeout)
54 : applier_module(applier_module),
55 recovery_module(recovery_module),
56 compatibility_manager(compatibility_module),
57 stop_wait_timeout(components_stop_timeout) {
58 this->temporary_states =
59 new std::set<Group_member_info *, Group_member_info_pointer_comparator>();
60 this->joiner_compatibility_status = new st_compatibility_types(INCOMPATIBLE);
61
62 #ifndef DBUG_OFF
63 set_number_of_members_on_view_changed_to_10 = false;
64 DBUG_EXECUTE_IF(
65 "group_replication_set_number_of_members_on_view_changed_to_10",
66 { set_number_of_members_on_view_changed_to_10 = true; };);
67 #endif
68 }
69
~Plugin_gcs_events_handler()70 Plugin_gcs_events_handler::~Plugin_gcs_events_handler() {
71 delete temporary_states;
72 delete joiner_compatibility_status;
73 }
74
on_message_received(const Gcs_message & message) const75 void Plugin_gcs_events_handler::on_message_received(
76 const Gcs_message &message) const {
77 Plugin_gcs_message::enum_cargo_type message_type =
78 Plugin_gcs_message::get_cargo_type(
79 message.get_message_data().get_payload());
80
81 const std::string message_origin = message.get_origin().get_member_id();
82 Plugin_gcs_message *processed_message = nullptr;
83
84 switch (message_type) {
85 case Plugin_gcs_message::CT_TRANSACTION_MESSAGE:
86 handle_transactional_message(message);
87 break;
88
89 case Plugin_gcs_message::CT_TRANSACTION_WITH_GUARANTEE_MESSAGE:
90 handle_transactional_with_guarantee_message(message);
91 break;
92
93 case Plugin_gcs_message::CT_TRANSACTION_PREPARED_MESSAGE:
94 handle_transaction_prepared_message(message);
95 break;
96
97 case Plugin_gcs_message::CT_SYNC_BEFORE_EXECUTION_MESSAGE:
98 handle_sync_before_execution_message(message);
99 break;
100
101 case Plugin_gcs_message::CT_CERTIFICATION_MESSAGE:
102 handle_certifier_message(message);
103 break;
104
105 case Plugin_gcs_message::CT_PIPELINE_STATS_MEMBER_MESSAGE:
106 handle_stats_message(message);
107 break;
108
109 case Plugin_gcs_message::CT_MESSAGE_SERVICE_MESSAGE: {
110 Group_service_message *service_message = new Group_service_message(
111 message.get_message_data().get_payload(),
112 message.get_message_data().get_payload_length());
113
114 message_service_handler->add(service_message);
115 } break;
116
117 /**
118 From this point messages are sent to message listeners and may be
119 skipped Messages above are directly processed and/or for performance we
120 do not want to add that extra weight.
121 */
122
123 case Plugin_gcs_message::CT_RECOVERY_MESSAGE:
124 processed_message =
125 new Recovery_message(message.get_message_data().get_payload(),
126 message.get_message_data().get_payload_length());
127 if (!pre_process_message(processed_message, message_origin))
128 handle_recovery_message(processed_message);
129 delete processed_message;
130 break;
131
132 case Plugin_gcs_message::CT_SINGLE_PRIMARY_MESSAGE:
133 processed_message = new Single_primary_message(
134 message.get_message_data().get_payload(),
135 message.get_message_data().get_payload_length());
136 if (!pre_process_message(processed_message, message_origin))
137 handle_single_primary_message(processed_message);
138 delete processed_message;
139 break;
140
141 case Plugin_gcs_message::CT_GROUP_ACTION_MESSAGE:
142 handle_group_action_message(message);
143 break;
144
145 case Plugin_gcs_message::CT_GROUP_VALIDATION_MESSAGE:
146 processed_message = new Group_validation_message(
147 message.get_message_data().get_payload(),
148 message.get_message_data().get_payload_length());
149 pre_process_message(processed_message, message_origin);
150 delete processed_message;
151 break;
152 default:
153 break; /* purecov: inspected */
154 }
155
156 /*
157 We need to see if a notification should be sent at this
158 point in time because we may have received a recovery
159 message that has updated our state.
160 */
161 notify_and_reset_ctx(m_notification_ctx);
162 }
163
pre_process_message(Plugin_gcs_message * plugin_message,const std::string & message_origin) const164 bool Plugin_gcs_events_handler::pre_process_message(
165 Plugin_gcs_message *plugin_message,
166 const std::string &message_origin) const {
167 bool skip_message = false;
168 int error = group_events_observation_manager->before_message_handling(
169 *plugin_message, message_origin, &skip_message);
170 return (error || skip_message);
171 }
172
handle_transactional_message(const Gcs_message & message) const173 void Plugin_gcs_events_handler::handle_transactional_message(
174 const Gcs_message &message) const {
175 const Group_member_info::Group_member_status member_status =
176 local_member_info->get_recovery_status();
177 if ((member_status == Group_member_info::MEMBER_IN_RECOVERY ||
178 member_status == Group_member_info::MEMBER_ONLINE) &&
179 this->applier_module) {
180 if (member_status == Group_member_info::MEMBER_IN_RECOVERY) {
181 applier_module->get_pipeline_stats_member_collector()
182 ->increment_transactions_delivered_during_recovery();
183 }
184
185 const unsigned char *payload_data = nullptr;
186 size_t payload_size = 0;
187 Plugin_gcs_message::get_first_payload_item_raw_data(
188 message.get_message_data().get_payload(), &payload_data, &payload_size);
189
190 this->applier_module->handle(payload_data, static_cast<ulong>(payload_size),
191 GROUP_REPLICATION_CONSISTENCY_EVENTUAL,
192 nullptr);
193 } else {
194 /* purecov: begin inspected */
195 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MSG_DISCARDED);
196 /* purecov: end */
197 }
198 }
199
handle_transactional_with_guarantee_message(const Gcs_message & message) const200 void Plugin_gcs_events_handler::handle_transactional_with_guarantee_message(
201 const Gcs_message &message) const {
202 const Group_member_info::Group_member_status member_status =
203 local_member_info->get_recovery_status();
204 if ((member_status == Group_member_info::MEMBER_IN_RECOVERY ||
205 member_status == Group_member_info::MEMBER_ONLINE) &&
206 this->applier_module) {
207 if (member_status == Group_member_info::MEMBER_IN_RECOVERY) {
208 applier_module->get_pipeline_stats_member_collector()
209 ->increment_transactions_delivered_during_recovery();
210 }
211
212 const unsigned char *payload_data = nullptr;
213 size_t payload_size = 0;
214 Plugin_gcs_message::get_first_payload_item_raw_data(
215 message.get_message_data().get_payload(), &payload_data, &payload_size);
216
217 enum_group_replication_consistency_level consistency_level =
218 Transaction_with_guarantee_message::decode_and_get_consistency_level(
219 message.get_message_data().get_payload(),
220 message.get_message_data().get_payload_length());
221
222 // Get ONLINE members that did receive this message.
223 std::list<Gcs_member_identifier> *online_members =
224 group_member_mgr->get_online_members_with_guarantees(
225 message.get_origin());
226
227 this->applier_module->handle(payload_data, static_cast<ulong>(payload_size),
228 consistency_level, online_members);
229 } else {
230 /* purecov: begin inspected */
231 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MSG_DISCARDED);
232 /* purecov: end */
233 }
234 }
235
handle_transaction_prepared_message(const Gcs_message & message) const236 void Plugin_gcs_events_handler::handle_transaction_prepared_message(
237 const Gcs_message &message) const {
238 if (this->applier_module == nullptr) {
239 /* purecov: begin inspected */
240 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MISSING_GRP_RPL_APPLIER);
241 return;
242 /* purecov: end */
243 }
244
245 Transaction_prepared_message transaction_prepared_message(
246 message.get_message_data().get_payload(),
247 message.get_message_data().get_payload_length());
248
249 Transaction_prepared_action_packet *transaction_prepared_action =
250 new Transaction_prepared_action_packet(
251 transaction_prepared_message.get_sid(),
252 transaction_prepared_message.get_gno(), message.get_origin());
253 this->applier_module->add_transaction_prepared_action_packet(
254 transaction_prepared_action);
255 }
256
handle_sync_before_execution_message(const Gcs_message & message) const257 void Plugin_gcs_events_handler::handle_sync_before_execution_message(
258 const Gcs_message &message) const {
259 if (this->applier_module == nullptr) {
260 /* purecov: begin inspected */
261 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MISSING_GRP_RPL_APPLIER);
262 return;
263 /* purecov: end */
264 }
265
266 Sync_before_execution_message sync_before_execution_message(
267 message.get_message_data().get_payload(),
268 message.get_message_data().get_payload_length());
269
270 Sync_before_execution_action_packet *sync_before_execution_action =
271 new Sync_before_execution_action_packet(
272 sync_before_execution_message.get_thread_id(), message.get_origin());
273 this->applier_module->add_sync_before_execution_action_packet(
274 sync_before_execution_action);
275 }
276
handle_certifier_message(const Gcs_message & message) const277 void Plugin_gcs_events_handler::handle_certifier_message(
278 const Gcs_message &message) const {
279 if (this->applier_module == nullptr) {
280 LogPluginErr(ERROR_LEVEL,
281 ER_GRP_RPL_MISSING_GRP_RPL_APPLIER); /* purecov: inspected */
282 return; /* purecov: inspected */
283 }
284
285 Certifier_interface *certifier =
286 this->applier_module->get_certification_handler()->get_certifier();
287
288 const unsigned char *payload_data = nullptr;
289 size_t payload_size = 0;
290 Plugin_gcs_message::get_first_payload_item_raw_data(
291 message.get_message_data().get_payload(), &payload_data, &payload_size);
292
293 if (certifier->handle_certifier_data(payload_data,
294 static_cast<ulong>(payload_size),
295 message.get_origin())) {
296 LogPluginErr(
297 ERROR_LEVEL,
298 ER_GRP_RPL_CERTIFIER_MSSG_PROCESS_ERROR); /* purecov: inspected */
299 }
300 }
301
handle_recovery_message(Plugin_gcs_message * processed_message) const302 void Plugin_gcs_events_handler::handle_recovery_message(
303 Plugin_gcs_message *processed_message) const {
304 Recovery_message *recovery_message = (Recovery_message *)processed_message;
305
306 std::string member_uuid = recovery_message->get_member_uuid();
307
308 bool is_local = !member_uuid.compare(local_member_info->get_uuid());
309 if (is_local) {
310 // Only change member status if member is still on recovery.
311 Group_member_info::Group_member_status member_status =
312 local_member_info->get_recovery_status();
313 if (member_status != Group_member_info::MEMBER_IN_RECOVERY) {
314 LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_SRV_NOT_ONLINE,
315 Group_member_info::get_member_status_string(
316 member_status)); /* purecov: inspected */
317 return; /* purecov: inspected */
318 }
319
320 LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_SRV_ONLINE);
321
322 /*
323 The member is declared as online upon receiving this message
324
325 A notification may be flagged and eventually triggered when
326 the on_message handle is finished.
327 */
328 group_member_mgr->update_member_status(
329 member_uuid, Group_member_info::MEMBER_ONLINE, m_notification_ctx);
330
331 /*
332 unblock threads waiting for the member to become ONLINE
333 */
334 terminate_wait_on_start_process();
335
336 /**
337 Re-check compatibility, members may leave during recovery.
338 Disable the read mode in the server if the member is:
339 - joining
340 - doesn't have a higher possible incompatible version
341 - We are not on Primary mode.
342 */
343 disable_read_mode_for_compatible_members(true);
344 } else {
345 Group_member_info *member_info =
346 group_member_mgr->get_group_member_info(member_uuid);
347 if (member_info != nullptr) {
348 LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_MEM_ONLINE,
349 member_info->get_hostname().c_str(),
350 member_info->get_port());
351 delete member_info;
352
353 /*
354 The member is declared as online upon receiving this message
355 We need to run this before running update_recovery_process
356
357 A notification may be flagged and eventually triggered when
358 the on_message handle is finished.
359 */
360 group_member_mgr->update_member_status(
361 member_uuid, Group_member_info::MEMBER_ONLINE, m_notification_ctx);
362
363 if (local_member_info->get_recovery_status() ==
364 Group_member_info::MEMBER_IN_RECOVERY) {
365 /*
366 Inform recovery of a possible new donor
367 */
368 recovery_module->update_recovery_process(false, false);
369 }
370 }
371 }
372
373 /*
374 Check if we were waiting for some server to recover to
375 elect a new leader.
376
377 Following line protects against servers joining the group
378 while the bootstrapped node has not yet finished recovery.
379 Therefore, it is going to become primary when it finishes recovery.
380 */
381 std::string no_primary("");
382 this->handle_leader_election_if_needed(DEAD_OLD_PRIMARY, no_primary);
383 }
384
handle_stats_message(const Gcs_message & message) const385 void Plugin_gcs_events_handler::handle_stats_message(
386 const Gcs_message &message) const {
387 if (this->applier_module == nullptr) {
388 LogPluginErr(ERROR_LEVEL,
389 ER_GRP_RPL_MISSING_GRP_RPL_APPLIER); /* purecov: inspected */
390 return; /* purecov: inspected */
391 }
392
393 this->applier_module->get_flow_control_module()->handle_stats_data(
394 message.get_message_data().get_payload(),
395 message.get_message_data().get_payload_length(),
396 message.get_origin().get_member_id());
397 }
398
handle_single_primary_message(Plugin_gcs_message * processed_message) const399 void Plugin_gcs_events_handler::handle_single_primary_message(
400 Plugin_gcs_message *processed_message) const {
401 if (this->applier_module == nullptr) {
402 LogPluginErr(ERROR_LEVEL,
403 ER_GRP_RPL_MISSING_GRP_RPL_APPLIER); /* purecov: inspected */
404 return; /* purecov: inspected */
405 }
406
407 Single_primary_message *single_primary_message =
408 (Single_primary_message *)processed_message;
409
410 if (single_primary_message->get_single_primary_message_type() ==
411 Single_primary_message::SINGLE_PRIMARY_QUEUE_APPLIED_MESSAGE) {
412 Single_primary_action_packet *single_primary_action =
413 new Single_primary_action_packet(
414 Single_primary_action_packet::QUEUE_APPLIED);
415 primary_election_handler->set_election_running(false);
416 this->applier_module->add_single_primary_action_packet(
417 single_primary_action);
418 }
419 if (single_primary_message->get_single_primary_message_type() ==
420 Single_primary_message::SINGLE_PRIMARY_PRIMARY_ELECTION) {
421 primary_election_handler->handle_primary_election_message(
422 single_primary_message, &m_notification_ctx);
423 }
424 }
425
handle_group_action_message(const Gcs_message & message) const426 void Plugin_gcs_events_handler::handle_group_action_message(
427 const Gcs_message &message) const {
428 if (group_action_coordinator == nullptr) {
429 LogPluginErr(
430 ERROR_LEVEL,
431 ER_GRP_RPL_MISSING_GRP_RPL_ACTION_COORDINATOR); /* purecov: inspected */
432 return; /* purecov: inspected */
433 }
434
435 Group_action_message::enum_action_message_type action_message_type =
436 Group_action_message::get_action_type(
437 message.get_message_data().get_payload());
438
439 Group_action_message *group_action_message = nullptr;
440 switch (action_message_type) {
441 case Group_action_message::ACTION_MULTI_PRIMARY_MESSAGE:
442 case Group_action_message::ACTION_PRIMARY_ELECTION_MESSAGE:
443 case Group_action_message::ACTION_SET_COMMUNICATION_PROTOCOL_MESSAGE:
444 group_action_message = new Group_action_message(
445 message.get_message_data().get_payload(),
446 message.get_message_data().get_payload_length());
447 break;
448 default:
449 break; /* purecov: inspected */
450 }
451
452 if (!pre_process_message(group_action_message,
453 message.get_origin().get_member_id())) {
454 group_action_coordinator->handle_action_message(
455 group_action_message, message.get_origin().get_member_id());
456 }
457 delete group_action_message;
458 }
459
on_suspicions(const std::vector<Gcs_member_identifier> & members,const std::vector<Gcs_member_identifier> & unreachable) const460 void Plugin_gcs_events_handler::on_suspicions(
461 const std::vector<Gcs_member_identifier> &members,
462 const std::vector<Gcs_member_identifier> &unreachable) const {
463 if (members.empty() && unreachable.empty()) // nothing to do
464 return; /* purecov: inspected */
465
466 DBUG_ASSERT(members.size() >= unreachable.size());
467
468 std::vector<Gcs_member_identifier> tmp_unreachable(unreachable);
469 std::vector<Gcs_member_identifier>::const_iterator mit;
470 std::vector<Gcs_member_identifier>::iterator uit;
471
472 if (!members.empty()) {
473 for (mit = members.begin(); mit != members.end(); mit++) {
474 Gcs_member_identifier member = *mit;
475 Group_member_info *member_info =
476 group_member_mgr->get_group_member_info_by_member_id(member);
477
478 if (member_info == nullptr) // Trying to update a non-existing member
479 continue; /* purecov: inspected */
480
481 uit = std::find(tmp_unreachable.begin(), tmp_unreachable.end(), member);
482 if (uit != tmp_unreachable.end()) {
483 if (!member_info->is_unreachable()) {
484 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEM_UNREACHABLE,
485 member_info->get_hostname().c_str(),
486 member_info->get_port());
487 // flag as a member having changed state
488 m_notification_ctx.set_member_state_changed();
489 member_info->set_unreachable();
490 }
491 // remove to not check again against this one
492 tmp_unreachable.erase(uit);
493 } else {
494 if (member_info->is_unreachable()) {
495 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEM_REACHABLE,
496 member_info->get_hostname().c_str(),
497 member_info->get_port());
498 /* purecov: begin inspected */
499 // flag as a member having changed state
500 m_notification_ctx.set_member_state_changed();
501 member_info->set_reachable();
502 /* purecov: end */
503 }
504 }
505 }
506 }
507
508 if ((members.size() - unreachable.size()) <= (members.size() / 2)) {
509 if (!group_partition_handler->get_timeout_on_unreachable())
510 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_SRV_BLOCKED);
511 else
512 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_SRV_BLOCKED_FOR_SECS,
513 group_partition_handler->get_timeout_on_unreachable());
514
515 if (!group_partition_handler->is_partition_handler_running() &&
516 !group_partition_handler->is_partition_handling_terminated())
517 group_partition_handler->launch_partition_handler_thread();
518
519 // flag as having lost quorum
520 m_notification_ctx.set_quorum_lost();
521 } else {
522 /*
523 This code is present on on_view_changed and on_suspicions as no assumption
524 can be made about the order in which these methods are invoked.
525 */
526 if (group_partition_handler->is_member_on_partition()) {
527 if (group_partition_handler->abort_partition_handler_if_running()) {
528 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_CHANGE_GRP_MEM_NOT_PROCESSED);
529 } else {
530 /* If it was not running or we canceled it in time */
531 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEMBER_CONTACT_RESTORED);
532 }
533 }
534 }
535 notify_and_reset_ctx(m_notification_ctx);
536 }
537
log_members_leaving_message(const Gcs_view & new_view) const538 void Plugin_gcs_events_handler::log_members_leaving_message(
539 const Gcs_view &new_view) const {
540 std::string members_leaving;
541 std::string primary_member_host;
542
543 get_hosts_from_view(new_view.get_leaving_members(), members_leaving,
544 primary_member_host);
545
546 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEMBER_REMOVED,
547 members_leaving.c_str());
548
549 if (!primary_member_host.empty())
550 LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_PRIMARY_MEMBER_LEFT_GRP,
551 primary_member_host.c_str());
552 }
553
log_members_joining_message(const Gcs_view & new_view) const554 void Plugin_gcs_events_handler::log_members_joining_message(
555 const Gcs_view &new_view) const {
556 std::string members_joining;
557 std::string primary_member_host;
558
559 get_hosts_from_view(new_view.get_joined_members(), members_joining,
560 primary_member_host);
561
562 LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_MEMBER_ADDED,
563 members_joining.c_str());
564 }
565
get_hosts_from_view(const std::vector<Gcs_member_identifier> & members,std::string & all_hosts,std::string & primary_host) const566 void Plugin_gcs_events_handler::get_hosts_from_view(
567 const std::vector<Gcs_member_identifier> &members, std::string &all_hosts,
568 std::string &primary_host) const {
569 std::stringstream hosts_string;
570 std::stringstream primary_string;
571 std::vector<Gcs_member_identifier>::const_iterator all_members_it =
572 members.begin();
573
574 while (all_members_it != members.end()) {
575 Group_member_info *member_info =
576 group_member_mgr->get_group_member_info_by_member_id((*all_members_it));
577 all_members_it++;
578
579 if (member_info == nullptr) continue;
580
581 hosts_string << member_info->get_hostname() << ":"
582 << member_info->get_port();
583
584 /**
585 Check in_primary_mode has been added for safety.
586 Since primary role is in single-primary mode.
587 */
588 if (member_info->in_primary_mode() &&
589 member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY) {
590 if (primary_string.rdbuf()->in_avail() != 0) primary_string << ", ";
591 primary_string << member_info->get_hostname() << ":"
592 << member_info->get_port();
593 }
594
595 if (all_members_it != members.end()) {
596 hosts_string << ", ";
597 }
598 }
599 all_hosts.assign(hosts_string.str());
600 primary_host.assign(primary_string.str());
601 }
602
on_view_changed(const Gcs_view & new_view,const Exchanged_data & exchanged_data) const603 void Plugin_gcs_events_handler::on_view_changed(
604 const Gcs_view &new_view, const Exchanged_data &exchanged_data) const {
605 bool is_leaving = is_member_on_vector(new_view.get_leaving_members(),
606 local_member_info->get_gcs_member_id());
607
608 bool is_primary =
609 (local_member_info->in_primary_mode() &&
610 local_member_info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY);
611
612 bool is_joining = is_member_on_vector(new_view.get_joined_members(),
613 local_member_info->get_gcs_member_id());
614
615 bool skip_election = false;
616 enum_primary_election_mode election_mode = DEAD_OLD_PRIMARY;
617 std::string suggested_primary("");
618 // Was member expelled from the group due to network failures?
619 if (this->was_member_expelled_from_group(new_view)) {
620 DBUG_ASSERT(is_leaving);
621 group_events_observation_manager->after_view_change(
622 new_view.get_joined_members(), new_view.get_leaving_members(),
623 new_view.get_members(), is_leaving, &skip_election, &election_mode,
624 suggested_primary);
625 goto end;
626 }
627
628 // An early error on the applier can render the join invalid
629 if (is_joining &&
630 local_member_info->get_recovery_status() ==
631 Group_member_info::MEMBER_ERROR &&
632 !autorejoin_module->is_autorejoin_ongoing()) {
633 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_EXIT_PLUGIN_ERROR);
634 gcs_module->notify_of_view_change_cancellation(
635 GROUP_REPLICATION_CONFIGURATION_ERROR);
636 } else {
637 /*
638 This code is present on on_view_changed and on_suspicions as no assumption
639 can be made about the order in which these methods are invoked.
640 */
641 if (!is_leaving && group_partition_handler->is_member_on_partition()) {
642 if (group_partition_handler->abort_partition_handler_if_running()) {
643 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_CHANGE_GRP_MEM_NOT_PROCESSED);
644 goto end;
645 } else {
646 /* If it was not running or we canceled it in time */
647 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEMBER_CONTACT_RESTORED);
648 }
649 }
650
651 /*
652 Maybe on_suspicions we already executed the above block but it was too
653 late. No point in repeating the message, but we need to break the view
654 install.
655 */
656 if (!is_leaving &&
657 group_partition_handler->is_partition_handling_terminated())
658 goto end;
659
660 if (!is_leaving && new_view.get_leaving_members().size() > 0)
661 log_members_leaving_message(new_view);
662
663 // update the Group Manager with all the received states
664 if (update_group_info_manager(new_view, exchanged_data, is_joining,
665 is_leaving) &&
666 is_joining) {
667 gcs_module->notify_of_view_change_cancellation();
668 return;
669 }
670
671 if (!is_joining && new_view.get_joined_members().size() > 0)
672 log_members_joining_message(new_view);
673
674 // enable conflict detection if someone on group have it enabled
675 if (local_member_info->in_primary_mode() &&
676 group_member_mgr->is_conflict_detection_enabled()) {
677 Certifier_interface *certifier =
678 this->applier_module->get_certification_handler()->get_certifier();
679 certifier->enable_conflict_detection();
680 }
681
682 // Inform any interested handler that the view changed
683 View_change_pipeline_action *vc_action =
684 new View_change_pipeline_action(is_leaving);
685
686 applier_module->handle_pipeline_action(vc_action);
687 delete vc_action;
688
689 // Update any recovery running process and handle state changes
690 this->handle_leaving_members(new_view, is_joining, is_leaving);
691
692 // Handle joining members
693 this->handle_joining_members(new_view, is_joining, is_leaving);
694
695 if (is_leaving) gcs_module->leave_coordination_member_left();
696
697 // Signal that the injected view was delivered
698 if (gcs_module->is_injected_view_modification())
699 gcs_module->notify_of_view_change_end();
700
701 group_events_observation_manager->after_view_change(
702 new_view.get_joined_members(), new_view.get_leaving_members(),
703 new_view.get_members(), is_leaving, &skip_election, &election_mode,
704 suggested_primary);
705
706 // Handle leader election if needed
707 if (!skip_election && !is_leaving) {
708 this->handle_leader_election_if_needed(election_mode, suggested_primary);
709 }
710 }
711
712 if (!is_leaving) {
713 std::string view_id_representation = "";
714 Gcs_view *view = gcs_module->get_current_view();
715 if (view != nullptr) {
716 view_id_representation = view->get_view_id().get_representation();
717 delete view;
718 }
719 disable_read_mode_for_compatible_members();
720 LogPluginErr(
721 SYSTEM_LEVEL, ER_GRP_RPL_MEMBER_CHANGE,
722 group_member_mgr->get_string_current_view_active_hosts().c_str(),
723 view_id_representation.c_str());
724 } else {
725 LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_MEMBER_LEFT_GRP);
726 }
727
728 end:
729 /* if I am the primary and I am leaving, notify about role change */
730 if (is_leaving && is_primary) {
731 group_member_mgr->update_member_role(
732 local_member_info->get_uuid(), Group_member_info::MEMBER_ROLE_SECONDARY,
733 m_notification_ctx);
734 }
735
736 /* flag view change */
737 m_notification_ctx.set_view_changed();
738 if (is_leaving)
739 /*
740 The leave view is an optimistic and local view.
741 Therefore its ID is not meaningful, since it is not
742 a global one.
743 */
744 m_notification_ctx.set_view_id("");
745 else
746 m_notification_ctx.set_view_id(new_view.get_view_id().get_representation());
747
748 /* trigger notification */
749 notify_and_reset_ctx(m_notification_ctx);
750 }
751
was_member_expelled_from_group(const Gcs_view & view) const752 bool Plugin_gcs_events_handler::was_member_expelled_from_group(
753 const Gcs_view &view) const {
754 DBUG_TRACE;
755 bool result = false;
756
757 if (view.get_error_code() == Gcs_view::MEMBER_EXPELLED) {
758 result = true;
759 const char *exit_state_action_abort_log_message =
760 "Member was expelled from the group due to network failures.";
761 leave_group_on_failure::mask leave_actions;
762 leave_actions.set(leave_group_on_failure::ALREADY_LEFT_GROUP, true);
763 leave_actions.set(leave_group_on_failure::CLEAN_GROUP_MEMBERSHIP, true);
764 leave_actions.set(leave_group_on_failure::STOP_APPLIER, true);
765 leave_actions.set(leave_group_on_failure::HANDLE_EXIT_STATE_ACTION, true);
766 leave_actions.set(leave_group_on_failure::HANDLE_AUTO_REJOIN, true);
767 leave_group_on_failure::leave(leave_actions, ER_GRP_RPL_MEMBER_EXPELLED,
768 PSESSION_INIT_THREAD, &m_notification_ctx,
769 exit_state_action_abort_log_message);
770 }
771
772 return result;
773 }
774
handle_leader_election_if_needed(enum_primary_election_mode election_mode,std::string & suggested_primary) const775 void Plugin_gcs_events_handler::handle_leader_election_if_needed(
776 enum_primary_election_mode election_mode,
777 std::string &suggested_primary) const {
778 /*
779 Can we get here when a change to multi master is cancelled and is being
780 undone? Yes but only on situations where the action was killed or the member
781 is stopping that will always result in a plugin restart.
782 */
783 if (election_mode == DEAD_OLD_PRIMARY &&
784 !local_member_info->in_primary_mode())
785 return;
786
787 primary_election_handler->execute_primary_election(
788 suggested_primary, election_mode, &m_notification_ctx);
789 }
790
update_group_info_manager(const Gcs_view & new_view,const Exchanged_data & exchanged_data,bool is_joining,bool is_leaving) const791 int Plugin_gcs_events_handler::update_group_info_manager(
792 const Gcs_view &new_view, const Exchanged_data &exchanged_data,
793 bool is_joining, bool is_leaving) const {
794 int error = 0;
795
796 // update the Group Manager with all the received states
797 vector<Group_member_info *> to_update;
798
799 if (!is_leaving) {
800 // Process local state of exchanged data.
801 if ((error = process_local_exchanged_data(exchanged_data, is_joining)))
802 goto err;
803
804 to_update.insert(to_update.end(), temporary_states->begin(),
805 temporary_states->end());
806
807 // Clean-up members that are leaving
808 vector<Gcs_member_identifier> leaving = new_view.get_leaving_members();
809 vector<Gcs_member_identifier>::iterator left_it;
810 vector<Group_member_info *>::iterator to_update_it;
811 for (left_it = leaving.begin(); left_it != leaving.end(); left_it++) {
812 for (to_update_it = to_update.begin(); to_update_it != to_update.end();
813 to_update_it++) {
814 if ((*left_it) == (*to_update_it)->get_gcs_member_id()) {
815 /* purecov: begin inspected */
816 delete (*to_update_it);
817 to_update.erase(to_update_it);
818 break;
819 /* purecov: end */
820 }
821 }
822 }
823 }
824 group_member_mgr->update(&to_update);
825 temporary_states->clear();
826
827 err:
828 DBUG_ASSERT(temporary_states->size() == 0);
829 return error;
830 }
831
handle_joining_members(const Gcs_view & new_view,bool is_joining,bool is_leaving) const832 void Plugin_gcs_events_handler::handle_joining_members(const Gcs_view &new_view,
833 bool is_joining,
834 bool is_leaving) const {
835 // nothing to do here
836 size_t number_of_members = new_view.get_members().size();
837 if (number_of_members == 0 || is_leaving) {
838 return;
839 }
840 size_t number_of_joining_members = new_view.get_joined_members().size();
841 size_t number_of_leaving_members = new_view.get_leaving_members().size();
842
843 /*
844 If we are joining, 3 scenarios exist:
845 1) We are incompatible with the group so we leave
846 2) We are alone so we declare ourselves online
847 3) We are in a group and recovery must happen
848 */
849 if (is_joining) {
850 int error = 0;
851 if ((error = check_group_compatibility(number_of_members))) {
852 gcs_module->notify_of_view_change_cancellation(error);
853 return;
854 }
855 gcs_module->notify_of_view_change_end();
856
857 /**
858 On the joining list there can be 2 types of members: online/recovering
859 members coming from old views where this member was not present and new
860 joining members that still have their status as offline.
861
862 As so, for offline members, their state is changed to member_in_recovery
863 after member compatibility with group is checked.
864 */
865 update_member_status(
866 new_view.get_joined_members(), Group_member_info::MEMBER_IN_RECOVERY,
867 Group_member_info::MEMBER_OFFLINE, Group_member_info::MEMBER_END);
868
869 /** Is an election running while I'm joining?*/
870 primary_election_handler->set_election_running(
871 is_group_running_a_primary_election());
872
873 /**
874 Set the read mode if not set during start (auto-start)
875 */
876 if (enable_server_read_mode(PSESSION_DEDICATED_THREAD)) {
877 /*
878 The notification will be triggered in the top level handle function
879 that calls this one. In this case, the on_view_changed handle.
880 */
881 leave_group_on_failure::mask leave_actions;
882 leave_actions.set(leave_group_on_failure::SKIP_SET_READ_ONLY, true);
883 leave_actions.set(leave_group_on_failure::SKIP_LEAVE_VIEW_WAIT, true);
884 leave_group_on_failure::leave(
885 leave_actions, ER_GRP_RPL_SUPER_READ_ONLY_ACTIVATE_ERROR,
886 PSESSION_DEDICATED_THREAD, &m_notification_ctx, "");
887 set_plugin_is_setting_read_mode(false);
888
889 return;
890 } else {
891 set_plugin_is_setting_read_mode(false);
892 }
893
894 /**
895 On the joining member log an error when group contains more members than
896 auto_increment_increment variable.
897 */
898 ulong auto_increment_increment = get_auto_increment_increment();
899
900 if (!local_member_info->in_primary_mode() &&
901 new_view.get_members().size() > auto_increment_increment) {
902 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_EXCEEDS_AUTO_INC_VALUE,
903 new_view.get_members().size(), auto_increment_increment);
904 }
905
906 /*
907 During the view change, a suspension packet is sent to the applier module
908 so all posterior transactions inbound are not applied, but queued, until
909 the member finishes recovery.
910 */
911 applier_module->add_suspension_packet();
912
913 /*
914 Marking the view in the joiner since the incoming event from the donor
915 is discarded in the Recovery process.
916 */
917
918 std::string view_id = new_view.get_view_id().get_representation();
919 View_change_packet *view_change_packet = new View_change_packet(view_id);
920 applier_module->add_view_change_packet(view_change_packet);
921
922 /*
923 Chose what is the strategy for recovery.
924 Note that even if clone is chosen, if an error occurs on its launch,
925 incremental recovery is again selected as the default choice.
926 */
927 Remote_clone_handler::enum_clone_check_result recovery_strategy =
928 Remote_clone_handler::DO_RECOVERY;
929
930 // The check is not needed if the member is alone
931 if (number_of_members > 1)
932 recovery_strategy = remote_clone_handler->check_clone_preconditions();
933
934 if (Remote_clone_handler::DO_CLONE == recovery_strategy) {
935 LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_RECOVERY_STRAT_CHOICE,
936 "Cloning from a remote group donor.");
937 /*
938 Launch the clone process. It will configure SSL options and the list
939 of allowed donors.
940 When terminated, the clone process will restart the server.
941 The whole start join process is still done as an error on cloning can
942 mean we fall back to incremental recovery.
943 */
944 if (remote_clone_handler->clone_server(
945 new_view.get_group_id().get_group_id(),
946 new_view.get_view_id().get_representation())) {
947 /* purecov: begin inspected */
948 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_RECOVERY_STRAT_FALLBACK,
949 "Incremental Recovery.");
950 recovery_strategy = Remote_clone_handler::DO_RECOVERY;
951 /* purecov: end */
952 }
953 }
954
955 if (Remote_clone_handler::DO_RECOVERY == recovery_strategy) {
956 LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_RECOVERY_STRAT_CHOICE,
957 "Incremental recovery from a group donor");
958 /*
959 Launch the recovery thread so we can receive missing data and the
960 certification information needed to apply the transactions queued after
961 this view change.
962
963 Recovery receives a view id, as a means to identify logically on joiners
964 and donors alike where this view change happened in the data. With that
965 info we can then ask for the donor to give the member all the data until
966 this point in the data, and the certification information for all the
967 data that comes next.
968
969 When alone, the server will go through Recovery to wait for the
970 consumption of his applier relay log that may contain transactions from
971 previous executions.
972 */
973 recovery_module->start_recovery(
974 new_view.get_group_id().get_group_id(),
975 new_view.get_view_id().get_representation());
976 } else if (Remote_clone_handler::CHECK_ERROR == recovery_strategy ||
977 Remote_clone_handler::NO_RECOVERY_POSSIBLE ==
978 recovery_strategy) {
979 if (Remote_clone_handler::NO_RECOVERY_POSSIBLE == recovery_strategy)
980 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_NO_POSSIBLE_RECOVERY);
981 else {
982 /* purecov: begin inspected */
983 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_RECOVERY_EVAL_ERROR, "");
984 /* purecov: end */
985 }
986
987 /*
988 The notification will be triggered in the top level handle function
989 that calls this one. In this case, the on_view_changed handle.
990 */
991 leave_group_on_failure::mask leave_actions;
992 leave_actions.set(leave_group_on_failure::SKIP_LEAVE_VIEW_WAIT, true);
993 leave_group_on_failure::leave(leave_actions, 0, PSESSION_DEDICATED_THREAD,
994 &m_notification_ctx, "");
995 return;
996 }
997 }
998 /*
999 The condition
1000 number_of_joining_members == 0 && number_of_leaving_members == 0
1001 is needed due to the following scenario:
1002 We have a group with 2 members, one does crash (M2), and the group
1003 blocks with M1 ONLINE and M2 UNREACHABLE.
1004 Then M2 rejoins and the group unblocks.
1005 When M2 rejoins the group, from M2 perspective it is joining
1006 the group, that is, it does receive a view (V3) on which it is
1007 marked as a joining member.
1008 But from M1 perspective, M2 may never left, so the view delivered
1009 (V3) has the same members as V2, that is, M1 and M2, without joining
1010 members, thence we need to consider that condition and log that view.
1011 */
1012 else if (number_of_joining_members > 0 ||
1013 (number_of_joining_members == 0 && number_of_leaving_members == 0)) {
1014 /**
1015 On the joining list there can be 2 types of members: online/recovering
1016 members coming from old views where this member was not present and new
1017 joining members that still have their status as offline.
1018
1019 As so, for offline members, their state is changed to member_in_recovery.
1020 */
1021 update_member_status(
1022 new_view.get_joined_members(), Group_member_info::MEMBER_IN_RECOVERY,
1023 Group_member_info::MEMBER_OFFLINE, Group_member_info::MEMBER_END);
1024 /**
1025 If not a joining member, all members should record on their own binlogs a
1026 marking event that identifies the frontier between the data the joining
1027 member was to receive and the data it should queue.
1028 The joining member can then wait for this event to know it was all the
1029 needed data.
1030
1031 This packet will also pass in the certification process at this exact
1032 frontier giving us the opportunity to gather the necessary certification
1033 information to certify the transactions that will come after this view
1034 change. If selected as a donor, this info will also be sent to the joiner.
1035
1036 Associated to this process, we collect and intersect the executed GTID sets
1037 of all ONLINE members so we can cut the certification info to gather and
1038 transmit to the minimum.
1039 */
1040
1041 std::string view_id = new_view.get_view_id().get_representation();
1042 View_change_packet *view_change_packet = new View_change_packet(view_id);
1043 collect_members_executed_sets(view_change_packet);
1044 applier_module->add_view_change_packet(view_change_packet);
1045
1046 if (group_action_coordinator->is_group_action_running()) {
1047 LogPluginErr(WARNING_LEVEL,
1048 ER_GRP_RPL_JOINER_EXIT_WHEN_GROUP_ACTION_RUNNING);
1049 }
1050 }
1051 }
1052
handle_leaving_members(const Gcs_view & new_view,bool is_joining,bool is_leaving) const1053 void Plugin_gcs_events_handler::handle_leaving_members(const Gcs_view &new_view,
1054 bool is_joining,
1055 bool is_leaving) const {
1056 Group_member_info::Group_member_status member_status =
1057 local_member_info->get_recovery_status();
1058
1059 bool members_left = (new_view.get_leaving_members().size() > 0);
1060
1061 // if the member is joining or not in recovery, no need to update the process
1062 if (!is_joining && member_status == Group_member_info::MEMBER_IN_RECOVERY) {
1063 /*
1064 This method has 2 purposes:
1065 If a donor leaves, recovery needs to switch donor
1066 If this member leaves, recovery needs to shutdown.
1067 */
1068 recovery_module->update_recovery_process(members_left, is_leaving);
1069 }
1070
1071 if (members_left) {
1072 update_member_status(
1073 new_view.get_leaving_members(), Group_member_info::MEMBER_OFFLINE,
1074 Group_member_info::MEMBER_END, Group_member_info::MEMBER_ERROR);
1075
1076 if (!is_leaving) {
1077 Leaving_members_action_packet *leaving_members_action =
1078 new Leaving_members_action_packet(new_view.get_leaving_members());
1079 this->applier_module->add_leaving_members_action_packet(
1080 leaving_members_action);
1081 }
1082 }
1083
1084 if (is_leaving) {
1085 gcs_module->notify_of_view_change_end();
1086 }
1087 }
1088
is_member_on_vector(const vector<Gcs_member_identifier> & members,const Gcs_member_identifier & member_id) const1089 bool Plugin_gcs_events_handler::is_member_on_vector(
1090 const vector<Gcs_member_identifier> &members,
1091 const Gcs_member_identifier &member_id) const {
1092 vector<Gcs_member_identifier>::const_iterator it;
1093
1094 it = std::find(members.begin(), members.end(), member_id);
1095
1096 return it != members.end();
1097 }
1098
process_local_exchanged_data(const Exchanged_data & exchanged_data,bool is_joining) const1099 int Plugin_gcs_events_handler::process_local_exchanged_data(
1100 const Exchanged_data &exchanged_data, bool is_joining) const {
1101 uint local_uuid_found = 0;
1102
1103 /*
1104 For now, we are only carrying Group Member Info on Exchangeable data
1105 Since we are receiving the state from all Group members, one shall
1106 store it in a set to ensure that we don't have repetitions.
1107
1108 All collected data will be given to Group Member Manager at view install
1109 time.
1110 */
1111 for (Exchanged_data::const_iterator exchanged_data_it =
1112 exchanged_data.begin();
1113 exchanged_data_it != exchanged_data.end(); exchanged_data_it++) {
1114 const uchar *data = exchanged_data_it->second->get_payload();
1115 size_t length = exchanged_data_it->second->get_payload_length();
1116 Gcs_member_identifier *member_id = exchanged_data_it->first;
1117 if (data == nullptr) {
1118 /* purecov: begin inspected */
1119 Group_member_info *member_info =
1120 group_member_mgr->get_group_member_info_by_member_id(*member_id);
1121 if (member_info != nullptr) {
1122 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_DATA_NOT_PROVIDED_BY_MEM,
1123 member_info->get_hostname().c_str(),
1124 member_info->get_port());
1125 }
1126 continue;
1127 /* purecov: end */
1128 }
1129
1130 // Process data provided by member.
1131 vector<Group_member_info *> *member_infos =
1132 group_member_mgr->decode(data, length);
1133
1134 // This construct is here in order to deallocate memory of duplicates
1135 vector<Group_member_info *>::iterator member_infos_it;
1136 for (member_infos_it = member_infos->begin();
1137 member_infos_it != member_infos->end(); member_infos_it++) {
1138 if (local_member_info->get_uuid() == (*member_infos_it)->get_uuid()) {
1139 local_uuid_found++;
1140 }
1141
1142 /*
1143 Accept only the information the member has about himself
1144 Information received about other members is probably outdated
1145 */
1146 if (local_uuid_found < 2 &&
1147 (*member_infos_it)->get_gcs_member_id() == *member_id) {
1148 this->temporary_states->insert((*member_infos_it));
1149 } else {
1150 delete (*member_infos_it); /* purecov: inspected */
1151 }
1152 }
1153
1154 member_infos->clear();
1155 delete member_infos;
1156
1157 if (local_uuid_found > 1) {
1158 if (is_joining) {
1159 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_ALREADY_EXISTS,
1160 local_member_info->get_uuid().c_str());
1161 }
1162
1163 // Clean up temporary states.
1164 std::set<Group_member_info *,
1165 Group_member_info_pointer_comparator>::iterator
1166 temporary_states_it;
1167 for (temporary_states_it = temporary_states->begin();
1168 temporary_states_it != temporary_states->end();
1169 temporary_states_it++) {
1170 delete (*temporary_states_it);
1171 }
1172 temporary_states->clear();
1173
1174 return 1;
1175 }
1176 }
1177
1178 return 0;
1179 }
1180
get_exchangeable_data() const1181 Gcs_message_data *Plugin_gcs_events_handler::get_exchangeable_data() const {
1182 std::string server_executed_gtids;
1183 std::string server_purged_gtids;
1184 std::string applier_retrieved_gtids;
1185 Replication_thread_api applier_channel("group_replication_applier");
1186
1187 Sql_service_command_interface *sql_command_interface =
1188 new Sql_service_command_interface();
1189
1190 if (sql_command_interface->establish_session_connection(
1191 PSESSION_DEDICATED_THREAD, GROUPREPL_USER, get_plugin_pointer())) {
1192 /* purecov: begin inspected */
1193 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_GRP_CHANGE_INFO_EXTRACT_ERROR);
1194 goto sending;
1195 /* purecov: end */
1196 }
1197
1198 if (sql_command_interface->get_server_gtid_executed(server_executed_gtids)) {
1199 /* purecov: begin inspected */
1200 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_GTID_EXECUTED_EXTRACT_ERROR);
1201 goto sending;
1202 /* purecov: inspected */
1203 }
1204 if (sql_command_interface->get_server_gtid_purged(server_purged_gtids)) {
1205 /* purecov: begin inspected */
1206 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_GTID_PURGED_EXTRACT_ERROR);
1207 goto sending;
1208 /* purecov: end */
1209 }
1210 if (applier_channel.get_retrieved_gtid_set(applier_retrieved_gtids)) {
1211 LogPluginErr(WARNING_LEVEL,
1212 ER_GRP_RPL_GTID_SET_EXTRACT_ERROR); /* purecov: inspected */
1213 }
1214
1215 group_member_mgr->update_gtid_sets(local_member_info->get_uuid(),
1216 server_executed_gtids, server_purged_gtids,
1217 applier_retrieved_gtids);
1218 sending:
1219
1220 delete sql_command_interface;
1221
1222 std::vector<uchar> data;
1223
1224 /*
1225 When a member is auto-rejoining, it starts in the ERROR state. Normally
1226 the member would only change to the RECOVERY state when it was OFFLINE,
1227 but we want the member to be in ERROR state when an auto-rejoin occurs,
1228 so we force the change from ERROR to RECOVERY when the member is
1229 undergoing an auto-rejoin procedure.
1230 We do that change on the data exchange just before the view install, so
1231 that when all members do receive the view on which this member joins
1232 all do see the correct RECOVERY state.
1233 */
1234 if (autorejoin_module->is_autorejoin_ongoing() &&
1235 !get_error_state_due_to_error_during_autorejoin()) {
1236 group_member_mgr->update_member_status(
1237 local_member_info->get_uuid(), Group_member_info::MEMBER_IN_RECOVERY,
1238 m_notification_ctx);
1239 }
1240
1241 // alert joiners that an action or election is running
1242 local_member_info->set_is_group_action_running(
1243 group_action_coordinator->is_group_action_running());
1244 local_member_info->set_is_primary_election_running(
1245 primary_election_handler->is_an_election_running());
1246 Group_member_info *local_member_copy =
1247 new Group_member_info(*local_member_info);
1248 Group_member_info_manager_message *group_info_message =
1249 new Group_member_info_manager_message(local_member_copy);
1250 group_info_message->encode(&data);
1251 delete group_info_message;
1252
1253 Gcs_message_data *msg_data = new Gcs_message_data(0, data.size());
1254 msg_data->append_to_payload(&data.front(), data.size());
1255
1256 return msg_data;
1257 }
1258
update_member_status(const vector<Gcs_member_identifier> & members,Group_member_info::Group_member_status status,Group_member_info::Group_member_status old_status_equal_to,Group_member_info::Group_member_status old_status_different_from) const1259 void Plugin_gcs_events_handler::update_member_status(
1260 const vector<Gcs_member_identifier> &members,
1261 Group_member_info::Group_member_status status,
1262 Group_member_info::Group_member_status old_status_equal_to,
1263 Group_member_info::Group_member_status old_status_different_from) const {
1264 for (vector<Gcs_member_identifier>::const_iterator it = members.begin();
1265 it != members.end(); ++it) {
1266 Gcs_member_identifier member = *it;
1267 Group_member_info *member_info =
1268 group_member_mgr->get_group_member_info_by_member_id(member);
1269
1270 if (member_info == nullptr) {
1271 // Trying to update a non-existing member
1272 continue;
1273 }
1274
1275 // if (the old_status_equal_to is not defined or
1276 // the previous status is equal to old_status_equal_to)
1277 // and
1278 // (the old_status_different_from is not defined or
1279 // the previous status is different from old_status_different_from)
1280 if ((old_status_equal_to == Group_member_info::MEMBER_END ||
1281 member_info->get_recovery_status() == old_status_equal_to) &&
1282 (old_status_different_from == Group_member_info::MEMBER_END ||
1283 member_info->get_recovery_status() != old_status_different_from)) {
1284 /*
1285 The notification will be handled on the top level handle
1286 function that calls this one down the stack.
1287 */
1288 group_member_mgr->update_member_status(member_info->get_uuid(), status,
1289 m_notification_ctx);
1290 }
1291 }
1292 }
1293
/**
  Here we check:
  1) If the maximum number of members was exceeded
  2) If the member version is compatible with the group
  3) If the gtid_assignment_block_size is equal to the one of the group
  4) If the hash algorithm used is equal to the one of the group
  5) If the member has the same configuration flags that the group has
  6) If lower_case_table_names and default_table_encryption are equal to
     the ones of the group
  7) If the member has more known transactions than the group
  8) If the group is running a configuration change (group action)
*/
check_group_compatibility(size_t number_of_members) const1303 int Plugin_gcs_events_handler::check_group_compatibility(
1304 size_t number_of_members) const {
1305 /*
1306 Check if group size did reach the maximum number of members.
1307 */
1308 #ifndef DBUG_OFF
1309 if (set_number_of_members_on_view_changed_to_10) number_of_members = 10;
1310 #endif
1311 if (number_of_members > 9) {
1312 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_START_FAILED);
1313 return GROUP_REPLICATION_MAX_GROUP_SIZE;
1314 }
1315
1316 /*
1317 Check if the member is compatible with the group.
1318 It can be incompatible because its major version is lower or a rule says it.
1319 If incompatible notify whoever is waiting for the view with an error, so
1320 the plugin exits the group.
1321 */
1322 *joiner_compatibility_status = COMPATIBLE;
1323 int group_data_compatibility = 0;
1324 if (number_of_members > 1) {
1325 *joiner_compatibility_status = check_version_compatibility_with_group();
1326 group_data_compatibility = compare_member_transaction_sets();
1327 }
1328
1329 if (*joiner_compatibility_status == INCOMPATIBLE) {
1330 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_VER_INCOMPATIBLE);
1331 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1332 }
1333 if (*joiner_compatibility_status == READ_COMPATIBLE) {
1334 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_MEMBER_VER_READ_COMPATIBLE);
1335 }
1336
1337 /*
1338 All group members must have the same gtid_assignment_block_size
1339 and transaction-write-set-extraction value, if joiner has a
1340 different value it is not allowed to join.
1341 */
1342 if (number_of_members > 1 && compare_member_option_compatibility()) {
1343 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1344 }
1345
1346 /*
1347 Check that the joiner doesn't has more GTIDs than the rest of the group.
1348 All the executed and received transactions in the group are collected and
1349 merged into a GTID set and all joiner transactions must be contained in it.
1350 */
1351 if (group_data_compatibility) {
1352 if (group_data_compatibility > 0) {
1353 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_TRANS_NOT_PRESENT_IN_GRP);
1354 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1355 } else // error
1356 {
1357 /* purecov: begin inspected */
1358 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_TRANS_GREATER_THAN_GRP);
1359 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1360 /* purecov: end */
1361 }
1362 }
1363
1364 if (is_group_running_a_configuration_change()) {
1365 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_JOIN_WHEN_GROUP_ACTION_RUNNING);
1366 return GROUP_REPLICATION_CONFIGURATION_ERROR;
1367 }
1368
1369 return 0;
1370 }
1371
1372 Compatibility_type
check_version_compatibility_with_group() const1373 Plugin_gcs_events_handler::check_version_compatibility_with_group() const {
1374 bool override_lower_incompatibility = false;
1375 Compatibility_type compatibility_type = COMPATIBLE;
1376 bool read_compatible = false;
1377
1378 std::vector<Group_member_info *> *all_members =
1379 group_member_mgr->get_all_members();
1380 std::vector<Group_member_info *>::iterator all_members_it;
1381
1382 Member_version lowest_version(0xFFFFFF);
1383 std::set<Member_version> unique_version_set;
1384 /* Find lowest member version and unique versions of the group for
1385 * comparison. */
1386 for (all_members_it = all_members->begin();
1387 all_members_it != all_members->end(); all_members_it++) {
1388 /* Skip self */
1389 if ((*all_members_it)->get_uuid() != local_member_info->get_uuid()) {
1390 if ((*all_members_it)->get_member_version() < lowest_version)
1391 lowest_version = (*all_members_it)->get_member_version();
1392 unique_version_set.insert((*all_members_it)->get_member_version());
1393 }
1394 }
1395 for (auto it = unique_version_set.begin();
1396 it != unique_version_set.end() && compatibility_type != INCOMPATIBLE;
1397 ++it) {
1398 Member_version ver(*it);
1399 compatibility_type = compatibility_manager->check_local_incompatibility(
1400 ver, (ver == lowest_version));
1401
1402 if (compatibility_type == READ_COMPATIBLE) {
1403 read_compatible = true;
1404 }
1405
1406 if (compatibility_type == INCOMPATIBLE_LOWER_VERSION) {
1407 if (get_allow_local_lower_version_join()) {
1408 /*
1409 Despite between these two members the compatibility type
1410 is INCOMPATIBLE_LOWER_VERSION, when compared with others
1411 group members this server may be INCOMPATIBLE, so we need
1412 to test with all group members.
1413 */
1414 override_lower_incompatibility = true;
1415 compatibility_type = COMPATIBLE;
1416 } else {
1417 compatibility_type = INCOMPATIBLE;
1418 }
1419 }
1420 }
1421
1422 if (compatibility_type != INCOMPATIBLE && override_lower_incompatibility) {
1423 LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_MEMBER_VERSION_LOWER_THAN_GRP);
1424 }
1425
1426 if (read_compatible && compatibility_type != INCOMPATIBLE) {
1427 compatibility_type = READ_COMPATIBLE;
1428 }
1429
1430 // clean the members
1431 for (all_members_it = all_members->begin();
1432 all_members_it != all_members->end(); all_members_it++) {
1433 delete (*all_members_it);
1434 }
1435 delete all_members;
1436
1437 return compatibility_type;
1438 }
1439
compare_member_transaction_sets() const1440 int Plugin_gcs_events_handler::compare_member_transaction_sets() const {
1441 int result = 0;
1442
1443 Sid_map local_sid_map(nullptr);
1444 Sid_map group_sid_map(nullptr);
1445 Gtid_set local_member_set(&local_sid_map, nullptr);
1446 Gtid_set group_set(&group_sid_map, nullptr);
1447
1448 std::vector<Group_member_info *> *all_members =
1449 group_member_mgr->get_all_members();
1450 std::vector<Group_member_info *>::iterator all_members_it;
1451 for (all_members_it = all_members->begin();
1452 all_members_it != all_members->end(); all_members_it++) {
1453 std::string member_exec_set_str = (*all_members_it)->get_gtid_executed();
1454 std::string applier_ret_set_str = (*all_members_it)->get_gtid_retrieved();
1455 if ((*all_members_it)->get_gcs_member_id() ==
1456 local_member_info->get_gcs_member_id()) {
1457 if (local_member_set.add_gtid_text(member_exec_set_str.c_str()) !=
1458 RETURN_STATUS_OK ||
1459 local_member_set.add_gtid_text(applier_ret_set_str.c_str()) !=
1460 RETURN_STATUS_OK) {
1461 /* purecov: begin inspected */
1462 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_LOCAL_GTID_SETS_PROCESS_ERROR);
1463 result = -1;
1464 goto cleaning;
1465 /* purecov: end */
1466 }
1467 } else {
1468 if (group_set.add_gtid_text(member_exec_set_str.c_str()) !=
1469 RETURN_STATUS_OK ||
1470 group_set.add_gtid_text(applier_ret_set_str.c_str()) !=
1471 RETURN_STATUS_OK) {
1472 /* purecov: begin inspected */
1473 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_LOCAL_GTID_SETS_PROCESS_ERROR);
1474 result = -1;
1475 goto cleaning;
1476 /* purecov: end */
1477 }
1478 }
1479 }
1480
1481 /*
1482 Here we only error out if the joiner set is bigger, i.e, if they are equal
1483 no error is returned.
1484 One could argue that if a joiner has the same transaction set as the group
1485 then something is wrong as the group also has transaction associated to
1486 previous view changes.
1487 To reject this cases cause however false negatives when members leave and
1488 quickly rejoin the group or when groups are started by add several nodes at
1489 once.
1490 */
1491 if (!local_member_set.is_subset(&group_set)) {
1492 char *local_gtid_set_buf;
1493 local_member_set.to_string(&local_gtid_set_buf);
1494 char *group_gtid_set_buf;
1495 group_set.to_string(&group_gtid_set_buf);
1496 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_TRANS_GREATER_THAN_GRP,
1497 local_gtid_set_buf, group_gtid_set_buf);
1498 my_free(local_gtid_set_buf);
1499 my_free(group_gtid_set_buf);
1500 result = 1;
1501 }
1502
1503 cleaning:
1504
1505 // clean the members
1506 for (all_members_it = all_members->begin();
1507 all_members_it != all_members->end(); all_members_it++) {
1508 delete (*all_members_it);
1509 }
1510 delete all_members;
1511
1512 return result;
1513 }
1514
collect_members_executed_sets(View_change_packet * view_packet) const1515 void Plugin_gcs_events_handler::collect_members_executed_sets(
1516 View_change_packet *view_packet) const {
1517 std::vector<Group_member_info *> *all_members =
1518 group_member_mgr->get_all_members();
1519 std::vector<Group_member_info *>::iterator all_members_it;
1520 for (all_members_it = all_members->begin();
1521 all_members_it != all_members->end(); all_members_it++) {
1522 // Joining/Recovering members don't have valid GTID executed information
1523 if ((*all_members_it)->get_recovery_status() ==
1524 Group_member_info::MEMBER_IN_RECOVERY) {
1525 continue;
1526 }
1527
1528 std::string exec_set_str = (*all_members_it)->get_gtid_executed();
1529 view_packet->group_executed_set.push_back(exec_set_str);
1530 }
1531
1532 // clean the members
1533 for (all_members_it = all_members->begin();
1534 all_members_it != all_members->end(); all_members_it++) {
1535 delete (*all_members_it);
1536 }
1537 delete all_members;
1538 }
1539
compare_member_option_compatibility() const1540 int Plugin_gcs_events_handler::compare_member_option_compatibility() const {
1541 int result = 0;
1542
1543 std::vector<Group_member_info *> *all_members =
1544 group_member_mgr->get_all_members();
1545 std::vector<Group_member_info *>::iterator all_members_it;
1546 for (all_members_it = all_members->begin();
1547 all_members_it != all_members->end(); all_members_it++) {
1548 if (local_member_info->get_gtid_assignment_block_size() !=
1549 (*all_members_it)->get_gtid_assignment_block_size()) {
1550 result = 1;
1551 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_BLOCK_SIZE_DIFF_FROM_GRP,
1552 local_member_info->get_gtid_assignment_block_size(),
1553 (*all_members_it)->get_gtid_assignment_block_size());
1554 goto cleaning;
1555 }
1556
1557 if (local_member_info->get_write_set_extraction_algorithm() !=
1558 (*all_members_it)->get_write_set_extraction_algorithm()) {
1559 result = 1;
1560 LogPluginErr(
1561 ERROR_LEVEL, ER_GRP_RPL_TRANS_WRITE_SET_EXTRACT_DIFF_FROM_GRP,
1562 get_write_set_algorithm_string(
1563 local_member_info->get_write_set_extraction_algorithm()),
1564 get_write_set_algorithm_string(
1565 (*all_members_it)->get_write_set_extraction_algorithm()));
1566 goto cleaning;
1567 }
1568
1569 if (local_member_info->get_configuration_flags() !=
1570 (*all_members_it)->get_configuration_flags()) {
1571 const uint32 member_configuration_flags =
1572 (*all_members_it)->get_configuration_flags();
1573 const uint32 local_configuration_flags =
1574 local_member_info->get_configuration_flags();
1575
1576 result = 1;
1577 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_MEMBER_CFG_INCOMPATIBLE_WITH_GRP_CFG,
1578 Group_member_info::get_configuration_flags_string(
1579 local_configuration_flags)
1580 .c_str(),
1581 Group_member_info::get_configuration_flags_string(
1582 member_configuration_flags)
1583 .c_str());
1584 goto cleaning;
1585 }
1586
1587 if ((*all_members_it)->get_lower_case_table_names() !=
1588 DEFAULT_NOT_RECEIVED_LOWER_CASE_TABLE_NAMES &&
1589 local_member_info->get_lower_case_table_names() !=
1590 (*all_members_it)->get_lower_case_table_names()) {
1591 result = 1;
1592 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_LOWER_CASE_TABLE_NAMES_DIFF_FROM_GRP,
1593 local_member_info->get_lower_case_table_names(),
1594 (*all_members_it)->get_lower_case_table_names());
1595 goto cleaning;
1596 }
1597
1598 if (local_member_info->get_default_table_encryption() !=
1599 (*all_members_it)->get_default_table_encryption()) {
1600 result = 1;
1601 LogPluginErr(ERROR_LEVEL,
1602 ER_GRP_RPL_DEFAULT_TABLE_ENCRYPTION_DIFF_FROM_GRP,
1603 local_member_info->get_default_table_encryption(),
1604 (*all_members_it)->get_default_table_encryption());
1605 goto cleaning;
1606 }
1607 }
1608
1609 cleaning:
1610 for (all_members_it = all_members->begin();
1611 all_members_it != all_members->end(); all_members_it++)
1612 delete (*all_members_it);
1613 delete all_members;
1614
1615 return result;
1616 }
1617
is_group_running_a_configuration_change() const1618 bool Plugin_gcs_events_handler::is_group_running_a_configuration_change()
1619 const {
1620 bool is_action_running = false;
1621 std::vector<Group_member_info *> *all_members =
1622 group_member_mgr->get_all_members();
1623 for (Group_member_info *member_info : *all_members) {
1624 if (member_info->is_group_action_running()) {
1625 is_action_running = true;
1626 break;
1627 }
1628 }
1629 for (Group_member_info *member_info : *all_members) delete member_info;
1630 delete all_members;
1631
1632 return is_action_running;
1633 }
1634
is_group_running_a_primary_election() const1635 bool Plugin_gcs_events_handler::is_group_running_a_primary_election() const {
1636 bool is_election_running = false;
1637 std::vector<Group_member_info *> *all_members =
1638 group_member_mgr->get_all_members();
1639 for (Group_member_info *member_info : *all_members) {
1640 if (member_info->is_primary_election_running()) {
1641 is_election_running = true;
1642 break;
1643 }
1644 }
1645 for (Group_member_info *member_info : *all_members) delete member_info;
1646 delete all_members;
1647
1648 return is_election_running;
1649 }
1650
disable_read_mode_for_compatible_members(bool force_check) const1651 void Plugin_gcs_events_handler::disable_read_mode_for_compatible_members(
1652 bool force_check) const {
1653 Member_version lowest_version =
1654 group_member_mgr->get_group_lowest_online_version();
1655 /* We need to lock the operations of group_member_mgr so that member does not
1656 * changes it state to ERROR and enables read only mode after we check its
1657 * state here. If we read old ONLINE value and continue to disable read mode,
1658 * member will continue to be writable even in ERROR state. So lock protects
1659 * from this situation. */
1660 MUTEX_LOCK(lock, group_member_mgr->get_update_lock());
1661 if (local_member_info->get_recovery_status() ==
1662 Group_member_info::MEMBER_ONLINE &&
1663 (force_check || *joiner_compatibility_status != COMPATIBLE)) {
1664 *joiner_compatibility_status =
1665 Compatibility_module::check_version_incompatibility(
1666 local_member_info->get_member_version(), lowest_version);
1667 /* Some lower version left the group, now this member is new lowest
1668 * version. */
1669 if ((!local_member_info->in_primary_mode() &&
1670 *joiner_compatibility_status == COMPATIBLE) ||
1671 (local_member_info->in_primary_mode() &&
1672 local_member_info->get_role() ==
1673 Group_member_info::MEMBER_ROLE_PRIMARY)) {
1674 if (disable_server_read_mode(PSESSION_DEDICATED_THREAD)) {
1675 LogPluginErr(WARNING_LEVEL,
1676 ER_GRP_RPL_DISABLE_SRV_READ_MODE_RESTRICTED);
1677 }
1678 }
1679 }
1680 }
1681