1 /* Copyright (C) 2012 Monty Program Ab
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #include "mariadb.h"
17 #include <violite.h>
18 #include <sql_priv.h>
19 #include <sql_class.h>
20 #include <my_pthread.h>
21 #include <scheduler.h>
22 
23 #ifdef HAVE_POOL_OF_THREADS
24 
#ifdef _WIN32
/* AIX may define this, too ?*/
#define HAVE_IOCP
#endif

/*
  Platform dependent extra argument: "this" on IOCP builds, 0 elsewhere.
  NOTE(review): presumably passed as the trailing "opt" argument of
  io_poll_associate_fd()/io_poll_start_read() - confirm against the callers.
*/
#ifdef HAVE_IOCP
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif

/* Native handle type used for sockets and for poll descriptors. */
#ifdef _WIN32
typedef HANDLE TP_file_handle;
#else
typedef int TP_file_handle;
/* Parenthesized, so that the negative literal stays intact in any expression */
#define  INVALID_HANDLE_VALUE (-1)
#endif
42 
43 
44 #include <sql_connect.h>
45 #include <mysqld.h>
46 #include <debug_sync.h>
47 #include <time.h>
48 #include <sql_plist.h>
49 #include <threadpool.h>
50 #include <time.h>
51 #ifdef __linux__
52 #include <sys/epoll.h>
53 typedef struct epoll_event native_event;
54 #elif defined(HAVE_KQUEUE)
55 #include <sys/event.h>
56 typedef struct kevent native_event;
57 #elif defined (__sun)
58 #include <port.h>
59 typedef port_event_t native_event;
60 #elif defined (HAVE_IOCP)
61 typedef OVERLAPPED_ENTRY native_event;
62 #else
63 #error threadpool is not available on this platform
64 #endif
65 
66 
io_poll_close(TP_file_handle fd)67 static void io_poll_close(TP_file_handle fd)
68 {
69 #ifdef _WIN32
70   CloseHandle(fd);
71 #else
72   close(fd);
73 #endif
74 }
75 
76 
/** Upper limit for the number of native events a listener reads at once */
#define MAX_EVENTS 1024

/** Set when the threadpool has been initialized */
static bool threadpool_started= false;
82 
/*
  PSI keys for the performance schema.

  Instrumented objects are: one mutex per thread group, the worker threads,
  one condition per worker thread, and the timer thread together with its
  own mutex and condition.
*/


#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
{
  { &key_group_mutex, "group_mutex", 0},
  { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL}
};

static PSI_cond_key key_worker_cond;
static PSI_cond_key key_timer_cond;
static PSI_cond_info cond_list[]=
{
  { &key_worker_cond, "worker_cond", 0},
  { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL}
};

static PSI_thread_key key_worker_thread;
static PSI_thread_key key_timer_thread;
static PSI_thread_info	thread_list[] =
{
 {&key_worker_thread, "worker_thread", 0},
 {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
};

/*
  Macro to simplify performance schema registration: registers X_list
  (X is one of mutex, cond, thread) under the "threadpool" category.

  The expansion is wrapped into do { } while (0), so that it always forms
  exactly one statement and cannot swallow an "else" following the call.
*/
#define PSI_register(X) \
  do \
  { \
    if (PSI_server) \
      PSI_server->register_ ## X("threadpool", X ## _list, \
                                 array_elements(X ## _list)); \
  } while (0)
#else
#define PSI_register(X) do {} while (0) /* no-op */
#endif
121 
122 
123 struct thread_group_t;
124 
125 /* Per-thread structure for workers */
126 struct worker_thread_t
127 {
128   ulonglong  event_count; /* number of request handled by this thread */
129   thread_group_t* thread_group;
130   worker_thread_t *next_in_list;
131   worker_thread_t **prev_in_list;
132 
133   mysql_cond_t  cond;
134   bool          woken;
135 };
136 
137 typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
138                  &worker_thread_t::next_in_list,
139                  &worker_thread_t::prev_in_list>
140                  >
141 worker_list_t;
142 
143 struct TP_connection_generic:public TP_connection
144 {
145   TP_connection_generic(CONNECT *c);
146   ~TP_connection_generic();
147 
initTP_connection_generic148   virtual int init(){ return 0; };
149   virtual void set_io_timeout(int sec);
150   virtual int  start_io();
151   virtual void wait_begin(int type);
152   virtual void wait_end();
153 
154   thread_group_t *thread_group;
155   TP_connection_generic *next_in_queue;
156   TP_connection_generic **prev_in_queue;
157   ulonglong abs_wait_timeout;
158   ulonglong dequeue_time;
159   TP_file_handle fd;
160   bool bound_to_poll_descriptor;
161   int waiting;
162 #ifdef HAVE_IOCP
163   OVERLAPPED overlapped;
164 #endif
165 #ifdef _WIN32
166   enum_vio_type vio_type;
167 #endif
168 };
169 
170 
171 typedef I_P_List<TP_connection_generic,
172                      I_P_List_adapter<TP_connection_generic,
173                                       &TP_connection_generic::next_in_queue,
174                                       &TP_connection_generic::prev_in_queue>,
175                      I_P_List_null_counter,
176                      I_P_List_fast_push_back<TP_connection_generic> >
177 connection_queue_t;
178 
/* Number of work queues per group: one high and one low priority queue */
const int NQUEUES= 2;
180 
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE)181 struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
182 {
183   mysql_mutex_t mutex;
184   connection_queue_t queues[NQUEUES];
185   worker_list_t waiting_threads;
186   worker_thread_t *listener;
187   pthread_attr_t *pthread_attr;
188   TP_file_handle  pollfd;
189   int  thread_count;
190   int  active_thread_count;
191   int  connection_count;
192   /* Stats for the deadlock detection timer routine.*/
193   int io_event_count;
194   int queue_event_count;
195   ulonglong last_thread_creation_time;
196   int  shutdown_pipe[2];
197   bool shutdown;
198   bool stalled;
199 };
200 
201 static thread_group_t *all_groups;
202 static uint group_count;
203 static int32 shutdown_group_count;
204 
205 /**
206  Used for printing "pool blocked" message, see
207  print_pool_blocked_message();
208 */
209 static ulonglong pool_block_start;
210 
211 /* Global timer for all groups  */
212 struct pool_timer_t
213 {
214   mysql_mutex_t mutex;
215   mysql_cond_t cond;
216   volatile uint64 current_microtime;
217   volatile uint64 next_timeout_check;
218   int  tick_interval;
219   bool shutdown;
220   pthread_t timer_thread_id;
221 };
222 
223 static pool_timer_t pool_timer;
224 
225 static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
226 static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
227 static int  wake_thread(thread_group_t *thread_group);
228 static int  wake_or_create_thread(thread_group_t *thread_group);
229 static int  create_worker(thread_group_t *thread_group);
230 static void *worker_main(void *param);
231 static void check_stall(thread_group_t *thread_group);
232 static void set_next_timeout_check(ulonglong abstime);
233 static void print_pool_blocked_message(bool);
234 
235 /**
236  Asynchronous network IO.
237 
238  We use native edge-triggered network IO multiplexing facility.
239  This maps to different APIs on different Unixes.
240 
 Currently supported are Linux with epoll, Solaris with event ports,
 OSX and BSD with kevent, Windows with IOCP. All those APIs are used with one-shot flags
 (the event is signalled once the client has written something into the socket,
 then the socket is removed from the "poll-set" until the command is finished,
 and we need to re-arm/re-register the socket)
246 
247  No implementation for poll/select is currently provided.
248 
249  The API closely resembles all of the above mentioned platform APIs
250  and consists of following functions.
251 
252  - io_poll_create()
253  Creates an io_poll descriptor
254  On Linux: epoll_create()
255 
256  - io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt)
257  Associate file descriptor with io poll descriptor
258  On Linux : epoll_ctl(..EPOLL_CTL_ADD))
259 
 - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
  Disassociate file descriptor from io poll descriptor
  On Linux: epoll_ctl(..EPOLL_CTL_DEL)
263 
264 
265  - io_poll_start_read(int poll_fd,int fd, void *data, void *opt)
266  The same as io_poll_associate_fd(), but cannot be used before
267  io_poll_associate_fd() was called.
268  On Linux : epoll_ctl(..EPOLL_CTL_MOD)
269 
270  - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents,
271    int timeout_ms)
272 
273  wait until one or more descriptors added with io_poll_associate_fd()
274  or io_poll_start_read() becomes readable. Data associated with
275  descriptors can be retrieved from native_events array, using
276  native_event_get_userdata() function.
277 
278 
279  On Linux: epoll_wait()
280 */
281 
282 #if defined (__linux__)
#if !defined(EPOLLRDHUP)
/* Not available on early 2.6 kernels; 0 makes the flag a no-op in the masks */
#define EPOLLRDHUP 0
#endif
io_poll_create()287 static TP_file_handle io_poll_create()
288 {
289   return epoll_create(1);
290 }
291 
292 
io_poll_associate_fd(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)293 int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*)
294 {
295   struct epoll_event ev;
296   ev.data.u64= 0; /* Keep valgrind happy */
297   ev.data.ptr= data;
298   ev.events=  EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
299   return epoll_ctl(pollfd, EPOLL_CTL_ADD,  fd, &ev);
300 }
301 
302 
303 
io_poll_start_read(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)304 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
305 {
306   struct epoll_event ev;
307   ev.data.u64= 0; /* Keep valgrind happy */
308   ev.data.ptr= data;
309   ev.events=  EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
310   return epoll_ctl(pollfd, EPOLL_CTL_MOD,  fd, &ev);
311 }
312 
io_poll_disassociate_fd(TP_file_handle pollfd,TP_file_handle fd)313 int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
314 {
315   struct epoll_event ev;
316   return epoll_ctl(pollfd, EPOLL_CTL_DEL,  fd, &ev);
317 }
318 
319 
320 /*
321  Wrapper around epoll_wait.
322  NOTE - in case of EINTR, it restarts with original timeout. Since we use
323  either infinite or 0 timeouts, this is not critical
324 */
io_poll_wait(TP_file_handle pollfd,native_event * native_events,int maxevents,int timeout_ms)325 int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents,
326               int timeout_ms)
327 {
328   int ret;
329   do
330   {
331     ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
332   }
333   while(ret == -1 && errno == EINTR);
334   return ret;
335 }
336 
337 
native_event_get_userdata(native_event * event)338 static void *native_event_get_userdata(native_event *event)
339 {
340   return event->data.ptr;
341 }
342 
343 #elif defined(HAVE_KQUEUE)
344 
/*
  NetBSD is incompatible with other BSDs: the last parameter of the EV_SET
  macro (udata, user data) needs to be intptr_t there, whereas it needs to
  be void* everywhere else.

  The cast is applied to the parenthesized argument, so that it covers the
  whole expression passed by the caller.
*/

#ifdef __NetBSD__
#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, (intptr_t)(g))
#else
#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, g)
#endif
356 
357 
io_poll_create()358 TP_file_handle io_poll_create()
359 {
360   return kqueue();
361 }
362 
io_poll_start_read(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)363 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
364 {
365   struct kevent ke;
366   MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
367          0, 0, data);
368   return kevent(pollfd, &ke, 1, 0, 0, 0);
369 }
370 
371 
io_poll_associate_fd(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)372 int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
373 {
374   struct kevent ke;
375   MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
376          0, 0, data);
377   return io_poll_start_read(pollfd,fd, data, 0);
378 }
379 
380 
io_poll_disassociate_fd(TP_file_handle pollfd,TP_file_handle fd)381 int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
382 {
383   struct kevent ke;
384   MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
385   return kevent(pollfd, &ke, 1, 0, 0, 0);
386 }
387 
388 
io_poll_wait(TP_file_handle pollfd,struct kevent * events,int maxevents,int timeout_ms)389 int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms)
390 {
391   struct timespec ts;
392   int ret;
393   if (timeout_ms >= 0)
394   {
395     ts.tv_sec= timeout_ms/1000;
396     ts.tv_nsec= (timeout_ms%1000)*1000000;
397   }
398   do
399   {
400     ret= kevent(pollfd, 0, 0, events, maxevents,
401                (timeout_ms >= 0)?&ts:NULL);
402   }
403   while (ret == -1 && errno == EINTR);
404   return ret;
405 }
406 
native_event_get_userdata(native_event * event)407 static void* native_event_get_userdata(native_event *event)
408 {
409   return (void *)event->udata;
410 }
411 
412 #elif defined (__sun)
413 
io_poll_create()414 static TP_file_handle io_poll_create()
415 {
416   return port_create();
417 }
418 
io_poll_start_read(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)419 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
420 {
421   return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
422 }
423 
io_poll_associate_fd(TP_file_handle pollfd,TP_file_handle fd,void * data,void *)424 static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
425 {
426   return io_poll_start_read(pollfd, fd, data, 0);
427 }
428 
io_poll_disassociate_fd(TP_file_handle pollfd,TP_file_handle fd)429 int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
430 {
431   return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
432 }
433 
io_poll_wait(TP_file_handle pollfd,native_event * events,int maxevents,int timeout_ms)434 int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
435 {
436   struct timespec ts;
437   int ret;
438   uint_t nget= 1;
439   if (timeout_ms >= 0)
440   {
441     ts.tv_sec= timeout_ms/1000;
442     ts.tv_nsec= (timeout_ms%1000)*1000000;
443   }
444   do
445   {
446     ret= port_getn(pollfd, events, maxevents, &nget,
447             (timeout_ms >= 0)?&ts:NULL);
448   }
449   while (ret == -1 && errno == EINTR);
450   DBUG_ASSERT(nget < INT_MAX);
451   return (int)nget;
452 }
453 
native_event_get_userdata(native_event * event)454 static void* native_event_get_userdata(native_event *event)
455 {
456   return event->portev_user;
457 }
458 
459 #elif defined(HAVE_IOCP)
460 
461 
io_poll_create()462 static TP_file_handle io_poll_create()
463 {
464   return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
465 }
466 
467 
io_poll_start_read(TP_file_handle pollfd,TP_file_handle fd,void *,void * opt)468 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
469 {
470   static char c;
471   TP_connection_generic *con= (TP_connection_generic *)opt;
472   OVERLAPPED *overlapped= &con->overlapped;
473   if (con->vio_type == VIO_TYPE_NAMEDPIPE)
474   {
475     if (ReadFile(fd, &c, 0, NULL, overlapped))
476       return 0;
477   }
478   else
479   {
480     WSABUF buf;
481     buf.buf= &c;
482     buf.len= 0;
483     DWORD flags=0;
484 
485     if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
486       return 0;
487   }
488 
489   if (GetLastError() == ERROR_IO_PENDING)
490     return 0;
491 
492   return 1;
493 }
494 
495 
io_poll_associate_fd(TP_file_handle pollfd,TP_file_handle fd,void * data,void * opt)496 static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
497 {
498   HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0);
499   if (!h)
500     return -1;
501   return io_poll_start_read(pollfd,fd, 0, opt);
502 }
503 
504 
io_poll_disassociate_fd(TP_file_handle pollfd,TP_file_handle fd)505 int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
506 {
507   /* Not possible to unbind/rebind file descriptor in IOCP. */
508   return 0;
509 }
510 
511 
io_poll_wait(TP_file_handle pollfd,native_event * events,int maxevents,int timeout_ms)512 int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
513 {
514   ULONG n;
515   BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
516      maxevents, &n, timeout_ms, FALSE);
517 
518   return ok ? (int)n : -1;
519 }
520 
521 
native_event_get_userdata(native_event * event)522 static void* native_event_get_userdata(native_event *event)
523 {
524   return (void *)event->lpCompletionKey;
525 }
526 #endif
527 
528 
529 /* Dequeue element from a workqueue */
530 
queue_get(thread_group_t * thread_group)531 static TP_connection_generic *queue_get(thread_group_t *thread_group)
532 {
533   DBUG_ENTER("queue_get");
534   thread_group->queue_event_count++;
535   TP_connection_generic *c;
536   for (int i=0; i < NQUEUES;i++)
537   {
538     c= thread_group->queues[i].pop_front();
539     if (c)
540       DBUG_RETURN(c);
541   }
542   DBUG_RETURN(0);
543 }
544 
is_queue_empty(thread_group_t * thread_group)545 static bool is_queue_empty(thread_group_t *thread_group)
546 {
547   for (int i=0; i < NQUEUES; i++)
548   {
549     if (!thread_group->queues[i].is_empty())
550       return false;
551   }
552   return true;
553 }
554 
555 
queue_init(thread_group_t * thread_group)556 static void queue_init(thread_group_t *thread_group)
557 {
558   for (int i=0; i < NQUEUES; i++)
559   {
560     thread_group->queues[i].empty();
561   }
562 }
563 
queue_put(thread_group_t * thread_group,native_event * ev,int cnt)564 static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
565 {
566   ulonglong now= pool_timer.current_microtime;
567   for(int i=0; i < cnt; i++)
568   {
569     TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
570     c->dequeue_time= now;
571     thread_group->queues[c->priority].push_back(c);
572   }
573 }
574 
575 /*
576   Handle wait timeout :
577   Find connections that have been idle for too long and kill them.
578   Also, recalculate time when next timeout check should run.
579 */
580 
timeout_check(pool_timer_t * timer)581 static void timeout_check(pool_timer_t *timer)
582 {
583   DBUG_ENTER("timeout_check");
584 
585   mysql_mutex_lock(&LOCK_thread_count);
586   I_List_iterator<THD> it(threads);
587 
588   /* Reset next timeout check, it will be recalculated in the loop below */
589   my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);
590 
591   THD *thd;
592   while ((thd=it++))
593   {
594     TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
595     if (!connection || connection->state != TP_STATE_IDLE)
596     {
597       /*
598         Connection does not have scheduler data. This happens for example
599         if THD belongs to a different scheduler, that is listening to extra_port.
600       */
601       continue;
602     }
603 
604     if(connection->abs_wait_timeout < timer->current_microtime)
605     {
606       tp_timeout_handler(connection);
607     }
608     else
609     {
610       set_next_timeout_check(connection->abs_wait_timeout);
611     }
612   }
613   mysql_mutex_unlock(&LOCK_thread_count);
614   DBUG_VOID_RETURN;
615 }
616 
617 
618 /*
619  Timer thread.
620 
621   Periodically, check if one of the thread groups is stalled. Stalls happen if
622   events are not being dequeued from the queue, or from the network, Primary
623   reason for stall can be a lengthy executing non-blocking request. It could
624   also happen that thread is waiting but wait_begin/wait_end is forgotten by
625   storage engine. Timer thread will create a new thread in group in case of
626   a stall.
627 
628   Besides checking for stalls, timer thread is also responsible for terminating
629   clients that have been idle for longer than wait_timeout seconds.
630 
631   TODO: Let the timer sleep for long time if there is no work to be done.
632   Currently it wakes up rather often on and idle server.
633 */
634 
timer_thread(void * param)635 static void* timer_thread(void *param)
636 {
637   uint i;
638   pool_timer_t* timer=(pool_timer_t *)param;
639 
640   my_thread_init();
641   DBUG_ENTER("timer_thread");
642   timer->next_timeout_check= ULONGLONG_MAX;
643   timer->current_microtime= microsecond_interval_timer();
644 
645   for(;;)
646   {
647     struct timespec ts;
648     int err;
649 
650     set_timespec_nsec(ts,timer->tick_interval*1000000);
651     mysql_mutex_lock(&timer->mutex);
652     err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
653     if (timer->shutdown)
654     {
655       mysql_mutex_unlock(&timer->mutex);
656       break;
657     }
658     if (err == ETIMEDOUT)
659     {
660       timer->current_microtime= microsecond_interval_timer();
661 
662       /* Check stalls in thread groups */
663       for (i= 0; i < threadpool_max_size; i++)
664       {
665         if(all_groups[i].connection_count)
666            check_stall(&all_groups[i]);
667       }
668 
669       /* Check if any client exceeded wait_timeout */
670       if (timer->next_timeout_check <= timer->current_microtime)
671         timeout_check(timer);
672     }
673     mysql_mutex_unlock(&timer->mutex);
674   }
675 
676   mysql_mutex_destroy(&timer->mutex);
677   my_thread_end();
678   return NULL;
679 }
680 
681 
682 
check_stall(thread_group_t * thread_group)683 void check_stall(thread_group_t *thread_group)
684 {
685   mysql_mutex_lock(&thread_group->mutex);
686 
687   /*
688    Bump priority for the low priority connections that spent too much
689    time in low prio queue.
690   */
691   TP_connection_generic *c;
692   for (;;)
693   {
694     c= thread_group->queues[TP_PRIORITY_LOW].front();
695     if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer)
696     {
697       thread_group->queues[TP_PRIORITY_LOW].remove(c);
698       thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
699     }
700     else
701       break;
702   }
703 
704   /*
705     Check if listener is present. If not,  check whether any IO
706     events were dequeued since last time. If not, this means
707     listener is either in tight loop or thd_wait_begin()
708     was forgotten. Create a new worker(it will make itself listener).
709   */
710   if (!thread_group->listener && !thread_group->io_event_count)
711   {
712     wake_or_create_thread(thread_group);
713     mysql_mutex_unlock(&thread_group->mutex);
714     return;
715   }
716 
717   /*  Reset io event count */
718   thread_group->io_event_count= 0;
719 
720   /*
721     Check whether requests from the workqueue are being dequeued.
722 
723     The stall detection and resolution works as follows:
724 
725     1. There is a counter thread_group->queue_event_count for the number of
726        events removed from the queue. Timer resets the counter to 0 on each run.
727     2. Timer determines stall if this counter remains 0 since last check
728        and the queue is not empty.
729     3. Once timer determined a stall it sets thread_group->stalled flag and
730        wakes and idle worker (or creates a new one, subject to throttling).
731     4. The stalled flag is reset, when an event is dequeued.
732 
733     Q : Will this handling lead to an unbound growth of threads, if queue
734     stalls permanently?
735     A : No. If queue stalls permanently, it is an indication for many very long
736     simultaneous queries. The maximum number of simultanoues queries is
737     max_connections, further we have threadpool_max_threads limit, upon which no
738     worker threads are created. So in case there is a flood of very long
739     queries, threadpool would slowly approach thread-per-connection behavior.
740     NOTE:
741     If long queries never wait, creation of the new threads is done by timer,
742     so it is slower than in real thread-per-connection. However if long queries
743     do wait and indicate that via thd_wait_begin/end callbacks, thread creation
744     will be faster.
745   */
746   if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
747   {
748     thread_group->stalled= true;
749     wake_or_create_thread(thread_group);
750   }
751 
752   /* Reset queue event count */
753   thread_group->queue_event_count= 0;
754 
755   mysql_mutex_unlock(&thread_group->mutex);
756 }
757 
758 
start_timer(pool_timer_t * timer)759 static void start_timer(pool_timer_t* timer)
760 {
761   DBUG_ENTER("start_timer");
762   mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
763   mysql_cond_init(key_timer_cond, &timer->cond, NULL);
764   timer->shutdown = false;
765   mysql_thread_create(key_timer_thread, &timer->timer_thread_id, NULL,
766                       timer_thread, timer);
767   DBUG_VOID_RETURN;
768 }
769 
770 
stop_timer(pool_timer_t * timer)771 static void stop_timer(pool_timer_t *timer)
772 {
773   DBUG_ENTER("stop_timer");
774   mysql_mutex_lock(&timer->mutex);
775   timer->shutdown = true;
776   mysql_cond_signal(&timer->cond);
777   mysql_mutex_unlock(&timer->mutex);
778   pthread_join(timer->timer_thread_id, NULL);
779   DBUG_VOID_RETURN;
780 }
781 
782 
783 /**
784   Poll for socket events and distribute them to worker threads
785   In many case current thread will handle single event itself.
786 
787   @return a ready connection, or NULL on shutdown
788 */
listener(worker_thread_t * current_thread,thread_group_t * thread_group)789 static TP_connection_generic * listener(worker_thread_t *current_thread,
790                                thread_group_t *thread_group)
791 {
792   DBUG_ENTER("listener");
793   TP_connection_generic *retval= NULL;
794 
795   for(;;)
796   {
797     native_event ev[MAX_EVENTS];
798     int cnt;
799 
800     if (thread_group->shutdown)
801       break;
802 
803     cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
804 
805     if (cnt <=0)
806     {
807       DBUG_ASSERT(thread_group->shutdown);
808       break;
809     }
810 
811     mysql_mutex_lock(&thread_group->mutex);
812 
813     if (thread_group->shutdown)
814     {
815       mysql_mutex_unlock(&thread_group->mutex);
816       break;
817     }
818 
819     thread_group->io_event_count += cnt;
820 
821     /*
822      We got some network events and need to make decisions : whether
823      listener  hould handle events and whether or not any wake worker
824      threads so they can handle events.
825 
826      Q1 : Should listener handle an event itself, or put all events into
827      queue  and let workers handle the events?
828 
829      Solution :
830      Generally, listener that handles events itself is preferable. We do not
831      want listener thread to change its state from waiting  to running too
832      often, Since listener has just woken from poll, it better uses its time
833      slice and does some work. Besides, not handling events means they go to
834      the  queue, and often to wake another worker must wake up to handle the
835      event. This is not good, as we want to avoid wakeups.
836 
837      The downside of listener that also handles queries is that we can
838      potentially leave thread group  for long time not picking the new
839      network events. It is not  a major problem, because this stall will be
840      detected  sooner or later by  the timer thread. Still, relying on timer
841      is not always good, because it may "tick" too slow (large timer_interval)
842 
843      We use following strategy to solve this problem - if queue was not empty
844      we suspect flood of network events and listener stays, Otherwise, it
845      handles a query.
846 
847 
848      Q2: If queue is not empty, how many workers to wake?
849 
850      Solution:
851      We generally try to keep one thread per group active (threads handling
852      queries   are considered active, unless they stuck in inside some "wait")
853      Thus, we will wake only one worker, and only if there is not active
854      threads currently,and listener is not going to handle a query. When we
855      don't wake, we hope that  currently active  threads will finish fast and
856      handle the queue. If this does  not happen, timer thread will detect stall
857      and wake a worker.
858 
859      NOTE: Currently nothing is done to detect or prevent long queuing times.
860      A solution for the future would be to give up "one active thread per
861      group" principle, if events stay  in the queue for too long, and just wake
862      more workers.
863     */
864 
865     bool listener_picks_event=is_queue_empty(thread_group);
866     queue_put(thread_group, ev, cnt);
867     if (listener_picks_event)
868     {
869       /* Handle the first event. */
870       retval= queue_get(thread_group);
871       mysql_mutex_unlock(&thread_group->mutex);
872       break;
873     }
874 
875     if(thread_group->active_thread_count==0)
876     {
877       /* We added some work items to queue, now wake a worker. */
878       if(wake_thread(thread_group))
879       {
880         /*
881           Wake failed, hence groups has no idle threads. Now check if there are
882           any threads in the group except listener.
883         */
884         if(thread_group->thread_count == 1)
885         {
886            /*
887              Currently there is no worker thread in the group, as indicated by
888              thread_count == 1 (this means listener is the only one thread in
889              the group).
890              The queue is not empty, and listener is not going to handle
891              events. In order to drain the queue,  we create a worker here.
892              Alternatively, we could just rely on timer to detect stall, and
893              create thread, but waiting for timer would be an inefficient and
894              pointless delay.
895            */
896            create_worker(thread_group);
897         }
898       }
899     }
900     mysql_mutex_unlock(&thread_group->mutex);
901   }
902 
903   DBUG_RETURN(retval);
904 }
905 
906 /**
907   Adjust thread counters in group or global
908   whenever thread is created or is about to exit
909 
910   @param thread_group
911   @param count -  1, when new thread is created
912                  -1, when thread is about to exit
913 */
914 
add_thread_count(thread_group_t * thread_group,int32 count)915 static void add_thread_count(thread_group_t *thread_group, int32 count)
916 {
917   thread_group->thread_count += count;
918   /* worker starts out and end in "active" state */
919   thread_group->active_thread_count += count;
920   my_atomic_add32(&tp_stats.num_worker_threads, count);
921 }
922 
923 
924 /**
925   Creates a new worker thread.
926   thread_mutex must be held when calling this function
927 
928   NOTE: in rare cases, the number of threads can exceed
929   threadpool_max_threads, because we need at least 2 threads
930   per group to prevent deadlocks (one listener + one worker)
931 */
932 
create_worker(thread_group_t * thread_group)933 static int create_worker(thread_group_t *thread_group)
934 {
935   pthread_t thread_id;
936   bool max_threads_reached= false;
937   int err;
938 
939   DBUG_ENTER("create_worker");
940   if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
941      && thread_group->thread_count >= 2)
942   {
943     err= 1;
944     max_threads_reached= true;
945     goto end;
946   }
947 
948 
949   err= mysql_thread_create(key_worker_thread, &thread_id,
950          thread_group->pthread_attr, worker_main, thread_group);
951   if (!err)
952   {
953     thread_group->last_thread_creation_time=microsecond_interval_timer();
954     statistic_increment(thread_created,&LOCK_status);
955     add_thread_count(thread_group, 1);
956   }
957   else
958   {
959     my_errno= errno;
960   }
961 
962 end:
963   if (err)
964     print_pool_blocked_message(max_threads_reached);
965   else
966     pool_block_start= 0; /* Reset pool blocked timer, if it was set */
967 
968   DBUG_RETURN(err);
969 }
970 
971 
972 /**
973  Calculate microseconds throttling delay for thread creation.
974 
975  The value depends on how many threads are already in the group:
976  small number of threads means no delay, the more threads the larger
977  the delay.
978 
979  The actual values were not calculated using any scientific methods.
980  They just look right, and behave well in practice.
981 
982  TODO: Should throttling depend on thread_pool_stall_limit?
983 */
microsecond_throttling_interval(thread_group_t * thread_group)984 static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
985 {
986   int count= thread_group->thread_count;
987 
988   if (count < 4)
989     return 0;
990 
991   if (count < 8)
992     return 50*1000;
993 
994   if(count < 16)
995     return 100*1000;
996 
997   return 200*1000;
998 }
999 
1000 
1001 /**
1002   Wakes a worker thread, or creates a new one.
1003 
1004   Worker creation is throttled, so we avoid too many threads
1005   to be created during the short time.
1006 */
wake_or_create_thread(thread_group_t * thread_group)1007 static int wake_or_create_thread(thread_group_t *thread_group)
1008 {
1009   DBUG_ENTER("wake_or_create_thread");
1010 
1011   if (thread_group->shutdown)
1012    DBUG_RETURN(0);
1013 
1014   if (wake_thread(thread_group) == 0)
1015     DBUG_RETURN(0);
1016 
1017   if (thread_group->thread_count > thread_group->connection_count)
1018     DBUG_RETURN(-1);
1019 
1020 
1021   if (thread_group->active_thread_count == 0)
1022   {
1023     /*
1024      We're better off creating a new thread here  with no delay, either there
1025      are no workers at all, or they all are all blocking and there was no
1026      idle  thread to wakeup. Smells like a potential deadlock or very slowly
1027      executing requests, e.g sleeps or user locks.
1028     */
1029     DBUG_RETURN(create_worker(thread_group));
1030   }
1031 
1032   ulonglong now = microsecond_interval_timer();
1033   ulonglong time_since_last_thread_created =
1034     (now - thread_group->last_thread_creation_time);
1035 
1036   /* Throttle thread creation. */
1037   if (time_since_last_thread_created >
1038        microsecond_throttling_interval(thread_group))
1039   {
1040     DBUG_RETURN(create_worker(thread_group));
1041   }
1042 
1043   DBUG_RETURN(-1);
1044 }
1045 
1046 
1047 
thread_group_init(thread_group_t * thread_group,pthread_attr_t * thread_attr)1048 int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
1049 {
1050   DBUG_ENTER("thread_group_init");
1051   thread_group->pthread_attr = thread_attr;
1052   mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
1053   thread_group->pollfd= INVALID_HANDLE_VALUE;
1054   thread_group->shutdown_pipe[0]= -1;
1055   thread_group->shutdown_pipe[1]= -1;
1056   queue_init(thread_group);
1057   DBUG_RETURN(0);
1058 }
1059 
1060 
thread_group_destroy(thread_group_t * thread_group)1061 void thread_group_destroy(thread_group_t *thread_group)
1062 {
1063   mysql_mutex_destroy(&thread_group->mutex);
1064   if (thread_group->pollfd != INVALID_HANDLE_VALUE)
1065   {
1066     io_poll_close(thread_group->pollfd);
1067     thread_group->pollfd= INVALID_HANDLE_VALUE;
1068   }
1069 #ifndef HAVE_IOCP
1070   for(int i=0; i < 2; i++)
1071   {
1072     if(thread_group->shutdown_pipe[i] != -1)
1073     {
1074       close(thread_group->shutdown_pipe[i]);
1075       thread_group->shutdown_pipe[i]= -1;
1076     }
1077   }
1078 #endif
1079 
1080   if (my_atomic_add32(&shutdown_group_count, -1) == 1)
1081   {
1082     my_free(all_groups);
1083     all_groups= 0;
1084   }
1085 }
1086 
1087 /**
1088   Wake sleeping thread from waiting list
1089 */
1090 
wake_thread(thread_group_t * thread_group)1091 static int wake_thread(thread_group_t *thread_group)
1092 {
1093   DBUG_ENTER("wake_thread");
1094   worker_thread_t *thread = thread_group->waiting_threads.front();
1095   if(thread)
1096   {
1097     thread->woken= true;
1098     thread_group->waiting_threads.remove(thread);
1099     mysql_cond_signal(&thread->cond);
1100     DBUG_RETURN(0);
1101   }
1102   DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
1103 }
1104 
1105 /*
1106    Wake listener thread (during shutdown)
1107    Self-pipe trick is used in most cases,except IOCP.
1108 */
wake_listener(thread_group_t * thread_group)1109 static int wake_listener(thread_group_t *thread_group)
1110 {
1111 #ifndef HAVE_IOCP
1112   if (pipe(thread_group->shutdown_pipe))
1113   {
1114     return -1;
1115   }
1116 
1117   /* Wake listener */
1118   if (io_poll_associate_fd(thread_group->pollfd,
1119     thread_group->shutdown_pipe[0], NULL, NULL))
1120   {
1121     return -1;
1122   }
1123   char c= 0;
1124   if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
1125     return -1;
1126 #else
1127   PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0);
1128 #endif
1129   return 0;
1130 }
1131 /**
1132   Initiate shutdown for thread group.
1133 
1134   The shutdown is asynchronous, we only care to  wake all threads in here, so
1135   they can finish. We do not wait here until threads terminate. Final cleanup
1136   of the group (thread_group_destroy) will be done by the last exiting threads.
1137 */
1138 
thread_group_close(thread_group_t * thread_group)1139 static void thread_group_close(thread_group_t *thread_group)
1140 {
1141   DBUG_ENTER("thread_group_close");
1142 
1143   mysql_mutex_lock(&thread_group->mutex);
1144   if (thread_group->thread_count == 0)
1145   {
1146     mysql_mutex_unlock(&thread_group->mutex);
1147     thread_group_destroy(thread_group);
1148     DBUG_VOID_RETURN;
1149   }
1150 
1151   thread_group->shutdown= true;
1152   thread_group->listener= NULL;
1153 
1154   wake_listener(thread_group);
1155 
1156   /* Wake all workers. */
1157   while(wake_thread(thread_group) == 0)
1158   {
1159   }
1160 
1161   mysql_mutex_unlock(&thread_group->mutex);
1162 
1163   DBUG_VOID_RETURN;
1164 }
1165 
1166 
1167 /*
1168   Add work to the queue. Maybe wake a worker if they all sleep.
1169 
1170   Currently, this function is only used when new connections need to
1171   perform login (this is done in worker threads).
1172 
1173 */
1174 
queue_put(thread_group_t * thread_group,TP_connection_generic * connection)1175 static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection)
1176 {
1177   DBUG_ENTER("queue_put");
1178 
1179   connection->dequeue_time= pool_timer.current_microtime;
1180   thread_group->queues[connection->priority].push_back(connection);
1181 
1182   if (thread_group->active_thread_count == 0)
1183     wake_or_create_thread(thread_group);
1184 
1185   DBUG_VOID_RETURN;
1186 }
1187 
1188 
1189 /*
1190   Prevent too many threads executing at the same time,if the workload is
1191   not CPU bound.
1192 */
1193 
too_many_threads(thread_group_t * thread_group)1194 static bool too_many_threads(thread_group_t *thread_group)
1195 {
1196   return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe
1197    && !thread_group->stalled);
1198 }
1199 
1200 
/**
  Retrieve a connection with pending event.

  Pending event in our case means that there is either a pending login request
  (if connection is not yet logged in), or there are unread bytes on the socket.

  The function holds the group mutex while it inspects group state. In each
  round of its loop it tries, in this order:
    1. stop if the group is shutting down;
    2. take a connection from the queue (skipped while oversubscribed);
    3. become the listener if the group has none, and return whatever
       listener() returns;
    4. do a non-blocking poll and queue any events found (skipped while
       oversubscribed);
    5. put itself on the waiting list and sleep on its condition variable.

  If there are no pending events currently, thread will wait.
  If timeout specified in abstime parameter passes, the function returns NULL.
  If abstime is NULL, the wait has no timeout.

  @param current_thread - current worker thread
  @param thread_group - current thread group
  @param abstime - absolute wait timeout, or NULL to wait without timeout

  @return
  connection with pending event.
  NULL is returned if timeout has expired, or on shutdown.
*/

TP_connection_generic *get_event(worker_thread_t *current_thread,
  thread_group_t *thread_group,  struct timespec *abstime)
{
  DBUG_ENTER("get_event");
  TP_connection_generic *connection = NULL;


  mysql_mutex_lock(&thread_group->mutex);
  DBUG_ASSERT(thread_group->active_thread_count >= 0);

  for(;;)
  {
    int err=0;
    /* Evaluated once per round, with the group mutex held. */
    bool oversubscribed = too_many_threads(thread_group);
    if (thread_group->shutdown)
     break;

    /* Take work from the queue, unless too many threads are active already. */
    if (!oversubscribed)
    {
      connection = queue_get(thread_group);
      if(connection)
        break;
    }

    /* If there is currently no listener in the group, become one. */
    if(!thread_group->listener)
    {
      thread_group->listener= current_thread;
      /* The listener is not counted as active while it runs listener(). */
      thread_group->active_thread_count--;
      mysql_mutex_unlock(&thread_group->mutex);

      /* Called with the group mutex released. */
      connection = listener(current_thread, thread_group);

      mysql_mutex_lock(&thread_group->mutex);
      thread_group->active_thread_count++;
      /* There is no listener anymore, it just returned. */
      thread_group->listener= NULL;
      break;
    }


    /*
      Last thing we try before going to sleep is a
      non-blocking event poll, i.e. with timeout = 0.
      If this returns events, queue them and pick one.
    */
    if (!oversubscribed)
    {

      native_event ev[MAX_EVENTS];
      int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
      if (cnt > 0)
      {
        queue_put(thread_group, ev, cnt);
        connection= queue_get(thread_group);
        break;
      }
    }


    /* And now, finally sleep */
    current_thread->woken = false; /* wake_thread() sets this to true */

    /*
      Add current thread to the head of the waiting list and wait.
      It is important to add thread to the head rather than tail
      as it ensures LIFO wakeup order (hot caches, working inactivity timeout)
    */
    thread_group->waiting_threads.push_front(current_thread);

    /* A sleeping thread is not active; the count is restored after the wait. */
    thread_group->active_thread_count--;
    if (abstime)
    {
      err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
                                 abstime);
    }
    else
    {
      err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
    }
    thread_group->active_thread_count++;

    if (!current_thread->woken)
    {
      /*
        Thread was not signalled by wake_thread(), it might be a spurious
        wakeup or a timeout. Anyhow, we need to remove ourselves from the
        list now. If thread was explicitly woken, then the waker already
        removed us from the list.
      */
      thread_group->waiting_threads.remove(current_thread);
    }

    /* A non-zero wait result (e.g. timeout) ends the loop with no connection. */
    if (err)
      break;
  }

  /* The stalled flag is cleared on every exit path, before unlocking. */
  thread_group->stalled= false;
  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_RETURN(connection);
}
1321 
1322 
1323 
1324 /**
1325   Tells the pool that worker starts waiting  on IO, lock, condition,
1326   sleep() or similar.
1327 */
1328 
wait_begin(thread_group_t * thread_group)1329 void wait_begin(thread_group_t *thread_group)
1330 {
1331   DBUG_ENTER("wait_begin");
1332   mysql_mutex_lock(&thread_group->mutex);
1333   thread_group->active_thread_count--;
1334 
1335   DBUG_ASSERT(thread_group->active_thread_count >=0);
1336   DBUG_ASSERT(thread_group->connection_count > 0);
1337 
1338   if ((thread_group->active_thread_count == 0) &&
1339      (!is_queue_empty(thread_group) || !thread_group->listener))
1340   {
1341     /*
1342       Group might stall while this thread waits, thus wake
1343       or create a worker to prevent stall.
1344     */
1345     wake_or_create_thread(thread_group);
1346   }
1347 
1348   mysql_mutex_unlock(&thread_group->mutex);
1349   DBUG_VOID_RETURN;
1350 }
1351 
1352 /**
1353   Tells the pool has finished waiting.
1354 */
1355 
wait_end(thread_group_t * thread_group)1356 void wait_end(thread_group_t *thread_group)
1357 {
1358   DBUG_ENTER("wait_end");
1359   mysql_mutex_lock(&thread_group->mutex);
1360   thread_group->active_thread_count++;
1361   mysql_mutex_unlock(&thread_group->mutex);
1362   DBUG_VOID_RETURN;
1363 }
1364 
1365 
1366 
1367 
new_connection(CONNECT * c)1368 TP_connection * TP_pool_generic::new_connection(CONNECT *c)
1369 {
1370   return new (std::nothrow) TP_connection_generic(c);
1371 }
1372 
1373 /**
1374   Add a new connection to thread pool..
1375 */
1376 
add(TP_connection * c)1377 void TP_pool_generic::add(TP_connection *c)
1378 {
1379   DBUG_ENTER("tp_add_connection");
1380 
1381   TP_connection_generic *connection=(TP_connection_generic *)c;
1382   thread_group_t *thread_group= connection->thread_group;
1383   /*
1384     Add connection to the work queue.Actual logon
1385     will be done by a worker thread.
1386   */
1387   mysql_mutex_lock(&thread_group->mutex);
1388   queue_put(thread_group, connection);
1389   mysql_mutex_unlock(&thread_group->mutex);
1390   DBUG_VOID_RETURN;
1391 }
1392 
1393 
1394 
1395 /**
1396   MySQL scheduler callback: wait begin
1397 */
1398 
wait_begin(int type)1399 void TP_connection_generic::wait_begin(int type)
1400 {
1401   DBUG_ENTER("wait_begin");
1402 
1403   DBUG_ASSERT(!waiting);
1404   waiting++;
1405   if (waiting == 1)
1406     ::wait_begin(thread_group);
1407   DBUG_VOID_RETURN;
1408 }
1409 
1410 
1411 /**
1412   MySQL scheduler callback: wait end
1413 */
1414 
wait_end()1415 void TP_connection_generic::wait_end()
1416 {
1417   DBUG_ENTER("wait_end");
1418   DBUG_ASSERT(waiting);
1419   waiting--;
1420   if (waiting == 0)
1421     ::wait_end(thread_group);
1422   DBUG_VOID_RETURN;
1423 }
1424 
1425 
set_next_timeout_check(ulonglong abstime)1426 static void set_next_timeout_check(ulonglong abstime)
1427 {
1428   DBUG_ENTER("set_next_timeout_check");
1429   while(abstime < pool_timer.next_timeout_check)
1430   {
1431     longlong old= (longlong)pool_timer.next_timeout_check;
1432     my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
1433           &old, abstime);
1434   }
1435   DBUG_VOID_RETURN;
1436 }
1437 
TP_connection_generic(CONNECT * c)1438 TP_connection_generic::TP_connection_generic(CONNECT *c):
1439   TP_connection(c),
1440   thread_group(0),
1441   next_in_queue(0),
1442   prev_in_queue(0),
1443   abs_wait_timeout(ULONGLONG_MAX),
1444   bound_to_poll_descriptor(false),
1445   waiting(false)
1446 #ifdef HAVE_IOCP
1447 , overlapped()
1448 #endif
1449 {
1450   DBUG_ASSERT(c->vio);
1451 
1452 #ifdef _WIN32
1453   vio_type= c->vio->type;
1454   fd= (vio_type == VIO_TYPE_NAMEDPIPE) ?
1455     c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket);
1456 #else
1457   fd= mysql_socket_getfd(c->vio->mysql_socket);
1458 #endif
1459 
1460   /* Assign connection to a group. */
1461   thread_group_t *group=
1462     &all_groups[c->thread_id%group_count];
1463 
1464   thread_group=group;
1465 
1466   mysql_mutex_lock(&group->mutex);
1467   group->connection_count++;
1468   mysql_mutex_unlock(&group->mutex);
1469 }
1470 
~TP_connection_generic()1471 TP_connection_generic::~TP_connection_generic()
1472 {
1473   mysql_mutex_lock(&thread_group->mutex);
1474   thread_group->connection_count--;
1475   mysql_mutex_unlock(&thread_group->mutex);
1476 }
1477 
1478 /**
1479   Set wait timeout for connection.
1480 */
1481 
set_io_timeout(int timeout_sec)1482 void TP_connection_generic::set_io_timeout(int timeout_sec)
1483 {
1484   DBUG_ENTER("set_wait_timeout");
1485   /*
1486     Calculate wait deadline for this connection.
1487     Instead of using microsecond_interval_timer() which has a syscall
1488     overhead, use pool_timer.current_microtime and take
1489     into account that its value could be off by at most
1490     one tick interval.
1491   */
1492 
1493   abs_wait_timeout= pool_timer.current_microtime +
1494     1000LL*pool_timer.tick_interval +
1495     1000000LL*timeout_sec;
1496 
1497   set_next_timeout_check(abs_wait_timeout);
1498   DBUG_VOID_RETURN;
1499 }
1500 
1501 
1502 #ifndef HAVE_IOCP
1503 /**
1504   Handle a (rare) special case,where connection needs to
1505   migrate to a different group because group_count has changed
1506   after thread_pool_size setting.
1507 */
1508 
change_group(TP_connection_generic * c,thread_group_t * old_group,thread_group_t * new_group)1509 static int change_group(TP_connection_generic *c,
1510  thread_group_t *old_group,
1511  thread_group_t *new_group)
1512 {
1513   int ret= 0;
1514 
1515   DBUG_ASSERT(c->thread_group == old_group);
1516 
1517   /* Remove connection from the old group. */
1518   mysql_mutex_lock(&old_group->mutex);
1519   if (c->bound_to_poll_descriptor)
1520   {
1521     io_poll_disassociate_fd(old_group->pollfd,c->fd);
1522     c->bound_to_poll_descriptor= false;
1523   }
1524   c->thread_group->connection_count--;
1525   mysql_mutex_unlock(&old_group->mutex);
1526 
1527   /* Add connection to the new group. */
1528   mysql_mutex_lock(&new_group->mutex);
1529   c->thread_group= new_group;
1530   new_group->connection_count++;
1531   /* Ensure that there is a listener in the new group. */
1532   if (!new_group->thread_count)
1533     ret= create_worker(new_group);
1534   mysql_mutex_unlock(&new_group->mutex);
1535   return ret;
1536 }
1537 #endif
1538 
start_io()1539 int TP_connection_generic::start_io()
1540 {
1541 #ifndef HAVE_IOCP
1542   /*
1543     Usually, connection will stay in the same group for the entire
1544     connection's life. However, we do allow group_count to
1545     change at runtime, which means in rare cases when it changes is
1546     connection should need to migrate  to another group, this ensures
1547     to ensure equal load between groups.
1548 
1549     So we recalculate in which group the connection should be, based
1550     on thread_id and current group count, and migrate if necessary.
1551   */
1552   thread_group_t *group =
1553     &all_groups[thd->thread_id%group_count];
1554 
1555   if (group != thread_group)
1556   {
1557     if (change_group(this, thread_group, group))
1558       return -1;
1559   }
1560 #endif
1561 
1562   /*
1563     Bind to poll descriptor if not yet done.
1564   */
1565   if (!bound_to_poll_descriptor)
1566   {
1567     bound_to_poll_descriptor= true;
1568     return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
1569   }
1570 
1571   return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
1572 }
1573 
1574 
1575 
1576 /**
1577   Worker thread's main
1578 */
1579 
worker_main(void * param)1580 static void *worker_main(void *param)
1581 {
1582 
1583   worker_thread_t this_thread;
1584   pthread_detach_this_thread();
1585   my_thread_init();
1586 
1587   DBUG_ENTER("worker_main");
1588 
1589   thread_group_t *thread_group = (thread_group_t *)param;
1590 
1591   /* Init per-thread structure */
1592   mysql_cond_init(key_worker_cond, &this_thread.cond, NULL);
1593   this_thread.thread_group= thread_group;
1594   this_thread.event_count=0;
1595 
1596   /* Run event loop */
1597   for(;;)
1598   {
1599     TP_connection_generic *connection;
1600     struct timespec ts;
1601     set_timespec(ts,threadpool_idle_timeout);
1602     connection = get_event(&this_thread, thread_group, &ts);
1603     if (!connection)
1604       break;
1605     this_thread.event_count++;
1606     tp_callback(connection);
1607   }
1608 
1609   /* Thread shutdown: cleanup per-worker-thread structure. */
1610   mysql_cond_destroy(&this_thread.cond);
1611 
1612   bool last_thread;                    /* last thread in group exits */
1613   mysql_mutex_lock(&thread_group->mutex);
1614   add_thread_count(thread_group, -1);
1615   last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown);
1616   mysql_mutex_unlock(&thread_group->mutex);
1617 
1618   /* Last thread in group exits and pool is terminating, destroy group.*/
1619   if (last_thread)
1620     thread_group_destroy(thread_group);
1621 
1622   my_thread_end();
1623   return NULL;
1624 }
1625 
1626 
/* The constructor does no work; the pool is set up by init(). */
TP_pool_generic::TP_pool_generic()
{}
1629 
init()1630 int TP_pool_generic::init()
1631 {
1632   DBUG_ENTER("TP_pool_generic::TP_pool_generic");
1633   threadpool_max_size= MY_MAX(threadpool_size, 128);
1634   all_groups= (thread_group_t *)
1635     my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
1636   if (!all_groups)
1637   {
1638     threadpool_max_size= 0;
1639     sql_print_error("Allocation failed");
1640     DBUG_RETURN(-1);
1641   }
1642   scheduler_init();
1643   threadpool_started= true;
1644   for (uint i= 0; i < threadpool_max_size; i++)
1645   {
1646     thread_group_init(&all_groups[i], get_connection_attrib());
1647   }
1648   set_pool_size(threadpool_size);
1649   if(group_count == 0)
1650   {
1651     /* Something went wrong */
1652     sql_print_error("Can't set threadpool size to %d",threadpool_size);
1653     DBUG_RETURN(-1);
1654   }
1655   PSI_register(mutex);
1656   PSI_register(cond);
1657   PSI_register(thread);
1658 
1659   pool_timer.tick_interval= threadpool_stall_limit;
1660   start_timer(&pool_timer);
1661   DBUG_RETURN(0);
1662 }
1663 
~TP_pool_generic()1664 TP_pool_generic::~TP_pool_generic()
1665 {
1666   DBUG_ENTER("tp_end");
1667 
1668   if (!threadpool_started)
1669     DBUG_VOID_RETURN;
1670 
1671   stop_timer(&pool_timer);
1672   shutdown_group_count= threadpool_max_size;
1673   for (uint i= 0; i < threadpool_max_size; i++)
1674   {
1675     thread_group_close(&all_groups[i]);
1676   }
1677 
1678   /*
1679     Wait until memory occupied by all_groups is freed.
1680   */
1681   int timeout_ms=5000;
1682   while(all_groups && timeout_ms--)
1683     my_sleep(1000);
1684 
1685   threadpool_started= false;
1686   DBUG_VOID_RETURN;
1687 }
1688 
1689 
1690 /** Ensure that poll descriptors are created when threadpool_size changes */
set_pool_size(uint size)1691 int TP_pool_generic::set_pool_size(uint size)
1692 {
1693   bool success= true;
1694 
1695   for(uint i=0; i< size; i++)
1696   {
1697     thread_group_t *group= &all_groups[i];
1698     mysql_mutex_lock(&group->mutex);
1699     if (group->pollfd == INVALID_HANDLE_VALUE)
1700     {
1701       group->pollfd= io_poll_create();
1702       success= (group->pollfd != INVALID_HANDLE_VALUE);
1703       if(!success)
1704       {
1705         sql_print_error("io_poll_create() failed, errno=%d\n", errno);
1706       }
1707     }
1708     mysql_mutex_unlock(&group->mutex);
1709     if (!success)
1710     {
1711       group_count= i;
1712       return -1;
1713     }
1714   }
1715   group_count= size;
1716   return 0;
1717 }
1718 
set_stall_limit(uint limit)1719 int TP_pool_generic::set_stall_limit(uint limit)
1720 {
1721   mysql_mutex_lock(&(pool_timer.mutex));
1722   pool_timer.tick_interval= limit;
1723   mysql_mutex_unlock(&(pool_timer.mutex));
1724   mysql_cond_signal(&(pool_timer.cond));
1725   return 0;
1726 }
1727 
1728 
1729 /**
1730  Calculate number of idle/waiting threads in the pool.
1731 
1732  Sum idle threads over all groups.
1733  Don't do any locking, it is not required for stats.
1734 */
1735 
get_idle_thread_count()1736 int TP_pool_generic::get_idle_thread_count()
1737 {
1738   int sum=0;
1739   for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
1740   {
1741     sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
1742   }
1743   return sum;
1744 }
1745 
1746 
1747 /* Report threadpool problems */
1748 
/**
   Delay in microseconds after which the "pool blocked" message is printed
   (30 seconds == 30 million microseconds).
*/
#define BLOCK_MSG_DELAY (30*1000000)

/*
  Warning text printed by print_pool_blocked_message() when its
  max_threads_reached argument is true.
*/
#define MAX_THREADS_REACHED_MSG \
"Threadpool could not create additional thread to handle queries, because the \
number of allowed threads was reached. Increasing 'thread_pool_max_threads' \
parameter can help in this situation.\n \
If 'extra_port' parameter is set, you can still connect to the database with \
superuser account (it must be TCP connection using extra_port as TCP port) \
and troubleshoot the situation. \
A likely cause of pool blocks are clients that lock resources for long time. \
'show processlist' or 'show engine innodb status' can give additional hints."

/*
  Warning format printed by print_pool_blocked_message() otherwise;
  takes one integer argument (the saved error number).
*/
#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."
1766 
1767 /**
1768  Write a message when blocking situation in threadpool occurs.
1769  The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds.
1770  It will be just a single message for each blocking situation (to prevent
1771  log flood).
1772 */
1773 
print_pool_blocked_message(bool max_threads_reached)1774 static void print_pool_blocked_message(bool max_threads_reached)
1775 {
1776   ulonglong now;
1777   static bool msg_written;
1778 
1779   now= microsecond_interval_timer();
1780   if (pool_block_start == 0)
1781   {
1782     pool_block_start= now;
1783     msg_written = false;
1784     return;
1785   }
1786 
1787   if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
1788   {
1789     if (max_threads_reached)
1790       sql_print_warning(MAX_THREADS_REACHED_MSG);
1791     else
1792       sql_print_warning(CREATE_THREAD_ERROR_MSG, my_errno);
1793 
1794     sql_print_information("Threadpool has been blocked for %u seconds\n",
1795       (uint)((now- pool_block_start)/1000000));
1796     /* avoid reperated messages for the same blocking situation */
1797     msg_written= true;
1798   }
1799 }
1800 
1801 #endif /* HAVE_POOL_OF_THREADS */
1802