xref: /reactos/sdk/lib/rtl/threadpool.c (revision 0bf42067)
1 /*
2  * Thread pooling
3  *
4  * Copyright (c) 2006 Robert Shearman
5  * Copyright (c) 2014-2016 Sebastian Lackner
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
20  */
21 
22 
23 #ifdef __REACTOS__
24 #include <rtl_vista.h>
25 #define NDEBUG
26 #include "wine/list.h"
27 #include <debug.h>
28 
29 #define ERR(fmt, ...)    DPRINT1(fmt, ##__VA_ARGS__)
30 #define FIXME(fmt, ...)  DPRINT(fmt, ##__VA_ARGS__)
31 #define WARN(fmt, ...)   DPRINT(fmt, ##__VA_ARGS__)
32 #define TRACE(fmt, ...)  DPRINT(fmt, ##__VA_ARGS__)
33 #ifndef ARRAY_SIZE
34 #define ARRAY_SIZE(_x) (sizeof((_x))/sizeof((_x)[0]))
35 #endif
36 
37 typedef struct _THREAD_NAME_INFORMATION
38 {
39     UNICODE_STRING ThreadName;
40 } THREAD_NAME_INFORMATION, *PTHREAD_NAME_INFORMATION;
41 
42 typedef void (CALLBACK *PNTAPCFUNC)(ULONG_PTR,ULONG_PTR,ULONG_PTR);
43 typedef void (CALLBACK *PRTL_THREAD_START_ROUTINE)(LPVOID);
44 typedef DWORD (CALLBACK *PRTL_WORK_ITEM_ROUTINE)(LPVOID);
45 typedef void (NTAPI *RTL_WAITORTIMERCALLBACKFUNC)(PVOID,BOOLEAN);
46 typedef VOID (CALLBACK *PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD,DWORD,LPVOID);
47 
48 typedef void (CALLBACK *PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,void*,void*,IO_STATUS_BLOCK*,PTP_IO);
49 NTSYSAPI NTSTATUS  WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
50 #define PRTL_WORK_ITEM_ROUTINE WORKERCALLBACKFUNC
51 
52 #define CRITICAL_SECTION RTL_CRITICAL_SECTION
53 #define GetProcessHeap() RtlGetProcessHeap()
54 #define GetCurrentProcess() NtCurrentProcess()
55 #define GetCurrentThread() NtCurrentThread()
56 #define GetCurrentThreadId() HandleToULong(NtCurrentTeb()->ClientId.UniqueThread)
57 #else
58 #include <assert.h>
59 #include <stdarg.h>
60 #include <limits.h>
61 
62 #include "ntstatus.h"
63 #define WIN32_NO_STATUS
64 #include "winternl.h"
65 
66 #include "wine/debug.h"
67 #include "wine/list.h"
68 
69 #include "ntdll_misc.h"
70 
71 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
72 #endif
73 
74 /*
75  * Old thread pooling API
76  */
77 
78 struct rtl_work_item
79 {
80     PRTL_WORK_ITEM_ROUTINE function;
81     PVOID context;
82 };
83 
84 #define EXPIRE_NEVER       (~(ULONGLONG)0)
85 #define TIMER_QUEUE_MAGIC  0x516d6954   /* TimQ */
86 
87 #ifndef __REACTOS__
88 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
89 #endif
90 
91 static struct
92 {
93     HANDLE                  compl_port;
94     RTL_CRITICAL_SECTION    threadpool_compl_cs;
95 }
96 old_threadpool =
97 {
98     NULL,                                       /* compl_port */
99 #ifdef __REACTOS__
100     {0},                                        /* threadpool_compl_cs */
101 #else
102     { &critsect_compl_debug, -1, 0, 0, 0, 0 },  /* threadpool_compl_cs */
103 #endif
104 };
105 
106 #ifndef __REACTOS__
107 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
108 {
109     0, 0, &old_threadpool.threadpool_compl_cs,
110     { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
111       0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
112 };
113 #endif
114 
115 struct timer_queue;
116 struct queue_timer
117 {
118     struct timer_queue *q;
119     struct list entry;
120     ULONG runcount;             /* number of callbacks pending execution */
121     RTL_WAITORTIMERCALLBACKFUNC callback;
122     PVOID param;
123     DWORD period;
124     ULONG flags;
125     ULONGLONG expire;
126     BOOL destroy;               /* timer should be deleted; once set, never unset */
127     HANDLE event;               /* removal event */
128 };
129 
130 struct timer_queue
131 {
132     DWORD magic;
133     RTL_CRITICAL_SECTION cs;
134     struct list timers;         /* sorted by expiration time */
135     BOOL quit;                  /* queue should be deleted; once set, never unset */
136     HANDLE event;
137     HANDLE thread;
138 };
139 
140 /*
141  * Object-oriented thread pooling API
142  */
143 
144 #define THREADPOOL_WORKER_TIMEOUT 5000
145 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
146 
147 /* internal threadpool representation */
148 struct threadpool
149 {
150     LONG                    refcount;
151     LONG                    objcount;
152     BOOL                    shutdown;
153     CRITICAL_SECTION        cs;
154     /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
155     struct list             pools[3];
156     RTL_CONDITION_VARIABLE  update_event;
157     /* information about worker threads, locked via .cs */
158     int                     max_workers;
159     int                     min_workers;
160     int                     num_workers;
161     int                     num_busy_workers;
162     HANDLE                  compl_port;
163     TP_POOL_STACK_INFORMATION stack_info;
164 };
165 
166 enum threadpool_objtype
167 {
168     TP_OBJECT_TYPE_SIMPLE,
169     TP_OBJECT_TYPE_WORK,
170     TP_OBJECT_TYPE_TIMER,
171     TP_OBJECT_TYPE_WAIT,
172     TP_OBJECT_TYPE_IO,
173 };
174 
175 struct io_completion
176 {
177     IO_STATUS_BLOCK iosb;
178     ULONG_PTR cvalue;
179 };
180 
181 /* internal threadpool object representation */
182 struct threadpool_object
183 {
184     void                   *win32_callback; /* leave space for kernelbase to store win32 callback */
185     LONG                    refcount;
186     BOOL                    shutdown;
187     /* read-only information */
188     enum threadpool_objtype type;
189     struct threadpool       *pool;
190     struct threadpool_group *group;
191     PVOID                   userdata;
192     PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
193     PTP_SIMPLE_CALLBACK     finalization_callback;
194     BOOL                    may_run_long;
195     HMODULE                 race_dll;
196     TP_CALLBACK_PRIORITY    priority;
197     /* information about the group, locked via .group->cs */
198     struct list             group_entry;
199     BOOL                    is_group_member;
200     /* information about the pool, locked via .pool->cs */
201     struct list             pool_entry;
202     RTL_CONDITION_VARIABLE  finished_event;
203     RTL_CONDITION_VARIABLE  group_finished_event;
204     HANDLE                  completed_event;
205     LONG                    num_pending_callbacks;
206     LONG                    num_running_callbacks;
207     LONG                    num_associated_callbacks;
208     /* arguments for callback */
209     union
210     {
211         struct
212         {
213             PTP_SIMPLE_CALLBACK callback;
214         } simple;
215         struct
216         {
217             PTP_WORK_CALLBACK callback;
218         } work;
219         struct
220         {
221             PTP_TIMER_CALLBACK callback;
222             /* information about the timer, locked via timerqueue.cs */
223             BOOL            timer_initialized;
224             BOOL            timer_pending;
225             struct list     timer_entry;
226             BOOL            timer_set;
227             ULONGLONG       timeout;
228             LONG            period;
229             LONG            window_length;
230         } timer;
231         struct
232         {
233             PTP_WAIT_CALLBACK callback;
234             LONG            signaled;
235             /* information about the wait object, locked via waitqueue.cs */
236             struct waitqueue_bucket *bucket;
237             BOOL            wait_pending;
238             struct list     wait_entry;
239             ULONGLONG       timeout;
240             HANDLE          handle;
241             DWORD           flags;
242             RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
243         } wait;
244         struct
245         {
246             PTP_IO_CALLBACK callback;
247             /* locked via .pool->cs */
248             unsigned int    pending_count, skipped_count, completion_count, completion_max;
249             BOOL            shutting_down;
250             struct io_completion *completions;
251         } io;
252     } u;
253 };
254 
255 /* internal threadpool instance representation */
256 struct threadpool_instance
257 {
258     struct threadpool_object *object;
259     DWORD                   threadid;
260     BOOL                    associated;
261     BOOL                    may_run_long;
262     struct
263     {
264         CRITICAL_SECTION    *critical_section;
265         HANDLE              mutex;
266         HANDLE              semaphore;
267         LONG                semaphore_count;
268         HANDLE              event;
269         HMODULE             library;
270     } cleanup;
271 };
272 
273 /* internal threadpool group representation */
274 struct threadpool_group
275 {
276     LONG                    refcount;
277     BOOL                    shutdown;
278     CRITICAL_SECTION        cs;
279     /* list of group members, locked via .cs */
280     struct list             members;
281 };
282 
283 #ifndef __REACTOS__
284 /* global timerqueue object */
285 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
286 #endif
287 
288 static struct
289 {
290     CRITICAL_SECTION        cs;
291     LONG                    objcount;
292     BOOL                    thread_running;
293     struct list             pending_timers;
294     RTL_CONDITION_VARIABLE  update_event;
295 }
296 timerqueue =
297 {
298 #ifdef __REACTOS__
299     {0},                                        /* cs */
300 #else
301     { &timerqueue_debug, -1, 0, 0, 0, 0 },      /* cs */
302 #endif
303     0,                                          /* objcount */
304     FALSE,                                      /* thread_running */
305     LIST_INIT( timerqueue.pending_timers ),     /* pending_timers */
306 #if __REACTOS__
307     0,
308 #else
309     RTL_CONDITION_VARIABLE_INIT                 /* update_event */
310 #endif
311 };
312 
313 #ifndef __REACTOS__
314 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
315 {
316     0, 0, &timerqueue.cs,
317     { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
318       0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
319 };
320 
321 /* global waitqueue object */
322 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
323 #endif
324 
325 static struct
326 {
327     CRITICAL_SECTION        cs;
328     LONG                    num_buckets;
329     struct list             buckets;
330 }
331 waitqueue =
332 {
333 #ifdef __REACTOS__
334     {0},       /* cs */
335 #else
336     { &waitqueue_debug, -1, 0, 0, 0, 0 },       /* cs */
337 #endif
338     0,                                          /* num_buckets */
339     LIST_INIT( waitqueue.buckets )              /* buckets */
340 };
341 
342 #ifndef __REACTOS__
343 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
344 {
345     0, 0, &waitqueue.cs,
346     { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
347       0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
348 };
349 #endif
350 
351 struct waitqueue_bucket
352 {
353     struct list             bucket_entry;
354     LONG                    objcount;
355     struct list             reserved;
356     struct list             waiting;
357     HANDLE                  update_event;
358     BOOL                    alertable;
359 };
360 
361 #ifndef __REACTOS__
362 /* global I/O completion queue object */
363 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
364 #endif
365 
366 static struct
367 {
368     CRITICAL_SECTION        cs;
369     LONG                    objcount;
370     BOOL                    thread_running;
371     HANDLE                  port;
372     RTL_CONDITION_VARIABLE  update_event;
373 }
374 ioqueue =
375 {
376 #ifdef __REACTOS__
377     .cs = {0},
378 #else
379     .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
380 #endif
381 };
382 
383 #ifndef __REACTOS__
384 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
385 {
386     0, 0, &ioqueue.cs,
387     { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
388       0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
389 };
390 #endif
391 
impl_from_TP_POOL(TP_POOL * pool)392 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
393 {
394     return (struct threadpool *)pool;
395 }
396 
impl_from_TP_WORK(TP_WORK * work)397 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
398 {
399     struct threadpool_object *object = (struct threadpool_object *)work;
400     assert( object->type == TP_OBJECT_TYPE_WORK );
401     return object;
402 }
403 
impl_from_TP_TIMER(TP_TIMER * timer)404 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
405 {
406     struct threadpool_object *object = (struct threadpool_object *)timer;
407     assert( object->type == TP_OBJECT_TYPE_TIMER );
408     return object;
409 }
410 
impl_from_TP_WAIT(TP_WAIT * wait)411 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
412 {
413     struct threadpool_object *object = (struct threadpool_object *)wait;
414     assert( object->type == TP_OBJECT_TYPE_WAIT );
415     return object;
416 }
417 
impl_from_TP_IO(TP_IO * io)418 static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
419 {
420     struct threadpool_object *object = (struct threadpool_object *)io;
421     assert( object->type == TP_OBJECT_TYPE_IO );
422     return object;
423 }
424 
impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP * group)425 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
426 {
427     return (struct threadpool_group *)group;
428 }
429 
impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE * instance)430 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
431 {
432     return (struct threadpool_instance *)instance;
433 }
434 
435 #ifdef __REACTOS__
436 ULONG NTAPI threadpool_worker_proc(PVOID param );
437 #else
438 static void CALLBACK threadpool_worker_proc( void *param );
439 #endif
440 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
441 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
442 static void tp_object_prepare_shutdown( struct threadpool_object *object );
443 static BOOL tp_object_release( struct threadpool_object *object );
444 static struct threadpool *default_threadpool = NULL;
445 
array_reserve(void ** elements,unsigned int * capacity,unsigned int count,unsigned int size)446 static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
447 {
448     unsigned int new_capacity, max_capacity;
449     void *new_elements;
450 
451     if (count <= *capacity)
452         return TRUE;
453 
454     max_capacity = ~(SIZE_T)0 / size;
455     if (count > max_capacity)
456         return FALSE;
457 
458     new_capacity = max(4, *capacity);
459     while (new_capacity < count && new_capacity <= max_capacity / 2)
460         new_capacity *= 2;
461     if (new_capacity < count)
462         new_capacity = max_capacity;
463 
464     if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
465         return FALSE;
466 
467     *elements = new_elements;
468     *capacity = new_capacity;
469 
470     return TRUE;
471 }
472 
set_thread_name(const WCHAR * name)473 static void set_thread_name(const WCHAR *name)
474 {
475 #ifndef __REACTOS__ // This is impossible on non vista+
476     THREAD_NAME_INFORMATION info;
477 
478     RtlInitUnicodeString(&info.ThreadName, name);
479     NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info));
480 #endif
481 }
482 
483 #ifndef __REACTOS__
process_rtl_work_item(TP_CALLBACK_INSTANCE * instance,void * userdata)484 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
485 {
486     struct rtl_work_item *item = userdata;
487 
488     TRACE("executing %p(%p)\n", item->function, item->context);
489     item->function( item->context );
490 
491     RtlFreeHeap( GetProcessHeap(), 0, item );
492 }
493 
494 /***********************************************************************
495  *              RtlQueueWorkItem   (NTDLL.@)
496  *
497  * Queues a work item into a thread in the thread pool.
498  *
499  * PARAMS
500  *  function [I] Work function to execute.
501  *  context  [I] Context to pass to the work function when it is executed.
502  *  flags    [I] Flags. See notes.
503  *
504  * RETURNS
505  *  Success: STATUS_SUCCESS.
506  *  Failure: Any NTSTATUS code.
507  *
508  * NOTES
509  *  Flags can be one or more of the following:
510  *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
511  *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
512  *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
513  *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
514  *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
515  */
RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE function,PVOID context,ULONG flags)516 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
517 {
518     TP_CALLBACK_ENVIRON environment;
519     struct rtl_work_item *item;
520     NTSTATUS status;
521 
522     TRACE( "%p %p %lu\n", function, context, flags );
523 
524     item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
525     if (!item)
526         return STATUS_NO_MEMORY;
527 
528     memset( &environment, 0, sizeof(environment) );
529     environment.Version = 1;
530     environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
531     environment.u.s.Persistent   = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
532 
533     item->function = function;
534     item->context  = context;
535 
536     status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
537     if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
538     return status;
539 }
540 
541 /***********************************************************************
542  * iocp_poller - get completion events and run callbacks
543  */
iocp_poller(LPVOID Arg)544 static DWORD CALLBACK iocp_poller(LPVOID Arg)
545 {
546     HANDLE cport = Arg;
547 
548     while( TRUE )
549     {
550         PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
551         LPVOID overlapped;
552         IO_STATUS_BLOCK iosb;
553 #ifdef __REACTOS__
554         NTSTATUS res = NtRemoveIoCompletion( cport, (PVOID)&callback, (PVOID)&overlapped, &iosb, NULL );
555 #else
556         NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
557 #endif
558         if (res)
559         {
560             ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
561         }
562         else
563         {
564             DWORD transferred = 0;
565             DWORD err = 0;
566 
567             if (iosb.Status == STATUS_SUCCESS)
568                 transferred = iosb.Information;
569             else
570                 err = RtlNtStatusToDosError(iosb.Status);
571 
572             callback( err, transferred, overlapped );
573         }
574     }
575     return 0;
576 }
577 
578 /***********************************************************************
579  *              RtlSetIoCompletionCallback  (NTDLL.@)
580  *
581  * Binds a handle to a thread pool's completion port, and possibly
582  * starts a non-I/O thread to monitor this port and call functions back.
583  *
584  * PARAMS
585  *  FileHandle [I] Handle to bind to a completion port.
586  *  Function   [I] Callback function to call on I/O completions.
587  *  Flags      [I] Not used.
588  *
589  * RETURNS
590  *  Success: STATUS_SUCCESS.
591  *  Failure: Any NTSTATUS code.
592  *
593  */
RtlSetIoCompletionCallback(HANDLE FileHandle,PRTL_OVERLAPPED_COMPLETION_ROUTINE Function,ULONG Flags)594 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
595 {
596     IO_STATUS_BLOCK iosb;
597     FILE_COMPLETION_INFORMATION info;
598 
599     if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
600 
601     if (!old_threadpool.compl_port)
602     {
603         NTSTATUS res = STATUS_SUCCESS;
604 
605         RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
606         if (!old_threadpool.compl_port)
607         {
608             HANDLE cport;
609 
610             res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
611             if (!res)
612             {
613                 /* FIXME native can start additional threads in case of e.g. hung callback function. */
614                 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
615                 if (!res)
616                     old_threadpool.compl_port = cport;
617                 else
618                     NtClose( cport );
619             }
620         }
621         RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
622         if (res) return res;
623     }
624 
625     info.CompletionPort = old_threadpool.compl_port;
626     info.CompletionKey = (ULONG_PTR)Function;
627 
628     return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
629 }
630 #endif
631 
get_nt_timeout(PLARGE_INTEGER pTime,ULONG timeout)632 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
633 {
634     if (timeout == INFINITE) return NULL;
635     pTime->QuadPart = (ULONGLONG)timeout * -10000;
636     return pTime;
637 }
638 
639 /************************** Timer Queue Impl **************************/
640 
queue_remove_timer(struct queue_timer * t)641 static void queue_remove_timer(struct queue_timer *t)
642 {
643     /* We MUST hold the queue cs while calling this function.  This ensures
644        that we cannot queue another callback for this timer.  The runcount
645        being zero makes sure we don't have any already queued.  */
646     struct timer_queue *q = t->q;
647 
648     assert(t->runcount == 0);
649     assert(t->destroy);
650 
651     list_remove(&t->entry);
652     if (t->event)
653         NtSetEvent(t->event, NULL);
654     RtlFreeHeap(GetProcessHeap(), 0, t);
655 
656     if (q->quit && list_empty(&q->timers))
657         NtSetEvent(q->event, NULL);
658 }
659 
timer_cleanup_callback(struct queue_timer * t)660 static void timer_cleanup_callback(struct queue_timer *t)
661 {
662     struct timer_queue *q = t->q;
663     RtlEnterCriticalSection(&q->cs);
664 
665     assert(0 < t->runcount);
666     --t->runcount;
667 
668     if (t->destroy && t->runcount == 0)
669         queue_remove_timer(t);
670 
671     RtlLeaveCriticalSection(&q->cs);
672 }
673 
timer_callback_wrapper(LPVOID p)674 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
675 {
676     struct queue_timer *t = p;
677     t->callback(t->param, TRUE);
678     timer_cleanup_callback(t);
679     return 0;
680 }
681 
queue_current_time(void)682 static inline ULONGLONG queue_current_time(void)
683 {
684     LARGE_INTEGER now, freq;
685     NtQueryPerformanceCounter(&now, &freq);
686     return now.QuadPart * 1000 / freq.QuadPart;
687 }
688 
queue_add_timer(struct queue_timer * t,ULONGLONG time,BOOL set_event)689 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
690                             BOOL set_event)
691 {
692     /* We MUST hold the queue cs while calling this function.  */
693     struct timer_queue *q = t->q;
694     struct list *ptr = &q->timers;
695 
696     assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
697 
698     if (time != EXPIRE_NEVER)
699         LIST_FOR_EACH(ptr, &q->timers)
700         {
701             struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
702             if (time < cur->expire)
703                 break;
704         }
705     list_add_before(ptr, &t->entry);
706 
707     t->expire = time;
708 
709     /* If we insert at the head of the list, we need to expire sooner
710        than expected.  */
711     if (set_event && &t->entry == list_head(&q->timers))
712         NtSetEvent(q->event, NULL);
713 }
714 
queue_move_timer(struct queue_timer * t,ULONGLONG time,BOOL set_event)715 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
716                                     BOOL set_event)
717 {
718     /* We MUST hold the queue cs while calling this function.  */
719     list_remove(&t->entry);
720     queue_add_timer(t, time, set_event);
721 }
722 
queue_timer_expire(struct timer_queue * q)723 static void queue_timer_expire(struct timer_queue *q)
724 {
725     struct queue_timer *t = NULL;
726 
727     RtlEnterCriticalSection(&q->cs);
728     if (list_head(&q->timers))
729     {
730         ULONGLONG now, next;
731         t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
732         if (!t->destroy && t->expire <= ((now = queue_current_time())))
733         {
734             ++t->runcount;
735             if (t->period)
736             {
737                 next = t->expire + t->period;
738                 /* avoid trigger cascade if overloaded / hibernated */
739                 if (next < now)
740                     next = now + t->period;
741             }
742             else
743                 next = EXPIRE_NEVER;
744             queue_move_timer(t, next, FALSE);
745         }
746         else
747             t = NULL;
748     }
749     RtlLeaveCriticalSection(&q->cs);
750 
751     if (t)
752     {
753         if (t->flags & WT_EXECUTEINTIMERTHREAD)
754             timer_callback_wrapper(t);
755         else
756         {
757             ULONG flags
758                 = (t->flags
759                    & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
760                       | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
761             NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
762             if (status != STATUS_SUCCESS)
763                 timer_cleanup_callback(t);
764         }
765     }
766 }
767 
queue_get_timeout(struct timer_queue * q)768 static ULONG queue_get_timeout(struct timer_queue *q)
769 {
770     struct queue_timer *t;
771     ULONG timeout = INFINITE;
772 
773     RtlEnterCriticalSection(&q->cs);
774     if (list_head(&q->timers))
775     {
776         t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
777         assert(!t->destroy || t->expire == EXPIRE_NEVER);
778 
779         if (t->expire != EXPIRE_NEVER)
780         {
781             ULONGLONG time = queue_current_time();
782             timeout = t->expire < time ? 0 : t->expire - time;
783         }
784     }
785     RtlLeaveCriticalSection(&q->cs);
786 
787     return timeout;
788 }
789 
790 #ifdef __REACTOS__
timer_queue_thread_proc(PVOID p)791 ULONG NTAPI timer_queue_thread_proc(PVOID p)
792 #else
793 static void WINAPI timer_queue_thread_proc(LPVOID p)
794 #endif
795 {
796     struct timer_queue *q = p;
797     ULONG timeout_ms;
798 
799     set_thread_name(L"wine_threadpool_timer_queue");
800     timeout_ms = INFINITE;
801     for (;;)
802     {
803         LARGE_INTEGER timeout;
804         NTSTATUS status;
805         BOOL done = FALSE;
806 
807         status = NtWaitForSingleObject(
808             q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
809 
810         if (status == STATUS_WAIT_0)
811         {
812             /* There are two possible ways to trigger the event.  Either
813                we are quitting and the last timer got removed, or a new
814                timer got put at the head of the list so we need to adjust
815                our timeout.  */
816             RtlEnterCriticalSection(&q->cs);
817             if (q->quit && list_empty(&q->timers))
818                 done = TRUE;
819             RtlLeaveCriticalSection(&q->cs);
820         }
821         else if (status == STATUS_TIMEOUT)
822             queue_timer_expire(q);
823 
824         if (done)
825             break;
826 
827         timeout_ms = queue_get_timeout(q);
828     }
829 
830     NtClose(q->event);
831     RtlDeleteCriticalSection(&q->cs);
832     q->magic = 0;
833     RtlFreeHeap(GetProcessHeap(), 0, q);
834     RtlExitUserThread( 0 );
835 #ifdef __REACTOS__
836     return STATUS_SUCCESS;
837 #endif
838 }
839 
queue_destroy_timer(struct queue_timer * t)840 static void queue_destroy_timer(struct queue_timer *t)
841 {
842     /* We MUST hold the queue cs while calling this function.  */
843     t->destroy = TRUE;
844     if (t->runcount == 0)
845         /* Ensure a timer is promptly removed.  If callbacks are pending,
846            it will be removed after the last one finishes by the callback
847            cleanup wrapper.  */
848         queue_remove_timer(t);
849     else
850         /* Make sure no destroyed timer masks an active timer at the head
851            of the sorted list.  */
852         queue_move_timer(t, EXPIRE_NEVER, FALSE);
853 }
854 
855 /***********************************************************************
856  *              RtlCreateTimerQueue   (NTDLL.@)
857  *
858  * Creates a timer queue object and returns a handle to it.
859  *
860  * PARAMS
861  *  NewTimerQueue [O] The newly created queue.
862  *
863  * RETURNS
864  *  Success: STATUS_SUCCESS.
865  *  Failure: Any NTSTATUS code.
866  */
RtlCreateTimerQueue(PHANDLE NewTimerQueue)867 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
868 {
869     NTSTATUS status;
870     struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
871     if (!q)
872         return STATUS_NO_MEMORY;
873 
874     RtlInitializeCriticalSection(&q->cs);
875     list_init(&q->timers);
876     q->quit = FALSE;
877     q->magic = TIMER_QUEUE_MAGIC;
878     status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
879     if (status != STATUS_SUCCESS)
880     {
881         RtlFreeHeap(GetProcessHeap(), 0, q);
882         return status;
883     }
884     status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
885                                  timer_queue_thread_proc, q, &q->thread, NULL);
886     if (status != STATUS_SUCCESS)
887     {
888         NtClose(q->event);
889         RtlFreeHeap(GetProcessHeap(), 0, q);
890         return status;
891     }
892 
893     *NewTimerQueue = q;
894     return STATUS_SUCCESS;
895 }
896 
897 /***********************************************************************
898  *              RtlDeleteTimerQueueEx   (NTDLL.@)
899  *
900  * Deletes a timer queue object.
901  *
902  * PARAMS
903  *  TimerQueue      [I] The timer queue to destroy.
904  *  CompletionEvent [I] If NULL, return immediately.  If INVALID_HANDLE_VALUE,
905  *                      wait until all timers are finished firing before
906  *                      returning.  Otherwise, return immediately and set the
907  *                      event when all timers are done.
908  *
909  * RETURNS
910  *  Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
911  *  Failure: Any NTSTATUS code.
912  */
RtlDeleteTimerQueueEx(HANDLE TimerQueue,HANDLE CompletionEvent)913 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
914 {
915     struct timer_queue *q = TimerQueue;
916     struct queue_timer *t, *temp;
917     HANDLE thread;
918     NTSTATUS status;
919 
920     if (!q || q->magic != TIMER_QUEUE_MAGIC)
921         return STATUS_INVALID_HANDLE;
922 
923     thread = q->thread;
924 
925     RtlEnterCriticalSection(&q->cs);
926     q->quit = TRUE;
927     if (list_head(&q->timers))
928         /* When the last timer is removed, it will signal the timer thread to
929            exit...  */
930         LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
931             queue_destroy_timer(t);
932     else
933         /* However if we have none, we must do it ourselves.  */
934         NtSetEvent(q->event, NULL);
935     RtlLeaveCriticalSection(&q->cs);
936 
937     if (CompletionEvent == INVALID_HANDLE_VALUE)
938     {
939         NtWaitForSingleObject(thread, FALSE, NULL);
940         status = STATUS_SUCCESS;
941     }
942     else
943     {
944         if (CompletionEvent)
945         {
946             FIXME("asynchronous return on completion event unimplemented\n");
947             NtWaitForSingleObject(thread, FALSE, NULL);
948             NtSetEvent(CompletionEvent, NULL);
949         }
950         status = STATUS_PENDING;
951     }
952 
953     NtClose(thread);
954     return status;
955 }
956 
get_timer_queue(HANDLE TimerQueue)957 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
958 {
959     static struct timer_queue *default_timer_queue;
960 
961     if (TimerQueue)
962         return TimerQueue;
963     else
964     {
965         if (!default_timer_queue)
966         {
967             HANDLE q;
968             NTSTATUS status = RtlCreateTimerQueue(&q);
969             if (status == STATUS_SUCCESS)
970             {
971                 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
972                 if (p)
973                     /* Got beat to the punch.  */
974                     RtlDeleteTimerQueueEx(q, NULL);
975             }
976         }
977         return default_timer_queue;
978     }
979 }
980 
981 /***********************************************************************
982  *              RtlCreateTimer   (NTDLL.@)
983  *
984  * Creates a new timer associated with the given queue.
985  *
986  * PARAMS
987  *  TimerQueue [I] The queue to hold the timer.
988  *  NewTimer   [O] The newly created timer.
989  *  Callback   [I] The callback to fire.
990  *  Parameter  [I] The argument for the callback.
991  *  DueTime    [I] The delay, in milliseconds, before first firing the
992  *                 timer.
993  *  Period     [I] The period, in milliseconds, at which to fire the timer
994  *                 after the first callback.  If zero, the timer will only
995  *                 fire once.  It still needs to be deleted with
996  *                 RtlDeleteTimer.
997  * Flags       [I] Flags controlling the execution of the callback.  In
998  *                 addition to the WT_* thread pool flags (see
999  *                 RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1000  *                 WT_EXECUTEONLYONCE are supported.
1001  *
1002  * RETURNS
1003  *  Success: STATUS_SUCCESS.
1004  *  Failure: Any NTSTATUS code.
1005  */
RtlCreateTimer(HANDLE TimerQueue,HANDLE * NewTimer,RTL_WAITORTIMERCALLBACKFUNC Callback,PVOID Parameter,DWORD DueTime,DWORD Period,ULONG Flags)1006 NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer,
1007                                RTL_WAITORTIMERCALLBACKFUNC Callback,
1008                                PVOID Parameter, DWORD DueTime, DWORD Period,
1009                                ULONG Flags)
1010 {
1011     NTSTATUS status;
1012     struct queue_timer *t;
1013     struct timer_queue *q = get_timer_queue(TimerQueue);
1014 
1015     if (!q) return STATUS_NO_MEMORY;
1016     if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1017 
1018     t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1019     if (!t)
1020         return STATUS_NO_MEMORY;
1021 
1022     t->q = q;
1023     t->runcount = 0;
1024     t->callback = Callback;
1025     t->param = Parameter;
1026     t->period = Period;
1027     t->flags = Flags;
1028     t->destroy = FALSE;
1029     t->event = NULL;
1030 
1031     status = STATUS_SUCCESS;
1032     RtlEnterCriticalSection(&q->cs);
1033     if (q->quit)
1034         status = STATUS_INVALID_HANDLE;
1035     else
1036         queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1037     RtlLeaveCriticalSection(&q->cs);
1038 
1039     if (status == STATUS_SUCCESS)
1040         *NewTimer = t;
1041     else
1042         RtlFreeHeap(GetProcessHeap(), 0, t);
1043 
1044     return status;
1045 }
1046 
1047 /***********************************************************************
1048  *              RtlUpdateTimer   (NTDLL.@)
1049  *
1050  * Changes the time at which a timer expires.
1051  *
1052  * PARAMS
1053  *  TimerQueue [I] The queue that holds the timer.
1054  *  Timer      [I] The timer to update.
1055  *  DueTime    [I] The delay, in milliseconds, before next firing the timer.
1056  *  Period     [I] The period, in milliseconds, at which to fire the timer
1057  *                 after the first callback.  If zero, the timer will not
1058  *                 refire once.  It still needs to be deleted with
1059  *                 RtlDeleteTimer.
1060  *
1061  * RETURNS
1062  *  Success: STATUS_SUCCESS.
1063  *  Failure: Any NTSTATUS code.
1064  */
RtlUpdateTimer(HANDLE TimerQueue,HANDLE Timer,DWORD DueTime,DWORD Period)1065 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1066                                DWORD DueTime, DWORD Period)
1067 {
1068     struct queue_timer *t = Timer;
1069     struct timer_queue *q = t->q;
1070 
1071     RtlEnterCriticalSection(&q->cs);
1072     /* Can't change a timer if it was once-only or destroyed.  */
1073     if (t->expire != EXPIRE_NEVER)
1074     {
1075         t->period = Period;
1076         queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1077     }
1078     RtlLeaveCriticalSection(&q->cs);
1079 
1080     return STATUS_SUCCESS;
1081 }
1082 
1083 /***********************************************************************
1084  *              RtlDeleteTimer   (NTDLL.@)
1085  *
1086  * Cancels a timer-queue timer.
1087  *
1088  * PARAMS
1089  *  TimerQueue      [I] The queue that holds the timer.
1090  *  Timer           [I] The timer to update.
1091  *  CompletionEvent [I] If NULL, return immediately.  If INVALID_HANDLE_VALUE,
1092  *                      wait until the timer is finished firing all pending
1093  *                      callbacks before returning.  Otherwise, return
1094  *                      immediately and set the timer is done.
1095  *
1096  * RETURNS
1097  *  Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1098              or if the completion event is NULL.
1099  *  Failure: Any NTSTATUS code.
1100  */
RtlDeleteTimer(HANDLE TimerQueue,HANDLE Timer,HANDLE CompletionEvent)1101 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1102                                HANDLE CompletionEvent)
1103 {
1104     struct queue_timer *t = Timer;
1105     struct timer_queue *q;
1106     NTSTATUS status = STATUS_PENDING;
1107     HANDLE event = NULL;
1108 
1109     if (!Timer)
1110         return STATUS_INVALID_PARAMETER_1;
1111     q = t->q;
1112     if (CompletionEvent == INVALID_HANDLE_VALUE)
1113     {
1114         status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1115         if (status == STATUS_SUCCESS)
1116             status = STATUS_PENDING;
1117     }
1118     else if (CompletionEvent)
1119         event = CompletionEvent;
1120 
1121     RtlEnterCriticalSection(&q->cs);
1122     t->event = event;
1123     if (t->runcount == 0 && event)
1124         status = STATUS_SUCCESS;
1125     queue_destroy_timer(t);
1126     RtlLeaveCriticalSection(&q->cs);
1127 
1128     if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1129     {
1130         if (status == STATUS_PENDING)
1131         {
1132             NtWaitForSingleObject(event, FALSE, NULL);
1133             status = STATUS_SUCCESS;
1134         }
1135         NtClose(event);
1136     }
1137 
1138     return status;
1139 }
1140 
1141 /***********************************************************************
1142  *           timerqueue_thread_proc    (internal)
1143  */
1144 #ifdef __REACTOS__
timerqueue_thread_proc(PVOID param)1145 ULONG NTAPI timerqueue_thread_proc(PVOID param )
1146 #else
1147 static void CALLBACK timerqueue_thread_proc( void *param )
1148 #endif
1149 {
1150     ULONGLONG timeout_lower, timeout_upper, new_timeout;
1151     struct threadpool_object *other_timer;
1152     LARGE_INTEGER now, timeout;
1153     struct list *ptr;
1154 
1155     TRACE( "starting timer queue thread\n" );
1156     set_thread_name(L"wine_threadpool_timerqueue");
1157 
1158     RtlEnterCriticalSection( &timerqueue.cs );
1159     for (;;)
1160     {
1161         NtQuerySystemTime( &now );
1162 
1163         /* Check for expired timers. */
1164         while ((ptr = list_head( &timerqueue.pending_timers )))
1165         {
1166             struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1167             assert( timer->type == TP_OBJECT_TYPE_TIMER );
1168             assert( timer->u.timer.timer_pending );
1169             if (timer->u.timer.timeout > now.QuadPart)
1170                 break;
1171 
1172             /* Queue a new callback in one of the worker threads. */
1173             list_remove( &timer->u.timer.timer_entry );
1174             timer->u.timer.timer_pending = FALSE;
1175             tp_object_submit( timer, FALSE );
1176 
1177             /* Insert the timer back into the queue, except it's marked for shutdown. */
1178             if (timer->u.timer.period && !timer->shutdown)
1179             {
1180                 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1181                 if (timer->u.timer.timeout <= now.QuadPart)
1182                     timer->u.timer.timeout = now.QuadPart + 1;
1183 
1184                 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1185                                      struct threadpool_object, u.timer.timer_entry )
1186                 {
1187                     assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1188                     if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1189                         break;
1190                 }
1191                 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1192                 timer->u.timer.timer_pending = TRUE;
1193             }
1194         }
1195 
1196         timeout_lower = timeout_upper = MAXLONGLONG;
1197 
1198         /* Determine next timeout and use the window length to optimize wakeup times. */
1199         LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1200                              struct threadpool_object, u.timer.timer_entry )
1201         {
1202             assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1203             if (other_timer->u.timer.timeout >= timeout_upper)
1204                 break;
1205 
1206             timeout_lower = other_timer->u.timer.timeout;
1207             new_timeout   = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1208             if (new_timeout < timeout_upper)
1209                 timeout_upper = new_timeout;
1210         }
1211 
1212         /* Wait for timer update events or until the next timer expires. */
1213         if (timerqueue.objcount)
1214         {
1215             timeout.QuadPart = timeout_lower;
1216             RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1217             continue;
1218         }
1219 
1220         /* All timers have been destroyed, if no new timers are created
1221          * within some amount of time, then we can shutdown this thread. */
1222         timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1223         if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1224             &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1225         {
1226             break;
1227         }
1228     }
1229 
1230     timerqueue.thread_running = FALSE;
1231     RtlLeaveCriticalSection( &timerqueue.cs );
1232 
1233     TRACE( "terminating timer queue thread\n" );
1234     RtlExitUserThread( 0 );
1235 #ifdef __REACTOS__
1236     return STATUS_SUCCESS;
1237 #endif
1238 }
1239 
1240 /***********************************************************************
1241  *           tp_new_worker_thread    (internal)
1242  *
1243  * Create and account a new worker thread for the desired pool.
1244  */
tp_new_worker_thread(struct threadpool * pool)1245 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1246 {
1247     HANDLE thread;
1248     NTSTATUS status;
1249 
1250     status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0,
1251                                   pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1252                                   threadpool_worker_proc, pool, &thread, NULL );
1253     if (status == STATUS_SUCCESS)
1254     {
1255         InterlockedIncrement( &pool->refcount );
1256         pool->num_workers++;
1257         NtClose( thread );
1258     }
1259     return status;
1260 }
1261 
1262 /***********************************************************************
1263  *           tp_timerqueue_lock    (internal)
1264  *
1265  * Acquires a lock on the global timerqueue. When the lock is acquired
1266  * successfully, it is guaranteed that the timer thread is running.
1267  */
tp_timerqueue_lock(struct threadpool_object * timer)1268 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1269 {
1270     NTSTATUS status = STATUS_SUCCESS;
1271     assert( timer->type == TP_OBJECT_TYPE_TIMER );
1272 
1273     timer->u.timer.timer_initialized    = FALSE;
1274     timer->u.timer.timer_pending        = FALSE;
1275     timer->u.timer.timer_set            = FALSE;
1276     timer->u.timer.timeout              = 0;
1277     timer->u.timer.period               = 0;
1278     timer->u.timer.window_length        = 0;
1279 
1280     RtlEnterCriticalSection( &timerqueue.cs );
1281 
1282     /* Make sure that the timerqueue thread is running. */
1283     if (!timerqueue.thread_running)
1284     {
1285         HANDLE thread;
1286         status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1287                                       timerqueue_thread_proc, NULL, &thread, NULL );
1288         if (status == STATUS_SUCCESS)
1289         {
1290             timerqueue.thread_running = TRUE;
1291             NtClose( thread );
1292         }
1293     }
1294 
1295     if (status == STATUS_SUCCESS)
1296     {
1297         timer->u.timer.timer_initialized = TRUE;
1298         timerqueue.objcount++;
1299     }
1300 
1301     RtlLeaveCriticalSection( &timerqueue.cs );
1302     return status;
1303 }
1304 
1305 /***********************************************************************
1306  *           tp_timerqueue_unlock    (internal)
1307  *
1308  * Releases a lock on the global timerqueue.
1309  */
tp_timerqueue_unlock(struct threadpool_object * timer)1310 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1311 {
1312     assert( timer->type == TP_OBJECT_TYPE_TIMER );
1313 
1314     RtlEnterCriticalSection( &timerqueue.cs );
1315     if (timer->u.timer.timer_initialized)
1316     {
1317         /* If timer was pending, remove it. */
1318         if (timer->u.timer.timer_pending)
1319         {
1320             list_remove( &timer->u.timer.timer_entry );
1321             timer->u.timer.timer_pending = FALSE;
1322         }
1323 
1324         /* If the last timer object was destroyed, then wake up the thread. */
1325         if (!--timerqueue.objcount)
1326         {
1327             assert( list_empty( &timerqueue.pending_timers ) );
1328             RtlWakeAllConditionVariable( &timerqueue.update_event );
1329         }
1330 
1331         timer->u.timer.timer_initialized = FALSE;
1332     }
1333     RtlLeaveCriticalSection( &timerqueue.cs );
1334 }
1335 
1336 /***********************************************************************
1337  *           waitqueue_thread_proc    (internal)
1338  */
1339 #ifdef __REACTOS__
waitqueue_thread_proc(PVOID param)1340 void NTAPI waitqueue_thread_proc(PVOID param )
1341 #else
1342 static void CALLBACK waitqueue_thread_proc( void *param )
1343 #endif
1344 {
1345     struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1346     HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1347     struct waitqueue_bucket *bucket = param;
1348     struct threadpool_object *wait, *next;
1349     LARGE_INTEGER now, timeout;
1350     DWORD num_handles;
1351     NTSTATUS status;
1352 
1353     TRACE( "starting wait queue thread\n" );
1354     set_thread_name(L"wine_threadpool_waitqueue");
1355 
1356     RtlEnterCriticalSection( &waitqueue.cs );
1357 
1358     for (;;)
1359     {
1360         NtQuerySystemTime( &now );
1361         timeout.QuadPart = MAXLONGLONG;
1362         num_handles = 0;
1363 
1364         LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1365                                   u.wait.wait_entry )
1366         {
1367             assert( wait->type == TP_OBJECT_TYPE_WAIT );
1368             if (wait->u.wait.timeout <= now.QuadPart)
1369             {
1370                 /* Wait object timed out. */
1371                 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1372                 {
1373                     list_remove( &wait->u.wait.wait_entry );
1374                     list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1375                 }
1376                 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1377                 {
1378                     InterlockedIncrement( &wait->refcount );
1379                     wait->num_pending_callbacks++;
1380                     RtlEnterCriticalSection( &wait->pool->cs );
1381                     tp_object_execute( wait, TRUE );
1382                     RtlLeaveCriticalSection( &wait->pool->cs );
1383                     tp_object_release( wait );
1384                 }
1385                 else tp_object_submit( wait, FALSE );
1386             }
1387             else
1388             {
1389                 if (wait->u.wait.timeout < timeout.QuadPart)
1390                     timeout.QuadPart = wait->u.wait.timeout;
1391 
1392                 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1393                 InterlockedIncrement( &wait->refcount );
1394                 objects[num_handles] = wait;
1395                 handles[num_handles] = wait->u.wait.handle;
1396                 num_handles++;
1397             }
1398         }
1399 
1400         if (!bucket->objcount)
1401         {
1402             /* All wait objects have been destroyed, if no new wait objects are created
1403              * within some amount of time, then we can shutdown this thread. */
1404             assert( num_handles == 0 );
1405             RtlLeaveCriticalSection( &waitqueue.cs );
1406             timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1407             status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1408             RtlEnterCriticalSection( &waitqueue.cs );
1409 
1410             if (status == STATUS_TIMEOUT && !bucket->objcount)
1411                 break;
1412         }
1413         else
1414         {
1415             handles[num_handles] = bucket->update_event;
1416             RtlLeaveCriticalSection( &waitqueue.cs );
1417             status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1418             RtlEnterCriticalSection( &waitqueue.cs );
1419 
1420             if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1421             {
1422                 wait = objects[status - STATUS_WAIT_0];
1423                 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1424                 if (wait->u.wait.bucket)
1425                 {
1426                     /* Wait object signaled. */
1427                     assert( wait->u.wait.bucket == bucket );
1428                     if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1429                     {
1430                         list_remove( &wait->u.wait.wait_entry );
1431                         list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1432                     }
1433                     if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1434                     {
1435                         wait->u.wait.signaled++;
1436                         wait->num_pending_callbacks++;
1437                         RtlEnterCriticalSection( &wait->pool->cs );
1438                         tp_object_execute( wait, TRUE );
1439                         RtlLeaveCriticalSection( &wait->pool->cs );
1440                     }
1441                     else tp_object_submit( wait, TRUE );
1442                 }
1443                 else
1444                     WARN("wait object %p triggered while object was destroyed\n", wait);
1445             }
1446 
1447             /* Release temporary references to wait objects. */
1448             while (num_handles)
1449             {
1450                 wait = objects[--num_handles];
1451                 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1452                 tp_object_release( wait );
1453             }
1454         }
1455 
1456         /* Try to merge bucket with other threads. */
1457         if (waitqueue.num_buckets > 1 && bucket->objcount &&
1458             bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1459         {
1460             struct waitqueue_bucket *other_bucket;
1461             LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1462             {
1463                 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1464                     other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1465                 {
1466                     other_bucket->objcount += bucket->objcount;
1467                     bucket->objcount = 0;
1468 
1469                     /* Update reserved list. */
1470                     LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1471                     {
1472                         assert( wait->type == TP_OBJECT_TYPE_WAIT );
1473                         wait->u.wait.bucket = other_bucket;
1474                     }
1475                     list_move_tail( &other_bucket->reserved, &bucket->reserved );
1476 
1477                     /* Update waiting list. */
1478                     LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1479                     {
1480                         assert( wait->type == TP_OBJECT_TYPE_WAIT );
1481                         wait->u.wait.bucket = other_bucket;
1482                     }
1483                     list_move_tail( &other_bucket->waiting, &bucket->waiting );
1484 
1485                     /* Move bucket to the end, to keep the probability of
1486                      * newly added wait objects as small as possible. */
1487                     list_remove( &bucket->bucket_entry );
1488                     list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1489 
1490                     NtSetEvent( other_bucket->update_event, NULL );
1491                     break;
1492                 }
1493             }
1494         }
1495     }
1496 
1497     /* Remove this bucket from the list. */
1498     list_remove( &bucket->bucket_entry );
1499     if (!--waitqueue.num_buckets)
1500         assert( list_empty( &waitqueue.buckets ) );
1501 
1502     RtlLeaveCriticalSection( &waitqueue.cs );
1503 
1504     TRACE( "terminating wait queue thread\n" );
1505 
1506     assert( bucket->objcount == 0 );
1507     assert( list_empty( &bucket->reserved ) );
1508     assert( list_empty( &bucket->waiting ) );
1509     NtClose( bucket->update_event );
1510 
1511     RtlFreeHeap( GetProcessHeap(), 0, bucket );
1512     RtlExitUserThread( 0 );
1513 }
1514 
1515 /***********************************************************************
1516  *           tp_waitqueue_lock    (internal)
1517  */
tp_waitqueue_lock(struct threadpool_object * wait)1518 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1519 {
1520     struct waitqueue_bucket *bucket;
1521     NTSTATUS status;
1522     HANDLE thread;
1523     BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1524     assert( wait->type == TP_OBJECT_TYPE_WAIT );
1525 
1526     wait->u.wait.signaled       = 0;
1527     wait->u.wait.bucket         = NULL;
1528     wait->u.wait.wait_pending   = FALSE;
1529     wait->u.wait.timeout        = 0;
1530     wait->u.wait.handle         = INVALID_HANDLE_VALUE;
1531 
1532     RtlEnterCriticalSection( &waitqueue.cs );
1533 
1534     /* Try to assign to existing bucket if possible. */
1535     LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1536     {
1537         if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1538         {
1539             list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1540             wait->u.wait.bucket = bucket;
1541             bucket->objcount++;
1542 
1543             status = STATUS_SUCCESS;
1544             goto out;
1545         }
1546     }
1547 
1548     /* Create a new bucket and corresponding worker thread. */
1549     bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1550     if (!bucket)
1551     {
1552         status = STATUS_NO_MEMORY;
1553         goto out;
1554     }
1555 
1556     bucket->objcount = 0;
1557     bucket->alertable = alertable;
1558     list_init( &bucket->reserved );
1559     list_init( &bucket->waiting );
1560 
1561     status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1562                             NULL, SynchronizationEvent, FALSE );
1563     if (status)
1564     {
1565         RtlFreeHeap( GetProcessHeap(), 0, bucket );
1566         goto out;
1567     }
1568 
1569     status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1570                                   (PTHREAD_START_ROUTINE)waitqueue_thread_proc, bucket, &thread, NULL );
1571     if (status == STATUS_SUCCESS)
1572     {
1573         list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1574         waitqueue.num_buckets++;
1575 
1576         list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1577         wait->u.wait.bucket = bucket;
1578         bucket->objcount++;
1579 
1580         NtClose( thread );
1581     }
1582     else
1583     {
1584         NtClose( bucket->update_event );
1585         RtlFreeHeap( GetProcessHeap(), 0, bucket );
1586     }
1587 
1588 out:
1589     RtlLeaveCriticalSection( &waitqueue.cs );
1590     return status;
1591 }
1592 
1593 /***********************************************************************
1594  *           tp_waitqueue_unlock    (internal)
1595  */
tp_waitqueue_unlock(struct threadpool_object * wait)1596 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1597 {
1598     assert( wait->type == TP_OBJECT_TYPE_WAIT );
1599 
1600     RtlEnterCriticalSection( &waitqueue.cs );
1601     if (wait->u.wait.bucket)
1602     {
1603         struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1604         assert( bucket->objcount > 0 );
1605 
1606         list_remove( &wait->u.wait.wait_entry );
1607         wait->u.wait.bucket = NULL;
1608         bucket->objcount--;
1609 
1610         NtSetEvent( bucket->update_event, NULL );
1611     }
1612     RtlLeaveCriticalSection( &waitqueue.cs );
1613 }
1614 
1615 #ifdef __REACTOS__
ioqueue_thread_proc(PVOID param)1616 ULONG NTAPI ioqueue_thread_proc(PVOID param )
1617 #else
1618 static void CALLBACK ioqueue_thread_proc( void *param )
1619 #endif
1620 {
1621     struct io_completion *completion;
1622     struct threadpool_object *io;
1623     IO_STATUS_BLOCK iosb;
1624 #ifdef __REACTOS__
1625     PVOID key, value;
1626 #else
1627     ULONG_PTR key, value;
1628 #endif
1629     BOOL destroy, skip;
1630     NTSTATUS status;
1631 
1632     TRACE( "starting I/O completion thread\n" );
1633     set_thread_name(L"wine_threadpool_ioqueue");
1634 
1635     RtlEnterCriticalSection( &ioqueue.cs );
1636 
1637     for (;;)
1638     {
1639         RtlLeaveCriticalSection( &ioqueue.cs );
1640         if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1641             ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1642         RtlEnterCriticalSection( &ioqueue.cs );
1643 
1644         destroy = skip = FALSE;
1645         io = (struct threadpool_object *)key;
1646 
1647         TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1648 
1649         if (io && (io->shutdown || io->u.io.shutting_down))
1650         {
1651             RtlEnterCriticalSection( &io->pool->cs );
1652             if (!io->u.io.pending_count)
1653             {
1654                 if (io->u.io.skipped_count)
1655                     --io->u.io.skipped_count;
1656 
1657                 if (io->u.io.skipped_count)
1658                     skip = TRUE;
1659                 else
1660                     destroy = TRUE;
1661             }
1662             RtlLeaveCriticalSection( &io->pool->cs );
1663             if (skip) continue;
1664         }
1665 
1666         if (destroy)
1667         {
1668             --ioqueue.objcount;
1669             TRACE( "Releasing io %p.\n", io );
1670             io->shutdown = TRUE;
1671             tp_object_release( io );
1672         }
1673         else if (io)
1674         {
1675             RtlEnterCriticalSection( &io->pool->cs );
1676 
1677             TRACE( "pending_count %u.\n", io->u.io.pending_count );
1678 
1679             if (io->u.io.pending_count)
1680             {
1681                 --io->u.io.pending_count;
1682                 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1683                         io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1684                 {
1685                     ERR( "Failed to allocate memory.\n" );
1686                     RtlLeaveCriticalSection( &io->pool->cs );
1687                     continue;
1688                 }
1689 
1690                 completion = &io->u.io.completions[io->u.io.completion_count++];
1691                 completion->iosb = iosb;
1692 #ifdef __REACTOS__
1693                 completion->cvalue = (ULONG_PTR)value;
1694 #else
1695                 completion->cvalue = value;
1696 #endif
1697 
1698                 tp_object_submit( io, FALSE );
1699             }
1700             RtlLeaveCriticalSection( &io->pool->cs );
1701         }
1702 
1703         if (!ioqueue.objcount)
1704         {
1705             /* All I/O objects have been destroyed; if no new objects are
1706              * created within some amount of time, then we can shutdown this
1707              * thread. */
1708             LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1709             if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1710                     &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1711                 break;
1712         }
1713     }
1714 
1715     ioqueue.thread_running = FALSE;
1716     RtlLeaveCriticalSection( &ioqueue.cs );
1717 
1718     TRACE( "terminating I/O completion thread\n" );
1719 
1720     RtlExitUserThread( 0 );
1721 
1722 #ifdef __REACTOS__
1723     return STATUS_SUCCESS;
1724 #endif
1725 }
1726 
tp_ioqueue_lock(struct threadpool_object * io,HANDLE file)1727 static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1728 {
1729     NTSTATUS status = STATUS_SUCCESS;
1730 
1731     assert( io->type == TP_OBJECT_TYPE_IO );
1732 
1733     RtlEnterCriticalSection( &ioqueue.cs );
1734 
1735     if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1736             IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1737     {
1738         RtlLeaveCriticalSection( &ioqueue.cs );
1739         return status;
1740     }
1741 
1742     if (!ioqueue.thread_running)
1743     {
1744         HANDLE thread;
1745 
1746         if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1747                                             0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1748         {
1749             ioqueue.thread_running = TRUE;
1750             NtClose( thread );
1751         }
1752     }
1753 
1754     if (status == STATUS_SUCCESS)
1755     {
1756         FILE_COMPLETION_INFORMATION info;
1757         IO_STATUS_BLOCK iosb;
1758 
1759 #ifdef __REACTOS__
1760         info.Port = ioqueue.port;
1761         info.Key = io;
1762 #else
1763         info.CompletionPort = ioqueue.port;
1764         info.CompletionKey = (ULONG_PTR)io;
1765 #endif
1766 
1767         status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1768     }
1769 
1770     if (status == STATUS_SUCCESS)
1771     {
1772         if (!ioqueue.objcount++)
1773             RtlWakeConditionVariable( &ioqueue.update_event );
1774     }
1775 
1776     RtlLeaveCriticalSection( &ioqueue.cs );
1777     return status;
1778 }
1779 
1780 /***********************************************************************
1781  *           tp_threadpool_alloc    (internal)
1782  *
1783  * Allocates a new threadpool object.
1784  */
tp_threadpool_alloc(struct threadpool ** out)1785 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1786 {
1787 #ifdef __REACTOS__
1788     IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1789 #else
1790     IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1791 #endif
1792     struct threadpool *pool;
1793     unsigned int i;
1794 
1795     pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1796     if (!pool)
1797         return STATUS_NO_MEMORY;
1798 
1799     pool->refcount              = 1;
1800     pool->objcount              = 0;
1801     pool->shutdown              = FALSE;
1802 
1803 #ifdef __REACTOS__
1804     RtlInitializeCriticalSection( &pool->cs );
1805 #else
1806     RtlInitializeCriticalSectionEx( &pool->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
1807 
1808     pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1809 #endif
1810 
1811     for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1812         list_init( &pool->pools[i] );
1813     RtlInitializeConditionVariable( &pool->update_event );
1814 
1815     pool->max_workers             = 500;
1816     pool->min_workers             = 0;
1817     pool->num_workers             = 0;
1818     pool->num_busy_workers        = 0;
1819     pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1820     pool->stack_info.StackCommit  = nt->OptionalHeader.SizeOfStackCommit;
1821 
1822     TRACE( "allocated threadpool %p\n", pool );
1823 
1824     *out = pool;
1825     return STATUS_SUCCESS;
1826 }
1827 
1828 /***********************************************************************
1829  *           tp_threadpool_shutdown    (internal)
1830  *
1831  * Prepares the shutdown of a threadpool object and notifies all worker
1832  * threads to terminate (after all remaining work items have been
1833  * processed).
1834  */
tp_threadpool_shutdown(struct threadpool * pool)1835 static void tp_threadpool_shutdown( struct threadpool *pool )
1836 {
1837     assert( pool != default_threadpool );
1838 
1839     pool->shutdown = TRUE;
1840     RtlWakeAllConditionVariable( &pool->update_event );
1841 }
1842 
1843 /***********************************************************************
1844  *           tp_threadpool_release    (internal)
1845  *
1846  * Releases a reference to a threadpool object.
1847  */
tp_threadpool_release(struct threadpool * pool)1848 static BOOL tp_threadpool_release( struct threadpool *pool )
1849 {
1850     unsigned int i;
1851 
1852     if (InterlockedDecrement( &pool->refcount ))
1853         return FALSE;
1854 
1855     TRACE( "destroying threadpool %p\n", pool );
1856 
1857     assert( pool->shutdown );
1858     assert( !pool->objcount );
1859     for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1860         assert( list_empty( &pool->pools[i] ) );
1861 #ifndef __REACTOS__
1862     pool->cs.DebugInfo->Spare[0] = 0;
1863 #endif
1864     RtlDeleteCriticalSection( &pool->cs );
1865 
1866     RtlFreeHeap( GetProcessHeap(), 0, pool );
1867     return TRUE;
1868 }
1869 
1870 /***********************************************************************
1871  *           tp_threadpool_lock    (internal)
1872  *
1873  * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1874  * block. When the lock is acquired successfully, it is guaranteed that
1875  * there is at least one worker thread to process tasks.
1876  */
tp_threadpool_lock(struct threadpool ** out,TP_CALLBACK_ENVIRON * environment)1877 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1878 {
1879     struct threadpool *pool = NULL;
1880     NTSTATUS status = STATUS_SUCCESS;
1881 
1882     if (environment)
1883     {
1884 #ifndef __REACTOS__ //Windows 7 stuff
1885         /* Validate environment parameters. */
1886         if (environment->Version == 3)
1887         {
1888             TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1889 
1890             switch (environment3->CallbackPriority)
1891             {
1892                 case TP_CALLBACK_PRIORITY_HIGH:
1893                 case TP_CALLBACK_PRIORITY_NORMAL:
1894                 case TP_CALLBACK_PRIORITY_LOW:
1895                     break;
1896                 default:
1897                     return STATUS_INVALID_PARAMETER;
1898             }
1899         }
1900 #endif
1901         pool = (struct threadpool *)environment->Pool;
1902     }
1903 
1904     if (!pool)
1905     {
1906         if (!default_threadpool)
1907         {
1908             status = tp_threadpool_alloc( &pool );
1909             if (status != STATUS_SUCCESS)
1910                 return status;
1911 
1912             if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1913             {
1914                 tp_threadpool_shutdown( pool );
1915                 tp_threadpool_release( pool );
1916             }
1917         }
1918 
1919         pool = default_threadpool;
1920     }
1921 
1922     RtlEnterCriticalSection( &pool->cs );
1923 
1924     /* Make sure that the threadpool has at least one thread. */
1925     if (!pool->num_workers)
1926         status = tp_new_worker_thread( pool );
1927 
1928     /* Keep a reference, and increment objcount to ensure that the
1929      * last thread doesn't terminate. */
1930     if (status == STATUS_SUCCESS)
1931     {
1932         InterlockedIncrement( &pool->refcount );
1933         pool->objcount++;
1934     }
1935 
1936     RtlLeaveCriticalSection( &pool->cs );
1937 
1938     if (status != STATUS_SUCCESS)
1939         return status;
1940 
1941     *out = pool;
1942     return STATUS_SUCCESS;
1943 }
1944 
1945 /***********************************************************************
1946  *           tp_threadpool_unlock    (internal)
1947  *
1948  * Releases a lock on a threadpool.
1949  */
tp_threadpool_unlock(struct threadpool * pool)1950 static void tp_threadpool_unlock( struct threadpool *pool )
1951 {
1952     RtlEnterCriticalSection( &pool->cs );
1953     pool->objcount--;
1954     RtlLeaveCriticalSection( &pool->cs );
1955     tp_threadpool_release( pool );
1956 }
1957 
1958 /***********************************************************************
1959  *           tp_group_alloc    (internal)
1960  *
1961  * Allocates a new threadpool group object.
1962  */
tp_group_alloc(struct threadpool_group ** out)1963 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1964 {
1965     struct threadpool_group *group;
1966 
1967     group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1968     if (!group)
1969         return STATUS_NO_MEMORY;
1970 
1971     group->refcount     = 1;
1972     group->shutdown     = FALSE;
1973 
1974 #ifdef __REACTOS__
1975     RtlInitializeCriticalSection( &group->cs );
1976 #else
1977     RtlInitializeCriticalSectionEx( &group->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
1978 
1979     group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1980 #endif
1981 
1982     list_init( &group->members );
1983 
1984     TRACE( "allocated group %p\n", group );
1985 
1986     *out = group;
1987     return STATUS_SUCCESS;
1988 }
1989 
1990 /***********************************************************************
1991  *           tp_group_shutdown    (internal)
1992  *
1993  * Marks the group object for shutdown.
1994  */
tp_group_shutdown(struct threadpool_group * group)1995 static void tp_group_shutdown( struct threadpool_group *group )
1996 {
1997     group->shutdown = TRUE;
1998 }
1999 
2000 /***********************************************************************
2001  *           tp_group_release    (internal)
2002  *
2003  * Releases a reference to a group object.
2004  */
tp_group_release(struct threadpool_group * group)2005 static BOOL tp_group_release( struct threadpool_group *group )
2006 {
2007     if (InterlockedDecrement( &group->refcount ))
2008         return FALSE;
2009 
2010     TRACE( "destroying group %p\n", group );
2011 
2012     assert( group->shutdown );
2013     assert( list_empty( &group->members ) );
2014 
2015 #ifndef __REACTOS__
2016     group->cs.DebugInfo->Spare[0] = 0;
2017 #endif
2018     RtlDeleteCriticalSection( &group->cs );
2019 
2020     RtlFreeHeap( GetProcessHeap(), 0, group );
2021     return TRUE;
2022 }
2023 
2024 /***********************************************************************
2025  *           tp_object_initialize    (internal)
2026  *
2027  * Initializes members of a threadpool object.
2028  */
tp_object_initialize(struct threadpool_object * object,struct threadpool * pool,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2029 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
2030                                   PVOID userdata, TP_CALLBACK_ENVIRON *environment )
2031 {
2032     BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2033 
2034     object->refcount                = 1;
2035     object->shutdown                = FALSE;
2036 
2037     object->pool                    = pool;
2038     object->group                   = NULL;
2039     object->userdata                = userdata;
2040     object->group_cancel_callback   = NULL;
2041     object->finalization_callback   = NULL;
2042     object->may_run_long            = 0;
2043     object->race_dll                = NULL;
2044     object->priority                = TP_CALLBACK_PRIORITY_NORMAL;
2045 
2046     memset( &object->group_entry, 0, sizeof(object->group_entry) );
2047     object->is_group_member         = FALSE;
2048 
2049     memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2050     RtlInitializeConditionVariable( &object->finished_event );
2051     RtlInitializeConditionVariable( &object->group_finished_event );
2052     object->completed_event         = NULL;
2053     object->num_pending_callbacks   = 0;
2054     object->num_running_callbacks   = 0;
2055     object->num_associated_callbacks = 0;
2056 
2057     if (environment)
2058     {
2059         if (environment->Version != 1 && environment->Version != 3)
2060             FIXME( "unsupported environment version %lu\n", environment->Version );
2061 
2062         object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2063         object->group_cancel_callback   = environment->CleanupGroupCancelCallback;
2064         object->finalization_callback   = environment->FinalizationCallback;
2065         object->may_run_long            = environment->u.s.LongFunction != 0;
2066         object->race_dll                = environment->RaceDll;
2067 #ifndef __REACTOS__ //Windows 7 stuff
2068         if (environment->Version == 3)
2069         {
2070             TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2071 
2072             object->priority = environment_v3->CallbackPriority;
2073             assert( object->priority < ARRAY_SIZE(pool->pools) );
2074         }
2075 #endif
2076         if (environment->ActivationContext)
2077             FIXME( "activation context not supported yet\n" );
2078 
2079         if (environment->u.s.Persistent)
2080             FIXME( "persistent threads not supported yet\n" );
2081     }
2082 
2083     if (object->race_dll)
2084         LdrAddRefDll( 0, object->race_dll );
2085 
2086     TRACE( "allocated object %p of type %u\n", object, object->type );
2087 
2088     /* For simple callbacks we have to run tp_object_submit before adding this object
2089      * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2090      * will be set, and tp_object_submit would fail with an assertion. */
2091 
2092     if (is_simple_callback)
2093         tp_object_submit( object, FALSE );
2094 
2095     if (object->group)
2096     {
2097         struct threadpool_group *group = object->group;
2098         InterlockedIncrement( &group->refcount );
2099 
2100         RtlEnterCriticalSection( &group->cs );
2101         list_add_tail( &group->members, &object->group_entry );
2102         object->is_group_member = TRUE;
2103         RtlLeaveCriticalSection( &group->cs );
2104     }
2105 
2106     if (is_simple_callback)
2107         tp_object_release( object );
2108 }
2109 
tp_object_prio_queue(struct threadpool_object * object)2110 static void tp_object_prio_queue( struct threadpool_object *object )
2111 {
2112     ++object->pool->num_busy_workers;
2113     list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2114 }
2115 
2116 /***********************************************************************
2117  *           tp_object_submit    (internal)
2118  *
2119  * Submits a threadpool object to the associated threadpool. This
2120  * function has to be VOID because TpPostWork can never fail on Windows.
2121  */
tp_object_submit(struct threadpool_object * object,BOOL signaled)2122 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
2123 {
2124     struct threadpool *pool = object->pool;
2125     NTSTATUS status = STATUS_UNSUCCESSFUL;
2126 
2127     assert( !object->shutdown );
2128     assert( !pool->shutdown );
2129 
2130     RtlEnterCriticalSection( &pool->cs );
2131 
2132     /* Start new worker threads if required. */
2133     if (pool->num_busy_workers >= pool->num_workers &&
2134         pool->num_workers < pool->max_workers)
2135         status = tp_new_worker_thread( pool );
2136 
2137     /* Queue work item and increment refcount. */
2138     InterlockedIncrement( &object->refcount );
2139     if (!object->num_pending_callbacks++)
2140         tp_object_prio_queue( object );
2141 
2142     /* Count how often the object was signaled. */
2143     if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2144         object->u.wait.signaled++;
2145 
2146     /* No new thread started - wake up one existing thread. */
2147     if (status != STATUS_SUCCESS)
2148     {
2149         assert( pool->num_workers > 0 );
2150         RtlWakeConditionVariable( &pool->update_event );
2151     }
2152 
2153     RtlLeaveCriticalSection( &pool->cs );
2154 }
2155 
2156 /***********************************************************************
2157  *           tp_object_cancel    (internal)
2158  *
2159  * Cancels all currently pending callbacks for a specific object.
2160  */
tp_object_cancel(struct threadpool_object * object)2161 static void tp_object_cancel( struct threadpool_object *object )
2162 {
2163     struct threadpool *pool = object->pool;
2164     LONG pending_callbacks = 0;
2165 
2166     RtlEnterCriticalSection( &pool->cs );
2167     if (object->num_pending_callbacks)
2168     {
2169         pending_callbacks = object->num_pending_callbacks;
2170         object->num_pending_callbacks = 0;
2171         list_remove( &object->pool_entry );
2172 
2173         if (object->type == TP_OBJECT_TYPE_WAIT)
2174             object->u.wait.signaled = 0;
2175     }
2176     if (object->type == TP_OBJECT_TYPE_IO)
2177     {
2178         object->u.io.skipped_count += object->u.io.pending_count;
2179         object->u.io.pending_count = 0;
2180     }
2181     RtlLeaveCriticalSection( &pool->cs );
2182 
2183     while (pending_callbacks--)
2184         tp_object_release( object );
2185 }
2186 
object_is_finished(struct threadpool_object * object,BOOL group)2187 static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2188 {
2189     if (object->num_pending_callbacks)
2190         return FALSE;
2191     if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2192         return FALSE;
2193 
2194     if (group)
2195         return !object->num_running_callbacks;
2196     else
2197         return !object->num_associated_callbacks;
2198 }
2199 
2200 /***********************************************************************
2201  *           tp_object_wait    (internal)
2202  *
2203  * Waits until all pending and running callbacks of a specific object
2204  * have been processed.
2205  */
tp_object_wait(struct threadpool_object * object,BOOL group_wait)2206 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2207 {
2208     struct threadpool *pool = object->pool;
2209 
2210     RtlEnterCriticalSection( &pool->cs );
2211     while (!object_is_finished( object, group_wait ))
2212     {
2213         if (group_wait)
2214             RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2215         else
2216             RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2217     }
2218     RtlLeaveCriticalSection( &pool->cs );
2219 }
2220 
tp_ioqueue_unlock(struct threadpool_object * io)2221 static void tp_ioqueue_unlock( struct threadpool_object *io )
2222 {
2223     assert( io->type == TP_OBJECT_TYPE_IO );
2224 
2225     RtlEnterCriticalSection( &ioqueue.cs );
2226 
2227     assert(ioqueue.objcount);
2228 
2229     if (!io->shutdown && !--ioqueue.objcount)
2230         NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2231 
2232     RtlLeaveCriticalSection( &ioqueue.cs );
2233 }
2234 
2235 /***********************************************************************
2236  *           tp_object_prepare_shutdown    (internal)
2237  *
2238  * Prepares a threadpool object for shutdown.
2239  */
tp_object_prepare_shutdown(struct threadpool_object * object)2240 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2241 {
2242     if (object->type == TP_OBJECT_TYPE_TIMER)
2243         tp_timerqueue_unlock( object );
2244     else if (object->type == TP_OBJECT_TYPE_WAIT)
2245         tp_waitqueue_unlock( object );
2246     else if (object->type == TP_OBJECT_TYPE_IO)
2247         tp_ioqueue_unlock( object );
2248 }
2249 
2250 /***********************************************************************
2251  *           tp_object_release    (internal)
2252  *
2253  * Releases a reference to a threadpool object.
2254  */
tp_object_release(struct threadpool_object * object)2255 static BOOL tp_object_release( struct threadpool_object *object )
2256 {
2257     if (InterlockedDecrement( &object->refcount ))
2258         return FALSE;
2259 
2260     TRACE( "destroying object %p of type %u\n", object, object->type );
2261 
2262     assert( object->shutdown );
2263     assert( !object->num_pending_callbacks );
2264     assert( !object->num_running_callbacks );
2265     assert( !object->num_associated_callbacks );
2266 
2267     /* release reference to the group */
2268     if (object->group)
2269     {
2270         struct threadpool_group *group = object->group;
2271 
2272         RtlEnterCriticalSection( &group->cs );
2273         if (object->is_group_member)
2274         {
2275             list_remove( &object->group_entry );
2276             object->is_group_member = FALSE;
2277         }
2278         RtlLeaveCriticalSection( &group->cs );
2279 
2280         tp_group_release( group );
2281     }
2282 
2283     tp_threadpool_unlock( object->pool );
2284 
2285     if (object->race_dll)
2286         LdrUnloadDll( object->race_dll );
2287 
2288     if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2289         NtSetEvent( object->completed_event, NULL );
2290 
2291     RtlFreeHeap( GetProcessHeap(), 0, object );
2292     return TRUE;
2293 }
2294 
threadpool_get_next_item(const struct threadpool * pool)2295 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2296 {
2297     struct list *ptr;
2298     unsigned int i;
2299 
2300     for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2301     {
2302         if ((ptr = list_head( &pool->pools[i] )))
2303             break;
2304     }
2305 
2306     return ptr;
2307 }
2308 
2309 /***********************************************************************
2310  *           tp_object_execute    (internal)
2311  *
2312  * Executes a threadpool object callback, object->pool->cs has to be
2313  * held.
2314  */
tp_object_execute(struct threadpool_object * object,BOOL wait_thread)2315 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2316 {
2317     TP_CALLBACK_INSTANCE *callback_instance;
2318     struct threadpool_instance instance;
2319     struct io_completion completion;
2320     struct threadpool *pool = object->pool;
2321     TP_WAIT_RESULT wait_result = 0;
2322     NTSTATUS status;
2323 
2324     object->num_pending_callbacks--;
2325 
2326     /* For wait objects check if they were signaled or have timed out. */
2327     if (object->type == TP_OBJECT_TYPE_WAIT)
2328     {
2329         wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2330         if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2331     }
2332     else if (object->type == TP_OBJECT_TYPE_IO)
2333     {
2334         assert( object->u.io.completion_count );
2335         completion = object->u.io.completions[--object->u.io.completion_count];
2336     }
2337 
2338     /* Leave critical section and do the actual callback. */
2339     object->num_associated_callbacks++;
2340     object->num_running_callbacks++;
2341     RtlLeaveCriticalSection( &pool->cs );
2342     if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2343 
2344     /* Initialize threadpool instance struct. */
2345     callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2346     instance.object                     = object;
2347     instance.threadid                   = GetCurrentThreadId();
2348     instance.associated                 = TRUE;
2349     instance.may_run_long               = object->may_run_long;
2350     instance.cleanup.critical_section   = NULL;
2351     instance.cleanup.mutex              = NULL;
2352     instance.cleanup.semaphore          = NULL;
2353     instance.cleanup.semaphore_count    = 0;
2354     instance.cleanup.event              = NULL;
2355     instance.cleanup.library            = NULL;
2356 
2357     switch (object->type)
2358     {
2359         case TP_OBJECT_TYPE_SIMPLE:
2360         {
2361             TRACE( "executing simple callback %p(%p, %p)\n",
2362                    object->u.simple.callback, callback_instance, object->userdata );
2363             object->u.simple.callback( callback_instance, object->userdata );
2364             TRACE( "callback %p returned\n", object->u.simple.callback );
2365             break;
2366         }
2367 
2368         case TP_OBJECT_TYPE_WORK:
2369         {
2370             TRACE( "executing work callback %p(%p, %p, %p)\n",
2371                    object->u.work.callback, callback_instance, object->userdata, object );
2372             object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2373             TRACE( "callback %p returned\n", object->u.work.callback );
2374             break;
2375         }
2376 
2377         case TP_OBJECT_TYPE_TIMER:
2378         {
2379             TRACE( "executing timer callback %p(%p, %p, %p)\n",
2380                    object->u.timer.callback, callback_instance, object->userdata, object );
2381             object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2382             TRACE( "callback %p returned\n", object->u.timer.callback );
2383             break;
2384         }
2385 
2386         case TP_OBJECT_TYPE_WAIT:
2387         {
2388             TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2389                    object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2390             object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2391             TRACE( "callback %p returned\n", object->u.wait.callback );
2392             break;
2393         }
2394 
2395         case TP_OBJECT_TYPE_IO:
2396         {
2397             TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2398                     object->u.io.callback, callback_instance, object->userdata,
2399                     completion.cvalue, &completion.iosb, (TP_IO *)object );
2400             object->u.io.callback( callback_instance, object->userdata,
2401                     (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2402             TRACE( "callback %p returned\n", object->u.io.callback );
2403             break;
2404         }
2405 
2406         default:
2407             assert(0);
2408             break;
2409     }
2410 
2411     /* Execute finalization callback. */
2412     if (object->finalization_callback)
2413     {
2414         TRACE( "executing finalization callback %p(%p, %p)\n",
2415                object->finalization_callback, callback_instance, object->userdata );
2416         object->finalization_callback( callback_instance, object->userdata );
2417         TRACE( "callback %p returned\n", object->finalization_callback );
2418     }
2419 
2420     /* Execute cleanup tasks. */
2421     if (instance.cleanup.critical_section)
2422     {
2423         RtlLeaveCriticalSection( instance.cleanup.critical_section );
2424     }
2425     if (instance.cleanup.mutex)
2426     {
2427         status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2428         if (status != STATUS_SUCCESS) goto skip_cleanup;
2429     }
2430     if (instance.cleanup.semaphore)
2431     {
2432         status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2433         if (status != STATUS_SUCCESS) goto skip_cleanup;
2434     }
2435     if (instance.cleanup.event)
2436     {
2437         status = NtSetEvent( instance.cleanup.event, NULL );
2438         if (status != STATUS_SUCCESS) goto skip_cleanup;
2439     }
2440     if (instance.cleanup.library)
2441     {
2442         LdrUnloadDll( instance.cleanup.library );
2443     }
2444 
2445 skip_cleanup:
2446     if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2447     RtlEnterCriticalSection( &pool->cs );
2448 
2449     /* Simple callbacks are automatically shutdown after execution. */
2450     if (object->type == TP_OBJECT_TYPE_SIMPLE)
2451     {
2452         tp_object_prepare_shutdown( object );
2453         object->shutdown = TRUE;
2454     }
2455 
2456     object->num_running_callbacks--;
2457     if (object_is_finished( object, TRUE ))
2458         RtlWakeAllConditionVariable( &object->group_finished_event );
2459 
2460     if (instance.associated)
2461     {
2462         object->num_associated_callbacks--;
2463         if (object_is_finished( object, FALSE ))
2464             RtlWakeAllConditionVariable( &object->finished_event );
2465     }
2466 }
2467 
2468 /***********************************************************************
2469  *           threadpool_worker_proc    (internal)
2470  */
2471 #ifdef __REACTOS__
threadpool_worker_proc(PVOID param)2472 ULONG NTAPI threadpool_worker_proc(PVOID param )
2473 #else
2474 static void CALLBACK threadpool_worker_proc( void *param )
2475 #endif
2476 {
2477     struct threadpool *pool = param;
2478     LARGE_INTEGER timeout;
2479     struct list *ptr;
2480 
2481     TRACE( "starting worker thread for pool %p\n", pool );
2482     set_thread_name(L"wine_threadpool_worker");
2483 
2484     RtlEnterCriticalSection( &pool->cs );
2485     for (;;)
2486     {
2487         while ((ptr = threadpool_get_next_item( pool )))
2488         {
2489             struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2490             assert( object->num_pending_callbacks > 0 );
2491 
2492             /* If further pending callbacks are queued, move the work item to
2493              * the end of the pool list. Otherwise remove it from the pool. */
2494             list_remove( &object->pool_entry );
2495             if (object->num_pending_callbacks > 1)
2496                 tp_object_prio_queue( object );
2497 
2498             tp_object_execute( object, FALSE );
2499 
2500             assert(pool->num_busy_workers);
2501             pool->num_busy_workers--;
2502 
2503             tp_object_release( object );
2504         }
2505 
2506         /* Shutdown worker thread if requested. */
2507         if (pool->shutdown)
2508             break;
2509 
2510         /* Wait for new tasks or until the timeout expires. A thread only terminates
2511          * when no new tasks are available, and the number of threads can be
2512          * decreased without violating the min_workers limit. An exception is when
2513          * min_workers == 0, then objcount is used to detect if the last thread
2514          * can be terminated. */
2515         timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2516         if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2517             !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2518             (!pool->min_workers && !pool->objcount)))
2519         {
2520             break;
2521         }
2522     }
2523     pool->num_workers--;
2524     RtlLeaveCriticalSection( &pool->cs );
2525 
2526     TRACE( "terminating worker thread for pool %p\n", pool );
2527     tp_threadpool_release( pool );
2528     RtlExitUserThread( 0 );
2529 #ifdef __REACTOS__
2530     return STATUS_SUCCESS;
2531 #endif
2532 }
2533 
2534 /***********************************************************************
2535  *           TpAllocCleanupGroup    (NTDLL.@)
2536  */
TpAllocCleanupGroup(TP_CLEANUP_GROUP ** out)2537 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2538 {
2539     TRACE( "%p\n", out );
2540 
2541     return tp_group_alloc( (struct threadpool_group **)out );
2542 }
2543 
2544 /***********************************************************************
2545  *           TpAllocIoCompletion    (NTDLL.@)
2546  */
TpAllocIoCompletion(TP_IO ** out,HANDLE file,PTP_IO_CALLBACK callback,void * userdata,TP_CALLBACK_ENVIRON * environment)2547 NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2548                                      void *userdata, TP_CALLBACK_ENVIRON *environment )
2549 {
2550     struct threadpool_object *object;
2551     struct threadpool *pool;
2552     NTSTATUS status;
2553 
2554     TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2555 
2556     if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2557         return STATUS_NO_MEMORY;
2558 
2559     if ((status = tp_threadpool_lock( &pool, environment )))
2560     {
2561         RtlFreeHeap( GetProcessHeap(), 0, object );
2562         return status;
2563     }
2564 
2565     object->type = TP_OBJECT_TYPE_IO;
2566     object->u.io.callback = callback;
2567     if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2568     {
2569         tp_threadpool_unlock( pool );
2570         RtlFreeHeap( GetProcessHeap(), 0, object );
2571         return status;
2572     }
2573 
2574     if ((status = tp_ioqueue_lock( object, file )))
2575     {
2576         tp_threadpool_unlock( pool );
2577         RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2578         RtlFreeHeap( GetProcessHeap(), 0, object );
2579         return status;
2580     }
2581 
2582     tp_object_initialize( object, pool, userdata, environment );
2583 
2584     *out = (TP_IO *)object;
2585     return STATUS_SUCCESS;
2586 }
2587 
2588 /***********************************************************************
2589  *           TpAllocPool    (NTDLL.@)
2590  */
TpAllocPool(TP_POOL ** out,PVOID reserved)2591 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2592 {
2593     TRACE( "%p %p\n", out, reserved );
2594 
2595     if (reserved)
2596         FIXME( "reserved argument is nonzero (%p)\n", reserved );
2597 
2598     return tp_threadpool_alloc( (struct threadpool **)out );
2599 }
2600 
2601 /***********************************************************************
2602  *           TpAllocTimer    (NTDLL.@)
2603  */
TpAllocTimer(TP_TIMER ** out,PTP_TIMER_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2604 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2605                               TP_CALLBACK_ENVIRON *environment )
2606 {
2607     struct threadpool_object *object;
2608     struct threadpool *pool;
2609     NTSTATUS status;
2610 
2611     TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2612 
2613     object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2614     if (!object)
2615         return STATUS_NO_MEMORY;
2616 
2617     status = tp_threadpool_lock( &pool, environment );
2618     if (status)
2619     {
2620         RtlFreeHeap( GetProcessHeap(), 0, object );
2621         return status;
2622     }
2623 
2624     object->type = TP_OBJECT_TYPE_TIMER;
2625     object->u.timer.callback = callback;
2626 
2627     status = tp_timerqueue_lock( object );
2628     if (status)
2629     {
2630         tp_threadpool_unlock( pool );
2631         RtlFreeHeap( GetProcessHeap(), 0, object );
2632         return status;
2633     }
2634 
2635     tp_object_initialize( object, pool, userdata, environment );
2636 
2637     *out = (TP_TIMER *)object;
2638     return STATUS_SUCCESS;
2639 }
2640 
tp_alloc_wait(TP_WAIT ** out,PTP_WAIT_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment,DWORD flags)2641 static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2642                                TP_CALLBACK_ENVIRON *environment, DWORD flags )
2643 {
2644     struct threadpool_object *object;
2645     struct threadpool *pool;
2646     NTSTATUS status;
2647 
2648     object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2649     if (!object)
2650         return STATUS_NO_MEMORY;
2651 
2652     status = tp_threadpool_lock( &pool, environment );
2653     if (status)
2654     {
2655         RtlFreeHeap( GetProcessHeap(), 0, object );
2656         return status;
2657     }
2658 
2659     object->type = TP_OBJECT_TYPE_WAIT;
2660     object->u.wait.callback = callback;
2661     object->u.wait.flags = flags;
2662 
2663     status = tp_waitqueue_lock( object );
2664     if (status)
2665     {
2666         tp_threadpool_unlock( pool );
2667         RtlFreeHeap( GetProcessHeap(), 0, object );
2668         return status;
2669     }
2670 
2671     tp_object_initialize( object, pool, userdata, environment );
2672 
2673     *out = (TP_WAIT *)object;
2674     return STATUS_SUCCESS;
2675 }
2676 
2677 /***********************************************************************
2678  *           TpAllocWait     (NTDLL.@)
2679  */
TpAllocWait(TP_WAIT ** out,PTP_WAIT_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2680 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2681                              TP_CALLBACK_ENVIRON *environment )
2682 {
2683     TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2684     return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2685 }
2686 
2687 /***********************************************************************
2688  *           TpAllocWork    (NTDLL.@)
2689  */
TpAllocWork(TP_WORK ** out,PTP_WORK_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2690 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2691                              TP_CALLBACK_ENVIRON *environment )
2692 {
2693     struct threadpool_object *object;
2694     struct threadpool *pool;
2695     NTSTATUS status;
2696 
2697     TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2698 
2699     object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2700     if (!object)
2701         return STATUS_NO_MEMORY;
2702 
2703     status = tp_threadpool_lock( &pool, environment );
2704     if (status)
2705     {
2706         RtlFreeHeap( GetProcessHeap(), 0, object );
2707         return status;
2708     }
2709 
2710     object->type = TP_OBJECT_TYPE_WORK;
2711     object->u.work.callback = callback;
2712     tp_object_initialize( object, pool, userdata, environment );
2713 
2714     *out = (TP_WORK *)object;
2715     return STATUS_SUCCESS;
2716 }
2717 
2718 /***********************************************************************
2719  *           TpCancelAsyncIoOperation    (NTDLL.@)
2720  */
TpCancelAsyncIoOperation(TP_IO * io)2721 void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2722 {
2723     struct threadpool_object *this = impl_from_TP_IO( io );
2724 
2725     TRACE( "%p\n", io );
2726 
2727     RtlEnterCriticalSection( &this->pool->cs );
2728 
2729     TRACE("pending_count %u.\n", this->u.io.pending_count);
2730 
2731     this->u.io.pending_count--;
2732     if (object_is_finished( this, TRUE ))
2733         RtlWakeAllConditionVariable( &this->group_finished_event );
2734     if (object_is_finished( this, FALSE ))
2735         RtlWakeAllConditionVariable( &this->finished_event );
2736 
2737     RtlLeaveCriticalSection( &this->pool->cs );
2738 }
2739 
2740 /***********************************************************************
2741  *           TpCallbackLeaveCriticalSectionOnCompletion    (NTDLL.@)
2742  */
TpCallbackLeaveCriticalSectionOnCompletion(TP_CALLBACK_INSTANCE * instance,CRITICAL_SECTION * crit)2743 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2744 {
2745     struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2746 
2747     TRACE( "%p %p\n", instance, crit );
2748 
2749     if (!this->cleanup.critical_section)
2750         this->cleanup.critical_section = crit;
2751 }
2752 
2753 /***********************************************************************
2754  *           TpCallbackMayRunLong    (NTDLL.@)
2755  */
TpCallbackMayRunLong(TP_CALLBACK_INSTANCE * instance)2756 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2757 {
2758     struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2759     struct threadpool_object *object = this->object;
2760     struct threadpool *pool;
2761     NTSTATUS status = STATUS_SUCCESS;
2762 
2763     TRACE( "%p\n", instance );
2764 
2765     if (this->threadid != GetCurrentThreadId())
2766     {
2767         ERR("called from wrong thread, ignoring\n");
2768         return STATUS_UNSUCCESSFUL; /* FIXME */
2769     }
2770 
2771     if (this->may_run_long)
2772         return STATUS_SUCCESS;
2773 
2774     pool = object->pool;
2775     RtlEnterCriticalSection( &pool->cs );
2776 
2777     /* Start new worker threads if required. */
2778     if (pool->num_busy_workers >= pool->num_workers)
2779     {
2780         if (pool->num_workers < pool->max_workers)
2781         {
2782             status = tp_new_worker_thread( pool );
2783         }
2784         else
2785         {
2786             status = STATUS_TOO_MANY_THREADS;
2787         }
2788     }
2789 
2790     RtlLeaveCriticalSection( &pool->cs );
2791     this->may_run_long = TRUE;
2792     return status;
2793 }
2794 
2795 /***********************************************************************
2796  *           TpCallbackReleaseMutexOnCompletion    (NTDLL.@)
2797  */
TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE mutex)2798 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2799 {
2800     struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2801 
2802     TRACE( "%p %p\n", instance, mutex );
2803 
2804     if (!this->cleanup.mutex)
2805         this->cleanup.mutex = mutex;
2806 }
2807 
2808 /***********************************************************************
2809  *           TpCallbackReleaseSemaphoreOnCompletion    (NTDLL.@)
2810  */
TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE semaphore,DWORD count)2811 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2812 {
2813     struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2814 
2815     TRACE( "%p %p %lu\n", instance, semaphore, count );
2816 
2817     if (!this->cleanup.semaphore)
2818     {
2819         this->cleanup.semaphore = semaphore;
2820         this->cleanup.semaphore_count = count;
2821     }
2822 }
2823 
2824 /***********************************************************************
2825  *           TpCallbackSetEventOnCompletion    (NTDLL.@)
2826  */
TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE event)2827 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2828 {
2829     struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2830 
2831     TRACE( "%p %p\n", instance, event );
2832 
2833     if (!this->cleanup.event)
2834         this->cleanup.event = event;
2835 }
2836 
2837 /***********************************************************************
2838  *           TpCallbackUnloadDllOnCompletion    (NTDLL.@)
2839  */
TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE * instance,HMODULE module)2840 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2841 {
2842     struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2843 
2844     TRACE( "%p %p\n", instance, module );
2845 
2846     if (!this->cleanup.library)
2847         this->cleanup.library = module;
2848 }
2849 
2850 /***********************************************************************
2851  *           TpDisassociateCallback    (NTDLL.@)
2852  */
TpDisassociateCallback(TP_CALLBACK_INSTANCE * instance)2853 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2854 {
2855     struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2856     struct threadpool_object *object = this->object;
2857     struct threadpool *pool;
2858 
2859     TRACE( "%p\n", instance );
2860 
2861     if (this->threadid != GetCurrentThreadId())
2862     {
2863         ERR("called from wrong thread, ignoring\n");
2864         return;
2865     }
2866 
2867     if (!this->associated)
2868         return;
2869 
2870     pool = object->pool;
2871     RtlEnterCriticalSection( &pool->cs );
2872 
2873     object->num_associated_callbacks--;
2874     if (object_is_finished( object, FALSE ))
2875         RtlWakeAllConditionVariable( &object->finished_event );
2876 
2877     RtlLeaveCriticalSection( &pool->cs );
2878     this->associated = FALSE;
2879 }
2880 
2881 /***********************************************************************
2882  *           TpIsTimerSet    (NTDLL.@)
2883  */
TpIsTimerSet(TP_TIMER * timer)2884 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2885 {
2886     struct threadpool_object *this = impl_from_TP_TIMER( timer );
2887 
2888     TRACE( "%p\n", timer );
2889 
2890     return this->u.timer.timer_set;
2891 }
2892 
2893 /***********************************************************************
2894  *           TpPostWork    (NTDLL.@)
2895  */
TpPostWork(TP_WORK * work)2896 VOID WINAPI TpPostWork( TP_WORK *work )
2897 {
2898     struct threadpool_object *this = impl_from_TP_WORK( work );
2899 
2900     TRACE( "%p\n", work );
2901 
2902     tp_object_submit( this, FALSE );
2903 }
2904 
2905 /***********************************************************************
2906  *           TpReleaseCleanupGroup    (NTDLL.@)
2907  */
TpReleaseCleanupGroup(TP_CLEANUP_GROUP * group)2908 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2909 {
2910     struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2911 
2912     TRACE( "%p\n", group );
2913 
2914     tp_group_shutdown( this );
2915     tp_group_release( this );
2916 }
2917 
2918 /***********************************************************************
2919  *           TpReleaseCleanupGroupMembers    (NTDLL.@)
2920  */
TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP * group,BOOL cancel_pending,PVOID userdata)2921 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2922 {
2923     struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2924     struct threadpool_object *object, *next;
2925     struct list members;
2926 
2927     TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2928 
2929     RtlEnterCriticalSection( &this->cs );
2930 
2931     /* Unset group, increase references, and mark objects for shutdown */
2932     LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2933     {
2934         assert( object->group == this );
2935         assert( object->is_group_member );
2936 
2937         if (InterlockedIncrement( &object->refcount ) == 1)
2938         {
2939             /* Object is basically already destroyed, but group reference
2940              * was not deleted yet. We can safely ignore this object. */
2941             InterlockedDecrement( &object->refcount );
2942             list_remove( &object->group_entry );
2943             object->is_group_member = FALSE;
2944             continue;
2945         }
2946 
2947         object->is_group_member = FALSE;
2948         tp_object_prepare_shutdown( object );
2949     }
2950 
2951     /* Move members to a new temporary list */
2952     list_init( &members );
2953     list_move_tail( &members, &this->members );
2954 
2955     RtlLeaveCriticalSection( &this->cs );
2956 
2957     /* Cancel pending callbacks if requested */
2958     if (cancel_pending)
2959     {
2960         LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2961         {
2962             tp_object_cancel( object );
2963         }
2964     }
2965 
2966     /* Wait for remaining callbacks to finish */
2967     LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2968     {
2969         tp_object_wait( object, TRUE );
2970 
2971         if (!object->shutdown)
2972         {
2973             /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2974             if (cancel_pending && object->group_cancel_callback)
2975             {
2976                 TRACE( "executing group cancel callback %p(%p, %p)\n",
2977                        object->group_cancel_callback, object->userdata, userdata );
2978                 object->group_cancel_callback( object->userdata, userdata );
2979                 TRACE( "callback %p returned\n", object->group_cancel_callback );
2980             }
2981 
2982             if (object->type != TP_OBJECT_TYPE_SIMPLE)
2983                 tp_object_release( object );
2984         }
2985 
2986         object->shutdown = TRUE;
2987         tp_object_release( object );
2988     }
2989 }
2990 
2991 /***********************************************************************
2992  *           TpReleaseIoCompletion    (NTDLL.@)
2993  */
TpReleaseIoCompletion(TP_IO * io)2994 void WINAPI TpReleaseIoCompletion( TP_IO *io )
2995 {
2996     struct threadpool_object *this = impl_from_TP_IO( io );
2997     BOOL can_destroy;
2998 
2999     TRACE( "%p\n", io );
3000 
3001     RtlEnterCriticalSection( &this->pool->cs );
3002     this->u.io.shutting_down = TRUE;
3003     can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3004     RtlLeaveCriticalSection( &this->pool->cs );
3005 
3006     if (can_destroy)
3007     {
3008         tp_object_prepare_shutdown( this );
3009         this->shutdown = TRUE;
3010         tp_object_release( this );
3011     }
3012 }
3013 
3014 /***********************************************************************
3015  *           TpReleasePool    (NTDLL.@)
3016  */
TpReleasePool(TP_POOL * pool)3017 VOID WINAPI TpReleasePool( TP_POOL *pool )
3018 {
3019     struct threadpool *this = impl_from_TP_POOL( pool );
3020 
3021     TRACE( "%p\n", pool );
3022 
3023     tp_threadpool_shutdown( this );
3024     tp_threadpool_release( this );
3025 }
3026 
3027 /***********************************************************************
3028  *           TpReleaseTimer     (NTDLL.@)
3029  */
TpReleaseTimer(TP_TIMER * timer)3030 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
3031 {
3032     struct threadpool_object *this = impl_from_TP_TIMER( timer );
3033 
3034     TRACE( "%p\n", timer );
3035 
3036     tp_object_prepare_shutdown( this );
3037     this->shutdown = TRUE;
3038     tp_object_release( this );
3039 }
3040 
3041 /***********************************************************************
3042  *           TpReleaseWait    (NTDLL.@)
3043  */
TpReleaseWait(TP_WAIT * wait)3044 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
3045 {
3046     struct threadpool_object *this = impl_from_TP_WAIT( wait );
3047 
3048     TRACE( "%p\n", wait );
3049 
3050     tp_object_prepare_shutdown( this );
3051     this->shutdown = TRUE;
3052     tp_object_release( this );
3053 }
3054 
3055 /***********************************************************************
3056  *           TpReleaseWork    (NTDLL.@)
3057  */
TpReleaseWork(TP_WORK * work)3058 VOID WINAPI TpReleaseWork( TP_WORK *work )
3059 {
3060     struct threadpool_object *this = impl_from_TP_WORK( work );
3061 
3062     TRACE( "%p\n", work );
3063 
3064     tp_object_prepare_shutdown( this );
3065     this->shutdown = TRUE;
3066     tp_object_release( this );
3067 }
3068 
3069 /***********************************************************************
3070  *           TpSetPoolMaxThreads    (NTDLL.@)
3071  */
TpSetPoolMaxThreads(TP_POOL * pool,DWORD maximum)3072 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
3073 {
3074     struct threadpool *this = impl_from_TP_POOL( pool );
3075 
3076     TRACE( "%p %lu\n", pool, maximum );
3077 
3078     RtlEnterCriticalSection( &this->cs );
3079     this->max_workers = max( maximum, 1 );
3080     this->min_workers = min( this->min_workers, this->max_workers );
3081     RtlLeaveCriticalSection( &this->cs );
3082 }
3083 
3084 /***********************************************************************
3085  *           TpSetPoolMinThreads    (NTDLL.@)
3086  */
TpSetPoolMinThreads(TP_POOL * pool,DWORD minimum)3087 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
3088 {
3089     struct threadpool *this = impl_from_TP_POOL( pool );
3090     NTSTATUS status = STATUS_SUCCESS;
3091 
3092     TRACE( "%p %lu\n", pool, minimum );
3093 
3094     RtlEnterCriticalSection( &this->cs );
3095 
3096     while (this->num_workers < minimum)
3097     {
3098         status = tp_new_worker_thread( this );
3099         if (status != STATUS_SUCCESS)
3100             break;
3101     }
3102 
3103     if (status == STATUS_SUCCESS)
3104     {
3105         this->min_workers = minimum;
3106         this->max_workers = max( this->min_workers, this->max_workers );
3107     }
3108 
3109     RtlLeaveCriticalSection( &this->cs );
3110     return !status;
3111 }
3112 
3113 /***********************************************************************
3114  *           TpSetTimer    (NTDLL.@)
3115  */
TpSetTimer(TP_TIMER * timer,LARGE_INTEGER * timeout,LONG period,LONG window_length)3116 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
3117 {
3118     struct threadpool_object *this = impl_from_TP_TIMER( timer );
3119     struct threadpool_object *other_timer;
3120     BOOL submit_timer = FALSE;
3121     ULONGLONG timestamp;
3122 
3123     TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3124 
3125     RtlEnterCriticalSection( &timerqueue.cs );
3126 
3127     assert( this->u.timer.timer_initialized );
3128     this->u.timer.timer_set = timeout != NULL;
3129 
3130     /* Convert relative timeout to absolute timestamp and handle a timeout
3131      * of zero, which means that the timer is submitted immediately. */
3132     if (timeout)
3133     {
3134         timestamp = timeout->QuadPart;
3135         if ((LONGLONG)timestamp < 0)
3136         {
3137             LARGE_INTEGER now;
3138             NtQuerySystemTime( &now );
3139             timestamp = now.QuadPart - timestamp;
3140         }
3141         else if (!timestamp)
3142         {
3143             if (!period)
3144                 timeout = NULL;
3145             else
3146             {
3147                 LARGE_INTEGER now;
3148                 NtQuerySystemTime( &now );
3149                 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3150             }
3151             submit_timer = TRUE;
3152         }
3153     }
3154 
3155     /* First remove existing timeout. */
3156     if (this->u.timer.timer_pending)
3157     {
3158         list_remove( &this->u.timer.timer_entry );
3159         this->u.timer.timer_pending = FALSE;
3160     }
3161 
3162     /* If the timer was enabled, then add it back to the queue. */
3163     if (timeout)
3164     {
3165         this->u.timer.timeout       = timestamp;
3166         this->u.timer.period        = period;
3167         this->u.timer.window_length = window_length;
3168 
3169         LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3170                              struct threadpool_object, u.timer.timer_entry )
3171         {
3172             assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3173             if (this->u.timer.timeout < other_timer->u.timer.timeout)
3174                 break;
3175         }
3176         list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3177 
3178         /* Wake up the timer thread when the timeout has to be updated. */
3179         if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3180             RtlWakeAllConditionVariable( &timerqueue.update_event );
3181 
3182         this->u.timer.timer_pending = TRUE;
3183     }
3184 
3185     RtlLeaveCriticalSection( &timerqueue.cs );
3186 
3187     if (submit_timer)
3188        tp_object_submit( this, FALSE );
3189 }
3190 
3191 /***********************************************************************
3192  *           TpSetWait    (NTDLL.@)
3193  */
TpSetWait(TP_WAIT * wait,HANDLE handle,LARGE_INTEGER * timeout)3194 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
3195 {
3196     struct threadpool_object *this = impl_from_TP_WAIT( wait );
3197     ULONGLONG timestamp = MAXLONGLONG;
3198 
3199     TRACE( "%p %p %p\n", wait, handle, timeout );
3200 
3201     RtlEnterCriticalSection( &waitqueue.cs );
3202 
3203     assert( this->u.wait.bucket );
3204     this->u.wait.handle = handle;
3205 
3206     if (handle || this->u.wait.wait_pending)
3207     {
3208         struct waitqueue_bucket *bucket = this->u.wait.bucket;
3209         list_remove( &this->u.wait.wait_entry );
3210 
3211         /* Convert relative timeout to absolute timestamp. */
3212         if (handle && timeout)
3213         {
3214             timestamp = timeout->QuadPart;
3215             if ((LONGLONG)timestamp < 0)
3216             {
3217                 LARGE_INTEGER now;
3218                 NtQuerySystemTime( &now );
3219                 timestamp = now.QuadPart - timestamp;
3220             }
3221         }
3222 
3223         /* Add wait object back into one of the queues. */
3224         if (handle)
3225         {
3226             list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3227             this->u.wait.wait_pending = TRUE;
3228             this->u.wait.timeout = timestamp;
3229         }
3230         else
3231         {
3232             list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3233             this->u.wait.wait_pending = FALSE;
3234         }
3235 
3236         /* Wake up the wait queue thread. */
3237         NtSetEvent( bucket->update_event, NULL );
3238     }
3239 
3240     RtlLeaveCriticalSection( &waitqueue.cs );
3241 }
3242 
3243 /***********************************************************************
3244  *           TpSimpleTryPost    (NTDLL.@)
3245  */
TpSimpleTryPost(PTP_SIMPLE_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)3246 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3247                                  TP_CALLBACK_ENVIRON *environment )
3248 {
3249     struct threadpool_object *object;
3250     struct threadpool *pool;
3251     NTSTATUS status;
3252 
3253     TRACE( "%p %p %p\n", callback, userdata, environment );
3254 
3255     object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3256     if (!object)
3257         return STATUS_NO_MEMORY;
3258 
3259     status = tp_threadpool_lock( &pool, environment );
3260     if (status)
3261     {
3262         RtlFreeHeap( GetProcessHeap(), 0, object );
3263         return status;
3264     }
3265 
3266     object->type = TP_OBJECT_TYPE_SIMPLE;
3267     object->u.simple.callback = callback;
3268     tp_object_initialize( object, pool, userdata, environment );
3269 
3270     return STATUS_SUCCESS;
3271 }
3272 
3273 /***********************************************************************
3274  *           TpStartAsyncIoOperation    (NTDLL.@)
3275  */
TpStartAsyncIoOperation(TP_IO * io)3276 void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3277 {
3278     struct threadpool_object *this = impl_from_TP_IO( io );
3279 
3280     TRACE( "%p\n", io );
3281 
3282     RtlEnterCriticalSection( &this->pool->cs );
3283 
3284     this->u.io.pending_count++;
3285 
3286     RtlLeaveCriticalSection( &this->pool->cs );
3287 }
3288 
3289 /***********************************************************************
3290  *           TpWaitForIoCompletion    (NTDLL.@)
3291  */
TpWaitForIoCompletion(TP_IO * io,BOOL cancel_pending)3292 void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3293 {
3294     struct threadpool_object *this = impl_from_TP_IO( io );
3295 
3296     TRACE( "%p %d\n", io, cancel_pending );
3297 
3298     if (cancel_pending)
3299         tp_object_cancel( this );
3300     tp_object_wait( this, FALSE );
3301 }
3302 
3303 /***********************************************************************
3304  *           TpWaitForTimer    (NTDLL.@)
3305  */
TpWaitForTimer(TP_TIMER * timer,BOOL cancel_pending)3306 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3307 {
3308     struct threadpool_object *this = impl_from_TP_TIMER( timer );
3309 
3310     TRACE( "%p %d\n", timer, cancel_pending );
3311 
3312     if (cancel_pending)
3313         tp_object_cancel( this );
3314     tp_object_wait( this, FALSE );
3315 }
3316 
3317 /***********************************************************************
3318  *           TpWaitForWait    (NTDLL.@)
3319  */
TpWaitForWait(TP_WAIT * wait,BOOL cancel_pending)3320 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3321 {
3322     struct threadpool_object *this = impl_from_TP_WAIT( wait );
3323 
3324     TRACE( "%p %d\n", wait, cancel_pending );
3325 
3326     if (cancel_pending)
3327         tp_object_cancel( this );
3328     tp_object_wait( this, FALSE );
3329 }
3330 
3331 /***********************************************************************
3332  *           TpWaitForWork    (NTDLL.@)
3333  */
TpWaitForWork(TP_WORK * work,BOOL cancel_pending)3334 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3335 {
3336     struct threadpool_object *this = impl_from_TP_WORK( work );
3337 
3338     TRACE( "%p %u\n", work, cancel_pending );
3339 
3340     if (cancel_pending)
3341         tp_object_cancel( this );
3342     tp_object_wait( this, FALSE );
3343 }
3344 
3345 /***********************************************************************
3346  *           TpSetPoolStackInformation    (NTDLL.@)
3347  */
TpSetPoolStackInformation(TP_POOL * pool,TP_POOL_STACK_INFORMATION * stack_info)3348 NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3349 {
3350     struct threadpool *this = impl_from_TP_POOL( pool );
3351 
3352     TRACE( "%p %p\n", pool, stack_info );
3353 
3354     if (!stack_info)
3355         return STATUS_INVALID_PARAMETER;
3356 
3357     RtlEnterCriticalSection( &this->cs );
3358     this->stack_info = *stack_info;
3359     RtlLeaveCriticalSection( &this->cs );
3360 
3361     return STATUS_SUCCESS;
3362 }
3363 
3364 /***********************************************************************
3365  *           TpQueryPoolStackInformation    (NTDLL.@)
3366  */
TpQueryPoolStackInformation(TP_POOL * pool,TP_POOL_STACK_INFORMATION * stack_info)3367 NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3368 {
3369     struct threadpool *this = impl_from_TP_POOL( pool );
3370 
3371     TRACE( "%p %p\n", pool, stack_info );
3372 
3373     if (!stack_info)
3374         return STATUS_INVALID_PARAMETER;
3375 
3376     RtlEnterCriticalSection( &this->cs );
3377     *stack_info = this->stack_info;
3378     RtlLeaveCriticalSection( &this->cs );
3379 
3380     return STATUS_SUCCESS;
3381 }
3382 
rtl_wait_callback(TP_CALLBACK_INSTANCE * instance,void * userdata,TP_WAIT * wait,TP_WAIT_RESULT result)3383 static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3384 {
3385     struct threadpool_object *object = impl_from_TP_WAIT(wait);
3386     object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3387 }
3388 
3389 /***********************************************************************
3390  *              RtlRegisterWait   (NTDLL.@)
3391  *
3392  * Registers a wait for a handle to become signaled.
3393  *
3394  * PARAMS
3395  *  NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3396  *  Object   [I] Object to wait to become signaled.
3397  *  Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3398  *  Context  [I] Context to pass to the callback function when it is executed.
3399  *  Milliseconds [I] Number of milliseconds to wait before timing out.
3400  *  Flags    [I] Flags. See notes.
3401  *
3402  * RETURNS
3403  *  Success: STATUS_SUCCESS.
3404  *  Failure: Any NTSTATUS code.
3405  *
3406  * NOTES
3407  *  Flags can be one or more of the following:
3408  *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3409  *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3410  *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3411  *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3412  *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3413  */
RtlRegisterWait(HANDLE * out,HANDLE handle,RTL_WAITORTIMERCALLBACKFUNC callback,void * context,ULONG milliseconds,ULONG flags)3414 NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3415                                  void *context, ULONG milliseconds, ULONG flags )
3416 {
3417     struct threadpool_object *object;
3418     TP_CALLBACK_ENVIRON environment;
3419     LARGE_INTEGER timeout;
3420     NTSTATUS status;
3421     TP_WAIT *wait;
3422 
3423     TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3424             out, handle, callback, context, milliseconds, flags );
3425 
3426     memset( &environment, 0, sizeof(environment) );
3427     environment.Version = 1;
3428     environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3429     environment.u.s.Persistent   = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3430 
3431     flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3432     if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3433         return status;
3434 
3435     object = impl_from_TP_WAIT(wait);
3436     object->u.wait.rtl_callback = callback;
3437 
3438     RtlEnterCriticalSection( &waitqueue.cs );
3439     TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3440 
3441     *out = object;
3442     RtlLeaveCriticalSection( &waitqueue.cs );
3443 
3444     return STATUS_SUCCESS;
3445 }
3446 
3447 /***********************************************************************
3448  *              RtlDeregisterWaitEx   (NTDLL.@)
3449  *
3450  * Cancels a wait operation and frees the resources associated with calling
3451  * RtlRegisterWait().
3452  *
3453  * PARAMS
3454  *  WaitObject [I] Handle to the wait object to free.
3455  *
3456  * RETURNS
3457  *  Success: STATUS_SUCCESS.
3458  *  Failure: Any NTSTATUS code.
3459  */
RtlDeregisterWaitEx(HANDLE handle,HANDLE event)3460 NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3461 {
3462     struct threadpool_object *object = handle;
3463     NTSTATUS status;
3464 
3465     TRACE( "handle %p, event %p\n", handle, event );
3466 
3467     if (!object) return STATUS_INVALID_HANDLE;
3468 
3469     TpSetWait( (TP_WAIT *)object, NULL, NULL );
3470 
3471     if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3472     else
3473     {
3474         assert( object->completed_event == NULL );
3475         object->completed_event = event;
3476     }
3477 
3478     RtlEnterCriticalSection( &object->pool->cs );
3479     if (object->num_pending_callbacks + object->num_running_callbacks
3480         + object->num_associated_callbacks) status = STATUS_PENDING;
3481     else status = STATUS_SUCCESS;
3482     RtlLeaveCriticalSection( &object->pool->cs );
3483 
3484     TpReleaseWait( (TP_WAIT *)object );
3485     return status;
3486 }
3487 
3488 /***********************************************************************
3489  *              RtlDeregisterWait   (NTDLL.@)
3490  *
3491  * Cancels a wait operation and frees the resources associated with calling
3492  * RtlRegisterWait().
3493  *
3494  * PARAMS
3495  *  WaitObject [I] Handle to the wait object to free.
3496  *
3497  * RETURNS
3498  *  Success: STATUS_SUCCESS.
3499  *  Failure: Any NTSTATUS code.
3500  */
RtlDeregisterWait(HANDLE WaitHandle)3501 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3502 {
3503     return RtlDeregisterWaitEx(WaitHandle, NULL);
3504 }
3505 
3506 #ifdef __REACTOS__
3507 VOID
3508 NTAPI
RtlpInitializeThreadPooling(VOID)3509 RtlpInitializeThreadPooling(
3510     VOID)
3511 {
3512     RtlInitializeCriticalSection(&old_threadpool.threadpool_compl_cs);
3513     RtlInitializeCriticalSection(&timerqueue.cs);
3514     RtlInitializeCriticalSection(&waitqueue.cs);
3515     RtlInitializeCriticalSection(&ioqueue.cs);
3516 }
3517 #endif
3518