1 /* Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc.
2 Use is subject to license terms.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17
18 #include <my_global.h>
19 #include "semisync_slave.h"
20
/* The semi-sync slave object whose member functions are defined below. */
Repl_semi_sync_slave repl_semisync_slave;

/* Handed to set_slave_enabled() by Repl_semi_sync_slave::init_object(). */
my_bool rpl_semi_sync_slave_enabled= 0;

/* Handed to set_delay_master() by Repl_semi_sync_slave::init_object(). */
char rpl_semi_sync_slave_delay_master;
/*
  Non-zero while semi-sync is in effect for the slave connection.

  Set to 1 by slave_start() and request_transmit(); reset to 0 by
  slave_stop() and by request_transmit() when the master does not report
  the rpl_semi_sync_master_enabled variable.
*/
my_bool rpl_semi_sync_slave_status= 0;
/* Handed to set_trace_level() by Repl_semi_sync_slave::init_object(). */
ulong rpl_semi_sync_slave_trace_level;

/*
  Indicates whether or not the slave should send a reply to the master.

  This is set in Repl_semi_sync_slave::slave_read_sync_header() from the
  k_packet_flag_sync bit of the semi-sync packet header, and the value is
  checked in Repl_semi_sync_slave::slave_reply().
*/
bool semi_sync_need_reply= false;
/* Handed to set_kill_conn_timeout() by Repl_semi_sync_slave::init_object(). */
unsigned int rpl_semi_sync_slave_kill_conn_timeout;
/*
  Counter incremented by slave_reply() after a reply packet has been written
  to the network buffer; reset to 0 by slave_start().
*/
unsigned long long rpl_semi_sync_slave_send_ack = 0;
39
init_object()40 int Repl_semi_sync_slave::init_object()
41 {
42 int result= 0;
43
44 m_init_done = true;
45
46 /* References to the parameter works after set_options(). */
47 set_slave_enabled(rpl_semi_sync_slave_enabled);
48 set_trace_level(rpl_semi_sync_slave_trace_level);
49 set_delay_master(rpl_semi_sync_slave_delay_master);
50 set_kill_conn_timeout(rpl_semi_sync_slave_kill_conn_timeout);
51
52 return result;
53 }
54
slave_read_sync_header(const char * header,unsigned long total_len,int * semi_flags,const char ** payload,unsigned long * payload_len)55 int Repl_semi_sync_slave::slave_read_sync_header(const char *header,
56 unsigned long total_len,
57 int *semi_flags,
58 const char **payload,
59 unsigned long *payload_len)
60 {
61 int read_res = 0;
62 DBUG_ENTER("Repl_semi_sync_slave::slave_read_sync_header");
63
64 if (rpl_semi_sync_slave_status)
65 {
66 if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1)
67 && (unsigned char)(header[0]) == k_packet_magic_num)
68 {
69 semi_sync_need_reply = (header[1] & k_packet_flag_sync);
70 *payload_len = total_len - 2;
71 *payload = header + 2;
72
73 DBUG_PRINT("semisync", ("%s: reply - %d",
74 "Repl_semi_sync_slave::slave_read_sync_header",
75 semi_sync_need_reply));
76
77 if (semi_sync_need_reply)
78 *semi_flags |= SEMI_SYNC_NEED_ACK;
79 if (is_delay_master())
80 *semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC;
81 }
82 else
83 {
84 sql_print_error("Missing magic number for semi-sync packet, packet "
85 "len: %lu", total_len);
86 read_res = -1;
87 }
88 } else {
89 *payload= header;
90 *payload_len= total_len;
91 }
92
93 DBUG_RETURN(read_res);
94 }
95
slave_start(Master_info * mi)96 int Repl_semi_sync_slave::slave_start(Master_info *mi)
97 {
98 bool semi_sync= get_slave_enabled();
99
100 sql_print_information("Slave I/O thread: Start %s replication to\
101 master '%s@%s:%d' in log '%s' at position %lu",
102 semi_sync ? "semi-sync" : "asynchronous",
103 const_cast<char *>(mi->user), mi->host, mi->port,
104 const_cast<char *>(mi->master_log_name),
105 (unsigned long)(mi->master_log_pos));
106
107 if (semi_sync && !rpl_semi_sync_slave_status)
108 rpl_semi_sync_slave_status= 1;
109
110 /*clear the counter*/
111 rpl_semi_sync_slave_send_ack= 0;
112 return 0;
113 }
114
slave_stop(Master_info * mi)115 int Repl_semi_sync_slave::slave_stop(Master_info *mi)
116 {
117 if (rpl_semi_sync_slave_status)
118 rpl_semi_sync_slave_status= 0;
119 if (get_slave_enabled())
120 kill_connection(mi->mysql);
121 return 0;
122 }
123
reset_slave(Master_info * mi)124 int Repl_semi_sync_slave::reset_slave(Master_info *mi)
125 {
126 return 0;
127 }
128
kill_connection(MYSQL * mysql)129 void Repl_semi_sync_slave::kill_connection(MYSQL *mysql)
130 {
131 if (!mysql)
132 return;
133
134 char kill_buffer[30];
135 MYSQL *kill_mysql = NULL;
136 kill_mysql = mysql_init(kill_mysql);
137 mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &m_kill_conn_timeout);
138 mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &m_kill_conn_timeout);
139 mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &m_kill_conn_timeout);
140
141 bool ret= (!mysql_real_connect(kill_mysql, mysql->host,
142 mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0));
143 if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", 1, 0) || ret)
144 {
145 sql_print_information("cannot connect to master to kill slave io_thread's "
146 "connection");
147 mysql_close(kill_mysql);
148 return;
149 }
150 size_t kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu",
151 mysql->thread_id);
152 mysql_real_query(kill_mysql, kill_buffer, (ulong)kill_buffer_length);
153 mysql_close(kill_mysql);
154 }
155
request_transmit(Master_info * mi)156 int Repl_semi_sync_slave::request_transmit(Master_info *mi)
157 {
158 MYSQL *mysql= mi->mysql;
159 MYSQL_RES *res= 0;
160 MYSQL_ROW row;
161 const char *query;
162
163 if (!get_slave_enabled())
164 return 0;
165
166 query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
167 if (mysql_real_query(mysql, query, (ulong)strlen(query)) ||
168 !(res= mysql_store_result(mysql)))
169 {
170 sql_print_error("Execution failed on master: %s, error :%s", query, mysql_error(mysql));
171 return 1;
172 }
173
174 row= mysql_fetch_row(res);
175 if (DBUG_EVALUATE_IF("master_not_support_semisync", 1, 0)
176 || !row)
177 {
178 /* Master does not support semi-sync */
179 sql_print_warning("Master server does not support semi-sync, "
180 "fallback to asynchronous replication");
181 rpl_semi_sync_slave_status= 0;
182 mysql_free_result(res);
183 return 0;
184 }
185 mysql_free_result(res);
186
187 /*
188 Tell master dump thread that we want to do semi-sync
189 replication
190 */
191 query= "SET @rpl_semi_sync_slave= 1";
192 if (mysql_real_query(mysql, query, (ulong)strlen(query)))
193 {
194 sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
195 return 1;
196 }
197 mysql_free_result(mysql_store_result(mysql));
198 rpl_semi_sync_slave_status= 1;
199
200 return 0;
201 }
202
slave_reply(Master_info * mi)203 int Repl_semi_sync_slave::slave_reply(Master_info *mi)
204 {
205 MYSQL* mysql= mi->mysql;
206 const char *binlog_filename= const_cast<char *>(mi->master_log_name);
207 my_off_t binlog_filepos= mi->master_log_pos;
208
209 NET *net= &mysql->net;
210 uchar reply_buffer[REPLY_MAGIC_NUM_LEN
211 + REPLY_BINLOG_POS_LEN
212 + REPLY_BINLOG_NAME_LEN];
213 int reply_res = 0;
214 size_t name_len = strlen(binlog_filename);
215
216 DBUG_ENTER("Repl_semi_sync_slave::slave_reply");
217
218 if (rpl_semi_sync_slave_status && semi_sync_need_reply)
219 {
220 /* Prepare the buffer of the reply. */
221 reply_buffer[REPLY_MAGIC_NUM_OFFSET] = k_packet_magic_num;
222 int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
223 memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
224 binlog_filename,
225 name_len + 1 /* including trailing '\0' */);
226
227 DBUG_PRINT("semisync", ("%s: reply (%s, %lu)",
228 "Repl_semi_sync_slave::slave_reply",
229 binlog_filename, (ulong)binlog_filepos));
230
231 net_clear(net, 0);
232 /* Send the reply. */
233 reply_res = my_net_write(net, reply_buffer,
234 name_len + REPLY_BINLOG_NAME_OFFSET);
235 if (!reply_res)
236 {
237 reply_res = DBUG_EVALUATE_IF("semislave_failed_net_flush", 1, net_flush(net));
238 if (reply_res)
239 sql_print_error("Semi-sync slave net_flush() reply failed");
240 rpl_semi_sync_slave_send_ack++;
241 }
242 else
243 {
244 sql_print_error("Semi-sync slave send reply failed: %s (%d)",
245 net->last_error, net->last_errno);
246 }
247 }
248
249 DBUG_RETURN(reply_res);
250 }
251