1 /* Copyright (C) 2012, 2020, MariaDB
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 #include "mariadb.h"
17 #include <violite.h>
18 #include <sql_priv.h>
19 #include <sql_class.h>
20 #include <my_pthread.h>
21 #include <scheduler.h>
22 #include <sql_connect.h>
23 #include <sql_audit.h>
24 #include <debug_sync.h>
25 #include <threadpool.h>
26
27
/* Threadpool parameters */

uint threadpool_min_threads;
uint threadpool_idle_timeout;
uint threadpool_size;
uint threadpool_max_size;
uint threadpool_stall_limit;
uint threadpool_max_threads;        /* Copied into scheduler_functions::max_threads, see pool_of_threads_scheduler() */
uint threadpool_oversubscribe;
uint threadpool_mode;               /* Selects pool implementation on Windows, see tp_init() */
uint threadpool_prio_kickup_timer;

/* Stats */
TP_STATISTICS tp_stats;
42
43
44 static void threadpool_remove_connection(THD *thd);
45 static int threadpool_process_request(THD *thd);
46 static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
47
48 extern bool do_command(THD*);
49
get_TP_connection(THD * thd)50 static inline TP_connection *get_TP_connection(THD *thd)
51 {
52 return (TP_connection *)thd->event_scheduler.data;
53 }
54
55 /*
56 Worker threads contexts, and THD contexts.
57 =========================================
58
59 Both worker threads and connections have their sets of thread local variables
60 At the moment it is mysys_var (this has specific data for dbug, my_error and
61 similar goodies), and PSI per-client structure.
62
  Whenever a query is executed, the following needs to be done:

  1. Save the worker thread context.
  2. Change TLS variables to connection-specific ones using thread_attach(THD*).
     This function does some additional work, e.g. setting up
     thread_stack/thread_ends_here pointers.
69 3. Process query
70 4. Restore worker thread context.
71
72 Connection login and termination follows similar schema w.r.t saving and
73 restoring contexts.
74
75 For both worker thread, and for the connection, mysys variables are created
76 using my_thread_init() and freed with my_thread_end().
77
78 */
/*
  Saved thread-local state of a worker thread.

  save() captures the worker's PSI thread and mysys_var before a
  connection context is attached; restore() puts them back and clears
  the THR_THD TLS slot so no stale THD pointer is left behind.
*/
struct Worker_thread_context
{
  PSI_thread *psi_thread;     // worker's performance-schema thread
  st_my_thread_var* mysys_var; // worker's mysys per-thread data

  /* Capture the current worker-thread TLS state. */
  void save()
  {
    psi_thread= PSI_CALL_get_thread();
    mysys_var= my_thread_var;
  }

  /* Restore worker-thread TLS state and detach any connection THD. */
  void restore()
  {
    PSI_CALL_set_thread(psi_thread);
    set_mysys_var(mysys_var);
    pthread_setspecific(THR_THD, 0);
  }
};
97
98
99 #ifdef HAVE_PSI_INTERFACE
100
/*
  The following fixes PSI "idle" instrumentation.
  The server assumes that a connection becomes idle
  just before net_read_packet() and switches to active after it.
  In our setup, the server becomes idle when async socket io is made.
*/
107
extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);

/* No-op stand-in for net_before_header_psi; installed below. */
static void dummy_before_header(struct st_net *, void *, size_t)
{
}

/*
  Replace the default "before header" PSI hook for this connection.
  set_thd_idle() already reports the idle transition when async io
  starts, so the stock hook is neutralized here.
*/
static void re_init_net_server_extension(THD *thd)
{
  thd->m_net_server_extension.m_before_header = dummy_before_header;
}
118
119 #else
120
121 #define re_init_net_server_extension(thd)
122
123 #endif /* HAVE_PSI_INTERFACE */
124
125
/* Mark the connection as waiting on the network; report "idle" to PSI. */
static inline void set_thd_idle(THD *thd)
{
  /* Tell the rest of the server this THD is blocked on client io. */
  thd->net.reading_or_writing= 1;
#ifdef HAVE_PSI_INTERFACE
  /* Report the idle state transition to performance schema. */
  net_before_header_psi(&thd->net, thd, 0);
#endif
}
133
/*
  Attach/associate the connection with the current OS thread:
  installs the THD's mysys_var, globals and PSI thread into this
  worker's TLS, so error reporting, DBUG etc. act on behalf of the
  connection (see the context-switching notes above).
*/
static void thread_attach(THD* thd)
{
  set_mysys_var(thd->mysys_var);
  /* Record the stack base for this execution on the worker's stack. */
  thd->thread_stack=(char*)&thd;
  thd->store_globals();
  PSI_CALL_set_thread(thd->event_scheduler.m_psi);
  mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
}
145
146 /*
147 Determine connection priority , using current
148 transaction state and 'threadpool_priority' variable value.
149 */
get_priority(TP_connection * c)150 static TP_PRIORITY get_priority(TP_connection *c)
151 {
152 DBUG_ASSERT(c->thd == current_thd);
153 TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
154 if (prio == TP_PRIORITY_AUTO)
155 {
156 return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
157 }
158 return prio;
159 }
160
161
/*
  Scheduler callback: handle a single event on a connection.

  Invoked by a worker thread when the connection has a pending event:
  either the initial login (c->thd == NULL) or incoming client data.
  Performs login or processes one request, then re-arms async io.
  On any failure the connection is torn down and 'c' is deleted.
*/
void tp_callback(TP_connection *c)
{
  DBUG_ASSERT(c);

  Worker_thread_context worker_context;
  worker_context.save();

  THD *thd= c->thd;

  c->state = TP_STATE_RUNNING;

  if (unlikely(!thd))
  {
    /* No THD, need to login first. */
    DBUG_ASSERT(c->connect);
    thd= c->thd= threadpool_add_connection(c->connect, c);
    if (!thd)
    {
      /* Bail out on connect error.*/
      goto error;
    }
    /* CONNECT was consumed by threadpool_add_connection(); drop our ref. */
    c->connect= 0;
  }
  else if (threadpool_process_request(thd))
  {
    /* QUIT or an error occurred. */
    goto error;
  }

  /* Set priority */
  c->priority= get_priority(c);

  /* Read next command from client. */
  c->set_io_timeout(thd->get_net_wait_timeout());
  c->state= TP_STATE_IDLE;
  if (c->start_io())
    goto error;

  worker_context.restore();
  return;

error:
  /* Detach THD first; threadpool_remove_connection() deletes it. */
  c->thd= 0;
  if (thd)
  {
    threadpool_remove_connection(thd);
  }
  delete c;
  worker_context.restore();
}
212
213
/*
  Login a new client connection in the context of a worker thread.

  Creates per-connection mysys and PSI thread state, builds the THD
  from CONNECT (which is consumed here: deleted on success, closed and
  deleted on failure), runs authentication and marks the THD idle.

  @param connect         connection handle; always freed by this call
  @param scheduler_data  the TP_connection, stored in thd->event_scheduler.data
  @return  new THD on success, NULL on any error (all cleanup done here)
*/
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
  THD *thd= NULL;

  /*
    Create a new connection context: mysys_thread_var and PSI thread
    Store them in THD.
  */

  set_mysys_var(NULL);
  my_thread_init();
  st_my_thread_var* mysys_var= my_thread_var;
  if (!mysys_var ||!(thd= connect->create_thd(NULL)))
  {
    /* Out of memory? */
    connect->close_and_delete();
    if (mysys_var)
    {
#ifdef HAVE_PSI_INTERFACE
      /*
        current PSI is still from worker thread.
        Set to 0, to avoid premature cleanup by my_thread_end
      */
      if (PSI_server) PSI_server->set_thread(0);
#endif
      my_thread_end();
    }
    return NULL;
  }
  delete connect;
  add_to_active_threads(thd);
  thd->set_mysys_var(mysys_var);
  thd->event_scheduler.data= scheduler_data;

  /* Create new PSI thread for use with the THD. */
  thd->event_scheduler.m_psi=
    PSI_CALL_new_thread(key_thread_one_connection, thd, thd->thread_id);


  /* Login. */
  thread_attach(thd);
  re_init_net_server_extension(thd);
  ulonglong now= microsecond_interval_timer();
  thd->prior_thr_create_utime= now;
  thd->start_utime= now;
  thd->thr_create_utime= now;

  if (setup_connection_thread_globals(thd))
    goto end;

  if (thd_prepare_connection(thd))
    goto end;

  /*
    Check if THD is ok, as prepare_new_connection_state()
    can fail, for example if init command failed.
  */
  if (!thd_is_connection_alive(thd))
    goto end;

  thd->skip_wait_timeout= true;
  set_thd_idle(thd);
  return thd;

end:
  /* Login failed after THD creation: full disconnect, incl. my_thread_end(). */
  threadpool_remove_connection(thd);
  return NULL;
}
282
283
/*
  Tear down a connection in the context of a worker thread:
  run disconnect processing, delete the THD, and release the
  per-connection mysys/PSI state created at login.
*/
static void threadpool_remove_connection(THD *thd)
{
  /* Connection TLS must be current for end_connection()/close_connection(). */
  thread_attach(thd);
  thd->event_scheduler.data= 0;
  thd->net.reading_or_writing = 0;
  end_connection(thd);
  close_connection(thd, 0);
  unlink_thd(thd);
  delete thd;

  /*
    Free resources associated with this connection:
    mysys thread_var and PSI thread.
  */
  my_thread_end();
}
300
301
/*
  Ensure that proper error message is sent to client,
  and "aborted" message appears in the log in case of
  wait timeout.

  See also timeout handling in net_serv.cc
*/
static void handle_wait_timeout(THD *thd)
{
  /* Clear the KILL diagnostics first so our own error can be recorded. */
  thd->get_stmt_da()->reset_diagnostics_area();
  thd->reset_killed();
  my_error(ER_NET_READ_INTERRUPTED, MYF(0));
  thd->net.last_errno= ER_NET_READ_INTERRUPTED;
  /* net.error == 2 marks a fatal net error — presumably what triggers the
     "aborted" log message; see net_serv.cc. */
  thd->net.error= 2;
}
317
318 /** Check if some client data is cached in thd->net or thd->net.vio */
has_unread_data(THD * thd)319 static bool has_unread_data(THD* thd)
320 {
321 NET *net= &thd->net;
322 if (net->compress && net->remain_in_buf)
323 return true;
324 Vio *vio= net->vio;
325 return vio->has_data(vio);
326 }
327
328
/**
  Process a single client request or a single batch.

  @return 0  - connection stays alive; caller should re-arm async io
          !0 - connection must be closed (QUIT, error, or kill)
*/
static int threadpool_process_request(THD *thd)
{
  int retval= 0;
  thread_attach(thd);

  if (thd->killed >= KILL_CONNECTION)
  {
    /*
      killed flag was set by timeout handler
      or KILL command. Return error.
    */
    retval= 1;
    if(thd->killed == KILL_WAIT_TIMEOUT)
      handle_wait_timeout(thd);
    goto end;
  }


  /*
    In the loop below, the flow is essentially a copy of the
    thread-per-connection logic, see do_handle_one_connection() in
    sql_connect.c

    The goal is to execute a single query, thus the loop is normally executed
    only once. However for SSL connections, it can be executed multiple times
    (SSL can preread and cache incoming data, and vio->has_data() checks if it
    was the case).
  */
  for(;;)
  {
    thd->net.reading_or_writing= 0;
    if (mysql_audit_release_required(thd))
      mysql_audit_release(thd);

    if ((retval= do_command(thd)) != 0)
      goto end;

    if (!thd_is_connection_alive(thd))
    {
      retval= 1;
      goto end;
    }

    set_thd_idle(thd);

    if (!has_unread_data(thd))
    {
      /* More info on this debug sync is in sql_parse.cc*/
      DEBUG_SYNC(thd, "before_do_command_net_read");
      goto end;
    }
  }

end:
  return retval;
}
388
389
390
391 /* Dummy functions, do nothing */
392
/* Dummy: the pool never spawns a dedicated per-connection thread. */
static bool tp_init_new_connection_thread()
{
  return false;
}
397
tp_end_thread(THD *,bool)398 static bool tp_end_thread(THD *, bool)
399 {
400 return 0;
401 }
402
/* The single global pool instance; created in tp_init(), freed in tp_end(). */
static TP_pool *pool;
404
tp_init()405 static bool tp_init()
406 {
407
408 #ifdef _WIN32
409 if (threadpool_mode == TP_MODE_WINDOWS)
410 pool= new (std::nothrow) TP_pool_win;
411 else
412 pool= new (std::nothrow) TP_pool_generic;
413 #else
414 pool= new (std::nothrow) TP_pool_generic;
415 #endif
416 if (!pool)
417 return true;
418 if (pool->init())
419 {
420 delete pool;
421 pool= 0;
422 return true;
423 }
424 return false;
425 }
426
tp_add_connection(CONNECT * connect)427 static void tp_add_connection(CONNECT *connect)
428 {
429 TP_connection *c= pool->new_connection(connect);
430 DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
431 if (c)
432 pool->add(c);
433 else
434 connect->close_and_delete();
435 }
436
tp_get_idle_thread_count()437 int tp_get_idle_thread_count()
438 {
439 return pool? pool->get_idle_thread_count(): 0;
440 }
441
tp_get_thread_count()442 int tp_get_thread_count()
443 {
444 return pool ? pool->get_thread_count() : 0;
445 }
446
tp_set_min_threads(uint val)447 void tp_set_min_threads(uint val)
448 {
449 if (pool)
450 pool->set_min_threads(val);
451 }
452
453
tp_set_max_threads(uint val)454 void tp_set_max_threads(uint val)
455 {
456 if (pool)
457 pool->set_max_threads(val);
458 }
459
tp_set_threadpool_size(uint val)460 void tp_set_threadpool_size(uint val)
461 {
462 if (pool)
463 pool->set_pool_size(val);
464 }
465
466
tp_set_threadpool_stall_limit(uint val)467 void tp_set_threadpool_stall_limit(uint val)
468 {
469 if (pool)
470 pool->set_stall_limit(val);
471 }
472
473
/*
  Handle expiry of the io wait timeout on an idle connection.

  Under LOCK_thd_kill: if client data arrived in the meantime, requeue
  the connection for execution instead of killing it; otherwise mark it
  killed with KILL_WAIT_TIMEOUT and post a kill notification. The
  repeated c->state == TP_STATE_IDLE checks guard against a state
  change racing with this handler.
*/
void tp_timeout_handler(TP_connection *c)
{
  if (c->state != TP_STATE_IDLE)
    return;
  THD *thd= c->thd;
  mysql_mutex_lock(&thd->LOCK_thd_kill);
  Vio *vio= thd->net.vio;
  if (vio && (vio_pending(vio) > 0 || vio->has_data(vio)) &&
      c->state == TP_STATE_IDLE)
  {
    /*
      There is some data on that connection, i.e.
      there was no inactivity timeout.
      Don't kill.
    */
    c->state= TP_STATE_PENDING;
  }
  else if (c->state == TP_STATE_IDLE)
  {
    thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT);
    /* High priority so the kill is handled promptly. */
    c->priority= TP_PRIORITY_HIGH;
    post_kill_notification(thd);
  }
  mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
499
500
tp_wait_begin(THD * thd,int type)501 static void tp_wait_begin(THD *thd, int type)
502 {
503 TP_connection *c = get_TP_connection(thd);
504 if (c)
505 c->wait_begin(type);
506 }
507
508
tp_wait_end(THD * thd)509 static void tp_wait_end(THD *thd)
510 {
511 TP_connection *c = get_TP_connection(thd);
512 if (c)
513 c->wait_end();
514 }
515
516
tp_end()517 static void tp_end()
518 {
519 delete pool;
520 }
521
tp_post_kill_notification(THD * thd)522 static void tp_post_kill_notification(THD *thd)
523 {
524 TP_connection *c= get_TP_connection(thd);
525 if (c)
526 c->priority= TP_PRIORITY_HIGH;
527 post_kill_notification(thd);
528 }
529
/* Thread-pool implementation of the scheduler interface. */
static scheduler_functions tp_scheduler_functions=
{
  0,                                  // max_threads (filled in by pool_of_threads_scheduler)
  NULL,                               // filled in by pool_of_threads_scheduler
  NULL,                               // filled in by pool_of_threads_scheduler
  tp_init,                            // init
  tp_init_new_connection_thread,      // init_new_connection_thread
  tp_add_connection,                  // add_connection
  tp_wait_begin,                      // thd_wait_begin
  tp_wait_end,                        // thd_wait_end
  tp_post_kill_notification,          // post kill notification
  tp_end_thread,                      // Dummy function
  tp_end                              // end
};
544
/*
  Install the thread-pool scheduler callbacks into *func, wire up the
  server's connection-limit counters, and initialize scheduler state.
*/
void pool_of_threads_scheduler(struct scheduler_functions *func,
    ulong *arg_max_connections,
    uint *arg_connection_count)
{
  *func = tp_scheduler_functions;
  func->max_threads= threadpool_max_threads;
  func->max_connections= arg_max_connections;
  func->connection_count= arg_connection_count;
  scheduler_init();
}
555