1 /* Copyright (C) 2007 Google Inc.
2 Copyright (c) 2008, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24
25 #include "semisync_master.h"
26 #include "sql_class.h" // THD
27 #include "semisync_master_ack_receiver.h"
28
29 ReplSemiSyncMaster repl_semisync;
30 Ack_receiver ack_receiver;
31
/*
  The points at which semisync may wait for binlog ACKs. The numeric values
  are used as indexes into wait_point_names, so the order must not change.
*/
enum enum_wait_point
{
  WAIT_AFTER_SYNC= 0,    /* wait in the after_sync binlog storage hook */
  WAIT_AFTER_COMMIT= 1   /* wait in the after_commit/after_rollback hooks */
};
37
38 static ulong rpl_semi_sync_master_wait_point= WAIT_AFTER_COMMIT;
39
40 static bool SEMI_SYNC_DUMP= true;
41
42 thread_local_key_t THR_RPL_SEMI_SYNC_DUMP;
43
is_semi_sync_dump()44 static inline bool is_semi_sync_dump()
45 {
46 /*
47 The key is only set for semisync dump threads, so it just checks if
48 the key is not NULL.
49 */
50 return my_get_thread_local(THR_RPL_SEMI_SYNC_DUMP) != NULL;
51 }
52
53 C_MODE_START
54
repl_semi_report_binlog_update(Binlog_storage_param * param,const char * log_file,my_off_t log_pos)55 int repl_semi_report_binlog_update(Binlog_storage_param *param,
56 const char *log_file,
57 my_off_t log_pos)
58 {
59 int error= 0;
60
61 if (repl_semisync.getMasterEnabled())
62 {
63 /*
64 Let us store the binlog file name and the position, so that
65 we know how long to wait for the binlog to the replicated to
66 the slave in synchronous replication.
67 */
68 error= repl_semisync.writeTranxInBinlog(log_file,
69 log_pos);
70 }
71
72 return error;
73 }
74
repl_semi_report_binlog_sync(Binlog_storage_param * param,const char * log_file,my_off_t log_pos)75 int repl_semi_report_binlog_sync(Binlog_storage_param *param,
76 const char *log_file,
77 my_off_t log_pos)
78 {
79 if (rpl_semi_sync_master_wait_point == WAIT_AFTER_SYNC)
80 return repl_semisync.commitTrx(log_file, log_pos);
81 return 0;
82 }
83
repl_semi_report_before_dml(Trans_param * param,int & out)84 int repl_semi_report_before_dml(Trans_param *param, int& out)
85 {
86 return 0;
87 }
88
repl_semi_request_commit(Trans_param * param)89 int repl_semi_request_commit(Trans_param *param)
90 {
91 return 0;
92 }
93
repl_semi_report_before_commit(Trans_param * param)94 int repl_semi_report_before_commit(Trans_param *param)
95 {
96 return 0;
97 }
98
repl_semi_report_before_rollback(Trans_param * param)99 int repl_semi_report_before_rollback(Trans_param *param)
100 {
101 return 0;
102 }
103
repl_semi_report_commit(Trans_param * param)104 int repl_semi_report_commit(Trans_param *param)
105 {
106
107 bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
108
109 if (rpl_semi_sync_master_wait_point == WAIT_AFTER_COMMIT &&
110 is_real_trans && param->log_pos)
111 {
112 const char *binlog_name= param->log_file;
113 return repl_semisync.commitTrx(binlog_name, param->log_pos);
114 }
115 return 0;
116 }
117
repl_semi_report_rollback(Trans_param * param)118 int repl_semi_report_rollback(Trans_param *param)
119 {
120 return repl_semi_report_commit(param);
121 }
122
repl_semi_binlog_dump_start(Binlog_transmit_param * param,const char * log_file,my_off_t log_pos)123 int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
124 const char *log_file,
125 my_off_t log_pos)
126 {
127 long long semi_sync_slave= 0;
128
129 /*
130 semi_sync_slave will be 0 if the user variable doesn't exist. Otherwise, it
131 will be set to the value of the user variable.
132 'rpl_semi_sync_slave = 0' means that it is not a semisync slave.
133 */
134 get_user_var_int("rpl_semi_sync_slave", &semi_sync_slave, NULL);
135
136 if (semi_sync_slave != 0)
137 {
138 if (ack_receiver.add_slave(current_thd))
139 {
140 sql_print_error("Failed to register slave to semi-sync ACK receiver "
141 "thread.");
142 return -1;
143 }
144
145 my_set_thread_local(THR_RPL_SEMI_SYNC_DUMP, &SEMI_SYNC_DUMP);
146
147 /* One more semi-sync slave */
148 repl_semisync.add_slave();
149
150 /* Tell server it will observe the transmission.*/
151 param->set_observe_flag();
152
153 /*
154 Let's assume this semi-sync slave has already received all
155 binlog events before the filename and position it requests.
156 */
157 repl_semisync.handleAck(param->server_id, log_file, log_pos);
158 }
159 else
160 param->set_dont_observe_flag();
161
162 sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
163 semi_sync_slave != 0 ? "semi-sync" : "asynchronous",
164 param->server_id, log_file, (unsigned long)log_pos);
165
166 return 0;
167 }
168
repl_semi_binlog_dump_end(Binlog_transmit_param * param)169 int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
170 {
171 bool semi_sync_slave= is_semi_sync_dump();
172
173 sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
174 semi_sync_slave ? "semi-sync" : "asynchronous",
175 param->server_id);
176 if (semi_sync_slave)
177 {
178 ack_receiver.remove_slave(current_thd);
179 /* One less semi-sync slave */
180 repl_semisync.remove_slave();
181 my_set_thread_local(THR_RPL_SEMI_SYNC_DUMP, NULL);
182 }
183 return 0;
184 }
185
repl_semi_reserve_header(Binlog_transmit_param * param,unsigned char * header,unsigned long size,unsigned long * len)186 int repl_semi_reserve_header(Binlog_transmit_param *param,
187 unsigned char *header,
188 unsigned long size, unsigned long *len)
189 {
190 if (is_semi_sync_dump())
191 *len += repl_semisync.reserveSyncHeader(header, size);
192 return 0;
193 }
194
repl_semi_before_send_event(Binlog_transmit_param * param,unsigned char * packet,unsigned long len,const char * log_file,my_off_t log_pos)195 int repl_semi_before_send_event(Binlog_transmit_param *param,
196 unsigned char *packet, unsigned long len,
197 const char *log_file, my_off_t log_pos)
198 {
199 if (!is_semi_sync_dump())
200 return 0;
201
202 return repl_semisync.updateSyncHeader(packet,
203 log_file,
204 log_pos,
205 param->server_id);
206 }
207
repl_semi_after_send_event(Binlog_transmit_param * param,const char * event_buf,unsigned long len,const char * skipped_log_file,my_off_t skipped_log_pos)208 int repl_semi_after_send_event(Binlog_transmit_param *param,
209 const char *event_buf, unsigned long len,
210 const char * skipped_log_file,
211 my_off_t skipped_log_pos)
212 {
213 if (is_semi_sync_dump())
214 {
215 if(skipped_log_pos>0)
216 repl_semisync.skipSlaveReply(event_buf, param->server_id,
217 skipped_log_file, skipped_log_pos);
218 else
219 {
220 THD *thd= current_thd;
221 /*
222 Possible errors in reading slave reply are ignored deliberately
223 because we do not want dump thread to quit on this. Error
224 messages are already reported.
225 */
226 (void) repl_semisync.readSlaveReply(
227 thd->get_protocol_classic()->get_net(),
228 param->server_id, event_buf);
229 thd->clear_error();
230 }
231 }
232 return 0;
233 }
234
repl_semi_reset_master(Binlog_transmit_param * param)235 int repl_semi_reset_master(Binlog_transmit_param *param)
236 {
237 if (repl_semisync.resetMaster())
238 return 1;
239 return 0;
240 }
241
242 C_MODE_END
243
244 /*
245 semisync system variables
246 */
247 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
248 SYS_VAR *var,
249 void *ptr,
250 const void *val);
251
252 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
253 SYS_VAR *var,
254 void *ptr,
255 const void *val);
256
257 static void fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD thd,
258 SYS_VAR *var,
259 void *ptr,
260 const void *val);
261
262 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
263 SYS_VAR *var,
264 void *ptr,
265 const void *val);
266
267 static void fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD thd,
268 SYS_VAR *var,
269 void *ptr,
270 const void *val);
271
272 static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
273 PLUGIN_VAR_OPCMDARG,
274 "Enable semi-synchronous replication master (disabled by default). ",
275 NULL, // check
276 &fix_rpl_semi_sync_master_enabled, // update
277 0);
278
279 static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
280 PLUGIN_VAR_OPCMDARG,
281 "The timeout value (in ms) for semi-synchronous replication in the master",
282 NULL, // check
283 fix_rpl_semi_sync_master_timeout, // update
284 10000, 0, ~0UL, 1);
285
286 static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
287 PLUGIN_VAR_OPCMDARG,
288 "Wait until timeout when no semi-synchronous replication slave available (enabled by default). ",
289 NULL, // check
290 &fix_rpl_semi_sync_master_wait_no_slave, // update
291 1);
292
293 static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
294 PLUGIN_VAR_OPCMDARG,
295 "The tracing level for semi-sync replication.",
296 NULL, // check
297 &fix_rpl_semi_sync_master_trace_level, // update
298 32, 0, ~0UL, 1);
299
300 static const char *wait_point_names[]= {"AFTER_SYNC", "AFTER_COMMIT", NullS};
301 static TYPELIB wait_point_typelib= {
302 array_elements(wait_point_names) - 1,
303 "",
304 wait_point_names,
305 NULL
306 };
307 static MYSQL_SYSVAR_ENUM(
308 wait_point, /* name */
309 rpl_semi_sync_master_wait_point, /* var */
310 PLUGIN_VAR_OPCMDARG, /* flags */
311 "Semisync can wait for slave ACKs at one of two points,"
312 "AFTER_SYNC or AFTER_COMMIT. AFTER_SYNC is the default value."
313 "AFTER_SYNC means that semisynchronous replication waits just after the "
314 "binary log file is flushed, but before the engine commits, and so "
315 "guarantees that no other sessions can see the data before replicated to "
316 "slave. AFTER_COMMIT means that semisynchronous replication waits just "
317 "after the engine commits. Other sessions may see the data before it is "
318 "replicated, even though the current session is still waiting for the commit "
319 "to end successfully.",
320 NULL, /* check() */
321 NULL, /* update() */
322 WAIT_AFTER_SYNC, /* default */
323 &wait_point_typelib /* typelib */
324 );
325
326 static MYSQL_SYSVAR_UINT(wait_for_slave_count, /* name */
327 rpl_semi_sync_master_wait_for_slave_count, /* var */
328 PLUGIN_VAR_OPCMDARG, /* flags */
329 "How many slaves the events should be replicated to. Semisynchronous "
330 "replication master will wait until all events of the transaction are "
331 "replicated to at least rpl_semi_sync_master_wait_for_slave_count slaves",
332 NULL, /* check() */
333 &fix_rpl_semi_sync_master_wait_for_slave_count, /* update */
334 1, 1, 65535, 1);
335
336 static SYS_VAR* semi_sync_master_system_vars[]= {
337 MYSQL_SYSVAR(enabled),
338 MYSQL_SYSVAR(timeout),
339 MYSQL_SYSVAR(wait_no_slave),
340 MYSQL_SYSVAR(trace_level),
341 MYSQL_SYSVAR(wait_point),
342 MYSQL_SYSVAR(wait_for_slave_count),
343 NULL,
344 };
fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)345 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
346 SYS_VAR *var,
347 void *ptr,
348 const void *val)
349 {
350 *(unsigned long *)ptr= *(unsigned long *)val;
351 repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout);
352 return;
353 }
354
fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)355 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
356 SYS_VAR *var,
357 void *ptr,
358 const void *val)
359 {
360 *(unsigned long *)ptr= *(unsigned long *)val;
361 repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level);
362 ack_receiver.setTraceLevel(rpl_semi_sync_master_trace_level);
363 return;
364 }
365
fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)366 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
367 SYS_VAR *var,
368 void *ptr,
369 const void *val)
370 {
371 *(char *)ptr= *(char *)val;
372 if (rpl_semi_sync_master_enabled)
373 {
374 if (repl_semisync.enableMaster() != 0)
375 rpl_semi_sync_master_enabled = false;
376 else if (ack_receiver.start())
377 {
378 repl_semisync.disableMaster();
379 rpl_semi_sync_master_enabled = false;
380 }
381 }
382 else
383 {
384 if (repl_semisync.disableMaster() != 0)
385 rpl_semi_sync_master_enabled = true;
386 ack_receiver.stop();
387 }
388
389 return;
390 }
391
fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)392 static void fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD thd,
393 SYS_VAR *var,
394 void *ptr,
395 const void *val)
396 {
397 (void) repl_semisync.setWaitSlaveCount(*(unsigned int*) val);
398 return;
399 }
400
fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)401 static void fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD thd,
402 SYS_VAR *var,
403 void *ptr,
404 const void *val)
405 {
406 if (rpl_semi_sync_master_wait_no_slave != *(char *)val)
407 {
408 *(char *)ptr= *(char *)val;
409 repl_semisync.set_wait_no_slave(val);
410 }
411 return;
412 }
413
414 Trans_observer trans_observer = {
415 sizeof(Trans_observer), // len
416
417 repl_semi_report_before_dml, //before_dml
418 repl_semi_report_before_commit, // before_commit
419 repl_semi_report_before_rollback, // before_rollback
420 repl_semi_report_commit, // after_commit
421 repl_semi_report_rollback, // after_rollback
422 };
423
424 Binlog_storage_observer storage_observer = {
425 sizeof(Binlog_storage_observer), // len
426
427 repl_semi_report_binlog_update, // report_update
428 repl_semi_report_binlog_sync, // after_sync
429 };
430
431 Binlog_transmit_observer transmit_observer = {
432 sizeof(Binlog_transmit_observer), // len
433
434 repl_semi_binlog_dump_start, // start
435 repl_semi_binlog_dump_end, // stop
436 repl_semi_reserve_header, // reserve_header
437 repl_semi_before_send_event, // before_send_event
438 repl_semi_after_send_event, // after_send_event
439 repl_semi_reset_master, // reset
440 };
441
442
/* Name of the status function generated for status variable `name`. */
#define SHOW_FNAME(name) rpl_semi_sync_master_show_##name

/*
  Define the status function for `name`. The function calls
  repl_semisync.setExportStats(), then makes `var` point at
  rpl_semi_sync_master_<name> with type `show_type`. It always returns 0.
*/
#define DEF_SHOW_FUNC(name, show_type)                                    \
  static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff)   \
  {                                                                       \
    repl_semisync.setExportStats();                                       \
    var->type= show_type;                                                 \
    var->value= (char *)&rpl_semi_sync_master_##name;                     \
    return 0;                                                             \
  }
454
455 DEF_SHOW_FUNC(status, SHOW_BOOL)
456 DEF_SHOW_FUNC(clients, SHOW_LONG)
457 DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
458 DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
459 DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
460 DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
461 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
462 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
463 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
464
465
466 /* plugin status variables */
467 static SHOW_VAR semi_sync_master_status_vars[]= {
468 {"Rpl_semi_sync_master_status",
469 (char*) &SHOW_FNAME(status),
470 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
471 {"Rpl_semi_sync_master_clients",
472 (char*) &SHOW_FNAME(clients),
473 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
474 {"Rpl_semi_sync_master_yes_tx",
475 (char*) &rpl_semi_sync_master_yes_transactions,
476 SHOW_LONG, SHOW_SCOPE_GLOBAL},
477 {"Rpl_semi_sync_master_no_tx",
478 (char*) &rpl_semi_sync_master_no_transactions,
479 SHOW_LONG, SHOW_SCOPE_GLOBAL},
480 {"Rpl_semi_sync_master_wait_sessions",
481 (char*) &SHOW_FNAME(wait_sessions),
482 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
483 {"Rpl_semi_sync_master_no_times",
484 (char*) &rpl_semi_sync_master_off_times,
485 SHOW_LONG, SHOW_SCOPE_GLOBAL},
486 {"Rpl_semi_sync_master_timefunc_failures",
487 (char*) &rpl_semi_sync_master_timefunc_fails,
488 SHOW_LONG, SHOW_SCOPE_GLOBAL},
489 {"Rpl_semi_sync_master_wait_pos_backtraverse",
490 (char*) &rpl_semi_sync_master_wait_pos_backtraverse,
491 SHOW_LONG, SHOW_SCOPE_GLOBAL},
492 {"Rpl_semi_sync_master_tx_wait_time",
493 (char*) &SHOW_FNAME(trx_wait_time),
494 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
495 {"Rpl_semi_sync_master_tx_waits",
496 (char*) &SHOW_FNAME(trx_wait_num),
497 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
498 {"Rpl_semi_sync_master_tx_avg_wait_time",
499 (char*) &SHOW_FNAME(avg_trx_wait_time),
500 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
501 {"Rpl_semi_sync_master_net_wait_time",
502 (char*) &SHOW_FNAME(net_wait_time),
503 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
504 {"Rpl_semi_sync_master_net_waits",
505 (char*) &SHOW_FNAME(net_wait_num),
506 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
507 {"Rpl_semi_sync_master_net_avg_wait_time",
508 (char*) &SHOW_FNAME(avg_net_wait_time),
509 SHOW_FUNC, SHOW_SCOPE_GLOBAL},
510 {NULL, NULL, SHOW_LONG, SHOW_SCOPE_GLOBAL},
511 };
512
513 #ifdef HAVE_PSI_INTERFACE
514
515 PSI_mutex_key key_ss_mutex_LOCK_binlog_;
516 PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
517
518 static PSI_mutex_info all_semisync_mutexes[]=
519 {
520 { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0},
521 { &key_ss_mutex_Ack_receiver_mutex, "Ack_receiver::m_mutex", 0}
522 };
523
524 PSI_cond_key key_ss_cond_COND_binlog_send_;
525 PSI_cond_key key_ss_cond_Ack_receiver_cond;
526
527 static PSI_cond_info all_semisync_conds[]=
528 {
529 { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0},
530 { &key_ss_cond_Ack_receiver_cond, "Ack_receiver::m_cond", 0}
531 };
532
533 PSI_thread_key key_ss_thread_Ack_receiver_thread;
534
535 static PSI_thread_info all_semisync_threads[]=
536 {
537 {&key_ss_thread_Ack_receiver_thread, "Ack_receiver", PSI_FLAG_GLOBAL}
538 };
539 #endif /* HAVE_PSI_INTERFACE */
540
541 PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
542 { 0, "Waiting for semi-sync ACK from slave", 0};
543
544 PSI_stage_info stage_waiting_for_semi_sync_slave=
545 { 0, "Waiting for semi-sync slave connection", 0};
546
547 PSI_stage_info stage_reading_semi_sync_ack=
548 { 0, "Reading semi-sync ACK from slave", 0};
549
550 /* Always defined. */
551 PSI_memory_key key_ss_memory_TranxNodeAllocator_block;
552
553 #ifdef HAVE_PSI_INTERFACE
554 PSI_stage_info *all_semisync_stages[]=
555 {
556 & stage_waiting_for_semi_sync_ack_from_slave,
557 & stage_waiting_for_semi_sync_slave,
558 & stage_reading_semi_sync_ack
559 };
560
561 PSI_memory_info all_semisync_memory[]=
562 {
563 {&key_ss_memory_TranxNodeAllocator_block, "TranxNodeAllocator::block", 0}
564 };
565
init_semisync_psi_keys(void)566 static void init_semisync_psi_keys(void)
567 {
568 const char* category= "semisync";
569 int count;
570
571 count= array_elements(all_semisync_mutexes);
572 mysql_mutex_register(category, all_semisync_mutexes, count);
573
574 count= array_elements(all_semisync_conds);
575 mysql_cond_register(category, all_semisync_conds, count);
576
577 count= array_elements(all_semisync_stages);
578 mysql_stage_register(category, all_semisync_stages, count);
579
580 count= array_elements(all_semisync_memory);
581 mysql_memory_register(category, all_semisync_memory, count);
582
583 count= array_elements(all_semisync_threads);
584 mysql_thread_register(category, all_semisync_threads, count);
585 }
586 #endif /* HAVE_PSI_INTERFACE */
587
semi_sync_master_plugin_init(void * p)588 static int semi_sync_master_plugin_init(void *p)
589 {
590 #ifdef HAVE_PSI_INTERFACE
591 init_semisync_psi_keys();
592 #endif
593
594 my_create_thread_local_key(&THR_RPL_SEMI_SYNC_DUMP, NULL);
595
596 if (repl_semisync.initObject())
597 return 1;
598 if (ack_receiver.init())
599 return 1;
600 if (register_trans_observer(&trans_observer, p))
601 return 1;
602 if (register_binlog_storage_observer(&storage_observer, p))
603 return 1;
604 if (register_binlog_transmit_observer(&transmit_observer, p))
605 return 1;
606 return 0;
607 }
608
semi_sync_master_plugin_deinit(void * p)609 static int semi_sync_master_plugin_deinit(void *p)
610 {
611 ack_receiver.stop();
612 my_delete_thread_local_key(THR_RPL_SEMI_SYNC_DUMP);
613
614 if (unregister_trans_observer(&trans_observer, p))
615 {
616 sql_print_error("unregister_trans_observer failed");
617 return 1;
618 }
619 if (unregister_binlog_storage_observer(&storage_observer, p))
620 {
621 sql_print_error("unregister_binlog_storage_observer failed");
622 return 1;
623 }
624 if (unregister_binlog_transmit_observer(&transmit_observer, p))
625 {
626 sql_print_error("unregister_binlog_transmit_observer failed");
627 return 1;
628 }
629
630 sql_print_information("unregister_replicator OK");
631 return 0;
632 }
633
634 struct Mysql_replication semi_sync_master_plugin= {
635 MYSQL_REPLICATION_INTERFACE_VERSION
636 };
637
638 /*
639 Plugin library descriptor
640 */
mysql_declare_plugin(semi_sync_master)641 mysql_declare_plugin(semi_sync_master)
642 {
643 MYSQL_REPLICATION_PLUGIN,
644 &semi_sync_master_plugin,
645 "rpl_semi_sync_master",
646 "He Zhenxing",
647 "Semi-synchronous replication master",
648 PLUGIN_LICENSE_GPL,
649 semi_sync_master_plugin_init, /* Plugin Init */
650 semi_sync_master_plugin_deinit, /* Plugin Deinit */
651 0x0100 /* 1.0 */,
652 semi_sync_master_status_vars, /* status variables */
653 semi_sync_master_system_vars, /* system variables */
654 NULL, /* config options */
655 0, /* flags */
656 }
657 mysql_declare_plugin_end;
658