1 /* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. 2 3 This program is free software; you can redistribute it and/or modify 4 it under the terms of the GNU General Public License as published by 5 the Free Software Foundation; version 2 of the License. 6 7 This program is distributed in the hope that it will be useful, 8 but WITHOUT ANY WARRANTY; without even the implied warranty of 9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 10 GNU General Public License for more details. 11 12 You should have received a copy of the GNU General Public License 13 along with this program; if not, write to the Free Software 14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ 15 16 #ifndef SEMISYNC_MASTER_ACK_RECEIVER_DEFINED 17 #define SEMISYNC_MASTER_ACK_RECEIVER_DEFINED 18 19 #include "my_global.h" 20 #include "my_pthread.h" 21 #include "sql_class.h" 22 #include "semisync.h" 23 #include <vector> 24 25 struct Slave :public ilink 26 { 27 THD *thd; 28 Vio vio; 29 #ifdef HAVE_POLL 30 uint m_fds_index; 31 #endif sock_fdSlave32 my_socket sock_fd() const { return vio.mysql_socket.fd; } server_idSlave33 uint server_id() const { return thd->variables.server_id; } 34 }; 35 36 typedef I_List<Slave> Slave_ilist; 37 typedef I_List_iterator<Slave> Slave_ilist_iterator; 38 39 /** 40 Ack_receiver is responsible to control ack receive thread and maintain 41 slave information used by ack receive thread. 42 43 There are mainly four operations on ack receive thread: 44 start: start ack receive thread 45 stop: stop ack receive thread 46 add_slave: maintain a new semisync slave's information 47 remove_slave: remove a semisync slave's information 48 */ 49 class Ack_receiver : public Repl_semi_sync_base 50 { 51 public: 52 Ack_receiver(); ~Ack_receiver()53 ~Ack_receiver() {} 54 void cleanup(); 55 /** 56 Notify ack receiver to receive acks on the dump session. 57 58 It adds the given dump thread into the slave list and wakes 59 up ack thread if it is waiting for any slave coming. 60 61 @param[in] thd THD of a dump thread. 62 63 @return it return false if succeeds, otherwise true is returned. 64 */ 65 bool add_slave(THD *thd); 66 67 /** 68 Notify ack receiver not to receive ack on the dump session. 69 70 it removes the given dump thread from slave list. 71 72 @param[in] thd THD of a dump thread. 73 */ 74 void remove_slave(THD *thd); 75 76 /** 77 Start ack receive thread 78 79 @return it return false if succeeds, otherwise true is returned. 80 */ 81 bool start(); 82 83 /** 84 Stop ack receive thread 85 */ 86 void stop(); 87 88 /** 89 The core of ack receive thread. 90 91 It monitors all slaves' sockets and receives acks when they come. 92 */ 93 void run(); 94 set_trace_level(unsigned long trace_level)95 void set_trace_level(unsigned long trace_level) 96 { 97 m_trace_level= trace_level; 98 } 99 private: 100 enum status {ST_UP, ST_DOWN, ST_STOPPING}; 101 uint8 m_status; 102 /* 103 Protect m_status, m_slaves_changed and m_slaves. ack thread and other 104 session may access the variables at the same time. 105 */ 106 mysql_mutex_t m_mutex; 107 mysql_cond_t m_cond; 108 /* If slave list is updated(add or remove). */ 109 bool m_slaves_changed; 110 111 Slave_ilist m_slaves; 112 pthread_t m_pid; 113 114 /* Declare them private, so no one can copy the object. */ 115 Ack_receiver(const Ack_receiver &ack_receiver); 116 Ack_receiver& operator=(const Ack_receiver &ack_receiver); 117 118 void set_stage_info(const PSI_stage_info &stage); 119 void wait_for_slave_connection(); 120 }; 121 122 123 #ifdef HAVE_POLL 124 #include <sys/poll.h> 125 #include <vector> 126 127 class Poll_socket_listener 128 { 129 public: Poll_socket_listener(const Slave_ilist & slaves)130 Poll_socket_listener(const Slave_ilist &slaves) 131 :m_slaves(slaves) 132 { 133 } 134 listen_on_sockets()135 bool listen_on_sockets() 136 { 137 return poll(m_fds.data(), m_fds.size(), 1000 /*1 Second timeout*/); 138 } 139 is_socket_active(const Slave * slave)140 bool is_socket_active(const Slave *slave) 141 { 142 return m_fds[slave->m_fds_index].revents & POLLIN; 143 } 144 clear_socket_info(const Slave * slave)145 void clear_socket_info(const Slave *slave) 146 { 147 m_fds[slave->m_fds_index].fd= -1; 148 m_fds[slave->m_fds_index].events= 0; 149 } 150 init_slave_sockets()151 uint init_slave_sockets() 152 { 153 Slave_ilist_iterator it(const_cast<Slave_ilist&>(m_slaves)); 154 Slave *slave; 155 uint fds_index= 0; 156 157 m_fds.clear(); 158 while ((slave= it++)) 159 { 160 pollfd poll_fd; 161 poll_fd.fd= slave->sock_fd(); 162 poll_fd.events= POLLIN; 163 m_fds.push_back(poll_fd); 164 slave->m_fds_index= fds_index++; 165 } 166 return fds_index; 167 } 168 169 private: 170 const Slave_ilist &m_slaves; 171 std::vector<pollfd> m_fds; 172 }; 173 174 #else //NO POLL 175 176 class Select_socket_listener 177 { 178 public: Select_socket_listener(const Slave_ilist & slaves)179 Select_socket_listener(const Slave_ilist &slaves) 180 :m_slaves(slaves), m_max_fd(INVALID_SOCKET) 181 { 182 } 183 listen_on_sockets()184 bool listen_on_sockets() 185 { 186 /* Reinitialze the fds with active fds before calling select */ 187 m_fds= m_init_fds; 188 struct timeval tv= {1,0}; 189 /* select requires max fd + 1 for the first argument */ 190 return select((int) m_max_fd+1, &m_fds, NULL, NULL, &tv); 191 } 192 is_socket_active(const Slave * slave)193 bool is_socket_active(const Slave *slave) 194 { 195 return FD_ISSET(slave->sock_fd(), &m_fds); 196 } 197 clear_socket_info(const Slave * slave)198 void clear_socket_info(const Slave *slave) 199 { 200 FD_CLR(slave->sock_fd(), &m_init_fds); 201 } 202 init_slave_sockets()203 uint init_slave_sockets() 204 { 205 Slave_ilist_iterator it(const_cast<Slave_ilist&>(m_slaves)); 206 Slave *slave; 207 uint fds_index= 0; 208 209 FD_ZERO(&m_init_fds); 210 while ((slave= it++)) 211 { 212 my_socket socket_id= slave->sock_fd(); 213 m_max_fd= (socket_id > m_max_fd ? socket_id : m_max_fd); 214 #ifndef _WIN32 215 if (socket_id > FD_SETSIZE) 216 { 217 sql_print_error("Semisync slave socket fd is %u. " 218 "select() cannot handle if the socket fd is " 219 "greater than %u (FD_SETSIZE).", socket_id, FD_SETSIZE); 220 return 0; 221 } 222 #endif //_WIN32 223 FD_SET(socket_id, &m_init_fds); 224 fds_index++; 225 } 226 return fds_index; 227 } get_max_fd()228 my_socket get_max_fd() { return m_max_fd; } 229 230 private: 231 const Slave_ilist &m_slaves; 232 my_socket m_max_fd; 233 fd_set m_init_fds; 234 fd_set m_fds; 235 }; 236 237 #endif //HAVE_POLL 238 239 extern Ack_receiver ack_receiver; 240 #endif 241