1 /* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16 #include <my_global.h>
17 #include "semisync_master.h"
18 #include "semisync_master_ack_receiver.h"
19
20 #ifdef HAVE_PSI_MUTEX_INTERFACE
21 extern PSI_mutex_key key_LOCK_ack_receiver;
22 extern PSI_cond_key key_COND_ack_receiver;
23 #endif
24 #ifdef HAVE_PSI_THREAD_INTERFACE
25 extern PSI_thread_key key_thread_ack_receiver;
26 #endif
27 extern Repl_semi_sync_master repl_semisync;
28
29 /* Callback function of ack receive thread */
ack_receive_handler(void * arg)30 pthread_handler_t ack_receive_handler(void *arg)
31 {
32 Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
33
34 my_thread_init();
35 recv->run();
36 my_thread_end();
37
38 return NULL;
39 }
40
Ack_receiver()41 Ack_receiver::Ack_receiver()
42 {
43 DBUG_ENTER("Ack_receiver::Ack_receiver");
44
45 m_status= ST_DOWN;
46 mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, NULL);
47 mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL);
48 m_pid= 0;
49
50 DBUG_VOID_RETURN;
51 }
52
cleanup()53 void Ack_receiver::cleanup()
54 {
55 DBUG_ENTER("Ack_receiver::~Ack_receiver");
56
57 stop();
58 mysql_mutex_destroy(&m_mutex);
59 mysql_cond_destroy(&m_cond);
60
61 DBUG_VOID_RETURN;
62 }
63
start()64 bool Ack_receiver::start()
65 {
66 DBUG_ENTER("Ack_receiver::start");
67
68 mysql_mutex_lock(&m_mutex);
69 if(m_status == ST_DOWN)
70 {
71 pthread_attr_t attr;
72
73 m_status= ST_UP;
74
75 if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
76 pthread_attr_init(&attr) != 0 ||
77 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
78 #ifndef _WIN32
79 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
80 #endif
81 mysql_thread_create(key_thread_ack_receiver, &m_pid,
82 &attr, ack_receive_handler, this))
83 {
84 sql_print_error("Failed to start semi-sync ACK receiver thread, "
85 " could not create thread(errno:%d)", errno);
86
87 m_status= ST_DOWN;
88 mysql_mutex_unlock(&m_mutex);
89
90 DBUG_RETURN(true);
91 }
92 (void) pthread_attr_destroy(&attr);
93 }
94 mysql_mutex_unlock(&m_mutex);
95
96 DBUG_RETURN(false);
97 }
98
stop()99 void Ack_receiver::stop()
100 {
101 DBUG_ENTER("Ack_receiver::stop");
102
103 mysql_mutex_lock(&m_mutex);
104 if (m_status == ST_UP)
105 {
106 m_status= ST_STOPPING;
107 mysql_cond_broadcast(&m_cond);
108
109 while (m_status == ST_STOPPING)
110 mysql_cond_wait(&m_cond, &m_mutex);
111
112 DBUG_ASSERT(m_status == ST_DOWN);
113
114 m_pid= 0;
115 }
116 mysql_mutex_unlock(&m_mutex);
117
118 DBUG_VOID_RETURN;
119 }
120
add_slave(THD * thd)121 bool Ack_receiver::add_slave(THD *thd)
122 {
123 Slave *slave;
124 DBUG_ENTER("Ack_receiver::add_slave");
125
126 if (!(slave= new Slave))
127 DBUG_RETURN(true);
128
129 slave->thd= thd;
130 slave->vio= *thd->net.vio;
131 slave->vio.mysql_socket.m_psi= NULL;
132 slave->vio.read_timeout= 1;
133
134 mysql_mutex_lock(&m_mutex);
135 m_slaves.push_back(slave);
136 m_slaves_changed= true;
137 mysql_cond_broadcast(&m_cond);
138 mysql_mutex_unlock(&m_mutex);
139
140 DBUG_RETURN(false);
141 }
142
remove_slave(THD * thd)143 void Ack_receiver::remove_slave(THD *thd)
144 {
145 I_List_iterator<Slave> it(m_slaves);
146 Slave *slave;
147 DBUG_ENTER("Ack_receiver::remove_slave");
148
149 mysql_mutex_lock(&m_mutex);
150
151 while ((slave= it++))
152 {
153 if (slave->thd == thd)
154 {
155 delete slave;
156 m_slaves_changed= true;
157 break;
158 }
159 }
160 mysql_mutex_unlock(&m_mutex);
161
162 DBUG_VOID_RETURN;
163 }
164
set_stage_info(const PSI_stage_info & stage)165 inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
166 {
167 MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
168 }
169
wait_for_slave_connection()170 inline void Ack_receiver::wait_for_slave_connection()
171 {
172 set_stage_info(stage_waiting_for_semi_sync_slave);
173 mysql_cond_wait(&m_cond, &m_mutex);
174 }
175
176 /* Auxilary function to initialize a NET object with given net buffer. */
init_net(NET * net,unsigned char * buff,unsigned int buff_len)177 static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
178 {
179 memset(net, 0, sizeof(NET));
180 net->max_packet= buff_len;
181 net->buff= buff;
182 net->buff_end= buff + buff_len;
183 net->read_pos= net->buff;
184 }
185
run()186 void Ack_receiver::run()
187 {
188 THD *thd= new THD(next_thread_id());
189 NET net;
190 unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
191
192 my_thread_init();
193
194 DBUG_ENTER("Ack_receiver::run");
195
196 #ifdef HAVE_POLL
197 Poll_socket_listener listener(m_slaves);
198 #else
199 Select_socket_listener listener(m_slaves);
200 #endif //HAVE_POLL
201
202 sql_print_information("Starting ack receiver thread");
203 thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
204 thd->thread_stack= (char*) &thd;
205 thd->store_globals();
206 thd->security_ctx->skip_grants();
207 thread_safe_increment32(&service_thread_count);
208 thd->set_command(COM_DAEMON);
209 init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
210
211 mysql_mutex_lock(&m_mutex);
212 m_slaves_changed= true;
213 mysql_mutex_unlock(&m_mutex);
214
215 while (1)
216 {
217 int ret;
218 uint slave_count __attribute__((unused))= 0;
219 Slave *slave;
220
221 mysql_mutex_lock(&m_mutex);
222 if (unlikely(m_status == ST_STOPPING))
223 goto end;
224
225 set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
226 if (unlikely(m_slaves_changed))
227 {
228 if (unlikely(m_slaves.is_empty()))
229 {
230 wait_for_slave_connection();
231 mysql_mutex_unlock(&m_mutex);
232 continue;
233 }
234
235 if ((slave_count= listener.init_slave_sockets()) == 0)
236 goto end;
237 m_slaves_changed= false;
238 #ifdef HAVE_POLL
239 DBUG_PRINT("info", ("fd count %u", slave_count));
240 #else
241 DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,
242 (int) listener.get_max_fd()));
243 #endif
244 }
245
246 ret= listener.listen_on_sockets();
247 if (ret <= 0)
248 {
249 mysql_mutex_unlock(&m_mutex);
250
251 ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
252
253 if (ret == -1 && errno != EINTR)
254 sql_print_information("Failed to wait on semi-sync sockets, "
255 "error: errno=%d", socket_errno);
256 /* Sleep 1us, so other threads can catch the m_mutex easily. */
257 my_sleep(1);
258 continue;
259 }
260
261 set_stage_info(stage_reading_semi_sync_ack);
262 Slave_ilist_iterator it(m_slaves);
263 while ((slave= it++))
264 {
265 if (listener.is_socket_active(slave))
266 {
267 ulong len;
268
269 net_clear(&net, 0);
270 net.vio= &slave->vio;
271 /*
272 Set compress flag. This is needed to support
273 Slave_compress_protocol flag enabled Slaves
274 */
275 net.compress= slave->thd->net.compress;
276
277 len= my_net_read(&net);
278 if (likely(len != packet_error))
279 repl_semisync_master.report_reply_packet(slave->server_id(),
280 net.read_pos, len);
281 else if (net.last_errno == ER_NET_READ_ERROR)
282 listener.clear_socket_info(slave);
283 }
284 }
285 mysql_mutex_unlock(&m_mutex);
286 }
287 end:
288 sql_print_information("Stopping ack receiver thread");
289 m_status= ST_DOWN;
290 delete thd;
291 thread_safe_decrement32(&service_thread_count);
292 signal_thd_deleted();
293 mysql_cond_broadcast(&m_cond);
294 mysql_mutex_unlock(&m_mutex);
295 DBUG_VOID_RETURN;
296 }
297