1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 #include <stddef.h>
25 #include <sys/types.h>
26 
27 #include "my_inttypes.h"
28 #include "my_macros.h"
29 #include "my_psi_config.h"
30 #include "mysql/psi/mysql_memory.h"
31 #include "mysql/psi/mysql_stage.h"
32 #include "plugin/semisync/semisync_master.h"
33 #include "plugin/semisync/semisync_master_ack_receiver.h"
34 #include "sql/current_thd.h"
35 #include "sql/protocol_classic.h"
36 #include "sql/sql_class.h"  // THD
37 #include "typelib.h"
38 
39 ReplSemiSyncMaster *repl_semisync = nullptr;
40 Ack_receiver *ack_receiver = nullptr;
41 
42 /* The places at where semisync waits for binlog ACKs. */
43 enum enum_wait_point { WAIT_AFTER_SYNC, WAIT_AFTER_COMMIT };
44 
45 static ulong rpl_semi_sync_master_wait_point = WAIT_AFTER_COMMIT;
46 
47 thread_local bool THR_RPL_SEMI_SYNC_DUMP = false;
48 
49 static SERVICE_TYPE(registry) *reg_srv = nullptr;
50 SERVICE_TYPE(log_builtins) *log_bi = nullptr;
51 SERVICE_TYPE(log_builtins_string) *log_bs = nullptr;
52 
is_semi_sync_dump()53 static inline bool is_semi_sync_dump() { return THR_RPL_SEMI_SYNC_DUMP; }
54 
repl_semi_report_binlog_update(Binlog_storage_param *,const char * log_file,my_off_t log_pos)55 static int repl_semi_report_binlog_update(Binlog_storage_param *,
56                                           const char *log_file,
57                                           my_off_t log_pos) {
58   int error = 0;
59 
60   if (repl_semisync->getMasterEnabled()) {
61     /*
62       Let us store the binlog file name and the position, so that
63       we know how long to wait for the binlog to the replicated to
64       the slave in synchronous replication.
65     */
66     error = repl_semisync->writeTranxInBinlog(log_file, log_pos);
67   }
68 
69   return error;
70 }
71 
repl_semi_report_binlog_sync(Binlog_storage_param *,const char * log_file,my_off_t log_pos)72 static int repl_semi_report_binlog_sync(Binlog_storage_param *,
73                                         const char *log_file,
74                                         my_off_t log_pos) {
75   if (rpl_semi_sync_master_wait_point == WAIT_AFTER_SYNC)
76     return repl_semisync->commitTrx(log_file, log_pos);
77   return 0;
78 }
79 
repl_semi_report_before_dml(Trans_param *,int &)80 static int repl_semi_report_before_dml(Trans_param *, int &) { return 0; }
81 
repl_semi_report_before_commit(Trans_param *)82 static int repl_semi_report_before_commit(Trans_param *) { return 0; }
83 
repl_semi_report_before_rollback(Trans_param *)84 static int repl_semi_report_before_rollback(Trans_param *) { return 0; }
85 
repl_semi_report_commit(Trans_param * param)86 static int repl_semi_report_commit(Trans_param *param) {
87   bool is_real_trans = param->flags & TRANS_IS_REAL_TRANS;
88 
89   if (rpl_semi_sync_master_wait_point == WAIT_AFTER_COMMIT && is_real_trans &&
90       param->log_pos) {
91     const char *binlog_name = param->log_file;
92     return repl_semisync->commitTrx(binlog_name, param->log_pos);
93   }
94   return 0;
95 }
96 
repl_semi_report_rollback(Trans_param * param)97 static int repl_semi_report_rollback(Trans_param *param) {
98   return repl_semi_report_commit(param);
99 }
100 
repl_semi_report_begin(Trans_param *,int &)101 static int repl_semi_report_begin(Trans_param *, int &) { return 0; }
102 
repl_semi_binlog_dump_start(Binlog_transmit_param * param,const char * log_file,my_off_t log_pos)103 static int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
104                                        const char *log_file, my_off_t log_pos) {
105   long long semi_sync_slave = 0;
106 
107   /*
108     semi_sync_slave will be 0 if the user variable doesn't exist. Otherwise, it
109     will be set to the value of the user variable.
110     'rpl_semi_sync_slave = 0' means that it is not a semisync slave.
111   */
112   get_user_var_int("rpl_semi_sync_slave", &semi_sync_slave, nullptr);
113 
114   if (semi_sync_slave != 0) {
115     if (ack_receiver->add_slave(current_thd)) {
116       LogErr(ERROR_LEVEL, ER_SEMISYNC_FAILED_REGISTER_SLAVE_TO_RECEIVER);
117       return -1;
118     }
119 
120     THR_RPL_SEMI_SYNC_DUMP = true;
121 
122     /* One more semi-sync slave */
123     repl_semisync->add_slave();
124 
125     /* Tell server it will observe the transmission.*/
126     param->set_observe_flag();
127 
128     /*
129       Let's assume this semi-sync slave has already received all
130       binlog events before the filename and position it requests.
131     */
132     repl_semisync->handleAck(param->server_id, log_file, log_pos);
133   } else
134     param->set_dont_observe_flag();
135 
136   LogErr(INFORMATION_LEVEL, ER_SEMISYNC_START_BINLOG_DUMP_TO_SLAVE,
137          semi_sync_slave != 0 ? "semi-sync" : "asynchronous", param->server_id,
138          log_file, (unsigned long)log_pos);
139   return 0;
140 }
141 
repl_semi_binlog_dump_end(Binlog_transmit_param * param)142 static int repl_semi_binlog_dump_end(Binlog_transmit_param *param) {
143   bool semi_sync_slave = is_semi_sync_dump();
144 
145   LogErr(INFORMATION_LEVEL, ER_SEMISYNC_STOP_BINLOG_DUMP_TO_SLAVE,
146          semi_sync_slave ? "semi-sync" : "asynchronous", param->server_id);
147 
148   if (semi_sync_slave) {
149     ack_receiver->remove_slave(current_thd);
150     /* One less semi-sync slave */
151     repl_semisync->remove_slave();
152     THR_RPL_SEMI_SYNC_DUMP = false;
153   }
154   return 0;
155 }
156 
repl_semi_reserve_header(Binlog_transmit_param *,unsigned char * header,unsigned long size,unsigned long * len)157 static int repl_semi_reserve_header(Binlog_transmit_param *,
158                                     unsigned char *header, unsigned long size,
159                                     unsigned long *len) {
160   if (is_semi_sync_dump())
161     *len += repl_semisync->reserveSyncHeader(header, size);
162   return 0;
163 }
164 
repl_semi_before_send_event(Binlog_transmit_param * param,unsigned char * packet,unsigned long,const char * log_file,my_off_t log_pos)165 static int repl_semi_before_send_event(Binlog_transmit_param *param,
166                                        unsigned char *packet, unsigned long,
167                                        const char *log_file, my_off_t log_pos) {
168   if (!is_semi_sync_dump()) return 0;
169 
170   return repl_semisync->updateSyncHeader(packet, log_file, log_pos,
171                                          param->server_id);
172 }
173 
repl_semi_after_send_event(Binlog_transmit_param * param,const char * event_buf,unsigned long,const char * skipped_log_file,my_off_t skipped_log_pos)174 static int repl_semi_after_send_event(Binlog_transmit_param *param,
175                                       const char *event_buf, unsigned long,
176                                       const char *skipped_log_file,
177                                       my_off_t skipped_log_pos) {
178   if (is_semi_sync_dump()) {
179     if (skipped_log_pos > 0)
180       repl_semisync->skipSlaveReply(event_buf, param->server_id,
181                                     skipped_log_file, skipped_log_pos);
182     else {
183       THD *thd = current_thd;
184       /*
185         Possible errors in reading slave reply are ignored deliberately
186         because we do not want dump thread to quit on this. Error
187         messages are already reported.
188       */
189       (void)repl_semisync->readSlaveReply(
190           thd->get_protocol_classic()->get_net(), event_buf);
191       thd->clear_error();
192     }
193   }
194   return 0;
195 }
196 
repl_semi_reset_master(Binlog_transmit_param *)197 static int repl_semi_reset_master(Binlog_transmit_param *) {
198   if (repl_semisync->resetMaster()) return 1;
199   return 0;
200 }
201 
202 /*
203   semisync system variables
204  */
205 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, SYS_VAR *var,
206                                              void *ptr, const void *val);
207 
208 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd, SYS_VAR *var,
209                                                  void *ptr, const void *val);
210 
211 static void fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD thd, SYS_VAR *var,
212                                                    void *ptr, const void *val);
213 
214 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, SYS_VAR *var,
215                                              void *ptr, const void *val);
216 
217 static void fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD thd,
218                                                           SYS_VAR *var,
219                                                           void *ptr,
220                                                           const void *val);
221 
222 static MYSQL_SYSVAR_BOOL(
223     enabled, rpl_semi_sync_master_enabled, PLUGIN_VAR_OPCMDARG,
224     "Enable semi-synchronous replication master (disabled by default). ",
225     nullptr,                            // check
226     &fix_rpl_semi_sync_master_enabled,  // update
227     0);
228 
229 static MYSQL_SYSVAR_ULONG(
230     timeout, rpl_semi_sync_master_timeout, PLUGIN_VAR_OPCMDARG,
231     "The timeout value (in ms) for semi-synchronous replication in the master",
232     nullptr,                           // check
233     fix_rpl_semi_sync_master_timeout,  // update
234     10000, 0, ~0UL, 1);
235 
236 static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
237                          PLUGIN_VAR_OPCMDARG,
238                          "Wait until timeout when no semi-synchronous "
239                          "replication slave available (enabled by default). ",
240                          nullptr,                                  // check
241                          &fix_rpl_semi_sync_master_wait_no_slave,  // update
242                          1);
243 
244 static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
245                           PLUGIN_VAR_OPCMDARG,
246                           "The tracing level for semi-sync replication.",
247                           nullptr,                                // check
248                           &fix_rpl_semi_sync_master_trace_level,  // update
249                           32, 0, ~0UL, 1);
250 
251 static const char *wait_point_names[] = {"AFTER_SYNC", "AFTER_COMMIT", NullS};
252 static TYPELIB wait_point_typelib = {array_elements(wait_point_names) - 1, "",
253                                      wait_point_names, nullptr};
254 static MYSQL_SYSVAR_ENUM(
255     wait_point,                      /* name     */
256     rpl_semi_sync_master_wait_point, /* var      */
257     PLUGIN_VAR_OPCMDARG,             /* flags    */
258     "Semisync can wait for slave ACKs at one of two points,"
259     "AFTER_SYNC or AFTER_COMMIT. AFTER_SYNC is the default value."
260     "AFTER_SYNC means that semisynchronous replication waits just after the "
261     "binary log file is flushed, but before the engine commits, and so "
262     "guarantees that no other sessions can see the data before replicated to "
263     "slave. AFTER_COMMIT means that semisynchronous replication waits just "
264     "after the engine commits. Other sessions may see the data before it is "
265     "replicated, even though the current session is still waiting for the "
266     "commit "
267     "to end successfully.",
268     nullptr,            /* check()  */
269     nullptr,            /* update() */
270     WAIT_AFTER_SYNC,    /* default  */
271     &wait_point_typelib /* typelib  */
272 );
273 
274 static MYSQL_SYSVAR_UINT(
275     wait_for_slave_count,                      /* name  */
276     rpl_semi_sync_master_wait_for_slave_count, /* var   */
277     PLUGIN_VAR_OPCMDARG,                       /* flags */
278     "How many slaves the events should be replicated to. Semisynchronous "
279     "replication master will wait until all events of the transaction are "
280     "replicated to at least rpl_semi_sync_master_wait_for_slave_count slaves",
281     nullptr,                                        /* check() */
282     &fix_rpl_semi_sync_master_wait_for_slave_count, /* update */
283     1, 1, 65535, 1);
284 
285 static SYS_VAR *semi_sync_master_system_vars[] = {
286     MYSQL_SYSVAR(enabled),
287     MYSQL_SYSVAR(timeout),
288     MYSQL_SYSVAR(wait_no_slave),
289     MYSQL_SYSVAR(trace_level),
290     MYSQL_SYSVAR(wait_point),
291     MYSQL_SYSVAR(wait_for_slave_count),
292     nullptr,
293 };
fix_rpl_semi_sync_master_timeout(MYSQL_THD,SYS_VAR *,void * ptr,const void * val)294 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD, SYS_VAR *, void *ptr,
295                                              const void *val) {
296   *static_cast<unsigned long *>(ptr) = *static_cast<const unsigned long *>(val);
297   repl_semisync->setWaitTimeout(rpl_semi_sync_master_timeout);
298   return;
299 }
300 
fix_rpl_semi_sync_master_trace_level(MYSQL_THD,SYS_VAR *,void * ptr,const void * val)301 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD, SYS_VAR *,
302                                                  void *ptr, const void *val) {
303   *static_cast<unsigned long *>(ptr) = *static_cast<const unsigned long *>(val);
304   repl_semisync->setTraceLevel(rpl_semi_sync_master_trace_level);
305   ack_receiver->setTraceLevel(rpl_semi_sync_master_trace_level);
306   return;
307 }
308 
fix_rpl_semi_sync_master_enabled(MYSQL_THD,SYS_VAR *,void * ptr,const void * val)309 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD, SYS_VAR *, void *ptr,
310                                              const void *val) {
311   *static_cast<char *>(ptr) = *static_cast<const char *>(val);
312   if (rpl_semi_sync_master_enabled) {
313     if (repl_semisync->enableMaster() != 0)
314       rpl_semi_sync_master_enabled = false;
315     else if (ack_receiver->start()) {
316       repl_semisync->disableMaster();
317       rpl_semi_sync_master_enabled = false;
318     }
319   } else {
320     if (repl_semisync->disableMaster() != 0)
321       rpl_semi_sync_master_enabled = true;
322     ack_receiver->stop();
323   }
324 
325   return;
326 }
327 
fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD,SYS_VAR *,void *,const void * val)328 static void fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD, SYS_VAR *,
329                                                           void *,
330                                                           const void *val) {
331   (void)repl_semisync->setWaitSlaveCount(
332       *static_cast<const unsigned int *>(val));
333 }
334 
fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD,SYS_VAR *,void * ptr,const void * val)335 static void fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD, SYS_VAR *,
336                                                    void *ptr, const void *val) {
337   if (rpl_semi_sync_master_wait_no_slave != *static_cast<const char *>(val)) {
338     *static_cast<char *>(ptr) = *static_cast<const char *>(val);
339     repl_semisync->set_wait_no_slave(val);
340   }
341 }
342 
343 Trans_observer trans_observer = {
344     sizeof(Trans_observer),  // len
345 
346     repl_semi_report_before_dml,       // before_dml
347     repl_semi_report_before_commit,    // before_commit
348     repl_semi_report_before_rollback,  // before_rollback
349     repl_semi_report_commit,           // after_commit
350     repl_semi_report_rollback,         // after_rollback
351     repl_semi_report_begin,            // begin
352 };
353 
354 Binlog_storage_observer storage_observer = {
355     sizeof(Binlog_storage_observer),  // len
356 
357     repl_semi_report_binlog_update,  // report_update
358     repl_semi_report_binlog_sync,    // after_sync
359 };
360 
361 Binlog_transmit_observer transmit_observer = {
362     sizeof(Binlog_transmit_observer),  // len
363 
364     repl_semi_binlog_dump_start,  // start
365     repl_semi_binlog_dump_end,    // stop
366     repl_semi_reserve_header,     // reserve_header
367     repl_semi_before_send_event,  // before_send_event
368     repl_semi_after_send_event,   // after_send_event
369     repl_semi_reset_master,       // reset
370 };
371 
372 #define SHOW_FNAME(name) rpl_semi_sync_master_show_##name
373 
374 #define DEF_SHOW_FUNC(name, show_type)                             \
375   static int SHOW_FNAME(name)(MYSQL_THD, SHOW_VAR * var, char *) { \
376     repl_semisync->setExportStats();                               \
377     var->type = show_type;                                         \
378     var->value = (char *)&rpl_semi_sync_master_##name;             \
379     return 0;                                                      \
380   }
381 
382 DEF_SHOW_FUNC(status, SHOW_BOOL)
383 DEF_SHOW_FUNC(clients, SHOW_LONG)
384 DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
385 DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
386 DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
387 DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
388 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
389 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
390 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
391 
392 /* plugin status variables */
393 static SHOW_VAR semi_sync_master_status_vars[] = {
394     {"Rpl_semi_sync_master_status", (char *)&SHOW_FNAME(status), SHOW_FUNC,
395      SHOW_SCOPE_GLOBAL},
396     {"Rpl_semi_sync_master_clients", (char *)&SHOW_FNAME(clients), SHOW_FUNC,
397      SHOW_SCOPE_GLOBAL},
398     {"Rpl_semi_sync_master_yes_tx",
399      (char *)&rpl_semi_sync_master_yes_transactions, SHOW_LONG,
400      SHOW_SCOPE_GLOBAL},
401     {"Rpl_semi_sync_master_no_tx",
402      (char *)&rpl_semi_sync_master_no_transactions, SHOW_LONG,
403      SHOW_SCOPE_GLOBAL},
404     {"Rpl_semi_sync_master_wait_sessions", (char *)&SHOW_FNAME(wait_sessions),
405      SHOW_FUNC, SHOW_SCOPE_GLOBAL},
406     {"Rpl_semi_sync_master_no_times", (char *)&rpl_semi_sync_master_off_times,
407      SHOW_LONG, SHOW_SCOPE_GLOBAL},
408     {"Rpl_semi_sync_master_timefunc_failures",
409      (char *)&rpl_semi_sync_master_timefunc_fails, SHOW_LONG,
410      SHOW_SCOPE_GLOBAL},
411     {"Rpl_semi_sync_master_wait_pos_backtraverse",
412      (char *)&rpl_semi_sync_master_wait_pos_backtraverse, SHOW_LONG,
413      SHOW_SCOPE_GLOBAL},
414     {"Rpl_semi_sync_master_tx_wait_time", (char *)&SHOW_FNAME(trx_wait_time),
415      SHOW_FUNC, SHOW_SCOPE_GLOBAL},
416     {"Rpl_semi_sync_master_tx_waits", (char *)&SHOW_FNAME(trx_wait_num),
417      SHOW_FUNC, SHOW_SCOPE_GLOBAL},
418     {"Rpl_semi_sync_master_tx_avg_wait_time",
419      (char *)&SHOW_FNAME(avg_trx_wait_time), SHOW_FUNC, SHOW_SCOPE_GLOBAL},
420     {"Rpl_semi_sync_master_net_wait_time", (char *)&SHOW_FNAME(net_wait_time),
421      SHOW_FUNC, SHOW_SCOPE_GLOBAL},
422     {"Rpl_semi_sync_master_net_waits", (char *)&SHOW_FNAME(net_wait_num),
423      SHOW_FUNC, SHOW_SCOPE_GLOBAL},
424     {"Rpl_semi_sync_master_net_avg_wait_time",
425      (char *)&SHOW_FNAME(avg_net_wait_time), SHOW_FUNC, SHOW_SCOPE_GLOBAL},
426     {nullptr, nullptr, SHOW_LONG, SHOW_SCOPE_GLOBAL},
427 };
428 
#ifdef HAVE_PSI_INTERFACE

/* Instrumentation keys and descriptors for the plugin's mutexes. */
PSI_mutex_key key_ss_mutex_LOCK_binlog_;
PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;

static PSI_mutex_info all_semisync_mutexes[] = {
    {&key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0, 0, PSI_DOCUMENT_ME},
    {&key_ss_mutex_Ack_receiver_mutex, "Ack_receiver::m_mutex", 0, 0,
     PSI_DOCUMENT_ME},
};

/* Instrumentation keys and descriptors for the plugin's condition variables. */
PSI_cond_key key_ss_cond_COND_binlog_send_;
PSI_cond_key key_ss_cond_Ack_receiver_cond;

static PSI_cond_info all_semisync_conds[] = {
    {&key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0, 0,
     PSI_DOCUMENT_ME},
    {&key_ss_cond_Ack_receiver_cond, "Ack_receiver::m_cond", 0, 0,
     PSI_DOCUMENT_ME},
};

/* Instrumentation key and descriptor for the ACK receiver thread. */
PSI_thread_key key_ss_thread_Ack_receiver_thread;

static PSI_thread_info all_semisync_threads[] = {
    {&key_ss_thread_Ack_receiver_thread, "Ack_receiver", PSI_FLAG_SINGLETON, 0,
     PSI_DOCUMENT_ME},
};
#endif /* HAVE_PSI_INTERFACE */
454 
455 PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave = {
456     0, "Waiting for semi-sync ACK from slave", 0, PSI_DOCUMENT_ME};
457 
458 PSI_stage_info stage_waiting_for_semi_sync_slave = {
459     0, "Waiting for semi-sync slave connection", 0, PSI_DOCUMENT_ME};
460 
461 PSI_stage_info stage_reading_semi_sync_ack = {
462     0, "Reading semi-sync ACK from slave", 0, PSI_DOCUMENT_ME};
463 
464 /* Always defined. */
465 PSI_memory_key key_ss_memory_TranxNodeAllocator_block;
466 
467 #ifdef HAVE_PSI_INTERFACE
468 PSI_stage_info *all_semisync_stages[] = {
469     &stage_waiting_for_semi_sync_ack_from_slave,
470     &stage_waiting_for_semi_sync_slave, &stage_reading_semi_sync_ack};
471 
472 PSI_memory_info all_semisync_memory[] = {
473     {&key_ss_memory_TranxNodeAllocator_block, "TranxNodeAllocator::block", 0, 0,
474      PSI_DOCUMENT_ME}};
475 
init_semisync_psi_keys(void)476 static void init_semisync_psi_keys(void) {
477   const char *category = "semisync";
478   int count;
479 
480   count = static_cast<int>(array_elements(all_semisync_mutexes));
481   mysql_mutex_register(category, all_semisync_mutexes, count);
482 
483   count = static_cast<int>(array_elements(all_semisync_conds));
484   mysql_cond_register(category, all_semisync_conds, count);
485 
486   count = static_cast<int>(array_elements(all_semisync_stages));
487   mysql_stage_register(category, all_semisync_stages, count);
488 
489   count = static_cast<int>(array_elements(all_semisync_memory));
490   mysql_memory_register(category, all_semisync_memory, count);
491 
492   count = static_cast<int>(array_elements(all_semisync_threads));
493   mysql_thread_register(category, all_semisync_threads, count);
494 }
495 #endif /* HAVE_PSI_INTERFACE */
496 
semi_sync_master_plugin_init(void * p)497 static int semi_sync_master_plugin_init(void *p) {
498   // Initialize error logging service.
499   if (init_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs)) return 1;
500 
501 #ifdef HAVE_PSI_INTERFACE
502   init_semisync_psi_keys();
503 #endif
504 
505   THR_RPL_SEMI_SYNC_DUMP = false;
506 
507   /*
508     In case the plugin has been unloaded, and reloaded, we may need to
509     re-initialize some global variables.
510     These are initialized to zero by the linker, but may need to be
511     re-initialized
512   */
513   rpl_semi_sync_master_no_transactions = 0;
514   rpl_semi_sync_master_yes_transactions = 0;
515 
516   repl_semisync = new ReplSemiSyncMaster();
517   ack_receiver = new Ack_receiver();
518 
519   if (repl_semisync->initObject()) {
520     deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
521     return 1;
522   }
523   if (ack_receiver->init()) {
524     deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
525     return 1;
526   }
527   if (register_trans_observer(&trans_observer, p)) {
528     deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
529     return 1;
530   }
531   if (register_binlog_storage_observer(&storage_observer, p)) {
532     deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
533     return 1;
534   }
535   if (register_binlog_transmit_observer(&transmit_observer, p)) {
536     deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
537     return 1;
538   }
539   return 0;
540 }
541 
semi_sync_master_plugin_deinit(void * p)542 static int semi_sync_master_plugin_deinit(void *p) {
543   // the plugin was not initialized, there is nothing to do here
544   if (ack_receiver == nullptr || repl_semisync == nullptr) return 0;
545 
546   THR_RPL_SEMI_SYNC_DUMP = false;
547 
548   if (unregister_trans_observer(&trans_observer, p)) {
549     LogErr(ERROR_LEVEL, ER_SEMISYNC_UNREGISTER_TRX_OBSERVER_FAILED);
550     deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
551     return 1;
552   }
553   if (unregister_binlog_storage_observer(&storage_observer, p)) {
554     LogErr(ERROR_LEVEL, ER_SEMISYNC_UNREGISTER_BINLOG_STORAGE_OBSERVER_FAILED);
555     deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
556     return 1;
557   }
558   if (unregister_binlog_transmit_observer(&transmit_observer, p)) {
559     LogErr(ERROR_LEVEL, ER_SEMISYNC_UNREGISTER_BINLOG_TRANSMIT_OBSERVER_FAILED);
560     deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
561     return 1;
562   }
563   delete ack_receiver;
564   ack_receiver = nullptr;
565   delete repl_semisync;
566   repl_semisync = nullptr;
567 
568   LogErr(INFORMATION_LEVEL, ER_SEMISYNC_UNREGISTERED_REPLICATOR);
569   deinit_logging_service_for_plugin(&reg_srv, &log_bi, &log_bs);
570   return 0;
571 }
572 
573 struct Mysql_replication semi_sync_master_plugin = {
574     MYSQL_REPLICATION_INTERFACE_VERSION};
575 
576 /*
577   Plugin library descriptor
578 */
mysql_declare_plugin(semi_sync_master)579 mysql_declare_plugin(semi_sync_master){
580     MYSQL_REPLICATION_PLUGIN,
581     &semi_sync_master_plugin,
582     "rpl_semi_sync_master",
583     PLUGIN_AUTHOR_ORACLE,
584     "Semi-synchronous replication master",
585     PLUGIN_LICENSE_GPL,
586     semi_sync_master_plugin_init,   /* Plugin Init */
587     nullptr,                        /* Plugin Check uninstall */
588     semi_sync_master_plugin_deinit, /* Plugin Deinit */
589     0x0100 /* 1.0 */,
590     semi_sync_master_status_vars, /* status variables */
591     semi_sync_master_system_vars, /* system variables */
592     nullptr,                      /* config options */
593     0,                            /* flags */
594 } mysql_declare_plugin_end;
595