1 /* Copyright (C) 2012 Monty Program Ab
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #ifdef _WIN32_WINNT
17 #undef _WIN32_WINNT
18 #endif
19 
20 #define _WIN32_WINNT 0x0601
21 
22 #include "mariadb.h"
23 #include <violite.h>
24 #include <sql_priv.h>
25 #include <sql_class.h>
26 #include <my_pthread.h>
27 #include <scheduler.h>
28 #include <sql_connect.h>
29 #include <mysqld.h>
30 #include <debug_sync.h>
31 #include <threadpool.h>
32 #include <windows.h>
33 
34 
35 
36 /*
37  WEAK_SYMBOL(return_type, function_name, argument_type1,..,argument_typeN)
38 
39  Declare and load function pointer from kernel32. The name of the static
40  variable that holds the function pointer is my_<original function name>
41  This should be combined with
42  #define <original function name> my_<original function name>
 so that one could use Windows APIs transparently, without worrying whether
44  they are present in a particular version or not.
45 
46  Of course, prior to use of any function there should be a check for correct
47  Windows version, or check whether function pointer is not NULL.
48 */
49 #define WEAK_SYMBOL(return_type, function, ...) \
50   typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \
51   static pFN_##function my_##function = (pFN_##function) \
52     (GetProcAddress(GetModuleHandle("kernel32"),#function))
53 
54 
55 WEAK_SYMBOL(BOOL, SetThreadpoolStackInformation, PTP_POOL,
56   PTP_POOL_STACK_INFORMATION);
57 #define SetThreadpoolStackInformation my_SetThreadpoolStackInformation
58 
59 /* Log a warning */
tp_log_warning(const char * msg,const char * fct)60 static void tp_log_warning(const char *msg, const char *fct)
61 {
62   sql_print_warning("Threadpool: %s. %s failed (last error %d)",msg, fct,
63     GetLastError());
64 }
65 
66 
67 static PTP_POOL pool;
68 static TP_CALLBACK_ENVIRON callback_environ;
69 static DWORD fls;
70 
71 static bool skip_completion_port_on_success = false;
72 
get_threadpool_win_callback_environ()73 PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ()
74 {
75   return pool? &callback_environ: 0;
76 }
77 
78 /*
79   Threadpool callbacks.
80 
81   io_completion_callback  - handle client request
82   timer_callback - handle wait timeout (kill connection)
  work_callback - run a connection's work item (submitted with SubmitThreadpoolWork)
84 
85 */
86 
87 static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
88   PVOID context, PTP_TIMER timer);
89 
90 static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
91   PVOID context,  PVOID overlapped,  ULONG io_result, ULONG_PTR nbytes, PTP_IO io);
92 
93 
94 static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work);
95 
96 static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance);
97 
98 /* Get current time as Windows time */
now()99 static ulonglong now()
100 {
101   ulonglong current_time;
102   GetSystemTimeAsFileTime((PFILETIME)&current_time);
103   return current_time;
104 }
105 
106 struct TP_connection_win:public TP_connection
107 {
108 public:
109   TP_connection_win(CONNECT*);
110   ~TP_connection_win();
111   virtual int init();
112   virtual int start_io();
113   virtual void set_io_timeout(int sec);
114   virtual void wait_begin(int type);
115   virtual void wait_end();
116 
117   ulonglong timeout;
118   enum_vio_type vio_type;
119   HANDLE handle;
120   OVERLAPPED overlapped;
121   PTP_CALLBACK_INSTANCE callback_instance;
122   PTP_IO  io;
123   PTP_TIMER timer;
124   PTP_WORK  work;
125   bool long_callback;
126 
127 };
128 
new_TP_connection(CONNECT * connect)129 struct TP_connection *new_TP_connection(CONNECT *connect)
130 {
131   TP_connection *c = new (std::nothrow) TP_connection_win(connect);
132   if (!c || c->init())
133   {
134     delete c;
135     return 0;
136   }
137   return c;
138 }
139 
add(TP_connection * c)140 void TP_pool_win::add(TP_connection *c)
141 {
142   if(FlsGetValue(fls))
143   {
144     /* Inside threadpool(), execute callback directly. */
145     tp_callback(c);
146   }
147   else
148   {
149     SubmitThreadpoolWork(((TP_connection_win *)c)->work);
150   }
151 }
152 
153 
TP_connection_win(CONNECT * c)154 TP_connection_win::TP_connection_win(CONNECT *c) :
155   TP_connection(c),
156   timeout(ULONGLONG_MAX),
157   callback_instance(0),
158   io(0),
159   timer(0),
160   work(0)
161 {
162 }
163 
/*
  Evaluate 'op' (typically "x= CreateThreadpoolXxx(...)"). If it yields a
  NULL/false value: log a warning naming the failed expression, hit
  DBUG_ASSERT(0), and return -1 from the enclosing function.
  The do { } while (0) wrapper makes the macro a single statement, so
  "CHECK_ALLOC_ERROR(x);" is also safe inside an unbraced if/else.
*/
#define CHECK_ALLOC_ERROR(op) \
  do \
  { \
    if (!(op)) \
    { \
      tp_log_warning("Allocation failed", #op); \
      DBUG_ASSERT(0); \
      return -1; \
    } \
  } while (0)
165 
init()166 int TP_connection_win::init()
167 {
168 
169   memset(&overlapped, 0, sizeof(OVERLAPPED));
170   Vio *vio = connect->vio;
171   switch ((vio_type =  vio->type))
172   {
173   case VIO_TYPE_SSL:
174   case VIO_TYPE_TCPIP:
175     handle= (HANDLE)mysql_socket_getfd(vio->mysql_socket);
176     break;
177   case VIO_TYPE_NAMEDPIPE:
178     handle= (HANDLE)vio->hPipe;
179     break;
180   default:
181     abort();
182   }
183 
184 
185   /* Performance tweaks (s. MSDN documentation)*/
186   UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE;
187   if (skip_completion_port_on_success)
188   {
189     flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
190   }
191   (void)SetFileCompletionNotificationModes(handle, flags);
192   /* Assign io completion callback */
193   CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
194   CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this,  &callback_environ));
195   CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
196   return 0;
197 }
198 
199 
200 /*
201   Start asynchronous read
202 */
start_io()203 int TP_connection_win::start_io()
204 {
205   DWORD num_bytes = 0;
206   static char c;
207   WSABUF buf;
208   buf.buf= &c;
209   buf.len= 0;
210   DWORD flags=0;
211   DWORD last_error= 0;
212 
213   int retval;
214   StartThreadpoolIo(io);
215 
216   if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL)
217   {
218     /* Start async io (sockets). */
219     if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags,
220           &overlapped,  NULL) == 0)
221     {
222        retval= last_error= 0;
223     }
224     else
225     {
226       retval= -1;
227       last_error=  WSAGetLastError();
228     }
229   }
230   else
231   {
232     /* Start async io (named pipe) */
233     if (ReadFile(handle, &c, 0, &num_bytes,&overlapped))
234     {
235       retval= last_error= 0;
236     }
237     else
238     {
239       retval= -1;
240       last_error= GetLastError();
241     }
242   }
243 
244   if (retval == 0 || last_error == ERROR_MORE_DATA)
245   {
246     /*
247       IO successfully finished (synchronously).
248       If skip_completion_port_on_success is set, we need to handle it right
249       here, because completion callback would not be executed by the pool.
250     */
251     if (skip_completion_port_on_success)
252     {
253       CancelThreadpoolIo(io);
254       io_completion_callback(callback_instance, this, &overlapped, last_error,
255         num_bytes, io);
256     }
257     return 0;
258   }
259 
260   if (last_error == ERROR_IO_PENDING)
261   {
262     return 0;
263   }
264 
265   /* Some error occurred */
266   CancelThreadpoolIo(io);
267   return -1;
268 }
269 
270 /*
271   Recalculate wait timeout, maybe reset timer.
272 */
set_io_timeout(int timeout_sec)273 void TP_connection_win::set_io_timeout(int timeout_sec)
274 {
275   ulonglong old_timeout= timeout;
276   ulonglong new_timeout = now() + 10000000LL * timeout_sec;
277 
278   if (new_timeout < old_timeout)
279   {
280     SetThreadpoolTimer(timer, (PFILETIME)&new_timeout, 0, 1000);
281   }
282   /*  new_timeout > old_timeout case is handled by expiring timer. */
283   timeout = new_timeout;
284 }
285 
286 
~TP_connection_win()287 TP_connection_win::~TP_connection_win()
288 {
289   if (io)
290     CloseThreadpoolIo(io);
291 
292   if (work)
293     CloseThreadpoolWork(work);
294 
295   if (timer)
296   {
297     SetThreadpoolTimer(timer, 0, 0, 0);
298     WaitForThreadpoolTimerCallbacks(timer, TRUE);
299     CloseThreadpoolTimer(timer);
300   }
301 }
302 
wait_begin(int type)303 void TP_connection_win::wait_begin(int type)
304 {
305   /*
306     Signal to the threadpool whenever callback can run long. Currently, binlog
307     waits are a good candidate, its waits are really long
308   */
309   if (type == THD_WAIT_BINLOG)
310   {
311     if (!long_callback && callback_instance)
312     {
313       CallbackMayRunLong(callback_instance);
314       long_callback= true;
315     }
316   }
317 }
318 
wait_end()319 void TP_connection_win::wait_end()
320 {
321   /* Do we need to do anything ? */
322 }
323 
324 /*
325   This function should be called first whenever a callback is invoked in the
326   threadpool, does my_thread_init() if not yet done
327 */
tp_win_callback_prolog()328 void tp_win_callback_prolog()
329 {
330   if (FlsGetValue(fls) == NULL)
331   {
332     /* Running in new  worker thread*/
333     FlsSetValue(fls, (void *)1);
334     statistic_increment(thread_created, &LOCK_status);
335     tp_stats.num_worker_threads++;
336     my_thread_init();
337   }
338 }
339 
340 extern ulong thread_created;
pre_callback(PVOID context,PTP_CALLBACK_INSTANCE instance)341 static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance)
342 {
343   tp_win_callback_prolog();
344   TP_connection_win *c = (TP_connection_win *)context;
345   c->callback_instance = instance;
346   c->long_callback = false;
347 }
348 
349 
350 /*
351   Decrement number of threads when a thread exits .
352   On Windows, FlsAlloc() provides the thread destruction callbacks.
353 */
thread_destructor(void * data)354 static VOID WINAPI thread_destructor(void *data)
355 {
356   if(data)
357   {
358     tp_stats.num_worker_threads--;
359     my_thread_end();
360   }
361 }
362 
363 
364 
tp_callback(PTP_CALLBACK_INSTANCE instance,PVOID context)365 static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context)
366 {
367   pre_callback(context, instance);
368   tp_callback((TP_connection *)context);
369 }
370 
371 
372 /*
373   Handle read completion/notification.
374 */
io_completion_callback(PTP_CALLBACK_INSTANCE instance,PVOID context,PVOID overlapped,ULONG io_result,ULONG_PTR nbytes,PTP_IO io)375 static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
376   PVOID context,  PVOID overlapped,  ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
377 {
378   TP_connection_win *c= (TP_connection_win *)context;
379   /*
380     Execute high priority connections immediately.
381     'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)
382     which makes Windows threadpool place the items at the end of its internal work queue.
383   */
384   if (c->priority == TP_PRIORITY_HIGH)
385     tp_callback(instance, context);
386   else
387     SubmitThreadpoolWork(c->work);
388 }
389 
390 
391 /*
392   Timer callback.
393   Invoked when connection times out (wait_timeout)
394 */
timer_callback(PTP_CALLBACK_INSTANCE instance,PVOID parameter,PTP_TIMER timer)395 static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
396   PVOID parameter, PTP_TIMER timer)
397 {
398   TP_connection_win *c = (TP_connection_win *)parameter;
399   if (c->timeout <= now())
400   {
401     tp_timeout_handler(c);
402   }
403   else
404   {
405     /*
406       Reset timer.
407       There is a tiny possibility of a race condition, since the value of timeout
408       could have changed to smaller value in the thread doing io callback.
409 
410       Given the relative unimportance of the wait timeout, we accept race
411       condition.
412       */
413     SetThreadpoolTimer(timer, (PFILETIME)&c->timeout, 0, 1000);
414   }
415 }
416 
work_callback(PTP_CALLBACK_INSTANCE instance,PVOID context,PTP_WORK work)417 static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work)
418 {
419   tp_callback(instance, context);
420 }
421 
TP_pool_win()422 TP_pool_win::TP_pool_win()
423 {}
424 
init()425 int TP_pool_win::init()
426 {
427   fls= FlsAlloc(thread_destructor);
428   pool= CreateThreadpool(NULL);
429 
430   if (!pool)
431   {
432     sql_print_error("Can't create threadpool. "
433       "CreateThreadpool() failed with %d. Likely cause is memory pressure",
434       GetLastError());
435     return -1;
436   }
437 
438   InitializeThreadpoolEnvironment(&callback_environ);
439   SetThreadpoolCallbackPool(&callback_environ, pool);
440 
441   if (threadpool_max_threads)
442   {
443     SetThreadpoolThreadMaximum(pool, threadpool_max_threads);
444   }
445 
446   if (threadpool_min_threads)
447   {
448     if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads))
449     {
450       tp_log_warning("Can't set threadpool minimum threads",
451         "SetThreadpoolThreadMinimum");
452     }
453   }
454 
455   /*
456     Control stack size (OS must be Win7 or later)
457   */
458   if (SetThreadpoolStackInformation)
459   {
460     TP_POOL_STACK_INFORMATION stackinfo;
461     stackinfo.StackCommit = 0;
462     stackinfo.StackReserve = (SIZE_T)my_thread_stack_size;
463     if (!SetThreadpoolStackInformation(pool, &stackinfo))
464     {
465       tp_log_warning("Can't set threadpool stack size",
466         "SetThreadpoolStackInformation");
467     }
468   }
469   return 0;
470 }
471 
472 
473 /**
474   Scheduler callback : Destroy the scheduler.
475 */
~TP_pool_win()476 TP_pool_win::~TP_pool_win()
477 {
478   if (!pool)
479     return;
480   DestroyThreadpoolEnvironment(&callback_environ);
481   SetThreadpoolThreadMaximum(pool, 0);
482   CloseThreadpool(pool);
483   if (!tp_stats.num_worker_threads)
484     FlsFree(fls);
485 }
486 /**
487   Sets the number of idle threads the thread pool maintains in anticipation of new
488   requests.
489 */
set_min_threads(uint val)490 int TP_pool_win::set_min_threads(uint val)
491 {
492   SetThreadpoolThreadMinimum(pool, val);
493   return 0;
494 }
495 
set_max_threads(uint val)496 int TP_pool_win::set_max_threads(uint val)
497 {
498   SetThreadpoolThreadMaximum(pool, val);
499   return 0;
500 }
501 
502 
new_connection(CONNECT * connect)503 TP_connection *TP_pool_win::new_connection(CONNECT *connect)
504 {
505   TP_connection *c= new (std::nothrow) TP_connection_win(connect);
506   if (!c )
507     return 0;
508   if (c->init())
509   {
510     delete c;
511     return 0;
512   }
513   return c;
514 }
515