1405ed2b4SDenis Malikov /*
2405ed2b4SDenis Malikov * Thread pooling
3405ed2b4SDenis Malikov *
4405ed2b4SDenis Malikov * Copyright (c) 2006 Robert Shearman
5405ed2b4SDenis Malikov * Copyright (c) 2014-2016 Sebastian Lackner
6405ed2b4SDenis Malikov *
7405ed2b4SDenis Malikov * This library is free software; you can redistribute it and/or
8405ed2b4SDenis Malikov * modify it under the terms of the GNU Lesser General Public
9405ed2b4SDenis Malikov * License as published by the Free Software Foundation; either
10405ed2b4SDenis Malikov * version 2.1 of the License, or (at your option) any later version.
11405ed2b4SDenis Malikov *
12405ed2b4SDenis Malikov * This library is distributed in the hope that it will be useful,
13405ed2b4SDenis Malikov * but WITHOUT ANY WARRANTY; without even the implied warranty of
14405ed2b4SDenis Malikov * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15405ed2b4SDenis Malikov * Lesser General Public License for more details.
16405ed2b4SDenis Malikov *
17405ed2b4SDenis Malikov * You should have received a copy of the GNU Lesser General Public
18405ed2b4SDenis Malikov * License along with this library; if not, write to the Free Software
19405ed2b4SDenis Malikov * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
20405ed2b4SDenis Malikov */
21405ed2b4SDenis Malikov
22*0bf42067SJustin Miller
23*0bf42067SJustin Miller #ifdef __REACTOS__
24*0bf42067SJustin Miller #include <rtl_vista.h>
25*0bf42067SJustin Miller #define NDEBUG
26*0bf42067SJustin Miller #include "wine/list.h"
27*0bf42067SJustin Miller #include <debug.h>
28*0bf42067SJustin Miller
29*0bf42067SJustin Miller #define ERR(fmt, ...) DPRINT1(fmt, ##__VA_ARGS__)
30*0bf42067SJustin Miller #define FIXME(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
31*0bf42067SJustin Miller #define WARN(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
32*0bf42067SJustin Miller #define TRACE(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
33*0bf42067SJustin Miller #ifndef ARRAY_SIZE
34*0bf42067SJustin Miller #define ARRAY_SIZE(_x) (sizeof((_x))/sizeof((_x)[0]))
35*0bf42067SJustin Miller #endif
36*0bf42067SJustin Miller
37*0bf42067SJustin Miller typedef struct _THREAD_NAME_INFORMATION
38*0bf42067SJustin Miller {
39*0bf42067SJustin Miller UNICODE_STRING ThreadName;
40*0bf42067SJustin Miller } THREAD_NAME_INFORMATION, *PTHREAD_NAME_INFORMATION;
41*0bf42067SJustin Miller
42*0bf42067SJustin Miller typedef void (CALLBACK *PNTAPCFUNC)(ULONG_PTR,ULONG_PTR,ULONG_PTR);
43*0bf42067SJustin Miller typedef void (CALLBACK *PRTL_THREAD_START_ROUTINE)(LPVOID);
44*0bf42067SJustin Miller typedef DWORD (CALLBACK *PRTL_WORK_ITEM_ROUTINE)(LPVOID);
45*0bf42067SJustin Miller typedef void (NTAPI *RTL_WAITORTIMERCALLBACKFUNC)(PVOID,BOOLEAN);
46*0bf42067SJustin Miller typedef VOID (CALLBACK *PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD,DWORD,LPVOID);
47*0bf42067SJustin Miller
48*0bf42067SJustin Miller typedef void (CALLBACK *PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,void*,void*,IO_STATUS_BLOCK*,PTP_IO);
49*0bf42067SJustin Miller NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
50*0bf42067SJustin Miller #define PRTL_WORK_ITEM_ROUTINE WORKERCALLBACKFUNC
51*0bf42067SJustin Miller
52*0bf42067SJustin Miller #define CRITICAL_SECTION RTL_CRITICAL_SECTION
53*0bf42067SJustin Miller #define GetProcessHeap() RtlGetProcessHeap()
54*0bf42067SJustin Miller #define GetCurrentProcess() NtCurrentProcess()
55*0bf42067SJustin Miller #define GetCurrentThread() NtCurrentThread()
56*0bf42067SJustin Miller #define GetCurrentThreadId() HandleToULong(NtCurrentTeb()->ClientId.UniqueThread)
57*0bf42067SJustin Miller #else
58405ed2b4SDenis Malikov #include <assert.h>
59405ed2b4SDenis Malikov #include <stdarg.h>
60405ed2b4SDenis Malikov #include <limits.h>
61405ed2b4SDenis Malikov
62405ed2b4SDenis Malikov #include "ntstatus.h"
63405ed2b4SDenis Malikov #define WIN32_NO_STATUS
64405ed2b4SDenis Malikov #include "winternl.h"
65405ed2b4SDenis Malikov
66405ed2b4SDenis Malikov #include "wine/debug.h"
67405ed2b4SDenis Malikov #include "wine/list.h"
68405ed2b4SDenis Malikov
69405ed2b4SDenis Malikov #include "ntdll_misc.h"
70405ed2b4SDenis Malikov
71405ed2b4SDenis Malikov WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
72*0bf42067SJustin Miller #endif
73405ed2b4SDenis Malikov
74405ed2b4SDenis Malikov /*
75405ed2b4SDenis Malikov * Old thread pooling API
76405ed2b4SDenis Malikov */
77405ed2b4SDenis Malikov
78405ed2b4SDenis Malikov struct rtl_work_item
79405ed2b4SDenis Malikov {
80405ed2b4SDenis Malikov PRTL_WORK_ITEM_ROUTINE function;
81405ed2b4SDenis Malikov PVOID context;
82405ed2b4SDenis Malikov };
83405ed2b4SDenis Malikov
84405ed2b4SDenis Malikov #define EXPIRE_NEVER (~(ULONGLONG)0)
85405ed2b4SDenis Malikov #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
86405ed2b4SDenis Malikov
87*0bf42067SJustin Miller #ifndef __REACTOS__
88405ed2b4SDenis Malikov static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
89*0bf42067SJustin Miller #endif
90405ed2b4SDenis Malikov
91405ed2b4SDenis Malikov static struct
92405ed2b4SDenis Malikov {
93405ed2b4SDenis Malikov HANDLE compl_port;
94405ed2b4SDenis Malikov RTL_CRITICAL_SECTION threadpool_compl_cs;
95405ed2b4SDenis Malikov }
96405ed2b4SDenis Malikov old_threadpool =
97405ed2b4SDenis Malikov {
98405ed2b4SDenis Malikov NULL, /* compl_port */
99*0bf42067SJustin Miller #ifdef __REACTOS__
100*0bf42067SJustin Miller {0}, /* threadpool_compl_cs */
101*0bf42067SJustin Miller #else
102405ed2b4SDenis Malikov { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
103*0bf42067SJustin Miller #endif
104405ed2b4SDenis Malikov };
105405ed2b4SDenis Malikov
106*0bf42067SJustin Miller #ifndef __REACTOS__
107405ed2b4SDenis Malikov static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
108405ed2b4SDenis Malikov {
109405ed2b4SDenis Malikov 0, 0, &old_threadpool.threadpool_compl_cs,
110405ed2b4SDenis Malikov { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
111405ed2b4SDenis Malikov 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
112405ed2b4SDenis Malikov };
113*0bf42067SJustin Miller #endif
114405ed2b4SDenis Malikov
115405ed2b4SDenis Malikov struct timer_queue;
116405ed2b4SDenis Malikov struct queue_timer
117405ed2b4SDenis Malikov {
118405ed2b4SDenis Malikov struct timer_queue *q;
119405ed2b4SDenis Malikov struct list entry;
120405ed2b4SDenis Malikov ULONG runcount; /* number of callbacks pending execution */
121405ed2b4SDenis Malikov RTL_WAITORTIMERCALLBACKFUNC callback;
122405ed2b4SDenis Malikov PVOID param;
123405ed2b4SDenis Malikov DWORD period;
124405ed2b4SDenis Malikov ULONG flags;
125405ed2b4SDenis Malikov ULONGLONG expire;
126405ed2b4SDenis Malikov BOOL destroy; /* timer should be deleted; once set, never unset */
127405ed2b4SDenis Malikov HANDLE event; /* removal event */
128405ed2b4SDenis Malikov };
129405ed2b4SDenis Malikov
130405ed2b4SDenis Malikov struct timer_queue
131405ed2b4SDenis Malikov {
132405ed2b4SDenis Malikov DWORD magic;
133405ed2b4SDenis Malikov RTL_CRITICAL_SECTION cs;
134405ed2b4SDenis Malikov struct list timers; /* sorted by expiration time */
135405ed2b4SDenis Malikov BOOL quit; /* queue should be deleted; once set, never unset */
136405ed2b4SDenis Malikov HANDLE event;
137405ed2b4SDenis Malikov HANDLE thread;
138405ed2b4SDenis Malikov };
139405ed2b4SDenis Malikov
140405ed2b4SDenis Malikov /*
141405ed2b4SDenis Malikov * Object-oriented thread pooling API
142405ed2b4SDenis Malikov */
143405ed2b4SDenis Malikov
144405ed2b4SDenis Malikov #define THREADPOOL_WORKER_TIMEOUT 5000
145405ed2b4SDenis Malikov #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
146405ed2b4SDenis Malikov
147405ed2b4SDenis Malikov /* internal threadpool representation */
148405ed2b4SDenis Malikov struct threadpool
149405ed2b4SDenis Malikov {
150405ed2b4SDenis Malikov LONG refcount;
151405ed2b4SDenis Malikov LONG objcount;
152405ed2b4SDenis Malikov BOOL shutdown;
153405ed2b4SDenis Malikov CRITICAL_SECTION cs;
154405ed2b4SDenis Malikov /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
155405ed2b4SDenis Malikov struct list pools[3];
156405ed2b4SDenis Malikov RTL_CONDITION_VARIABLE update_event;
157405ed2b4SDenis Malikov /* information about worker threads, locked via .cs */
158405ed2b4SDenis Malikov int max_workers;
159405ed2b4SDenis Malikov int min_workers;
160405ed2b4SDenis Malikov int num_workers;
161405ed2b4SDenis Malikov int num_busy_workers;
162405ed2b4SDenis Malikov HANDLE compl_port;
163405ed2b4SDenis Malikov TP_POOL_STACK_INFORMATION stack_info;
164405ed2b4SDenis Malikov };
165405ed2b4SDenis Malikov
166405ed2b4SDenis Malikov enum threadpool_objtype
167405ed2b4SDenis Malikov {
168405ed2b4SDenis Malikov TP_OBJECT_TYPE_SIMPLE,
169405ed2b4SDenis Malikov TP_OBJECT_TYPE_WORK,
170405ed2b4SDenis Malikov TP_OBJECT_TYPE_TIMER,
171405ed2b4SDenis Malikov TP_OBJECT_TYPE_WAIT,
172405ed2b4SDenis Malikov TP_OBJECT_TYPE_IO,
173405ed2b4SDenis Malikov };
174405ed2b4SDenis Malikov
175405ed2b4SDenis Malikov struct io_completion
176405ed2b4SDenis Malikov {
177405ed2b4SDenis Malikov IO_STATUS_BLOCK iosb;
178405ed2b4SDenis Malikov ULONG_PTR cvalue;
179405ed2b4SDenis Malikov };
180405ed2b4SDenis Malikov
181405ed2b4SDenis Malikov /* internal threadpool object representation */
182405ed2b4SDenis Malikov struct threadpool_object
183405ed2b4SDenis Malikov {
184405ed2b4SDenis Malikov void *win32_callback; /* leave space for kernelbase to store win32 callback */
185405ed2b4SDenis Malikov LONG refcount;
186405ed2b4SDenis Malikov BOOL shutdown;
187405ed2b4SDenis Malikov /* read-only information */
188405ed2b4SDenis Malikov enum threadpool_objtype type;
189405ed2b4SDenis Malikov struct threadpool *pool;
190405ed2b4SDenis Malikov struct threadpool_group *group;
191405ed2b4SDenis Malikov PVOID userdata;
192405ed2b4SDenis Malikov PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
193405ed2b4SDenis Malikov PTP_SIMPLE_CALLBACK finalization_callback;
194405ed2b4SDenis Malikov BOOL may_run_long;
195405ed2b4SDenis Malikov HMODULE race_dll;
196405ed2b4SDenis Malikov TP_CALLBACK_PRIORITY priority;
197405ed2b4SDenis Malikov /* information about the group, locked via .group->cs */
198405ed2b4SDenis Malikov struct list group_entry;
199405ed2b4SDenis Malikov BOOL is_group_member;
200405ed2b4SDenis Malikov /* information about the pool, locked via .pool->cs */
201405ed2b4SDenis Malikov struct list pool_entry;
202405ed2b4SDenis Malikov RTL_CONDITION_VARIABLE finished_event;
203405ed2b4SDenis Malikov RTL_CONDITION_VARIABLE group_finished_event;
204405ed2b4SDenis Malikov HANDLE completed_event;
205405ed2b4SDenis Malikov LONG num_pending_callbacks;
206405ed2b4SDenis Malikov LONG num_running_callbacks;
207405ed2b4SDenis Malikov LONG num_associated_callbacks;
208405ed2b4SDenis Malikov /* arguments for callback */
209405ed2b4SDenis Malikov union
210405ed2b4SDenis Malikov {
211405ed2b4SDenis Malikov struct
212405ed2b4SDenis Malikov {
213405ed2b4SDenis Malikov PTP_SIMPLE_CALLBACK callback;
214405ed2b4SDenis Malikov } simple;
215405ed2b4SDenis Malikov struct
216405ed2b4SDenis Malikov {
217405ed2b4SDenis Malikov PTP_WORK_CALLBACK callback;
218405ed2b4SDenis Malikov } work;
219405ed2b4SDenis Malikov struct
220405ed2b4SDenis Malikov {
221405ed2b4SDenis Malikov PTP_TIMER_CALLBACK callback;
222405ed2b4SDenis Malikov /* information about the timer, locked via timerqueue.cs */
223405ed2b4SDenis Malikov BOOL timer_initialized;
224405ed2b4SDenis Malikov BOOL timer_pending;
225405ed2b4SDenis Malikov struct list timer_entry;
226405ed2b4SDenis Malikov BOOL timer_set;
227405ed2b4SDenis Malikov ULONGLONG timeout;
228405ed2b4SDenis Malikov LONG period;
229405ed2b4SDenis Malikov LONG window_length;
230405ed2b4SDenis Malikov } timer;
231405ed2b4SDenis Malikov struct
232405ed2b4SDenis Malikov {
233405ed2b4SDenis Malikov PTP_WAIT_CALLBACK callback;
234405ed2b4SDenis Malikov LONG signaled;
235405ed2b4SDenis Malikov /* information about the wait object, locked via waitqueue.cs */
236405ed2b4SDenis Malikov struct waitqueue_bucket *bucket;
237405ed2b4SDenis Malikov BOOL wait_pending;
238405ed2b4SDenis Malikov struct list wait_entry;
239405ed2b4SDenis Malikov ULONGLONG timeout;
240405ed2b4SDenis Malikov HANDLE handle;
241405ed2b4SDenis Malikov DWORD flags;
242405ed2b4SDenis Malikov RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
243405ed2b4SDenis Malikov } wait;
244405ed2b4SDenis Malikov struct
245405ed2b4SDenis Malikov {
246405ed2b4SDenis Malikov PTP_IO_CALLBACK callback;
247405ed2b4SDenis Malikov /* locked via .pool->cs */
248405ed2b4SDenis Malikov unsigned int pending_count, skipped_count, completion_count, completion_max;
249405ed2b4SDenis Malikov BOOL shutting_down;
250405ed2b4SDenis Malikov struct io_completion *completions;
251405ed2b4SDenis Malikov } io;
252405ed2b4SDenis Malikov } u;
253405ed2b4SDenis Malikov };
254405ed2b4SDenis Malikov
255405ed2b4SDenis Malikov /* internal threadpool instance representation */
256405ed2b4SDenis Malikov struct threadpool_instance
257405ed2b4SDenis Malikov {
258405ed2b4SDenis Malikov struct threadpool_object *object;
259405ed2b4SDenis Malikov DWORD threadid;
260405ed2b4SDenis Malikov BOOL associated;
261405ed2b4SDenis Malikov BOOL may_run_long;
262405ed2b4SDenis Malikov struct
263405ed2b4SDenis Malikov {
264405ed2b4SDenis Malikov CRITICAL_SECTION *critical_section;
265405ed2b4SDenis Malikov HANDLE mutex;
266405ed2b4SDenis Malikov HANDLE semaphore;
267405ed2b4SDenis Malikov LONG semaphore_count;
268405ed2b4SDenis Malikov HANDLE event;
269405ed2b4SDenis Malikov HMODULE library;
270405ed2b4SDenis Malikov } cleanup;
271405ed2b4SDenis Malikov };
272405ed2b4SDenis Malikov
273405ed2b4SDenis Malikov /* internal threadpool group representation */
274405ed2b4SDenis Malikov struct threadpool_group
275405ed2b4SDenis Malikov {
276405ed2b4SDenis Malikov LONG refcount;
277405ed2b4SDenis Malikov BOOL shutdown;
278405ed2b4SDenis Malikov CRITICAL_SECTION cs;
279405ed2b4SDenis Malikov /* list of group members, locked via .cs */
280405ed2b4SDenis Malikov struct list members;
281405ed2b4SDenis Malikov };
282405ed2b4SDenis Malikov
283*0bf42067SJustin Miller #ifndef __REACTOS__
284405ed2b4SDenis Malikov /* global timerqueue object */
285405ed2b4SDenis Malikov static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
286*0bf42067SJustin Miller #endif
287405ed2b4SDenis Malikov
288405ed2b4SDenis Malikov static struct
289405ed2b4SDenis Malikov {
290405ed2b4SDenis Malikov CRITICAL_SECTION cs;
291405ed2b4SDenis Malikov LONG objcount;
292405ed2b4SDenis Malikov BOOL thread_running;
293405ed2b4SDenis Malikov struct list pending_timers;
294405ed2b4SDenis Malikov RTL_CONDITION_VARIABLE update_event;
295405ed2b4SDenis Malikov }
296405ed2b4SDenis Malikov timerqueue =
297405ed2b4SDenis Malikov {
298*0bf42067SJustin Miller #ifdef __REACTOS__
299*0bf42067SJustin Miller {0}, /* cs */
300*0bf42067SJustin Miller #else
301405ed2b4SDenis Malikov { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
302*0bf42067SJustin Miller #endif
303405ed2b4SDenis Malikov 0, /* objcount */
304405ed2b4SDenis Malikov FALSE, /* thread_running */
305405ed2b4SDenis Malikov LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
306*0bf42067SJustin Miller #if __REACTOS__
307*0bf42067SJustin Miller 0,
308*0bf42067SJustin Miller #else
309405ed2b4SDenis Malikov RTL_CONDITION_VARIABLE_INIT /* update_event */
310*0bf42067SJustin Miller #endif
311405ed2b4SDenis Malikov };
312405ed2b4SDenis Malikov
313*0bf42067SJustin Miller #ifndef __REACTOS__
314405ed2b4SDenis Malikov static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
315405ed2b4SDenis Malikov {
316405ed2b4SDenis Malikov 0, 0, &timerqueue.cs,
317405ed2b4SDenis Malikov { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
318405ed2b4SDenis Malikov 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
319405ed2b4SDenis Malikov };
320405ed2b4SDenis Malikov
321405ed2b4SDenis Malikov /* global waitqueue object */
322405ed2b4SDenis Malikov static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
323*0bf42067SJustin Miller #endif
324405ed2b4SDenis Malikov
325405ed2b4SDenis Malikov static struct
326405ed2b4SDenis Malikov {
327405ed2b4SDenis Malikov CRITICAL_SECTION cs;
328405ed2b4SDenis Malikov LONG num_buckets;
329405ed2b4SDenis Malikov struct list buckets;
330405ed2b4SDenis Malikov }
331405ed2b4SDenis Malikov waitqueue =
332405ed2b4SDenis Malikov {
333*0bf42067SJustin Miller #ifdef __REACTOS__
334*0bf42067SJustin Miller {0}, /* cs */
335*0bf42067SJustin Miller #else
336405ed2b4SDenis Malikov { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
337*0bf42067SJustin Miller #endif
338405ed2b4SDenis Malikov 0, /* num_buckets */
339405ed2b4SDenis Malikov LIST_INIT( waitqueue.buckets ) /* buckets */
340405ed2b4SDenis Malikov };
341405ed2b4SDenis Malikov
342*0bf42067SJustin Miller #ifndef __REACTOS__
343405ed2b4SDenis Malikov static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
344405ed2b4SDenis Malikov {
345405ed2b4SDenis Malikov 0, 0, &waitqueue.cs,
346405ed2b4SDenis Malikov { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
347405ed2b4SDenis Malikov 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
348405ed2b4SDenis Malikov };
349*0bf42067SJustin Miller #endif
350405ed2b4SDenis Malikov
351405ed2b4SDenis Malikov struct waitqueue_bucket
352405ed2b4SDenis Malikov {
353405ed2b4SDenis Malikov struct list bucket_entry;
354405ed2b4SDenis Malikov LONG objcount;
355405ed2b4SDenis Malikov struct list reserved;
356405ed2b4SDenis Malikov struct list waiting;
357405ed2b4SDenis Malikov HANDLE update_event;
358405ed2b4SDenis Malikov BOOL alertable;
359405ed2b4SDenis Malikov };
360405ed2b4SDenis Malikov
361*0bf42067SJustin Miller #ifndef __REACTOS__
362405ed2b4SDenis Malikov /* global I/O completion queue object */
363405ed2b4SDenis Malikov static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
364*0bf42067SJustin Miller #endif
365405ed2b4SDenis Malikov
366405ed2b4SDenis Malikov static struct
367405ed2b4SDenis Malikov {
368405ed2b4SDenis Malikov CRITICAL_SECTION cs;
369405ed2b4SDenis Malikov LONG objcount;
370405ed2b4SDenis Malikov BOOL thread_running;
371405ed2b4SDenis Malikov HANDLE port;
372405ed2b4SDenis Malikov RTL_CONDITION_VARIABLE update_event;
373405ed2b4SDenis Malikov }
374405ed2b4SDenis Malikov ioqueue =
375405ed2b4SDenis Malikov {
376*0bf42067SJustin Miller #ifdef __REACTOS__
377*0bf42067SJustin Miller .cs = {0},
378*0bf42067SJustin Miller #else
379405ed2b4SDenis Malikov .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
380*0bf42067SJustin Miller #endif
381405ed2b4SDenis Malikov };
382405ed2b4SDenis Malikov
383*0bf42067SJustin Miller #ifndef __REACTOS__
384405ed2b4SDenis Malikov static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
385405ed2b4SDenis Malikov {
386405ed2b4SDenis Malikov 0, 0, &ioqueue.cs,
387405ed2b4SDenis Malikov { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
388405ed2b4SDenis Malikov 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
389405ed2b4SDenis Malikov };
390*0bf42067SJustin Miller #endif
391405ed2b4SDenis Malikov
impl_from_TP_POOL(TP_POOL * pool)392405ed2b4SDenis Malikov static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
393405ed2b4SDenis Malikov {
394405ed2b4SDenis Malikov return (struct threadpool *)pool;
395405ed2b4SDenis Malikov }
396405ed2b4SDenis Malikov
impl_from_TP_WORK(TP_WORK * work)397405ed2b4SDenis Malikov static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
398405ed2b4SDenis Malikov {
399405ed2b4SDenis Malikov struct threadpool_object *object = (struct threadpool_object *)work;
400405ed2b4SDenis Malikov assert( object->type == TP_OBJECT_TYPE_WORK );
401405ed2b4SDenis Malikov return object;
402405ed2b4SDenis Malikov }
403405ed2b4SDenis Malikov
impl_from_TP_TIMER(TP_TIMER * timer)404405ed2b4SDenis Malikov static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
405405ed2b4SDenis Malikov {
406405ed2b4SDenis Malikov struct threadpool_object *object = (struct threadpool_object *)timer;
407405ed2b4SDenis Malikov assert( object->type == TP_OBJECT_TYPE_TIMER );
408405ed2b4SDenis Malikov return object;
409405ed2b4SDenis Malikov }
410405ed2b4SDenis Malikov
impl_from_TP_WAIT(TP_WAIT * wait)411405ed2b4SDenis Malikov static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
412405ed2b4SDenis Malikov {
413405ed2b4SDenis Malikov struct threadpool_object *object = (struct threadpool_object *)wait;
414405ed2b4SDenis Malikov assert( object->type == TP_OBJECT_TYPE_WAIT );
415405ed2b4SDenis Malikov return object;
416405ed2b4SDenis Malikov }
417405ed2b4SDenis Malikov
impl_from_TP_IO(TP_IO * io)418405ed2b4SDenis Malikov static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
419405ed2b4SDenis Malikov {
420405ed2b4SDenis Malikov struct threadpool_object *object = (struct threadpool_object *)io;
421405ed2b4SDenis Malikov assert( object->type == TP_OBJECT_TYPE_IO );
422405ed2b4SDenis Malikov return object;
423405ed2b4SDenis Malikov }
424405ed2b4SDenis Malikov
impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP * group)425405ed2b4SDenis Malikov static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
426405ed2b4SDenis Malikov {
427405ed2b4SDenis Malikov return (struct threadpool_group *)group;
428405ed2b4SDenis Malikov }
429405ed2b4SDenis Malikov
impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE * instance)430405ed2b4SDenis Malikov static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
431405ed2b4SDenis Malikov {
432405ed2b4SDenis Malikov return (struct threadpool_instance *)instance;
433405ed2b4SDenis Malikov }
434405ed2b4SDenis Malikov
435*0bf42067SJustin Miller #ifdef __REACTOS__
436*0bf42067SJustin Miller ULONG NTAPI threadpool_worker_proc(PVOID param );
437*0bf42067SJustin Miller #else
438405ed2b4SDenis Malikov static void CALLBACK threadpool_worker_proc( void *param );
439*0bf42067SJustin Miller #endif
440405ed2b4SDenis Malikov static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
441405ed2b4SDenis Malikov static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
442405ed2b4SDenis Malikov static void tp_object_prepare_shutdown( struct threadpool_object *object );
443405ed2b4SDenis Malikov static BOOL tp_object_release( struct threadpool_object *object );
444405ed2b4SDenis Malikov static struct threadpool *default_threadpool = NULL;
445405ed2b4SDenis Malikov
array_reserve(void ** elements,unsigned int * capacity,unsigned int count,unsigned int size)446405ed2b4SDenis Malikov static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
447405ed2b4SDenis Malikov {
448405ed2b4SDenis Malikov unsigned int new_capacity, max_capacity;
449405ed2b4SDenis Malikov void *new_elements;
450405ed2b4SDenis Malikov
451405ed2b4SDenis Malikov if (count <= *capacity)
452405ed2b4SDenis Malikov return TRUE;
453405ed2b4SDenis Malikov
454405ed2b4SDenis Malikov max_capacity = ~(SIZE_T)0 / size;
455405ed2b4SDenis Malikov if (count > max_capacity)
456405ed2b4SDenis Malikov return FALSE;
457405ed2b4SDenis Malikov
458405ed2b4SDenis Malikov new_capacity = max(4, *capacity);
459405ed2b4SDenis Malikov while (new_capacity < count && new_capacity <= max_capacity / 2)
460405ed2b4SDenis Malikov new_capacity *= 2;
461405ed2b4SDenis Malikov if (new_capacity < count)
462405ed2b4SDenis Malikov new_capacity = max_capacity;
463405ed2b4SDenis Malikov
464405ed2b4SDenis Malikov if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
465405ed2b4SDenis Malikov return FALSE;
466405ed2b4SDenis Malikov
467405ed2b4SDenis Malikov *elements = new_elements;
468405ed2b4SDenis Malikov *capacity = new_capacity;
469405ed2b4SDenis Malikov
470405ed2b4SDenis Malikov return TRUE;
471405ed2b4SDenis Malikov }
472405ed2b4SDenis Malikov
set_thread_name(const WCHAR * name)473405ed2b4SDenis Malikov static void set_thread_name(const WCHAR *name)
474405ed2b4SDenis Malikov {
475*0bf42067SJustin Miller #ifndef __REACTOS__ // This is impossible on non vista+
476405ed2b4SDenis Malikov THREAD_NAME_INFORMATION info;
477405ed2b4SDenis Malikov
478405ed2b4SDenis Malikov RtlInitUnicodeString(&info.ThreadName, name);
479405ed2b4SDenis Malikov NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info));
480*0bf42067SJustin Miller #endif
481405ed2b4SDenis Malikov }
482405ed2b4SDenis Malikov
483*0bf42067SJustin Miller #ifndef __REACTOS__
process_rtl_work_item(TP_CALLBACK_INSTANCE * instance,void * userdata)484405ed2b4SDenis Malikov static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
485405ed2b4SDenis Malikov {
486405ed2b4SDenis Malikov struct rtl_work_item *item = userdata;
487405ed2b4SDenis Malikov
488405ed2b4SDenis Malikov TRACE("executing %p(%p)\n", item->function, item->context);
489405ed2b4SDenis Malikov item->function( item->context );
490405ed2b4SDenis Malikov
491405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, item );
492405ed2b4SDenis Malikov }
493405ed2b4SDenis Malikov
494405ed2b4SDenis Malikov /***********************************************************************
495405ed2b4SDenis Malikov * RtlQueueWorkItem (NTDLL.@)
496405ed2b4SDenis Malikov *
497405ed2b4SDenis Malikov * Queues a work item into a thread in the thread pool.
498405ed2b4SDenis Malikov *
499405ed2b4SDenis Malikov * PARAMS
500405ed2b4SDenis Malikov * function [I] Work function to execute.
501405ed2b4SDenis Malikov * context [I] Context to pass to the work function when it is executed.
502405ed2b4SDenis Malikov * flags [I] Flags. See notes.
503405ed2b4SDenis Malikov *
504405ed2b4SDenis Malikov * RETURNS
505405ed2b4SDenis Malikov * Success: STATUS_SUCCESS.
506405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
507405ed2b4SDenis Malikov *
508405ed2b4SDenis Malikov * NOTES
509405ed2b4SDenis Malikov * Flags can be one or more of the following:
510405ed2b4SDenis Malikov *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
511405ed2b4SDenis Malikov *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
512405ed2b4SDenis Malikov *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
513405ed2b4SDenis Malikov *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
514405ed2b4SDenis Malikov *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
515405ed2b4SDenis Malikov */
RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE function,PVOID context,ULONG flags)516405ed2b4SDenis Malikov NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
517405ed2b4SDenis Malikov {
518405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON environment;
519405ed2b4SDenis Malikov struct rtl_work_item *item;
520405ed2b4SDenis Malikov NTSTATUS status;
521405ed2b4SDenis Malikov
522405ed2b4SDenis Malikov TRACE( "%p %p %lu\n", function, context, flags );
523405ed2b4SDenis Malikov
524405ed2b4SDenis Malikov item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
525405ed2b4SDenis Malikov if (!item)
526405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
527405ed2b4SDenis Malikov
528405ed2b4SDenis Malikov memset( &environment, 0, sizeof(environment) );
529405ed2b4SDenis Malikov environment.Version = 1;
530405ed2b4SDenis Malikov environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
531405ed2b4SDenis Malikov environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
532405ed2b4SDenis Malikov
533405ed2b4SDenis Malikov item->function = function;
534405ed2b4SDenis Malikov item->context = context;
535405ed2b4SDenis Malikov
536405ed2b4SDenis Malikov status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
537405ed2b4SDenis Malikov if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
538405ed2b4SDenis Malikov return status;
539405ed2b4SDenis Malikov }
540405ed2b4SDenis Malikov
541405ed2b4SDenis Malikov /***********************************************************************
542405ed2b4SDenis Malikov * iocp_poller - get completion events and run callbacks
543405ed2b4SDenis Malikov */
iocp_poller(LPVOID Arg)544405ed2b4SDenis Malikov static DWORD CALLBACK iocp_poller(LPVOID Arg)
545405ed2b4SDenis Malikov {
546405ed2b4SDenis Malikov HANDLE cport = Arg;
547405ed2b4SDenis Malikov
548405ed2b4SDenis Malikov while( TRUE )
549405ed2b4SDenis Malikov {
550405ed2b4SDenis Malikov PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
551405ed2b4SDenis Malikov LPVOID overlapped;
552405ed2b4SDenis Malikov IO_STATUS_BLOCK iosb;
553*0bf42067SJustin Miller #ifdef __REACTOS__
554*0bf42067SJustin Miller NTSTATUS res = NtRemoveIoCompletion( cport, (PVOID)&callback, (PVOID)&overlapped, &iosb, NULL );
555*0bf42067SJustin Miller #else
556405ed2b4SDenis Malikov NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
557*0bf42067SJustin Miller #endif
558405ed2b4SDenis Malikov if (res)
559405ed2b4SDenis Malikov {
560405ed2b4SDenis Malikov ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
561405ed2b4SDenis Malikov }
562405ed2b4SDenis Malikov else
563405ed2b4SDenis Malikov {
564405ed2b4SDenis Malikov DWORD transferred = 0;
565405ed2b4SDenis Malikov DWORD err = 0;
566405ed2b4SDenis Malikov
567405ed2b4SDenis Malikov if (iosb.Status == STATUS_SUCCESS)
568405ed2b4SDenis Malikov transferred = iosb.Information;
569405ed2b4SDenis Malikov else
570405ed2b4SDenis Malikov err = RtlNtStatusToDosError(iosb.Status);
571405ed2b4SDenis Malikov
572405ed2b4SDenis Malikov callback( err, transferred, overlapped );
573405ed2b4SDenis Malikov }
574405ed2b4SDenis Malikov }
575405ed2b4SDenis Malikov return 0;
576405ed2b4SDenis Malikov }
577405ed2b4SDenis Malikov
578405ed2b4SDenis Malikov /***********************************************************************
579405ed2b4SDenis Malikov * RtlSetIoCompletionCallback (NTDLL.@)
580405ed2b4SDenis Malikov *
581405ed2b4SDenis Malikov * Binds a handle to a thread pool's completion port, and possibly
582405ed2b4SDenis Malikov * starts a non-I/O thread to monitor this port and call functions back.
583405ed2b4SDenis Malikov *
584405ed2b4SDenis Malikov * PARAMS
585405ed2b4SDenis Malikov * FileHandle [I] Handle to bind to a completion port.
586405ed2b4SDenis Malikov * Function [I] Callback function to call on I/O completions.
587405ed2b4SDenis Malikov * Flags [I] Not used.
588405ed2b4SDenis Malikov *
589405ed2b4SDenis Malikov * RETURNS
590405ed2b4SDenis Malikov * Success: STATUS_SUCCESS.
591405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
592405ed2b4SDenis Malikov *
593405ed2b4SDenis Malikov */
RtlSetIoCompletionCallback(HANDLE FileHandle,PRTL_OVERLAPPED_COMPLETION_ROUTINE Function,ULONG Flags)594405ed2b4SDenis Malikov NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
595405ed2b4SDenis Malikov {
596405ed2b4SDenis Malikov IO_STATUS_BLOCK iosb;
597405ed2b4SDenis Malikov FILE_COMPLETION_INFORMATION info;
598405ed2b4SDenis Malikov
599405ed2b4SDenis Malikov if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
600405ed2b4SDenis Malikov
601405ed2b4SDenis Malikov if (!old_threadpool.compl_port)
602405ed2b4SDenis Malikov {
603405ed2b4SDenis Malikov NTSTATUS res = STATUS_SUCCESS;
604405ed2b4SDenis Malikov
605405ed2b4SDenis Malikov RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
606405ed2b4SDenis Malikov if (!old_threadpool.compl_port)
607405ed2b4SDenis Malikov {
608405ed2b4SDenis Malikov HANDLE cport;
609405ed2b4SDenis Malikov
610405ed2b4SDenis Malikov res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
611405ed2b4SDenis Malikov if (!res)
612405ed2b4SDenis Malikov {
613405ed2b4SDenis Malikov /* FIXME native can start additional threads in case of e.g. hung callback function. */
614405ed2b4SDenis Malikov res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
615405ed2b4SDenis Malikov if (!res)
616405ed2b4SDenis Malikov old_threadpool.compl_port = cport;
617405ed2b4SDenis Malikov else
618405ed2b4SDenis Malikov NtClose( cport );
619405ed2b4SDenis Malikov }
620405ed2b4SDenis Malikov }
621405ed2b4SDenis Malikov RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
622405ed2b4SDenis Malikov if (res) return res;
623405ed2b4SDenis Malikov }
624405ed2b4SDenis Malikov
625405ed2b4SDenis Malikov info.CompletionPort = old_threadpool.compl_port;
626405ed2b4SDenis Malikov info.CompletionKey = (ULONG_PTR)Function;
627405ed2b4SDenis Malikov
628405ed2b4SDenis Malikov return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
629405ed2b4SDenis Malikov }
630*0bf42067SJustin Miller #endif
631405ed2b4SDenis Malikov
get_nt_timeout(PLARGE_INTEGER pTime,ULONG timeout)632405ed2b4SDenis Malikov static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
633405ed2b4SDenis Malikov {
634405ed2b4SDenis Malikov if (timeout == INFINITE) return NULL;
635405ed2b4SDenis Malikov pTime->QuadPart = (ULONGLONG)timeout * -10000;
636405ed2b4SDenis Malikov return pTime;
637405ed2b4SDenis Malikov }
638405ed2b4SDenis Malikov
639405ed2b4SDenis Malikov /************************** Timer Queue Impl **************************/
640405ed2b4SDenis Malikov
queue_remove_timer(struct queue_timer * t)641405ed2b4SDenis Malikov static void queue_remove_timer(struct queue_timer *t)
642405ed2b4SDenis Malikov {
643405ed2b4SDenis Malikov /* We MUST hold the queue cs while calling this function. This ensures
644405ed2b4SDenis Malikov that we cannot queue another callback for this timer. The runcount
645405ed2b4SDenis Malikov being zero makes sure we don't have any already queued. */
646405ed2b4SDenis Malikov struct timer_queue *q = t->q;
647405ed2b4SDenis Malikov
648405ed2b4SDenis Malikov assert(t->runcount == 0);
649405ed2b4SDenis Malikov assert(t->destroy);
650405ed2b4SDenis Malikov
651405ed2b4SDenis Malikov list_remove(&t->entry);
652405ed2b4SDenis Malikov if (t->event)
653405ed2b4SDenis Malikov NtSetEvent(t->event, NULL);
654405ed2b4SDenis Malikov RtlFreeHeap(GetProcessHeap(), 0, t);
655405ed2b4SDenis Malikov
656405ed2b4SDenis Malikov if (q->quit && list_empty(&q->timers))
657405ed2b4SDenis Malikov NtSetEvent(q->event, NULL);
658405ed2b4SDenis Malikov }
659405ed2b4SDenis Malikov
timer_cleanup_callback(struct queue_timer * t)660405ed2b4SDenis Malikov static void timer_cleanup_callback(struct queue_timer *t)
661405ed2b4SDenis Malikov {
662405ed2b4SDenis Malikov struct timer_queue *q = t->q;
663405ed2b4SDenis Malikov RtlEnterCriticalSection(&q->cs);
664405ed2b4SDenis Malikov
665405ed2b4SDenis Malikov assert(0 < t->runcount);
666405ed2b4SDenis Malikov --t->runcount;
667405ed2b4SDenis Malikov
668405ed2b4SDenis Malikov if (t->destroy && t->runcount == 0)
669405ed2b4SDenis Malikov queue_remove_timer(t);
670405ed2b4SDenis Malikov
671405ed2b4SDenis Malikov RtlLeaveCriticalSection(&q->cs);
672405ed2b4SDenis Malikov }
673405ed2b4SDenis Malikov
timer_callback_wrapper(LPVOID p)674405ed2b4SDenis Malikov static DWORD WINAPI timer_callback_wrapper(LPVOID p)
675405ed2b4SDenis Malikov {
676405ed2b4SDenis Malikov struct queue_timer *t = p;
677405ed2b4SDenis Malikov t->callback(t->param, TRUE);
678405ed2b4SDenis Malikov timer_cleanup_callback(t);
679405ed2b4SDenis Malikov return 0;
680405ed2b4SDenis Malikov }
681405ed2b4SDenis Malikov
queue_current_time(void)682405ed2b4SDenis Malikov static inline ULONGLONG queue_current_time(void)
683405ed2b4SDenis Malikov {
684405ed2b4SDenis Malikov LARGE_INTEGER now, freq;
685405ed2b4SDenis Malikov NtQueryPerformanceCounter(&now, &freq);
686405ed2b4SDenis Malikov return now.QuadPart * 1000 / freq.QuadPart;
687405ed2b4SDenis Malikov }
688405ed2b4SDenis Malikov
queue_add_timer(struct queue_timer * t,ULONGLONG time,BOOL set_event)689405ed2b4SDenis Malikov static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
690405ed2b4SDenis Malikov BOOL set_event)
691405ed2b4SDenis Malikov {
692405ed2b4SDenis Malikov /* We MUST hold the queue cs while calling this function. */
693405ed2b4SDenis Malikov struct timer_queue *q = t->q;
694405ed2b4SDenis Malikov struct list *ptr = &q->timers;
695405ed2b4SDenis Malikov
696405ed2b4SDenis Malikov assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
697405ed2b4SDenis Malikov
698405ed2b4SDenis Malikov if (time != EXPIRE_NEVER)
699405ed2b4SDenis Malikov LIST_FOR_EACH(ptr, &q->timers)
700405ed2b4SDenis Malikov {
701405ed2b4SDenis Malikov struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
702405ed2b4SDenis Malikov if (time < cur->expire)
703405ed2b4SDenis Malikov break;
704405ed2b4SDenis Malikov }
705405ed2b4SDenis Malikov list_add_before(ptr, &t->entry);
706405ed2b4SDenis Malikov
707405ed2b4SDenis Malikov t->expire = time;
708405ed2b4SDenis Malikov
709405ed2b4SDenis Malikov /* If we insert at the head of the list, we need to expire sooner
710405ed2b4SDenis Malikov than expected. */
711405ed2b4SDenis Malikov if (set_event && &t->entry == list_head(&q->timers))
712405ed2b4SDenis Malikov NtSetEvent(q->event, NULL);
713405ed2b4SDenis Malikov }
714405ed2b4SDenis Malikov
queue_move_timer(struct queue_timer * t,ULONGLONG time,BOOL set_event)715405ed2b4SDenis Malikov static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
716405ed2b4SDenis Malikov BOOL set_event)
717405ed2b4SDenis Malikov {
718405ed2b4SDenis Malikov /* We MUST hold the queue cs while calling this function. */
719405ed2b4SDenis Malikov list_remove(&t->entry);
720405ed2b4SDenis Malikov queue_add_timer(t, time, set_event);
721405ed2b4SDenis Malikov }
722405ed2b4SDenis Malikov
queue_timer_expire(struct timer_queue * q)723405ed2b4SDenis Malikov static void queue_timer_expire(struct timer_queue *q)
724405ed2b4SDenis Malikov {
725405ed2b4SDenis Malikov struct queue_timer *t = NULL;
726405ed2b4SDenis Malikov
727405ed2b4SDenis Malikov RtlEnterCriticalSection(&q->cs);
728405ed2b4SDenis Malikov if (list_head(&q->timers))
729405ed2b4SDenis Malikov {
730405ed2b4SDenis Malikov ULONGLONG now, next;
731405ed2b4SDenis Malikov t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
732405ed2b4SDenis Malikov if (!t->destroy && t->expire <= ((now = queue_current_time())))
733405ed2b4SDenis Malikov {
734405ed2b4SDenis Malikov ++t->runcount;
735405ed2b4SDenis Malikov if (t->period)
736405ed2b4SDenis Malikov {
737405ed2b4SDenis Malikov next = t->expire + t->period;
738405ed2b4SDenis Malikov /* avoid trigger cascade if overloaded / hibernated */
739405ed2b4SDenis Malikov if (next < now)
740405ed2b4SDenis Malikov next = now + t->period;
741405ed2b4SDenis Malikov }
742405ed2b4SDenis Malikov else
743405ed2b4SDenis Malikov next = EXPIRE_NEVER;
744405ed2b4SDenis Malikov queue_move_timer(t, next, FALSE);
745405ed2b4SDenis Malikov }
746405ed2b4SDenis Malikov else
747405ed2b4SDenis Malikov t = NULL;
748405ed2b4SDenis Malikov }
749405ed2b4SDenis Malikov RtlLeaveCriticalSection(&q->cs);
750405ed2b4SDenis Malikov
751405ed2b4SDenis Malikov if (t)
752405ed2b4SDenis Malikov {
753405ed2b4SDenis Malikov if (t->flags & WT_EXECUTEINTIMERTHREAD)
754405ed2b4SDenis Malikov timer_callback_wrapper(t);
755405ed2b4SDenis Malikov else
756405ed2b4SDenis Malikov {
757405ed2b4SDenis Malikov ULONG flags
758405ed2b4SDenis Malikov = (t->flags
759405ed2b4SDenis Malikov & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
760405ed2b4SDenis Malikov | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
761405ed2b4SDenis Malikov NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
762405ed2b4SDenis Malikov if (status != STATUS_SUCCESS)
763405ed2b4SDenis Malikov timer_cleanup_callback(t);
764405ed2b4SDenis Malikov }
765405ed2b4SDenis Malikov }
766405ed2b4SDenis Malikov }
767405ed2b4SDenis Malikov
queue_get_timeout(struct timer_queue * q)768405ed2b4SDenis Malikov static ULONG queue_get_timeout(struct timer_queue *q)
769405ed2b4SDenis Malikov {
770405ed2b4SDenis Malikov struct queue_timer *t;
771405ed2b4SDenis Malikov ULONG timeout = INFINITE;
772405ed2b4SDenis Malikov
773405ed2b4SDenis Malikov RtlEnterCriticalSection(&q->cs);
774405ed2b4SDenis Malikov if (list_head(&q->timers))
775405ed2b4SDenis Malikov {
776405ed2b4SDenis Malikov t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
777405ed2b4SDenis Malikov assert(!t->destroy || t->expire == EXPIRE_NEVER);
778405ed2b4SDenis Malikov
779405ed2b4SDenis Malikov if (t->expire != EXPIRE_NEVER)
780405ed2b4SDenis Malikov {
781405ed2b4SDenis Malikov ULONGLONG time = queue_current_time();
782405ed2b4SDenis Malikov timeout = t->expire < time ? 0 : t->expire - time;
783405ed2b4SDenis Malikov }
784405ed2b4SDenis Malikov }
785405ed2b4SDenis Malikov RtlLeaveCriticalSection(&q->cs);
786405ed2b4SDenis Malikov
787405ed2b4SDenis Malikov return timeout;
788405ed2b4SDenis Malikov }
789405ed2b4SDenis Malikov
790*0bf42067SJustin Miller #ifdef __REACTOS__
timer_queue_thread_proc(PVOID p)791*0bf42067SJustin Miller ULONG NTAPI timer_queue_thread_proc(PVOID p)
792*0bf42067SJustin Miller #else
793405ed2b4SDenis Malikov static void WINAPI timer_queue_thread_proc(LPVOID p)
794*0bf42067SJustin Miller #endif
795405ed2b4SDenis Malikov {
796405ed2b4SDenis Malikov struct timer_queue *q = p;
797405ed2b4SDenis Malikov ULONG timeout_ms;
798405ed2b4SDenis Malikov
799405ed2b4SDenis Malikov set_thread_name(L"wine_threadpool_timer_queue");
800405ed2b4SDenis Malikov timeout_ms = INFINITE;
801405ed2b4SDenis Malikov for (;;)
802405ed2b4SDenis Malikov {
803405ed2b4SDenis Malikov LARGE_INTEGER timeout;
804405ed2b4SDenis Malikov NTSTATUS status;
805405ed2b4SDenis Malikov BOOL done = FALSE;
806405ed2b4SDenis Malikov
807405ed2b4SDenis Malikov status = NtWaitForSingleObject(
808405ed2b4SDenis Malikov q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
809405ed2b4SDenis Malikov
810405ed2b4SDenis Malikov if (status == STATUS_WAIT_0)
811405ed2b4SDenis Malikov {
812405ed2b4SDenis Malikov /* There are two possible ways to trigger the event. Either
813405ed2b4SDenis Malikov we are quitting and the last timer got removed, or a new
814405ed2b4SDenis Malikov timer got put at the head of the list so we need to adjust
815405ed2b4SDenis Malikov our timeout. */
816405ed2b4SDenis Malikov RtlEnterCriticalSection(&q->cs);
817405ed2b4SDenis Malikov if (q->quit && list_empty(&q->timers))
818405ed2b4SDenis Malikov done = TRUE;
819405ed2b4SDenis Malikov RtlLeaveCriticalSection(&q->cs);
820405ed2b4SDenis Malikov }
821405ed2b4SDenis Malikov else if (status == STATUS_TIMEOUT)
822405ed2b4SDenis Malikov queue_timer_expire(q);
823405ed2b4SDenis Malikov
824405ed2b4SDenis Malikov if (done)
825405ed2b4SDenis Malikov break;
826405ed2b4SDenis Malikov
827405ed2b4SDenis Malikov timeout_ms = queue_get_timeout(q);
828405ed2b4SDenis Malikov }
829405ed2b4SDenis Malikov
830405ed2b4SDenis Malikov NtClose(q->event);
831405ed2b4SDenis Malikov RtlDeleteCriticalSection(&q->cs);
832405ed2b4SDenis Malikov q->magic = 0;
833405ed2b4SDenis Malikov RtlFreeHeap(GetProcessHeap(), 0, q);
834405ed2b4SDenis Malikov RtlExitUserThread( 0 );
835*0bf42067SJustin Miller #ifdef __REACTOS__
836*0bf42067SJustin Miller return STATUS_SUCCESS;
837*0bf42067SJustin Miller #endif
838405ed2b4SDenis Malikov }
839405ed2b4SDenis Malikov
queue_destroy_timer(struct queue_timer * t)840405ed2b4SDenis Malikov static void queue_destroy_timer(struct queue_timer *t)
841405ed2b4SDenis Malikov {
842405ed2b4SDenis Malikov /* We MUST hold the queue cs while calling this function. */
843405ed2b4SDenis Malikov t->destroy = TRUE;
844405ed2b4SDenis Malikov if (t->runcount == 0)
845405ed2b4SDenis Malikov /* Ensure a timer is promptly removed. If callbacks are pending,
846405ed2b4SDenis Malikov it will be removed after the last one finishes by the callback
847405ed2b4SDenis Malikov cleanup wrapper. */
848405ed2b4SDenis Malikov queue_remove_timer(t);
849405ed2b4SDenis Malikov else
850405ed2b4SDenis Malikov /* Make sure no destroyed timer masks an active timer at the head
851405ed2b4SDenis Malikov of the sorted list. */
852405ed2b4SDenis Malikov queue_move_timer(t, EXPIRE_NEVER, FALSE);
853405ed2b4SDenis Malikov }
854405ed2b4SDenis Malikov
855405ed2b4SDenis Malikov /***********************************************************************
856405ed2b4SDenis Malikov * RtlCreateTimerQueue (NTDLL.@)
857405ed2b4SDenis Malikov *
858405ed2b4SDenis Malikov * Creates a timer queue object and returns a handle to it.
859405ed2b4SDenis Malikov *
860405ed2b4SDenis Malikov * PARAMS
861405ed2b4SDenis Malikov * NewTimerQueue [O] The newly created queue.
862405ed2b4SDenis Malikov *
863405ed2b4SDenis Malikov * RETURNS
864405ed2b4SDenis Malikov * Success: STATUS_SUCCESS.
865405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
866405ed2b4SDenis Malikov */
RtlCreateTimerQueue(PHANDLE NewTimerQueue)867405ed2b4SDenis Malikov NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
868405ed2b4SDenis Malikov {
869405ed2b4SDenis Malikov NTSTATUS status;
870405ed2b4SDenis Malikov struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
871405ed2b4SDenis Malikov if (!q)
872405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
873405ed2b4SDenis Malikov
874405ed2b4SDenis Malikov RtlInitializeCriticalSection(&q->cs);
875405ed2b4SDenis Malikov list_init(&q->timers);
876405ed2b4SDenis Malikov q->quit = FALSE;
877405ed2b4SDenis Malikov q->magic = TIMER_QUEUE_MAGIC;
878405ed2b4SDenis Malikov status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
879405ed2b4SDenis Malikov if (status != STATUS_SUCCESS)
880405ed2b4SDenis Malikov {
881405ed2b4SDenis Malikov RtlFreeHeap(GetProcessHeap(), 0, q);
882405ed2b4SDenis Malikov return status;
883405ed2b4SDenis Malikov }
884405ed2b4SDenis Malikov status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
885405ed2b4SDenis Malikov timer_queue_thread_proc, q, &q->thread, NULL);
886405ed2b4SDenis Malikov if (status != STATUS_SUCCESS)
887405ed2b4SDenis Malikov {
888405ed2b4SDenis Malikov NtClose(q->event);
889405ed2b4SDenis Malikov RtlFreeHeap(GetProcessHeap(), 0, q);
890405ed2b4SDenis Malikov return status;
891405ed2b4SDenis Malikov }
892405ed2b4SDenis Malikov
893405ed2b4SDenis Malikov *NewTimerQueue = q;
894405ed2b4SDenis Malikov return STATUS_SUCCESS;
895405ed2b4SDenis Malikov }
896405ed2b4SDenis Malikov
897405ed2b4SDenis Malikov /***********************************************************************
898405ed2b4SDenis Malikov * RtlDeleteTimerQueueEx (NTDLL.@)
899405ed2b4SDenis Malikov *
900405ed2b4SDenis Malikov * Deletes a timer queue object.
901405ed2b4SDenis Malikov *
902405ed2b4SDenis Malikov * PARAMS
903405ed2b4SDenis Malikov * TimerQueue [I] The timer queue to destroy.
904405ed2b4SDenis Malikov * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
905405ed2b4SDenis Malikov * wait until all timers are finished firing before
906405ed2b4SDenis Malikov * returning. Otherwise, return immediately and set the
907405ed2b4SDenis Malikov * event when all timers are done.
908405ed2b4SDenis Malikov *
909405ed2b4SDenis Malikov * RETURNS
910405ed2b4SDenis Malikov * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
911405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
912405ed2b4SDenis Malikov */
RtlDeleteTimerQueueEx(HANDLE TimerQueue,HANDLE CompletionEvent)913405ed2b4SDenis Malikov NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
914405ed2b4SDenis Malikov {
915405ed2b4SDenis Malikov struct timer_queue *q = TimerQueue;
916405ed2b4SDenis Malikov struct queue_timer *t, *temp;
917405ed2b4SDenis Malikov HANDLE thread;
918405ed2b4SDenis Malikov NTSTATUS status;
919405ed2b4SDenis Malikov
920405ed2b4SDenis Malikov if (!q || q->magic != TIMER_QUEUE_MAGIC)
921405ed2b4SDenis Malikov return STATUS_INVALID_HANDLE;
922405ed2b4SDenis Malikov
923405ed2b4SDenis Malikov thread = q->thread;
924405ed2b4SDenis Malikov
925405ed2b4SDenis Malikov RtlEnterCriticalSection(&q->cs);
926405ed2b4SDenis Malikov q->quit = TRUE;
927405ed2b4SDenis Malikov if (list_head(&q->timers))
928405ed2b4SDenis Malikov /* When the last timer is removed, it will signal the timer thread to
929405ed2b4SDenis Malikov exit... */
930405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
931405ed2b4SDenis Malikov queue_destroy_timer(t);
932405ed2b4SDenis Malikov else
933405ed2b4SDenis Malikov /* However if we have none, we must do it ourselves. */
934405ed2b4SDenis Malikov NtSetEvent(q->event, NULL);
935405ed2b4SDenis Malikov RtlLeaveCriticalSection(&q->cs);
936405ed2b4SDenis Malikov
937405ed2b4SDenis Malikov if (CompletionEvent == INVALID_HANDLE_VALUE)
938405ed2b4SDenis Malikov {
939405ed2b4SDenis Malikov NtWaitForSingleObject(thread, FALSE, NULL);
940405ed2b4SDenis Malikov status = STATUS_SUCCESS;
941405ed2b4SDenis Malikov }
942405ed2b4SDenis Malikov else
943405ed2b4SDenis Malikov {
944405ed2b4SDenis Malikov if (CompletionEvent)
945405ed2b4SDenis Malikov {
946405ed2b4SDenis Malikov FIXME("asynchronous return on completion event unimplemented\n");
947405ed2b4SDenis Malikov NtWaitForSingleObject(thread, FALSE, NULL);
948405ed2b4SDenis Malikov NtSetEvent(CompletionEvent, NULL);
949405ed2b4SDenis Malikov }
950405ed2b4SDenis Malikov status = STATUS_PENDING;
951405ed2b4SDenis Malikov }
952405ed2b4SDenis Malikov
953405ed2b4SDenis Malikov NtClose(thread);
954405ed2b4SDenis Malikov return status;
955405ed2b4SDenis Malikov }
956405ed2b4SDenis Malikov
get_timer_queue(HANDLE TimerQueue)957405ed2b4SDenis Malikov static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
958405ed2b4SDenis Malikov {
959405ed2b4SDenis Malikov static struct timer_queue *default_timer_queue;
960405ed2b4SDenis Malikov
961405ed2b4SDenis Malikov if (TimerQueue)
962405ed2b4SDenis Malikov return TimerQueue;
963405ed2b4SDenis Malikov else
964405ed2b4SDenis Malikov {
965405ed2b4SDenis Malikov if (!default_timer_queue)
966405ed2b4SDenis Malikov {
967405ed2b4SDenis Malikov HANDLE q;
968405ed2b4SDenis Malikov NTSTATUS status = RtlCreateTimerQueue(&q);
969405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
970405ed2b4SDenis Malikov {
971405ed2b4SDenis Malikov PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
972405ed2b4SDenis Malikov if (p)
973405ed2b4SDenis Malikov /* Got beat to the punch. */
974405ed2b4SDenis Malikov RtlDeleteTimerQueueEx(q, NULL);
975405ed2b4SDenis Malikov }
976405ed2b4SDenis Malikov }
977405ed2b4SDenis Malikov return default_timer_queue;
978405ed2b4SDenis Malikov }
979405ed2b4SDenis Malikov }
980405ed2b4SDenis Malikov
981405ed2b4SDenis Malikov /***********************************************************************
982405ed2b4SDenis Malikov * RtlCreateTimer (NTDLL.@)
983405ed2b4SDenis Malikov *
984405ed2b4SDenis Malikov * Creates a new timer associated with the given queue.
985405ed2b4SDenis Malikov *
986405ed2b4SDenis Malikov * PARAMS
987405ed2b4SDenis Malikov * TimerQueue [I] The queue to hold the timer.
988405ed2b4SDenis Malikov * NewTimer [O] The newly created timer.
989405ed2b4SDenis Malikov * Callback [I] The callback to fire.
990405ed2b4SDenis Malikov * Parameter [I] The argument for the callback.
991405ed2b4SDenis Malikov * DueTime [I] The delay, in milliseconds, before first firing the
992405ed2b4SDenis Malikov * timer.
993405ed2b4SDenis Malikov * Period [I] The period, in milliseconds, at which to fire the timer
994405ed2b4SDenis Malikov * after the first callback. If zero, the timer will only
995405ed2b4SDenis Malikov * fire once. It still needs to be deleted with
996405ed2b4SDenis Malikov * RtlDeleteTimer.
997405ed2b4SDenis Malikov * Flags [I] Flags controlling the execution of the callback. In
998405ed2b4SDenis Malikov * addition to the WT_* thread pool flags (see
999405ed2b4SDenis Malikov * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1000405ed2b4SDenis Malikov * WT_EXECUTEONLYONCE are supported.
1001405ed2b4SDenis Malikov *
1002405ed2b4SDenis Malikov * RETURNS
1003405ed2b4SDenis Malikov * Success: STATUS_SUCCESS.
1004405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
1005405ed2b4SDenis Malikov */
RtlCreateTimer(HANDLE TimerQueue,HANDLE * NewTimer,RTL_WAITORTIMERCALLBACKFUNC Callback,PVOID Parameter,DWORD DueTime,DWORD Period,ULONG Flags)1006405ed2b4SDenis Malikov NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer,
1007405ed2b4SDenis Malikov RTL_WAITORTIMERCALLBACKFUNC Callback,
1008405ed2b4SDenis Malikov PVOID Parameter, DWORD DueTime, DWORD Period,
1009405ed2b4SDenis Malikov ULONG Flags)
1010405ed2b4SDenis Malikov {
1011405ed2b4SDenis Malikov NTSTATUS status;
1012405ed2b4SDenis Malikov struct queue_timer *t;
1013405ed2b4SDenis Malikov struct timer_queue *q = get_timer_queue(TimerQueue);
1014405ed2b4SDenis Malikov
1015405ed2b4SDenis Malikov if (!q) return STATUS_NO_MEMORY;
1016405ed2b4SDenis Malikov if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1017405ed2b4SDenis Malikov
1018405ed2b4SDenis Malikov t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1019405ed2b4SDenis Malikov if (!t)
1020405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
1021405ed2b4SDenis Malikov
1022405ed2b4SDenis Malikov t->q = q;
1023405ed2b4SDenis Malikov t->runcount = 0;
1024405ed2b4SDenis Malikov t->callback = Callback;
1025405ed2b4SDenis Malikov t->param = Parameter;
1026405ed2b4SDenis Malikov t->period = Period;
1027405ed2b4SDenis Malikov t->flags = Flags;
1028405ed2b4SDenis Malikov t->destroy = FALSE;
1029405ed2b4SDenis Malikov t->event = NULL;
1030405ed2b4SDenis Malikov
1031405ed2b4SDenis Malikov status = STATUS_SUCCESS;
1032405ed2b4SDenis Malikov RtlEnterCriticalSection(&q->cs);
1033405ed2b4SDenis Malikov if (q->quit)
1034405ed2b4SDenis Malikov status = STATUS_INVALID_HANDLE;
1035405ed2b4SDenis Malikov else
1036405ed2b4SDenis Malikov queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1037405ed2b4SDenis Malikov RtlLeaveCriticalSection(&q->cs);
1038405ed2b4SDenis Malikov
1039405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1040405ed2b4SDenis Malikov *NewTimer = t;
1041405ed2b4SDenis Malikov else
1042405ed2b4SDenis Malikov RtlFreeHeap(GetProcessHeap(), 0, t);
1043405ed2b4SDenis Malikov
1044405ed2b4SDenis Malikov return status;
1045405ed2b4SDenis Malikov }
1046405ed2b4SDenis Malikov
1047405ed2b4SDenis Malikov /***********************************************************************
1048405ed2b4SDenis Malikov * RtlUpdateTimer (NTDLL.@)
1049405ed2b4SDenis Malikov *
1050405ed2b4SDenis Malikov * Changes the time at which a timer expires.
1051405ed2b4SDenis Malikov *
1052405ed2b4SDenis Malikov * PARAMS
1053405ed2b4SDenis Malikov * TimerQueue [I] The queue that holds the timer.
1054405ed2b4SDenis Malikov * Timer [I] The timer to update.
1055405ed2b4SDenis Malikov * DueTime [I] The delay, in milliseconds, before next firing the timer.
1056405ed2b4SDenis Malikov * Period [I] The period, in milliseconds, at which to fire the timer
1057405ed2b4SDenis Malikov * after the first callback. If zero, the timer will not
1058405ed2b4SDenis Malikov * refire once. It still needs to be deleted with
1059405ed2b4SDenis Malikov * RtlDeleteTimer.
1060405ed2b4SDenis Malikov *
1061405ed2b4SDenis Malikov * RETURNS
1062405ed2b4SDenis Malikov * Success: STATUS_SUCCESS.
1063405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
1064405ed2b4SDenis Malikov */
RtlUpdateTimer(HANDLE TimerQueue,HANDLE Timer,DWORD DueTime,DWORD Period)1065405ed2b4SDenis Malikov NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1066405ed2b4SDenis Malikov DWORD DueTime, DWORD Period)
1067405ed2b4SDenis Malikov {
1068405ed2b4SDenis Malikov struct queue_timer *t = Timer;
1069405ed2b4SDenis Malikov struct timer_queue *q = t->q;
1070405ed2b4SDenis Malikov
1071405ed2b4SDenis Malikov RtlEnterCriticalSection(&q->cs);
1072405ed2b4SDenis Malikov /* Can't change a timer if it was once-only or destroyed. */
1073405ed2b4SDenis Malikov if (t->expire != EXPIRE_NEVER)
1074405ed2b4SDenis Malikov {
1075405ed2b4SDenis Malikov t->period = Period;
1076405ed2b4SDenis Malikov queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1077405ed2b4SDenis Malikov }
1078405ed2b4SDenis Malikov RtlLeaveCriticalSection(&q->cs);
1079405ed2b4SDenis Malikov
1080405ed2b4SDenis Malikov return STATUS_SUCCESS;
1081405ed2b4SDenis Malikov }
1082405ed2b4SDenis Malikov
1083405ed2b4SDenis Malikov /***********************************************************************
1084405ed2b4SDenis Malikov * RtlDeleteTimer (NTDLL.@)
1085405ed2b4SDenis Malikov *
1086405ed2b4SDenis Malikov * Cancels a timer-queue timer.
1087405ed2b4SDenis Malikov *
1088405ed2b4SDenis Malikov * PARAMS
1089405ed2b4SDenis Malikov * TimerQueue [I] The queue that holds the timer.
1090405ed2b4SDenis Malikov * Timer [I] The timer to update.
1091405ed2b4SDenis Malikov * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1092405ed2b4SDenis Malikov * wait until the timer is finished firing all pending
1093405ed2b4SDenis Malikov * callbacks before returning. Otherwise, return
1094405ed2b4SDenis Malikov * immediately and set the timer is done.
1095405ed2b4SDenis Malikov *
1096405ed2b4SDenis Malikov * RETURNS
1097405ed2b4SDenis Malikov * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1098405ed2b4SDenis Malikov or if the completion event is NULL.
1099405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
1100405ed2b4SDenis Malikov */
RtlDeleteTimer(HANDLE TimerQueue,HANDLE Timer,HANDLE CompletionEvent)1101405ed2b4SDenis Malikov NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1102405ed2b4SDenis Malikov HANDLE CompletionEvent)
1103405ed2b4SDenis Malikov {
1104405ed2b4SDenis Malikov struct queue_timer *t = Timer;
1105405ed2b4SDenis Malikov struct timer_queue *q;
1106405ed2b4SDenis Malikov NTSTATUS status = STATUS_PENDING;
1107405ed2b4SDenis Malikov HANDLE event = NULL;
1108405ed2b4SDenis Malikov
1109405ed2b4SDenis Malikov if (!Timer)
1110405ed2b4SDenis Malikov return STATUS_INVALID_PARAMETER_1;
1111405ed2b4SDenis Malikov q = t->q;
1112405ed2b4SDenis Malikov if (CompletionEvent == INVALID_HANDLE_VALUE)
1113405ed2b4SDenis Malikov {
1114405ed2b4SDenis Malikov status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1115405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1116405ed2b4SDenis Malikov status = STATUS_PENDING;
1117405ed2b4SDenis Malikov }
1118405ed2b4SDenis Malikov else if (CompletionEvent)
1119405ed2b4SDenis Malikov event = CompletionEvent;
1120405ed2b4SDenis Malikov
1121405ed2b4SDenis Malikov RtlEnterCriticalSection(&q->cs);
1122405ed2b4SDenis Malikov t->event = event;
1123405ed2b4SDenis Malikov if (t->runcount == 0 && event)
1124405ed2b4SDenis Malikov status = STATUS_SUCCESS;
1125405ed2b4SDenis Malikov queue_destroy_timer(t);
1126405ed2b4SDenis Malikov RtlLeaveCriticalSection(&q->cs);
1127405ed2b4SDenis Malikov
1128405ed2b4SDenis Malikov if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1129405ed2b4SDenis Malikov {
1130405ed2b4SDenis Malikov if (status == STATUS_PENDING)
1131405ed2b4SDenis Malikov {
1132405ed2b4SDenis Malikov NtWaitForSingleObject(event, FALSE, NULL);
1133405ed2b4SDenis Malikov status = STATUS_SUCCESS;
1134405ed2b4SDenis Malikov }
1135405ed2b4SDenis Malikov NtClose(event);
1136405ed2b4SDenis Malikov }
1137405ed2b4SDenis Malikov
1138405ed2b4SDenis Malikov return status;
1139405ed2b4SDenis Malikov }
1140405ed2b4SDenis Malikov
1141405ed2b4SDenis Malikov /***********************************************************************
1142405ed2b4SDenis Malikov * timerqueue_thread_proc (internal)
1143405ed2b4SDenis Malikov */
1144*0bf42067SJustin Miller #ifdef __REACTOS__
timerqueue_thread_proc(PVOID param)1145*0bf42067SJustin Miller ULONG NTAPI timerqueue_thread_proc(PVOID param )
1146*0bf42067SJustin Miller #else
1147405ed2b4SDenis Malikov static void CALLBACK timerqueue_thread_proc( void *param )
1148*0bf42067SJustin Miller #endif
1149405ed2b4SDenis Malikov {
1150405ed2b4SDenis Malikov ULONGLONG timeout_lower, timeout_upper, new_timeout;
1151405ed2b4SDenis Malikov struct threadpool_object *other_timer;
1152405ed2b4SDenis Malikov LARGE_INTEGER now, timeout;
1153405ed2b4SDenis Malikov struct list *ptr;
1154405ed2b4SDenis Malikov
1155405ed2b4SDenis Malikov TRACE( "starting timer queue thread\n" );
1156405ed2b4SDenis Malikov set_thread_name(L"wine_threadpool_timerqueue");
1157405ed2b4SDenis Malikov
1158405ed2b4SDenis Malikov RtlEnterCriticalSection( &timerqueue.cs );
1159405ed2b4SDenis Malikov for (;;)
1160405ed2b4SDenis Malikov {
1161405ed2b4SDenis Malikov NtQuerySystemTime( &now );
1162405ed2b4SDenis Malikov
1163405ed2b4SDenis Malikov /* Check for expired timers. */
1164405ed2b4SDenis Malikov while ((ptr = list_head( &timerqueue.pending_timers )))
1165405ed2b4SDenis Malikov {
1166405ed2b4SDenis Malikov struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1167405ed2b4SDenis Malikov assert( timer->type == TP_OBJECT_TYPE_TIMER );
1168405ed2b4SDenis Malikov assert( timer->u.timer.timer_pending );
1169405ed2b4SDenis Malikov if (timer->u.timer.timeout > now.QuadPart)
1170405ed2b4SDenis Malikov break;
1171405ed2b4SDenis Malikov
1172405ed2b4SDenis Malikov /* Queue a new callback in one of the worker threads. */
1173405ed2b4SDenis Malikov list_remove( &timer->u.timer.timer_entry );
1174405ed2b4SDenis Malikov timer->u.timer.timer_pending = FALSE;
1175405ed2b4SDenis Malikov tp_object_submit( timer, FALSE );
1176405ed2b4SDenis Malikov
1177405ed2b4SDenis Malikov /* Insert the timer back into the queue, except it's marked for shutdown. */
1178405ed2b4SDenis Malikov if (timer->u.timer.period && !timer->shutdown)
1179405ed2b4SDenis Malikov {
1180405ed2b4SDenis Malikov timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1181405ed2b4SDenis Malikov if (timer->u.timer.timeout <= now.QuadPart)
1182405ed2b4SDenis Malikov timer->u.timer.timeout = now.QuadPart + 1;
1183405ed2b4SDenis Malikov
1184405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1185405ed2b4SDenis Malikov struct threadpool_object, u.timer.timer_entry )
1186405ed2b4SDenis Malikov {
1187405ed2b4SDenis Malikov assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1188405ed2b4SDenis Malikov if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1189405ed2b4SDenis Malikov break;
1190405ed2b4SDenis Malikov }
1191405ed2b4SDenis Malikov list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1192405ed2b4SDenis Malikov timer->u.timer.timer_pending = TRUE;
1193405ed2b4SDenis Malikov }
1194405ed2b4SDenis Malikov }
1195405ed2b4SDenis Malikov
1196405ed2b4SDenis Malikov timeout_lower = timeout_upper = MAXLONGLONG;
1197405ed2b4SDenis Malikov
1198405ed2b4SDenis Malikov /* Determine next timeout and use the window length to optimize wakeup times. */
1199405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1200405ed2b4SDenis Malikov struct threadpool_object, u.timer.timer_entry )
1201405ed2b4SDenis Malikov {
1202405ed2b4SDenis Malikov assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1203405ed2b4SDenis Malikov if (other_timer->u.timer.timeout >= timeout_upper)
1204405ed2b4SDenis Malikov break;
1205405ed2b4SDenis Malikov
1206405ed2b4SDenis Malikov timeout_lower = other_timer->u.timer.timeout;
1207405ed2b4SDenis Malikov new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1208405ed2b4SDenis Malikov if (new_timeout < timeout_upper)
1209405ed2b4SDenis Malikov timeout_upper = new_timeout;
1210405ed2b4SDenis Malikov }
1211405ed2b4SDenis Malikov
1212405ed2b4SDenis Malikov /* Wait for timer update events or until the next timer expires. */
1213405ed2b4SDenis Malikov if (timerqueue.objcount)
1214405ed2b4SDenis Malikov {
1215405ed2b4SDenis Malikov timeout.QuadPart = timeout_lower;
1216405ed2b4SDenis Malikov RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1217405ed2b4SDenis Malikov continue;
1218405ed2b4SDenis Malikov }
1219405ed2b4SDenis Malikov
1220405ed2b4SDenis Malikov /* All timers have been destroyed, if no new timers are created
1221405ed2b4SDenis Malikov * within some amount of time, then we can shutdown this thread. */
1222405ed2b4SDenis Malikov timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1223405ed2b4SDenis Malikov if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1224405ed2b4SDenis Malikov &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1225405ed2b4SDenis Malikov {
1226405ed2b4SDenis Malikov break;
1227405ed2b4SDenis Malikov }
1228405ed2b4SDenis Malikov }
1229405ed2b4SDenis Malikov
1230405ed2b4SDenis Malikov timerqueue.thread_running = FALSE;
1231405ed2b4SDenis Malikov RtlLeaveCriticalSection( &timerqueue.cs );
1232405ed2b4SDenis Malikov
1233405ed2b4SDenis Malikov TRACE( "terminating timer queue thread\n" );
1234405ed2b4SDenis Malikov RtlExitUserThread( 0 );
1235*0bf42067SJustin Miller #ifdef __REACTOS__
1236*0bf42067SJustin Miller return STATUS_SUCCESS;
1237*0bf42067SJustin Miller #endif
1238405ed2b4SDenis Malikov }
1239405ed2b4SDenis Malikov
1240405ed2b4SDenis Malikov /***********************************************************************
1241405ed2b4SDenis Malikov * tp_new_worker_thread (internal)
1242405ed2b4SDenis Malikov *
1243405ed2b4SDenis Malikov * Create and account a new worker thread for the desired pool.
1244405ed2b4SDenis Malikov */
tp_new_worker_thread(struct threadpool * pool)1245405ed2b4SDenis Malikov static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1246405ed2b4SDenis Malikov {
1247405ed2b4SDenis Malikov HANDLE thread;
1248405ed2b4SDenis Malikov NTSTATUS status;
1249405ed2b4SDenis Malikov
1250405ed2b4SDenis Malikov status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0,
1251405ed2b4SDenis Malikov pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1252405ed2b4SDenis Malikov threadpool_worker_proc, pool, &thread, NULL );
1253405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1254405ed2b4SDenis Malikov {
1255405ed2b4SDenis Malikov InterlockedIncrement( &pool->refcount );
1256405ed2b4SDenis Malikov pool->num_workers++;
1257405ed2b4SDenis Malikov NtClose( thread );
1258405ed2b4SDenis Malikov }
1259405ed2b4SDenis Malikov return status;
1260405ed2b4SDenis Malikov }
1261405ed2b4SDenis Malikov
1262405ed2b4SDenis Malikov /***********************************************************************
1263405ed2b4SDenis Malikov * tp_timerqueue_lock (internal)
1264405ed2b4SDenis Malikov *
1265405ed2b4SDenis Malikov * Acquires a lock on the global timerqueue. When the lock is acquired
1266405ed2b4SDenis Malikov * successfully, it is guaranteed that the timer thread is running.
1267405ed2b4SDenis Malikov */
tp_timerqueue_lock(struct threadpool_object * timer)1268405ed2b4SDenis Malikov static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1269405ed2b4SDenis Malikov {
1270405ed2b4SDenis Malikov NTSTATUS status = STATUS_SUCCESS;
1271405ed2b4SDenis Malikov assert( timer->type == TP_OBJECT_TYPE_TIMER );
1272405ed2b4SDenis Malikov
1273405ed2b4SDenis Malikov timer->u.timer.timer_initialized = FALSE;
1274405ed2b4SDenis Malikov timer->u.timer.timer_pending = FALSE;
1275405ed2b4SDenis Malikov timer->u.timer.timer_set = FALSE;
1276405ed2b4SDenis Malikov timer->u.timer.timeout = 0;
1277405ed2b4SDenis Malikov timer->u.timer.period = 0;
1278405ed2b4SDenis Malikov timer->u.timer.window_length = 0;
1279405ed2b4SDenis Malikov
1280405ed2b4SDenis Malikov RtlEnterCriticalSection( &timerqueue.cs );
1281405ed2b4SDenis Malikov
1282405ed2b4SDenis Malikov /* Make sure that the timerqueue thread is running. */
1283405ed2b4SDenis Malikov if (!timerqueue.thread_running)
1284405ed2b4SDenis Malikov {
1285405ed2b4SDenis Malikov HANDLE thread;
1286405ed2b4SDenis Malikov status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1287405ed2b4SDenis Malikov timerqueue_thread_proc, NULL, &thread, NULL );
1288405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1289405ed2b4SDenis Malikov {
1290405ed2b4SDenis Malikov timerqueue.thread_running = TRUE;
1291405ed2b4SDenis Malikov NtClose( thread );
1292405ed2b4SDenis Malikov }
1293405ed2b4SDenis Malikov }
1294405ed2b4SDenis Malikov
1295405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1296405ed2b4SDenis Malikov {
1297405ed2b4SDenis Malikov timer->u.timer.timer_initialized = TRUE;
1298405ed2b4SDenis Malikov timerqueue.objcount++;
1299405ed2b4SDenis Malikov }
1300405ed2b4SDenis Malikov
1301405ed2b4SDenis Malikov RtlLeaveCriticalSection( &timerqueue.cs );
1302405ed2b4SDenis Malikov return status;
1303405ed2b4SDenis Malikov }
1304405ed2b4SDenis Malikov
1305405ed2b4SDenis Malikov /***********************************************************************
1306405ed2b4SDenis Malikov * tp_timerqueue_unlock (internal)
1307405ed2b4SDenis Malikov *
1308405ed2b4SDenis Malikov * Releases a lock on the global timerqueue.
1309405ed2b4SDenis Malikov */
tp_timerqueue_unlock(struct threadpool_object * timer)1310405ed2b4SDenis Malikov static void tp_timerqueue_unlock( struct threadpool_object *timer )
1311405ed2b4SDenis Malikov {
1312405ed2b4SDenis Malikov assert( timer->type == TP_OBJECT_TYPE_TIMER );
1313405ed2b4SDenis Malikov
1314405ed2b4SDenis Malikov RtlEnterCriticalSection( &timerqueue.cs );
1315405ed2b4SDenis Malikov if (timer->u.timer.timer_initialized)
1316405ed2b4SDenis Malikov {
1317405ed2b4SDenis Malikov /* If timer was pending, remove it. */
1318405ed2b4SDenis Malikov if (timer->u.timer.timer_pending)
1319405ed2b4SDenis Malikov {
1320405ed2b4SDenis Malikov list_remove( &timer->u.timer.timer_entry );
1321405ed2b4SDenis Malikov timer->u.timer.timer_pending = FALSE;
1322405ed2b4SDenis Malikov }
1323405ed2b4SDenis Malikov
1324405ed2b4SDenis Malikov /* If the last timer object was destroyed, then wake up the thread. */
1325405ed2b4SDenis Malikov if (!--timerqueue.objcount)
1326405ed2b4SDenis Malikov {
1327405ed2b4SDenis Malikov assert( list_empty( &timerqueue.pending_timers ) );
1328405ed2b4SDenis Malikov RtlWakeAllConditionVariable( &timerqueue.update_event );
1329405ed2b4SDenis Malikov }
1330405ed2b4SDenis Malikov
1331405ed2b4SDenis Malikov timer->u.timer.timer_initialized = FALSE;
1332405ed2b4SDenis Malikov }
1333405ed2b4SDenis Malikov RtlLeaveCriticalSection( &timerqueue.cs );
1334405ed2b4SDenis Malikov }
1335405ed2b4SDenis Malikov
1336405ed2b4SDenis Malikov /***********************************************************************
1337405ed2b4SDenis Malikov * waitqueue_thread_proc (internal)
1338405ed2b4SDenis Malikov */
1339*0bf42067SJustin Miller #ifdef __REACTOS__
waitqueue_thread_proc(PVOID param)1340*0bf42067SJustin Miller void NTAPI waitqueue_thread_proc(PVOID param )
1341*0bf42067SJustin Miller #else
1342405ed2b4SDenis Malikov static void CALLBACK waitqueue_thread_proc( void *param )
1343*0bf42067SJustin Miller #endif
1344405ed2b4SDenis Malikov {
1345405ed2b4SDenis Malikov struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1346405ed2b4SDenis Malikov HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1347405ed2b4SDenis Malikov struct waitqueue_bucket *bucket = param;
1348405ed2b4SDenis Malikov struct threadpool_object *wait, *next;
1349405ed2b4SDenis Malikov LARGE_INTEGER now, timeout;
1350405ed2b4SDenis Malikov DWORD num_handles;
1351405ed2b4SDenis Malikov NTSTATUS status;
1352405ed2b4SDenis Malikov
1353405ed2b4SDenis Malikov TRACE( "starting wait queue thread\n" );
1354405ed2b4SDenis Malikov set_thread_name(L"wine_threadpool_waitqueue");
1355405ed2b4SDenis Malikov
1356405ed2b4SDenis Malikov RtlEnterCriticalSection( &waitqueue.cs );
1357405ed2b4SDenis Malikov
1358405ed2b4SDenis Malikov for (;;)
1359405ed2b4SDenis Malikov {
1360405ed2b4SDenis Malikov NtQuerySystemTime( &now );
1361405ed2b4SDenis Malikov timeout.QuadPart = MAXLONGLONG;
1362405ed2b4SDenis Malikov num_handles = 0;
1363405ed2b4SDenis Malikov
1364405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1365405ed2b4SDenis Malikov u.wait.wait_entry )
1366405ed2b4SDenis Malikov {
1367405ed2b4SDenis Malikov assert( wait->type == TP_OBJECT_TYPE_WAIT );
1368405ed2b4SDenis Malikov if (wait->u.wait.timeout <= now.QuadPart)
1369405ed2b4SDenis Malikov {
1370405ed2b4SDenis Malikov /* Wait object timed out. */
1371405ed2b4SDenis Malikov if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1372405ed2b4SDenis Malikov {
1373405ed2b4SDenis Malikov list_remove( &wait->u.wait.wait_entry );
1374405ed2b4SDenis Malikov list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1375405ed2b4SDenis Malikov }
1376405ed2b4SDenis Malikov if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1377405ed2b4SDenis Malikov {
1378405ed2b4SDenis Malikov InterlockedIncrement( &wait->refcount );
1379405ed2b4SDenis Malikov wait->num_pending_callbacks++;
1380405ed2b4SDenis Malikov RtlEnterCriticalSection( &wait->pool->cs );
1381405ed2b4SDenis Malikov tp_object_execute( wait, TRUE );
1382405ed2b4SDenis Malikov RtlLeaveCriticalSection( &wait->pool->cs );
1383405ed2b4SDenis Malikov tp_object_release( wait );
1384405ed2b4SDenis Malikov }
1385405ed2b4SDenis Malikov else tp_object_submit( wait, FALSE );
1386405ed2b4SDenis Malikov }
1387405ed2b4SDenis Malikov else
1388405ed2b4SDenis Malikov {
1389405ed2b4SDenis Malikov if (wait->u.wait.timeout < timeout.QuadPart)
1390405ed2b4SDenis Malikov timeout.QuadPart = wait->u.wait.timeout;
1391405ed2b4SDenis Malikov
1392405ed2b4SDenis Malikov assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1393405ed2b4SDenis Malikov InterlockedIncrement( &wait->refcount );
1394405ed2b4SDenis Malikov objects[num_handles] = wait;
1395405ed2b4SDenis Malikov handles[num_handles] = wait->u.wait.handle;
1396405ed2b4SDenis Malikov num_handles++;
1397405ed2b4SDenis Malikov }
1398405ed2b4SDenis Malikov }
1399405ed2b4SDenis Malikov
1400405ed2b4SDenis Malikov if (!bucket->objcount)
1401405ed2b4SDenis Malikov {
1402405ed2b4SDenis Malikov /* All wait objects have been destroyed, if no new wait objects are created
1403405ed2b4SDenis Malikov * within some amount of time, then we can shutdown this thread. */
1404405ed2b4SDenis Malikov assert( num_handles == 0 );
1405405ed2b4SDenis Malikov RtlLeaveCriticalSection( &waitqueue.cs );
1406405ed2b4SDenis Malikov timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1407405ed2b4SDenis Malikov status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1408405ed2b4SDenis Malikov RtlEnterCriticalSection( &waitqueue.cs );
1409405ed2b4SDenis Malikov
1410405ed2b4SDenis Malikov if (status == STATUS_TIMEOUT && !bucket->objcount)
1411405ed2b4SDenis Malikov break;
1412405ed2b4SDenis Malikov }
1413405ed2b4SDenis Malikov else
1414405ed2b4SDenis Malikov {
1415405ed2b4SDenis Malikov handles[num_handles] = bucket->update_event;
1416405ed2b4SDenis Malikov RtlLeaveCriticalSection( &waitqueue.cs );
1417405ed2b4SDenis Malikov status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1418405ed2b4SDenis Malikov RtlEnterCriticalSection( &waitqueue.cs );
1419405ed2b4SDenis Malikov
1420405ed2b4SDenis Malikov if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1421405ed2b4SDenis Malikov {
1422405ed2b4SDenis Malikov wait = objects[status - STATUS_WAIT_0];
1423405ed2b4SDenis Malikov assert( wait->type == TP_OBJECT_TYPE_WAIT );
1424405ed2b4SDenis Malikov if (wait->u.wait.bucket)
1425405ed2b4SDenis Malikov {
1426405ed2b4SDenis Malikov /* Wait object signaled. */
1427405ed2b4SDenis Malikov assert( wait->u.wait.bucket == bucket );
1428405ed2b4SDenis Malikov if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1429405ed2b4SDenis Malikov {
1430405ed2b4SDenis Malikov list_remove( &wait->u.wait.wait_entry );
1431405ed2b4SDenis Malikov list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1432405ed2b4SDenis Malikov }
1433405ed2b4SDenis Malikov if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1434405ed2b4SDenis Malikov {
1435405ed2b4SDenis Malikov wait->u.wait.signaled++;
1436405ed2b4SDenis Malikov wait->num_pending_callbacks++;
1437405ed2b4SDenis Malikov RtlEnterCriticalSection( &wait->pool->cs );
1438405ed2b4SDenis Malikov tp_object_execute( wait, TRUE );
1439405ed2b4SDenis Malikov RtlLeaveCriticalSection( &wait->pool->cs );
1440405ed2b4SDenis Malikov }
1441405ed2b4SDenis Malikov else tp_object_submit( wait, TRUE );
1442405ed2b4SDenis Malikov }
1443405ed2b4SDenis Malikov else
1444405ed2b4SDenis Malikov WARN("wait object %p triggered while object was destroyed\n", wait);
1445405ed2b4SDenis Malikov }
1446405ed2b4SDenis Malikov
1447405ed2b4SDenis Malikov /* Release temporary references to wait objects. */
1448405ed2b4SDenis Malikov while (num_handles)
1449405ed2b4SDenis Malikov {
1450405ed2b4SDenis Malikov wait = objects[--num_handles];
1451405ed2b4SDenis Malikov assert( wait->type == TP_OBJECT_TYPE_WAIT );
1452405ed2b4SDenis Malikov tp_object_release( wait );
1453405ed2b4SDenis Malikov }
1454405ed2b4SDenis Malikov }
1455405ed2b4SDenis Malikov
1456405ed2b4SDenis Malikov /* Try to merge bucket with other threads. */
1457405ed2b4SDenis Malikov if (waitqueue.num_buckets > 1 && bucket->objcount &&
1458405ed2b4SDenis Malikov bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1459405ed2b4SDenis Malikov {
1460405ed2b4SDenis Malikov struct waitqueue_bucket *other_bucket;
1461405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1462405ed2b4SDenis Malikov {
1463405ed2b4SDenis Malikov if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1464405ed2b4SDenis Malikov other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1465405ed2b4SDenis Malikov {
1466405ed2b4SDenis Malikov other_bucket->objcount += bucket->objcount;
1467405ed2b4SDenis Malikov bucket->objcount = 0;
1468405ed2b4SDenis Malikov
1469405ed2b4SDenis Malikov /* Update reserved list. */
1470405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1471405ed2b4SDenis Malikov {
1472405ed2b4SDenis Malikov assert( wait->type == TP_OBJECT_TYPE_WAIT );
1473405ed2b4SDenis Malikov wait->u.wait.bucket = other_bucket;
1474405ed2b4SDenis Malikov }
1475405ed2b4SDenis Malikov list_move_tail( &other_bucket->reserved, &bucket->reserved );
1476405ed2b4SDenis Malikov
1477405ed2b4SDenis Malikov /* Update waiting list. */
1478405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1479405ed2b4SDenis Malikov {
1480405ed2b4SDenis Malikov assert( wait->type == TP_OBJECT_TYPE_WAIT );
1481405ed2b4SDenis Malikov wait->u.wait.bucket = other_bucket;
1482405ed2b4SDenis Malikov }
1483405ed2b4SDenis Malikov list_move_tail( &other_bucket->waiting, &bucket->waiting );
1484405ed2b4SDenis Malikov
1485405ed2b4SDenis Malikov /* Move bucket to the end, to keep the probability of
1486405ed2b4SDenis Malikov * newly added wait objects as small as possible. */
1487405ed2b4SDenis Malikov list_remove( &bucket->bucket_entry );
1488405ed2b4SDenis Malikov list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1489405ed2b4SDenis Malikov
1490405ed2b4SDenis Malikov NtSetEvent( other_bucket->update_event, NULL );
1491405ed2b4SDenis Malikov break;
1492405ed2b4SDenis Malikov }
1493405ed2b4SDenis Malikov }
1494405ed2b4SDenis Malikov }
1495405ed2b4SDenis Malikov }
1496405ed2b4SDenis Malikov
1497405ed2b4SDenis Malikov /* Remove this bucket from the list. */
1498405ed2b4SDenis Malikov list_remove( &bucket->bucket_entry );
1499405ed2b4SDenis Malikov if (!--waitqueue.num_buckets)
1500405ed2b4SDenis Malikov assert( list_empty( &waitqueue.buckets ) );
1501405ed2b4SDenis Malikov
1502405ed2b4SDenis Malikov RtlLeaveCriticalSection( &waitqueue.cs );
1503405ed2b4SDenis Malikov
1504405ed2b4SDenis Malikov TRACE( "terminating wait queue thread\n" );
1505405ed2b4SDenis Malikov
1506405ed2b4SDenis Malikov assert( bucket->objcount == 0 );
1507405ed2b4SDenis Malikov assert( list_empty( &bucket->reserved ) );
1508405ed2b4SDenis Malikov assert( list_empty( &bucket->waiting ) );
1509405ed2b4SDenis Malikov NtClose( bucket->update_event );
1510405ed2b4SDenis Malikov
1511405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, bucket );
1512405ed2b4SDenis Malikov RtlExitUserThread( 0 );
1513405ed2b4SDenis Malikov }
1514405ed2b4SDenis Malikov
1515405ed2b4SDenis Malikov /***********************************************************************
1516405ed2b4SDenis Malikov * tp_waitqueue_lock (internal)
1517405ed2b4SDenis Malikov */
tp_waitqueue_lock(struct threadpool_object * wait)1518405ed2b4SDenis Malikov static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1519405ed2b4SDenis Malikov {
1520405ed2b4SDenis Malikov struct waitqueue_bucket *bucket;
1521405ed2b4SDenis Malikov NTSTATUS status;
1522405ed2b4SDenis Malikov HANDLE thread;
1523405ed2b4SDenis Malikov BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1524405ed2b4SDenis Malikov assert( wait->type == TP_OBJECT_TYPE_WAIT );
1525405ed2b4SDenis Malikov
1526405ed2b4SDenis Malikov wait->u.wait.signaled = 0;
1527405ed2b4SDenis Malikov wait->u.wait.bucket = NULL;
1528405ed2b4SDenis Malikov wait->u.wait.wait_pending = FALSE;
1529405ed2b4SDenis Malikov wait->u.wait.timeout = 0;
1530405ed2b4SDenis Malikov wait->u.wait.handle = INVALID_HANDLE_VALUE;
1531405ed2b4SDenis Malikov
1532405ed2b4SDenis Malikov RtlEnterCriticalSection( &waitqueue.cs );
1533405ed2b4SDenis Malikov
1534405ed2b4SDenis Malikov /* Try to assign to existing bucket if possible. */
1535405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1536405ed2b4SDenis Malikov {
1537405ed2b4SDenis Malikov if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1538405ed2b4SDenis Malikov {
1539405ed2b4SDenis Malikov list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1540405ed2b4SDenis Malikov wait->u.wait.bucket = bucket;
1541405ed2b4SDenis Malikov bucket->objcount++;
1542405ed2b4SDenis Malikov
1543405ed2b4SDenis Malikov status = STATUS_SUCCESS;
1544405ed2b4SDenis Malikov goto out;
1545405ed2b4SDenis Malikov }
1546405ed2b4SDenis Malikov }
1547405ed2b4SDenis Malikov
1548405ed2b4SDenis Malikov /* Create a new bucket and corresponding worker thread. */
1549405ed2b4SDenis Malikov bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1550405ed2b4SDenis Malikov if (!bucket)
1551405ed2b4SDenis Malikov {
1552405ed2b4SDenis Malikov status = STATUS_NO_MEMORY;
1553405ed2b4SDenis Malikov goto out;
1554405ed2b4SDenis Malikov }
1555405ed2b4SDenis Malikov
1556405ed2b4SDenis Malikov bucket->objcount = 0;
1557405ed2b4SDenis Malikov bucket->alertable = alertable;
1558405ed2b4SDenis Malikov list_init( &bucket->reserved );
1559405ed2b4SDenis Malikov list_init( &bucket->waiting );
1560405ed2b4SDenis Malikov
1561405ed2b4SDenis Malikov status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1562405ed2b4SDenis Malikov NULL, SynchronizationEvent, FALSE );
1563405ed2b4SDenis Malikov if (status)
1564405ed2b4SDenis Malikov {
1565405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, bucket );
1566405ed2b4SDenis Malikov goto out;
1567405ed2b4SDenis Malikov }
1568405ed2b4SDenis Malikov
1569405ed2b4SDenis Malikov status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1570*0bf42067SJustin Miller (PTHREAD_START_ROUTINE)waitqueue_thread_proc, bucket, &thread, NULL );
1571405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1572405ed2b4SDenis Malikov {
1573405ed2b4SDenis Malikov list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1574405ed2b4SDenis Malikov waitqueue.num_buckets++;
1575405ed2b4SDenis Malikov
1576405ed2b4SDenis Malikov list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1577405ed2b4SDenis Malikov wait->u.wait.bucket = bucket;
1578405ed2b4SDenis Malikov bucket->objcount++;
1579405ed2b4SDenis Malikov
1580405ed2b4SDenis Malikov NtClose( thread );
1581405ed2b4SDenis Malikov }
1582405ed2b4SDenis Malikov else
1583405ed2b4SDenis Malikov {
1584405ed2b4SDenis Malikov NtClose( bucket->update_event );
1585405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, bucket );
1586405ed2b4SDenis Malikov }
1587405ed2b4SDenis Malikov
1588405ed2b4SDenis Malikov out:
1589405ed2b4SDenis Malikov RtlLeaveCriticalSection( &waitqueue.cs );
1590405ed2b4SDenis Malikov return status;
1591405ed2b4SDenis Malikov }
1592405ed2b4SDenis Malikov
1593405ed2b4SDenis Malikov /***********************************************************************
1594405ed2b4SDenis Malikov * tp_waitqueue_unlock (internal)
1595405ed2b4SDenis Malikov */
tp_waitqueue_unlock(struct threadpool_object * wait)1596405ed2b4SDenis Malikov static void tp_waitqueue_unlock( struct threadpool_object *wait )
1597405ed2b4SDenis Malikov {
1598405ed2b4SDenis Malikov assert( wait->type == TP_OBJECT_TYPE_WAIT );
1599405ed2b4SDenis Malikov
1600405ed2b4SDenis Malikov RtlEnterCriticalSection( &waitqueue.cs );
1601405ed2b4SDenis Malikov if (wait->u.wait.bucket)
1602405ed2b4SDenis Malikov {
1603405ed2b4SDenis Malikov struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1604405ed2b4SDenis Malikov assert( bucket->objcount > 0 );
1605405ed2b4SDenis Malikov
1606405ed2b4SDenis Malikov list_remove( &wait->u.wait.wait_entry );
1607405ed2b4SDenis Malikov wait->u.wait.bucket = NULL;
1608405ed2b4SDenis Malikov bucket->objcount--;
1609405ed2b4SDenis Malikov
1610405ed2b4SDenis Malikov NtSetEvent( bucket->update_event, NULL );
1611405ed2b4SDenis Malikov }
1612405ed2b4SDenis Malikov RtlLeaveCriticalSection( &waitqueue.cs );
1613405ed2b4SDenis Malikov }
1614405ed2b4SDenis Malikov
1615*0bf42067SJustin Miller #ifdef __REACTOS__
ioqueue_thread_proc(PVOID param)1616*0bf42067SJustin Miller ULONG NTAPI ioqueue_thread_proc(PVOID param )
1617*0bf42067SJustin Miller #else
1618405ed2b4SDenis Malikov static void CALLBACK ioqueue_thread_proc( void *param )
1619*0bf42067SJustin Miller #endif
1620405ed2b4SDenis Malikov {
1621405ed2b4SDenis Malikov struct io_completion *completion;
1622405ed2b4SDenis Malikov struct threadpool_object *io;
1623405ed2b4SDenis Malikov IO_STATUS_BLOCK iosb;
1624*0bf42067SJustin Miller #ifdef __REACTOS__
1625*0bf42067SJustin Miller PVOID key, value;
1626*0bf42067SJustin Miller #else
1627405ed2b4SDenis Malikov ULONG_PTR key, value;
1628*0bf42067SJustin Miller #endif
1629405ed2b4SDenis Malikov BOOL destroy, skip;
1630405ed2b4SDenis Malikov NTSTATUS status;
1631405ed2b4SDenis Malikov
1632405ed2b4SDenis Malikov TRACE( "starting I/O completion thread\n" );
1633405ed2b4SDenis Malikov set_thread_name(L"wine_threadpool_ioqueue");
1634405ed2b4SDenis Malikov
1635405ed2b4SDenis Malikov RtlEnterCriticalSection( &ioqueue.cs );
1636405ed2b4SDenis Malikov
1637405ed2b4SDenis Malikov for (;;)
1638405ed2b4SDenis Malikov {
1639405ed2b4SDenis Malikov RtlLeaveCriticalSection( &ioqueue.cs );
1640405ed2b4SDenis Malikov if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1641405ed2b4SDenis Malikov ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1642405ed2b4SDenis Malikov RtlEnterCriticalSection( &ioqueue.cs );
1643405ed2b4SDenis Malikov
1644405ed2b4SDenis Malikov destroy = skip = FALSE;
1645405ed2b4SDenis Malikov io = (struct threadpool_object *)key;
1646405ed2b4SDenis Malikov
1647405ed2b4SDenis Malikov TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1648405ed2b4SDenis Malikov
1649405ed2b4SDenis Malikov if (io && (io->shutdown || io->u.io.shutting_down))
1650405ed2b4SDenis Malikov {
1651405ed2b4SDenis Malikov RtlEnterCriticalSection( &io->pool->cs );
1652405ed2b4SDenis Malikov if (!io->u.io.pending_count)
1653405ed2b4SDenis Malikov {
1654405ed2b4SDenis Malikov if (io->u.io.skipped_count)
1655405ed2b4SDenis Malikov --io->u.io.skipped_count;
1656405ed2b4SDenis Malikov
1657405ed2b4SDenis Malikov if (io->u.io.skipped_count)
1658405ed2b4SDenis Malikov skip = TRUE;
1659405ed2b4SDenis Malikov else
1660405ed2b4SDenis Malikov destroy = TRUE;
1661405ed2b4SDenis Malikov }
1662405ed2b4SDenis Malikov RtlLeaveCriticalSection( &io->pool->cs );
1663405ed2b4SDenis Malikov if (skip) continue;
1664405ed2b4SDenis Malikov }
1665405ed2b4SDenis Malikov
1666405ed2b4SDenis Malikov if (destroy)
1667405ed2b4SDenis Malikov {
1668405ed2b4SDenis Malikov --ioqueue.objcount;
1669405ed2b4SDenis Malikov TRACE( "Releasing io %p.\n", io );
1670405ed2b4SDenis Malikov io->shutdown = TRUE;
1671405ed2b4SDenis Malikov tp_object_release( io );
1672405ed2b4SDenis Malikov }
1673405ed2b4SDenis Malikov else if (io)
1674405ed2b4SDenis Malikov {
1675405ed2b4SDenis Malikov RtlEnterCriticalSection( &io->pool->cs );
1676405ed2b4SDenis Malikov
1677405ed2b4SDenis Malikov TRACE( "pending_count %u.\n", io->u.io.pending_count );
1678405ed2b4SDenis Malikov
1679405ed2b4SDenis Malikov if (io->u.io.pending_count)
1680405ed2b4SDenis Malikov {
1681405ed2b4SDenis Malikov --io->u.io.pending_count;
1682405ed2b4SDenis Malikov if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1683405ed2b4SDenis Malikov io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1684405ed2b4SDenis Malikov {
1685405ed2b4SDenis Malikov ERR( "Failed to allocate memory.\n" );
1686405ed2b4SDenis Malikov RtlLeaveCriticalSection( &io->pool->cs );
1687405ed2b4SDenis Malikov continue;
1688405ed2b4SDenis Malikov }
1689405ed2b4SDenis Malikov
1690405ed2b4SDenis Malikov completion = &io->u.io.completions[io->u.io.completion_count++];
1691405ed2b4SDenis Malikov completion->iosb = iosb;
1692*0bf42067SJustin Miller #ifdef __REACTOS__
1693*0bf42067SJustin Miller completion->cvalue = (ULONG_PTR)value;
1694*0bf42067SJustin Miller #else
1695405ed2b4SDenis Malikov completion->cvalue = value;
1696*0bf42067SJustin Miller #endif
1697405ed2b4SDenis Malikov
1698405ed2b4SDenis Malikov tp_object_submit( io, FALSE );
1699405ed2b4SDenis Malikov }
1700405ed2b4SDenis Malikov RtlLeaveCriticalSection( &io->pool->cs );
1701405ed2b4SDenis Malikov }
1702405ed2b4SDenis Malikov
1703405ed2b4SDenis Malikov if (!ioqueue.objcount)
1704405ed2b4SDenis Malikov {
1705405ed2b4SDenis Malikov /* All I/O objects have been destroyed; if no new objects are
1706405ed2b4SDenis Malikov * created within some amount of time, then we can shutdown this
1707405ed2b4SDenis Malikov * thread. */
1708405ed2b4SDenis Malikov LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1709405ed2b4SDenis Malikov if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1710405ed2b4SDenis Malikov &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1711405ed2b4SDenis Malikov break;
1712405ed2b4SDenis Malikov }
1713405ed2b4SDenis Malikov }
1714405ed2b4SDenis Malikov
1715405ed2b4SDenis Malikov ioqueue.thread_running = FALSE;
1716405ed2b4SDenis Malikov RtlLeaveCriticalSection( &ioqueue.cs );
1717405ed2b4SDenis Malikov
1718405ed2b4SDenis Malikov TRACE( "terminating I/O completion thread\n" );
1719405ed2b4SDenis Malikov
1720405ed2b4SDenis Malikov RtlExitUserThread( 0 );
1721*0bf42067SJustin Miller
1722*0bf42067SJustin Miller #ifdef __REACTOS__
1723*0bf42067SJustin Miller return STATUS_SUCCESS;
1724*0bf42067SJustin Miller #endif
1725405ed2b4SDenis Malikov }
1726405ed2b4SDenis Malikov
tp_ioqueue_lock(struct threadpool_object * io,HANDLE file)1727405ed2b4SDenis Malikov static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1728405ed2b4SDenis Malikov {
1729405ed2b4SDenis Malikov NTSTATUS status = STATUS_SUCCESS;
1730405ed2b4SDenis Malikov
1731405ed2b4SDenis Malikov assert( io->type == TP_OBJECT_TYPE_IO );
1732405ed2b4SDenis Malikov
1733405ed2b4SDenis Malikov RtlEnterCriticalSection( &ioqueue.cs );
1734405ed2b4SDenis Malikov
1735405ed2b4SDenis Malikov if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1736405ed2b4SDenis Malikov IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1737405ed2b4SDenis Malikov {
1738405ed2b4SDenis Malikov RtlLeaveCriticalSection( &ioqueue.cs );
1739405ed2b4SDenis Malikov return status;
1740405ed2b4SDenis Malikov }
1741405ed2b4SDenis Malikov
1742405ed2b4SDenis Malikov if (!ioqueue.thread_running)
1743405ed2b4SDenis Malikov {
1744405ed2b4SDenis Malikov HANDLE thread;
1745405ed2b4SDenis Malikov
1746405ed2b4SDenis Malikov if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1747405ed2b4SDenis Malikov 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1748405ed2b4SDenis Malikov {
1749405ed2b4SDenis Malikov ioqueue.thread_running = TRUE;
1750405ed2b4SDenis Malikov NtClose( thread );
1751405ed2b4SDenis Malikov }
1752405ed2b4SDenis Malikov }
1753405ed2b4SDenis Malikov
1754405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1755405ed2b4SDenis Malikov {
1756405ed2b4SDenis Malikov FILE_COMPLETION_INFORMATION info;
1757405ed2b4SDenis Malikov IO_STATUS_BLOCK iosb;
1758405ed2b4SDenis Malikov
1759*0bf42067SJustin Miller #ifdef __REACTOS__
1760*0bf42067SJustin Miller info.Port = ioqueue.port;
1761*0bf42067SJustin Miller info.Key = io;
1762*0bf42067SJustin Miller #else
1763405ed2b4SDenis Malikov info.CompletionPort = ioqueue.port;
1764405ed2b4SDenis Malikov info.CompletionKey = (ULONG_PTR)io;
1765*0bf42067SJustin Miller #endif
1766405ed2b4SDenis Malikov
1767405ed2b4SDenis Malikov status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1768405ed2b4SDenis Malikov }
1769405ed2b4SDenis Malikov
1770405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1771405ed2b4SDenis Malikov {
1772405ed2b4SDenis Malikov if (!ioqueue.objcount++)
1773405ed2b4SDenis Malikov RtlWakeConditionVariable( &ioqueue.update_event );
1774405ed2b4SDenis Malikov }
1775405ed2b4SDenis Malikov
1776405ed2b4SDenis Malikov RtlLeaveCriticalSection( &ioqueue.cs );
1777405ed2b4SDenis Malikov return status;
1778405ed2b4SDenis Malikov }
1779405ed2b4SDenis Malikov
1780405ed2b4SDenis Malikov /***********************************************************************
1781405ed2b4SDenis Malikov * tp_threadpool_alloc (internal)
1782405ed2b4SDenis Malikov *
1783405ed2b4SDenis Malikov * Allocates a new threadpool object.
1784405ed2b4SDenis Malikov */
tp_threadpool_alloc(struct threadpool ** out)1785405ed2b4SDenis Malikov static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1786405ed2b4SDenis Malikov {
1787*0bf42067SJustin Miller #ifdef __REACTOS__
1788*0bf42067SJustin Miller IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1789*0bf42067SJustin Miller #else
1790405ed2b4SDenis Malikov IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1791*0bf42067SJustin Miller #endif
1792405ed2b4SDenis Malikov struct threadpool *pool;
1793405ed2b4SDenis Malikov unsigned int i;
1794405ed2b4SDenis Malikov
1795405ed2b4SDenis Malikov pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1796405ed2b4SDenis Malikov if (!pool)
1797405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
1798405ed2b4SDenis Malikov
1799405ed2b4SDenis Malikov pool->refcount = 1;
1800405ed2b4SDenis Malikov pool->objcount = 0;
1801405ed2b4SDenis Malikov pool->shutdown = FALSE;
1802405ed2b4SDenis Malikov
1803*0bf42067SJustin Miller #ifdef __REACTOS__
1804*0bf42067SJustin Miller RtlInitializeCriticalSection( &pool->cs );
1805*0bf42067SJustin Miller #else
1806405ed2b4SDenis Malikov RtlInitializeCriticalSectionEx( &pool->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
1807*0bf42067SJustin Miller
1808405ed2b4SDenis Malikov pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1809*0bf42067SJustin Miller #endif
1810405ed2b4SDenis Malikov
1811405ed2b4SDenis Malikov for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1812405ed2b4SDenis Malikov list_init( &pool->pools[i] );
1813405ed2b4SDenis Malikov RtlInitializeConditionVariable( &pool->update_event );
1814405ed2b4SDenis Malikov
1815405ed2b4SDenis Malikov pool->max_workers = 500;
1816405ed2b4SDenis Malikov pool->min_workers = 0;
1817405ed2b4SDenis Malikov pool->num_workers = 0;
1818405ed2b4SDenis Malikov pool->num_busy_workers = 0;
1819405ed2b4SDenis Malikov pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1820405ed2b4SDenis Malikov pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1821405ed2b4SDenis Malikov
1822405ed2b4SDenis Malikov TRACE( "allocated threadpool %p\n", pool );
1823405ed2b4SDenis Malikov
1824405ed2b4SDenis Malikov *out = pool;
1825405ed2b4SDenis Malikov return STATUS_SUCCESS;
1826405ed2b4SDenis Malikov }
1827405ed2b4SDenis Malikov
1828405ed2b4SDenis Malikov /***********************************************************************
1829405ed2b4SDenis Malikov * tp_threadpool_shutdown (internal)
1830405ed2b4SDenis Malikov *
1831405ed2b4SDenis Malikov * Prepares the shutdown of a threadpool object and notifies all worker
1832405ed2b4SDenis Malikov * threads to terminate (after all remaining work items have been
1833405ed2b4SDenis Malikov * processed).
1834405ed2b4SDenis Malikov */
tp_threadpool_shutdown(struct threadpool * pool)1835405ed2b4SDenis Malikov static void tp_threadpool_shutdown( struct threadpool *pool )
1836405ed2b4SDenis Malikov {
1837405ed2b4SDenis Malikov assert( pool != default_threadpool );
1838405ed2b4SDenis Malikov
1839405ed2b4SDenis Malikov pool->shutdown = TRUE;
1840405ed2b4SDenis Malikov RtlWakeAllConditionVariable( &pool->update_event );
1841405ed2b4SDenis Malikov }
1842405ed2b4SDenis Malikov
1843405ed2b4SDenis Malikov /***********************************************************************
1844405ed2b4SDenis Malikov * tp_threadpool_release (internal)
1845405ed2b4SDenis Malikov *
1846405ed2b4SDenis Malikov * Releases a reference to a threadpool object.
1847405ed2b4SDenis Malikov */
tp_threadpool_release(struct threadpool * pool)1848405ed2b4SDenis Malikov static BOOL tp_threadpool_release( struct threadpool *pool )
1849405ed2b4SDenis Malikov {
1850405ed2b4SDenis Malikov unsigned int i;
1851405ed2b4SDenis Malikov
1852405ed2b4SDenis Malikov if (InterlockedDecrement( &pool->refcount ))
1853405ed2b4SDenis Malikov return FALSE;
1854405ed2b4SDenis Malikov
1855405ed2b4SDenis Malikov TRACE( "destroying threadpool %p\n", pool );
1856405ed2b4SDenis Malikov
1857405ed2b4SDenis Malikov assert( pool->shutdown );
1858405ed2b4SDenis Malikov assert( !pool->objcount );
1859405ed2b4SDenis Malikov for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1860405ed2b4SDenis Malikov assert( list_empty( &pool->pools[i] ) );
1861*0bf42067SJustin Miller #ifndef __REACTOS__
1862405ed2b4SDenis Malikov pool->cs.DebugInfo->Spare[0] = 0;
1863*0bf42067SJustin Miller #endif
1864405ed2b4SDenis Malikov RtlDeleteCriticalSection( &pool->cs );
1865405ed2b4SDenis Malikov
1866405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, pool );
1867405ed2b4SDenis Malikov return TRUE;
1868405ed2b4SDenis Malikov }
1869405ed2b4SDenis Malikov
1870405ed2b4SDenis Malikov /***********************************************************************
1871405ed2b4SDenis Malikov * tp_threadpool_lock (internal)
1872405ed2b4SDenis Malikov *
1873405ed2b4SDenis Malikov * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1874405ed2b4SDenis Malikov * block. When the lock is acquired successfully, it is guaranteed that
1875405ed2b4SDenis Malikov * there is at least one worker thread to process tasks.
1876405ed2b4SDenis Malikov */
tp_threadpool_lock(struct threadpool ** out,TP_CALLBACK_ENVIRON * environment)1877405ed2b4SDenis Malikov static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1878405ed2b4SDenis Malikov {
1879405ed2b4SDenis Malikov struct threadpool *pool = NULL;
1880405ed2b4SDenis Malikov NTSTATUS status = STATUS_SUCCESS;
1881405ed2b4SDenis Malikov
1882405ed2b4SDenis Malikov if (environment)
1883405ed2b4SDenis Malikov {
1884*0bf42067SJustin Miller #ifndef __REACTOS__ //Windows 7 stuff
1885405ed2b4SDenis Malikov /* Validate environment parameters. */
1886405ed2b4SDenis Malikov if (environment->Version == 3)
1887405ed2b4SDenis Malikov {
1888405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1889405ed2b4SDenis Malikov
1890405ed2b4SDenis Malikov switch (environment3->CallbackPriority)
1891405ed2b4SDenis Malikov {
1892405ed2b4SDenis Malikov case TP_CALLBACK_PRIORITY_HIGH:
1893405ed2b4SDenis Malikov case TP_CALLBACK_PRIORITY_NORMAL:
1894405ed2b4SDenis Malikov case TP_CALLBACK_PRIORITY_LOW:
1895405ed2b4SDenis Malikov break;
1896405ed2b4SDenis Malikov default:
1897405ed2b4SDenis Malikov return STATUS_INVALID_PARAMETER;
1898405ed2b4SDenis Malikov }
1899405ed2b4SDenis Malikov }
1900*0bf42067SJustin Miller #endif
1901405ed2b4SDenis Malikov pool = (struct threadpool *)environment->Pool;
1902405ed2b4SDenis Malikov }
1903405ed2b4SDenis Malikov
1904405ed2b4SDenis Malikov if (!pool)
1905405ed2b4SDenis Malikov {
1906405ed2b4SDenis Malikov if (!default_threadpool)
1907405ed2b4SDenis Malikov {
1908405ed2b4SDenis Malikov status = tp_threadpool_alloc( &pool );
1909405ed2b4SDenis Malikov if (status != STATUS_SUCCESS)
1910405ed2b4SDenis Malikov return status;
1911405ed2b4SDenis Malikov
1912405ed2b4SDenis Malikov if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1913405ed2b4SDenis Malikov {
1914405ed2b4SDenis Malikov tp_threadpool_shutdown( pool );
1915405ed2b4SDenis Malikov tp_threadpool_release( pool );
1916405ed2b4SDenis Malikov }
1917405ed2b4SDenis Malikov }
1918405ed2b4SDenis Malikov
1919405ed2b4SDenis Malikov pool = default_threadpool;
1920405ed2b4SDenis Malikov }
1921405ed2b4SDenis Malikov
1922405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
1923405ed2b4SDenis Malikov
1924405ed2b4SDenis Malikov /* Make sure that the threadpool has at least one thread. */
1925405ed2b4SDenis Malikov if (!pool->num_workers)
1926405ed2b4SDenis Malikov status = tp_new_worker_thread( pool );
1927405ed2b4SDenis Malikov
1928405ed2b4SDenis Malikov /* Keep a reference, and increment objcount to ensure that the
1929405ed2b4SDenis Malikov * last thread doesn't terminate. */
1930405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
1931405ed2b4SDenis Malikov {
1932405ed2b4SDenis Malikov InterlockedIncrement( &pool->refcount );
1933405ed2b4SDenis Malikov pool->objcount++;
1934405ed2b4SDenis Malikov }
1935405ed2b4SDenis Malikov
1936405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
1937405ed2b4SDenis Malikov
1938405ed2b4SDenis Malikov if (status != STATUS_SUCCESS)
1939405ed2b4SDenis Malikov return status;
1940405ed2b4SDenis Malikov
1941405ed2b4SDenis Malikov *out = pool;
1942405ed2b4SDenis Malikov return STATUS_SUCCESS;
1943405ed2b4SDenis Malikov }
1944405ed2b4SDenis Malikov
1945405ed2b4SDenis Malikov /***********************************************************************
1946405ed2b4SDenis Malikov * tp_threadpool_unlock (internal)
1947405ed2b4SDenis Malikov *
1948405ed2b4SDenis Malikov * Releases a lock on a threadpool.
1949405ed2b4SDenis Malikov */
tp_threadpool_unlock(struct threadpool * pool)1950405ed2b4SDenis Malikov static void tp_threadpool_unlock( struct threadpool *pool )
1951405ed2b4SDenis Malikov {
1952405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
1953405ed2b4SDenis Malikov pool->objcount--;
1954405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
1955405ed2b4SDenis Malikov tp_threadpool_release( pool );
1956405ed2b4SDenis Malikov }
1957405ed2b4SDenis Malikov
1958405ed2b4SDenis Malikov /***********************************************************************
1959405ed2b4SDenis Malikov * tp_group_alloc (internal)
1960405ed2b4SDenis Malikov *
1961405ed2b4SDenis Malikov * Allocates a new threadpool group object.
1962405ed2b4SDenis Malikov */
tp_group_alloc(struct threadpool_group ** out)1963405ed2b4SDenis Malikov static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1964405ed2b4SDenis Malikov {
1965405ed2b4SDenis Malikov struct threadpool_group *group;
1966405ed2b4SDenis Malikov
1967405ed2b4SDenis Malikov group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1968405ed2b4SDenis Malikov if (!group)
1969405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
1970405ed2b4SDenis Malikov
1971405ed2b4SDenis Malikov group->refcount = 1;
1972405ed2b4SDenis Malikov group->shutdown = FALSE;
1973405ed2b4SDenis Malikov
1974*0bf42067SJustin Miller #ifdef __REACTOS__
1975*0bf42067SJustin Miller RtlInitializeCriticalSection( &group->cs );
1976*0bf42067SJustin Miller #else
1977405ed2b4SDenis Malikov RtlInitializeCriticalSectionEx( &group->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
1978*0bf42067SJustin Miller
1979405ed2b4SDenis Malikov group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1980*0bf42067SJustin Miller #endif
1981405ed2b4SDenis Malikov
1982405ed2b4SDenis Malikov list_init( &group->members );
1983405ed2b4SDenis Malikov
1984405ed2b4SDenis Malikov TRACE( "allocated group %p\n", group );
1985405ed2b4SDenis Malikov
1986405ed2b4SDenis Malikov *out = group;
1987405ed2b4SDenis Malikov return STATUS_SUCCESS;
1988405ed2b4SDenis Malikov }
1989405ed2b4SDenis Malikov
1990405ed2b4SDenis Malikov /***********************************************************************
1991405ed2b4SDenis Malikov * tp_group_shutdown (internal)
1992405ed2b4SDenis Malikov *
1993405ed2b4SDenis Malikov * Marks the group object for shutdown.
1994405ed2b4SDenis Malikov */
tp_group_shutdown(struct threadpool_group * group)1995405ed2b4SDenis Malikov static void tp_group_shutdown( struct threadpool_group *group )
1996405ed2b4SDenis Malikov {
1997405ed2b4SDenis Malikov group->shutdown = TRUE;
1998405ed2b4SDenis Malikov }
1999405ed2b4SDenis Malikov
2000405ed2b4SDenis Malikov /***********************************************************************
2001405ed2b4SDenis Malikov * tp_group_release (internal)
2002405ed2b4SDenis Malikov *
2003405ed2b4SDenis Malikov * Releases a reference to a group object.
2004405ed2b4SDenis Malikov */
tp_group_release(struct threadpool_group * group)2005405ed2b4SDenis Malikov static BOOL tp_group_release( struct threadpool_group *group )
2006405ed2b4SDenis Malikov {
2007405ed2b4SDenis Malikov if (InterlockedDecrement( &group->refcount ))
2008405ed2b4SDenis Malikov return FALSE;
2009405ed2b4SDenis Malikov
2010405ed2b4SDenis Malikov TRACE( "destroying group %p\n", group );
2011405ed2b4SDenis Malikov
2012405ed2b4SDenis Malikov assert( group->shutdown );
2013405ed2b4SDenis Malikov assert( list_empty( &group->members ) );
2014405ed2b4SDenis Malikov
2015*0bf42067SJustin Miller #ifndef __REACTOS__
2016405ed2b4SDenis Malikov group->cs.DebugInfo->Spare[0] = 0;
2017*0bf42067SJustin Miller #endif
2018405ed2b4SDenis Malikov RtlDeleteCriticalSection( &group->cs );
2019405ed2b4SDenis Malikov
2020405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, group );
2021405ed2b4SDenis Malikov return TRUE;
2022405ed2b4SDenis Malikov }
2023405ed2b4SDenis Malikov
2024405ed2b4SDenis Malikov /***********************************************************************
2025405ed2b4SDenis Malikov * tp_object_initialize (internal)
2026405ed2b4SDenis Malikov *
2027405ed2b4SDenis Malikov * Initializes members of a threadpool object.
2028405ed2b4SDenis Malikov */
tp_object_initialize(struct threadpool_object * object,struct threadpool * pool,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2029405ed2b4SDenis Malikov static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
2030405ed2b4SDenis Malikov PVOID userdata, TP_CALLBACK_ENVIRON *environment )
2031405ed2b4SDenis Malikov {
2032405ed2b4SDenis Malikov BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2033405ed2b4SDenis Malikov
2034405ed2b4SDenis Malikov object->refcount = 1;
2035405ed2b4SDenis Malikov object->shutdown = FALSE;
2036405ed2b4SDenis Malikov
2037405ed2b4SDenis Malikov object->pool = pool;
2038405ed2b4SDenis Malikov object->group = NULL;
2039405ed2b4SDenis Malikov object->userdata = userdata;
2040405ed2b4SDenis Malikov object->group_cancel_callback = NULL;
2041405ed2b4SDenis Malikov object->finalization_callback = NULL;
2042405ed2b4SDenis Malikov object->may_run_long = 0;
2043405ed2b4SDenis Malikov object->race_dll = NULL;
2044405ed2b4SDenis Malikov object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2045405ed2b4SDenis Malikov
2046405ed2b4SDenis Malikov memset( &object->group_entry, 0, sizeof(object->group_entry) );
2047405ed2b4SDenis Malikov object->is_group_member = FALSE;
2048405ed2b4SDenis Malikov
2049405ed2b4SDenis Malikov memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2050405ed2b4SDenis Malikov RtlInitializeConditionVariable( &object->finished_event );
2051405ed2b4SDenis Malikov RtlInitializeConditionVariable( &object->group_finished_event );
2052405ed2b4SDenis Malikov object->completed_event = NULL;
2053405ed2b4SDenis Malikov object->num_pending_callbacks = 0;
2054405ed2b4SDenis Malikov object->num_running_callbacks = 0;
2055405ed2b4SDenis Malikov object->num_associated_callbacks = 0;
2056405ed2b4SDenis Malikov
2057405ed2b4SDenis Malikov if (environment)
2058405ed2b4SDenis Malikov {
2059405ed2b4SDenis Malikov if (environment->Version != 1 && environment->Version != 3)
2060405ed2b4SDenis Malikov FIXME( "unsupported environment version %lu\n", environment->Version );
2061405ed2b4SDenis Malikov
2062405ed2b4SDenis Malikov object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2063405ed2b4SDenis Malikov object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2064405ed2b4SDenis Malikov object->finalization_callback = environment->FinalizationCallback;
2065405ed2b4SDenis Malikov object->may_run_long = environment->u.s.LongFunction != 0;
2066405ed2b4SDenis Malikov object->race_dll = environment->RaceDll;
2067*0bf42067SJustin Miller #ifndef __REACTOS__ //Windows 7 stuff
2068405ed2b4SDenis Malikov if (environment->Version == 3)
2069405ed2b4SDenis Malikov {
2070405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2071405ed2b4SDenis Malikov
2072405ed2b4SDenis Malikov object->priority = environment_v3->CallbackPriority;
2073405ed2b4SDenis Malikov assert( object->priority < ARRAY_SIZE(pool->pools) );
2074405ed2b4SDenis Malikov }
2075*0bf42067SJustin Miller #endif
2076405ed2b4SDenis Malikov if (environment->ActivationContext)
2077405ed2b4SDenis Malikov FIXME( "activation context not supported yet\n" );
2078405ed2b4SDenis Malikov
2079405ed2b4SDenis Malikov if (environment->u.s.Persistent)
2080405ed2b4SDenis Malikov FIXME( "persistent threads not supported yet\n" );
2081405ed2b4SDenis Malikov }
2082405ed2b4SDenis Malikov
2083405ed2b4SDenis Malikov if (object->race_dll)
2084405ed2b4SDenis Malikov LdrAddRefDll( 0, object->race_dll );
2085405ed2b4SDenis Malikov
2086405ed2b4SDenis Malikov TRACE( "allocated object %p of type %u\n", object, object->type );
2087405ed2b4SDenis Malikov
2088405ed2b4SDenis Malikov /* For simple callbacks we have to run tp_object_submit before adding this object
2089405ed2b4SDenis Malikov * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2090405ed2b4SDenis Malikov * will be set, and tp_object_submit would fail with an assertion. */
2091405ed2b4SDenis Malikov
2092405ed2b4SDenis Malikov if (is_simple_callback)
2093405ed2b4SDenis Malikov tp_object_submit( object, FALSE );
2094405ed2b4SDenis Malikov
2095405ed2b4SDenis Malikov if (object->group)
2096405ed2b4SDenis Malikov {
2097405ed2b4SDenis Malikov struct threadpool_group *group = object->group;
2098405ed2b4SDenis Malikov InterlockedIncrement( &group->refcount );
2099405ed2b4SDenis Malikov
2100405ed2b4SDenis Malikov RtlEnterCriticalSection( &group->cs );
2101405ed2b4SDenis Malikov list_add_tail( &group->members, &object->group_entry );
2102405ed2b4SDenis Malikov object->is_group_member = TRUE;
2103405ed2b4SDenis Malikov RtlLeaveCriticalSection( &group->cs );
2104405ed2b4SDenis Malikov }
2105405ed2b4SDenis Malikov
2106405ed2b4SDenis Malikov if (is_simple_callback)
2107405ed2b4SDenis Malikov tp_object_release( object );
2108405ed2b4SDenis Malikov }
2109405ed2b4SDenis Malikov
tp_object_prio_queue(struct threadpool_object * object)2110405ed2b4SDenis Malikov static void tp_object_prio_queue( struct threadpool_object *object )
2111405ed2b4SDenis Malikov {
2112405ed2b4SDenis Malikov ++object->pool->num_busy_workers;
2113405ed2b4SDenis Malikov list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2114405ed2b4SDenis Malikov }
2115405ed2b4SDenis Malikov
2116405ed2b4SDenis Malikov /***********************************************************************
2117405ed2b4SDenis Malikov * tp_object_submit (internal)
2118405ed2b4SDenis Malikov *
2119405ed2b4SDenis Malikov * Submits a threadpool object to the associated threadpool. This
2120405ed2b4SDenis Malikov * function has to be VOID because TpPostWork can never fail on Windows.
2121405ed2b4SDenis Malikov */
tp_object_submit(struct threadpool_object * object,BOOL signaled)2122405ed2b4SDenis Malikov static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
2123405ed2b4SDenis Malikov {
2124405ed2b4SDenis Malikov struct threadpool *pool = object->pool;
2125405ed2b4SDenis Malikov NTSTATUS status = STATUS_UNSUCCESSFUL;
2126405ed2b4SDenis Malikov
2127405ed2b4SDenis Malikov assert( !object->shutdown );
2128405ed2b4SDenis Malikov assert( !pool->shutdown );
2129405ed2b4SDenis Malikov
2130405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
2131405ed2b4SDenis Malikov
2132405ed2b4SDenis Malikov /* Start new worker threads if required. */
2133405ed2b4SDenis Malikov if (pool->num_busy_workers >= pool->num_workers &&
2134405ed2b4SDenis Malikov pool->num_workers < pool->max_workers)
2135405ed2b4SDenis Malikov status = tp_new_worker_thread( pool );
2136405ed2b4SDenis Malikov
2137405ed2b4SDenis Malikov /* Queue work item and increment refcount. */
2138405ed2b4SDenis Malikov InterlockedIncrement( &object->refcount );
2139405ed2b4SDenis Malikov if (!object->num_pending_callbacks++)
2140405ed2b4SDenis Malikov tp_object_prio_queue( object );
2141405ed2b4SDenis Malikov
2142405ed2b4SDenis Malikov /* Count how often the object was signaled. */
2143405ed2b4SDenis Malikov if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2144405ed2b4SDenis Malikov object->u.wait.signaled++;
2145405ed2b4SDenis Malikov
2146405ed2b4SDenis Malikov /* No new thread started - wake up one existing thread. */
2147405ed2b4SDenis Malikov if (status != STATUS_SUCCESS)
2148405ed2b4SDenis Malikov {
2149405ed2b4SDenis Malikov assert( pool->num_workers > 0 );
2150405ed2b4SDenis Malikov RtlWakeConditionVariable( &pool->update_event );
2151405ed2b4SDenis Malikov }
2152405ed2b4SDenis Malikov
2153405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
2154405ed2b4SDenis Malikov }
2155405ed2b4SDenis Malikov
2156405ed2b4SDenis Malikov /***********************************************************************
2157405ed2b4SDenis Malikov * tp_object_cancel (internal)
2158405ed2b4SDenis Malikov *
2159405ed2b4SDenis Malikov * Cancels all currently pending callbacks for a specific object.
2160405ed2b4SDenis Malikov */
tp_object_cancel(struct threadpool_object * object)2161405ed2b4SDenis Malikov static void tp_object_cancel( struct threadpool_object *object )
2162405ed2b4SDenis Malikov {
2163405ed2b4SDenis Malikov struct threadpool *pool = object->pool;
2164405ed2b4SDenis Malikov LONG pending_callbacks = 0;
2165405ed2b4SDenis Malikov
2166405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
2167405ed2b4SDenis Malikov if (object->num_pending_callbacks)
2168405ed2b4SDenis Malikov {
2169405ed2b4SDenis Malikov pending_callbacks = object->num_pending_callbacks;
2170405ed2b4SDenis Malikov object->num_pending_callbacks = 0;
2171405ed2b4SDenis Malikov list_remove( &object->pool_entry );
2172405ed2b4SDenis Malikov
2173405ed2b4SDenis Malikov if (object->type == TP_OBJECT_TYPE_WAIT)
2174405ed2b4SDenis Malikov object->u.wait.signaled = 0;
2175405ed2b4SDenis Malikov }
2176405ed2b4SDenis Malikov if (object->type == TP_OBJECT_TYPE_IO)
2177405ed2b4SDenis Malikov {
2178405ed2b4SDenis Malikov object->u.io.skipped_count += object->u.io.pending_count;
2179405ed2b4SDenis Malikov object->u.io.pending_count = 0;
2180405ed2b4SDenis Malikov }
2181405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
2182405ed2b4SDenis Malikov
2183405ed2b4SDenis Malikov while (pending_callbacks--)
2184405ed2b4SDenis Malikov tp_object_release( object );
2185405ed2b4SDenis Malikov }
2186405ed2b4SDenis Malikov
object_is_finished(struct threadpool_object * object,BOOL group)2187405ed2b4SDenis Malikov static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2188405ed2b4SDenis Malikov {
2189405ed2b4SDenis Malikov if (object->num_pending_callbacks)
2190405ed2b4SDenis Malikov return FALSE;
2191405ed2b4SDenis Malikov if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2192405ed2b4SDenis Malikov return FALSE;
2193405ed2b4SDenis Malikov
2194405ed2b4SDenis Malikov if (group)
2195405ed2b4SDenis Malikov return !object->num_running_callbacks;
2196405ed2b4SDenis Malikov else
2197405ed2b4SDenis Malikov return !object->num_associated_callbacks;
2198405ed2b4SDenis Malikov }
2199405ed2b4SDenis Malikov
2200405ed2b4SDenis Malikov /***********************************************************************
2201405ed2b4SDenis Malikov * tp_object_wait (internal)
2202405ed2b4SDenis Malikov *
2203405ed2b4SDenis Malikov * Waits until all pending and running callbacks of a specific object
2204405ed2b4SDenis Malikov * have been processed.
2205405ed2b4SDenis Malikov */
tp_object_wait(struct threadpool_object * object,BOOL group_wait)2206405ed2b4SDenis Malikov static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2207405ed2b4SDenis Malikov {
2208405ed2b4SDenis Malikov struct threadpool *pool = object->pool;
2209405ed2b4SDenis Malikov
2210405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
2211405ed2b4SDenis Malikov while (!object_is_finished( object, group_wait ))
2212405ed2b4SDenis Malikov {
2213405ed2b4SDenis Malikov if (group_wait)
2214405ed2b4SDenis Malikov RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2215405ed2b4SDenis Malikov else
2216405ed2b4SDenis Malikov RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2217405ed2b4SDenis Malikov }
2218405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
2219405ed2b4SDenis Malikov }
2220405ed2b4SDenis Malikov
tp_ioqueue_unlock(struct threadpool_object * io)2221405ed2b4SDenis Malikov static void tp_ioqueue_unlock( struct threadpool_object *io )
2222405ed2b4SDenis Malikov {
2223405ed2b4SDenis Malikov assert( io->type == TP_OBJECT_TYPE_IO );
2224405ed2b4SDenis Malikov
2225405ed2b4SDenis Malikov RtlEnterCriticalSection( &ioqueue.cs );
2226405ed2b4SDenis Malikov
2227405ed2b4SDenis Malikov assert(ioqueue.objcount);
2228405ed2b4SDenis Malikov
2229405ed2b4SDenis Malikov if (!io->shutdown && !--ioqueue.objcount)
2230405ed2b4SDenis Malikov NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2231405ed2b4SDenis Malikov
2232405ed2b4SDenis Malikov RtlLeaveCriticalSection( &ioqueue.cs );
2233405ed2b4SDenis Malikov }
2234405ed2b4SDenis Malikov
2235405ed2b4SDenis Malikov /***********************************************************************
2236405ed2b4SDenis Malikov * tp_object_prepare_shutdown (internal)
2237405ed2b4SDenis Malikov *
2238405ed2b4SDenis Malikov * Prepares a threadpool object for shutdown.
2239405ed2b4SDenis Malikov */
tp_object_prepare_shutdown(struct threadpool_object * object)2240405ed2b4SDenis Malikov static void tp_object_prepare_shutdown( struct threadpool_object *object )
2241405ed2b4SDenis Malikov {
2242405ed2b4SDenis Malikov if (object->type == TP_OBJECT_TYPE_TIMER)
2243405ed2b4SDenis Malikov tp_timerqueue_unlock( object );
2244405ed2b4SDenis Malikov else if (object->type == TP_OBJECT_TYPE_WAIT)
2245405ed2b4SDenis Malikov tp_waitqueue_unlock( object );
2246405ed2b4SDenis Malikov else if (object->type == TP_OBJECT_TYPE_IO)
2247405ed2b4SDenis Malikov tp_ioqueue_unlock( object );
2248405ed2b4SDenis Malikov }
2249405ed2b4SDenis Malikov
2250405ed2b4SDenis Malikov /***********************************************************************
2251405ed2b4SDenis Malikov * tp_object_release (internal)
2252405ed2b4SDenis Malikov *
2253405ed2b4SDenis Malikov * Releases a reference to a threadpool object.
2254405ed2b4SDenis Malikov */
tp_object_release(struct threadpool_object * object)2255405ed2b4SDenis Malikov static BOOL tp_object_release( struct threadpool_object *object )
2256405ed2b4SDenis Malikov {
2257405ed2b4SDenis Malikov if (InterlockedDecrement( &object->refcount ))
2258405ed2b4SDenis Malikov return FALSE;
2259405ed2b4SDenis Malikov
2260405ed2b4SDenis Malikov TRACE( "destroying object %p of type %u\n", object, object->type );
2261405ed2b4SDenis Malikov
2262405ed2b4SDenis Malikov assert( object->shutdown );
2263405ed2b4SDenis Malikov assert( !object->num_pending_callbacks );
2264405ed2b4SDenis Malikov assert( !object->num_running_callbacks );
2265405ed2b4SDenis Malikov assert( !object->num_associated_callbacks );
2266405ed2b4SDenis Malikov
2267405ed2b4SDenis Malikov /* release reference to the group */
2268405ed2b4SDenis Malikov if (object->group)
2269405ed2b4SDenis Malikov {
2270405ed2b4SDenis Malikov struct threadpool_group *group = object->group;
2271405ed2b4SDenis Malikov
2272405ed2b4SDenis Malikov RtlEnterCriticalSection( &group->cs );
2273405ed2b4SDenis Malikov if (object->is_group_member)
2274405ed2b4SDenis Malikov {
2275405ed2b4SDenis Malikov list_remove( &object->group_entry );
2276405ed2b4SDenis Malikov object->is_group_member = FALSE;
2277405ed2b4SDenis Malikov }
2278405ed2b4SDenis Malikov RtlLeaveCriticalSection( &group->cs );
2279405ed2b4SDenis Malikov
2280405ed2b4SDenis Malikov tp_group_release( group );
2281405ed2b4SDenis Malikov }
2282405ed2b4SDenis Malikov
2283405ed2b4SDenis Malikov tp_threadpool_unlock( object->pool );
2284405ed2b4SDenis Malikov
2285405ed2b4SDenis Malikov if (object->race_dll)
2286405ed2b4SDenis Malikov LdrUnloadDll( object->race_dll );
2287405ed2b4SDenis Malikov
2288405ed2b4SDenis Malikov if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2289405ed2b4SDenis Malikov NtSetEvent( object->completed_event, NULL );
2290405ed2b4SDenis Malikov
2291405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2292405ed2b4SDenis Malikov return TRUE;
2293405ed2b4SDenis Malikov }
2294405ed2b4SDenis Malikov
threadpool_get_next_item(const struct threadpool * pool)2295405ed2b4SDenis Malikov static struct list *threadpool_get_next_item( const struct threadpool *pool )
2296405ed2b4SDenis Malikov {
2297405ed2b4SDenis Malikov struct list *ptr;
2298405ed2b4SDenis Malikov unsigned int i;
2299405ed2b4SDenis Malikov
2300405ed2b4SDenis Malikov for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2301405ed2b4SDenis Malikov {
2302405ed2b4SDenis Malikov if ((ptr = list_head( &pool->pools[i] )))
2303405ed2b4SDenis Malikov break;
2304405ed2b4SDenis Malikov }
2305405ed2b4SDenis Malikov
2306405ed2b4SDenis Malikov return ptr;
2307405ed2b4SDenis Malikov }
2308405ed2b4SDenis Malikov
2309405ed2b4SDenis Malikov /***********************************************************************
2310405ed2b4SDenis Malikov * tp_object_execute (internal)
2311405ed2b4SDenis Malikov *
2312405ed2b4SDenis Malikov * Executes a threadpool object callback, object->pool->cs has to be
2313405ed2b4SDenis Malikov * held.
2314405ed2b4SDenis Malikov */
tp_object_execute(struct threadpool_object * object,BOOL wait_thread)2315405ed2b4SDenis Malikov static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2316405ed2b4SDenis Malikov {
2317405ed2b4SDenis Malikov TP_CALLBACK_INSTANCE *callback_instance;
2318405ed2b4SDenis Malikov struct threadpool_instance instance;
2319405ed2b4SDenis Malikov struct io_completion completion;
2320405ed2b4SDenis Malikov struct threadpool *pool = object->pool;
2321405ed2b4SDenis Malikov TP_WAIT_RESULT wait_result = 0;
2322405ed2b4SDenis Malikov NTSTATUS status;
2323405ed2b4SDenis Malikov
2324405ed2b4SDenis Malikov object->num_pending_callbacks--;
2325405ed2b4SDenis Malikov
2326405ed2b4SDenis Malikov /* For wait objects check if they were signaled or have timed out. */
2327405ed2b4SDenis Malikov if (object->type == TP_OBJECT_TYPE_WAIT)
2328405ed2b4SDenis Malikov {
2329405ed2b4SDenis Malikov wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2330405ed2b4SDenis Malikov if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2331405ed2b4SDenis Malikov }
2332405ed2b4SDenis Malikov else if (object->type == TP_OBJECT_TYPE_IO)
2333405ed2b4SDenis Malikov {
2334405ed2b4SDenis Malikov assert( object->u.io.completion_count );
2335405ed2b4SDenis Malikov completion = object->u.io.completions[--object->u.io.completion_count];
2336405ed2b4SDenis Malikov }
2337405ed2b4SDenis Malikov
2338405ed2b4SDenis Malikov /* Leave critical section and do the actual callback. */
2339405ed2b4SDenis Malikov object->num_associated_callbacks++;
2340405ed2b4SDenis Malikov object->num_running_callbacks++;
2341405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
2342405ed2b4SDenis Malikov if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2343405ed2b4SDenis Malikov
2344405ed2b4SDenis Malikov /* Initialize threadpool instance struct. */
2345405ed2b4SDenis Malikov callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2346405ed2b4SDenis Malikov instance.object = object;
2347405ed2b4SDenis Malikov instance.threadid = GetCurrentThreadId();
2348405ed2b4SDenis Malikov instance.associated = TRUE;
2349405ed2b4SDenis Malikov instance.may_run_long = object->may_run_long;
2350405ed2b4SDenis Malikov instance.cleanup.critical_section = NULL;
2351405ed2b4SDenis Malikov instance.cleanup.mutex = NULL;
2352405ed2b4SDenis Malikov instance.cleanup.semaphore = NULL;
2353405ed2b4SDenis Malikov instance.cleanup.semaphore_count = 0;
2354405ed2b4SDenis Malikov instance.cleanup.event = NULL;
2355405ed2b4SDenis Malikov instance.cleanup.library = NULL;
2356405ed2b4SDenis Malikov
2357405ed2b4SDenis Malikov switch (object->type)
2358405ed2b4SDenis Malikov {
2359405ed2b4SDenis Malikov case TP_OBJECT_TYPE_SIMPLE:
2360405ed2b4SDenis Malikov {
2361405ed2b4SDenis Malikov TRACE( "executing simple callback %p(%p, %p)\n",
2362405ed2b4SDenis Malikov object->u.simple.callback, callback_instance, object->userdata );
2363405ed2b4SDenis Malikov object->u.simple.callback( callback_instance, object->userdata );
2364405ed2b4SDenis Malikov TRACE( "callback %p returned\n", object->u.simple.callback );
2365405ed2b4SDenis Malikov break;
2366405ed2b4SDenis Malikov }
2367405ed2b4SDenis Malikov
2368405ed2b4SDenis Malikov case TP_OBJECT_TYPE_WORK:
2369405ed2b4SDenis Malikov {
2370405ed2b4SDenis Malikov TRACE( "executing work callback %p(%p, %p, %p)\n",
2371405ed2b4SDenis Malikov object->u.work.callback, callback_instance, object->userdata, object );
2372405ed2b4SDenis Malikov object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2373405ed2b4SDenis Malikov TRACE( "callback %p returned\n", object->u.work.callback );
2374405ed2b4SDenis Malikov break;
2375405ed2b4SDenis Malikov }
2376405ed2b4SDenis Malikov
2377405ed2b4SDenis Malikov case TP_OBJECT_TYPE_TIMER:
2378405ed2b4SDenis Malikov {
2379405ed2b4SDenis Malikov TRACE( "executing timer callback %p(%p, %p, %p)\n",
2380405ed2b4SDenis Malikov object->u.timer.callback, callback_instance, object->userdata, object );
2381405ed2b4SDenis Malikov object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2382405ed2b4SDenis Malikov TRACE( "callback %p returned\n", object->u.timer.callback );
2383405ed2b4SDenis Malikov break;
2384405ed2b4SDenis Malikov }
2385405ed2b4SDenis Malikov
2386405ed2b4SDenis Malikov case TP_OBJECT_TYPE_WAIT:
2387405ed2b4SDenis Malikov {
2388405ed2b4SDenis Malikov TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2389405ed2b4SDenis Malikov object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2390405ed2b4SDenis Malikov object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2391405ed2b4SDenis Malikov TRACE( "callback %p returned\n", object->u.wait.callback );
2392405ed2b4SDenis Malikov break;
2393405ed2b4SDenis Malikov }
2394405ed2b4SDenis Malikov
2395405ed2b4SDenis Malikov case TP_OBJECT_TYPE_IO:
2396405ed2b4SDenis Malikov {
2397405ed2b4SDenis Malikov TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2398405ed2b4SDenis Malikov object->u.io.callback, callback_instance, object->userdata,
2399405ed2b4SDenis Malikov completion.cvalue, &completion.iosb, (TP_IO *)object );
2400405ed2b4SDenis Malikov object->u.io.callback( callback_instance, object->userdata,
2401405ed2b4SDenis Malikov (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2402405ed2b4SDenis Malikov TRACE( "callback %p returned\n", object->u.io.callback );
2403405ed2b4SDenis Malikov break;
2404405ed2b4SDenis Malikov }
2405405ed2b4SDenis Malikov
2406405ed2b4SDenis Malikov default:
2407405ed2b4SDenis Malikov assert(0);
2408405ed2b4SDenis Malikov break;
2409405ed2b4SDenis Malikov }
2410405ed2b4SDenis Malikov
2411405ed2b4SDenis Malikov /* Execute finalization callback. */
2412405ed2b4SDenis Malikov if (object->finalization_callback)
2413405ed2b4SDenis Malikov {
2414405ed2b4SDenis Malikov TRACE( "executing finalization callback %p(%p, %p)\n",
2415405ed2b4SDenis Malikov object->finalization_callback, callback_instance, object->userdata );
2416405ed2b4SDenis Malikov object->finalization_callback( callback_instance, object->userdata );
2417405ed2b4SDenis Malikov TRACE( "callback %p returned\n", object->finalization_callback );
2418405ed2b4SDenis Malikov }
2419405ed2b4SDenis Malikov
2420405ed2b4SDenis Malikov /* Execute cleanup tasks. */
2421405ed2b4SDenis Malikov if (instance.cleanup.critical_section)
2422405ed2b4SDenis Malikov {
2423405ed2b4SDenis Malikov RtlLeaveCriticalSection( instance.cleanup.critical_section );
2424405ed2b4SDenis Malikov }
2425405ed2b4SDenis Malikov if (instance.cleanup.mutex)
2426405ed2b4SDenis Malikov {
2427405ed2b4SDenis Malikov status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2428405ed2b4SDenis Malikov if (status != STATUS_SUCCESS) goto skip_cleanup;
2429405ed2b4SDenis Malikov }
2430405ed2b4SDenis Malikov if (instance.cleanup.semaphore)
2431405ed2b4SDenis Malikov {
2432405ed2b4SDenis Malikov status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2433405ed2b4SDenis Malikov if (status != STATUS_SUCCESS) goto skip_cleanup;
2434405ed2b4SDenis Malikov }
2435405ed2b4SDenis Malikov if (instance.cleanup.event)
2436405ed2b4SDenis Malikov {
2437405ed2b4SDenis Malikov status = NtSetEvent( instance.cleanup.event, NULL );
2438405ed2b4SDenis Malikov if (status != STATUS_SUCCESS) goto skip_cleanup;
2439405ed2b4SDenis Malikov }
2440405ed2b4SDenis Malikov if (instance.cleanup.library)
2441405ed2b4SDenis Malikov {
2442405ed2b4SDenis Malikov LdrUnloadDll( instance.cleanup.library );
2443405ed2b4SDenis Malikov }
2444405ed2b4SDenis Malikov
2445405ed2b4SDenis Malikov skip_cleanup:
2446405ed2b4SDenis Malikov if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2447405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
2448405ed2b4SDenis Malikov
2449405ed2b4SDenis Malikov /* Simple callbacks are automatically shutdown after execution. */
2450405ed2b4SDenis Malikov if (object->type == TP_OBJECT_TYPE_SIMPLE)
2451405ed2b4SDenis Malikov {
2452405ed2b4SDenis Malikov tp_object_prepare_shutdown( object );
2453405ed2b4SDenis Malikov object->shutdown = TRUE;
2454405ed2b4SDenis Malikov }
2455405ed2b4SDenis Malikov
2456405ed2b4SDenis Malikov object->num_running_callbacks--;
2457405ed2b4SDenis Malikov if (object_is_finished( object, TRUE ))
2458405ed2b4SDenis Malikov RtlWakeAllConditionVariable( &object->group_finished_event );
2459405ed2b4SDenis Malikov
2460405ed2b4SDenis Malikov if (instance.associated)
2461405ed2b4SDenis Malikov {
2462405ed2b4SDenis Malikov object->num_associated_callbacks--;
2463405ed2b4SDenis Malikov if (object_is_finished( object, FALSE ))
2464405ed2b4SDenis Malikov RtlWakeAllConditionVariable( &object->finished_event );
2465405ed2b4SDenis Malikov }
2466405ed2b4SDenis Malikov }
2467405ed2b4SDenis Malikov
2468405ed2b4SDenis Malikov /***********************************************************************
2469405ed2b4SDenis Malikov * threadpool_worker_proc (internal)
2470405ed2b4SDenis Malikov */
2471*0bf42067SJustin Miller #ifdef __REACTOS__
threadpool_worker_proc(PVOID param)2472*0bf42067SJustin Miller ULONG NTAPI threadpool_worker_proc(PVOID param )
2473*0bf42067SJustin Miller #else
2474405ed2b4SDenis Malikov static void CALLBACK threadpool_worker_proc( void *param )
2475*0bf42067SJustin Miller #endif
2476405ed2b4SDenis Malikov {
2477405ed2b4SDenis Malikov struct threadpool *pool = param;
2478405ed2b4SDenis Malikov LARGE_INTEGER timeout;
2479405ed2b4SDenis Malikov struct list *ptr;
2480405ed2b4SDenis Malikov
2481405ed2b4SDenis Malikov TRACE( "starting worker thread for pool %p\n", pool );
2482405ed2b4SDenis Malikov set_thread_name(L"wine_threadpool_worker");
2483405ed2b4SDenis Malikov
2484405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
2485405ed2b4SDenis Malikov for (;;)
2486405ed2b4SDenis Malikov {
2487405ed2b4SDenis Malikov while ((ptr = threadpool_get_next_item( pool )))
2488405ed2b4SDenis Malikov {
2489405ed2b4SDenis Malikov struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2490405ed2b4SDenis Malikov assert( object->num_pending_callbacks > 0 );
2491405ed2b4SDenis Malikov
2492405ed2b4SDenis Malikov /* If further pending callbacks are queued, move the work item to
2493405ed2b4SDenis Malikov * the end of the pool list. Otherwise remove it from the pool. */
2494405ed2b4SDenis Malikov list_remove( &object->pool_entry );
2495405ed2b4SDenis Malikov if (object->num_pending_callbacks > 1)
2496405ed2b4SDenis Malikov tp_object_prio_queue( object );
2497405ed2b4SDenis Malikov
2498405ed2b4SDenis Malikov tp_object_execute( object, FALSE );
2499405ed2b4SDenis Malikov
2500405ed2b4SDenis Malikov assert(pool->num_busy_workers);
2501405ed2b4SDenis Malikov pool->num_busy_workers--;
2502405ed2b4SDenis Malikov
2503405ed2b4SDenis Malikov tp_object_release( object );
2504405ed2b4SDenis Malikov }
2505405ed2b4SDenis Malikov
2506405ed2b4SDenis Malikov /* Shutdown worker thread if requested. */
2507405ed2b4SDenis Malikov if (pool->shutdown)
2508405ed2b4SDenis Malikov break;
2509405ed2b4SDenis Malikov
2510405ed2b4SDenis Malikov /* Wait for new tasks or until the timeout expires. A thread only terminates
2511405ed2b4SDenis Malikov * when no new tasks are available, and the number of threads can be
2512405ed2b4SDenis Malikov * decreased without violating the min_workers limit. An exception is when
2513405ed2b4SDenis Malikov * min_workers == 0, then objcount is used to detect if the last thread
2514405ed2b4SDenis Malikov * can be terminated. */
2515405ed2b4SDenis Malikov timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2516405ed2b4SDenis Malikov if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2517405ed2b4SDenis Malikov !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2518405ed2b4SDenis Malikov (!pool->min_workers && !pool->objcount)))
2519405ed2b4SDenis Malikov {
2520405ed2b4SDenis Malikov break;
2521405ed2b4SDenis Malikov }
2522405ed2b4SDenis Malikov }
2523405ed2b4SDenis Malikov pool->num_workers--;
2524405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
2525405ed2b4SDenis Malikov
2526405ed2b4SDenis Malikov TRACE( "terminating worker thread for pool %p\n", pool );
2527405ed2b4SDenis Malikov tp_threadpool_release( pool );
2528405ed2b4SDenis Malikov RtlExitUserThread( 0 );
2529*0bf42067SJustin Miller #ifdef __REACTOS__
2530*0bf42067SJustin Miller return STATUS_SUCCESS;
2531*0bf42067SJustin Miller #endif
2532405ed2b4SDenis Malikov }
2533405ed2b4SDenis Malikov
2534405ed2b4SDenis Malikov /***********************************************************************
2535405ed2b4SDenis Malikov * TpAllocCleanupGroup (NTDLL.@)
2536405ed2b4SDenis Malikov */
TpAllocCleanupGroup(TP_CLEANUP_GROUP ** out)2537405ed2b4SDenis Malikov NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2538405ed2b4SDenis Malikov {
2539405ed2b4SDenis Malikov TRACE( "%p\n", out );
2540405ed2b4SDenis Malikov
2541405ed2b4SDenis Malikov return tp_group_alloc( (struct threadpool_group **)out );
2542405ed2b4SDenis Malikov }
2543405ed2b4SDenis Malikov
2544405ed2b4SDenis Malikov /***********************************************************************
2545405ed2b4SDenis Malikov * TpAllocIoCompletion (NTDLL.@)
2546405ed2b4SDenis Malikov */
TpAllocIoCompletion(TP_IO ** out,HANDLE file,PTP_IO_CALLBACK callback,void * userdata,TP_CALLBACK_ENVIRON * environment)2547405ed2b4SDenis Malikov NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2548405ed2b4SDenis Malikov void *userdata, TP_CALLBACK_ENVIRON *environment )
2549405ed2b4SDenis Malikov {
2550405ed2b4SDenis Malikov struct threadpool_object *object;
2551405ed2b4SDenis Malikov struct threadpool *pool;
2552405ed2b4SDenis Malikov NTSTATUS status;
2553405ed2b4SDenis Malikov
2554405ed2b4SDenis Malikov TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2555405ed2b4SDenis Malikov
2556405ed2b4SDenis Malikov if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2557405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
2558405ed2b4SDenis Malikov
2559405ed2b4SDenis Malikov if ((status = tp_threadpool_lock( &pool, environment )))
2560405ed2b4SDenis Malikov {
2561405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2562405ed2b4SDenis Malikov return status;
2563405ed2b4SDenis Malikov }
2564405ed2b4SDenis Malikov
2565405ed2b4SDenis Malikov object->type = TP_OBJECT_TYPE_IO;
2566405ed2b4SDenis Malikov object->u.io.callback = callback;
2567405ed2b4SDenis Malikov if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2568405ed2b4SDenis Malikov {
2569405ed2b4SDenis Malikov tp_threadpool_unlock( pool );
2570405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2571405ed2b4SDenis Malikov return status;
2572405ed2b4SDenis Malikov }
2573405ed2b4SDenis Malikov
2574405ed2b4SDenis Malikov if ((status = tp_ioqueue_lock( object, file )))
2575405ed2b4SDenis Malikov {
2576405ed2b4SDenis Malikov tp_threadpool_unlock( pool );
2577405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2578405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2579405ed2b4SDenis Malikov return status;
2580405ed2b4SDenis Malikov }
2581405ed2b4SDenis Malikov
2582405ed2b4SDenis Malikov tp_object_initialize( object, pool, userdata, environment );
2583405ed2b4SDenis Malikov
2584405ed2b4SDenis Malikov *out = (TP_IO *)object;
2585405ed2b4SDenis Malikov return STATUS_SUCCESS;
2586405ed2b4SDenis Malikov }
2587405ed2b4SDenis Malikov
2588405ed2b4SDenis Malikov /***********************************************************************
2589405ed2b4SDenis Malikov * TpAllocPool (NTDLL.@)
2590405ed2b4SDenis Malikov */
TpAllocPool(TP_POOL ** out,PVOID reserved)2591405ed2b4SDenis Malikov NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2592405ed2b4SDenis Malikov {
2593405ed2b4SDenis Malikov TRACE( "%p %p\n", out, reserved );
2594405ed2b4SDenis Malikov
2595405ed2b4SDenis Malikov if (reserved)
2596405ed2b4SDenis Malikov FIXME( "reserved argument is nonzero (%p)\n", reserved );
2597405ed2b4SDenis Malikov
2598405ed2b4SDenis Malikov return tp_threadpool_alloc( (struct threadpool **)out );
2599405ed2b4SDenis Malikov }
2600405ed2b4SDenis Malikov
2601405ed2b4SDenis Malikov /***********************************************************************
2602405ed2b4SDenis Malikov * TpAllocTimer (NTDLL.@)
2603405ed2b4SDenis Malikov */
TpAllocTimer(TP_TIMER ** out,PTP_TIMER_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2604405ed2b4SDenis Malikov NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2605405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON *environment )
2606405ed2b4SDenis Malikov {
2607405ed2b4SDenis Malikov struct threadpool_object *object;
2608405ed2b4SDenis Malikov struct threadpool *pool;
2609405ed2b4SDenis Malikov NTSTATUS status;
2610405ed2b4SDenis Malikov
2611405ed2b4SDenis Malikov TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2612405ed2b4SDenis Malikov
2613405ed2b4SDenis Malikov object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2614405ed2b4SDenis Malikov if (!object)
2615405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
2616405ed2b4SDenis Malikov
2617405ed2b4SDenis Malikov status = tp_threadpool_lock( &pool, environment );
2618405ed2b4SDenis Malikov if (status)
2619405ed2b4SDenis Malikov {
2620405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2621405ed2b4SDenis Malikov return status;
2622405ed2b4SDenis Malikov }
2623405ed2b4SDenis Malikov
2624405ed2b4SDenis Malikov object->type = TP_OBJECT_TYPE_TIMER;
2625405ed2b4SDenis Malikov object->u.timer.callback = callback;
2626405ed2b4SDenis Malikov
2627405ed2b4SDenis Malikov status = tp_timerqueue_lock( object );
2628405ed2b4SDenis Malikov if (status)
2629405ed2b4SDenis Malikov {
2630405ed2b4SDenis Malikov tp_threadpool_unlock( pool );
2631405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2632405ed2b4SDenis Malikov return status;
2633405ed2b4SDenis Malikov }
2634405ed2b4SDenis Malikov
2635405ed2b4SDenis Malikov tp_object_initialize( object, pool, userdata, environment );
2636405ed2b4SDenis Malikov
2637405ed2b4SDenis Malikov *out = (TP_TIMER *)object;
2638405ed2b4SDenis Malikov return STATUS_SUCCESS;
2639405ed2b4SDenis Malikov }
2640405ed2b4SDenis Malikov
tp_alloc_wait(TP_WAIT ** out,PTP_WAIT_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment,DWORD flags)2641405ed2b4SDenis Malikov static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2642405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON *environment, DWORD flags )
2643405ed2b4SDenis Malikov {
2644405ed2b4SDenis Malikov struct threadpool_object *object;
2645405ed2b4SDenis Malikov struct threadpool *pool;
2646405ed2b4SDenis Malikov NTSTATUS status;
2647405ed2b4SDenis Malikov
2648405ed2b4SDenis Malikov object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2649405ed2b4SDenis Malikov if (!object)
2650405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
2651405ed2b4SDenis Malikov
2652405ed2b4SDenis Malikov status = tp_threadpool_lock( &pool, environment );
2653405ed2b4SDenis Malikov if (status)
2654405ed2b4SDenis Malikov {
2655405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2656405ed2b4SDenis Malikov return status;
2657405ed2b4SDenis Malikov }
2658405ed2b4SDenis Malikov
2659405ed2b4SDenis Malikov object->type = TP_OBJECT_TYPE_WAIT;
2660405ed2b4SDenis Malikov object->u.wait.callback = callback;
2661405ed2b4SDenis Malikov object->u.wait.flags = flags;
2662405ed2b4SDenis Malikov
2663405ed2b4SDenis Malikov status = tp_waitqueue_lock( object );
2664405ed2b4SDenis Malikov if (status)
2665405ed2b4SDenis Malikov {
2666405ed2b4SDenis Malikov tp_threadpool_unlock( pool );
2667405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2668405ed2b4SDenis Malikov return status;
2669405ed2b4SDenis Malikov }
2670405ed2b4SDenis Malikov
2671405ed2b4SDenis Malikov tp_object_initialize( object, pool, userdata, environment );
2672405ed2b4SDenis Malikov
2673405ed2b4SDenis Malikov *out = (TP_WAIT *)object;
2674405ed2b4SDenis Malikov return STATUS_SUCCESS;
2675405ed2b4SDenis Malikov }
2676405ed2b4SDenis Malikov
2677405ed2b4SDenis Malikov /***********************************************************************
2678405ed2b4SDenis Malikov * TpAllocWait (NTDLL.@)
2679405ed2b4SDenis Malikov */
TpAllocWait(TP_WAIT ** out,PTP_WAIT_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2680405ed2b4SDenis Malikov NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2681405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON *environment )
2682405ed2b4SDenis Malikov {
2683405ed2b4SDenis Malikov TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2684405ed2b4SDenis Malikov return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2685405ed2b4SDenis Malikov }
2686405ed2b4SDenis Malikov
2687405ed2b4SDenis Malikov /***********************************************************************
2688405ed2b4SDenis Malikov * TpAllocWork (NTDLL.@)
2689405ed2b4SDenis Malikov */
TpAllocWork(TP_WORK ** out,PTP_WORK_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2690405ed2b4SDenis Malikov NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2691405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON *environment )
2692405ed2b4SDenis Malikov {
2693405ed2b4SDenis Malikov struct threadpool_object *object;
2694405ed2b4SDenis Malikov struct threadpool *pool;
2695405ed2b4SDenis Malikov NTSTATUS status;
2696405ed2b4SDenis Malikov
2697405ed2b4SDenis Malikov TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2698405ed2b4SDenis Malikov
2699405ed2b4SDenis Malikov object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2700405ed2b4SDenis Malikov if (!object)
2701405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
2702405ed2b4SDenis Malikov
2703405ed2b4SDenis Malikov status = tp_threadpool_lock( &pool, environment );
2704405ed2b4SDenis Malikov if (status)
2705405ed2b4SDenis Malikov {
2706405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
2707405ed2b4SDenis Malikov return status;
2708405ed2b4SDenis Malikov }
2709405ed2b4SDenis Malikov
2710405ed2b4SDenis Malikov object->type = TP_OBJECT_TYPE_WORK;
2711405ed2b4SDenis Malikov object->u.work.callback = callback;
2712405ed2b4SDenis Malikov tp_object_initialize( object, pool, userdata, environment );
2713405ed2b4SDenis Malikov
2714405ed2b4SDenis Malikov *out = (TP_WORK *)object;
2715405ed2b4SDenis Malikov return STATUS_SUCCESS;
2716405ed2b4SDenis Malikov }
2717405ed2b4SDenis Malikov
2718405ed2b4SDenis Malikov /***********************************************************************
2719405ed2b4SDenis Malikov * TpCancelAsyncIoOperation (NTDLL.@)
2720405ed2b4SDenis Malikov */
TpCancelAsyncIoOperation(TP_IO * io)2721405ed2b4SDenis Malikov void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2722405ed2b4SDenis Malikov {
2723405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_IO( io );
2724405ed2b4SDenis Malikov
2725405ed2b4SDenis Malikov TRACE( "%p\n", io );
2726405ed2b4SDenis Malikov
2727405ed2b4SDenis Malikov RtlEnterCriticalSection( &this->pool->cs );
2728405ed2b4SDenis Malikov
2729405ed2b4SDenis Malikov TRACE("pending_count %u.\n", this->u.io.pending_count);
2730405ed2b4SDenis Malikov
2731405ed2b4SDenis Malikov this->u.io.pending_count--;
2732405ed2b4SDenis Malikov if (object_is_finished( this, TRUE ))
2733405ed2b4SDenis Malikov RtlWakeAllConditionVariable( &this->group_finished_event );
2734405ed2b4SDenis Malikov if (object_is_finished( this, FALSE ))
2735405ed2b4SDenis Malikov RtlWakeAllConditionVariable( &this->finished_event );
2736405ed2b4SDenis Malikov
2737405ed2b4SDenis Malikov RtlLeaveCriticalSection( &this->pool->cs );
2738405ed2b4SDenis Malikov }
2739405ed2b4SDenis Malikov
2740405ed2b4SDenis Malikov /***********************************************************************
2741405ed2b4SDenis Malikov * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2742405ed2b4SDenis Malikov */
TpCallbackLeaveCriticalSectionOnCompletion(TP_CALLBACK_INSTANCE * instance,CRITICAL_SECTION * crit)2743405ed2b4SDenis Malikov VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2744405ed2b4SDenis Malikov {
2745405ed2b4SDenis Malikov struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2746405ed2b4SDenis Malikov
2747405ed2b4SDenis Malikov TRACE( "%p %p\n", instance, crit );
2748405ed2b4SDenis Malikov
2749405ed2b4SDenis Malikov if (!this->cleanup.critical_section)
2750405ed2b4SDenis Malikov this->cleanup.critical_section = crit;
2751405ed2b4SDenis Malikov }
2752405ed2b4SDenis Malikov
2753405ed2b4SDenis Malikov /***********************************************************************
2754405ed2b4SDenis Malikov * TpCallbackMayRunLong (NTDLL.@)
2755405ed2b4SDenis Malikov */
TpCallbackMayRunLong(TP_CALLBACK_INSTANCE * instance)2756405ed2b4SDenis Malikov NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2757405ed2b4SDenis Malikov {
2758405ed2b4SDenis Malikov struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2759405ed2b4SDenis Malikov struct threadpool_object *object = this->object;
2760405ed2b4SDenis Malikov struct threadpool *pool;
2761405ed2b4SDenis Malikov NTSTATUS status = STATUS_SUCCESS;
2762405ed2b4SDenis Malikov
2763405ed2b4SDenis Malikov TRACE( "%p\n", instance );
2764405ed2b4SDenis Malikov
2765405ed2b4SDenis Malikov if (this->threadid != GetCurrentThreadId())
2766405ed2b4SDenis Malikov {
2767405ed2b4SDenis Malikov ERR("called from wrong thread, ignoring\n");
2768405ed2b4SDenis Malikov return STATUS_UNSUCCESSFUL; /* FIXME */
2769405ed2b4SDenis Malikov }
2770405ed2b4SDenis Malikov
2771405ed2b4SDenis Malikov if (this->may_run_long)
2772405ed2b4SDenis Malikov return STATUS_SUCCESS;
2773405ed2b4SDenis Malikov
2774405ed2b4SDenis Malikov pool = object->pool;
2775405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
2776405ed2b4SDenis Malikov
2777405ed2b4SDenis Malikov /* Start new worker threads if required. */
2778405ed2b4SDenis Malikov if (pool->num_busy_workers >= pool->num_workers)
2779405ed2b4SDenis Malikov {
2780405ed2b4SDenis Malikov if (pool->num_workers < pool->max_workers)
2781405ed2b4SDenis Malikov {
2782405ed2b4SDenis Malikov status = tp_new_worker_thread( pool );
2783405ed2b4SDenis Malikov }
2784405ed2b4SDenis Malikov else
2785405ed2b4SDenis Malikov {
2786405ed2b4SDenis Malikov status = STATUS_TOO_MANY_THREADS;
2787405ed2b4SDenis Malikov }
2788405ed2b4SDenis Malikov }
2789405ed2b4SDenis Malikov
2790405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
2791405ed2b4SDenis Malikov this->may_run_long = TRUE;
2792405ed2b4SDenis Malikov return status;
2793405ed2b4SDenis Malikov }
2794405ed2b4SDenis Malikov
2795405ed2b4SDenis Malikov /***********************************************************************
2796405ed2b4SDenis Malikov * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2797405ed2b4SDenis Malikov */
TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE mutex)2798405ed2b4SDenis Malikov VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2799405ed2b4SDenis Malikov {
2800405ed2b4SDenis Malikov struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2801405ed2b4SDenis Malikov
2802405ed2b4SDenis Malikov TRACE( "%p %p\n", instance, mutex );
2803405ed2b4SDenis Malikov
2804405ed2b4SDenis Malikov if (!this->cleanup.mutex)
2805405ed2b4SDenis Malikov this->cleanup.mutex = mutex;
2806405ed2b4SDenis Malikov }
2807405ed2b4SDenis Malikov
2808405ed2b4SDenis Malikov /***********************************************************************
2809405ed2b4SDenis Malikov * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2810405ed2b4SDenis Malikov */
TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE semaphore,DWORD count)2811405ed2b4SDenis Malikov VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2812405ed2b4SDenis Malikov {
2813405ed2b4SDenis Malikov struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2814405ed2b4SDenis Malikov
2815405ed2b4SDenis Malikov TRACE( "%p %p %lu\n", instance, semaphore, count );
2816405ed2b4SDenis Malikov
2817405ed2b4SDenis Malikov if (!this->cleanup.semaphore)
2818405ed2b4SDenis Malikov {
2819405ed2b4SDenis Malikov this->cleanup.semaphore = semaphore;
2820405ed2b4SDenis Malikov this->cleanup.semaphore_count = count;
2821405ed2b4SDenis Malikov }
2822405ed2b4SDenis Malikov }
2823405ed2b4SDenis Malikov
2824405ed2b4SDenis Malikov /***********************************************************************
2825405ed2b4SDenis Malikov * TpCallbackSetEventOnCompletion (NTDLL.@)
2826405ed2b4SDenis Malikov */
TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE event)2827405ed2b4SDenis Malikov VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2828405ed2b4SDenis Malikov {
2829405ed2b4SDenis Malikov struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2830405ed2b4SDenis Malikov
2831405ed2b4SDenis Malikov TRACE( "%p %p\n", instance, event );
2832405ed2b4SDenis Malikov
2833405ed2b4SDenis Malikov if (!this->cleanup.event)
2834405ed2b4SDenis Malikov this->cleanup.event = event;
2835405ed2b4SDenis Malikov }
2836405ed2b4SDenis Malikov
2837405ed2b4SDenis Malikov /***********************************************************************
2838405ed2b4SDenis Malikov * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2839405ed2b4SDenis Malikov */
TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE * instance,HMODULE module)2840405ed2b4SDenis Malikov VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2841405ed2b4SDenis Malikov {
2842405ed2b4SDenis Malikov struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2843405ed2b4SDenis Malikov
2844405ed2b4SDenis Malikov TRACE( "%p %p\n", instance, module );
2845405ed2b4SDenis Malikov
2846405ed2b4SDenis Malikov if (!this->cleanup.library)
2847405ed2b4SDenis Malikov this->cleanup.library = module;
2848405ed2b4SDenis Malikov }
2849405ed2b4SDenis Malikov
2850405ed2b4SDenis Malikov /***********************************************************************
2851405ed2b4SDenis Malikov * TpDisassociateCallback (NTDLL.@)
2852405ed2b4SDenis Malikov */
TpDisassociateCallback(TP_CALLBACK_INSTANCE * instance)2853405ed2b4SDenis Malikov VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2854405ed2b4SDenis Malikov {
2855405ed2b4SDenis Malikov struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2856405ed2b4SDenis Malikov struct threadpool_object *object = this->object;
2857405ed2b4SDenis Malikov struct threadpool *pool;
2858405ed2b4SDenis Malikov
2859405ed2b4SDenis Malikov TRACE( "%p\n", instance );
2860405ed2b4SDenis Malikov
2861405ed2b4SDenis Malikov if (this->threadid != GetCurrentThreadId())
2862405ed2b4SDenis Malikov {
2863405ed2b4SDenis Malikov ERR("called from wrong thread, ignoring\n");
2864405ed2b4SDenis Malikov return;
2865405ed2b4SDenis Malikov }
2866405ed2b4SDenis Malikov
2867405ed2b4SDenis Malikov if (!this->associated)
2868405ed2b4SDenis Malikov return;
2869405ed2b4SDenis Malikov
2870405ed2b4SDenis Malikov pool = object->pool;
2871405ed2b4SDenis Malikov RtlEnterCriticalSection( &pool->cs );
2872405ed2b4SDenis Malikov
2873405ed2b4SDenis Malikov object->num_associated_callbacks--;
2874405ed2b4SDenis Malikov if (object_is_finished( object, FALSE ))
2875405ed2b4SDenis Malikov RtlWakeAllConditionVariable( &object->finished_event );
2876405ed2b4SDenis Malikov
2877405ed2b4SDenis Malikov RtlLeaveCriticalSection( &pool->cs );
2878405ed2b4SDenis Malikov this->associated = FALSE;
2879405ed2b4SDenis Malikov }
2880405ed2b4SDenis Malikov
2881405ed2b4SDenis Malikov /***********************************************************************
2882405ed2b4SDenis Malikov * TpIsTimerSet (NTDLL.@)
2883405ed2b4SDenis Malikov */
TpIsTimerSet(TP_TIMER * timer)2884405ed2b4SDenis Malikov BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2885405ed2b4SDenis Malikov {
2886405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_TIMER( timer );
2887405ed2b4SDenis Malikov
2888405ed2b4SDenis Malikov TRACE( "%p\n", timer );
2889405ed2b4SDenis Malikov
2890405ed2b4SDenis Malikov return this->u.timer.timer_set;
2891405ed2b4SDenis Malikov }
2892405ed2b4SDenis Malikov
2893405ed2b4SDenis Malikov /***********************************************************************
2894405ed2b4SDenis Malikov * TpPostWork (NTDLL.@)
2895405ed2b4SDenis Malikov */
TpPostWork(TP_WORK * work)2896405ed2b4SDenis Malikov VOID WINAPI TpPostWork( TP_WORK *work )
2897405ed2b4SDenis Malikov {
2898405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_WORK( work );
2899405ed2b4SDenis Malikov
2900405ed2b4SDenis Malikov TRACE( "%p\n", work );
2901405ed2b4SDenis Malikov
2902405ed2b4SDenis Malikov tp_object_submit( this, FALSE );
2903405ed2b4SDenis Malikov }
2904405ed2b4SDenis Malikov
2905405ed2b4SDenis Malikov /***********************************************************************
2906405ed2b4SDenis Malikov * TpReleaseCleanupGroup (NTDLL.@)
2907405ed2b4SDenis Malikov */
TpReleaseCleanupGroup(TP_CLEANUP_GROUP * group)2908405ed2b4SDenis Malikov VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2909405ed2b4SDenis Malikov {
2910405ed2b4SDenis Malikov struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2911405ed2b4SDenis Malikov
2912405ed2b4SDenis Malikov TRACE( "%p\n", group );
2913405ed2b4SDenis Malikov
2914405ed2b4SDenis Malikov tp_group_shutdown( this );
2915405ed2b4SDenis Malikov tp_group_release( this );
2916405ed2b4SDenis Malikov }
2917405ed2b4SDenis Malikov
2918405ed2b4SDenis Malikov /***********************************************************************
2919405ed2b4SDenis Malikov * TpReleaseCleanupGroupMembers (NTDLL.@)
2920405ed2b4SDenis Malikov */
TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP * group,BOOL cancel_pending,PVOID userdata)2921405ed2b4SDenis Malikov VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2922405ed2b4SDenis Malikov {
2923405ed2b4SDenis Malikov struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2924405ed2b4SDenis Malikov struct threadpool_object *object, *next;
2925405ed2b4SDenis Malikov struct list members;
2926405ed2b4SDenis Malikov
2927405ed2b4SDenis Malikov TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2928405ed2b4SDenis Malikov
2929405ed2b4SDenis Malikov RtlEnterCriticalSection( &this->cs );
2930405ed2b4SDenis Malikov
2931405ed2b4SDenis Malikov /* Unset group, increase references, and mark objects for shutdown */
2932405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2933405ed2b4SDenis Malikov {
2934405ed2b4SDenis Malikov assert( object->group == this );
2935405ed2b4SDenis Malikov assert( object->is_group_member );
2936405ed2b4SDenis Malikov
2937405ed2b4SDenis Malikov if (InterlockedIncrement( &object->refcount ) == 1)
2938405ed2b4SDenis Malikov {
2939405ed2b4SDenis Malikov /* Object is basically already destroyed, but group reference
2940405ed2b4SDenis Malikov * was not deleted yet. We can safely ignore this object. */
2941405ed2b4SDenis Malikov InterlockedDecrement( &object->refcount );
2942405ed2b4SDenis Malikov list_remove( &object->group_entry );
2943405ed2b4SDenis Malikov object->is_group_member = FALSE;
2944405ed2b4SDenis Malikov continue;
2945405ed2b4SDenis Malikov }
2946405ed2b4SDenis Malikov
2947405ed2b4SDenis Malikov object->is_group_member = FALSE;
2948405ed2b4SDenis Malikov tp_object_prepare_shutdown( object );
2949405ed2b4SDenis Malikov }
2950405ed2b4SDenis Malikov
2951405ed2b4SDenis Malikov /* Move members to a new temporary list */
2952405ed2b4SDenis Malikov list_init( &members );
2953405ed2b4SDenis Malikov list_move_tail( &members, &this->members );
2954405ed2b4SDenis Malikov
2955405ed2b4SDenis Malikov RtlLeaveCriticalSection( &this->cs );
2956405ed2b4SDenis Malikov
2957405ed2b4SDenis Malikov /* Cancel pending callbacks if requested */
2958405ed2b4SDenis Malikov if (cancel_pending)
2959405ed2b4SDenis Malikov {
2960405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2961405ed2b4SDenis Malikov {
2962405ed2b4SDenis Malikov tp_object_cancel( object );
2963405ed2b4SDenis Malikov }
2964405ed2b4SDenis Malikov }
2965405ed2b4SDenis Malikov
2966405ed2b4SDenis Malikov /* Wait for remaining callbacks to finish */
2967405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2968405ed2b4SDenis Malikov {
2969405ed2b4SDenis Malikov tp_object_wait( object, TRUE );
2970405ed2b4SDenis Malikov
2971405ed2b4SDenis Malikov if (!object->shutdown)
2972405ed2b4SDenis Malikov {
2973405ed2b4SDenis Malikov /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2974405ed2b4SDenis Malikov if (cancel_pending && object->group_cancel_callback)
2975405ed2b4SDenis Malikov {
2976405ed2b4SDenis Malikov TRACE( "executing group cancel callback %p(%p, %p)\n",
2977405ed2b4SDenis Malikov object->group_cancel_callback, object->userdata, userdata );
2978405ed2b4SDenis Malikov object->group_cancel_callback( object->userdata, userdata );
2979405ed2b4SDenis Malikov TRACE( "callback %p returned\n", object->group_cancel_callback );
2980405ed2b4SDenis Malikov }
2981405ed2b4SDenis Malikov
2982405ed2b4SDenis Malikov if (object->type != TP_OBJECT_TYPE_SIMPLE)
2983405ed2b4SDenis Malikov tp_object_release( object );
2984405ed2b4SDenis Malikov }
2985405ed2b4SDenis Malikov
2986405ed2b4SDenis Malikov object->shutdown = TRUE;
2987405ed2b4SDenis Malikov tp_object_release( object );
2988405ed2b4SDenis Malikov }
2989405ed2b4SDenis Malikov }
2990405ed2b4SDenis Malikov
2991405ed2b4SDenis Malikov /***********************************************************************
2992405ed2b4SDenis Malikov * TpReleaseIoCompletion (NTDLL.@)
2993405ed2b4SDenis Malikov */
TpReleaseIoCompletion(TP_IO * io)2994405ed2b4SDenis Malikov void WINAPI TpReleaseIoCompletion( TP_IO *io )
2995405ed2b4SDenis Malikov {
2996405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_IO( io );
2997405ed2b4SDenis Malikov BOOL can_destroy;
2998405ed2b4SDenis Malikov
2999405ed2b4SDenis Malikov TRACE( "%p\n", io );
3000405ed2b4SDenis Malikov
3001405ed2b4SDenis Malikov RtlEnterCriticalSection( &this->pool->cs );
3002405ed2b4SDenis Malikov this->u.io.shutting_down = TRUE;
3003405ed2b4SDenis Malikov can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3004405ed2b4SDenis Malikov RtlLeaveCriticalSection( &this->pool->cs );
3005405ed2b4SDenis Malikov
3006405ed2b4SDenis Malikov if (can_destroy)
3007405ed2b4SDenis Malikov {
3008405ed2b4SDenis Malikov tp_object_prepare_shutdown( this );
3009405ed2b4SDenis Malikov this->shutdown = TRUE;
3010405ed2b4SDenis Malikov tp_object_release( this );
3011405ed2b4SDenis Malikov }
3012405ed2b4SDenis Malikov }
3013405ed2b4SDenis Malikov
3014405ed2b4SDenis Malikov /***********************************************************************
3015405ed2b4SDenis Malikov * TpReleasePool (NTDLL.@)
3016405ed2b4SDenis Malikov */
TpReleasePool(TP_POOL * pool)3017405ed2b4SDenis Malikov VOID WINAPI TpReleasePool( TP_POOL *pool )
3018405ed2b4SDenis Malikov {
3019405ed2b4SDenis Malikov struct threadpool *this = impl_from_TP_POOL( pool );
3020405ed2b4SDenis Malikov
3021405ed2b4SDenis Malikov TRACE( "%p\n", pool );
3022405ed2b4SDenis Malikov
3023405ed2b4SDenis Malikov tp_threadpool_shutdown( this );
3024405ed2b4SDenis Malikov tp_threadpool_release( this );
3025405ed2b4SDenis Malikov }
3026405ed2b4SDenis Malikov
3027405ed2b4SDenis Malikov /***********************************************************************
3028405ed2b4SDenis Malikov * TpReleaseTimer (NTDLL.@)
3029405ed2b4SDenis Malikov */
TpReleaseTimer(TP_TIMER * timer)3030405ed2b4SDenis Malikov VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
3031405ed2b4SDenis Malikov {
3032405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_TIMER( timer );
3033405ed2b4SDenis Malikov
3034405ed2b4SDenis Malikov TRACE( "%p\n", timer );
3035405ed2b4SDenis Malikov
3036405ed2b4SDenis Malikov tp_object_prepare_shutdown( this );
3037405ed2b4SDenis Malikov this->shutdown = TRUE;
3038405ed2b4SDenis Malikov tp_object_release( this );
3039405ed2b4SDenis Malikov }
3040405ed2b4SDenis Malikov
3041405ed2b4SDenis Malikov /***********************************************************************
3042405ed2b4SDenis Malikov * TpReleaseWait (NTDLL.@)
3043405ed2b4SDenis Malikov */
TpReleaseWait(TP_WAIT * wait)3044405ed2b4SDenis Malikov VOID WINAPI TpReleaseWait( TP_WAIT *wait )
3045405ed2b4SDenis Malikov {
3046405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_WAIT( wait );
3047405ed2b4SDenis Malikov
3048405ed2b4SDenis Malikov TRACE( "%p\n", wait );
3049405ed2b4SDenis Malikov
3050405ed2b4SDenis Malikov tp_object_prepare_shutdown( this );
3051405ed2b4SDenis Malikov this->shutdown = TRUE;
3052405ed2b4SDenis Malikov tp_object_release( this );
3053405ed2b4SDenis Malikov }
3054405ed2b4SDenis Malikov
3055405ed2b4SDenis Malikov /***********************************************************************
3056405ed2b4SDenis Malikov * TpReleaseWork (NTDLL.@)
3057405ed2b4SDenis Malikov */
TpReleaseWork(TP_WORK * work)3058405ed2b4SDenis Malikov VOID WINAPI TpReleaseWork( TP_WORK *work )
3059405ed2b4SDenis Malikov {
3060405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_WORK( work );
3061405ed2b4SDenis Malikov
3062405ed2b4SDenis Malikov TRACE( "%p\n", work );
3063405ed2b4SDenis Malikov
3064405ed2b4SDenis Malikov tp_object_prepare_shutdown( this );
3065405ed2b4SDenis Malikov this->shutdown = TRUE;
3066405ed2b4SDenis Malikov tp_object_release( this );
3067405ed2b4SDenis Malikov }
3068405ed2b4SDenis Malikov
3069405ed2b4SDenis Malikov /***********************************************************************
3070405ed2b4SDenis Malikov * TpSetPoolMaxThreads (NTDLL.@)
3071405ed2b4SDenis Malikov */
TpSetPoolMaxThreads(TP_POOL * pool,DWORD maximum)3072405ed2b4SDenis Malikov VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
3073405ed2b4SDenis Malikov {
3074405ed2b4SDenis Malikov struct threadpool *this = impl_from_TP_POOL( pool );
3075405ed2b4SDenis Malikov
3076405ed2b4SDenis Malikov TRACE( "%p %lu\n", pool, maximum );
3077405ed2b4SDenis Malikov
3078405ed2b4SDenis Malikov RtlEnterCriticalSection( &this->cs );
3079405ed2b4SDenis Malikov this->max_workers = max( maximum, 1 );
3080405ed2b4SDenis Malikov this->min_workers = min( this->min_workers, this->max_workers );
3081405ed2b4SDenis Malikov RtlLeaveCriticalSection( &this->cs );
3082405ed2b4SDenis Malikov }
3083405ed2b4SDenis Malikov
3084405ed2b4SDenis Malikov /***********************************************************************
3085405ed2b4SDenis Malikov * TpSetPoolMinThreads (NTDLL.@)
3086405ed2b4SDenis Malikov */
TpSetPoolMinThreads(TP_POOL * pool,DWORD minimum)3087405ed2b4SDenis Malikov BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
3088405ed2b4SDenis Malikov {
3089405ed2b4SDenis Malikov struct threadpool *this = impl_from_TP_POOL( pool );
3090405ed2b4SDenis Malikov NTSTATUS status = STATUS_SUCCESS;
3091405ed2b4SDenis Malikov
3092405ed2b4SDenis Malikov TRACE( "%p %lu\n", pool, minimum );
3093405ed2b4SDenis Malikov
3094405ed2b4SDenis Malikov RtlEnterCriticalSection( &this->cs );
3095405ed2b4SDenis Malikov
3096405ed2b4SDenis Malikov while (this->num_workers < minimum)
3097405ed2b4SDenis Malikov {
3098405ed2b4SDenis Malikov status = tp_new_worker_thread( this );
3099405ed2b4SDenis Malikov if (status != STATUS_SUCCESS)
3100405ed2b4SDenis Malikov break;
3101405ed2b4SDenis Malikov }
3102405ed2b4SDenis Malikov
3103405ed2b4SDenis Malikov if (status == STATUS_SUCCESS)
3104405ed2b4SDenis Malikov {
3105405ed2b4SDenis Malikov this->min_workers = minimum;
3106405ed2b4SDenis Malikov this->max_workers = max( this->min_workers, this->max_workers );
3107405ed2b4SDenis Malikov }
3108405ed2b4SDenis Malikov
3109405ed2b4SDenis Malikov RtlLeaveCriticalSection( &this->cs );
3110405ed2b4SDenis Malikov return !status;
3111405ed2b4SDenis Malikov }
3112405ed2b4SDenis Malikov
3113405ed2b4SDenis Malikov /***********************************************************************
3114405ed2b4SDenis Malikov * TpSetTimer (NTDLL.@)
3115405ed2b4SDenis Malikov */
TpSetTimer(TP_TIMER * timer,LARGE_INTEGER * timeout,LONG period,LONG window_length)3116405ed2b4SDenis Malikov VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
3117405ed2b4SDenis Malikov {
3118405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_TIMER( timer );
3119405ed2b4SDenis Malikov struct threadpool_object *other_timer;
3120405ed2b4SDenis Malikov BOOL submit_timer = FALSE;
3121405ed2b4SDenis Malikov ULONGLONG timestamp;
3122405ed2b4SDenis Malikov
3123405ed2b4SDenis Malikov TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3124405ed2b4SDenis Malikov
3125405ed2b4SDenis Malikov RtlEnterCriticalSection( &timerqueue.cs );
3126405ed2b4SDenis Malikov
3127405ed2b4SDenis Malikov assert( this->u.timer.timer_initialized );
3128405ed2b4SDenis Malikov this->u.timer.timer_set = timeout != NULL;
3129405ed2b4SDenis Malikov
3130405ed2b4SDenis Malikov /* Convert relative timeout to absolute timestamp and handle a timeout
3131405ed2b4SDenis Malikov * of zero, which means that the timer is submitted immediately. */
3132405ed2b4SDenis Malikov if (timeout)
3133405ed2b4SDenis Malikov {
3134405ed2b4SDenis Malikov timestamp = timeout->QuadPart;
3135405ed2b4SDenis Malikov if ((LONGLONG)timestamp < 0)
3136405ed2b4SDenis Malikov {
3137405ed2b4SDenis Malikov LARGE_INTEGER now;
3138405ed2b4SDenis Malikov NtQuerySystemTime( &now );
3139405ed2b4SDenis Malikov timestamp = now.QuadPart - timestamp;
3140405ed2b4SDenis Malikov }
3141405ed2b4SDenis Malikov else if (!timestamp)
3142405ed2b4SDenis Malikov {
3143405ed2b4SDenis Malikov if (!period)
3144405ed2b4SDenis Malikov timeout = NULL;
3145405ed2b4SDenis Malikov else
3146405ed2b4SDenis Malikov {
3147405ed2b4SDenis Malikov LARGE_INTEGER now;
3148405ed2b4SDenis Malikov NtQuerySystemTime( &now );
3149405ed2b4SDenis Malikov timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3150405ed2b4SDenis Malikov }
3151405ed2b4SDenis Malikov submit_timer = TRUE;
3152405ed2b4SDenis Malikov }
3153405ed2b4SDenis Malikov }
3154405ed2b4SDenis Malikov
3155405ed2b4SDenis Malikov /* First remove existing timeout. */
3156405ed2b4SDenis Malikov if (this->u.timer.timer_pending)
3157405ed2b4SDenis Malikov {
3158405ed2b4SDenis Malikov list_remove( &this->u.timer.timer_entry );
3159405ed2b4SDenis Malikov this->u.timer.timer_pending = FALSE;
3160405ed2b4SDenis Malikov }
3161405ed2b4SDenis Malikov
3162405ed2b4SDenis Malikov /* If the timer was enabled, then add it back to the queue. */
3163405ed2b4SDenis Malikov if (timeout)
3164405ed2b4SDenis Malikov {
3165405ed2b4SDenis Malikov this->u.timer.timeout = timestamp;
3166405ed2b4SDenis Malikov this->u.timer.period = period;
3167405ed2b4SDenis Malikov this->u.timer.window_length = window_length;
3168405ed2b4SDenis Malikov
3169405ed2b4SDenis Malikov LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3170405ed2b4SDenis Malikov struct threadpool_object, u.timer.timer_entry )
3171405ed2b4SDenis Malikov {
3172405ed2b4SDenis Malikov assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3173405ed2b4SDenis Malikov if (this->u.timer.timeout < other_timer->u.timer.timeout)
3174405ed2b4SDenis Malikov break;
3175405ed2b4SDenis Malikov }
3176405ed2b4SDenis Malikov list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3177405ed2b4SDenis Malikov
3178405ed2b4SDenis Malikov /* Wake up the timer thread when the timeout has to be updated. */
3179405ed2b4SDenis Malikov if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3180405ed2b4SDenis Malikov RtlWakeAllConditionVariable( &timerqueue.update_event );
3181405ed2b4SDenis Malikov
3182405ed2b4SDenis Malikov this->u.timer.timer_pending = TRUE;
3183405ed2b4SDenis Malikov }
3184405ed2b4SDenis Malikov
3185405ed2b4SDenis Malikov RtlLeaveCriticalSection( &timerqueue.cs );
3186405ed2b4SDenis Malikov
3187405ed2b4SDenis Malikov if (submit_timer)
3188405ed2b4SDenis Malikov tp_object_submit( this, FALSE );
3189405ed2b4SDenis Malikov }
3190405ed2b4SDenis Malikov
3191405ed2b4SDenis Malikov /***********************************************************************
3192405ed2b4SDenis Malikov * TpSetWait (NTDLL.@)
3193405ed2b4SDenis Malikov */
TpSetWait(TP_WAIT * wait,HANDLE handle,LARGE_INTEGER * timeout)3194405ed2b4SDenis Malikov VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
3195405ed2b4SDenis Malikov {
3196405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_WAIT( wait );
3197405ed2b4SDenis Malikov ULONGLONG timestamp = MAXLONGLONG;
3198405ed2b4SDenis Malikov
3199405ed2b4SDenis Malikov TRACE( "%p %p %p\n", wait, handle, timeout );
3200405ed2b4SDenis Malikov
3201405ed2b4SDenis Malikov RtlEnterCriticalSection( &waitqueue.cs );
3202405ed2b4SDenis Malikov
3203405ed2b4SDenis Malikov assert( this->u.wait.bucket );
3204405ed2b4SDenis Malikov this->u.wait.handle = handle;
3205405ed2b4SDenis Malikov
3206405ed2b4SDenis Malikov if (handle || this->u.wait.wait_pending)
3207405ed2b4SDenis Malikov {
3208405ed2b4SDenis Malikov struct waitqueue_bucket *bucket = this->u.wait.bucket;
3209405ed2b4SDenis Malikov list_remove( &this->u.wait.wait_entry );
3210405ed2b4SDenis Malikov
3211405ed2b4SDenis Malikov /* Convert relative timeout to absolute timestamp. */
3212405ed2b4SDenis Malikov if (handle && timeout)
3213405ed2b4SDenis Malikov {
3214405ed2b4SDenis Malikov timestamp = timeout->QuadPart;
3215405ed2b4SDenis Malikov if ((LONGLONG)timestamp < 0)
3216405ed2b4SDenis Malikov {
3217405ed2b4SDenis Malikov LARGE_INTEGER now;
3218405ed2b4SDenis Malikov NtQuerySystemTime( &now );
3219405ed2b4SDenis Malikov timestamp = now.QuadPart - timestamp;
3220405ed2b4SDenis Malikov }
3221405ed2b4SDenis Malikov }
3222405ed2b4SDenis Malikov
3223405ed2b4SDenis Malikov /* Add wait object back into one of the queues. */
3224405ed2b4SDenis Malikov if (handle)
3225405ed2b4SDenis Malikov {
3226405ed2b4SDenis Malikov list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3227405ed2b4SDenis Malikov this->u.wait.wait_pending = TRUE;
3228405ed2b4SDenis Malikov this->u.wait.timeout = timestamp;
3229405ed2b4SDenis Malikov }
3230405ed2b4SDenis Malikov else
3231405ed2b4SDenis Malikov {
3232405ed2b4SDenis Malikov list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3233405ed2b4SDenis Malikov this->u.wait.wait_pending = FALSE;
3234405ed2b4SDenis Malikov }
3235405ed2b4SDenis Malikov
3236405ed2b4SDenis Malikov /* Wake up the wait queue thread. */
3237405ed2b4SDenis Malikov NtSetEvent( bucket->update_event, NULL );
3238405ed2b4SDenis Malikov }
3239405ed2b4SDenis Malikov
3240405ed2b4SDenis Malikov RtlLeaveCriticalSection( &waitqueue.cs );
3241405ed2b4SDenis Malikov }
3242405ed2b4SDenis Malikov
3243405ed2b4SDenis Malikov /***********************************************************************
3244405ed2b4SDenis Malikov * TpSimpleTryPost (NTDLL.@)
3245405ed2b4SDenis Malikov */
TpSimpleTryPost(PTP_SIMPLE_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)3246405ed2b4SDenis Malikov NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3247405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON *environment )
3248405ed2b4SDenis Malikov {
3249405ed2b4SDenis Malikov struct threadpool_object *object;
3250405ed2b4SDenis Malikov struct threadpool *pool;
3251405ed2b4SDenis Malikov NTSTATUS status;
3252405ed2b4SDenis Malikov
3253405ed2b4SDenis Malikov TRACE( "%p %p %p\n", callback, userdata, environment );
3254405ed2b4SDenis Malikov
3255405ed2b4SDenis Malikov object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3256405ed2b4SDenis Malikov if (!object)
3257405ed2b4SDenis Malikov return STATUS_NO_MEMORY;
3258405ed2b4SDenis Malikov
3259405ed2b4SDenis Malikov status = tp_threadpool_lock( &pool, environment );
3260405ed2b4SDenis Malikov if (status)
3261405ed2b4SDenis Malikov {
3262405ed2b4SDenis Malikov RtlFreeHeap( GetProcessHeap(), 0, object );
3263405ed2b4SDenis Malikov return status;
3264405ed2b4SDenis Malikov }
3265405ed2b4SDenis Malikov
3266405ed2b4SDenis Malikov object->type = TP_OBJECT_TYPE_SIMPLE;
3267405ed2b4SDenis Malikov object->u.simple.callback = callback;
3268405ed2b4SDenis Malikov tp_object_initialize( object, pool, userdata, environment );
3269405ed2b4SDenis Malikov
3270405ed2b4SDenis Malikov return STATUS_SUCCESS;
3271405ed2b4SDenis Malikov }
3272405ed2b4SDenis Malikov
3273405ed2b4SDenis Malikov /***********************************************************************
3274405ed2b4SDenis Malikov * TpStartAsyncIoOperation (NTDLL.@)
3275405ed2b4SDenis Malikov */
TpStartAsyncIoOperation(TP_IO * io)3276405ed2b4SDenis Malikov void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3277405ed2b4SDenis Malikov {
3278405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_IO( io );
3279405ed2b4SDenis Malikov
3280405ed2b4SDenis Malikov TRACE( "%p\n", io );
3281405ed2b4SDenis Malikov
3282405ed2b4SDenis Malikov RtlEnterCriticalSection( &this->pool->cs );
3283405ed2b4SDenis Malikov
3284405ed2b4SDenis Malikov this->u.io.pending_count++;
3285405ed2b4SDenis Malikov
3286405ed2b4SDenis Malikov RtlLeaveCriticalSection( &this->pool->cs );
3287405ed2b4SDenis Malikov }
3288405ed2b4SDenis Malikov
3289405ed2b4SDenis Malikov /***********************************************************************
3290405ed2b4SDenis Malikov * TpWaitForIoCompletion (NTDLL.@)
3291405ed2b4SDenis Malikov */
TpWaitForIoCompletion(TP_IO * io,BOOL cancel_pending)3292405ed2b4SDenis Malikov void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3293405ed2b4SDenis Malikov {
3294405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_IO( io );
3295405ed2b4SDenis Malikov
3296405ed2b4SDenis Malikov TRACE( "%p %d\n", io, cancel_pending );
3297405ed2b4SDenis Malikov
3298405ed2b4SDenis Malikov if (cancel_pending)
3299405ed2b4SDenis Malikov tp_object_cancel( this );
3300405ed2b4SDenis Malikov tp_object_wait( this, FALSE );
3301405ed2b4SDenis Malikov }
3302405ed2b4SDenis Malikov
3303405ed2b4SDenis Malikov /***********************************************************************
3304405ed2b4SDenis Malikov * TpWaitForTimer (NTDLL.@)
3305405ed2b4SDenis Malikov */
TpWaitForTimer(TP_TIMER * timer,BOOL cancel_pending)3306405ed2b4SDenis Malikov VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3307405ed2b4SDenis Malikov {
3308405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_TIMER( timer );
3309405ed2b4SDenis Malikov
3310405ed2b4SDenis Malikov TRACE( "%p %d\n", timer, cancel_pending );
3311405ed2b4SDenis Malikov
3312405ed2b4SDenis Malikov if (cancel_pending)
3313405ed2b4SDenis Malikov tp_object_cancel( this );
3314405ed2b4SDenis Malikov tp_object_wait( this, FALSE );
3315405ed2b4SDenis Malikov }
3316405ed2b4SDenis Malikov
3317405ed2b4SDenis Malikov /***********************************************************************
3318405ed2b4SDenis Malikov * TpWaitForWait (NTDLL.@)
3319405ed2b4SDenis Malikov */
TpWaitForWait(TP_WAIT * wait,BOOL cancel_pending)3320405ed2b4SDenis Malikov VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3321405ed2b4SDenis Malikov {
3322405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_WAIT( wait );
3323405ed2b4SDenis Malikov
3324405ed2b4SDenis Malikov TRACE( "%p %d\n", wait, cancel_pending );
3325405ed2b4SDenis Malikov
3326405ed2b4SDenis Malikov if (cancel_pending)
3327405ed2b4SDenis Malikov tp_object_cancel( this );
3328405ed2b4SDenis Malikov tp_object_wait( this, FALSE );
3329405ed2b4SDenis Malikov }
3330405ed2b4SDenis Malikov
3331405ed2b4SDenis Malikov /***********************************************************************
3332405ed2b4SDenis Malikov * TpWaitForWork (NTDLL.@)
3333405ed2b4SDenis Malikov */
TpWaitForWork(TP_WORK * work,BOOL cancel_pending)3334405ed2b4SDenis Malikov VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3335405ed2b4SDenis Malikov {
3336405ed2b4SDenis Malikov struct threadpool_object *this = impl_from_TP_WORK( work );
3337405ed2b4SDenis Malikov
3338405ed2b4SDenis Malikov TRACE( "%p %u\n", work, cancel_pending );
3339405ed2b4SDenis Malikov
3340405ed2b4SDenis Malikov if (cancel_pending)
3341405ed2b4SDenis Malikov tp_object_cancel( this );
3342405ed2b4SDenis Malikov tp_object_wait( this, FALSE );
3343405ed2b4SDenis Malikov }
3344405ed2b4SDenis Malikov
3345405ed2b4SDenis Malikov /***********************************************************************
3346405ed2b4SDenis Malikov * TpSetPoolStackInformation (NTDLL.@)
3347405ed2b4SDenis Malikov */
TpSetPoolStackInformation(TP_POOL * pool,TP_POOL_STACK_INFORMATION * stack_info)3348405ed2b4SDenis Malikov NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3349405ed2b4SDenis Malikov {
3350405ed2b4SDenis Malikov struct threadpool *this = impl_from_TP_POOL( pool );
3351405ed2b4SDenis Malikov
3352405ed2b4SDenis Malikov TRACE( "%p %p\n", pool, stack_info );
3353405ed2b4SDenis Malikov
3354405ed2b4SDenis Malikov if (!stack_info)
3355405ed2b4SDenis Malikov return STATUS_INVALID_PARAMETER;
3356405ed2b4SDenis Malikov
3357405ed2b4SDenis Malikov RtlEnterCriticalSection( &this->cs );
3358405ed2b4SDenis Malikov this->stack_info = *stack_info;
3359405ed2b4SDenis Malikov RtlLeaveCriticalSection( &this->cs );
3360405ed2b4SDenis Malikov
3361405ed2b4SDenis Malikov return STATUS_SUCCESS;
3362405ed2b4SDenis Malikov }
3363405ed2b4SDenis Malikov
3364405ed2b4SDenis Malikov /***********************************************************************
3365405ed2b4SDenis Malikov * TpQueryPoolStackInformation (NTDLL.@)
3366405ed2b4SDenis Malikov */
TpQueryPoolStackInformation(TP_POOL * pool,TP_POOL_STACK_INFORMATION * stack_info)3367405ed2b4SDenis Malikov NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3368405ed2b4SDenis Malikov {
3369405ed2b4SDenis Malikov struct threadpool *this = impl_from_TP_POOL( pool );
3370405ed2b4SDenis Malikov
3371405ed2b4SDenis Malikov TRACE( "%p %p\n", pool, stack_info );
3372405ed2b4SDenis Malikov
3373405ed2b4SDenis Malikov if (!stack_info)
3374405ed2b4SDenis Malikov return STATUS_INVALID_PARAMETER;
3375405ed2b4SDenis Malikov
3376405ed2b4SDenis Malikov RtlEnterCriticalSection( &this->cs );
3377405ed2b4SDenis Malikov *stack_info = this->stack_info;
3378405ed2b4SDenis Malikov RtlLeaveCriticalSection( &this->cs );
3379405ed2b4SDenis Malikov
3380405ed2b4SDenis Malikov return STATUS_SUCCESS;
3381405ed2b4SDenis Malikov }
3382405ed2b4SDenis Malikov
rtl_wait_callback(TP_CALLBACK_INSTANCE * instance,void * userdata,TP_WAIT * wait,TP_WAIT_RESULT result)3383405ed2b4SDenis Malikov static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3384405ed2b4SDenis Malikov {
3385405ed2b4SDenis Malikov struct threadpool_object *object = impl_from_TP_WAIT(wait);
3386405ed2b4SDenis Malikov object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3387405ed2b4SDenis Malikov }
3388405ed2b4SDenis Malikov
3389405ed2b4SDenis Malikov /***********************************************************************
3390405ed2b4SDenis Malikov * RtlRegisterWait (NTDLL.@)
3391405ed2b4SDenis Malikov *
3392405ed2b4SDenis Malikov * Registers a wait for a handle to become signaled.
3393405ed2b4SDenis Malikov *
3394405ed2b4SDenis Malikov * PARAMS
3395405ed2b4SDenis Malikov * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3396405ed2b4SDenis Malikov * Object [I] Object to wait to become signaled.
3397405ed2b4SDenis Malikov * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3398405ed2b4SDenis Malikov * Context [I] Context to pass to the callback function when it is executed.
3399405ed2b4SDenis Malikov * Milliseconds [I] Number of milliseconds to wait before timing out.
3400405ed2b4SDenis Malikov * Flags [I] Flags. See notes.
3401405ed2b4SDenis Malikov *
3402405ed2b4SDenis Malikov * RETURNS
3403405ed2b4SDenis Malikov * Success: STATUS_SUCCESS.
3404405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
3405405ed2b4SDenis Malikov *
3406405ed2b4SDenis Malikov * NOTES
3407405ed2b4SDenis Malikov * Flags can be one or more of the following:
3408405ed2b4SDenis Malikov *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3409405ed2b4SDenis Malikov *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3410405ed2b4SDenis Malikov *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3411405ed2b4SDenis Malikov *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3412405ed2b4SDenis Malikov *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3413405ed2b4SDenis Malikov */
RtlRegisterWait(HANDLE * out,HANDLE handle,RTL_WAITORTIMERCALLBACKFUNC callback,void * context,ULONG milliseconds,ULONG flags)3414405ed2b4SDenis Malikov NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3415405ed2b4SDenis Malikov void *context, ULONG milliseconds, ULONG flags )
3416405ed2b4SDenis Malikov {
3417405ed2b4SDenis Malikov struct threadpool_object *object;
3418405ed2b4SDenis Malikov TP_CALLBACK_ENVIRON environment;
3419405ed2b4SDenis Malikov LARGE_INTEGER timeout;
3420405ed2b4SDenis Malikov NTSTATUS status;
3421405ed2b4SDenis Malikov TP_WAIT *wait;
3422405ed2b4SDenis Malikov
3423405ed2b4SDenis Malikov TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3424405ed2b4SDenis Malikov out, handle, callback, context, milliseconds, flags );
3425405ed2b4SDenis Malikov
3426405ed2b4SDenis Malikov memset( &environment, 0, sizeof(environment) );
3427405ed2b4SDenis Malikov environment.Version = 1;
3428405ed2b4SDenis Malikov environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3429405ed2b4SDenis Malikov environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3430405ed2b4SDenis Malikov
3431405ed2b4SDenis Malikov flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3432405ed2b4SDenis Malikov if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3433405ed2b4SDenis Malikov return status;
3434405ed2b4SDenis Malikov
3435405ed2b4SDenis Malikov object = impl_from_TP_WAIT(wait);
3436405ed2b4SDenis Malikov object->u.wait.rtl_callback = callback;
3437405ed2b4SDenis Malikov
3438405ed2b4SDenis Malikov RtlEnterCriticalSection( &waitqueue.cs );
3439405ed2b4SDenis Malikov TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3440405ed2b4SDenis Malikov
3441405ed2b4SDenis Malikov *out = object;
3442405ed2b4SDenis Malikov RtlLeaveCriticalSection( &waitqueue.cs );
3443405ed2b4SDenis Malikov
3444405ed2b4SDenis Malikov return STATUS_SUCCESS;
3445405ed2b4SDenis Malikov }
3446405ed2b4SDenis Malikov
3447405ed2b4SDenis Malikov /***********************************************************************
3448405ed2b4SDenis Malikov * RtlDeregisterWaitEx (NTDLL.@)
3449405ed2b4SDenis Malikov *
3450405ed2b4SDenis Malikov * Cancels a wait operation and frees the resources associated with calling
3451405ed2b4SDenis Malikov * RtlRegisterWait().
3452405ed2b4SDenis Malikov *
3453405ed2b4SDenis Malikov * PARAMS
3454405ed2b4SDenis Malikov * WaitObject [I] Handle to the wait object to free.
3455405ed2b4SDenis Malikov *
3456405ed2b4SDenis Malikov * RETURNS
3457405ed2b4SDenis Malikov * Success: STATUS_SUCCESS.
3458405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
3459405ed2b4SDenis Malikov */
RtlDeregisterWaitEx(HANDLE handle,HANDLE event)3460405ed2b4SDenis Malikov NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3461405ed2b4SDenis Malikov {
3462405ed2b4SDenis Malikov struct threadpool_object *object = handle;
3463405ed2b4SDenis Malikov NTSTATUS status;
3464405ed2b4SDenis Malikov
3465405ed2b4SDenis Malikov TRACE( "handle %p, event %p\n", handle, event );
3466405ed2b4SDenis Malikov
3467405ed2b4SDenis Malikov if (!object) return STATUS_INVALID_HANDLE;
3468405ed2b4SDenis Malikov
3469405ed2b4SDenis Malikov TpSetWait( (TP_WAIT *)object, NULL, NULL );
3470405ed2b4SDenis Malikov
3471405ed2b4SDenis Malikov if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3472405ed2b4SDenis Malikov else
3473405ed2b4SDenis Malikov {
3474405ed2b4SDenis Malikov assert( object->completed_event == NULL );
3475405ed2b4SDenis Malikov object->completed_event = event;
3476405ed2b4SDenis Malikov }
3477405ed2b4SDenis Malikov
3478405ed2b4SDenis Malikov RtlEnterCriticalSection( &object->pool->cs );
3479405ed2b4SDenis Malikov if (object->num_pending_callbacks + object->num_running_callbacks
3480405ed2b4SDenis Malikov + object->num_associated_callbacks) status = STATUS_PENDING;
3481405ed2b4SDenis Malikov else status = STATUS_SUCCESS;
3482405ed2b4SDenis Malikov RtlLeaveCriticalSection( &object->pool->cs );
3483405ed2b4SDenis Malikov
3484405ed2b4SDenis Malikov TpReleaseWait( (TP_WAIT *)object );
3485405ed2b4SDenis Malikov return status;
3486405ed2b4SDenis Malikov }
3487405ed2b4SDenis Malikov
3488405ed2b4SDenis Malikov /***********************************************************************
3489405ed2b4SDenis Malikov * RtlDeregisterWait (NTDLL.@)
3490405ed2b4SDenis Malikov *
3491405ed2b4SDenis Malikov * Cancels a wait operation and frees the resources associated with calling
3492405ed2b4SDenis Malikov * RtlRegisterWait().
3493405ed2b4SDenis Malikov *
3494405ed2b4SDenis Malikov * PARAMS
3495405ed2b4SDenis Malikov * WaitObject [I] Handle to the wait object to free.
3496405ed2b4SDenis Malikov *
3497405ed2b4SDenis Malikov * RETURNS
3498405ed2b4SDenis Malikov * Success: STATUS_SUCCESS.
3499405ed2b4SDenis Malikov * Failure: Any NTSTATUS code.
3500405ed2b4SDenis Malikov */
RtlDeregisterWait(HANDLE WaitHandle)3501405ed2b4SDenis Malikov NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3502405ed2b4SDenis Malikov {
3503405ed2b4SDenis Malikov return RtlDeregisterWaitEx(WaitHandle, NULL);
3504405ed2b4SDenis Malikov }
3505*0bf42067SJustin Miller
3506*0bf42067SJustin Miller #ifdef __REACTOS__
3507*0bf42067SJustin Miller VOID
3508*0bf42067SJustin Miller NTAPI
RtlpInitializeThreadPooling(VOID)3509*0bf42067SJustin Miller RtlpInitializeThreadPooling(
3510*0bf42067SJustin Miller VOID)
3511*0bf42067SJustin Miller {
3512*0bf42067SJustin Miller RtlInitializeCriticalSection(&old_threadpool.threadpool_compl_cs);
3513*0bf42067SJustin Miller RtlInitializeCriticalSection(&timerqueue.cs);
3514*0bf42067SJustin Miller RtlInitializeCriticalSection(&waitqueue.cs);
3515*0bf42067SJustin Miller RtlInitializeCriticalSection(&ioqueue.cs);
3516*0bf42067SJustin Miller }
3517*0bf42067SJustin Miller #endif
3518