1 /* Copyright (C) 2012, 2020, MariaDB
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #include "mariadb.h"
17 #include <violite.h>
18 #include <sql_priv.h>
19 #include <sql_class.h>
20 #include <my_pthread.h>
21 #include <scheduler.h>
22 #include <sql_connect.h>
23 #include <sql_audit.h>
24 #include <debug_sync.h>
25 #include <threadpool.h>
26 
27 #ifdef WITH_WSREP
28 #include "wsrep_trans_observer.h"
29 #endif /* WITH_WSREP */
30 
/* Threadpool parameters (configured via the corresponding system variables) */

uint threadpool_min_threads;
uint threadpool_idle_timeout;
uint threadpool_size;
uint threadpool_max_size;
uint threadpool_stall_limit;
uint threadpool_max_threads;
uint threadpool_oversubscribe;
uint threadpool_mode;
uint threadpool_prio_kickup_timer;
my_bool threadpool_exact_stats;
my_bool threadpool_dedicated_listener;

/* Stats */
TP_STATISTICS tp_stats;


/* Connection lifecycle helpers, defined later in this file. */
static void  threadpool_remove_connection(THD *thd);
static int   threadpool_process_request(THD *thd);
static THD*  threadpool_add_connection(CONNECT *connect, void *scheduler_data);

/* Executes a single client command; defined outside this file. */
extern bool do_command(THD*);
54 
get_TP_connection(THD * thd)55 static inline TP_connection *get_TP_connection(THD *thd)
56 {
57   return (TP_connection *)thd->event_scheduler.data;
58 }
59 
60 /*
61   Worker threads contexts, and THD contexts.
62   =========================================
63 
64   Both worker threads and connections have their sets of thread local variables
65   At the moment it is mysys_var (this has specific data for dbug, my_error and
66   similar goodies), and PSI per-client structure.
67 
68   Whenever query is executed following needs to be done:
69 
70   1. Save worker thread context.
71   2. Change TLS variables to connection specific ones using thread_attach(THD*).
     This function does some additional work, e.g. setting up
73      thread_stack/thread_ends_here pointers.
74   3. Process query
75   4. Restore worker thread context.
76 
77   Connection login and termination follows similar schema w.r.t saving and
78   restoring contexts.
79 
80   For both worker thread, and for the connection, mysys variables are created
81   using my_thread_init() and freed with my_thread_end().
82 
83 */
/*
  Saved TLS context of a worker thread, so it can be restored after a
  connection's context was temporarily attached (see file comment above).
*/
struct Worker_thread_context
{
  PSI_thread *psi_thread;       // performance schema thread of the worker
  st_my_thread_var* mysys_var;  // mysys per-thread data (dbug, my_error, ...)

  /* Capture the worker's current TLS context before attaching a THD. */
  void save()
  {
    psi_thread= PSI_CALL_get_thread();
    mysys_var= my_thread_var;
  }

  /* Restore the worker's TLS context; also detaches any current THD. */
  void restore()
  {
    PSI_CALL_set_thread(psi_thread);
    set_mysys_var(mysys_var);
    set_current_thd(nullptr);
  }
};
102 
103 
104 #ifdef HAVE_PSI_INTERFACE
105 
106 /*
107   The following fixes PSI "idle" psi instrumentation.
  The server assumes that connection becomes idle
  just before net_read_packet() and switches to active after it.
  In our setup, server becomes idle when async socket io is made.
111 */
112 
113 extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);
114 
/* No-op replacement for the "before header" hook: the switch to PSI "idle"
   state is done in set_thd_idle() instead. */
static void dummy_before_header(struct st_net *, void *, size_t)
{
}
118 
/* Install the no-op hook so the server does not flip to "idle" just before
   net_read_packet(); see the comment above. */
static void re_init_net_server_extension(THD *thd)
{
  thd->m_net_server_extension.m_before_header = dummy_before_header;
}
123 
124 #else
125 
126 #define re_init_net_server_extension(thd)
127 
128 #endif /* HAVE_PSI_INTERFACE */
129 
130 
/* Mark the connection as idle, i.e waiting for the next client command. */
static inline void set_thd_idle(THD *thd)
{
  thd->net.reading_or_writing= 1;
#ifdef HAVE_PSI_INTERFACE
  /* Switch PSI instrumentation to "idle" now; the actual socket wait
     happens asynchronously (see comment above dummy_before_header). */
  net_before_header_psi(&thd->net, thd, 0);
#endif
}
138 
139 /*
140   Attach/associate the connection with the OS thread,
141 */
/*
  Attach/associate the connection with the current OS thread:
  switch mysys and PSI thread-local state to the connection's copies.
*/
static void thread_attach(THD* thd)
{
#ifdef WITH_WSREP
  /* Wait until possible background rollback has finished before
     attaching the thd. */
  wsrep_wait_rollback_complete_and_acquire_ownership(thd);
#endif /* WITH_WSREP */
  /* Use the connection's mysys data (dbug, my_error, ...). */
  set_mysys_var(thd->mysys_var);
  /* The worker thread's stack now serves this THD. */
  thd->thread_stack=(char*)&thd;
  thd->store_globals();
  /* Switch performance schema accounting to the connection's thread. */
  PSI_CALL_set_thread(thd->get_psi());
  mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
}
155 
156 /*
157   Determine connection priority , using current
158   transaction state and 'threadpool_priority' variable value.
159 */
get_priority(TP_connection * c)160 static TP_PRIORITY get_priority(TP_connection *c)
161 {
162   DBUG_ASSERT(c->thd == current_thd);
163   TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
164   if (prio == TP_PRIORITY_AUTO)
165     prio= c->thd->transaction->is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
166 
167   return prio;
168 }
169 
170 
/**
  Callback run by the pool when there is an event on a connection:
  either a brand-new connection (no THD yet, login is required), or
  incoming data on an existing connection (a request to process).

  On success the connection is re-armed for async io; on any failure
  the connection is torn down and the TP_connection deleted.
*/
void tp_callback(TP_connection *c)
{
  DBUG_ASSERT(c);

  /* Save worker TLS context; restored on every exit path. */
  Worker_thread_context worker_context;
  worker_context.save();

  THD *thd= c->thd;

  c->state = TP_STATE_RUNNING;

  if (unlikely(!thd))
  {
    /* No THD, need to login first. */
    DBUG_ASSERT(c->connect);
    thd= c->thd= threadpool_add_connection(c->connect, c);
    if (!thd)
    {
      /* Bail out on connect error.*/
      goto error;
    }
    c->connect= 0;  /* connect object is consumed by threadpool_add_connection */
  }
  else if (threadpool_process_request(thd))
  {
    /* QUIT or an error occurred. */
    goto error;
  }

  /* Set priority */
  c->priority= get_priority(c);

  /* Read next command from client. */
  c->set_io_timeout(thd->get_net_wait_timeout());
  c->state= TP_STATE_IDLE;
  if (c->start_io())
    goto error;

  worker_context.restore();
  return;

error:
  c->thd= 0;
  if (thd)
  {
    threadpool_remove_connection(thd);
  }
  delete c;
  worker_context.restore();
}
221 
222 
/**
  Log in a new connection and create its THD.

  @param connect        Connection request. Consumed here: deleted on
                        success, closed and deleted on failure.
  @param scheduler_data The TP_connection; stored in thd->event_scheduler.data.
  @return the new THD on success, NULL on failure (all per-connection
          resources are released before returning NULL).
*/
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
  THD *thd= NULL;

  DBUG_EXECUTE_IF("CONNECT_wait",
  {
    extern MYSQL_SOCKET unix_sock;
    DBUG_ASSERT(unix_sock.fd >= 0);
    while (unix_sock.fd >= 0)
      my_sleep(1000);
  });

  /*
    Create a new connection context: mysys_thread_var and PSI thread
    Store them in THD.
  */

  set_mysys_var(NULL);
  my_thread_init();
  st_my_thread_var* mysys_var= my_thread_var;
  PSI_CALL_set_thread(PSI_CALL_new_thread(key_thread_one_connection, connect, 0));
  if (!mysys_var ||!(thd= connect->create_thd(NULL)))
  {
    /* Out of memory? */
    connect->close_and_delete();
    if (mysys_var)
      my_thread_end();
    return NULL;
  }
  thd->event_scheduler.data = scheduler_data;
  server_threads.insert(thd); // Make THD visible in show processlist
  delete connect; // must be after server_threads.insert, see close_connections()
  thd->set_mysys_var(mysys_var);


  /* Login. */
  thread_attach(thd);
  re_init_net_server_extension(thd);
  /* Stamp all creation/start times from a single clock reading. */
  ulonglong now= microsecond_interval_timer();
  thd->prior_thr_create_utime= now;
  thd->start_utime= now;
  thd->thr_create_utime= now;

  setup_connection_thread_globals(thd);

  if (thd_prepare_connection(thd))
    goto end;

  /*
    Check if THD is ok, as prepare_new_connection_state()
    can fail, for example if init command failed.
  */
  if (!thd_is_connection_alive(thd))
    goto end;

  thd->skip_wait_timeout= true;
  set_thd_idle(thd);
  return thd;

end:
  /* Login failed after THD creation: full teardown, including TLS. */
  threadpool_remove_connection(thd);
  return NULL;
}
286 
287 
/**
  Terminate a connection: run the disconnect protocol, destroy the THD,
  and release the connection's mysys/PSI thread-local resources.
*/
static void threadpool_remove_connection(THD *thd)
{
  /* The teardown calls below require the THD to be the current one. */
  thread_attach(thd);
  thd->net.reading_or_writing = 0;
  end_connection(thd);
  close_connection(thd, 0);
  unlink_thd(thd);
  PSI_CALL_delete_current_thread(); // before THD is destroyed
  delete thd;

  /*
    Free resources associated with this connection:
    mysys thread_var and PSI thread.
  */
  my_thread_end();
}
304 
305 
306 /*
307   Ensure that proper error message is sent to client,
308   and "aborted" message appears in the log in case of
309   wait timeout.
310 
311   See also timeout handling in net_serv.cc
312 */
/*
  Ensure that proper error message is sent to client,
  and "aborted" message appears in the log in case of
  wait timeout.

  See also timeout handling in net_serv.cc
*/
static void handle_wait_timeout(THD *thd)
{
  /* Clear previous state so the timeout error can be set cleanly. */
  thd->get_stmt_da()->reset_diagnostics_area();
  thd->reset_killed();
  my_error(ER_NET_READ_INTERRUPTED, MYF(0));
  /* Mark the NET as broken so the connection is treated as aborted
     (see net_serv.cc for the meaning of these fields). */
  thd->net.last_errno= ER_NET_READ_INTERRUPTED;
  thd->net.error= 2;
}
321 
322 /** Check if some client data is cached in thd->net or thd->net.vio */
has_unread_data(THD * thd)323 static bool has_unread_data(THD* thd)
324 {
325   NET *net= &thd->net;
326   if (net->compress && net->remain_in_buf)
327     return true;
328   Vio *vio= net->vio;
329   return vio->has_data(vio);
330 }
331 
332 
333 /**
334  Process a single client request or a single batch.
335 */
/**
 Process a single client request or a single batch.

 @return 0 if the connection stays alive (it goes back to waiting for
         the client), nonzero if it must be closed.
*/
static int threadpool_process_request(THD *thd)
{
  int retval= 0;
  thread_attach(thd);

  if (thd->killed >= KILL_CONNECTION)
  {
    /*
      killed flag was set by timeout handler
      or KILL command. Return error.
    */
    retval= 1;
    if(thd->killed == KILL_WAIT_TIMEOUT)
      handle_wait_timeout(thd);
    goto end;
  }


  /*
    In the loop below, the flow is essentially the copy of
    thread-per-connection
    logic, see do_handle_one_connection() in sql_connect.c

    The goal is to execute a single query, thus the loop is normally executed
    only once. However for SSL connections, it can be executed multiple times
    (SSL can preread and cache incoming data, and vio->has_data() checks if it
    was the case).
  */
  for(;;)
  {
    /* Connection is busy now, not idle. */
    thd->net.reading_or_writing= 0;
    if (mysql_audit_release_required(thd))
      mysql_audit_release(thd);

    if ((retval= do_command(thd)) != 0)
      goto end;

    if (!thd_is_connection_alive(thd))
    {
      retval= 1;
      goto end;
    }

    set_thd_idle(thd);

    if (!has_unread_data(thd))
    {
      /* More info on this debug sync is in sql_parse.cc*/
      DEBUG_SYNC(thd, "before_do_command_net_read");
      goto end;
    }
  }

end:
  return retval;
}
392 
393 
/* The global pool instance; created in tp_init(), freed in tp_end(). */
static TP_pool *pool;
395 
tp_init()396 static bool tp_init()
397 {
398 
399 #ifdef _WIN32
400   if (threadpool_mode == TP_MODE_WINDOWS)
401     pool= new (std::nothrow) TP_pool_win;
402   else
403     pool= new (std::nothrow) TP_pool_generic;
404 #else
405   pool= new (std::nothrow) TP_pool_generic;
406 #endif
407   if (!pool)
408     return true;
409   if (pool->init())
410   {
411     delete pool;
412     pool= 0;
413     return true;
414   }
415   return false;
416 }
417 
tp_add_connection(CONNECT * connect)418 static void tp_add_connection(CONNECT *connect)
419 {
420   TP_connection *c= pool->new_connection(connect);
421   DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
422   if (c)
423     pool->add(c);
424   else
425     connect->close_and_delete();
426 }
427 
tp_get_idle_thread_count()428 int tp_get_idle_thread_count()
429 {
430   return pool? pool->get_idle_thread_count(): 0;
431 }
432 
tp_get_thread_count()433 int tp_get_thread_count()
434 {
435   return pool ? pool->get_thread_count() : 0;
436 }
437 
tp_set_min_threads(uint val)438 void tp_set_min_threads(uint val)
439 {
440   if (pool)
441     pool->set_min_threads(val);
442 }
443 
444 
tp_set_max_threads(uint val)445 void tp_set_max_threads(uint val)
446 {
447   if (pool)
448     pool->set_max_threads(val);
449 }
450 
tp_set_threadpool_size(uint val)451 void tp_set_threadpool_size(uint val)
452 {
453   if (pool)
454     pool->set_pool_size(val);
455 }
456 
457 
tp_set_threadpool_stall_limit(uint val)458 void tp_set_threadpool_stall_limit(uint val)
459 {
460   if (pool)
461     pool->set_stall_limit(val);
462 }
463 
464 
/**
  Handle wait_timeout expiry on an idle connection. If client data
  actually arrived in the meantime, the connection is re-queued for
  processing instead of being killed.
*/
void tp_timeout_handler(TP_connection *c)
{
  if (c->state != TP_STATE_IDLE)
    return;
  THD *thd= c->thd;
  /* Serialize with concurrent KILL handling on this THD. */
  mysql_mutex_lock(&thd->LOCK_thd_kill);
  Vio *vio= thd->net.vio;
  if (vio && (vio_pending(vio) > 0 || vio->has_data(vio)) &&
      c->state == TP_STATE_IDLE)
  {
    /*
     There is some data on that connection,
     i.e there was no inactivity timeout.
     Don't kill.
    */
    c->state= TP_STATE_PENDING;
  }
  else if (c->state == TP_STATE_IDLE)
  {
    thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT);
    /* Raise priority so the kill is processed promptly. */
    c->priority= TP_PRIORITY_HIGH;
    post_kill_notification(thd);
  }
  mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
490 
/* Global per-type wait counters, incremented in tp_wait_begin();
   cache-line aligned to avoid false sharing. */
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
492 
tp_wait_begin(THD * thd,int type)493 static void tp_wait_begin(THD *thd, int type)
494 {
495   TP_connection *c = get_TP_connection(thd);
496   if (c)
497   {
498     DBUG_ASSERT(type > 0 && type < THD_WAIT_LAST);
499     tp_waits[type]++;
500     c->wait_begin(type);
501   }
502 }
503 
504 
tp_wait_end(THD * thd)505 static void tp_wait_end(THD *thd)
506 {
507   TP_connection *c = get_TP_connection(thd);
508   if (c)
509     c->wait_end();
510 }
511 
512 
/* Scheduler hook: destroy the pool on shutdown (no-op if never created). */
static void tp_end()
{
  delete pool;
}
517 
tp_post_kill_notification(THD * thd)518 static void tp_post_kill_notification(THD *thd)
519 {
520   TP_connection *c= get_TP_connection(thd);
521   if (c)
522     c->priority= TP_PRIORITY_HIGH;
523   post_kill_notification(thd);
524 }
525 
/* Scheduler callback table for the thread pool. */
static scheduler_functions tp_scheduler_functions=
{
  0,                                  // max_threads
  NULL,                               // presumably max_connections; filled in pool_of_threads_scheduler()
  NULL,                               // presumably connection_count; filled in pool_of_threads_scheduler()
  tp_init,                            // init
  tp_add_connection,                  // add_connection
  tp_wait_begin,                      // thd_wait_begin
  tp_wait_end,                        // thd_wait_end
  tp_post_kill_notification,          // post kill notification
  tp_end                              // end
};
538 
/**
  Install the thread pool scheduler: copy the callback table into *func,
  fill in the runtime limits/counters, and run common scheduler setup.

  @param func                  [out] scheduler callback table to fill
  @param arg_max_connections   pointer to the max_connections limit
  @param arg_connection_count  pointer to the live connection counter
*/
void pool_of_threads_scheduler(struct scheduler_functions *func,
    ulong *arg_max_connections,
    Atomic_counter<uint> *arg_connection_count)
{
  *func = tp_scheduler_functions;
  func->max_threads= threadpool_max_threads;
  func->max_connections= arg_max_connections;
  func->connection_count= arg_connection_count;
  scheduler_init();
}
549