1 /* Copyright (C) 2012, 2020, MariaDB
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #include "mariadb.h"
17 #include <violite.h>
18 #include <sql_priv.h>
19 #include <sql_class.h>
20 #include <my_pthread.h>
21 #include <scheduler.h>
22 #include <sql_connect.h>
23 #include <sql_audit.h>
24 #include <debug_sync.h>
25 #include <threadpool.h>
26 
27 
/*
  Threadpool parameters.
  Only threadpool_max_threads and threadpool_mode are read in this file;
  the others are presumably consumed by the pool implementations and set by
  the server configuration code -- verify against their users.
*/

uint threadpool_min_threads;
uint threadpool_idle_timeout;
uint threadpool_size;
uint threadpool_max_size;
uint threadpool_stall_limit;
/* Copied into scheduler_functions::max_threads by pool_of_threads_scheduler(). */
uint threadpool_max_threads;
uint threadpool_oversubscribe;
/* Compared against TP_MODE_WINDOWS in tp_init() on _WIN32 builds. */
uint threadpool_mode;
uint threadpool_prio_kickup_timer;

/* Stats */
TP_STATISTICS tp_stats;


/* Forward declarations of the connection lifecycle helpers defined below. */
static void  threadpool_remove_connection(THD *thd);
static int   threadpool_process_request(THD *thd);
static THD*  threadpool_add_connection(CONNECT *connect, void *scheduler_data);

/* Defined outside this file; called from threadpool_process_request(). */
extern bool do_command(THD*);
49 
get_TP_connection(THD * thd)50 static inline TP_connection *get_TP_connection(THD *thd)
51 {
52   return (TP_connection *)thd->event_scheduler.data;
53 }
54 
55 /*
56   Worker threads contexts, and THD contexts.
57   =========================================
58 
  Both worker threads and connections have their own sets of thread-local
  variables. At the moment these are mysys_var (which holds specific data for
  dbug, my_error and similar goodies) and the PSI per-client structure.
62 
  Whenever a query is executed, the following needs to be done:

  1. Save worker thread context.
  2. Change TLS variables to connection-specific ones using thread_attach(THD*).
     This function does some additional work, e.g. setting up
     thread_stack/thread_ends_here pointers.
69   3. Process query
70   4. Restore worker thread context.
71 
  Connection login and termination follow a similar scheme w.r.t. saving and
  restoring contexts.
74 
75   For both worker thread, and for the connection, mysys variables are created
76   using my_thread_init() and freed with my_thread_end().
77 
78 */
79 struct Worker_thread_context
80 {
81   PSI_thread *psi_thread;
82   st_my_thread_var* mysys_var;
83 
saveWorker_thread_context84   void save()
85   {
86     psi_thread= PSI_CALL_get_thread();
87     mysys_var= my_thread_var;
88   }
89 
restoreWorker_thread_context90   void restore()
91   {
92     PSI_CALL_set_thread(psi_thread);
93     set_mysys_var(mysys_var);
94     pthread_setspecific(THR_THD, 0);
95   }
96 };
97 
98 
99 #ifdef HAVE_PSI_INTERFACE
100 
/*
  The following fixes the PSI "idle" instrumentation.
  The server assumes that a connection becomes idle
  just before net_read_packet() and switches to active after it.
  In our setup, the server becomes idle when async socket io is made.
*/
107 
/* Defined outside this file; invoked explicitly from set_thd_idle(). */
extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);
109 
/*
  No-op "before header" hook: ignores all of its arguments.
  Installed by re_init_net_server_extension().
*/
static void dummy_before_header(struct st_net *, void *, size_t) {}
113 
re_init_net_server_extension(THD * thd)114 static void re_init_net_server_extension(THD *thd)
115 {
116   thd->m_net_server_extension.m_before_header = dummy_before_header;
117 }
118 
119 #else
120 
/* Built without HAVE_PSI_INTERFACE: there is no hook to reset, expands to nothing. */
#define re_init_net_server_extension(thd)
122 
123 #endif /* HAVE_PSI_INTERFACE */
124 
125 
set_thd_idle(THD * thd)126 static inline void set_thd_idle(THD *thd)
127 {
128   thd->net.reading_or_writing= 1;
129 #ifdef HAVE_PSI_INTERFACE
130   net_before_header_psi(&thd->net, thd, 0);
131 #endif
132 }
133 
/*
  Attach/associate the connection with the current OS thread.

  Installs the connection's mysys_var and PSI thread as the current ones,
  lets the THD publish itself via store_globals(), and marks the current
  thread as the owner of the connection's socket.

  @param thd  connection to attach; thd->net.vio is dereferenced
              unconditionally.
*/
static void thread_attach(THD* thd)
{
  set_mysys_var(thd->mysys_var);
  /* The address of this local parameter is recorded as the thread stack marker. */
  thd->thread_stack=(char*)&thd;
  thd->store_globals();
  PSI_CALL_set_thread(thd->event_scheduler.m_psi);
  mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
}
145 
146 /*
147   Determine connection priority , using current
148   transaction state and 'threadpool_priority' variable value.
149 */
get_priority(TP_connection * c)150 static TP_PRIORITY get_priority(TP_connection *c)
151 {
152   DBUG_ASSERT(c->thd == current_thd);
153   TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
154   if (prio == TP_PRIORITY_AUTO)
155   {
156     return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
157   }
158   return prio;
159 }
160 
161 
/*
  Worker thread entry point for one unit of work on a connection.

  If the connection has no THD yet, this performs the login; otherwise it
  processes a client request. On success the connection is put back into
  the idle state and the next asynchronous read is started. On any
  failure the THD (if any) is removed and the TP_connection is deleted.

  The worker thread's own context is saved on entry and restored on
  every exit path.

  @param c  connection to work on; must not be NULL. It is deleted on the
            error path, so the caller must not use it afterwards.
*/
void tp_callback(TP_connection *c)
{
  DBUG_ASSERT(c);

  Worker_thread_context worker_context;
  worker_context.save();

  THD *thd= c->thd;

  c->state = TP_STATE_RUNNING;

  if (unlikely(!thd))
  {
    /* No THD, need to login first. */
    DBUG_ASSERT(c->connect);
    thd= c->thd= threadpool_add_connection(c->connect, c);
    if (!thd)
    {
      /* Bail out on connect error.*/
      goto error;
    }
    /* The CONNECT object was consumed by threadpool_add_connection(). */
    c->connect= 0;
  }
  else if (threadpool_process_request(thd))
  {
    /* QUIT or an error occurred. */
    goto error;
  }

  /* Set priority */
  c->priority= get_priority(c);

  /* Read next command from client. */
  c->set_io_timeout(thd->get_net_wait_timeout());
  c->state= TP_STATE_IDLE;
  if (c->start_io())
    goto error;

  worker_context.restore();
  return;

error:
  /* Detach the THD from the connection before destroying either of them. */
  c->thd= 0;
  if (thd)
  {
    threadpool_remove_connection(thd);
  }
  delete c;
  worker_context.restore();
}
212 
213 
/*
  Create a THD for a new client connection and perform the login.

  A fresh mysys thread context and a new PSI thread are created for the
  connection and stored in the THD. The CONNECT object is consumed on
  every path: close_and_delete() when the THD cannot be created,
  plain delete once the THD exists.

  @param connect         pending connection; not usable after this call
  @param scheduler_data  opaque pointer stored in thd->event_scheduler.data
                         (tp_callback() passes the TP_connection)

  @return the new THD on success, NULL on failure. On failure after the
          THD was created, it has already been destroyed through
          threadpool_remove_connection().
*/
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
  THD *thd= NULL;

  /*
    Create a new connection context: mysys_thread_var and PSI thread
    Store them in THD.
  */

  /* Clear the current mysys_var so that my_thread_init() builds a new one. */
  set_mysys_var(NULL);
  my_thread_init();
  st_my_thread_var* mysys_var= my_thread_var;
  if (!mysys_var ||!(thd= connect->create_thd(NULL)))
  {
    /* Out of memory? */
    connect->close_and_delete();
    if (mysys_var)
    {
#ifdef HAVE_PSI_INTERFACE
      /*
       current PSI is still from worker thread.
       Set to 0, to avoid premature cleanup by my_thread_end
      */
      if (PSI_server) PSI_server->set_thread(0);
#endif
      my_thread_end();
    }
    return NULL;
  }
  delete connect;
  add_to_active_threads(thd);
  thd->set_mysys_var(mysys_var);
  thd->event_scheduler.data= scheduler_data;

  /* Create new PSI thread for use with the THD. */
  thd->event_scheduler.m_psi=
    PSI_CALL_new_thread(key_thread_one_connection, thd, thd->thread_id);


  /* Login. */
  thread_attach(thd);
  re_init_net_server_extension(thd);
  /* All three timestamps start from the same instant. */
  ulonglong now= microsecond_interval_timer();
  thd->prior_thr_create_utime= now;
  thd->start_utime= now;
  thd->thr_create_utime= now;

  if (setup_connection_thread_globals(thd))
    goto end;

  if (thd_prepare_connection(thd))
    goto end;

  /*
    Check if THD is ok, as prepare_new_connection_state()
    can fail, for example if init command failed.
  */
  if (!thd_is_connection_alive(thd))
    goto end;

  thd->skip_wait_timeout= true;
  set_thd_idle(thd);
  return thd;

end:
  /* Login failed: tear down the THD together with its mysys/PSI context. */
  threadpool_remove_connection(thd);
  return NULL;
}
282 
283 
/*
  Terminate a connection and destroy its THD.

  The THD is attached to the current thread first, so the teardown runs
  in the connection's own context. After this call the THD pointer is
  invalid and the connection's mysys context has been released by
  my_thread_end(); the caller is expected to restore its own worker
  context afterwards (tp_callback() does so).

  @param thd  connection to remove; deleted by this function
*/
static void threadpool_remove_connection(THD *thd)
{
  thread_attach(thd);
  /* Drop the link to the TP_connection (see get_TP_connection()). */
  thd->event_scheduler.data= 0;
  thd->net.reading_or_writing = 0;
  end_connection(thd);
  close_connection(thd, 0);
  unlink_thd(thd);
  delete thd;

  /*
    Free resources associated with this connection:
    mysys thread_var and PSI thread.
  */
  my_thread_end();
}
300 
301 
302 /*
303   Ensure that proper error message is sent to client,
304   and "aborted" message appears in the log in case of
305   wait timeout.
306 
307   See also timeout handling in net_serv.cc
308 */
handle_wait_timeout(THD * thd)309 static void handle_wait_timeout(THD *thd)
310 {
311   thd->get_stmt_da()->reset_diagnostics_area();
312   thd->reset_killed();
313   my_error(ER_NET_READ_INTERRUPTED, MYF(0));
314   thd->net.last_errno= ER_NET_READ_INTERRUPTED;
315   thd->net.error= 2;
316 }
317 
318 /** Check if some client data is cached in thd->net or thd->net.vio */
has_unread_data(THD * thd)319 static bool has_unread_data(THD* thd)
320 {
321   NET *net= &thd->net;
322   if (net->compress && net->remain_in_buf)
323     return true;
324   Vio *vio= net->vio;
325   return vio->has_data(vio);
326 }
327 
328 
329 /**
330  Process a single client request or a single batch.
331 */
threadpool_process_request(THD * thd)332 static int threadpool_process_request(THD *thd)
333 {
334   int retval= 0;
335   thread_attach(thd);
336 
337   if (thd->killed >= KILL_CONNECTION)
338   {
339     /*
340       killed flag was set by timeout handler
341       or KILL command. Return error.
342     */
343     retval= 1;
344     if(thd->killed == KILL_WAIT_TIMEOUT)
345       handle_wait_timeout(thd);
346     goto end;
347   }
348 
349 
350   /*
351     In the loop below, the flow is essentially the copy of
352     thead-per-connections
353     logic, see do_handle_one_connection() in sql_connect.c
354 
355     The goal is to execute a single query, thus the loop is normally executed
356     only once. However for SSL connections, it can be executed multiple times
357     (SSL can preread and cache incoming data, and vio->has_data() checks if it
358     was the case).
359   */
360   for(;;)
361   {
362     thd->net.reading_or_writing= 0;
363     if (mysql_audit_release_required(thd))
364       mysql_audit_release(thd);
365 
366     if ((retval= do_command(thd)) != 0)
367       goto end;
368 
369     if (!thd_is_connection_alive(thd))
370     {
371       retval= 1;
372       goto end;
373     }
374 
375     set_thd_idle(thd);
376 
377     if (!has_unread_data(thd))
378     {
379       /* More info on this debug sync is in sql_parse.cc*/
380       DEBUG_SYNC(thd, "before_do_command_net_read");
381       goto end;
382     }
383   }
384 
385 end:
386   return retval;
387 }
388 
389 
390 
/* Dummy scheduler callbacks: they do nothing and report success. */

/* Scheduler hook "init_new_connection_thread": nothing to set up. */
static bool tp_init_new_connection_thread()
{
  return false;
}
397 
tp_end_thread(THD *,bool)398 static bool tp_end_thread(THD *, bool)
399 {
400   return 0;
401 }
402 
/*
  The thread pool instance. Created by tp_init() (left NULL when allocation
  or initialization fails) and deleted by tp_end().
*/
static TP_pool *pool;
404 
tp_init()405 static bool tp_init()
406 {
407 
408 #ifdef _WIN32
409   if (threadpool_mode == TP_MODE_WINDOWS)
410     pool= new (std::nothrow) TP_pool_win;
411   else
412     pool= new (std::nothrow) TP_pool_generic;
413 #else
414   pool= new (std::nothrow) TP_pool_generic;
415 #endif
416   if (!pool)
417     return true;
418   if (pool->init())
419   {
420     delete pool;
421     pool= 0;
422     return true;
423   }
424   return false;
425 }
426 
tp_add_connection(CONNECT * connect)427 static void tp_add_connection(CONNECT *connect)
428 {
429   TP_connection *c= pool->new_connection(connect);
430   DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
431   if (c)
432     pool->add(c);
433   else
434     connect->close_and_delete();
435 }
436 
tp_get_idle_thread_count()437 int tp_get_idle_thread_count()
438 {
439   return pool? pool->get_idle_thread_count(): 0;
440 }
441 
tp_get_thread_count()442 int tp_get_thread_count()
443 {
444   return pool ? pool->get_thread_count() : 0;
445 }
446 
tp_set_min_threads(uint val)447 void tp_set_min_threads(uint val)
448 {
449   if (pool)
450     pool->set_min_threads(val);
451 }
452 
453 
tp_set_max_threads(uint val)454 void tp_set_max_threads(uint val)
455 {
456   if (pool)
457     pool->set_max_threads(val);
458 }
459 
tp_set_threadpool_size(uint val)460 void tp_set_threadpool_size(uint val)
461 {
462   if (pool)
463     pool->set_pool_size(val);
464 }
465 
466 
tp_set_threadpool_stall_limit(uint val)467 void tp_set_threadpool_stall_limit(uint val)
468 {
469   if (pool)
470     pool->set_stall_limit(val);
471 }
472 
473 
tp_timeout_handler(TP_connection * c)474 void tp_timeout_handler(TP_connection *c)
475 {
476   if (c->state != TP_STATE_IDLE)
477     return;
478   THD *thd= c->thd;
479   mysql_mutex_lock(&thd->LOCK_thd_kill);
480   Vio *vio= thd->net.vio;
481   if (vio && (vio_pending(vio) > 0 || vio->has_data(vio)) &&
482       c->state == TP_STATE_IDLE)
483   {
484     /*
485      There is some data on that connection, i.e
486      i.e there was no inactivity timeout.
487      Don't kill.
488     */
489     c->state= TP_STATE_PENDING;
490   }
491   else if (c->state == TP_STATE_IDLE)
492   {
493     thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT);
494     c->priority= TP_PRIORITY_HIGH;
495     post_kill_notification(thd);
496   }
497   mysql_mutex_unlock(&thd->LOCK_thd_kill);
498 }
499 
500 
tp_wait_begin(THD * thd,int type)501 static void tp_wait_begin(THD *thd, int type)
502 {
503   TP_connection *c = get_TP_connection(thd);
504   if (c)
505     c->wait_begin(type);
506 }
507 
508 
tp_wait_end(THD * thd)509 static void tp_wait_end(THD *thd)
510 {
511   TP_connection *c = get_TP_connection(thd);
512   if (c)
513     c->wait_end();
514 }
515 
516 
tp_end()517 static void tp_end()
518 {
519   delete pool;
520 }
521 
tp_post_kill_notification(THD * thd)522 static void tp_post_kill_notification(THD *thd)
523 {
524   TP_connection *c= get_TP_connection(thd);
525   if (c)
526     c->priority= TP_PRIORITY_HIGH;
527   post_kill_notification(thd);
528 }
529 
/*
  Scheduler callback table for the thread pool. It is copied into the
  caller's structure by pool_of_threads_scheduler(), which then fills in
  max_threads, max_connections and connection_count.
*/
static scheduler_functions tp_scheduler_functions=
{
  0,                                  // max_threads
  NULL,                               // presumably max_connections -- confirm field order in scheduler.h
  NULL,                               // presumably connection_count -- confirm field order in scheduler.h
  tp_init,                            // init
  tp_init_new_connection_thread,      // init_new_connection_thread
  tp_add_connection,                  // add_connection
  tp_wait_begin,                      // thd_wait_begin
  tp_wait_end,                        // thd_wait_end
  tp_post_kill_notification,          // post kill notification
  tp_end_thread,                      // Dummy function
  tp_end                              // end
};
544 
pool_of_threads_scheduler(struct scheduler_functions * func,ulong * arg_max_connections,uint * arg_connection_count)545 void pool_of_threads_scheduler(struct scheduler_functions *func,
546     ulong *arg_max_connections,
547     uint *arg_connection_count)
548 {
549   *func = tp_scheduler_functions;
550   func->max_threads= threadpool_max_threads;
551   func->max_connections= arg_max_connections;
552   func->connection_count= arg_connection_count;
553   scheduler_init();
554 }
555