1 /* Copyright (C) 2012 Monty Program Ab
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 #include "mariadb.h"
17 #include <violite.h>
18 #include <sql_priv.h>
19 #include <sql_class.h>
20 #include <my_pthread.h>
21 #include <scheduler.h>
22
23 #ifdef HAVE_POOL_OF_THREADS
24
/*
  Windows uses I/O completion ports (IOCP) as the native event mechanism.
*/
#ifdef _WIN32
/* AIX may define this, too ?*/
#define HAVE_IOCP
#endif

/*
  Extra "opt" argument handed to io_poll_start_read(). The IOCP implementation
  casts it to TP_connection_generic* to reach the connection's OVERLAPPED
  structure; the other implementations ignore it, so 0 is passed.
*/
#ifdef HAVE_IOCP
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif

/*
  Handle type for poll descriptors and connection descriptors: a HANDLE on
  Windows, a plain file descriptor elsewhere. INVALID_HANDLE_VALUE is defined
  on non-Windows platforms so the same "not yet created" sentinel can be used
  everywhere (see thread_group_init()).
*/
#ifdef _WIN32
typedef HANDLE TP_file_handle;
#else
typedef int TP_file_handle;
#define INVALID_HANDLE_VALUE -1
#endif
42
43
44 #include <sql_connect.h>
45 #include <mysqld.h>
46 #include <debug_sync.h>
47 #include <time.h>
48 #include <sql_plist.h>
49 #include <threadpool.h>
50 #include <time.h>
/*
  native_event is the per-platform event record that io_poll_wait() fills in:
  epoll_event on Linux, kevent where kqueue is available, port_event_t on
  Solaris event ports, OVERLAPPED_ENTRY on Windows IOCP.
*/
#ifdef __linux__
#include <sys/epoll.h>
typedef struct epoll_event native_event;
#elif defined(HAVE_KQUEUE)
#include <sys/event.h>
typedef struct kevent native_event;
#elif defined (__sun)
#include <port.h>
typedef port_event_t native_event;
#elif defined (HAVE_IOCP)
typedef OVERLAPPED_ENTRY native_event;
#else
#error threadpool is not available on this platform
#endif
65
66
io_poll_close(TP_file_handle fd)67 static void io_poll_close(TP_file_handle fd)
68 {
69 #ifdef _WIN32
70 CloseHandle(fd);
71 #else
72 close(fd);
73 #endif
74 }
75
76
/**
  Maximum number of native events a listener can read in one go.
  Also the size of the on-stack event array in listener().
*/
#define MAX_EVENTS 1024

/** Indicates that the threadpool was initialized. */
static bool threadpool_started= false;
82
/*
  Define PSI keys for the performance schema.
  We have a mutex per group, worker threads, a condition per worker thread,
  and a timer thread with its own mutex and condition.
*/


#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
{
  { &key_group_mutex, "group_mutex", 0},
  { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL}
};

static PSI_cond_key key_worker_cond;
static PSI_cond_key key_timer_cond;
static PSI_cond_info cond_list[]=
{
  { &key_worker_cond, "worker_cond", 0},
  { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL}
};

static PSI_thread_key key_worker_thread;
static PSI_thread_key key_timer_thread;
static PSI_thread_info thread_list[] =
{
  {&key_worker_thread, "worker_thread", 0},
  {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
};

/*
  Macro to simplify performance schema registration:
  PSI_register(mutex) registers mutex_list, PSI_register(cond) cond_list, etc.
  The expansion is wrapped in do { } while (0) so that "PSI_register(x);" is
  a single statement. A bare "if" would silently capture a following "else"
  when the macro is used inside an unbraced if/else.
*/
#define PSI_register(X) \
  do \
  { \
    if (PSI_server) \
      PSI_server->register_ ## X("threadpool", X ## _list, \
                                 array_elements(X ## _list)); \
  } while (0)
#else
#define PSI_register(X) do { } while (0) /* no-op */
#endif
121
122
struct thread_group_t;

/* Per-thread structure for workers */
struct worker_thread_t
{
  ulonglong event_count; /* number of requests handled by this thread */
  thread_group_t* thread_group;
  /* Links for worker_list_t (e.g. thread_group_t::waiting_threads). */
  worker_thread_t *next_in_list;
  worker_thread_t **prev_in_list;

  /* Per-worker condition (registered as "worker_cond" with PSI). */
  mysql_cond_t cond;
  /* Set to true by wake_thread() when it removes this worker from the waiting list. */
  bool woken;
};

/* Intrusive list of workers, linked through next_in_list/prev_in_list. */
typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
                 &worker_thread_t::next_in_list,
                 &worker_thread_t::prev_in_list>
                 >
worker_list_t;
142
/*
  Connection state for the generic (native poll based) thread pool.
  Extends TP_connection (declared elsewhere) with queueing and polling data.
*/
struct TP_connection_generic:public TP_connection
{
  TP_connection_generic(CONNECT *c);
  ~TP_connection_generic();

  virtual int init(){ return 0; };
  virtual void set_io_timeout(int sec);
  virtual int start_io();
  virtual void wait_begin(int type);
  virtual void wait_end();

  thread_group_t *thread_group;
  /* Links used by connection_queue_t (the group's work queues). */
  TP_connection_generic *next_in_queue;
  TP_connection_generic **prev_in_queue;
  /*
    Absolute wait deadline. timeout_check() compares it with
    pool_timer.current_microtime to find idle connections to time out.
  */
  ulonglong abs_wait_timeout;
  /*
    Despite its name, set to pool_timer.current_microtime when queue_put()
    puts the connection into a work queue. check_stall() uses it to move
    long-waiting low-priority connections to the high-priority queue.
  */
  ulonglong dequeue_time;
  TP_file_handle fd;
  bool bound_to_poll_descriptor;
  int waiting;
#ifdef HAVE_IOCP
  /* OVERLAPPED used for the zero-byte read posted by io_poll_start_read(). */
  OVERLAPPED overlapped;
#endif
#ifdef _WIN32
  /* io_poll_start_read() uses ReadFile() for named pipes, WSARecv() otherwise. */
  enum_vio_type vio_type;
#endif
};


/* Intrusive FIFO of connections with O(1) push_back (a group's work queue). */
typedef I_P_List<TP_connection_generic,
                     I_P_List_adapter<TP_connection_generic,
                                      &TP_connection_generic::next_in_queue,
                                      &TP_connection_generic::prev_in_queue>,
                     I_P_List_null_counter,
                     I_P_List_fast_push_back<TP_connection_generic> >
connection_queue_t;
178
const int NQUEUES=2; /* We have high and low priority queues*/

/*
  A group of worker threads sharing one poll descriptor and one set of work
  queues. Aligned to the L1 cache line size, presumably to keep groups in
  the all_groups array from sharing cache lines.
*/
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
{
  /* Held by listener() and check_stall() while queues and counters are updated. */
  mysql_mutex_t mutex;
  /* Work queues, indexed by connection priority (see queue_put()). */
  connection_queue_t queues[NQUEUES];
  /* Idle workers; wake_thread() takes the front one. */
  worker_list_t waiting_threads;
  /* Worker currently acting as listener, NULL if none. */
  worker_thread_t *listener;
  /* Attributes passed when creating new workers. */
  pthread_attr_t *pthread_attr;
  /* Poll descriptor, INVALID_HANDLE_VALUE until created. */
  TP_file_handle pollfd;
  /* Number of threads in the group, including the listener. */
  int thread_count;
  /* Threads not currently waiting; workers start and end as active. */
  int active_thread_count;
  int connection_count;
  /* Stats for the deadlock detection timer routine.*/
  /* Events returned by io_poll_wait() since check_stall() last reset it. */
  int io_event_count;
  /* queue_get() calls since check_stall() last reset it. */
  int queue_event_count;
  /* microsecond_interval_timer() value at the last successful create_worker(). */
  ulonglong last_thread_creation_time;
  /* -1 when not open; closed in thread_group_destroy() (non-IOCP builds). */
  int shutdown_pipe[2];
  /* When set, listener() stops polling and returns NULL. */
  bool shutdown;
  /* Set by check_stall() when queued work is not being dequeued. */
  bool stalled;
};
200
/* Array of thread groups; freed by the last thread_group_destroy() call. */
static thread_group_t *all_groups;
/* Number of groups in use (set outside this view -- TODO confirm relation to threadpool_max_size). */
static uint group_count;
/*
  Decremented by every thread_group_destroy(); the call that observes the
  previous value 1 frees all_groups.
*/
static int32 shutdown_group_count;

/**
  Used for printing "pool blocked" message, see
  print_pool_blocked_message(). Reset to 0 by a successful create_worker().
*/
static ulonglong pool_block_start;
210
/* Global timer for all groups */
struct pool_timer_t
{
  /* The timer thread waits on cond (with a timeout) while holding mutex. */
  mysql_mutex_t mutex;
  mysql_cond_t cond;
  /* Refreshed by the timer thread on each tick; read elsewhere without the mutex. */
  volatile uint64 current_microtime;
  /* Earliest pending wait deadline; ULONGLONG_MAX when none is known. */
  volatile uint64 next_timeout_check;
  /* Interval between timer ticks, in milliseconds. */
  int tick_interval;
  /* Set by stop_timer() to make the timer thread exit. */
  bool shutdown;
  /* Joined by stop_timer(). */
  pthread_t timer_thread_id;
};

static pool_timer_t pool_timer;
224
/* Forward declarations for functions used before their definitions. */
static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
static int wake_thread(thread_group_t *thread_group);
static int wake_or_create_thread(thread_group_t *thread_group);
static int create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);
234
235 /**
236 Asynchronous network IO.
237
238 We use native edge-triggered network IO multiplexing facility.
239 This maps to different APIs on different Unixes.
240
241 Supported are currently Linux with epoll, Solaris with event ports,
242 OSX and BSD with kevent, Windows with IOCP. All those API's are used with one-shot flags
243 (the event is signalled once client has written something into the socket,
244 then socket is removed from the "poll-set" until the command is finished,
245 and we need to re-arm/re-register socket)
246
247 No implementation for poll/select is currently provided.
248
249 The API closely resembles all of the above mentioned platform APIs
250 and consists of following functions.
251
252 - io_poll_create()
253 Creates an io_poll descriptor
254 On Linux: epoll_create()
255
  - io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
    Associate file descriptor with io poll descriptor
    On Linux : epoll_ctl(..EPOLL_CTL_ADD)
259
  - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
    Disassociate file descriptor from io poll descriptor
    On Linux: epoll_ctl(..EPOLL_CTL_DEL)
263
264
  - io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
    Re-arm a descriptor for reading; the same as io_poll_associate_fd(),
    but cannot be used before io_poll_associate_fd() was called.
    On Linux : epoll_ctl(..EPOLL_CTL_MOD)
269
270 - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents,
271 int timeout_ms)
272
273 wait until one or more descriptors added with io_poll_associate_fd()
274 or io_poll_start_read() becomes readable. Data associated with
275 descriptors can be retrieved from native_events array, using
276 native_event_get_userdata() function.
277
278
279 On Linux: epoll_wait()
280 */
281
282 #if defined (__linux__)
283 #ifndef EPOLLRDHUP
284 /* Early 2.6 kernel did not have EPOLLRDHUP */
285 #define EPOLLRDHUP 0
286 #endif
io_poll_create()287 static TP_file_handle io_poll_create()
288 {
289 return epoll_create(1);
290 }
291
292
io_poll_associate_fd(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)293 int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*)
294 {
295 struct epoll_event ev;
296 ev.data.u64= 0; /* Keep valgrind happy */
297 ev.data.ptr= data;
298 ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
299 return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev);
300 }
301
302
303
io_poll_start_read(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)304 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
305 {
306 struct epoll_event ev;
307 ev.data.u64= 0; /* Keep valgrind happy */
308 ev.data.ptr= data;
309 ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
310 return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
311 }
312
io_poll_disassociate_fd(TP_file_handle pollfd,TP_file_handle fd)313 int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
314 {
315 struct epoll_event ev;
316 return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
317 }
318
319
320 /*
321 Wrapper around epoll_wait.
322 NOTE - in case of EINTR, it restarts with original timeout. Since we use
323 either infinite or 0 timeouts, this is not critical
324 */
io_poll_wait(TP_file_handle pollfd,native_event * native_events,int maxevents,int timeout_ms)325 int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents,
326 int timeout_ms)
327 {
328 int ret;
329 do
330 {
331 ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
332 }
333 while(ret == -1 && errno == EINTR);
334 return ret;
335 }
336
337
native_event_get_userdata(native_event * event)338 static void *native_event_get_userdata(native_event *event)
339 {
340 return event->data.ptr;
341 }
342
343 #elif defined(HAVE_KQUEUE)
344
/*
  NetBSD is incompatible with other BSDs: the last parameter of the EV_SET
  macro (udata, user data) needs to be intptr_t there, whereas it needs to
  be void* everywhere else. MY_EV_SET hides that difference.
*/

#ifdef __NetBSD__
#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, (intptr_t)g)
#else
#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, g)
#endif
356
357
io_poll_create()358 TP_file_handle io_poll_create()
359 {
360 return kqueue();
361 }
362
io_poll_start_read(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)363 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
364 {
365 struct kevent ke;
366 MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
367 0, 0, data);
368 return kevent(pollfd, &ke, 1, 0, 0, 0);
369 }
370
371
io_poll_associate_fd(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)372 int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
373 {
374 struct kevent ke;
375 MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
376 0, 0, data);
377 return io_poll_start_read(pollfd,fd, data, 0);
378 }
379
380
io_poll_disassociate_fd(TP_file_handle pollfd,TP_file_handle fd)381 int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
382 {
383 struct kevent ke;
384 MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
385 return kevent(pollfd, &ke, 1, 0, 0, 0);
386 }
387
388
io_poll_wait(TP_file_handle pollfd,struct kevent * events,int maxevents,int timeout_ms)389 int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms)
390 {
391 struct timespec ts;
392 int ret;
393 if (timeout_ms >= 0)
394 {
395 ts.tv_sec= timeout_ms/1000;
396 ts.tv_nsec= (timeout_ms%1000)*1000000;
397 }
398 do
399 {
400 ret= kevent(pollfd, 0, 0, events, maxevents,
401 (timeout_ms >= 0)?&ts:NULL);
402 }
403 while (ret == -1 && errno == EINTR);
404 return ret;
405 }
406
native_event_get_userdata(native_event * event)407 static void* native_event_get_userdata(native_event *event)
408 {
409 return (void *)event->udata;
410 }
411
412 #elif defined (__sun)
413
io_poll_create()414 static TP_file_handle io_poll_create()
415 {
416 return port_create();
417 }
418
io_poll_start_read(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)419 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
420 {
421 return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
422 }
423
io_poll_associate_fd(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)424 static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
425 {
426 return io_poll_start_read(pollfd, fd, data, 0);
427 }
428
io_poll_disassociate_fd(TP_file_handle pollfd,TP_file_handle fd)429 int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
430 {
431 return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
432 }
433
io_poll_wait(TP_file_handle pollfd,native_event * events,int maxevents,int timeout_ms)434 int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
435 {
436 struct timespec ts;
437 int ret;
438 uint_t nget= 1;
439 if (timeout_ms >= 0)
440 {
441 ts.tv_sec= timeout_ms/1000;
442 ts.tv_nsec= (timeout_ms%1000)*1000000;
443 }
444 do
445 {
446 ret= port_getn(pollfd, events, maxevents, &nget,
447 (timeout_ms >= 0)?&ts:NULL);
448 }
449 while (ret == -1 && errno == EINTR);
450 DBUG_ASSERT(nget < INT_MAX);
451 return (int)nget;
452 }
453
native_event_get_userdata(native_event * event)454 static void* native_event_get_userdata(native_event *event)
455 {
456 return event->portev_user;
457 }
458
459 #elif defined(HAVE_IOCP)
460
461
io_poll_create()462 static TP_file_handle io_poll_create()
463 {
464 return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
465 }
466
467
io_poll_start_read(TP_file_handle pollfd,TP_file_handle fd,void *,void * opt)468 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
469 {
470 static char c;
471 TP_connection_generic *con= (TP_connection_generic *)opt;
472 OVERLAPPED *overlapped= &con->overlapped;
473 if (con->vio_type == VIO_TYPE_NAMEDPIPE)
474 {
475 if (ReadFile(fd, &c, 0, NULL, overlapped))
476 return 0;
477 }
478 else
479 {
480 WSABUF buf;
481 buf.buf= &c;
482 buf.len= 0;
483 DWORD flags=0;
484
485 if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
486 return 0;
487 }
488
489 if (GetLastError() == ERROR_IO_PENDING)
490 return 0;
491
492 return 1;
493 }
494
495
io_poll_associate_fd(TP_file_handle pollfd,TP_file_handle fd,void * data,void * opt)496 static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
497 {
498 HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0);
499 if (!h)
500 return -1;
501 return io_poll_start_read(pollfd,fd, 0, opt);
502 }
503
504
io_poll_disassociate_fd(TP_file_handle pollfd,TP_file_handle fd)505 int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
506 {
507 /* Not possible to unbind/rebind file descriptor in IOCP. */
508 return 0;
509 }
510
511
io_poll_wait(TP_file_handle pollfd,native_event * events,int maxevents,int timeout_ms)512 int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
513 {
514 ULONG n;
515 BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
516 maxevents, &n, timeout_ms, FALSE);
517
518 return ok ? (int)n : -1;
519 }
520
521
native_event_get_userdata(native_event * event)522 static void* native_event_get_userdata(native_event *event)
523 {
524 return (void *)event->lpCompletionKey;
525 }
526 #endif
527
528
529 /* Dequeue element from a workqueue */
530
queue_get(thread_group_t * thread_group)531 static TP_connection_generic *queue_get(thread_group_t *thread_group)
532 {
533 DBUG_ENTER("queue_get");
534 thread_group->queue_event_count++;
535 TP_connection_generic *c;
536 for (int i=0; i < NQUEUES;i++)
537 {
538 c= thread_group->queues[i].pop_front();
539 if (c)
540 DBUG_RETURN(c);
541 }
542 DBUG_RETURN(0);
543 }
544
is_queue_empty(thread_group_t * thread_group)545 static bool is_queue_empty(thread_group_t *thread_group)
546 {
547 for (int i=0; i < NQUEUES; i++)
548 {
549 if (!thread_group->queues[i].is_empty())
550 return false;
551 }
552 return true;
553 }
554
555
queue_init(thread_group_t * thread_group)556 static void queue_init(thread_group_t *thread_group)
557 {
558 for (int i=0; i < NQUEUES; i++)
559 {
560 thread_group->queues[i].empty();
561 }
562 }
563
queue_put(thread_group_t * thread_group,native_event * ev,int cnt)564 static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
565 {
566 ulonglong now= pool_timer.current_microtime;
567 for(int i=0; i < cnt; i++)
568 {
569 TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
570 c->dequeue_time= now;
571 thread_group->queues[c->priority].push_back(c);
572 }
573 }
574
575 /*
576 Handle wait timeout :
577 Find connections that have been idle for too long and kill them.
578 Also, recalculate time when next timeout check should run.
579 */
580
timeout_check(pool_timer_t * timer)581 static void timeout_check(pool_timer_t *timer)
582 {
583 DBUG_ENTER("timeout_check");
584
585 mysql_mutex_lock(&LOCK_thread_count);
586 I_List_iterator<THD> it(threads);
587
588 /* Reset next timeout check, it will be recalculated in the loop below */
589 my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);
590
591 THD *thd;
592 while ((thd=it++))
593 {
594 TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
595 if (!connection || connection->state != TP_STATE_IDLE)
596 {
597 /*
598 Connection does not have scheduler data. This happens for example
599 if THD belongs to a different scheduler, that is listening to extra_port.
600 */
601 continue;
602 }
603
604 if(connection->abs_wait_timeout < timer->current_microtime)
605 {
606 tp_timeout_handler(connection);
607 }
608 else
609 {
610 set_next_timeout_check(connection->abs_wait_timeout);
611 }
612 }
613 mysql_mutex_unlock(&LOCK_thread_count);
614 DBUG_VOID_RETURN;
615 }
616
617
618 /*
619 Timer thread.
620
621 Periodically, check if one of the thread groups is stalled. Stalls happen if
622 events are not being dequeued from the queue, or from the network, Primary
623 reason for stall can be a lengthy executing non-blocking request. It could
624 also happen that thread is waiting but wait_begin/wait_end is forgotten by
625 storage engine. Timer thread will create a new thread in group in case of
626 a stall.
627
628 Besides checking for stalls, timer thread is also responsible for terminating
629 clients that have been idle for longer than wait_timeout seconds.
630
631 TODO: Let the timer sleep for long time if there is no work to be done.
632 Currently it wakes up rather often on and idle server.
633 */
634
timer_thread(void * param)635 static void* timer_thread(void *param)
636 {
637 uint i;
638 pool_timer_t* timer=(pool_timer_t *)param;
639
640 my_thread_init();
641 DBUG_ENTER("timer_thread");
642 timer->next_timeout_check= ULONGLONG_MAX;
643 timer->current_microtime= microsecond_interval_timer();
644
645 for(;;)
646 {
647 struct timespec ts;
648 int err;
649
650 set_timespec_nsec(ts,timer->tick_interval*1000000);
651 mysql_mutex_lock(&timer->mutex);
652 err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
653 if (timer->shutdown)
654 {
655 mysql_mutex_unlock(&timer->mutex);
656 break;
657 }
658 if (err == ETIMEDOUT)
659 {
660 timer->current_microtime= microsecond_interval_timer();
661
662 /* Check stalls in thread groups */
663 for (i= 0; i < threadpool_max_size; i++)
664 {
665 if(all_groups[i].connection_count)
666 check_stall(&all_groups[i]);
667 }
668
669 /* Check if any client exceeded wait_timeout */
670 if (timer->next_timeout_check <= timer->current_microtime)
671 timeout_check(timer);
672 }
673 mysql_mutex_unlock(&timer->mutex);
674 }
675
676 mysql_mutex_destroy(&timer->mutex);
677 my_thread_end();
678 return NULL;
679 }
680
681
682
check_stall(thread_group_t * thread_group)683 void check_stall(thread_group_t *thread_group)
684 {
685 mysql_mutex_lock(&thread_group->mutex);
686
687 /*
688 Bump priority for the low priority connections that spent too much
689 time in low prio queue.
690 */
691 TP_connection_generic *c;
692 for (;;)
693 {
694 c= thread_group->queues[TP_PRIORITY_LOW].front();
695 if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer)
696 {
697 thread_group->queues[TP_PRIORITY_LOW].remove(c);
698 thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
699 }
700 else
701 break;
702 }
703
704 /*
705 Check if listener is present. If not, check whether any IO
706 events were dequeued since last time. If not, this means
707 listener is either in tight loop or thd_wait_begin()
708 was forgotten. Create a new worker(it will make itself listener).
709 */
710 if (!thread_group->listener && !thread_group->io_event_count)
711 {
712 wake_or_create_thread(thread_group);
713 mysql_mutex_unlock(&thread_group->mutex);
714 return;
715 }
716
717 /* Reset io event count */
718 thread_group->io_event_count= 0;
719
720 /*
721 Check whether requests from the workqueue are being dequeued.
722
723 The stall detection and resolution works as follows:
724
725 1. There is a counter thread_group->queue_event_count for the number of
726 events removed from the queue. Timer resets the counter to 0 on each run.
727 2. Timer determines stall if this counter remains 0 since last check
728 and the queue is not empty.
729 3. Once timer determined a stall it sets thread_group->stalled flag and
730 wakes and idle worker (or creates a new one, subject to throttling).
731 4. The stalled flag is reset, when an event is dequeued.
732
733 Q : Will this handling lead to an unbound growth of threads, if queue
734 stalls permanently?
735 A : No. If queue stalls permanently, it is an indication for many very long
736 simultaneous queries. The maximum number of simultanoues queries is
737 max_connections, further we have threadpool_max_threads limit, upon which no
738 worker threads are created. So in case there is a flood of very long
739 queries, threadpool would slowly approach thread-per-connection behavior.
740 NOTE:
741 If long queries never wait, creation of the new threads is done by timer,
742 so it is slower than in real thread-per-connection. However if long queries
743 do wait and indicate that via thd_wait_begin/end callbacks, thread creation
744 will be faster.
745 */
746 if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
747 {
748 thread_group->stalled= true;
749 wake_or_create_thread(thread_group);
750 }
751
752 /* Reset queue event count */
753 thread_group->queue_event_count= 0;
754
755 mysql_mutex_unlock(&thread_group->mutex);
756 }
757
758
start_timer(pool_timer_t * timer)759 static void start_timer(pool_timer_t* timer)
760 {
761 DBUG_ENTER("start_timer");
762 mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
763 mysql_cond_init(key_timer_cond, &timer->cond, NULL);
764 timer->shutdown = false;
765 mysql_thread_create(key_timer_thread, &timer->timer_thread_id, NULL,
766 timer_thread, timer);
767 DBUG_VOID_RETURN;
768 }
769
770
stop_timer(pool_timer_t * timer)771 static void stop_timer(pool_timer_t *timer)
772 {
773 DBUG_ENTER("stop_timer");
774 mysql_mutex_lock(&timer->mutex);
775 timer->shutdown = true;
776 mysql_cond_signal(&timer->cond);
777 mysql_mutex_unlock(&timer->mutex);
778 pthread_join(timer->timer_thread_id, NULL);
779 DBUG_VOID_RETURN;
780 }
781
782
/**
  Poll for socket events and distribute them to worker threads.
  In many cases the current thread will handle a single event itself.

  Loops until it either takes an event for itself (returned), or sees
  thread_group->shutdown or a failed io_poll_wait() (returns NULL).
  current_thread is not used in this function.

  @return a ready connection, or NULL on shutdown
*/
static TP_connection_generic * listener(worker_thread_t *current_thread,
                                        thread_group_t *thread_group)
{
  DBUG_ENTER("listener");
  TP_connection_generic *retval= NULL;

  for(;;)
  {
    native_event ev[MAX_EVENTS];
    int cnt;

    if (thread_group->shutdown)
      break;

    /* Block until at least one descriptor is ready (timeout -1 = infinite). */
    cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);

    if (cnt <=0)
    {
      /* An infinite wait is only expected to fail during shutdown. */
      DBUG_ASSERT(thread_group->shutdown);
      break;
    }

    mysql_mutex_lock(&thread_group->mutex);

    /* Re-check under the mutex: shutdown may have started while polling. */
    if (thread_group->shutdown)
    {
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    /* Consumed (and reset) by check_stall() for stall detection. */
    thread_group->io_event_count += cnt;

    /*
      We got some network events and need to make decisions: whether the
      listener should handle events and whether or not to wake any worker
      threads so they can handle events.

      Q1 : Should the listener handle an event itself, or put all events into
      the queue and let workers handle the events?

      Solution :
      Generally, a listener that handles events itself is preferable. We do
      not want the listener thread to change its state from waiting to running
      too often. Since the listener has just woken from poll, it better uses
      its time slice and does some work. Besides, not handling events means
      they go to the queue, and often another worker must wake up to handle
      the event. This is not good, as we want to avoid wakeups.

      The downside of a listener that also handles queries is that we can
      potentially leave the thread group for a long time without picking up
      new network events. It is not a major problem, because this stall will
      be detected sooner or later by the timer thread. Still, relying on the
      timer is not always good, because it may "tick" too slowly (large
      timer_interval).

      We use the following strategy to solve this problem - if the queue was
      not empty we suspect a flood of network events and the listener stays.
      Otherwise, it handles a query.


      Q2: If the queue is not empty, how many workers to wake?

      Solution:
      We generally try to keep one thread per group active (threads handling
      queries are considered active, unless they are stuck inside some
      "wait"). Thus, we will wake only one worker, and only if there are no
      active threads currently, and the listener is not going to handle a
      query. When we don't wake, we hope that currently active threads will
      finish fast and handle the queue. If this does not happen, the timer
      thread will detect a stall and wake a worker.

      NOTE: Currently nothing is done to detect or prevent long queuing times.
      A solution for the future would be to give up the "one active thread per
      group" principle, if events stay in the queue for too long, and just
      wake more workers.
    */

    bool listener_picks_event=is_queue_empty(thread_group);
    queue_put(thread_group, ev, cnt);
    if (listener_picks_event)
    {
      /* Handle the first event (queue_get() serves queues[0] first). */
      retval= queue_get(thread_group);
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    if(thread_group->active_thread_count==0)
    {
      /* We added some work items to queue, now wake a worker. */
      if(wake_thread(thread_group))
      {
        /*
          Wake failed, hence the group has no idle threads. Now check if
          there are any threads in the group except the listener.
        */
        if(thread_group->thread_count == 1)
        {
           /*
             Currently there is no worker thread in the group, as indicated by
             thread_count == 1 (this means the listener is the only thread in
             the group).
             The queue is not empty, and the listener is not going to handle
             events. In order to drain the queue, we create a worker here.
             Alternatively, we could just rely on the timer to detect a stall
             and create a thread, but waiting for the timer would be an
             inefficient and pointless delay.
           */
           create_worker(thread_group);
        }
      }
    }
    mysql_mutex_unlock(&thread_group->mutex);
  }

  DBUG_RETURN(retval);
}
905
906 /**
907 Adjust thread counters in group or global
908 whenever thread is created or is about to exit
909
910 @param thread_group
911 @param count - 1, when new thread is created
912 -1, when thread is about to exit
913 */
914
add_thread_count(thread_group_t * thread_group,int32 count)915 static void add_thread_count(thread_group_t *thread_group, int32 count)
916 {
917 thread_group->thread_count += count;
918 /* worker starts out and end in "active" state */
919 thread_group->active_thread_count += count;
920 my_atomic_add32(&tp_stats.num_worker_threads, count);
921 }
922
923
924 /**
925 Creates a new worker thread.
926 thread_mutex must be held when calling this function
927
928 NOTE: in rare cases, the number of threads can exceed
929 threadpool_max_threads, because we need at least 2 threads
930 per group to prevent deadlocks (one listener + one worker)
931 */
932
create_worker(thread_group_t * thread_group)933 static int create_worker(thread_group_t *thread_group)
934 {
935 pthread_t thread_id;
936 bool max_threads_reached= false;
937 int err;
938
939 DBUG_ENTER("create_worker");
940 if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
941 && thread_group->thread_count >= 2)
942 {
943 err= 1;
944 max_threads_reached= true;
945 goto end;
946 }
947
948
949 err= mysql_thread_create(key_worker_thread, &thread_id,
950 thread_group->pthread_attr, worker_main, thread_group);
951 if (!err)
952 {
953 thread_group->last_thread_creation_time=microsecond_interval_timer();
954 statistic_increment(thread_created,&LOCK_status);
955 add_thread_count(thread_group, 1);
956 }
957 else
958 {
959 my_errno= errno;
960 }
961
962 end:
963 if (err)
964 print_pool_blocked_message(max_threads_reached);
965 else
966 pool_block_start= 0; /* Reset pool blocked timer, if it was set */
967
968 DBUG_RETURN(err);
969 }
970
971
972 /**
973 Calculate microseconds throttling delay for thread creation.
974
975 The value depends on how many threads are already in the group:
976 small number of threads means no delay, the more threads the larger
977 the delay.
978
979 The actual values were not calculated using any scientific methods.
980 They just look right, and behave well in practice.
981
982 TODO: Should throttling depend on thread_pool_stall_limit?
983 */
microsecond_throttling_interval(thread_group_t * thread_group)984 static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
985 {
986 int count= thread_group->thread_count;
987
988 if (count < 4)
989 return 0;
990
991 if (count < 8)
992 return 50*1000;
993
994 if(count < 16)
995 return 100*1000;
996
997 return 200*1000;
998 }
999
1000
1001 /**
1002 Wakes a worker thread, or creates a new one.
1003
1004 Worker creation is throttled, so we avoid too many threads
1005 to be created during the short time.
1006 */
wake_or_create_thread(thread_group_t * thread_group)1007 static int wake_or_create_thread(thread_group_t *thread_group)
1008 {
1009 DBUG_ENTER("wake_or_create_thread");
1010
1011 if (thread_group->shutdown)
1012 DBUG_RETURN(0);
1013
1014 if (wake_thread(thread_group) == 0)
1015 DBUG_RETURN(0);
1016
1017 if (thread_group->thread_count > thread_group->connection_count)
1018 DBUG_RETURN(-1);
1019
1020
1021 if (thread_group->active_thread_count == 0)
1022 {
1023 /*
1024 We're better off creating a new thread here with no delay, either there
1025 are no workers at all, or they all are all blocking and there was no
1026 idle thread to wakeup. Smells like a potential deadlock or very slowly
1027 executing requests, e.g sleeps or user locks.
1028 */
1029 DBUG_RETURN(create_worker(thread_group));
1030 }
1031
1032 ulonglong now = microsecond_interval_timer();
1033 ulonglong time_since_last_thread_created =
1034 (now - thread_group->last_thread_creation_time);
1035
1036 /* Throttle thread creation. */
1037 if (time_since_last_thread_created >
1038 microsecond_throttling_interval(thread_group))
1039 {
1040 DBUG_RETURN(create_worker(thread_group));
1041 }
1042
1043 DBUG_RETURN(-1);
1044 }
1045
1046
1047
thread_group_init(thread_group_t * thread_group,pthread_attr_t * thread_attr)1048 int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
1049 {
1050 DBUG_ENTER("thread_group_init");
1051 thread_group->pthread_attr = thread_attr;
1052 mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
1053 thread_group->pollfd= INVALID_HANDLE_VALUE;
1054 thread_group->shutdown_pipe[0]= -1;
1055 thread_group->shutdown_pipe[1]= -1;
1056 queue_init(thread_group);
1057 DBUG_RETURN(0);
1058 }
1059
1060
thread_group_destroy(thread_group_t * thread_group)1061 void thread_group_destroy(thread_group_t *thread_group)
1062 {
1063 mysql_mutex_destroy(&thread_group->mutex);
1064 if (thread_group->pollfd != INVALID_HANDLE_VALUE)
1065 {
1066 io_poll_close(thread_group->pollfd);
1067 thread_group->pollfd= INVALID_HANDLE_VALUE;
1068 }
1069 #ifndef HAVE_IOCP
1070 for(int i=0; i < 2; i++)
1071 {
1072 if(thread_group->shutdown_pipe[i] != -1)
1073 {
1074 close(thread_group->shutdown_pipe[i]);
1075 thread_group->shutdown_pipe[i]= -1;
1076 }
1077 }
1078 #endif
1079
1080 if (my_atomic_add32(&shutdown_group_count, -1) == 1)
1081 {
1082 my_free(all_groups);
1083 all_groups= 0;
1084 }
1085 }
1086
1087 /**
1088 Wake sleeping thread from waiting list
1089 */
1090
wake_thread(thread_group_t * thread_group)1091 static int wake_thread(thread_group_t *thread_group)
1092 {
1093 DBUG_ENTER("wake_thread");
1094 worker_thread_t *thread = thread_group->waiting_threads.front();
1095 if(thread)
1096 {
1097 thread->woken= true;
1098 thread_group->waiting_threads.remove(thread);
1099 mysql_cond_signal(&thread->cond);
1100 DBUG_RETURN(0);
1101 }
1102 DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
1103 }
1104
1105 /*
1106 Wake listener thread (during shutdown)
1107 Self-pipe trick is used in most cases,except IOCP.
1108 */
wake_listener(thread_group_t * thread_group)1109 static int wake_listener(thread_group_t *thread_group)
1110 {
1111 #ifndef HAVE_IOCP
1112 if (pipe(thread_group->shutdown_pipe))
1113 {
1114 return -1;
1115 }
1116
1117 /* Wake listener */
1118 if (io_poll_associate_fd(thread_group->pollfd,
1119 thread_group->shutdown_pipe[0], NULL, NULL))
1120 {
1121 return -1;
1122 }
1123 char c= 0;
1124 if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
1125 return -1;
1126 #else
1127 PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0);
1128 #endif
1129 return 0;
1130 }
1131 /**
1132 Initiate shutdown for thread group.
1133
1134 The shutdown is asynchronous, we only care to wake all threads in here, so
1135 they can finish. We do not wait here until threads terminate. Final cleanup
1136 of the group (thread_group_destroy) will be done by the last exiting threads.
1137 */
1138
thread_group_close(thread_group_t * thread_group)1139 static void thread_group_close(thread_group_t *thread_group)
1140 {
1141 DBUG_ENTER("thread_group_close");
1142
1143 mysql_mutex_lock(&thread_group->mutex);
1144 if (thread_group->thread_count == 0)
1145 {
1146 mysql_mutex_unlock(&thread_group->mutex);
1147 thread_group_destroy(thread_group);
1148 DBUG_VOID_RETURN;
1149 }
1150
1151 thread_group->shutdown= true;
1152 thread_group->listener= NULL;
1153
1154 wake_listener(thread_group);
1155
1156 /* Wake all workers. */
1157 while(wake_thread(thread_group) == 0)
1158 {
1159 }
1160
1161 mysql_mutex_unlock(&thread_group->mutex);
1162
1163 DBUG_VOID_RETURN;
1164 }
1165
1166
1167 /*
1168 Add work to the queue. Maybe wake a worker if they all sleep.
1169
1170 Currently, this function is only used when new connections need to
1171 perform login (this is done in worker threads).
1172
1173 */
1174
queue_put(thread_group_t * thread_group,TP_connection_generic * connection)1175 static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection)
1176 {
1177 DBUG_ENTER("queue_put");
1178
1179 connection->dequeue_time= pool_timer.current_microtime;
1180 thread_group->queues[connection->priority].push_back(connection);
1181
1182 if (thread_group->active_thread_count == 0)
1183 wake_or_create_thread(thread_group);
1184
1185 DBUG_VOID_RETURN;
1186 }
1187
1188
1189 /*
1190 Prevent too many threads executing at the same time,if the workload is
1191 not CPU bound.
1192 */
1193
too_many_threads(thread_group_t * thread_group)1194 static bool too_many_threads(thread_group_t *thread_group)
1195 {
1196 return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe
1197 && !thread_group->stalled);
1198 }
1199
1200
1201 /**
1202 Retrieve a connection with pending event.
1203
1204 Pending event in our case means that there is either a pending login request
1205 (if connection is not yet logged in), or there are unread bytes on the socket.
1206
1207 If there are no pending events currently, thread will wait.
1208 If timeout specified in abstime parameter passes, the function returns NULL.
1209
1210 @param current_thread - current worker thread
1211 @param thread_group - current thread group
1212 @param abstime - absolute wait timeout
1213
1214 @return
1215 connection with pending event.
1216 NULL is returned if timeout has expired,or on shutdown.
1217 */
1218
get_event(worker_thread_t * current_thread,thread_group_t * thread_group,struct timespec * abstime)1219 TP_connection_generic *get_event(worker_thread_t *current_thread,
1220 thread_group_t *thread_group, struct timespec *abstime)
1221 {
1222 DBUG_ENTER("get_event");
1223 TP_connection_generic *connection = NULL;
1224
1225
1226 mysql_mutex_lock(&thread_group->mutex);
1227 DBUG_ASSERT(thread_group->active_thread_count >= 0);
1228
1229 for(;;)
1230 {
1231 int err=0;
1232 bool oversubscribed = too_many_threads(thread_group);
1233 if (thread_group->shutdown)
1234 break;
1235
1236 /* Check if queue is not empty */
1237 if (!oversubscribed)
1238 {
1239 connection = queue_get(thread_group);
1240 if(connection)
1241 break;
1242 }
1243
1244 /* If there is currently no listener in the group, become one. */
1245 if(!thread_group->listener)
1246 {
1247 thread_group->listener= current_thread;
1248 thread_group->active_thread_count--;
1249 mysql_mutex_unlock(&thread_group->mutex);
1250
1251 connection = listener(current_thread, thread_group);
1252
1253 mysql_mutex_lock(&thread_group->mutex);
1254 thread_group->active_thread_count++;
1255 /* There is no listener anymore, it just returned. */
1256 thread_group->listener= NULL;
1257 break;
1258 }
1259
1260
1261 /*
1262 Last thing we try before going to sleep is to
1263 non-blocking event poll, i.e with timeout = 0.
1264 If this returns events, pick one
1265 */
1266 if (!oversubscribed)
1267 {
1268
1269 native_event ev[MAX_EVENTS];
1270 int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
1271 if (cnt > 0)
1272 {
1273 queue_put(thread_group, ev, cnt);
1274 connection= queue_get(thread_group);
1275 break;
1276 }
1277 }
1278
1279
1280 /* And now, finally sleep */
1281 current_thread->woken = false; /* wake() sets this to true */
1282
1283 /*
1284 Add current thread to the head of the waiting list and wait.
1285 It is important to add thread to the head rather than tail
1286 as it ensures LIFO wakeup order (hot caches, working inactivity timeout)
1287 */
1288 thread_group->waiting_threads.push_front(current_thread);
1289
1290 thread_group->active_thread_count--;
1291 if (abstime)
1292 {
1293 err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex,
1294 abstime);
1295 }
1296 else
1297 {
1298 err = mysql_cond_wait(¤t_thread->cond, &thread_group->mutex);
1299 }
1300 thread_group->active_thread_count++;
1301
1302 if (!current_thread->woken)
1303 {
1304 /*
1305 Thread was not signalled by wake(), it might be a spurious wakeup or
1306 a timeout. Anyhow, we need to remove ourselves from the list now.
1307 If thread was explicitly woken, than caller removed us from the list.
1308 */
1309 thread_group->waiting_threads.remove(current_thread);
1310 }
1311
1312 if (err)
1313 break;
1314 }
1315
1316 thread_group->stalled= false;
1317 mysql_mutex_unlock(&thread_group->mutex);
1318
1319 DBUG_RETURN(connection);
1320 }
1321
1322
1323
1324 /**
1325 Tells the pool that worker starts waiting on IO, lock, condition,
1326 sleep() or similar.
1327 */
1328
wait_begin(thread_group_t * thread_group)1329 void wait_begin(thread_group_t *thread_group)
1330 {
1331 DBUG_ENTER("wait_begin");
1332 mysql_mutex_lock(&thread_group->mutex);
1333 thread_group->active_thread_count--;
1334
1335 DBUG_ASSERT(thread_group->active_thread_count >=0);
1336 DBUG_ASSERT(thread_group->connection_count > 0);
1337
1338 if ((thread_group->active_thread_count == 0) &&
1339 (!is_queue_empty(thread_group) || !thread_group->listener))
1340 {
1341 /*
1342 Group might stall while this thread waits, thus wake
1343 or create a worker to prevent stall.
1344 */
1345 wake_or_create_thread(thread_group);
1346 }
1347
1348 mysql_mutex_unlock(&thread_group->mutex);
1349 DBUG_VOID_RETURN;
1350 }
1351
1352 /**
1353 Tells the pool has finished waiting.
1354 */
1355
wait_end(thread_group_t * thread_group)1356 void wait_end(thread_group_t *thread_group)
1357 {
1358 DBUG_ENTER("wait_end");
1359 mysql_mutex_lock(&thread_group->mutex);
1360 thread_group->active_thread_count++;
1361 mysql_mutex_unlock(&thread_group->mutex);
1362 DBUG_VOID_RETURN;
1363 }
1364
1365
1366
1367
new_connection(CONNECT * c)1368 TP_connection * TP_pool_generic::new_connection(CONNECT *c)
1369 {
1370 return new (std::nothrow) TP_connection_generic(c);
1371 }
1372
1373 /**
1374 Add a new connection to thread pool..
1375 */
1376
add(TP_connection * c)1377 void TP_pool_generic::add(TP_connection *c)
1378 {
1379 DBUG_ENTER("tp_add_connection");
1380
1381 TP_connection_generic *connection=(TP_connection_generic *)c;
1382 thread_group_t *thread_group= connection->thread_group;
1383 /*
1384 Add connection to the work queue.Actual logon
1385 will be done by a worker thread.
1386 */
1387 mysql_mutex_lock(&thread_group->mutex);
1388 queue_put(thread_group, connection);
1389 mysql_mutex_unlock(&thread_group->mutex);
1390 DBUG_VOID_RETURN;
1391 }
1392
1393
1394
1395 /**
1396 MySQL scheduler callback: wait begin
1397 */
1398
wait_begin(int type)1399 void TP_connection_generic::wait_begin(int type)
1400 {
1401 DBUG_ENTER("wait_begin");
1402
1403 DBUG_ASSERT(!waiting);
1404 waiting++;
1405 if (waiting == 1)
1406 ::wait_begin(thread_group);
1407 DBUG_VOID_RETURN;
1408 }
1409
1410
1411 /**
1412 MySQL scheduler callback: wait end
1413 */
1414
wait_end()1415 void TP_connection_generic::wait_end()
1416 {
1417 DBUG_ENTER("wait_end");
1418 DBUG_ASSERT(waiting);
1419 waiting--;
1420 if (waiting == 0)
1421 ::wait_end(thread_group);
1422 DBUG_VOID_RETURN;
1423 }
1424
1425
set_next_timeout_check(ulonglong abstime)1426 static void set_next_timeout_check(ulonglong abstime)
1427 {
1428 DBUG_ENTER("set_next_timeout_check");
1429 while(abstime < pool_timer.next_timeout_check)
1430 {
1431 longlong old= (longlong)pool_timer.next_timeout_check;
1432 my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
1433 &old, abstime);
1434 }
1435 DBUG_VOID_RETURN;
1436 }
1437
TP_connection_generic(CONNECT * c)1438 TP_connection_generic::TP_connection_generic(CONNECT *c):
1439 TP_connection(c),
1440 thread_group(0),
1441 next_in_queue(0),
1442 prev_in_queue(0),
1443 abs_wait_timeout(ULONGLONG_MAX),
1444 bound_to_poll_descriptor(false),
1445 waiting(false)
1446 #ifdef HAVE_IOCP
1447 , overlapped()
1448 #endif
1449 {
1450 DBUG_ASSERT(c->vio);
1451
1452 #ifdef _WIN32
1453 vio_type= c->vio->type;
1454 fd= (vio_type == VIO_TYPE_NAMEDPIPE) ?
1455 c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket);
1456 #else
1457 fd= mysql_socket_getfd(c->vio->mysql_socket);
1458 #endif
1459
1460 /* Assign connection to a group. */
1461 thread_group_t *group=
1462 &all_groups[c->thread_id%group_count];
1463
1464 thread_group=group;
1465
1466 mysql_mutex_lock(&group->mutex);
1467 group->connection_count++;
1468 mysql_mutex_unlock(&group->mutex);
1469 }
1470
~TP_connection_generic()1471 TP_connection_generic::~TP_connection_generic()
1472 {
1473 mysql_mutex_lock(&thread_group->mutex);
1474 thread_group->connection_count--;
1475 mysql_mutex_unlock(&thread_group->mutex);
1476 }
1477
1478 /**
1479 Set wait timeout for connection.
1480 */
1481
set_io_timeout(int timeout_sec)1482 void TP_connection_generic::set_io_timeout(int timeout_sec)
1483 {
1484 DBUG_ENTER("set_wait_timeout");
1485 /*
1486 Calculate wait deadline for this connection.
1487 Instead of using microsecond_interval_timer() which has a syscall
1488 overhead, use pool_timer.current_microtime and take
1489 into account that its value could be off by at most
1490 one tick interval.
1491 */
1492
1493 abs_wait_timeout= pool_timer.current_microtime +
1494 1000LL*pool_timer.tick_interval +
1495 1000000LL*timeout_sec;
1496
1497 set_next_timeout_check(abs_wait_timeout);
1498 DBUG_VOID_RETURN;
1499 }
1500
1501
1502 #ifndef HAVE_IOCP
1503 /**
1504 Handle a (rare) special case,where connection needs to
1505 migrate to a different group because group_count has changed
1506 after thread_pool_size setting.
1507 */
1508
change_group(TP_connection_generic * c,thread_group_t * old_group,thread_group_t * new_group)1509 static int change_group(TP_connection_generic *c,
1510 thread_group_t *old_group,
1511 thread_group_t *new_group)
1512 {
1513 int ret= 0;
1514
1515 DBUG_ASSERT(c->thread_group == old_group);
1516
1517 /* Remove connection from the old group. */
1518 mysql_mutex_lock(&old_group->mutex);
1519 if (c->bound_to_poll_descriptor)
1520 {
1521 io_poll_disassociate_fd(old_group->pollfd,c->fd);
1522 c->bound_to_poll_descriptor= false;
1523 }
1524 c->thread_group->connection_count--;
1525 mysql_mutex_unlock(&old_group->mutex);
1526
1527 /* Add connection to the new group. */
1528 mysql_mutex_lock(&new_group->mutex);
1529 c->thread_group= new_group;
1530 new_group->connection_count++;
1531 /* Ensure that there is a listener in the new group. */
1532 if (!new_group->thread_count)
1533 ret= create_worker(new_group);
1534 mysql_mutex_unlock(&new_group->mutex);
1535 return ret;
1536 }
1537 #endif
1538
start_io()1539 int TP_connection_generic::start_io()
1540 {
1541 #ifndef HAVE_IOCP
1542 /*
1543 Usually, connection will stay in the same group for the entire
1544 connection's life. However, we do allow group_count to
1545 change at runtime, which means in rare cases when it changes is
1546 connection should need to migrate to another group, this ensures
1547 to ensure equal load between groups.
1548
1549 So we recalculate in which group the connection should be, based
1550 on thread_id and current group count, and migrate if necessary.
1551 */
1552 thread_group_t *group =
1553 &all_groups[thd->thread_id%group_count];
1554
1555 if (group != thread_group)
1556 {
1557 if (change_group(this, thread_group, group))
1558 return -1;
1559 }
1560 #endif
1561
1562 /*
1563 Bind to poll descriptor if not yet done.
1564 */
1565 if (!bound_to_poll_descriptor)
1566 {
1567 bound_to_poll_descriptor= true;
1568 return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
1569 }
1570
1571 return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
1572 }
1573
1574
1575
1576 /**
1577 Worker thread's main
1578 */
1579
worker_main(void * param)1580 static void *worker_main(void *param)
1581 {
1582
1583 worker_thread_t this_thread;
1584 pthread_detach_this_thread();
1585 my_thread_init();
1586
1587 DBUG_ENTER("worker_main");
1588
1589 thread_group_t *thread_group = (thread_group_t *)param;
1590
1591 /* Init per-thread structure */
1592 mysql_cond_init(key_worker_cond, &this_thread.cond, NULL);
1593 this_thread.thread_group= thread_group;
1594 this_thread.event_count=0;
1595
1596 /* Run event loop */
1597 for(;;)
1598 {
1599 TP_connection_generic *connection;
1600 struct timespec ts;
1601 set_timespec(ts,threadpool_idle_timeout);
1602 connection = get_event(&this_thread, thread_group, &ts);
1603 if (!connection)
1604 break;
1605 this_thread.event_count++;
1606 tp_callback(connection);
1607 }
1608
1609 /* Thread shutdown: cleanup per-worker-thread structure. */
1610 mysql_cond_destroy(&this_thread.cond);
1611
1612 bool last_thread; /* last thread in group exits */
1613 mysql_mutex_lock(&thread_group->mutex);
1614 add_thread_count(thread_group, -1);
1615 last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown);
1616 mysql_mutex_unlock(&thread_group->mutex);
1617
1618 /* Last thread in group exits and pool is terminating, destroy group.*/
1619 if (last_thread)
1620 thread_group_destroy(thread_group);
1621
1622 my_thread_end();
1623 return NULL;
1624 }
1625
1626
TP_pool_generic()1627 TP_pool_generic::TP_pool_generic()
1628 {}
1629
init()1630 int TP_pool_generic::init()
1631 {
1632 DBUG_ENTER("TP_pool_generic::TP_pool_generic");
1633 threadpool_max_size= MY_MAX(threadpool_size, 128);
1634 all_groups= (thread_group_t *)
1635 my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
1636 if (!all_groups)
1637 {
1638 threadpool_max_size= 0;
1639 sql_print_error("Allocation failed");
1640 DBUG_RETURN(-1);
1641 }
1642 scheduler_init();
1643 threadpool_started= true;
1644 for (uint i= 0; i < threadpool_max_size; i++)
1645 {
1646 thread_group_init(&all_groups[i], get_connection_attrib());
1647 }
1648 set_pool_size(threadpool_size);
1649 if(group_count == 0)
1650 {
1651 /* Something went wrong */
1652 sql_print_error("Can't set threadpool size to %d",threadpool_size);
1653 DBUG_RETURN(-1);
1654 }
1655 PSI_register(mutex);
1656 PSI_register(cond);
1657 PSI_register(thread);
1658
1659 pool_timer.tick_interval= threadpool_stall_limit;
1660 start_timer(&pool_timer);
1661 DBUG_RETURN(0);
1662 }
1663
~TP_pool_generic()1664 TP_pool_generic::~TP_pool_generic()
1665 {
1666 DBUG_ENTER("tp_end");
1667
1668 if (!threadpool_started)
1669 DBUG_VOID_RETURN;
1670
1671 stop_timer(&pool_timer);
1672 shutdown_group_count= threadpool_max_size;
1673 for (uint i= 0; i < threadpool_max_size; i++)
1674 {
1675 thread_group_close(&all_groups[i]);
1676 }
1677
1678 /*
1679 Wait until memory occupied by all_groups is freed.
1680 */
1681 int timeout_ms=5000;
1682 while(all_groups && timeout_ms--)
1683 my_sleep(1000);
1684
1685 threadpool_started= false;
1686 DBUG_VOID_RETURN;
1687 }
1688
1689
1690 /** Ensure that poll descriptors are created when threadpool_size changes */
set_pool_size(uint size)1691 int TP_pool_generic::set_pool_size(uint size)
1692 {
1693 bool success= true;
1694
1695 for(uint i=0; i< size; i++)
1696 {
1697 thread_group_t *group= &all_groups[i];
1698 mysql_mutex_lock(&group->mutex);
1699 if (group->pollfd == INVALID_HANDLE_VALUE)
1700 {
1701 group->pollfd= io_poll_create();
1702 success= (group->pollfd != INVALID_HANDLE_VALUE);
1703 if(!success)
1704 {
1705 sql_print_error("io_poll_create() failed, errno=%d\n", errno);
1706 }
1707 }
1708 mysql_mutex_unlock(&group->mutex);
1709 if (!success)
1710 {
1711 group_count= i;
1712 return -1;
1713 }
1714 }
1715 group_count= size;
1716 return 0;
1717 }
1718
set_stall_limit(uint limit)1719 int TP_pool_generic::set_stall_limit(uint limit)
1720 {
1721 mysql_mutex_lock(&(pool_timer.mutex));
1722 pool_timer.tick_interval= limit;
1723 mysql_mutex_unlock(&(pool_timer.mutex));
1724 mysql_cond_signal(&(pool_timer.cond));
1725 return 0;
1726 }
1727
1728
1729 /**
1730 Calculate number of idle/waiting threads in the pool.
1731
1732 Sum idle threads over all groups.
1733 Don't do any locking, it is not required for stats.
1734 */
1735
get_idle_thread_count()1736 int TP_pool_generic::get_idle_thread_count()
1737 {
1738 int sum=0;
1739 for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
1740 {
1741 sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
1742 }
1743 return sum;
1744 }
1745
1746
/* Reporting of thread pool problems. */

/**
  Delay, in microseconds, after which the "pool blocked" message is printed
  (30 seconds = 30 million microseconds).
*/
#define BLOCK_MSG_DELAY (30 * 1000 * 1000)

/** Warning printed when a pool block is caused by reaching the thread limit. */
#define MAX_THREADS_REACHED_MSG \
  "Threadpool could not create additional thread to handle queries, because the " \
  "number of allowed threads was reached. Increasing 'thread_pool_max_threads' " \
  "parameter can help in this situation.\n " \
  "If 'extra_port' parameter is set, you can still connect to the database with " \
  "superuser account (it must be TCP connection using extra_port as TCP port) " \
  "and troubleshoot the situation. " \
  "A likely cause of pool blocks are clients that lock resources for long time. " \
  "'show processlist' or 'show engine innodb status' can give additional hints."

/** Warning printed when thread creation failed; %d receives my_errno. */
#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."
1766
1767 /**
1768 Write a message when blocking situation in threadpool occurs.
1769 The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds.
1770 It will be just a single message for each blocking situation (to prevent
1771 log flood).
1772 */
1773
print_pool_blocked_message(bool max_threads_reached)1774 static void print_pool_blocked_message(bool max_threads_reached)
1775 {
1776 ulonglong now;
1777 static bool msg_written;
1778
1779 now= microsecond_interval_timer();
1780 if (pool_block_start == 0)
1781 {
1782 pool_block_start= now;
1783 msg_written = false;
1784 return;
1785 }
1786
1787 if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
1788 {
1789 if (max_threads_reached)
1790 sql_print_warning(MAX_THREADS_REACHED_MSG);
1791 else
1792 sql_print_warning(CREATE_THREAD_ERROR_MSG, my_errno);
1793
1794 sql_print_information("Threadpool has been blocked for %u seconds\n",
1795 (uint)((now- pool_block_start)/1000000));
1796 /* avoid reperated messages for the same blocking situation */
1797 msg_written= true;
1798 }
1799 }
1800
1801 #endif /* HAVE_POOL_OF_THREADS */
1802