1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include <sstream>
24 #include <mysql/service_rpl_transaction_write_set.h>
25
26 #include "observer_server_actions.h"
27 #include "observer_server_state.h"
28 #include "observer_trans.h"
29 #include "plugin.h"
30 #include "plugin_log.h"
31 #include "pipeline_stats.h"
32
33 using std::string;
34
/* Plugin generic fields */

// Plugin handle: returned by get_plugin_pointer() and handed to the plugin
// log service and to the SQL service session opened on start.
static MYSQL_PLUGIN plugin_info_ptr= NULL;
// Overwritten with the server version in configure_group_member_manager().
unsigned int plugin_version= 0;

//The plugin running flag and lock
// Mutex held (through Mutex_autolock) for the whole of the start and stop
// entry points; exposed through get_plugin_running_lock().
static mysql_mutex_t plugin_running_mutex;
// Accessed with my_atomic_load32/my_atomic_store32: set to 1 once the group
// was joined and back to 0 at the end of a stop.
int32 group_replication_running= 0;
// Accessed with my_atomic_load32/my_atomic_store32: set to 1 when a stop
// begins and cleared by the next successful start.
int32 group_replication_stopping= 0;
// Raised when start is deferred to the delayed initialization thread,
// lowered by stop before it waits for that thread.
bool wait_on_engine_initialization= false;
// When true, the start error path and stop do not touch the server read
// mode. Written outside this part of the file.
bool server_shutdown_status= false;
// While true, initialize_plugin_and_join() does not enable super_read_only;
// it is reset to false when that function returns.
bool plugin_is_auto_starting= false;
// Raised by stop before leaving the group, lowered after stop attempted to
// enable the server read mode.
bool plugin_is_waiting_to_set_server_read_mode= false;
// When true, stop does not try to enable the server read mode.
// Written outside this part of the file.
bool plugin_is_being_uninstalled= false;
49
/*
  Plugin modules.

  All of these are global pointers owned by the plugin; the ones with an
  initializer start as NULL.
*/
//The plugin applier
Applier_module *applier_module= NULL;
//The plugin recovery module
Recovery_module *recovery_module= NULL;
//The plugin group communication module
Gcs_operations *gcs_module= NULL;
//The channel observation module
Channel_observation_manager *channel_observation_manager= NULL;
//The Single primary channel observation module
Asynchronous_channels_state_observer
  *asynchronous_channels_state_observer= NULL;
//Lock to check if the plugin is running or not.
Checkable_rwlock *plugin_stop_lock;
//Class to coordinate access to the plugin stop lock
//(stop grabs and releases its write lock around the shutdown sequence)
Shared_writelock *shared_plugin_stop_lock;
//Initialization thread for server starts
//(created by start when the server engine is not yet initialized and
//destroyed by stop)
Delayed_initialization_thread *delayed_initialization_thread= NULL;
//The transaction handler for network partitions
Group_partition_handling *group_partition_handler= NULL;
//The handler for transaction killing when an error or partition happens
Blocked_transaction_handler *blocked_transaction_handler= NULL;
72
/* Group communication options */
char *local_address_var= NULL;
char *group_seeds_var= NULL;
// Has to be NULL or empty when the plugin is started, otherwise start fails
// with a configuration error.
char *force_members_var= NULL;
bool force_members_running= false;
static mysql_mutex_t force_members_running_mutex;
my_bool bootstrap_group_var= false;
ulong poll_spin_loops_var= 0;
ulong ssl_mode_var= 0;

// NULL-terminated list of SSL mode names.
// NOTE(review): presumably indexed by ssl_mode_var - confirm where the
// option is registered.
const char* ssl_mode_values[]= {
  "DISABLED",
  "REQUIRED",
  "VERIFY_CA",
  "VERIFY_IDENTITY",
  (char*)0
};

// NULL-terminated list of the names accepted for boolean-like options.
static const char *bool_type_allowed_values[]= {
  "OFF",
  "ON",
  (const char*)0
};

// TYPELIB wrapping bool_type_allowed_values; the count excludes the
// terminating null entry, hence the "- 1".
static TYPELIB plugin_bool_typelib= {
  sizeof(bool_type_allowed_values) / sizeof(*bool_type_allowed_values) - 1, // names count
  "", // type name
  bool_type_allowed_values, // value names
  NULL // count
};

#define IP_WHITELIST_STR_BUFFER_LENGTH 1024
char *ip_whitelist_var= NULL;
const char *IP_WHITELIST_DEFAULT= "AUTOMATIC";

//The plugin auto increment handler
//(its auto increment variables are reset when the member leaves the group)
Plugin_group_replication_auto_increment *auto_increment_handler= NULL;
//The group communication events handler, destroyed in leave_group()
Plugin_gcs_events_handler* events_handler= NULL;
//Notifier used to wait for view changes when joining and leaving the group
Plugin_gcs_view_modification_notifier* view_change_notifier= NULL;
112
/*
  Group management information.
  Both objects are (re)created in configure_group_member_manager().
*/
Group_member_info_manager_interface *group_member_mgr= NULL;
Group_member_info* local_member_info= NULL;

/* Compatibility management */
// Created by init_compatibility_manager().
Compatibility_module* compatibility_mgr= NULL;

/* Plugin group related options */
const char *group_replication_plugin_name= "group_replication";
// Validated on start; must differ from the server uuid.
char *group_name_var= NULL;
my_bool start_group_replication_at_boot_var= true;
// Initialized on start through init_group_sidno().
rpl_sidno group_sidno;
// Both flags below are stored in the local member information.
my_bool single_primary_mode_var= FALSE;
my_bool enforce_update_everywhere_checks_var= TRUE;

/* Applier module related */
bool known_server_reset;
130
//Recovery ssl options

// Option map entries that map the different SSL options to integer
static const int RECOVERY_SSL_CA_OPT= 1;
static const int RECOVERY_SSL_CAPATH_OPT= 2;
static const int RECOVERY_SSL_CERT_OPT= 3;
static const int RECOVERY_SSL_CIPHER_OPT= 4;
static const int RECOVERY_SSL_KEY_OPT= 5;
static const int RECOVERY_SSL_CRL_OPT= 6;
static const int RECOVERY_SSL_CRLPATH_OPT= 7;
//The option map <SSL var_name, SSL var code>
//Note: with const char* keys and the default comparator the map orders and
//matches keys by pointer value, not by string content.
std::map<const char*, int> recovery_ssl_opt_map;

// SSL options
// The string options below are validated with check_recovery_ssl_string()
// when the plugin is started.
my_bool recovery_use_ssl_var= false;
char* recovery_ssl_ca_var= NULL;
char* recovery_ssl_capath_var= NULL;
char* recovery_ssl_cert_var= NULL;
char* recovery_ssl_cipher_var= NULL;
char* recovery_ssl_key_var= NULL;
char* recovery_ssl_crl_var= NULL;
char* recovery_ssl_crlpath_var= NULL;
my_bool recovery_ssl_verify_server_cert_var= false;
ulong recovery_completion_policy_var;

ulong recovery_retry_count_var= 0;
ulong recovery_reconnect_interval_var= 0;
158
/* Write set extraction algorithm */
// Stored in the local member information on start.
int write_set_extraction_algorithm= HASH_ALGORITHM_OFF;

/* Lower case table name */
// Stored in the local member information on start.
uint gr_lower_case_table_names= 0;

/* Generic components variables */
// Also used as the timeout when stopping all replication channels.
ulong components_stop_timeout_var= LONG_TIMEOUT;

/* The timeout before going to error when majority becomes unreachable */
ulong timeout_on_unreachable_var= 0;

/*
  Exit state action that is executed when a server involuntarily leaves the
  group.
*/
ulong exit_state_action_var;

/**
  The default value for auto_increment_increment is chosen taking into
  account the maximum usable values for each possible auto_increment_increment
  and what is a normal group expected size.
*/
#define DEFAULT_AUTO_INCREMENT_INCREMENT 7
#define MIN_AUTO_INCREMENT_INCREMENT 1
#define MAX_AUTO_INCREMENT_INCREMENT 65535
ulong auto_increment_increment_var= DEFAULT_AUTO_INCREMENT_INCREMENT;

/* Compression options */
#define DEFAULT_COMPRESSION_THRESHOLD 1000000
#define MAX_COMPRESSION_THRESHOLD UINT_MAX32
#define MIN_COMPRESSION_THRESHOLD 0
ulong compression_threshold_var= DEFAULT_COMPRESSION_THRESHOLD;

/* GTID assignment block size options */
#define DEFAULT_GTID_ASSIGNMENT_BLOCK_SIZE 1000000
#define MIN_GTID_ASSIGNMENT_BLOCK_SIZE 1
#define MAX_GTID_ASSIGNMENT_BLOCK_SIZE GNO_END
// Stored in the local member information on start.
ulonglong gtid_assignment_block_size_var= DEFAULT_GTID_ASSIGNMENT_BLOCK_SIZE;

/* Flow control options */
ulong flow_control_mode_var= FCM_QUOTA;
#define DEFAULT_FLOW_CONTROL_THRESHOLD 25000
#define MAX_FLOW_CONTROL_THRESHOLD INT_MAX32
#define MIN_FLOW_CONTROL_THRESHOLD 0
int flow_control_certifier_threshold_var= DEFAULT_FLOW_CONTROL_THRESHOLD;
int flow_control_applier_threshold_var= DEFAULT_FLOW_CONTROL_THRESHOLD;

/* Transaction size limits */
#define DEFAULT_TRANSACTION_SIZE_LIMIT 0
#define MAX_TRANSACTION_SIZE_LIMIT 2147483647
#define MIN_TRANSACTION_SIZE_LIMIT 0
ulong transaction_size_limit_base_var= DEFAULT_TRANSACTION_SIZE_LIMIT;
int64 transaction_size_limit_var;

/* Member Weight limits */
#define DEFAULT_MEMBER_WEIGHT 50
#define MAX_MEMBER_WEIGHT 100
#define MIN_MEMBER_WEIGHT 0
// Stored in the local member information on start.
uint member_weight_var= DEFAULT_MEMBER_WEIGHT;

/* Downgrade options */
char allow_local_lower_version_join_var= 0;

/* Allow errant transactions */
// Deprecated: start emits a deprecation warning when this is set.
char allow_local_disjoint_gtids_join_var= 0;

/* Certification latch */
// Created on start; destroyed when the plugin modules are terminated or
// when the join fails.
Wait_ticket<my_thread_id> *certification_latch;
228
/*
  Internal auxiliary functions signatures.

  The static functions are private to this file; the remaining ones are
  defined in this file as well and declared here so that the start, stop and
  join code can call them regardless of definition order.
*/
// Used on start to validate group_name_var; non-zero makes start fail.
static int check_group_name_string(const char *str,
                                   bool is_var_update= false);

// Used on start to validate each recovery SSL option; non-zero makes start
// fail.
static int check_recovery_ssl_string(const char *str, const char *var_name,
                                     bool is_var_update= false);

// Used on start; non-zero makes start fail with a configuration error.
static int check_if_server_properly_configured();

// Used on start; a true return makes start fail with a configuration error.
static bool init_group_sidno();

static void initialize_ssl_option_map();

// Steps executed, in this order, by initialize_plugin_and_join().
int configure_group_communication(st_server_ssl_variables *ssl_variables);
int configure_group_member_manager(char *hostname, char *uuid,
                                   uint port, unsigned int server_version);
bool check_async_channel_running_on_secondary();
int configure_compatibility_manager();
int initialize_recovery_module();
int configure_and_start_applier_module();
void initialize_asynchronous_channels_observer();
void initialize_group_partition_handler();
int start_group_communication();
// Shutdown counterparts, used by stop and by the join error path.
int leave_group();
int terminate_plugin_modules(bool flag_stop_async_channel= false);
int terminate_applier_module();
int terminate_recovery_module();
void terminate_asynchronous_channels_observer();
259
260 /*
261 Auxiliary public functions.
262 */
get_plugin_pointer()263 void *get_plugin_pointer()
264 {
265 return plugin_info_ptr;
266 }
267
get_plugin_running_lock()268 mysql_mutex_t* get_plugin_running_lock()
269 {
270 return &plugin_running_mutex;
271 }
272
plugin_is_group_replication_running()273 bool plugin_is_group_replication_running()
274 {
275 return my_atomic_load32(&group_replication_running);
276 }
277
get_plugin_is_stopping()278 bool get_plugin_is_stopping()
279 {
280 return my_atomic_load32(&group_replication_stopping);
281 }
282
plugin_group_replication_set_retrieved_certification_info(void * info)283 int plugin_group_replication_set_retrieved_certification_info(void* info)
284 {
285 return recovery_module->set_retrieved_cert_info(info);
286 }
287
log_message(enum plugin_log_level level,const char * format,...)288 int log_message(enum plugin_log_level level, const char *format, ...)
289 {
290 va_list args;
291 char buff[1024];
292
293 va_start(args, format);
294 my_vsnprintf(buff, sizeof(buff), format, args);
295 va_end(args);
296 return my_plugin_log_message(&plugin_info_ptr, level, buff);
297 }
298
option_deprecation_warning(MYSQL_THD thd,const char * name)299 static void option_deprecation_warning(MYSQL_THD thd, const char* name)
300 {
301 if (thd != NULL)
302 {
303 push_warning_printf(thd, Sql_condition::SL_WARNING,
304 ER_WARN_DEPRECATED_SYNTAX_NO_REPLACEMENT,
305 ER_THD(thd, ER_WARN_DEPRECATED_SYNTAX_NO_REPLACEMENT),
306 name);
307 }
308 log_message(MY_WARNING_LEVEL,
309 ER_DEFAULT(ER_WARN_DEPRECATED_SYNTAX_NO_REPLACEMENT),
310 name);
311 }
312
313 /*
314 Plugin interface.
315 */
/*
  Group Replication plugin descriptor: the interface version followed by the
  entry points implemented in this file. The initializers are positional, so
  their order has to match the member order of st_mysql_group_replication.
*/
struct st_mysql_group_replication group_replication_descriptor=
{
  MYSQL_GROUP_REPLICATION_INTERFACE_VERSION,
  plugin_group_replication_start,
  plugin_group_replication_stop,
  plugin_is_group_replication_running,
  plugin_group_replication_set_retrieved_certification_info,
  plugin_get_connection_status,
  plugin_get_group_members,
  plugin_get_group_member_stats,
  plugin_get_group_members_number,
};
328
329 bool
plugin_get_connection_status(const GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS & callbacks)330 plugin_get_connection_status(
331 const GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS& callbacks)
332 {
333 char* channel_name= applier_module_channel_name;
334
335 return get_connection_status(callbacks, group_name_var, channel_name,
336 plugin_is_group_replication_running());
337 }
338
339 bool
plugin_get_group_members(uint index,const GROUP_REPLICATION_GROUP_MEMBERS_CALLBACKS & callbacks)340 plugin_get_group_members(
341 uint index, const GROUP_REPLICATION_GROUP_MEMBERS_CALLBACKS& callbacks)
342 {
343 char* channel_name= applier_module_channel_name;
344
345 return get_group_members_info(index, callbacks, group_member_mgr,
346 group_name_var, channel_name);
347 }
348
plugin_get_group_members_number()349 uint plugin_get_group_members_number()
350 {
351 return group_member_mgr == NULL? 1 :
352 (uint)group_member_mgr
353 ->get_number_of_members();
354 }
355
356 bool
plugin_get_group_member_stats(const GROUP_REPLICATION_GROUP_MEMBER_STATS_CALLBACKS & callbacks)357 plugin_get_group_member_stats(
358 const GROUP_REPLICATION_GROUP_MEMBER_STATS_CALLBACKS& callbacks)
359 {
360 char* channel_name= applier_module_channel_name;
361
362 return get_group_member_stats(callbacks, group_member_mgr, applier_module,
363 gcs_module, group_name_var, channel_name);
364 }
365
plugin_group_replication_start()366 int plugin_group_replication_start()
367 {
368 DBUG_ENTER("plugin_group_replication_start");
369
370 Mutex_autolock auto_lock_mutex(&plugin_running_mutex);
371
372 DBUG_EXECUTE_IF("group_replication_wait_on_start",
373 {
374 const char act[]= "now signal signal.start_waiting wait_for signal.start_continue";
375 assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
376 });
377
378 if (plugin_is_group_replication_running())
379 DBUG_RETURN(GROUP_REPLICATION_ALREADY_RUNNING);
380 if (check_if_server_properly_configured())
381 DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
382 if (check_group_name_string(group_name_var))
383 DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
384 if (check_recovery_ssl_string(recovery_ssl_ca_var, "ssl_ca") ||
385 check_recovery_ssl_string(recovery_ssl_capath_var, "ssl_capath") ||
386 check_recovery_ssl_string(recovery_ssl_cert_var, "ssl_cert_pointer") ||
387 check_recovery_ssl_string(recovery_ssl_cipher_var,
388 "ssl_cipher_pointer") ||
389 check_recovery_ssl_string(recovery_ssl_key_var, "ssl_key_pointer") ||
390 check_recovery_ssl_string(recovery_ssl_crl_var, "ssl_crl_pointer") ||
391 check_recovery_ssl_string(recovery_ssl_crlpath_var,
392 "ssl_crlpath_pointer"))
393 DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
394 if (!start_group_replication_at_boot_var &&
395 !server_engine_initialized())
396 {
397 log_message(MY_ERROR_LEVEL,
398 "Unable to start Group Replication. Replication applier "
399 "infrastructure is not initialized since the server was "
400 "started with server_id=0. Please, restart the server "
401 "with server_id larger than 0.");
402 DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
403 }
404 if (force_members_var != NULL &&
405 strlen(force_members_var) > 0)
406 {
407 log_message(MY_ERROR_LEVEL,
408 "group_replication_force_members must be empty "
409 "on group start. Current value: '%s'",
410 force_members_var);
411 DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
412 }
413 if (init_group_sidno())
414 DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR); /* purecov: inspected */
415
416 if (allow_local_disjoint_gtids_join_var)
417 {
418 option_deprecation_warning(current_thd,
419 "group_replication_allow_local_disjoint_gtids_join");
420 }
421
422 /*
423 Instantiate certification latch.
424 */
425 certification_latch= new Wait_ticket<my_thread_id>();
426
427 // GR delayed initialization.
428 if (!server_engine_initialized())
429 {
430 wait_on_engine_initialization= true;
431 plugin_is_auto_starting= false;
432
433 delayed_initialization_thread= new Delayed_initialization_thread();
434 if (delayed_initialization_thread->launch_initialization_thread())
435 {
436 /* purecov: begin inspected */
437 log_message(MY_ERROR_LEVEL,
438 "It was not possible to guarantee the initialization of plugin"
439 " structures on server start");
440 delete delayed_initialization_thread;
441 delayed_initialization_thread= NULL;
442 DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
443 /* purecov: end */
444 }
445
446 DBUG_RETURN(0); //leave the decision for later
447 }
448
449 DBUG_RETURN(initialize_plugin_and_join(PSESSION_DEDICATED_THREAD,
450 NULL));
451 }
452
/**
  Initializes the plugin modules and joins the group.

  The steps are, in order: initialize the group communication interface,
  open an internal SQL session, save the current server read mode and
  (unless the plugin is auto starting) enable super_read_only, apply the
  write set constraints, configure group communication, the member manager
  and the compatibility manager, create the blocked transaction handler,
  start the recovery and applier modules, start group communication and
  wait for the view in which this member joined.

  When any step fails the function leaves the group, terminates the plugin
  modules, removes the write set constraints it applied, restores the saved
  read mode and destroys the certification latch.

  @param sql_api_isolation  isolation mode used to open the internal SQL
                            session
  @param delayed_init_thd   when not NULL, it is signalled through
                            signal_read_mode_ready() once the read mode step
                            is done, and again on error

  @return 0 on success, a non-zero error code otherwise
*/
int initialize_plugin_and_join(enum_plugin_con_isolation sql_api_isolation,
                               Delayed_initialization_thread *delayed_init_thd)
{
  DBUG_ENTER("initialize_plugin_and_join");

  int error= 0;

  //Avoid unnecessary operations
  // The flags below record how far the setup went, so that the error path
  // only undoes what was actually done.
  bool enabled_super_read_only= false;
  bool read_only_mode= false, super_read_only_mode=false;
  bool write_set_limits_set = false;

  st_server_ssl_variables server_ssl_variables=
    {false,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL};

  // Filled in by get_server_parameters() further down.
  char *hostname, *uuid;
  uint port;
  unsigned int server_version;

  // Deleted at the end of this function on every path.
  Sql_service_command_interface *sql_command_interface=
    new Sql_service_command_interface();

  // GCS interface.
  if ((error= gcs_module->initialize()))
    goto err; /* purecov: inspected */

  // Setup SQL service interface.
  if (sql_command_interface->
          establish_session_connection(sql_api_isolation, plugin_info_ptr))
  {
    error =1; /* purecov: inspected */
    goto err; /* purecov: inspected */
  }

  if (sql_command_interface->set_interface_user(GROUPREPL_USER))
  {
    error =1; /* purecov: inspected */
    goto err; /* purecov: inspected */
  }

  // Remember the current read mode so that the error path can restore it.
  get_read_mode_state(sql_command_interface, &read_only_mode,
                      &super_read_only_mode);

  /*
   At this point in the code, set the super_read_only mode here on the
   server to protect recovery and version module of the Group Replication.
   This can only be done on START command though, on installs there are
   deadlock issues.
  */
  if (!plugin_is_auto_starting &&
      enable_super_read_only_mode(sql_command_interface))
  {
    /* purecov: begin inspected */
    error =1;
    log_message(MY_ERROR_LEVEL,
                "Could not enable the server read only mode and guarantee a "
                "safe recovery execution");
    goto err;
    /* purecov: end */
  }
  // Note: this is set even when the auto start case skipped the call above.
  enabled_super_read_only= true;
  if (delayed_init_thd)
    delayed_init_thd->signal_read_mode_ready();

  require_full_write_set(1);
  set_write_set_memory_size_limit(get_transaction_size_limit());
  write_set_limits_set = true;

  get_server_parameters(&hostname, &port, &uuid, &server_version,
                        &server_ssl_variables);

  // Setup GCS.
  if ((error= configure_group_communication(&server_ssl_variables)))
  {
    log_message(MY_ERROR_LEVEL,
                "Error on group communication engine initialization");
    goto err;
  }

  // Setup Group Member Manager.
  if ((error= configure_group_member_manager(hostname, uuid, port,
                                             server_version)))
    goto err; /* purecov: inspected */

  if (check_async_channel_running_on_secondary())
  {
    error= 1;
    log_message(MY_ERROR_LEVEL, "Can't start group replication on secondary"
                                " member with single primary-mode while"
                                " asynchronous replication channels are"
                                " running.");
    goto err; /* purecov: inspected */
  }

  configure_compatibility_manager();
  // Debug-only hooks that tweak the compatibility rules and the local
  // version.
  DBUG_EXECUTE_IF("group_replication_compatibility_rule_error_major",
                  {
                    Member_version other_version= plugin_version + (0x010000);
                    Member_version current_version= plugin_version;
                    compatibility_mgr->add_incompatibility(current_version,
                                                           other_version);
                  };);
  DBUG_EXECUTE_IF("group_replication_compatibility_rule_error_minor",
                  {
                    Member_version other_version= plugin_version;
                    Member_version current_version= plugin_version + (0x000100);
                    compatibility_mgr->add_incompatibility(current_version,
                                                           other_version);
                  };);
  DBUG_EXECUTE_IF("group_replication_compatibility_higher_minor_version",
                  {
                    Member_version higher_version= plugin_version + (0x000100);
                    compatibility_mgr->set_local_version(higher_version);
                  };);
  DBUG_EXECUTE_IF("group_replication_compatibility_higher_major_version",
                  {
                    Member_version higher_version= plugin_version + (0x010000);
                    compatibility_mgr->set_local_version(higher_version);
                  };);
  DBUG_EXECUTE_IF("group_replication_compatibility_restore_version",
                  {
                    Member_version current_version= plugin_version;
                    compatibility_mgr->set_local_version(current_version);
                  };);

  // need to be initialized before applier, is called on kill_pending_transactions
  blocked_transaction_handler= new Blocked_transaction_handler();

  if ((error= initialize_recovery_module()))
    goto err; /* purecov: inspected */

  //we can only start the applier if the log has been initialized
  if (configure_and_start_applier_module())
  {
    error= GROUP_REPLICATION_REPLICATION_APPLIER_INIT_ERROR;
    goto err;
  }

  initialize_asynchronous_channels_observer();
  initialize_group_partition_handler();

  DBUG_EXECUTE_IF("group_replication_before_joining_the_group",
                  {
                    const char act[]= "now wait_for signal.continue_group_join";
                    assert(!debug_sync_set_action(current_thd,
                                                  STRING_WITH_LEN(act)));
                  });

  if ((error= start_group_communication()))
  {
    log_message(MY_ERROR_LEVEL,
                "Error on group communication engine start");
    goto err;
  }

  // Block until the view that contains this member arrives, or the wait
  // fails.
  if (view_change_notifier->wait_for_view_modification())
  {
    if (!view_change_notifier->is_cancelled())
    {
      //Only log a error when a view modification was not cancelled.
      log_message(MY_ERROR_LEVEL,
                  "Timeout on wait for view after joining group");
    }
    error= view_change_notifier->get_error();
    goto err;
  }
  // The member joined: publish the running state.
  my_atomic_store32(&group_replication_running, 1);
  my_atomic_store32(&group_replication_stopping, 0);
  log_primary_member_details();

  // Common exit: reached on success (error == 0) as well as on failure.
err:
  if (error)
  {
    //Unblock the possible stuck delayed thread
    if (delayed_init_thd)
      delayed_init_thd->signal_read_mode_ready();
    leave_group();
    terminate_plugin_modules();

    if (write_set_limits_set) {
      // Remove server constraints on write set collection
      update_write_set_memory_size_limit(0);
      require_full_write_set(0);
    }

    // Put the read mode back to what it was before this start attempt.
    if (!server_shutdown_status && server_engine_initialized()
        && enabled_super_read_only)
    {
      set_read_mode_state(sql_command_interface, read_only_mode,
                          super_read_only_mode);
    }

    if (certification_latch != NULL)
    {
      delete certification_latch; /* purecov: inspected */
      certification_latch= NULL;  /* purecov: inspected */
    }
  }

  delete sql_command_interface;
  plugin_is_auto_starting= false;

  DBUG_RETURN(error);
}
657
configure_group_member_manager(char * hostname,char * uuid,uint port,unsigned int server_version)658 int configure_group_member_manager(char *hostname, char *uuid,
659 uint port, unsigned int server_version)
660 {
661 DBUG_ENTER("configure_group_member_manager");
662
663 /*
664 Ensure that group communication interfaces are initialized
665 and ready to use, since plugin can leave the group on errors
666 but continue to be active.
667 */
668 std::string gcs_local_member_identifier;
669 if (gcs_module->get_local_member_identifier(gcs_local_member_identifier))
670 {
671 /* purecov: begin inspected */
672 log_message(MY_ERROR_LEVEL, "Error calling group communication interfaces");
673 DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR);
674 /* purecov: end */
675 }
676
677 if (!strcmp(uuid, group_name_var))
678 {
679 log_message(MY_ERROR_LEVEL,
680 "Member server_uuid is incompatible with the group. "
681 "Server_uuid %s matches group_name %s.", uuid, group_name_var);
682 DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
683 }
684 //Configure Group Member Manager
685 plugin_version= server_version;
686
687 uint32 local_version= plugin_version;
688 DBUG_EXECUTE_IF("group_replication_compatibility_higher_patch_version",
689 {
690 local_version= plugin_version + (0x000001);
691 };);
692 DBUG_EXECUTE_IF("group_replication_compatibility_higher_minor_version",
693 {
694 local_version= plugin_version + (0x000100);
695 };);
696 DBUG_EXECUTE_IF("group_replication_compatibility_higher_major_version",
697 {
698 local_version= plugin_version + (0x010000);
699 };);
700 Member_version local_member_plugin_version(local_version);
701
702 DBUG_EXECUTE_IF("group_replication_force_member_uuid",
703 {
704 uuid= const_cast<char*>("cccccccc-cccc-cccc-cccc-cccccccccccc");
705 };);
706 delete local_member_info;
707 local_member_info= new Group_member_info(hostname,
708 port,
709 uuid,
710 write_set_extraction_algorithm,
711 gcs_local_member_identifier,
712 Group_member_info::MEMBER_OFFLINE,
713 local_member_plugin_version,
714 gtid_assignment_block_size_var,
715 Group_member_info::MEMBER_ROLE_SECONDARY,
716 single_primary_mode_var,
717 enforce_update_everywhere_checks_var,
718 member_weight_var,
719 gr_lower_case_table_names);
720
721 //Create the membership info visible for the group
722 delete group_member_mgr;
723 group_member_mgr= new Group_member_info_manager(local_member_info);
724
725 log_message(MY_INFORMATION_LEVEL,
726 "Member configuration: "
727 "member_id: %lu; "
728 "member_uuid: \"%s\"; "
729 "single-primary mode: \"%s\"; "
730 "group_replication_auto_increment_increment: %lu; ",
731 get_server_id(),
732 (local_member_info != NULL) ? local_member_info->get_uuid().c_str() : "NULL",
733 single_primary_mode_var ? "true" : "false",
734 auto_increment_increment_var);
735
736 DBUG_RETURN(0);
737 }
738
init_compatibility_manager()739 void init_compatibility_manager()
740 {
741 if (compatibility_mgr != NULL)
742 {
743 delete compatibility_mgr; /* purecov: inspected */
744 }
745
746 compatibility_mgr= new Compatibility_module();
747 }
748
749
configure_compatibility_manager()750 int configure_compatibility_manager()
751 {
752 Member_version local_member_version(plugin_version);
753 compatibility_mgr->set_local_version(local_member_version);
754
755 /*
756 If needed.. configure here static rules of incompatibility.
757
758 Example:
759 Member_version local_member_version(plugin_version);
760 Member_version remote_member_version(0x080001);
761 compatibility_mgr->add_incompatibility(local_member_version,
762 remote_member_version);
763
764 Member_version local_member_version(plugin_version);
765 Member_version remote_member_min_version(0x080000);
766 Member_version remote_member_max_version(0x080005);
767 compatibility_mgr->add_incompatibility(local_member_version,
768 remote_member_min_version,
769 remote_member_max_version);
770 */
771
772 return 0;
773 }
774
leave_group()775 int leave_group()
776 {
777 if (gcs_module->belongs_to_group())
778 {
779 view_change_notifier->start_view_modification();
780
781 Gcs_operations::enum_leave_state state= gcs_module->leave();
782
783 std::stringstream ss;
784 plugin_log_level log_severity= MY_WARNING_LEVEL;
785 switch (state)
786 {
787 case Gcs_operations::ERROR_WHEN_LEAVING:
788 /* purecov: begin inspected */
789 ss << "Unable to confirm whether the server has left the group or not. "
790 "Check performance_schema.replication_group_members to check group membership information.";
791 log_severity= MY_ERROR_LEVEL;
792 break;
793 /* purecov: end */
794 case Gcs_operations::ALREADY_LEAVING:
795 ss << "Skipping leave operation: concurrent attempt to leave the group is on-going.";
796 break;
797 case Gcs_operations::ALREADY_LEFT:
798 /* purecov: begin inspected */
799 ss << "Skipping leave operation: member already left the group.";
800 break;
801 /* purecov: end */
802 case Gcs_operations::NOW_LEAVING:
803 goto bypass_message;
804 }
805 log_message(log_severity, ss.str().c_str());
806 bypass_message:
807 //Wait anyway
808 log_message(MY_INFORMATION_LEVEL, "Going to wait for view modification");
809 if (view_change_notifier->wait_for_view_modification())
810 {
811 log_message(MY_WARNING_LEVEL,
812 "On shutdown there was a timeout receiving a view change. "
813 "This can lead to a possible inconsistent state. "
814 "Check the log for more details");
815 }
816 }
817 else
818 {
819 /*
820 Even when we do not belong to the group we invoke leave()
821 to prevent the following situation:
822 1) Server joins group;
823 2) Server leaves group before receiving the view on which
824 it joined the group.
825 If we do not leave preemptively, the server will only leave
826 the group when the communication layer failure detector
827 detects that it left.
828 */
829 log_message(MY_INFORMATION_LEVEL,
830 "Requesting to leave the group despite of not "
831 "being a member");
832 gcs_module->leave();
833 }
834
835 // Finalize GCS.
836 gcs_module->finalize();
837
838 if (auto_increment_handler != NULL)
839 {
840 auto_increment_handler->reset_auto_increment_variables();
841 }
842
843 // Destroy handlers and notifiers
844 delete events_handler;
845 events_handler= NULL;
846
847 return 0;
848 }
849
/**
  Entry point used to stop Group Replication.

  Runs under plugin_running_mutex. It first disposes of a pending delayed
  initialization thread. If the plugin is not running nothing else is done.
  Otherwise, holding the plugin stop write lock, it waits for (or unblocks)
  the transactions waiting for certification, leaves the group and
  terminates the plugin modules. After releasing the lock it tries to enable
  the server read mode and removes the write set constraints.

  @return 0 when the plugin was not running, otherwise the value returned by
          terminate_plugin_modules()
*/
int plugin_group_replication_stop()
{
  DBUG_ENTER("plugin_group_replication_stop");

  Mutex_autolock auto_lock_mutex(&plugin_running_mutex);

  DBUG_EXECUTE_IF("group_replication_wait_on_stop",
                 {
                   const char act[]= "now signal signal.stop_waiting wait_for signal.stop_continue";
                   assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                 });

  /*
    We delete the delayed initialization object here because:

    1) It is invoked even if the plugin is stopped as failed starts may still
    leave the class instantiated. This way, either the stop command or the
    deinit process that calls this method will always clean this class

    2) Its use is on before_handle_connection, meaning no stop command can be
    made before that. This makes this delete safe under the plugin running
    mutex.
  */
  if (delayed_initialization_thread != NULL)
  {
    wait_on_engine_initialization= false;
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_thread_end();
    delete delayed_initialization_thread;
    delayed_initialization_thread= NULL;
  }

  // Nothing to stop.
  if (!plugin_is_group_replication_running())
  {
    DBUG_RETURN(0);
  }

  // Publish that a stop is in progress before taking the stop lock.
  my_atomic_store32(&group_replication_stopping, 1);

  shared_plugin_stop_lock->grab_write_lock();
  log_message(MY_INFORMATION_LEVEL,
              "Plugin 'group_replication' is stopping.");

  plugin_is_waiting_to_set_server_read_mode= true;

  DBUG_EXECUTE_IF("group_replication_hold_stop_before_leave_the_group", {
    const char act[] =
        "now signal signal.stopping_before_leave_the_group "
        "wait_for signal.resume_stop_before_leave_the_group";
    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
  });

  // wait for all transactions waiting for certification
  bool timeout=
      certification_latch->block_until_empty(TRANSACTION_KILL_TIMEOUT);
  if (timeout)
  {
    //if they are blocked, kill them
    blocked_transaction_handler->unblock_waiting_transactions();
  }

  /* first leave all joined groups (currently one) */
  leave_group();

  // true: also stop the replication channels.
  int error= terminate_plugin_modules(true);

  my_atomic_store32(&group_replication_running, 0);
  shared_plugin_stop_lock->release_write_lock();
  log_message(MY_INFORMATION_LEVEL,
              "Plugin 'group_replication' has been stopped.");

  // Enable super_read_only.
  // Skipped on server shutdown, on plugin uninstall and while the server
  // engine is not initialized; in those cases
  // plugin_is_waiting_to_set_server_read_mode is left set.
  if (!server_shutdown_status &&
      !plugin_is_being_uninstalled &&
      server_engine_initialized())
  {
    if (enable_server_read_mode(PSESSION_DEDICATED_THREAD))
    {
      log_message(MY_ERROR_LEVEL,
                  "On plugin shutdown it was not possible to enable the "
                  "server read only mode. Local transactions will be accepted "
                  "and committed."); /* purecov: inspected */
    }
    plugin_is_waiting_to_set_server_read_mode= false;
  }

  // Remove server constraints on write set collection
  update_write_set_memory_size_limit(0);
  require_full_write_set(0);

  DBUG_RETURN(error);
}
942
terminate_plugin_modules(bool flag_stop_async_channel)943 int terminate_plugin_modules(bool flag_stop_async_channel)
944 {
945
946 if(terminate_recovery_module())
947 {
948 //Do not throw an error since recovery is not vital, but warn either way
949 log_message(MY_WARNING_LEVEL,
950 "On shutdown there was a timeout on the Group Replication "
951 "recovery module termination. Check the log for more details"); /* purecov: inspected */
952 }
953
954 DBUG_EXECUTE_IF("group_replication_after_recovery_module_terminated",
955 {
956 const char act[]= "now wait_for signal.termination_continue";
957 assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
958 });
959
960 /*
961 The applier is only shutdown after the communication layer to avoid
962 messages being delivered in the current view, but not applied
963 */
964 int error= 0;
965 if((error= terminate_applier_module()))
966 {
967 log_message(MY_ERROR_LEVEL,
968 "On shutdown there was a timeout on the Group Replication"
969 " applier termination.");
970 }
971
972 terminate_asynchronous_channels_observer();
973
974 if (flag_stop_async_channel)
975 {
976 int channel_err= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
977 components_stop_timeout_var);
978 if (channel_err)
979 {
980 log_message(MY_ERROR_LEVEL,
981 "Error stopping all replication channels while server was"
982 " leaving the group. Please check the error log for "
983 "additional details. Got error: %d", channel_err);
984 if (!error)
985 {
986 error= GROUP_REPLICATION_CONFIGURATION_ERROR;
987 }
988 }
989 }
990
991 delete group_partition_handler;
992 group_partition_handler= NULL;
993
994 delete blocked_transaction_handler;
995 blocked_transaction_handler= NULL;
996
997 /*
998 Destroy certification latch.
999 */
1000 if (certification_latch != NULL)
1001 {
1002 delete certification_latch;
1003 certification_latch= NULL;
1004 }
1005
1006 /*
1007 Clear server sessions opened caches on transactions observer.
1008 */
1009 observer_trans_clear_io_cache_unused_list();
1010
1011 if (group_member_mgr != NULL && local_member_info != NULL)
1012 {
1013 group_member_mgr->update_member_status(local_member_info->get_uuid(),
1014 Group_member_info::MEMBER_OFFLINE);
1015 }
1016
1017 return error;
1018 }
1019
plugin_group_replication_init(MYSQL_PLUGIN plugin_info)1020 int plugin_group_replication_init(MYSQL_PLUGIN plugin_info)
1021 {
1022 // Reset plugin local variables.
1023 my_atomic_store32(&group_replication_running, 0);
1024 my_atomic_store32(&group_replication_stopping, 0);
1025 plugin_is_being_uninstalled= false;
1026 plugin_is_waiting_to_set_server_read_mode= false;
1027
1028 // Register all PSI keys at the time plugin init
1029 #ifdef HAVE_PSI_INTERFACE
1030 register_all_group_replication_psi_keys();
1031 #endif /* HAVE_PSI_INTERFACE */
1032
1033 mysql_mutex_init(key_GR_LOCK_plugin_running, &plugin_running_mutex,
1034 MY_MUTEX_INIT_FAST);
1035 mysql_mutex_init(key_GR_LOCK_force_members_running,
1036 &force_members_running_mutex,
1037 MY_MUTEX_INIT_FAST);
1038
1039 plugin_stop_lock= new Checkable_rwlock(
1040 #ifdef HAVE_PSI_INTERFACE
1041 key_GR_RWLOCK_plugin_stop
1042 #endif /* HAVE_PSI_INTERFACE */
1043 );
1044
1045 shared_plugin_stop_lock= new Shared_writelock(plugin_stop_lock);
1046
1047 //Initialize transactions observer structures
1048 observer_trans_initialize();
1049
1050 plugin_info_ptr= plugin_info;
1051
1052 if (group_replication_init())
1053 {
1054 /* purecov: begin inspected */
1055 log_message(MY_ERROR_LEVEL,
1056 "Failure during Group Replication handler initialization");
1057 return 1;
1058 /* purecov: end */
1059 }
1060
1061 if(register_server_state_observer(&server_state_observer,
1062 (void *)plugin_info_ptr))
1063 {
1064 /* purecov: begin inspected */
1065 log_message(MY_ERROR_LEVEL,
1066 "Failure when registering the server state observers");
1067 return 1;
1068 /* purecov: end */
1069 }
1070
1071 if (register_trans_observer(&trans_observer, (void *)plugin_info_ptr))
1072 {
1073 /* purecov: begin inspected */
1074 log_message(MY_ERROR_LEVEL,
1075 "Failure when registering the transactions state observers");
1076 return 1;
1077 /* purecov: end */
1078 }
1079
1080 if (register_binlog_transmit_observer(&binlog_transmit_observer,
1081 (void *)plugin_info_ptr))
1082 {
1083 /* purecov: begin inspected */
1084 log_message(MY_ERROR_LEVEL,
1085 "Failure when registering the binlog state observers");
1086 return 1;
1087 /* purecov: end */
1088 }
1089
1090 //Initialize the recovery SSL option map
1091 initialize_ssl_option_map();
1092
1093 //Initialize channel observation and auto increment handlers before start
1094 auto_increment_handler= new Plugin_group_replication_auto_increment();
1095 channel_observation_manager= new Channel_observation_manager(plugin_info);
1096 view_change_notifier= new Plugin_gcs_view_modification_notifier();
1097 gcs_module= new Gcs_operations();
1098
1099 //Initialize the compatibility module before starting
1100 init_compatibility_manager();
1101
1102 // Set the atomic var to the value of the base plugin variable
1103 transaction_size_limit_var = transaction_size_limit_base_var;
1104
1105 plugin_is_auto_starting= start_group_replication_at_boot_var;
1106 if (start_group_replication_at_boot_var && plugin_group_replication_start())
1107 {
1108 log_message(MY_ERROR_LEVEL,
1109 "Unable to start Group Replication on boot");
1110 }
1111
1112 return 0;
1113 }
1114
plugin_group_replication_deinit(void * p)1115 int plugin_group_replication_deinit(void *p)
1116 {
1117 // If plugin was not initialized, there is nothing to do here.
1118 if (plugin_info_ptr == NULL)
1119 return 0;
1120
1121 plugin_is_being_uninstalled= true;
1122 my_atomic_store32(&group_replication_stopping, 1);
1123 int observer_unregister_error= 0;
1124
1125 if (plugin_group_replication_stop())
1126 log_message(MY_ERROR_LEVEL,
1127 "Failure when stopping Group Replication on plugin uninstall");
1128
1129 if (group_member_mgr != NULL)
1130 {
1131 delete group_member_mgr;
1132 group_member_mgr= NULL;
1133 }
1134
1135 if (local_member_info != NULL)
1136 {
1137 delete local_member_info;
1138 local_member_info= NULL;
1139 }
1140
1141 if (compatibility_mgr != NULL)
1142 {
1143 delete compatibility_mgr;
1144 compatibility_mgr= NULL;
1145 }
1146
1147 if (unregister_server_state_observer(&server_state_observer, p))
1148 {
1149 log_message(MY_ERROR_LEVEL,
1150 "Failure when unregistering the server state observers");
1151 observer_unregister_error++;
1152 }
1153
1154 if (unregister_trans_observer(&trans_observer, p))
1155 {
1156 log_message(MY_ERROR_LEVEL,
1157 "Failure when unregistering the transactions state observers");
1158 observer_unregister_error++;
1159 }
1160
1161 if (unregister_binlog_transmit_observer(&binlog_transmit_observer, p))
1162 {
1163 log_message(MY_ERROR_LEVEL,
1164 "Failure when unregistering the binlog state observers");
1165 observer_unregister_error++;
1166 }
1167
1168 if (observer_unregister_error == 0)
1169 log_message(MY_INFORMATION_LEVEL,
1170 "All Group Replication server observers"
1171 " have been successfully unregistered");
1172
1173 if (channel_observation_manager != NULL)
1174 {
1175 delete channel_observation_manager;
1176 channel_observation_manager= NULL;
1177 }
1178
1179 delete gcs_module;
1180 gcs_module= NULL;
1181
1182 delete view_change_notifier;
1183 view_change_notifier= NULL;
1184
1185 if(auto_increment_handler != NULL)
1186 {
1187 delete auto_increment_handler;
1188 auto_increment_handler= NULL;
1189 }
1190
1191 mysql_mutex_destroy(&plugin_running_mutex);
1192 mysql_mutex_destroy(&force_members_running_mutex);
1193
1194 delete shared_plugin_stop_lock;
1195 shared_plugin_stop_lock= NULL;
1196 delete plugin_stop_lock;
1197 plugin_stop_lock= NULL;
1198
1199 //Terminate transactions observer structures
1200 observer_trans_terminate();
1201
1202 plugin_info_ptr= NULL;
1203
1204 return observer_unregister_error;
1205 }
1206
init_group_sidno()1207 static bool init_group_sidno()
1208 {
1209 DBUG_ENTER("init_group_sidno");
1210 rpl_sid group_sid;
1211
1212 if (group_sid.parse(group_name_var) != RETURN_STATUS_OK)
1213 {
1214 /* purecov: begin inspected */
1215 log_message(MY_ERROR_LEVEL, "Unable to parse the group name.");
1216 DBUG_RETURN(true);
1217 /* purecov: end */
1218 }
1219
1220 group_sidno = get_sidno_from_global_sid_map(group_sid);
1221 if (group_sidno <= 0)
1222 {
1223 /* purecov: begin inspected */
1224 log_message(MY_ERROR_LEVEL, "Unable to generate the sidno for the group.");
1225 DBUG_RETURN(true);
1226 /* purecov: end */
1227 }
1228
1229 DBUG_RETURN(false);
1230 }
1231
configure_and_start_applier_module()1232 int configure_and_start_applier_module()
1233 {
1234 DBUG_ENTER("configure_and_start_applier_module");
1235
1236 int error= 0;
1237
1238 //The applier did not stop properly or suffered a configuration error
1239 if (applier_module != NULL)
1240 {
1241 if ((error= applier_module->is_running())) //it is still running?
1242 {
1243 log_message(MY_ERROR_LEVEL,
1244 "Cannot start the Group Replication applier as a previous "
1245 "shutdown is still running: "
1246 "The thread will stop once its task is complete.");
1247 DBUG_RETURN(error);
1248 }
1249 else
1250 {
1251 //clean a possible existent pipeline
1252 applier_module->terminate_applier_pipeline();
1253 //delete it and create from scratch
1254 delete applier_module;
1255 }
1256 }
1257
1258 applier_module= new Applier_module();
1259
1260 recovery_module->set_applier_module(applier_module);
1261
1262 //For now, only defined pipelines are accepted.
1263 error=
1264 applier_module->setup_applier_module(STANDARD_GROUP_REPLICATION_PIPELINE,
1265 known_server_reset,
1266 components_stop_timeout_var,
1267 group_sidno,
1268 gtid_assignment_block_size_var,
1269 shared_plugin_stop_lock);
1270 if (error)
1271 {
1272 //Delete the possible existing pipeline
1273 applier_module->terminate_applier_pipeline();
1274 delete applier_module;
1275 applier_module= NULL;
1276 DBUG_RETURN(error);
1277 }
1278
1279 known_server_reset= false;
1280
1281 if ((error= applier_module->initialize_applier_thread()))
1282 {
1283 log_message(MY_ERROR_LEVEL,
1284 "Unable to initialize the Group Replication applier module.");
1285 //terminate the applier_thread if running
1286 if (!applier_module->terminate_applier_thread())
1287 {
1288 delete applier_module;
1289 applier_module= NULL;
1290 }
1291 }
1292 else
1293 log_message(MY_INFORMATION_LEVEL,
1294 "Group Replication applier module successfully initialized!");
1295
1296 DBUG_RETURN(error);
1297 }
1298
initialize_group_partition_handler()1299 void initialize_group_partition_handler()
1300 {
1301 group_partition_handler=
1302 new Group_partition_handling(shared_plugin_stop_lock,
1303 timeout_on_unreachable_var);
1304 }
1305
terminate_applier_module()1306 int terminate_applier_module()
1307 {
1308
1309 int error= 0;
1310 if (applier_module != NULL)
1311 {
1312 if (!applier_module->terminate_applier_thread()) //all goes fine
1313 {
1314 delete applier_module;
1315 applier_module= NULL;
1316 }
1317 else
1318 {
1319 error= GROUP_REPLICATION_APPLIER_STOP_TIMEOUT;
1320 }
1321 }
1322 return error;
1323 }
1324
configure_group_communication(st_server_ssl_variables * ssl_variables)1325 int configure_group_communication(st_server_ssl_variables *ssl_variables)
1326 {
1327 DBUG_ENTER("configure_group_communication");
1328
1329 // GCS interface parameters.
1330 Gcs_interface_parameters gcs_module_parameters;
1331 gcs_module_parameters.add_parameter("group_name",
1332 std::string(group_name_var));
1333 if (local_address_var != NULL)
1334 gcs_module_parameters.add_parameter("local_node",
1335 std::string(local_address_var));
1336 if (group_seeds_var != NULL)
1337 gcs_module_parameters.add_parameter("peer_nodes",
1338 std::string(group_seeds_var));
1339 const std::string bootstrap_group_string=
1340 bootstrap_group_var ? "true" : "false";
1341 gcs_module_parameters.add_parameter("bootstrap_group", bootstrap_group_string);
1342 std::stringstream poll_spin_loops_stream_buffer;
1343 poll_spin_loops_stream_buffer << poll_spin_loops_var;
1344 gcs_module_parameters.add_parameter("poll_spin_loops",
1345 poll_spin_loops_stream_buffer.str());
1346
1347 // Compression parameter
1348 if (compression_threshold_var > 0)
1349 {
1350 std::stringstream ss;
1351 ss << compression_threshold_var;
1352 gcs_module_parameters.add_parameter("compression", std::string("on"));
1353 gcs_module_parameters.add_parameter("compression_threshold", ss.str());
1354 }
1355 else
1356 {
1357 gcs_module_parameters.add_parameter("compression", std::string("off")); /* purecov: inspected */
1358 }
1359
1360 // SSL parameters.
1361 std::string ssl_mode(ssl_mode_values[ssl_mode_var]);
1362 if (ssl_mode_var > 0)
1363 {
1364 std::string ssl_key(ssl_variables->ssl_key ? ssl_variables->ssl_key : "");
1365 std::string ssl_cert(ssl_variables->ssl_cert ? ssl_variables->ssl_cert : "");
1366 std::string ssl_ca(ssl_variables->ssl_ca ? ssl_variables->ssl_ca : "");
1367 std::string ssl_capath(ssl_variables->ssl_capath ? ssl_variables->ssl_capath : "");
1368 std::string ssl_cipher(ssl_variables->ssl_cipher ? ssl_variables->ssl_cipher : "");
1369 std::string ssl_crl(ssl_variables->ssl_crl ? ssl_variables->ssl_crl : "");
1370 std::string ssl_crlpath(ssl_variables->ssl_crlpath ? ssl_variables->ssl_crlpath : "");
1371 std::string tls_version(ssl_variables->tls_version? ssl_variables->tls_version : "");
1372
1373 // SSL support on server.
1374 if (ssl_variables->have_ssl_opt)
1375 {
1376 gcs_module_parameters.add_parameter("ssl_mode", ssl_mode);
1377 gcs_module_parameters.add_parameter("server_key_file", ssl_key);
1378 gcs_module_parameters.add_parameter("server_cert_file", ssl_cert);
1379 gcs_module_parameters.add_parameter("client_key_file", ssl_key);
1380 gcs_module_parameters.add_parameter("client_cert_file", ssl_cert);
1381 gcs_module_parameters.add_parameter("ca_file", ssl_ca);
1382 if (!ssl_capath.empty())
1383 gcs_module_parameters.add_parameter("ca_path", ssl_capath); /* purecov: inspected */
1384 gcs_module_parameters.add_parameter("cipher", ssl_cipher);
1385 gcs_module_parameters.add_parameter("tls_version", tls_version);
1386
1387 if (!ssl_crl.empty())
1388 gcs_module_parameters.add_parameter("crl_file", ssl_crl); /* purecov: inspected */
1389 if (!ssl_crlpath.empty())
1390 gcs_module_parameters.add_parameter("crl_path", ssl_crlpath); /* purecov: inspected */
1391
1392 log_message(MY_INFORMATION_LEVEL,
1393 "Group communication SSL configuration: "
1394 "group_replication_ssl_mode: \"%s\"; "
1395 "server_key_file: \"%s\"; "
1396 "server_cert_file: \"%s\"; "
1397 "client_key_file: \"%s\"; "
1398 "client_cert_file: \"%s\"; "
1399 "ca_file: \"%s\"; "
1400 "ca_path: \"%s\"; "
1401 "cipher: \"%s\"; "
1402 "tls_version: \"%s\"; "
1403 "crl_file: \"%s\"; "
1404 "crl_path: \"%s\"",
1405 ssl_mode.c_str(), ssl_key.c_str(), ssl_cert.c_str(),
1406 ssl_key.c_str(), ssl_cert.c_str(), ssl_ca.c_str(),
1407 ssl_capath.c_str(), ssl_cipher.c_str(), tls_version.c_str(),
1408 ssl_crl.c_str(), ssl_crlpath.c_str());
1409 }
1410 // No SSL support on server.
1411 else
1412 {
1413 /* purecov: begin inspected */
1414 log_message(MY_ERROR_LEVEL,
1415 "MySQL server does not have SSL support and "
1416 "group_replication_ssl_mode is \"%s\", START "
1417 "GROUP_REPLICATION will abort", ssl_mode.c_str());
1418 DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR);
1419 /* purecov: end */
1420 }
1421 }
1422 // GCS SSL disabled.
1423 else
1424 {
1425 gcs_module_parameters.add_parameter("ssl_mode", ssl_mode);
1426 log_message(MY_INFORMATION_LEVEL,
1427 "Group communication SSL configuration: "
1428 "group_replication_ssl_mode: \"%s\"", ssl_mode.c_str());
1429 }
1430
1431 if (ip_whitelist_var != NULL)
1432 {
1433 std::string v(ip_whitelist_var);
1434 v.erase(std::remove(v.begin(), v.end(), ' '), v.end());
1435 std::transform(v.begin(), v.end(), v.begin(), ::tolower);
1436
1437 // if the user specified a list other than automatic
1438 // then we need to pass it to the GCS, otherwise we
1439 // do nothing and let GCS scan for the proper IPs
1440 if (v.find("automatic") == std::string::npos)
1441 {
1442 gcs_module_parameters.add_parameter("ip_whitelist",
1443 std::string(ip_whitelist_var));
1444 }
1445 }
1446
1447 // Configure GCS.
1448 if (gcs_module->configure(gcs_module_parameters))
1449 {
1450 log_message(MY_ERROR_LEVEL,
1451 "Unable to initialize the group communication engine");
1452 DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR);
1453 }
1454 log_message(MY_INFORMATION_LEVEL,
1455 "Initialized group communication with configuration: "
1456 "group_replication_group_name: \"%s\"; "
1457 "group_replication_local_address: \"%s\"; "
1458 "group_replication_group_seeds: \"%s\"; "
1459 "group_replication_bootstrap_group: %s; "
1460 "group_replication_poll_spin_loops: %lu; "
1461 "group_replication_compression_threshold: %lu; "
1462 "group_replication_ip_whitelist: \"%s\"",
1463 group_name_var, local_address_var, group_seeds_var,
1464 bootstrap_group_var ? "true" : "false",
1465 poll_spin_loops_var, compression_threshold_var,
1466 ip_whitelist_var);
1467
1468 DBUG_RETURN(0);
1469 }
1470
start_group_communication()1471 int start_group_communication()
1472 {
1473 DBUG_ENTER("start_group_communication");
1474
1475 if (auto_increment_handler != NULL)
1476 {
1477 auto_increment_handler->
1478 set_auto_increment_variables(auto_increment_increment_var,
1479 get_server_id());
1480 }
1481
1482 events_handler= new Plugin_gcs_events_handler(applier_module,
1483 recovery_module,
1484 view_change_notifier,
1485 compatibility_mgr,
1486 components_stop_timeout_var);
1487
1488 view_change_notifier->start_view_modification();
1489
1490 if (gcs_module->join(*events_handler, *events_handler))
1491 DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_JOIN_ERROR);
1492
1493 DBUG_RETURN(0);
1494 }
1495
check_async_channel_running_on_secondary()1496 bool check_async_channel_running_on_secondary()
1497 {
1498 /* To stop group replication to start on secondary member with single primary-
1499 mode, when any async channels are running, we verify whether member is not
1500 bootstrapping. As only when the member is bootstrapping, it can be the
1501 primary leader on a single primary member context.
1502 */
1503 if (single_primary_mode_var && !bootstrap_group_var)
1504 {
1505 if (is_any_slave_channel_running(
1506 CHANNEL_RECEIVER_THREAD | CHANNEL_APPLIER_THREAD))
1507 {
1508 return true;
1509 }
1510 }
1511
1512 return false;
1513 }
1514
initialize_asynchronous_channels_observer()1515 void initialize_asynchronous_channels_observer()
1516 {
1517 if (single_primary_mode_var)
1518 {
1519 asynchronous_channels_state_observer=
1520 new Asynchronous_channels_state_observer();
1521 channel_observation_manager
1522 ->register_channel_observer(asynchronous_channels_state_observer);
1523 }
1524 }
1525
terminate_asynchronous_channels_observer()1526 void terminate_asynchronous_channels_observer()
1527 {
1528 if (asynchronous_channels_state_observer != NULL)
1529 {
1530 channel_observation_manager
1531 ->unregister_channel_observer(asynchronous_channels_state_observer);
1532 delete asynchronous_channels_state_observer;
1533 asynchronous_channels_state_observer= NULL;
1534 }
1535 }
1536
initialize_recovery_module()1537 int initialize_recovery_module()
1538 {
1539 recovery_module = new Recovery_module(applier_module,
1540 channel_observation_manager,
1541 components_stop_timeout_var);
1542
1543 recovery_module->set_recovery_ssl_options(recovery_use_ssl_var,
1544 recovery_ssl_ca_var,
1545 recovery_ssl_capath_var,
1546 recovery_ssl_cert_var,
1547 recovery_ssl_cipher_var,
1548 recovery_ssl_key_var,
1549 recovery_ssl_crl_var,
1550 recovery_ssl_crlpath_var,
1551 recovery_ssl_verify_server_cert_var);
1552 recovery_module->
1553 set_recovery_completion_policy(
1554 (enum_recovery_completion_policies) recovery_completion_policy_var);
1555 recovery_module->set_recovery_donor_retry_count(recovery_retry_count_var);
1556 recovery_module->
1557 set_recovery_donor_reconnect_interval(recovery_reconnect_interval_var);
1558
1559 return 0;
1560 }
1561
terminate_recovery_module()1562 int terminate_recovery_module()
1563 {
1564 int error= 0;
1565 if(recovery_module != NULL)
1566 {
1567 error = recovery_module->stop_recovery();
1568 delete recovery_module;
1569 recovery_module= NULL;
1570 }
1571 return error;
1572 }
1573
server_engine_initialized()1574 bool server_engine_initialized()
1575 {
1576 //check if empty channel exists, i.e, the slave structures are initialized
1577 return channel_is_active("", CHANNEL_NO_THD);
1578 }
1579
register_server_reset_master()1580 void register_server_reset_master(){
1581 known_server_reset= true;
1582 }
1583
get_allow_local_lower_version_join()1584 bool get_allow_local_lower_version_join()
1585 {
1586 DBUG_ENTER("get_allow_local_lower_version_join");
1587 DBUG_RETURN(allow_local_lower_version_join_var);
1588 }
1589
get_allow_local_disjoint_gtids_join()1590 bool get_allow_local_disjoint_gtids_join()
1591 {
1592 DBUG_ENTER("get_allow_local_disjoint_gtids_join");
1593 DBUG_RETURN(allow_local_disjoint_gtids_join_var);
1594 }
1595
get_transaction_size_limit()1596 ulong get_transaction_size_limit()
1597 {
1598 DBUG_ENTER("get_transaction_size_limit");
1599 assert(my_atomic_load64(&transaction_size_limit_var)>=0);
1600 ulong limit = static_cast<ulong>(my_atomic_load64(&transaction_size_limit_var));
1601 DBUG_RETURN(limit);
1602 }
1603
is_plugin_waiting_to_set_server_read_mode()1604 bool is_plugin_waiting_to_set_server_read_mode()
1605 {
1606 DBUG_ENTER("is_plugin_waiting_to_set_server_read_mode");
1607 DBUG_RETURN(plugin_is_waiting_to_set_server_read_mode);
1608 }
1609
1610 /*
1611 This method is used to accomplish the startup validations of the plugin
1612 regarding system configuration.
1613
1614 It currently verifies:
1615 - Binlog enabled
1616 - Binlog checksum mode
1617 - Binlog format
1618 - Gtid mode
1619 - LOG_SLAVE_UPDATES
1620 - Single primary mode configuration
1621
1622 @return If the operation succeed or failed
1623 @retval 0 in case of success
1624 @retval 1 in case of failure
1625 */
check_if_server_properly_configured()1626 static int check_if_server_properly_configured()
1627 {
1628 DBUG_ENTER("check_if_server_properly_configured");
1629
1630 //Struct that holds startup and runtime requirements
1631 Trans_context_info startup_pre_reqs;
1632
1633 get_server_startup_prerequirements(startup_pre_reqs, !plugin_is_auto_starting);
1634
1635 if(!startup_pre_reqs.binlog_enabled)
1636 {
1637 log_message(MY_ERROR_LEVEL, "Binlog must be enabled for Group Replication");
1638 DBUG_RETURN(1);
1639 }
1640
1641 if(startup_pre_reqs.binlog_checksum_options != binary_log::BINLOG_CHECKSUM_ALG_OFF)
1642 {
1643 log_message(MY_ERROR_LEVEL, "binlog_checksum should be NONE for Group Replication");
1644 DBUG_RETURN(1);
1645 }
1646
1647 if(startup_pre_reqs.binlog_format != BINLOG_FORMAT_ROW)
1648 {
1649 log_message(MY_ERROR_LEVEL, "Binlog format should be ROW for Group Replication");
1650 DBUG_RETURN(1);
1651 }
1652
1653 if(startup_pre_reqs.gtid_mode != GTID_MODE_ON)
1654 {
1655 log_message(MY_ERROR_LEVEL, "Gtid mode should be ON for Group Replication");
1656 DBUG_RETURN(1);
1657 }
1658
1659 if(startup_pre_reqs.log_slave_updates != true)
1660 {
1661 log_message(MY_ERROR_LEVEL,
1662 "LOG_SLAVE_UPDATES should be ON for Group Replication");
1663 DBUG_RETURN(1);
1664 }
1665
1666 if(startup_pre_reqs.transaction_write_set_extraction ==
1667 HASH_ALGORITHM_OFF)
1668 {
1669 log_message(MY_ERROR_LEVEL,
1670 "Extraction of transaction write sets requires an hash algorithm "
1671 "configuration. Please, double check that the parameter "
1672 "transaction-write-set-extraction is set to a valid algorithm.");
1673 DBUG_RETURN(1);
1674 }
1675 else
1676 {
1677 write_set_extraction_algorithm=
1678 startup_pre_reqs.transaction_write_set_extraction;
1679 }
1680
1681 if (startup_pre_reqs.mi_repository_type != 1) //INFO_REPOSITORY_TABLE
1682 {
1683 log_message(MY_ERROR_LEVEL, "Master info repository must be set to TABLE.");
1684 DBUG_RETURN(1);
1685 }
1686
1687 if (startup_pre_reqs.rli_repository_type != 1) //INFO_REPOSITORY_TABLE
1688 {
1689 log_message(MY_ERROR_LEVEL, "Relay log info repository must be set to TABLE");
1690 DBUG_RETURN(1);
1691 }
1692
1693 if (startup_pre_reqs.parallel_applier_workers > 0)
1694 {
1695 if (startup_pre_reqs.parallel_applier_type != CHANNEL_MTS_PARALLEL_TYPE_LOGICAL_CLOCK)
1696 {
1697 log_message(MY_ERROR_LEVEL,
1698 "In order to use parallel applier on Group Replication, parameter "
1699 "slave-parallel-type must be set to 'LOGICAL_CLOCK'.");
1700 DBUG_RETURN(1);
1701 }
1702
1703 if (!startup_pre_reqs.parallel_applier_preserve_commit_order)
1704 {
1705 log_message(MY_WARNING_LEVEL,
1706 "Group Replication requires slave-preserve-commit-order "
1707 "to be set to ON when using more than 1 applier threads.");
1708 DBUG_RETURN(1);
1709 }
1710 }
1711
1712 if (single_primary_mode_var && enforce_update_everywhere_checks_var)
1713 {
1714 log_message(MY_ERROR_LEVEL,
1715 "Is is not allowed to run single primary mode with "
1716 "'enforce_update_everywhere_checks' enabled.");
1717 DBUG_RETURN(1);
1718 }
1719
1720 gr_lower_case_table_names= startup_pre_reqs.lower_case_table_names;
1721 assert (gr_lower_case_table_names <= 2);
1722 #ifndef NDEBUG
1723 DBUG_EXECUTE_IF("group_replication_skip_encode_lower_case_table_names",
1724 {
1725 gr_lower_case_table_names = SKIP_ENCODING_LOWER_CASE_TABLE_NAMES;
1726 });
1727 #endif
1728
1729
1730 DBUG_RETURN(0);
1731 }
1732
check_group_name_string(const char * str,bool is_var_update)1733 static int check_group_name_string(const char *str, bool is_var_update)
1734 {
1735 DBUG_ENTER("check_group_name_string");
1736
1737 if (!str)
1738 {
1739 if(!is_var_update)
1740 log_message(MY_ERROR_LEVEL, "The group name option is mandatory");
1741 else
1742 my_message(ER_WRONG_VALUE_FOR_VAR,
1743 "The group name option is mandatory",
1744 MYF(0)); /* purecov: inspected */
1745 DBUG_RETURN(1);
1746 }
1747
1748 if (strlen(str) > UUID_LENGTH)
1749 {
1750 if(!is_var_update)
1751 log_message(MY_ERROR_LEVEL, "The group name '%s' is not a valid UUID, its"
1752 " length is too big", str);
1753 else
1754 my_message(ER_WRONG_VALUE_FOR_VAR,
1755 "The group name is not a valid UUID, its length is too big",
1756 MYF(0));
1757 DBUG_RETURN(1);
1758 }
1759
1760 if (!Uuid::is_valid(str))
1761 {
1762 if(!is_var_update)
1763 log_message(MY_ERROR_LEVEL, "The group name '%s' is not a valid UUID", str); /* purecov: inspected */
1764 else
1765 my_message(ER_WRONG_VALUE_FOR_VAR, "The group name is not a valid UUID",
1766 MYF(0));
1767 DBUG_RETURN(1);
1768 }
1769
1770 DBUG_RETURN(0);
1771 }
1772
check_group_name(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)1773 static int check_group_name(MYSQL_THD thd, SYS_VAR *var, void* save,
1774 struct st_mysql_value *value)
1775 {
1776 DBUG_ENTER("check_group_name");
1777
1778 char buff[NAME_CHAR_LEN];
1779 const char *str;
1780
1781 if (plugin_is_group_replication_running())
1782 {
1783 my_message(ER_GROUP_REPLICATION_RUNNING,
1784 "The group name cannot be changed when Group Replication is running",
1785 MYF(0));
1786 DBUG_RETURN(1);
1787 }
1788
1789 (*(const char **) save)= NULL;
1790
1791 int length= sizeof(buff);
1792 if ((str= value->val_str(value, buff, &length)))
1793 str= thd->strmake(str, length);
1794 else
1795 DBUG_RETURN(1); /* purecov: inspected */
1796
1797 if (check_group_name_string(str, true))
1798 DBUG_RETURN(1);
1799
1800 *(const char**)save= str;
1801
1802 DBUG_RETURN(0);
1803 }
1804
1805 //Recovery module's module variable update/validate methods
1806
update_recovery_retry_count(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1807 static void update_recovery_retry_count(MYSQL_THD thd, SYS_VAR *var,
1808 void *var_ptr, const void *save)
1809 {
1810 DBUG_ENTER("update_recovery_retry_count");
1811
1812 (*(ulong*) var_ptr)= (*(ulong*) save);
1813 ulong in_val= *static_cast<const ulong*>(save);
1814
1815 if (recovery_module != NULL)
1816 {
1817 recovery_module->set_recovery_donor_retry_count(in_val);
1818 }
1819
1820 DBUG_VOID_RETURN;
1821 }
1822
update_recovery_reconnect_interval(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1823 static void update_recovery_reconnect_interval(MYSQL_THD thd, SYS_VAR *var,
1824 void *var_ptr, const void *save)
1825 {
1826 DBUG_ENTER("update_recovery_reconnect_interval");
1827
1828 (*(ulong*) var_ptr)= (*(ulong*) save);
1829 ulong in_val= *static_cast<const ulong*>(save);
1830
1831 if (recovery_module != NULL)
1832 {
1833 recovery_module->
1834 set_recovery_donor_reconnect_interval(in_val);
1835 }
1836
1837 DBUG_VOID_RETURN;
1838 }
1839
1840 //Recovery SSL options
1841
1842 static void
update_ssl_use(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1843 update_ssl_use(MYSQL_THD thd, SYS_VAR *var,
1844 void *var_ptr, const void *save)
1845 {
1846 DBUG_ENTER("update_ssl_use");
1847
1848 bool use_ssl_val= *((my_bool *) save);
1849 (*(my_bool *) var_ptr)= (*(my_bool *) save);
1850
1851 if (recovery_module != NULL)
1852 {
1853 recovery_module->set_recovery_use_ssl(use_ssl_val);
1854 }
1855
1856 DBUG_VOID_RETURN;
1857 }
1858
check_recovery_ssl_string(const char * str,const char * var_name,bool is_var_update)1859 static int check_recovery_ssl_string(const char *str, const char *var_name,
1860 bool is_var_update)
1861 {
1862 DBUG_ENTER("check_recovery_ssl_string");
1863
1864 if (strlen(str) > FN_REFLEN)
1865 {
1866 if(!is_var_update)
1867 log_message(MY_ERROR_LEVEL,
1868 "The given value for recovery ssl option '%s' is invalid"
1869 " as its length is beyond the limit", var_name);
1870 else
1871 my_message(ER_WRONG_VALUE_FOR_VAR,
1872 "The given value for recovery ssl option is invalid"
1873 " as its length is beyond the limit",
1874 MYF(0));
1875 DBUG_RETURN(1);
1876 }
1877
1878 DBUG_RETURN(0);
1879 }
1880
check_recovery_ssl_option(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)1881 static int check_recovery_ssl_option(MYSQL_THD thd, SYS_VAR *var, void* save,
1882 struct st_mysql_value *value)
1883 {
1884 DBUG_ENTER("check_recovery_ssl_option");
1885
1886 char buff[STRING_BUFFER_USUAL_SIZE];
1887 const char *str= NULL;
1888
1889 (*(const char **) save)= NULL;
1890
1891 int length= sizeof(buff);
1892 if ((str= value->val_str(value, buff, &length)))
1893 str= thd->strmake(str, length);
1894 else
1895 DBUG_RETURN(1); /* purecov: inspected */
1896
1897 if (str != NULL && check_recovery_ssl_string(str, var->name, true))
1898 {
1899 DBUG_RETURN(1);
1900 }
1901
1902 *(const char**)save= str;
1903
1904 DBUG_RETURN(0);
1905 }
1906
update_recovery_ssl_option(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1907 static void update_recovery_ssl_option(MYSQL_THD thd, SYS_VAR *var,
1908 void *var_ptr, const void *save)
1909 {
1910 DBUG_ENTER("update_recovery_ssl_option");
1911
1912
1913 const char *new_option_val= *(const char**)save;
1914 (*(const char **) var_ptr)= (*(const char **) save);
1915
1916 //According to the var name, get the operation code and act accordingly
1917 switch(recovery_ssl_opt_map[var->name])
1918 {
1919 case RECOVERY_SSL_CA_OPT:
1920 if (recovery_module != NULL)
1921 recovery_module->set_recovery_ssl_ca(new_option_val);
1922 break;
1923 case RECOVERY_SSL_CAPATH_OPT:
1924 if (recovery_module != NULL)
1925 recovery_module->set_recovery_ssl_capath(new_option_val);
1926 break;
1927 case RECOVERY_SSL_CERT_OPT:
1928 if (recovery_module != NULL)
1929 recovery_module->set_recovery_ssl_cert(new_option_val);
1930 break;
1931 case RECOVERY_SSL_CIPHER_OPT:
1932 if (recovery_module != NULL)
1933 recovery_module->set_recovery_ssl_cipher(new_option_val);
1934 break;
1935 case RECOVERY_SSL_KEY_OPT:
1936 if (recovery_module != NULL)
1937 recovery_module->set_recovery_ssl_key(new_option_val);
1938 break;
1939 case RECOVERY_SSL_CRL_OPT:
1940 if (recovery_module != NULL)
1941 recovery_module->set_recovery_ssl_crl(new_option_val);
1942 break;
1943 case RECOVERY_SSL_CRLPATH_OPT:
1944 if (recovery_module != NULL)
1945 recovery_module->set_recovery_ssl_crlpath(new_option_val);
1946 break;
1947 default:
1948 assert(0); /* purecov: inspected */
1949 }
1950
1951 DBUG_VOID_RETURN;
1952 }
1953
1954 static void
update_ssl_server_cert_verification(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1955 update_ssl_server_cert_verification(MYSQL_THD thd, SYS_VAR *var,
1956 void *var_ptr, const void *save)
1957 {
1958 DBUG_ENTER("update_ssl_server_cert_verification");
1959
1960 bool ssl_verify_server_cert= *((my_bool *) save);
1961 (*(my_bool *) var_ptr)= (*(my_bool *) save);
1962
1963 if (recovery_module != NULL)
1964 {
1965 recovery_module->
1966 set_recovery_ssl_verify_server_cert(ssl_verify_server_cert);
1967 }
1968
1969 DBUG_VOID_RETURN;
1970 }
1971
1972 // Recovery threshold update method
1973
1974 static void
update_recovery_completion_policy(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1975 update_recovery_completion_policy(MYSQL_THD thd, SYS_VAR *var,
1976 void *var_ptr, const void *save)
1977 {
1978 DBUG_ENTER("update_recovery_completion_policy");
1979
1980 ulong in_val= *static_cast<const ulong*>(save);
1981 (*(ulong*) var_ptr)= (*(ulong*) save);
1982
1983 if (recovery_module != NULL)
1984 {
1985 recovery_module->
1986 set_recovery_completion_policy(
1987 (enum_recovery_completion_policies)in_val);
1988 }
1989
1990 DBUG_VOID_RETURN;
1991 }
1992
1993 //Component timeout update method
1994
update_component_timeout(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1995 static void update_component_timeout(MYSQL_THD thd, SYS_VAR *var,
1996 void *var_ptr, const void *save)
1997 {
1998 DBUG_ENTER("update_component_timeout");
1999
2000 ulong in_val= *static_cast<const ulong*>(save);
2001 (*(ulong*) var_ptr)= (*(ulong*) save);
2002
2003 if (applier_module != NULL)
2004 {
2005 applier_module->set_stop_wait_timeout(in_val);
2006 }
2007 if (recovery_module != NULL)
2008 {
2009 recovery_module->set_stop_wait_timeout(in_val);
2010 }
2011 if (events_handler != NULL)
2012 {
2013 events_handler->set_stop_wait_timeout(in_val);
2014 }
2015
2016 DBUG_VOID_RETURN;
2017 }
2018
check_auto_increment_increment(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2019 static int check_auto_increment_increment(MYSQL_THD thd, SYS_VAR *var,
2020 void* save,
2021 struct st_mysql_value *value)
2022 {
2023 DBUG_ENTER("check_auto_increment_increment");
2024
2025 longlong in_val;
2026 value->val_int(value, &in_val);
2027
2028 if (plugin_is_group_replication_running())
2029 {
2030 my_message(ER_GROUP_REPLICATION_RUNNING,
2031 "The group auto_increment_increment cannot be changed"
2032 " when Group Replication is running",
2033 MYF(0));
2034 DBUG_RETURN(1);
2035 }
2036
2037 if (in_val > MAX_AUTO_INCREMENT_INCREMENT ||
2038 in_val < MIN_AUTO_INCREMENT_INCREMENT)
2039 {
2040 std::stringstream ss;
2041 ss << "The value " << in_val << " is not within the range of "
2042 "accepted values for the option "
2043 "group_replication_auto_increment_increment. The value "
2044 "must be between " << MIN_AUTO_INCREMENT_INCREMENT <<
2045 " and " << MAX_AUTO_INCREMENT_INCREMENT << " inclusive.";
2046 my_message(ER_WRONG_VALUE_FOR_VAR, ss.str().c_str(), MYF(0));
2047 DBUG_RETURN(1);
2048 }
2049
2050 *(longlong*)save= in_val;
2051 DBUG_RETURN(0);
2052 }
2053
2054 //Communication layer options.
2055
check_ip_whitelist_preconditions(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2056 static int check_ip_whitelist_preconditions(MYSQL_THD thd, SYS_VAR *var,
2057 void *save,
2058 struct st_mysql_value *value)
2059 {
2060 DBUG_ENTER("check_ip_whitelist_preconditions");
2061
2062 char buff[IP_WHITELIST_STR_BUFFER_LENGTH];
2063 const char *str;
2064 int length= sizeof(buff);
2065
2066 if (plugin_is_group_replication_running())
2067 {
2068 my_message(ER_GROUP_REPLICATION_RUNNING,
2069 "The IP whitelist cannot be set while Group Replication "
2070 "is running", MYF(0));
2071 DBUG_RETURN(1);
2072 }
2073
2074 (*(const char **) save)= NULL;
2075
2076 if ((str= value->val_str(value, buff, &length)))
2077 str= thd->strmake(str, length);
2078 else // NULL value is not allowed
2079 DBUG_RETURN(1); /* purecov: inspected */
2080
2081 // remove trailing whitespaces
2082 std::string v(str);
2083 v.erase(std::remove(v.begin(), v.end(), ' '), v.end());
2084 std::transform(v.begin(), v.end(), v.begin(), ::tolower);
2085 if (v.find("automatic") != std::string::npos && v.size() != 9)
2086 {
2087 my_message(ER_GROUP_REPLICATION_CONFIGURATION,
2088 "The IP whitelist is invalid. Make sure that AUTOMATIC "
2089 "when specifying \"AUTOMATIC\" the list contains no "
2090 "other values.", MYF(0));
2091 DBUG_RETURN(1);
2092 }
2093
2094 *(const char**)save= str;
2095
2096 DBUG_RETURN(0);
2097 }
2098
check_compression_threshold(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2099 static int check_compression_threshold(MYSQL_THD thd, SYS_VAR *var,
2100 void* save,
2101 struct st_mysql_value *value)
2102 {
2103 DBUG_ENTER("check_compression_threshold");
2104
2105 longlong in_val;
2106 value->val_int(value, &in_val);
2107
2108 if (plugin_is_group_replication_running())
2109 {
2110 my_message(ER_GROUP_REPLICATION_RUNNING,
2111 "The compression threshold cannot be set while "
2112 "Group Replication is running",
2113 MYF(0));
2114 DBUG_RETURN(1);
2115 }
2116
2117 if (in_val > MAX_COMPRESSION_THRESHOLD || in_val < 0)
2118 {
2119 std::stringstream ss;
2120 ss << "The value " << in_val << " is not within the range of "
2121 "accepted values for the option compression_threshold!";
2122 my_message(ER_WRONG_VALUE_FOR_VAR, ss.str().c_str(), MYF(0));
2123 DBUG_RETURN(1);
2124 }
2125
2126 *(longlong*)save= in_val;
2127
2128 DBUG_RETURN(0);
2129 }
2130
/**
  Check function for the force_members option.

  Only one invocation may be in progress at a time: the force_members_running
  flag (guarded by force_members_running_mutex) is set on entry and cleared
  at the "end" label. An empty string is accepted without further action.
  A non-empty string is accepted only when Group Replication is running, the
  group member manager reports that a majority is unreachable, and
  gcs_module->force_members() returns 0.

  @param thd    session whose strmake() is used to copy the value
  @param var    system variable descriptor (unused)
  @param save   out: receives the copied string on success, NULL otherwise
  @param value  the value being assigned

  @return 0 when the value is accepted, non-zero otherwise
*/
static int check_force_members(MYSQL_THD thd, SYS_VAR *var,
                               void* save,
                               struct st_mysql_value *value)
{
  DBUG_ENTER("check_force_members");
  int error= 0;
  char buff[STRING_BUFFER_USUAL_SIZE];
  const char *str= NULL;
  (*(const char **) save)= NULL;
  int length= 0;

  // Only one set force_members can run at a time.
  mysql_mutex_lock(&force_members_running_mutex);
  if (force_members_running)
  {
    log_message(MY_ERROR_LEVEL,
                "There is one group_replication_force_members "
                "operation already ongoing");
    mysql_mutex_unlock(&force_members_running_mutex);
    // Return directly: the flag belongs to the other, ongoing operation and
    // must not be cleared by the "end" label below.
    DBUG_RETURN(1);
  }
  force_members_running= true;
  mysql_mutex_unlock(&force_members_running_mutex);

  // Debug-only synchronization point, active when the named debug keyword
  // is set.
#ifndef NDEBUG
  DBUG_EXECUTE_IF("group_replication_wait_on_check_force_members",
                  {
                    const char act[]= "now wait_for waiting";
                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
#endif

  // String validations.
  length= sizeof(buff);
  if ((str= value->val_str(value, buff, &length)))
    str= thd->strmake(str, length);
  else
  {
    error= 1; /* purecov: inspected */
    goto end; /* purecov: inspected */
  }

  // If option value is empty string, just update its value.
  if (length == 0)
    goto update_value;

  // If group replication isn't running, or the majority is still reachable,
  // force_members cannot be updated.
  if (!plugin_is_group_replication_running() ||
      !group_member_mgr->is_majority_unreachable())
  {
    log_message(MY_ERROR_LEVEL,
                "group_replication_force_members can only be updated"
                " when Group Replication is running and a majority of the"
                " members are unreachable");
    error= 1;
    goto end;
  }

  // A non-zero result from the communication module rejects the value.
  if ((error= gcs_module->force_members(str)))
    goto end;

update_value:
  *(const char**)save= str;

end:
  // Every path that set the running flag passes through here to clear it.
  mysql_mutex_lock(&force_members_running_mutex);
  force_members_running= false;
  mysql_mutex_unlock(&force_members_running_mutex);

  DBUG_RETURN(error);
}
2203
check_gtid_assignment_block_size(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2204 static int check_gtid_assignment_block_size(MYSQL_THD thd, SYS_VAR *var,
2205 void* save,
2206 struct st_mysql_value *value)
2207 {
2208 DBUG_ENTER("check_gtid_assignment_block_size");
2209
2210 longlong in_val;
2211 value->val_int(value, &in_val);
2212
2213 if (plugin_is_group_replication_running())
2214 {
2215 my_message(ER_GROUP_REPLICATION_RUNNING,
2216 "The GTID assignment block size cannot be set while "
2217 "Group Replication is running", MYF(0));
2218 DBUG_RETURN(1);
2219 }
2220
2221 if (in_val > MAX_GTID_ASSIGNMENT_BLOCK_SIZE ||
2222 in_val < MIN_GTID_ASSIGNMENT_BLOCK_SIZE)
2223 {
2224 std::stringstream ss;
2225 ss << "The value " << in_val << " is not within the range of "
2226 "accepted values for the option gtid_assignment_block_size. "
2227 "The value must be between " << MIN_GTID_ASSIGNMENT_BLOCK_SIZE <<
2228 " and " << MAX_GTID_ASSIGNMENT_BLOCK_SIZE << " inclusive.";
2229 my_message(ER_WRONG_VALUE_FOR_VAR, ss.str().c_str(), MYF(0));
2230 DBUG_RETURN(1);
2231 }
2232
2233 *(longlong*)save= in_val;
2234
2235 DBUG_RETURN(0);
2236 }
2237
2238 static bool
get_bool_value_using_type_lib(struct st_mysql_value * value,my_bool & resulting_value)2239 get_bool_value_using_type_lib(struct st_mysql_value *value,
2240 my_bool &resulting_value)
2241 {
2242 DBUG_ENTER("get_bool_value_using_type_lib");
2243 longlong value_to_check;
2244
2245 if (MYSQL_VALUE_TYPE_STRING == value->value_type(value))
2246 {
2247 const unsigned int flags = 0;
2248
2249 char text_buffer[10] = { 0 };
2250 int text_buffer_size = sizeof(text_buffer);
2251 const char *text_value = value->val_str(value,text_buffer, &text_buffer_size);
2252
2253 if (NULL == text_value)
2254 DBUG_RETURN(false);
2255
2256 // Return index inside bool_type_allowed_values array
2257 // (first element start with index 1)
2258 value_to_check = find_type(text_value, &plugin_bool_typelib, flags);
2259
2260 if (0 == value_to_check)
2261 {
2262 DBUG_RETURN(false);
2263 }
2264
2265 // Move the index value to 0,1 values (OFF, ON)
2266 --value_to_check;
2267 }
2268 else
2269 {
2270 // Do implicit conversion to int
2271 value->val_int(value, &value_to_check);
2272 }
2273
2274 resulting_value = value_to_check > 0 ? TRUE : FALSE;
2275
2276 DBUG_RETURN(true);
2277 }
2278
2279 static int
check_single_primary_mode(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2280 check_single_primary_mode(MYSQL_THD thd, SYS_VAR *var,
2281 void* save,
2282 struct st_mysql_value *value)
2283 {
2284 DBUG_ENTER("check_single_primary_mode");
2285 my_bool single_primary_mode_val;
2286
2287 if (!get_bool_value_using_type_lib(value, single_primary_mode_val))
2288 DBUG_RETURN(1);
2289
2290 if (plugin_is_group_replication_running())
2291 {
2292 my_message(ER_GROUP_REPLICATION_RUNNING,
2293 "Cannot change into or from single primary mode while "
2294 "Group Replication is running.", MYF(0));
2295 DBUG_RETURN(1);
2296 }
2297
2298 if (single_primary_mode_val && enforce_update_everywhere_checks_var)
2299 {
2300 my_message(ER_WRONG_VALUE_FOR_VAR,
2301 "Cannot turn ON single_primary_mode while "
2302 "enforce_update_everywhere_checks is enabled.",
2303 MYF(0));
2304 DBUG_RETURN(1);
2305 }
2306
2307 *(my_bool *)save = single_primary_mode_val;
2308
2309 DBUG_RETURN(0);
2310 }
2311
2312 static int
check_enforce_update_everywhere_checks(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2313 check_enforce_update_everywhere_checks(MYSQL_THD thd, SYS_VAR *var,
2314 void* save,
2315 struct st_mysql_value *value)
2316 {
2317 DBUG_ENTER("check_enforce_update_everywhere_checks");
2318 my_bool enforce_update_everywhere_checks_val;
2319
2320 if (!get_bool_value_using_type_lib(value, enforce_update_everywhere_checks_val))
2321 DBUG_RETURN(1);
2322
2323 if (plugin_is_group_replication_running())
2324 {
2325 my_message(ER_GROUP_REPLICATION_RUNNING,
2326 "Cannot turn ON/OFF update everywhere checks mode while "
2327 "Group Replication is running.", MYF(0));
2328 DBUG_RETURN(1);
2329 }
2330
2331 if (single_primary_mode_var && enforce_update_everywhere_checks_val)
2332 {
2333 my_message(ER_WRONG_VALUE_FOR_VAR,
2334 "Cannot enable enforce_update_everywhere_checks while "
2335 "single_primary_mode is enabled.",
2336 MYF(0));
2337 DBUG_RETURN(1);
2338 }
2339
2340 *(my_bool *)save = enforce_update_everywhere_checks_val;
2341
2342 DBUG_RETURN(0);
2343 }
2344
2345 static void
update_allow_local_disjoint_gtids_join(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)2346 update_allow_local_disjoint_gtids_join(MYSQL_THD thd, SYS_VAR *var,
2347 void *var_ptr, const void *save)
2348 {
2349 DBUG_ENTER("update_allow_local_disjoint_gtids_join");
2350
2351 (*(my_bool *) var_ptr)= (*(my_bool *) save);
2352
2353 option_deprecation_warning(thd,
2354 "group_replication_allow_local_disjoint_gtids_join");
2355
2356 DBUG_VOID_RETURN;
2357 }
2358
update_unreachable_timeout(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)2359 static void update_unreachable_timeout(MYSQL_THD thd, SYS_VAR *var,
2360 void *var_ptr, const void *save)
2361 {
2362 DBUG_ENTER("update_unreachable_timeout");
2363
2364 ulong in_val= *static_cast<const ulong*>(save);
2365 (*(ulong*) var_ptr)= (*(ulong*) save);
2366
2367 if (group_partition_handler != NULL)
2368 {
2369 group_partition_handler->update_timeout_on_unreachable(in_val);
2370 }
2371
2372 DBUG_VOID_RETURN;
2373 }
2374
2375 static void
update_member_weight(MYSQL_THD,SYS_VAR *,void * var_ptr,const void * save)2376 update_member_weight(MYSQL_THD, SYS_VAR*,
2377 void *var_ptr, const void *save)
2378 {
2379 DBUG_ENTER("update_member_weight");
2380
2381 (*(uint*) var_ptr)= (*(uint*) save);
2382 uint in_val= *static_cast<const uint*>(save);
2383
2384 if (local_member_info != NULL)
2385 {
2386 local_member_info->set_member_weight(in_val);
2387 }
2388
2389 DBUG_VOID_RETURN;
2390 }
2391
update_transaction_size_limit(MYSQL_THD,SYS_VAR *,void * var_ptr,const void * save)2392 static void update_transaction_size_limit(MYSQL_THD, SYS_VAR *, void *var_ptr,
2393 const void *save) {
2394
2395 ulong in_val = *static_cast<const ulong *>(save);
2396 *static_cast<ulong *>(var_ptr) = in_val;
2397 my_atomic_store64(&transaction_size_limit_var, in_val);
2398
2399 transaction_size_limit_var = in_val;
2400
2401 if (plugin_is_group_replication_running()) {
2402 update_write_set_memory_size_limit(transaction_size_limit_var);
2403 }
2404 }
2405
// Base plugin variables

/*
  group_name: string option validated by check_group_name. It has no update
  function and no default value.
*/
static MYSQL_SYSVAR_STR(
  group_name,                                 /* name */
  group_name_var,                             /* var */
  /* optional var | malloc string | no set default */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC | PLUGIN_VAR_NODEFAULT,
  "The group name",
  check_group_name,                           /* check func */
  NULL,                                       /* update func */
  NULL);                                      /* default */

/*
  start_on_boot: boolean option without check or update functions;
  defaults to 1.
*/
static MYSQL_SYSVAR_BOOL(
  start_on_boot,                              /* name */
  start_group_replication_at_boot_var,        /* var */
  PLUGIN_VAR_OPCMDARG,                        /* optional var */
  "Whether the server should start Group Replication or not during bootstrap.",
  NULL,                                       /* check func */
  NULL,                                       /* update func */
  1);                                         /* default */
2426
// GCS module variables

/*
  local_address: string option without check or update functions;
  defaults to the empty string.
*/
static MYSQL_SYSVAR_STR(
  local_address,                              /* name */
  local_address_var,                          /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The local address, i.e., host:port.",
  NULL,                                       /* check func */
  NULL,                                       /* update func */
  "");                                        /* default */

/*
  group_seeds: string option without check or update functions;
  defaults to the empty string.
*/
static MYSQL_SYSVAR_STR(
  group_seeds,                                /* name */
  group_seeds_var,                            /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The list of group seeds, comma separated. E.g., host1:port1,host2:port2.",
  NULL,                                       /* check func */
  NULL,                                       /* update func */
  "");                                        /* default */

/*
  force_members: string option validated by check_force_members; it has no
  update function and defaults to the empty string.
*/
static MYSQL_SYSVAR_STR(
  force_members,                              /* name */
  force_members_var,                          /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The list of members, comma separated. E.g., host1:port1,host2:port2. "
  "This option is used to force a new group membership, on which the excluded "
  "members will not receive a new view and will be blocked. The DBA will need "
  "to kill the excluded servers.",
  check_force_members,                        /* check func */
  NULL,                                       /* update func */
  "");                                        /* default */

/*
  bootstrap_group: boolean option without check or update functions;
  defaults to 0.
*/
static MYSQL_SYSVAR_BOOL(
  bootstrap_group,                            /* name */
  bootstrap_group_var,                        /* var */
  PLUGIN_VAR_OPCMDARG,                        /* optional var */
  "Specify if this member will bootstrap the group.",
  NULL,                                       /* check func. */
  NULL,                                       /* update func */
  0                                           /* default */
);

/*
  poll_spin_loops: unsigned long option without check or update functions;
  defaults to 0 and accepts values from 0 to ~0UL.
*/
static MYSQL_SYSVAR_ULONG(
  poll_spin_loops,                            /* name */
  poll_spin_loops_var,                        /* var */
  PLUGIN_VAR_OPCMDARG,                        /* optional var */
  "The number of times a thread waits for a communication engine "
  "mutex to be freed before the thread is suspended.",
  NULL,                                       /* check func. */
  NULL,                                       /* update func. */
  0,                                          /* default */
  0,                                          /* min */
  ~0UL,                                       /* max */
  0                                           /* block */
);
2482
// Recovery module variables

/*
  recovery_retry_count: unsigned long option updated through
  update_recovery_retry_count; defaults to 10, range 0..LONG_TIMEOUT.
*/
static MYSQL_SYSVAR_ULONG(
  recovery_retry_count,                       /* name */
  recovery_retry_count_var,                   /* var */
  PLUGIN_VAR_OPCMDARG,                        /* optional var */
  "The number of times that the joiner tries to connect to the available donors before giving up.",
  NULL,                                       /* check func. */
  update_recovery_retry_count,                /* update func. */
  10,                                         /* default */
  0,                                          /* min */
  LONG_TIMEOUT,                               /* max */
  0                                           /* block */
);

/*
  recovery_reconnect_interval: unsigned long option updated through
  update_recovery_reconnect_interval; defaults to 60, range 0..LONG_TIMEOUT.
*/
static MYSQL_SYSVAR_ULONG(
  recovery_reconnect_interval,                /* name */
  recovery_reconnect_interval_var,            /* var */
  PLUGIN_VAR_OPCMDARG,                        /* optional var */
  "The sleep time between reconnection attempts when no donor was found in the group",
  NULL,                                       /* check func. */
  update_recovery_reconnect_interval,         /* update func. */
  60,                                         /* default */
  0,                                          /* min */
  LONG_TIMEOUT,                               /* max */
  0                                           /* block */
);
2510
// SSL options for recovery
//
// The seven string options below all share check_recovery_ssl_option as
// check function and update_recovery_ssl_option as update function, and all
// default to the empty string.

/*
  recovery_use_ssl: boolean option updated through update_ssl_use;
  defaults to 0.
*/
static MYSQL_SYSVAR_BOOL(
  recovery_use_ssl,                           /* name */
  recovery_use_ssl_var,                       /* var */
  PLUGIN_VAR_OPCMDARG,                        /* optional var */
  "Whether SSL use should be obligatory during Group Replication recovery process.",
  NULL,                                       /* check func */
  update_ssl_use,                             /* update func */
  0);                                         /* default */

static MYSQL_SYSVAR_STR(
  recovery_ssl_ca,                            /* name */
  recovery_ssl_ca_var,                        /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The path to a file that contains a list of trusted SSL certificate authorities.",
  check_recovery_ssl_option,                  /* check func */
  update_recovery_ssl_option,                 /* update func */
  "");                                        /* default */

static MYSQL_SYSVAR_STR(
  recovery_ssl_capath,                        /* name */
  recovery_ssl_capath_var,                    /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The path to a directory that contains trusted SSL certificate authority certificates.",
  check_recovery_ssl_option,                  /* check func */
  update_recovery_ssl_option,                 /* update func */
  "");                                        /* default */

static MYSQL_SYSVAR_STR(
  recovery_ssl_cert,                          /* name */
  recovery_ssl_cert_var,                      /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The name of the SSL certificate file to use for establishing a secure connection.",
  check_recovery_ssl_option,                  /* check func */
  update_recovery_ssl_option,                 /* update func */
  "");                                        /* default */

static MYSQL_SYSVAR_STR(
  recovery_ssl_cipher,                        /* name */
  recovery_ssl_cipher_var,                    /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "A list of permissible ciphers to use for SSL encryption.",
  check_recovery_ssl_option,                  /* check func */
  update_recovery_ssl_option,                 /* update func */
  "");                                        /* default */

static MYSQL_SYSVAR_STR(
  recovery_ssl_key,                           /* name */
  recovery_ssl_key_var,                       /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The name of the SSL key file to use for establishing a secure connection.",
  check_recovery_ssl_option,                  /* check func */
  update_recovery_ssl_option,                 /* update func */
  "");                                        /* default */

static MYSQL_SYSVAR_STR(
  recovery_ssl_crl,                           /* name */
  recovery_ssl_crl_var,                       /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The path to a file containing certificate revocation lists.",
  check_recovery_ssl_option,                  /* check func */
  update_recovery_ssl_option,                 /* update func */
  "");                                        /* default */

static MYSQL_SYSVAR_STR(
  recovery_ssl_crlpath,                       /* name */
  recovery_ssl_crlpath_var,                   /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string */
  "The path to a directory that contains files containing certificate revocation lists.",
  check_recovery_ssl_option,                  /* check func */
  update_recovery_ssl_option,                 /* update func */
  "");                                        /* default */

/*
  recovery_ssl_verify_server_cert: boolean option updated through
  update_ssl_server_cert_verification; defaults to 0.
*/
static MYSQL_SYSVAR_BOOL(
  recovery_ssl_verify_server_cert,            /* name */
  recovery_ssl_verify_server_cert_var,        /* var */
  PLUGIN_VAR_OPCMDARG,                        /* optional var */
  "Make recovery check the server's Common Name value in the donor sent certificate.",
  NULL,                                       /* check func */
  update_ssl_server_cert_verification,        /* update func */
  0);                                         /* default */
2593
2594 /** Initialize the ssl option map with variable names*/
initialize_ssl_option_map()2595 static void initialize_ssl_option_map()
2596 {
2597 recovery_ssl_opt_map.clear();
2598 st_mysql_sys_var* ssl_ca_var= MYSQL_SYSVAR(recovery_ssl_ca);
2599 recovery_ssl_opt_map[ssl_ca_var->name]= RECOVERY_SSL_CA_OPT;
2600 st_mysql_sys_var* ssl_capath_var= MYSQL_SYSVAR(recovery_ssl_capath);
2601 recovery_ssl_opt_map[ssl_capath_var->name]= RECOVERY_SSL_CAPATH_OPT;
2602 st_mysql_sys_var* ssl_cert_var= MYSQL_SYSVAR(recovery_ssl_cert);
2603 recovery_ssl_opt_map[ssl_cert_var->name]= RECOVERY_SSL_CERT_OPT;
2604 st_mysql_sys_var* ssl_cipher_var= MYSQL_SYSVAR(recovery_ssl_cipher);
2605 recovery_ssl_opt_map[ssl_cipher_var->name]= RECOVERY_SSL_CIPHER_OPT;
2606 st_mysql_sys_var* ssl_key_var= MYSQL_SYSVAR(recovery_ssl_key);
2607 recovery_ssl_opt_map[ssl_key_var->name]= RECOVERY_SSL_KEY_OPT;
2608 st_mysql_sys_var* ssl_crl_var=MYSQL_SYSVAR(recovery_ssl_crl);
2609 recovery_ssl_opt_map[ssl_crl_var->name]= RECOVERY_SSL_CRL_OPT;
2610 st_mysql_sys_var* ssl_crlpath_var=MYSQL_SYSVAR(recovery_ssl_crlpath);
2611 recovery_ssl_opt_map[ssl_crlpath_var->name]= RECOVERY_SSL_CRLPATH_OPT;
2612 }
2613
2614 // Recovery threshold options
2615
2616 const char* recovery_policies[]= { "TRANSACTIONS_CERTIFIED",
2617 "TRANSACTIONS_APPLIED",
2618 (char *)0};
2619
2620 TYPELIB recovery_policies_typelib_t= {
2621 array_elements(recovery_policies) - 1,
2622 "recovery_policies_typelib_t",
2623 recovery_policies,
2624 NULL
2625 };
2626
2627 static MYSQL_SYSVAR_ENUM(
2628 recovery_complete_at, /* name */
2629 recovery_completion_policy_var, /* var */
2630 PLUGIN_VAR_OPCMDARG, /* optional var */
2631 "Recovery policies when handling cached transactions after state transfer."
2632 "possible values are TRANSACTIONS_CERTIFIED or TRANSACTION_APPLIED", /* values */
2633 NULL, /* check func. */
2634 update_recovery_completion_policy, /* update func. */
2635 RECOVERY_POLICY_WAIT_EXECUTED, /* default */
2636 &recovery_policies_typelib_t); /* type lib */
2637
// Generic timeout setting

/*
  components_stop_timeout: unsigned long option updated through
  update_component_timeout; defaults to LONG_TIMEOUT, range 2..LONG_TIMEOUT.
*/
static MYSQL_SYSVAR_ULONG(
  components_stop_timeout,                         /* name */
  components_stop_timeout_var,                     /* var */
  PLUGIN_VAR_OPCMDARG,                             /* optional var */
  "Timeout in seconds that the plugin waits for each of the components when shutting down.",
  NULL,                                            /* check func. */
  update_component_timeout,                        /* update func. */
  LONG_TIMEOUT,                                    /* default */
  2,                                               /* min */
  LONG_TIMEOUT,                                    /* max */
  0                                                /* block */
);
2652
// Allow member downgrade

/*
  allow_local_lower_version_join: boolean option without check or update
  functions; defaults to 0.
*/
static MYSQL_SYSVAR_BOOL(
  allow_local_lower_version_join,        /* name */
  allow_local_lower_version_join_var,    /* var */
  PLUGIN_VAR_OPCMDARG,                   /* optional var */
  "Allow this server to join the group even if it has a lower plugin version than the group",
  NULL,                                  /* check func. */
  NULL,                                  /* update func */
  0                                      /* default */
);

/*
  allow_local_disjoint_gtids_join: boolean option updated through
  update_allow_local_disjoint_gtids_join; defaults to 0.
*/
static MYSQL_SYSVAR_BOOL(
  allow_local_disjoint_gtids_join,       /* name */
  allow_local_disjoint_gtids_join_var,   /* var */
  PLUGIN_VAR_OPCMDARG,                   /* optional var */
  "Allow this server to join the group even if it has transactions not present in the group",
  NULL,                                  /* check func. */
  update_allow_local_disjoint_gtids_join,/* update func */
  0                                      /* default */
);
2674
/*
  auto_increment_increment: unsigned long option validated by
  check_auto_increment_increment, with no plugin-specific update function.
  Default and range come from the *_AUTO_INCREMENT_INCREMENT constants.
*/
static MYSQL_SYSVAR_ULONG(
  auto_increment_increment,          /* name */
  auto_increment_increment_var,      /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
  "The group replication auto_increment_increment determines interval between successive column values",
  check_auto_increment_increment,    /* check func. */
  NULL,                              /* update by update_func_long func. */
  DEFAULT_AUTO_INCREMENT_INCREMENT,  /* default */
  MIN_AUTO_INCREMENT_INCREMENT,      /* min */
  MAX_AUTO_INCREMENT_INCREMENT,      /* max */
  0                                  /* block */
);

/*
  compression_threshold: unsigned long option validated by
  check_compression_threshold, with no update function. Default and range
  come from the *_COMPRESSION_THRESHOLD constants.
*/
static MYSQL_SYSVAR_ULONG(
  compression_threshold,             /* name */
  compression_threshold_var,         /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
  "The value in bytes above which (lz4) compression is "
  "enforced. When set to zero, deactivates compression. "
  "Default: 1000000.",
  check_compression_threshold,       /* check func. */
  NULL,                              /* update func. */
  DEFAULT_COMPRESSION_THRESHOLD,     /* default */
  MIN_COMPRESSION_THRESHOLD,         /* min */
  MAX_COMPRESSION_THRESHOLD,         /* max */
  0                                  /* block */
);

/*
  gtid_assignment_block_size: unsigned long long option validated by
  check_gtid_assignment_block_size, with no update function. Default and
  range come from the *_GTID_ASSIGNMENT_BLOCK_SIZE constants.
*/
static MYSQL_SYSVAR_ULONGLONG(
  gtid_assignment_block_size,        /* name */
  gtid_assignment_block_size_var,    /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
  "The number of consecutive GTIDs that are reserved to each "
  "member. Each member will consume its blocks and reserve "
  "more when needed. Default: 1000000.",
  check_gtid_assignment_block_size,  /* check func. */
  NULL,                              /* update func. */
  DEFAULT_GTID_ASSIGNMENT_BLOCK_SIZE,/* default */
  MIN_GTID_ASSIGNMENT_BLOCK_SIZE,    /* min */
  MAX_GTID_ASSIGNMENT_BLOCK_SIZE,    /* max */
  0                                  /* block */
);
2717
/*
  Typelib over ssl_mode_values (declared outside this section); the count
  is the array size minus one.
*/
TYPELIB ssl_mode_values_typelib_t= {
  array_elements(ssl_mode_values) - 1,
  "ssl_mode_values_typelib_t",
  ssl_mode_values,
  NULL
};

/*
  ssl_mode: enum option over ssl_mode_values_typelib_t without check or
  update functions. The default is index 0.
  NOTE(review): the help text says the default is DISABLED, so index 0 of
  ssl_mode_values is presumably "DISABLED" - confirm against its definition.
*/
static MYSQL_SYSVAR_ENUM(
  ssl_mode,                          /* name */
  ssl_mode_var,                      /* var */
  PLUGIN_VAR_OPCMDARG,               /* optional var */
  "Specifies the security state of the connection between Group "
  "Replication members. Default: DISABLED",
  NULL,                              /* check func. */
  NULL,                              /* update func. */
  0,                                 /* default */
  &ssl_mode_values_typelib_t         /* type lib */
);
2736
/*
  ip_whitelist: string option validated by check_ip_whitelist_preconditions,
  with no update function; defaults to IP_WHITELIST_DEFAULT.
*/
static MYSQL_SYSVAR_STR(
  ip_whitelist,                             /* name */
  ip_whitelist_var,                         /* var */
  /* optional var | malloc string | no set default */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC | PLUGIN_VAR_NODEFAULT,
  "This option can be used to specify which members "
  "are allowed to connect to this member. The input "
  "takes the form of a comma separated list of IPv4 "
  "addresses or subnet CIDR notation. For example: "
  "192.168.1.0/24,10.0.0.1. In addition, the user can "
  "also set as input the value 'AUTOMATIC', in which case "
  "active interfaces on the host will be scanned and "
  "those with addresses on private subnetworks will be "
  "automatically added to the IP whitelist. The address "
  "127.0.0.1 is always added if not specified explicitly "
  "in the whitelist. Default: 'AUTOMATIC'.",
  check_ip_whitelist_preconditions,         /* check func */
  NULL,                                     /* update func */
  IP_WHITELIST_DEFAULT);                    /* default */
2756
/*
  single_primary_mode: boolean option validated by check_single_primary_mode,
  with no update function; defaults to TRUE.
*/
static MYSQL_SYSVAR_BOOL(
  single_primary_mode,                        /* name */
  single_primary_mode_var,                    /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
  "Instructs the group to automatically pick a single server to be "
  "the one that handles read/write workload. This server is the "
  "PRIMARY all others are SECONDARIES. Default: TRUE.",
  check_single_primary_mode,                  /* check func */
  NULL,                                       /* update func */
  TRUE);                                      /* default */

/*
  enforce_update_everywhere_checks: boolean option validated by
  check_enforce_update_everywhere_checks, with no update function;
  defaults to FALSE.
*/
static MYSQL_SYSVAR_BOOL(
  enforce_update_everywhere_checks,           /* name */
  enforce_update_everywhere_checks_var,       /* var */
  PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
  "Enable/Disable strict consistency checks for multi-master "
  "update everywhere. Default: FALSE.",
  check_enforce_update_everywhere_checks,     /* check func */
  NULL,                                       /* update func */
  FALSE);                                     /* default */
2777
/* Names accepted by flow_control_mode; the list is NULL terminated. */
const char* flow_control_mode_values[]= {
  "DISABLED",
  "QUOTA",
  (const char*)0
};

/* Typelib over flow_control_mode_values; the count excludes the terminator. */
TYPELIB flow_control_mode_typelib_t= {
  array_elements(flow_control_mode_values) - 1,
  "flow_control_mode_typelib_t",
  flow_control_mode_values,
  NULL
};

/*
  flow_control_mode: enum option over flow_control_mode_typelib_t without
  check or update functions; defaults to FCM_QUOTA.
*/
static MYSQL_SYSVAR_ENUM(
  flow_control_mode,                 /* name */
  flow_control_mode_var,             /* var */
  PLUGIN_VAR_OPCMDARG,               /* optional var */
  "Specifies the mode used on flow control. "
  "Default: QUOTA",
  NULL,                              /* check func. */
  NULL,                              /* update func. */
  FCM_QUOTA,                         /* default */
  &flow_control_mode_typelib_t       /* type lib */
);
2802
/*
  flow_control_certifier_threshold: int option without check or update
  functions. Default and range come from the *_FLOW_CONTROL_THRESHOLD
  constants.
*/
static MYSQL_SYSVAR_INT(
  flow_control_certifier_threshold,     /* name */
  flow_control_certifier_threshold_var, /* var */
  PLUGIN_VAR_OPCMDARG,                  /* optional var */
  "Specifies the number of waiting transactions that will trigger "
  "flow control. Default: 25000",
  NULL,                                 /* check func. */
  NULL,                                 /* update func. */
  DEFAULT_FLOW_CONTROL_THRESHOLD,       /* default */
  MIN_FLOW_CONTROL_THRESHOLD,           /* min */
  MAX_FLOW_CONTROL_THRESHOLD,           /* max */
  0                                     /* block */
);

/*
  flow_control_applier_threshold: int option without check or update
  functions, sharing default and range with the certifier threshold.
  NOTE(review): the help text is identical to the certifier threshold's -
  confirm whether a more specific description is intended.
*/
static MYSQL_SYSVAR_INT(
  flow_control_applier_threshold,       /* name */
  flow_control_applier_threshold_var,   /* var */
  PLUGIN_VAR_OPCMDARG,                  /* optional var */
  "Specifies the number of waiting transactions that will trigger "
  "flow control. Default: 25000",
  NULL,                                 /* check func. */
  NULL,                                 /* update func. */
  DEFAULT_FLOW_CONTROL_THRESHOLD,       /* default */
  MIN_FLOW_CONTROL_THRESHOLD,           /* min */
  MAX_FLOW_CONTROL_THRESHOLD,           /* max */
  0                                     /* block */
);
2830
/*
  transaction_size_limit: unsigned long option updated through
  update_transaction_size_limit. The storage registered here is
  transaction_size_limit_base_var. Default and range come from the
  *_TRANSACTION_SIZE_LIMIT constants.
*/
static MYSQL_SYSVAR_ULONG(
  transaction_size_limit,              /* name */
  transaction_size_limit_base_var,     /* var */
  PLUGIN_VAR_OPCMDARG,                 /* optional var */
  "Specifies the limit of transaction size that can be transferred over network.",
  NULL,                                /* check func. */
  update_transaction_size_limit,       /* update func. */
  DEFAULT_TRANSACTION_SIZE_LIMIT,      /* default */
  MIN_TRANSACTION_SIZE_LIMIT,          /* min */
  MAX_TRANSACTION_SIZE_LIMIT,          /* max */
  0                                    /* block */
);
2843
2844 static MYSQL_SYSVAR_ULONG(
2845 unreachable_majority_timeout, /* name */
2846 timeout_on_unreachable_var, /* var */
2847 PLUGIN_VAR_OPCMDARG, /* optional var */
2848 "The number of seconds before going into error when a majority of members is unreachable."
2849 "If 0 there is no action taken.",
2850 NULL, /* check func. */
2851 update_unreachable_timeout, /* update func. */
2852 0, /* default */
2853 0, /* min */
2854 LONG_TIMEOUT, /* max */
2855 0 /* block */
2856 );
2857
2858 static MYSQL_SYSVAR_UINT(
2859 member_weight, /* name */
2860 member_weight_var, /* var */
2861 PLUGIN_VAR_OPCMDARG, /* optional var */
2862 "Member weight will determine the member role in the group on"
2863 " future primary elections",
2864 NULL, /* check func. */
2865 update_member_weight, /* update func. */
2866 DEFAULT_MEMBER_WEIGHT, /* default */
2867 MIN_MEMBER_WEIGHT, /* min */
2868 MAX_MEMBER_WEIGHT, /* max */
2869 0 /* block */
2870 );
2871
2872 const char *exit_state_actions[]= {"READ_ONLY", "ABORT_SERVER", (char *)0};
2873 TYPELIB exit_state_actions_typelib_t= {array_elements(exit_state_actions) - 1,
2874 "exit_state_actions_typelib_t",
2875 exit_state_actions, NULL};
2876 static MYSQL_SYSVAR_ENUM(exit_state_action, /* name */
2877 exit_state_action_var, /* var */
2878 PLUGIN_VAR_OPCMDARG, /* optional var */
2879 "The action that is taken when the server "
2880 "leaves the group. "
2881 "Possible values are READ_ONLY or "
2882 "ABORT_SERVER.", /* values */
2883 NULL, /* check func. */
2884 NULL, /* update func. */
2885 EXIT_STATE_ACTION_READ_ONLY, /* default */
2886 &exit_state_actions_typelib_t); /* type lib */
2887
2888 static SYS_VAR* group_replication_system_vars[]= {
2889 MYSQL_SYSVAR(group_name),
2890 MYSQL_SYSVAR(start_on_boot),
2891 MYSQL_SYSVAR(local_address),
2892 MYSQL_SYSVAR(group_seeds),
2893 MYSQL_SYSVAR(force_members),
2894 MYSQL_SYSVAR(bootstrap_group),
2895 MYSQL_SYSVAR(poll_spin_loops),
2896 MYSQL_SYSVAR(recovery_retry_count),
2897 MYSQL_SYSVAR(recovery_use_ssl),
2898 MYSQL_SYSVAR(recovery_ssl_ca),
2899 MYSQL_SYSVAR(recovery_ssl_capath),
2900 MYSQL_SYSVAR(recovery_ssl_cert),
2901 MYSQL_SYSVAR(recovery_ssl_cipher),
2902 MYSQL_SYSVAR(recovery_ssl_key),
2903 MYSQL_SYSVAR(recovery_ssl_crl),
2904 MYSQL_SYSVAR(recovery_ssl_crlpath),
2905 MYSQL_SYSVAR(recovery_ssl_verify_server_cert),
2906 MYSQL_SYSVAR(recovery_complete_at),
2907 MYSQL_SYSVAR(recovery_reconnect_interval),
2908 MYSQL_SYSVAR(components_stop_timeout),
2909 MYSQL_SYSVAR(allow_local_lower_version_join),
2910 MYSQL_SYSVAR(allow_local_disjoint_gtids_join),
2911 MYSQL_SYSVAR(auto_increment_increment),
2912 MYSQL_SYSVAR(compression_threshold),
2913 MYSQL_SYSVAR(gtid_assignment_block_size),
2914 MYSQL_SYSVAR(ssl_mode),
2915 MYSQL_SYSVAR(ip_whitelist),
2916 MYSQL_SYSVAR(single_primary_mode),
2917 MYSQL_SYSVAR(enforce_update_everywhere_checks),
2918 MYSQL_SYSVAR(flow_control_mode),
2919 MYSQL_SYSVAR(flow_control_certifier_threshold),
2920 MYSQL_SYSVAR(flow_control_applier_threshold),
2921 MYSQL_SYSVAR(transaction_size_limit),
2922 MYSQL_SYSVAR(unreachable_majority_timeout),
2923 MYSQL_SYSVAR(member_weight),
2924 MYSQL_SYSVAR(exit_state_action),
2925 NULL,
2926 };
2927
2928
2929 static int
show_primary_member(MYSQL_THD thd,SHOW_VAR * var,char * buff)2930 show_primary_member(MYSQL_THD thd, SHOW_VAR *var, char *buff)
2931 {
2932 var->type= SHOW_CHAR;
2933 var->value= NULL;
2934
2935 if (group_member_mgr && single_primary_mode_var &&
2936 plugin_is_group_replication_running())
2937 {
2938 string primary_member_uuid;
2939 group_member_mgr->get_primary_member_uuid(primary_member_uuid);
2940
2941 strncpy(buff, primary_member_uuid.c_str(), SHOW_VAR_FUNC_BUFF_SIZE);
2942 buff[SHOW_VAR_FUNC_BUFF_SIZE - 1] = 0;
2943
2944 var->value= buff;
2945 }
2946
2947 return 0;
2948 }
2949
2950 static SHOW_VAR group_replication_status_vars[]=
2951 {
2952 {"group_replication_primary_member",
2953 (char*) &show_primary_member,
2954 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
2955 {NULL, NULL, SHOW_LONG, SHOW_SCOPE_GLOBAL},
2956 };
2957
2958
mysql_declare_plugin(group_replication_plugin)2959 mysql_declare_plugin(group_replication_plugin)
2960 {
2961 MYSQL_GROUP_REPLICATION_PLUGIN,
2962 &group_replication_descriptor,
2963 group_replication_plugin_name,
2964 "ORACLE",
2965 "Group Replication (1.0.0)", /* Plugin name with full version*/
2966 PLUGIN_LICENSE_GPL,
2967 plugin_group_replication_init, /* Plugin Init */
2968 plugin_group_replication_deinit, /* Plugin Deinit */
2969 0x0100, /* Plugin Version: major.minor */
2970 group_replication_status_vars, /* status variables */
2971 group_replication_system_vars, /* system variables */
2972 NULL, /* config options */
2973 0, /* flags */
2974 }
2975 mysql_declare_plugin_end;
2976