1 /*
2  * Copyright (c) 2016, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 #include "io/xpl_listener_tcp.h"
27 #include "xpl_log.h"
28 #include "mysqlx_version.h"
29 #include "ngs_common/smart_ptr.h"
30 #include "ngs_common/string_formatter.h"
31 #include "ngs_common/operations_factory.h"
32 
33 namespace xpl {
34 
// Pseudo bind address meaning "every interface"; it is expanded by
// `Tcp_creator::resolve_bind_address` into the wildcard addresses below.
const char *BIND_ALL_ADDRESSES = "*";

// IPv4 wildcard address.
const char *BIND_IPv4_ADDRESS = "0.0.0.0";

// IPv6 wildcard address.
const char *BIND_IPv6_ADDRESS = "::";
38 
// Helper that resolves a textual bind address and turns the result into a
// bound, listening TCP socket. Sockets are created through the supplied
// operations factory; address resolution and error reporting go through the
// system interface obtained from that factory.
class Tcp_creator {
public:
  // `factory` is stored by reference, so it must outlive this object.
  explicit Tcp_creator(ngs::Operations_factory_interface &factory)
  : m_factory(factory),
    m_system_interface(m_factory.create_system_interface()) {
  }

  // Resolves `bind_address` together with `port` into an addrinfo list.
  //
  // When `bind_address` equals BIND_ALL_ADDRESSES the IPv6 wildcard is tried
  // first (only if an IPv6 socket can be created) and the IPv4 wildcard is
  // used as the fallback.
  //
  // Returns the resolved list, released through
  // `System_interface::freeaddrinfo` when the last owner goes away, or an
  // empty pointer when nothing could be resolved; in that case
  // `error_message` is set.
  //
  // NOTE: `error_code` is never written by this function.
  ngs::shared_ptr<addrinfo> resolve_bind_address(
      const std::string &bind_address,
      const unsigned short port,
      int &error_code,
      std::string &error_message) {
    const std::string service =
        ngs::String_formatter().append(port).get_result();

    // Candidates are kept in ascending priority: the last entry is tried
    // first.
    std::vector<std::string> candidates;

    if (BIND_ALL_ADDRESSES == bind_address) {
      candidates.push_back(BIND_IPv4_ADDRESS);

      if (is_ipv6_available()) {
        log_info("IPv6 is available");
        candidates.push_back(BIND_IPv6_ADDRESS);
      }
    } else {
      candidates.push_back(bind_address);
    }

    struct addrinfo *result = NULL;

    while (NULL == result && !candidates.empty()) {
      result = resolve_addr_info(candidates.back(), service);
      candidates.pop_back();
    }

    if (NULL == result) {
      error_message = "can't resolve `hostname`";

      return ngs::shared_ptr<addrinfo>();
    }

    // The deleter holds a copy of the system interface pointer, so the list
    // can be released even after this creator is gone.
    return ngs::shared_ptr<addrinfo>(
        result,
        ngs::bind(&ngs::System_interface::freeaddrinfo,
                  m_system_interface,
                  ngs::placeholders::_1));
  }

  // Creates a socket for the first usable entry of `ai` (AF_INET entries are
  // preferred over AF_INET6 ones), then binds it and puts it into listening
  // mode with the given `backlog`.
  //
  // Returns the listening socket, or an empty pointer on failure. On failure
  // `error_code` and `error_message` describe the step that failed
  // (`socket()`, `bind()` or `listen()`). On success the numeric form of the
  // bound address becomes available through `get_used_address()`.
  ngs::Socket_interface::Shared_ptr create_and_bind_socket(
      ngs::shared_ptr<addrinfo> ai,
      const uint32 backlog,
      int &error_code,
      std::string &error_message) {
    addrinfo *used_ai = NULL;
    std::string errstr;

    ngs::Socket_interface::Shared_ptr result_socket =
        create_socket_from_addrinfo(ai.get(), KEY_socket_x_tcpip, AF_INET,
                                    &used_ai);

    if (NULL == result_socket.get())
      result_socket = create_socket_from_addrinfo(
          ai.get(), KEY_socket_x_tcpip, AF_INET6, &used_ai);

    if (NULL == result_socket.get()) {
      m_system_interface->get_socket_error_and_message(error_code, errstr);

      error_message = ngs::String_formatter()
          .append("`socket()` failed with error: ").append(errstr)
          .append("(").append(error_code).append(")")
          .get_result();

      return ngs::Socket_interface::Shared_ptr();
    }

#ifdef IPV6_V6ONLY
    /*
      For interoperability with older clients, IPv6 socket should
      listen on both IPv6 and IPv4 wildcard addresses.
      Turn off IPV6_V6ONLY option.

      NOTE: this will work starting from Windows Vista only.
      On Windows XP dual stack is not available, so it will not
      listen on the corresponding IPv4-address.
    */
    if (AF_INET6 == used_ai->ai_family) {
      int option_flag = 0;

      if (result_socket->set_socket_opt(
              IPPROTO_IPV6, IPV6_V6ONLY,
              reinterpret_cast<char *>(&option_flag), sizeof(option_flag))) {
        // Read the error through the system interface, exactly like the
        // SO_REUSEADDR branch below, instead of the raw `socket_errno` macro.
        log_error("Failed to reset IPV6_V6ONLY flag (error: %d). "
                  "The server will listen to IPv6 addresses only.",
                  static_cast<int>(m_system_interface->get_socket_errno()));
      }
    }
#endif

    error_code = 0;

    {
      // A failure to set SO_REUSEADDR is only logged; setup continues.
      int one = 1;

      if (result_socket->set_socket_opt(
              SOL_SOCKET, SO_REUSEADDR,
              reinterpret_cast<const char *>(&one), sizeof(one))) {
        log_error("Failed to set SO_REUSEADDR flag (error: %d). ",
                  static_cast<int>(m_system_interface->get_socket_errno()));
      }
    }

    result_socket->set_socket_thread_owner();

    if (result_socket->bind(
            static_cast<const struct sockaddr *>(used_ai->ai_addr),
            used_ai->ai_addrlen) < 0) {
      // The caller decides whether this is fatal or worth a retry.
      m_system_interface->get_socket_error_and_message(error_code, errstr);

      error_message = ngs::String_formatter()
          .append("`bind()` failed with error: ").append(errstr)
          .append(" (").append(error_code).append("). Do you already have another mysqld server running with Mysqlx ?")
          .get_result();

      return ngs::Socket_interface::Shared_ptr();
    }

    if (result_socket->listen(backlog) < 0) {
      // The caller decides whether this is fatal or worth a retry.
      m_system_interface->get_socket_error_and_message(error_code, errstr);

      error_message = ngs::String_formatter()
          .append("`listen()` failed with error: ").append(errstr)
          .append("(").append(error_code).append(")")
          .get_result();

      return ngs::Socket_interface::Shared_ptr();
    }

    // Use the string itself as a 200 byte output buffer for the numeric host
    // name of the address that was bound.
    m_used_address.resize(200, '\0');

    if (vio_getnameinfo(
            static_cast<const struct sockaddr *>(used_ai->ai_addr),
            &m_used_address[0], m_used_address.length(),
            NULL, 0, NI_NUMERICHOST)) {
      // Lookup failed: report an empty address.
      m_used_address[0] = '\0';
    }

    // Trim the buffer down to the NUL-terminated text it now holds.
    m_used_address.resize(strlen(m_used_address.c_str()));

    return result_socket;
  }

  // Numeric host string recorded by the last successful
  // `create_and_bind_socket()` call; empty when no socket was bound yet or
  // when the address could not be converted to text.
  std::string get_used_address() const {
    return m_used_address;
  }

private:
  // Walks the `ai` list and creates a stream socket for the first entry of
  // the requested `family` for which socket creation yields a valid
  // descriptor. On success `*used_ai` points at that entry. Returns an empty
  // pointer (and leaves `*used_ai` untouched) when no entry is usable.
  ngs::Socket_interface::Shared_ptr create_socket_from_addrinfo(
      addrinfo *ai,
      PSI_socket_key psi_key,
      const int family,
      addrinfo **used_ai) {
    for (addrinfo *cur_ai = ai; NULL != cur_ai; cur_ai = cur_ai->ai_next) {
      if (family != cur_ai->ai_family)
        continue;

      ngs::Socket_interface::Shared_ptr candidate =
          m_factory.create_socket(psi_key, family, SOCK_STREAM, 0);

      if (INVALID_SOCKET != candidate->get_socket_fd()) {
        *used_ai = cur_ai;
        return candidate;
      }
    }

    return ngs::Socket_interface::Shared_ptr();
  }

  // Probes IPv6 support by creating a throw-away, non-instrumented AF_INET6
  // stream socket and checking whether its descriptor is valid.
  bool is_ipv6_available() {
    ngs::Socket_interface::Shared_ptr probe(
        m_factory.create_socket(PSI_NOT_INSTRUMENTED, AF_INET6, SOCK_STREAM, 0));

    return INVALID_SOCKET != probe->get_socket_fd();
  }

  // Thin wrapper over `getaddrinfo` asking for passive stream addresses of
  // any family. Returns the resolved list, or NULL when resolution fails.
  struct addrinfo *resolve_addr_info(const std::string &address,
                                     const std::string &service) {
    struct addrinfo hints;
    struct addrinfo *ai = NULL;

    memset(&hints, 0, sizeof(hints));
    hints.ai_flags = AI_PASSIVE;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = AF_UNSPEC;

    // A non-zero return value signals failure.
    if (m_system_interface->getaddrinfo(address.c_str(), service.c_str(),
                                        &hints, &ai))
      return NULL;

    return ai;
  }

  std::string m_used_address;
  ngs::Operations_factory_interface &m_factory;
  ngs::System_interface::Shared_ptr m_system_interface;
};
236 
Listener_tcp(Factory_ptr operations_factory,std::string & bind_address,const uint16 port,const uint32 port_open_timeout,ngs::Socket_events_interface & event,const uint32 backlog)237 Listener_tcp::Listener_tcp(Factory_ptr operations_factory,
238                            std::string &bind_address,
239                            const uint16 port,
240                            const uint32 port_open_timeout,
241                            ngs::Socket_events_interface &event,
242                            const uint32 backlog)
243 : m_operations_factory(operations_factory),
244   m_state(ngs::State_listener_initializing),
245   m_bind_address(bind_address),
246   m_port(port),
247   m_port_open_timeout(port_open_timeout),
248   m_backlog(backlog),
249   m_event(event) {
250 }
251 
~Listener_tcp()252 Listener_tcp::~Listener_tcp() {
253   // close_listener() can be called multiple times
254   close_listener();
255 }
256 
get_state()257 Listener_tcp::Sync_variable_state &Listener_tcp::get_state() {
258   return m_state;
259 }
260 
is_handled_by_socket_event()261 bool Listener_tcp::is_handled_by_socket_event() {
262   return true;
263 }
264 
get_last_error()265 std::string Listener_tcp::get_last_error() {
266   return m_last_error;
267 }
268 
get_name_and_configuration() const269 std::string Listener_tcp::get_name_and_configuration() const {
270   return ngs::String_formatter()
271     .append("TCP (bind-address:'").append(m_bind_address).append("', ")
272     .append("port:").append(m_port)
273     .append(")")
274     .get_result();
275 }
276 
get_configuration_variables() const277 std::vector<std::string> Listener_tcp::get_configuration_variables() const
278 {
279   std::vector<std::string> result;
280 
281   result.push_back(MYSQLX_SYSTEM_VARIABLE_PREFIX("port"));
282   result.push_back(MYSQLX_SYSTEM_VARIABLE_PREFIX("bind_address"));
283 
284   return result;
285 }
286 
setup_listener(On_connection on_connection)287 bool Listener_tcp::setup_listener(On_connection on_connection) {
288   if (!m_state.is(ngs::State_listener_initializing))
289     return false;
290 
291   m_tcp_socket = create_socket();
292 
293   // create_socket in case of invalid socket or setup failure
294   // is going to return NULL
295   if (NULL == m_tcp_socket.get())
296     return false;
297 
298   if (m_event.listen(m_tcp_socket, on_connection)) {
299     m_state.set(ngs::State_listener_prepared);
300     return true;
301   }
302 
303   m_last_error = "event dispatcher couldn't register socket";
304   m_tcp_socket.reset();
305 
306   return false;
307 }
308 
close_listener()309 void Listener_tcp::close_listener() {
310   // ngs::Socket_interface::close can be called multiple times
311   // it invalidates the content of m_mysql_socket thus at next call
312   // it does nothing
313   //
314   // Same applies to close_listener()
315   m_state.set(ngs::State_listener_stopped);
316 
317   if (m_tcp_socket)
318     m_tcp_socket->close();
319 }
320 
loop()321 void Listener_tcp::loop() {
322 }
323 
create_socket()324 ngs::Socket_interface::Shared_ptr Listener_tcp::create_socket() {
325   Tcp_creator creator(*m_operations_factory);
326   int         error_code;
327 
328   ngs::Socket_interface::Shared_ptr result_socket;
329   ngs::System_interface::Shared_ptr system_interface(m_operations_factory->create_system_interface());
330 
331   log_debug("TCP Sockets address is '%s' and port is %i", m_bind_address.c_str(), (int)m_port);
332 
333   ngs::shared_ptr<addrinfo> ai = creator.resolve_bind_address(
334       m_bind_address,
335       m_port,
336       error_code,
337       m_last_error);
338 
339   if (NULL == ai.get())
340     return ngs::Socket_interface::Shared_ptr();
341 
342   for(uint32 waited = 0, retry = 1;
343       waited <= m_port_open_timeout;
344       ++retry) {
345     result_socket = creator.create_and_bind_socket(
346         ai,
347         m_backlog,
348         error_code,
349         m_last_error);
350 
351     // Success, lets break the loop
352     // `create_and_bind_socket` in case of invalid socket/failure
353     //  returns empty pointer
354     if (NULL != result_socket.get()) {
355       m_bind_address = creator.get_used_address();
356       break;
357     }
358 
359     // Critical failure, lets break the loop
360     if (SOCKET_EADDRINUSE != system_interface->get_socket_errno())
361       break;
362 
363     log_info("Retrying `bind()` on TCP/IP port %i", (int)m_port);
364 
365     const int time_to_wait = retry * retry / 3 + 1;
366     system_interface->sleep(time_to_wait);
367 
368     waited += time_to_wait;
369   }
370 
371   return result_socket;
372 }
373 
374 } // namespace xpl
375