1 /*
2  * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 #include "ngs/client.h"
27 #include "ngs/capabilities/handler_auth_mech.h"
28 #include "ngs/capabilities/handler_readonly_value.h"
29 #include "ngs/capabilities/handler_tls.h"
30 #include "ngs/interface/server_interface.h"
31 #include "ngs/interface/session_interface.h"
32 #include "ngs/ngs_error.h"
33 #include "ngs/protocol/protocol_config.h"
34 #include "ngs/protocol_monitor.h"
35 #include "ngs/scheduler.h"
36 #include "ngs_common/operations_factory.h"
37 
38 #include <string.h>
39 #include <algorithm>
40 #include <functional>
41 #ifndef WIN32
42 #include <arpa/inet.h>
43 #endif
44 
45 #include "ngs/log.h"
46 
47 #undef ERROR  // Needed to avoid conflict with ERROR in mysqlx.pb.h
48 #include "ngs_common/protocol_protobuf.h"
49 
50 using namespace ngs;
51 
Client(Connection_ptr connection,Server_interface & server,Client_id client_id,Protocol_monitor_interface & pmon)52 Client::Client(Connection_ptr connection, Server_interface &server,
53                Client_id client_id, Protocol_monitor_interface &pmon)
54     : m_client_id(client_id),
55       m_server(server),
56       m_connection(connection),
57       m_client_addr("n/c"),
58       m_client_port(0),
59       m_state(Client_invalid),
60       m_removed(false),
61       m_protocol_monitor(pmon),
62       m_close_reason(Not_closing),
63       m_msg_buffer(NULL),
64       m_msg_buffer_size(0) {
65   my_snprintf(m_id, sizeof(m_id), "%llu", static_cast<ulonglong>(client_id));
66 }
67 
~Client()68 Client::~Client() {
69   log_debug("%s: Delete client", m_id);
70   if (m_connection) m_connection->close();
71 
72   if (m_msg_buffer) ngs::free_array(m_msg_buffer);
73 }
74 
get_accept_time() const75 ngs::chrono::time_point Client::get_accept_time() const {
76   return m_accept_time;
77 }
78 
reset_accept_time()79 void Client::reset_accept_time() {
80   m_accept_time = chrono::now();
81   m_server.restart_client_supervision_timer();
82 }
83 
activate_tls()84 void Client::activate_tls() {
85   log_debug("%s: enabling TLS for client", client_id());
86 
87   if (m_server.ssl_context()->activate_tls(
88           connection(),
89           chrono::to_seconds(m_server.get_config()->connect_timeout))) {
90     if (connection().options()->active_tls()) session()->mark_as_tls_session();
91   } else {
92     log_warning("%s: Error during SSL handshake", client_id());
93     disconnect_and_trigger_close();
94   }
95 }
96 
on_auth_timeout()97 void Client::on_auth_timeout() {
98   m_close_reason = Close_connect_timeout;
99 
100   // XXX send an ERROR notice when it's available
101   disconnect_and_trigger_close();
102 }
103 
capabilities_configurator()104 Capabilities_configurator *Client::capabilities_configurator() {
105   std::vector<Capability_handler_ptr> handlers;
106 
107   handlers.push_back(ngs::allocate_shared<Capability_tls>(ngs::ref(*this)));
108   handlers.push_back(
109       ngs::allocate_shared<Capability_auth_mech>(ngs::ref(*this)));
110 
111   handlers.push_back(
112       ngs::allocate_shared<Capability_readonly_value>("doc.formats", "text"));
113 
114   return ngs::allocate_object<Capabilities_configurator>(handlers);
115 }
116 
get_capabilities(const Mysqlx::Connection::CapabilitiesGet & msg)117 void Client::get_capabilities(const Mysqlx::Connection::CapabilitiesGet &msg) {
118   ngs::Memory_instrumented<Capabilities_configurator>::Unique_ptr configurator(
119       capabilities_configurator());
120   ngs::Memory_instrumented<Mysqlx::Connection::Capabilities>::Unique_ptr caps(
121       configurator->get());
122 
123   m_encoder->send_message(Mysqlx::ServerMessages::CONN_CAPABILITIES, *caps);
124 }
125 
set_capabilities(const Mysqlx::Connection::CapabilitiesSet & setcap)126 void Client::set_capabilities(
127     const Mysqlx::Connection::CapabilitiesSet &setcap) {
128   ngs::Memory_instrumented<Capabilities_configurator>::Unique_ptr configurator(
129       capabilities_configurator());
130   Error_code error_code = configurator->prepare_set(setcap.capabilities());
131   m_encoder->send_result(error_code);
132   if (!error_code) {
133     configurator->commit();
134   }
135 }
136 
handle_message(Request & request)137 void Client::handle_message(Request &request) {
138   log_message_recv(request);
139 
140   Client_state expected_state = Client_accepted;
141 
142   // there is no session before authentication, so we handle the messages
143   // ourselves
144   log_debug("%s: Client got message %i", client_id(), request.get_type());
145 
146   switch (request.get_type()) {
147     case Mysqlx::ClientMessages::CON_CLOSE:
148       m_encoder->send_ok("bye!");
149       m_close_reason = Close_normal;
150       disconnect_and_trigger_close();
151       break;
152 
153     case Mysqlx::ClientMessages::SESS_RESET:
154       // no-op, since we're only going to get called here before session is
155       // authenticated
156       break;
157 
158     case Mysqlx::ClientMessages::CON_CAPABILITIES_GET:
159       get_capabilities(static_cast<const Mysqlx::Connection::CapabilitiesGet &>(
160           *request.message()));
161       break;
162 
163     case Mysqlx::ClientMessages::CON_CAPABILITIES_SET:
164       set_capabilities(static_cast<const Mysqlx::Connection::CapabilitiesSet &>(
165           *request.message()));
166       break;
167 
168     case Mysqlx::ClientMessages::SESS_AUTHENTICATE_START:
169       if (m_state.compare_exchange_strong(expected_state,
170                                           Client_authenticating_first) &&
171           server().is_running()) {
172         log_debug("%s: Authenticating client...", client_id());
173 
174         ngs::shared_ptr<Session_interface> s(session());
175         // start redirecting incoming messages directly to the session
176         if (s) {
177           // forward the message to the pre-allocated session, rest of auth will
178           // be handled by the session
179           s->handle_message(request);
180         }
181         break;
182       }
183       // Fall through.
184 
185     default:
186       // invalid message at this time
187       m_protocol_monitor.on_error_unknown_msg_type();
188       log_info("%s: Invalid message %i received during client initialization",
189                client_id(), request.get_type());
190       m_encoder->send_result(ngs::Fatal(ER_X_BAD_MESSAGE, "Invalid message"));
191       m_close_reason = Close_error;
192       disconnect_and_trigger_close();
193       break;
194   }
195 }
196 
disconnect_and_trigger_close()197 void Client::disconnect_and_trigger_close() {
198   if (m_close_reason == Not_closing) m_close_reason = Close_normal;
199 
200   shutdown_connection();
201 }
202 
203 // this will be called on socket errors, but also when halt_and_wait() is called
204 // which will shutdown the socket for reading and trigger a eof
205 // (meaning, closed for reads, but writes would still be ok)
on_network_error(int error)206 void Client::on_network_error(int error) {
207   if (error == 0)
208     log_debug("%s: peer disconnected (state %i)", client_id(), m_state.load());
209   else
210     log_debug("%s: network error %i (state %i)", client_id(), error,
211               m_state.load());
212   if (m_close_reason == Not_closing && m_state != Client_closing && error != 0)
213     m_close_reason = Close_net_error;
214 
215   m_state.exchange(Client_closing);
216 
217   if (m_session &&
218       (Client_authenticating_first == m_state || Client_running == m_state)) {
219     // trigger all sessions to close and stop whatever they're doing
220     log_debug("%s: killing session", client_id());
221     if (Session_interface::Closing != m_session->state())
222       server().get_worker_scheduler()->post_and_wait(
223           ngs::bind(&Client::on_kill, this, ngs::ref(*m_session)));
224   }
225 }
226 
on_kill(Session_interface & session)227 void Client::on_kill(Session_interface &session) { m_session->on_kill(); }
228 
remove_client_from_server()229 void Client::remove_client_from_server() {
230   if (false == m_removed.exchange(true)) m_server.on_client_closed(*this);
231 }
232 
on_client_addr(const bool skip_resolve)233 void Client::on_client_addr(const bool skip_resolve) {
234   m_client_addr.resize(INET6_ADDRSTRLEN);
235 
236   switch (m_connection->connection_type()) {
237     case Connection_tcpip: {
238       m_connection->peer_address(m_client_addr, m_client_port);
239     } break;
240 
241     case Connection_namedpipe:
242     case Connection_unixsocket:  // fall through
243       m_client_host = "localhost";
244       return;
245 
246     default:
247       return;
248   }
249 
250   // turn IP to hostname for auth uses
251   if (skip_resolve) return;
252 
253   m_client_host = "";
254 
255   try {
256     m_client_host = resolve_hostname();
257   } catch (...) {
258     m_close_reason = Close_reject;
259     disconnect_and_trigger_close();
260 
261     throw;
262   }
263 }
264 
on_accept()265 void Client::on_accept() {
266   log_debug("%s: Accepted client connection from %s", client_id(),
267             client_address());
268 
269   m_connection->set_socket_thread_owner();
270 
271   // it can be accessed directly (no other thread access thus object)
272   m_state = Client_accepted;
273 
274   m_encoder.reset(ngs::allocate_object<Protocol_encoder>(
275       m_connection,
276       ngs::bind(&Client::on_network_error, this, ngs::placeholders::_1),
277       ngs::ref(m_protocol_monitor)));
278 
279   // pre-allocate the initial session
280   // this is also needed for the srv_session to correctly report us to the
281   // audit.log as in the Pre-authenticate state
282   ngs::shared_ptr<Session_interface> session(
283       m_server.create_session(*this, *m_encoder, 1));
284   if (!session) {
285     log_warning("%s: Error creating session for connection from %s",
286                 client_id(), m_client_addr.c_str());
287     m_encoder->send_init_error(
288         ngs::Fatal(ER_OUT_OF_RESOURCES, "Could not allocate session"));
289   } else {
290     ngs::Error_code error(session->init());
291     if (error) {
292       log_warning("%s: Error initializing session for connection: %s",
293                   client_id(), error.message.c_str());
294       m_encoder->send_result(error);
295       session.reset();
296     } else
297       m_session = session;
298   }
299   if (!session) {
300     m_close_reason = Close_error;
301     disconnect_and_trigger_close();
302   }
303 }
304 
on_session_auth_success(Session_interface & s)305 void Client::on_session_auth_success(Session_interface &s) {
306   // this is called from worker thread
307   Client_state expected = Client_authenticating_first;
308   m_state.compare_exchange_strong(expected, Client_running);
309 }
310 
on_session_close(Session_interface & s)311 void Client::on_session_close(Session_interface &s) {
312   log_debug("%s: Session %i removed", client_id(), s.session_id());
313 
314   // no more open sessions, disconnect
315   if (m_close_reason == Not_closing) m_close_reason = Close_normal;
316 
317   m_state = Client_closing;
318 
319   shutdown_connection();
320 
321   remove_client_from_server();
322 }
323 
on_session_reset(Session_interface & s)324 void Client::on_session_reset(Session_interface &s) {
325   log_debug("%s: Resetting session %i", client_id(), s.session_id());
326 
327   m_state = Client_accepted_with_session;
328   ngs::shared_ptr<Session_interface> session(
329       m_server.create_session(*this, *m_encoder, 1));
330   if (!session) {
331     log_warning("%s: Error creating session for connection from %s",
332                 client_id(), m_client_addr.c_str());
333     m_encoder->send_result(
334         ngs::Fatal(ER_OUT_OF_RESOURCES, "Could not allocate new session"));
335     m_state = Client_closing;
336   } else {
337     ngs::Error_code error(session->init());
338     if (error) {
339       log_warning("%s: Error initializing session for connection: %s",
340                   client_id(), error.message.c_str());
341       m_encoder->send_result(error);
342       session.reset();
343       m_state = Client_closing;
344     } else {
345       m_session = session;
346       m_encoder->send_ok();
347     }
348   }
349 }
350 
on_server_shutdown()351 void Client::on_server_shutdown() {
352   log_info("%s: closing client because of shutdown (state: %i)", client_id(),
353            m_state.load());
354   // XXX send a server shutdown notice
355   disconnect_and_trigger_close();
356 }
357 
get_protocol_monitor()358 Protocol_monitor_interface &Client::get_protocol_monitor() {
359   return m_protocol_monitor;
360 }
361 
get_last_error(int & error_code,std::string & message)362 void Client::get_last_error(int &error_code, std::string &message) {
363   ngs::Operations_factory operations_factory;
364   System_interface::Shared_ptr system_interface(
365       operations_factory.create_system_interface());
366 
367   system_interface->get_socket_error_and_message(error_code, message);
368 }
369 
shutdown_connection()370 void Client::shutdown_connection() {
371   m_state = Client_closing;
372 
373   if (m_connection->shutdown(Connection_vio::Shutdown_recv) < 0) {
374     int err;
375     std::string strerr;
376 
377     get_last_error(err, strerr);
378     log_debug("%s: connection shutdown error %s (%i)", client_id(),
379               strerr.c_str(), err);
380   }
381 }
382 
read_one_message(Error_code & ret_error)383 Request *Client::read_one_message(Error_code &ret_error) {
384   union {
385     char buffer[4];  // Must be properly aligned
386     longlong dummy;
387   };
388   uint32_t msg_size;
389 
390   /*
391     Use dummy, otherwise g++ 4.4 reports: unused variable 'dummy'
392     MY_ATTRIBUTE((unused)) did not work, so we must use it.
393   */
394   dummy = 0;
395 
396   // untill we get another message to process we mark the connection as idle
397   // (for PSF)
398   m_connection->mark_idle();
399   // read the frame
400   ssize_t nread = m_connection->read(buffer, 4);
401   m_connection->mark_active();
402 
403   if (nread == 0)  // EOF
404   {
405     on_network_error(0);
406     return NULL;
407   }
408   if (nread < 0) {
409     int err;
410     std::string strerr;
411     get_last_error(err, strerr);
412     if (!(err == EBADF && m_close_reason == Close_connect_timeout)) {
413       log_debug("%s: %s (%i) %i", client_id(), strerr.c_str(), err,
414                 m_close_reason);
415       on_network_error(err);
416     }
417     return NULL;
418   }
419 
420   m_protocol_monitor.on_receive(static_cast<long>(nread));
421 
422 #ifdef WORDS_BIGENDIAN
423   std::swap(buffer[0], buffer[3]);
424   std::swap(buffer[1], buffer[2]);
425 #endif
426   const uint32_t *pdata = (uint32_t *)(buffer);
427   msg_size = *pdata;
428 
429   if (msg_size > m_server.get_config()->max_message_size) {
430     log_warning("%s: Message of size %u received, exceeding the limit of %i",
431                 client_id(), msg_size, m_server.get_config()->max_message_size);
432     // invalid message size
433     // Don't send error, just abort connection
434     // ret_error = Fatal(ER_X_BAD_MESSAGE, "Message too large");
435     return NULL;
436   }
437 
438   if (0 == msg_size) {
439     ret_error =
440         Error(ER_X_BAD_MESSAGE, "Messages without payload are not supported");
441     return NULL;
442   }
443 
444   if (m_msg_buffer_size < msg_size) {
445     m_msg_buffer_size = msg_size;
446     ngs::reallocate_array(m_msg_buffer, m_msg_buffer_size,
447                           KEY_memory_x_recv_buffer);
448   }
449 
450   nread = m_connection->read(&m_msg_buffer[0], msg_size);
451   if (nread == 0)  // EOF
452   {
453     log_info("%s: peer disconnected while reading message body", client_id());
454     on_network_error(0);
455     return NULL;
456   }
457 
458   if (nread < 0) {
459     int err;
460     std::string strerr;
461 
462     get_last_error(err, strerr);
463     log_debug("%s: %s (%i)", client_id(), strerr.c_str(), err);
464     on_network_error(err);
465     return NULL;
466   }
467 
468   m_protocol_monitor.on_receive(static_cast<long>(nread));
469 
470   int8_t type = (int8_t)m_msg_buffer[0];
471   Request_unique_ptr request(ngs::allocate_object<Request>(type));
472 
473   if (msg_size > 1) request->buffer(&m_msg_buffer[1], msg_size - 1);
474 
475   ret_error = m_decoder.parse(*request);
476 
477   return request.release();
478 }
479 
run(const bool skip_name_resolve)480 void Client::run(const bool skip_name_resolve) {
481   try {
482     on_client_addr(skip_name_resolve);
483     on_accept();
484 
485     while (m_state != Client_closing && m_session) {
486       Error_code error;
487       Request_unique_ptr message(read_one_message(error));
488 
489       // read could took some time, thus lets recheck the state
490       if (m_state == Client_closing) break;
491 
492       if (error || !message) {
493         // !message and !error = EOF
494         if (error) m_encoder->send_result(ngs::Fatal(error));
495         disconnect_and_trigger_close();
496         break;
497       }
498       ngs::shared_ptr<Session_interface> s(session());
499       if (m_state != Client_accepted && s) {
500         // pass the message to the session
501         s->handle_message(*message);
502       } else
503         handle_message(*message);
504     }
505   } catch (std::exception &e) {
506     log_error("%s: Force stopping client because exception occurred: %s",
507               client_id(), e.what());
508   }
509 
510   {
511     Mutex_lock lock(server().get_client_exit_mutex());
512     m_state = Client_closed;
513 
514     remove_client_from_server();
515   }
516 }
517