1 /* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "gcs_operations.h"
24 #include "plugin.h"
25 #include "plugin_log.h"
26
27 #include <vector>
28
29
30 const std::string Gcs_operations::gcs_engine= "xcom";
31
32
Gcs_operations()33 Gcs_operations::Gcs_operations()
34 : gcs_interface(NULL),
35 leave_coordination_leaving(0),
36 leave_coordination_left(0)
37 {
38 gcs_operations_lock= new Checkable_rwlock(
39 #ifdef HAVE_PSI_INTERFACE
40 key_GR_RWLOCK_gcs_operations
41 #endif
42 );
43 }
44
45
~Gcs_operations()46 Gcs_operations::~Gcs_operations()
47 {
48 delete gcs_operations_lock;
49 }
50
51
52 int
initialize()53 Gcs_operations::initialize()
54 {
55 DBUG_ENTER("Gcs_operations::initialize");
56 int error= 0;
57 gcs_operations_lock->wrlock();
58
59 my_atomic_store32(&leave_coordination_leaving, 0);
60 my_atomic_store32(&leave_coordination_left, 0);
61
62 assert(gcs_interface == NULL);
63 if ((gcs_interface=
64 Gcs_interface_factory::get_interface_implementation(
65 gcs_engine)) == NULL)
66 {
67 /* purecov: begin inspected */
68 log_message(MY_ERROR_LEVEL,
69 "Failure in group communication engine '%s' initialization",
70 gcs_engine.c_str());
71 error= GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR;
72 goto end;
73 /* purecov: end */
74 }
75
76 if (gcs_interface->set_logger(&gcs_logger))
77 {
78 /* purecov: begin inspected */
79 log_message(MY_ERROR_LEVEL,
80 "Unable to set the group communication engine logger");
81 error= GROUP_REPLICATION_COMMUNICATION_LAYER_SESSION_ERROR;
82 goto end;
83 /* purecov: end */
84 }
85
86 end:
87 gcs_operations_lock->unlock();
88 DBUG_RETURN(error);
89 }
90
91
92 void
finalize()93 Gcs_operations::finalize()
94 {
95 DBUG_ENTER("Gcs_operations::finalize");
96 gcs_operations_lock->wrlock();
97
98 if (gcs_interface != NULL)
99 gcs_interface->finalize();
100 Gcs_interface_factory::cleanup(gcs_engine);
101 gcs_interface= NULL;
102
103 gcs_operations_lock->unlock();
104 DBUG_VOID_RETURN;
105 }
106
107
108 enum enum_gcs_error
configure(const Gcs_interface_parameters & parameters)109 Gcs_operations::configure(const Gcs_interface_parameters& parameters)
110 {
111 DBUG_ENTER("Gcs_operations::configure");
112 enum enum_gcs_error error= GCS_NOK;
113 gcs_operations_lock->wrlock();
114
115 if (gcs_interface != NULL)
116 error= gcs_interface->initialize(parameters);
117
118 gcs_operations_lock->unlock();
119 DBUG_RETURN(error);
120 }
121
122
123 enum enum_gcs_error
join(const Gcs_communication_event_listener & communication_event_listener,const Gcs_control_event_listener & control_event_listener)124 Gcs_operations::join(const Gcs_communication_event_listener& communication_event_listener,
125 const Gcs_control_event_listener& control_event_listener)
126 {
127 DBUG_ENTER("Gcs_operations::join");
128 enum enum_gcs_error error= GCS_NOK;
129 gcs_operations_lock->wrlock();
130
131 if (gcs_interface == NULL || !gcs_interface->is_initialized())
132 {
133 /* purecov: begin inspected */
134 gcs_operations_lock->unlock();
135 DBUG_RETURN(GCS_NOK);
136 /* purecov: end */
137 }
138
139 std::string group_name(group_name_var);
140 Gcs_group_identifier group_id(group_name);
141
142 Gcs_communication_interface *gcs_communication=
143 gcs_interface->get_communication_session(group_id);
144 Gcs_control_interface *gcs_control=
145 gcs_interface->get_control_session(group_id);
146
147 if (gcs_communication == NULL || gcs_control == NULL)
148 {
149 /* purecov: begin inspected */
150 gcs_operations_lock->unlock();
151 DBUG_RETURN(GCS_NOK);
152 /* purecov: end */
153 }
154
155 gcs_control->add_event_listener(control_event_listener);
156 gcs_communication->add_event_listener(communication_event_listener);
157
158 /*
159 Fake a GCS join error by not invoking join(), the
160 view_change_notifier will error out and return a error on
161 START GROUP_REPLICATION command.
162 */
163 DBUG_EXECUTE_IF("group_replication_inject_gcs_join_error",
164 { gcs_operations_lock->unlock(); DBUG_RETURN(GCS_OK); };);
165
166 error= gcs_control->join();
167
168 gcs_operations_lock->unlock();
169 DBUG_RETURN(error);
170 }
171
172
173 bool
belongs_to_group()174 Gcs_operations::belongs_to_group()
175 {
176 DBUG_ENTER("Gcs_operations::belongs_to_group");
177 bool res= false;
178 gcs_operations_lock->rdlock();
179
180 if (gcs_interface != NULL && gcs_interface->is_initialized())
181 {
182 std::string group_name(group_name_var);
183 Gcs_group_identifier group_id(group_name);
184 Gcs_control_interface *gcs_control=
185 gcs_interface->get_control_session(group_id);
186
187 if (gcs_control != NULL && gcs_control->belongs_to_group())
188 res= true;
189 }
190
191 gcs_operations_lock->unlock();
192 DBUG_RETURN(res);
193 }
194
195
196 Gcs_operations::enum_leave_state
leave()197 Gcs_operations::leave()
198 {
199 DBUG_ENTER("Gcs_operations::leave");
200 enum_leave_state state= ERROR_WHEN_LEAVING;
201 gcs_operations_lock->wrlock();
202
203 if (my_atomic_load32(&leave_coordination_left))
204 {
205 state= ALREADY_LEFT;
206 goto end;
207 }
208 if (my_atomic_load32(&leave_coordination_leaving))
209 {
210 state= ALREADY_LEAVING;
211 goto end;
212 }
213
214 if (gcs_interface != NULL && gcs_interface->is_initialized())
215 {
216 std::string group_name(group_name_var);
217 Gcs_group_identifier group_id(group_name);
218 Gcs_control_interface *gcs_control=
219 gcs_interface->get_control_session(group_id);
220
221 if (gcs_control != NULL)
222 {
223 if (!gcs_control->leave())
224 {
225 state= NOW_LEAVING;
226 my_atomic_store32(&leave_coordination_leaving, 1);
227 goto end;
228 }
229 }
230 else
231 {
232 /* purecov: begin inspected */
233 log_message(MY_ERROR_LEVEL,
234 "Error calling group communication interfaces while trying"
235 " to leave the group");
236 goto end;
237 /* purecov: end */
238 }
239 }
240 else
241 {
242 log_message(MY_ERROR_LEVEL,
243 "Error calling group communication interfaces while trying"
244 " to leave the group");
245 goto end;
246 }
247
248 end:
249 gcs_operations_lock->unlock();
250 DBUG_RETURN(state);
251 }
252
253
254 void
leave_coordination_member_left()255 Gcs_operations::leave_coordination_member_left()
256 {
257 DBUG_ENTER("Gcs_operations::leave_coordination_member_left");
258 my_atomic_store32(&leave_coordination_leaving, 0);
259 my_atomic_store32(&leave_coordination_left, 1);
260 DBUG_VOID_RETURN;
261 }
262
263
264 Gcs_view*
get_current_view()265 Gcs_operations::get_current_view()
266 {
267 DBUG_ENTER("Gcs_operations::get_current_view");
268 Gcs_view *view= NULL;
269 gcs_operations_lock->rdlock();
270
271 if (gcs_interface != NULL && gcs_interface->is_initialized())
272 {
273 std::string group_name(group_name_var);
274 Gcs_group_identifier group_id(group_name);
275 Gcs_control_interface *gcs_control=
276 gcs_interface->get_control_session(group_id);
277
278 if (gcs_control != NULL && gcs_control->belongs_to_group())
279 view= gcs_control->get_current_view();
280 }
281
282 gcs_operations_lock->unlock();
283 DBUG_RETURN(view);
284 }
285
286
287 int
get_local_member_identifier(std::string & identifier)288 Gcs_operations::get_local_member_identifier(std::string& identifier)
289 {
290 DBUG_ENTER("Gcs_operations::get_local_member_identifier");
291 int error= 1;
292 gcs_operations_lock->rdlock();
293
294 if (gcs_interface != NULL && gcs_interface->is_initialized())
295 {
296 std::string group_name(group_name_var);
297 Gcs_group_identifier group_id(group_name);
298 Gcs_control_interface *gcs_control=
299 gcs_interface->get_control_session(group_id);
300
301 if (gcs_control != NULL)
302 {
303 identifier.assign(gcs_control->get_local_member_identifier().get_member_id());
304 error= 0;
305 }
306 }
307
308 gcs_operations_lock->unlock();
309 DBUG_RETURN(error);
310 }
311
312
313 enum enum_gcs_error
send_message(const Plugin_gcs_message & message,bool skip_if_not_initialized)314 Gcs_operations::send_message(const Plugin_gcs_message& message,
315 bool skip_if_not_initialized)
316 {
317 DBUG_ENTER("Gcs_operations::send");
318 enum enum_gcs_error error= GCS_NOK;
319 gcs_operations_lock->rdlock();
320
321 /*
322 Ensure that group communication interfaces are initialized
323 and ready to use, since plugin can leave the group on errors
324 but continue to be active.
325 */
326 if (gcs_interface == NULL || !gcs_interface->is_initialized())
327 {
328 gcs_operations_lock->unlock();
329 DBUG_RETURN(skip_if_not_initialized ? GCS_OK : GCS_NOK);
330 }
331
332 std::string group_name(group_name_var);
333 Gcs_group_identifier group_id(group_name);
334
335 Gcs_communication_interface *gcs_communication=
336 gcs_interface->get_communication_session(group_id);
337 Gcs_control_interface *gcs_control=
338 gcs_interface->get_control_session(group_id);
339
340 if (gcs_communication == NULL || gcs_control == NULL)
341 {
342 /* purecov: begin inspected */
343 gcs_operations_lock->unlock();
344 DBUG_RETURN(skip_if_not_initialized ? GCS_OK : GCS_NOK);
345 /* purecov: end */
346 }
347
348 std::vector<uchar> message_data;
349 message.encode(&message_data);
350
351 Gcs_member_identifier origin= gcs_control->get_local_member_identifier();
352 Gcs_message gcs_message(origin, new Gcs_message_data(0, message_data.size()));
353 gcs_message.get_message_data().append_to_payload(&message_data.front(),
354 message_data.size());
355 error= gcs_communication->send_message(gcs_message);
356
357 gcs_operations_lock->unlock();
358 DBUG_RETURN(error);
359 }
360
361
362 int
force_members(const char * members)363 Gcs_operations::force_members(const char* members)
364 {
365 DBUG_ENTER("Gcs_operations::force_members");
366 int error= 0;
367 gcs_operations_lock->wrlock();
368
369 if (gcs_interface == NULL || !gcs_interface->is_initialized())
370 {
371 /* purecov: begin inspected */
372 log_message(MY_ERROR_LEVEL,
373 "Member is OFFLINE, it is not possible to force a "
374 "new group membership");
375 error= 1;
376 goto end;
377 /* purecov: end */
378 }
379
380 if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
381 {
382 std::string group_id_str(group_name_var);
383 Gcs_group_identifier group_id(group_id_str);
384 Gcs_group_management_interface* gcs_management=
385 gcs_interface->get_management_session(group_id);
386
387 if (gcs_management == NULL)
388 {
389 /* purecov: begin inspected */
390 log_message(MY_ERROR_LEVEL,
391 "Error calling group communication interfaces");
392 error= 1;
393 goto end;
394 /* purecov: end */
395 }
396
397 view_change_notifier->start_injected_view_modification();
398
399 Gcs_interface_parameters gcs_interface_parameters;
400 gcs_interface_parameters.add_parameter("peer_nodes",
401 std::string(members));
402 enum_gcs_error result=
403 gcs_management->modify_configuration(gcs_interface_parameters);
404 if (result != GCS_OK)
405 {
406 /* purecov: begin inspected */
407 log_message(MY_ERROR_LEVEL,
408 "Error setting group_replication_force_members "
409 "value '%s' on group communication interfaces", members);
410 error= 1;
411 goto end;
412 /* purecov: end */
413 }
414 log_message(MY_INFORMATION_LEVEL,
415 "The group_replication_force_members value '%s' "
416 "was set in the group communication interfaces", members);
417 if (view_change_notifier->wait_for_view_modification())
418 {
419 /* purecov: begin inspected */
420 log_message(MY_ERROR_LEVEL,
421 "Timeout on wait for view after setting "
422 "group_replication_force_members value '%s' "
423 "into group communication interfaces", members);
424 error= 1;
425 goto end;
426 /* purecov: end */
427 }
428 }
429 else
430 {
431 log_message(MY_ERROR_LEVEL,
432 "Member is not ONLINE, it is not possible to force a "
433 "new group membership");
434 error= 1;
435 goto end;
436 }
437
438 end:
439 gcs_operations_lock->unlock();
440 DBUG_RETURN(error);
441 }
442
get_gcs_engine()443 const std::string& Gcs_operations::get_gcs_engine()
444 {
445 return gcs_engine;
446 }
447