1 /*
2  * Copyright (c) 2016, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 #include "ngs/server_acceptors.h"
27 #include "ngs_common/connection_vio.h"
28 #include "ngs_common/bind.h"
29 #include "ngs_common/string_formatter.h"
30 #include "ngs/log.h"
31 #include <iterator>
32 #include <stdlib.h>
33 #include <algorithm>
34 
35 
36 using namespace ngs;
37 
38 namespace details
39 {
40 
41 class Server_task_listener: public Server_task_interface
42 {
43 public:
Server_task_listener(Listener_interface & listener)44   Server_task_listener(Listener_interface &listener)
45   : m_listener(listener)
46   {
47   }
48 
pre_loop()49   void pre_loop()
50   {
51     m_listener.get_state().set(State_listener_running);
52   }
53 
post_loop()54   void post_loop()
55   {
56     m_listener.get_state().set(State_listener_stopped);
57   }
58 
loop()59   void loop()
60   {
61     m_listener.loop();
62   }
63 
64 private:
65   Listener_interface &m_listener;
66 };
67 
68 } // namespace details
69 
70 
71 class Server_acceptors::Server_task_time_and_event: public Server_task_interface
72 {
73 public:
Server_task_time_and_event(Socket_events & event,Listener_interface::Sync_variable_state & state)74   Server_task_time_and_event(Socket_events &event, Listener_interface::Sync_variable_state &state)
75   : m_event(event), m_state(state)
76   {
77   }
78 
79   // Handler_interface
pre_loop()80   void pre_loop()
81   {
82     m_state.set(State_listener_running);
83 
84     std::for_each(m_listeners.begin(),
85                   m_listeners.end(),
86                   &Server_task_time_and_event::set_listeners_state<State_listener_running>);
87   }
88 
post_loop()89   void post_loop()
90   {
91     m_state.set(State_listener_stopped);
92 
93     std::for_each(m_listeners.begin(),
94                   m_listeners.end(),
95                   &Server_task_time_and_event::set_listeners_state<State_listener_stopped>);
96   }
97 
loop()98   void loop()
99   {
100     m_event.loop();
101   }
102 
103   // Handle_time_and_event
listener_register(Listener_interface * listener)104   void listener_register(Listener_interface *listener)
105   {
106     m_listeners.push_back(listener);
107   }
108 
109 private:
110 
111   template<State_listener state>
set_listeners_state(Listener_interface * listener)112   static void set_listeners_state(Listener_interface *listener)
113   {
114     listener->get_state().set(state);
115   }
116 
117   Socket_events &m_event;
118   Listener_interface::Sync_variable_state &m_state;
119   Server_acceptors::Listener_interfaces m_listeners;
120 };
121 
122 
Server_acceptors(Listener_factory_interface & listener_factory,const std::string & tcp_bind_address,const uint16 tcp_port,const uint32 tcp_port_open_timeout,const std::string & unix_socket_file,const uint32 backlog)123 Server_acceptors::Server_acceptors(
124     Listener_factory_interface &listener_factory,
125     const std::string &tcp_bind_address,
126     const uint16 tcp_port,
127     const uint32 tcp_port_open_timeout,
128     const std::string &unix_socket_file,
129     const uint32 backlog)
130 : m_bind_address(tcp_bind_address),
131   m_tcp_socket(listener_factory.create_tcp_socket_listener(m_bind_address, tcp_port, tcp_port_open_timeout, m_event, backlog)),
132 #if defined(HAVE_SYS_UN_H)
133   m_unix_socket(listener_factory.create_unix_socket_listener(unix_socket_file, m_event, backlog)),
134 #endif
135   m_time_and_event_state(State_listener_initializing),
136   m_time_and_event_task(ngs::allocate_shared<Server_task_time_and_event>(ngs::ref(m_event), ngs::ref(m_time_and_event_state))),
137   m_prepared(false)
138 {
139 }
140 
prepare_impl(On_connection on_connection,const bool skip_networking,const bool use_unix_sockets)141 bool Server_acceptors::prepare_impl(On_connection on_connection, const bool skip_networking, const bool use_unix_sockets)
142 {
143   if (skip_networking)
144     m_tcp_socket.reset();
145 
146   if (!use_unix_sockets)
147     m_unix_socket.reset();
148 
149   Listener_interfaces listeners = get_array_of_listeners();
150 
151   if (listeners.empty())
152   {
153     log_warning("All I/O interfaces are disabled, X Protocol won't be accessible");
154 
155     return false;
156   }
157 
158   const size_t number_of_prepared_listeners = std::count_if(
159       listeners.begin(),
160       listeners.end(),
161       ngs::bind(&Listener_interface::setup_listener, ngs::placeholders::_1, on_connection));
162 
163   if (0 == number_of_prepared_listeners)
164   {
165     abort();
166 
167     log_error("Preparation of I/O interfaces failed, X Protocol won't be accessible");
168 
169     return false;
170   }
171 
172   return true;
173 }
174 
prepare(On_connection on_connection,const bool skip_networking,const bool use_unix_sockets)175 bool Server_acceptors::prepare(On_connection on_connection, const bool skip_networking, const bool use_unix_sockets)
176 {
177   const bool result = prepare_impl(on_connection, skip_networking, use_unix_sockets);
178 
179   Listener_interfaces listeners = get_array_of_listeners();
180 
181   std::for_each(
182       listeners.begin(),
183       listeners.end(),
184       Server_acceptors::report_listener_status);
185 
186   m_prepared = true;
187 
188   return result;
189 }
190 
abort()191 void Server_acceptors::abort()
192 {
193   Listener_interfaces listeners = get_array_of_listeners();
194 
195   std::for_each(
196       listeners.begin(),
197       listeners.end(),
198       &Server_acceptors::close_listener);
199 
200   m_time_and_event_state.set(State_listener_stopped);
201 
202   std::for_each(
203       listeners.begin(),
204       listeners.end(),
205       Server_acceptors::mark_as_stopped);
206 }
207 
stop(const bool is_called_from_timeout_handler)208 void Server_acceptors::stop(const bool is_called_from_timeout_handler)
209 {
210   Listener_interfaces listeners = get_array_of_listeners();
211 
212   m_event.break_loop();
213 
214   std::for_each(
215       listeners.begin(),
216       listeners.end(),
217       &Server_acceptors::close_listener);
218 
219   if (!is_called_from_timeout_handler)
220     m_time_and_event_state.wait_for(State_listener_stopped);
221 
222   std::for_each(
223       listeners.begin(),
224       listeners.end(),
225       &Server_acceptors::wait_until_stopped);
226 }
227 
is_listener_configured(Listener_interface * listener)228 bool Server_acceptors::is_listener_configured(Listener_interface *listener)
229 {
230   if (NULL == listener)
231     return false;
232 
233   const State_listener allowed_states[] = {State_listener_prepared, State_listener_running};
234 
235   return listener->get_state().is(allowed_states);
236 }
237 
was_unix_socket_configured()238 bool Server_acceptors::was_unix_socket_configured()
239 {
240   return is_listener_configured(m_unix_socket.get());
241 }
242 
was_tcp_server_configured(std::string & bind_address)243 bool Server_acceptors::was_tcp_server_configured(std::string &bind_address)
244 {
245   if (is_listener_configured(m_tcp_socket.get()))
246   {
247     bind_address = m_bind_address;
248     return true;
249   }
250 
251   return false;
252 }
253 
was_prepared() const254 bool Server_acceptors::was_prepared() const
255 {
256   return m_prepared;
257 }
258 
add_timer(const std::size_t delay_ms,ngs::function<bool ()> callback)259 void Server_acceptors::add_timer(const std::size_t delay_ms, ngs::function<bool ()> callback)
260 {
261   m_event.add_timer(delay_ms, callback);
262 }
263 
create_server_tasks_for_listeners()264 Server_tasks_interfaces Server_acceptors::create_server_tasks_for_listeners()
265 {
266   Listener_interfaces listeners = get_array_of_listeners();
267   Server_tasks_interfaces  handlers;
268 
269   handlers.push_back(m_time_and_event_task);
270 
271   for(Listener_interfaces::iterator i = listeners.begin();
272       listeners.end() != i;
273       ++i)
274   {
275     Listener_interface *listener = (*i);
276 
277     if (!listener->get_state().is(State_listener_prepared))
278       continue;
279 
280     if (listener->is_handled_by_socket_event())
281     {
282       m_time_and_event_task->listener_register(listener);
283       continue;
284     }
285 
286     ngs::shared_ptr<Server_task_interface> handler(ngs::allocate_shared<details::Server_task_listener>(ngs::ref(*listener)));
287     handlers.push_back(handler);
288   }
289 
290   return handlers;
291 }
292 
get_array_of_listeners()293 Server_acceptors::Listener_interfaces Server_acceptors::get_array_of_listeners()
294 {
295   Listener_interfaces result;
296 
297   if (m_tcp_socket)
298     result.push_back(m_tcp_socket.get());
299 
300   if (m_unix_socket)
301     result.push_back(m_unix_socket.get());
302 
303   return result;
304 }
305 
wait_until_stopped(Listener_interface * listener)306 void Server_acceptors::wait_until_stopped(Listener_interface *listener)
307 {
308   if (listener->is_handled_by_socket_event())
309     return;
310 
311   listener->get_state().wait_for(State_listener_stopped);
312 }
313 
close_listener(Listener_interface * listener)314 void Server_acceptors::close_listener(Listener_interface *listener)
315 {
316   listener->close_listener();
317 }
318 
mark_as_stopped(Listener_interface * listener)319 void Server_acceptors::mark_as_stopped(Listener_interface *listener)
320 {
321   listener->get_state().set(State_listener_stopped);
322 }
323 
report_listener_status(Listener_interface * listener)324 void Server_acceptors::report_listener_status(Listener_interface *listener)
325 {
326   if (!listener->get_state().is(State_listener_prepared))
327   {
328     log_error("Setup of %s failed, %s",
329               listener->get_name_and_configuration().c_str(),
330               listener->get_last_error().c_str());
331 
332     std::string listener_configuration_variable = ngs::join(listener->get_configuration_variables(),"','");
333 
334     if (!listener_configuration_variable.empty())
335     {
336       log_info("Please see the MySQL documentation for '%s' system variables to fix the error", listener_configuration_variable.c_str());
337     }
338 
339     return;
340   }
341 
342   log_info("X Plugin listens on %s", listener->get_name_and_configuration().c_str());
343 }
344