/* Copyright (C) 2019 MariaDB

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/

#include "tpool_structs.h"
#include <stdlib.h>
#include <tpool.h>
#include <windows.h>
#include <atomic>

/**
 Implementation of tpool/aio based on Windows native threadpool.
*/

namespace tpool
{
/**
 Pool, based on the Windows native (Vista+) threadpool.
*/
class thread_pool_win : public thread_pool
{
  /**
    Handle per-thread init/term functions.
    Since it is Windows that creates the threads, and not us,
    this is tricky. We employ thread-local storage data
    and check inside every callback whether the init function was called.
  */
  struct tls_data
  {
    thread_pool_win *m_pool;
    ~tls_data()
    {
      /* Call thread termination function. */
      if (!m_pool)
        return;

      if (m_pool->m_worker_destroy_callback)
        m_pool->m_worker_destroy_callback();

      m_pool->m_thread_count--;
    }
    /** This needs to be called before every IO or simple task callback.*/
    void callback_prolog(thread_pool_win* pool)
    {
      assert(pool);
      assert(!m_pool || (m_pool == pool));
      if (m_pool)
      {
        // TLS data already initialized.
        return;
      }
      m_pool = pool;
      m_pool->m_thread_count++;
      // Call the thread init function.
      if (m_pool->m_worker_init_callback)
        m_pool->m_worker_init_callback();
    }
  };

  static thread_local struct tls_data tls_data;
  /** Timer */
  class native_timer : public timer
  {
    std::mutex m_mtx; // protects against parallel execution
    std::mutex m_shutdown_mtx; // protects m_on
    PTP_TIMER m_ptp_timer;
    callback_func m_func;
    void *m_data;
    thread_pool_win& m_pool;
    int m_period;
    bool m_on;

    static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE callback_instance, void *context,
                                        PTP_TIMER callback_timer)
    {
      native_timer *timer= (native_timer *) context;
      tls_data.callback_prolog(&timer->m_pool);
      std::unique_lock<std::mutex> lk(timer->m_mtx, std::defer_lock);
      if (!lk.try_lock())
      {
        /* Do not try to run timers in parallel */
        return;
      }
      timer->m_func(timer->m_data);
      dbug_execute_after_task_callback();
      if (timer->m_period)
        timer->set_time(timer->m_period, timer->m_period);
    }

  public:
    native_timer(thread_pool_win& pool, callback_func func, void* data) :
          m_mtx(), m_func(func), m_data(data), m_pool(pool), m_period(), m_on(true)
    {
      m_ptp_timer= CreateThreadpoolTimer(timer_callback, this, &pool.m_env);
    }
    void set_time(int initial_delay_ms, int period_ms) override
    {
      std::unique_lock<std::mutex> lk(m_shutdown_mtx);
      if (!m_on)
        return;
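      /*
        A negative due time for SetThreadpoolTimer() is relative to the
        current time, measured in 100-nanosecond intervals. The first call
        below (NULL due time) cancels any previously set expiration; the
        second arms the timer as one-shot, allowing the system a 100 ms
        window in which to batch the callback. Periodic behavior is
        implemented by re-arming from the callback itself.
      */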
      long long initial_delay = -10000LL * initial_delay_ms;
      SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
      SetThreadpoolTimer(m_ptp_timer, (PFILETIME)&initial_delay, 0, 100);
      m_period = period_ms;
    }
    void disarm() override
    {
      std::unique_lock<std::mutex> lk(m_shutdown_mtx);
      m_on = false;
      SetThreadpoolTimer(m_ptp_timer, NULL , 0, 0);
      lk.unlock();
      /* Don't call this from inside the timer callback; the wait would hang. */
      WaitForThreadpoolTimerCallbacks(m_ptp_timer, TRUE);
    }

    ~native_timer()
    {
      disarm();
      CloseThreadpoolTimer(m_ptp_timer);
    }
  };
  /** AIO handler */
  class native_aio : public aio
  {
    thread_pool_win& m_pool;

  public:
    native_aio(thread_pool_win &pool, int max_io)
      : m_pool(pool)
    {
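      /* max_io is unused here; the native threadpool manages I/O concurrency itself. */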
    }

    /**
     Submit async IO.
    */
    virtual int submit_io(aiocb* cb) override
    {
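      /*
        Zero the embedded OVERLAPPED and split the 64-bit file offset into
        the Offset/OffsetHigh pair, as ReadFile()/WriteFile() expect.
      */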
      memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));

      ULARGE_INTEGER uli;
      uli.QuadPart = cb->m_offset;
      cb->Offset = uli.LowPart;
      cb->OffsetHigh = uli.HighPart;
      cb->m_internal = this;
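      /*
        StartThreadpoolIo() must be called before each asynchronous
        operation on the bound handle; if issuing the I/O fails, it has to
        be undone with CancelThreadpoolIo() below.
      */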
      StartThreadpoolIo(cb->m_fh.m_ptp_io);

      BOOL ok;
      if (cb->m_opcode == aio_opcode::AIO_PREAD)
        ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
      else
        ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);

      if (ok || (GetLastError() == ERROR_IO_PENDING))
        return 0;

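      /* The I/O failed synchronously; no completion callback will fire. */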
      CancelThreadpoolIo(cb->m_fh.m_ptp_io);
      return -1;
    }

    /**
     PTP_WIN32_IO_CALLBACK-typed function, required parameter for
     CreateThreadpoolIo(). The user callback and other auxiliary data is put into
     the extended OVERLAPPED parameter.
    */
    static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
      PVOID context, PVOID overlapped,
      ULONG io_result, ULONG_PTR nbytes,
      PTP_IO io)
    {
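      /*
        The OVERLAPPED pointer is the aiocb that was submitted; recover it,
        record the completion status, and run the user callback through the
        internal task so that task-group semantics apply.
      */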
      aiocb* cb = (aiocb*)overlapped;
      native_aio* aio = (native_aio*)cb->m_internal;
      tls_data.callback_prolog(&aio->m_pool);
      cb->m_err = io_result;
      cb->m_ret_len = (int)nbytes;
      cb->m_internal_task.m_func = cb->m_callback;
      cb->m_internal_task.m_group = cb->m_group;
      cb->m_internal_task.m_arg = cb;
      cb->m_internal_task.execute();
    }

    /**
      Binds the file handle via CreateThreadpoolIo().
    */
    virtual int bind(native_file_handle& fd) override
    {
      fd.m_ptp_io =
        CreateThreadpoolIo(fd.m_handle, io_completion_callback, 0, &(m_pool.m_env));
      if (fd.m_ptp_io)
        return 0;
      return -1;
    }

    /**
      Unbinds the file handle via CloseThreadpoolIo().
    */
    virtual int unbind(const native_file_handle& fd) override
    {
      if (fd.m_ptp_io)
        CloseThreadpoolIo(fd.m_ptp_io);
      return 0;
    }
  };

  PTP_POOL m_ptp_pool;
  TP_CALLBACK_ENVIRON m_env;
  PTP_CLEANUP_GROUP m_cleanup;
  const int TASK_CACHE_SIZE= 10000;

  struct task_cache_entry
  {
    thread_pool_win *m_pool;
    task* m_task;
  };
  cache<task_cache_entry> m_task_cache;
  std::atomic<int> m_thread_count;
public:
  thread_pool_win(int min_threads= 0, int max_threads= 0)
      : m_task_cache(TASK_CACHE_SIZE), m_thread_count(0)
  {
    InitializeThreadpoolEnvironment(&m_env);
    m_ptp_pool= CreateThreadpool(NULL);
    m_cleanup= CreateThreadpoolCleanupGroup();
    SetThreadpoolCallbackPool(&m_env, m_ptp_pool);
    SetThreadpoolCallbackCleanupGroup(&m_env, m_cleanup, 0);
    if (min_threads)
      SetThreadpoolThreadMinimum(m_ptp_pool, min_threads);
    if (max_threads)
      SetThreadpoolThreadMaximum(m_ptp_pool, max_threads);
  }
  ~thread_pool_win()
  {
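    /*
      CloseThreadpoolCleanupGroupMembers() cancels callbacks that have not
      started yet, waits for running ones to finish, and releases all
      objects (work, timers, I/O) that were created with m_env.
    */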
    CloseThreadpoolCleanupGroupMembers(m_cleanup, TRUE, NULL);
    CloseThreadpoolCleanupGroup(m_cleanup);
    CloseThreadpool(m_ptp_pool);

    // Wait until all threads have finished and TLS destructors have run.
    while(m_thread_count)
      Sleep(1);
  }
  /**
   PTP_SIMPLE_CALLBACK-typed function, used by TrySubmitThreadpoolCallback()
  */
  static void CALLBACK task_callback(PTP_CALLBACK_INSTANCE, void *param)
  {
    auto entry= (task_cache_entry *) param;
    auto task= entry->m_task;

    tls_data.callback_prolog(entry->m_pool);

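    /*
      Return the cache entry before executing; the task may run for a long
      time, and the entry can meanwhile be reused by submit_task().
    */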
    entry->m_pool->m_task_cache.put(entry);

    task->execute();
  }
  virtual void submit_task(task *task) override
  {
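    /*
      Pass pool and task to the callback through a preallocated cache
      entry. If the callback cannot even be queued, there is no sane
      fallback, hence abort().
    */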
    auto entry= m_task_cache.get();
    task->add_ref();
    entry->m_pool= this;
    entry->m_task= task;
    if (!TrySubmitThreadpoolCallback(task_callback, entry, &m_env))
      abort();
  }

  aio *create_native_aio(int max_io) override
  {
    return new native_aio(*this, max_io);
  }

  timer* create_timer(callback_func func, void* data) override
  {
    return new native_timer(*this, func, data);
  }
};

thread_local struct thread_pool_win::tls_data thread_pool_win::tls_data;

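/*
  A minimal usage sketch (illustrative only; my_callback and my_data are
  hypothetical, the interfaces come from tpool.h):

    thread_pool *pool= create_thread_pool_win(1, 4);
    timer *t= pool->create_timer(my_callback, my_data);
    t->set_time(100, 1000); // first run after 100 ms, then every second
    ...
    delete t;    // disarm() waits for a running callback to finish
    delete pool; // waits for all threadpool callbacks and TLS destructors
*/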
thread_pool *create_thread_pool_win(int min_threads, int max_threads)
{
  return new thread_pool_win(min_threads, max_threads);
}
} // namespace tpool