1 /*
2  * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 #include "xpl_server.h"
27 #include "xpl_client.h"
28 #include "xpl_session.h"
29 #include "xpl_system_variables.h"
30 #include "io/xpl_listener_factory.h"
31 #include "mysql_variables.h"
32 #include "mysql_show_variable_wrapper.h"
33 #include "sql_data_result.h"
34 #include "auth_plain.h"
35 #include "auth_mysql41.h"
36 #include "xpl_error.h"
37 #include "ngs/scheduler.h"
38 #include "ngs/protocol_authentication.h"
39 #include "ngs/protocol/protocol_config.h"
40 #include "ngs/interface/listener_interface.h"
41 #include "ngs/server_acceptors.h"
42 #include <mysql/plugin.h>
43 #include "my_thread_local.h"
44 #include "mysql/service_ssl_wrapper.h"
45 #include "mysqlx_version.h"
46 #include <openssl/err.h>
47 
48 class Session_scheduler : public ngs::Scheduler_dynamic
49 {
50 public:
Session_scheduler(const char * name,void * plugin)51   Session_scheduler(const char* name, void *plugin)
52   : ngs::Scheduler_dynamic(name, KEY_thread_x_worker), m_plugin_ptr(plugin)
53   {
54   }
55 
thread_init()56   virtual bool thread_init()
57   {
58     if (srv_session_init_thread(m_plugin_ptr) != 0)
59     {
60       log_error("srv_session_init_thread returned error");
61       return false;
62     }
63 
64 #ifdef HAVE_PSI_THREAD_INTERFACE
65     // Reset user name and hostname stored in PFS_thread
66     // which were copied from parent thread
67     PSI_THREAD_CALL(set_thread_account) (
68         "", 0, "", 0);
69 #endif // HAVE_PSI_THREAD_INTERFACE
70 
71     ngs::Scheduler_dynamic::thread_init();
72 
73 #if defined(__APPLE__) || defined(HAVE_PTHREAD_SETNAME_NP)
74     char thread_name[16];
75     static int worker = 0;
76     my_snprintf(thread_name, sizeof(thread_name), "xpl_worker%i", worker++);
77 #ifdef __APPLE__
78     pthread_setname_np(thread_name);
79 #else
80     pthread_setname_np(pthread_self(), thread_name);
81 #endif
82 #endif
83 
84     return true;
85   }
86 
thread_end()87   virtual void thread_end()
88   {
89     ngs::Scheduler_dynamic::thread_end();
90     srv_session_deinit_thread();
91 
92     ssl_wrapper_thread_cleanup();
93   }
94 private:
95   void *m_plugin_ptr;
96 };
97 
98 
99 class Worker_scheduler_monitor : public ngs::Scheduler_dynamic::Monitor_interface
100 {
101 public:
on_worker_thread_create()102   virtual void on_worker_thread_create()
103   {
104     ++xpl::Global_status_variables::instance().m_worker_thread_count;
105   }
106 
on_worker_thread_destroy()107   virtual void on_worker_thread_destroy()
108   {
109     --xpl::Global_status_variables::instance().m_worker_thread_count;
110   }
111 
on_task_start()112   virtual void on_task_start()
113   {
114     ++xpl::Global_status_variables::instance().m_active_worker_thread_count;
115   }
116 
on_task_end()117   virtual void on_task_end()
118   {
119     --xpl::Global_status_variables::instance().m_active_worker_thread_count;
120   }
121 };
122 
123 
namespace
{

// Text returned by the get_socket_file(), get_tcp_port() and
// get_tcp_bind_address() accessors when the interface is not available.
// Both the characters and the pointer itself are constant, so the value
// cannot be re-pointed by accident.
const char *const STATUS_VALUE_FOR_NOT_CONFIGURED_INTERFACE = "UNDEFINED";

} // namespace
130 
131 xpl::Server* xpl::Server::instance;
132 ngs::RWLock  xpl::Server::instance_rwl;
133 bool         xpl::Server::exiting = false;
134 
Server(ngs::shared_ptr<ngs::Server_acceptors> acceptors,ngs::shared_ptr<ngs::Scheduler_dynamic> wscheduler,ngs::shared_ptr<ngs::Protocol_config> config)135 xpl::Server::Server(ngs::shared_ptr<ngs::Server_acceptors> acceptors,
136                     ngs::shared_ptr<ngs::Scheduler_dynamic> wscheduler,
137                     ngs::shared_ptr<ngs::Protocol_config> config)
138 : m_client_id(0),
139   m_num_of_connections(0),
140   m_config(config),
141   m_acceptors(acceptors),
142   m_wscheduler(wscheduler),
143   m_nscheduler(ngs::allocate_shared<ngs::Scheduler_dynamic>("network", KEY_thread_x_acceptor)),
144   m_server(acceptors, m_nscheduler, wscheduler, this, config)
145 {
146 }
147 
148 
start_verify_server_state_timer()149 void xpl::Server::start_verify_server_state_timer()
150 {
151   m_server.add_timer(1000, ngs::bind(&Server::on_verify_server_state, this));
152 }
153 
154 
155 /** Timer handler that polls whether X plugin event loop should stop.
156 
157 This can be triggered when:
158 - server is shutting down
159 - plugin is being uninstalled
160 
161 Because this is called by the timer handler from the acceptor event loop,
162 it is guaranteed that it'll run in the acceptor thread.
163 */
on_verify_server_state()164 bool xpl::Server::on_verify_server_state()
165 {
166   if (is_exiting())
167   {
168     if (!exiting)
169       log_info("Shutdown triggered by mysqld abort flag");
170 
171     // closing clients has been moved to other thread
172     // this thread have to gracefully shutdown io operations
173     if (m_wscheduler->is_running())
174     {
175       typedef ngs::Scheduler_dynamic::Task Task;
176       Task *task = ngs::allocate_object<Task>(ngs::bind(&ngs::Server::close_all_clients, &m_server));
177       if (!m_wscheduler->post(task))
178       {
179         log_debug("Unable to schedule closing all clients ");
180         ngs::free_object(task);
181       }
182     }
183 
184     const bool is_called_from_timeout_handler = true;
185     m_server.stop(is_called_from_timeout_handler);
186 
187     return false;
188   }
189   return true;
190 }
191 
192 
create_client(ngs::Connection_ptr connection)193 ngs::shared_ptr<ngs::Client_interface> xpl::Server::create_client(ngs::Connection_ptr connection)
194 {
195   ngs::shared_ptr<ngs::Client_interface> result;
196   result = ngs::allocate_shared<xpl::Client>(connection, ngs::ref(m_server), ++m_client_id,
197                                              ngs::allocate_object<xpl::Protocol_monitor>());
198   return result;
199 }
200 
201 
create_session(ngs::Client_interface & client,ngs::Protocol_encoder & proto,Session::Session_id session_id)202 ngs::shared_ptr<ngs::Session_interface> xpl::Server::create_session(ngs::Client_interface &client,
203                                                             ngs::Protocol_encoder &proto,
204                                                             Session::Session_id session_id)
205 {
206   return ngs::shared_ptr<ngs::Session>(
207            ngs::allocate_shared<xpl::Session>(ngs::ref(client), &proto, session_id));
208 }
209 
210 
on_client_closed(const ngs::Client_interface & client)211 void xpl::Server::on_client_closed(const ngs::Client_interface &client)
212 {
213   ++Global_status_variables::instance().m_closed_connections_count;
214 
215   // Only accepted clients are calling on_client_closed
216   --m_num_of_connections;
217 }
218 
219 
will_accept_client(const ngs::Client_interface & client)220 bool xpl::Server::will_accept_client(const ngs::Client_interface &client)
221 {
222   Mutex_lock lock(m_accepting_mutex);
223 
224   ++m_num_of_connections;
225 
226   log_debug("num_of_connections: %i, max_num_of_connections: %i",(int)m_num_of_connections, (int)xpl::Plugin_system_variables::max_connections);
227   bool can_be_accepted = m_num_of_connections <= (int)xpl::Plugin_system_variables::max_connections;
228 
229   if (!can_be_accepted || is_terminating())
230   {
231     --m_num_of_connections;
232     return false;
233   }
234 
235   return true;
236 }
237 
238 
did_accept_client(const ngs::Client_interface & client)239 void xpl::Server::did_accept_client(const ngs::Client_interface &client)
240 {
241   ++Global_status_variables::instance().m_accepted_connections_count;
242 }
243 
244 
did_reject_client(ngs::Server_delegate::Reject_reason reason)245 void xpl::Server::did_reject_client(ngs::Server_delegate::Reject_reason reason)
246 {
247   switch (reason)
248   {
249     case ngs::Server_delegate::AcceptError:
250       ++Global_status_variables::instance().m_connection_errors_count;
251       ++Global_status_variables::instance().m_connection_accept_errors_count;
252       break;
253     case ngs::Server_delegate::TooManyConnections:
254       ++Global_status_variables::instance().m_rejected_connections_count;
255       break;
256   }
257 }
258 
259 
plugin_system_variables_changed()260 void xpl::Server::plugin_system_variables_changed()
261 {
262   const unsigned int min = m_wscheduler->set_num_workers(Plugin_system_variables::min_worker_threads);
263   if (min < Plugin_system_variables::min_worker_threads)
264     Plugin_system_variables::min_worker_threads = min;
265 
266   m_wscheduler->set_idle_worker_timeout(Plugin_system_variables::idle_worker_thread_timeout * 1000);
267 
268   m_config->max_message_size = Plugin_system_variables::max_allowed_packet;
269   m_config->connect_timeout = ngs::chrono::seconds(Plugin_system_variables::connect_timeout);
270 }
271 
272 
is_terminating() const273 bool xpl::Server::is_terminating() const
274 {
275   return mysqld::is_terminating();
276 }
277 
278 
is_exiting()279 bool xpl::Server::is_exiting()
280 {
281   return mysqld::is_terminating() || xpl::Server::exiting;
282 }
283 
284 
main(MYSQL_PLUGIN p)285 int xpl::Server::main(MYSQL_PLUGIN p)
286 {
287   xpl::plugin_handle = p;
288 
289   uint32 listen_backlog = 50 + Plugin_system_variables::max_connections / 5;
290   if (listen_backlog > 900)
291     listen_backlog= 900;
292 
293   try
294   {
295     Global_status_variables::instance().reset();
296 
297     ngs::shared_ptr<ngs::Scheduler_dynamic> thd_scheduler(ngs::allocate_shared<Session_scheduler>("work", p));
298 
299     Plugin_system_variables::setup_system_variable_from_env_or_compile_opt(
300         Plugin_system_variables::socket,
301         "MYSQLX_UNIX_PORT",
302         MYSQLX_UNIX_ADDR);
303 
304     Listener_factory listener_factory;
305     ngs::shared_ptr<ngs::Server_acceptors> acceptors(ngs::allocate_shared<ngs::Server_acceptors>(
306          ngs::ref(listener_factory),
307          Plugin_system_variables::bind_address,
308          Plugin_system_variables::port,
309          Plugin_system_variables::port_open_timeout,
310          Plugin_system_variables::socket,
311          listen_backlog));
312 
313     instance_rwl.wlock();
314 
315     exiting = false;
316     instance = ngs::allocate_object<Server>(acceptors, thd_scheduler, ngs::allocate_shared<ngs::Protocol_config>());
317 
318     const bool use_only_through_secure_connection = true, use_only_in_non_secure_connection = false;
319 
320     instance->server().add_authentication_mechanism("PLAIN",   Sasl_plain_auth::create,   use_only_through_secure_connection);
321     instance->server().add_authentication_mechanism("MYSQL41", Sasl_mysql41_auth::create, use_only_in_non_secure_connection);
322     instance->server().add_authentication_mechanism("MYSQL41", Sasl_mysql41_auth::create, use_only_through_secure_connection);
323 
324     instance->plugin_system_variables_changed();
325 
326     thd_scheduler->set_monitor(ngs::allocate_object<Worker_scheduler_monitor>());
327     thd_scheduler->launch();
328     instance->m_nscheduler->launch();
329 
330     xpl::Plugin_system_variables::registry_callback(ngs::bind(&Server::plugin_system_variables_changed, instance));
331 
332     instance->m_nscheduler->post(ngs::bind(&Server::net_thread, instance));
333 
334     instance_rwl.unlock();
335   }
336   catch(const std::exception &e)
337   {
338     if (instance)
339       instance->server().start_failed();
340     instance_rwl.unlock();
341     my_plugin_log_message(&xpl::plugin_handle, MY_ERROR_LEVEL, "Startup failed with error \"%s\"", e.what());
342     return 1;
343   }
344 
345   return 0;
346 }
347 
348 
/**
  Plugin shutdown: stops the embedded server and the network scheduler,
  removes the system variable callbacks and releases the server instance.

  @param p  plugin handle (not used by this function)
  @return always 0
*/
int xpl::Server::exit(MYSQL_PLUGIN p)
{
  // Setting this flag makes is_exiting() return true, so the
  // on_verify_server_state() timer callback initiates the acceptor thread exit
  exiting = true;
  my_plugin_log_message(&xpl::plugin_handle, MY_INFORMATION_LEVEL, "Exiting");

  if (instance)
  {
    // NOTE (from the original authors): holding the instance write lock at
    // this point can block the network thread inside Server::on_net_startup()
    // while server().stop() waits for that network thread to exit, which
    // would hang forever. The server and the network scheduler are therefore
    // stopped before the write lock is taken further down.
    instance->server().stop();
    instance->m_nscheduler->stop();

    xpl::Plugin_system_variables::clean_callbacks();

    // The call below is intentionally left disabled. It would clean up the
    // internal data of protobuf, but once it has been called protobuf cannot
    // be used again (and a plugin reload would probably crash).
    //
    // Ideally, it would only be called when the server exits.
    //google::protobuf::ShutdownProtobufLibrary();
  }

  {
    // Release the instance under the write lock and reset the pointer
    ngs::RWLock_writelock slock(instance_rwl);
    ngs::free_object(instance);
    instance = NULL;
  }

  my_plugin_log_message(&xpl::plugin_handle, MY_INFORMATION_LEVEL, "Exit done");
  return 0;
}
383 
/**
  Checks whether the MYSQLXSYS_ACCOUNT account has the grants the plugin
  needs, by scanning the rows of "SHOW GRANTS FOR <account>".

  Two privileges are looked for: SELECT on mysql.user (granted on *.*,
  on `mysql`.* or on `mysql`.`user`, or implied by ALL) and SUPER.

  Returns normally when both were found. Otherwise it throws the value
  built by ngs::Error():
  - ER_X_MYSQLX_ACCOUNT_MISSING_PERMISSIONS when a plain USAGE grant row is
    present and it is either the only row, or one of exactly two rows with
    SELECT on mysql.user detected;
  - ER_X_BAD_CONFIGURATION in every other case.

  @param context  SQL context used to run the SHOW GRANTS query
*/
void xpl::Server::verify_mysqlx_user_grants(Sql_data_context &context)
{
  Sql_data_result  sql_result(context);
  int              num_of_grants = 0;
  bool             has_no_privileges   = false;
  bool             has_select_on_mysql_user = false;
  bool             has_super = false;

  // This method checks if mysqlxsys has correct permissions to
  // access mysql.user table and the SUPER privilege (for killing sessions)
  // There are three possible states:
  // 1) User has permissions to the table but no SUPER
  // 2) User has permissions to the table and SUPER
  // 3) User has no permissions, thus previous try of
  //    creation failed, account is accepted and GRANTS should be
  //    applied again

  std::string            grants;
  std::string::size_type p;

  sql_result.query("SHOW GRANTS FOR " MYSQLXSYS_ACCOUNT);

  do
  {
    // One grant statement per row
    sql_result.get_next_field(grants);
    ++num_of_grants;
    // A row that is exactly the USAGE grant marks the "no privileges" state
    if (grants == "GRANT USAGE ON *.* TO `" MYSQL_SESSION_USER "`@`" MYSQLXSYS_HOST "`")
      has_no_privileges = true;

    bool on_all_schemas = false;

    // Keep only the privilege list: everything from the "ON <scope>" part
    // onwards is cut off. Rows with any other scope are ignored.
    if ((p = grants.find("ON *.*")) != std::string::npos)
    {
      grants.resize(p); // truncate the non-priv list part of the string
      on_all_schemas = true;
    }
    else if ((p = grants.find("ON `mysql`.*")) != std::string::npos ||
        (p = grants.find("ON `mysql`.`user`")) != std::string::npos)
      grants.resize(p); // truncate the non-priv list part of the string
    else
      continue; // jumps to the loop condition, i.e. moves on to the next row

    // ALL implies SELECT; it implies SUPER only when granted on *.*
    if (grants.find(" ALL ") != std::string::npos)
    {
      has_select_on_mysql_user = true;
      if (on_all_schemas)
        has_super = true;
    }
    if (grants.find(" SELECT ") != std::string::npos ||
        grants.find(" SELECT,") != std::string::npos)
      has_select_on_mysql_user = true;
    // NOTE(review): unlike SELECT above, " SUPER," (SUPER followed by another
    // privilege in the same list) is not matched here - confirm whether that
    // is intended.
    if (grants.find(" SUPER ") != std::string::npos)
      has_super = true;
  } while (sql_result.next_row());

  if (has_select_on_mysql_user && has_super)
  {
    log_info("Using %s account for authentication which has all required permissions", MYSQLXSYS_ACCOUNT);
    return;
  }

  // If user has no permissions (only default) or only SELECT on mysql.user
  // lets accept it, and apply the grants
  if (has_no_privileges && (num_of_grants == 1 || (num_of_grants == 2 && has_select_on_mysql_user)))
  {
    log_info("Using existing %s account for authentication. Incomplete grants will be fixed", MYSQLXSYS_ACCOUNT);
    throw ngs::Error(ER_X_MYSQLX_ACCOUNT_MISSING_PERMISSIONS, "%s account without any grants", MYSQLXSYS_ACCOUNT);
  }

  // Users with some custom grants and without access to mysql.user should be rejected
  throw ngs::Error(ER_X_BAD_CONFIGURATION, "%s account already exists but does not have the expected grants", MYSQLXSYS_ACCOUNT);
}
456 
net_thread()457 void xpl::Server::net_thread()
458 {
459   srv_session_init_thread(xpl::plugin_handle);
460 
461 #if defined(__APPLE__)
462   pthread_setname_np("xplugin_acceptor");
463 #elif defined(HAVE_PTHREAD_SETNAME_NP)
464   pthread_setname_np(pthread_self(), "xplugin_acceptor");
465 #endif
466 
467   if (on_net_startup())
468   {
469     log_info("Server starts handling incoming connections");
470     m_server.start();
471     log_info("Stopped handling incoming connections");
472   }
473 
474   ssl_wrapper_thread_cleanup();
475 
476   srv_session_deinit_thread();
477 }
478 
479 
/**
  Selects the SSL configuration to use and logs which one was chosen.

  The plugin's own configuration wins when it is configured; otherwise the
  MySQL Server configuration is used when the server has SSL; otherwise a
  default-constructed configuration is returned.

  @param mysqld_have_ssl  whether the MySQL Server reports SSL support
  @param mysqld_ssl       SSL configuration of the MySQL Server
  @param mysqlx_ssl       SSL configuration of the Mysqlx Plugin
  @return the selected configuration
*/
static xpl::Ssl_config choose_ssl_config(const bool mysqld_have_ssl,
    const xpl::Ssl_config &mysqld_ssl,
    const xpl::Ssl_config & mysqlx_ssl)
{
  if (mysqlx_ssl.is_configured())
  {
    my_plugin_log_message(&xpl::plugin_handle, MY_INFORMATION_LEVEL,
        "Using SSL configuration from Mysqlx Plugin");
    return mysqlx_ssl;
  }

  if (mysqld_have_ssl)
  {
    my_plugin_log_message(&xpl::plugin_handle, MY_INFORMATION_LEVEL,
        "Using SSL configuration from MySQL Server");
    return mysqld_ssl;
  }

  my_plugin_log_message(&xpl::plugin_handle, MY_INFORMATION_LEVEL,
      "Neither MySQL Server nor Mysqlx Plugin has valid SSL configuration");

  return xpl::Ssl_config();
}
504 
on_net_startup()505 bool xpl::Server::on_net_startup()
506 {
507   try
508   {
509     // Ensure to call the start method only once
510     if (server().is_running())
511       return true;
512 
513     Sql_data_context sql_context(NULL, true);
514 
515     if (!sql_context.wait_api_ready(&is_exiting))
516       throw ngs::Error_code(ER_X_SERVICE_ERROR, "Service isn't ready after pulling it few times");
517 
518     ngs::Error_code error = sql_context.init();
519 
520     if (error)
521       throw error;
522 
523     Sql_data_result sql_result(sql_context);
524     try {
525       sql_context.switch_to_local_user(MYSQL_SESSION_USER);
526       sql_result.query("SELECT @@skip_networking, @@skip_name_resolve, @@have_ssl='YES', @@ssl_key, "
527                        "@@ssl_ca, @@ssl_capath, @@ssl_cert, @@ssl_cipher, @@ssl_crl, @@ssl_crlpath, @@tls_version;");
528     } catch (const ngs::Error_code &error) {
529       log_error("Unable to use user mysql.session account when connecting"
530                 "the server for internal plugin requests.");
531       log_info("For more information, please see the X Plugin User Account"
532                "section in the MySQL documentation");
533       throw;
534     }
535 
536 
537     sql_context.detach();
538 
539     Ssl_config ssl_config;
540     bool mysqld_have_ssl = false;
541     bool skip_networking = false;
542     bool skip_name_resolve = false;
543     char *tls_version = NULL;
544 
545     sql_result.get_next_field(skip_networking);
546     sql_result.get_next_field(skip_name_resolve);
547     sql_result.get_next_field(mysqld_have_ssl);
548     sql_result.get_next_field(ssl_config.ssl_key);
549     sql_result.get_next_field(ssl_config.ssl_ca);
550     sql_result.get_next_field(ssl_config.ssl_capath);
551     sql_result.get_next_field(ssl_config.ssl_cert);
552     sql_result.get_next_field(ssl_config.ssl_cipher);
553     sql_result.get_next_field(ssl_config.ssl_crl);
554     sql_result.get_next_field(ssl_config.ssl_crlpath);
555     sql_result.get_next_field(tls_version);
556 
557     instance->start_verify_server_state_timer();
558 
559     ngs::Ssl_context_unique_ptr ssl_ctx(ngs::allocate_object<ngs::Ssl_context>());
560 
561     ssl_config = choose_ssl_config(mysqld_have_ssl,
562                                    ssl_config,
563                                    xpl::Plugin_system_variables::ssl_config);
564 
565     const char *crl = ssl_config.ssl_crl;
566     const char *crlpath = ssl_config.ssl_crlpath;
567 
568     const bool ssl_setup_result = ssl_ctx->setup(tls_version, ssl_config.ssl_key,
569                                                  ssl_config.ssl_ca,
570                                                  ssl_config.ssl_capath,
571                                                  ssl_config.ssl_cert,
572                                                  ssl_config.ssl_cipher,
573                                                  crl, crlpath);
574 
575     if (ssl_setup_result)
576     {
577       my_plugin_log_message(&xpl::plugin_handle, MY_INFORMATION_LEVEL,
578           "Using OpenSSL for TLS connections");
579     }
580     else
581     {
582       my_plugin_log_message(&xpl::plugin_handle, MY_INFORMATION_LEVEL,
583           "For more information, please see the Using Secure Connections with X Plugin section in the MySQL documentation.");
584     }
585 
586     if (instance->server().prepare(ngs::move(ssl_ctx), skip_networking, skip_name_resolve, true))
587         return true;
588   }
589   catch (const ngs::Error_code &e)
590   {
591     // The plugin was unloaded while waiting for service
592     if (is_exiting())
593     {
594       instance->m_server.start_failed();
595       return false;
596     }
597     log_error("%s", e.message.c_str());
598   }
599 
600   instance->server().close_all_clients();
601   instance->m_server.start_failed();
602 
603   return false;
604 }
605 
/**
  Kills the client with the given id on behalf of a requesting session.

  A requester killing its own client is served directly. For another
  client, the MySQL session behind it is killed through the requester's
  data context first, and the client itself is killed only when that
  session then reports being killed.

  @param client_id  id of the client to kill
  @param requester  session that issued the kill request
  @return ngs::Success() when the client was killed; the error returned by
          execute_kill_sql_session() when the SQL kill failed;
          ER_KILL_DENIED_ERROR when the client was found but not killed;
          ER_NO_SUCH_THREAD when no open client has this id
*/
ngs::Error_code xpl::Server::kill_client(uint64_t client_id, Session &requester)
{
  ngs::unique_ptr<Mutex_lock> lock(new Mutex_lock(server().get_client_exit_mutex()));
  ngs::Client_ptr found_client = server().get_client_list().find(client_id);

  // Holding the client exit mutex ensures that the client won't leave
  // Client::run until the kill command ends, and that the shared_ptr
  // (found_client) is released before the exit lock is. This guarantees that
  // the final reference to the client is released in its own thread
  // (Scheduler, Client::run).

  if (found_client &&
      ngs::Client_interface::Client_closed != found_client->get_state())
  {
    xpl::Client_ptr xpl_client =  ngs::static_pointer_cast<xpl::Client>(found_client);

    // The requester kills its own client: release the exit lock first
    if (client_id == requester.client().client_id_num())
    {
      lock.reset();
      xpl_client->kill();
      return ngs::Success();
    }

    bool     is_session = false;
    uint64_t mysql_session_id = 0;

    {
      // Read the MySQL session id while the session exit mutex is held
      Mutex_lock lock_session_exit(xpl_client->get_session_exit_mutex());
      ngs::shared_ptr<xpl::Session> session = xpl_client->get_session();

      is_session = NULL != session.get();
      if (is_session)
        mysql_session_id = session->data_context().mysql_session_id();
    }

    if (is_session)
    {
      // try to kill the MySQL session
      ngs::Error_code error = requester.data_context().execute_kill_sql_session(mysql_session_id);
      if (error)
        return error;

      bool is_killed = false;
      {
        // Fetch the session again: it is looked up anew under the mutex
        // instead of being kept across the SQL kill
        Mutex_lock lock_session_exit(xpl_client->get_session_exit_mutex());
        ngs::shared_ptr<xpl::Session> session = xpl_client->get_session();

        if (session)
          is_killed = session->data_context().is_killed();
      }

      if (is_killed)
      {
        xpl_client->kill();
        return ngs::Success();
      }
    }
    return ngs::Error(ER_KILL_DENIED_ERROR, "Cannot kill client %llu", static_cast<unsigned long long>(client_id));
  }
  return ngs::Error(ER_NO_SUCH_THREAD, "Unknown MySQLx client id %llu", static_cast<unsigned long long>(client_id));
}
666 
get_socket_file()667 std::string xpl::Server::get_socket_file()
668 {
669   if (!m_server.is_terminating())
670   {
671     if (!m_acceptors->was_prepared())
672       return "";
673 
674     if (m_acceptors->was_unix_socket_configured())
675     {
676       return Plugin_system_variables::socket;
677     }
678   }
679 
680   return ::STATUS_VALUE_FOR_NOT_CONFIGURED_INTERFACE;
681 }
682 
get_tcp_port()683 std::string xpl::Server::get_tcp_port()
684 {
685   if (!m_server.is_terminating())
686   {
687     if (!m_acceptors->was_prepared())
688       return "";
689 
690     std::string bind_address;
691 
692     if (m_acceptors->was_tcp_server_configured(bind_address))
693     {
694       char buffer[100];
695 
696       sprintf(buffer, "%u",Plugin_system_variables::port);
697 
698       return buffer;
699     }
700   }
701 
702   return ::STATUS_VALUE_FOR_NOT_CONFIGURED_INTERFACE;
703 }
704 
get_tcp_bind_address()705 std::string xpl::Server::get_tcp_bind_address()
706 {
707   if (!m_server.is_terminating())
708   {
709     if (!m_acceptors->was_prepared())
710       return "";
711 
712     std::string bind_address;
713 
714     if (m_acceptors->was_tcp_server_configured(bind_address))
715     {
716       return bind_address;
717     }
718   }
719 
720   return ::STATUS_VALUE_FOR_NOT_CONFIGURED_INTERFACE;
721 }
722 
723 struct Client_check_handler_thd
724 {
Client_check_handler_thdClient_check_handler_thd725   Client_check_handler_thd(THD *thd)
726   : m_thd(thd)
727   {
728   }
729 
operator ()Client_check_handler_thd730   bool operator() (ngs::Client_ptr &client)
731   {
732     xpl::Client *xpl_client = (xpl::Client *)client.get();
733 
734     return xpl_client->is_handler_thd(m_thd);
735   }
736 
737   THD *m_thd;
738 };
739 
740 
get_client_by_thd(Server_ref & server,THD * thd)741 xpl::Client_ptr xpl::Server::get_client_by_thd(Server_ref &server, THD *thd)
742 {
743   std::vector<ngs::Client_ptr> clients;
744   Client_check_handler_thd     client_check_thd(thd);
745 
746   (*server)->server().get_client_list().get_all_clients(clients);
747 
748   std::vector<ngs::Client_ptr>::iterator i = std::find_if(clients.begin(), clients.end(), client_check_thd);
749   if (clients.end() != i)
750     return ngs::dynamic_pointer_cast<Client>(*i);
751 
752   return Client_ptr();
753 }
754