1 /* 2 * Copyright (c) 2015, 2021, Oracle and/or its affiliates. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License, version 2.0, 6 * as published by the Free Software Foundation. 7 * 8 * This program is also distributed with certain software (including 9 * but not limited to OpenSSL) that is licensed under separate terms, 10 * as designated in a particular file or component or in included license 11 * documentation. The authors of MySQL hereby grant you an additional 12 * permission to link the program and your derivative works with the 13 * separately licensed software that they have included with MySQL. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 * GNU General Public License, version 2.0, for more details. 19 * 20 * You should have received a copy of the GNU General Public License 21 * along with this program; if not, write to the Free Software 22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 23 * 02110-1301 USA 24 */ 25 26 #ifndef _NGS_SCHEDULER_H_ 27 #define _NGS_SCHEDULER_H_ 28 29 #include "ngs/thread.h" 30 #include "ngs/memory.h" 31 #include "ngs_common/atomic.h" 32 #include <string> 33 #include <vector> 34 #include <list> 35 36 37 namespace ngs 38 { 39 // Scheduler with dynamic thread pool. 40 class Scheduler_dynamic 41 { 42 public: 43 class Monitor_interface 44 { 45 public: ~Monitor_interface()46 virtual ~Monitor_interface() {} 47 48 virtual void on_worker_thread_create() = 0; 49 virtual void on_worker_thread_destroy() = 0; 50 virtual void on_task_start() = 0; 51 virtual void on_task_end() = 0; 52 }; 53 54 typedef ngs::function<void()> Task; 55 56 Scheduler_dynamic(const char* name, PSI_thread_key thread_key = PSI_NOT_INSTRUMENTED); 57 virtual ~Scheduler_dynamic(); 58 59 virtual void launch(); 60 virtual void stop(); 61 virtual unsigned int set_num_workers(unsigned int n); 62 void set_idle_worker_timeout(unsigned long long milliseconds); 63 bool post(Task* task); 64 bool post(const Task& task); 65 bool post_and_wait(const Task& task); 66 thread_init()67 virtual bool thread_init() { return true; } 68 virtual void thread_end(); 69 70 void set_monitor(Monitor_interface *monitor); 71 72 bool is_worker_thread(my_thread_t thread_id); 73 bool is_running(); 74 void join_terminating_workers(); 75 76 private: 77 template<typename Element_type> 78 class lock_list 79 { 80 public: lock_list()81 lock_list() 82 : m_access_mutex(KEY_mutex_x_lock_list_access) 83 { 84 } 85 empty()86 bool empty() 87 { 88 Mutex_lock guard(m_access_mutex); 89 return m_list.empty(); 90 } 91 push(const Element_type & t)92 bool push(const Element_type &t) 93 { 94 Mutex_lock guard(m_access_mutex); 95 m_list.push_back(t); 96 return true; 97 } 98 pop(Element_type & result)99 bool pop(Element_type &result) 100 { 101 Mutex_lock guard(m_access_mutex); 102 if (m_list.empty()) 103 return false; 104 105 result = m_list.front(); 106 107 m_list.pop_front(); 108 return true; 109 } 110 remove_if(Element_type & result,ngs::function<bool (Element_type &)> matches)111 bool remove_if(Element_type &result, ngs::function<bool(Element_type &)> matches) 112 { 113 Mutex_lock guard(m_access_mutex); 114 for (typename std::list<Element_type>::iterator it = m_list.begin(); it != m_list.end(); ++it) 115 { 116 if (matches(*it)) 117 { 118 result = *it; 119 m_list.erase(it); 120 return true; 121 } 122 } 123 124 return false; 125 } 126 127 private: 128 Mutex m_access_mutex; 129 std::list<Element_type> m_list; 130 }; 131 132 Scheduler_dynamic(const Scheduler_dynamic&); 133 Scheduler_dynamic& operator=(const Scheduler_dynamic&); 134 135 static void* worker_proxy(void* data); 136 void* worker(); 137 138 void create_thread(); 139 void create_min_num_workers(); 140 thread_id_matches(Thread_t & thread,my_thread_t id)141 static bool thread_id_matches(Thread_t& thread, my_thread_t id) 142 { 143 return thread.thread == id; 144 } 145 146 bool wait_if_idle_then_delete_worker(ulonglong &thread_waiting_started); 147 int32 increase_workers_count(); 148 int32 decrease_workers_count(); 149 int32 increase_tasks_count(); 150 int32 decrease_tasks_count(); 151 152 const std::string m_name; 153 Mutex m_worker_pending_mutex; 154 Cond m_worker_pending_cond; 155 Mutex m_thread_exit_mutex; 156 Cond m_thread_exit_cond; 157 Mutex m_post_mutex; 158 volatile ngs::atomic<int32> m_is_running; 159 volatile ngs::atomic<int32> m_min_workers_count; 160 volatile ngs::atomic<int32> m_workers_count; 161 volatile ngs::atomic<int32> m_tasks_count; 162 volatile ngs::atomic<int64> m_idle_worker_timeout; // milliseconds 163 lock_list<Task *> m_tasks; 164 lock_list<Thread_t> m_threads; 165 lock_list<my_thread_t> m_terminating_workers; 166 ngs::Memory_instrumented<Monitor_interface>::Unique_ptr m_monitor; 167 PSI_thread_key m_thread_key; 168 }; 169 } 170 171 #endif 172