1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008, 2021, Oracle and/or its affiliates.
3    Copyright (c) 2008, 2021, Oracle and/or its affiliates.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
24 
25 
26 #include "semisync_slave.h"
27 #include <mysql.h>
28 #include <mysqld_error.h>
29 
/* Slave-side semi-sync object that the hook functions in this file call into. */
ReplSemiSyncSlave repl_semisync;

/*
  Indicates whether or not the slave should send a reply to the master.

  repl_semi_slave_read_event passes the address of this flag to
  repl_semisync.slaveReadSyncHeader, which is expected to set it to true
  when the event just read is the last event of a transaction.  The flag
  is then checked in repl_semi_slave_queue_event before a reply is sent.
*/
bool semi_sync_need_reply= false;

C_MODE_START
42 
repl_semi_reset_slave(Binlog_relay_IO_param * param)43 int repl_semi_reset_slave(Binlog_relay_IO_param *param)
44 {
45   // TODO: reset semi-sync slave status here
46   return 0;
47 }
48 
repl_semi_slave_request_dump(Binlog_relay_IO_param * param,uint32 flags)49 int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
50 				 uint32 flags)
51 {
52   MYSQL *mysql= param->mysql;
53   MYSQL_RES *res= 0;
54 #ifndef NDEBUG
55   MYSQL_ROW row= NULL;
56 #endif
57   const char *query;
58   uint mysql_error= 0;
59 
60   if (!repl_semisync.getSlaveEnabled())
61     return 0;
62 
63   /* Check if master server has semi-sync plugin installed */
64   query= "SELECT @@global.rpl_semi_sync_master_enabled";
65   if (mysql_real_query(mysql, query, static_cast<ulong>(strlen(query))) ||
66       !(res= mysql_store_result(mysql)))
67   {
68     mysql_error= mysql_errno(mysql);
69     if (mysql_error != ER_UNKNOWN_SYSTEM_VARIABLE)
70     {
71       sql_print_error("Execution failed on master: %s; error %d", query, mysql_error);
72       return 1;
73     }
74   }
75   else
76   {
77 #ifndef NDEBUG
78     row=
79 #endif
80       mysql_fetch_row(res);
81   }
82 
83   assert(mysql_error == ER_UNKNOWN_SYSTEM_VARIABLE ||
84          strtoul(row[0], 0, 10) == 0 || strtoul(row[0], 0, 10) == 1);
85 
86   if (mysql_error == ER_UNKNOWN_SYSTEM_VARIABLE)
87   {
88     /* Master does not support semi-sync */
89     sql_print_warning("Master server does not support semi-sync, "
90                       "fallback to asynchronous replication");
91     rpl_semi_sync_slave_status= 0;
92     mysql_free_result(res);
93     return 0;
94   }
95   mysql_free_result(res);
96 
97   /*
98     Tell master dump thread that we want to do semi-sync
99     replication
100   */
101   query= "SET @rpl_semi_sync_slave= 1";
102   if (mysql_real_query(mysql, query, static_cast<ulong>(strlen(query))))
103   {
104     sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
105     return 1;
106   }
107   mysql_free_result(mysql_store_result(mysql));
108   rpl_semi_sync_slave_status= 1;
109   return 0;
110 }
111 
repl_semi_slave_read_event(Binlog_relay_IO_param * param,const char * packet,unsigned long len,const char ** event_buf,unsigned long * event_len)112 int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
113 			       const char *packet, unsigned long len,
114 			       const char **event_buf, unsigned long *event_len)
115 {
116   if (rpl_semi_sync_slave_status)
117     return repl_semisync.slaveReadSyncHeader(packet, len,
118 					     &semi_sync_need_reply,
119 					     event_buf, event_len);
120   *event_buf= packet;
121   *event_len= len;
122   return 0;
123 }
124 
repl_semi_slave_queue_event(Binlog_relay_IO_param * param,const char * event_buf,unsigned long event_len,uint32 flags)125 int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
126 				const char *event_buf,
127 				unsigned long event_len,
128 				uint32 flags)
129 {
130   if (rpl_semi_sync_slave_status && semi_sync_need_reply)
131   {
132     /*
133       We deliberately ignore the error in slaveReply, such error
134       should not cause the slave IO thread to stop, and the error
135       messages are already reported.
136     */
137     (void) repl_semisync.slaveReply(param->mysql,
138                                     param->master_log_name,
139                                     param->master_log_pos);
140   }
141   return 0;
142 }
143 
repl_semi_slave_io_start(Binlog_relay_IO_param * param)144 int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
145 {
146   return repl_semisync.slaveStart(param);
147 }
148 
repl_semi_slave_io_end(Binlog_relay_IO_param * param)149 int repl_semi_slave_io_end(Binlog_relay_IO_param *param)
150 {
151   return repl_semisync.slaveStop(param);
152 }
153 
repl_semi_slave_sql_start(Binlog_relay_IO_param * param)154 int repl_semi_slave_sql_start(Binlog_relay_IO_param *param)
155 {
156   return 0;
157 }
158 
repl_semi_slave_sql_stop(Binlog_relay_IO_param * param,bool aborted)159 int repl_semi_slave_sql_stop(Binlog_relay_IO_param *param, bool aborted)
160 {
161   return 0;
162 }
163 
/* Pairs with C_MODE_START above; presumably closes a C-linkage region (confirm against the macro definition). */
C_MODE_END
165 
fix_rpl_semi_sync_slave_enabled(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)166 static void fix_rpl_semi_sync_slave_enabled(MYSQL_THD thd,
167 					    SYS_VAR *var,
168 					    void *ptr,
169 					    const void *val)
170 {
171   *(char *)ptr= *(char *)val;
172   repl_semisync.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0);
173   return;
174 }
175 
fix_rpl_semi_sync_trace_level(MYSQL_THD thd,SYS_VAR * var,void * ptr,const void * val)176 static void fix_rpl_semi_sync_trace_level(MYSQL_THD thd,
177 					  SYS_VAR *var,
178 					  void *ptr,
179 					  const void *val)
180 {
181   *(unsigned long *)ptr= *(unsigned long *)val;
182   repl_semisync.setTraceLevel(rpl_semi_sync_slave_trace_level);
183   return;
184 }
185 
/* plugin system variables */

/*
  `enabled`: boolean variable bound to rpl_semi_sync_slave_enabled.
  It has no check callback; updates go through
  fix_rpl_semi_sync_slave_enabled.  The trailing 0 is presumably the
  default value, consistent with the "disabled by default" help text
  (confirm against the MYSQL_SYSVAR_BOOL definition).
*/
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_slave_enabled,
  PLUGIN_VAR_OPCMDARG,
 "Enable semi-synchronous replication slave (disabled by default). ",
  NULL,				   // check
  &fix_rpl_semi_sync_slave_enabled, // update
  0);

/*
  `trace_level`: unsigned long variable bound to
  rpl_semi_sync_slave_trace_level.  It has no check callback; updates go
  through fix_rpl_semi_sync_trace_level.  The trailing numbers are
  presumably default, minimum, maximum and block size (confirm against
  the MYSQL_SYSVAR_ULONG definition).
*/
static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
  PLUGIN_VAR_OPCMDARG,
 "The tracing level for semi-sync replication.",
  NULL,				  // check
  &fix_rpl_semi_sync_trace_level, // update
  32, 0, ~0UL, 1);

/* NULL-terminated list of the system variables declared above. */
static SYS_VAR* semi_sync_slave_system_vars[]= {
  MYSQL_SYSVAR(enabled),
  MYSQL_SYSVAR(trace_level),
  NULL,
};
206 
207 
/*
  plugin status variables

  Exposes rpl_semi_sync_slave_status as the global boolean status variable
  "Rpl_semi_sync_slave_status".  The array ends with an entry whose name
  is NULL.
*/
static SHOW_VAR semi_sync_slave_status_vars[]= {
  {"Rpl_semi_sync_slave_status",
   (char*) &rpl_semi_sync_slave_status, SHOW_BOOL, SHOW_SCOPE_GLOBAL},
  {NULL, NULL, SHOW_BOOL, SHOW_SCOPE_GLOBAL},
};
214 
/*
  Table of the hook functions defined in this file.  It is passed to
  register_binlog_relay_io_observer in semi_sync_slave_plugin_init and to
  unregister_binlog_relay_io_observer in semi_sync_slave_plugin_deinit.
  The trailing comment on each entry names the observer slot it fills.
*/
Binlog_relay_IO_observer relay_io_observer = {
  sizeof(Binlog_relay_IO_observer), // len

  repl_semi_slave_io_start,	// start
  repl_semi_slave_io_end,	// stop
  repl_semi_slave_sql_start,    // start sql thread
  repl_semi_slave_sql_stop,     // stop sql thread
  repl_semi_slave_request_dump,	// request_transmit
  repl_semi_slave_read_event,	// after_read_event
  repl_semi_slave_queue_event,	// after_queue_event
  repl_semi_reset_slave,	// reset
};
227 
semi_sync_slave_plugin_init(void * p)228 static int semi_sync_slave_plugin_init(void *p)
229 {
230   if (repl_semisync.initObject())
231     return 1;
232   if (register_binlog_relay_io_observer(&relay_io_observer, p))
233     return 1;
234   return 0;
235 }
236 
semi_sync_slave_plugin_deinit(void * p)237 static int semi_sync_slave_plugin_deinit(void *p)
238 {
239   if (unregister_binlog_relay_io_observer(&relay_io_observer, p))
240     return 1;
241   return 0;
242 }
243 
244 
/*
  Replication plugin info structure referenced by the plugin descriptor
  below; it carries only the replication interface version.
*/
struct Mysql_replication semi_sync_slave_plugin= {
  MYSQL_REPLICATION_INTERFACE_VERSION
};
248 
/*
  Plugin library descriptor

  Declares the "rpl_semi_sync_slave" replication plugin and ties together
  its info structure, init/deinit functions, and status and system
  variable tables defined earlier in this file.
*/
mysql_declare_plugin(semi_sync_slave)
{
  MYSQL_REPLICATION_PLUGIN,
  &semi_sync_slave_plugin,
  "rpl_semi_sync_slave",
  "He Zhenxing",
  "Semi-synchronous replication slave",
  PLUGIN_LICENSE_GPL,
  semi_sync_slave_plugin_init, /* Plugin Init */
  semi_sync_slave_plugin_deinit, /* Plugin Deinit */
  0x0100 /* 1.0 */,
  semi_sync_slave_status_vars,	/* status variables */
  semi_sync_slave_system_vars,	/* system variables */
  NULL,                         /* config options */
  0,                            /* flags */
}
mysql_declare_plugin_end;
269