1 /* Copyright (C) 2007 Google Inc.
2 Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
3 Use is subject to license terms.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17
18
19 #include "semisync_master.h"
20 #include "sql_class.h" // THD
21
22 ReplSemiSyncMaster repl_semisync;
23
24 C_MODE_START
25
repl_semi_report_binlog_update(Binlog_storage_param * param,const char * log_file,my_off_t log_pos,uint32 flags)26 int repl_semi_report_binlog_update(Binlog_storage_param *param,
27 const char *log_file,
28 my_off_t log_pos, uint32 flags)
29 {
30 int error= 0;
31
32 if (repl_semisync.getMasterEnabled())
33 {
34 /*
35 Let us store the binlog file name and the position, so that
36 we know how long to wait for the binlog to the replicated to
37 the slave in synchronous replication.
38 */
39 error= repl_semisync.writeTranxInBinlog(log_file,
40 log_pos);
41 }
42
43 return error;
44 }
45
repl_semi_request_commit(Trans_param * param)46 int repl_semi_request_commit(Trans_param *param)
47 {
48 return 0;
49 }
50
repl_semi_report_commit(Trans_param * param)51 int repl_semi_report_commit(Trans_param *param)
52 {
53
54 bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
55
56 if (is_real_trans && param->log_pos)
57 {
58 const char *binlog_name= param->log_file;
59 return repl_semisync.commitTrx(binlog_name, param->log_pos);
60 }
61 return 0;
62 }
63
repl_semi_report_rollback(Trans_param * param)64 int repl_semi_report_rollback(Trans_param *param)
65 {
66 return repl_semi_report_commit(param);
67 }
68
repl_semi_binlog_dump_start(Binlog_transmit_param * param,const char * log_file,my_off_t log_pos)69 int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
70 const char *log_file,
71 my_off_t log_pos)
72 {
73 bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
74
75 if (semi_sync_slave)
76 {
77 /* One more semi-sync slave */
78 repl_semisync.add_slave();
79
80 /*
81 Let's assume this semi-sync slave has already received all
82 binlog events before the filename and position it requests.
83 */
84 repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
85 }
86 sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
87 semi_sync_slave ? "semi-sync" : "asynchronous",
88 param->server_id, log_file, (unsigned long)log_pos);
89
90 return 0;
91 }
92
repl_semi_binlog_dump_end(Binlog_transmit_param * param)93 int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
94 {
95 bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
96
97 sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
98 semi_sync_slave ? "semi-sync" : "asynchronous",
99 param->server_id);
100 if (semi_sync_slave)
101 {
102 /* One less semi-sync slave */
103 repl_semisync.remove_slave();
104 }
105 return 0;
106 }
107
repl_semi_reserve_header(Binlog_transmit_param * param,unsigned char * header,unsigned long size,unsigned long * len)108 int repl_semi_reserve_header(Binlog_transmit_param *param,
109 unsigned char *header,
110 unsigned long size, unsigned long *len)
111 {
112 *len += repl_semisync.reserveSyncHeader(header, size);
113 return 0;
114 }
115
repl_semi_before_send_event(Binlog_transmit_param * param,unsigned char * packet,unsigned long len,const char * log_file,my_off_t log_pos)116 int repl_semi_before_send_event(Binlog_transmit_param *param,
117 unsigned char *packet, unsigned long len,
118 const char *log_file, my_off_t log_pos)
119 {
120 return repl_semisync.updateSyncHeader(packet,
121 log_file,
122 log_pos,
123 param->server_id);
124 }
125
repl_semi_after_send_event(Binlog_transmit_param * param,const char * event_buf,unsigned long len)126 int repl_semi_after_send_event(Binlog_transmit_param *param,
127 const char *event_buf, unsigned long len)
128 {
129 if (repl_semisync.is_semi_sync_slave())
130 {
131 THD *thd= current_thd;
132 /*
133 Possible errors in reading slave reply are ignored deliberately
134 because we do not want dump thread to quit on this. Error
135 messages are already reported.
136 */
137 (void) repl_semisync.readSlaveReply(&thd->net,
138 param->server_id, event_buf);
139 thd->clear_error();
140 }
141 return 0;
142 }
143
repl_semi_reset_master(Binlog_transmit_param * param)144 int repl_semi_reset_master(Binlog_transmit_param *param)
145 {
146 if (repl_semisync.resetMaster())
147 return 1;
148 return 0;
149 }
150
151 C_MODE_END
152
153 /*
154 semisync system variables
155 */
156 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
157 SYS_VAR *var,
158 void *ptr,
159 const void *val);
160
161 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
162 SYS_VAR *var,
163 void *ptr,
164 const void *val);
165
166 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
167 SYS_VAR *var,
168 void *ptr,
169 const void *val);
170
171 static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
172 PLUGIN_VAR_OPCMDARG,
173 "Enable semi-synchronous replication master (disabled by default). ",
174 NULL, // check
175 &fix_rpl_semi_sync_master_enabled, // update
176 0);
177
178 static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
179 PLUGIN_VAR_OPCMDARG,
180 "The timeout value (in ms) for semi-synchronous replication in the master",
181 NULL, // check
182 fix_rpl_semi_sync_master_timeout, // update
183 10000, 0, ~0UL, 1);
184
185 static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
186 PLUGIN_VAR_OPCMDARG,
187 "Wait until timeout when no semi-synchronous replication slave available (enabled by default). ",
188 NULL, // check
189 NULL, // update
190 1);
191
192 static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
193 PLUGIN_VAR_OPCMDARG,
194 "The tracing level for semi-sync replication.",
195 NULL, // check
196 &fix_rpl_semi_sync_master_trace_level, // update
197 32, 0, ~0UL, 1);
198
199 static SYS_VAR* semi_sync_master_system_vars[]= {
200 MYSQL_SYSVAR(enabled),
201 MYSQL_SYSVAR(timeout),
202 MYSQL_SYSVAR(wait_no_slave),
203 MYSQL_SYSVAR(trace_level),
204 NULL,
205 };
206
207
fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)208 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
209 SYS_VAR *var,
210 void *ptr,
211 const void *val)
212 {
213 *(unsigned long *)ptr= *(unsigned long *)val;
214 repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout);
215 return;
216 }
217
fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)218 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
219 SYS_VAR *var,
220 void *ptr,
221 const void *val)
222 {
223 *(unsigned long *)ptr= *(unsigned long *)val;
224 repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level);
225 return;
226 }
227
fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)228 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
229 SYS_VAR *var,
230 void *ptr,
231 const void *val)
232 {
233 *(char *)ptr= *(char *)val;
234 if (rpl_semi_sync_master_enabled)
235 {
236 if (repl_semisync.enableMaster() != 0)
237 rpl_semi_sync_master_enabled = false;
238 }
239 else
240 {
241 if (repl_semisync.disableMaster() != 0)
242 rpl_semi_sync_master_enabled = true;
243 }
244
245 return;
246 }
247
248 Trans_observer trans_observer = {
249 sizeof(Trans_observer), // len
250
251 repl_semi_report_commit, // after_commit
252 repl_semi_report_rollback, // after_rollback
253 };
254
255 Binlog_storage_observer storage_observer = {
256 sizeof(Binlog_storage_observer), // len
257
258 repl_semi_report_binlog_update, // report_update
259 };
260
261 Binlog_transmit_observer transmit_observer = {
262 sizeof(Binlog_transmit_observer), // len
263
264 repl_semi_binlog_dump_start, // start
265 repl_semi_binlog_dump_end, // stop
266 repl_semi_reserve_header, // reserve_header
267 repl_semi_before_send_event, // before_send_event
268 repl_semi_after_send_event, // after_send_event
269 repl_semi_reset_master, // reset
270 };
271
272
273 #define SHOW_FNAME(name) \
274 rpl_semi_sync_master_show_##name
275
276 #define DEF_SHOW_FUNC(name, show_type) \
277 static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
278 { \
279 repl_semisync.setExportStats(); \
280 var->type= show_type; \
281 var->value= (char *)&rpl_semi_sync_master_##name; \
282 return 0; \
283 }
284
285 DEF_SHOW_FUNC(status, SHOW_BOOL)
286 DEF_SHOW_FUNC(clients, SHOW_LONG)
287 DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
288 DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
289 DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
290 DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
291 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
292 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
293 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
294
295
296 /* plugin status variables */
297 static SHOW_VAR semi_sync_master_status_vars[]= {
298 {"Rpl_semi_sync_master_status",
299 (char*) &SHOW_FNAME(status),
300 SHOW_FUNC},
301 {"Rpl_semi_sync_master_clients",
302 (char*) &SHOW_FNAME(clients),
303 SHOW_FUNC},
304 {"Rpl_semi_sync_master_yes_tx",
305 (char*) &rpl_semi_sync_master_yes_transactions,
306 SHOW_LONG},
307 {"Rpl_semi_sync_master_no_tx",
308 (char*) &rpl_semi_sync_master_no_transactions,
309 SHOW_LONG},
310 {"Rpl_semi_sync_master_wait_sessions",
311 (char*) &SHOW_FNAME(wait_sessions),
312 SHOW_FUNC},
313 {"Rpl_semi_sync_master_no_times",
314 (char*) &rpl_semi_sync_master_off_times,
315 SHOW_LONG},
316 {"Rpl_semi_sync_master_timefunc_failures",
317 (char*) &rpl_semi_sync_master_timefunc_fails,
318 SHOW_LONG},
319 {"Rpl_semi_sync_master_wait_pos_backtraverse",
320 (char*) &rpl_semi_sync_master_wait_pos_backtraverse,
321 SHOW_LONG},
322 {"Rpl_semi_sync_master_tx_wait_time",
323 (char*) &SHOW_FNAME(trx_wait_time),
324 SHOW_FUNC},
325 {"Rpl_semi_sync_master_tx_waits",
326 (char*) &SHOW_FNAME(trx_wait_num),
327 SHOW_FUNC},
328 {"Rpl_semi_sync_master_tx_avg_wait_time",
329 (char*) &SHOW_FNAME(avg_trx_wait_time),
330 SHOW_FUNC},
331 {"Rpl_semi_sync_master_net_wait_time",
332 (char*) &SHOW_FNAME(net_wait_time),
333 SHOW_FUNC},
334 {"Rpl_semi_sync_master_net_waits",
335 (char*) &SHOW_FNAME(net_wait_num),
336 SHOW_FUNC},
337 {"Rpl_semi_sync_master_net_avg_wait_time",
338 (char*) &SHOW_FNAME(avg_net_wait_time),
339 SHOW_FUNC},
340 {NULL, NULL, SHOW_LONG},
341 };
342
343 #ifdef HAVE_PSI_INTERFACE
344 PSI_mutex_key key_ss_mutex_LOCK_binlog_;
345
346 static PSI_mutex_info all_semisync_mutexes[]=
347 {
348 { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
349 };
350
351 PSI_cond_key key_ss_cond_COND_binlog_send_;
352
353 static PSI_cond_info all_semisync_conds[]=
354 {
355 { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
356 };
357
init_semisync_psi_keys(void)358 static void init_semisync_psi_keys(void)
359 {
360 const char* category= "semisync";
361 int count;
362
363 if (PSI_server == NULL)
364 return;
365
366 count= array_elements(all_semisync_mutexes);
367 PSI_server->register_mutex(category, all_semisync_mutexes, count);
368
369 count= array_elements(all_semisync_conds);
370 PSI_server->register_cond(category, all_semisync_conds, count);
371 }
372 #endif /* HAVE_PSI_INTERFACE */
373
semi_sync_master_plugin_init(void * p)374 static int semi_sync_master_plugin_init(void *p)
375 {
376 #ifdef HAVE_PSI_INTERFACE
377 init_semisync_psi_keys();
378 #endif
379
380 if (repl_semisync.initObject())
381 return 1;
382 if (register_trans_observer(&trans_observer, p))
383 return 1;
384 if (register_binlog_storage_observer(&storage_observer, p))
385 return 1;
386 if (register_binlog_transmit_observer(&transmit_observer, p))
387 return 1;
388 return 0;
389 }
390
semi_sync_master_plugin_deinit(void * p)391 static int semi_sync_master_plugin_deinit(void *p)
392 {
393 if (unregister_trans_observer(&trans_observer, p))
394 {
395 sql_print_error("unregister_trans_observer failed");
396 return 1;
397 }
398 if (unregister_binlog_storage_observer(&storage_observer, p))
399 {
400 sql_print_error("unregister_binlog_storage_observer failed");
401 return 1;
402 }
403 if (unregister_binlog_transmit_observer(&transmit_observer, p))
404 {
405 sql_print_error("unregister_binlog_transmit_observer failed");
406 return 1;
407 }
408 sql_print_information("unregister_replicator OK");
409 return 0;
410 }
411
412 struct Mysql_replication semi_sync_master_plugin= {
413 MYSQL_REPLICATION_INTERFACE_VERSION
414 };
415
416 /*
417 Plugin library descriptor
418 */
mysql_declare_plugin(semi_sync_master)419 mysql_declare_plugin(semi_sync_master)
420 {
421 MYSQL_REPLICATION_PLUGIN,
422 &semi_sync_master_plugin,
423 "rpl_semi_sync_master",
424 "He Zhenxing",
425 "Semi-synchronous replication master",
426 PLUGIN_LICENSE_GPL,
427 semi_sync_master_plugin_init, /* Plugin Init */
428 semi_sync_master_plugin_deinit, /* Plugin Deinit */
429 0x0100 /* 1.0 */,
430 semi_sync_master_status_vars, /* status variables */
431 semi_sync_master_system_vars, /* system variables */
432 NULL, /* config options */
433 0, /* flags */
434 }
435 mysql_declare_plugin_end;
436