1 /*
2 Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "srv_session.h"
26 #include "my_dbug.h"
27 #include "my_thread.h"
28 #include "sql_class.h"
29 #include "sql_base.h" // close_mysql_tables
30 #include "sql_connect.h" // thd_init_client_charset
31 #include "mysqld_thd_manager.h" // Global_THD_manager
32 #include "sql_audit.h" // MYSQL_AUDIT_NOTIFY_CONNECTION_CONNECT
33 #include "log.h" // Query log
34 #include "my_thread_local.h" // my_get_thread_local & my_set_thread_local
35 #include "mysqld.h" // current_thd
36 #include "sql_parse.h" // dispatch_command()
37 #include "sql_thd_internal_api.h" // thd_set_thread_stack
38 #include "mutex_lock.h"
39 #include "mysql/psi/psi.h"
40 #include "conn_handler/connection_handler_manager.h"
41 #include "sql_plugin.h"
42
43 #include <map>
44
45 /**
46 @file
47 class Srv_session implementation. See the method comments for more. Please,
48 check also srv_session.h for more information.
49 */
50
51
52 extern void thd_clear_errors(THD *thd);
53
54 static thread_local_key_t THR_stack_start_address;
55 static thread_local_key_t THR_srv_session_thread;
56 static bool srv_session_THRs_initialized= false;
57
58
59 /**
60 A simple wrapper around a RW lock:
61 Grabs the lock in the CTOR, releases it in the DTOR.
62 The lock may be NULL, in which case this is a no-op.
63
64 Based on Mutex_lock from include/mutex_lock.h
65 */
66 class Auto_rw_lock_read
67 {
68 public:
Auto_rw_lock_read(mysql_rwlock_t * lock)69 explicit Auto_rw_lock_read(mysql_rwlock_t *lock) : rw_lock(NULL)
70 {
71 if (lock && 0 == mysql_rwlock_rdlock(lock))
72 rw_lock = lock;
73 }
74
~Auto_rw_lock_read()75 ~Auto_rw_lock_read()
76 {
77 if (rw_lock)
78 mysql_rwlock_unlock(rw_lock);
79 }
80 private:
81 mysql_rwlock_t *rw_lock;
82
83 Auto_rw_lock_read(const Auto_rw_lock_read&); /* Not copyable. */
84 void operator=(const Auto_rw_lock_read&); /* Not assignable. */
85 };
86
87
88 class Auto_rw_lock_write
89 {
90 public:
Auto_rw_lock_write(mysql_rwlock_t * lock)91 explicit Auto_rw_lock_write(mysql_rwlock_t *lock) : rw_lock(NULL)
92 {
93 if (lock && 0 == mysql_rwlock_wrlock(lock))
94 rw_lock = lock;
95 }
96
~Auto_rw_lock_write()97 ~Auto_rw_lock_write()
98 {
99 if (rw_lock)
100 mysql_rwlock_unlock(rw_lock);
101 }
102 private:
103 mysql_rwlock_t *rw_lock;
104
105 Auto_rw_lock_write(const Auto_rw_lock_write&); /* Non-copyable */
106 void operator=(const Auto_rw_lock_write&); /* Non-assignable */
107 };
108
109
110 class Thread_to_plugin_map
111 {
112 private:
113 bool initted;
114 bool psi_initted;
115 mysql_mutex_t LOCK_collection;
116
117 #ifdef HAVE_PSI_INTERFACE
118 PSI_mutex_key key_LOCK_collection;
119 #endif
120 std::map<my_thread_t, const void*> collection;
121
122 public:
123 /**
124 Initializes the map
125
126 @param null_val null value to be returned when element not found in the map
127
128 @return
129 false success
130 true failure
131 */
init()132 bool init()
133 {
134 const char* category= "session";
135 PSI_mutex_info all_mutexes[]=
136 {
137 { &key_LOCK_collection, "LOCK_srv_session_threads", PSI_FLAG_GLOBAL}
138 };
139
140 initted= true;
141 #ifdef HAVE_PSI_INTERFACE
142 psi_initted= true;
143
144 mysql_mutex_register(category, all_mutexes, array_elements(all_mutexes));
145 #endif
146 mysql_mutex_init(key_LOCK_collection, &LOCK_collection, MY_MUTEX_INIT_FAST);
147
148 return false;
149 }
150
151 /**
152 Deinitializes the map
153
154 @return
155 false success
156 true failure
157 */
deinit()158 bool deinit()
159 {
160 initted= false;
161 mysql_mutex_destroy(&LOCK_collection);
162
163 return false;
164 }
165
166 /**
167 Adds a pthread to the list
168
169 @param key plugin
170
171 @return
172 false success
173 true failure
174 */
add(my_thread_t thread,const void * plugin)175 bool add(my_thread_t thread, const void *plugin)
176 {
177 Mutex_lock lock(&LOCK_collection);
178 try
179 {
180 std::map<my_thread_t, const void*>::iterator it= collection.find(thread);
181 if (it == collection.end())
182 collection[thread]= plugin;
183 }
184 catch (const std::bad_alloc &e)
185 {
186 return true;
187 }
188 return false;
189 }
190
191 /**
192 Removes a pthread from the list
193
194 @param thread Thread to remove from the list
195
196 @return
197 false success
198 true failure
199 */
remove(my_thread_t thread)200 unsigned int remove(my_thread_t thread)
201 {
202 Mutex_lock lock(&LOCK_collection);
203 std::map<my_thread_t, const void*>::iterator it= collection.find(thread);
204 if (it != collection.end())
205 {
206 collection.erase(it);
207 return false;
208 }
209 return true;
210 }
211
212 /**
213 Empties the map
214
215 @return
216 false success
217 true failure
218 */
clear()219 bool clear()
220 {
221 Mutex_lock lock(&LOCK_collection);
222 collection.clear();
223 return false;
224 }
225
226 /**
227 Returns the number of all threads
228 */
size()229 unsigned int size()
230 {
231 Mutex_lock lock(&LOCK_collection);
232 return collection.size();
233 }
234
235 /**
236 Returns the number threads for a plugin
237
238 @param plugin The plugin for which we need info.
239 */
count(const void * plugin)240 unsigned int count(const void *plugin)
241 {
242 if (!plugin)
243 return size();
244
245 unsigned int ret= 0;
246 Mutex_lock lock(&LOCK_collection);
247 std::map<my_thread_t, const void*>::iterator it= collection.begin();
248 for (; it != collection.end(); ++it)
249 {
250 if (it->second == plugin)
251 ++ret;
252 }
253 return ret;
254 }
255
256 /**
257 Kills all threads associated with a plugin
258
259 @param plugin The plugin for which we need info.
260 */
kill(const void * plugin)261 unsigned int kill(const void *plugin)
262 {
263 std::list<my_thread_t> to_remove;
264
265 Mutex_lock lock(&LOCK_collection);
266
267 for (std::map<my_thread_t, const void*>::iterator it= collection.begin();
268 it != collection.end();
269 ++it)
270 {
271 if (!plugin || (it->second == plugin))
272 {
273 to_remove.push_back(it->first);
274
275 my_thread_handle thread;
276 #ifndef _WIN32
277 /*
278 On Windows we need HANDLE to cancel a thread.
279 Win32 API's GetCurrentThread() returns something which seems the
280 same in every thread, thus unusable as a key.
281 GetCurrentThreadId returns an ID (DWORD), but it can't be used with
282 my_thread_cancel() which calls TerminateThread() on Windows.
283 TerminateThread() needs a Handle.
284 Therefore this killing functionality is now only for Posix Threads
285 until there is a solution for Windows.
286 */
287 thread.thread= it->first;
288 sql_print_error("Killing thread %lu", (unsigned long) it->first);
289 if (!my_thread_cancel(&thread))
290 {
291 void *dummy_retval;
292 my_thread_join(&thread, &dummy_retval);
293 }
294 #endif
295 }
296 }
297 if (to_remove.size())
298 {
299 for (std::list<my_thread_t>::iterator it= to_remove.begin();
300 it != to_remove.end();
301 ++it)
302 collection.erase(*it);
303 }
304 return to_remove.size();
305 }
306 };
307
308 /**
309 std::map of THD* as key and Srv_session* as value guarded by a read-write lock.
310 RW lock is used instead of a mutex, as find() is a hot spot due to the sanity
311 checks it is used for - when a pointer to a closed session is passed.
312 */
313 class Mutexed_map_thd_srv_session
314 {
315 public:
316 class Do_Impl
317 {
318 public:
~Do_Impl()319 virtual ~Do_Impl() {}
320 /**
321 Work on the session
322
323 @return
324 false Leave the session in the map
325 true Remove the session from the map
326 */
327 virtual bool operator()(Srv_session*) = 0;
328 };
329
330 private:
331 /*
332 The first type in the tuple should be the key type of
333 Thread_to_plugin_map
334 */
335 typedef std::pair<const void*, Srv_session*> map_value_t;
336
337 std::map<const THD*, map_value_t> collection;
338
339 bool initted;
340 bool psi_initted;
341
342 mysql_rwlock_t LOCK_collection;
343
344 #ifdef HAVE_PSI_INTERFACE
345 PSI_rwlock_key key_LOCK_collection;
346 #endif
347
348 public:
349 /**
350 Initializes the map
351
352 @param null_val null value to be returned when element not found in the map
353
354 @return
355 false success
356 true failure
357 */
init()358 bool init()
359 {
360 const char* category= "session";
361 PSI_rwlock_info all_rwlocks[]=
362 {
363 { &key_LOCK_collection, "LOCK_srv_session_collection", PSI_FLAG_GLOBAL}
364 };
365
366 initted= true;
367 #ifdef HAVE_PSI_INTERFACE
368 psi_initted= true;
369
370 mysql_rwlock_register(category, all_rwlocks, array_elements(all_rwlocks));
371 #endif
372 mysql_rwlock_init(key_LOCK_collection, &LOCK_collection);
373
374 return false;
375 }
376
377 /**
378 Deinitializes the map
379
380 @return
381 false success
382 true failure
383 */
deinit()384 bool deinit()
385 {
386 initted= false;
387 mysql_rwlock_destroy(&LOCK_collection);
388
389 return false;
390 }
391
392 /**
393 Searches for an element with in the map
394
395 @param key Key of the element
396
397 @return
398 value of the element
399 NULL if not found
400 */
find(const THD * key)401 Srv_session* find(const THD* key)
402 {
403 Auto_rw_lock_read lock(&LOCK_collection);
404
405 std::map<const THD*, map_value_t>::iterator it= collection.find(key);
406 return (it != collection.end())? it->second.second : NULL;
407 }
408
409 /**
410 Add an element to the map
411
412 @param key key
413 @param plugin secondary key
414 @param value value
415
416 @return
417 false success
418 true failure
419 */
add(const THD * key,const void * plugin,Srv_session * session)420 bool add(const THD* key, const void *plugin, Srv_session *session)
421 {
422 Auto_rw_lock_write lock(&LOCK_collection);
423 try
424 {
425 collection[key]= std::make_pair(plugin, session);
426 }
427 catch (const std::bad_alloc &e)
428 {
429 return true;
430 }
431 return false;
432 }
433
434 /**
435 Removes an element from the map.
436
437 @param key key
438
439 @return
440 false success
441 true failure
442 */
remove(const THD * key)443 bool remove(const THD* key)
444 {
445 Auto_rw_lock_write lock(&LOCK_collection);
446 /*
447 If we use erase with the key directly an exception could be thrown. The
448 find method never throws. erase() with iterator as parameter also never
449 throws.
450 */
451 std::map<const THD*, map_value_t>::iterator it= collection.find(key);
452 if (it != collection.end())
453 collection.erase(it);
454 return false;
455 }
456
457 /**
458 Removes all elements which have been added with plugin as plugin name.
459
460 @param plugin key
461 @param removed OUT Number of removed elements
462 */
remove_all_of_plugin(const void * plugin,unsigned int & removed)463 void remove_all_of_plugin(const void *plugin, unsigned int &removed)
464 {
465 std::list<Srv_session *> to_close;
466 removed= 0;
467
468 {
469 Auto_rw_lock_write lock(&LOCK_collection);
470
471 for (std::map<const THD*, map_value_t>::iterator it= collection.begin();
472 it != collection.end();
473 ++it)
474 {
475 if (it->second.first == plugin)
476 to_close.push_back(it->second.second);
477 }
478 }
479
480 /* Outside of the lock as Srv_session::close() will try to
481 remove itself from the list*/
482 if ((removed= to_close.size()))
483 {
484 for (std::list<Srv_session *>::iterator it= to_close.begin();
485 it != to_close.end();
486 ++it)
487 {
488 Srv_session *session= *it;
489 session->detach();
490 session->close();
491 delete session;
492 }
493 }
494 }
495
496 /**
497 Empties the map
498
499 @return
500 false success
501 true failure
502 */
clear()503 bool clear()
504 {
505 Auto_rw_lock_write lock(&LOCK_collection);
506 collection.clear();
507 return false;
508 }
509
510 /**
511 Returns the number of elements in the maps
512 */
size()513 unsigned int size()
514 {
515 Auto_rw_lock_read lock(&LOCK_collection);
516 return collection.size();
517 }
518 };
519
520 static Mutexed_map_thd_srv_session server_session_list;
521 static Thread_to_plugin_map server_session_threads;
522
523 /**
524 Constructs a session state object for Srv_session::execute_command.
525 Saves state. Uses RAII.
526
527 @param sess Session to backup
528 */
529 Srv_session::
Session_backup_and_attach(Srv_session * sess,bool is_close_session)530 Session_backup_and_attach::Session_backup_and_attach(Srv_session *sess,
531 bool is_close_session)
532 :session(sess),
533 old_session(NULL),
534 in_close_session(is_close_session)
535 {
536 THD *c_thd= current_thd;
537 const void *is_srv_session_thread= my_get_thread_local(THR_srv_session_thread);
538 backup_thd= is_srv_session_thread? NULL:c_thd;
539
540 if (is_srv_session_thread && c_thd && c_thd != &session->thd)
541 {
542 if ((old_session= server_session_list.find(c_thd)))
543 old_session->detach();
544 }
545
546 attach_error= session->attach();
547 }
548
549
550 /**
551 Destructs the session state object. In other words it restores to
552 previous state.
553 */
~Session_backup_and_attach()554 Srv_session::Session_backup_and_attach::~Session_backup_and_attach()
555 {
556 if (backup_thd)
557 {
558 session->detach();
559 backup_thd->store_globals();
560 #ifdef HAVE_PSI_THREAD_INTERFACE
561 enum_vio_type vio_type= backup_thd->get_vio_type();
562 if (vio_type != NO_VIO_TYPE)
563 PSI_THREAD_CALL(set_connection_type)(vio_type);
564 #endif /* HAVE_PSI_THREAD_INTERFACE */
565 }
566 else if (in_close_session)
567 {
568 /*
569 We should restore the old session only in case of close.
570 In case of execute we should stay attached.
571 */
572 session->detach();
573 if (old_session)
574 old_session->attach();
575 }
576 }
577
578
err_start_result_metadata(void *,uint,uint,const CHARSET_INFO *)579 static int err_start_result_metadata(void*, uint, uint, const CHARSET_INFO *)
580 {
581 return 1;
582 }
583
err_field_metadata(void *,struct st_send_field *,const CHARSET_INFO *)584 static int err_field_metadata(void*, struct st_send_field*, const CHARSET_INFO*)
585 {
586 return 1;
587 }
588
err_end_result_metadata(void *,uint,uint)589 static int err_end_result_metadata(void*, uint, uint)
590 {
591 return 1;
592 }
593
err_start_row(void *)594 static int err_start_row(void*)
595 {
596 return 1;
597 }
598
err_end_row(void *)599 static int err_end_row(void*)
600 {
601 return 1;
602 }
603
err_abort_row(void *)604 static void err_abort_row(void*)
605 {
606 }
607
err_get_client_capabilities(void *)608 static ulong err_get_client_capabilities(void*)
609 {
610 return 0;
611 }
612
err_get_null(void *)613 static int err_get_null(void*)
614 {
615 return 1;
616 }
617
err_get_integer(void *,longlong)618 static int err_get_integer(void*, longlong)
619 {
620 return 1;
621 }
622
err_get_longlong(void *,longlong,uint)623 static int err_get_longlong(void*, longlong, uint)
624 {
625 return 1;
626 }
627
err_get_decimal(void *,const decimal_t *)628 static int err_get_decimal(void*, const decimal_t*)
629 {
630 return 1;
631 }
632
err_get_double(void *,double,uint32)633 static int err_get_double(void*, double, uint32)
634 {
635 return 1;
636 }
637
err_get_date(void *,const MYSQL_TIME *)638 static int err_get_date(void*, const MYSQL_TIME*)
639 {
640 return 1;
641 }
642
err_get_time(void *,const MYSQL_TIME *,uint)643 static int err_get_time(void*, const MYSQL_TIME*, uint)
644 {
645 return 1;
646 }
647
err_get_datetime(void *,const MYSQL_TIME *,uint)648 static int err_get_datetime(void*, const MYSQL_TIME*, uint)
649 {
650 return 1;
651 }
652
653
err_get_string(void *,const char *,size_t,const CHARSET_INFO *)654 static int err_get_string(void*, const char*, size_t,const CHARSET_INFO*)
655 {
656 return 1;
657 }
658
err_handle_ok(void * ctx,uint server_status,uint warn_count,ulonglong affected_rows,ulonglong last_insert_id,const char * const message)659 static void err_handle_ok(void * ctx, uint server_status, uint warn_count,
660 ulonglong affected_rows, ulonglong last_insert_id,
661 const char * const message)
662 {
663 Srv_session::st_err_protocol_ctx *pctx=
664 static_cast<Srv_session::st_err_protocol_ctx*>(ctx);
665 if (pctx && pctx->handler)
666 {
667 char buf[256];
668 my_snprintf(buf, sizeof(buf),
669 "OK status=%u warnings=%u affected=%llu last_id=%llu",
670 server_status, warn_count, affected_rows, last_insert_id);
671 pctx->handler(pctx->handler_context, 0, buf);
672 }
673 }
674
err_handle_error(void * ctx,uint err_errno,const char * err_msg,const char * sqlstate)675 static void err_handle_error(void * ctx, uint err_errno, const char * err_msg,
676 const char * sqlstate)
677 {
678 Srv_session::st_err_protocol_ctx *pctx=
679 static_cast<Srv_session::st_err_protocol_ctx*>(ctx);
680 if (pctx && pctx->handler)
681 pctx->handler(pctx->handler_context, err_errno, err_msg);
682 }
683
err_shutdown(void *,int server_shutdown)684 static void err_shutdown(void*, int server_shutdown){}
685
686
687 const struct st_command_service_cbs error_protocol_callbacks=
688 {
689 err_start_result_metadata,
690 err_field_metadata,
691 err_end_result_metadata,
692 err_start_row,
693 err_end_row,
694 err_abort_row,
695 err_get_client_capabilities,
696 err_get_null,
697 err_get_integer,
698 err_get_longlong,
699 err_get_decimal,
700 err_get_double,
701 err_get_date,
702 err_get_time,
703 err_get_datetime,
704 err_get_string,
705 err_handle_ok,
706 err_handle_error,
707 err_shutdown
708 };
709
710
711 /**
712 Modifies the PSI structures to (de)install a THD
713
714 @param thd THD
715 */
set_psi(THD * thd)716 static void set_psi(THD *thd)
717 {
718 #ifdef HAVE_PSI_THREAD_INTERFACE
719 struct PSI_thread *psi= PSI_THREAD_CALL(get_thread)();
720 PSI_THREAD_CALL(set_thread_id)(psi, thd? thd->thread_id() : 0);
721 PSI_THREAD_CALL(set_thread_THD)(psi, thd);
722 #endif
723 }
724
725
726 /**
727 Initializes physical thread to use with session service.
728
729 @param plugin Pointer to the plugin structure, passed to the plugin over
730 the plugin init function.
731
732 @return
733 false success
734 true failure
735 */
init_thread(const void * plugin)736 bool Srv_session::init_thread(const void *plugin)
737 {
738 int stack_start;
739 if (my_thread_init() ||
740 my_set_thread_local(THR_srv_session_thread, const_cast<void*>(plugin)) ||
741 my_set_thread_local(THR_stack_start_address, &stack_start))
742 {
743 connection_errors_internal++;
744 return true;
745 }
746
747 server_session_threads.add(my_thread_self(), plugin);
748
749 return false;
750 }
751
752
753 /**
754 Looks if there is currently attached session and detaches it.
755
756 @param plugin The plugin to be checked
757 */
close_currently_attached_session_if_any(const st_plugin_int * plugin)758 static void close_currently_attached_session_if_any(const st_plugin_int *plugin)
759 {
760 THD *c_thd= current_thd;
761 if (!c_thd)
762 return;
763
764 Srv_session* current_session= server_session_list.find(c_thd);
765
766 if (current_session)
767 {
768 sql_print_error("Plugin %s is deinitializing a thread but "
769 "left a session attached. Detaching it forcefully.",
770 plugin->name.str);
771
772 if (current_session->detach())
773 sql_print_error("Failed to detach the session.");
774 }
775 }
776
777
778 /**
779 Looks if the plugin has any non-closed sessions and closes them forcefully
780
781 @param plugin The plugin to be checked
782 */
close_all_sessions_of_plugin_if_any(const st_plugin_int * plugin)783 static void close_all_sessions_of_plugin_if_any(const st_plugin_int *plugin)
784 {
785 unsigned int removed_count;
786
787 server_session_list.remove_all_of_plugin(plugin, removed_count);
788
789 if (removed_count)
790 sql_print_error("Closed forcefully %u session%s left opened by plugin %s",
791 removed_count, (removed_count > 1)? "s":"",
792 plugin? plugin->name.str : "SERVER_INTERNAL");
793 }
794
795
796 /**
797 Deinitializes physical thread to use with session service
798 */
deinit_thread()799 void Srv_session::deinit_thread()
800 {
801 const st_plugin_int *plugin= static_cast<const st_plugin_int *>(
802 my_get_thread_local(THR_srv_session_thread));
803 if (plugin)
804 close_currently_attached_session_if_any(plugin);
805
806 if (server_session_threads.remove(my_thread_self()))
807 sql_print_error("Failed to decrement the number of threads");
808
809 if (!server_session_threads.count(plugin))
810 close_all_sessions_of_plugin_if_any(plugin);
811
812 my_set_thread_local(THR_srv_session_thread, NULL);
813
814 assert(my_get_thread_local(THR_stack_start_address));
815 my_set_thread_local(THR_stack_start_address, NULL);
816 my_thread_end();
817 }
818
819
820 /**
821 Checks if a plugin has left threads and sessions
822
823 @param plugin The plugin to be checked
824 */
check_for_stale_threads(const st_plugin_int * plugin)825 void Srv_session::check_for_stale_threads(const st_plugin_int *plugin)
826 {
827 if (!plugin)
828 return;
829
830 unsigned int thread_count= server_session_threads.count(plugin);
831 if (thread_count)
832 {
833 close_all_sessions_of_plugin_if_any(plugin);
834
835 sql_print_error("Plugin %s did not deinitialize %u threads",
836 plugin->name.str, thread_count);
837
838 unsigned int killed_count= server_session_threads.kill(plugin);
839 sql_print_error("Killed %u threads of plugin %s",
840 killed_count, plugin->name.str);
841 }
842 }
843
844
845 /**
846 Inits the module
847
848 @return
849 false success
850 true failure
851 */
module_init()852 bool Srv_session::module_init()
853 {
854 if (srv_session_THRs_initialized)
855 return false;
856
857 if (my_create_thread_local_key(&THR_stack_start_address, NULL) ||
858 my_create_thread_local_key(&THR_srv_session_thread, NULL))
859 {
860 sql_print_error("Can't create thread key for SQL session service");
861 return true;
862 }
863 srv_session_THRs_initialized= true;
864
865 server_session_list.init();
866 server_session_threads.init();
867
868 return false;
869 }
870
871
872 /**
873 Deinits the module.
874
875 Never fails
876
877 @return
878 false success
879 */
module_deinit()880 bool Srv_session::module_deinit()
881 {
882 if (srv_session_THRs_initialized)
883 {
884 srv_session_THRs_initialized= false;
885 (void) my_delete_thread_local_key(THR_stack_start_address);
886 (void) my_delete_thread_local_key(THR_srv_session_thread);
887
888 server_session_list.clear();
889 server_session_list.deinit();
890
891 server_session_threads.clear();
892 server_session_threads.deinit();
893
894 srv_session_THRs_initialized= false;
895 }
896 return false;
897 }
898
899
900 /**
901 Checks if the session is valid.
902
903 Checked is if session is NULL, or in the list of opened sessions. If the
904 session is not in this list it was either closed or the address is invalid.
905
906 @return
907 true valid
908 false not valid
909 */
is_valid(const Srv_session * session)910 bool Srv_session::is_valid(const Srv_session *session)
911 {
912 const THD *thd= session?
913 reinterpret_cast<const THD*>(session + my_offsetof(Srv_session, thd)):
914 NULL;
915 return thd? (bool) server_session_list.find(thd) : false;
916 }
917
918
919 /**
920 Constructs a server session
921
922 @param error_cb Default completion callback
923 @param err_cb_ctx Plugin's context, opaque pointer that would
924 be provided to callbacks. Might be NULL.
925 */
Srv_session(srv_session_error_cb err_cb,void * err_cb_ctx)926 Srv_session::Srv_session(srv_session_error_cb err_cb, void *err_cb_ctx) :
927 da(false), err_protocol_ctx(err_cb, err_cb_ctx),
928 protocol_error(&error_protocol_callbacks, CS_TEXT_REPRESENTATION,
929 (void*)&err_protocol_ctx),
930 state(SRV_SESSION_CREATED), vio_type(NO_VIO_TYPE)
931 {
932 thd.mark_as_srv_session();
933 }
934
935
936 /**
937 Opens a server session
938
939 @return
940 false on success
941 true on failure
942 */
open()943 bool Srv_session::open()
944 {
945 DBUG_ENTER("Srv_session::open");
946
947 DBUG_PRINT("info",("Session=%p THD=%p DA=%p", this, &thd, &da));
948 assert(state == SRV_SESSION_CREATED || state == SRV_SESSION_CLOSED);
949
950 thd.set_protocol(&protocol_error);
951 thd.push_diagnostics_area(&da);
952 /*
953 thd.stack_start will be set once we start attempt to attach.
954 store_globals() will check for it, so we will set it beforehand.
955
956 No store_globals() here as the session is always created in a detached
957 state. Attachment with store_globals() will happen on demand.
958 */
959 if (thd_init_client_charset(&thd, my_charset_utf8_general_ci.number))
960 {
961 connection_errors_internal++;
962 if (err_protocol_ctx.handler)
963 err_protocol_ctx.handler(err_protocol_ctx.handler_context,
964 ER_OUT_OF_RESOURCES,
965 ER_DEFAULT(ER_OUT_OF_RESOURCES));
966 Connection_handler_manager::dec_connection_count();
967 DBUG_RETURN(true);
968 }
969
970 thd.update_charset();
971
972 thd.set_new_thread_id();
973
974 DBUG_PRINT("info", ("thread_id=%d", thd.thread_id()));
975
976 thd.set_time();
977 thd.thr_create_utime= thd.start_utime= my_micro_time();
978
979 /*
980 Disable QC - plugins will most probably install their own protocol
981 and it won't be compatible with the QC. In addition, Protocol_error
982 is not compatible with the QC.
983 */
984 thd.variables.query_cache_type = 0;
985
986 thd.set_command(COM_SLEEP);
987 thd.init_for_queries();
988
989 Global_THD_manager::get_instance()->add_thd(&thd);
990
991 const void *plugin= my_get_thread_local(THR_srv_session_thread);
992
993 server_session_list.add(&thd, plugin, this);
994
995 if (mysql_audit_notify(&thd,
996 AUDIT_EVENT(MYSQL_AUDIT_CONNECTION_PRE_AUTHENTICATE)))
997 {
998 Connection_handler_manager::dec_connection_count();
999 DBUG_RETURN(true);
1000 }
1001
1002 DBUG_RETURN(false);
1003 }
1004
1005
1006 /**
1007 Attaches the session to the current physical thread
1008
1009 @param session Session handle
1010
1011 @returns
1012 false success
1013 true failure
1014 */
attach()1015 bool Srv_session::attach()
1016 {
1017 const bool first_attach= (state == SRV_SESSION_CREATED);
1018 DBUG_ENTER("Srv_session::attach");
1019 DBUG_PRINT("info",("current_thd=%p", current_thd));
1020
1021 if (is_attached())
1022 {
1023 if (!my_thread_equal(thd.real_id, my_thread_self()))
1024 {
1025 DBUG_PRINT("error", ("Attached to different thread. Detach in it"));
1026 DBUG_RETURN(true);
1027 }
1028 /* As it is attached, no need to do anything */
1029 DBUG_RETURN(false);
1030 }
1031
1032 if (&thd == current_thd)
1033 DBUG_RETURN(false);
1034
1035 THD *old_thd= current_thd;
1036 DBUG_PRINT("info",("current_thd=%p", current_thd));
1037
1038 if (old_thd)
1039 old_thd->restore_globals();
1040
1041 DBUG_PRINT("info",("current_thd=%p", current_thd));
1042
1043 const char *new_stack= my_get_thread_local(THR_srv_session_thread)?
1044 (const char*)my_get_thread_local(THR_stack_start_address):
1045 (old_thd? old_thd->thread_stack : NULL);
1046
1047 /*
1048 Attach optimistically, as this will set thread_stack,
1049 which needed by store_globals()
1050 */
1051 set_attached(new_stack);
1052
1053 // This will install our new THD object as current_thd
1054 if (thd.store_globals())
1055 {
1056 DBUG_PRINT("error", ("Error while storing globals"));
1057
1058 if (old_thd)
1059 old_thd->store_globals();
1060
1061 set_psi(old_thd);
1062
1063 set_detached();
1064 DBUG_RETURN(true);
1065 }
1066 Srv_session* old_session= server_session_list.find(old_thd);
1067
1068 /* Really detach only if we are sure everything went fine */
1069 if (old_session)
1070 old_session->set_detached();
1071
1072 thd_clear_errors(&thd);
1073
1074 set_psi(&thd);
1075
1076 #ifdef HAVE_PSI_THREAD_INTERFACE
1077 PSI_THREAD_CALL(set_connection_type)(vio_type != NO_VIO_TYPE?
1078 vio_type : thd.get_vio_type());
1079 #endif /* HAVE_PSI_THREAD_INTERFACE */
1080
1081 if (first_attach)
1082 {
1083 /*
1084 At first attach the security context should have been already set and
1085 and this will report corect information.
1086 */
1087 if (mysql_audit_notify(&thd, AUDIT_EVENT(MYSQL_AUDIT_CONNECTION_CONNECT)))
1088 DBUG_RETURN(true);
1089 query_logger.general_log_print(&thd, COM_CONNECT, NullS);
1090 }
1091
1092 DBUG_RETURN(false);
1093 }
1094
1095
1096 /**
1097 Detaches the session from the current physical thread.
1098
1099 @returns
1100 false success
1101 true failure
1102 */
detach()1103 bool Srv_session::detach()
1104 {
1105 DBUG_ENTER("Srv_session::detach");
1106
1107 if (!is_attached())
1108 DBUG_RETURN(false);
1109
1110 if (!my_thread_equal(thd.real_id, my_thread_self()))
1111 {
1112 DBUG_PRINT("error", ("Attached to a different thread. Detach in it"));
1113 DBUG_RETURN(true);
1114 }
1115
1116 DBUG_PRINT("info",("Session=%p THD=%p current_thd=%p",
1117 this, &thd, current_thd));
1118
1119 assert(&thd == current_thd);
1120 thd.restore_globals();
1121
1122 set_psi(NULL);
1123 /*
1124 We can't call PSI_THREAD_CALL(set_connection_type)(NO_VIO_TYPE) here because
1125 it will assert. Thus, it will be possible to have a physical thread, which
1126 has no session attached to it but have cached vio type.
1127 This only will happen in a spawned thread initialized by this service.
1128 If a server initialized thread is used, just after detach the previous
1129 current_thd will be re-attached again (not created by our service) and
1130 the vio type will be set correctly.
1131 Please see: Session_backup_and_attach::~Session_backup_and_attach()
1132 */
1133
1134 /*
1135 Call after restore_globals() as it will check the stack_addr, which is
1136 nulled by set_detached()
1137 */
1138 set_detached();
1139 DBUG_RETURN(false);
1140 }
1141
1142
1143 /**
1144 Closes the session
1145
1146 @returns
1147 false Session successfully closed
1148 true No such session exists / Session is attached to a different thread
1149 */
close()1150 bool Srv_session::close()
1151 {
1152 DBUG_ENTER("Srv_session::close");
1153
1154 DBUG_PRINT("info",("Session=%p THD=%p current_thd=%p",
1155 this, &thd, current_thd));
1156
1157 assert(state < SRV_SESSION_CLOSED);
1158
1159 /*
1160 RAII
1161 We store the state (the currently attached session, if different than
1162 our and then attach our.
1163 The destructor will attach the session we detached.
1164 */
1165
1166 Srv_session::Session_backup_and_attach backup(this, true);
1167
1168 if (backup.attach_error)
1169 DBUG_RETURN(true);
1170
1171 state= SRV_SESSION_CLOSED;
1172
1173 server_session_list.remove(&thd);
1174
1175 /*
1176 Log to general log must happen before release_resources() as
1177 current_thd will be different then.
1178 */
1179 query_logger.general_log_print(&thd, COM_QUIT, NullS);
1180 mysql_audit_notify(&thd, AUDIT_EVENT(MYSQL_AUDIT_CONNECTION_DISCONNECT), 0);
1181
1182 close_mysql_tables(&thd);
1183
1184 thd.pop_diagnostics_area();
1185
1186 thd.get_stmt_da()->reset_diagnostics_area();
1187
1188 thd.disconnect();
1189
1190 set_psi(NULL);
1191
1192 thd.release_resources();
1193
1194 Global_THD_manager::get_instance()->remove_thd(&thd);
1195
1196 Connection_handler_manager::dec_connection_count();
1197
1198 DBUG_RETURN(false);
1199 }
1200
1201
1202 /**
1203 Sets session's state to attached
1204
1205 @param stack New stack address
1206 */
set_attached(const char * stack)1207 void Srv_session::set_attached(const char *stack)
1208 {
1209 state= SRV_SESSION_ATTACHED;
1210 thd_set_thread_stack(&thd, stack);
1211 }
1212
1213
1214 /**
1215 Changes the state of a session to detached
1216 */
set_detached()1217 void Srv_session::set_detached()
1218 {
1219 state= SRV_SESSION_DETACHED;
1220 thd_set_thread_stack(&thd, NULL);
1221 }
1222
1223 #include "auth_common.h"
1224
execute_command(enum enum_server_command command,const union COM_DATA * data,const CHARSET_INFO * client_cs,const struct st_command_service_cbs * callbacks,enum cs_text_or_binary text_or_binary,void * callbacks_context)1225 int Srv_session::execute_command(enum enum_server_command command,
1226 const union COM_DATA * data,
1227 const CHARSET_INFO * client_cs,
1228 const struct st_command_service_cbs *callbacks,
1229 enum cs_text_or_binary text_or_binary,
1230 void * callbacks_context)
1231 {
1232 DBUG_ENTER("Srv_session::execute_command");
1233
1234 if (!srv_session_server_is_available())
1235 {
1236 if (err_protocol_ctx.handler)
1237 err_protocol_ctx.handler(err_protocol_ctx.handler_context,
1238 ER_SESSION_WAS_KILLED,
1239 ER_DEFAULT(ER_SESSION_WAS_KILLED));
1240 DBUG_RETURN(1);
1241 }
1242
1243 if (thd.killed)
1244 {
1245 if (err_protocol_ctx.handler)
1246 err_protocol_ctx.handler(err_protocol_ctx.handler_context,
1247 ER_SESSION_WAS_KILLED,
1248 ER_DEFAULT(ER_SESSION_WAS_KILLED));
1249 DBUG_RETURN(1);
1250 }
1251
1252 assert(thd.get_protocol() == &protocol_error);
1253
1254 // RAII:the destructor restores the state
1255 Srv_session::Session_backup_and_attach backup(this, false);
1256
1257 if (backup.attach_error)
1258 DBUG_RETURN(1);
1259
1260 if (client_cs &&
1261 thd.variables.character_set_results != client_cs &&
1262 thd_init_client_charset(&thd, client_cs->number))
1263 DBUG_RETURN(1);
1264
1265 /* Switch to different callbacks */
1266 Protocol_callback client_proto(callbacks, text_or_binary, callbacks_context);
1267
1268 thd.set_protocol(&client_proto);
1269
1270 mysql_audit_release(&thd);
1271
1272 /*
1273 The server does it for COM_QUERY in mysql_parse() but not for
1274 COM_INIT_DB, for example
1275 */
1276 if (command != COM_QUERY)
1277 thd.reset_for_next_command();
1278
1279 assert(thd.m_statement_psi == NULL);
1280 thd.m_statement_psi= MYSQL_START_STATEMENT(&thd.m_statement_state,
1281 stmt_info_new_packet.m_key,
1282 thd.db().str,
1283 thd.db().length,
1284 thd.charset(), NULL);
1285 int ret= dispatch_command(&thd, data, command);
1286
1287 thd.set_protocol(&protocol_error);
1288 DBUG_RETURN(ret);
1289 }
1290
1291
1292 /**
1293 Sets the connection type.
1294
1295 @see enum_vio_type
1296
1297 @return
1298 false success
1299 true failure
1300 */
set_connection_type(enum_vio_type v_type)1301 bool Srv_session::set_connection_type(enum_vio_type v_type)
1302 {
1303 if (v_type < FIRST_VIO_TYPE || v_type > LAST_VIO_TYPE)
1304 return true;
1305
1306 vio_type= v_type;
1307 #ifdef HAVE_PSI_THREAD_INTERFACE
1308 if (is_attached())
1309 PSI_THREAD_CALL(set_connection_type)(vio_type);
1310 #endif /* HAVE_PSI_THREAD_INTERFACE */
1311 return false;
1312 }
1313
1314
1315 /**
1316 Template class for scanning the thd list in the Global_THD_manager and
1317 modifying and element from the list. This template is for functions that
1318 need to change data of a THD without getting into races.
1319
1320 If the THD is found, a callback is executed under THD::Lock_thd_data.
1321 A pointer to a member variable is passed as a second parameter to the
1322 callback. The callback should store its result there.
1323
1324 After the search has been completed the result value can be obtained by
1325 calling get_result().
1326 */
1327 template <typename INPUT_TYPE>
1328 class Find_thd_by_id_with_callback_set: public Find_THD_Impl
1329 {
1330 public:
1331 typedef void (*callback_t)(THD *thd, INPUT_TYPE *result);
1332
Find_thd_by_id_with_callback_set(my_thread_id t_id,callback_t cb,INPUT_TYPE in)1333 Find_thd_by_id_with_callback_set(my_thread_id t_id, callback_t cb,
1334 INPUT_TYPE in):
1335 thread_id(t_id), callback(cb), input(in) {}
1336
1337 /**
1338 Callback called for every THD in the thd_list.
1339
1340 When a thread is found the callback function passed to the constructor
1341 is invoked under THD::Lock_thd_data
1342 */
operator ()(THD * thd)1343 virtual bool operator()(THD *thd)
1344 {
1345 if (thd->thread_id() == thread_id)
1346 {
1347 Mutex_lock lock(&thd->LOCK_thd_data);
1348 callback(thd, &input);
1349 return true;
1350 }
1351 return false;
1352 }
1353 private:
1354 my_thread_id thread_id;
1355 callback_t callback;
1356 INPUT_TYPE input;
1357 };
1358
1359 /**
1360 Callback for inspecting a THD object and modifying the peer_port member
1361 */
set_client_port_in_thd(THD * thd,uint16_t * input)1362 static void set_client_port_in_thd(THD *thd, uint16_t *input)
1363 {
1364 thd->peer_port= *input;
1365 }
1366
1367
1368 /**
1369 Sets the client port.
1370
1371 @note The client port in SHOW PROCESSLIST, INFORMATION_SCHEMA.PROCESSLIST.
1372 This port is NOT shown in PERFORMANCE_SCHEMA.THREADS.
1373
1374 @param port Port number
1375 */
set_client_port(uint16_t port)1376 void Srv_session::set_client_port(uint16_t port)
1377 {
1378 Find_thd_by_id_with_callback_set<uint16_t>
1379 find_thd_with_id(thd.thread_id(), set_client_port_in_thd, port);
1380 Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
1381 }
1382
1383
1384 /**
1385 Returns the number opened sessions in thread initialized by this class.
1386 */
session_count()1387 unsigned int Srv_session::session_count()
1388 {
1389 return server_session_list.size();
1390 }
1391
1392
1393 /**
1394 Returns the number currently running threads initialized by this class.
1395 */
thread_count(const void * plugin)1396 unsigned int Srv_session::thread_count(const void *plugin)
1397 {
1398 return server_session_threads.count(plugin);
1399 }
1400