1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
23 
24 
25 #include "semisync_master.h"
26 #include "sql_class.h"                          // THD
27 #include "semisync_master_ack_receiver.h"
28 
29 ReplSemiSyncMaster repl_semisync;
30 Ack_receiver ack_receiver;
31 
32 /* The places at where semisync waits for binlog ACKs. */
33 enum enum_wait_point {
34   WAIT_AFTER_SYNC,
35   WAIT_AFTER_COMMIT
36 };
37 
38 static ulong rpl_semi_sync_master_wait_point= WAIT_AFTER_COMMIT;
39 
40 static bool SEMI_SYNC_DUMP= true;
41 
42 thread_local_key_t THR_RPL_SEMI_SYNC_DUMP;
43 
is_semi_sync_dump()44 static inline bool is_semi_sync_dump()
45 {
46   /*
47     The key is only set for semisync dump threads, so it just checks if
48     the key is not NULL.
49   */
50   return my_get_thread_local(THR_RPL_SEMI_SYNC_DUMP) != NULL;
51 }
52 
53 C_MODE_START
54 
repl_semi_report_binlog_update(Binlog_storage_param * param,const char * log_file,my_off_t log_pos)55 int repl_semi_report_binlog_update(Binlog_storage_param *param,
56 				   const char *log_file,
57 				   my_off_t log_pos)
58 {
59   int  error= 0;
60 
61   if (repl_semisync.getMasterEnabled())
62   {
63     /*
64       Let us store the binlog file name and the position, so that
65       we know how long to wait for the binlog to the replicated to
66       the slave in synchronous replication.
67     */
68     error= repl_semisync.writeTranxInBinlog(log_file,
69                                             log_pos);
70   }
71 
72   return error;
73 }
74 
repl_semi_report_binlog_sync(Binlog_storage_param * param,const char * log_file,my_off_t log_pos)75 int repl_semi_report_binlog_sync(Binlog_storage_param *param,
76                                  const char *log_file,
77                                  my_off_t log_pos)
78 {
79   if (rpl_semi_sync_master_wait_point == WAIT_AFTER_SYNC)
80     return repl_semisync.commitTrx(log_file, log_pos);
81   return 0;
82 }
83 
repl_semi_report_before_dml(Trans_param * param,int & out)84 int repl_semi_report_before_dml(Trans_param *param, int& out)
85 {
86   return 0;
87 }
88 
repl_semi_request_commit(Trans_param * param)89 int repl_semi_request_commit(Trans_param *param)
90 {
91   return 0;
92 }
93 
repl_semi_report_before_commit(Trans_param * param)94 int repl_semi_report_before_commit(Trans_param *param)
95 {
96   return 0;
97 }
98 
repl_semi_report_before_rollback(Trans_param * param)99 int repl_semi_report_before_rollback(Trans_param *param)
100 {
101   return 0;
102 }
103 
repl_semi_report_commit(Trans_param * param)104 int repl_semi_report_commit(Trans_param *param)
105 {
106 
107   bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
108 
109   if (rpl_semi_sync_master_wait_point == WAIT_AFTER_COMMIT &&
110       is_real_trans && param->log_pos)
111   {
112     const char *binlog_name= param->log_file;
113     return repl_semisync.commitTrx(binlog_name, param->log_pos);
114   }
115   return 0;
116 }
117 
repl_semi_report_rollback(Trans_param * param)118 int repl_semi_report_rollback(Trans_param *param)
119 {
120   return repl_semi_report_commit(param);
121 }
122 
repl_semi_binlog_dump_start(Binlog_transmit_param * param,const char * log_file,my_off_t log_pos)123 int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
124 				 const char *log_file,
125 				 my_off_t log_pos)
126 {
127   long long semi_sync_slave= 0;
128 
129   /*
130     semi_sync_slave will be 0 if the user variable doesn't exist. Otherwise, it
131     will be set to the value of the user variable.
132     'rpl_semi_sync_slave = 0' means that it is not a semisync slave.
133   */
134   get_user_var_int("rpl_semi_sync_slave", &semi_sync_slave, NULL);
135 
136   if (semi_sync_slave != 0)
137   {
138     if (ack_receiver.add_slave(current_thd))
139     {
140       sql_print_error("Failed to register slave to semi-sync ACK receiver "
141                       "thread.");
142       return -1;
143     }
144 
145     my_set_thread_local(THR_RPL_SEMI_SYNC_DUMP, &SEMI_SYNC_DUMP);
146 
147     /* One more semi-sync slave */
148     repl_semisync.add_slave();
149 
150     /* Tell server it will observe the transmission.*/
151     param->set_observe_flag();
152 
153     /*
154       Let's assume this semi-sync slave has already received all
155       binlog events before the filename and position it requests.
156     */
157     repl_semisync.handleAck(param->server_id, log_file, log_pos);
158   }
159   else
160     param->set_dont_observe_flag();
161 
162   sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
163 			semi_sync_slave != 0 ? "semi-sync" : "asynchronous",
164 			param->server_id, log_file, (unsigned long)log_pos);
165 
166   return 0;
167 }
168 
repl_semi_binlog_dump_end(Binlog_transmit_param * param)169 int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
170 {
171   bool semi_sync_slave= is_semi_sync_dump();
172 
173   sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
174                         semi_sync_slave ? "semi-sync" : "asynchronous",
175                         param->server_id);
176   if (semi_sync_slave)
177   {
178     ack_receiver.remove_slave(current_thd);
179     /* One less semi-sync slave */
180     repl_semisync.remove_slave();
181     my_set_thread_local(THR_RPL_SEMI_SYNC_DUMP, NULL);
182   }
183   return 0;
184 }
185 
repl_semi_reserve_header(Binlog_transmit_param * param,unsigned char * header,unsigned long size,unsigned long * len)186 int repl_semi_reserve_header(Binlog_transmit_param *param,
187 			     unsigned char *header,
188 			     unsigned long size, unsigned long *len)
189 {
190   if (is_semi_sync_dump())
191     *len +=  repl_semisync.reserveSyncHeader(header, size);
192   return 0;
193 }
194 
repl_semi_before_send_event(Binlog_transmit_param * param,unsigned char * packet,unsigned long len,const char * log_file,my_off_t log_pos)195 int repl_semi_before_send_event(Binlog_transmit_param *param,
196                                 unsigned char *packet, unsigned long len,
197                                 const char *log_file, my_off_t log_pos)
198 {
199   if (!is_semi_sync_dump())
200     return 0;
201 
202   return repl_semisync.updateSyncHeader(packet,
203 					log_file,
204 					log_pos,
205 					param->server_id);
206 }
207 
repl_semi_after_send_event(Binlog_transmit_param * param,const char * event_buf,unsigned long len,const char * skipped_log_file,my_off_t skipped_log_pos)208 int repl_semi_after_send_event(Binlog_transmit_param *param,
209                                const char *event_buf, unsigned long len,
210                                const char * skipped_log_file,
211                                my_off_t skipped_log_pos)
212 {
213   if (is_semi_sync_dump())
214   {
215     if(skipped_log_pos>0)
216       repl_semisync.skipSlaveReply(event_buf, param->server_id,
217                                    skipped_log_file, skipped_log_pos);
218     else
219     {
220       THD *thd= current_thd;
221       /*
222         Possible errors in reading slave reply are ignored deliberately
223         because we do not want dump thread to quit on this. Error
224         messages are already reported.
225       */
226       (void) repl_semisync.readSlaveReply(
227         thd->get_protocol_classic()->get_net(),
228         param->server_id, event_buf);
229       thd->clear_error();
230     }
231   }
232   return 0;
233 }
234 
repl_semi_reset_master(Binlog_transmit_param * param)235 int repl_semi_reset_master(Binlog_transmit_param *param)
236 {
237   if (repl_semisync.resetMaster())
238     return 1;
239   return 0;
240 }
241 
242 C_MODE_END
243 
244 /*
245   semisync system variables
246  */
247 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
248 				      SYS_VAR *var,
249 				      void *ptr,
250 				      const void *val);
251 
252 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
253 					  SYS_VAR *var,
254 					  void *ptr,
255 					  const void *val);
256 
257 static void fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD thd,
258 				      SYS_VAR *var,
259 				      void *ptr,
260 				      const void *val);
261 
262 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
263 				      SYS_VAR *var,
264 				      void *ptr,
265 				      const void *val);
266 
267 static void fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD thd,
268                                                           SYS_VAR *var,
269                                                           void *ptr,
270                                                           const void *val);
271 
272 static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
273   PLUGIN_VAR_OPCMDARG,
274  "Enable semi-synchronous replication master (disabled by default). ",
275   NULL, 			// check
276   &fix_rpl_semi_sync_master_enabled,	// update
277   0);
278 
279 static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
280   PLUGIN_VAR_OPCMDARG,
281  "The timeout value (in ms) for semi-synchronous replication in the master",
282   NULL, 			// check
283   fix_rpl_semi_sync_master_timeout,	// update
284   10000, 0, ~0UL, 1);
285 
286 static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
287   PLUGIN_VAR_OPCMDARG,
288  "Wait until timeout when no semi-synchronous replication slave available (enabled by default). ",
289   NULL, 			// check
290   &fix_rpl_semi_sync_master_wait_no_slave,  // update
291   1);
292 
293 static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
294   PLUGIN_VAR_OPCMDARG,
295  "The tracing level for semi-sync replication.",
296   NULL,				  // check
297   &fix_rpl_semi_sync_master_trace_level, // update
298   32, 0, ~0UL, 1);
299 
300 static const char *wait_point_names[]= {"AFTER_SYNC", "AFTER_COMMIT", NullS};
301 static TYPELIB wait_point_typelib= {
302   array_elements(wait_point_names) - 1,
303   "",
304   wait_point_names,
305   NULL
306 };
307 static MYSQL_SYSVAR_ENUM(
308   wait_point,                      /* name     */
309   rpl_semi_sync_master_wait_point, /* var      */
310   PLUGIN_VAR_OPCMDARG,             /* flags    */
311   "Semisync can wait for slave ACKs at one of two points,"
312   "AFTER_SYNC or AFTER_COMMIT. AFTER_SYNC is the default value."
313   "AFTER_SYNC means that semisynchronous replication waits just after the "
314   "binary log file is flushed, but before the engine commits, and so "
315   "guarantees that no other sessions can see the data before replicated to "
316   "slave. AFTER_COMMIT means that semisynchronous replication waits just "
317   "after the engine commits. Other sessions may see the data before it is "
318   "replicated, even though the current session is still waiting for the commit "
319   "to end successfully.",
320   NULL,                            /* check()  */
321   NULL,                            /* update() */
322   WAIT_AFTER_SYNC,                 /* default  */
323   &wait_point_typelib              /* typelib  */
324 );
325 
326 static MYSQL_SYSVAR_UINT(wait_for_slave_count,   /* name  */
327   rpl_semi_sync_master_wait_for_slave_count,     /* var   */
328   PLUGIN_VAR_OPCMDARG,                           /* flags */
329   "How many slaves the events should be replicated to. Semisynchronous "
330   "replication master will wait until all events of the transaction are "
331   "replicated to at least rpl_semi_sync_master_wait_for_slave_count slaves",
332   NULL,                                           /* check() */
333   &fix_rpl_semi_sync_master_wait_for_slave_count, /* update */
334   1, 1, 65535, 1);
335 
336 static SYS_VAR* semi_sync_master_system_vars[]= {
337   MYSQL_SYSVAR(enabled),
338   MYSQL_SYSVAR(timeout),
339   MYSQL_SYSVAR(wait_no_slave),
340   MYSQL_SYSVAR(trace_level),
341   MYSQL_SYSVAR(wait_point),
342   MYSQL_SYSVAR(wait_for_slave_count),
343   NULL,
344 };
fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)345 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
346 				      SYS_VAR *var,
347 				      void *ptr,
348 				      const void *val)
349 {
350   *(unsigned long *)ptr= *(unsigned long *)val;
351   repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout);
352   return;
353 }
354 
fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)355 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
356 					  SYS_VAR *var,
357 					  void *ptr,
358 					  const void *val)
359 {
360   *(unsigned long *)ptr= *(unsigned long *)val;
361   repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level);
362   ack_receiver.setTraceLevel(rpl_semi_sync_master_trace_level);
363   return;
364 }
365 
fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)366 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
367 				      SYS_VAR *var,
368 				      void *ptr,
369 				      const void *val)
370 {
371   *(char *)ptr= *(char *)val;
372   if (rpl_semi_sync_master_enabled)
373   {
374     if (repl_semisync.enableMaster() != 0)
375       rpl_semi_sync_master_enabled = false;
376     else if (ack_receiver.start())
377     {
378       repl_semisync.disableMaster();
379       rpl_semi_sync_master_enabled = false;
380     }
381   }
382   else
383   {
384     if (repl_semisync.disableMaster() != 0)
385       rpl_semi_sync_master_enabled = true;
386     ack_receiver.stop();
387   }
388 
389   return;
390 }
391 
fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)392 static void fix_rpl_semi_sync_master_wait_for_slave_count(MYSQL_THD thd,
393                                                           SYS_VAR *var,
394                                                           void *ptr,
395                                                           const void *val)
396 {
397   (void) repl_semisync.setWaitSlaveCount(*(unsigned int*) val);
398   return;
399 }
400 
fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)401 static void fix_rpl_semi_sync_master_wait_no_slave(MYSQL_THD thd,
402 				      SYS_VAR *var,
403 				      void *ptr,
404 				      const void *val)
405 {
406   if (rpl_semi_sync_master_wait_no_slave != *(char *)val)
407   {
408     *(char *)ptr= *(char *)val;
409     repl_semisync.set_wait_no_slave(val);
410   }
411   return;
412 }
413 
414 Trans_observer trans_observer = {
415   sizeof(Trans_observer),		// len
416 
417   repl_semi_report_before_dml,      //before_dml
418   repl_semi_report_before_commit,   // before_commit
419   repl_semi_report_before_rollback, // before_rollback
420   repl_semi_report_commit,	// after_commit
421   repl_semi_report_rollback,	// after_rollback
422 };
423 
424 Binlog_storage_observer storage_observer = {
425   sizeof(Binlog_storage_observer), // len
426 
427   repl_semi_report_binlog_update, // report_update
428   repl_semi_report_binlog_sync,   // after_sync
429 };
430 
431 Binlog_transmit_observer transmit_observer = {
432   sizeof(Binlog_transmit_observer), // len
433 
434   repl_semi_binlog_dump_start,	// start
435   repl_semi_binlog_dump_end,	// stop
436   repl_semi_reserve_header,	// reserve_header
437   repl_semi_before_send_event,	// before_send_event
438   repl_semi_after_send_event,	// after_send_event
439   repl_semi_reset_master,	// reset
440 };
441 
442 
443 #define SHOW_FNAME(name)			\
444   rpl_semi_sync_master_show_##name
445 
446 #define DEF_SHOW_FUNC(name, show_type)					\
447   static  int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
448   {									\
449     repl_semisync.setExportStats();					\
450     var->type= show_type;						\
451     var->value= (char *)&rpl_semi_sync_master_##name;				\
452     return 0;								\
453   }
454 
455 DEF_SHOW_FUNC(status, SHOW_BOOL)
456 DEF_SHOW_FUNC(clients, SHOW_LONG)
457 DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
458 DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
459 DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
460 DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
461 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
462 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
463 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
464 
465 
466 /* plugin status variables */
467 static SHOW_VAR semi_sync_master_status_vars[]= {
468   {"Rpl_semi_sync_master_status",
469    (char*) &SHOW_FNAME(status),
470    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
471   {"Rpl_semi_sync_master_clients",
472    (char*) &SHOW_FNAME(clients),
473    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
474   {"Rpl_semi_sync_master_yes_tx",
475    (char*) &rpl_semi_sync_master_yes_transactions,
476    SHOW_LONG, SHOW_SCOPE_GLOBAL},
477   {"Rpl_semi_sync_master_no_tx",
478    (char*) &rpl_semi_sync_master_no_transactions,
479    SHOW_LONG, SHOW_SCOPE_GLOBAL},
480   {"Rpl_semi_sync_master_wait_sessions",
481    (char*) &SHOW_FNAME(wait_sessions),
482    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
483   {"Rpl_semi_sync_master_no_times",
484    (char*) &rpl_semi_sync_master_off_times,
485    SHOW_LONG, SHOW_SCOPE_GLOBAL},
486   {"Rpl_semi_sync_master_timefunc_failures",
487    (char*) &rpl_semi_sync_master_timefunc_fails,
488    SHOW_LONG, SHOW_SCOPE_GLOBAL},
489   {"Rpl_semi_sync_master_wait_pos_backtraverse",
490    (char*) &rpl_semi_sync_master_wait_pos_backtraverse,
491    SHOW_LONG, SHOW_SCOPE_GLOBAL},
492   {"Rpl_semi_sync_master_tx_wait_time",
493    (char*) &SHOW_FNAME(trx_wait_time),
494    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
495   {"Rpl_semi_sync_master_tx_waits",
496    (char*) &SHOW_FNAME(trx_wait_num),
497    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
498   {"Rpl_semi_sync_master_tx_avg_wait_time",
499    (char*) &SHOW_FNAME(avg_trx_wait_time),
500    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
501   {"Rpl_semi_sync_master_net_wait_time",
502    (char*) &SHOW_FNAME(net_wait_time),
503    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
504   {"Rpl_semi_sync_master_net_waits",
505    (char*) &SHOW_FNAME(net_wait_num),
506    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
507   {"Rpl_semi_sync_master_net_avg_wait_time",
508    (char*) &SHOW_FNAME(avg_net_wait_time),
509    SHOW_FUNC, SHOW_SCOPE_GLOBAL},
510   {NULL, NULL, SHOW_LONG, SHOW_SCOPE_GLOBAL},
511 };
512 
513 #ifdef HAVE_PSI_INTERFACE
514 
515 PSI_mutex_key key_ss_mutex_LOCK_binlog_;
516 PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
517 
518 static PSI_mutex_info all_semisync_mutexes[]=
519 {
520   { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0},
521   { &key_ss_mutex_Ack_receiver_mutex, "Ack_receiver::m_mutex", 0}
522 };
523 
524 PSI_cond_key key_ss_cond_COND_binlog_send_;
525 PSI_cond_key key_ss_cond_Ack_receiver_cond;
526 
527 static PSI_cond_info all_semisync_conds[]=
528 {
529   { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0},
530   { &key_ss_cond_Ack_receiver_cond, "Ack_receiver::m_cond", 0}
531 };
532 
533 PSI_thread_key key_ss_thread_Ack_receiver_thread;
534 
535 static PSI_thread_info all_semisync_threads[]=
536 {
537   {&key_ss_thread_Ack_receiver_thread, "Ack_receiver", PSI_FLAG_GLOBAL}
538 };
539 #endif /* HAVE_PSI_INTERFACE */
540 
541 PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
542 { 0, "Waiting for semi-sync ACK from slave", 0};
543 
544 PSI_stage_info stage_waiting_for_semi_sync_slave=
545 { 0, "Waiting for semi-sync slave connection", 0};
546 
547 PSI_stage_info stage_reading_semi_sync_ack=
548 { 0, "Reading semi-sync ACK from slave", 0};
549 
550 /* Always defined. */
551 PSI_memory_key key_ss_memory_TranxNodeAllocator_block;
552 
553 #ifdef HAVE_PSI_INTERFACE
554 PSI_stage_info *all_semisync_stages[]=
555 {
556   & stage_waiting_for_semi_sync_ack_from_slave,
557   & stage_waiting_for_semi_sync_slave,
558   & stage_reading_semi_sync_ack
559 };
560 
561 PSI_memory_info all_semisync_memory[]=
562 {
563   {&key_ss_memory_TranxNodeAllocator_block, "TranxNodeAllocator::block", 0}
564 };
565 
init_semisync_psi_keys(void)566 static void init_semisync_psi_keys(void)
567 {
568   const char* category= "semisync";
569   int count;
570 
571   count= array_elements(all_semisync_mutexes);
572   mysql_mutex_register(category, all_semisync_mutexes, count);
573 
574   count= array_elements(all_semisync_conds);
575   mysql_cond_register(category, all_semisync_conds, count);
576 
577   count= array_elements(all_semisync_stages);
578   mysql_stage_register(category, all_semisync_stages, count);
579 
580   count= array_elements(all_semisync_memory);
581   mysql_memory_register(category, all_semisync_memory, count);
582 
583   count= array_elements(all_semisync_threads);
584   mysql_thread_register(category, all_semisync_threads, count);
585 }
586 #endif /* HAVE_PSI_INTERFACE */
587 
semi_sync_master_plugin_init(void * p)588 static int semi_sync_master_plugin_init(void *p)
589 {
590 #ifdef HAVE_PSI_INTERFACE
591   init_semisync_psi_keys();
592 #endif
593 
594   my_create_thread_local_key(&THR_RPL_SEMI_SYNC_DUMP, NULL);
595 
596   if (repl_semisync.initObject())
597     return 1;
598   if (ack_receiver.init())
599     return 1;
600   if (register_trans_observer(&trans_observer, p))
601     return 1;
602   if (register_binlog_storage_observer(&storage_observer, p))
603     return 1;
604   if (register_binlog_transmit_observer(&transmit_observer, p))
605     return 1;
606   return 0;
607 }
608 
semi_sync_master_plugin_deinit(void * p)609 static int semi_sync_master_plugin_deinit(void *p)
610 {
611   ack_receiver.stop();
612   my_delete_thread_local_key(THR_RPL_SEMI_SYNC_DUMP);
613 
614   if (unregister_trans_observer(&trans_observer, p))
615   {
616     sql_print_error("unregister_trans_observer failed");
617     return 1;
618   }
619   if (unregister_binlog_storage_observer(&storage_observer, p))
620   {
621     sql_print_error("unregister_binlog_storage_observer failed");
622     return 1;
623   }
624   if (unregister_binlog_transmit_observer(&transmit_observer, p))
625   {
626     sql_print_error("unregister_binlog_transmit_observer failed");
627     return 1;
628   }
629 
630   sql_print_information("unregister_replicator OK");
631   return 0;
632 }
633 
634 struct Mysql_replication semi_sync_master_plugin= {
635   MYSQL_REPLICATION_INTERFACE_VERSION
636 };
637 
638 /*
639   Plugin library descriptor
640 */
mysql_declare_plugin(semi_sync_master)641 mysql_declare_plugin(semi_sync_master)
642 {
643   MYSQL_REPLICATION_PLUGIN,
644   &semi_sync_master_plugin,
645   "rpl_semi_sync_master",
646   "He Zhenxing",
647   "Semi-synchronous replication master",
648   PLUGIN_LICENSE_GPL,
649   semi_sync_master_plugin_init, /* Plugin Init */
650   semi_sync_master_plugin_deinit, /* Plugin Deinit */
651   0x0100 /* 1.0 */,
652   semi_sync_master_status_vars,	/* status variables */
653   semi_sync_master_system_vars,	/* system variables */
654   NULL,                         /* config options */
655   0,                            /* flags */
656 }
657 mysql_declare_plugin_end;
658