1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include <sstream>
24 #include <mysql/service_rpl_transaction_write_set.h>
25 
26 #include "observer_server_actions.h"
27 #include "observer_server_state.h"
28 #include "observer_trans.h"
29 #include "plugin.h"
30 #include "plugin_log.h"
31 #include "pipeline_stats.h"
32 
33 using std::string;
34 
35 /* Plugin generic fields */
36 
37 static MYSQL_PLUGIN plugin_info_ptr= NULL;
38 unsigned int plugin_version= 0;
39 
40 //The plugin running flag and lock
41 static mysql_mutex_t plugin_running_mutex;
42 int32 group_replication_running= 0;
43 int32 group_replication_stopping= 0;
44 bool wait_on_engine_initialization= false;
45 bool server_shutdown_status= false;
46 bool plugin_is_auto_starting= false;
47 bool plugin_is_waiting_to_set_server_read_mode= false;
48 bool plugin_is_being_uninstalled= false;
49 
50 /* Plugin modules */
51 //The plugin applier
52 Applier_module *applier_module= NULL;
53 //The plugin recovery module
54 Recovery_module *recovery_module= NULL;
55 //The plugin group communication module
56 Gcs_operations *gcs_module= NULL;
57 //The channel observation module
58 Channel_observation_manager *channel_observation_manager= NULL;
59 //The Single primary channel observation module
60 Asynchronous_channels_state_observer
61     *asynchronous_channels_state_observer= NULL;
62 //Lock to check if the plugin is running or not.
63 Checkable_rwlock *plugin_stop_lock;
64 //Class to coordinate access to the plugin stop lock
65 Shared_writelock *shared_plugin_stop_lock;
66 //Initialization thread for server starts
67 Delayed_initialization_thread *delayed_initialization_thread= NULL;
68 //The transaction handler for network partitions
69 Group_partition_handling *group_partition_handler= NULL;
70 //The handler for transaction killing when an error or partition happens
71 Blocked_transaction_handler *blocked_transaction_handler= NULL;
72 
73 /* Group communication options */
74 char *local_address_var= NULL;
75 char *group_seeds_var= NULL;
76 char *force_members_var= NULL;
77 bool force_members_running= false;
78 static mysql_mutex_t force_members_running_mutex;
79 my_bool bootstrap_group_var= false;
80 ulong poll_spin_loops_var= 0;
81 ulong ssl_mode_var= 0;
82 
83 const char* ssl_mode_values[]= {
84   "DISABLED",
85   "REQUIRED",
86   "VERIFY_CA",
87   "VERIFY_IDENTITY",
88   (char*)0
89 };
90 
91 static const char *bool_type_allowed_values[]= {
92   "OFF",
93   "ON",
94   (const char*)0
95 };
96 
97 static TYPELIB plugin_bool_typelib= {
98   sizeof(bool_type_allowed_values) / sizeof(*bool_type_allowed_values) - 1, // names count
99   "",                       // type name
100   bool_type_allowed_values, // value names
101   NULL                      // count
102 };
103 
104 #define IP_WHITELIST_STR_BUFFER_LENGTH 1024
105 char *ip_whitelist_var= NULL;
106 const char *IP_WHITELIST_DEFAULT= "AUTOMATIC";
107 
108 //The plugin auto increment handler
109 Plugin_group_replication_auto_increment *auto_increment_handler= NULL;
110 Plugin_gcs_events_handler* events_handler= NULL;
111 Plugin_gcs_view_modification_notifier* view_change_notifier= NULL;
112 
113 /* Group management information */
114 Group_member_info_manager_interface *group_member_mgr= NULL;
115 Group_member_info* local_member_info= NULL;
116 
117 /*Compatibility management*/
118 Compatibility_module* compatibility_mgr= NULL;
119 
120 /* Plugin group related options */
121 const char *group_replication_plugin_name= "group_replication";
122 char *group_name_var= NULL;
123 my_bool start_group_replication_at_boot_var= true;
124 rpl_sidno group_sidno;
125 my_bool single_primary_mode_var= FALSE;
126 my_bool enforce_update_everywhere_checks_var= TRUE;
127 
128 /* Applier module related */
129 bool known_server_reset;
130 
131 //Recovery ssl options
132 
133 // Option map entries that map the different SSL options to integer
134 static const int RECOVERY_SSL_CA_OPT= 1;
135 static const int RECOVERY_SSL_CAPATH_OPT= 2;
136 static const int RECOVERY_SSL_CERT_OPT= 3;
137 static const int RECOVERY_SSL_CIPHER_OPT= 4;
138 static const int RECOVERY_SSL_KEY_OPT= 5;
139 static const int RECOVERY_SSL_CRL_OPT= 6;
140 static const int RECOVERY_SSL_CRLPATH_OPT= 7;
141 //The option map <SSL var_name, SSL var code>
142 std::map<const char*, int> recovery_ssl_opt_map;
143 
144 // SSL options
145 my_bool recovery_use_ssl_var= false;
146 char* recovery_ssl_ca_var= NULL;
147 char* recovery_ssl_capath_var= NULL;
148 char* recovery_ssl_cert_var= NULL;
149 char* recovery_ssl_cipher_var= NULL;
150 char* recovery_ssl_key_var= NULL;
151 char* recovery_ssl_crl_var= NULL;
152 char* recovery_ssl_crlpath_var= NULL;
153 my_bool recovery_ssl_verify_server_cert_var= false;
154 ulong  recovery_completion_policy_var;
155 
156 ulong recovery_retry_count_var= 0;
157 ulong recovery_reconnect_interval_var= 0;
158 
159 /* Write set extraction algorithm*/
160 int write_set_extraction_algorithm= HASH_ALGORITHM_OFF;
161 
162 /* Lower case table name */
163 uint gr_lower_case_table_names= 0;
164 
165 /* Generic components variables */
166 ulong components_stop_timeout_var= LONG_TIMEOUT;
167 
168 /* The timeout before going to error when majority becomes unreachable */
169 ulong timeout_on_unreachable_var= 0;
170 
171 /*
172  Exit state action that is executed when a server involuntarily leaves the
173  group.
174 */
175 ulong exit_state_action_var;
176 
177 /**
178   The default value for auto_increment_increment is choosen taking into
179   account the maximum usable values for each possible auto_increment_increment
180   and what is a normal group expected size.
181 */
182 #define DEFAULT_AUTO_INCREMENT_INCREMENT 7
183 #define MIN_AUTO_INCREMENT_INCREMENT 1
184 #define MAX_AUTO_INCREMENT_INCREMENT 65535
185 ulong auto_increment_increment_var= DEFAULT_AUTO_INCREMENT_INCREMENT;
186 
187 /* compression options */
188 #define DEFAULT_COMPRESSION_THRESHOLD 1000000
189 #define MAX_COMPRESSION_THRESHOLD UINT_MAX32
190 #define MIN_COMPRESSION_THRESHOLD 0
191 ulong compression_threshold_var= DEFAULT_COMPRESSION_THRESHOLD;
192 
193 /* GTID assignment block size options */
194 #define DEFAULT_GTID_ASSIGNMENT_BLOCK_SIZE 1000000
195 #define MIN_GTID_ASSIGNMENT_BLOCK_SIZE 1
196 #define MAX_GTID_ASSIGNMENT_BLOCK_SIZE GNO_END
197 ulonglong gtid_assignment_block_size_var= DEFAULT_GTID_ASSIGNMENT_BLOCK_SIZE;
198 
199 /* Flow control options */
200 ulong flow_control_mode_var= FCM_QUOTA;
201 #define DEFAULT_FLOW_CONTROL_THRESHOLD 25000
202 #define MAX_FLOW_CONTROL_THRESHOLD INT_MAX32
203 #define MIN_FLOW_CONTROL_THRESHOLD 0
204 int flow_control_certifier_threshold_var= DEFAULT_FLOW_CONTROL_THRESHOLD;
205 int flow_control_applier_threshold_var= DEFAULT_FLOW_CONTROL_THRESHOLD;
206 
207 /* Transaction size limits */
208 #define DEFAULT_TRANSACTION_SIZE_LIMIT 0
209 #define MAX_TRANSACTION_SIZE_LIMIT 2147483647
210 #define MIN_TRANSACTION_SIZE_LIMIT 0
211 ulong transaction_size_limit_base_var= DEFAULT_TRANSACTION_SIZE_LIMIT;
212 int64 transaction_size_limit_var;
213 
214 /* Member Weight limits */
215 #define DEFAULT_MEMBER_WEIGHT 50
216 #define MAX_MEMBER_WEIGHT 100
217 #define MIN_MEMBER_WEIGHT 0
218 uint member_weight_var= DEFAULT_MEMBER_WEIGHT;
219 
220 /* Downgrade options */
221 char allow_local_lower_version_join_var= 0;
222 
223 /* Allow errand transactions */
224 char allow_local_disjoint_gtids_join_var= 0;
225 
226 /* Certification latch */
227 Wait_ticket<my_thread_id> *certification_latch;
228 
229 /*
230   Internal auxiliary functions signatures.
231 */
232 static int check_group_name_string(const char *str,
233                                    bool is_var_update= false);
234 
235 static int check_recovery_ssl_string(const char *str, const char *var_name,
236                                      bool is_var_update= false);
237 
238 static int check_if_server_properly_configured();
239 
240 static bool init_group_sidno();
241 
242 static void initialize_ssl_option_map();
243 
244 int configure_group_communication(st_server_ssl_variables *ssl_variables);
245 int configure_group_member_manager(char *hostname, char *uuid,
246                                    uint port, unsigned int server_version);
247 bool check_async_channel_running_on_secondary();
248 int configure_compatibility_manager();
249 int initialize_recovery_module();
250 int configure_and_start_applier_module();
251 void initialize_asynchronous_channels_observer();
252 void initialize_group_partition_handler();
253 int start_group_communication();
254 int leave_group();
255 int terminate_plugin_modules(bool flag_stop_async_channel= false);
256 int terminate_applier_module();
257 int terminate_recovery_module();
258 void terminate_asynchronous_channels_observer();
259 
260 /*
261   Auxiliary public functions.
262 */
get_plugin_pointer()263 void *get_plugin_pointer()
264 {
265   return plugin_info_ptr;
266 }
267 
get_plugin_running_lock()268 mysql_mutex_t* get_plugin_running_lock()
269 {
270   return &plugin_running_mutex;
271 }
272 
plugin_is_group_replication_running()273 bool plugin_is_group_replication_running()
274 {
275   return my_atomic_load32(&group_replication_running);
276 }
277 
get_plugin_is_stopping()278 bool get_plugin_is_stopping()
279 {
280   return my_atomic_load32(&group_replication_stopping);
281 }
282 
plugin_group_replication_set_retrieved_certification_info(void * info)283 int plugin_group_replication_set_retrieved_certification_info(void* info)
284 {
285   return recovery_module->set_retrieved_cert_info(info);
286 }
287 
log_message(enum plugin_log_level level,const char * format,...)288 int log_message(enum plugin_log_level level, const char *format, ...)
289 {
290   va_list args;
291   char buff[1024];
292 
293   va_start(args, format);
294   my_vsnprintf(buff, sizeof(buff), format, args);
295   va_end(args);
296   return my_plugin_log_message(&plugin_info_ptr, level, buff);
297 }
298 
option_deprecation_warning(MYSQL_THD thd,const char * name)299 static void option_deprecation_warning(MYSQL_THD thd, const char* name)
300 {
301   if (thd != NULL)
302   {
303     push_warning_printf(thd, Sql_condition::SL_WARNING,
304                         ER_WARN_DEPRECATED_SYNTAX_NO_REPLACEMENT,
305                         ER_THD(thd, ER_WARN_DEPRECATED_SYNTAX_NO_REPLACEMENT),
306                         name);
307   }
308   log_message(MY_WARNING_LEVEL,
309               ER_DEFAULT(ER_WARN_DEPRECATED_SYNTAX_NO_REPLACEMENT),
310               name);
311 }
312 
313 /*
314   Plugin interface.
315 */
316 struct st_mysql_group_replication group_replication_descriptor=
317 {
318   MYSQL_GROUP_REPLICATION_INTERFACE_VERSION,
319   plugin_group_replication_start,
320   plugin_group_replication_stop,
321   plugin_is_group_replication_running,
322   plugin_group_replication_set_retrieved_certification_info,
323   plugin_get_connection_status,
324   plugin_get_group_members,
325   plugin_get_group_member_stats,
326   plugin_get_group_members_number,
327 };
328 
329 bool
plugin_get_connection_status(const GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS & callbacks)330 plugin_get_connection_status(
331     const GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS& callbacks)
332 {
333   char* channel_name= applier_module_channel_name;
334 
335   return get_connection_status(callbacks, group_name_var, channel_name,
336                                plugin_is_group_replication_running());
337 }
338 
339 bool
plugin_get_group_members(uint index,const GROUP_REPLICATION_GROUP_MEMBERS_CALLBACKS & callbacks)340 plugin_get_group_members(
341     uint index, const GROUP_REPLICATION_GROUP_MEMBERS_CALLBACKS& callbacks)
342 {
343   char* channel_name= applier_module_channel_name;
344 
345   return get_group_members_info(index, callbacks, group_member_mgr,
346                                 group_name_var, channel_name);
347 }
348 
plugin_get_group_members_number()349 uint plugin_get_group_members_number()
350 {
351   return group_member_mgr == NULL? 1 :
352                                     (uint)group_member_mgr
353                                                       ->get_number_of_members();
354 }
355 
356 bool
plugin_get_group_member_stats(const GROUP_REPLICATION_GROUP_MEMBER_STATS_CALLBACKS & callbacks)357 plugin_get_group_member_stats(
358     const GROUP_REPLICATION_GROUP_MEMBER_STATS_CALLBACKS& callbacks)
359 {
360   char* channel_name= applier_module_channel_name;
361 
362   return get_group_member_stats(callbacks, group_member_mgr, applier_module,
363                                 gcs_module, group_name_var, channel_name);
364 }
365 
plugin_group_replication_start()366 int plugin_group_replication_start()
367 {
368   DBUG_ENTER("plugin_group_replication_start");
369 
370   Mutex_autolock auto_lock_mutex(&plugin_running_mutex);
371 
372   DBUG_EXECUTE_IF("group_replication_wait_on_start",
373                  {
374                    const char act[]= "now signal signal.start_waiting wait_for signal.start_continue";
375                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
376                  });
377 
378   if (plugin_is_group_replication_running())
379     DBUG_RETURN(GROUP_REPLICATION_ALREADY_RUNNING);
380   if (check_if_server_properly_configured())
381     DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
382   if (check_group_name_string(group_name_var))
383     DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
384   if (check_recovery_ssl_string(recovery_ssl_ca_var, "ssl_ca") ||
385       check_recovery_ssl_string(recovery_ssl_capath_var, "ssl_capath") ||
386       check_recovery_ssl_string(recovery_ssl_cert_var, "ssl_cert_pointer") ||
387       check_recovery_ssl_string(recovery_ssl_cipher_var,
388                                 "ssl_cipher_pointer") ||
389       check_recovery_ssl_string(recovery_ssl_key_var, "ssl_key_pointer") ||
390       check_recovery_ssl_string(recovery_ssl_crl_var, "ssl_crl_pointer") ||
391       check_recovery_ssl_string(recovery_ssl_crlpath_var,
392                                 "ssl_crlpath_pointer"))
393     DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
394   if (!start_group_replication_at_boot_var &&
395       !server_engine_initialized())
396   {
397     log_message(MY_ERROR_LEVEL,
398                 "Unable to start Group Replication. Replication applier "
399                 "infrastructure is not initialized since the server was "
400                 "started with server_id=0. Please, restart the server "
401                 "with server_id larger than 0.");
402     DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
403   }
404   if (force_members_var != NULL &&
405       strlen(force_members_var) > 0)
406   {
407     log_message(MY_ERROR_LEVEL,
408                 "group_replication_force_members must be empty "
409                 "on group start. Current value: '%s'",
410                 force_members_var);
411     DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
412   }
413   if (init_group_sidno())
414     DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR); /* purecov: inspected */
415 
416   if (allow_local_disjoint_gtids_join_var)
417   {
418     option_deprecation_warning(current_thd,
419                                "group_replication_allow_local_disjoint_gtids_join");
420   }
421 
422   /*
423     Instantiate certification latch.
424   */
425   certification_latch= new Wait_ticket<my_thread_id>();
426 
427   // GR delayed initialization.
428   if (!server_engine_initialized())
429   {
430     wait_on_engine_initialization= true;
431     plugin_is_auto_starting= false;
432 
433     delayed_initialization_thread= new Delayed_initialization_thread();
434     if (delayed_initialization_thread->launch_initialization_thread())
435     {
436       /* purecov: begin inspected */
437       log_message(MY_ERROR_LEVEL,
438                   "It was not possible to guarantee the initialization of plugin"
439                     " structures on server start");
440       delete delayed_initialization_thread;
441       delayed_initialization_thread= NULL;
442       DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
443       /* purecov: end */
444     }
445 
446     DBUG_RETURN(0); //leave the decision for later
447   }
448 
449   DBUG_RETURN(initialize_plugin_and_join(PSESSION_DEDICATED_THREAD,
450                                          NULL));
451 }
452 
initialize_plugin_and_join(enum_plugin_con_isolation sql_api_isolation,Delayed_initialization_thread * delayed_init_thd)453 int initialize_plugin_and_join(enum_plugin_con_isolation sql_api_isolation,
454                                Delayed_initialization_thread *delayed_init_thd)
455 {
456   DBUG_ENTER("initialize_plugin_and_join");
457 
458   int error= 0;
459 
460   //Avoid unnecessary operations
461   bool enabled_super_read_only= false;
462   bool read_only_mode= false, super_read_only_mode=false;
463   bool write_set_limits_set = false;
464 
465   st_server_ssl_variables server_ssl_variables=
466     {false,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL};
467 
468   char *hostname, *uuid;
469   uint port;
470   unsigned int server_version;
471 
472   Sql_service_command_interface *sql_command_interface=
473     new Sql_service_command_interface();
474 
475   // GCS interface.
476   if ((error= gcs_module->initialize()))
477     goto err; /* purecov: inspected */
478 
479   // Setup SQL service interface.
480   if (sql_command_interface->
481     establish_session_connection(sql_api_isolation, plugin_info_ptr))
482   {
483     error =1; /* purecov: inspected */
484     goto err; /* purecov: inspected */
485   }
486 
487   if (sql_command_interface->set_interface_user(GROUPREPL_USER))
488   {
489     error =1; /* purecov: inspected */
490     goto err; /* purecov: inspected */
491   }
492 
493   get_read_mode_state(sql_command_interface, &read_only_mode,
494                       &super_read_only_mode);
495 
496   /*
497    At this point in the code, set the super_read_only mode here on the
498    server to protect recovery and version module of the Group Replication.
499    This can only be done on START command though, on installs there are
500    deadlock issues.
501   */
502   if (!plugin_is_auto_starting &&
503       enable_super_read_only_mode(sql_command_interface))
504   {
505     /* purecov: begin inspected */
506     error =1;
507     log_message(MY_ERROR_LEVEL,
508                 "Could not enable the server read only mode and guarantee a "
509                   "safe recovery execution");
510     goto err;
511     /* purecov: end */
512   }
513   enabled_super_read_only= true;
514   if (delayed_init_thd)
515     delayed_init_thd->signal_read_mode_ready();
516 
517   require_full_write_set(1);
518   set_write_set_memory_size_limit(get_transaction_size_limit());
519   write_set_limits_set = true;
520 
521   get_server_parameters(&hostname, &port, &uuid, &server_version,
522                         &server_ssl_variables);
523 
524   // Setup GCS.
525   if ((error= configure_group_communication(&server_ssl_variables)))
526   {
527     log_message(MY_ERROR_LEVEL,
528                 "Error on group communication engine initialization");
529     goto err;
530   }
531 
532   // Setup Group Member Manager.
533   if ((error= configure_group_member_manager(hostname, uuid, port,
534                                              server_version)))
535     goto err; /* purecov: inspected */
536 
537   if (check_async_channel_running_on_secondary())
538   {
539     error= 1;
540     log_message(MY_ERROR_LEVEL, "Can't start group replication on secondary"
541       " member with single primary-mode while"
542       " asynchronous replication channels are"
543       " running.");
544     goto err; /* purecov: inspected */
545   }
546 
547   configure_compatibility_manager();
548   DBUG_EXECUTE_IF("group_replication_compatibility_rule_error_major",
549                   {
550                     Member_version other_version= plugin_version + (0x010000);
551                     Member_version current_version= plugin_version;
552                     compatibility_mgr->add_incompatibility(current_version,
553                                                            other_version);
554                   };);
555   DBUG_EXECUTE_IF("group_replication_compatibility_rule_error_minor",
556                   {
557                     Member_version other_version= plugin_version;
558                     Member_version current_version= plugin_version + (0x000100);
559                     compatibility_mgr->add_incompatibility(current_version,
560                                                            other_version);
561                   };);
562   DBUG_EXECUTE_IF("group_replication_compatibility_higher_minor_version",
563                   {
564                     Member_version higher_version= plugin_version + (0x000100);
565                     compatibility_mgr->set_local_version(higher_version);
566                   };);
567   DBUG_EXECUTE_IF("group_replication_compatibility_higher_major_version",
568                   {
569                     Member_version higher_version= plugin_version + (0x010000);
570                     compatibility_mgr->set_local_version(higher_version);
571                   };);
572   DBUG_EXECUTE_IF("group_replication_compatibility_restore_version",
573                   {
574                     Member_version current_version= plugin_version;
575                     compatibility_mgr->set_local_version(current_version);
576                   };);
577 
578   // need to be initialized before applier, is called on kill_pending_transactions
579   blocked_transaction_handler= new Blocked_transaction_handler();
580 
581   if ((error= initialize_recovery_module()))
582     goto err; /* purecov: inspected */
583 
584   //we can only start the applier if the log has been initialized
585   if (configure_and_start_applier_module())
586   {
587     error= GROUP_REPLICATION_REPLICATION_APPLIER_INIT_ERROR;
588     goto err;
589   }
590 
591   initialize_asynchronous_channels_observer();
592   initialize_group_partition_handler();
593 
594   DBUG_EXECUTE_IF("group_replication_before_joining_the_group",
595                   {
596                     const char act[]= "now wait_for signal.continue_group_join";
597                     assert(!debug_sync_set_action(current_thd,
598                                                   STRING_WITH_LEN(act)));
599                   });
600 
601   if ((error= start_group_communication()))
602   {
603     log_message(MY_ERROR_LEVEL,
604                 "Error on group communication engine start");
605     goto err;
606   }
607 
608   if (view_change_notifier->wait_for_view_modification())
609   {
610     if (!view_change_notifier->is_cancelled())
611     {
612       //Only log a error when a view modification was not cancelled.
613       log_message(MY_ERROR_LEVEL,
614                   "Timeout on wait for view after joining group");
615     }
616     error= view_change_notifier->get_error();
617     goto err;
618   }
619   my_atomic_store32(&group_replication_running, 1);
620   my_atomic_store32(&group_replication_stopping, 0);
621   log_primary_member_details();
622 
623 err:
624   if (error)
625   {
626     //Unblock the possible stuck delayed thread
627     if (delayed_init_thd)
628       delayed_init_thd->signal_read_mode_ready();
629     leave_group();
630     terminate_plugin_modules();
631 
632     if (write_set_limits_set) {
633       // Remove server constraints on write set collection
634       update_write_set_memory_size_limit(0);
635       require_full_write_set(0);
636     }
637 
638     if (!server_shutdown_status && server_engine_initialized()
639         && enabled_super_read_only)
640     {
641       set_read_mode_state(sql_command_interface, read_only_mode,
642                           super_read_only_mode);
643     }
644 
645     if (certification_latch != NULL)
646     {
647       delete certification_latch; /* purecov: inspected */
648       certification_latch= NULL;  /* purecov: inspected */
649     }
650   }
651 
652   delete sql_command_interface;
653   plugin_is_auto_starting= false;
654 
655   DBUG_RETURN(error);
656 }
657 
configure_group_member_manager(char * hostname,char * uuid,uint port,unsigned int server_version)658 int configure_group_member_manager(char *hostname, char *uuid,
659                                    uint port, unsigned int server_version)
660 {
661   DBUG_ENTER("configure_group_member_manager");
662 
663   /*
664     Ensure that group communication interfaces are initialized
665     and ready to use, since plugin can leave the group on errors
666     but continue to be active.
667   */
668   std::string gcs_local_member_identifier;
669   if (gcs_module->get_local_member_identifier(gcs_local_member_identifier))
670   {
671     /* purecov: begin inspected */
672     log_message(MY_ERROR_LEVEL, "Error calling group communication interfaces");
673     DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR);
674     /* purecov: end */
675   }
676 
677   if (!strcmp(uuid, group_name_var))
678   {
679     log_message(MY_ERROR_LEVEL,
680                 "Member server_uuid is incompatible with the group. "
681                 "Server_uuid %s matches group_name %s.", uuid, group_name_var);
682     DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
683   }
684   //Configure Group Member Manager
685   plugin_version= server_version;
686 
687   uint32 local_version= plugin_version;
688   DBUG_EXECUTE_IF("group_replication_compatibility_higher_patch_version",
689                   {
690                     local_version= plugin_version + (0x000001);
691                   };);
692   DBUG_EXECUTE_IF("group_replication_compatibility_higher_minor_version",
693                   {
694                     local_version= plugin_version + (0x000100);
695                   };);
696   DBUG_EXECUTE_IF("group_replication_compatibility_higher_major_version",
697                   {
698                     local_version= plugin_version + (0x010000);
699                   };);
700   Member_version local_member_plugin_version(local_version);
701 
702   DBUG_EXECUTE_IF("group_replication_force_member_uuid",
703                   {
704                     uuid= const_cast<char*>("cccccccc-cccc-cccc-cccc-cccccccccccc");
705                   };);
706   delete local_member_info;
707   local_member_info= new Group_member_info(hostname,
708                                            port,
709                                            uuid,
710                                            write_set_extraction_algorithm,
711                                            gcs_local_member_identifier,
712                                            Group_member_info::MEMBER_OFFLINE,
713                                            local_member_plugin_version,
714                                            gtid_assignment_block_size_var,
715                                            Group_member_info::MEMBER_ROLE_SECONDARY,
716                                            single_primary_mode_var,
717                                            enforce_update_everywhere_checks_var,
718                                            member_weight_var,
719                                            gr_lower_case_table_names);
720 
721   //Create the membership info visible for the group
722   delete group_member_mgr;
723   group_member_mgr= new Group_member_info_manager(local_member_info);
724 
725   log_message(MY_INFORMATION_LEVEL,
726               "Member configuration: "
727               "member_id: %lu; "
728               "member_uuid: \"%s\"; "
729               "single-primary mode: \"%s\"; "
730               "group_replication_auto_increment_increment: %lu; ",
731               get_server_id(),
732               (local_member_info != NULL) ? local_member_info->get_uuid().c_str() : "NULL",
733               single_primary_mode_var ? "true" : "false",
734               auto_increment_increment_var);
735 
736   DBUG_RETURN(0);
737 }
738 
init_compatibility_manager()739 void init_compatibility_manager()
740 {
741   if (compatibility_mgr != NULL)
742   {
743     delete compatibility_mgr; /* purecov: inspected */
744   }
745 
746   compatibility_mgr= new Compatibility_module();
747 }
748 
749 
configure_compatibility_manager()750 int configure_compatibility_manager()
751 {
752   Member_version local_member_version(plugin_version);
753   compatibility_mgr->set_local_version(local_member_version);
754 
755   /*
756    If needed.. configure here static rules of incompatibility.
757 
758    Example:
759      Member_version local_member_version(plugin_version);
760      Member_version remote_member_version(0x080001);
761      compatibility_mgr->add_incompatibility(local_member_version,
762                                             remote_member_version);
763 
764      Member_version local_member_version(plugin_version);
765      Member_version remote_member_min_version(0x080000);
766      Member_version remote_member_max_version(0x080005);
767      compatibility_mgr->add_incompatibility(local_member_version,
768                                             remote_member_min_version,
769                                             remote_member_max_version);
770    */
771 
772   return 0;
773 }
774 
leave_group()775 int leave_group()
776 {
777   if (gcs_module->belongs_to_group())
778   {
779     view_change_notifier->start_view_modification();
780 
781     Gcs_operations::enum_leave_state state= gcs_module->leave();
782 
783     std::stringstream ss;
784     plugin_log_level log_severity= MY_WARNING_LEVEL;
785     switch (state)
786     {
787       case Gcs_operations::ERROR_WHEN_LEAVING:
788         /* purecov: begin inspected */
789         ss << "Unable to confirm whether the server has left the group or not. "
790               "Check performance_schema.replication_group_members to check group membership information.";
791         log_severity= MY_ERROR_LEVEL;
792         break;
793         /* purecov: end */
794       case Gcs_operations::ALREADY_LEAVING:
795         ss << "Skipping leave operation: concurrent attempt to leave the group is on-going.";
796         break;
797       case Gcs_operations::ALREADY_LEFT:
798         /* purecov: begin inspected */
799         ss << "Skipping leave operation: member already left the group.";
800         break;
801         /* purecov: end */
802       case Gcs_operations::NOW_LEAVING:
803         goto bypass_message;
804     }
805     log_message(log_severity, ss.str().c_str());
806 bypass_message:
807     //Wait anyway
808     log_message(MY_INFORMATION_LEVEL, "Going to wait for view modification");
809     if (view_change_notifier->wait_for_view_modification())
810     {
811       log_message(MY_WARNING_LEVEL,
812                   "On shutdown there was a timeout receiving a view change. "
813                   "This can lead to a possible inconsistent state. "
814                   "Check the log for more details");
815     }
816   }
817   else
818   {
819     /*
820       Even when we do not belong to the group we invoke leave()
821       to prevent the following situation:
822        1) Server joins group;
823        2) Server leaves group before receiving the view on which
824           it joined the group.
825       If we do not leave preemptively, the server will only leave
826       the group when the communication layer failure detector
827       detects that it left.
828     */
829     log_message(MY_INFORMATION_LEVEL,
830                 "Requesting to leave the group despite of not "
831                 "being a member");
832     gcs_module->leave();
833   }
834 
835   // Finalize GCS.
836   gcs_module->finalize();
837 
838   if (auto_increment_handler != NULL)
839   {
840     auto_increment_handler->reset_auto_increment_variables();
841   }
842 
843   // Destroy handlers and notifiers
844   delete events_handler;
845   events_handler= NULL;
846 
847   return 0;
848 }
849 
plugin_group_replication_stop()850 int plugin_group_replication_stop()
851 {
852   DBUG_ENTER("plugin_group_replication_stop");
853 
854   Mutex_autolock auto_lock_mutex(&plugin_running_mutex);
855 
856   DBUG_EXECUTE_IF("group_replication_wait_on_stop",
857                  {
858                    const char act[]= "now signal signal.stop_waiting wait_for signal.stop_continue";
859                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
860                  });
861 
862   /*
863     We delete the delayed initialization object here because:
864 
865     1) It is invoked even if the plugin is stopped as failed starts may still
866     leave the class instantiated. This way, either the stop command or the
867     deinit process that calls this method will always clean this class
868 
869     2) Its use is on before_handle_connection, meaning no stop command can be
870     made before that. This makes this delete safe under the plugin running
871     mutex.
872   */
873   if (delayed_initialization_thread != NULL)
874   {
875     wait_on_engine_initialization= false;
876     delayed_initialization_thread->signal_thread_ready();
877     delayed_initialization_thread->wait_for_thread_end();
878     delete delayed_initialization_thread;
879     delayed_initialization_thread= NULL;
880   }
881 
882   if (!plugin_is_group_replication_running())
883   {
884     DBUG_RETURN(0);
885   }
886 
887   my_atomic_store32(&group_replication_stopping, 1);
888 
889   shared_plugin_stop_lock->grab_write_lock();
890   log_message(MY_INFORMATION_LEVEL,
891               "Plugin 'group_replication' is stopping.");
892 
893   plugin_is_waiting_to_set_server_read_mode= true;
894 
895   DBUG_EXECUTE_IF("group_replication_hold_stop_before_leave_the_group", {
896     const char act[] =
897         "now signal signal.stopping_before_leave_the_group "
898         "wait_for signal.resume_stop_before_leave_the_group";
899     assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
900   });
901 
902   // wait for all transactions waiting for certification
903   bool timeout=
904       certification_latch->block_until_empty(TRANSACTION_KILL_TIMEOUT);
905   if (timeout)
906   {
907     //if they are blocked, kill them
908     blocked_transaction_handler->unblock_waiting_transactions();
909   }
910 
911   /* first leave all joined groups (currently one) */
912   leave_group();
913 
914   int error= terminate_plugin_modules(true);
915 
916   my_atomic_store32(&group_replication_running, 0);
917   shared_plugin_stop_lock->release_write_lock();
918   log_message(MY_INFORMATION_LEVEL,
919               "Plugin 'group_replication' has been stopped.");
920 
921   // Enable super_read_only.
922   if (!server_shutdown_status &&
923       !plugin_is_being_uninstalled &&
924       server_engine_initialized())
925   {
926     if (enable_server_read_mode(PSESSION_DEDICATED_THREAD))
927     {
928       log_message(MY_ERROR_LEVEL,
929                   "On plugin shutdown it was not possible to enable the "
930                   "server read only mode. Local transactions will be accepted "
931                   "and committed."); /* purecov: inspected */
932     }
933     plugin_is_waiting_to_set_server_read_mode= false;
934   }
935 
936   // Remove server constraints on write set collection
937   update_write_set_memory_size_limit(0);
938   require_full_write_set(0);
939 
940   DBUG_RETURN(error);
941 }
942 
terminate_plugin_modules(bool flag_stop_async_channel)943 int terminate_plugin_modules(bool flag_stop_async_channel)
944 {
945 
946   if(terminate_recovery_module())
947   {
948     //Do not throw an error since recovery is not vital, but warn either way
949     log_message(MY_WARNING_LEVEL,
950                 "On shutdown there was a timeout on the Group Replication "
951                 "recovery module termination. Check the log for more details"); /* purecov: inspected */
952   }
953 
954   DBUG_EXECUTE_IF("group_replication_after_recovery_module_terminated",
955                  {
956                    const char act[]= "now wait_for signal.termination_continue";
957                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
958                  });
959 
960   /*
961     The applier is only shutdown after the communication layer to avoid
962     messages being delivered in the current view, but not applied
963   */
964   int error= 0;
965   if((error= terminate_applier_module()))
966   {
967     log_message(MY_ERROR_LEVEL,
968                 "On shutdown there was a timeout on the Group Replication"
969                 " applier termination.");
970   }
971 
972   terminate_asynchronous_channels_observer();
973 
974   if (flag_stop_async_channel)
975   {
976     int channel_err= channel_stop_all(CHANNEL_APPLIER_THREAD|CHANNEL_RECEIVER_THREAD,
977                             components_stop_timeout_var);
978     if (channel_err)
979     {
980       log_message(MY_ERROR_LEVEL,
981                   "Error stopping all replication channels while server was"
982                   " leaving the group. Please check the error log for "
983                   "additional details. Got error: %d", channel_err);
984       if (!error)
985       {
986         error= GROUP_REPLICATION_CONFIGURATION_ERROR;
987       }
988     }
989   }
990 
991   delete group_partition_handler;
992   group_partition_handler= NULL;
993 
994   delete blocked_transaction_handler;
995   blocked_transaction_handler= NULL;
996 
997   /*
998     Destroy certification latch.
999   */
1000   if (certification_latch != NULL)
1001   {
1002     delete certification_latch;
1003     certification_latch= NULL;
1004   }
1005 
1006   /*
1007     Clear server sessions opened caches on transactions observer.
1008   */
1009   observer_trans_clear_io_cache_unused_list();
1010 
1011   if (group_member_mgr != NULL && local_member_info != NULL)
1012   {
1013     group_member_mgr->update_member_status(local_member_info->get_uuid(),
1014                                            Group_member_info::MEMBER_OFFLINE);
1015   }
1016 
1017   return error;
1018 }
1019 
plugin_group_replication_init(MYSQL_PLUGIN plugin_info)1020 int plugin_group_replication_init(MYSQL_PLUGIN plugin_info)
1021 {
1022   // Reset plugin local variables.
1023   my_atomic_store32(&group_replication_running, 0);
1024   my_atomic_store32(&group_replication_stopping, 0);
1025   plugin_is_being_uninstalled= false;
1026   plugin_is_waiting_to_set_server_read_mode= false;
1027 
1028   // Register all PSI keys at the time plugin init
1029 #ifdef HAVE_PSI_INTERFACE
1030   register_all_group_replication_psi_keys();
1031 #endif /* HAVE_PSI_INTERFACE */
1032 
1033   mysql_mutex_init(key_GR_LOCK_plugin_running, &plugin_running_mutex,
1034                    MY_MUTEX_INIT_FAST);
1035   mysql_mutex_init(key_GR_LOCK_force_members_running,
1036                    &force_members_running_mutex,
1037                    MY_MUTEX_INIT_FAST);
1038 
1039   plugin_stop_lock= new Checkable_rwlock(
1040 #ifdef HAVE_PSI_INTERFACE
1041                                          key_GR_RWLOCK_plugin_stop
1042 #endif /* HAVE_PSI_INTERFACE */
1043                                         );
1044 
1045   shared_plugin_stop_lock= new Shared_writelock(plugin_stop_lock);
1046 
1047   //Initialize transactions observer structures
1048   observer_trans_initialize();
1049 
1050   plugin_info_ptr= plugin_info;
1051 
1052   if (group_replication_init())
1053   {
1054     /* purecov: begin inspected */
1055     log_message(MY_ERROR_LEVEL,
1056                 "Failure during Group Replication handler initialization");
1057     return 1;
1058     /* purecov: end */
1059   }
1060 
1061   if(register_server_state_observer(&server_state_observer,
1062                                     (void *)plugin_info_ptr))
1063   {
1064     /* purecov: begin inspected */
1065     log_message(MY_ERROR_LEVEL,
1066                 "Failure when registering the server state observers");
1067     return 1;
1068     /* purecov: end */
1069   }
1070 
1071   if (register_trans_observer(&trans_observer, (void *)plugin_info_ptr))
1072   {
1073     /* purecov: begin inspected */
1074     log_message(MY_ERROR_LEVEL,
1075                 "Failure when registering the transactions state observers");
1076     return 1;
1077     /* purecov: end */
1078   }
1079 
1080   if (register_binlog_transmit_observer(&binlog_transmit_observer,
1081                                         (void *)plugin_info_ptr))
1082   {
1083     /* purecov: begin inspected */
1084     log_message(MY_ERROR_LEVEL,
1085                 "Failure when registering the binlog state observers");
1086     return 1;
1087     /* purecov: end */
1088   }
1089 
1090   //Initialize the recovery SSL option map
1091   initialize_ssl_option_map();
1092 
1093   //Initialize channel observation and auto increment handlers before start
1094   auto_increment_handler= new Plugin_group_replication_auto_increment();
1095   channel_observation_manager= new Channel_observation_manager(plugin_info);
1096   view_change_notifier= new Plugin_gcs_view_modification_notifier();
1097   gcs_module= new Gcs_operations();
1098 
1099   //Initialize the compatibility module before starting
1100   init_compatibility_manager();
1101 
1102   // Set the atomic var to the value of the base plugin variable
1103   transaction_size_limit_var = transaction_size_limit_base_var;
1104 
1105   plugin_is_auto_starting= start_group_replication_at_boot_var;
1106   if (start_group_replication_at_boot_var && plugin_group_replication_start())
1107   {
1108     log_message(MY_ERROR_LEVEL,
1109                 "Unable to start Group Replication on boot");
1110   }
1111 
1112   return 0;
1113 }
1114 
plugin_group_replication_deinit(void * p)1115 int plugin_group_replication_deinit(void *p)
1116 {
1117   // If plugin was not initialized, there is nothing to do here.
1118   if (plugin_info_ptr == NULL)
1119     return 0;
1120 
1121   plugin_is_being_uninstalled= true;
1122   my_atomic_store32(&group_replication_stopping, 1);
1123   int observer_unregister_error= 0;
1124 
1125   if (plugin_group_replication_stop())
1126     log_message(MY_ERROR_LEVEL,
1127                 "Failure when stopping Group Replication on plugin uninstall");
1128 
1129   if (group_member_mgr != NULL)
1130   {
1131     delete group_member_mgr;
1132     group_member_mgr= NULL;
1133   }
1134 
1135   if (local_member_info != NULL)
1136   {
1137     delete local_member_info;
1138     local_member_info= NULL;
1139   }
1140 
1141   if (compatibility_mgr != NULL)
1142   {
1143     delete compatibility_mgr;
1144     compatibility_mgr= NULL;
1145   }
1146 
1147   if (unregister_server_state_observer(&server_state_observer, p))
1148   {
1149     log_message(MY_ERROR_LEVEL,
1150                 "Failure when unregistering the server state observers");
1151     observer_unregister_error++;
1152   }
1153 
1154   if (unregister_trans_observer(&trans_observer, p))
1155   {
1156     log_message(MY_ERROR_LEVEL,
1157                 "Failure when unregistering the transactions state observers");
1158     observer_unregister_error++;
1159   }
1160 
1161   if (unregister_binlog_transmit_observer(&binlog_transmit_observer, p))
1162   {
1163     log_message(MY_ERROR_LEVEL,
1164                 "Failure when unregistering the binlog state observers");
1165     observer_unregister_error++;
1166   }
1167 
1168   if (observer_unregister_error == 0)
1169     log_message(MY_INFORMATION_LEVEL,
1170                 "All Group Replication server observers"
1171                 " have been successfully unregistered");
1172 
1173   if (channel_observation_manager != NULL)
1174   {
1175     delete channel_observation_manager;
1176     channel_observation_manager= NULL;
1177   }
1178 
1179   delete gcs_module;
1180   gcs_module= NULL;
1181 
1182   delete view_change_notifier;
1183   view_change_notifier= NULL;
1184 
1185   if(auto_increment_handler != NULL)
1186   {
1187     delete auto_increment_handler;
1188     auto_increment_handler= NULL;
1189   }
1190 
1191   mysql_mutex_destroy(&plugin_running_mutex);
1192   mysql_mutex_destroy(&force_members_running_mutex);
1193 
1194   delete shared_plugin_stop_lock;
1195   shared_plugin_stop_lock= NULL;
1196   delete plugin_stop_lock;
1197   plugin_stop_lock= NULL;
1198 
1199   //Terminate transactions observer structures
1200   observer_trans_terminate();
1201 
1202   plugin_info_ptr= NULL;
1203 
1204   return observer_unregister_error;
1205 }
1206 
init_group_sidno()1207 static bool init_group_sidno()
1208 {
1209   DBUG_ENTER("init_group_sidno");
1210   rpl_sid group_sid;
1211 
1212   if (group_sid.parse(group_name_var) != RETURN_STATUS_OK)
1213   {
1214     /* purecov: begin inspected */
1215     log_message(MY_ERROR_LEVEL, "Unable to parse the group name.");
1216     DBUG_RETURN(true);
1217     /* purecov: end */
1218   }
1219 
1220   group_sidno = get_sidno_from_global_sid_map(group_sid);
1221   if (group_sidno <= 0)
1222   {
1223     /* purecov: begin inspected */
1224     log_message(MY_ERROR_LEVEL, "Unable to generate the sidno for the group.");
1225     DBUG_RETURN(true);
1226     /* purecov: end */
1227   }
1228 
1229   DBUG_RETURN(false);
1230 }
1231 
configure_and_start_applier_module()1232 int configure_and_start_applier_module()
1233 {
1234   DBUG_ENTER("configure_and_start_applier_module");
1235 
1236   int error= 0;
1237 
1238   //The applier did not stop properly or suffered a configuration error
1239   if (applier_module != NULL)
1240   {
1241     if ((error= applier_module->is_running())) //it is still running?
1242     {
1243       log_message(MY_ERROR_LEVEL,
1244                   "Cannot start the Group Replication applier as a previous "
1245                   "shutdown is still running: "
1246                   "The thread will stop once its task is complete.");
1247       DBUG_RETURN(error);
1248     }
1249     else
1250     {
1251       //clean a possible existent pipeline
1252       applier_module->terminate_applier_pipeline();
1253       //delete it and create from scratch
1254       delete applier_module;
1255     }
1256   }
1257 
1258   applier_module= new Applier_module();
1259 
1260   recovery_module->set_applier_module(applier_module);
1261 
1262   //For now, only defined pipelines are accepted.
1263   error=
1264     applier_module->setup_applier_module(STANDARD_GROUP_REPLICATION_PIPELINE,
1265                                          known_server_reset,
1266                                          components_stop_timeout_var,
1267                                          group_sidno,
1268                                          gtid_assignment_block_size_var,
1269                                          shared_plugin_stop_lock);
1270   if (error)
1271   {
1272     //Delete the possible existing pipeline
1273     applier_module->terminate_applier_pipeline();
1274     delete applier_module;
1275     applier_module= NULL;
1276     DBUG_RETURN(error);
1277   }
1278 
1279   known_server_reset= false;
1280 
1281   if ((error= applier_module->initialize_applier_thread()))
1282   {
1283     log_message(MY_ERROR_LEVEL,
1284                 "Unable to initialize the Group Replication applier module.");
1285     //terminate the applier_thread if running
1286     if (!applier_module->terminate_applier_thread())
1287     {
1288       delete applier_module;
1289       applier_module= NULL;
1290     }
1291   }
1292   else
1293     log_message(MY_INFORMATION_LEVEL,
1294                 "Group Replication applier module successfully initialized!");
1295 
1296   DBUG_RETURN(error);
1297 }
1298 
initialize_group_partition_handler()1299 void initialize_group_partition_handler()
1300 {
1301   group_partition_handler=
1302       new Group_partition_handling(shared_plugin_stop_lock,
1303                                    timeout_on_unreachable_var);
1304 }
1305 
terminate_applier_module()1306 int terminate_applier_module()
1307 {
1308 
1309   int error= 0;
1310   if (applier_module != NULL)
1311   {
1312     if (!applier_module->terminate_applier_thread()) //all goes fine
1313     {
1314       delete applier_module;
1315       applier_module= NULL;
1316     }
1317     else
1318     {
1319       error= GROUP_REPLICATION_APPLIER_STOP_TIMEOUT;
1320     }
1321   }
1322   return error;
1323 }
1324 
configure_group_communication(st_server_ssl_variables * ssl_variables)1325 int configure_group_communication(st_server_ssl_variables *ssl_variables)
1326 {
1327   DBUG_ENTER("configure_group_communication");
1328 
1329   // GCS interface parameters.
1330   Gcs_interface_parameters gcs_module_parameters;
1331   gcs_module_parameters.add_parameter("group_name",
1332                                       std::string(group_name_var));
1333   if (local_address_var != NULL)
1334     gcs_module_parameters.add_parameter("local_node",
1335                                         std::string(local_address_var));
1336   if (group_seeds_var != NULL)
1337     gcs_module_parameters.add_parameter("peer_nodes",
1338                                         std::string(group_seeds_var));
1339   const std::string bootstrap_group_string=
1340       bootstrap_group_var ? "true" : "false";
1341   gcs_module_parameters.add_parameter("bootstrap_group", bootstrap_group_string);
1342   std::stringstream poll_spin_loops_stream_buffer;
1343   poll_spin_loops_stream_buffer << poll_spin_loops_var;
1344   gcs_module_parameters.add_parameter("poll_spin_loops",
1345                                       poll_spin_loops_stream_buffer.str());
1346 
1347   // Compression parameter
1348   if (compression_threshold_var > 0)
1349   {
1350     std::stringstream ss;
1351     ss << compression_threshold_var;
1352     gcs_module_parameters.add_parameter("compression", std::string("on"));
1353     gcs_module_parameters.add_parameter("compression_threshold", ss.str());
1354   }
1355   else
1356   {
1357     gcs_module_parameters.add_parameter("compression", std::string("off")); /* purecov: inspected */
1358   }
1359 
1360   // SSL parameters.
1361   std::string ssl_mode(ssl_mode_values[ssl_mode_var]);
1362   if (ssl_mode_var > 0)
1363   {
1364     std::string ssl_key(ssl_variables->ssl_key ? ssl_variables->ssl_key : "");
1365     std::string ssl_cert(ssl_variables->ssl_cert ? ssl_variables->ssl_cert : "");
1366     std::string ssl_ca(ssl_variables->ssl_ca ? ssl_variables->ssl_ca : "");
1367     std::string ssl_capath(ssl_variables->ssl_capath ? ssl_variables->ssl_capath : "");
1368     std::string ssl_cipher(ssl_variables->ssl_cipher ? ssl_variables->ssl_cipher : "");
1369     std::string ssl_crl(ssl_variables->ssl_crl ? ssl_variables->ssl_crl : "");
1370     std::string ssl_crlpath(ssl_variables->ssl_crlpath ? ssl_variables->ssl_crlpath : "");
1371     std::string tls_version(ssl_variables->tls_version? ssl_variables->tls_version : "");
1372 
1373     // SSL support on server.
1374     if (ssl_variables->have_ssl_opt)
1375     {
1376       gcs_module_parameters.add_parameter("ssl_mode", ssl_mode);
1377       gcs_module_parameters.add_parameter("server_key_file", ssl_key);
1378       gcs_module_parameters.add_parameter("server_cert_file", ssl_cert);
1379       gcs_module_parameters.add_parameter("client_key_file", ssl_key);
1380       gcs_module_parameters.add_parameter("client_cert_file", ssl_cert);
1381       gcs_module_parameters.add_parameter("ca_file", ssl_ca);
1382       if (!ssl_capath.empty())
1383         gcs_module_parameters.add_parameter("ca_path", ssl_capath); /* purecov: inspected */
1384       gcs_module_parameters.add_parameter("cipher", ssl_cipher);
1385       gcs_module_parameters.add_parameter("tls_version", tls_version);
1386 
1387       if (!ssl_crl.empty())
1388         gcs_module_parameters.add_parameter("crl_file", ssl_crl); /* purecov: inspected */
1389       if (!ssl_crlpath.empty())
1390         gcs_module_parameters.add_parameter("crl_path", ssl_crlpath); /* purecov: inspected */
1391 
1392       log_message(MY_INFORMATION_LEVEL,
1393                   "Group communication SSL configuration: "
1394                   "group_replication_ssl_mode: \"%s\"; "
1395                   "server_key_file: \"%s\"; "
1396                   "server_cert_file: \"%s\"; "
1397                   "client_key_file: \"%s\"; "
1398                   "client_cert_file: \"%s\"; "
1399                   "ca_file: \"%s\"; "
1400                   "ca_path: \"%s\"; "
1401                   "cipher: \"%s\"; "
1402                   "tls_version: \"%s\"; "
1403                   "crl_file: \"%s\"; "
1404                   "crl_path: \"%s\"",
1405                   ssl_mode.c_str(), ssl_key.c_str(), ssl_cert.c_str(),
1406                   ssl_key.c_str(), ssl_cert.c_str(), ssl_ca.c_str(),
1407                   ssl_capath.c_str(), ssl_cipher.c_str(), tls_version.c_str(),
1408                   ssl_crl.c_str(), ssl_crlpath.c_str());
1409     }
1410     // No SSL support on server.
1411     else
1412     {
1413       /* purecov: begin inspected */
1414       log_message(MY_ERROR_LEVEL,
1415                   "MySQL server does not have SSL support and "
1416                   "group_replication_ssl_mode is \"%s\", START "
1417                   "GROUP_REPLICATION will abort", ssl_mode.c_str());
1418       DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR);
1419       /* purecov: end */
1420     }
1421   }
1422   // GCS SSL disabled.
1423   else
1424   {
1425     gcs_module_parameters.add_parameter("ssl_mode", ssl_mode);
1426     log_message(MY_INFORMATION_LEVEL,
1427                 "Group communication SSL configuration: "
1428                 "group_replication_ssl_mode: \"%s\"", ssl_mode.c_str());
1429   }
1430 
1431   if (ip_whitelist_var != NULL)
1432   {
1433     std::string v(ip_whitelist_var);
1434     v.erase(std::remove(v.begin(), v.end(), ' '), v.end());
1435     std::transform(v.begin(), v.end(), v.begin(), ::tolower);
1436 
1437     // if the user specified a list other than automatic
1438     // then we need to pass it to the GCS, otherwise we
1439     // do nothing and let GCS scan for the proper IPs
1440     if (v.find("automatic") == std::string::npos)
1441     {
1442       gcs_module_parameters.add_parameter("ip_whitelist",
1443                                           std::string(ip_whitelist_var));
1444     }
1445   }
1446 
1447   // Configure GCS.
1448   if (gcs_module->configure(gcs_module_parameters))
1449   {
1450     log_message(MY_ERROR_LEVEL,
1451                 "Unable to initialize the group communication engine");
1452     DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR);
1453   }
1454   log_message(MY_INFORMATION_LEVEL,
1455               "Initialized group communication with configuration: "
1456               "group_replication_group_name: \"%s\"; "
1457               "group_replication_local_address: \"%s\"; "
1458               "group_replication_group_seeds: \"%s\"; "
1459               "group_replication_bootstrap_group: %s; "
1460               "group_replication_poll_spin_loops: %lu; "
1461               "group_replication_compression_threshold: %lu; "
1462               "group_replication_ip_whitelist: \"%s\"",
1463               group_name_var, local_address_var, group_seeds_var,
1464               bootstrap_group_var ? "true" : "false",
1465               poll_spin_loops_var, compression_threshold_var,
1466               ip_whitelist_var);
1467 
1468   DBUG_RETURN(0);
1469 }
1470 
start_group_communication()1471 int start_group_communication()
1472 {
1473   DBUG_ENTER("start_group_communication");
1474 
1475   if (auto_increment_handler != NULL)
1476   {
1477     auto_increment_handler->
1478       set_auto_increment_variables(auto_increment_increment_var,
1479                                    get_server_id());
1480   }
1481 
1482   events_handler= new Plugin_gcs_events_handler(applier_module,
1483                                                 recovery_module,
1484                                                 view_change_notifier,
1485                                                 compatibility_mgr,
1486                                                 components_stop_timeout_var);
1487 
1488   view_change_notifier->start_view_modification();
1489 
1490   if (gcs_module->join(*events_handler, *events_handler))
1491     DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_JOIN_ERROR);
1492 
1493   DBUG_RETURN(0);
1494 }
1495 
check_async_channel_running_on_secondary()1496 bool check_async_channel_running_on_secondary()
1497 {
1498   /* To stop group replication to start on secondary member with single primary-
1499      mode, when any async channels are running, we verify whether member is not
1500     bootstrapping. As only when the member is bootstrapping, it can be the
1501     primary leader on a single primary member context.
1502   */
1503   if (single_primary_mode_var && !bootstrap_group_var)
1504   {
1505     if (is_any_slave_channel_running(
1506         CHANNEL_RECEIVER_THREAD | CHANNEL_APPLIER_THREAD))
1507     {
1508       return true;
1509     }
1510   }
1511 
1512   return false;
1513 }
1514 
initialize_asynchronous_channels_observer()1515 void initialize_asynchronous_channels_observer()
1516 {
1517   if (single_primary_mode_var)
1518   {
1519     asynchronous_channels_state_observer=
1520         new Asynchronous_channels_state_observer();
1521     channel_observation_manager
1522         ->register_channel_observer(asynchronous_channels_state_observer);
1523   }
1524 }
1525 
terminate_asynchronous_channels_observer()1526 void terminate_asynchronous_channels_observer()
1527 {
1528   if (asynchronous_channels_state_observer != NULL)
1529   {
1530     channel_observation_manager
1531       ->unregister_channel_observer(asynchronous_channels_state_observer);
1532     delete asynchronous_channels_state_observer;
1533     asynchronous_channels_state_observer= NULL;
1534   }
1535 }
1536 
initialize_recovery_module()1537 int initialize_recovery_module()
1538 {
1539   recovery_module = new Recovery_module(applier_module,
1540                                         channel_observation_manager,
1541                                         components_stop_timeout_var);
1542 
1543   recovery_module->set_recovery_ssl_options(recovery_use_ssl_var,
1544                                             recovery_ssl_ca_var,
1545                                             recovery_ssl_capath_var,
1546                                             recovery_ssl_cert_var,
1547                                             recovery_ssl_cipher_var,
1548                                             recovery_ssl_key_var,
1549                                             recovery_ssl_crl_var,
1550                                             recovery_ssl_crlpath_var,
1551                                             recovery_ssl_verify_server_cert_var);
1552   recovery_module->
1553       set_recovery_completion_policy(
1554           (enum_recovery_completion_policies) recovery_completion_policy_var);
1555   recovery_module->set_recovery_donor_retry_count(recovery_retry_count_var);
1556   recovery_module->
1557       set_recovery_donor_reconnect_interval(recovery_reconnect_interval_var);
1558 
1559   return 0;
1560 }
1561 
terminate_recovery_module()1562 int terminate_recovery_module()
1563 {
1564   int error= 0;
1565   if(recovery_module != NULL)
1566   {
1567     error = recovery_module->stop_recovery();
1568     delete recovery_module;
1569     recovery_module= NULL;
1570   }
1571   return error;
1572 }
1573 
server_engine_initialized()1574 bool server_engine_initialized()
1575 {
1576   //check if empty channel exists, i.e, the slave structures are initialized
1577   return channel_is_active("", CHANNEL_NO_THD);
1578 }
1579 
register_server_reset_master()1580 void register_server_reset_master(){
1581   known_server_reset= true;
1582 }
1583 
get_allow_local_lower_version_join()1584 bool get_allow_local_lower_version_join()
1585 {
1586   DBUG_ENTER("get_allow_local_lower_version_join");
1587   DBUG_RETURN(allow_local_lower_version_join_var);
1588 }
1589 
get_allow_local_disjoint_gtids_join()1590 bool get_allow_local_disjoint_gtids_join()
1591 {
1592   DBUG_ENTER("get_allow_local_disjoint_gtids_join");
1593   DBUG_RETURN(allow_local_disjoint_gtids_join_var);
1594 }
1595 
get_transaction_size_limit()1596 ulong get_transaction_size_limit()
1597 {
1598   DBUG_ENTER("get_transaction_size_limit");
1599   assert(my_atomic_load64(&transaction_size_limit_var)>=0);
1600   ulong limit = static_cast<ulong>(my_atomic_load64(&transaction_size_limit_var));
1601   DBUG_RETURN(limit);
1602 }
1603 
is_plugin_waiting_to_set_server_read_mode()1604 bool is_plugin_waiting_to_set_server_read_mode()
1605 {
1606   DBUG_ENTER("is_plugin_waiting_to_set_server_read_mode");
1607   DBUG_RETURN(plugin_is_waiting_to_set_server_read_mode);
1608 }
1609 
1610 /*
1611   This method is used to accomplish the startup validations of the plugin
1612   regarding system configuration.
1613 
1614   It currently verifies:
1615   - Binlog enabled
1616   - Binlog checksum mode
1617   - Binlog format
1618   - Gtid mode
1619   - LOG_SLAVE_UPDATES
1620   - Single primary mode configuration
1621 
1622   @return If the operation succeed or failed
1623     @retval 0 in case of success
1624     @retval 1 in case of failure
1625  */
check_if_server_properly_configured()1626 static int check_if_server_properly_configured()
1627 {
1628   DBUG_ENTER("check_if_server_properly_configured");
1629 
1630   //Struct that holds startup and runtime requirements
1631   Trans_context_info startup_pre_reqs;
1632 
1633   get_server_startup_prerequirements(startup_pre_reqs, !plugin_is_auto_starting);
1634 
1635   if(!startup_pre_reqs.binlog_enabled)
1636   {
1637     log_message(MY_ERROR_LEVEL, "Binlog must be enabled for Group Replication");
1638     DBUG_RETURN(1);
1639   }
1640 
1641   if(startup_pre_reqs.binlog_checksum_options != binary_log::BINLOG_CHECKSUM_ALG_OFF)
1642   {
1643     log_message(MY_ERROR_LEVEL, "binlog_checksum should be NONE for Group Replication");
1644     DBUG_RETURN(1);
1645   }
1646 
1647   if(startup_pre_reqs.binlog_format != BINLOG_FORMAT_ROW)
1648   {
1649     log_message(MY_ERROR_LEVEL, "Binlog format should be ROW for Group Replication");
1650     DBUG_RETURN(1);
1651   }
1652 
1653   if(startup_pre_reqs.gtid_mode != GTID_MODE_ON)
1654   {
1655     log_message(MY_ERROR_LEVEL, "Gtid mode should be ON for Group Replication");
1656     DBUG_RETURN(1);
1657   }
1658 
1659   if(startup_pre_reqs.log_slave_updates != true)
1660   {
1661     log_message(MY_ERROR_LEVEL,
1662                 "LOG_SLAVE_UPDATES should be ON for Group Replication");
1663     DBUG_RETURN(1);
1664   }
1665 
1666   if(startup_pre_reqs.transaction_write_set_extraction ==
1667      HASH_ALGORITHM_OFF)
1668   {
1669     log_message(MY_ERROR_LEVEL,
1670                 "Extraction of transaction write sets requires an hash algorithm "
1671                 "configuration. Please, double check that the parameter "
1672                 "transaction-write-set-extraction is set to a valid algorithm.");
1673     DBUG_RETURN(1);
1674   }
1675   else
1676   {
1677     write_set_extraction_algorithm=
1678        startup_pre_reqs.transaction_write_set_extraction;
1679   }
1680 
1681   if (startup_pre_reqs.mi_repository_type != 1) //INFO_REPOSITORY_TABLE
1682   {
1683     log_message(MY_ERROR_LEVEL, "Master info repository must be set to TABLE.");
1684     DBUG_RETURN(1);
1685   }
1686 
1687   if (startup_pre_reqs.rli_repository_type != 1) //INFO_REPOSITORY_TABLE
1688   {
1689     log_message(MY_ERROR_LEVEL, "Relay log info repository must be set to TABLE");
1690     DBUG_RETURN(1);
1691   }
1692 
1693   if (startup_pre_reqs.parallel_applier_workers > 0)
1694   {
1695     if (startup_pre_reqs.parallel_applier_type != CHANNEL_MTS_PARALLEL_TYPE_LOGICAL_CLOCK)
1696     {
1697       log_message(MY_ERROR_LEVEL,
1698                   "In order to use parallel applier on Group Replication, parameter "
1699                   "slave-parallel-type must be set to 'LOGICAL_CLOCK'.");
1700       DBUG_RETURN(1);
1701     }
1702 
1703     if (!startup_pre_reqs.parallel_applier_preserve_commit_order)
1704     {
1705       log_message(MY_WARNING_LEVEL,
1706                   "Group Replication requires slave-preserve-commit-order "
1707                   "to be set to ON when using more than 1 applier threads.");
1708       DBUG_RETURN(1);
1709     }
1710   }
1711 
1712   if (single_primary_mode_var && enforce_update_everywhere_checks_var)
1713   {
1714     log_message(MY_ERROR_LEVEL,
1715                 "Is is not allowed to run single primary mode with "
1716                 "'enforce_update_everywhere_checks' enabled.");
1717     DBUG_RETURN(1);
1718   }
1719 
1720   gr_lower_case_table_names= startup_pre_reqs.lower_case_table_names;
1721   assert (gr_lower_case_table_names <= 2);
1722 #ifndef NDEBUG
1723   DBUG_EXECUTE_IF("group_replication_skip_encode_lower_case_table_names",
1724                 {
1725                   gr_lower_case_table_names = SKIP_ENCODING_LOWER_CASE_TABLE_NAMES;
1726                 });
1727 #endif
1728 
1729 
1730   DBUG_RETURN(0);
1731 }
1732 
check_group_name_string(const char * str,bool is_var_update)1733 static int check_group_name_string(const char *str, bool is_var_update)
1734 {
1735   DBUG_ENTER("check_group_name_string");
1736 
1737   if (!str)
1738   {
1739     if(!is_var_update)
1740       log_message(MY_ERROR_LEVEL, "The group name option is mandatory");
1741     else
1742       my_message(ER_WRONG_VALUE_FOR_VAR,
1743                  "The group name option is mandatory",
1744                  MYF(0)); /* purecov: inspected */
1745     DBUG_RETURN(1);
1746   }
1747 
1748   if (strlen(str) > UUID_LENGTH)
1749   {
1750     if(!is_var_update)
1751       log_message(MY_ERROR_LEVEL, "The group name '%s' is not a valid UUID, its"
1752                   " length is too big", str);
1753     else
1754       my_message(ER_WRONG_VALUE_FOR_VAR,
1755                  "The group name is not a valid UUID, its length is too big",
1756                  MYF(0));
1757     DBUG_RETURN(1);
1758   }
1759 
1760   if (!Uuid::is_valid(str))
1761   {
1762     if(!is_var_update)
1763       log_message(MY_ERROR_LEVEL, "The group name '%s' is not a valid UUID", str); /* purecov: inspected */
1764     else
1765       my_message(ER_WRONG_VALUE_FOR_VAR, "The group name is not a valid UUID",
1766                  MYF(0));
1767     DBUG_RETURN(1);
1768   }
1769 
1770   DBUG_RETURN(0);
1771 }
1772 
check_group_name(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)1773 static int check_group_name(MYSQL_THD thd, SYS_VAR *var, void* save,
1774                             struct st_mysql_value *value)
1775 {
1776   DBUG_ENTER("check_group_name");
1777 
1778   char buff[NAME_CHAR_LEN];
1779   const char *str;
1780 
1781   if (plugin_is_group_replication_running())
1782   {
1783     my_message(ER_GROUP_REPLICATION_RUNNING,
1784                "The group name cannot be changed when Group Replication is running",
1785                MYF(0));
1786     DBUG_RETURN(1);
1787   }
1788 
1789   (*(const char **) save)= NULL;
1790 
1791   int length= sizeof(buff);
1792   if ((str= value->val_str(value, buff, &length)))
1793     str= thd->strmake(str, length);
1794   else
1795     DBUG_RETURN(1); /* purecov: inspected */
1796 
1797   if (check_group_name_string(str, true))
1798     DBUG_RETURN(1);
1799 
1800   *(const char**)save= str;
1801 
1802   DBUG_RETURN(0);
1803 }
1804 
1805 //Recovery module's module variable update/validate methods
1806 
update_recovery_retry_count(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1807 static void update_recovery_retry_count(MYSQL_THD thd, SYS_VAR *var,
1808                                         void *var_ptr, const void *save)
1809 {
1810   DBUG_ENTER("update_recovery_retry_count");
1811 
1812   (*(ulong*) var_ptr)= (*(ulong*) save);
1813   ulong in_val= *static_cast<const ulong*>(save);
1814 
1815   if (recovery_module != NULL)
1816   {
1817     recovery_module->set_recovery_donor_retry_count(in_val);
1818   }
1819 
1820   DBUG_VOID_RETURN;
1821 }
1822 
update_recovery_reconnect_interval(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1823 static void update_recovery_reconnect_interval(MYSQL_THD thd, SYS_VAR *var,
1824                                                void *var_ptr, const void *save)
1825 {
1826   DBUG_ENTER("update_recovery_reconnect_interval");
1827 
1828   (*(ulong*) var_ptr)= (*(ulong*) save);
1829   ulong in_val= *static_cast<const ulong*>(save);
1830 
1831   if (recovery_module != NULL)
1832   {
1833     recovery_module->
1834         set_recovery_donor_reconnect_interval(in_val);
1835   }
1836 
1837   DBUG_VOID_RETURN;
1838 }
1839 
1840 //Recovery SSL options
1841 
1842 static void
update_ssl_use(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1843 update_ssl_use(MYSQL_THD thd, SYS_VAR *var,
1844                void *var_ptr, const void *save)
1845 {
1846   DBUG_ENTER("update_ssl_use");
1847 
1848   bool use_ssl_val= *((my_bool *) save);
1849   (*(my_bool *) var_ptr)= (*(my_bool *) save);
1850 
1851   if (recovery_module != NULL)
1852   {
1853       recovery_module->set_recovery_use_ssl(use_ssl_val);
1854   }
1855 
1856   DBUG_VOID_RETURN;
1857 }
1858 
check_recovery_ssl_string(const char * str,const char * var_name,bool is_var_update)1859 static int check_recovery_ssl_string(const char *str, const char *var_name,
1860                                      bool is_var_update)
1861 {
1862   DBUG_ENTER("check_recovery_ssl_string");
1863 
1864   if (strlen(str) > FN_REFLEN)
1865   {
1866     if(!is_var_update)
1867       log_message(MY_ERROR_LEVEL,
1868                   "The given value for recovery ssl option '%s' is invalid"
1869                   " as its length is beyond the limit", var_name);
1870     else
1871       my_message(ER_WRONG_VALUE_FOR_VAR,
1872                   "The given value for recovery ssl option is invalid"
1873                   " as its length is beyond the limit",
1874                  MYF(0));
1875     DBUG_RETURN(1);
1876   }
1877 
1878   DBUG_RETURN(0);
1879 }
1880 
check_recovery_ssl_option(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)1881 static int check_recovery_ssl_option(MYSQL_THD thd, SYS_VAR *var, void* save,
1882                                      struct st_mysql_value *value)
1883 {
1884   DBUG_ENTER("check_recovery_ssl_option");
1885 
1886   char buff[STRING_BUFFER_USUAL_SIZE];
1887   const char *str= NULL;
1888 
1889   (*(const char **) save)= NULL;
1890 
1891   int length= sizeof(buff);
1892   if ((str= value->val_str(value, buff, &length)))
1893     str= thd->strmake(str, length);
1894   else
1895     DBUG_RETURN(1); /* purecov: inspected */
1896 
1897   if (str != NULL && check_recovery_ssl_string(str, var->name, true))
1898   {
1899     DBUG_RETURN(1);
1900   }
1901 
1902   *(const char**)save= str;
1903 
1904   DBUG_RETURN(0);
1905 }
1906 
update_recovery_ssl_option(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1907 static void update_recovery_ssl_option(MYSQL_THD thd, SYS_VAR *var,
1908                                        void *var_ptr, const void *save)
1909 {
1910   DBUG_ENTER("update_recovery_ssl_option");
1911 
1912 
1913   const char *new_option_val= *(const char**)save;
1914   (*(const char **) var_ptr)= (*(const char **) save);
1915 
1916   //According to the var name, get the operation code and act accordingly
1917   switch(recovery_ssl_opt_map[var->name])
1918   {
1919     case RECOVERY_SSL_CA_OPT:
1920       if (recovery_module != NULL)
1921         recovery_module->set_recovery_ssl_ca(new_option_val);
1922       break;
1923     case RECOVERY_SSL_CAPATH_OPT:
1924       if (recovery_module != NULL)
1925         recovery_module->set_recovery_ssl_capath(new_option_val);
1926       break;
1927     case RECOVERY_SSL_CERT_OPT:
1928       if (recovery_module != NULL)
1929         recovery_module->set_recovery_ssl_cert(new_option_val);
1930       break;
1931     case RECOVERY_SSL_CIPHER_OPT:
1932       if (recovery_module != NULL)
1933         recovery_module->set_recovery_ssl_cipher(new_option_val);
1934       break;
1935     case RECOVERY_SSL_KEY_OPT:
1936       if (recovery_module != NULL)
1937         recovery_module->set_recovery_ssl_key(new_option_val);
1938       break;
1939     case RECOVERY_SSL_CRL_OPT:
1940       if (recovery_module != NULL)
1941         recovery_module->set_recovery_ssl_crl(new_option_val);
1942       break;
1943     case RECOVERY_SSL_CRLPATH_OPT:
1944       if (recovery_module != NULL)
1945         recovery_module->set_recovery_ssl_crlpath(new_option_val);
1946       break;
1947     default:
1948       assert(0); /* purecov: inspected */
1949   }
1950 
1951   DBUG_VOID_RETURN;
1952 }
1953 
1954 static void
update_ssl_server_cert_verification(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1955 update_ssl_server_cert_verification(MYSQL_THD thd, SYS_VAR *var,
1956                                     void *var_ptr, const void *save)
1957 {
1958   DBUG_ENTER("update_ssl_server_cert_verification");
1959 
1960   bool ssl_verify_server_cert= *((my_bool *) save);
1961   (*(my_bool *) var_ptr)= (*(my_bool *) save);
1962 
1963   if (recovery_module != NULL)
1964   {
1965     recovery_module->
1966         set_recovery_ssl_verify_server_cert(ssl_verify_server_cert);
1967   }
1968 
1969   DBUG_VOID_RETURN;
1970 }
1971 
1972 // Recovery threshold update method
1973 
1974 static void
update_recovery_completion_policy(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1975 update_recovery_completion_policy(MYSQL_THD thd, SYS_VAR *var,
1976                                   void *var_ptr, const void *save)
1977 {
1978   DBUG_ENTER("update_recovery_completion_policy");
1979 
1980   ulong in_val= *static_cast<const ulong*>(save);
1981   (*(ulong*) var_ptr)= (*(ulong*) save);
1982 
1983   if (recovery_module != NULL)
1984   {
1985     recovery_module->
1986         set_recovery_completion_policy(
1987             (enum_recovery_completion_policies)in_val);
1988   }
1989 
1990   DBUG_VOID_RETURN;
1991 }
1992 
1993 //Component timeout update method
1994 
update_component_timeout(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)1995 static void update_component_timeout(MYSQL_THD thd, SYS_VAR *var,
1996                                      void *var_ptr, const void *save)
1997 {
1998   DBUG_ENTER("update_component_timeout");
1999 
2000   ulong in_val= *static_cast<const ulong*>(save);
2001   (*(ulong*) var_ptr)= (*(ulong*) save);
2002 
2003   if (applier_module != NULL)
2004   {
2005     applier_module->set_stop_wait_timeout(in_val);
2006   }
2007   if (recovery_module != NULL)
2008   {
2009     recovery_module->set_stop_wait_timeout(in_val);
2010   }
2011   if (events_handler != NULL)
2012   {
2013     events_handler->set_stop_wait_timeout(in_val);
2014   }
2015 
2016   DBUG_VOID_RETURN;
2017 }
2018 
check_auto_increment_increment(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2019 static int check_auto_increment_increment(MYSQL_THD thd, SYS_VAR *var,
2020                                           void* save,
2021                                           struct st_mysql_value *value)
2022 {
2023   DBUG_ENTER("check_auto_increment_increment");
2024 
2025   longlong in_val;
2026   value->val_int(value, &in_val);
2027 
2028   if (plugin_is_group_replication_running())
2029   {
2030     my_message(ER_GROUP_REPLICATION_RUNNING,
2031                "The group auto_increment_increment cannot be changed"
2032                " when Group Replication is running",
2033                MYF(0));
2034     DBUG_RETURN(1);
2035   }
2036 
2037   if (in_val > MAX_AUTO_INCREMENT_INCREMENT ||
2038       in_val < MIN_AUTO_INCREMENT_INCREMENT)
2039   {
2040     std::stringstream ss;
2041     ss << "The value " << in_val << " is not within the range of "
2042           "accepted values for the option "
2043           "group_replication_auto_increment_increment. The value "
2044           "must be between " << MIN_AUTO_INCREMENT_INCREMENT <<
2045           " and " << MAX_AUTO_INCREMENT_INCREMENT << " inclusive.";
2046     my_message(ER_WRONG_VALUE_FOR_VAR, ss.str().c_str(), MYF(0));
2047     DBUG_RETURN(1);
2048   }
2049 
2050   *(longlong*)save= in_val;
2051   DBUG_RETURN(0);
2052 }
2053 
2054 //Communication layer options.
2055 
check_ip_whitelist_preconditions(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2056 static int check_ip_whitelist_preconditions(MYSQL_THD thd, SYS_VAR *var,
2057                                             void *save,
2058                                             struct st_mysql_value *value)
2059 {
2060   DBUG_ENTER("check_ip_whitelist_preconditions");
2061 
2062   char buff[IP_WHITELIST_STR_BUFFER_LENGTH];
2063   const char *str;
2064   int length= sizeof(buff);
2065 
2066   if (plugin_is_group_replication_running())
2067   {
2068     my_message(ER_GROUP_REPLICATION_RUNNING,
2069                "The IP whitelist cannot be set while Group Replication "
2070                "is running", MYF(0));
2071     DBUG_RETURN(1);
2072   }
2073 
2074   (*(const char **) save)= NULL;
2075 
2076   if ((str= value->val_str(value, buff, &length)))
2077     str= thd->strmake(str, length);
2078   else // NULL value is not allowed
2079     DBUG_RETURN(1); /* purecov: inspected */
2080 
2081   // remove trailing whitespaces
2082   std::string v(str);
2083   v.erase(std::remove(v.begin(), v.end(), ' '), v.end());
2084   std::transform(v.begin(), v.end(), v.begin(), ::tolower);
2085   if (v.find("automatic") != std::string::npos && v.size() != 9)
2086   {
2087     my_message(ER_GROUP_REPLICATION_CONFIGURATION,
2088                "The IP whitelist is invalid. Make sure that AUTOMATIC "
2089                "when specifying \"AUTOMATIC\" the list contains no "
2090                "other values.", MYF(0));
2091     DBUG_RETURN(1);
2092   }
2093 
2094   *(const char**)save= str;
2095 
2096   DBUG_RETURN(0);
2097 }
2098 
check_compression_threshold(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2099 static int check_compression_threshold(MYSQL_THD thd, SYS_VAR *var,
2100                                        void* save,
2101                                        struct st_mysql_value *value)
2102 {
2103   DBUG_ENTER("check_compression_threshold");
2104 
2105   longlong in_val;
2106   value->val_int(value, &in_val);
2107 
2108   if (plugin_is_group_replication_running())
2109   {
2110     my_message(ER_GROUP_REPLICATION_RUNNING,
2111                "The compression threshold cannot be set while "
2112                "Group Replication is running",
2113                MYF(0));
2114     DBUG_RETURN(1);
2115   }
2116 
2117   if (in_val > MAX_COMPRESSION_THRESHOLD || in_val < 0)
2118   {
2119     std::stringstream ss;
2120     ss << "The value " << in_val << " is not within the range of "
2121       "accepted values for the option compression_threshold!";
2122     my_message(ER_WRONG_VALUE_FOR_VAR, ss.str().c_str(), MYF(0));
2123     DBUG_RETURN(1);
2124   }
2125 
2126   *(longlong*)save= in_val;
2127 
2128   DBUG_RETURN(0);
2129 }
2130 
check_force_members(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2131 static int check_force_members(MYSQL_THD thd, SYS_VAR *var,
2132                                void* save,
2133                                struct st_mysql_value *value)
2134 {
2135   DBUG_ENTER("check_force_members");
2136   int error= 0;
2137   char buff[STRING_BUFFER_USUAL_SIZE];
2138   const char *str= NULL;
2139   (*(const char **) save)= NULL;
2140   int length= 0;
2141 
2142   // Only one set force_members can run at a time.
2143   mysql_mutex_lock(&force_members_running_mutex);
2144   if (force_members_running)
2145   {
2146     log_message(MY_ERROR_LEVEL,
2147                 "There is one group_replication_force_members "
2148                 "operation already ongoing");
2149     mysql_mutex_unlock(&force_members_running_mutex);
2150     DBUG_RETURN(1);
2151   }
2152   force_members_running= true;
2153   mysql_mutex_unlock(&force_members_running_mutex);
2154 
2155 #ifndef NDEBUG
2156   DBUG_EXECUTE_IF("group_replication_wait_on_check_force_members",
2157                   {
2158                     const char act[]= "now wait_for waiting";
2159                     assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
2160                   });
2161 #endif
2162 
2163   // String validations.
2164   length= sizeof(buff);
2165   if ((str= value->val_str(value, buff, &length)))
2166     str= thd->strmake(str, length);
2167   else
2168   {
2169     error= 1; /* purecov: inspected */
2170     goto end; /* purecov: inspected */
2171   }
2172 
2173   // If option value is empty string, just update its value.
2174   if (length == 0)
2175     goto update_value;
2176 
2177   // if group replication isn't running and majority is reachable you can't
2178   // update force_members
2179   if (!plugin_is_group_replication_running() ||
2180       !group_member_mgr->is_majority_unreachable())
2181   {
2182     log_message(MY_ERROR_LEVEL,
2183                 "group_replication_force_members can only be updated"
2184                 " when Group Replication is running and a majority of the"
2185                 " members are unreachable");
2186     error= 1;
2187     goto end;
2188   }
2189 
2190   if ((error= gcs_module->force_members(str)))
2191     goto end;
2192 
2193 update_value:
2194   *(const char**)save= str;
2195 
2196 end:
2197   mysql_mutex_lock(&force_members_running_mutex);
2198   force_members_running= false;
2199   mysql_mutex_unlock(&force_members_running_mutex);
2200 
2201   DBUG_RETURN(error);
2202 }
2203 
check_gtid_assignment_block_size(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2204 static int check_gtid_assignment_block_size(MYSQL_THD thd, SYS_VAR *var,
2205                                             void* save,
2206                                             struct st_mysql_value *value)
2207 {
2208   DBUG_ENTER("check_gtid_assignment_block_size");
2209 
2210   longlong in_val;
2211   value->val_int(value, &in_val);
2212 
2213   if (plugin_is_group_replication_running())
2214   {
2215     my_message(ER_GROUP_REPLICATION_RUNNING,
2216                "The GTID assignment block size cannot be set while "
2217                "Group Replication is running", MYF(0));
2218     DBUG_RETURN(1);
2219   }
2220 
2221   if (in_val > MAX_GTID_ASSIGNMENT_BLOCK_SIZE ||
2222       in_val < MIN_GTID_ASSIGNMENT_BLOCK_SIZE)
2223   {
2224     std::stringstream ss;
2225     ss << "The value " << in_val << " is not within the range of "
2226           "accepted values for the option gtid_assignment_block_size. "
2227           "The value must be between " << MIN_GTID_ASSIGNMENT_BLOCK_SIZE <<
2228           " and " << MAX_GTID_ASSIGNMENT_BLOCK_SIZE << " inclusive.";
2229     my_message(ER_WRONG_VALUE_FOR_VAR, ss.str().c_str(), MYF(0));
2230     DBUG_RETURN(1);
2231   }
2232 
2233   *(longlong*)save= in_val;
2234 
2235   DBUG_RETURN(0);
2236 }
2237 
2238 static bool
get_bool_value_using_type_lib(struct st_mysql_value * value,my_bool & resulting_value)2239 get_bool_value_using_type_lib(struct st_mysql_value *value,
2240                               my_bool &resulting_value)
2241 {
2242   DBUG_ENTER("get_bool_value_using_type_lib");
2243   longlong value_to_check;
2244 
2245   if (MYSQL_VALUE_TYPE_STRING == value->value_type(value))
2246   {
2247     const unsigned int flags = 0;
2248 
2249     char text_buffer[10] = { 0 };
2250     int  text_buffer_size = sizeof(text_buffer);
2251     const char *text_value = value->val_str(value,text_buffer, &text_buffer_size);
2252 
2253     if (NULL == text_value)
2254       DBUG_RETURN(false);
2255 
2256     // Return index inside bool_type_allowed_values array
2257     // (first element start with index 1)
2258     value_to_check = find_type(text_value, &plugin_bool_typelib, flags);
2259 
2260     if (0 == value_to_check)
2261     {
2262       DBUG_RETURN(false);
2263     }
2264 
2265     // Move the index value to 0,1 values (OFF, ON)
2266     --value_to_check;
2267   }
2268   else
2269   {
2270     // Do implicit conversion to int
2271     value->val_int(value, &value_to_check);
2272   }
2273 
2274   resulting_value = value_to_check > 0 ? TRUE : FALSE;
2275 
2276   DBUG_RETURN(true);
2277 }
2278 
2279 static int
check_single_primary_mode(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2280 check_single_primary_mode(MYSQL_THD thd, SYS_VAR *var,
2281                           void* save,
2282                           struct st_mysql_value *value)
2283 {
2284   DBUG_ENTER("check_single_primary_mode");
2285   my_bool single_primary_mode_val;
2286 
2287   if (!get_bool_value_using_type_lib(value, single_primary_mode_val))
2288     DBUG_RETURN(1);
2289 
2290   if (plugin_is_group_replication_running())
2291   {
2292     my_message(ER_GROUP_REPLICATION_RUNNING,
2293                "Cannot change into or from single primary mode while "
2294                "Group Replication is running.", MYF(0));
2295     DBUG_RETURN(1);
2296   }
2297 
2298   if (single_primary_mode_val && enforce_update_everywhere_checks_var)
2299   {
2300     my_message(ER_WRONG_VALUE_FOR_VAR,
2301                "Cannot turn ON single_primary_mode while "
2302                "enforce_update_everywhere_checks is enabled.",
2303                MYF(0));
2304     DBUG_RETURN(1);
2305   }
2306 
2307   *(my_bool *)save = single_primary_mode_val;
2308 
2309   DBUG_RETURN(0);
2310 }
2311 
2312 static int
check_enforce_update_everywhere_checks(MYSQL_THD thd,SYS_VAR * var,void * save,struct st_mysql_value * value)2313 check_enforce_update_everywhere_checks(MYSQL_THD thd, SYS_VAR *var,
2314                                        void* save,
2315                                        struct st_mysql_value *value)
2316 {
2317   DBUG_ENTER("check_enforce_update_everywhere_checks");
2318   my_bool enforce_update_everywhere_checks_val;
2319 
2320   if (!get_bool_value_using_type_lib(value, enforce_update_everywhere_checks_val))
2321     DBUG_RETURN(1);
2322 
2323   if (plugin_is_group_replication_running())
2324   {
2325     my_message(ER_GROUP_REPLICATION_RUNNING,
2326                "Cannot turn ON/OFF update everywhere checks mode while "
2327                "Group Replication is running.", MYF(0));
2328     DBUG_RETURN(1);
2329   }
2330 
2331   if (single_primary_mode_var && enforce_update_everywhere_checks_val)
2332   {
2333     my_message(ER_WRONG_VALUE_FOR_VAR,
2334                "Cannot enable enforce_update_everywhere_checks while "
2335                "single_primary_mode is enabled.",
2336                MYF(0));
2337     DBUG_RETURN(1);
2338   }
2339 
2340   *(my_bool *)save = enforce_update_everywhere_checks_val;
2341 
2342   DBUG_RETURN(0);
2343 }
2344 
2345 static void
update_allow_local_disjoint_gtids_join(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)2346 update_allow_local_disjoint_gtids_join(MYSQL_THD thd, SYS_VAR *var,
2347                                        void *var_ptr, const void *save)
2348 {
2349   DBUG_ENTER("update_allow_local_disjoint_gtids_join");
2350 
2351   (*(my_bool *) var_ptr)= (*(my_bool *) save);
2352 
2353   option_deprecation_warning(thd,
2354                              "group_replication_allow_local_disjoint_gtids_join");
2355 
2356   DBUG_VOID_RETURN;
2357 }
2358 
update_unreachable_timeout(MYSQL_THD thd,SYS_VAR * var,void * var_ptr,const void * save)2359 static void update_unreachable_timeout(MYSQL_THD thd, SYS_VAR *var,
2360                                        void *var_ptr, const void *save)
2361 {
2362   DBUG_ENTER("update_unreachable_timeout");
2363 
2364   ulong in_val= *static_cast<const ulong*>(save);
2365   (*(ulong*) var_ptr)= (*(ulong*) save);
2366 
2367   if (group_partition_handler != NULL)
2368   {
2369     group_partition_handler->update_timeout_on_unreachable(in_val);
2370   }
2371 
2372   DBUG_VOID_RETURN;
2373 }
2374 
2375 static void
update_member_weight(MYSQL_THD,SYS_VAR *,void * var_ptr,const void * save)2376 update_member_weight(MYSQL_THD, SYS_VAR*,
2377                      void *var_ptr, const void *save)
2378 {
2379   DBUG_ENTER("update_member_weight");
2380 
2381   (*(uint*) var_ptr)= (*(uint*) save);
2382   uint in_val= *static_cast<const uint*>(save);
2383 
2384   if (local_member_info != NULL)
2385   {
2386     local_member_info->set_member_weight(in_val);
2387   }
2388 
2389   DBUG_VOID_RETURN;
2390 }
2391 
update_transaction_size_limit(MYSQL_THD,SYS_VAR *,void * var_ptr,const void * save)2392 static void update_transaction_size_limit(MYSQL_THD, SYS_VAR *, void *var_ptr,
2393                                           const void *save) {
2394 
2395   ulong in_val = *static_cast<const ulong *>(save);
2396   *static_cast<ulong *>(var_ptr) = in_val;
2397   my_atomic_store64(&transaction_size_limit_var, in_val);
2398 
2399   transaction_size_limit_var = in_val;
2400 
2401   if (plugin_is_group_replication_running()) {
2402     update_write_set_memory_size_limit(transaction_size_limit_var);
2403   }
2404 }
2405 
2406 //Base plugin variables
2407 
2408 static MYSQL_SYSVAR_STR(
2409   group_name,                                 /* name */
2410   group_name_var,                             /* var */
2411   /* optional var | malloc string | no set default */
2412   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC | PLUGIN_VAR_NODEFAULT,
2413   "The group name",
2414   check_group_name,                           /* check func*/
2415   NULL,                                       /* update func*/
2416   NULL);                                      /* default*/
2417 
2418 static MYSQL_SYSVAR_BOOL(
2419   start_on_boot,                              /* name */
2420   start_group_replication_at_boot_var,        /* var */
2421   PLUGIN_VAR_OPCMDARG,                        /* optional var */
2422   "Whether the server should start Group Replication or not during bootstrap.",
2423   NULL,                                       /* check func*/
2424   NULL,                                       /* update func*/
2425   1);                                         /* default*/
2426 
2427 //GCS module variables
2428 
2429 static MYSQL_SYSVAR_STR(
2430   local_address,                              /* name */
2431   local_address_var,                          /* var */
2432   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string*/
2433   "The local address, i.e., host:port.",
2434   NULL,                                       /* check func*/
2435   NULL,                                       /* update func*/
2436   "");                                        /* default*/
2437 
2438 static MYSQL_SYSVAR_STR(
2439   group_seeds,                                /* name */
2440   group_seeds_var,                            /* var */
2441   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string*/
2442   "The list of group seeds, comma separated. E.g., host1:port1,host2:port2.",
2443   NULL,                                       /* check func*/
2444   NULL,                                       /* update func*/
2445   "");                                        /* default*/
2446 
2447 static MYSQL_SYSVAR_STR(
2448   force_members,                              /* name */
2449   force_members_var,                          /* var */
2450   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,  /* optional var | malloc string*/
2451   "The list of members, comma separated. E.g., host1:port1,host2:port2. "
2452   "This option is used to force a new group membership, on which the excluded "
2453   "members will not receive a new view and will be blocked. The DBA will need "
2454   "to kill the excluded servers.",
2455   check_force_members,                        /* check func*/
2456   NULL,                                       /* update func*/
2457   "");                                        /* default*/
2458 
2459 static MYSQL_SYSVAR_BOOL(
2460   bootstrap_group,                            /* name */
2461   bootstrap_group_var,                        /* var */
2462   PLUGIN_VAR_OPCMDARG,                        /* optional var */
2463   "Specify if this member will bootstrap the group.",
2464   NULL,                                       /* check func. */
2465   NULL,                                       /* update func*/
2466   0                                           /* default */
2467 );
2468 
2469 static MYSQL_SYSVAR_ULONG(
2470   poll_spin_loops,                            /* name */
2471   poll_spin_loops_var,                        /* var */
2472   PLUGIN_VAR_OPCMDARG,                        /* optional var */
2473   "The number of times a thread waits for a communication engine "
2474   "mutex to be freed before the thread is suspended.",
2475   NULL,                                       /* check func. */
2476   NULL,                                       /* update func. */
2477   0,                                          /* default */
2478   0,                                          /* min */
2479   ~0UL,                                       /* max */
2480   0                                           /* block */
2481 );
2482 
2483 //Recovery module variables
2484 
2485 static MYSQL_SYSVAR_ULONG(
2486   recovery_retry_count,              /* name */
2487   recovery_retry_count_var,          /* var */
2488   PLUGIN_VAR_OPCMDARG,               /* optional var */
2489   "The number of times that the joiner tries to connect to the available donors before giving up.",
2490   NULL,                              /* check func. */
2491   update_recovery_retry_count,       /* update func. */
2492   10,                                /* default */
2493   0,                                 /* min */
2494   LONG_TIMEOUT,                      /* max */
2495   0                                  /* block */
2496 );
2497 
2498 static MYSQL_SYSVAR_ULONG(
2499   recovery_reconnect_interval,        /* name */
2500   recovery_reconnect_interval_var,    /* var */
2501   PLUGIN_VAR_OPCMDARG,                /* optional var */
2502   "The sleep time between reconnection attempts when no donor was found in the group",
2503   NULL,                               /* check func. */
2504   update_recovery_reconnect_interval, /* update func. */
2505   60,                                 /* default */
2506   0,                                  /* min */
2507   LONG_TIMEOUT,                       /* max */
2508   0                                   /* block */
2509 );
2510 
2511 //SSL options for recovery
2512 
2513 static MYSQL_SYSVAR_BOOL(
2514     recovery_use_ssl,              /* name */
2515     recovery_use_ssl_var,          /* var */
2516     PLUGIN_VAR_OPCMDARG,           /* optional var */
2517     "Whether SSL use should be obligatory during Group Replication recovery process.",
2518     NULL,                          /* check func*/
2519     update_ssl_use,                /* update func*/
2520     0);                            /* default*/
2521 
2522 static MYSQL_SYSVAR_STR(
2523     recovery_ssl_ca,                 /* name */
2524     recovery_ssl_ca_var,             /* var */
2525     PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, /* optional var | malloc string*/
2526     "The path to a file that contains a list of trusted SSL certificate authorities.",
2527     check_recovery_ssl_option,       /* check func*/
2528     update_recovery_ssl_option,      /* update func*/
2529     "");                             /* default*/
2530 
2531 static MYSQL_SYSVAR_STR(
2532     recovery_ssl_capath,             /* name */
2533     recovery_ssl_capath_var,         /* var */
2534     PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, /* optional var | malloc string*/
2535     "The path to a directory that contains trusted SSL certificate authority certificates.",
2536     check_recovery_ssl_option,       /* check func*/
2537     update_recovery_ssl_option,      /* update func*/
2538     "");                             /* default*/
2539 
2540 static MYSQL_SYSVAR_STR(
2541     recovery_ssl_cert,               /* name */
2542     recovery_ssl_cert_var,           /* var */
2543     PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, /* optional var | malloc string*/
2544     "The name of the SSL certificate file to use for establishing a secure connection.",
2545     check_recovery_ssl_option,       /* check func*/
2546     update_recovery_ssl_option,      /* update func*/
2547     "");                             /* default*/
2548 
2549 static MYSQL_SYSVAR_STR(
2550     recovery_ssl_cipher,             /* name */
2551     recovery_ssl_cipher_var,         /* var */
2552     PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, /* optional var | malloc string*/
2553     "A list of permissible ciphers to use for SSL encryption.",
2554     check_recovery_ssl_option,       /* check func*/
2555     update_recovery_ssl_option,      /* update func*/
2556     "");                             /* default*/
2557 
2558 static MYSQL_SYSVAR_STR(
2559     recovery_ssl_key,                /* name */
2560     recovery_ssl_key_var,            /* var */
2561     PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, /* optional var | malloc string*/
2562     "The name of the SSL key file to use for establishing a secure connection.",
2563     check_recovery_ssl_option,       /* check func*/
2564     update_recovery_ssl_option,      /* update func*/
2565     "");                             /* default*/
2566 
2567 static MYSQL_SYSVAR_STR(
2568     recovery_ssl_crl,                /* name */
2569     recovery_ssl_crl_var,            /* var */
2570     PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, /* optional var | malloc string*/
2571     "The path to a file containing certificate revocation lists.",
2572     check_recovery_ssl_option,       /* check func*/
2573     update_recovery_ssl_option,      /* update func*/
2574     "");                             /* default*/
2575 
2576 static MYSQL_SYSVAR_STR(
2577     recovery_ssl_crlpath,            /* name */
2578     recovery_ssl_crlpath_var,        /* var */
2579     PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, /* optional var | malloc string*/
2580     "The path to a directory that contains files containing certificate revocation lists.",
2581     check_recovery_ssl_option,       /* check func*/
2582     update_recovery_ssl_option,      /* update func*/
2583     "");                             /* default*/
2584 
2585 static MYSQL_SYSVAR_BOOL(
2586     recovery_ssl_verify_server_cert,        /* name */
2587     recovery_ssl_verify_server_cert_var,    /* var */
2588     PLUGIN_VAR_OPCMDARG,                    /* optional var */
2589     "Make recovery check the server's Common Name value in the donor sent certificate.",
2590     NULL,                                   /* check func*/
2591     update_ssl_server_cert_verification,    /* update func*/
2592     0);                                     /* default*/
2593 
2594 /** Initialize the ssl option map with variable names*/
initialize_ssl_option_map()2595 static void initialize_ssl_option_map()
2596 {
2597   recovery_ssl_opt_map.clear();
2598   st_mysql_sys_var* ssl_ca_var= MYSQL_SYSVAR(recovery_ssl_ca);
2599   recovery_ssl_opt_map[ssl_ca_var->name]= RECOVERY_SSL_CA_OPT;
2600   st_mysql_sys_var* ssl_capath_var= MYSQL_SYSVAR(recovery_ssl_capath);
2601   recovery_ssl_opt_map[ssl_capath_var->name]= RECOVERY_SSL_CAPATH_OPT;
2602   st_mysql_sys_var* ssl_cert_var= MYSQL_SYSVAR(recovery_ssl_cert);
2603   recovery_ssl_opt_map[ssl_cert_var->name]= RECOVERY_SSL_CERT_OPT;
2604   st_mysql_sys_var* ssl_cipher_var= MYSQL_SYSVAR(recovery_ssl_cipher);
2605   recovery_ssl_opt_map[ssl_cipher_var->name]= RECOVERY_SSL_CIPHER_OPT;
2606   st_mysql_sys_var* ssl_key_var= MYSQL_SYSVAR(recovery_ssl_key);
2607   recovery_ssl_opt_map[ssl_key_var->name]= RECOVERY_SSL_KEY_OPT;
2608   st_mysql_sys_var* ssl_crl_var=MYSQL_SYSVAR(recovery_ssl_crl);
2609   recovery_ssl_opt_map[ssl_crl_var->name]= RECOVERY_SSL_CRL_OPT;
2610   st_mysql_sys_var* ssl_crlpath_var=MYSQL_SYSVAR(recovery_ssl_crlpath);
2611   recovery_ssl_opt_map[ssl_crlpath_var->name]= RECOVERY_SSL_CRLPATH_OPT;
2612 }
2613 
2614 // Recovery threshold options
2615 
2616 const char* recovery_policies[]= { "TRANSACTIONS_CERTIFIED",
2617                                    "TRANSACTIONS_APPLIED",
2618                                    (char *)0};
2619 
2620 TYPELIB recovery_policies_typelib_t= {
2621   array_elements(recovery_policies) - 1,
2622   "recovery_policies_typelib_t",
2623   recovery_policies,
2624   NULL
2625 };
2626 
2627 static MYSQL_SYSVAR_ENUM(
2628    recovery_complete_at,                                 /* name */
2629    recovery_completion_policy_var,                       /* var */
2630    PLUGIN_VAR_OPCMDARG,                                  /* optional var */
2631    "Recovery policies when handling cached transactions after state transfer."
2632    "possible values are TRANSACTIONS_CERTIFIED or TRANSACTION_APPLIED", /* values */
2633    NULL,                                                 /* check func. */
2634    update_recovery_completion_policy,                    /* update func. */
2635    RECOVERY_POLICY_WAIT_EXECUTED,                        /* default */
2636    &recovery_policies_typelib_t);                        /* type lib */
2637 
2638 //Generic timeout setting
2639 
2640 static MYSQL_SYSVAR_ULONG(
2641   components_stop_timeout,                         /* name */
2642   components_stop_timeout_var,                     /* var */
2643   PLUGIN_VAR_OPCMDARG,                             /* optional var */
2644   "Timeout in seconds that the plugin waits for each of the components when shutting down.",
2645   NULL,                                            /* check func. */
2646   update_component_timeout,                        /* update func. */
2647   LONG_TIMEOUT,                                    /* default */
2648   2,                                               /* min */
2649   LONG_TIMEOUT,                                    /* max */
2650   0                                                /* block */
2651 );
2652 
2653 //Allow member downgrade
2654 
2655 static MYSQL_SYSVAR_BOOL(
2656   allow_local_lower_version_join,        /* name */
2657   allow_local_lower_version_join_var,    /* var */
2658   PLUGIN_VAR_OPCMDARG,                   /* optional var */
2659   "Allow this server to join the group even if it has a lower plugin version than the group",
2660   NULL,                                  /* check func. */
2661   NULL,                                  /* update func*/
2662   0                                      /* default */
2663 );
2664 
2665 static MYSQL_SYSVAR_BOOL(
2666   allow_local_disjoint_gtids_join,       /* name */
2667   allow_local_disjoint_gtids_join_var,   /* var */
2668   PLUGIN_VAR_OPCMDARG,                   /* optional var */
2669   "Allow this server to join the group even if it has transactions not present in the group",
2670   NULL,                                  /* check func. */
2671   update_allow_local_disjoint_gtids_join,/* update func*/
2672   0                                      /* default */
2673 );
2674 
2675 static MYSQL_SYSVAR_ULONG(
2676   auto_increment_increment,          /* name */
2677   auto_increment_increment_var,      /* var */
2678   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
2679   "The group replication auto_increment_increment determines interval between successive column values",
2680   check_auto_increment_increment,    /* check func. */
2681   NULL,                              /* update by update_func_long func. */
2682   DEFAULT_AUTO_INCREMENT_INCREMENT,  /* default */
2683   MIN_AUTO_INCREMENT_INCREMENT,      /* min */
2684   MAX_AUTO_INCREMENT_INCREMENT,      /* max */
2685   0                                  /* block */
2686 );
2687 
2688 static MYSQL_SYSVAR_ULONG(
2689   compression_threshold,             /* name */
2690   compression_threshold_var,         /* var */
2691   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
2692   "The value in bytes above which (lz4) compression is "
2693   "enforced. When set to zero, deactivates compression. "
2694   "Default: 1000000.",
2695   check_compression_threshold,       /* check func. */
2696   NULL,                              /* update func. */
2697   DEFAULT_COMPRESSION_THRESHOLD,     /* default */
2698   MIN_COMPRESSION_THRESHOLD,         /* min */
2699   MAX_COMPRESSION_THRESHOLD,         /* max */
2700   0                                  /* block */
2701 );
2702 
2703 static MYSQL_SYSVAR_ULONGLONG(
2704   gtid_assignment_block_size,        /* name */
2705   gtid_assignment_block_size_var,    /* var */
2706   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
2707   "The number of consecutive GTIDs that are reserved to each "
2708   "member. Each member will consume its blocks and reserve "
2709   "more when needed. Default: 1000000.",
2710   check_gtid_assignment_block_size,  /* check func. */
2711   NULL,                              /* update func. */
2712   DEFAULT_GTID_ASSIGNMENT_BLOCK_SIZE,/* default */
2713   MIN_GTID_ASSIGNMENT_BLOCK_SIZE,    /* min */
2714   MAX_GTID_ASSIGNMENT_BLOCK_SIZE,    /* max */
2715   0                                  /* block */
2716 );
2717 
2718 TYPELIB ssl_mode_values_typelib_t= {
2719   array_elements(ssl_mode_values) - 1,
2720   "ssl_mode_values_typelib_t",
2721   ssl_mode_values,
2722   NULL
2723 };
2724 
2725 static MYSQL_SYSVAR_ENUM(
2726   ssl_mode,                          /* name */
2727   ssl_mode_var,                      /* var */
2728   PLUGIN_VAR_OPCMDARG,               /* optional var */
2729   "Specifies the security state of the connection between Group "
2730   "Replication members. Default: DISABLED",
2731   NULL,                              /* check func. */
2732   NULL,                              /* update func. */
2733   0,                                 /* default */
2734   &ssl_mode_values_typelib_t         /* type lib */
2735 );
2736 
2737 static MYSQL_SYSVAR_STR(
2738   ip_whitelist,                             /* name */
2739   ip_whitelist_var,                         /* var */
2740   /* optional var | malloc string | no set default */
2741   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC | PLUGIN_VAR_NODEFAULT,
2742   "This option can be used to specify which members "
2743   "are allowed to connect to this member. The input "
2744   "takes the form of a comma separated list of IPv4 "
2745   "addresses or subnet CIDR notation. For example: "
2746   "192.168.1.0/24,10.0.0.1. In addition, the user can "
2747   "also set as input the value 'AUTOMATIC', in which case "
2748   "active interfaces on the host will be scanned and "
2749   "those with addresses on private subnetworks will be "
2750   "automatically added to the IP whitelist. The address "
2751   "127.0.0.1 is always added if not specified explicitly "
2752   "in the whitelist. Default: 'AUTOMATIC'.",
2753   check_ip_whitelist_preconditions,           /* check func*/
2754   NULL,                                       /* update func*/
2755   IP_WHITELIST_DEFAULT);                      /* default*/
2756 
2757 static MYSQL_SYSVAR_BOOL(
2758   single_primary_mode,                        /* name */
2759   single_primary_mode_var,                    /* var */
2760   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
2761   "Instructs the group to automatically pick a single server to be "
2762   "the one that handles read/write workload. This server is the "
2763   "PRIMARY all others are SECONDARIES. Default: TRUE.",
2764   check_single_primary_mode,                  /* check func*/
2765   NULL,                                       /* update func*/
2766   TRUE);                                      /* default*/
2767 
2768 static MYSQL_SYSVAR_BOOL(
2769   enforce_update_everywhere_checks,           /* name */
2770   enforce_update_everywhere_checks_var,       /* var */
2771   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_NODEFAULT, /* optional var | no set default */
2772   "Enable/Disable strict consistency checks for multi-master "
2773   "update everywhere. Default: FALSE.",
2774   check_enforce_update_everywhere_checks,     /* check func*/
2775   NULL,                                       /* update func*/
2776   FALSE);                                     /* default*/
2777 
2778 const char* flow_control_mode_values[]= {
2779   "DISABLED",
2780   "QUOTA",
2781   (const char*)0
2782 };
2783 
2784 TYPELIB flow_control_mode_typelib_t= {
2785   array_elements(flow_control_mode_values) - 1,
2786   "flow_control_mode_typelib_t",
2787   flow_control_mode_values,
2788   NULL
2789 };
2790 
2791 static MYSQL_SYSVAR_ENUM(
2792   flow_control_mode,                 /* name */
2793   flow_control_mode_var,             /* var */
2794   PLUGIN_VAR_OPCMDARG,               /* optional var */
2795   "Specifies the mode used on flow control. "
2796   "Default: QUOTA",
2797   NULL,                              /* check func. */
2798   NULL,                              /* update func. */
2799   FCM_QUOTA,                         /* default */
2800   &flow_control_mode_typelib_t       /* type lib */
2801 );
2802 
2803 static MYSQL_SYSVAR_INT(
2804   flow_control_certifier_threshold,     /* name */
2805   flow_control_certifier_threshold_var, /* var */
2806   PLUGIN_VAR_OPCMDARG,                  /* optional var */
2807   "Specifies the number of waiting transactions that will trigger "
2808   "flow control. Default: 25000",
2809   NULL,                                 /* check func. */
2810   NULL,                                 /* update func. */
2811   DEFAULT_FLOW_CONTROL_THRESHOLD,       /* default */
2812   MIN_FLOW_CONTROL_THRESHOLD,           /* min */
2813   MAX_FLOW_CONTROL_THRESHOLD,           /* max */
2814   0                                     /* block */
2815 );
2816 
2817 static MYSQL_SYSVAR_INT(
2818   flow_control_applier_threshold,      /* name */
2819   flow_control_applier_threshold_var,  /* var */
2820   PLUGIN_VAR_OPCMDARG,                 /* optional var */
2821   "Specifies the number of waiting transactions that will trigger "
2822   "flow control. Default: 25000",
2823   NULL,                                /* check func. */
2824   NULL,                                /* update func. */
2825   DEFAULT_FLOW_CONTROL_THRESHOLD,      /* default */
2826   MIN_FLOW_CONTROL_THRESHOLD,          /* min */
2827   MAX_FLOW_CONTROL_THRESHOLD,          /* max */
2828   0                                    /* block */
2829 );
2830 
2831 static MYSQL_SYSVAR_ULONG(
2832   transaction_size_limit,              /* name */
2833   transaction_size_limit_base_var,          /* var */
2834   PLUGIN_VAR_OPCMDARG,                 /* optional var */
2835   "Specifies the limit of transaction size that can be transferred over network.",
2836   NULL,                                /* check func. */
2837   update_transaction_size_limit,       /* update func. */
2838   DEFAULT_TRANSACTION_SIZE_LIMIT,      /* default */
2839   MIN_TRANSACTION_SIZE_LIMIT,          /* min */
2840   MAX_TRANSACTION_SIZE_LIMIT,          /* max */
2841   0                                    /* block */
2842 );
2843 
2844 static MYSQL_SYSVAR_ULONG(
2845   unreachable_majority_timeout,                    /* name */
2846   timeout_on_unreachable_var,                      /* var */
2847   PLUGIN_VAR_OPCMDARG,                             /* optional var */
2848   "The number of seconds before going into error when a majority of members is unreachable."
2849   "If 0 there is no action taken.",
2850   NULL,                                            /* check func. */
2851   update_unreachable_timeout,                      /* update func. */
2852   0,                                               /* default */
2853   0,                                               /* min */
2854   LONG_TIMEOUT,                                    /* max */
2855   0                                                /* block */
2856 );
2857 
2858 static MYSQL_SYSVAR_UINT(
2859   member_weight,                       /* name */
2860   member_weight_var,                   /* var */
2861   PLUGIN_VAR_OPCMDARG,                 /* optional var */
2862   "Member weight will determine the member role in the group on"
2863   " future primary elections",
2864   NULL,                                /* check func. */
2865   update_member_weight,                /* update func. */
2866   DEFAULT_MEMBER_WEIGHT,               /* default */
2867   MIN_MEMBER_WEIGHT,                   /* min */
2868   MAX_MEMBER_WEIGHT,                   /* max */
2869   0                                    /* block */
2870 );
2871 
2872 const char *exit_state_actions[]= {"READ_ONLY", "ABORT_SERVER", (char *)0};
2873 TYPELIB exit_state_actions_typelib_t= {array_elements(exit_state_actions) - 1,
2874                                        "exit_state_actions_typelib_t",
2875                                        exit_state_actions, NULL};
2876 static MYSQL_SYSVAR_ENUM(exit_state_action,     /* name */
2877                          exit_state_action_var, /* var */
2878                          PLUGIN_VAR_OPCMDARG,   /* optional var */
2879                          "The action that is taken when the server "
2880                          "leaves the group. "
2881                          "Possible values are READ_ONLY or "
2882                          "ABORT_SERVER.",                /* values */
2883                          NULL,                           /* check func. */
2884                          NULL,                           /* update func. */
2885                          EXIT_STATE_ACTION_READ_ONLY,    /* default */
2886                          &exit_state_actions_typelib_t); /* type lib */
2887 
2888 static SYS_VAR* group_replication_system_vars[]= {
2889   MYSQL_SYSVAR(group_name),
2890   MYSQL_SYSVAR(start_on_boot),
2891   MYSQL_SYSVAR(local_address),
2892   MYSQL_SYSVAR(group_seeds),
2893   MYSQL_SYSVAR(force_members),
2894   MYSQL_SYSVAR(bootstrap_group),
2895   MYSQL_SYSVAR(poll_spin_loops),
2896   MYSQL_SYSVAR(recovery_retry_count),
2897   MYSQL_SYSVAR(recovery_use_ssl),
2898   MYSQL_SYSVAR(recovery_ssl_ca),
2899   MYSQL_SYSVAR(recovery_ssl_capath),
2900   MYSQL_SYSVAR(recovery_ssl_cert),
2901   MYSQL_SYSVAR(recovery_ssl_cipher),
2902   MYSQL_SYSVAR(recovery_ssl_key),
2903   MYSQL_SYSVAR(recovery_ssl_crl),
2904   MYSQL_SYSVAR(recovery_ssl_crlpath),
2905   MYSQL_SYSVAR(recovery_ssl_verify_server_cert),
2906   MYSQL_SYSVAR(recovery_complete_at),
2907   MYSQL_SYSVAR(recovery_reconnect_interval),
2908   MYSQL_SYSVAR(components_stop_timeout),
2909   MYSQL_SYSVAR(allow_local_lower_version_join),
2910   MYSQL_SYSVAR(allow_local_disjoint_gtids_join),
2911   MYSQL_SYSVAR(auto_increment_increment),
2912   MYSQL_SYSVAR(compression_threshold),
2913   MYSQL_SYSVAR(gtid_assignment_block_size),
2914   MYSQL_SYSVAR(ssl_mode),
2915   MYSQL_SYSVAR(ip_whitelist),
2916   MYSQL_SYSVAR(single_primary_mode),
2917   MYSQL_SYSVAR(enforce_update_everywhere_checks),
2918   MYSQL_SYSVAR(flow_control_mode),
2919   MYSQL_SYSVAR(flow_control_certifier_threshold),
2920   MYSQL_SYSVAR(flow_control_applier_threshold),
2921   MYSQL_SYSVAR(transaction_size_limit),
2922   MYSQL_SYSVAR(unreachable_majority_timeout),
2923   MYSQL_SYSVAR(member_weight),
2924   MYSQL_SYSVAR(exit_state_action),
2925   NULL,
2926 };
2927 
2928 
2929 static int
show_primary_member(MYSQL_THD thd,SHOW_VAR * var,char * buff)2930 show_primary_member(MYSQL_THD thd, SHOW_VAR *var, char *buff)
2931 {
2932   var->type= SHOW_CHAR;
2933   var->value= NULL;
2934 
2935   if (group_member_mgr && single_primary_mode_var &&
2936       plugin_is_group_replication_running())
2937   {
2938     string primary_member_uuid;
2939     group_member_mgr->get_primary_member_uuid(primary_member_uuid);
2940 
2941     strncpy(buff, primary_member_uuid.c_str(), SHOW_VAR_FUNC_BUFF_SIZE);
2942     buff[SHOW_VAR_FUNC_BUFF_SIZE - 1] = 0;
2943 
2944     var->value= buff;
2945   }
2946 
2947   return 0;
2948 }
2949 
2950 static SHOW_VAR group_replication_status_vars[]=
2951 {
2952   {"group_replication_primary_member",
2953    (char*) &show_primary_member,
2954    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
2955   {NULL, NULL, SHOW_LONG, SHOW_SCOPE_GLOBAL},
2956 };
2957 
2958 
mysql_declare_plugin(group_replication_plugin)2959 mysql_declare_plugin(group_replication_plugin)
2960 {
2961   MYSQL_GROUP_REPLICATION_PLUGIN,
2962   &group_replication_descriptor,
2963   group_replication_plugin_name,
2964   "ORACLE",
2965   "Group Replication (1.0.0)",      /* Plugin name with full version*/
2966   PLUGIN_LICENSE_GPL,
2967   plugin_group_replication_init,    /* Plugin Init */
2968   plugin_group_replication_deinit,  /* Plugin Deinit */
2969   0x0100,                           /* Plugin Version: major.minor */
2970   group_replication_status_vars,    /* status variables */
2971   group_replication_system_vars,    /* system variables */
2972   NULL,                             /* config options */
2973   0,                                /* flags */
2974 }
2975 mysql_declare_plugin_end;
2976