1 /* Copyright (C) 2007 Google Inc.
2 Copyright (c) 2008, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24 #include <stddef.h>
25 #include <sys/types.h>
26
27 #include "my_inttypes.h"
28 #include "my_macros.h"
29 #include "my_psi_config.h"
30 #include "mysql/psi/mysql_memory.h"
31 #include "mysql/psi/mysql_stage.h"
32 #include "plugin/semisync/semisync_master.h"
33 #include "plugin/semisync/semisync_master_ack_receiver.h"
34 #include "sql/current_thd.h"
35 #include "sql/protocol_classic.h"
36 #include "sql/sql_class.h" // THD
37 #include "typelib.h"
38
39 ReplSemiSyncMaster *repl_semisync = nullptr;
40 Ack_receiver *ack_receiver = nullptr;
41
42 /* The places at where semisync waits for binlog ACKs. */
43 enum enum_wait_point { WAIT_AFTER_SYNC, WAIT_AFTER_COMMIT };
44
45 static ulong rpl_semi_sync_master_wait_point = WAIT_AFTER_COMMIT;
46
47 thread_local bool THR_RPL_SEMI_SYNC_DUMP = false;
48
49 static SERVICE_TYPE(registry) *reg_srv = nullptr;
50 SERVICE_TYPE(log_builtins) *log_bi = nullptr;
51 SERVICE_TYPE(log_builtins_string) *log_bs = nullptr;
52
is_semi_sync_dump()53 static inline bool is_semi_sync_dump() { return THR_RPL_SEMI_SYNC_DUMP; }
54
repl_semi_report_binlog_update(Binlog_storage_param *,const char * log_file,my_off_t log_pos)55 static int repl_semi_report_binlog_update(Binlog_storage_param *,
56 const char *log_file,
57 my_off_t log_pos) {
58 int error = 0;
59
60 if (repl_semisync->getMasterEnabled()) {
61 /*
62 Let us store the binlog file name and the position, so that
63 we know how long to wait for the binlog to the replicated to
64 the slave in synchronous replication.
65 */
66 error = repl_semisync->writeTranxInBinlog(log_file, log_pos);
67 }
68
69 return error;
70 }
71
repl_semi_report_binlog_sync(Binlog_storage_param *,const char * log_file,my_off_t log_pos)72 static int repl_semi_report_binlog_sync(Binlog_storage_param *,
73 const char *log_file,
74 my_off_t log_pos) {
75 if (rpl_semi_sync_master_wait_point == WAIT_AFTER_SYNC)
76 return repl_semisync->commitTrx(log_file, log_pos);
77 return 0;
78 }
79
repl_semi_report_before_dml(Trans_param *,int &)80 static int repl_semi_report_before_dml(Trans_param *, int &) { return 0; }
81
repl_semi_report_before_commit(Trans_param *)82 static int repl_semi_report_before_commit(Trans_param *) { return 0; }
83
repl_semi_report_before_rollback(Trans_param *)84 static int repl_semi_report_before_rollback(Trans_param *) { return 0; }
85
repl_semi_report_commit(Trans_param * param)86 static int repl_semi_report_commit(Trans_param *param) {
87 bool is_real_trans = param->flags & TRANS_IS_REAL_TRANS;
88
89 if (rpl_semi_sync_master_wait_point == WAIT_AFTER_COMMIT && is_real_trans &&
90 param->log_pos) {
91 const char *binlog_name = param->log_file;
92 return repl_semisync->commitTrx(binlog_name, param->log_pos);
93 }
94 return 0;
95 }
96
repl_semi_report_rollback(Trans_param * param)97 static int repl_semi_report_rollback(Trans_param *param) {
98 return repl_semi_report_commit(param);
99 }
100
repl_semi_report_begin(Trans_param *,int &)101 static int repl_semi_report_begin(Trans_param *, int &) { return 0; }
102
repl_semi_binlog_dump_start(Binlog_transmit_param * param,const char * log_file,my_off_t log_pos)103 static int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
104 const char *log_file, my_off_t log_pos) {
105 long long semi_sync_slave = 0;
106
107 /*
108 semi_sync_slave will be 0 if the user variable doesn't exist. Otherwise, it
109 will be set to the value of the user variable.
110 'rpl_semi_sync_slave = 0' means that it is not a semisync slave.
111 */
112 get_user_var_int("rpl_semi_sync_slave", &semi_sync_slave, nullptr);
113
114 if (semi_sync_slave != 0) {
115 if (ack_receiver->add_slave(current_thd)) {
116 LogErr(ERROR_LEVEL, ER_SEMISYNC_FAILED_REGISTER_SLAVE_TO_RECEIVER);
117 return -1;
118 }
119
120 THR_RPL_SEMI_SYNC_DUMP = true;
121
122 /* One more semi-sync slave */
123 repl_semisync->add_slave();
124
125 /* Tell server it will observe the transmission.*/
126 param->set_observe_flag();
127
128 /*
129 Let's assume this semi-sync slave has already received all
130 binlog events before the filename and position it requests.
131 */
132 repl_semisync->handleAck(param->server_id, log_file, log_pos);
133 } else
134 param->set_dont_observe_flag();
135
136 LogErr(INFORMATION_LEVEL, ER_SEMISYNC_START_BINLOG_DUMP_TO_SLAVE,
137 semi_sync_slave != 0 ? "semi-sync" : "asynchronous", param->server_id,
138 log_file, (unsigned long)log_pos);
139 return 0;
140 }
141
repl_semi_binlog_dump_end(Binlog_transmit_param * param)142 static int repl_semi_binlog_dump_end(Binlog_transmit_param *param) {
143 bool semi_sync_slave = is_semi_sync_dump();
144
145 LogErr(INFORMATION_LEVEL, ER_SEMISYNC_STOP_BINLOG_DUMP_TO_SLAVE,
146 semi_sync_slave ? "semi-sync" : "asynchronous", param->server_id);
147
148 if (semi_sync_slave) {
149 ack_receiver->remove_slave(current_thd);
150 /* One less semi-sync slave */
151 repl_semisync->remove_slave();
152 THR_RPL_SEMI_SYNC_DUMP = false;
153 }
154 return 0;
155 }
156
repl_semi_reserve_header(Binlog_transmit_param *,unsigned char * header,unsigned long size,unsigned long * len)157 static int repl_semi_reserve_header(Binlog_transmit_param *,
158 unsigned char *header, unsigned long size,
159 unsigned long *len) {
160 if (is_semi_sync_dump())
161 *len += repl_semisync->reserveSyncHeader(header, size);
162 return 0;
163 }
164
repl_semi_before_send_event(Binlog_transmit_param * param,unsigned char * packet,unsigned long,const char * log_file,my_off_t log_pos)165 static int repl_semi_before_send_event(Binlog_transmit_param *param,
166 unsigned char *packet, unsigned long,
167 const char *log_file, my_off_t log_pos) {
168 if (!is_semi_sync_dump()) return 0;
169
170 return repl_semisync->updateSyncHeader(packet, log_file, log_pos,
171 param->server_id);
172 }
173
repl_semi_after_send_event(Binlog_transmit_param * param,const char * event_buf,unsigned long,const char * skipped_log_file,my_off_t skipped_log_pos)174 static int repl_semi_after_send_event(Binlog_transmit_param *param,
175 const char *event_buf, unsigned long,
176 const char *skipped_log_file,
177 my_off_t skipped_log_pos) {
178 if (is_semi_sync_dump()) {
179 if (skipped_log_pos > 0)
180 repl_semisync->skipSlaveReply(event_buf, param->server_id,
181 skipped_log_file, skipped_log_pos);
182 else {
183 THD *thd = current_thd;
184 /*
185 Possible errors in reading slave reply are ignored deliberately
186 because we do not want dump thread to quit on this. Error
187 messages are already reported.
188 */
189 (void)repl_semisync->readSlaveReply(
190 thd->get_protocol_classic()->get_net(), event_buf);
191 thd->clear_error();
192 }
193 }
194 return 0;
195 }
196
repl_semi_reset_master(Binlog_transmit_param *)197 static int repl_semi_reset_master(Binlog_transmit_param *) {
198 if (repl_semisync->resetMaster()) return 1;
199 return 0;
200 }
201
202 /*
203 semisync system variables
204 */
205 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, SYS_VAR *var,
206 void *ptr, const void *val);
207
208 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd, SYS_VAR *var,
209 void *ptr, const void *val);
210
211 static void fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD thd, SYS_VAR *var,
212 void *ptr, const void *val);
213
214 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, SYS_VAR *var,
215 void *ptr, const void *val);
216
217 static void fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD thd,
218 SYS_VAR *var,
219 void *ptr,
220 const void *val);
221
222 static MYSQL_SYSVAR_BOOL(
223 enabled, rpl_semi_sync_master_enabled, PLUGIN_VAR_OPCMDARG,
224 "Enable semi-synchronous replication master (disabled by default). ",
225 nullptr, // check
226 &fix_rpl_semi_sync_master_enabled, // update
227 0);
228
229 static MYSQL_SYSVAR_ULONG(
230 timeout, rpl_semi_sync_master_timeout, PLUGIN_VAR_OPCMDARG,
231 "The timeout value (in ms) for semi-synchronous replication in the master",
232 nullptr, // check
233 fix_rpl_semi_sync_master_timeout, // update
234 10000, 0, ~0UL, 1);
235
236 static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
237 PLUGIN_VAR_OPCMDARG,
238 "Wait until timeout when no semi-synchronous "
239 "replication slave available (enabled by default). ",
240 nullptr, // check
241 &fix_rpl_semi_sync_master_wait_no_slave, // update
242 1);
243
244 static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
245 PLUGIN_VAR_OPCMDARG,
246 "The tracing level for semi-sync replication.",
247 nullptr, // check
248 &fix_rpl_semi_sync_master_trace_level, // update
249 32, 0, ~0UL, 1);
250
251 static const char *wait_point_names[] = {"AFTER_SYNC", "AFTER_COMMIT", NullS};
252 static TYPELIB wait_point_typelib = {array_elements(wait_point_names) - 1, "",
253 wait_point_names, nullptr};
254 static MYSQL_SYSVAR_ENUM(
255 wait_point, /* name */
256 rpl_semi_sync_master_wait_point, /* var */
257 PLUGIN_VAR_OPCMDARG, /* flags */
258 "Semisync can wait for slave ACKs at one of two points,"
259 "AFTER_SYNC or AFTER_COMMIT. AFTER_SYNC is the default value."
260 "AFTER_SYNC means that semisynchronous replication waits just after the "
261 "binary log file is flushed, but before the engine commits, and so "
262 "guarantees that no other sessions can see the data before replicated to "
263 "slave. AFTER_COMMIT means that semisynchronous replication waits just "
264 "after the engine commits. Other sessions may see the data before it is "
265 "replicated, even though the current session is still waiting for the "
266 "commit "
267 "to end successfully.",
268 nullptr, /* check() */
269 nullptr, /* update() */
270 WAIT_AFTER_SYNC, /* default */
271 &wait_point_typelib /* typelib */
272 );
273
274 static MYSQL_SYSVAR_UINT(
275 wait_for_slave_count, /* name */
276 rpl_semi_sync_master_wait_for_slave_count, /* var */
277 PLUGIN_VAR_OPCMDARG, /* flags */
278 "How many slaves the events should be replicated to. Semisynchronous "
279 "replication master will wait until all events of the transaction are "
280 "replicated to at least rpl_semi_sync_master_wait_for_slave_count slaves",
281 nullptr, /* check() */
282 &fix_rpl_semi_sync_master_wait_for_slave_count, /* update */
283 1, 1, 65535, 1);
284
285 static SYS_VAR *semi_sync_master_system_vars[] = {
286 MYSQL_SYSVAR(enabled),
287 MYSQL_SYSVAR(timeout),
288 MYSQL_SYSVAR(wait_no_slave),
289 MYSQL_SYSVAR(trace_level),
290 MYSQL_SYSVAR(wait_point),
291 MYSQL_SYSVAR(wait_for_slave_count),
292 nullptr,
293 };
fix_rpl_semi_sync_master_timeout(MYSQL_THD,SYS_VAR *,void * ptr,const void * val)294 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD, SYS_VAR *, void *ptr,
295 const void *val) {
296 *static_cast<unsigned long *>(ptr) = *static_cast<const unsigned long *>(val);
297 repl_semisync->setWaitTimeout(rpl_semi_sync_master_timeout);
298 return;
299 }
300
fix_rpl_semi_sync_master_trace_level(MYSQL_THD,SYS_VAR *,void * ptr,const void * val)301 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD, SYS_VAR *,
302 void *ptr, const void *val) {
303 *static_cast<unsigned long *>(ptr) = *static_cast<const unsigned long *>(val);
304 repl_semisync->setTraceLevel(rpl_semi_sync_master_trace_level);
305 ack_receiver->setTraceLevel(rpl_semi_sync_master_trace_level);
306 return;
307 }
308
fix_rpl_semi_sync_master_enabled(MYSQL_THD,SYS_VAR *,void * ptr,const void * val)309 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD, SYS_VAR *, void *ptr,
310 const void *val) {
311 *static_cast<char *>(ptr) = *static_cast<const char *>(val);
312 if (rpl_semi_sync_master_enabled) {
313 if (repl_semisync->enableMaster() != 0)
314 rpl_semi_sync_master_enabled = false;
315 else if (ack_receiver->start()) {
316 repl_semisync->disableMaster();
317 rpl_semi_sync_master_enabled = false;
318 }
319 } else {
320 if (repl_semisync->disableMaster() != 0)
321 rpl_semi_sync_master_enabled = true;
322 ack_receiver->stop();
323 }
324
325 return;
326 }
327
fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD,SYS_VAR *,void *,const void * val)328 static void fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD, SYS_VAR *,
329 void *,
330 const void *val) {
331 (void)repl_semisync->setWaitSlaveCount(
332 *static_cast<const unsigned int *>(val));
333 }
334
fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD,SYS_VAR *,void * ptr,const void * val)335 static void fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD, SYS_VAR *,
336 void *ptr, const void *val) {
337 if (rpl_semi_sync_master_wait_no_slave != *static_cast<const char *>(val)) {
338 *static_cast<char *>(ptr) = *static_cast<const char *>(val);
339 repl_semisync->set_wait_no_slave(val);
340 }
341 }
342
343 Trans_observer trans_observer = {
344 sizeof(Trans_observer), // len
345
346 repl_semi_report_before_dml, // before_dml
347 repl_semi_report_before_commit, // before_commit
348 repl_semi_report_before_rollback, // before_rollback
349 repl_semi_report_commit, // after_commit
350 repl_semi_report_rollback, // after_rollback
351 repl_semi_report_begin, // begin
352 };
353
354 Binlog_storage_observer storage_observer = {
355 sizeof(Binlog_storage_observer), // len
356
357 repl_semi_report_binlog_update, // report_update
358 repl_semi_report_binlog_sync, // after_sync
359 };
360
361 Binlog_transmit_observer transmit_observer = {
362 sizeof(Binlog_transmit_observer), // len
363
364 repl_semi_binlog_dump_start, // start
365 repl_semi_binlog_dump_end, // stop
366 repl_semi_reserve_header, // reserve_header
367 repl_semi_before_send_event, // before_send_event
368 repl_semi_after_send_event, // after_send_event
369 repl_semi_reset_master, // reset
370 };
371
372 #define SHOW_FNAME(name) rpl_semi_sync_master_show_##name
373
374 #define DEF_SHOW_FUNC(name, show_type) \
375 static int SHOW_FNAME(name)(MYSQL_THD, SHOW_VAR * var, char *) { \
376 repl_semisync->setExportStats(); \
377 var->type = show_type; \
378 var->value = (char *)&rpl_semi_sync_master_##name; \
379 return 0; \
380 }
381
382 DEF_SHOW_FUNC(status, SHOW_BOOL)
383 DEF_SHOW_FUNC(clients, SHOW_LONG)
384 DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
385 DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
386 DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
387 DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
388 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
389 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
390 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
391
392 /* plugin status variables */
393 static SHOW_VAR semi_sync_master_status_vars[] = {
394 {"Rpl_semi_sync_master_status", (char *)&SHOW_FNAME(status), SHOW_FUNC,
395 SHOW_SCOPE_GLOBAL},
396 {"Rpl_semi_sync_master_clients", (char *)&SHOW_FNAME(clients), SHOW_FUNC,
397 SHOW_SCOPE_GLOBAL},
398 {"Rpl_semi_sync_master_yes_tx",
399 (char *)&rpl_semi_sync_master_yes_transactions, SHOW_LONG,
400 SHOW_SCOPE_GLOBAL},
401 {"Rpl_semi_sync_master_no_tx",
402 (char *)&rpl_semi_sync_master_no_transactions, SHOW_LONG,
403 SHOW_SCOPE_GLOBAL},
404 {"Rpl_semi_sync_master_wait_sessions", (char *)&SHOW_FNAME(wait_sessions),
405 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
406 {"Rpl_semi_sync_master_no_times", (char *)&rpl_semi_sync_master_off_times,
407 SHOW_LONG, SHOW_SCOPE_GLOBAL},
408 {"Rpl_semi_sync_master_timefunc_failures",
409 (char *)&rpl_semi_sync_master_timefunc_fails, SHOW_LONG,
410 SHOW_SCOPE_GLOBAL},
411 {"Rpl_semi_sync_master_wait_pos_backtraverse",
412 (char *)&rpl_semi_sync_master_wait_pos_backtraverse, SHOW_LONG,
413 SHOW_SCOPE_GLOBAL},
414 {"Rpl_semi_sync_master_tx_wait_time", (char *)&SHOW_FNAME(trx_wait_time),
415 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
416 {"Rpl_semi_sync_master_tx_waits", (char *)&SHOW_FNAME(trx_wait_num),
417 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
418 {"Rpl_semi_sync_master_tx_avg_wait_time",
419 (char *)&SHOW_FNAME(avg_trx_wait_time), SHOW_FUNC, SHOW_SCOPE_GLOBAL},
420 {"Rpl_semi_sync_master_net_wait_time", (char *)&SHOW_FNAME(net_wait_time),
421 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
422 {"Rpl_semi_sync_master_net_waits", (char *)&SHOW_FNAME(net_wait_num),
423 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
424 {"Rpl_semi_sync_master_net_avg_wait_time",
425 (char *)&SHOW_FNAME(avg_net_wait_time), SHOW_FUNC, SHOW_SCOPE_GLOBAL},
426 {nullptr, nullptr, SHOW_LONG, SHOW_SCOPE_GLOBAL},
427 };
428
429 #ifdef HAVE_PSI_INTERFACE
430
431 PSI_mutex_key key_ss_mutex_LOCK_binlog_;
432 PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
433
434 static PSI_mutex_info all_semisync_mutexes[] = {
435 {&key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0, 0, PSI_DOCUMENT_ME},
436 {&key_ss_mutex_Ack_receiver_mutex, "Ack_receiver::m_mutex", 0, 0,
437 PSI_DOCUMENT_ME}};
438
439 PSI_cond_key key_ss_cond_COND_binlog_send_;
440 PSI_cond_key key_ss_cond_Ack_receiver_cond;
441
442 static PSI_cond_info all_semisync_conds[] = {
443 {&key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0, 0,
444 PSI_DOCUMENT_ME},
445 {&key_ss_cond_Ack_receiver_cond, "Ack_receiver::m_cond", 0, 0,
446 PSI_DOCUMENT_ME}};
447
448 PSI_thread_key key_ss_thread_Ack_receiver_thread;
449
450 static PSI_thread_info all_semisync_threads[] = {
451 {&key_ss_thread_Ack_receiver_thread, "Ack_receiver", PSI_FLAG_SINGLETON, 0,
452 PSI_DOCUMENT_ME}};
453 #endif /* HAVE_PSI_INTERFACE */
454
455 PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave = {
456 0, "Waiting for semi-sync ACK from slave", 0, PSI_DOCUMENT_ME};
457
458 PSI_stage_info stage_waiting_for_semi_sync_slave = {
459 0, "Waiting for semi-sync slave connection", 0, PSI_DOCUMENT_ME};
460
461 PSI_stage_info stage_reading_semi_sync_ack = {
462 0, "Reading semi-sync ACK from slave", 0, PSI_DOCUMENT_ME};
463
464 /* Always defined. */
465 PSI_memory_key key_ss_memory_TranxNodeAllocator_block;
466
467 #ifdef HAVE_PSI_INTERFACE
468 PSI_stage_info *all_semisync_stages[] = {
469 &stage_waiting_for_semi_sync_ack_from_slave,
470 &stage_waiting_for_semi_sync_slave, &stage_reading_semi_sync_ack};
471
472 PSI_memory_info all_semisync_memory[] = {
473 {&key_ss_memory_TranxNodeAllocator_block, "TranxNodeAllocator::block", 0, 0,
474 PSI_DOCUMENT_ME}};
475
init_semisync_psi_keys(void)476 static void init_semisync_psi_keys(void) {
477 const char *category = "semisync";
478 int count;
479
480 count = static_cast<int>(array_elements(all_semisync_mutexes));
481 mysql_mutex_register(category, all_semisync_mutexes, count);
482
483 count = static_cast<int>(array_elements(all_semisync_conds));
484 mysql_cond_register(category, all_semisync_conds, count);
485
486 count = static_cast<int>(array_elements(all_semisync_stages));
487 mysql_stage_register(category, all_semisync_stages, count);
488
489 count = static_cast<int>(array_elements(all_semisync_memory));
490 mysql_memory_register(category, all_semisync_memory, count);
491
492 count = static_cast<int>(array_elements(all_semisync_threads));
493 mysql_thread_register(category, all_semisync_threads, count);
494 }
495 #endif /* HAVE_PSI_INTERFACE */
496
semi_sync_master_plugin_init(void * p)497 static int semi_sync_master_plugin_init(void *p) {
498 // Initialize error logging service.
499 if (init_logging_service_for_plugin(®_srv, &log_bi, &log_bs)) return 1;
500
501 #ifdef HAVE_PSI_INTERFACE
502 init_semisync_psi_keys();
503 #endif
504
505 THR_RPL_SEMI_SYNC_DUMP = false;
506
507 /*
508 In case the plugin has been unloaded, and reloaded, we may need to
509 re-initialize some global variables.
510 These are initialized to zero by the linker, but may need to be
511 re-initialized
512 */
513 rpl_semi_sync_master_no_transactions = 0;
514 rpl_semi_sync_master_yes_transactions = 0;
515
516 repl_semisync = new ReplSemiSyncMaster();
517 ack_receiver = new Ack_receiver();
518
519 if (repl_semisync->initObject()) {
520 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
521 return 1;
522 }
523 if (ack_receiver->init()) {
524 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
525 return 1;
526 }
527 if (register_trans_observer(&trans_observer, p)) {
528 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
529 return 1;
530 }
531 if (register_binlog_storage_observer(&storage_observer, p)) {
532 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
533 return 1;
534 }
535 if (register_binlog_transmit_observer(&transmit_observer, p)) {
536 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
537 return 1;
538 }
539 return 0;
540 }
541
semi_sync_master_plugin_deinit(void * p)542 static int semi_sync_master_plugin_deinit(void *p) {
543 // the plugin was not initialized, there is nothing to do here
544 if (ack_receiver == nullptr || repl_semisync == nullptr) return 0;
545
546 THR_RPL_SEMI_SYNC_DUMP = false;
547
548 if (unregister_trans_observer(&trans_observer, p)) {
549 LogErr(ERROR_LEVEL, ER_SEMISYNC_UNREGISTER_TRX_OBSERVER_FAILED);
550 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
551 return 1;
552 }
553 if (unregister_binlog_storage_observer(&storage_observer, p)) {
554 LogErr(ERROR_LEVEL, ER_SEMISYNC_UNREGISTER_BINLOG_STORAGE_OBSERVER_FAILED);
555 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
556 return 1;
557 }
558 if (unregister_binlog_transmit_observer(&transmit_observer, p)) {
559 LogErr(ERROR_LEVEL, ER_SEMISYNC_UNREGISTER_BINLOG_TRANSMIT_OBSERVER_FAILED);
560 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
561 return 1;
562 }
563 delete ack_receiver;
564 ack_receiver = nullptr;
565 delete repl_semisync;
566 repl_semisync = nullptr;
567
568 LogErr(INFORMATION_LEVEL, ER_SEMISYNC_UNREGISTERED_REPLICATOR);
569 deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs);
570 return 0;
571 }
572
573 struct Mysql_replication semi_sync_master_plugin = {
574 MYSQL_REPLICATION_INTERFACE_VERSION};
575
576 /*
577 Plugin library descriptor
578 */
mysql_declare_plugin(semi_sync_master)579 mysql_declare_plugin(semi_sync_master){
580 MYSQL_REPLICATION_PLUGIN,
581 &semi_sync_master_plugin,
582 "rpl_semi_sync_master",
583 PLUGIN_AUTHOR_ORACLE,
584 "Semi-synchronous replication master",
585 PLUGIN_LICENSE_GPL,
586 semi_sync_master_plugin_init, /* Plugin Init */
587 nullptr, /* Plugin Check uninstall */
588 semi_sync_master_plugin_deinit, /* Plugin Deinit */
589 0x0100 /* 1.0 */,
590 semi_sync_master_status_vars, /* status variables */
591 semi_sync_master_system_vars, /* system variables */
592 nullptr, /* config options */
593 0, /* flags */
594 } mysql_declare_plugin_end;
595