/* Copyright(C) 2019 MariaDB

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16 #include "tpool_structs.h"
17 #include <stdlib.h>
18 #include <tpool.h>
19 #include <windows.h>
20 #include <atomic>
21
/**
 Implementation of tpool/aio based on the Windows native threadpool.
*/
25
26 namespace tpool
27 {
/**
 Thread pool based on the Windows native (Vista+) threadpool.
*/
31 class thread_pool_win : public thread_pool
32 {
33 /**
34 Handle per-thread init/term functions.
35 Since it is Windows that creates thread, and not us,
36 it is tricky. We employ thread local storage data
37 and check whether init function was called, inside every callback.
38 */
39 struct tls_data
40 {
41 thread_pool_win *m_pool;
~tls_datatpool::thread_pool_win::tls_data42 ~tls_data()
43 {
44 /* Call thread termination function. */
45 if (!m_pool)
46 return;
47
48 if (m_pool->m_worker_destroy_callback)
49 m_pool->m_worker_destroy_callback();
50
51 m_pool->m_thread_count--;
52 }
53 /** This needs to be called before every IO or simple task callback.*/
callback_prologtpool::thread_pool_win::tls_data54 void callback_prolog(thread_pool_win* pool)
55 {
56 assert(pool);
57 assert(!m_pool || (m_pool == pool));
58 if (m_pool)
59 {
60 // TLS data already initialized.
61 return;
62 }
63 m_pool = pool;
64 m_pool->m_thread_count++;
65 // Call the thread init function.
66 if (m_pool->m_worker_init_callback)
67 m_pool->m_worker_init_callback();
68 }
69 };
70
71 static thread_local struct tls_data tls_data;
72 /** Timer */
73 class native_timer : public timer
74 {
75 std::mutex m_mtx; // protects against parallel execution
76 std::mutex m_shutdown_mtx; // protects m_on
77 PTP_TIMER m_ptp_timer;
78 callback_func m_func;
79 void *m_data;
80 thread_pool_win& m_pool;
81 int m_period;
82 bool m_on;
83
timer_callback(PTP_CALLBACK_INSTANCE callback_instance,void * context,PTP_TIMER callback_timer)84 static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE callback_instance, void *context,
85 PTP_TIMER callback_timer)
86 {
87 native_timer *timer= (native_timer *) context;
88 tls_data.callback_prolog(&timer->m_pool);
89 std::unique_lock<std::mutex> lk(timer->m_mtx, std::defer_lock);
90 if (!lk.try_lock())
91 {
92 /* Do not try to run timers in parallel */
93 return;
94 }
95 timer->m_func(timer->m_data);
96 dbug_execute_after_task_callback();
97 if (timer->m_period)
98 timer->set_time(timer->m_period, timer->m_period);
99 }
100
101 public:
native_timer(thread_pool_win & pool,callback_func func,void * data)102 native_timer(thread_pool_win& pool, callback_func func, void* data) :
103 m_mtx(), m_func(func), m_data(data), m_pool(pool), m_period(), m_on(true)
104 {
105 m_ptp_timer= CreateThreadpoolTimer(timer_callback, this, &pool.m_env);
106 }
set_time(int initial_delay_ms,int period_ms)107 void set_time(int initial_delay_ms, int period_ms) override
108 {
109 std::unique_lock<std::mutex> lk(m_shutdown_mtx);
110 if (!m_on)
111 return;
112 long long initial_delay = -10000LL * initial_delay_ms;
113 SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
114 SetThreadpoolTimer(m_ptp_timer, (PFILETIME)&initial_delay, 0, 100);
115 m_period = period_ms;
116 }
disarm()117 void disarm() override
118 {
119 std::unique_lock<std::mutex> lk(m_shutdown_mtx);
120 m_on = false;
121 SetThreadpoolTimer(m_ptp_timer, NULL , 0, 0);
122 lk.unlock();
123 /* Don't do it in timer callback, that will hang*/
124 WaitForThreadpoolTimerCallbacks(m_ptp_timer, TRUE);
125 }
126
~native_timer()127 ~native_timer()
128 {
129 disarm();
130 CloseThreadpoolTimer(m_ptp_timer);
131 }
132 };
133 /** AIO handler */
134 class native_aio : public aio
135 {
136 thread_pool_win& m_pool;
137
138 public:
native_aio(thread_pool_win & pool,int max_io)139 native_aio(thread_pool_win &pool, int max_io)
140 : m_pool(pool)
141 {
142 }
143
144 /**
145 Submit async IO.
146 */
submit_io(aiocb * cb)147 virtual int submit_io(aiocb* cb) override
148 {
149 memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
150
151 ULARGE_INTEGER uli;
152 uli.QuadPart = cb->m_offset;
153 cb->Offset = uli.LowPart;
154 cb->OffsetHigh = uli.HighPart;
155 cb->m_internal = this;
156 StartThreadpoolIo(cb->m_fh.m_ptp_io);
157
158 BOOL ok;
159 if (cb->m_opcode == aio_opcode::AIO_PREAD)
160 ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
161 else
162 ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
163
164 if (ok || (GetLastError() == ERROR_IO_PENDING))
165 return 0;
166
167 CancelThreadpoolIo(cb->m_fh.m_ptp_io);
168 return -1;
169 }
170
171 /**
172 PTP_WIN32_IO_CALLBACK-typed function, required parameter for
173 CreateThreadpoolIo(). The user callback and other auxiliary data is put into
174 the extended OVERLAPPED parameter.
175 */
io_completion_callback(PTP_CALLBACK_INSTANCE instance,PVOID context,PVOID overlapped,ULONG io_result,ULONG_PTR nbytes,PTP_IO io)176 static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
177 PVOID context, PVOID overlapped,
178 ULONG io_result, ULONG_PTR nbytes,
179 PTP_IO io)
180 {
181 aiocb* cb = (aiocb*)overlapped;
182 native_aio* aio = (native_aio*)cb->m_internal;
183 tls_data.callback_prolog(&aio->m_pool);
184 cb->m_err = io_result;
185 cb->m_ret_len = (int)nbytes;
186 cb->m_internal_task.m_func = cb->m_callback;
187 cb->m_internal_task.m_group = cb->m_group;
188 cb->m_internal_task.m_arg = cb;
189 cb->m_internal_task.execute();
190 }
191
192 /**
193 Binds the file handle via CreateThreadpoolIo().
194 */
bind(native_file_handle & fd)195 virtual int bind(native_file_handle& fd) override
196 {
197 fd.m_ptp_io =
198 CreateThreadpoolIo(fd.m_handle, io_completion_callback, 0, &(m_pool.m_env));
199 if (fd.m_ptp_io)
200 return 0;
201 return -1;
202 }
203
204 /**
205 Unbind the file handle via CloseThreadpoolIo.
206 */
unbind(const native_file_handle & fd)207 virtual int unbind(const native_file_handle& fd) override
208 {
209 if (fd.m_ptp_io)
210 CloseThreadpoolIo(fd.m_ptp_io);
211 return 0;
212 }
213 };
214
215 PTP_POOL m_ptp_pool;
216 TP_CALLBACK_ENVIRON m_env;
217 PTP_CLEANUP_GROUP m_cleanup;
218 const int TASK_CACHE_SIZE= 10000;
219
220 struct task_cache_entry
221 {
222 thread_pool_win *m_pool;
223 task* m_task;
224 };
225 cache<task_cache_entry> m_task_cache;
226 std::atomic<int> m_thread_count;
227 public:
thread_pool_win(int min_threads=0,int max_threads=0)228 thread_pool_win(int min_threads= 0, int max_threads= 0)
229 : m_task_cache(TASK_CACHE_SIZE),m_thread_count(0)
230 {
231 InitializeThreadpoolEnvironment(&m_env);
232 m_ptp_pool= CreateThreadpool(NULL);
233 m_cleanup= CreateThreadpoolCleanupGroup();
234 SetThreadpoolCallbackPool(&m_env, m_ptp_pool);
235 SetThreadpoolCallbackCleanupGroup(&m_env, m_cleanup, 0);
236 if (min_threads)
237 SetThreadpoolThreadMinimum(m_ptp_pool, min_threads);
238 if (max_threads)
239 SetThreadpoolThreadMaximum(m_ptp_pool, max_threads);
240 }
/**
  Shuts the pool down: closes all members of the cleanup group (cancelling
  callbacks that have not started yet), the group itself, the callback
  environment and the Windows pool, then blocks until every thread that
  registered with this pool has run its TLS destructor.
*/
~thread_pool_win()
{
  CloseThreadpoolCleanupGroupMembers(m_cleanup, TRUE, NULL);
  CloseThreadpoolCleanupGroup(m_cleanup);
  /* Counterpart of InitializeThreadpoolEnvironment() in the constructor. */
  DestroyThreadpoolEnvironment(&m_env);
  CloseThreadpool(m_ptp_pool);

  // Wait until all threads finished and TLS destructors ran.
  while (m_thread_count)
    Sleep(1);
}
251 /**
252 PTP_SIMPLE_CALLBACK-typed function, used by TrySubmitThreadpoolCallback()
253 */
task_callback(PTP_CALLBACK_INSTANCE,void * param)254 static void CALLBACK task_callback(PTP_CALLBACK_INSTANCE, void *param)
255 {
256 auto entry= (task_cache_entry *) param;
257 auto task= entry->m_task;
258
259 tls_data.callback_prolog(entry->m_pool);
260
261 entry->m_pool->m_task_cache.put(entry);
262
263 task->execute();
264 }
submit_task(task * task)265 virtual void submit_task(task *task) override
266 {
267 auto entry= m_task_cache.get();
268 task->add_ref();
269 entry->m_pool= this;
270 entry->m_task= task;
271 if (!TrySubmitThreadpoolCallback(task_callback, entry, &m_env))
272 abort();
273 }
274
create_native_aio(int max_io)275 aio *create_native_aio(int max_io) override
276 {
277 return new native_aio(*this, max_io);
278 }
279
create_timer(callback_func func,void * data)280 timer* create_timer(callback_func func, void* data) override
281 {
282 return new native_timer(*this, func, data);
283 }
284 };
285
286 thread_local struct thread_pool_win::tls_data thread_pool_win::tls_data;
287
create_thread_pool_win(int min_threads,int max_threads)288 thread_pool *create_thread_pool_win(int min_threads, int max_threads)
289 {
290 return new thread_pool_win(min_threads, max_threads);
291 }
292 } // namespace tpool
293