1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
3    Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
17 
18 
19 #include "semisync_master.h"
20 #include "sql_class.h"                          // THD
21 
22 ReplSemiSyncMaster repl_semisync;
23 
24 C_MODE_START
25 
repl_semi_report_binlog_update(Binlog_storage_param * param,const char * log_file,my_off_t log_pos,uint32 flags)26 int repl_semi_report_binlog_update(Binlog_storage_param *param,
27 				   const char *log_file,
28 				   my_off_t log_pos, uint32 flags)
29 {
30   int  error= 0;
31 
32   if (repl_semisync.getMasterEnabled())
33   {
34     /*
35       Let us store the binlog file name and the position, so that
36       we know how long to wait for the binlog to the replicated to
37       the slave in synchronous replication.
38     */
39     error= repl_semisync.writeTranxInBinlog(log_file,
40                                             log_pos);
41   }
42 
43   return error;
44 }
45 
repl_semi_request_commit(Trans_param * param)46 int repl_semi_request_commit(Trans_param *param)
47 {
48   return 0;
49 }
50 
repl_semi_report_commit(Trans_param * param)51 int repl_semi_report_commit(Trans_param *param)
52 {
53 
54   bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
55 
56   if (is_real_trans && param->log_pos)
57   {
58     const char *binlog_name= param->log_file;
59     return repl_semisync.commitTrx(binlog_name, param->log_pos);
60   }
61   return 0;
62 }
63 
repl_semi_report_rollback(Trans_param * param)64 int repl_semi_report_rollback(Trans_param *param)
65 {
66   return repl_semi_report_commit(param);
67 }
68 
repl_semi_binlog_dump_start(Binlog_transmit_param * param,const char * log_file,my_off_t log_pos)69 int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
70 				 const char *log_file,
71 				 my_off_t log_pos)
72 {
73   bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
74 
75   if (semi_sync_slave)
76   {
77     /* One more semi-sync slave */
78     repl_semisync.add_slave();
79 
80     /*
81       Let's assume this semi-sync slave has already received all
82       binlog events before the filename and position it requests.
83     */
84     repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
85   }
86   sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
87 			semi_sync_slave ? "semi-sync" : "asynchronous",
88 			param->server_id, log_file, (unsigned long)log_pos);
89 
90   return 0;
91 }
92 
repl_semi_binlog_dump_end(Binlog_transmit_param * param)93 int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
94 {
95   bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
96 
97   sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
98                         semi_sync_slave ? "semi-sync" : "asynchronous",
99                         param->server_id);
100   if (semi_sync_slave)
101   {
102     /* One less semi-sync slave */
103     repl_semisync.remove_slave();
104   }
105   return 0;
106 }
107 
repl_semi_reserve_header(Binlog_transmit_param * param,unsigned char * header,unsigned long size,unsigned long * len)108 int repl_semi_reserve_header(Binlog_transmit_param *param,
109 			     unsigned char *header,
110 			     unsigned long size, unsigned long *len)
111 {
112   *len +=  repl_semisync.reserveSyncHeader(header, size);
113   return 0;
114 }
115 
repl_semi_before_send_event(Binlog_transmit_param * param,unsigned char * packet,unsigned long len,const char * log_file,my_off_t log_pos)116 int repl_semi_before_send_event(Binlog_transmit_param *param,
117                                 unsigned char *packet, unsigned long len,
118                                 const char *log_file, my_off_t log_pos)
119 {
120   return repl_semisync.updateSyncHeader(packet,
121 					log_file,
122 					log_pos,
123 					param->server_id);
124 }
125 
repl_semi_after_send_event(Binlog_transmit_param * param,const char * event_buf,unsigned long len)126 int repl_semi_after_send_event(Binlog_transmit_param *param,
127                                const char *event_buf, unsigned long len)
128 {
129   if (repl_semisync.is_semi_sync_slave())
130   {
131     THD *thd= current_thd;
132     /*
133       Possible errors in reading slave reply are ignored deliberately
134       because we do not want dump thread to quit on this. Error
135       messages are already reported.
136     */
137     (void) repl_semisync.readSlaveReply(&thd->net,
138                                         param->server_id, event_buf);
139     thd->clear_error();
140   }
141   return 0;
142 }
143 
repl_semi_reset_master(Binlog_transmit_param * param)144 int repl_semi_reset_master(Binlog_transmit_param *param)
145 {
146   if (repl_semisync.resetMaster())
147     return 1;
148   return 0;
149 }
150 
151 C_MODE_END
152 
153 /*
154   semisync system variables
155  */
156 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
157 				      SYS_VAR *var,
158 				      void *ptr,
159 				      const void *val);
160 
161 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
162 					  SYS_VAR *var,
163 					  void *ptr,
164 					  const void *val);
165 
166 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
167 				      SYS_VAR *var,
168 				      void *ptr,
169 				      const void *val);
170 
171 static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
172   PLUGIN_VAR_OPCMDARG,
173  "Enable semi-synchronous replication master (disabled by default). ",
174   NULL, 			// check
175   &fix_rpl_semi_sync_master_enabled,	// update
176   0);
177 
178 static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
179   PLUGIN_VAR_OPCMDARG,
180  "The timeout value (in ms) for semi-synchronous replication in the master",
181   NULL, 			// check
182   fix_rpl_semi_sync_master_timeout,	// update
183   10000, 0, ~0UL, 1);
184 
185 static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
186   PLUGIN_VAR_OPCMDARG,
187  "Wait until timeout when no semi-synchronous replication slave available (enabled by default). ",
188   NULL, 			// check
189   NULL,                         // update
190   1);
191 
192 static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
193   PLUGIN_VAR_OPCMDARG,
194  "The tracing level for semi-sync replication.",
195   NULL,				  // check
196   &fix_rpl_semi_sync_master_trace_level, // update
197   32, 0, ~0UL, 1);
198 
199 static SYS_VAR* semi_sync_master_system_vars[]= {
200   MYSQL_SYSVAR(enabled),
201   MYSQL_SYSVAR(timeout),
202   MYSQL_SYSVAR(wait_no_slave),
203   MYSQL_SYSVAR(trace_level),
204   NULL,
205 };
206 
207 
fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)208 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
209 				      SYS_VAR *var,
210 				      void *ptr,
211 				      const void *val)
212 {
213   *(unsigned long *)ptr= *(unsigned long *)val;
214   repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout);
215   return;
216 }
217 
fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)218 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
219 					  SYS_VAR *var,
220 					  void *ptr,
221 					  const void *val)
222 {
223   *(unsigned long *)ptr= *(unsigned long *)val;
224   repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level);
225   return;
226 }
227 
fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)228 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
229 				      SYS_VAR *var,
230 				      void *ptr,
231 				      const void *val)
232 {
233   *(char *)ptr= *(char *)val;
234   if (rpl_semi_sync_master_enabled)
235   {
236     if (repl_semisync.enableMaster() != 0)
237       rpl_semi_sync_master_enabled = false;
238   }
239   else
240   {
241     if (repl_semisync.disableMaster() != 0)
242       rpl_semi_sync_master_enabled = true;
243   }
244 
245   return;
246 }
247 
248 Trans_observer trans_observer = {
249   sizeof(Trans_observer),		// len
250 
251   repl_semi_report_commit,	// after_commit
252   repl_semi_report_rollback,	// after_rollback
253 };
254 
255 Binlog_storage_observer storage_observer = {
256   sizeof(Binlog_storage_observer), // len
257 
258   repl_semi_report_binlog_update, // report_update
259 };
260 
261 Binlog_transmit_observer transmit_observer = {
262   sizeof(Binlog_transmit_observer), // len
263 
264   repl_semi_binlog_dump_start,	// start
265   repl_semi_binlog_dump_end,	// stop
266   repl_semi_reserve_header,	// reserve_header
267   repl_semi_before_send_event,	// before_send_event
268   repl_semi_after_send_event,	// after_send_event
269   repl_semi_reset_master,	// reset
270 };
271 
272 
273 #define SHOW_FNAME(name)			\
274   rpl_semi_sync_master_show_##name
275 
276 #define DEF_SHOW_FUNC(name, show_type)					\
277   static  int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
278   {									\
279     repl_semisync.setExportStats();					\
280     var->type= show_type;						\
281     var->value= (char *)&rpl_semi_sync_master_##name;				\
282     return 0;								\
283   }
284 
285 DEF_SHOW_FUNC(status, SHOW_BOOL)
286 DEF_SHOW_FUNC(clients, SHOW_LONG)
287 DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
288 DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
289 DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
290 DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
291 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
292 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
293 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
294 
295 
296 /* plugin status variables */
297 static SHOW_VAR semi_sync_master_status_vars[]= {
298   {"Rpl_semi_sync_master_status",
299    (char*) &SHOW_FNAME(status),
300    SHOW_FUNC},
301   {"Rpl_semi_sync_master_clients",
302    (char*) &SHOW_FNAME(clients),
303    SHOW_FUNC},
304   {"Rpl_semi_sync_master_yes_tx",
305    (char*) &rpl_semi_sync_master_yes_transactions,
306    SHOW_LONG},
307   {"Rpl_semi_sync_master_no_tx",
308    (char*) &rpl_semi_sync_master_no_transactions,
309    SHOW_LONG},
310   {"Rpl_semi_sync_master_wait_sessions",
311    (char*) &SHOW_FNAME(wait_sessions),
312    SHOW_FUNC},
313   {"Rpl_semi_sync_master_no_times",
314    (char*) &rpl_semi_sync_master_off_times,
315    SHOW_LONG},
316   {"Rpl_semi_sync_master_timefunc_failures",
317    (char*) &rpl_semi_sync_master_timefunc_fails,
318    SHOW_LONG},
319   {"Rpl_semi_sync_master_wait_pos_backtraverse",
320    (char*) &rpl_semi_sync_master_wait_pos_backtraverse,
321    SHOW_LONG},
322   {"Rpl_semi_sync_master_tx_wait_time",
323    (char*) &SHOW_FNAME(trx_wait_time),
324    SHOW_FUNC},
325   {"Rpl_semi_sync_master_tx_waits",
326    (char*) &SHOW_FNAME(trx_wait_num),
327    SHOW_FUNC},
328   {"Rpl_semi_sync_master_tx_avg_wait_time",
329    (char*) &SHOW_FNAME(avg_trx_wait_time),
330    SHOW_FUNC},
331   {"Rpl_semi_sync_master_net_wait_time",
332    (char*) &SHOW_FNAME(net_wait_time),
333    SHOW_FUNC},
334   {"Rpl_semi_sync_master_net_waits",
335    (char*) &SHOW_FNAME(net_wait_num),
336    SHOW_FUNC},
337   {"Rpl_semi_sync_master_net_avg_wait_time",
338    (char*) &SHOW_FNAME(avg_net_wait_time),
339    SHOW_FUNC},
340   {NULL, NULL, SHOW_LONG},
341 };
342 
343 #ifdef HAVE_PSI_INTERFACE
344 PSI_mutex_key key_ss_mutex_LOCK_binlog_;
345 
346 static PSI_mutex_info all_semisync_mutexes[]=
347 {
348   { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
349 };
350 
351 PSI_cond_key key_ss_cond_COND_binlog_send_;
352 
353 static PSI_cond_info all_semisync_conds[]=
354 {
355   { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
356 };
357 
init_semisync_psi_keys(void)358 static void init_semisync_psi_keys(void)
359 {
360   const char* category= "semisync";
361   int count;
362 
363   if (PSI_server == NULL)
364     return;
365 
366   count= array_elements(all_semisync_mutexes);
367   PSI_server->register_mutex(category, all_semisync_mutexes, count);
368 
369   count= array_elements(all_semisync_conds);
370   PSI_server->register_cond(category, all_semisync_conds, count);
371 }
372 #endif /* HAVE_PSI_INTERFACE */
373 
semi_sync_master_plugin_init(void * p)374 static int semi_sync_master_plugin_init(void *p)
375 {
376 #ifdef HAVE_PSI_INTERFACE
377   init_semisync_psi_keys();
378 #endif
379 
380   if (repl_semisync.initObject())
381     return 1;
382   if (register_trans_observer(&trans_observer, p))
383     return 1;
384   if (register_binlog_storage_observer(&storage_observer, p))
385     return 1;
386   if (register_binlog_transmit_observer(&transmit_observer, p))
387     return 1;
388   return 0;
389 }
390 
semi_sync_master_plugin_deinit(void * p)391 static int semi_sync_master_plugin_deinit(void *p)
392 {
393   if (unregister_trans_observer(&trans_observer, p))
394   {
395     sql_print_error("unregister_trans_observer failed");
396     return 1;
397   }
398   if (unregister_binlog_storage_observer(&storage_observer, p))
399   {
400     sql_print_error("unregister_binlog_storage_observer failed");
401     return 1;
402   }
403   if (unregister_binlog_transmit_observer(&transmit_observer, p))
404   {
405     sql_print_error("unregister_binlog_transmit_observer failed");
406     return 1;
407   }
408   sql_print_information("unregister_replicator OK");
409   return 0;
410 }
411 
412 struct Mysql_replication semi_sync_master_plugin= {
413   MYSQL_REPLICATION_INTERFACE_VERSION
414 };
415 
416 /*
417   Plugin library descriptor
418 */
mysql_declare_plugin(semi_sync_master)419 mysql_declare_plugin(semi_sync_master)
420 {
421   MYSQL_REPLICATION_PLUGIN,
422   &semi_sync_master_plugin,
423   "rpl_semi_sync_master",
424   "He Zhenxing",
425   "Semi-synchronous replication master",
426   PLUGIN_LICENSE_GPL,
427   semi_sync_master_plugin_init, /* Plugin Init */
428   semi_sync_master_plugin_deinit, /* Plugin Deinit */
429   0x0100 /* 1.0 */,
430   semi_sync_master_status_vars,	/* status variables */
431   semi_sync_master_system_vars,	/* system variables */
432   NULL,                         /* config options */
433   0,                            /* flags */
434 }
435 mysql_declare_plugin_end;
436