1 /*
2  * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 #ifndef _NGS_SCHEDULER_H_
27 #define _NGS_SCHEDULER_H_
28 
29 #include "ngs/thread.h"
30 #include "ngs/memory.h"
31 #include "ngs_common/atomic.h"
32 #include <string>
33 #include <vector>
34 #include <list>
35 
36 
37 namespace ngs
38 {
39   // Scheduler with dynamic thread pool.
40   class Scheduler_dynamic
41   {
42   public:
43     class Monitor_interface
44     {
45     public:
~Monitor_interface()46       virtual ~Monitor_interface() {}
47 
48       virtual void on_worker_thread_create() = 0;
49       virtual void on_worker_thread_destroy() = 0;
50       virtual void on_task_start() = 0;
51       virtual void on_task_end() = 0;
52     };
53 
54     typedef ngs::function<void()> Task;
55 
56     Scheduler_dynamic(const char* name, PSI_thread_key thread_key = PSI_NOT_INSTRUMENTED);
57     virtual ~Scheduler_dynamic();
58 
59     virtual void launch();
60     virtual void stop();
61     virtual unsigned int set_num_workers(unsigned int n);
62     void set_idle_worker_timeout(unsigned long long milliseconds);
63     bool post(Task* task);
64     bool post(const Task& task);
65     bool post_and_wait(const Task& task);
66 
thread_init()67     virtual bool thread_init() { return true; }
68     virtual void thread_end();
69 
70     void set_monitor(Monitor_interface *monitor);
71 
72     bool is_worker_thread(my_thread_t thread_id);
73     bool is_running();
74     void join_terminating_workers();
75 
76   private:
77     template<typename Element_type>
78     class lock_list
79     {
80     public:
lock_list()81       lock_list()
82       : m_access_mutex(KEY_mutex_x_lock_list_access)
83       {
84       }
85 
empty()86       bool empty()
87       {
88         Mutex_lock guard(m_access_mutex);
89         return m_list.empty();
90       }
91 
push(const Element_type & t)92       bool push(const Element_type &t)
93       {
94         Mutex_lock guard(m_access_mutex);
95         m_list.push_back(t);
96         return true;
97       }
98 
pop(Element_type & result)99       bool pop(Element_type &result)
100       {
101         Mutex_lock guard(m_access_mutex);
102         if (m_list.empty())
103           return false;
104 
105         result = m_list.front();
106 
107         m_list.pop_front();
108         return true;
109       }
110 
remove_if(Element_type & result,ngs::function<bool (Element_type &)> matches)111       bool remove_if(Element_type &result, ngs::function<bool(Element_type &)> matches)
112       {
113         Mutex_lock guard(m_access_mutex);
114         for (typename std::list<Element_type>::iterator it = m_list.begin(); it != m_list.end(); ++it)
115         {
116           if (matches(*it))
117           {
118             result = *it;
119             m_list.erase(it);
120             return true;
121           }
122         }
123 
124         return false;
125       }
126 
127     private:
128       Mutex m_access_mutex;
129       std::list<Element_type> m_list;
130     };
131 
132     Scheduler_dynamic(const Scheduler_dynamic&);
133     Scheduler_dynamic& operator=(const Scheduler_dynamic&);
134 
135     static void* worker_proxy(void* data);
136     void* worker();
137 
138     void create_thread();
139     void create_min_num_workers();
140 
thread_id_matches(Thread_t & thread,my_thread_t id)141     static bool thread_id_matches(Thread_t& thread, my_thread_t id)
142     {
143       return thread.thread == id;
144     }
145 
146     bool wait_if_idle_then_delete_worker(ulonglong &thread_waiting_started);
147     int32 increase_workers_count();
148     int32 decrease_workers_count();
149     int32 increase_tasks_count();
150     int32 decrease_tasks_count();
151 
152     const std::string m_name;
153     Mutex m_worker_pending_mutex;
154     Cond m_worker_pending_cond;
155     Mutex m_thread_exit_mutex;
156     Cond m_thread_exit_cond;
157     Mutex m_post_mutex;
158     volatile ngs::atomic<int32> m_is_running;
159     volatile ngs::atomic<int32> m_min_workers_count;
160     volatile ngs::atomic<int32> m_workers_count;
161     volatile ngs::atomic<int32> m_tasks_count;
162     volatile ngs::atomic<int64> m_idle_worker_timeout; // milliseconds
163     lock_list<Task *> m_tasks;
164     lock_list<Thread_t> m_threads;
165     lock_list<my_thread_t> m_terminating_workers;
166     ngs::Memory_instrumented<Monitor_interface>::Unique_ptr m_monitor;
167     PSI_thread_key m_thread_key;
168   };
169 }
170 
171 #endif
172