/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
15 
16 #include <my_global.h>
17 #include "semisync_master.h"
18 #include "semisync_master_ack_receiver.h"
19 
20 #ifdef HAVE_PSI_MUTEX_INTERFACE
21 extern PSI_mutex_key key_LOCK_ack_receiver;
22 extern PSI_cond_key key_COND_ack_receiver;
23 #endif
24 #ifdef HAVE_PSI_THREAD_INTERFACE
25 extern PSI_thread_key key_thread_ack_receiver;
26 #endif
27 extern Repl_semi_sync_master repl_semisync;
28 
29 /* Callback function of ack receive thread */
ack_receive_handler(void * arg)30 pthread_handler_t ack_receive_handler(void *arg)
31 {
32   Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
33 
34   my_thread_init();
35   recv->run();
36   my_thread_end();
37 
38   return NULL;
39 }
40 
Ack_receiver()41 Ack_receiver::Ack_receiver()
42 {
43   DBUG_ENTER("Ack_receiver::Ack_receiver");
44 
45   m_status= ST_DOWN;
46   mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, NULL);
47   mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL);
48   m_pid= 0;
49 
50   DBUG_VOID_RETURN;
51 }
52 
cleanup()53 void Ack_receiver::cleanup()
54 {
55   DBUG_ENTER("Ack_receiver::~Ack_receiver");
56 
57   stop();
58   mysql_mutex_destroy(&m_mutex);
59   mysql_cond_destroy(&m_cond);
60 
61   DBUG_VOID_RETURN;
62 }
63 
start()64 bool Ack_receiver::start()
65 {
66   DBUG_ENTER("Ack_receiver::start");
67 
68   mysql_mutex_lock(&m_mutex);
69   if(m_status == ST_DOWN)
70   {
71     pthread_attr_t attr;
72 
73     m_status= ST_UP;
74 
75     if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
76         pthread_attr_init(&attr) != 0 ||
77         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
78 #ifndef _WIN32
79         pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
80 #endif
81         mysql_thread_create(key_thread_ack_receiver, &m_pid,
82                             &attr, ack_receive_handler, this))
83     {
84       sql_print_error("Failed to start semi-sync ACK receiver thread, "
85                       " could not create thread(errno:%d)", errno);
86 
87       m_status= ST_DOWN;
88       mysql_mutex_unlock(&m_mutex);
89 
90       DBUG_RETURN(true);
91     }
92     (void) pthread_attr_destroy(&attr);
93   }
94   mysql_mutex_unlock(&m_mutex);
95 
96   DBUG_RETURN(false);
97 }
98 
stop()99 void Ack_receiver::stop()
100 {
101   DBUG_ENTER("Ack_receiver::stop");
102 
103   mysql_mutex_lock(&m_mutex);
104   if (m_status == ST_UP)
105   {
106     m_status= ST_STOPPING;
107     mysql_cond_broadcast(&m_cond);
108 
109     while (m_status == ST_STOPPING)
110       mysql_cond_wait(&m_cond, &m_mutex);
111 
112     DBUG_ASSERT(m_status == ST_DOWN);
113 
114     m_pid= 0;
115   }
116   mysql_mutex_unlock(&m_mutex);
117 
118   DBUG_VOID_RETURN;
119 }
120 
add_slave(THD * thd)121 bool Ack_receiver::add_slave(THD *thd)
122 {
123   Slave *slave;
124   DBUG_ENTER("Ack_receiver::add_slave");
125 
126   if (!(slave= new Slave))
127     DBUG_RETURN(true);
128 
129   slave->thd= thd;
130   slave->vio= *thd->net.vio;
131   slave->vio.mysql_socket.m_psi= NULL;
132   slave->vio.read_timeout= 1;
133 
134   mysql_mutex_lock(&m_mutex);
135   m_slaves.push_back(slave);
136   m_slaves_changed= true;
137   mysql_cond_broadcast(&m_cond);
138   mysql_mutex_unlock(&m_mutex);
139 
140   DBUG_RETURN(false);
141 }
142 
remove_slave(THD * thd)143 void Ack_receiver::remove_slave(THD *thd)
144 {
145   I_List_iterator<Slave> it(m_slaves);
146   Slave *slave;
147   DBUG_ENTER("Ack_receiver::remove_slave");
148 
149   mysql_mutex_lock(&m_mutex);
150 
151   while ((slave= it++))
152   {
153     if (slave->thd == thd)
154     {
155       delete slave;
156       m_slaves_changed= true;
157       break;
158     }
159   }
160   mysql_mutex_unlock(&m_mutex);
161 
162   DBUG_VOID_RETURN;
163 }
164 
set_stage_info(const PSI_stage_info & stage)165 inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
166 {
167   MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
168 }
169 
wait_for_slave_connection()170 inline void Ack_receiver::wait_for_slave_connection()
171 {
172   set_stage_info(stage_waiting_for_semi_sync_slave);
173   mysql_cond_wait(&m_cond, &m_mutex);
174 }
175 
176 /* Auxilary function to initialize a NET object with given net buffer. */
init_net(NET * net,unsigned char * buff,unsigned int buff_len)177 static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
178 {
179   memset(net, 0, sizeof(NET));
180   net->max_packet= buff_len;
181   net->buff= buff;
182   net->buff_end= buff + buff_len;
183   net->read_pos= net->buff;
184 }
185 
run()186 void Ack_receiver::run()
187 {
188   THD *thd= new THD(next_thread_id());
189   NET net;
190   unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
191 
192   my_thread_init();
193 
194   DBUG_ENTER("Ack_receiver::run");
195 
196 #ifdef HAVE_POLL
197   Poll_socket_listener listener(m_slaves);
198 #else
199   Select_socket_listener listener(m_slaves);
200 #endif //HAVE_POLL
201 
202   sql_print_information("Starting ack receiver thread");
203   thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
204   thd->thread_stack= (char*) &thd;
205   thd->store_globals();
206   thd->security_ctx->skip_grants();
207   thread_safe_increment32(&service_thread_count);
208   thd->set_command(COM_DAEMON);
209   init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
210 
211   mysql_mutex_lock(&m_mutex);
212   m_slaves_changed= true;
213   mysql_mutex_unlock(&m_mutex);
214 
215   while (1)
216   {
217     int ret;
218     uint slave_count __attribute__((unused))= 0;
219     Slave *slave;
220 
221     mysql_mutex_lock(&m_mutex);
222     if (unlikely(m_status == ST_STOPPING))
223       goto end;
224 
225     set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
226     if (unlikely(m_slaves_changed))
227     {
228       if (unlikely(m_slaves.is_empty()))
229       {
230         wait_for_slave_connection();
231         mysql_mutex_unlock(&m_mutex);
232         continue;
233       }
234 
235       if ((slave_count= listener.init_slave_sockets()) == 0)
236         goto end;
237       m_slaves_changed= false;
238 #ifdef HAVE_POLL
239       DBUG_PRINT("info", ("fd count %u", slave_count));
240 #else
241       DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,
242                           (int) listener.get_max_fd()));
243 #endif
244     }
245 
246     ret= listener.listen_on_sockets();
247     if (ret <= 0)
248     {
249       mysql_mutex_unlock(&m_mutex);
250 
251       ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
252 
253       if (ret == -1 && errno != EINTR)
254         sql_print_information("Failed to wait on semi-sync sockets, "
255                               "error: errno=%d", socket_errno);
256       /* Sleep 1us, so other threads can catch the m_mutex easily. */
257       my_sleep(1);
258       continue;
259     }
260 
261     set_stage_info(stage_reading_semi_sync_ack);
262     Slave_ilist_iterator it(m_slaves);
263     while ((slave= it++))
264     {
265       if (listener.is_socket_active(slave))
266       {
267         ulong len;
268 
269         net_clear(&net, 0);
270         net.vio= &slave->vio;
271         /*
272           Set compress flag. This is needed to support
273           Slave_compress_protocol flag enabled Slaves
274         */
275         net.compress= slave->thd->net.compress;
276 
277         len= my_net_read(&net);
278         if (likely(len != packet_error))
279           repl_semisync_master.report_reply_packet(slave->server_id(),
280                                                    net.read_pos, len);
281         else if (net.last_errno == ER_NET_READ_ERROR)
282           listener.clear_socket_info(slave);
283       }
284     }
285     mysql_mutex_unlock(&m_mutex);
286   }
287 end:
288   sql_print_information("Stopping ack receiver thread");
289   m_status= ST_DOWN;
290   delete thd;
291   thread_safe_decrement32(&service_thread_count);
292   signal_thd_deleted();
293   mysql_cond_broadcast(&m_cond);
294   mysql_mutex_unlock(&m_mutex);
295   DBUG_VOID_RETURN;
296 }
297