1 /*
2 * Copyright (c) 2016, 2021, Oracle and/or its affiliates.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2.0,
6 * as published by the Free Software Foundation.
7 *
8 * This program is also distributed with certain software (including
9 * but not limited to OpenSSL) that is licensed under separate terms,
10 * as designated in a particular file or component or in included license
11 * documentation. The authors of MySQL hereby grant you an additional
12 * permission to link the program and your derivative works with the
13 * separately licensed software that they have included with MySQL.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License, version 2.0, for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23 * 02110-1301 USA
24 */
25
26 #include "ngs/server_acceptors.h"
27 #include "ngs_common/connection_vio.h"
28 #include "ngs_common/bind.h"
29 #include "ngs_common/string_formatter.h"
30 #include "ngs/log.h"
31 #include <iterator>
32 #include <stdlib.h>
33 #include <algorithm>
34
35
36 using namespace ngs;
37
38 namespace details
39 {
40
41 class Server_task_listener: public Server_task_interface
42 {
43 public:
Server_task_listener(Listener_interface & listener)44 Server_task_listener(Listener_interface &listener)
45 : m_listener(listener)
46 {
47 }
48
pre_loop()49 void pre_loop()
50 {
51 m_listener.get_state().set(State_listener_running);
52 }
53
post_loop()54 void post_loop()
55 {
56 m_listener.get_state().set(State_listener_stopped);
57 }
58
loop()59 void loop()
60 {
61 m_listener.loop();
62 }
63
64 private:
65 Listener_interface &m_listener;
66 };
67
68 } // namespace details
69
70
71 class Server_acceptors::Server_task_time_and_event: public Server_task_interface
72 {
73 public:
Server_task_time_and_event(Socket_events & event,Listener_interface::Sync_variable_state & state)74 Server_task_time_and_event(Socket_events &event, Listener_interface::Sync_variable_state &state)
75 : m_event(event), m_state(state)
76 {
77 }
78
79 // Handler_interface
pre_loop()80 void pre_loop()
81 {
82 m_state.set(State_listener_running);
83
84 std::for_each(m_listeners.begin(),
85 m_listeners.end(),
86 &Server_task_time_and_event::set_listeners_state<State_listener_running>);
87 }
88
post_loop()89 void post_loop()
90 {
91 m_state.set(State_listener_stopped);
92
93 std::for_each(m_listeners.begin(),
94 m_listeners.end(),
95 &Server_task_time_and_event::set_listeners_state<State_listener_stopped>);
96 }
97
loop()98 void loop()
99 {
100 m_event.loop();
101 }
102
103 // Handle_time_and_event
listener_register(Listener_interface * listener)104 void listener_register(Listener_interface *listener)
105 {
106 m_listeners.push_back(listener);
107 }
108
109 private:
110
111 template<State_listener state>
set_listeners_state(Listener_interface * listener)112 static void set_listeners_state(Listener_interface *listener)
113 {
114 listener->get_state().set(state);
115 }
116
117 Socket_events &m_event;
118 Listener_interface::Sync_variable_state &m_state;
119 Server_acceptors::Listener_interfaces m_listeners;
120 };
121
122
/**
  Builds the set of listeners and the shared time-and-event task.

  All work happens in the member-initializer list; the constructor body is
  empty. No listener is set up here - that is done later by prepare().

  @param listener_factory       factory used to create the TCP and UNIX
                                socket listeners
  @param tcp_bind_address       copied into m_bind_address; the copy is what
                                gets passed to the TCP listener factory call
  @param tcp_port               forwarded to create_tcp_socket_listener
  @param tcp_port_open_timeout  forwarded to create_tcp_socket_listener
  @param unix_socket_file       forwarded to create_unix_socket_listener
                                (only when HAVE_SYS_UN_H is defined)
  @param backlog                forwarded to both listener factory calls
*/
Server_acceptors::Server_acceptors(
    Listener_factory_interface &listener_factory,
    const std::string &tcp_bind_address,
    const uint16 tcp_port,
    const uint32 tcp_port_open_timeout,
    const std::string &unix_socket_file,
    const uint32 backlog)
: m_bind_address(tcp_bind_address),
  // The member copy (m_bind_address), not the constructor argument, is given
  // to the factory.
  // NOTE(review): m_event is used below but is not initialized in this list;
  // presumably it is declared before the listener members in the class so it
  // is already constructed at this point - confirm against the header.
  m_tcp_socket(listener_factory.create_tcp_socket_listener(m_bind_address, tcp_port, tcp_port_open_timeout, m_event, backlog)),
#if defined(HAVE_SYS_UN_H)
  // Without HAVE_SYS_UN_H this initializer is omitted and m_unix_socket is
  // left default-initialized.
  m_unix_socket(listener_factory.create_unix_socket_listener(unix_socket_file, m_event, backlog)),
#endif
  m_time_and_event_state(State_listener_initializing),
  // The task receives references to m_event and m_time_and_event_state.
  m_time_and_event_task(ngs::allocate_shared<Server_task_time_and_event>(ngs::ref(m_event), ngs::ref(m_time_and_event_state))),
  m_prepared(false)
{
}
140
prepare_impl(On_connection on_connection,const bool skip_networking,const bool use_unix_sockets)141 bool Server_acceptors::prepare_impl(On_connection on_connection, const bool skip_networking, const bool use_unix_sockets)
142 {
143 if (skip_networking)
144 m_tcp_socket.reset();
145
146 if (!use_unix_sockets)
147 m_unix_socket.reset();
148
149 Listener_interfaces listeners = get_array_of_listeners();
150
151 if (listeners.empty())
152 {
153 log_warning("All I/O interfaces are disabled, X Protocol won't be accessible");
154
155 return false;
156 }
157
158 const size_t number_of_prepared_listeners = std::count_if(
159 listeners.begin(),
160 listeners.end(),
161 ngs::bind(&Listener_interface::setup_listener, ngs::placeholders::_1, on_connection));
162
163 if (0 == number_of_prepared_listeners)
164 {
165 abort();
166
167 log_error("Preparation of I/O interfaces failed, X Protocol won't be accessible");
168
169 return false;
170 }
171
172 return true;
173 }
174
prepare(On_connection on_connection,const bool skip_networking,const bool use_unix_sockets)175 bool Server_acceptors::prepare(On_connection on_connection, const bool skip_networking, const bool use_unix_sockets)
176 {
177 const bool result = prepare_impl(on_connection, skip_networking, use_unix_sockets);
178
179 Listener_interfaces listeners = get_array_of_listeners();
180
181 std::for_each(
182 listeners.begin(),
183 listeners.end(),
184 Server_acceptors::report_listener_status);
185
186 m_prepared = true;
187
188 return result;
189 }
190
abort()191 void Server_acceptors::abort()
192 {
193 Listener_interfaces listeners = get_array_of_listeners();
194
195 std::for_each(
196 listeners.begin(),
197 listeners.end(),
198 &Server_acceptors::close_listener);
199
200 m_time_and_event_state.set(State_listener_stopped);
201
202 std::for_each(
203 listeners.begin(),
204 listeners.end(),
205 Server_acceptors::mark_as_stopped);
206 }
207
stop(const bool is_called_from_timeout_handler)208 void Server_acceptors::stop(const bool is_called_from_timeout_handler)
209 {
210 Listener_interfaces listeners = get_array_of_listeners();
211
212 m_event.break_loop();
213
214 std::for_each(
215 listeners.begin(),
216 listeners.end(),
217 &Server_acceptors::close_listener);
218
219 if (!is_called_from_timeout_handler)
220 m_time_and_event_state.wait_for(State_listener_stopped);
221
222 std::for_each(
223 listeners.begin(),
224 listeners.end(),
225 &Server_acceptors::wait_until_stopped);
226 }
227
is_listener_configured(Listener_interface * listener)228 bool Server_acceptors::is_listener_configured(Listener_interface *listener)
229 {
230 if (NULL == listener)
231 return false;
232
233 const State_listener allowed_states[] = {State_listener_prepared, State_listener_running};
234
235 return listener->get_state().is(allowed_states);
236 }
237
was_unix_socket_configured()238 bool Server_acceptors::was_unix_socket_configured()
239 {
240 return is_listener_configured(m_unix_socket.get());
241 }
242
was_tcp_server_configured(std::string & bind_address)243 bool Server_acceptors::was_tcp_server_configured(std::string &bind_address)
244 {
245 if (is_listener_configured(m_tcp_socket.get()))
246 {
247 bind_address = m_bind_address;
248 return true;
249 }
250
251 return false;
252 }
253
was_prepared() const254 bool Server_acceptors::was_prepared() const
255 {
256 return m_prepared;
257 }
258
add_timer(const std::size_t delay_ms,ngs::function<bool ()> callback)259 void Server_acceptors::add_timer(const std::size_t delay_ms, ngs::function<bool ()> callback)
260 {
261 m_event.add_timer(delay_ms, callback);
262 }
263
create_server_tasks_for_listeners()264 Server_tasks_interfaces Server_acceptors::create_server_tasks_for_listeners()
265 {
266 Listener_interfaces listeners = get_array_of_listeners();
267 Server_tasks_interfaces handlers;
268
269 handlers.push_back(m_time_and_event_task);
270
271 for(Listener_interfaces::iterator i = listeners.begin();
272 listeners.end() != i;
273 ++i)
274 {
275 Listener_interface *listener = (*i);
276
277 if (!listener->get_state().is(State_listener_prepared))
278 continue;
279
280 if (listener->is_handled_by_socket_event())
281 {
282 m_time_and_event_task->listener_register(listener);
283 continue;
284 }
285
286 ngs::shared_ptr<Server_task_interface> handler(ngs::allocate_shared<details::Server_task_listener>(ngs::ref(*listener)));
287 handlers.push_back(handler);
288 }
289
290 return handlers;
291 }
292
get_array_of_listeners()293 Server_acceptors::Listener_interfaces Server_acceptors::get_array_of_listeners()
294 {
295 Listener_interfaces result;
296
297 if (m_tcp_socket)
298 result.push_back(m_tcp_socket.get());
299
300 if (m_unix_socket)
301 result.push_back(m_unix_socket.get());
302
303 return result;
304 }
305
wait_until_stopped(Listener_interface * listener)306 void Server_acceptors::wait_until_stopped(Listener_interface *listener)
307 {
308 if (listener->is_handled_by_socket_event())
309 return;
310
311 listener->get_state().wait_for(State_listener_stopped);
312 }
313
close_listener(Listener_interface * listener)314 void Server_acceptors::close_listener(Listener_interface *listener)
315 {
316 listener->close_listener();
317 }
318
mark_as_stopped(Listener_interface * listener)319 void Server_acceptors::mark_as_stopped(Listener_interface *listener)
320 {
321 listener->get_state().set(State_listener_stopped);
322 }
323
report_listener_status(Listener_interface * listener)324 void Server_acceptors::report_listener_status(Listener_interface *listener)
325 {
326 if (!listener->get_state().is(State_listener_prepared))
327 {
328 log_error("Setup of %s failed, %s",
329 listener->get_name_and_configuration().c_str(),
330 listener->get_last_error().c_str());
331
332 std::string listener_configuration_variable = ngs::join(listener->get_configuration_variables(),"','");
333
334 if (!listener_configuration_variable.empty())
335 {
336 log_info("Please see the MySQL documentation for '%s' system variables to fix the error", listener_configuration_variable.c_str());
337 }
338
339 return;
340 }
341
342 log_info("X Plugin listens on %s", listener->get_name_and_configuration().c_str());
343 }
344