1 /*
2  * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 #include "ngs/server.h"
27 #include "ngs/interface/client_interface.h"
28 #include "ngs/interface/connection_acceptor_interface.h"
29 #include "ngs/interface/server_task_interface.h"
30 #include "ngs/server_acceptors.h"
31 #include "ngs/scheduler.h"
32 #include "ngs/protocol_monitor.h"
33 #include "ngs/protocol/protocol_config.h"
34 #include "ngs/server_client_timeout.h"
35 #include "ngs_common/connection_vio.h"
36 #include "xpl_log.h"
37 #include "mysqlx_version.h"
38 
39 
40 using namespace ngs;
41 
Server(ngs::shared_ptr<Server_acceptors> acceptors,ngs::shared_ptr<Scheduler_dynamic> accept_scheduler,ngs::shared_ptr<Scheduler_dynamic> work_scheduler,Server_delegate * delegate,ngs::shared_ptr<Protocol_config> config)42 Server::Server(ngs::shared_ptr<Server_acceptors>  acceptors,
43                ngs::shared_ptr<Scheduler_dynamic> accept_scheduler,
44                ngs::shared_ptr<Scheduler_dynamic> work_scheduler,
45                Server_delegate *delegate,
46                ngs::shared_ptr<Protocol_config> config)
47 : m_timer_running(false),
48   m_skip_name_resolve(false),
49   m_errors_while_accepting(0),
50   m_acceptors(acceptors),
51   m_accept_scheduler(accept_scheduler),
52   m_worker_scheduler(work_scheduler),
53   m_config(config),
54   m_state(State_initializing),
55   m_delegate(delegate)
56 {
57 }
58 
prepare(Ssl_context_unique_ptr ssl_context,const bool skip_networking,const bool skip_name_resolve,const bool use_unix_sockets)59 bool Server::prepare(Ssl_context_unique_ptr ssl_context, const bool skip_networking, const bool skip_name_resolve, const bool use_unix_sockets)
60 {
61   Listener_interface::On_connection on_connection = ngs::bind(&Server::on_accept, this, ngs::placeholders::_1);
62 
63   m_skip_name_resolve = skip_name_resolve;
64   m_ssl_context = ngs::move(ssl_context);
65 
66   const bool result = m_acceptors->prepare(on_connection, skip_networking, use_unix_sockets);
67 
68   if (result)
69   {
70     m_state.set(State_running);
71 
72     m_acceptors->add_timer(1000, ngs::bind(&Server::on_check_terminated_workers, this));
73 
74     return true;
75   }
76 
77   return false;
78 }
79 
run_task(ngs::shared_ptr<Server_task_interface> handler)80 void Server::run_task(ngs::shared_ptr<Server_task_interface> handler)
81 {
82   handler->pre_loop();
83 
84   while (m_state.is(State_running))
85   {
86     handler->loop();
87   }
88 
89   handler->post_loop();
90 }
91 
start_failed()92 void Server::start_failed()
93 {
94   m_state.exchange(State_initializing, State_failure);
95   m_acceptors->abort();
96 }
97 
is_running()98 bool Server::is_running()
99 {
100   return m_state.is(State_running) && !m_delegate->is_terminating();
101 }
102 
is_terminating()103 bool Server::is_terminating()
104 {
105   return m_state.is(State_failure) || m_state.is(State_terminating) || m_delegate->is_terminating();
106 }
107 
start()108 void Server::start()
109 {
110   Server_tasks_interfaces handlers = m_acceptors->create_server_tasks_for_listeners();
111   Server_tasks_interfaces::iterator handler_iterator = handlers.begin();
112 
113   if (handler_iterator == handlers.end())
114     return;
115 
116   ngs::shared_ptr<Server_task_interface> handler_to_run_in_current_thread = *(handler_iterator++);
117 
118   while(handlers.end() != handler_iterator)
119   {
120     m_accept_scheduler->post(ngs::bind(&Server::run_task, this, (*handler_iterator)));
121 
122     ++handler_iterator;
123   }
124 
125   run_task(handler_to_run_in_current_thread);
126 }
127 
128 /** Stop the network acceptor loop */
stop(const bool is_called_from_timeout_handler)129 void Server::stop(const bool is_called_from_timeout_handler)
130 {
131   const State allowed_values[] = {State_failure, State_running, State_terminating};
132 
133   m_state.wait_for(allowed_values);
134   if (State_terminating == m_state.set_and_return_old(State_terminating))
135     return;
136 
137   m_acceptors->stop(is_called_from_timeout_handler);
138 
139   close_all_clients();
140 
141   wait_for_clients_closure();
142 
143   if (m_worker_scheduler)
144   {
145     m_worker_scheduler->stop();
146     m_worker_scheduler.reset();
147   }
148 }
149 
150 struct Copy_client_not_closed
151 {
Copy_client_not_closedCopy_client_not_closed152   Copy_client_not_closed(std::vector<ngs::Client_ptr> &client_list)
153   : m_client_list(client_list)
154   {
155   }
156 
operator ()Copy_client_not_closed157   bool operator() (ngs::Client_ptr &client)
158   {
159     if (ngs::Client_interface::Client_closed != client->get_state())
160       m_client_list.push_back(client);
161 
162     // Continue enumerating
163     return false;
164   }
165 
166   std::vector<ngs::Client_ptr> &m_client_list;
167 };
168 
go_through_all_clients(ngs::function<void (Client_ptr)> callback)169 void Server::go_through_all_clients(ngs::function<void (Client_ptr)> callback)
170 {
171   Mutex_lock lock_client_exit(m_client_exit_mutex);
172   std::vector<ngs::Client_ptr> client_list;
173   Copy_client_not_closed matcher(client_list);
174 
175   // Prolong life of clients when there are already in
176   // Closing state. Client::close could access m_client_list
177   // causing a deadlock thus we copied all elements
178   m_client_list.enumerate(matcher);
179 
180   std::for_each(client_list.begin(), client_list.end(), callback);
181 }
182 
close_all_clients()183 void Server::close_all_clients()
184 {
185   go_through_all_clients(ngs::bind(&Client_interface::on_server_shutdown, ngs::placeholders::_1));
186 }
187 
wait_for_clients_closure()188 void Server::wait_for_clients_closure()
189 {
190   size_t num_of_retries = 4 * 5;
191 
192   //TODO: For now lets pull the list, it should be rewriten
193   // after implementation of Client timeout in closing state
194   while (m_client_list.size() > 0)
195   {
196     if (0 == --num_of_retries)
197     {
198       const unsigned int num_of_clients = static_cast<unsigned int>(m_client_list.size());
199 
200       log_error("Detected %u hanging client", num_of_clients);
201       break;
202     }
203     my_sleep(250000); // wait for 0.25s
204   }
205 }
206 
start_client_supervision_timer(const chrono::duration & oldest_object_time_ms)207 void Server::start_client_supervision_timer(const chrono::duration &oldest_object_time_ms)
208 {
209   log_debug("Supervision timer started %i ms", (int)chrono::to_milliseconds(oldest_object_time_ms));
210 
211   m_timer_running = true;
212 
213   m_acceptors->add_timer(static_cast<size_t>(chrono::to_milliseconds(oldest_object_time_ms)),
214             ngs::bind(&Server::timeout_for_clients_validation, this));
215 }
216 
restart_client_supervision_timer()217 void Server::restart_client_supervision_timer()
218 {
219   if (!m_timer_running)
220   {
221     start_client_supervision_timer(get_config()->connect_timeout);
222   }
223 }
224 
timeout_for_clients_validation()225 bool Server::timeout_for_clients_validation()
226 {
227   m_timer_running = false;
228 
229   log_debug("Supervision timeout - started client state verification");
230 
231   const chrono::time_point time_oldest =
232       chrono::now() - get_config()->connect_timeout;
233   const chrono::time_point time_to_release =
234       time_oldest + get_config()->connect_timeout_hysteresis;
235 
236   Server_client_timeout client_validator(time_to_release);
237 
238   go_through_all_clients(ngs::bind(&Server_client_timeout::validate_client_state, &client_validator, ngs::placeholders::_1));
239 
240   if (chrono::is_valid(client_validator.get_oldest_client_accept_time()))
241   {
242     start_client_supervision_timer(client_validator.get_oldest_client_accept_time() - time_oldest);
243   }
244   return false;
245 }
246 
247 
on_accept(Connection_acceptor_interface & connection_acceptor)248 void Server::on_accept(Connection_acceptor_interface &connection_acceptor)
249 {
250   // That means that the event loop was just break in the stop()
251   if (m_state.is(State_terminating))
252     return;
253 
254   Vio *vio = connection_acceptor.accept();
255 
256   if (NULL == vio)
257   {
258     m_delegate->did_reject_client(Server_delegate::AcceptError);
259 
260     if (0 == (m_errors_while_accepting++ & 255))
261     {
262       // error accepting client
263       log_error("Error accepting client");
264     }
265     const time_t microseconds_to_sleep = 100000;
266 
267     my_sleep(microseconds_to_sleep);
268 
269     return;
270   }
271 
272   Connection_ptr connection(ngs::allocate_shared<ngs::Connection_vio>(ngs::ref(*m_ssl_context), vio));
273   ngs::shared_ptr<Client_interface> client(m_delegate->create_client(connection));
274 
275   if (m_delegate->will_accept_client(*client))
276   {
277     m_delegate->did_accept_client(*client);
278 
279     // connection accepted, add to client list and start handshake etc
280     client->reset_accept_time();
281     m_client_list.add(client);
282 
283     Scheduler_dynamic::Task *task = ngs::allocate_object<Scheduler_dynamic::Task>(ngs::bind(&ngs::Client_interface::run, client,
284                     m_skip_name_resolve));
285 
286     const uint64_t client_id = client->client_id_num();
287     client.reset();
288 
289     // all references to client object should be removed at this thread
290     if (!m_worker_scheduler->post(task))
291     {
292       log_error("Internal error scheduling client for execution");
293       ngs::free_object(task);
294       m_client_list.remove(client_id);
295     }
296 
297     restart_client_supervision_timer();
298   }
299   else
300   {
301     m_delegate->did_reject_client(Server_delegate::TooManyConnections);
302     log_warning("Unable to accept connection, disconnecting client");
303   }
304 }
305 
on_check_terminated_workers()306 bool Server::on_check_terminated_workers()
307 {
308   if (m_worker_scheduler)
309   {
310     m_worker_scheduler->join_terminating_workers();
311     return true;
312   }
313   return false;
314 }
315 
create_session(Client_interface & client,Protocol_encoder & proto,int session_id)316 ngs::shared_ptr<Session_interface> Server::create_session(Client_interface &client,
317                                                   Protocol_encoder &proto,
318                                                   int session_id)
319 {
320   if (is_terminating())
321     return ngs::shared_ptr<Session_interface>();
322 
323   return m_delegate->create_session(client, proto, session_id);
324 }
325 
326 
on_client_closed(const Client_interface & client)327 void Server::on_client_closed(const Client_interface &client)
328 {
329   log_debug("%s: on_client_close", client.client_id());
330   m_delegate->on_client_closed(client);
331 
332   m_client_list.remove(client.client_id_num());
333 }
334 
335 
add_authentication_mechanism(const std::string & name,Authentication_handler::create initiator,const bool allowed_only_with_secure_connection)336 void Server::add_authentication_mechanism(const std::string &name,
337                                           Authentication_handler::create initiator,
338                                           const bool allowed_only_with_secure_connection)
339 {
340   Authentication_key key(name, allowed_only_with_secure_connection);
341 
342   m_auth_handlers[key] = initiator;
343 }
344 
get_auth_handler(const std::string & name,Session_interface * session)345 Authentication_handler_ptr Server::get_auth_handler(const std::string &name, Session_interface *session)
346 {
347   Connection_type type = session->client().connection().connection_type();
348   Authentication_key key(name, Connection_type_helper::is_secure_type(type));
349 
350   Auth_handler_map::const_iterator auth_handler = m_auth_handlers.find(key);
351 
352   if (auth_handler == m_auth_handlers.end())
353     return Authentication_handler_ptr();
354 
355   return auth_handler->second(session);
356 }
357 
get_authentication_mechanisms(std::vector<std::string> & auth_mech,Client_interface & client)358 void Server::get_authentication_mechanisms(std::vector<std::string> &auth_mech, Client_interface &client)
359 {
360   const Connection_type type      = client.connection().connection_type();
361   const bool            is_secure = Connection_type_helper::is_secure_type(type);
362 
363   auth_mech.clear();
364 
365   auth_mech.reserve(m_auth_handlers.size());
366 
367   Auth_handler_map::const_iterator i = m_auth_handlers.begin();
368 
369   while (m_auth_handlers.end() != i)
370   {
371     if (i->first.must_be_secure_connection == is_secure)
372       auth_mech.push_back(i->first.name);
373     ++i;
374   }
375 }
376 
add_timer(const std::size_t delay_ms,ngs::function<bool ()> callback)377 void Server::add_timer(const std::size_t delay_ms, ngs::function<bool ()> callback)
378 {
379   m_acceptors->add_timer(delay_ms, callback);
380 }
381