1 /* Copyright (C) 2019, 2020, MariaDB Corporation.
2
3 This program is free software; you can redistribute itand /or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
15
16 #include "tpool_structs.h"
17 #include <limits.h>
18 #include <algorithm>
19 #include <assert.h>
20 #include <atomic>
21 #include <chrono>
22 #include <condition_variable>
23 #include <iostream>
24 #include <limits.h>
25 #include <mutex>
26 #include <queue>
27 #include <stack>
28 #include <thread>
29 #include <vector>
30 #include "tpool.h"
31 #include <assert.h>
32 #include <my_global.h>
33 #include <my_dbug.h>
34 #include <thr_timer.h>
35 #include <stdlib.h>
36
37 namespace tpool
38 {
39
40 #ifdef __linux__
41 extern aio* create_linux_aio(thread_pool* tp, int max_io);
42 #endif
43 #ifdef _WIN32
44 extern aio* create_win_aio(thread_pool* tp, int max_io);
45 #endif
46
47 static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
48 static const int OVERSUBSCRIBE_FACTOR = 2;
49
50 /**
51 Implementation of generic threadpool.
52 This threadpool consists of the following components
53
54 - The task queue. This queue is populated by submit()
55 - Worker that execute the work items.
56 - Timer thread that takes care of pool health
57
58 The task queue is populated by submit() method.
59 on submit(), a worker thread can be woken, or created
60 to execute tasks.
61
62 The timer thread watches if work items are being dequeued, and if not,
63 this can indicate potential deadlock.
64 Thus the timer thread can also wake or create a thread, to ensure some progress.
65
66 Optimizations:
67
68 - worker threads that are idle for long time will shutdown.
69 - worker threads are woken in LIFO order, which minimizes context switching
70 and also ensures that idle timeout works well. LIFO wakeup order ensures
71 that active threads stay active, and idle ones stay idle.
72
73 */
74
75 /**
76 Worker wakeup flags.
77 */
78 enum worker_wake_reason
79 {
80 WAKE_REASON_NONE,
81 WAKE_REASON_TASK,
82 WAKE_REASON_SHUTDOWN
83 };
84
85
86
87 /* A per-worker thread structure.*/
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE)88 struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
89 {
90 /** Condition variable to wakeup this worker.*/
91 std::condition_variable m_cv;
92
93 /** Reason why worker was woken. */
94 worker_wake_reason m_wake_reason;
95
96 /**
97 If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute.
98 */
99 task* m_task;
100
101 /** Struct is member of intrusive doubly linked list */
102 worker_data* m_prev;
103 worker_data* m_next;
104
105 /* Current state of the worker.*/
106 enum state
107 {
108 NONE = 0,
109 EXECUTING_TASK = 1,
110 LONG_TASK = 2,
111 WAITING = 4
112 };
113
114 int m_state;
115
116 bool is_executing_task()
117 {
118 return m_state & EXECUTING_TASK;
119 }
120 bool is_long_task()
121 {
122 return m_state & LONG_TASK;
123 }
124 bool is_waiting()
125 {
126 return m_state & WAITING;
127 }
128 std::chrono::system_clock::time_point m_task_start_time;
129 worker_data() :
130 m_cv(),
131 m_wake_reason(WAKE_REASON_NONE),
132 m_task(),
133 m_prev(),
134 m_next(),
135 m_state(NONE),
136 m_task_start_time()
137 {}
138
139 /*Define custom new/delete because of overaligned structure. */
140 void* operator new(size_t size)
141 {
142 #ifdef _WIN32
143 return _aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
144 #else
145 void* ptr;
146 int ret = posix_memalign(&ptr, CPU_LEVEL1_DCACHE_LINESIZE, size);
147 return ret ? 0 : ptr;
148 #endif
149 }
150 void operator delete(void* p)
151 {
152 #ifdef _WIN32
153 _aligned_free(p);
154 #else
155 free(p);
156 #endif
157 }
158 };
159
160
161 static thread_local worker_data* tls_worker_data;
162
163 class thread_pool_generic : public thread_pool
164 {
165 /** Cache for per-worker structures */
166 cache<worker_data> m_thread_data_cache;
167
168 /** The task queue */
169 circular_queue<task*> m_task_queue;
170
171 /** List of standby (idle) workers */
172 doubly_linked_list<worker_data> m_standby_threads;
173
174 /** List of threads that are executing tasks */
175 doubly_linked_list<worker_data> m_active_threads;
176
177 /* Mutex that protects the whole struct, most importantly
178 the standby threads list, and task queue */
179 std::mutex m_mtx;
180
181 /** Timeout after which idle worker shuts down */
182 std::chrono::milliseconds m_thread_timeout;
183
184 /** How often should timer wakeup.*/
185 std::chrono::milliseconds m_timer_interval;
186
187 /** Another condition variable, used in pool shutdown */
188 std::condition_variable m_cv_no_threads;
189
190 /** Condition variable for the timer thread. Signaled on shutdown. */
191 std::condition_variable m_cv_timer;
192
193 /** Overall number of enqueues*/
194 unsigned long long m_tasks_enqueued;
195 unsigned long long m_group_enqueued;
196 /** Overall number of dequeued tasks. */
197 unsigned long long m_tasks_dequeued;
198
199 /** Statistic related, number of worker thread wakeups */
200 int m_wakeups;
201
202 /**
203 Statistic related, number of spurious thread wakeups
204 (i.e thread woke up, and the task queue is empty)
205 */
206 int m_spurious_wakeups;
207
208 /** The desired concurrency. This number of workers should be
209 actively executing. */
210 unsigned int m_concurrency;
211
212 /** True, if threadpool is being shutdown, false otherwise */
213 bool m_in_shutdown;
214
215 /** Maintenance timer state : true = active(ON),false = inactive(OFF)*/
216 enum class timer_state_t
217 {
218 OFF, ON
219 };
220 timer_state_t m_timer_state= timer_state_t::OFF;
221 void switch_timer(timer_state_t state);
222
223 /* Updates idle_since, and maybe switches the timer off */
224 void check_idle(std::chrono::system_clock::time_point now);
225
226 /** time point when timer last ran, used as a coarse clock. */
227 std::chrono::system_clock::time_point m_timestamp;
228
229 /** Number of long running tasks. The long running tasks are excluded when
230 adjusting concurrency */
231 unsigned int m_long_tasks_count;
232
233 unsigned int m_waiting_task_count;
234
235 /** Last time thread was created*/
236 std::chrono::system_clock::time_point m_last_thread_creation;
237
238 /** Minimumum number of threads in this pool.*/
239 unsigned int m_min_threads;
240
241 /** Maximimum number of threads in this pool. */
242 unsigned int m_max_threads;
243
244 /* maintenance related statistics (see maintenance()) */
245 size_t m_last_thread_count;
246 unsigned long long m_last_activity;
247
248 void worker_main(worker_data *thread_data);
249 void worker_end(worker_data* thread_data);
250
251 /* Checks threadpool responsiveness, adjusts thread_counts */
252 void maintenance();
maintenance_func(void * arg)253 static void maintenance_func(void* arg)
254 {
255 ((thread_pool_generic *)arg)->maintenance();
256 }
257 bool add_thread();
258 bool wake(worker_wake_reason reason, task *t = nullptr);
259 void maybe_wake_or_create_thread();
260 bool too_many_active_threads();
261 bool get_task(worker_data *thread_var, task **t);
262 bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
263 worker_data *thread_var);
264 void cancel_pending(task* t);
265
thread_count()266 size_t thread_count()
267 {
268 return m_active_threads.size() + m_standby_threads.size();
269 }
270 public:
271 thread_pool_generic(int min_threads, int max_threads);
272 ~thread_pool_generic();
273 void wait_begin() override;
274 void wait_end() override;
275 void submit_task(task *task) override;
create_native_aio(int max_io)276 virtual aio *create_native_aio(int max_io) override
277 {
278 #ifdef _WIN32
279 return create_win_aio(this, max_io);
280 #elif defined(__linux__)
281 return create_linux_aio(this,max_io);
282 #else
283 return nullptr;
284 #endif
285 }
286
287 class timer_generic : public thr_timer_t, public timer
288 {
289 thread_pool_generic* m_pool;
290 waitable_task m_task;
291 callback_func m_callback;
292 void* m_data;
293 int m_period;
294 std::mutex m_mtx;
295 bool m_on;
296 std::atomic<bool> m_running;
297
run()298 void run()
299 {
300 /*
301 In rare cases, multiple callbacks can be scheduled,
302 e.g with set_time(0,0) in a loop.
303 We do not allow parallel execution, as user is not prepared.
304 */
305 bool expected = false;
306 if (!m_running.compare_exchange_strong(expected, true))
307 return;
308
309 m_callback(m_data);
310 dbug_execute_after_task_callback();
311 m_running = false;
312
313 if (m_pool && m_period)
314 {
315 std::unique_lock<std::mutex> lk(m_mtx);
316 if (m_on)
317 {
318 DBUG_PUSH_EMPTY;
319 thr_timer_end(this);
320 thr_timer_settime(this, 1000ULL * m_period);
321 DBUG_POP_EMPTY;
322 }
323 }
324 }
325
execute(void * arg)326 static void execute(void* arg)
327 {
328 auto timer = (timer_generic*)arg;
329 timer->run();
330 }
331
submit_task(void * arg)332 static void submit_task(void* arg)
333 {
334 timer_generic* timer = (timer_generic*)arg;
335 timer->m_pool->submit_task(&timer->m_task);
336 }
337
338 public:
timer_generic(callback_func func,void * data,thread_pool_generic * pool)339 timer_generic(callback_func func, void* data, thread_pool_generic * pool):
340 m_pool(pool),
341 m_task(timer_generic::execute,this),
342 m_callback(func),m_data(data),m_period(0),m_mtx(),
343 m_on(true),m_running()
344 {
345 if (pool)
346 {
347 /* EXecute callback in threadpool*/
348 thr_timer_init(this, submit_task, this);
349 }
350 else
351 {
352 /* run in "timer" thread */
353 thr_timer_init(this, m_task.get_func(), m_task.get_arg());
354 }
355 }
356
set_time(int initial_delay_ms,int period_ms)357 void set_time(int initial_delay_ms, int period_ms) override
358 {
359 std::unique_lock<std::mutex> lk(m_mtx);
360 if (!m_on)
361 return;
362 thr_timer_end(this);
363 if (!m_pool)
364 thr_timer_set_period(this, 1000ULL * period_ms);
365 else
366 m_period = period_ms;
367 thr_timer_settime(this, 1000ULL * initial_delay_ms);
368 }
369
370 /*
371 Change only period of a periodic timer
372 (after the next execution). Workarounds
373 mysys timer deadlocks
374 */
set_period(int period_ms)375 void set_period(int period_ms)
376 {
377 std::unique_lock<std::mutex> lk(m_mtx);
378 if (!m_on)
379 return;
380 if (!m_pool)
381 thr_timer_set_period(this, 1000ULL * period_ms);
382 else
383 m_period = period_ms;
384 }
385
disarm()386 void disarm() override
387 {
388 std::unique_lock<std::mutex> lk(m_mtx);
389 m_on = false;
390 thr_timer_end(this);
391 lk.unlock();
392
393 if (m_task.m_group)
394 {
395 m_task.m_group->cancel_pending(&m_task);
396 }
397 if (m_pool)
398 {
399 m_pool->cancel_pending(&m_task);
400 }
401 m_task.wait();
402 }
403
~timer_generic()404 virtual ~timer_generic()
405 {
406 disarm();
407 }
408 };
409 timer_generic m_maintenance_timer;
create_timer(callback_func func,void * data)410 virtual timer* create_timer(callback_func func, void *data) override
411 {
412 return new timer_generic(func, data, this);
413 }
414 };
415
cancel_pending(task * t)416 void thread_pool_generic::cancel_pending(task* t)
417 {
418 std::unique_lock <std::mutex> lk(m_mtx);
419 for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++)
420 {
421 if (*it == t)
422 {
423 t->release();
424 *it = nullptr;
425 }
426 }
427 }
428 /**
429 Register worker in standby list, and wait to be woken.
430
431 @retval true if thread was woken
432 @retval false idle wait timeout exceeded (the current thread must shutdown)
433 */
wait_for_tasks(std::unique_lock<std::mutex> & lk,worker_data * thread_data)434 bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
435 worker_data *thread_data)
436 {
437 assert(m_task_queue.empty());
438 assert(!m_in_shutdown);
439
440 thread_data->m_wake_reason= WAKE_REASON_NONE;
441 m_active_threads.erase(thread_data);
442 m_standby_threads.push_back(thread_data);
443
444 for (;;)
445 {
446 thread_data->m_cv.wait_for(lk, m_thread_timeout);
447
448 if (thread_data->m_wake_reason != WAKE_REASON_NONE)
449 {
450 /* Woke up not due to timeout.*/
451 return true;
452 }
453
454 if (thread_count() <= m_min_threads)
455 {
456 /* Do not shutdown thread, maintain required minimum of worker
457 threads.*/
458 continue;
459 }
460
461 /*
462 Woke up due to timeout, remove this thread's from the standby list. In
463 all other cases where it is signaled it is removed by the signaling
464 thread.
465 */
466 m_standby_threads.erase(thread_data);
467 m_active_threads.push_back(thread_data);
468 return false;
469 }
470 }
471
472
473 /**
474 Workers "get next task" routine.
475
476 A task can be handed over to the current thread directly during submit().
477 if thread_var->m_wake_reason == WAKE_REASON_TASK.
478
479 Or a task can be taken from the task queue.
480 In case task queue is empty, the worker thread will park (wait for wakeup).
481 */
get_task(worker_data * thread_var,task ** t)482 bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
483 {
484 std::unique_lock<std::mutex> lk(m_mtx);
485
486 if (thread_var->is_long_task())
487 {
488 DBUG_ASSERT(m_long_tasks_count);
489 m_long_tasks_count--;
490 }
491 DBUG_ASSERT(!thread_var->is_waiting());
492 thread_var->m_state = worker_data::NONE;
493
494 while (m_task_queue.empty())
495 {
496 if (m_in_shutdown)
497 return false;
498
499 if (!wait_for_tasks(lk, thread_var))
500 return false;
501 if (m_task_queue.empty())
502 {
503 m_spurious_wakeups++;
504 continue;
505 }
506 }
507
508 /* Dequeue from the task queue.*/
509 *t= m_task_queue.front();
510 m_task_queue.pop();
511 m_tasks_dequeued++;
512 thread_var->m_state |= worker_data::EXECUTING_TASK;
513 thread_var->m_task_start_time = m_timestamp;
514 return true;
515 }
516
517 /** Worker thread shutdown routine. */
worker_end(worker_data * thread_data)518 void thread_pool_generic::worker_end(worker_data* thread_data)
519 {
520 std::lock_guard<std::mutex> lk(m_mtx);
521 DBUG_ASSERT(!thread_data->is_long_task());
522 m_active_threads.erase(thread_data);
523 m_thread_data_cache.put(thread_data);
524
525 if (!thread_count() && m_in_shutdown)
526 {
527 /* Signal the destructor that no more threads are left. */
528 m_cv_no_threads.notify_all();
529 }
530 }
531
532 extern "C" void set_tls_pool(tpool::thread_pool* pool);
533
534 /* The worker get/execute task loop.*/
worker_main(worker_data * thread_var)535 void thread_pool_generic::worker_main(worker_data *thread_var)
536 {
537 task* task;
538 set_tls_pool(this);
539 if(m_worker_init_callback)
540 m_worker_init_callback();
541
542 tls_worker_data = thread_var;
543
544 while (get_task(thread_var, &task) && task)
545 {
546 task->execute();
547 }
548
549 if (m_worker_destroy_callback)
550 m_worker_destroy_callback();
551
552 worker_end(thread_var);
553 }
554
555
556 /*
557 Check if threadpool had been idle for a while
558 Switch off maintenance timer if it is in idle state
559 for too long.
560
561 Helper function, to be used inside maintenance callback,
562 before m_last_activity is updated
563 */
564
565 static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
566 constexpr auto max_idle_time= std::chrono::minutes(1);
567
568 /* Time since maintenance timer had nothing to do */
569 static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
check_idle(std::chrono::system_clock::time_point now)570 void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
571 {
572 DBUG_ASSERT(m_task_queue.empty());
573
574 /*
575 We think that there is no activity, if there were at most 2 tasks
576 since last time, and there is a spare thread.
577 The 2 tasks (and not 0) is to account for some periodic timers.
578 */
579 bool idle= m_standby_threads.m_count > 0;
580
581 if (!idle)
582 {
583 idle_since= invalid_timestamp;
584 return;
585 }
586
587 if (idle_since == invalid_timestamp)
588 {
589 idle_since= now;
590 return;
591 }
592
593 /* Switch timer off after 1 minute of idle time */
594 if (now - idle_since > max_idle_time)
595 {
596 idle_since= invalid_timestamp;
597 switch_timer(timer_state_t::OFF);
598 }
599 }
600
601
602 /*
603 Periodic job to fix thread count and concurrency,
604 in case of long tasks, etc
605 */
maintenance()606 void thread_pool_generic::maintenance()
607 {
608 /*
609 If pool is busy (i.e the its mutex is currently locked), we can
610 skip the maintenance task, some times, to lower mutex contention
611 */
612 static int skip_counter;
613 const int MAX_SKIPS = 10;
614 std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
615 if (skip_counter == MAX_SKIPS)
616 {
617 lk.lock();
618 }
619 else if (!lk.try_lock())
620 {
621 skip_counter++;
622 return;
623 }
624
625 skip_counter = 0;
626
627 m_timestamp = std::chrono::system_clock::now();
628
629 if (m_task_queue.empty())
630 {
631 check_idle(m_timestamp);
632 m_last_activity = m_tasks_dequeued + m_wakeups;
633 return;
634 }
635
636 m_long_tasks_count = 0;
637 for (auto thread_data = m_active_threads.front();
638 thread_data;
639 thread_data = thread_data->m_next)
640 {
641 if (thread_data->is_executing_task() &&
642 !thread_data->is_waiting() &&
643 (thread_data->is_long_task()
644 || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION)))
645 {
646 thread_data->m_state |= worker_data::LONG_TASK;
647 m_long_tasks_count++;
648 }
649 }
650
651 maybe_wake_or_create_thread();
652
653 size_t thread_cnt = (int)thread_count();
654 if (m_last_activity == m_tasks_dequeued + m_wakeups &&
655 m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
656 {
657 // no progress made since last iteration. create new
658 // thread
659 add_thread();
660 }
661 m_last_activity = m_tasks_dequeued + m_wakeups;
662 m_last_thread_count= thread_cnt;
663 }
664
665 /*
666 Heuristic used for thread creation throttling.
667 Returns interval in milliseconds between thread creation
668 (depending on number of threads already in the pool, and
669 desired concurrency level)
670 */
throttling_interval_ms(size_t n_threads,size_t concurrency)671 static int throttling_interval_ms(size_t n_threads,size_t concurrency)
672 {
673 if (n_threads < concurrency*4)
674 return 0;
675
676 if (n_threads < concurrency*8)
677 return 50;
678
679 if (n_threads < concurrency*16)
680 return 100;
681
682 return 200;
683 }
684
685 /* Create a new worker.*/
add_thread()686 bool thread_pool_generic::add_thread()
687 {
688 size_t n_threads = thread_count();
689
690 if (n_threads >= m_max_threads)
691 return false;
692
693 if (n_threads >= m_min_threads)
694 {
695 auto now = std::chrono::system_clock::now();
696 if (now - m_last_thread_creation <
697 std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency)))
698 {
699 /*
700 Throttle thread creation and wakeup deadlock detection timer,
701 if is it off.
702 */
703 switch_timer(timer_state_t::ON);
704
705 return false;
706 }
707 }
708
709 worker_data *thread_data = m_thread_data_cache.get();
710 m_active_threads.push_back(thread_data);
711 try
712 {
713 std::thread thread(&thread_pool_generic::worker_main, this, thread_data);
714 m_last_thread_creation = std::chrono::system_clock::now();
715 thread.detach();
716 }
717 catch (std::system_error& e)
718 {
719 m_active_threads.erase(thread_data);
720 m_thread_data_cache.put(thread_data);
721 static bool warning_written;
722 if (!warning_written)
723 {
724 fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
725 "current number of threads in pool %zu\n", e.what(), thread_count());
726 warning_written = true;
727 }
728 return false;
729 }
730 return true;
731 }
732
733 /** Wake a standby thread, and hand the given task over to this thread. */
wake(worker_wake_reason reason,task *)734 bool thread_pool_generic::wake(worker_wake_reason reason, task *)
735 {
736 assert(reason != WAKE_REASON_NONE);
737
738 if (m_standby_threads.empty())
739 return false;
740 auto var= m_standby_threads.back();
741 m_standby_threads.pop_back();
742 m_active_threads.push_back(var);
743 assert(var->m_wake_reason == WAKE_REASON_NONE);
744 var->m_wake_reason= reason;
745 var->m_cv.notify_one();
746 m_wakeups++;
747 return true;
748 }
749
750
thread_pool_generic(int min_threads,int max_threads)751 thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
752 m_thread_data_cache(max_threads),
753 m_task_queue(10000),
754 m_standby_threads(),
755 m_active_threads(),
756 m_mtx(),
757 m_thread_timeout(std::chrono::milliseconds(60000)),
758 m_timer_interval(std::chrono::milliseconds(400)),
759 m_cv_no_threads(),
760 m_cv_timer(),
761 m_tasks_enqueued(),
762 m_tasks_dequeued(),
763 m_wakeups(),
764 m_spurious_wakeups(),
765 m_concurrency(std::thread::hardware_concurrency()*2),
766 m_in_shutdown(),
767 m_timestamp(),
768 m_long_tasks_count(),
769 m_waiting_task_count(),
770 m_last_thread_creation(),
771 m_min_threads(min_threads),
772 m_max_threads(max_threads),
773 m_last_thread_count(),
774 m_last_activity(),
775 m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr)
776 {
777
778 if (m_max_threads < m_concurrency)
779 m_concurrency = m_max_threads;
780 if (m_min_threads > m_concurrency)
781 m_concurrency = min_threads;
782 if (!m_concurrency)
783 m_concurrency = 1;
784
785 // start the timer
786 m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
787 }
788
789
maybe_wake_or_create_thread()790 void thread_pool_generic::maybe_wake_or_create_thread()
791 {
792 if (m_task_queue.empty())
793 return;
794 DBUG_ASSERT(m_active_threads.size() >= static_cast<size_t>(m_long_tasks_count + m_waiting_task_count));
795 if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency)
796 return;
797 if (!m_standby_threads.empty())
798 {
799 wake(WAKE_REASON_TASK);
800 }
801 else
802 {
803 add_thread();
804 }
805 }
806
too_many_active_threads()807 bool thread_pool_generic::too_many_active_threads()
808 {
809 return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count >
810 m_concurrency* OVERSUBSCRIBE_FACTOR;
811 }
812
813 /** Submit a new task*/
submit_task(task * task)814 void thread_pool_generic::submit_task(task* task)
815 {
816 std::unique_lock<std::mutex> lk(m_mtx);
817 if (m_in_shutdown)
818 return;
819 task->add_ref();
820 m_tasks_enqueued++;
821 m_task_queue.push(task);
822 maybe_wake_or_create_thread();
823 }
824
825
826 /* Notify thread pool that current thread is going to wait */
wait_begin()827 void thread_pool_generic::wait_begin()
828 {
829 if (!tls_worker_data || tls_worker_data->is_long_task())
830 return;
831 std::unique_lock<std::mutex> lk(m_mtx);
832 if(tls_worker_data->is_long_task())
833 {
834 /*
835 Current task flag could have become "long-running"
836 while waiting for the lock, thus recheck.
837 */
838 return;
839 }
840 DBUG_ASSERT(!tls_worker_data->is_waiting());
841 tls_worker_data->m_state |= worker_data::WAITING;
842 m_waiting_task_count++;
843
844 /* Maintain concurrency */
845 maybe_wake_or_create_thread();
846 }
847
848
wait_end()849 void thread_pool_generic::wait_end()
850 {
851 if (tls_worker_data && tls_worker_data->is_waiting())
852 {
853 std::unique_lock<std::mutex> lk(m_mtx);
854 tls_worker_data->m_state &= ~worker_data::WAITING;
855 m_waiting_task_count--;
856 }
857 }
858
859
switch_timer(timer_state_t state)860 void thread_pool_generic::switch_timer(timer_state_t state)
861 {
862 if (m_timer_state == state)
863 return;
864 /*
865 We can't use timer::set_time, because mysys timers are deadlock
866 prone.
867
868 Instead, to switch off we increase the timer period
869 and decrease period to switch on.
870
871 This might introduce delays in thread creation when needed,
872 as period will only be changed when timer fires next time.
873 For this reason, we can't use very long periods for the "off" state.
874 */
875 m_timer_state= state;
876 long long period= (state == timer_state_t::OFF) ?
877 m_timer_interval.count()*10: m_timer_interval.count();
878
879 m_maintenance_timer.set_period((int)period);
880 }
881
882
883 /**
884 Wake up all workers, and wait until they are gone
885 Stop the timer.
886 */
~thread_pool_generic()887 thread_pool_generic::~thread_pool_generic()
888 {
889 /*
890 Stop AIO early.
891 This is needed to prevent AIO completion thread
892 from calling submit_task() on an object that is being destroyed.
893 */
894 m_aio.reset();
895
896 /* Also stop the maintanence task early. */
897 m_maintenance_timer.disarm();
898
899 std::unique_lock<std::mutex> lk(m_mtx);
900 m_in_shutdown= true;
901
902 /* Wake up idle threads. */
903 while (wake(WAKE_REASON_SHUTDOWN))
904 {
905 }
906
907 while (thread_count())
908 {
909 m_cv_no_threads.wait(lk);
910 }
911
912 lk.unlock();
913 }
914
create_thread_pool_generic(int min_threads,int max_threads)915 thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
916 {
917 return new thread_pool_generic(min_threads, max_threads);
918 }
919
920 } // namespace tpool
921