1 /*
2 * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2.0,
6 * as published by the Free Software Foundation.
7 *
8 * This program is also distributed with certain software (including
9 * but not limited to OpenSSL) that is licensed under separate terms,
10 * as designated in a particular file or component or in included license
11 * documentation. The authors of MySQL hereby grant you an additional
12 * permission to link the program and your derivative works with the
13 * separately licensed software that they have included with MySQL.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License, version 2.0, for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23 * 02110-1301 USA
24 */
25
26 #include "ngs/client.h"
27 #include "ngs/capabilities/handler_auth_mech.h"
28 #include "ngs/capabilities/handler_readonly_value.h"
29 #include "ngs/capabilities/handler_tls.h"
30 #include "ngs/interface/server_interface.h"
31 #include "ngs/interface/session_interface.h"
32 #include "ngs/ngs_error.h"
33 #include "ngs/protocol/protocol_config.h"
34 #include "ngs/protocol_monitor.h"
35 #include "ngs/scheduler.h"
36 #include "ngs_common/operations_factory.h"
37
38 #include <string.h>
39 #include <algorithm>
40 #include <functional>
41 #ifndef WIN32
42 #include <arpa/inet.h>
43 #endif
44
45 #include "ngs/log.h"
46
47 #undef ERROR // Needed to avoid conflict with ERROR in mysqlx.pb.h
48 #include "ngs_common/protocol_protobuf.h"
49
50 using namespace ngs;
51
Client(Connection_ptr connection,Server_interface & server,Client_id client_id,Protocol_monitor_interface & pmon)52 Client::Client(Connection_ptr connection, Server_interface &server,
53 Client_id client_id, Protocol_monitor_interface &pmon)
54 : m_client_id(client_id),
55 m_server(server),
56 m_connection(connection),
57 m_client_addr("n/c"),
58 m_client_port(0),
59 m_state(Client_invalid),
60 m_removed(false),
61 m_protocol_monitor(pmon),
62 m_close_reason(Not_closing),
63 m_msg_buffer(NULL),
64 m_msg_buffer_size(0) {
65 my_snprintf(m_id, sizeof(m_id), "%llu", static_cast<ulonglong>(client_id));
66 }
67
~Client()68 Client::~Client() {
69 log_debug("%s: Delete client", m_id);
70 if (m_connection) m_connection->close();
71
72 if (m_msg_buffer) ngs::free_array(m_msg_buffer);
73 }
74
get_accept_time() const75 ngs::chrono::time_point Client::get_accept_time() const {
76 return m_accept_time;
77 }
78
reset_accept_time()79 void Client::reset_accept_time() {
80 m_accept_time = chrono::now();
81 m_server.restart_client_supervision_timer();
82 }
83
activate_tls()84 void Client::activate_tls() {
85 log_debug("%s: enabling TLS for client", client_id());
86
87 if (m_server.ssl_context()->activate_tls(
88 connection(),
89 chrono::to_seconds(m_server.get_config()->connect_timeout))) {
90 if (connection().options()->active_tls()) session()->mark_as_tls_session();
91 } else {
92 log_warning("%s: Error during SSL handshake", client_id());
93 disconnect_and_trigger_close();
94 }
95 }
96
on_auth_timeout()97 void Client::on_auth_timeout() {
98 m_close_reason = Close_connect_timeout;
99
100 // XXX send an ERROR notice when it's available
101 disconnect_and_trigger_close();
102 }
103
capabilities_configurator()104 Capabilities_configurator *Client::capabilities_configurator() {
105 std::vector<Capability_handler_ptr> handlers;
106
107 handlers.push_back(ngs::allocate_shared<Capability_tls>(ngs::ref(*this)));
108 handlers.push_back(
109 ngs::allocate_shared<Capability_auth_mech>(ngs::ref(*this)));
110
111 handlers.push_back(
112 ngs::allocate_shared<Capability_readonly_value>("doc.formats", "text"));
113
114 return ngs::allocate_object<Capabilities_configurator>(handlers);
115 }
116
get_capabilities(const Mysqlx::Connection::CapabilitiesGet & msg)117 void Client::get_capabilities(const Mysqlx::Connection::CapabilitiesGet &msg) {
118 ngs::Memory_instrumented<Capabilities_configurator>::Unique_ptr configurator(
119 capabilities_configurator());
120 ngs::Memory_instrumented<Mysqlx::Connection::Capabilities>::Unique_ptr caps(
121 configurator->get());
122
123 m_encoder->send_message(Mysqlx::ServerMessages::CONN_CAPABILITIES, *caps);
124 }
125
set_capabilities(const Mysqlx::Connection::CapabilitiesSet & setcap)126 void Client::set_capabilities(
127 const Mysqlx::Connection::CapabilitiesSet &setcap) {
128 ngs::Memory_instrumented<Capabilities_configurator>::Unique_ptr configurator(
129 capabilities_configurator());
130 Error_code error_code = configurator->prepare_set(setcap.capabilities());
131 m_encoder->send_result(error_code);
132 if (!error_code) {
133 configurator->commit();
134 }
135 }
136
handle_message(Request & request)137 void Client::handle_message(Request &request) {
138 log_message_recv(request);
139
140 Client_state expected_state = Client_accepted;
141
142 // there is no session before authentication, so we handle the messages
143 // ourselves
144 log_debug("%s: Client got message %i", client_id(), request.get_type());
145
146 switch (request.get_type()) {
147 case Mysqlx::ClientMessages::CON_CLOSE:
148 m_encoder->send_ok("bye!");
149 m_close_reason = Close_normal;
150 disconnect_and_trigger_close();
151 break;
152
153 case Mysqlx::ClientMessages::SESS_RESET:
154 // no-op, since we're only going to get called here before session is
155 // authenticated
156 break;
157
158 case Mysqlx::ClientMessages::CON_CAPABILITIES_GET:
159 get_capabilities(static_cast<const Mysqlx::Connection::CapabilitiesGet &>(
160 *request.message()));
161 break;
162
163 case Mysqlx::ClientMessages::CON_CAPABILITIES_SET:
164 set_capabilities(static_cast<const Mysqlx::Connection::CapabilitiesSet &>(
165 *request.message()));
166 break;
167
168 case Mysqlx::ClientMessages::SESS_AUTHENTICATE_START:
169 if (m_state.compare_exchange_strong(expected_state,
170 Client_authenticating_first) &&
171 server().is_running()) {
172 log_debug("%s: Authenticating client...", client_id());
173
174 ngs::shared_ptr<Session_interface> s(session());
175 // start redirecting incoming messages directly to the session
176 if (s) {
177 // forward the message to the pre-allocated session, rest of auth will
178 // be handled by the session
179 s->handle_message(request);
180 }
181 break;
182 }
183 // Fall through.
184
185 default:
186 // invalid message at this time
187 m_protocol_monitor.on_error_unknown_msg_type();
188 log_info("%s: Invalid message %i received during client initialization",
189 client_id(), request.get_type());
190 m_encoder->send_result(ngs::Fatal(ER_X_BAD_MESSAGE, "Invalid message"));
191 m_close_reason = Close_error;
192 disconnect_and_trigger_close();
193 break;
194 }
195 }
196
disconnect_and_trigger_close()197 void Client::disconnect_and_trigger_close() {
198 if (m_close_reason == Not_closing) m_close_reason = Close_normal;
199
200 shutdown_connection();
201 }
202
203 // this will be called on socket errors, but also when halt_and_wait() is called
204 // which will shutdown the socket for reading and trigger a eof
205 // (meaning, closed for reads, but writes would still be ok)
on_network_error(int error)206 void Client::on_network_error(int error) {
207 if (error == 0)
208 log_debug("%s: peer disconnected (state %i)", client_id(), m_state.load());
209 else
210 log_debug("%s: network error %i (state %i)", client_id(), error,
211 m_state.load());
212 if (m_close_reason == Not_closing && m_state != Client_closing && error != 0)
213 m_close_reason = Close_net_error;
214
215 m_state.exchange(Client_closing);
216
217 if (m_session &&
218 (Client_authenticating_first == m_state || Client_running == m_state)) {
219 // trigger all sessions to close and stop whatever they're doing
220 log_debug("%s: killing session", client_id());
221 if (Session_interface::Closing != m_session->state())
222 server().get_worker_scheduler()->post_and_wait(
223 ngs::bind(&Client::on_kill, this, ngs::ref(*m_session)));
224 }
225 }
226
on_kill(Session_interface & session)227 void Client::on_kill(Session_interface &session) { m_session->on_kill(); }
228
remove_client_from_server()229 void Client::remove_client_from_server() {
230 if (false == m_removed.exchange(true)) m_server.on_client_closed(*this);
231 }
232
on_client_addr(const bool skip_resolve)233 void Client::on_client_addr(const bool skip_resolve) {
234 m_client_addr.resize(INET6_ADDRSTRLEN);
235
236 switch (m_connection->connection_type()) {
237 case Connection_tcpip: {
238 m_connection->peer_address(m_client_addr, m_client_port);
239 } break;
240
241 case Connection_namedpipe:
242 case Connection_unixsocket: // fall through
243 m_client_host = "localhost";
244 return;
245
246 default:
247 return;
248 }
249
250 // turn IP to hostname for auth uses
251 if (skip_resolve) return;
252
253 m_client_host = "";
254
255 try {
256 m_client_host = resolve_hostname();
257 } catch (...) {
258 m_close_reason = Close_reject;
259 disconnect_and_trigger_close();
260
261 throw;
262 }
263 }
264
on_accept()265 void Client::on_accept() {
266 log_debug("%s: Accepted client connection from %s", client_id(),
267 client_address());
268
269 m_connection->set_socket_thread_owner();
270
271 // it can be accessed directly (no other thread access thus object)
272 m_state = Client_accepted;
273
274 m_encoder.reset(ngs::allocate_object<Protocol_encoder>(
275 m_connection,
276 ngs::bind(&Client::on_network_error, this, ngs::placeholders::_1),
277 ngs::ref(m_protocol_monitor)));
278
279 // pre-allocate the initial session
280 // this is also needed for the srv_session to correctly report us to the
281 // audit.log as in the Pre-authenticate state
282 ngs::shared_ptr<Session_interface> session(
283 m_server.create_session(*this, *m_encoder, 1));
284 if (!session) {
285 log_warning("%s: Error creating session for connection from %s",
286 client_id(), m_client_addr.c_str());
287 m_encoder->send_init_error(
288 ngs::Fatal(ER_OUT_OF_RESOURCES, "Could not allocate session"));
289 } else {
290 ngs::Error_code error(session->init());
291 if (error) {
292 log_warning("%s: Error initializing session for connection: %s",
293 client_id(), error.message.c_str());
294 m_encoder->send_result(error);
295 session.reset();
296 } else
297 m_session = session;
298 }
299 if (!session) {
300 m_close_reason = Close_error;
301 disconnect_and_trigger_close();
302 }
303 }
304
on_session_auth_success(Session_interface & s)305 void Client::on_session_auth_success(Session_interface &s) {
306 // this is called from worker thread
307 Client_state expected = Client_authenticating_first;
308 m_state.compare_exchange_strong(expected, Client_running);
309 }
310
on_session_close(Session_interface & s)311 void Client::on_session_close(Session_interface &s) {
312 log_debug("%s: Session %i removed", client_id(), s.session_id());
313
314 // no more open sessions, disconnect
315 if (m_close_reason == Not_closing) m_close_reason = Close_normal;
316
317 m_state = Client_closing;
318
319 shutdown_connection();
320
321 remove_client_from_server();
322 }
323
on_session_reset(Session_interface & s)324 void Client::on_session_reset(Session_interface &s) {
325 log_debug("%s: Resetting session %i", client_id(), s.session_id());
326
327 m_state = Client_accepted_with_session;
328 ngs::shared_ptr<Session_interface> session(
329 m_server.create_session(*this, *m_encoder, 1));
330 if (!session) {
331 log_warning("%s: Error creating session for connection from %s",
332 client_id(), m_client_addr.c_str());
333 m_encoder->send_result(
334 ngs::Fatal(ER_OUT_OF_RESOURCES, "Could not allocate new session"));
335 m_state = Client_closing;
336 } else {
337 ngs::Error_code error(session->init());
338 if (error) {
339 log_warning("%s: Error initializing session for connection: %s",
340 client_id(), error.message.c_str());
341 m_encoder->send_result(error);
342 session.reset();
343 m_state = Client_closing;
344 } else {
345 m_session = session;
346 m_encoder->send_ok();
347 }
348 }
349 }
350
on_server_shutdown()351 void Client::on_server_shutdown() {
352 log_info("%s: closing client because of shutdown (state: %i)", client_id(),
353 m_state.load());
354 // XXX send a server shutdown notice
355 disconnect_and_trigger_close();
356 }
357
get_protocol_monitor()358 Protocol_monitor_interface &Client::get_protocol_monitor() {
359 return m_protocol_monitor;
360 }
361
get_last_error(int & error_code,std::string & message)362 void Client::get_last_error(int &error_code, std::string &message) {
363 ngs::Operations_factory operations_factory;
364 System_interface::Shared_ptr system_interface(
365 operations_factory.create_system_interface());
366
367 system_interface->get_socket_error_and_message(error_code, message);
368 }
369
shutdown_connection()370 void Client::shutdown_connection() {
371 m_state = Client_closing;
372
373 if (m_connection->shutdown(Connection_vio::Shutdown_recv) < 0) {
374 int err;
375 std::string strerr;
376
377 get_last_error(err, strerr);
378 log_debug("%s: connection shutdown error %s (%i)", client_id(),
379 strerr.c_str(), err);
380 }
381 }
382
read_one_message(Error_code & ret_error)383 Request *Client::read_one_message(Error_code &ret_error) {
384 union {
385 char buffer[4]; // Must be properly aligned
386 longlong dummy;
387 };
388 uint32_t msg_size;
389
390 /*
391 Use dummy, otherwise g++ 4.4 reports: unused variable 'dummy'
392 MY_ATTRIBUTE((unused)) did not work, so we must use it.
393 */
394 dummy = 0;
395
396 // untill we get another message to process we mark the connection as idle
397 // (for PSF)
398 m_connection->mark_idle();
399 // read the frame
400 ssize_t nread = m_connection->read(buffer, 4);
401 m_connection->mark_active();
402
403 if (nread == 0) // EOF
404 {
405 on_network_error(0);
406 return NULL;
407 }
408 if (nread < 0) {
409 int err;
410 std::string strerr;
411 get_last_error(err, strerr);
412 if (!(err == EBADF && m_close_reason == Close_connect_timeout)) {
413 log_debug("%s: %s (%i) %i", client_id(), strerr.c_str(), err,
414 m_close_reason);
415 on_network_error(err);
416 }
417 return NULL;
418 }
419
420 m_protocol_monitor.on_receive(static_cast<long>(nread));
421
422 #ifdef WORDS_BIGENDIAN
423 std::swap(buffer[0], buffer[3]);
424 std::swap(buffer[1], buffer[2]);
425 #endif
426 const uint32_t *pdata = (uint32_t *)(buffer);
427 msg_size = *pdata;
428
429 if (msg_size > m_server.get_config()->max_message_size) {
430 log_warning("%s: Message of size %u received, exceeding the limit of %i",
431 client_id(), msg_size, m_server.get_config()->max_message_size);
432 // invalid message size
433 // Don't send error, just abort connection
434 // ret_error = Fatal(ER_X_BAD_MESSAGE, "Message too large");
435 return NULL;
436 }
437
438 if (0 == msg_size) {
439 ret_error =
440 Error(ER_X_BAD_MESSAGE, "Messages without payload are not supported");
441 return NULL;
442 }
443
444 if (m_msg_buffer_size < msg_size) {
445 m_msg_buffer_size = msg_size;
446 ngs::reallocate_array(m_msg_buffer, m_msg_buffer_size,
447 KEY_memory_x_recv_buffer);
448 }
449
450 nread = m_connection->read(&m_msg_buffer[0], msg_size);
451 if (nread == 0) // EOF
452 {
453 log_info("%s: peer disconnected while reading message body", client_id());
454 on_network_error(0);
455 return NULL;
456 }
457
458 if (nread < 0) {
459 int err;
460 std::string strerr;
461
462 get_last_error(err, strerr);
463 log_debug("%s: %s (%i)", client_id(), strerr.c_str(), err);
464 on_network_error(err);
465 return NULL;
466 }
467
468 m_protocol_monitor.on_receive(static_cast<long>(nread));
469
470 int8_t type = (int8_t)m_msg_buffer[0];
471 Request_unique_ptr request(ngs::allocate_object<Request>(type));
472
473 if (msg_size > 1) request->buffer(&m_msg_buffer[1], msg_size - 1);
474
475 ret_error = m_decoder.parse(*request);
476
477 return request.release();
478 }
479
run(const bool skip_name_resolve)480 void Client::run(const bool skip_name_resolve) {
481 try {
482 on_client_addr(skip_name_resolve);
483 on_accept();
484
485 while (m_state != Client_closing && m_session) {
486 Error_code error;
487 Request_unique_ptr message(read_one_message(error));
488
489 // read could took some time, thus lets recheck the state
490 if (m_state == Client_closing) break;
491
492 if (error || !message) {
493 // !message and !error = EOF
494 if (error) m_encoder->send_result(ngs::Fatal(error));
495 disconnect_and_trigger_close();
496 break;
497 }
498 ngs::shared_ptr<Session_interface> s(session());
499 if (m_state != Client_accepted && s) {
500 // pass the message to the session
501 s->handle_message(*message);
502 } else
503 handle_message(*message);
504 }
505 } catch (std::exception &e) {
506 log_error("%s: Force stopping client because exception occurred: %s",
507 client_id(), e.what());
508 }
509
510 {
511 Mutex_lock lock(server().get_client_exit_mutex());
512 m_state = Client_closed;
513
514 remove_client_from_server();
515 }
516 }
517