1 /*
2 * Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2.0,
6 * as published by the Free Software Foundation.
7 *
8 * This program is also distributed with certain software (including
9 * but not limited to OpenSSL) that is licensed under separate terms,
10 * as designated in a particular file or component or in included license
11 * documentation. The authors of MySQL hereby grant you an additional
12 * permission to link the program and your derivative works with the
13 * separately licensed software that they have included with MySQL.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License, version 2.0, for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "plugin/x/ngs/include/ngs/socket_acceptors_task.h"
26
27 #include <algorithm>
28 #include <cstdlib>
29 #include <iterator>
30 #include <set>
31
32 #include "my_config.h" // NOLINT(build/include_subdir)
33
34 #include "plugin/x/ngs/include/ngs/log.h"
35 #include "plugin/x/src/helper/string_formatter.h"
36 #include "plugin/x/src/module_mysqlx.h"
37 #include "plugin/x/src/variables/system_variables.h"
38 #include "plugin/x/src/xpl_performance_schema.h"
39
40 extern bool check_address_is_wildcard(const char *address_value,
41 size_t address_length);
42
43 namespace ngs {
44
Socket_acceptors_task(const xpl::iface::Listener_factory & listener_factory,const std::string & multi_bind_address,const uint16_t tcp_port,const uint32_t tcp_port_open_timeout,const std::string & unix_socket_file,const uint32_t backlog,const std::shared_ptr<xpl::iface::Socket_events> & event)45 Socket_acceptors_task::Socket_acceptors_task(
46 const xpl::iface::Listener_factory &listener_factory,
47 const std::string &multi_bind_address, const uint16_t tcp_port,
48 const uint32_t tcp_port_open_timeout, const std::string &unix_socket_file,
49 const uint32_t backlog,
50 const std::shared_ptr<xpl::iface::Socket_events> &event)
51 : m_listener_factory(listener_factory),
52 m_event(event),
53 m_multi_bind_address(multi_bind_address),
54 m_tcp_port(tcp_port),
55 m_tcp_port_open_timeout(tcp_port_open_timeout),
56 m_unix_socket_file(unix_socket_file),
57 m_backlog(backlog),
58 m_time_and_event_state(xpl::iface::Listener::State::k_initializing,
59 KEY_mutex_x_socket_acceptors_sync,
60 KEY_cond_x_socket_acceptors_sync) {}
61
62 namespace {
63
parse_bind_address_value(const char * begin_address_value,std::string * address_value,std::string * network_namespace)64 bool parse_bind_address_value(const char *begin_address_value,
65 std::string *address_value,
66 std::string *network_namespace) {
67 const char *namespace_separator = strchr(begin_address_value, '/');
68
69 if (namespace_separator != nullptr) {
70 if (begin_address_value == namespace_separator)
71 /*
72 Parse error: there is no character before '/',
73 that is missed address value
74 */
75 return true;
76
77 if (*(namespace_separator + 1) == 0)
78 /*
79 Parse error: there is no character immediately after '/',
80 that is missed namespace name.
81 */
82 return true;
83
84 /*
85 Found namespace delimiter. Extract namespace and address values
86 */
87 *address_value = std::string(begin_address_value, namespace_separator);
88 *network_namespace = std::string(namespace_separator + 1);
89 } else {
90 *address_value = begin_address_value;
91 }
92 return false;
93 }
94
trim(std::string * value)95 void trim(std::string *value) {
96 static const char *k_whitespace = " \n\r\t\f\v";
97 const std::size_t first = value->find_first_not_of(k_whitespace);
98 if (first == std::string::npos) {
99 *value = "";
100 return;
101 }
102 const std::size_t last = value->find_last_not_of(k_whitespace);
103 if (last == std::string::npos) {
104 *value = value->substr(first);
105 return;
106 }
107 *value = value->substr(first, last - first + 1);
108 }
109
split(const std::string & value,char delim,std::vector<std::string> * result)110 void split(const std::string &value, char delim,
111 std::vector<std::string> *result) {
112 std::stringstream ss(value);
113 std::string item;
114 while (std::getline(ss, item, delim)) result->push_back(item);
115 }
116
is_address_valid(const std::string & address,const bool is_multi_address,std::string * host,std::string * net_namespace)117 bool is_address_valid(const std::string &address, const bool is_multi_address,
118 std::string *host, std::string *net_namespace) {
119 if (parse_bind_address_value(address.c_str(), host, net_namespace)) {
120 log_error(ER_XPLUGIN_FAILED_TO_VALIDATE_ADDRESS, address.c_str(),
121 "can't be parsed as an address");
122 return false;
123 }
124 const bool is_wildcard =
125 check_address_is_wildcard(host->c_str(), host->length());
126
127 if (!net_namespace->empty() && is_wildcard) {
128 log_error(ER_XPLUGIN_FAILED_TO_VALIDATE_ADDRESS, address.c_str(),
129 "network namespace are not allowed for wildcards");
130 return false;
131 }
132
133 if (is_wildcard && is_multi_address) {
134 log_error(ER_XPLUGIN_FAILED_TO_VALIDATE_ADDRESS, address.c_str(),
135 "wildcards are not allowed when there are more than one address");
136 return false;
137 }
138
139 return true;
140 }
141 } // namespace
142
prepare_listeners()143 void Socket_acceptors_task::prepare_listeners() {
144 const bool skip_networking =
145 xpl::Plugin_system_variables::get_system_variable("skip_networking") ==
146 "ON";
147
148 if (!skip_networking) {
149 std::vector<std::string> addresses;
150 split(m_multi_bind_address, ',', &addresses);
151
152 const bool is_multi_address = addresses.size() > 1;
153 for (auto &address : addresses) {
154 trim(&address);
155 std::string host, net_namespace;
156 if (is_address_valid(address, is_multi_address, &host, &net_namespace))
157 m_tcp_socket.push_back(m_listener_factory.create_tcp_socket_listener(
158 host, net_namespace, m_tcp_port, m_tcp_port_open_timeout,
159 m_event.get(), m_backlog));
160 }
161 }
162
163 if (xpl::Plugin_system_variables::get_system_variable("socket") ==
164 m_unix_socket_file) {
165 log_warning(ER_INVALID_XPLUGIN_SOCKET_SAME_AS_SERVER);
166 } else {
167 #if defined(HAVE_SYS_UN_H)
168 m_unix_socket = m_listener_factory.create_unix_socket_listener(
169 m_unix_socket_file, m_event.get(), m_backlog);
170 #endif
171 }
172 }
173
prepare_impl(xpl::iface::Server_task::Task_context * context)174 bool Socket_acceptors_task::prepare_impl(
175 xpl::iface::Server_task::Task_context *context) {
176 Listener_interfaces listeners = get_array_of_listeners();
177
178 if (listeners.empty()) {
179 log_warning(ER_XPLUGIN_ALL_IO_INTERFACES_DISABLED);
180
181 return false;
182 }
183
184 const size_t number_of_prepared_listeners =
185 std::count_if(listeners.begin(), listeners.end(),
186 [context](xpl::iface::Listener *l) -> bool {
187 return l->setup_listener(context->m_on_connection);
188 });
189
190 if (0 == number_of_prepared_listeners) {
191 stop(Stop_cause::k_server_task_triggered_event);
192 log_error(ER_XPLUGIN_FAILED_TO_PREPARE_IO_INTERFACES);
193
194 return false;
195 }
196
197 return true;
198 }
199
prepare(xpl::iface::Server_task::Task_context * context)200 bool Socket_acceptors_task::prepare(
201 xpl::iface::Server_task::Task_context *context) {
202 prepare_listeners();
203
204 const bool result = prepare_impl(context);
205 Listener_interfaces listeners = get_array_of_listeners();
206 Server_properties properties;
207 std::set<std::string> configuration_variables;
208
209 for (auto &l : listeners) {
210 if (!l->report_status())
211 configuration_variables.insert(l->get_configuration_variable());
212
213 l->report_properties(
214 [&properties](const Server_property_ids id, const std::string &value) {
215 auto &p = properties[id];
216 if (id == Server_property_ids::k_tcp_bind_address) {
217 if (p.empty()) {
218 p = value;
219 return;
220 }
221 if (value == ngs::PROPERTY_NOT_CONFIGURED) return;
222 if (p == ngs::PROPERTY_NOT_CONFIGURED) {
223 p = value;
224 return;
225 }
226 p += "," + value;
227 return;
228 }
229 p = value;
230 });
231 }
232
233 for (const auto &var : configuration_variables)
234 log_info(ER_XPLUGIN_LISTENER_SYS_VARIABLE_ERROR, var.c_str());
235
236 properties[Server_property_ids::k_number_of_interfaces] =
237 std::to_string(listeners.size());
238
239 if (result) show_startup_log(properties);
240 if (context && context->m_properties) context->m_properties->swap(properties);
241 return result;
242 }
243
stop(const Stop_cause cause)244 void Socket_acceptors_task::stop(const Stop_cause cause) {
245 Listener_interfaces listeners = get_array_of_listeners();
246
247 m_event->break_loop();
248
249 for (auto &listener : listeners) listener->close_listener();
250
251 switch (cause) {
252 case Stop_cause::k_abort:
253 m_time_and_event_state.set(xpl::iface::Listener::State::k_stopped);
254 break;
255
256 case Stop_cause::k_normal_shutdown:
257 m_time_and_event_state.wait_for(xpl::iface::Listener::State::k_stopped);
258 break;
259
260 case Stop_cause::k_server_task_triggered_event:
261 break;
262 }
263 }
264
show_startup_log(const Server_properties & properties) const265 void Socket_acceptors_task::show_startup_log(
266 const Server_properties &properties) const {
267 std::string combined_status;
268 Server_properties::const_iterator i =
269 properties.find(Server_property_ids::k_tcp_bind_address);
270 if (i != properties.end()) {
271 const auto &bind_address = i->second;
272 if (!bind_address.empty() && bind_address != ngs::PROPERTY_NOT_CONFIGURED)
273 combined_status += "Bind-address: '" + bind_address +
274 "' port: " + std::to_string(m_tcp_port);
275 }
276 i = properties.find(Server_property_ids::k_unix_socket);
277 if (i != properties.end()) {
278 const auto &unix_socket = i->second;
279 if (!unix_socket.empty() && unix_socket != ngs::PROPERTY_NOT_CONFIGURED)
280 combined_status +=
281 (combined_status.empty() ? "Socket: " : ", socket: ") + unix_socket;
282 }
283 log_system(ER_XPLUGIN_LISTENER_STATUS_MSG, combined_status.c_str());
284 }
285
286 Socket_acceptors_task::Listener_interfaces
get_array_of_listeners()287 Socket_acceptors_task::get_array_of_listeners() {
288 Listener_interfaces result;
289
290 for (auto &s : m_tcp_socket)
291 if (s) result.push_back(s.get());
292
293 if (m_unix_socket) result.push_back(m_unix_socket.get());
294
295 return result;
296 }
297
pre_loop()298 void Socket_acceptors_task::pre_loop() {
299 log_debug("Socket_acceptors_task::pre_loop");
300 m_time_and_event_state.set(xpl::iface::Listener::State::k_running);
301 auto listeners = get_array_of_listeners();
302
303 for (auto &listener : listeners) {
304 listener->pre_loop();
305 listener->set_state(xpl::iface::Listener::State::k_running);
306 }
307 }
308
post_loop()309 void Socket_acceptors_task::post_loop() {
310 log_debug("Socket_acceptors_task::post_loop");
311 auto listeners = get_array_of_listeners();
312
313 m_time_and_event_state.set(xpl::iface::Listener::State::k_stopped);
314
315 for (auto &l : listeners)
316 l->set_state(xpl::iface::Listener::State::k_stopped);
317 }
318
loop()319 void Socket_acceptors_task::loop() { m_event->loop(); }
320
321 } // namespace ngs
322