1 /* Copyright (C) 2012 Monty Program Ab
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
15 
16 #include <my_global.h>
17 #include <violite.h>
18 #include <sql_priv.h>
19 #include <sql_class.h>
20 #include <my_pthread.h>
21 #include <scheduler.h>
22 #include <sql_connect.h>
23 #include <mysqld.h>
24 #include <debug_sync.h>
25 #include <time.h>
26 #include <sql_plist.h>
27 #include <threadpool.h>
28 #include <global_threads.h>
29 #include <mysql/thread_pool_priv.h>             // thd_is_transaction_active()
30 #include <time.h>
31 #ifdef __linux__
32 #include <sys/epoll.h>
33 typedef struct epoll_event native_event;
34 #endif
35 #if defined (__FreeBSD__) || defined (__APPLE__) || defined(__DragonFly__)
36 #include <sys/event.h>
37 typedef struct kevent native_event;
38 #endif
39 #if defined (__sun)
40 #include <port.h>
41 typedef port_event_t native_event;
42 #endif
43 
44 /** Maximum number of native events a listener can read in one go */
45 #define MAX_EVENTS 1024
46 
47 /** Define if wait_begin() should create threads if necessary without waiting
48 for stall detection to kick in */
49 #define THREADPOOL_CREATE_THREADS_ON_WAIT
50 
/*
  Possible values for thread_pool_high_prio_mode.
  The array is terminated by NullS.
  NOTE(review): the order presumably matches the TP_HIGH_PRIO_MODE_*
  constants used in connection_is_high_prio() - confirm against their
  declaration.
*/
const char *threadpool_high_prio_mode_names[]= {"transactions", "statements",
                                                 "none", NullS};

/** Indicates that threadpool was initialized*/
static bool threadpool_started= false;
57 
58 /*
59   Define PSI Keys for performance schema.
60   We have a mutex per group, worker threads, condition per worker thread,
61   and timer thread  with its own mutex and condition.
62 */
63 
64 
65 #ifdef HAVE_PSI_INTERFACE
/* Mutex keys: key_group_mutex for thread group mutexes, key_timer_mutex
   for the timer mutex */
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
{
  { &key_group_mutex, "group_mutex", 0},
  { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL}
};

/* Condition keys: one for worker conditions, one for the timer condition */
static PSI_cond_key key_worker_cond;
static PSI_cond_key key_timer_cond;
static PSI_cond_info cond_list[]=
{
  { &key_worker_cond, "worker_cond", 0},
  { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL}
};

/* Thread keys: one for worker threads, one for the timer thread */
static PSI_thread_key key_worker_thread;
static PSI_thread_key key_timer_thread;
static PSI_thread_info	thread_list[] =
{
 {&key_worker_thread, "worker_thread", 0},
 {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
};
89 
/*
  Macro to simplify performance schema registration.

  X selects both the PSI_server method (register_X) and the info array
  (X_list), e.g. PSI_register(mutex) registers mutex_list. Nothing is done
  when PSI_server is not set.

  The expansion is wrapped in do { } while (0) so that it is a single
  statement and cannot capture an "else" that follows the invocation.
*/
#define PSI_register(X) \
  do \
  { \
    if (PSI_server) \
      PSI_server->register_ ## X("threadpool", X ## _list, \
                                 array_elements(X ## _list)); \
  } while (0)
93 #endif
94 
95 
96 struct thread_group_t;
97 
/* Per-thread structure for workers */
struct worker_thread_t
{
  ulonglong  event_count; /* number of request handled by this thread */
  thread_group_t* thread_group; /* presumably the group this worker belongs to - confirm */
  /* Intrusive list links, consumed by the worker_list_t adapter below */
  worker_thread_t *next_in_list;
  worker_thread_t **prev_in_list;

  mysql_cond_t  cond;   /* per-worker condition variable */
  bool          woken;  /* NOTE(review): presumably set by whoever signals cond - confirm */
};

/* Intrusive list of workers, linked through next_in_list / prev_in_list */
typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
                 &worker_thread_t::next_in_list,
                 &worker_thread_t::prev_in_list>
                 >
worker_list_t;
115 
/* Per-connection scheduler data; reachable from THD::event_scheduler.data */
struct connection_t
{

  THD *thd;
  thread_group_t *thread_group; /* presumably the group serving this connection - confirm */
  /* Intrusive queue links, consumed by the connection_queue_t adapter below */
  connection_t *next_in_queue;
  connection_t **prev_in_queue;
  /*
    Absolute time after which the connection counts as idle for too long;
    compared against pool_timer_t::current_microtime in timeout_check()
  */
  ulonglong abs_wait_timeout;
  bool logged_in;                /* NOTE(review): presumably set after authentication - confirm */
  bool bound_to_poll_descriptor; /* NOTE(review): presumably set once io_poll_associate_fd() was done - confirm */
  bool waiting;                  /* NOTE(review): presumably set between wait begin/end - confirm */
  /*
    Remaining high priority tickets: decremented each time listener() puts
    the connection into the high priority queue, reloaded from
    threadpool_high_prio_tickets when it goes to the regular queue
  */
  uint tickets;
};

/* Intrusive connection queue with fast push_back, no element counter */
typedef I_P_List<connection_t,
                     I_P_List_adapter<connection_t,
                                      &connection_t::next_in_queue,
                                      &connection_t::prev_in_queue>,
                     I_P_List_null_counter,
                     I_P_List_fast_push_back<connection_t> >
connection_queue_t;
137 
/* State of one thread group; instances live in all_groups[] below */
struct thread_group_t
{
  mysql_mutex_t mutex;                /* taken around queue and counter updates */
  connection_queue_t queue;           /* regular (low priority) work queue */
  connection_queue_t high_prio_queue; /* served before queue, see queue_get() */
  worker_list_t waiting_threads;      /* presumably idle workers that can be woken - confirm */
  worker_thread_t *listener;          /* checked for presence by check_stall() */
  pthread_attr_t *pthread_attr;       /* attributes passed to mysql_thread_create() for workers */
  int  pollfd;                        /* io poll descriptor the listener waits on */
  int  thread_count;                  /* adjusted by add_thread_count() */
  int  active_thread_count;           /* adjusted by add_thread_count(); workers start out active */
  int  connection_count;
  int  waiting_thread_count;          /* threads between thd_wait_begin() / thd_wait_end(),
                                         per the too_many_busy_threads() comment */
  /* Stats for the deadlock detection timer routine.*/
  int io_event_count;                 /* incremented by listener(), reset by check_stall() */
  int queue_event_count;              /* incremented by queue_get(), reset by check_stall() */
  ulonglong last_thread_creation_time; /* set by create_worker(), used for throttling */
  int  shutdown_pipe[2];              /* both ends start as -1, see thread_group_init() */
  bool shutdown;
  bool stalled;                       /* set by check_stall() when the queues make no progress */

} MY_ALIGNED(512);

/* All thread groups; the timer thread walks the whole array */
static thread_group_t all_groups[MAX_THREAD_GROUPS];
static uint group_count;
163 
/**
 Used for printing "pool blocked" message, see
 print_pool_blocked_message();
 Reset to 0 by create_worker() whenever a worker was created successfully.
*/
static ulonglong pool_block_start;

/* Global timer for all groups  */
struct pool_timer_t
{
  mysql_mutex_t mutex;  /* held by the timer thread while it does its checks */
  mysql_cond_t cond;    /* timer thread sleeps on it; signalled by stop_timer() */
  volatile uint64 current_microtime;  /* refreshed by the timer thread on each tick */
  volatile uint64 next_timeout_check; /* earliest time at which timeout_check() has to run */
  int  tick_interval;   /* tick length; timer_thread() scales it by 1000000 to get nanoseconds */
  bool shutdown;        /* set by stop_timer(), makes timer_thread() exit */
};

/* The single timer instance */
static pool_timer_t pool_timer;
182 
/* Forward declarations of file-local helpers that are used before their
   definitions */
static void queue_put(thread_group_t *thread_group, connection_t *connection);
static int  wake_thread(thread_group_t *thread_group);
static void handle_event(connection_t *connection);
static int  wake_or_create_thread(thread_group_t *thread_group);
static int  create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void connection_abort(connection_t *connection);
static void set_wait_timeout(connection_t *connection);
static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);
194 
195 /**
196  Asynchronous network IO.
197 
198  We use native edge-triggered network IO multiplexing facility.
199  This maps to different APIs on different Unixes.
200 
201  Supported are currently Linux with epoll, Solaris with event ports,
202  OSX and BSD with kevent. All those API's are used with one-shot flags
203  (the event is signalled once client has written something into the socket,
204  then socket is removed from the "poll-set" until the  command is finished,
205  and we need to re-arm/re-register socket)
206 
207  No implementation for poll/select/AIO is currently provided.
208 
209  The API closely resembles all of the above mentioned platform APIs
210  and consists of following functions.
211 
212  - io_poll_create()
213  Creates an io_poll descriptor
214  On Linux: epoll_create()
215 
216  - io_poll_associate_fd(int poll_fd, int fd, void *data)
217  Associate file descriptor with io poll descriptor
218  On Linux : epoll_ctl(..EPOLL_CTL_ADD))
219 
 - io_poll_disassociate_fd(int pollfd, int fd)
  Disassociate file descriptor from io poll descriptor
  On Linux: epoll_ctl(..EPOLL_CTL_DEL)
223 
224 
225  - io_poll_start_read(int poll_fd,int fd, void *data)
226  The same as io_poll_associate_fd(), but cannot be used before
227  io_poll_associate_fd() was called.
228  On Linux : epoll_ctl(..EPOLL_CTL_MOD)
229 
230  - io_poll_wait (int pollfd, native_event *native_events, int maxevents,
231    int timeout_ms)
232 
233  wait until one or more descriptors added with io_poll_associate_fd()
234  or io_poll_start_read() becomes readable. Data associated with
235  descriptors can be retrieved from native_events array, using
236  native_event_get_userdata() function.
237 
238 
239  On Linux: epoll_wait()
240 */
241 
242 #if defined (__linux__)
243 #ifndef EPOLLRDHUP
244 /* Early 2.6 kernel did not have EPOLLRDHUP */
245 #define EPOLLRDHUP 0
246 #endif
/* Create the io poll descriptor (an epoll instance on Linux).
   Returns whatever epoll_create() returns. */
static int io_poll_create()
{
  int epoll_fd= epoll_create(1);
  return epoll_fd;
}
251 
252 
/*
  Add fd to the epoll set pollfd (EPOLL_CTL_ADD) as an edge-triggered,
  one-shot read registration. data is stored in the event and can be read
  back with native_event_get_userdata().
  Returns the result of epoll_ctl().
*/
static int io_poll_associate_fd(int pollfd, int fd, void *data)
{
  struct epoll_event registration;

  registration.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
  /* Zero the whole union before storing the pointer, keeps valgrind happy */
  registration.data.u64= 0;
  registration.data.ptr= data;

  return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &registration);
}
261 
262 
263 
/*
  Re-arm an fd that is already part of the epoll set pollfd (EPOLL_CTL_MOD),
  using the same event mask as io_poll_associate_fd().
  Returns the result of epoll_ctl().
*/
static int io_poll_start_read(int pollfd, int fd, void *data)
{
  struct epoll_event registration;

  registration.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
  /* Zero the whole union before storing the pointer, keeps valgrind happy */
  registration.data.u64= 0;
  registration.data.ptr= data;

  return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &registration);
}
272 
/*
  Remove fd from the epoll set pollfd (EPOLL_CTL_DEL).
  Returns the result of epoll_ctl().
*/
static int io_poll_disassociate_fd(int pollfd, int fd)
{
  /*
    Left unset on purpose: per epoll_ctl(2) the event argument is ignored
    for EPOLL_CTL_DEL, a non-NULL pointer is passed only for the sake of
    old kernels.
  */
  struct epoll_event ignored_event;
  return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ignored_event);
}
278 
279 
280 /*
281  Wrapper around epoll_wait.
282  NOTE - in case of EINTR, it restarts with original timeout. Since we use
283  either infinite or 0 timeouts, this is not critical
284 */
io_poll_wait(int pollfd,native_event * native_events,int maxevents,int timeout_ms)285 static int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
286                         int timeout_ms)
287 {
288   int ret;
289   do
290   {
291     ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
292   }
293   while(ret == -1 && errno == EINTR);
294   return ret;
295 }
296 
297 
native_event_get_userdata(native_event * event)298 static void *native_event_get_userdata(native_event *event)
299 {
300   return event->data.ptr;
301 }
302 
303 #elif defined (__FreeBSD__) || defined (__APPLE__) || defined(__DragonFly__)
/* Create the io poll descriptor (a kqueue on BSD / OSX).
   Returns whatever kqueue() returns. */
static int io_poll_create()
{
  int kqueue_fd= kqueue();
  return kqueue_fd;
}
308 
/*
  Register a one-shot read filter (EV_ADD|EV_ONESHOT) for fd on the kqueue
  pollfd, with data attached as user data.
  Returns the result of kevent().
*/
static int io_poll_start_read(int pollfd, int fd, void *data)
{
  struct kevent change;
  EV_SET(&change, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, data);
  /* Submit the change only, no events are fetched here */
  return kevent(pollfd, &change, 1, 0, 0, 0);
}
316 
317 
/*
  Register fd with the kqueue descriptor pollfd.

  With kqueue, first-time registration and re-arming are the same
  operation (EV_ADD|EV_ONESHOT), so this simply forwards to
  io_poll_start_read() and returns its result. The previously declared
  local kevent was filled in but never used and has been removed.
*/
static int io_poll_associate_fd(int pollfd, int fd, void *data)
{
  return io_poll_start_read(pollfd, fd, data);
}
325 
326 
/*
  Delete the read filter of fd from the kqueue pollfd (EV_DELETE).
  Returns the result of kevent().
*/
static int io_poll_disassociate_fd(int pollfd, int fd)
{
  struct kevent change;
  EV_SET(&change, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
  return kevent(pollfd, &change, 1, 0, 0, 0);
}
333 
334 
/*
  Wait for events on the kqueue pollfd and store up to maxevents of them
  in events. A negative timeout_ms passes a NULL timeout to kevent().
  The call is restarted when it is interrupted (EINTR).
  Returns the result of kevent().
*/
static int io_poll_wait(int pollfd, struct kevent *events, int maxevents,
                        int timeout_ms)
{
  struct timespec ts;
  struct timespec *timeout= NULL;
  int nready;

  if (timeout_ms >= 0)
  {
    /* Split milliseconds into seconds and nanoseconds */
    ts.tv_sec= timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
    timeout= &ts;
  }

  for (;;)
  {
    nready= kevent(pollfd, 0, 0, events, maxevents, timeout);
    if (nready != -1 || errno != EINTR)
      break;
  }
  return nready;
}
353 
/* Return the user data that was attached to the kevent on registration */
static void* native_event_get_userdata(native_event *event)
{
  void *userdata= event->udata;
  return userdata;
}
358 
359 #elif defined (__sun)
360 
/* Create the io poll descriptor (an event port on Solaris).
   Returns whatever port_create() returns. */
static int io_poll_create()
{
  int port_fd= port_create();
  return port_fd;
}
365 
/*
  Associate fd with the event port pollfd for POLLIN, with data attached as
  user data. Returns the result of port_associate().
*/
static int io_poll_start_read(int pollfd, int fd, void *data)
{
  int rc= port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
  return rc;
}
370 
/* First-time registration is done the same way as re-arming on this
   platform: forward to io_poll_start_read() */
static int io_poll_associate_fd(int pollfd, int fd, void *data)
{
  int rc= io_poll_start_read(pollfd, fd, data);
  return rc;
}
375 
/* Remove fd from the event port pollfd.
   Returns the result of port_dissociate(). */
static int io_poll_disassociate_fd(int pollfd, int fd)
{
  int rc= port_dissociate(pollfd, PORT_SOURCE_FD, fd);
  return rc;
}
380 
/*
  Fetch events from the event port pollfd into events (at most maxevents).
  A negative timeout_ms passes a NULL timeout to port_getn().
  The call is restarted when it is interrupted (EINTR).

  Note that the value returned is the event count reported through the
  nget argument of port_getn(), not the return code of port_getn().
*/
static int io_poll_wait(int pollfd, native_event *events, int maxevents,
                        int timeout_ms)
{
  struct timespec ts;
  struct timespec *timeout= NULL;
  uint_t nget= 1;
  int ret;

  if (timeout_ms >= 0)
  {
    /* Split milliseconds into seconds and nanoseconds */
    ts.tv_sec= timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
    timeout= &ts;
  }

  do
  {
    ret= port_getn(pollfd, events, maxevents, &nget, timeout);
  }
  while (ret == -1 && errno == EINTR);

  DBUG_ASSERT(nget < INT_MAX);
  return (int)nget;
}
401 
/* Return the user data that was attached to the port event on registration */
static void* native_event_get_userdata(native_event *event)
{
  void *userdata= event->portev_user;
  return userdata;
}
406 #else
407 #error not ported yet to this OS
408 #endif
409 
410 namespace {
411 
412 /*
413   Prevent too many active threads executing at the same time, if the workload is
414   not CPU bound.
415 */
416 
too_many_active_threads(thread_group_t * thread_group)417 inline bool too_many_active_threads(thread_group_t *thread_group)
418 {
419   return (thread_group->active_thread_count
420           >= 1 + (int) threadpool_oversubscribe
421           && !thread_group->stalled);
422 }
423 
424 /*
425   Limit the number of 'busy' threads by 1 + thread_pool_oversubscribe. A thread
426   is busy if it is in either the active state or the waiting state (i.e. between
427   thd_wait_begin() / thd_wait_end() calls).
428 */
429 
too_many_busy_threads(thread_group_t * thread_group)430 inline bool too_many_busy_threads(thread_group_t *thread_group)
431 {
432   return (thread_group->active_thread_count + thread_group->waiting_thread_count
433           > 1 + (int) threadpool_oversubscribe);
434 }
435 
436 /*
437    Checks if a given connection is eligible to enter the high priority queue
438    based on its current thread_pool_high_prio_mode value, available high
439    priority tickets and transactional state and whether any locks are held.
440 */
441 
connection_is_high_prio(const connection_t * c)442 inline bool connection_is_high_prio(const connection_t *c)
443 {
444   const ulong mode= c->thd->variables.threadpool_high_prio_mode;
445 
446   return (mode == TP_HIGH_PRIO_MODE_STATEMENTS) ||
447     (mode == TP_HIGH_PRIO_MODE_TRANSACTIONS && c->tickets > 0 &&
448      (thd_is_transaction_active(c->thd) ||
449       c->thd->variables.option_bits & OPTION_TABLE_LOCK ||
450       c->thd->locked_tables_mode != LTM_NONE ||
451       c->thd->mdl_context.has_locks() ||
452       c->thd->global_read_lock.is_acquired() ||
453       c->thd->backup_tables_lock.is_acquired() ||
454       c->thd->backup_binlog_lock.is_acquired() ||
455       c->thd->ull_hash.records > 0));
456 }
457 
458 } // namespace
459 
460 /* Dequeue element from a workqueue */
461 
queue_get(thread_group_t * thread_group)462 static connection_t *queue_get(thread_group_t *thread_group)
463 {
464   DBUG_ENTER("queue_get");
465   thread_group->queue_event_count++;
466   connection_t *c;
467 
468   if ((c= thread_group->high_prio_queue.front()))
469   {
470     thread_group->high_prio_queue.remove(c);
471   }
472   /*
473     Don't pick events from the low priority queue if there are too many
474     active + waiting threads.
475   */
476   else if (!too_many_busy_threads(thread_group) &&
477            (c= thread_group->queue.front()))
478   {
479     thread_group->queue.remove(c);
480   }
481   DBUG_RETURN(c);
482 }
483 
484 
485 /*
486   Handle wait timeout :
487   Find connections that have been idle for too long and kill them.
488   Also, recalculate time when next timeout check should run.
489 */
490 
timeout_check(pool_timer_t * timer)491 static void timeout_check(pool_timer_t *timer)
492 {
493   std::set<THD*> global_thread_list_copy;
494   DBUG_ENTER("timeout_check");
495 
496   mysql_mutex_lock(&LOCK_thd_remove);
497   copy_global_thread_list(&global_thread_list_copy);
498 
499   Thread_iterator it= global_thread_list_copy.begin();
500   Thread_iterator end= global_thread_list_copy.end();
501 
502   /* Reset next timeout check, it will be recalculated in the loop below */
503   my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);
504 
505   THD *thd;
506   for ( ; it != end; ++it)
507   {
508     thd= (*it);
509     if (thd->net.reading_or_writing != 1)
510       continue;
511 
512     connection_t *connection= (connection_t *)thd->event_scheduler.data;
513     if (!connection)
514     {
515       /*
516         Connection does not have scheduler data. This happens for example
517         if THD belongs to a different scheduler, that is listening to extra_port.
518       */
519       continue;
520     }
521 
522     if(connection->abs_wait_timeout < timer->current_microtime)
523     {
524       /* Wait timeout exceeded, kill connection. */
525       mysql_mutex_lock(&thd->LOCK_thd_data);
526       thd->killed = THD::KILL_CONNECTION;
527       tp_post_kill_notification(thd);
528       mysql_mutex_unlock(&thd->LOCK_thd_data);
529     }
530     else
531     {
532       set_next_timeout_check(connection->abs_wait_timeout);
533     }
534   }
535   mysql_mutex_unlock(&LOCK_thd_remove);
536   DBUG_VOID_RETURN;
537 }
538 
539 
540 /*
541  Timer thread.
542 
543   Periodically, check if one of the thread groups is stalled. Stalls happen if
544   events are not being dequeued from the queue, or from the network, Primary
545   reason for stall can be a lengthy executing non-blocking request. It could
546   also happen that thread is waiting but wait_begin/wait_end is forgotten by
547   storage engine. Timer thread will create a new thread in group in case of
548   a stall.
549 
550   Besides checking for stalls, timer thread is also responsible for terminating
551   clients that have been idle for longer than wait_timeout seconds.
552 
  TODO: Let the timer sleep for a long time if there is no work to be done.
  Currently it wakes up rather often on an idle server.
555 */
556 
timer_thread(void * param)557 static void* timer_thread(void *param)
558 {
559   uint i;
560   pool_timer_t* timer=(pool_timer_t *)param;
561 
562   my_thread_init();
563   DBUG_ENTER("timer_thread");
564   timer->next_timeout_check= ULONGLONG_MAX;
565   timer->current_microtime= my_microsecond_getsystime();
566 
567   for(;;)
568   {
569     struct timespec ts;
570     int err;
571 
572     set_timespec_nsec(ts,timer->tick_interval * 1000000ULL);
573     mysql_mutex_lock(&timer->mutex);
574     err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
575     if (timer->shutdown)
576     {
577       mysql_mutex_unlock(&timer->mutex);
578       break;
579     }
580     if (err == ETIMEDOUT)
581     {
582       timer->current_microtime= my_microsecond_getsystime();
583 
584       /* Check stalls in thread groups */
585       for(i=0; i< array_elements(all_groups);i++)
586       {
587         if(all_groups[i].connection_count)
588            check_stall(&all_groups[i]);
589       }
590 
591       /* Check if any client exceeded wait_timeout */
592       if (timer->next_timeout_check <= timer->current_microtime)
593         timeout_check(timer);
594     }
595     mysql_mutex_unlock(&timer->mutex);
596   }
597 
598   mysql_mutex_destroy(&timer->mutex);
599   my_thread_end();
600   return NULL;
601 }
602 
603 /*
604   Check if both the high and low priority queues are empty.
605 
606   NOTE: we also consider the low priority queue empty in case it has events, but
607   they cannot be processed due to the too_many_busy_threads() limit.
608 */
queues_are_empty(thread_group_t * tg)609 static bool queues_are_empty(thread_group_t *tg)
610 {
611   return (tg->high_prio_queue.is_empty() &&
612           (tg->queue.is_empty() || too_many_busy_threads(tg)));
613 }
614 
check_stall(thread_group_t * thread_group)615 void check_stall(thread_group_t *thread_group)
616 {
617   if (mysql_mutex_trylock(&thread_group->mutex) != 0)
618   {
619     /* Something happens. Don't disturb */
620     return;
621   }
622 
623   /*
624     Check if listener is present. If not,  check whether any IO
625     events were dequeued since last time. If not, this means
626     listener is either in tight loop or thd_wait_begin()
627     was forgotten. Create a new worker(it will make itself listener).
628   */
629   if (!thread_group->listener && !thread_group->io_event_count)
630   {
631     wake_or_create_thread(thread_group);
632     mysql_mutex_unlock(&thread_group->mutex);
633     return;
634   }
635 
636   /*  Reset io event count */
637   thread_group->io_event_count= 0;
638 
639   /*
640     Check whether requests from the workqueues are being dequeued.
641 
642     The stall detection and resolution works as follows:
643 
644     1. There is a counter thread_group->queue_event_count for the number of
645        events removed from the queues. Timer resets the counter to 0 on each run.
646     2. Timer determines stall if this counter remains 0 since last check
647        and at least one of the high and low priority queues is not empty.
648     3. Once timer determined a stall it sets thread_group->stalled flag and
649        wakes and idle worker (or creates a new one, subject to throttling).
650     4. The stalled flag is reset, when an event is dequeued.
651 
652     Q : Will this handling lead to an unbound growth of threads, if queues
653     stall permanently?
654     A : No. If queues stall permanently, it is an indication for many very long
655     simultaneous queries. The maximum number of simultanoues queries is
656     max_connections, further we have threadpool_max_threads limit, upon which no
657     worker threads are created. So in case there is a flood of very long
658     queries, threadpool would slowly approach thread-per-connection behavior.
659     NOTE:
660     If long queries never wait, creation of the new threads is done by timer,
661     so it is slower than in real thread-per-connection. However if long queries
662     do wait and indicate that via thd_wait_begin/end callbacks, thread creation
663     will be faster.
664   */
665   if (!thread_group->queue_event_count && !queues_are_empty(thread_group))
666   {
667     thread_group->stalled= true;
668     wake_or_create_thread(thread_group);
669   }
670 
671   /* Reset queue event count */
672   thread_group->queue_event_count= 0;
673 
674   mysql_mutex_unlock(&thread_group->mutex);
675 }
676 
677 
start_timer(pool_timer_t * timer)678 static void start_timer(pool_timer_t* timer)
679 {
680   pthread_t thread_id;
681   DBUG_ENTER("start_timer");
682   mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
683   mysql_cond_init(key_timer_cond, &timer->cond, NULL);
684   timer->shutdown = false;
685   mysql_thread_create(key_timer_thread,&thread_id, NULL, timer_thread, timer);
686   DBUG_VOID_RETURN;
687 }
688 
689 
stop_timer(pool_timer_t * timer)690 static void stop_timer(pool_timer_t *timer)
691 {
692   DBUG_ENTER("stop_timer");
693   mysql_mutex_lock(&timer->mutex);
694   timer->shutdown = true;
695   mysql_cond_signal(&timer->cond);
696   mysql_mutex_unlock(&timer->mutex);
697   DBUG_VOID_RETURN;
698 }
699 
700 
701 /**
702   Poll for socket events and distribute them to worker threads
703   In many case current thread will handle single event itself.
704 
705   @return a ready connection, or NULL on shutdown
706 */
listener(worker_thread_t * current_thread,thread_group_t * thread_group)707 static connection_t * listener(worker_thread_t *current_thread,
708                                thread_group_t *thread_group)
709 {
710   DBUG_ENTER("listener");
711   connection_t *retval= NULL;
712 
713   for(;;)
714   {
715     native_event ev[MAX_EVENTS];
716     int cnt;
717 
718     if (thread_group->shutdown)
719       break;
720 
721     cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
722 
723     if (cnt <=0)
724     {
725       DBUG_ASSERT(thread_group->shutdown);
726       break;
727     }
728 
729     mysql_mutex_lock(&thread_group->mutex);
730 
731     if (thread_group->shutdown)
732     {
733       mysql_mutex_unlock(&thread_group->mutex);
734       break;
735     }
736 
737     thread_group->io_event_count += cnt;
738 
739     /*
740      We got some network events and need to make decisions : whether
741      listener  hould handle events and whether or not any wake worker
742      threads so they can handle events.
743 
744      Q1 : Should listener handle an event itself, or put all events into
745      queue  and let workers handle the events?
746 
747      Solution :
748      Generally, listener that handles events itself is preferable. We do not
749      want listener thread to change its state from waiting  to running too
750      often, Since listener has just woken from poll, it better uses its time
751      slice and does some work. Besides, not handling events means they go to
752      the  queue, and often to wake another worker must wake up to handle the
753      event. This is not good, as we want to avoid wakeups.
754 
755      The downside of listener that also handles queries is that we can
756      potentially leave thread group  for long time not picking the new
757      network events. It is not  a major problem, because this stall will be
758      detected  sooner or later by  the timer thread. Still, relying on timer
759      is not always good, because it may "tick" too slow (large timer_interval)
760 
761      We use following strategy to solve this problem - if queue was not empty
762      we suspect flood of network events and listener stays, Otherwise, it
763      handles a query.
764 
765 
766      Q2: If queue is not empty, how many workers to wake?
767 
768      Solution:
769      We generally try to keep one thread per group active (threads handling
770      queries   are considered active, unless they stuck in inside some "wait")
771      Thus, we will wake only one worker, and only if there is not active
772      threads currently,and listener is not going to handle a query. When we
773      don't wake, we hope that  currently active  threads will finish fast and
774      handle the queue. If this does  not happen, timer thread will detect stall
775      and wake a worker.
776 
777      NOTE: Currently nothing is done to detect or prevent long queuing times.
778      A solutionc for the future would be to give up "one active thread per
779      group" principle, if events stay  in the queue for too long, and just wake
780      more workers.
781     */
782 
783     bool listener_picks_event= thread_group->high_prio_queue.is_empty() &&
784       thread_group->queue.is_empty();
785 
786     /*
787       If listener_picks_event is set, listener thread will handle first event,
788       and put the rest into the queue. If listener_pick_event is not set, all
789       events go to the queue.
790     */
791     for(int i=(listener_picks_event)?1:0; i < cnt ; i++)
792     {
793       connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
794       if (connection_is_high_prio(c))
795       {
796         c->tickets--;
797         thread_group->high_prio_queue.push_back(c);
798       }
799       else
800       {
801         c->tickets= c->thd->variables.threadpool_high_prio_tickets;
802         thread_group->queue.push_back(c);
803       }
804     }
805 
806     if (listener_picks_event)
807     {
808       /* Handle the first event. */
809       retval= (connection_t *)native_event_get_userdata(&ev[0]);
810       mysql_mutex_unlock(&thread_group->mutex);
811       break;
812     }
813 
814     if(thread_group->active_thread_count==0)
815     {
816       /* We added some work items to queue, now wake a worker. */
817       if(wake_thread(thread_group))
818       {
819         /*
820           Wake failed, hence groups has no idle threads. Now check if there are
821           any threads in the group except listener.
822         */
823         if(thread_group->thread_count == 1)
824         {
825            /*
826              Currently there is no worker thread in the group, as indicated by
827              thread_count == 1 (this means listener is the only one thread in
828              the group).
829              The queue is not empty, and listener is not going to handle
830              events. In order to drain the queue,  we create a worker here.
831              Alternatively, we could just rely on timer to detect stall, and
832              create thread, but waiting for timer would be an inefficient and
833              pointless delay.
834            */
835            create_worker(thread_group);
836         }
837       }
838     }
839     mysql_mutex_unlock(&thread_group->mutex);
840   }
841 
842   DBUG_RETURN(retval);
843 }
844 
845 /**
846   Adjust thread counters in group or global
847   whenever thread is created or is about to exit
848 
849   @param thread_group
850   @param count -  1, when new thread is created
851                  -1, when thread is about to exit
852 */
853 
add_thread_count(thread_group_t * thread_group,int32 count)854 static void add_thread_count(thread_group_t *thread_group, int32 count)
855 {
856   thread_group->thread_count += count;
857   /* worker starts out and end in "active" state */
858   thread_group->active_thread_count += count;
859   my_atomic_add32(&tp_stats.num_worker_threads, count);
860 }
861 
862 
863 /**
864   Creates a new worker thread.
865   thread_mutex must be held when calling this function
866 
867   NOTE: in rare cases, the number of threads can exceed
868   threadpool_max_threads, because we need at least 2 threads
869   per group to prevent deadlocks (one listener + one worker)
870 */
871 
create_worker(thread_group_t * thread_group)872 static int create_worker(thread_group_t *thread_group)
873 {
874   pthread_t thread_id;
875   bool max_threads_reached= false;
876   int err;
877 
878   DBUG_ENTER("create_worker");
879   if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
880      && thread_group->thread_count >= 2)
881   {
882     err= 1;
883     max_threads_reached= true;
884     goto end;
885   }
886 
887 
888   err= mysql_thread_create(key_worker_thread, &thread_id,
889          thread_group->pthread_attr, worker_main, thread_group);
890   if (!err)
891   {
892     thread_group->last_thread_creation_time=my_microsecond_getsystime();
893     thread_created++;
894     add_thread_count(thread_group, 1);
895   }
896   else
897   {
898     my_errno= errno;
899   }
900 
901 end:
902   if (err)
903     print_pool_blocked_message(max_threads_reached);
904   else
905     pool_block_start= 0; /* Reset pool blocked timer, if it was set */
906 
907   DBUG_RETURN(err);
908 }
909 
910 
911 /**
912  Calculate microseconds throttling delay for thread creation.
913 
914  The value depends on how many threads are already in the group:
915  small number of threads means no delay, the more threads the larger
916  the delay.
917 
918  The actual values were not calculated using any scientific methods.
919  They just look right, and behave well in practice.
920 
921  TODO: Should throttling depend on thread_pool_stall_limit?
922 */
microsecond_throttling_interval(thread_group_t * thread_group)923 static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
924 {
925   int count= thread_group->thread_count;
926 
927   if (count < 4)
928     return 0;
929 
930   if (count < 8)
931     return 50*1000;
932 
933   if(count < 16)
934     return 100*1000;
935 
936   return 200*1000;
937 }
938 
939 
940 /**
941   Wakes a worker thread, or creates a new one.
942 
943   Worker creation is throttled, so we avoid too many threads
944   to be created during the short time.
945 */
wake_or_create_thread(thread_group_t * thread_group)946 static int wake_or_create_thread(thread_group_t *thread_group)
947 {
948   DBUG_ENTER("wake_or_create_thread");
949 
950   if (thread_group->shutdown)
951    DBUG_RETURN(0);
952 
953   if (wake_thread(thread_group) == 0)
954     DBUG_RETURN(0);
955 
956   if (thread_group->thread_count > thread_group->connection_count)
957     DBUG_RETURN(-1);
958 
959 
960   if (thread_group->active_thread_count == 0)
961   {
962     /*
963      We're better off creating a new thread here  with no delay, either there
964      are no workers at all, or they all are all blocking and there was no
965      idle  thread to wakeup. Smells like a potential deadlock or very slowly
966      executing requests, e.g sleeps or user locks.
967     */
968     DBUG_RETURN(create_worker(thread_group));
969   }
970 
971   ulonglong now = my_microsecond_getsystime();
972   ulonglong time_since_last_thread_created =
973     (now - thread_group->last_thread_creation_time);
974 
975   /* Throttle thread creation. */
976   if (time_since_last_thread_created >
977        microsecond_throttling_interval(thread_group))
978   {
979     DBUG_RETURN(create_worker(thread_group));
980   }
981 
982   DBUG_RETURN(-1);
983 }
984 
985 
986 
thread_group_init(thread_group_t * thread_group,pthread_attr_t * thread_attr)987 static int thread_group_init(thread_group_t *thread_group,
988                              pthread_attr_t* thread_attr)
989 {
990   DBUG_ENTER("thread_group_init");
991   thread_group->pthread_attr = thread_attr;
992   mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
993   thread_group->pollfd= -1;
994   thread_group->shutdown_pipe[0]= -1;
995   thread_group->shutdown_pipe[1]= -1;
996   DBUG_RETURN(0);
997 }
998 
999 
thread_group_destroy(thread_group_t * thread_group)1000 static void thread_group_destroy(thread_group_t *thread_group)
1001 {
1002   mysql_mutex_destroy(&thread_group->mutex);
1003   if (thread_group->pollfd != -1)
1004   {
1005     close(thread_group->pollfd);
1006     thread_group->pollfd= -1;
1007   }
1008   for(int i=0; i < 2; i++)
1009   {
1010     if(thread_group->shutdown_pipe[i] != -1)
1011     {
1012       close(thread_group->shutdown_pipe[i]);
1013       thread_group->shutdown_pipe[i]= -1;
1014     }
1015   }
1016 }
1017 
1018 /**
1019   Wake sleeping thread from waiting list
1020 */
1021 
wake_thread(thread_group_t * thread_group)1022 static int wake_thread(thread_group_t *thread_group)
1023 {
1024   DBUG_ENTER("wake_thread");
1025   worker_thread_t *thread = thread_group->waiting_threads.front();
1026   if(thread)
1027   {
1028     thread->woken= true;
1029     thread_group->waiting_threads.remove(thread);
1030     mysql_cond_signal(&thread->cond);
1031     DBUG_RETURN(0);
1032   }
1033   DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
1034 }
1035 
1036 
1037 /**
1038   Initiate shutdown for thread group.
1039 
1040   The shutdown is asynchronous, we only care to  wake all threads in here, so
1041   they can finish. We do not wait here until threads terminate. Final cleanup
1042   of the group (thread_group_destroy) will be done by the last exiting threads.
1043 */
1044 
thread_group_close(thread_group_t * thread_group)1045 static void thread_group_close(thread_group_t *thread_group)
1046 {
1047   DBUG_ENTER("thread_group_close");
1048 
1049   mysql_mutex_lock(&thread_group->mutex);
1050   if (thread_group->thread_count == 0)
1051   {
1052     mysql_mutex_unlock(&thread_group->mutex);
1053     thread_group_destroy(thread_group);
1054     DBUG_VOID_RETURN;
1055   }
1056 
1057   thread_group->shutdown= true;
1058   thread_group->listener= NULL;
1059 
1060   if (pipe(thread_group->shutdown_pipe))
1061   {
1062     mysql_mutex_unlock(&thread_group->mutex);
1063     DBUG_VOID_RETURN;
1064   }
1065 
1066   /* Wake listener */
1067   if (io_poll_associate_fd(thread_group->pollfd,
1068       thread_group->shutdown_pipe[0], NULL))
1069   {
1070     mysql_mutex_unlock(&thread_group->mutex);
1071     DBUG_VOID_RETURN;
1072   }
1073   char c= 0;
1074   if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
1075   {
1076     mysql_mutex_unlock(&thread_group->mutex);
1077     DBUG_VOID_RETURN;
1078   }
1079 
1080   /* Wake all workers. */
1081   while(wake_thread(thread_group) == 0)
1082   {
1083   }
1084 
1085   mysql_mutex_unlock(&thread_group->mutex);
1086 
1087   DBUG_VOID_RETURN;
1088 }
1089 
1090 
1091 /*
1092   Add work to the queue. Maybe wake a worker if they all sleep.
1093 
1094   Currently, this function is only used when new connections need to
1095   perform login (this is done in worker threads).
1096 
1097 */
1098 
queue_put(thread_group_t * thread_group,connection_t * connection)1099 static void queue_put(thread_group_t *thread_group, connection_t *connection)
1100 {
1101   DBUG_ENTER("queue_put");
1102 
1103   mysql_mutex_lock(&thread_group->mutex);
1104   connection->tickets= connection->thd->variables.threadpool_high_prio_tickets;
1105   thread_group->queue.push_back(connection);
1106 
1107   if (thread_group->active_thread_count == 0)
1108     wake_or_create_thread(thread_group);
1109 
1110   mysql_mutex_unlock(&thread_group->mutex);
1111 
1112   DBUG_VOID_RETURN;
1113 }
1114 
/**
  Retrieve a connection with pending event.

  Pending event in our case means that there is either a pending login request
  (if connection is not yet logged in), or there are unread bytes on the socket.

  If there are no pending events currently, thread will wait.
  If timeout specified in abstime parameter passes, the function returns NULL.

  On each pass through the loop the thread tries, in this order:
  - to take a connection from the group's queue (unless the group has too
    many active threads);
  - to become the group's listener, if there is none;
  - to fetch a single event from the poll descriptor without waiting (unless
    the group has too many active threads);
  - to sleep on its own condition variable until woken or timed out.

  The group's stalled flag is cleared before returning.

  @param current_thread - current worker thread
  @param thread_group - current thread group
  @param abstime - absolute wait timeout; if NULL the thread waits without
                   a timeout

  @return
  connection with pending event.
  NULL is returned if timeout has expired, or on shutdown.
*/

static connection_t *get_event(worker_thread_t *current_thread,
                               thread_group_t *thread_group,
                               struct timespec *abstime)
{
  DBUG_ENTER("get_event");
  connection_t *connection = NULL;
  int err=0;

  mysql_mutex_lock(&thread_group->mutex);
  DBUG_ASSERT(thread_group->active_thread_count >= 0);

  for(;;)
  {
    bool oversubscribed = too_many_active_threads(thread_group);
    /* On shutdown leave with connection == NULL. */
    if (thread_group->shutdown)
     break;

    /* Check if queue is not empty */
    if (!oversubscribed)
    {
      connection = queue_get(thread_group);
      if(connection)
        break;
    }

    /* If there is  currently no listener in the group, become one. */
    if(!thread_group->listener)
    {
      thread_group->listener= current_thread;
      /* The listener does not count as active while it runs listener(). */
      thread_group->active_thread_count--;
      mysql_mutex_unlock(&thread_group->mutex);

      connection = listener(current_thread, thread_group);

      mysql_mutex_lock(&thread_group->mutex);
      thread_group->active_thread_count++;
      /* There is no listener anymore, it just returned. */
      thread_group->listener= NULL;
      break;
    }

    /*
      Last thing we try before going to sleep is to
      pick a single event via epoll, without waiting (timeout 0)
    */
    if (!oversubscribed)
    {
      native_event nev;
      if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
      {
        thread_group->io_event_count++;
        connection = (connection_t *)native_event_get_userdata(&nev);

        /*
          Since we are going to perform an out-of-order event processing for the
          connection, first check whether it is eligible for high priority
          processing. We can get here even if there are queued events, so it
          must either have a high priority ticket, or there must be not too many
          busy threads (as if it was coming from a low priority queue).
        */
        if (connection_is_high_prio(connection))
          connection->tickets--;
        else if (too_many_busy_threads(thread_group))
        {
          /*
            Not eligible for high priority processing. Restore tickets and put
            it into the low priority queue.
          */

          connection->tickets=
            connection->thd->variables.threadpool_high_prio_tickets;
          thread_group->queue.push_back(connection);
          connection= NULL;
        }

        /* The connection was not queued above, so process it right away. */
        if (connection)
        {
          thread_group->queue_event_count++;
          break;
        }
      }
    }

    /* And now, finally sleep */
    current_thread->woken = false; /* wake_thread() sets this to true */

    /*
      Add current thread to the head of the waiting list  and wait.
      It is important to add thread to the head rather than tail
      as it ensures LIFO wakeup order (hot caches, working inactivity timeout)
    */
    thread_group->waiting_threads.push_front(current_thread);

    /* A sleeping thread is not counted as active. */
    thread_group->active_thread_count--;
    if (abstime)
    {
      err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
                                 abstime);
    }
    else
    {
      err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
    }
    thread_group->active_thread_count++;

    if (!current_thread->woken)
    {
      /*
        Thread was not signalled by wake_thread(), it might be a spurious
        wakeup or a timeout. Anyhow, we need to remove ourselves from the
        list now. If thread was explicitly woken, then the caller removed us
        from the list.
      */
      thread_group->waiting_threads.remove(current_thread);
    }

    /* A failed wait (e.g. timeout) ends the loop with connection == NULL. */
    if (err)
      break;
  }

  thread_group->stalled= false;
  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_RETURN(connection);
}
1257 
1258 
1259 
1260 /**
1261   Tells the pool that worker starts waiting  on IO, lock, condition,
1262   sleep() or similar.
1263 */
1264 
wait_begin(thread_group_t * thread_group)1265 static void wait_begin(thread_group_t *thread_group)
1266 {
1267   DBUG_ENTER("wait_begin");
1268   mysql_mutex_lock(&thread_group->mutex);
1269   thread_group->active_thread_count--;
1270   thread_group->waiting_thread_count++;
1271 
1272   DBUG_ASSERT(thread_group->active_thread_count >=0);
1273   DBUG_ASSERT(thread_group->connection_count > 0);
1274 
1275 #ifdef THREADPOOL_CREATE_THREADS_ON_WAIT
1276   if ((thread_group->active_thread_count == 0) &&
1277       (!queues_are_empty(thread_group) || !thread_group->listener))
1278   {
1279     /*
1280       Group might stall while this thread waits, thus wake
1281       or create a worker to prevent stall.
1282     */
1283     wake_or_create_thread(thread_group);
1284   }
1285 #endif
1286 
1287   mysql_mutex_unlock(&thread_group->mutex);
1288   DBUG_VOID_RETURN;
1289 }
1290 
1291 /**
1292   Tells the pool has finished waiting.
1293 */
1294 
wait_end(thread_group_t * thread_group)1295 static void wait_end(thread_group_t *thread_group)
1296 {
1297   DBUG_ENTER("wait_end");
1298   mysql_mutex_lock(&thread_group->mutex);
1299   thread_group->active_thread_count++;
1300   thread_group->waiting_thread_count--;
1301   mysql_mutex_unlock(&thread_group->mutex);
1302   DBUG_VOID_RETURN;
1303 }
1304 
1305 
1306 /**
1307   Allocate/initialize a new connection structure.
1308 */
1309 
alloc_connection(THD * thd)1310 static connection_t *alloc_connection(THD *thd)
1311 {
1312   DBUG_ENTER("alloc_connection");
1313 
1314   connection_t* connection = (connection_t *)my_malloc(sizeof(connection_t),0);
1315   if (connection)
1316   {
1317     connection->thd = thd;
1318     connection->waiting= false;
1319     connection->logged_in= false;
1320     connection->bound_to_poll_descriptor= false;
1321     connection->abs_wait_timeout= ULONGLONG_MAX;
1322     connection->tickets = 0;
1323   }
1324   DBUG_RETURN(connection);
1325 }
1326 
1327 
1328 
1329 /**
1330   Add a new connection to thread pool..
1331 */
1332 
tp_add_connection(THD * thd)1333 void tp_add_connection(THD *thd)
1334 {
1335   DBUG_ENTER("tp_add_connection");
1336 
1337   add_global_thread(thd);
1338   mysql_mutex_unlock(&LOCK_thread_count);
1339   connection_t *connection= alloc_connection(thd);
1340   if (connection)
1341   {
1342     thd->event_scheduler.data= connection;
1343 
1344     /* Assign connection to a group. */
1345     thread_group_t *group=
1346       &all_groups[thd->thread_id%group_count];
1347 
1348     connection->thread_group=group;
1349 
1350     mysql_mutex_lock(&group->mutex);
1351     group->connection_count++;
1352     mysql_mutex_unlock(&group->mutex);
1353 
1354     /*
1355        Add connection to the work queue.Actual logon
1356        will be done by a worker thread.
1357     */
1358     queue_put(group, connection);
1359   }
1360   else
1361   {
1362     /* Allocation failed */
1363     threadpool_remove_connection(thd);
1364   }
1365   DBUG_VOID_RETURN;
1366 }
1367 
1368 
1369 /**
1370   Terminate connection.
1371 */
1372 
connection_abort(connection_t * connection)1373 static void connection_abort(connection_t *connection)
1374 {
1375   DBUG_ENTER("connection_abort");
1376   thread_group_t *group= connection->thread_group;
1377 
1378   threadpool_remove_connection(connection->thd);
1379 
1380   mysql_mutex_lock(&group->mutex);
1381   group->connection_count--;
1382   mysql_mutex_unlock(&group->mutex);
1383 
1384   my_free(connection);
1385   DBUG_VOID_RETURN;
1386 }
1387 
1388 
1389 /**
1390   MySQL scheduler callback : kill connection
1391 */
1392 
tp_post_kill_notification(THD * thd)1393 void tp_post_kill_notification(THD *thd)
1394 {
1395   DBUG_ENTER("tp_post_kill_notification");
1396   if (current_thd == thd || thd->system_thread)
1397     DBUG_VOID_RETURN;
1398 
1399   if (thd->net.vio)
1400     vio_cancel(thd->net.vio, SHUT_RD);
1401   DBUG_VOID_RETURN;
1402 }
1403 
1404 /**
1405   MySQL scheduler callback: wait begin
1406 */
1407 
tp_wait_begin(THD * thd,int type)1408 void tp_wait_begin(THD *thd, int type)
1409 {
1410   DBUG_ENTER("tp_wait_begin");
1411   DBUG_ASSERT(thd);
1412   connection_t *connection = (connection_t *)thd->event_scheduler.data;
1413   if (connection)
1414   {
1415     DBUG_ASSERT(!connection->waiting);
1416     connection->waiting= true;
1417     wait_begin(connection->thread_group);
1418   }
1419   DBUG_VOID_RETURN;
1420 }
1421 
1422 
1423 /**
1424   MySQL scheduler callback: wait end
1425 */
1426 
tp_wait_end(THD * thd)1427 void tp_wait_end(THD *thd)
1428 {
1429   DBUG_ENTER("tp_wait_end");
1430   DBUG_ASSERT(thd);
1431 
1432   connection_t *connection = (connection_t *)thd->event_scheduler.data;
1433   if (connection)
1434   {
1435     DBUG_ASSERT(connection->waiting);
1436     connection->waiting = false;
1437     wait_end(connection->thread_group);
1438   }
1439   DBUG_VOID_RETURN;
1440 }
1441 
1442 
set_next_timeout_check(ulonglong abstime)1443 static void set_next_timeout_check(ulonglong abstime)
1444 {
1445   DBUG_ENTER("set_next_timeout_check");
1446   while(abstime < pool_timer.next_timeout_check)
1447   {
1448     longlong old= (longlong)pool_timer.next_timeout_check;
1449     my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
1450           &old, abstime);
1451   }
1452   DBUG_VOID_RETURN;
1453 }
1454 
1455 
1456 /**
1457   Set wait timeout for connection.
1458 */
1459 
set_wait_timeout(connection_t * c)1460 static void set_wait_timeout(connection_t *c)
1461 {
1462   DBUG_ENTER("set_wait_timeout");
1463   /*
1464     Calculate wait deadline for this connection.
1465     Instead of using my_microsecond_getsystime() which has a syscall
1466     overhead, use pool_timer.current_microtime and take
1467     into account that its value could be off by at most
1468     one tick interval.
1469   */
1470 
1471   c->abs_wait_timeout= pool_timer.current_microtime +
1472     1000LL*pool_timer.tick_interval +
1473     1000000LL*c->thd->get_wait_timeout();
1474 
1475   set_next_timeout_check(c->abs_wait_timeout);
1476   DBUG_VOID_RETURN;
1477 }
1478 
1479 
1480 
1481 /**
1482   Handle a (rare) special case,where connection needs to
1483   migrate to a different group because group_count has changed
1484   after thread_pool_size setting.
1485 */
1486 
change_group(connection_t * c,thread_group_t * old_group,thread_group_t * new_group)1487 static int change_group(connection_t *c,
1488  thread_group_t *old_group,
1489  thread_group_t *new_group)
1490 {
1491   int ret= 0;
1492   int fd = mysql_socket_getfd(c->thd->net.vio->mysql_socket);
1493 
1494   DBUG_ASSERT(c->thread_group == old_group);
1495 
1496   /* Remove connection from the old group. */
1497   mysql_mutex_lock(&old_group->mutex);
1498   if (c->bound_to_poll_descriptor)
1499   {
1500     io_poll_disassociate_fd(old_group->pollfd,fd);
1501     c->bound_to_poll_descriptor= false;
1502   }
1503   c->thread_group->connection_count--;
1504   mysql_mutex_unlock(&old_group->mutex);
1505 
1506   /* Add connection to the new group. */
1507   mysql_mutex_lock(&new_group->mutex);
1508   c->thread_group= new_group;
1509   new_group->connection_count++;
1510   /* Ensure that there is a listener in the new group. */
1511   if (!new_group->thread_count)
1512     ret= create_worker(new_group);
1513   mysql_mutex_unlock(&new_group->mutex);
1514   return ret;
1515 }
1516 
1517 
start_io(connection_t * connection)1518 static int start_io(connection_t *connection)
1519 {
1520   int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket);
1521 
1522   /*
1523     Usually, connection will stay in the same group for the entire
1524     connection's life. However, we do allow group_count to
1525     change at runtime, which means in rare cases when it changes is
1526     connection should need to migrate  to another group, this ensures
1527     to ensure equal load between groups.
1528 
1529     So we recalculate in which group the connection should be, based
1530     on thread_id and current group count, and migrate if necessary.
1531   */
1532   thread_group_t *group =
1533     &all_groups[connection->thd->thread_id%group_count];
1534 
1535   if (group != connection->thread_group)
1536   {
1537     if (change_group(connection, connection->thread_group, group))
1538       return -1;
1539   }
1540 
1541   /*
1542     Bind to poll descriptor if not yet done.
1543   */
1544   if (!connection->bound_to_poll_descriptor)
1545   {
1546     connection->bound_to_poll_descriptor= true;
1547     return io_poll_associate_fd(group->pollfd, fd, connection);
1548   }
1549 
1550   return io_poll_start_read(group->pollfd, fd, connection);
1551 }
1552 
1553 
1554 
handle_event(connection_t * connection)1555 static void handle_event(connection_t *connection)
1556 {
1557 
1558   DBUG_ENTER("handle_event");
1559   int err;
1560 
1561   if (!connection->logged_in)
1562   {
1563     err= threadpool_add_connection(connection->thd);
1564     connection->logged_in= true;
1565   }
1566   else
1567   {
1568     err= threadpool_process_request(connection->thd);
1569   }
1570 
1571   if(err)
1572     goto end;
1573 
1574   set_wait_timeout(connection);
1575   err= start_io(connection);
1576 
1577 end:
1578   if (err)
1579     connection_abort(connection);
1580 
1581   DBUG_VOID_RETURN;
1582 }
1583 
1584 
1585 
1586 /**
1587   Worker thread's main
1588 */
1589 
worker_main(void * param)1590 static void *worker_main(void *param)
1591 {
1592 
1593   worker_thread_t this_thread;
1594   pthread_detach_this_thread();
1595   my_thread_init();
1596 
1597   DBUG_ENTER("worker_main");
1598 
1599   thread_group_t *thread_group = (thread_group_t *)param;
1600 
1601   /* Init per-thread structure */
1602   mysql_cond_init(key_worker_cond, &this_thread.cond, NULL);
1603   this_thread.thread_group= thread_group;
1604   this_thread.event_count=0;
1605 
1606 #ifdef HAVE_PSI_THREAD_INTERFACE
1607     PSI_THREAD_CALL(set_thread_user_host)
1608       (NULL, 0, NULL, 0);
1609 #endif
1610 
1611   /* Run event loop */
1612   for(;;)
1613   {
1614     connection_t *connection;
1615     struct timespec ts;
1616     set_timespec(ts,threadpool_idle_timeout);
1617     connection = get_event(&this_thread, thread_group, &ts);
1618     if (!connection)
1619       break;
1620     this_thread.event_count++;
1621     handle_event(connection);
1622   }
1623 
1624   /* Thread shutdown: cleanup per-worker-thread structure. */
1625   mysql_cond_destroy(&this_thread.cond);
1626 
1627   bool last_thread;                    /* last thread in group exits */
1628   mysql_mutex_lock(&thread_group->mutex);
1629   add_thread_count(thread_group, -1);
1630   last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown);
1631   mysql_mutex_unlock(&thread_group->mutex);
1632 
1633   /* Last thread in group exits and pool is terminating, destroy group.*/
1634   if (last_thread)
1635     thread_group_destroy(thread_group);
1636 
1637   my_thread_end();
1638   return NULL;
1639 }
1640 
1641 
tp_init()1642 bool tp_init()
1643 {
1644   DBUG_ENTER("tp_init");
1645   threadpool_started= true;
1646   scheduler_init();
1647 
1648   for(uint i=0; i < array_elements(all_groups); i++)
1649   {
1650     thread_group_init(&all_groups[i], get_connection_attrib());
1651   }
1652   tp_set_threadpool_size(threadpool_size);
1653   if(group_count == 0)
1654   {
1655     /* Something went wrong */
1656     sql_print_error("Can't set threadpool size to %d",threadpool_size);
1657     DBUG_RETURN(1);
1658   }
1659 #ifdef HAVE_PSI_INTERFACE
1660   PSI_register(mutex);
1661   PSI_register(cond);
1662   PSI_register(thread);
1663 #endif
1664 
1665   pool_timer.tick_interval= threadpool_stall_limit;
1666   start_timer(&pool_timer);
1667   DBUG_RETURN(0);
1668 }
1669 
1670 
tp_end()1671 void tp_end()
1672 {
1673   DBUG_ENTER("tp_end");
1674 
1675   if (!threadpool_started)
1676     DBUG_VOID_RETURN;
1677 
1678   stop_timer(&pool_timer);
1679   for(uint i=0; i< array_elements(all_groups); i++)
1680   {
1681     thread_group_close(&all_groups[i]);
1682   }
1683   threadpool_started= false;
1684   DBUG_VOID_RETURN;
1685 }
1686 
1687 
1688 /** Ensure that poll descriptors are created when threadpool_size changes */
1689 
tp_set_threadpool_size(uint size)1690 void tp_set_threadpool_size(uint size)
1691 {
1692   bool success= true;
1693   if (!threadpool_started)
1694     return;
1695 
1696   for(uint i=0; i< size; i++)
1697   {
1698     thread_group_t *group= &all_groups[i];
1699     mysql_mutex_lock(&group->mutex);
1700     if (group->pollfd == -1)
1701     {
1702       group->pollfd= io_poll_create();
1703       success= (group->pollfd >= 0);
1704       if(!success)
1705       {
1706         sql_print_error("io_poll_create() failed, errno=%d\n", errno);
1707         break;
1708       }
1709     }
1710     mysql_mutex_unlock(&all_groups[i].mutex);
1711     if (!success)
1712     {
1713       group_count= i;
1714       return;
1715     }
1716   }
1717   group_count= size;
1718 }
1719 
tp_set_threadpool_stall_limit(uint limit)1720 void tp_set_threadpool_stall_limit(uint limit)
1721 {
1722   if (!threadpool_started)
1723     return;
1724   mysql_mutex_lock(&(pool_timer.mutex));
1725   pool_timer.tick_interval= limit;
1726   mysql_mutex_unlock(&(pool_timer.mutex));
1727   mysql_cond_signal(&(pool_timer.cond));
1728 }
1729 
1730 
1731 /**
1732  Calculate number of idle/waiting threads in the pool.
1733 
1734  Sum idle threads over all groups.
1735  Don't do any locking, it is not required for stats.
1736 */
1737 
tp_get_idle_thread_count()1738 int tp_get_idle_thread_count()
1739 {
1740   int sum=0;
1741   for(uint i= 0;
1742       i< array_elements(all_groups) && (all_groups[i].pollfd >= 0);
1743       i++)
1744   {
1745     sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
1746   }
1747   return sum;
1748 }
1749 
1750 
1751 /* Report threadpool problems */
1752 
/**
   Delay in microseconds, after which "pool blocked" message is printed.
   (30 sec == 30 million usec)

   The replacement list is parenthesized so that the macro expands to a
   single value in any expression.
*/
#define BLOCK_MSG_DELAY (30*1000000)

/** Message logged when thread creation is refused because of the thread limit. */
#define MAX_THREADS_REACHED_MSG \
"Threadpool could not create additional thread to handle queries, because the \
number of allowed threads was reached. Increasing 'thread_pool_max_threads' \
parameter can help in this situation.\n \
If 'extra_port' parameter is set, you can still connect to the database with \
superuser account (it must be TCP connection using extra_port as TCP port) \
and troubleshoot the situation. \
A likely cause of pool blocks are clients that lock resources for long time. \
'show processlist' or 'show engine innodb status' can give additional hints."

/** Message logged when creating a thread fails; takes the error code. */
#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."
1770 
1771 /**
1772  Write a message when blocking situation in threadpool occurs.
1773  The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds.
1774  It will be just a single message for each blocking situation (to prevent
1775  log flood).
1776 */
1777 
print_pool_blocked_message(bool max_threads_reached)1778 static void print_pool_blocked_message(bool max_threads_reached)
1779 {
1780   ulonglong now= my_microsecond_getsystime();
1781   static bool msg_written;
1782 
1783   if (pool_block_start == 0)
1784   {
1785     pool_block_start= now;
1786     msg_written= false;
1787   }
1788 
1789   if (!msg_written
1790       && ((now > pool_block_start + BLOCK_MSG_DELAY)
1791           || (now == pool_block_start)))
1792   {
1793     if (max_threads_reached)
1794       sql_print_error(MAX_THREADS_REACHED_MSG);
1795     else
1796       sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno);
1797 
1798     if (now > pool_block_start)
1799     {
1800       sql_print_information("Threadpool has been blocked for %u seconds\n",
1801                             (uint)((now - pool_block_start)/1000000));
1802     }
1803     /* avoid reperated messages for the same blocking situation */
1804     msg_written= true;
1805   }
1806 }
1807