1 /*
2 * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2.0,
6 * as published by the Free Software Foundation.
7 *
8 * This program is also distributed with certain software (including
9 * but not limited to OpenSSL) that is licensed under separate terms,
10 * as designated in a particular file or component or in included license
11 * documentation. The authors of MySQL hereby grant you an additional
12 * permission to link the program and your derivative works with the
13 * separately licensed software that they have included with MySQL.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License, version 2.0, for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23 * 02110-1301 USA
24 */
25
26 #include "ngs/server.h"
27 #include "ngs/interface/client_interface.h"
28 #include "ngs/interface/connection_acceptor_interface.h"
29 #include "ngs/interface/server_task_interface.h"
30 #include "ngs/server_acceptors.h"
31 #include "ngs/scheduler.h"
32 #include "ngs/protocol_monitor.h"
33 #include "ngs/protocol/protocol_config.h"
34 #include "ngs/server_client_timeout.h"
35 #include "ngs_common/connection_vio.h"
36 #include "xpl_log.h"
37 #include "mysqlx_version.h"
38
39
40 using namespace ngs;
41
Server(ngs::shared_ptr<Server_acceptors> acceptors,ngs::shared_ptr<Scheduler_dynamic> accept_scheduler,ngs::shared_ptr<Scheduler_dynamic> work_scheduler,Server_delegate * delegate,ngs::shared_ptr<Protocol_config> config)42 Server::Server(ngs::shared_ptr<Server_acceptors> acceptors,
43 ngs::shared_ptr<Scheduler_dynamic> accept_scheduler,
44 ngs::shared_ptr<Scheduler_dynamic> work_scheduler,
45 Server_delegate *delegate,
46 ngs::shared_ptr<Protocol_config> config)
47 : m_timer_running(false),
48 m_skip_name_resolve(false),
49 m_errors_while_accepting(0),
50 m_acceptors(acceptors),
51 m_accept_scheduler(accept_scheduler),
52 m_worker_scheduler(work_scheduler),
53 m_config(config),
54 m_state(State_initializing),
55 m_delegate(delegate)
56 {
57 }
58
prepare(Ssl_context_unique_ptr ssl_context,const bool skip_networking,const bool skip_name_resolve,const bool use_unix_sockets)59 bool Server::prepare(Ssl_context_unique_ptr ssl_context, const bool skip_networking, const bool skip_name_resolve, const bool use_unix_sockets)
60 {
61 Listener_interface::On_connection on_connection = ngs::bind(&Server::on_accept, this, ngs::placeholders::_1);
62
63 m_skip_name_resolve = skip_name_resolve;
64 m_ssl_context = ngs::move(ssl_context);
65
66 const bool result = m_acceptors->prepare(on_connection, skip_networking, use_unix_sockets);
67
68 if (result)
69 {
70 m_state.set(State_running);
71
72 m_acceptors->add_timer(1000, ngs::bind(&Server::on_check_terminated_workers, this));
73
74 return true;
75 }
76
77 return false;
78 }
79
run_task(ngs::shared_ptr<Server_task_interface> handler)80 void Server::run_task(ngs::shared_ptr<Server_task_interface> handler)
81 {
82 handler->pre_loop();
83
84 while (m_state.is(State_running))
85 {
86 handler->loop();
87 }
88
89 handler->post_loop();
90 }
91
start_failed()92 void Server::start_failed()
93 {
94 m_state.exchange(State_initializing, State_failure);
95 m_acceptors->abort();
96 }
97
is_running()98 bool Server::is_running()
99 {
100 return m_state.is(State_running) && !m_delegate->is_terminating();
101 }
102
is_terminating()103 bool Server::is_terminating()
104 {
105 return m_state.is(State_failure) || m_state.is(State_terminating) || m_delegate->is_terminating();
106 }
107
start()108 void Server::start()
109 {
110 Server_tasks_interfaces handlers = m_acceptors->create_server_tasks_for_listeners();
111 Server_tasks_interfaces::iterator handler_iterator = handlers.begin();
112
113 if (handler_iterator == handlers.end())
114 return;
115
116 ngs::shared_ptr<Server_task_interface> handler_to_run_in_current_thread = *(handler_iterator++);
117
118 while(handlers.end() != handler_iterator)
119 {
120 m_accept_scheduler->post(ngs::bind(&Server::run_task, this, (*handler_iterator)));
121
122 ++handler_iterator;
123 }
124
125 run_task(handler_to_run_in_current_thread);
126 }
127
128 /** Stop the network acceptor loop */
stop(const bool is_called_from_timeout_handler)129 void Server::stop(const bool is_called_from_timeout_handler)
130 {
131 const State allowed_values[] = {State_failure, State_running, State_terminating};
132
133 m_state.wait_for(allowed_values);
134 if (State_terminating == m_state.set_and_return_old(State_terminating))
135 return;
136
137 m_acceptors->stop(is_called_from_timeout_handler);
138
139 close_all_clients();
140
141 wait_for_clients_closure();
142
143 if (m_worker_scheduler)
144 {
145 m_worker_scheduler->stop();
146 m_worker_scheduler.reset();
147 }
148 }
149
150 struct Copy_client_not_closed
151 {
Copy_client_not_closedCopy_client_not_closed152 Copy_client_not_closed(std::vector<ngs::Client_ptr> &client_list)
153 : m_client_list(client_list)
154 {
155 }
156
operator ()Copy_client_not_closed157 bool operator() (ngs::Client_ptr &client)
158 {
159 if (ngs::Client_interface::Client_closed != client->get_state())
160 m_client_list.push_back(client);
161
162 // Continue enumerating
163 return false;
164 }
165
166 std::vector<ngs::Client_ptr> &m_client_list;
167 };
168
go_through_all_clients(ngs::function<void (Client_ptr)> callback)169 void Server::go_through_all_clients(ngs::function<void (Client_ptr)> callback)
170 {
171 Mutex_lock lock_client_exit(m_client_exit_mutex);
172 std::vector<ngs::Client_ptr> client_list;
173 Copy_client_not_closed matcher(client_list);
174
175 // Prolong life of clients when there are already in
176 // Closing state. Client::close could access m_client_list
177 // causing a deadlock thus we copied all elements
178 m_client_list.enumerate(matcher);
179
180 std::for_each(client_list.begin(), client_list.end(), callback);
181 }
182
close_all_clients()183 void Server::close_all_clients()
184 {
185 go_through_all_clients(ngs::bind(&Client_interface::on_server_shutdown, ngs::placeholders::_1));
186 }
187
wait_for_clients_closure()188 void Server::wait_for_clients_closure()
189 {
190 size_t num_of_retries = 4 * 5;
191
192 //TODO: For now lets pull the list, it should be rewriten
193 // after implementation of Client timeout in closing state
194 while (m_client_list.size() > 0)
195 {
196 if (0 == --num_of_retries)
197 {
198 const unsigned int num_of_clients = static_cast<unsigned int>(m_client_list.size());
199
200 log_error("Detected %u hanging client", num_of_clients);
201 break;
202 }
203 my_sleep(250000); // wait for 0.25s
204 }
205 }
206
start_client_supervision_timer(const chrono::duration & oldest_object_time_ms)207 void Server::start_client_supervision_timer(const chrono::duration &oldest_object_time_ms)
208 {
209 log_debug("Supervision timer started %i ms", (int)chrono::to_milliseconds(oldest_object_time_ms));
210
211 m_timer_running = true;
212
213 m_acceptors->add_timer(static_cast<size_t>(chrono::to_milliseconds(oldest_object_time_ms)),
214 ngs::bind(&Server::timeout_for_clients_validation, this));
215 }
216
restart_client_supervision_timer()217 void Server::restart_client_supervision_timer()
218 {
219 if (!m_timer_running)
220 {
221 start_client_supervision_timer(get_config()->connect_timeout);
222 }
223 }
224
timeout_for_clients_validation()225 bool Server::timeout_for_clients_validation()
226 {
227 m_timer_running = false;
228
229 log_debug("Supervision timeout - started client state verification");
230
231 const chrono::time_point time_oldest =
232 chrono::now() - get_config()->connect_timeout;
233 const chrono::time_point time_to_release =
234 time_oldest + get_config()->connect_timeout_hysteresis;
235
236 Server_client_timeout client_validator(time_to_release);
237
238 go_through_all_clients(ngs::bind(&Server_client_timeout::validate_client_state, &client_validator, ngs::placeholders::_1));
239
240 if (chrono::is_valid(client_validator.get_oldest_client_accept_time()))
241 {
242 start_client_supervision_timer(client_validator.get_oldest_client_accept_time() - time_oldest);
243 }
244 return false;
245 }
246
247
on_accept(Connection_acceptor_interface & connection_acceptor)248 void Server::on_accept(Connection_acceptor_interface &connection_acceptor)
249 {
250 // That means that the event loop was just break in the stop()
251 if (m_state.is(State_terminating))
252 return;
253
254 Vio *vio = connection_acceptor.accept();
255
256 if (NULL == vio)
257 {
258 m_delegate->did_reject_client(Server_delegate::AcceptError);
259
260 if (0 == (m_errors_while_accepting++ & 255))
261 {
262 // error accepting client
263 log_error("Error accepting client");
264 }
265 const time_t microseconds_to_sleep = 100000;
266
267 my_sleep(microseconds_to_sleep);
268
269 return;
270 }
271
272 Connection_ptr connection(ngs::allocate_shared<ngs::Connection_vio>(ngs::ref(*m_ssl_context), vio));
273 ngs::shared_ptr<Client_interface> client(m_delegate->create_client(connection));
274
275 if (m_delegate->will_accept_client(*client))
276 {
277 m_delegate->did_accept_client(*client);
278
279 // connection accepted, add to client list and start handshake etc
280 client->reset_accept_time();
281 m_client_list.add(client);
282
283 Scheduler_dynamic::Task *task = ngs::allocate_object<Scheduler_dynamic::Task>(ngs::bind(&ngs::Client_interface::run, client,
284 m_skip_name_resolve));
285
286 const uint64_t client_id = client->client_id_num();
287 client.reset();
288
289 // all references to client object should be removed at this thread
290 if (!m_worker_scheduler->post(task))
291 {
292 log_error("Internal error scheduling client for execution");
293 ngs::free_object(task);
294 m_client_list.remove(client_id);
295 }
296
297 restart_client_supervision_timer();
298 }
299 else
300 {
301 m_delegate->did_reject_client(Server_delegate::TooManyConnections);
302 log_warning("Unable to accept connection, disconnecting client");
303 }
304 }
305
on_check_terminated_workers()306 bool Server::on_check_terminated_workers()
307 {
308 if (m_worker_scheduler)
309 {
310 m_worker_scheduler->join_terminating_workers();
311 return true;
312 }
313 return false;
314 }
315
create_session(Client_interface & client,Protocol_encoder & proto,int session_id)316 ngs::shared_ptr<Session_interface> Server::create_session(Client_interface &client,
317 Protocol_encoder &proto,
318 int session_id)
319 {
320 if (is_terminating())
321 return ngs::shared_ptr<Session_interface>();
322
323 return m_delegate->create_session(client, proto, session_id);
324 }
325
326
on_client_closed(const Client_interface & client)327 void Server::on_client_closed(const Client_interface &client)
328 {
329 log_debug("%s: on_client_close", client.client_id());
330 m_delegate->on_client_closed(client);
331
332 m_client_list.remove(client.client_id_num());
333 }
334
335
add_authentication_mechanism(const std::string & name,Authentication_handler::create initiator,const bool allowed_only_with_secure_connection)336 void Server::add_authentication_mechanism(const std::string &name,
337 Authentication_handler::create initiator,
338 const bool allowed_only_with_secure_connection)
339 {
340 Authentication_key key(name, allowed_only_with_secure_connection);
341
342 m_auth_handlers[key] = initiator;
343 }
344
get_auth_handler(const std::string & name,Session_interface * session)345 Authentication_handler_ptr Server::get_auth_handler(const std::string &name, Session_interface *session)
346 {
347 Connection_type type = session->client().connection().connection_type();
348 Authentication_key key(name, Connection_type_helper::is_secure_type(type));
349
350 Auth_handler_map::const_iterator auth_handler = m_auth_handlers.find(key);
351
352 if (auth_handler == m_auth_handlers.end())
353 return Authentication_handler_ptr();
354
355 return auth_handler->second(session);
356 }
357
get_authentication_mechanisms(std::vector<std::string> & auth_mech,Client_interface & client)358 void Server::get_authentication_mechanisms(std::vector<std::string> &auth_mech, Client_interface &client)
359 {
360 const Connection_type type = client.connection().connection_type();
361 const bool is_secure = Connection_type_helper::is_secure_type(type);
362
363 auth_mech.clear();
364
365 auth_mech.reserve(m_auth_handlers.size());
366
367 Auth_handler_map::const_iterator i = m_auth_handlers.begin();
368
369 while (m_auth_handlers.end() != i)
370 {
371 if (i->first.must_be_secure_connection == is_secure)
372 auth_mech.push_back(i->first.name);
373 ++i;
374 }
375 }
376
add_timer(const std::size_t delay_ms,ngs::function<bool ()> callback)377 void Server::add_timer(const std::size_t delay_ms, ngs::function<bool ()> callback)
378 {
379 m_acceptors->add_timer(delay_ms, callback);
380 }
381