1 /*
2    Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "srv_session.h"
26 #include "my_dbug.h"
27 #include "my_thread.h"
28 #include "sql_class.h"
29 #include "sql_base.h"            // close_mysql_tables
30 #include "sql_connect.h"         // thd_init_client_charset
31 #include "mysqld_thd_manager.h"  // Global_THD_manager
32 #include "sql_audit.h"           // MYSQL_AUDIT_NOTIFY_CONNECTION_CONNECT
33 #include "log.h"                 // Query log
34 #include "my_thread_local.h"     // my_get_thread_local & my_set_thread_local
35 #include "mysqld.h"              // current_thd
36 #include "sql_parse.h"           // dispatch_command()
37 #include "sql_thd_internal_api.h" // thd_set_thread_stack
38 #include "mutex_lock.h"
39 #include "mysql/psi/psi.h"
40 #include "conn_handler/connection_handler_manager.h"
41 #include "sql_plugin.h"
42 
43 #include <map>
44 
45 /**
46   @file
47   class Srv_session implementation. See the method comments for more. Please,
48   check also srv_session.h for more information.
49 */
50 
51 
52 extern void thd_clear_errors(THD *thd);
53 
54 static thread_local_key_t THR_stack_start_address;
55 static thread_local_key_t THR_srv_session_thread;
56 static bool srv_session_THRs_initialized= false;
57 
58 
59 /**
60   A simple wrapper around a RW lock:
61   Grabs the lock in the CTOR, releases it in the DTOR.
62   The lock may be NULL, in which case this is a no-op.
63 
64   Based on Mutex_lock from include/mutex_lock.h
65 */
66 class Auto_rw_lock_read
67 {
68 public:
Auto_rw_lock_read(mysql_rwlock_t * lock)69   explicit Auto_rw_lock_read(mysql_rwlock_t *lock) : rw_lock(NULL)
70   {
71     if (lock && 0 == mysql_rwlock_rdlock(lock))
72       rw_lock = lock;
73   }
74 
~Auto_rw_lock_read()75   ~Auto_rw_lock_read()
76   {
77     if (rw_lock)
78       mysql_rwlock_unlock(rw_lock);
79   }
80 private:
81   mysql_rwlock_t *rw_lock;
82 
83   Auto_rw_lock_read(const Auto_rw_lock_read&);         /* Not copyable. */
84   void operator=(const Auto_rw_lock_read&);            /* Not assignable. */
85 };
86 
87 
88 class Auto_rw_lock_write
89 {
90 public:
Auto_rw_lock_write(mysql_rwlock_t * lock)91   explicit Auto_rw_lock_write(mysql_rwlock_t *lock) : rw_lock(NULL)
92   {
93     if (lock && 0 == mysql_rwlock_wrlock(lock))
94       rw_lock = lock;
95   }
96 
~Auto_rw_lock_write()97   ~Auto_rw_lock_write()
98   {
99     if (rw_lock)
100       mysql_rwlock_unlock(rw_lock);
101   }
102 private:
103   mysql_rwlock_t *rw_lock;
104 
105   Auto_rw_lock_write(const Auto_rw_lock_write&);        /* Non-copyable */
106   void operator=(const Auto_rw_lock_write&);            /* Non-assignable */
107 };
108 
109 
110 class Thread_to_plugin_map
111 {
112 private:
113   bool initted;
114   bool psi_initted;
115   mysql_mutex_t LOCK_collection;
116 
117 #ifdef HAVE_PSI_INTERFACE
118   PSI_mutex_key key_LOCK_collection;
119 #endif
120   std::map<my_thread_t, const void*> collection;
121 
122 public:
123   /**
124     Initializes the map
125 
126     @param null_val null value to be returned when element not found in the map
127 
128     @return
129       false  success
130       true   failure
131   */
init()132   bool init()
133   {
134     const char* category= "session";
135     PSI_mutex_info all_mutexes[]=
136     {
137       { &key_LOCK_collection, "LOCK_srv_session_threads", PSI_FLAG_GLOBAL}
138     };
139 
140     initted= true;
141 #ifdef HAVE_PSI_INTERFACE
142     psi_initted= true;
143 
144     mysql_mutex_register(category, all_mutexes, array_elements(all_mutexes));
145 #endif
146     mysql_mutex_init(key_LOCK_collection, &LOCK_collection, MY_MUTEX_INIT_FAST);
147 
148     return false;
149   }
150 
151   /**
152     Deinitializes the map
153 
154     @return
155       false  success
156       true   failure
157   */
deinit()158   bool deinit()
159   {
160     initted= false;
161     mysql_mutex_destroy(&LOCK_collection);
162 
163     return false;
164   }
165 
166   /**
167     Adds a pthread to the list
168 
169     @param key   plugin
170 
171     @return
172       false  success
173       true   failure
174   */
add(my_thread_t thread,const void * plugin)175   bool add(my_thread_t thread, const void *plugin)
176   {
177     Mutex_lock lock(&LOCK_collection);
178     try
179     {
180       std::map<my_thread_t, const void*>::iterator it= collection.find(thread);
181       if (it == collection.end())
182         collection[thread]= plugin;
183     }
184     catch (const std::bad_alloc &e)
185     {
186       return true;
187     }
188     return false;
189   }
190 
191   /**
192     Removes a pthread from the list
193 
194     @param thread  Thread to remove from the list
195 
196     @return
197       false  success
198       true   failure
199   */
remove(my_thread_t thread)200   unsigned int remove(my_thread_t thread)
201   {
202     Mutex_lock lock(&LOCK_collection);
203     std::map<my_thread_t, const void*>::iterator it= collection.find(thread);
204     if (it != collection.end())
205     {
206       collection.erase(it);
207       return false;
208     }
209     return true;
210   }
211 
212   /**
213     Empties the map
214 
215     @return
216       false  success
217       true   failure
218   */
clear()219   bool clear()
220   {
221     Mutex_lock lock(&LOCK_collection);
222     collection.clear();
223     return false;
224   }
225 
226   /**
227     Returns the number of all threads
228   */
size()229   unsigned int size()
230   {
231     Mutex_lock lock(&LOCK_collection);
232     return collection.size();
233   }
234 
235   /**
236     Returns the number threads for a plugin
237 
238     @param plugin The plugin for which we need info.
239   */
count(const void * plugin)240   unsigned int count(const void *plugin)
241   {
242     if (!plugin)
243       return size();
244 
245     unsigned int ret= 0;
246     Mutex_lock lock(&LOCK_collection);
247     std::map<my_thread_t, const void*>::iterator it= collection.begin();
248     for (; it != collection.end(); ++it)
249     {
250       if (it->second == plugin)
251         ++ret;
252     }
253     return ret;
254   }
255 
256   /**
257     Kills all threads associated with a plugin
258 
259     @param plugin The plugin for which we need info.
260   */
kill(const void * plugin)261   unsigned int kill(const void *plugin)
262   {
263     std::list<my_thread_t> to_remove;
264 
265     Mutex_lock lock(&LOCK_collection);
266 
267     for (std::map<my_thread_t, const void*>::iterator it= collection.begin();
268          it != collection.end();
269          ++it)
270     {
271       if (!plugin || (it->second == plugin))
272       {
273         to_remove.push_back(it->first);
274 
275         my_thread_handle thread;
276 #ifndef _WIN32
277         /*
278            On Windows we need HANDLE to cancel a thread.
279            Win32 API's GetCurrentThread() returns something which seems the
280            same in every thread, thus unusable as a key.
281            GetCurrentThreadId returns an ID (DWORD), but it can't be used with
282            my_thread_cancel() which calls TerminateThread() on Windows.
283            TerminateThread() needs a Handle.
284            Therefore this killing functionality is now only for Posix Threads
285            until there is a solution for Windows.
286         */
287         thread.thread= it->first;
288         sql_print_error("Killing thread %lu", (unsigned long) it->first);
289         if (!my_thread_cancel(&thread))
290         {
291           void *dummy_retval;
292           my_thread_join(&thread, &dummy_retval);
293         }
294 #endif
295       }
296     }
297     if (to_remove.size())
298     {
299       for (std::list<my_thread_t>::iterator it= to_remove.begin();
300            it != to_remove.end();
301            ++it)
302         collection.erase(*it);
303     }
304     return to_remove.size();
305   }
306 };
307 
308 /**
309  std::map of THD* as key and Srv_session* as value guarded by a read-write lock.
310  RW lock is used instead of a mutex, as find() is a hot spot due to the sanity
311  checks it is used for - when a pointer to a closed session is passed.
312 */
313 class Mutexed_map_thd_srv_session
314 {
315 public:
316   class Do_Impl
317   {
318   public:
~Do_Impl()319     virtual ~Do_Impl() {}
320     /**
321       Work on the session
322 
323       @return
324         false  Leave the session in the map
325         true   Remove the session from the map
326     */
327     virtual bool operator()(Srv_session*) = 0;
328   };
329 
330 private:
331   /*
332     The first type in the tuple should be the key type of
333     Thread_to_plugin_map
334   */
335   typedef std::pair<const void*, Srv_session*> map_value_t;
336 
337   std::map<const THD*, map_value_t> collection;
338 
339   bool initted;
340   bool psi_initted;
341 
342   mysql_rwlock_t LOCK_collection;
343 
344 #ifdef HAVE_PSI_INTERFACE
345   PSI_rwlock_key key_LOCK_collection;
346 #endif
347 
348 public:
349   /**
350     Initializes the map
351 
352     @param null_val null value to be returned when element not found in the map
353 
354     @return
355       false  success
356       true   failure
357   */
init()358   bool init()
359   {
360     const char* category= "session";
361     PSI_rwlock_info all_rwlocks[]=
362     {
363       { &key_LOCK_collection, "LOCK_srv_session_collection", PSI_FLAG_GLOBAL}
364     };
365 
366     initted= true;
367 #ifdef HAVE_PSI_INTERFACE
368     psi_initted= true;
369 
370     mysql_rwlock_register(category, all_rwlocks, array_elements(all_rwlocks));
371 #endif
372     mysql_rwlock_init(key_LOCK_collection, &LOCK_collection);
373 
374     return false;
375   }
376 
377   /**
378     Deinitializes the map
379 
380     @return
381       false  success
382       true   failure
383   */
deinit()384   bool deinit()
385   {
386     initted= false;
387     mysql_rwlock_destroy(&LOCK_collection);
388 
389     return false;
390   }
391 
392   /**
393     Searches for an element with in the map
394 
395     @param key Key of the element
396 
397     @return
398       value of the element
399       NULL  if not found
400   */
find(const THD * key)401   Srv_session* find(const THD* key)
402   {
403     Auto_rw_lock_read lock(&LOCK_collection);
404 
405     std::map<const THD*, map_value_t>::iterator it= collection.find(key);
406     return (it != collection.end())? it->second.second : NULL;
407   }
408 
409   /**
410     Add an element to the map
411 
412     @param key     key
413     @param plugin  secondary key
414     @param value   value
415 
416     @return
417       false  success
418       true   failure
419   */
add(const THD * key,const void * plugin,Srv_session * session)420   bool add(const THD* key, const void *plugin, Srv_session *session)
421   {
422     Auto_rw_lock_write lock(&LOCK_collection);
423     try
424     {
425       collection[key]= std::make_pair(plugin, session);
426     }
427     catch (const std::bad_alloc &e)
428     {
429       return true;
430     }
431     return false;
432   }
433 
434   /**
435     Removes an element from the map.
436 
437     @param key  key
438 
439     @return
440       false  success
441       true   failure
442   */
remove(const THD * key)443   bool remove(const THD* key)
444   {
445     Auto_rw_lock_write lock(&LOCK_collection);
446     /*
447       If we use erase with the key directly an exception could be thrown. The
448       find method never throws. erase() with iterator as parameter also never
449       throws.
450     */
451     std::map<const THD*, map_value_t>::iterator it= collection.find(key);
452     if (it != collection.end())
453       collection.erase(it);
454     return false;
455   }
456 
457   /**
458     Removes all elements which have been added with plugin as plugin name.
459 
460     @param plugin key
461     @param removed OUT Number of removed elements
462   */
remove_all_of_plugin(const void * plugin,unsigned int & removed)463   void remove_all_of_plugin(const void *plugin, unsigned int &removed)
464   {
465     std::list<Srv_session *> to_close;
466     removed= 0;
467 
468     {
469       Auto_rw_lock_write lock(&LOCK_collection);
470 
471       for (std::map<const THD*, map_value_t>::iterator it= collection.begin();
472            it != collection.end();
473            ++it)
474       {
475         if (it->second.first == plugin)
476           to_close.push_back(it->second.second);
477       }
478     }
479 
480     /* Outside of the lock as Srv_session::close() will try to
481       remove itself from the list*/
482     if ((removed= to_close.size()))
483     {
484       for (std::list<Srv_session *>::iterator it= to_close.begin();
485            it != to_close.end();
486            ++it)
487       {
488         Srv_session *session= *it;
489         session->detach();
490         session->close();
491         delete session;
492       }
493     }
494   }
495 
496   /**
497     Empties the map
498 
499     @return
500       false  success
501       true   failure
502   */
clear()503   bool clear()
504   {
505     Auto_rw_lock_write lock(&LOCK_collection);
506     collection.clear();
507     return false;
508   }
509 
510   /**
511     Returns the number of elements in the maps
512   */
size()513   unsigned int size()
514   {
515     Auto_rw_lock_read lock(&LOCK_collection);
516     return collection.size();
517   }
518 };
519 
520 static Mutexed_map_thd_srv_session server_session_list;
521 static Thread_to_plugin_map server_session_threads;
522 
523 /**
524   Constructs a session state object for Srv_session::execute_command.
525   Saves state. Uses RAII.
526 
527   @param sess Session to backup
528 */
529 Srv_session::
Session_backup_and_attach(Srv_session * sess,bool is_close_session)530 Session_backup_and_attach::Session_backup_and_attach(Srv_session *sess,
531                                                      bool is_close_session)
532   :session(sess),
533    old_session(NULL),
534    in_close_session(is_close_session)
535 {
536   THD *c_thd= current_thd;
537   const void *is_srv_session_thread= my_get_thread_local(THR_srv_session_thread);
538   backup_thd= is_srv_session_thread? NULL:c_thd;
539 
540   if (is_srv_session_thread && c_thd && c_thd != &session->thd)
541   {
542     if ((old_session= server_session_list.find(c_thd)))
543       old_session->detach();
544   }
545 
546   attach_error= session->attach();
547 }
548 
549 
550 /**
551   Destructs the session state object. In other words it restores to
552   previous state.
553 */
~Session_backup_and_attach()554 Srv_session::Session_backup_and_attach::~Session_backup_and_attach()
555 {
556   if (backup_thd)
557   {
558     session->detach();
559     backup_thd->store_globals();
560 #ifdef HAVE_PSI_THREAD_INTERFACE
561     enum_vio_type vio_type= backup_thd->get_vio_type();
562     if (vio_type != NO_VIO_TYPE)
563       PSI_THREAD_CALL(set_connection_type)(vio_type);
564 #endif /* HAVE_PSI_THREAD_INTERFACE */
565   }
566   else if (in_close_session)
567   {
568     /*
569       We should restore the old session only in case of close.
570       In case of execute we should stay attached.
571     */
572     session->detach();
573     if (old_session)
574       old_session->attach();
575   }
576 }
577 
578 
err_start_result_metadata(void *,uint,uint,const CHARSET_INFO *)579 static int err_start_result_metadata(void*, uint, uint, const CHARSET_INFO *)
580 {
581   return 1;
582 }
583 
err_field_metadata(void *,struct st_send_field *,const CHARSET_INFO *)584 static int err_field_metadata(void*, struct st_send_field*, const CHARSET_INFO*)
585 {
586   return 1;
587 }
588 
err_end_result_metadata(void *,uint,uint)589 static int err_end_result_metadata(void*, uint, uint)
590 {
591   return 1;
592 }
593 
err_start_row(void *)594 static int err_start_row(void*)
595 {
596   return 1;
597 }
598 
err_end_row(void *)599 static int err_end_row(void*)
600 {
601   return 1;
602 }
603 
err_abort_row(void *)604 static void err_abort_row(void*)
605 {
606 }
607 
err_get_client_capabilities(void *)608 static ulong err_get_client_capabilities(void*)
609 {
610   return 0;
611 }
612 
err_get_null(void *)613 static int err_get_null(void*)
614 {
615   return 1;
616 }
617 
err_get_integer(void *,longlong)618 static int err_get_integer(void*, longlong)
619 {
620   return 1;
621 }
622 
err_get_longlong(void *,longlong,uint)623 static int err_get_longlong(void*, longlong, uint)
624 {
625   return 1;
626 }
627 
err_get_decimal(void *,const decimal_t *)628 static int err_get_decimal(void*, const decimal_t*)
629 {
630   return 1;
631 }
632 
err_get_double(void *,double,uint32)633 static int err_get_double(void*, double, uint32)
634 {
635   return 1;
636 }
637 
err_get_date(void *,const MYSQL_TIME *)638 static int err_get_date(void*, const MYSQL_TIME*)
639 {
640   return 1;
641 }
642 
err_get_time(void *,const MYSQL_TIME *,uint)643 static int err_get_time(void*, const MYSQL_TIME*, uint)
644 {
645   return 1;
646 }
647 
err_get_datetime(void *,const MYSQL_TIME *,uint)648 static int err_get_datetime(void*, const MYSQL_TIME*, uint)
649 {
650   return 1;
651 }
652 
653 
err_get_string(void *,const char *,size_t,const CHARSET_INFO *)654 static int err_get_string(void*, const char*, size_t,const CHARSET_INFO*)
655 {
656   return 1;
657 }
658 
err_handle_ok(void * ctx,uint server_status,uint warn_count,ulonglong affected_rows,ulonglong last_insert_id,const char * const message)659 static void err_handle_ok(void * ctx, uint server_status, uint warn_count,
660                           ulonglong affected_rows, ulonglong last_insert_id,
661                           const char * const message)
662 {
663   Srv_session::st_err_protocol_ctx *pctx=
664              static_cast<Srv_session::st_err_protocol_ctx*>(ctx);
665   if (pctx && pctx->handler)
666   {
667     char buf[256];
668     my_snprintf(buf, sizeof(buf),
669                 "OK status=%u warnings=%u affected=%llu last_id=%llu",
670                 server_status, warn_count, affected_rows, last_insert_id);
671     pctx->handler(pctx->handler_context, 0, buf);
672   }
673 }
674 
err_handle_error(void * ctx,uint err_errno,const char * err_msg,const char * sqlstate)675 static void err_handle_error(void * ctx, uint err_errno, const char * err_msg,
676                              const char * sqlstate)
677 {
678   Srv_session::st_err_protocol_ctx *pctx=
679              static_cast<Srv_session::st_err_protocol_ctx*>(ctx);
680   if (pctx && pctx->handler)
681     pctx->handler(pctx->handler_context, err_errno, err_msg);
682 }
683 
err_shutdown(void *,int server_shutdown)684 static void err_shutdown(void*, int server_shutdown){}
685 
686 
687 const struct st_command_service_cbs error_protocol_callbacks=
688 {
689   err_start_result_metadata,
690   err_field_metadata,
691   err_end_result_metadata,
692   err_start_row,
693   err_end_row,
694   err_abort_row,
695   err_get_client_capabilities,
696   err_get_null,
697   err_get_integer,
698   err_get_longlong,
699   err_get_decimal,
700   err_get_double,
701   err_get_date,
702   err_get_time,
703   err_get_datetime,
704   err_get_string,
705   err_handle_ok,
706   err_handle_error,
707   err_shutdown
708 };
709 
710 
711 /**
712   Modifies the PSI structures to (de)install a THD
713 
714   @param thd THD
715 */
set_psi(THD * thd)716 static void set_psi(THD *thd)
717 {
718 #ifdef HAVE_PSI_THREAD_INTERFACE
719   struct PSI_thread *psi= PSI_THREAD_CALL(get_thread)();
720   PSI_THREAD_CALL(set_thread_id)(psi, thd? thd->thread_id() : 0);
721   PSI_THREAD_CALL(set_thread_THD)(psi, thd);
722 #endif
723 }
724 
725 
726 /**
727   Initializes physical thread to use with session service.
728 
729   @param plugin Pointer to the plugin structure, passed to the plugin over
730                 the plugin init function.
731 
732   @return
733     false  success
734     true   failure
735 */
init_thread(const void * plugin)736 bool Srv_session::init_thread(const void *plugin)
737 {
738   int stack_start;
739   if (my_thread_init() ||
740       my_set_thread_local(THR_srv_session_thread, const_cast<void*>(plugin)) ||
741       my_set_thread_local(THR_stack_start_address, &stack_start))
742   {
743     connection_errors_internal++;
744     return true;
745   }
746 
747   server_session_threads.add(my_thread_self(), plugin);
748 
749   return false;
750 }
751 
752 
753 /**
754   Looks if there is currently attached session and detaches it.
755 
756   @param plugin  The plugin to be checked
757 */
close_currently_attached_session_if_any(const st_plugin_int * plugin)758 static void close_currently_attached_session_if_any(const st_plugin_int *plugin)
759 {
760   THD *c_thd= current_thd;
761   if (!c_thd)
762     return;
763 
764   Srv_session* current_session= server_session_list.find(c_thd);
765 
766   if (current_session)
767   {
768     sql_print_error("Plugin %s is deinitializing a thread but "
769                      "left a session attached. Detaching it forcefully.",
770                      plugin->name.str);
771 
772     if (current_session->detach())
773       sql_print_error("Failed to detach the session.");
774   }
775 }
776 
777 
778 /**
779   Looks if the plugin has any non-closed sessions and closes them forcefully
780 
781   @param plugin  The plugin to be checked
782 */
close_all_sessions_of_plugin_if_any(const st_plugin_int * plugin)783 static void close_all_sessions_of_plugin_if_any(const st_plugin_int *plugin)
784 {
785   unsigned int removed_count;
786 
787   server_session_list.remove_all_of_plugin(plugin, removed_count);
788 
789   if (removed_count)
790      sql_print_error("Closed forcefully %u session%s left opened by plugin %s",
791                      removed_count, (removed_count > 1)? "s":"",
792                      plugin? plugin->name.str : "SERVER_INTERNAL");
793 }
794 
795 
796 /**
797   Deinitializes physical thread to use with session service
798 */
deinit_thread()799 void Srv_session::deinit_thread()
800 {
801   const st_plugin_int *plugin= static_cast<const st_plugin_int *>(
802                                 my_get_thread_local(THR_srv_session_thread));
803   if (plugin)
804     close_currently_attached_session_if_any(plugin);
805 
806   if (server_session_threads.remove(my_thread_self()))
807     sql_print_error("Failed to decrement the number of threads");
808 
809   if (!server_session_threads.count(plugin))
810     close_all_sessions_of_plugin_if_any(plugin);
811 
812   my_set_thread_local(THR_srv_session_thread, NULL);
813 
814   assert(my_get_thread_local(THR_stack_start_address));
815   my_set_thread_local(THR_stack_start_address, NULL);
816   my_thread_end();
817 }
818 
819 
820 /**
821   Checks if a plugin has left threads and sessions
822 
823   @param plugin  The plugin to be checked
824 */
check_for_stale_threads(const st_plugin_int * plugin)825 void Srv_session::check_for_stale_threads(const st_plugin_int *plugin)
826 {
827   if (!plugin)
828     return;
829 
830   unsigned int thread_count= server_session_threads.count(plugin);
831   if (thread_count)
832   {
833     close_all_sessions_of_plugin_if_any(plugin);
834 
835     sql_print_error("Plugin %s did not deinitialize %u threads",
836                     plugin->name.str, thread_count);
837 
838     unsigned int killed_count= server_session_threads.kill(plugin);
839     sql_print_error("Killed %u threads of plugin %s",
840                     killed_count, plugin->name.str);
841   }
842 }
843 
844 
845 /**
846   Inits the module
847 
848   @return
849     false  success
850     true   failure
851 */
module_init()852 bool Srv_session::module_init()
853 {
854   if (srv_session_THRs_initialized)
855     return false;
856 
857   if (my_create_thread_local_key(&THR_stack_start_address, NULL) ||
858       my_create_thread_local_key(&THR_srv_session_thread, NULL))
859   {
860     sql_print_error("Can't create thread key for SQL session service");
861     return true;
862   }
863   srv_session_THRs_initialized= true;
864 
865   server_session_list.init();
866   server_session_threads.init();
867 
868   return false;
869 }
870 
871 
872 /**
873   Deinits the module.
874 
875   Never fails
876 
877   @return
878     false  success
879 */
module_deinit()880 bool Srv_session::module_deinit()
881 {
882   if (srv_session_THRs_initialized)
883   {
884     srv_session_THRs_initialized= false;
885     (void) my_delete_thread_local_key(THR_stack_start_address);
886     (void) my_delete_thread_local_key(THR_srv_session_thread);
887 
888     server_session_list.clear();
889     server_session_list.deinit();
890 
891     server_session_threads.clear();
892     server_session_threads.deinit();
893 
894     srv_session_THRs_initialized= false;
895   }
896   return false;
897 }
898 
899 
900 /**
901   Checks if the session is valid.
902 
903   Checked is if session is NULL, or in the list of opened sessions. If the
904   session is not in this list it was either closed or the address is invalid.
905 
906   @return
907     true  valid
908     false not valid
909 */
is_valid(const Srv_session * session)910 bool Srv_session::is_valid(const Srv_session *session)
911 {
912   const THD *thd= session?
913     reinterpret_cast<const THD*>(session + my_offsetof(Srv_session, thd)):
914     NULL;
915   return thd? (bool) server_session_list.find(thd) : false;
916 }
917 
918 
919 /**
920   Constructs a server session
921 
922   @param error_cb       Default completion callback
923   @param err_cb_ctx     Plugin's context, opaque pointer that would
924                         be provided to callbacks. Might be NULL.
925 */
Srv_session(srv_session_error_cb err_cb,void * err_cb_ctx)926 Srv_session::Srv_session(srv_session_error_cb err_cb, void *err_cb_ctx) :
927   da(false), err_protocol_ctx(err_cb, err_cb_ctx),
928   protocol_error(&error_protocol_callbacks, CS_TEXT_REPRESENTATION,
929                  (void*)&err_protocol_ctx),
930   state(SRV_SESSION_CREATED), vio_type(NO_VIO_TYPE)
931 {
932   thd.mark_as_srv_session();
933 }
934 
935 
936 /**
937   Opens a server session
938 
939   @return
940     false  on success
941     true   on failure
942 */
open()943 bool Srv_session::open()
944 {
945   DBUG_ENTER("Srv_session::open");
946 
947   DBUG_PRINT("info",("Session=%p  THD=%p  DA=%p", this, &thd, &da));
948   assert(state == SRV_SESSION_CREATED || state == SRV_SESSION_CLOSED);
949 
950   thd.set_protocol(&protocol_error);
951   thd.push_diagnostics_area(&da);
952   /*
953     thd.stack_start will be set once we start attempt to attach.
954     store_globals() will check for it, so we will set it beforehand.
955 
956     No store_globals() here as the session is always created in a detached
957     state. Attachment with store_globals() will happen on demand.
958   */
959   if (thd_init_client_charset(&thd, my_charset_utf8_general_ci.number))
960   {
961     connection_errors_internal++;
962     if (err_protocol_ctx.handler)
963       err_protocol_ctx.handler(err_protocol_ctx.handler_context,
964                                ER_OUT_OF_RESOURCES,
965                                ER_DEFAULT(ER_OUT_OF_RESOURCES));
966     Connection_handler_manager::dec_connection_count();
967     DBUG_RETURN(true);
968   }
969 
970   thd.update_charset();
971 
972   thd.set_new_thread_id();
973 
974   DBUG_PRINT("info", ("thread_id=%d", thd.thread_id()));
975 
976   thd.set_time();
977   thd.thr_create_utime= thd.start_utime= my_micro_time();
978 
979   /*
980     Disable QC - plugins will most probably install their own protocol
981     and it won't be compatible with the QC. In addition, Protocol_error
982     is not compatible with the QC.
983   */
984   thd.variables.query_cache_type = 0;
985 
986   thd.set_command(COM_SLEEP);
987   thd.init_for_queries();
988 
989   Global_THD_manager::get_instance()->add_thd(&thd);
990 
991   const void *plugin= my_get_thread_local(THR_srv_session_thread);
992 
993   server_session_list.add(&thd, plugin, this);
994 
995   if (mysql_audit_notify(&thd,
996                          AUDIT_EVENT(MYSQL_AUDIT_CONNECTION_PRE_AUTHENTICATE)))
997   {
998     Connection_handler_manager::dec_connection_count();
999     DBUG_RETURN(true);
1000   }
1001 
1002   DBUG_RETURN(false);
1003 }
1004 
1005 
1006 /**
1007   Attaches the session to the current physical thread
1008 
1009   @param session  Session handle
1010 
1011   @returns
1012     false   success
1013     true    failure
1014 */
attach()1015 bool Srv_session::attach()
1016 {
1017   const bool first_attach= (state == SRV_SESSION_CREATED);
1018   DBUG_ENTER("Srv_session::attach");
1019   DBUG_PRINT("info",("current_thd=%p", current_thd));
1020 
1021   if (is_attached())
1022   {
1023     if (!my_thread_equal(thd.real_id, my_thread_self()))
1024     {
1025       DBUG_PRINT("error", ("Attached to different thread. Detach in it"));
1026       DBUG_RETURN(true);
1027     }
1028     /* As it is attached, no need to do anything */
1029     DBUG_RETURN(false);
1030   }
1031 
1032   if (&thd == current_thd)
1033     DBUG_RETURN(false);
1034 
1035   THD *old_thd= current_thd;
1036   DBUG_PRINT("info",("current_thd=%p", current_thd));
1037 
1038   if (old_thd)
1039     old_thd->restore_globals();
1040 
1041   DBUG_PRINT("info",("current_thd=%p", current_thd));
1042 
1043   const char *new_stack= my_get_thread_local(THR_srv_session_thread)?
1044         (const char*)my_get_thread_local(THR_stack_start_address):
1045         (old_thd? old_thd->thread_stack : NULL);
1046 
1047   /*
1048     Attach optimistically, as this will set thread_stack,
1049     which needed by store_globals()
1050   */
1051   set_attached(new_stack);
1052 
1053   // This will install our new THD object as current_thd
1054   if (thd.store_globals())
1055   {
1056     DBUG_PRINT("error", ("Error while storing globals"));
1057 
1058     if (old_thd)
1059       old_thd->store_globals();
1060 
1061     set_psi(old_thd);
1062 
1063     set_detached();
1064     DBUG_RETURN(true);
1065   }
1066   Srv_session* old_session= server_session_list.find(old_thd);
1067 
1068   /* Really detach only if we are sure everything went fine */
1069   if (old_session)
1070     old_session->set_detached();
1071 
1072   thd_clear_errors(&thd);
1073 
1074   set_psi(&thd);
1075 
1076 #ifdef HAVE_PSI_THREAD_INTERFACE
1077    PSI_THREAD_CALL(set_connection_type)(vio_type != NO_VIO_TYPE?
1078                                         vio_type : thd.get_vio_type());
1079 #endif /* HAVE_PSI_THREAD_INTERFACE */
1080 
1081   if (first_attach)
1082   {
1083     /*
1084       At first attach the security context should have been already set and
1085       and this will report corect information.
1086     */
1087     if (mysql_audit_notify(&thd, AUDIT_EVENT(MYSQL_AUDIT_CONNECTION_CONNECT)))
1088       DBUG_RETURN(true);
1089     query_logger.general_log_print(&thd, COM_CONNECT, NullS);
1090   }
1091 
1092   DBUG_RETURN(false);
1093 }
1094 
1095 
1096 /**
1097   Detaches the session from the current physical thread.
1098 
1099   @returns
1100     false success
1101     true  failure
1102 */
detach()1103 bool Srv_session::detach()
1104 {
1105   DBUG_ENTER("Srv_session::detach");
1106 
1107   if (!is_attached())
1108     DBUG_RETURN(false);
1109 
1110   if (!my_thread_equal(thd.real_id, my_thread_self()))
1111   {
1112     DBUG_PRINT("error", ("Attached to a different thread. Detach in it"));
1113     DBUG_RETURN(true);
1114   }
1115 
1116   DBUG_PRINT("info",("Session=%p THD=%p current_thd=%p",
1117                      this, &thd, current_thd));
1118 
1119   assert(&thd == current_thd);
1120   thd.restore_globals();
1121 
1122   set_psi(NULL);
1123   /*
1124     We can't call PSI_THREAD_CALL(set_connection_type)(NO_VIO_TYPE) here because
1125     it will assert. Thus, it will be possible to have a physical thread, which
1126     has no session attached to it but have cached vio type.
1127     This only will happen in a spawned thread initialized by this service.
1128     If a server initialized thread is used, just after detach the previous
1129     current_thd will be re-attached again (not created by our service) and
1130     the vio type will be set correctly.
1131     Please see: Session_backup_and_attach::~Session_backup_and_attach()
1132   */
1133 
1134   /*
1135     Call after restore_globals() as it will check the stack_addr, which is
1136     nulled by set_detached()
1137   */
1138   set_detached();
1139   DBUG_RETURN(false);
1140 }
1141 
1142 
1143 /**
1144   Closes the session
1145 
1146   @returns
1147     false Session successfully closed
1148     true  No such session exists / Session is attached to a different thread
1149 */
close()1150 bool Srv_session::close()
1151 {
1152   DBUG_ENTER("Srv_session::close");
1153 
1154   DBUG_PRINT("info",("Session=%p THD=%p current_thd=%p",
1155                      this, &thd, current_thd));
1156 
1157   assert(state < SRV_SESSION_CLOSED);
1158 
1159   /*
1160     RAII
1161     We store the state (the currently attached session, if different than
1162     our and then attach our.
1163     The destructor will attach the session we detached.
1164   */
1165 
1166   Srv_session::Session_backup_and_attach backup(this, true);
1167 
1168   if (backup.attach_error)
1169     DBUG_RETURN(true);
1170 
1171   state= SRV_SESSION_CLOSED;
1172 
1173   server_session_list.remove(&thd);
1174 
1175   /*
1176     Log to general log must happen before release_resources() as
1177     current_thd will be different then.
1178   */
1179   query_logger.general_log_print(&thd, COM_QUIT, NullS);
1180   mysql_audit_notify(&thd, AUDIT_EVENT(MYSQL_AUDIT_CONNECTION_DISCONNECT), 0);
1181 
1182   close_mysql_tables(&thd);
1183 
1184   thd.pop_diagnostics_area();
1185 
1186   thd.get_stmt_da()->reset_diagnostics_area();
1187 
1188   thd.disconnect();
1189 
1190   set_psi(NULL);
1191 
1192   thd.release_resources();
1193 
1194   Global_THD_manager::get_instance()->remove_thd(&thd);
1195 
1196   Connection_handler_manager::dec_connection_count();
1197 
1198   DBUG_RETURN(false);
1199 }
1200 
1201 
1202 /**
1203   Sets session's state to attached
1204 
1205   @param stack  New stack address
1206 */
set_attached(const char * stack)1207 void Srv_session::set_attached(const char *stack)
1208 {
1209   state= SRV_SESSION_ATTACHED;
1210   thd_set_thread_stack(&thd, stack);
1211 }
1212 
1213 
1214 /**
1215   Changes the state of a session to detached
1216 */
set_detached()1217 void Srv_session::set_detached()
1218 {
1219   state= SRV_SESSION_DETACHED;
1220   thd_set_thread_stack(&thd, NULL);
1221 }
1222 
1223 #include "auth_common.h"
1224 
execute_command(enum enum_server_command command,const union COM_DATA * data,const CHARSET_INFO * client_cs,const struct st_command_service_cbs * callbacks,enum cs_text_or_binary text_or_binary,void * callbacks_context)1225 int Srv_session::execute_command(enum enum_server_command command,
1226                                  const union COM_DATA * data,
1227                                  const CHARSET_INFO * client_cs,
1228                                  const struct st_command_service_cbs *callbacks,
1229                                  enum cs_text_or_binary text_or_binary,
1230                                  void * callbacks_context)
1231 {
1232   DBUG_ENTER("Srv_session::execute_command");
1233 
1234   if (!srv_session_server_is_available())
1235   {
1236     if (err_protocol_ctx.handler)
1237       err_protocol_ctx.handler(err_protocol_ctx.handler_context,
1238                                ER_SESSION_WAS_KILLED,
1239                                ER_DEFAULT(ER_SESSION_WAS_KILLED));
1240     DBUG_RETURN(1);
1241   }
1242 
1243   if (thd.killed)
1244   {
1245     if (err_protocol_ctx.handler)
1246       err_protocol_ctx.handler(err_protocol_ctx.handler_context,
1247                                ER_SESSION_WAS_KILLED,
1248                                ER_DEFAULT(ER_SESSION_WAS_KILLED));
1249     DBUG_RETURN(1);
1250   }
1251 
1252   assert(thd.get_protocol() == &protocol_error);
1253 
1254   // RAII:the destructor restores the state
1255   Srv_session::Session_backup_and_attach backup(this, false);
1256 
1257   if (backup.attach_error)
1258     DBUG_RETURN(1);
1259 
1260   if (client_cs &&
1261       thd.variables.character_set_results != client_cs &&
1262       thd_init_client_charset(&thd, client_cs->number))
1263     DBUG_RETURN(1);
1264 
1265   /* Switch to different callbacks */
1266   Protocol_callback client_proto(callbacks, text_or_binary, callbacks_context);
1267 
1268   thd.set_protocol(&client_proto);
1269 
1270   mysql_audit_release(&thd);
1271 
1272   /*
1273     The server does it for COM_QUERY in mysql_parse() but not for
1274     COM_INIT_DB, for example
1275   */
1276   if (command != COM_QUERY)
1277     thd.reset_for_next_command();
1278 
1279   assert(thd.m_statement_psi == NULL);
1280   thd.m_statement_psi= MYSQL_START_STATEMENT(&thd.m_statement_state,
1281                                              stmt_info_new_packet.m_key,
1282                                              thd.db().str,
1283                                              thd.db().length,
1284                                              thd.charset(), NULL);
1285   int ret= dispatch_command(&thd, data, command);
1286 
1287   thd.set_protocol(&protocol_error);
1288   DBUG_RETURN(ret);
1289 }
1290 
1291 
1292 /**
1293   Sets the connection type.
1294 
1295   @see enum_vio_type
1296 
1297   @return
1298     false success
1299     true  failure
1300 */
set_connection_type(enum_vio_type v_type)1301 bool Srv_session::set_connection_type(enum_vio_type v_type)
1302 {
1303   if (v_type < FIRST_VIO_TYPE || v_type > LAST_VIO_TYPE)
1304     return true;
1305 
1306   vio_type= v_type;
1307 #ifdef HAVE_PSI_THREAD_INTERFACE
1308   if (is_attached())
1309     PSI_THREAD_CALL(set_connection_type)(vio_type);
1310 #endif /* HAVE_PSI_THREAD_INTERFACE */
1311   return false;
1312 }
1313 
1314 
1315 /**
1316   Template class for scanning the thd list in the Global_THD_manager and
1317   modifying and element from the list. This template is for functions that
1318   need to change data of a THD without getting into races.
1319 
1320   If the THD is found, a callback is executed under THD::Lock_thd_data.
1321   A pointer to a member variable is passed as a second parameter to the
1322   callback. The callback should store its result there.
1323 
1324   After the search has been completed the result value can be obtained by
1325   calling get_result().
1326 */
1327 template <typename INPUT_TYPE>
1328 class Find_thd_by_id_with_callback_set: public Find_THD_Impl
1329 {
1330 public:
1331   typedef void (*callback_t)(THD *thd, INPUT_TYPE *result);
1332 
Find_thd_by_id_with_callback_set(my_thread_id t_id,callback_t cb,INPUT_TYPE in)1333   Find_thd_by_id_with_callback_set(my_thread_id t_id, callback_t cb,
1334                                    INPUT_TYPE in):
1335     thread_id(t_id), callback(cb), input(in) {}
1336 
1337   /**
1338     Callback called for every THD in the thd_list.
1339 
1340     When a thread is found the callback function passed to the constructor
1341     is invoked under THD::Lock_thd_data
1342   */
operator ()(THD * thd)1343   virtual bool operator()(THD *thd)
1344   {
1345     if (thd->thread_id() == thread_id)
1346     {
1347       Mutex_lock lock(&thd->LOCK_thd_data);
1348       callback(thd, &input);
1349       return true;
1350     }
1351     return false;
1352   }
1353 private:
1354   my_thread_id thread_id;
1355   callback_t callback;
1356   INPUT_TYPE input;
1357 };
1358 
1359 /**
1360   Callback for inspecting a THD object and modifying the peer_port member
1361 */
set_client_port_in_thd(THD * thd,uint16_t * input)1362 static void set_client_port_in_thd(THD *thd, uint16_t *input)
1363 {
1364   thd->peer_port= *input;
1365 }
1366 
1367 
1368 /**
1369   Sets the client port.
1370 
1371   @note The client port in SHOW PROCESSLIST, INFORMATION_SCHEMA.PROCESSLIST.
1372   This port is NOT shown in PERFORMANCE_SCHEMA.THREADS.
1373 
1374   @param port  Port number
1375 */
set_client_port(uint16_t port)1376 void Srv_session::set_client_port(uint16_t port)
1377 {
1378   Find_thd_by_id_with_callback_set<uint16_t>
1379      find_thd_with_id(thd.thread_id(), set_client_port_in_thd, port);
1380   Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
1381 }
1382 
1383 
1384 /**
1385   Returns the number opened sessions in thread initialized by this class.
1386 */
session_count()1387 unsigned int Srv_session::session_count()
1388 {
1389   return server_session_list.size();
1390 }
1391 
1392 
1393 /**
1394   Returns the number currently running threads initialized by this class.
1395 */
thread_count(const void * plugin)1396 unsigned int Srv_session::thread_count(const void *plugin)
1397 {
1398   return server_session_threads.count(plugin);
1399 }
1400