1 /* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "gcs_operations.h"
24 #include "plugin.h"
25 #include "plugin_log.h"
26 
27 #include <vector>
28 
29 
30 const std::string Gcs_operations::gcs_engine= "xcom";
31 
32 
Gcs_operations()33 Gcs_operations::Gcs_operations()
34   : gcs_interface(NULL),
35     leave_coordination_leaving(0),
36     leave_coordination_left(0)
37 {
38   gcs_operations_lock= new Checkable_rwlock(
39 #ifdef HAVE_PSI_INTERFACE
40       key_GR_RWLOCK_gcs_operations
41 #endif
42   );
43 }
44 
45 
~Gcs_operations()46 Gcs_operations::~Gcs_operations()
47 {
48   delete gcs_operations_lock;
49 }
50 
51 
52 int
initialize()53 Gcs_operations::initialize()
54 {
55   DBUG_ENTER("Gcs_operations::initialize");
56   int error= 0;
57   gcs_operations_lock->wrlock();
58 
59   my_atomic_store32(&leave_coordination_leaving, 0);
60   my_atomic_store32(&leave_coordination_left, 0);
61 
62   assert(gcs_interface == NULL);
63   if ((gcs_interface=
64            Gcs_interface_factory::get_interface_implementation(
65                gcs_engine)) == NULL)
66   {
67     /* purecov: begin inspected */
68     log_message(MY_ERROR_LEVEL,
69                 "Failure in group communication engine '%s' initialization",
70                 gcs_engine.c_str());
71     error= GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR;
72     goto end;
73     /* purecov: end */
74   }
75 
76   if (gcs_interface->set_logger(&gcs_logger))
77   {
78     /* purecov: begin inspected */
79     log_message(MY_ERROR_LEVEL,
80                 "Unable to set the group communication engine logger");
81     error= GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR;
82     goto end;
83     /* purecov: end */
84   }
85 
86 end:
87   gcs_operations_lock->unlock();
88   DBUG_RETURN(error);
89 }
90 
91 
92 void
finalize()93 Gcs_operations::finalize()
94 {
95   DBUG_ENTER("Gcs_operations::finalize");
96   gcs_operations_lock->wrlock();
97 
98   if (gcs_interface != NULL)
99     gcs_interface->finalize();
100   Gcs_interface_factory::cleanup(gcs_engine);
101   gcs_interface= NULL;
102 
103   gcs_operations_lock->unlock();
104   DBUG_VOID_RETURN;
105 }
106 
107 
108 enum enum_gcs_error
configure(const Gcs_interface_parameters & parameters)109 Gcs_operations::configure(const Gcs_interface_parameters& parameters)
110 {
111   DBUG_ENTER("Gcs_operations::configure");
112   enum enum_gcs_error error= GCS_NOK;
113   gcs_operations_lock->wrlock();
114 
115   if (gcs_interface != NULL)
116     error= gcs_interface->initialize(parameters);
117 
118   gcs_operations_lock->unlock();
119   DBUG_RETURN(error);
120 }
121 
122 
123 enum enum_gcs_error
join(const Gcs_communication_event_listener & communication_event_listener,const Gcs_control_event_listener & control_event_listener)124 Gcs_operations::join(const Gcs_communication_event_listener& communication_event_listener,
125                      const Gcs_control_event_listener& control_event_listener)
126 {
127   DBUG_ENTER("Gcs_operations::join");
128   enum enum_gcs_error error= GCS_NOK;
129   gcs_operations_lock->wrlock();
130 
131   if (gcs_interface == NULL || !gcs_interface->is_initialized())
132   {
133     /* purecov: begin inspected */
134     gcs_operations_lock->unlock();
135     DBUG_RETURN(GCS_NOK);
136     /* purecov: end */
137   }
138 
139   std::string group_name(group_name_var);
140   Gcs_group_identifier group_id(group_name);
141 
142   Gcs_communication_interface *gcs_communication=
143       gcs_interface->get_communication_session(group_id);
144   Gcs_control_interface *gcs_control=
145       gcs_interface->get_control_session(group_id);
146 
147   if (gcs_communication == NULL || gcs_control == NULL)
148   {
149     /* purecov: begin inspected */
150     gcs_operations_lock->unlock();
151     DBUG_RETURN(GCS_NOK);
152     /* purecov: end */
153   }
154 
155   gcs_control->add_event_listener(control_event_listener);
156   gcs_communication->add_event_listener(communication_event_listener);
157 
158   /*
159     Fake a GCS join error by not invoking join(), the
160     view_change_notifier will error out and return a error on
161     START GROUP_REPLICATION command.
162   */
163   DBUG_EXECUTE_IF("group_replication_inject_gcs_join_error",
164                   { gcs_operations_lock->unlock(); DBUG_RETURN(GCS_OK); };);
165 
166   error= gcs_control->join();
167 
168   gcs_operations_lock->unlock();
169   DBUG_RETURN(error);
170 }
171 
172 
173 bool
belongs_to_group()174 Gcs_operations::belongs_to_group()
175 {
176   DBUG_ENTER("Gcs_operations::belongs_to_group");
177   bool res= false;
178   gcs_operations_lock->rdlock();
179 
180   if (gcs_interface != NULL && gcs_interface->is_initialized())
181   {
182     std::string group_name(group_name_var);
183     Gcs_group_identifier group_id(group_name);
184     Gcs_control_interface *gcs_control=
185         gcs_interface->get_control_session(group_id);
186 
187     if (gcs_control != NULL && gcs_control->belongs_to_group())
188       res= true;
189   }
190 
191   gcs_operations_lock->unlock();
192   DBUG_RETURN(res);
193 }
194 
195 
196 Gcs_operations::enum_leave_state
leave()197 Gcs_operations::leave()
198 {
199   DBUG_ENTER("Gcs_operations::leave");
200   enum_leave_state state= ERROR_WHEN_LEAVING;
201   gcs_operations_lock->wrlock();
202 
203   if (my_atomic_load32(&leave_coordination_left))
204   {
205     state= ALREADY_LEFT;
206     goto end;
207   }
208   if (my_atomic_load32(&leave_coordination_leaving))
209   {
210     state= ALREADY_LEAVING;
211     goto end;
212   }
213 
214   if (gcs_interface != NULL && gcs_interface->is_initialized())
215   {
216     std::string group_name(group_name_var);
217     Gcs_group_identifier group_id(group_name);
218     Gcs_control_interface *gcs_control=
219         gcs_interface->get_control_session(group_id);
220 
221     if (gcs_control != NULL)
222     {
223       if (!gcs_control->leave())
224       {
225         state= NOW_LEAVING;
226         my_atomic_store32(&leave_coordination_leaving, 1);
227         goto end;
228       }
229     }
230     else
231     {
232       /* purecov: begin inspected */
233       log_message(MY_ERROR_LEVEL,
234                   "Error calling group communication interfaces while trying"
235                   " to leave the group");
236       goto end;
237       /* purecov: end */
238     }
239   }
240   else
241   {
242     log_message(MY_ERROR_LEVEL,
243                 "Error calling group communication interfaces while trying"
244                 " to leave the group");
245     goto end;
246   }
247 
248 end:
249   gcs_operations_lock->unlock();
250   DBUG_RETURN(state);
251 }
252 
253 
254 void
leave_coordination_member_left()255 Gcs_operations::leave_coordination_member_left()
256 {
257   DBUG_ENTER("Gcs_operations::leave_coordination_member_left");
258   my_atomic_store32(&leave_coordination_leaving, 0);
259   my_atomic_store32(&leave_coordination_left, 1);
260   DBUG_VOID_RETURN;
261 }
262 
263 
264 Gcs_view*
get_current_view()265 Gcs_operations::get_current_view()
266 {
267   DBUG_ENTER("Gcs_operations::get_current_view");
268   Gcs_view *view= NULL;
269   gcs_operations_lock->rdlock();
270 
271   if (gcs_interface != NULL && gcs_interface->is_initialized())
272   {
273     std::string group_name(group_name_var);
274     Gcs_group_identifier group_id(group_name);
275     Gcs_control_interface *gcs_control=
276         gcs_interface->get_control_session(group_id);
277 
278     if (gcs_control != NULL && gcs_control->belongs_to_group())
279       view= gcs_control->get_current_view();
280   }
281 
282   gcs_operations_lock->unlock();
283   DBUG_RETURN(view);
284 }
285 
286 
287 int
get_local_member_identifier(std::string & identifier)288 Gcs_operations::get_local_member_identifier(std::string& identifier)
289 {
290   DBUG_ENTER("Gcs_operations::get_local_member_identifier");
291   int error= 1;
292   gcs_operations_lock->rdlock();
293 
294   if (gcs_interface != NULL && gcs_interface->is_initialized())
295   {
296     std::string group_name(group_name_var);
297     Gcs_group_identifier group_id(group_name);
298     Gcs_control_interface *gcs_control=
299         gcs_interface->get_control_session(group_id);
300 
301     if (gcs_control != NULL)
302     {
303       identifier.assign(gcs_control->get_local_member_identifier().get_member_id());
304       error= 0;
305     }
306   }
307 
308   gcs_operations_lock->unlock();
309   DBUG_RETURN(error);
310 }
311 
312 
313 enum enum_gcs_error
send_message(const Plugin_gcs_message & message,bool skip_if_not_initialized)314 Gcs_operations::send_message(const Plugin_gcs_message& message,
315                              bool skip_if_not_initialized)
316 {
317   DBUG_ENTER("Gcs_operations::send");
318   enum enum_gcs_error error= GCS_NOK;
319   gcs_operations_lock->rdlock();
320 
321   /*
322     Ensure that group communication interfaces are initialized
323     and ready to use, since plugin can leave the group on errors
324     but continue to be active.
325   */
326   if (gcs_interface == NULL || !gcs_interface->is_initialized())
327   {
328     gcs_operations_lock->unlock();
329     DBUG_RETURN(skip_if_not_initialized ? GCS_OK : GCS_NOK);
330   }
331 
332   std::string group_name(group_name_var);
333   Gcs_group_identifier group_id(group_name);
334 
335   Gcs_communication_interface *gcs_communication=
336       gcs_interface->get_communication_session(group_id);
337   Gcs_control_interface *gcs_control=
338       gcs_interface->get_control_session(group_id);
339 
340   if (gcs_communication == NULL || gcs_control == NULL)
341   {
342     /* purecov: begin inspected */
343     gcs_operations_lock->unlock();
344     DBUG_RETURN(skip_if_not_initialized ? GCS_OK : GCS_NOK);
345     /* purecov: end */
346   }
347 
348   std::vector<uchar> message_data;
349   message.encode(&message_data);
350 
351   Gcs_member_identifier origin= gcs_control->get_local_member_identifier();
352   Gcs_message gcs_message(origin, new Gcs_message_data(0, message_data.size()));
353   gcs_message.get_message_data().append_to_payload(&message_data.front(),
354                                                    message_data.size());
355   error= gcs_communication->send_message(gcs_message);
356 
357   gcs_operations_lock->unlock();
358   DBUG_RETURN(error);
359 }
360 
361 
362 int
force_members(const char * members)363 Gcs_operations::force_members(const char* members)
364 {
365   DBUG_ENTER("Gcs_operations::force_members");
366   int error= 0;
367   gcs_operations_lock->wrlock();
368 
369   if (gcs_interface == NULL || !gcs_interface->is_initialized())
370   {
371     /* purecov: begin inspected */
372     log_message(MY_ERROR_LEVEL,
373                 "Member is OFFLINE, it is not possible to force a "
374                 "new group membership");
375     error= 1;
376     goto end;
377     /* purecov: end */
378   }
379 
380   if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
381   {
382     std::string group_id_str(group_name_var);
383     Gcs_group_identifier group_id(group_id_str);
384     Gcs_group_management_interface* gcs_management=
385         gcs_interface->get_management_session(group_id);
386 
387     if (gcs_management == NULL)
388     {
389       /* purecov: begin inspected */
390       log_message(MY_ERROR_LEVEL,
391                   "Error calling group communication interfaces");
392       error= 1;
393       goto end;
394       /* purecov: end */
395     }
396 
397     view_change_notifier->start_injected_view_modification();
398 
399     Gcs_interface_parameters gcs_interface_parameters;
400     gcs_interface_parameters.add_parameter("peer_nodes",
401                                            std::string(members));
402     enum_gcs_error result=
403         gcs_management->modify_configuration(gcs_interface_parameters);
404     if (result != GCS_OK)
405     {
406       /* purecov: begin inspected */
407       log_message(MY_ERROR_LEVEL,
408                   "Error setting group_replication_force_members "
409                   "value '%s' on group communication interfaces", members);
410       error= 1;
411       goto end;
412       /* purecov: end */
413     }
414     log_message(MY_INFORMATION_LEVEL,
415                 "The group_replication_force_members value '%s' "
416                 "was set in the group communication interfaces", members);
417     if (view_change_notifier->wait_for_view_modification())
418     {
419       /* purecov: begin inspected */
420       log_message(MY_ERROR_LEVEL,
421                   "Timeout on wait for view after setting "
422                   "group_replication_force_members value '%s' "
423                   "into group communication interfaces", members);
424       error= 1;
425       goto end;
426       /* purecov: end */
427     }
428   }
429   else
430   {
431     log_message(MY_ERROR_LEVEL,
432                 "Member is not ONLINE, it is not possible to force a "
433                 "new group membership");
434     error= 1;
435     goto end;
436   }
437 
438 end:
439   gcs_operations_lock->unlock();
440   DBUG_RETURN(error);
441 }
442 
get_gcs_engine()443 const std::string& Gcs_operations::get_gcs_engine()
444 {
445   return gcs_engine;
446 }
447