1 /* Copyright (C) 2012 Monty Program Ab
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 #ifdef _WIN32_WINNT
17 #undef _WIN32_WINNT
18 #endif
19
20 #define _WIN32_WINNT 0x0601
21
22 #include "mariadb.h"
23 #include <violite.h>
24 #include <sql_priv.h>
25 #include <sql_class.h>
26 #include <my_pthread.h>
27 #include <scheduler.h>
28 #include <sql_connect.h>
29 #include <mysqld.h>
30 #include <debug_sync.h>
31 #include <threadpool.h>
32 #include <windows.h>
33
34
35
36 /*
37 WEAK_SYMBOL(return_type, function_name, argument_type1,..,argument_typeN)
38
39 Declare and load function pointer from kernel32. The name of the static
40 variable that holds the function pointer is my_<original function name>
41 This should be combined with
42 #define <original function name> my_<original function name>
43 so that one could use Widows APIs transparently, without worrying whether
44 they are present in a particular version or not.
45
46 Of course, prior to use of any function there should be a check for correct
47 Windows version, or check whether function pointer is not NULL.
48 */
49 #define WEAK_SYMBOL(return_type, function, ...) \
50 typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \
51 static pFN_##function my_##function = (pFN_##function) \
52 (GetProcAddress(GetModuleHandle("kernel32"),#function))
53
54
55 WEAK_SYMBOL(BOOL, SetThreadpoolStackInformation, PTP_POOL,
56 PTP_POOL_STACK_INFORMATION);
57 #define SetThreadpoolStackInformation my_SetThreadpoolStackInformation
58
59 /* Log a warning */
tp_log_warning(const char * msg,const char * fct)60 static void tp_log_warning(const char *msg, const char *fct)
61 {
62 sql_print_warning("Threadpool: %s. %s failed (last error %d)",msg, fct,
63 GetLastError());
64 }
65
66
67 static PTP_POOL pool;
68 static TP_CALLBACK_ENVIRON callback_environ;
69 static DWORD fls;
70
71 static bool skip_completion_port_on_success = false;
72
get_threadpool_win_callback_environ()73 PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ()
74 {
75 return pool? &callback_environ: 0;
76 }
77
78 /*
79 Threadpool callbacks.
80
81 io_completion_callback - handle client request
82 timer_callback - handle wait timeout (kill connection)
83 login_callback - user login (submitted as threadpool work)
84
85 */
86
87 static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
88 PVOID context, PTP_TIMER timer);
89
90 static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
91 PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io);
92
93
94 static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work);
95
96 static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance);
97
98 /* Get current time as Windows time */
now()99 static ulonglong now()
100 {
101 ulonglong current_time;
102 GetSystemTimeAsFileTime((PFILETIME)¤t_time);
103 return current_time;
104 }
105
106 struct TP_connection_win:public TP_connection
107 {
108 public:
109 TP_connection_win(CONNECT*);
110 ~TP_connection_win();
111 virtual int init();
112 virtual int start_io();
113 virtual void set_io_timeout(int sec);
114 virtual void wait_begin(int type);
115 virtual void wait_end();
116
117 ulonglong timeout;
118 enum_vio_type vio_type;
119 HANDLE handle;
120 OVERLAPPED overlapped;
121 PTP_CALLBACK_INSTANCE callback_instance;
122 PTP_IO io;
123 PTP_TIMER timer;
124 PTP_WORK work;
125 bool long_callback;
126
127 };
128
new_TP_connection(CONNECT * connect)129 struct TP_connection *new_TP_connection(CONNECT *connect)
130 {
131 TP_connection *c = new (std::nothrow) TP_connection_win(connect);
132 if (!c || c->init())
133 {
134 delete c;
135 return 0;
136 }
137 return c;
138 }
139
add(TP_connection * c)140 void TP_pool_win::add(TP_connection *c)
141 {
142 if(FlsGetValue(fls))
143 {
144 /* Inside threadpool(), execute callback directly. */
145 tp_callback(c);
146 }
147 else
148 {
149 SubmitThreadpoolWork(((TP_connection_win *)c)->work);
150 }
151 }
152
153
TP_connection_win(CONNECT * c)154 TP_connection_win::TP_connection_win(CONNECT *c) :
155 TP_connection(c),
156 timeout(ULONGLONG_MAX),
157 callback_instance(0),
158 io(0),
159 timer(0),
160 work(0)
161 {
162 }
163
/*
  Evaluate 'op'. If it yields NULL/0, log a warning naming the failed
  expression, assert in debug builds, and return -1 from the enclosing
  function.
  The do { } while (0) wrapper makes the macro a single statement, so that
  it is safe inside an unbraced if/else.
*/
#define CHECK_ALLOC_ERROR(op)                       \
  do                                                \
  {                                                 \
    if (!(op))                                      \
    {                                               \
      tp_log_warning("Allocation failed", #op);     \
      DBUG_ASSERT(0);                               \
      return -1;                                    \
    }                                               \
  } while (0)
165
init()166 int TP_connection_win::init()
167 {
168
169 memset(&overlapped, 0, sizeof(OVERLAPPED));
170 Vio *vio = connect->vio;
171 switch ((vio_type = vio->type))
172 {
173 case VIO_TYPE_SSL:
174 case VIO_TYPE_TCPIP:
175 handle= (HANDLE)mysql_socket_getfd(vio->mysql_socket);
176 break;
177 case VIO_TYPE_NAMEDPIPE:
178 handle= (HANDLE)vio->hPipe;
179 break;
180 default:
181 abort();
182 }
183
184
185 /* Performance tweaks (s. MSDN documentation)*/
186 UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE;
187 if (skip_completion_port_on_success)
188 {
189 flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
190 }
191 (void)SetFileCompletionNotificationModes(handle, flags);
192 /* Assign io completion callback */
193 CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
194 CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
195 CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
196 return 0;
197 }
198
199
200 /*
201 Start asynchronous read
202 */
start_io()203 int TP_connection_win::start_io()
204 {
205 DWORD num_bytes = 0;
206 static char c;
207 WSABUF buf;
208 buf.buf= &c;
209 buf.len= 0;
210 DWORD flags=0;
211 DWORD last_error= 0;
212
213 int retval;
214 StartThreadpoolIo(io);
215
216 if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL)
217 {
218 /* Start async io (sockets). */
219 if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags,
220 &overlapped, NULL) == 0)
221 {
222 retval= last_error= 0;
223 }
224 else
225 {
226 retval= -1;
227 last_error= WSAGetLastError();
228 }
229 }
230 else
231 {
232 /* Start async io (named pipe) */
233 if (ReadFile(handle, &c, 0, &num_bytes,&overlapped))
234 {
235 retval= last_error= 0;
236 }
237 else
238 {
239 retval= -1;
240 last_error= GetLastError();
241 }
242 }
243
244 if (retval == 0 || last_error == ERROR_MORE_DATA)
245 {
246 /*
247 IO successfully finished (synchronously).
248 If skip_completion_port_on_success is set, we need to handle it right
249 here, because completion callback would not be executed by the pool.
250 */
251 if (skip_completion_port_on_success)
252 {
253 CancelThreadpoolIo(io);
254 io_completion_callback(callback_instance, this, &overlapped, last_error,
255 num_bytes, io);
256 }
257 return 0;
258 }
259
260 if (last_error == ERROR_IO_PENDING)
261 {
262 return 0;
263 }
264
265 /* Some error occurred */
266 CancelThreadpoolIo(io);
267 return -1;
268 }
269
270 /*
271 Recalculate wait timeout, maybe reset timer.
272 */
set_io_timeout(int timeout_sec)273 void TP_connection_win::set_io_timeout(int timeout_sec)
274 {
275 ulonglong old_timeout= timeout;
276 ulonglong new_timeout = now() + 10000000LL * timeout_sec;
277
278 if (new_timeout < old_timeout)
279 {
280 SetThreadpoolTimer(timer, (PFILETIME)&new_timeout, 0, 1000);
281 }
282 /* new_timeout > old_timeout case is handled by expiring timer. */
283 timeout = new_timeout;
284 }
285
286
~TP_connection_win()287 TP_connection_win::~TP_connection_win()
288 {
289 if (io)
290 CloseThreadpoolIo(io);
291
292 if (work)
293 CloseThreadpoolWork(work);
294
295 if (timer)
296 {
297 SetThreadpoolTimer(timer, 0, 0, 0);
298 WaitForThreadpoolTimerCallbacks(timer, TRUE);
299 CloseThreadpoolTimer(timer);
300 }
301 }
302
wait_begin(int type)303 void TP_connection_win::wait_begin(int type)
304 {
305 /*
306 Signal to the threadpool whenever callback can run long. Currently, binlog
307 waits are a good candidate, its waits are really long
308 */
309 if (type == THD_WAIT_BINLOG)
310 {
311 if (!long_callback && callback_instance)
312 {
313 CallbackMayRunLong(callback_instance);
314 long_callback= true;
315 }
316 }
317 }
318
wait_end()319 void TP_connection_win::wait_end()
320 {
321 /* Do we need to do anything ? */
322 }
323
324 /*
325 This function should be called first whenever a callback is invoked in the
326 threadpool, does my_thread_init() if not yet done
327 */
tp_win_callback_prolog()328 void tp_win_callback_prolog()
329 {
330 if (FlsGetValue(fls) == NULL)
331 {
332 /* Running in new worker thread*/
333 FlsSetValue(fls, (void *)1);
334 statistic_increment(thread_created, &LOCK_status);
335 tp_stats.num_worker_threads++;
336 my_thread_init();
337 }
338 }
339
340 extern ulong thread_created;
pre_callback(PVOID context,PTP_CALLBACK_INSTANCE instance)341 static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance)
342 {
343 tp_win_callback_prolog();
344 TP_connection_win *c = (TP_connection_win *)context;
345 c->callback_instance = instance;
346 c->long_callback = false;
347 }
348
349
350 /*
351 Decrement number of threads when a thread exits .
352 On Windows, FlsAlloc() provides the thread destruction callbacks.
353 */
thread_destructor(void * data)354 static VOID WINAPI thread_destructor(void *data)
355 {
356 if(data)
357 {
358 tp_stats.num_worker_threads--;
359 my_thread_end();
360 }
361 }
362
363
364
tp_callback(PTP_CALLBACK_INSTANCE instance,PVOID context)365 static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context)
366 {
367 pre_callback(context, instance);
368 tp_callback((TP_connection *)context);
369 }
370
371
372 /*
373 Handle read completion/notification.
374 */
io_completion_callback(PTP_CALLBACK_INSTANCE instance,PVOID context,PVOID overlapped,ULONG io_result,ULONG_PTR nbytes,PTP_IO io)375 static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
376 PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
377 {
378 TP_connection_win *c= (TP_connection_win *)context;
379 /*
380 Execute high priority connections immediately.
381 'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)
382 which makes Windows threadpool place the items at the end of its internal work queue.
383 */
384 if (c->priority == TP_PRIORITY_HIGH)
385 tp_callback(instance, context);
386 else
387 SubmitThreadpoolWork(c->work);
388 }
389
390
391 /*
392 Timer callback.
393 Invoked when connection times out (wait_timeout)
394 */
timer_callback(PTP_CALLBACK_INSTANCE instance,PVOID parameter,PTP_TIMER timer)395 static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
396 PVOID parameter, PTP_TIMER timer)
397 {
398 TP_connection_win *c = (TP_connection_win *)parameter;
399 if (c->timeout <= now())
400 {
401 tp_timeout_handler(c);
402 }
403 else
404 {
405 /*
406 Reset timer.
407 There is a tiny possibility of a race condition, since the value of timeout
408 could have changed to smaller value in the thread doing io callback.
409
410 Given the relative unimportance of the wait timeout, we accept race
411 condition.
412 */
413 SetThreadpoolTimer(timer, (PFILETIME)&c->timeout, 0, 1000);
414 }
415 }
416
work_callback(PTP_CALLBACK_INSTANCE instance,PVOID context,PTP_WORK work)417 static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work)
418 {
419 tp_callback(instance, context);
420 }
421
TP_pool_win()422 TP_pool_win::TP_pool_win()
423 {}
424
init()425 int TP_pool_win::init()
426 {
427 fls= FlsAlloc(thread_destructor);
428 pool= CreateThreadpool(NULL);
429
430 if (!pool)
431 {
432 sql_print_error("Can't create threadpool. "
433 "CreateThreadpool() failed with %d. Likely cause is memory pressure",
434 GetLastError());
435 return -1;
436 }
437
438 InitializeThreadpoolEnvironment(&callback_environ);
439 SetThreadpoolCallbackPool(&callback_environ, pool);
440
441 if (threadpool_max_threads)
442 {
443 SetThreadpoolThreadMaximum(pool, threadpool_max_threads);
444 }
445
446 if (threadpool_min_threads)
447 {
448 if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads))
449 {
450 tp_log_warning("Can't set threadpool minimum threads",
451 "SetThreadpoolThreadMinimum");
452 }
453 }
454
455 /*
456 Control stack size (OS must be Win7 or later)
457 */
458 if (SetThreadpoolStackInformation)
459 {
460 TP_POOL_STACK_INFORMATION stackinfo;
461 stackinfo.StackCommit = 0;
462 stackinfo.StackReserve = (SIZE_T)my_thread_stack_size;
463 if (!SetThreadpoolStackInformation(pool, &stackinfo))
464 {
465 tp_log_warning("Can't set threadpool stack size",
466 "SetThreadpoolStackInformation");
467 }
468 }
469 return 0;
470 }
471
472
473 /**
474 Scheduler callback : Destroy the scheduler.
475 */
~TP_pool_win()476 TP_pool_win::~TP_pool_win()
477 {
478 if (!pool)
479 return;
480 DestroyThreadpoolEnvironment(&callback_environ);
481 SetThreadpoolThreadMaximum(pool, 0);
482 CloseThreadpool(pool);
483 if (!tp_stats.num_worker_threads)
484 FlsFree(fls);
485 }
486 /**
487 Sets the number of idle threads the thread pool maintains in anticipation of new
488 requests.
489 */
set_min_threads(uint val)490 int TP_pool_win::set_min_threads(uint val)
491 {
492 SetThreadpoolThreadMinimum(pool, val);
493 return 0;
494 }
495
set_max_threads(uint val)496 int TP_pool_win::set_max_threads(uint val)
497 {
498 SetThreadpoolThreadMaximum(pool, val);
499 return 0;
500 }
501
502
new_connection(CONNECT * connect)503 TP_connection *TP_pool_win::new_connection(CONNECT *connect)
504 {
505 TP_connection *c= new (std::nothrow) TP_connection_win(connect);
506 if (!c )
507 return 0;
508 if (c->init())
509 {
510 delete c;
511 return 0;
512 }
513 return c;
514 }
515