1 /*
2 * Copyright (c) 2016, 2021, Oracle and/or its affiliates.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2.0,
6 * as published by the Free Software Foundation.
7 *
8 * This program is also distributed with certain software (including
9 * but not limited to OpenSSL) that is licensed under separate terms,
10 * as designated in a particular file or component or in included license
11 * documentation. The authors of MySQL hereby grant you an additional
12 * permission to link the program and your derivative works with the
13 * separately licensed software that they have included with MySQL.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License, version 2.0, for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23 * 02110-1301 USA
24 */
25
26 #include "io/xpl_listener_tcp.h"
27 #include "xpl_log.h"
28 #include "mysqlx_version.h"
29 #include "ngs_common/smart_ptr.h"
30 #include "ngs_common/string_formatter.h"
31 #include "ngs_common/operations_factory.h"
32
33 namespace xpl {
34
// Special bind-address values understood by the TCP listener.

// Wildcard: expanded to the IPv4 "any" address and, when an IPv6 socket
// can be created, to the IPv6 "any" address as well.
const char *BIND_ALL_ADDRESSES = "*";

// IPv4 "any" address.
const char *BIND_IPv4_ADDRESS = "0.0.0.0";

// IPv6 "any" address.
const char *BIND_IPv6_ADDRESS = "::";
38
/**
  Helper that resolves a bind address and turns it into a bound, listening
  TCP socket.

  Sockets are created through the injected operations factory and all
  system-level calls (getaddrinfo, freeaddrinfo, errno retrieval) go through
  the `ngs::System_interface` the factory hands out in the constructor.
*/
class Tcp_creator {
public:
  /**
    @param factory  factory used to create sockets and the system interface;
                    stored by reference, so it must outlive this object.
  */
  explicit Tcp_creator(ngs::Operations_factory_interface &factory)
  : m_factory(factory),
    m_system_interface(m_factory.create_system_interface()) {
  }

  /**
    Resolve `bind_address`:`port` into an addrinfo list.

    The wildcard `BIND_ALL_ADDRESSES` is expanded to the IPv4 "any" address
    and, if an IPv6 socket can be created, the IPv6 "any" address. Candidates
    are taken from the back of the list, so the IPv6 address is tried first
    and resolution stops at the first candidate that resolves.

    @param bind_address        address or host name to resolve, or "*"
    @param port                TCP port, passed to getaddrinfo as service
    @param error_code          not modified by this function
    @param[out] error_message  set when nothing could be resolved

    @return addrinfo list released through `System_interface::freeaddrinfo`,
            or an empty pointer when resolution failed.
  */
  ngs::shared_ptr<addrinfo> resolve_bind_address(
      const std::string &bind_address,
      const unsigned short port,
      int &error_code,
      std::string &error_message) {
    struct addrinfo *result = NULL;
    std::vector<std::string> bind_addresses;
    ngs::String_formatter formatter;
    const std::string service = formatter.append(port).get_result();

    if (BIND_ALL_ADDRESSES == bind_address) {
      bind_addresses.push_back(BIND_IPv4_ADDRESS);

      if (is_ipv6_available()) {
        log_info("IPv6 is available");
        bind_addresses.push_back(BIND_IPv6_ADDRESS);
      }
    } else {
      bind_addresses.push_back(bind_address);
    }

    // Last pushed candidate is tried first.
    while (!bind_addresses.empty() && NULL == result) {
      result = resolve_addr_info(bind_addresses.back(), service);

      bind_addresses.pop_back();
    }

    if (NULL == result) {
      error_message = "can't resolve `hostname`";

      return ngs::shared_ptr<addrinfo>();
    }

    // The deleter routes the release through the system interface.
    return ngs::shared_ptr<addrinfo>(
        result,
        ngs::bind(&ngs::System_interface::freeaddrinfo, m_system_interface, ngs::placeholders::_1));
  }

  /**
    Create a socket for one of the entries in `ai`, bind it and start
    listening.

    AF_INET entries are tried first, AF_INET6 entries only if no AF_INET
    socket could be created. On success the numeric form of the bound
    address is stored and can be read with get_used_address().

    @param ai                  resolved addresses (see resolve_bind_address)
    @param backlog             backlog passed to listen()
    @param[out] error_code     socket error of the failing call; reset to 0
                               once the socket has been created
    @param[out] error_message  description of the failing call

    @return listening socket, or an empty pointer on any failure.
  */
  ngs::Socket_interface::Shared_ptr create_and_bind_socket(
      ngs::shared_ptr<addrinfo> ai,
      const uint32 backlog,
      int &error_code,
      std::string &error_message) {
    addrinfo *used_ai = NULL;
    std::string errstr;

    ngs::Socket_interface::Shared_ptr result_socket = create_socket_from_addrinfo(
        ai.get(), KEY_socket_x_tcpip, AF_INET, &used_ai);

    if (NULL == result_socket.get())
      result_socket = create_socket_from_addrinfo(ai.get(), KEY_socket_x_tcpip, AF_INET6, &used_ai);

    if (NULL == result_socket.get()) {
      m_system_interface->get_socket_error_and_message(error_code, errstr);

      error_message = ngs::String_formatter()
          .append("`socket()` failed with error: ").append(errstr)
          .append("(").append(error_code).append(")")
          .get_result();

      return ngs::Socket_interface::Shared_ptr();
    }

#ifdef IPV6_V6ONLY
    /*
      For interoperability with older clients, IPv6 socket should
      listen on both IPv6 and IPv4 wildcard addresses.
      Turn off IPV6_V6ONLY option.

      NOTE: this will work starting from Windows Vista only.
      On Windows XP dual stack is not available, so it will not
      listen on the corresponding IPv4-address.
    */
    if (used_ai->ai_family == AF_INET6) {
      int option_flag = 0;

      if (result_socket->set_socket_opt(
              IPPROTO_IPV6, IPV6_V6ONLY,
              reinterpret_cast<const char *>(&option_flag), sizeof(option_flag))) {
        // Read the error through the system interface, like every other
        // error path in this class, instead of the raw socket_errno macro.
        log_error("Failed to reset IPV6_V6ONLY flag (error: %d). "
                  "The server will listen to IPv6 addresses only.",
                  static_cast<int>(m_system_interface->get_socket_errno()));
      }
    }
#endif

    error_code = 0;

    {
      int one = 1;
      // Failure to set SO_REUSEADDR is logged but not treated as fatal.
      if (result_socket->set_socket_opt(
              SOL_SOCKET, SO_REUSEADDR,
              reinterpret_cast<const char *>(&one), sizeof(one))) {
        log_error("Failed to set SO_REUSEADDR flag (error: %d). ",
                  static_cast<int>(m_system_interface->get_socket_errno()));
      }
    }

    result_socket->set_socket_thread_owner();

    if (result_socket->bind(static_cast<const struct sockaddr *>(used_ai->ai_addr), used_ai->ai_addrlen) < 0) {
      // lets decide later if its an error or not
      m_system_interface->get_socket_error_and_message(error_code, errstr);

      error_message = ngs::String_formatter()
          .append("`bind()` failed with error: ").append(errstr)
          .append(" (").append(error_code).append("). Do you already have another mysqld server running with Mysqlx ?")
          .get_result();

      return ngs::Socket_interface::Shared_ptr();
    }

    if (result_socket->listen(backlog) < 0) {
      // lets decide later if its an error or not
      m_system_interface->get_socket_error_and_message(error_code, errstr);

      error_message = ngs::String_formatter()
          .append("`listen()` failed with error: ").append(errstr)
          .append("(").append(error_code).append(")")
          .get_result();

      return ngs::Socket_interface::Shared_ptr();
    }

    // Use the string as a scratch buffer for the numeric host name and
    // shrink it to the actual text length afterwards.
    m_used_address.resize(200, '\0');

    if (vio_getnameinfo(static_cast<const struct sockaddr *>(used_ai->ai_addr),
                        &m_used_address[0], m_used_address.length(),
                        NULL, 0, NI_NUMERICHOST)) {
      // Lookup failed: report an empty address.
      m_used_address[0] = '\0';
    }

    m_used_address.resize(strlen(m_used_address.c_str()));

    return result_socket;
  }

  /**
    @return numeric address recorded by the last successful
            create_and_bind_socket() call (empty if the lookup failed or no
            socket was bound yet).
  */
  std::string get_used_address() const {
    return m_used_address;
  }

private:
  /**
    Walk the addrinfo list and create a socket for the first entry of the
    requested family for which socket creation succeeds.

    @param ai            head of the addrinfo list (may be NULL)
    @param psi_key       instrumentation key forwarded to the factory
    @param family        address family to look for (AF_INET / AF_INET6)
    @param[out] used_ai  entry the returned socket was created for; only
                         written on success

    @return created socket, or an empty pointer if no entry matched or every
            creation attempt yielded INVALID_SOCKET.
  */
  ngs::Socket_interface::Shared_ptr create_socket_from_addrinfo(
      addrinfo *ai,
      PSI_socket_key psi_key,
      const int family,
      addrinfo **used_ai) {
    for (addrinfo *cur_ai = ai; NULL != cur_ai; cur_ai = cur_ai->ai_next) {
      if (family != cur_ai->ai_family)
        continue;

      ngs::Socket_interface::Shared_ptr result = m_factory.create_socket(
          psi_key, family, SOCK_STREAM, 0);

      if (INVALID_SOCKET != result->get_socket_fd()) {
        *used_ai = cur_ai;
        return result;
      }
    }

    return ngs::Socket_interface::Shared_ptr();
  }

  /**
    Probe for IPv6 support by creating a throw-away AF_INET6 stream socket.

    @return true when the factory produced a valid socket descriptor.
  */
  bool is_ipv6_available() {
    ngs::Socket_interface::Shared_ptr socket(
        m_factory.create_socket(PSI_NOT_INSTRUMENTED, AF_INET6, SOCK_STREAM, 0));

    return INVALID_SOCKET != socket->get_socket_fd();
  }

  /**
    Call getaddrinfo() for a passive (listening) stream socket of any family.

    @param address  host name or numeric address
    @param service  port number formatted as text

    @return addrinfo list owned by the caller, or NULL when getaddrinfo
            reported an error (the error value itself is discarded).
  */
  struct addrinfo *resolve_addr_info(const std::string &address, const std::string &service) {
    struct addrinfo hints;
    struct addrinfo *ai = NULL;
    memset(&hints, 0, sizeof(hints));
    hints.ai_flags = AI_PASSIVE;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = AF_UNSPEC;

    if (!m_system_interface->getaddrinfo(address.c_str(), service.c_str(), &hints, &ai)) {
      return ai;
    }

    return NULL;
  }

  // Numeric form of the address the last socket was bound to.
  std::string m_used_address;
  // Declared before m_system_interface: the constructor uses m_factory to
  // initialise it.
  ngs::Operations_factory_interface &m_factory;
  ngs::System_interface::Shared_ptr m_system_interface;
};
236
Listener_tcp(Factory_ptr operations_factory,std::string & bind_address,const uint16 port,const uint32 port_open_timeout,ngs::Socket_events_interface & event,const uint32 backlog)237 Listener_tcp::Listener_tcp(Factory_ptr operations_factory,
238 std::string &bind_address,
239 const uint16 port,
240 const uint32 port_open_timeout,
241 ngs::Socket_events_interface &event,
242 const uint32 backlog)
243 : m_operations_factory(operations_factory),
244 m_state(ngs::State_listener_initializing),
245 m_bind_address(bind_address),
246 m_port(port),
247 m_port_open_timeout(port_open_timeout),
248 m_backlog(backlog),
249 m_event(event) {
250 }
251
~Listener_tcp()252 Listener_tcp::~Listener_tcp() {
253 // close_listener() can be called multiple times
254 close_listener();
255 }
256
get_state()257 Listener_tcp::Sync_variable_state &Listener_tcp::get_state() {
258 return m_state;
259 }
260
is_handled_by_socket_event()261 bool Listener_tcp::is_handled_by_socket_event() {
262 return true;
263 }
264
get_last_error()265 std::string Listener_tcp::get_last_error() {
266 return m_last_error;
267 }
268
get_name_and_configuration() const269 std::string Listener_tcp::get_name_and_configuration() const {
270 return ngs::String_formatter()
271 .append("TCP (bind-address:'").append(m_bind_address).append("', ")
272 .append("port:").append(m_port)
273 .append(")")
274 .get_result();
275 }
276
get_configuration_variables() const277 std::vector<std::string> Listener_tcp::get_configuration_variables() const
278 {
279 std::vector<std::string> result;
280
281 result.push_back(MYSQLX_SYSTEM_VARIABLE_PREFIX("port"));
282 result.push_back(MYSQLX_SYSTEM_VARIABLE_PREFIX("bind_address"));
283
284 return result;
285 }
286
setup_listener(On_connection on_connection)287 bool Listener_tcp::setup_listener(On_connection on_connection) {
288 if (!m_state.is(ngs::State_listener_initializing))
289 return false;
290
291 m_tcp_socket = create_socket();
292
293 // create_socket in case of invalid socket or setup failure
294 // is going to return NULL
295 if (NULL == m_tcp_socket.get())
296 return false;
297
298 if (m_event.listen(m_tcp_socket, on_connection)) {
299 m_state.set(ngs::State_listener_prepared);
300 return true;
301 }
302
303 m_last_error = "event dispatcher couldn't register socket";
304 m_tcp_socket.reset();
305
306 return false;
307 }
308
close_listener()309 void Listener_tcp::close_listener() {
310 // ngs::Socket_interface::close can be called multiple times
311 // it invalidates the content of m_mysql_socket thus at next call
312 // it does nothing
313 //
314 // Same applies to close_listener()
315 m_state.set(ngs::State_listener_stopped);
316
317 if (m_tcp_socket)
318 m_tcp_socket->close();
319 }
320
loop()321 void Listener_tcp::loop() {
322 }
323
create_socket()324 ngs::Socket_interface::Shared_ptr Listener_tcp::create_socket() {
325 Tcp_creator creator(*m_operations_factory);
326 int error_code;
327
328 ngs::Socket_interface::Shared_ptr result_socket;
329 ngs::System_interface::Shared_ptr system_interface(m_operations_factory->create_system_interface());
330
331 log_debug("TCP Sockets address is '%s' and port is %i", m_bind_address.c_str(), (int)m_port);
332
333 ngs::shared_ptr<addrinfo> ai = creator.resolve_bind_address(
334 m_bind_address,
335 m_port,
336 error_code,
337 m_last_error);
338
339 if (NULL == ai.get())
340 return ngs::Socket_interface::Shared_ptr();
341
342 for(uint32 waited = 0, retry = 1;
343 waited <= m_port_open_timeout;
344 ++retry) {
345 result_socket = creator.create_and_bind_socket(
346 ai,
347 m_backlog,
348 error_code,
349 m_last_error);
350
351 // Success, lets break the loop
352 // `create_and_bind_socket` in case of invalid socket/failure
353 // returns empty pointer
354 if (NULL != result_socket.get()) {
355 m_bind_address = creator.get_used_address();
356 break;
357 }
358
359 // Critical failure, lets break the loop
360 if (SOCKET_EADDRINUSE != system_interface->get_socket_errno())
361 break;
362
363 log_info("Retrying `bind()` on TCP/IP port %i", (int)m_port);
364
365 const int time_to_wait = retry * retry / 3 + 1;
366 system_interface->sleep(time_to_wait);
367
368 waited += time_to_wait;
369 }
370
371 return result_socket;
372 }
373
374 } // namespace xpl
375