/* Copyright (C) 2019, 2020, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15 
16 #include "tpool_structs.h"
17 #include <limits.h>
18 #include <algorithm>
19 #include <assert.h>
20 #include <atomic>
21 #include <chrono>
22 #include <condition_variable>
23 #include <iostream>
24 #include <limits.h>
25 #include <mutex>
26 #include <queue>
27 #include <stack>
28 #include <thread>
29 #include <vector>
30 #include "tpool.h"
31 #include <assert.h>
32 #include <my_global.h>
33 #include <my_dbug.h>
34 #include <thr_timer.h>
35 #include <stdlib.h>
36 
37 namespace tpool
38 {
39 
40 #ifdef __linux__
41   extern aio* create_linux_aio(thread_pool* tp, int max_io);
42 #endif
43 #ifdef _WIN32
44   extern aio* create_win_aio(thread_pool* tp, int max_io);
45 #endif
46 
47   static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
48   static const int  OVERSUBSCRIBE_FACTOR = 2;
49 
/**
  Implementation of the generic threadpool.
  This threadpool consists of the following components:

  - The task queue. This queue is populated by submit().
  - Workers that execute the work items.
  - A timer thread that takes care of pool health.

  The task queue is populated by the submit() method.
  On submit(), a worker thread can be woken, or created,
  to execute tasks.

  The timer thread watches whether work items are being dequeued; if they are
  not, this can indicate a potential deadlock.
  Thus the timer thread can also wake or create a thread, to ensure some progress.

  Optimizations:

  - Worker threads that are idle for a long time will shut down.
  - Worker threads are woken in LIFO order, which minimizes context switching
  and also ensures that the idle timeout works well. LIFO wakeup order ensures
  that active threads stay active, and idle ones stay idle.

*/
74 
/**
  Reason stored in worker_data::m_wake_reason when a parked worker is woken.
*/
enum worker_wake_reason
{
  /** No wakeup has been recorded; wait_for_tasks() resets to this before parking. */
  WAKE_REASON_NONE= 0,
  /** Woken because there is work in the task queue. */
  WAKE_REASON_TASK= 1,
  /** Woken by the pool destructor. */
  WAKE_REASON_SHUTDOWN= 2
};
84 
85 
86 
87 /* A per-worker  thread structure.*/
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE)88 struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE)  worker_data
89 {
90   /** Condition variable to wakeup this worker.*/
91   std::condition_variable m_cv;
92 
93   /** Reason why worker was woken. */
94   worker_wake_reason m_wake_reason;
95 
96   /**
97     If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute.
98   */
99   task* m_task;
100 
101   /** Struct is member of intrusive doubly linked list */
102   worker_data* m_prev;
103   worker_data* m_next;
104 
105   /* Current state of the worker.*/
106   enum state
107   {
108     NONE = 0,
109     EXECUTING_TASK = 1,
110     LONG_TASK = 2,
111     WAITING = 4
112   };
113 
114   int m_state;
115 
116   bool is_executing_task()
117   {
118     return m_state & EXECUTING_TASK;
119   }
120   bool is_long_task()
121   {
122     return m_state & LONG_TASK;
123   }
124   bool is_waiting()
125   {
126     return m_state & WAITING;
127   }
128   std::chrono::system_clock::time_point m_task_start_time;
129   worker_data() :
130     m_cv(),
131     m_wake_reason(WAKE_REASON_NONE),
132     m_task(),
133     m_prev(),
134     m_next(),
135     m_state(NONE),
136     m_task_start_time()
137   {}
138 
139   /*Define custom new/delete because of overaligned structure. */
140   void* operator new(size_t size)
141   {
142 #ifdef _WIN32
143     return _aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
144 #else
145     void* ptr;
146     int ret = posix_memalign(&ptr, CPU_LEVEL1_DCACHE_LINESIZE, size);
147     return ret ? 0 : ptr;
148 #endif
149   }
150   void operator delete(void* p)
151   {
152 #ifdef _WIN32
153     _aligned_free(p);
154 #else
155     free(p);
156 #endif
157   }
158 };
159 
160 
161 static thread_local worker_data* tls_worker_data;
162 
163 class thread_pool_generic : public thread_pool
164 {
165   /** Cache for per-worker structures */
166   cache<worker_data> m_thread_data_cache;
167 
168   /** The task queue */
169   circular_queue<task*> m_task_queue;
170 
171   /** List of standby (idle) workers */
172   doubly_linked_list<worker_data> m_standby_threads;
173 
174   /** List of threads that are executing tasks */
175   doubly_linked_list<worker_data> m_active_threads;
176 
177   /* Mutex that protects the whole struct, most importantly
178   the standby threads list, and task queue */
179   std::mutex m_mtx;
180 
181   /** Timeout after which idle worker shuts down */
182   std::chrono::milliseconds m_thread_timeout;
183 
184   /** How often should timer wakeup.*/
185   std::chrono::milliseconds m_timer_interval;
186 
187   /** Another condition variable, used in pool shutdown */
188   std::condition_variable m_cv_no_threads;
189 
190   /** Condition variable for the timer thread. Signaled on shutdown. */
191   std::condition_variable m_cv_timer;
192 
193   /** Overall number of enqueues*/
194   unsigned long long m_tasks_enqueued;
195   unsigned long long m_group_enqueued;
196   /** Overall number of dequeued tasks. */
197   unsigned long long m_tasks_dequeued;
198 
199   /** Statistic related, number of worker thread wakeups */
200   int m_wakeups;
201 
202   /**
203   Statistic related, number of spurious thread wakeups
204   (i.e thread woke up, and the task queue is empty)
205   */
206   int m_spurious_wakeups;
207 
208   /** The desired concurrency.  This number of workers should be
209   actively executing. */
210   unsigned int m_concurrency;
211 
212   /** True, if threadpool is being shutdown, false otherwise */
213   bool m_in_shutdown;
214 
215   /** Maintenance timer state : true = active(ON),false = inactive(OFF)*/
216   enum class timer_state_t
217   {
218     OFF, ON
219   };
220   timer_state_t m_timer_state= timer_state_t::OFF;
221   void switch_timer(timer_state_t state);
222 
223   /* Updates idle_since, and maybe switches the timer off */
224   void check_idle(std::chrono::system_clock::time_point now);
225 
226   /** time point when timer last ran, used as a coarse clock. */
227   std::chrono::system_clock::time_point m_timestamp;
228 
229   /** Number of long running tasks. The long running tasks are excluded when
230   adjusting concurrency */
231   unsigned int m_long_tasks_count;
232 
233   unsigned int m_waiting_task_count;
234 
235   /** Last time thread was created*/
236   std::chrono::system_clock::time_point m_last_thread_creation;
237 
238   /** Minimumum number of threads in this pool.*/
239   unsigned int m_min_threads;
240 
241   /** Maximimum number of threads in this pool. */
242   unsigned int m_max_threads;
243 
244   /* maintenance related statistics (see maintenance()) */
245   size_t m_last_thread_count;
246   unsigned long long m_last_activity;
247 
248   void worker_main(worker_data *thread_data);
249   void worker_end(worker_data* thread_data);
250 
251   /* Checks threadpool responsiveness, adjusts thread_counts */
252   void maintenance();
maintenance_func(void * arg)253   static void maintenance_func(void* arg)
254   {
255     ((thread_pool_generic *)arg)->maintenance();
256   }
257   bool add_thread();
258   bool wake(worker_wake_reason reason, task *t = nullptr);
259   void maybe_wake_or_create_thread();
260   bool too_many_active_threads();
261   bool get_task(worker_data *thread_var, task **t);
262   bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
263                       worker_data *thread_var);
264   void cancel_pending(task* t);
265 
thread_count()266   size_t thread_count()
267   {
268     return m_active_threads.size() + m_standby_threads.size();
269   }
270 public:
271   thread_pool_generic(int min_threads, int max_threads);
272   ~thread_pool_generic();
273   void wait_begin() override;
274   void wait_end() override;
275   void submit_task(task *task) override;
create_native_aio(int max_io)276   virtual aio *create_native_aio(int max_io) override
277   {
278 #ifdef _WIN32
279     return create_win_aio(this, max_io);
280 #elif defined(__linux__)
281     return create_linux_aio(this,max_io);
282 #else
283     return nullptr;
284 #endif
285   }
286 
287   class timer_generic : public thr_timer_t, public timer
288   {
289     thread_pool_generic* m_pool;
290     waitable_task m_task;
291     callback_func m_callback;
292     void* m_data;
293     int m_period;
294     std::mutex m_mtx;
295     bool m_on;
296     std::atomic<bool> m_running;
297 
run()298     void run()
299     {
300       /*
301         In rare cases, multiple callbacks can be scheduled,
302         e.g with set_time(0,0) in a loop.
303         We do not allow parallel execution, as user is not prepared.
304       */
305       bool expected = false;
306       if (!m_running.compare_exchange_strong(expected, true))
307         return;
308 
309       m_callback(m_data);
310       dbug_execute_after_task_callback();
311       m_running = false;
312 
313       if (m_pool && m_period)
314       {
315         std::unique_lock<std::mutex> lk(m_mtx);
316         if (m_on)
317         {
318           DBUG_PUSH_EMPTY;
319           thr_timer_end(this);
320           thr_timer_settime(this, 1000ULL * m_period);
321           DBUG_POP_EMPTY;
322         }
323       }
324     }
325 
execute(void * arg)326     static void execute(void* arg)
327     {
328       auto timer = (timer_generic*)arg;
329       timer->run();
330     }
331 
submit_task(void * arg)332     static void submit_task(void* arg)
333     {
334       timer_generic* timer = (timer_generic*)arg;
335       timer->m_pool->submit_task(&timer->m_task);
336     }
337 
338   public:
timer_generic(callback_func func,void * data,thread_pool_generic * pool)339     timer_generic(callback_func func, void* data, thread_pool_generic * pool):
340       m_pool(pool),
341       m_task(timer_generic::execute,this),
342       m_callback(func),m_data(data),m_period(0),m_mtx(),
343       m_on(true),m_running()
344     {
345       if (pool)
346       {
347         /* EXecute callback in threadpool*/
348         thr_timer_init(this, submit_task, this);
349       }
350       else
351       {
352         /* run in "timer" thread */
353         thr_timer_init(this, m_task.get_func(), m_task.get_arg());
354       }
355     }
356 
set_time(int initial_delay_ms,int period_ms)357     void set_time(int initial_delay_ms, int period_ms) override
358     {
359       std::unique_lock<std::mutex> lk(m_mtx);
360       if (!m_on)
361         return;
362       thr_timer_end(this);
363       if (!m_pool)
364         thr_timer_set_period(this, 1000ULL * period_ms);
365       else
366         m_period = period_ms;
367       thr_timer_settime(this, 1000ULL * initial_delay_ms);
368     }
369 
370     /*
371       Change only period of a periodic timer
372       (after the next execution). Workarounds
373       mysys timer deadlocks
374     */
set_period(int period_ms)375     void set_period(int period_ms)
376     {
377       std::unique_lock<std::mutex> lk(m_mtx);
378       if (!m_on)
379         return;
380       if (!m_pool)
381        thr_timer_set_period(this, 1000ULL * period_ms);
382       else
383          m_period = period_ms;
384     }
385 
disarm()386     void disarm() override
387     {
388       std::unique_lock<std::mutex> lk(m_mtx);
389       m_on = false;
390       thr_timer_end(this);
391       lk.unlock();
392 
393       if (m_task.m_group)
394       {
395         m_task.m_group->cancel_pending(&m_task);
396       }
397       if (m_pool)
398       {
399         m_pool->cancel_pending(&m_task);
400       }
401       m_task.wait();
402     }
403 
~timer_generic()404     virtual ~timer_generic()
405     {
406       disarm();
407     }
408   };
409   timer_generic m_maintenance_timer;
create_timer(callback_func func,void * data)410   virtual timer* create_timer(callback_func func, void *data) override
411   {
412     return new timer_generic(func, data, this);
413   }
414 };
415 
cancel_pending(task * t)416 void thread_pool_generic::cancel_pending(task* t)
417 {
418   std::unique_lock <std::mutex> lk(m_mtx);
419   for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++)
420   {
421     if (*it == t)
422     {
423       t->release();
424       *it = nullptr;
425     }
426   }
427 }
428 /**
429   Register worker in standby list, and wait to be woken.
430 
431   @retval true  if thread was woken
432   @retval false idle wait timeout exceeded (the current thread must shutdown)
433 */
wait_for_tasks(std::unique_lock<std::mutex> & lk,worker_data * thread_data)434 bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
435                                          worker_data *thread_data)
436 {
437   assert(m_task_queue.empty());
438   assert(!m_in_shutdown);
439 
440   thread_data->m_wake_reason= WAKE_REASON_NONE;
441   m_active_threads.erase(thread_data);
442   m_standby_threads.push_back(thread_data);
443 
444   for (;;)
445   {
446     thread_data->m_cv.wait_for(lk, m_thread_timeout);
447 
448     if (thread_data->m_wake_reason != WAKE_REASON_NONE)
449     {
450       /* Woke up not due to timeout.*/
451       return true;
452     }
453 
454     if (thread_count() <= m_min_threads)
455     {
456       /* Do not shutdown thread, maintain required minimum of worker
457         threads.*/
458       continue;
459     }
460 
461     /*
462       Woke up due to timeout, remove this thread's  from the standby list. In
463       all other cases where it is signaled it is removed by the signaling
464       thread.
465     */
466     m_standby_threads.erase(thread_data);
467     m_active_threads.push_back(thread_data);
468     return false;
469   }
470 }
471 
472 
473 /**
474  Workers "get next task" routine.
475 
476  A task can be handed over to the current thread directly during submit().
477  if thread_var->m_wake_reason == WAKE_REASON_TASK.
478 
479  Or a task can be taken from the task queue.
480  In case task queue is empty, the worker thread will park (wait for wakeup).
481 */
get_task(worker_data * thread_var,task ** t)482 bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
483 {
484   std::unique_lock<std::mutex> lk(m_mtx);
485 
486   if (thread_var->is_long_task())
487   {
488     DBUG_ASSERT(m_long_tasks_count);
489     m_long_tasks_count--;
490   }
491   DBUG_ASSERT(!thread_var->is_waiting());
492   thread_var->m_state = worker_data::NONE;
493 
494   while (m_task_queue.empty())
495   {
496     if (m_in_shutdown)
497       return false;
498 
499     if (!wait_for_tasks(lk, thread_var))
500       return false;
501     if (m_task_queue.empty())
502     {
503       m_spurious_wakeups++;
504       continue;
505     }
506   }
507 
508   /* Dequeue from the task queue.*/
509   *t= m_task_queue.front();
510   m_task_queue.pop();
511   m_tasks_dequeued++;
512   thread_var->m_state |= worker_data::EXECUTING_TASK;
513   thread_var->m_task_start_time = m_timestamp;
514   return true;
515 }
516 
517 /** Worker thread shutdown routine. */
worker_end(worker_data * thread_data)518 void thread_pool_generic::worker_end(worker_data* thread_data)
519 {
520   std::lock_guard<std::mutex> lk(m_mtx);
521   DBUG_ASSERT(!thread_data->is_long_task());
522   m_active_threads.erase(thread_data);
523   m_thread_data_cache.put(thread_data);
524 
525   if (!thread_count() && m_in_shutdown)
526   {
527     /* Signal the destructor that no more threads are left. */
528     m_cv_no_threads.notify_all();
529   }
530 }
531 
532 extern "C" void set_tls_pool(tpool::thread_pool* pool);
533 
534 /* The worker get/execute task loop.*/
worker_main(worker_data * thread_var)535 void thread_pool_generic::worker_main(worker_data *thread_var)
536 {
537   task* task;
538   set_tls_pool(this);
539   if(m_worker_init_callback)
540    m_worker_init_callback();
541 
542   tls_worker_data = thread_var;
543 
544   while (get_task(thread_var, &task) && task)
545   {
546     task->execute();
547   }
548 
549   if (m_worker_destroy_callback)
550     m_worker_destroy_callback();
551 
552   worker_end(thread_var);
553 }
554 
555 
556 /*
557   Check if threadpool had been idle for a while
558   Switch off maintenance timer if it is in idle state
559   for too long.
560 
561   Helper function, to be used inside maintenance callback,
562   before m_last_activity is updated
563 */
564 
565 static const auto invalid_timestamp=  std::chrono::system_clock::time_point::max();
566 constexpr auto max_idle_time= std::chrono::minutes(1);
567 
568 /* Time since maintenance timer had nothing to do */
569 static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
check_idle(std::chrono::system_clock::time_point now)570 void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
571 {
572   DBUG_ASSERT(m_task_queue.empty());
573 
574   /*
575    We think that there is no activity, if there were at most 2 tasks
576    since last time, and there is a spare thread.
577    The 2 tasks (and not 0) is to account for some periodic timers.
578   */
579   bool idle= m_standby_threads.m_count > 0;
580 
581   if (!idle)
582   {
583     idle_since= invalid_timestamp;
584     return;
585   }
586 
587   if (idle_since == invalid_timestamp)
588   {
589     idle_since= now;
590     return;
591   }
592 
593   /* Switch timer off after 1 minute of idle time */
594   if (now - idle_since > max_idle_time)
595   {
596     idle_since= invalid_timestamp;
597     switch_timer(timer_state_t::OFF);
598   }
599 }
600 
601 
602 /*
603   Periodic job to fix thread count and concurrency,
604   in case of long tasks, etc
605 */
maintenance()606 void thread_pool_generic::maintenance()
607 {
608   /*
609     If pool is busy (i.e the its mutex is currently locked), we can
610     skip the maintenance task, some times, to lower mutex contention
611   */
612   static int skip_counter;
613   const int MAX_SKIPS = 10;
614   std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
615   if (skip_counter == MAX_SKIPS)
616   {
617     lk.lock();
618   }
619   else if (!lk.try_lock())
620   {
621     skip_counter++;
622     return;
623   }
624 
625   skip_counter = 0;
626 
627   m_timestamp = std::chrono::system_clock::now();
628 
629   if (m_task_queue.empty())
630   {
631     check_idle(m_timestamp);
632     m_last_activity = m_tasks_dequeued + m_wakeups;
633     return;
634   }
635 
636   m_long_tasks_count = 0;
637   for (auto thread_data = m_active_threads.front();
638     thread_data;
639     thread_data = thread_data->m_next)
640   {
641     if (thread_data->is_executing_task() &&
642        !thread_data->is_waiting() &&
643       (thread_data->is_long_task()
644       || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION)))
645     {
646       thread_data->m_state |= worker_data::LONG_TASK;
647       m_long_tasks_count++;
648     }
649   }
650 
651   maybe_wake_or_create_thread();
652 
653   size_t thread_cnt = (int)thread_count();
654   if (m_last_activity == m_tasks_dequeued + m_wakeups &&
655       m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
656   {
657     // no progress made since last iteration. create new
658     // thread
659     add_thread();
660   }
661   m_last_activity = m_tasks_dequeued + m_wakeups;
662   m_last_thread_count= thread_cnt;
663 }
664 
/*
  Heuristic used for thread creation throttling.

  @param n_threads    number of threads already in the pool
  @param concurrency  desired concurrency level
  @return minimum interval, in milliseconds, between two thread creations:
          0 below 4x concurrency, 50 below 8x, 100 below 16x, 200 otherwise
*/
static int throttling_interval_ms(size_t n_threads, size_t concurrency)
{
  struct tier
  {
    size_t multiplier;
    int interval_ms;
  };
  static const tier tiers[]= {{4, 0}, {8, 50}, {16, 100}};

  /* The first tier whose thread limit has not been reached yet wins. */
  for (const tier &t : tiers)
  {
    if (n_threads < concurrency * t.multiplier)
      return t.interval_ms;
  }
  return 200;
}
684 
685 /* Create a new worker.*/
add_thread()686 bool thread_pool_generic::add_thread()
687 {
688   size_t n_threads = thread_count();
689 
690   if (n_threads >= m_max_threads)
691     return false;
692 
693   if (n_threads >= m_min_threads)
694   {
695     auto now = std::chrono::system_clock::now();
696     if (now - m_last_thread_creation <
697       std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency)))
698     {
699       /*
700         Throttle thread creation and wakeup deadlock detection timer,
701         if is it off.
702       */
703       switch_timer(timer_state_t::ON);
704 
705       return false;
706     }
707   }
708 
709   worker_data *thread_data = m_thread_data_cache.get();
710   m_active_threads.push_back(thread_data);
711   try
712   {
713     std::thread thread(&thread_pool_generic::worker_main, this, thread_data);
714     m_last_thread_creation = std::chrono::system_clock::now();
715     thread.detach();
716   }
717   catch (std::system_error& e)
718   {
719     m_active_threads.erase(thread_data);
720     m_thread_data_cache.put(thread_data);
721     static bool warning_written;
722     if (!warning_written)
723     {
724       fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
725         "current number of threads in pool %zu\n", e.what(), thread_count());
726       warning_written = true;
727     }
728     return false;
729   }
730   return true;
731 }
732 
733 /** Wake a standby thread, and hand the given task over to this thread. */
wake(worker_wake_reason reason,task *)734 bool thread_pool_generic::wake(worker_wake_reason reason, task *)
735 {
736   assert(reason != WAKE_REASON_NONE);
737 
738   if (m_standby_threads.empty())
739     return false;
740   auto var= m_standby_threads.back();
741   m_standby_threads.pop_back();
742   m_active_threads.push_back(var);
743   assert(var->m_wake_reason == WAKE_REASON_NONE);
744   var->m_wake_reason= reason;
745   var->m_cv.notify_one();
746   m_wakeups++;
747   return true;
748 }
749 
750 
thread_pool_generic(int min_threads,int max_threads)751 thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
752   m_thread_data_cache(max_threads),
753   m_task_queue(10000),
754   m_standby_threads(),
755   m_active_threads(),
756   m_mtx(),
757   m_thread_timeout(std::chrono::milliseconds(60000)),
758   m_timer_interval(std::chrono::milliseconds(400)),
759   m_cv_no_threads(),
760   m_cv_timer(),
761   m_tasks_enqueued(),
762   m_tasks_dequeued(),
763   m_wakeups(),
764   m_spurious_wakeups(),
765   m_concurrency(std::thread::hardware_concurrency()*2),
766   m_in_shutdown(),
767   m_timestamp(),
768   m_long_tasks_count(),
769   m_waiting_task_count(),
770   m_last_thread_creation(),
771   m_min_threads(min_threads),
772   m_max_threads(max_threads),
773   m_last_thread_count(),
774   m_last_activity(),
775   m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr)
776 {
777 
778   if (m_max_threads < m_concurrency)
779     m_concurrency = m_max_threads;
780   if (m_min_threads > m_concurrency)
781     m_concurrency = min_threads;
782   if (!m_concurrency)
783     m_concurrency = 1;
784 
785   // start the timer
786   m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
787 }
788 
789 
maybe_wake_or_create_thread()790 void thread_pool_generic::maybe_wake_or_create_thread()
791 {
792   if (m_task_queue.empty())
793     return;
794   DBUG_ASSERT(m_active_threads.size() >= static_cast<size_t>(m_long_tasks_count + m_waiting_task_count));
795   if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency)
796     return;
797   if (!m_standby_threads.empty())
798   {
799     wake(WAKE_REASON_TASK);
800   }
801   else
802   {
803     add_thread();
804   }
805 }
806 
too_many_active_threads()807 bool thread_pool_generic::too_many_active_threads()
808 {
809   return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count >
810     m_concurrency* OVERSUBSCRIBE_FACTOR;
811 }
812 
813 /** Submit a new task*/
submit_task(task * task)814 void thread_pool_generic::submit_task(task* task)
815 {
816   std::unique_lock<std::mutex> lk(m_mtx);
817   if (m_in_shutdown)
818     return;
819   task->add_ref();
820   m_tasks_enqueued++;
821   m_task_queue.push(task);
822   maybe_wake_or_create_thread();
823 }
824 
825 
826 /* Notify thread pool that current thread is going to wait */
wait_begin()827 void thread_pool_generic::wait_begin()
828 {
829   if (!tls_worker_data || tls_worker_data->is_long_task())
830     return;
831   std::unique_lock<std::mutex> lk(m_mtx);
832   if(tls_worker_data->is_long_task())
833   {
834     /*
835      Current task flag could have become "long-running"
836      while waiting for the lock, thus recheck.
837     */
838     return;
839   }
840   DBUG_ASSERT(!tls_worker_data->is_waiting());
841   tls_worker_data->m_state |= worker_data::WAITING;
842   m_waiting_task_count++;
843 
844   /* Maintain concurrency */
845   maybe_wake_or_create_thread();
846 }
847 
848 
wait_end()849 void thread_pool_generic::wait_end()
850 {
851   if (tls_worker_data && tls_worker_data->is_waiting())
852   {
853     std::unique_lock<std::mutex> lk(m_mtx);
854     tls_worker_data->m_state &= ~worker_data::WAITING;
855     m_waiting_task_count--;
856   }
857 }
858 
859 
switch_timer(timer_state_t state)860 void thread_pool_generic::switch_timer(timer_state_t state)
861 {
862   if (m_timer_state == state)
863     return;
864   /*
865     We can't use timer::set_time, because mysys timers are deadlock
866     prone.
867 
868     Instead, to switch off we increase the  timer period
869     and decrease period to switch on.
870 
871     This might introduce delays in thread creation when needed,
872     as period will only be changed when timer fires next time.
873     For this reason, we can't use very long periods for the "off" state.
874   */
875   m_timer_state= state;
876   long long period= (state == timer_state_t::OFF) ?
877      m_timer_interval.count()*10: m_timer_interval.count();
878 
879   m_maintenance_timer.set_period((int)period);
880 }
881 
882 
883 /**
884   Wake  up all workers, and wait until they are gone
885   Stop the timer.
886 */
~thread_pool_generic()887 thread_pool_generic::~thread_pool_generic()
888 {
889   /*
890     Stop AIO early.
891     This is needed to prevent AIO completion thread
892     from calling submit_task() on an object that is being destroyed.
893   */
894   m_aio.reset();
895 
896   /* Also stop the maintanence task early. */
897   m_maintenance_timer.disarm();
898 
899   std::unique_lock<std::mutex> lk(m_mtx);
900   m_in_shutdown= true;
901 
902   /* Wake up idle threads. */
903   while (wake(WAKE_REASON_SHUTDOWN))
904   {
905   }
906 
907   while (thread_count())
908   {
909     m_cv_no_threads.wait(lk);
910   }
911 
912   lk.unlock();
913 }
914 
create_thread_pool_generic(int min_threads,int max_threads)915 thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
916 {
917  return new thread_pool_generic(min_threads, max_threads);
918 }
919 
920 } // namespace tpool
921