1 /* Copyright (C) 2012, 2020, MariaDB
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 #include "mariadb.h"
17 #include <violite.h>
18 #include <sql_priv.h>
19 #include <sql_class.h>
20 #include <my_pthread.h>
21 #include <scheduler.h>
22 #include <sql_connect.h>
23 #include <sql_audit.h>
24 #include <debug_sync.h>
25 #include <threadpool.h>
26
27 #ifdef WITH_WSREP
28 #include "wsrep_trans_observer.h"
29 #endif /* WITH_WSREP */
30
/* Threadpool parameters (values are assigned from system variables elsewhere) */

uint threadpool_min_threads;
uint threadpool_idle_timeout;
uint threadpool_size;
uint threadpool_max_size;
uint threadpool_stall_limit;
uint threadpool_max_threads;
uint threadpool_oversubscribe;
uint threadpool_mode;
uint threadpool_prio_kickup_timer;
my_bool threadpool_exact_stats;
my_bool threadpool_dedicated_listener;

/* Stats */
TP_STATISTICS tp_stats;
47
48
49 static void threadpool_remove_connection(THD *thd);
50 static int threadpool_process_request(THD *thd);
51 static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
52
53 extern bool do_command(THD*);
54
get_TP_connection(THD * thd)55 static inline TP_connection *get_TP_connection(THD *thd)
56 {
57 return (TP_connection *)thd->event_scheduler.data;
58 }
59
60 /*
61 Worker threads contexts, and THD contexts.
62 =========================================
63
  Both worker threads and connections have their own sets of thread-local
  variables. At the moment these are mysys_var (which holds specific data for
  dbug, my_error and similar goodies), and the PSI per-client structure.
67
68 Whenever query is executed following needs to be done:
69
70 1. Save worker thread context.
71 2. Change TLS variables to connection specific ones using thread_attach(THD*).
     This function does some additional work, e.g. setting up
     thread_stack/thread_ends_here pointers.
74 3. Process query
75 4. Restore worker thread context.
76
77 Connection login and termination follows similar schema w.r.t saving and
78 restoring contexts.
79
80 For both worker thread, and for the connection, mysys variables are created
81 using my_thread_init() and freed with my_thread_end().
82
83 */
/*
  Saved thread-local state of a pool worker thread.

  A worker saves its own PSI thread and mysys_var before attaching a
  connection's context (thread_attach()), and restores them once the
  work item has been processed.
*/
struct Worker_thread_context
{
  PSI_thread *psi_thread;
  st_my_thread_var* mysys_var;

  /* Capture the worker's current PSI thread and mysys variables. */
  void save()
  {
    psi_thread= PSI_CALL_get_thread();
    mysys_var= my_thread_var;
  }

  /* Restore worker TLS, and detach whatever THD was attached meanwhile. */
  void restore()
  {
    PSI_CALL_set_thread(psi_thread);
    set_mysys_var(mysys_var);
    set_current_thd(nullptr);
  }
};
102
103
#ifdef HAVE_PSI_INTERFACE

/*
  The following fixes PSI "idle" instrumentation.
  The server assumes that a connection becomes idle
  just before net_read_packet() and switches to active after it.
  In our setup, the server becomes idle when async socket io is made.
*/

extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);

/* No-op hook: the idle state is reported from set_thd_idle() instead. */
static void dummy_before_header(struct st_net *, void *, size_t)
{
}

/* Replace the default "before header" PSI hook for this connection's NET. */
static void re_init_net_server_extension(THD *thd)
{
  thd->m_net_server_extension.m_before_header = dummy_before_header;
}

#else

#define re_init_net_server_extension(thd)

#endif /* HAVE_PSI_INTERFACE */
129
130
/*
  Mark the connection as waiting on the client socket, and (with PSI
  enabled) report the "idle" state to performance schema instrumentation.
*/
static inline void set_thd_idle(THD *thd)
{
  thd->net.reading_or_writing= 1;
#ifdef HAVE_PSI_INTERFACE
  net_before_header_psi(&thd->net, thd, 0);
#endif
}
138
139 /*
140 Attach/associate the connection with the OS thread,
141 */
/*
  Attach/associate the connection with the current OS thread.

  Switches thread-local state (mysys_var, current THD, PSI thread,
  socket owner) to the connection's context. Undone by
  Worker_thread_context::restore().
*/
static void thread_attach(THD* thd)
{
#ifdef WITH_WSREP
  /* Wait until possible background rollback has finished before
  attaching the thd. */
  wsrep_wait_rollback_complete_and_acquire_ownership(thd);
#endif /* WITH_WSREP */
  set_mysys_var(thd->mysys_var);
  thd->thread_stack=(char*)&thd;  // current stack position becomes the stack base
  thd->store_globals();
  PSI_CALL_set_thread(thd->get_psi());
  mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
}
155
156 /*
157 Determine connection priority , using current
158 transaction state and 'threadpool_priority' variable value.
159 */
get_priority(TP_connection * c)160 static TP_PRIORITY get_priority(TP_connection *c)
161 {
162 DBUG_ASSERT(c->thd == current_thd);
163 TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
164 if (prio == TP_PRIORITY_AUTO)
165 prio= c->thd->transaction->is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
166
167 return prio;
168 }
169
170
/*
  Worker callback, invoked when a connection has an event to handle:
  either the initial login (c->thd not yet created), or data that
  arrived from the client.

  On success, re-arms async io so the next client command wakes the pool
  again. On any failure the connection is torn down and 'c' is deleted.
*/
void tp_callback(TP_connection *c)
{
  DBUG_ASSERT(c);

  Worker_thread_context worker_context;
  worker_context.save();

  THD *thd= c->thd;

  c->state = TP_STATE_RUNNING;

  if (unlikely(!thd))
  {
    /* No THD, need to login first. */
    DBUG_ASSERT(c->connect);
    thd= c->thd= threadpool_add_connection(c->connect, c);
    if (!thd)
    {
      /* Bail out on connect error.*/
      goto error;
    }
    c->connect= 0;  // ownership passed to threadpool_add_connection()
  }
  else if (threadpool_process_request(thd))
  {
    /* QUIT or an error occurred. */
    goto error;
  }

  /* Set priority */
  c->priority= get_priority(c);

  /* Read next command from client. */
  c->set_io_timeout(thd->get_net_wait_timeout());
  c->state= TP_STATE_IDLE;
  if (c->start_io())
    goto error;

  worker_context.restore();
  return;

error:
  c->thd= 0;  // detach THD before destroying it below
  if (thd)
  {
    threadpool_remove_connection(thd);
  }
  delete c;
  worker_context.restore();
}
221
222
/*
  Create the THD for a new connection and perform login.

  Allocates connection-specific thread-local state (mysys_var via
  my_thread_init(), plus a new PSI thread), creates the THD, makes it
  visible in the server thread list, then authenticates.

  @param connect        connection request; deleted on success, closed and
                        deleted on failure
  @param scheduler_data the TP_connection, stored in thd->event_scheduler.data
  @return THD ready for command processing, or NULL on any failure
          (all partially created resources are freed)
*/
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
  THD *thd= NULL;

  DBUG_EXECUTE_IF("CONNECT_wait",
  {
    extern MYSQL_SOCKET unix_sock;
    DBUG_ASSERT(unix_sock.fd >= 0);
    while (unix_sock.fd >= 0)
      my_sleep(1000);
  });

  /*
    Create a new connection context: mysys_thread_var and PSI thread
    Store them in THD.
  */

  set_mysys_var(NULL);
  my_thread_init();
  st_my_thread_var* mysys_var= my_thread_var;
  PSI_CALL_set_thread(PSI_CALL_new_thread(key_thread_one_connection, connect, 0));
  if (!mysys_var ||!(thd= connect->create_thd(NULL)))
  {
    /* Out of memory? */
    connect->close_and_delete();
    if (mysys_var)
      my_thread_end();
    return NULL;
  }
  thd->event_scheduler.data = scheduler_data;
  server_threads.insert(thd); // Make THD visible in show processlist
  delete connect; // must be after server_threads.insert, see close_connections()
  thd->set_mysys_var(mysys_var);


  /* Login. */
  thread_attach(thd);
  re_init_net_server_extension(thd);
  ulonglong now= microsecond_interval_timer();
  thd->prior_thr_create_utime= now;
  thd->start_utime= now;
  thd->thr_create_utime= now;

  setup_connection_thread_globals(thd);

  if (thd_prepare_connection(thd))
    goto end;

  /*
    Check if THD is ok, as prepare_new_connection_state()
    can fail, for example if init command failed.
  */
  if (!thd_is_connection_alive(thd))
    goto end;

  thd->skip_wait_timeout= true;  // pool enforces wait_timeout itself via io timeouts
  set_thd_idle(thd);
  return thd;

end:
  threadpool_remove_connection(thd);
  return NULL;
}
286
287
/*
  Terminate a connection and release everything associated with it:
  the THD, the PSI thread, and the mysys thread_var.
*/
static void threadpool_remove_connection(THD *thd)
{
  thread_attach(thd);
  thd->net.reading_or_writing = 0;  // connection is no longer idle on the socket
  end_connection(thd);
  close_connection(thd, 0);
  unlink_thd(thd);
  PSI_CALL_delete_current_thread(); // before THD is destroyed
  delete thd;

  /*
    Free resources associated with this connection:
    mysys thread_var and PSI thread.
  */
  my_thread_end();
}
304
305
306 /*
307 Ensure that proper error message is sent to client,
308 and "aborted" message appears in the log in case of
309 wait timeout.
310
311 See also timeout handling in net_serv.cc
312 */
/*
  Ensure that proper error message is sent to client,
  and "aborted" message appears in the log in case of
  wait timeout.

  See also timeout handling in net_serv.cc
*/
static void handle_wait_timeout(THD *thd)
{
  /* Clear pending diagnostics and the kill flag set by the timeout handler. */
  thd->get_stmt_da()->reset_diagnostics_area();
  thd->reset_killed();
  my_error(ER_NET_READ_INTERRUPTED, MYF(0));
  thd->net.last_errno= ER_NET_READ_INTERRUPTED;
  thd->net.error= 2;  // NOTE(review): 2 appears to mean fatal net error - confirm in net_serv.cc
}
321
322 /** Check if some client data is cached in thd->net or thd->net.vio */
has_unread_data(THD * thd)323 static bool has_unread_data(THD* thd)
324 {
325 NET *net= &thd->net;
326 if (net->compress && net->remain_in_buf)
327 return true;
328 Vio *vio= net->vio;
329 return vio->has_data(vio);
330 }
331
332
333 /**
334 Process a single client request or a single batch.
335 */
/**
  Process a single client request or a single batch.

  @return 0 if the connection stays alive and should be re-armed for io,
          nonzero if it must be closed (kill, QUIT, or network error).
*/
static int threadpool_process_request(THD *thd)
{
  int retval= 0;
  thread_attach(thd);

  if (thd->killed >= KILL_CONNECTION)
  {
    /*
      killed flag was set by timeout handler
      or KILL command. Return error.
    */
    retval= 1;
    if(thd->killed == KILL_WAIT_TIMEOUT)
      handle_wait_timeout(thd);
    goto end;
  }


  /*
    In the loop below, the flow is essentially a copy of the
    thread-per-connection logic, see do_handle_one_connection()
    in sql_connect.c

    The goal is to execute a single query, thus the loop is normally executed
    only once. However for SSL connections, it can be executed multiple times
    (SSL can preread and cache incoming data, and vio->has_data() checks if it
    was the case).
  */
  for(;;)
  {
    thd->net.reading_or_writing= 0;  // mark active while executing the command
    if (mysql_audit_release_required(thd))
      mysql_audit_release(thd);

    if ((retval= do_command(thd)) != 0)
      goto end;

    if (!thd_is_connection_alive(thd))
    {
      retval= 1;
      goto end;
    }

    set_thd_idle(thd);

    if (!has_unread_data(thd))
    {
      /* More info on this debug sync is in sql_parse.cc*/
      DEBUG_SYNC(thd, "before_do_command_net_read");
      goto end;
    }
  }

end:
  return retval;
}
392
393
394 static TP_pool *pool;
395
tp_init()396 static bool tp_init()
397 {
398
399 #ifdef _WIN32
400 if (threadpool_mode == TP_MODE_WINDOWS)
401 pool= new (std::nothrow) TP_pool_win;
402 else
403 pool= new (std::nothrow) TP_pool_generic;
404 #else
405 pool= new (std::nothrow) TP_pool_generic;
406 #endif
407 if (!pool)
408 return true;
409 if (pool->init())
410 {
411 delete pool;
412 pool= 0;
413 return true;
414 }
415 return false;
416 }
417
tp_add_connection(CONNECT * connect)418 static void tp_add_connection(CONNECT *connect)
419 {
420 TP_connection *c= pool->new_connection(connect);
421 DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
422 if (c)
423 pool->add(c);
424 else
425 connect->close_and_delete();
426 }
427
tp_get_idle_thread_count()428 int tp_get_idle_thread_count()
429 {
430 return pool? pool->get_idle_thread_count(): 0;
431 }
432
tp_get_thread_count()433 int tp_get_thread_count()
434 {
435 return pool ? pool->get_thread_count() : 0;
436 }
437
tp_set_min_threads(uint val)438 void tp_set_min_threads(uint val)
439 {
440 if (pool)
441 pool->set_min_threads(val);
442 }
443
444
tp_set_max_threads(uint val)445 void tp_set_max_threads(uint val)
446 {
447 if (pool)
448 pool->set_max_threads(val);
449 }
450
tp_set_threadpool_size(uint val)451 void tp_set_threadpool_size(uint val)
452 {
453 if (pool)
454 pool->set_pool_size(val);
455 }
456
457
tp_set_threadpool_stall_limit(uint val)458 void tp_set_threadpool_stall_limit(uint val)
459 {
460 if (pool)
461 pool->set_stall_limit(val);
462 }
463
464
/*
  Handle an io timeout on an idle connection.

  If data is actually pending on the socket (or cached inside the vio),
  the connection was not really inactive and is re-queued. Otherwise it
  is marked killed with KILL_WAIT_TIMEOUT under LOCK_thd_kill.
*/
void tp_timeout_handler(TP_connection *c)
{
  if (c->state != TP_STATE_IDLE)
    return;
  THD *thd= c->thd;
  mysql_mutex_lock(&thd->LOCK_thd_kill);
  Vio *vio= thd->net.vio;
  /* State is re-checked under the mutex, it may have changed concurrently. */
  if (vio && (vio_pending(vio) > 0 || vio->has_data(vio)) &&
      c->state == TP_STATE_IDLE)
  {
    /*
      There is some data on that connection,
      i.e there was no inactivity timeout.
      Don't kill.
    */
    c->state= TP_STATE_PENDING;
  }
  else if (c->state == TP_STATE_IDLE)
  {
    thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT);
    c->priority= TP_PRIORITY_HIGH;
    post_kill_notification(thd);
  }
  mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
490
/* Counters of waits by THD_WAIT_* type; cache-line aligned to limit false sharing. */
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
492
tp_wait_begin(THD * thd,int type)493 static void tp_wait_begin(THD *thd, int type)
494 {
495 TP_connection *c = get_TP_connection(thd);
496 if (c)
497 {
498 DBUG_ASSERT(type > 0 && type < THD_WAIT_LAST);
499 tp_waits[type]++;
500 c->wait_begin(type);
501 }
502 }
503
504
tp_wait_end(THD * thd)505 static void tp_wait_end(THD *thd)
506 {
507 TP_connection *c = get_TP_connection(thd);
508 if (c)
509 c->wait_end();
510 }
511
512
tp_end()513 static void tp_end()
514 {
515 delete pool;
516 }
517
tp_post_kill_notification(THD * thd)518 static void tp_post_kill_notification(THD *thd)
519 {
520 TP_connection *c= get_TP_connection(thd);
521 if (c)
522 c->priority= TP_PRIORITY_HIGH;
523 post_kill_notification(thd);
524 }
525
/* Scheduler interface table for pool-of-threads mode. */
static scheduler_functions tp_scheduler_functions=
{
  0, // max_threads, filled in by pool_of_threads_scheduler()
  NULL, // max_connections, filled in by pool_of_threads_scheduler()
  NULL, // connection_count, filled in by pool_of_threads_scheduler()
  tp_init, // init
  tp_add_connection, // add_connection
  tp_wait_begin, // thd_wait_begin
  tp_wait_end, // thd_wait_end
  tp_post_kill_notification, // post kill notification
  tp_end // end
};
538
/*
  Fill in the scheduler_functions structure for thread pool mode and
  perform common scheduler initialization.
*/
void pool_of_threads_scheduler(struct scheduler_functions *func,
                               ulong *arg_max_connections,
                               Atomic_counter<uint> *arg_connection_count)
{
  *func = tp_scheduler_functions;
  func->max_threads= threadpool_max_threads;
  func->max_connections= arg_max_connections;
  func->connection_count= arg_connection_count;
  scheduler_init();
}
549