1 /*
2 * Thread pooling
3 *
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
20 */
21
22
23 #ifdef __REACTOS__
24 #include <rtl_vista.h>
25 #define NDEBUG
26 #include "wine/list.h"
27 #include <debug.h>
28
29 #define ERR(fmt, ...) DPRINT1(fmt, ##__VA_ARGS__)
30 #define FIXME(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
31 #define WARN(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
32 #define TRACE(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
33 #ifndef ARRAY_SIZE
34 #define ARRAY_SIZE(_x) (sizeof((_x))/sizeof((_x)[0]))
35 #endif
36
37 typedef struct _THREAD_NAME_INFORMATION
38 {
39 UNICODE_STRING ThreadName;
40 } THREAD_NAME_INFORMATION, *PTHREAD_NAME_INFORMATION;
41
42 typedef void (CALLBACK *PNTAPCFUNC)(ULONG_PTR,ULONG_PTR,ULONG_PTR);
43 typedef void (CALLBACK *PRTL_THREAD_START_ROUTINE)(LPVOID);
44 typedef DWORD (CALLBACK *PRTL_WORK_ITEM_ROUTINE)(LPVOID);
45 typedef void (NTAPI *RTL_WAITORTIMERCALLBACKFUNC)(PVOID,BOOLEAN);
46 typedef VOID (CALLBACK *PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD,DWORD,LPVOID);
47
48 typedef void (CALLBACK *PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,void*,void*,IO_STATUS_BLOCK*,PTP_IO);
49 NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
50 #define PRTL_WORK_ITEM_ROUTINE WORKERCALLBACKFUNC
51
52 #define CRITICAL_SECTION RTL_CRITICAL_SECTION
53 #define GetProcessHeap() RtlGetProcessHeap()
54 #define GetCurrentProcess() NtCurrentProcess()
55 #define GetCurrentThread() NtCurrentThread()
56 #define GetCurrentThreadId() HandleToULong(NtCurrentTeb()->ClientId.UniqueThread)
57 #else
58 #include <assert.h>
59 #include <stdarg.h>
60 #include <limits.h>
61
62 #include "ntstatus.h"
63 #define WIN32_NO_STATUS
64 #include "winternl.h"
65
66 #include "wine/debug.h"
67 #include "wine/list.h"
68
69 #include "ntdll_misc.h"
70
71 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
72 #endif
73
74 /*
75 * Old thread pooling API
76 */
77
78 struct rtl_work_item
79 {
80 PRTL_WORK_ITEM_ROUTINE function;
81 PVOID context;
82 };
83
84 #define EXPIRE_NEVER (~(ULONGLONG)0)
85 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
86
87 #ifndef __REACTOS__
88 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
89 #endif
90
91 static struct
92 {
93 HANDLE compl_port;
94 RTL_CRITICAL_SECTION threadpool_compl_cs;
95 }
96 old_threadpool =
97 {
98 NULL, /* compl_port */
99 #ifdef __REACTOS__
100 {0}, /* threadpool_compl_cs */
101 #else
102 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
103 #endif
104 };
105
106 #ifndef __REACTOS__
107 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
108 {
109 0, 0, &old_threadpool.threadpool_compl_cs,
110 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
111 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
112 };
113 #endif
114
115 struct timer_queue;
116 struct queue_timer
117 {
118 struct timer_queue *q;
119 struct list entry;
120 ULONG runcount; /* number of callbacks pending execution */
121 RTL_WAITORTIMERCALLBACKFUNC callback;
122 PVOID param;
123 DWORD period;
124 ULONG flags;
125 ULONGLONG expire;
126 BOOL destroy; /* timer should be deleted; once set, never unset */
127 HANDLE event; /* removal event */
128 };
129
130 struct timer_queue
131 {
132 DWORD magic;
133 RTL_CRITICAL_SECTION cs;
134 struct list timers; /* sorted by expiration time */
135 BOOL quit; /* queue should be deleted; once set, never unset */
136 HANDLE event;
137 HANDLE thread;
138 };
139
140 /*
141 * Object-oriented thread pooling API
142 */
143
144 #define THREADPOOL_WORKER_TIMEOUT 5000
145 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
146
147 /* internal threadpool representation */
148 struct threadpool
149 {
150 LONG refcount;
151 LONG objcount;
152 BOOL shutdown;
153 CRITICAL_SECTION cs;
154 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
155 struct list pools[3];
156 RTL_CONDITION_VARIABLE update_event;
157 /* information about worker threads, locked via .cs */
158 int max_workers;
159 int min_workers;
160 int num_workers;
161 int num_busy_workers;
162 HANDLE compl_port;
163 TP_POOL_STACK_INFORMATION stack_info;
164 };
165
166 enum threadpool_objtype
167 {
168 TP_OBJECT_TYPE_SIMPLE,
169 TP_OBJECT_TYPE_WORK,
170 TP_OBJECT_TYPE_TIMER,
171 TP_OBJECT_TYPE_WAIT,
172 TP_OBJECT_TYPE_IO,
173 };
174
175 struct io_completion
176 {
177 IO_STATUS_BLOCK iosb;
178 ULONG_PTR cvalue;
179 };
180
181 /* internal threadpool object representation */
182 struct threadpool_object
183 {
184 void *win32_callback; /* leave space for kernelbase to store win32 callback */
185 LONG refcount;
186 BOOL shutdown;
187 /* read-only information */
188 enum threadpool_objtype type;
189 struct threadpool *pool;
190 struct threadpool_group *group;
191 PVOID userdata;
192 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
193 PTP_SIMPLE_CALLBACK finalization_callback;
194 BOOL may_run_long;
195 HMODULE race_dll;
196 TP_CALLBACK_PRIORITY priority;
197 /* information about the group, locked via .group->cs */
198 struct list group_entry;
199 BOOL is_group_member;
200 /* information about the pool, locked via .pool->cs */
201 struct list pool_entry;
202 RTL_CONDITION_VARIABLE finished_event;
203 RTL_CONDITION_VARIABLE group_finished_event;
204 HANDLE completed_event;
205 LONG num_pending_callbacks;
206 LONG num_running_callbacks;
207 LONG num_associated_callbacks;
208 /* arguments for callback */
209 union
210 {
211 struct
212 {
213 PTP_SIMPLE_CALLBACK callback;
214 } simple;
215 struct
216 {
217 PTP_WORK_CALLBACK callback;
218 } work;
219 struct
220 {
221 PTP_TIMER_CALLBACK callback;
222 /* information about the timer, locked via timerqueue.cs */
223 BOOL timer_initialized;
224 BOOL timer_pending;
225 struct list timer_entry;
226 BOOL timer_set;
227 ULONGLONG timeout;
228 LONG period;
229 LONG window_length;
230 } timer;
231 struct
232 {
233 PTP_WAIT_CALLBACK callback;
234 LONG signaled;
235 /* information about the wait object, locked via waitqueue.cs */
236 struct waitqueue_bucket *bucket;
237 BOOL wait_pending;
238 struct list wait_entry;
239 ULONGLONG timeout;
240 HANDLE handle;
241 DWORD flags;
242 RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
243 } wait;
244 struct
245 {
246 PTP_IO_CALLBACK callback;
247 /* locked via .pool->cs */
248 unsigned int pending_count, skipped_count, completion_count, completion_max;
249 BOOL shutting_down;
250 struct io_completion *completions;
251 } io;
252 } u;
253 };
254
255 /* internal threadpool instance representation */
256 struct threadpool_instance
257 {
258 struct threadpool_object *object;
259 DWORD threadid;
260 BOOL associated;
261 BOOL may_run_long;
262 struct
263 {
264 CRITICAL_SECTION *critical_section;
265 HANDLE mutex;
266 HANDLE semaphore;
267 LONG semaphore_count;
268 HANDLE event;
269 HMODULE library;
270 } cleanup;
271 };
272
273 /* internal threadpool group representation */
274 struct threadpool_group
275 {
276 LONG refcount;
277 BOOL shutdown;
278 CRITICAL_SECTION cs;
279 /* list of group members, locked via .cs */
280 struct list members;
281 };
282
283 #ifndef __REACTOS__
284 /* global timerqueue object */
285 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
286 #endif
287
288 static struct
289 {
290 CRITICAL_SECTION cs;
291 LONG objcount;
292 BOOL thread_running;
293 struct list pending_timers;
294 RTL_CONDITION_VARIABLE update_event;
295 }
296 timerqueue =
297 {
298 #ifdef __REACTOS__
299 {0}, /* cs */
300 #else
301 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
302 #endif
303 0, /* objcount */
304 FALSE, /* thread_running */
305 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
306 #if __REACTOS__
307 0,
308 #else
309 RTL_CONDITION_VARIABLE_INIT /* update_event */
310 #endif
311 };
312
313 #ifndef __REACTOS__
314 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
315 {
316 0, 0, &timerqueue.cs,
317 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
318 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
319 };
320
321 /* global waitqueue object */
322 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
323 #endif
324
325 static struct
326 {
327 CRITICAL_SECTION cs;
328 LONG num_buckets;
329 struct list buckets;
330 }
331 waitqueue =
332 {
333 #ifdef __REACTOS__
334 {0}, /* cs */
335 #else
336 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
337 #endif
338 0, /* num_buckets */
339 LIST_INIT( waitqueue.buckets ) /* buckets */
340 };
341
342 #ifndef __REACTOS__
343 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
344 {
345 0, 0, &waitqueue.cs,
346 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
347 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
348 };
349 #endif
350
351 struct waitqueue_bucket
352 {
353 struct list bucket_entry;
354 LONG objcount;
355 struct list reserved;
356 struct list waiting;
357 HANDLE update_event;
358 BOOL alertable;
359 };
360
361 #ifndef __REACTOS__
362 /* global I/O completion queue object */
363 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
364 #endif
365
366 static struct
367 {
368 CRITICAL_SECTION cs;
369 LONG objcount;
370 BOOL thread_running;
371 HANDLE port;
372 RTL_CONDITION_VARIABLE update_event;
373 }
374 ioqueue =
375 {
376 #ifdef __REACTOS__
377 .cs = {0},
378 #else
379 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
380 #endif
381 };
382
383 #ifndef __REACTOS__
384 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
385 {
386 0, 0, &ioqueue.cs,
387 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
388 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
389 };
390 #endif
391
impl_from_TP_POOL(TP_POOL * pool)392 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
393 {
394 return (struct threadpool *)pool;
395 }
396
impl_from_TP_WORK(TP_WORK * work)397 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
398 {
399 struct threadpool_object *object = (struct threadpool_object *)work;
400 assert( object->type == TP_OBJECT_TYPE_WORK );
401 return object;
402 }
403
impl_from_TP_TIMER(TP_TIMER * timer)404 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
405 {
406 struct threadpool_object *object = (struct threadpool_object *)timer;
407 assert( object->type == TP_OBJECT_TYPE_TIMER );
408 return object;
409 }
410
impl_from_TP_WAIT(TP_WAIT * wait)411 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
412 {
413 struct threadpool_object *object = (struct threadpool_object *)wait;
414 assert( object->type == TP_OBJECT_TYPE_WAIT );
415 return object;
416 }
417
impl_from_TP_IO(TP_IO * io)418 static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
419 {
420 struct threadpool_object *object = (struct threadpool_object *)io;
421 assert( object->type == TP_OBJECT_TYPE_IO );
422 return object;
423 }
424
impl_from_TP_CLEANUP_GROUP(TP_CLEANUP_GROUP * group)425 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
426 {
427 return (struct threadpool_group *)group;
428 }
429
impl_from_TP_CALLBACK_INSTANCE(TP_CALLBACK_INSTANCE * instance)430 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
431 {
432 return (struct threadpool_instance *)instance;
433 }
434
435 #ifdef __REACTOS__
436 ULONG NTAPI threadpool_worker_proc(PVOID param );
437 #else
438 static void CALLBACK threadpool_worker_proc( void *param );
439 #endif
440 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
441 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
442 static void tp_object_prepare_shutdown( struct threadpool_object *object );
443 static BOOL tp_object_release( struct threadpool_object *object );
444 static struct threadpool *default_threadpool = NULL;
445
array_reserve(void ** elements,unsigned int * capacity,unsigned int count,unsigned int size)446 static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
447 {
448 unsigned int new_capacity, max_capacity;
449 void *new_elements;
450
451 if (count <= *capacity)
452 return TRUE;
453
454 max_capacity = ~(SIZE_T)0 / size;
455 if (count > max_capacity)
456 return FALSE;
457
458 new_capacity = max(4, *capacity);
459 while (new_capacity < count && new_capacity <= max_capacity / 2)
460 new_capacity *= 2;
461 if (new_capacity < count)
462 new_capacity = max_capacity;
463
464 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
465 return FALSE;
466
467 *elements = new_elements;
468 *capacity = new_capacity;
469
470 return TRUE;
471 }
472
set_thread_name(const WCHAR * name)473 static void set_thread_name(const WCHAR *name)
474 {
475 #ifndef __REACTOS__ // This is impossible on non vista+
476 THREAD_NAME_INFORMATION info;
477
478 RtlInitUnicodeString(&info.ThreadName, name);
479 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info));
480 #endif
481 }
482
483 #ifndef __REACTOS__
process_rtl_work_item(TP_CALLBACK_INSTANCE * instance,void * userdata)484 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
485 {
486 struct rtl_work_item *item = userdata;
487
488 TRACE("executing %p(%p)\n", item->function, item->context);
489 item->function( item->context );
490
491 RtlFreeHeap( GetProcessHeap(), 0, item );
492 }
493
494 /***********************************************************************
495 * RtlQueueWorkItem (NTDLL.@)
496 *
497 * Queues a work item into a thread in the thread pool.
498 *
499 * PARAMS
500 * function [I] Work function to execute.
501 * context [I] Context to pass to the work function when it is executed.
502 * flags [I] Flags. See notes.
503 *
504 * RETURNS
505 * Success: STATUS_SUCCESS.
506 * Failure: Any NTSTATUS code.
507 *
508 * NOTES
509 * Flags can be one or more of the following:
510 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
511 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
512 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
513 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
514 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
515 */
RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE function,PVOID context,ULONG flags)516 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
517 {
518 TP_CALLBACK_ENVIRON environment;
519 struct rtl_work_item *item;
520 NTSTATUS status;
521
522 TRACE( "%p %p %lu\n", function, context, flags );
523
524 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
525 if (!item)
526 return STATUS_NO_MEMORY;
527
528 memset( &environment, 0, sizeof(environment) );
529 environment.Version = 1;
530 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
531 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
532
533 item->function = function;
534 item->context = context;
535
536 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
537 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
538 return status;
539 }
540
541 /***********************************************************************
542 * iocp_poller - get completion events and run callbacks
543 */
iocp_poller(LPVOID Arg)544 static DWORD CALLBACK iocp_poller(LPVOID Arg)
545 {
546 HANDLE cport = Arg;
547
548 while( TRUE )
549 {
550 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
551 LPVOID overlapped;
552 IO_STATUS_BLOCK iosb;
553 #ifdef __REACTOS__
554 NTSTATUS res = NtRemoveIoCompletion( cport, (PVOID)&callback, (PVOID)&overlapped, &iosb, NULL );
555 #else
556 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
557 #endif
558 if (res)
559 {
560 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
561 }
562 else
563 {
564 DWORD transferred = 0;
565 DWORD err = 0;
566
567 if (iosb.Status == STATUS_SUCCESS)
568 transferred = iosb.Information;
569 else
570 err = RtlNtStatusToDosError(iosb.Status);
571
572 callback( err, transferred, overlapped );
573 }
574 }
575 return 0;
576 }
577
578 /***********************************************************************
579 * RtlSetIoCompletionCallback (NTDLL.@)
580 *
581 * Binds a handle to a thread pool's completion port, and possibly
582 * starts a non-I/O thread to monitor this port and call functions back.
583 *
584 * PARAMS
585 * FileHandle [I] Handle to bind to a completion port.
586 * Function [I] Callback function to call on I/O completions.
587 * Flags [I] Not used.
588 *
589 * RETURNS
590 * Success: STATUS_SUCCESS.
591 * Failure: Any NTSTATUS code.
592 *
593 */
RtlSetIoCompletionCallback(HANDLE FileHandle,PRTL_OVERLAPPED_COMPLETION_ROUTINE Function,ULONG Flags)594 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
595 {
596 IO_STATUS_BLOCK iosb;
597 FILE_COMPLETION_INFORMATION info;
598
599 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
600
601 if (!old_threadpool.compl_port)
602 {
603 NTSTATUS res = STATUS_SUCCESS;
604
605 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
606 if (!old_threadpool.compl_port)
607 {
608 HANDLE cport;
609
610 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
611 if (!res)
612 {
613 /* FIXME native can start additional threads in case of e.g. hung callback function. */
614 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
615 if (!res)
616 old_threadpool.compl_port = cport;
617 else
618 NtClose( cport );
619 }
620 }
621 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
622 if (res) return res;
623 }
624
625 info.CompletionPort = old_threadpool.compl_port;
626 info.CompletionKey = (ULONG_PTR)Function;
627
628 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
629 }
630 #endif
631
get_nt_timeout(PLARGE_INTEGER pTime,ULONG timeout)632 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
633 {
634 if (timeout == INFINITE) return NULL;
635 pTime->QuadPart = (ULONGLONG)timeout * -10000;
636 return pTime;
637 }
638
639 /************************** Timer Queue Impl **************************/
640
queue_remove_timer(struct queue_timer * t)641 static void queue_remove_timer(struct queue_timer *t)
642 {
643 /* We MUST hold the queue cs while calling this function. This ensures
644 that we cannot queue another callback for this timer. The runcount
645 being zero makes sure we don't have any already queued. */
646 struct timer_queue *q = t->q;
647
648 assert(t->runcount == 0);
649 assert(t->destroy);
650
651 list_remove(&t->entry);
652 if (t->event)
653 NtSetEvent(t->event, NULL);
654 RtlFreeHeap(GetProcessHeap(), 0, t);
655
656 if (q->quit && list_empty(&q->timers))
657 NtSetEvent(q->event, NULL);
658 }
659
timer_cleanup_callback(struct queue_timer * t)660 static void timer_cleanup_callback(struct queue_timer *t)
661 {
662 struct timer_queue *q = t->q;
663 RtlEnterCriticalSection(&q->cs);
664
665 assert(0 < t->runcount);
666 --t->runcount;
667
668 if (t->destroy && t->runcount == 0)
669 queue_remove_timer(t);
670
671 RtlLeaveCriticalSection(&q->cs);
672 }
673
timer_callback_wrapper(LPVOID p)674 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
675 {
676 struct queue_timer *t = p;
677 t->callback(t->param, TRUE);
678 timer_cleanup_callback(t);
679 return 0;
680 }
681
queue_current_time(void)682 static inline ULONGLONG queue_current_time(void)
683 {
684 LARGE_INTEGER now, freq;
685 NtQueryPerformanceCounter(&now, &freq);
686 return now.QuadPart * 1000 / freq.QuadPart;
687 }
688
queue_add_timer(struct queue_timer * t,ULONGLONG time,BOOL set_event)689 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
690 BOOL set_event)
691 {
692 /* We MUST hold the queue cs while calling this function. */
693 struct timer_queue *q = t->q;
694 struct list *ptr = &q->timers;
695
696 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
697
698 if (time != EXPIRE_NEVER)
699 LIST_FOR_EACH(ptr, &q->timers)
700 {
701 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
702 if (time < cur->expire)
703 break;
704 }
705 list_add_before(ptr, &t->entry);
706
707 t->expire = time;
708
709 /* If we insert at the head of the list, we need to expire sooner
710 than expected. */
711 if (set_event && &t->entry == list_head(&q->timers))
712 NtSetEvent(q->event, NULL);
713 }
714
queue_move_timer(struct queue_timer * t,ULONGLONG time,BOOL set_event)715 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
716 BOOL set_event)
717 {
718 /* We MUST hold the queue cs while calling this function. */
719 list_remove(&t->entry);
720 queue_add_timer(t, time, set_event);
721 }
722
queue_timer_expire(struct timer_queue * q)723 static void queue_timer_expire(struct timer_queue *q)
724 {
725 struct queue_timer *t = NULL;
726
727 RtlEnterCriticalSection(&q->cs);
728 if (list_head(&q->timers))
729 {
730 ULONGLONG now, next;
731 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
732 if (!t->destroy && t->expire <= ((now = queue_current_time())))
733 {
734 ++t->runcount;
735 if (t->period)
736 {
737 next = t->expire + t->period;
738 /* avoid trigger cascade if overloaded / hibernated */
739 if (next < now)
740 next = now + t->period;
741 }
742 else
743 next = EXPIRE_NEVER;
744 queue_move_timer(t, next, FALSE);
745 }
746 else
747 t = NULL;
748 }
749 RtlLeaveCriticalSection(&q->cs);
750
751 if (t)
752 {
753 if (t->flags & WT_EXECUTEINTIMERTHREAD)
754 timer_callback_wrapper(t);
755 else
756 {
757 ULONG flags
758 = (t->flags
759 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
760 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
761 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
762 if (status != STATUS_SUCCESS)
763 timer_cleanup_callback(t);
764 }
765 }
766 }
767
queue_get_timeout(struct timer_queue * q)768 static ULONG queue_get_timeout(struct timer_queue *q)
769 {
770 struct queue_timer *t;
771 ULONG timeout = INFINITE;
772
773 RtlEnterCriticalSection(&q->cs);
774 if (list_head(&q->timers))
775 {
776 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
777 assert(!t->destroy || t->expire == EXPIRE_NEVER);
778
779 if (t->expire != EXPIRE_NEVER)
780 {
781 ULONGLONG time = queue_current_time();
782 timeout = t->expire < time ? 0 : t->expire - time;
783 }
784 }
785 RtlLeaveCriticalSection(&q->cs);
786
787 return timeout;
788 }
789
790 #ifdef __REACTOS__
timer_queue_thread_proc(PVOID p)791 ULONG NTAPI timer_queue_thread_proc(PVOID p)
792 #else
793 static void WINAPI timer_queue_thread_proc(LPVOID p)
794 #endif
795 {
796 struct timer_queue *q = p;
797 ULONG timeout_ms;
798
799 set_thread_name(L"wine_threadpool_timer_queue");
800 timeout_ms = INFINITE;
801 for (;;)
802 {
803 LARGE_INTEGER timeout;
804 NTSTATUS status;
805 BOOL done = FALSE;
806
807 status = NtWaitForSingleObject(
808 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
809
810 if (status == STATUS_WAIT_0)
811 {
812 /* There are two possible ways to trigger the event. Either
813 we are quitting and the last timer got removed, or a new
814 timer got put at the head of the list so we need to adjust
815 our timeout. */
816 RtlEnterCriticalSection(&q->cs);
817 if (q->quit && list_empty(&q->timers))
818 done = TRUE;
819 RtlLeaveCriticalSection(&q->cs);
820 }
821 else if (status == STATUS_TIMEOUT)
822 queue_timer_expire(q);
823
824 if (done)
825 break;
826
827 timeout_ms = queue_get_timeout(q);
828 }
829
830 NtClose(q->event);
831 RtlDeleteCriticalSection(&q->cs);
832 q->magic = 0;
833 RtlFreeHeap(GetProcessHeap(), 0, q);
834 RtlExitUserThread( 0 );
835 #ifdef __REACTOS__
836 return STATUS_SUCCESS;
837 #endif
838 }
839
queue_destroy_timer(struct queue_timer * t)840 static void queue_destroy_timer(struct queue_timer *t)
841 {
842 /* We MUST hold the queue cs while calling this function. */
843 t->destroy = TRUE;
844 if (t->runcount == 0)
845 /* Ensure a timer is promptly removed. If callbacks are pending,
846 it will be removed after the last one finishes by the callback
847 cleanup wrapper. */
848 queue_remove_timer(t);
849 else
850 /* Make sure no destroyed timer masks an active timer at the head
851 of the sorted list. */
852 queue_move_timer(t, EXPIRE_NEVER, FALSE);
853 }
854
855 /***********************************************************************
856 * RtlCreateTimerQueue (NTDLL.@)
857 *
858 * Creates a timer queue object and returns a handle to it.
859 *
860 * PARAMS
861 * NewTimerQueue [O] The newly created queue.
862 *
863 * RETURNS
864 * Success: STATUS_SUCCESS.
865 * Failure: Any NTSTATUS code.
866 */
RtlCreateTimerQueue(PHANDLE NewTimerQueue)867 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
868 {
869 NTSTATUS status;
870 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
871 if (!q)
872 return STATUS_NO_MEMORY;
873
874 RtlInitializeCriticalSection(&q->cs);
875 list_init(&q->timers);
876 q->quit = FALSE;
877 q->magic = TIMER_QUEUE_MAGIC;
878 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
879 if (status != STATUS_SUCCESS)
880 {
881 RtlFreeHeap(GetProcessHeap(), 0, q);
882 return status;
883 }
884 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
885 timer_queue_thread_proc, q, &q->thread, NULL);
886 if (status != STATUS_SUCCESS)
887 {
888 NtClose(q->event);
889 RtlFreeHeap(GetProcessHeap(), 0, q);
890 return status;
891 }
892
893 *NewTimerQueue = q;
894 return STATUS_SUCCESS;
895 }
896
897 /***********************************************************************
898 * RtlDeleteTimerQueueEx (NTDLL.@)
899 *
900 * Deletes a timer queue object.
901 *
902 * PARAMS
903 * TimerQueue [I] The timer queue to destroy.
904 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
905 * wait until all timers are finished firing before
906 * returning. Otherwise, return immediately and set the
907 * event when all timers are done.
908 *
909 * RETURNS
910 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
911 * Failure: Any NTSTATUS code.
912 */
RtlDeleteTimerQueueEx(HANDLE TimerQueue,HANDLE CompletionEvent)913 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
914 {
915 struct timer_queue *q = TimerQueue;
916 struct queue_timer *t, *temp;
917 HANDLE thread;
918 NTSTATUS status;
919
920 if (!q || q->magic != TIMER_QUEUE_MAGIC)
921 return STATUS_INVALID_HANDLE;
922
923 thread = q->thread;
924
925 RtlEnterCriticalSection(&q->cs);
926 q->quit = TRUE;
927 if (list_head(&q->timers))
928 /* When the last timer is removed, it will signal the timer thread to
929 exit... */
930 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
931 queue_destroy_timer(t);
932 else
933 /* However if we have none, we must do it ourselves. */
934 NtSetEvent(q->event, NULL);
935 RtlLeaveCriticalSection(&q->cs);
936
937 if (CompletionEvent == INVALID_HANDLE_VALUE)
938 {
939 NtWaitForSingleObject(thread, FALSE, NULL);
940 status = STATUS_SUCCESS;
941 }
942 else
943 {
944 if (CompletionEvent)
945 {
946 FIXME("asynchronous return on completion event unimplemented\n");
947 NtWaitForSingleObject(thread, FALSE, NULL);
948 NtSetEvent(CompletionEvent, NULL);
949 }
950 status = STATUS_PENDING;
951 }
952
953 NtClose(thread);
954 return status;
955 }
956
get_timer_queue(HANDLE TimerQueue)957 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
958 {
959 static struct timer_queue *default_timer_queue;
960
961 if (TimerQueue)
962 return TimerQueue;
963 else
964 {
965 if (!default_timer_queue)
966 {
967 HANDLE q;
968 NTSTATUS status = RtlCreateTimerQueue(&q);
969 if (status == STATUS_SUCCESS)
970 {
971 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
972 if (p)
973 /* Got beat to the punch. */
974 RtlDeleteTimerQueueEx(q, NULL);
975 }
976 }
977 return default_timer_queue;
978 }
979 }
980
981 /***********************************************************************
982 * RtlCreateTimer (NTDLL.@)
983 *
984 * Creates a new timer associated with the given queue.
985 *
986 * PARAMS
987 * TimerQueue [I] The queue to hold the timer.
988 * NewTimer [O] The newly created timer.
989 * Callback [I] The callback to fire.
990 * Parameter [I] The argument for the callback.
991 * DueTime [I] The delay, in milliseconds, before first firing the
992 * timer.
993 * Period [I] The period, in milliseconds, at which to fire the timer
994 * after the first callback. If zero, the timer will only
995 * fire once. It still needs to be deleted with
996 * RtlDeleteTimer.
997 * Flags [I] Flags controlling the execution of the callback. In
998 * addition to the WT_* thread pool flags (see
999 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1000 * WT_EXECUTEONLYONCE are supported.
1001 *
1002 * RETURNS
1003 * Success: STATUS_SUCCESS.
1004 * Failure: Any NTSTATUS code.
1005 */
RtlCreateTimer(HANDLE TimerQueue,HANDLE * NewTimer,RTL_WAITORTIMERCALLBACKFUNC Callback,PVOID Parameter,DWORD DueTime,DWORD Period,ULONG Flags)1006 NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer,
1007 RTL_WAITORTIMERCALLBACKFUNC Callback,
1008 PVOID Parameter, DWORD DueTime, DWORD Period,
1009 ULONG Flags)
1010 {
1011 NTSTATUS status;
1012 struct queue_timer *t;
1013 struct timer_queue *q = get_timer_queue(TimerQueue);
1014
1015 if (!q) return STATUS_NO_MEMORY;
1016 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1017
1018 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1019 if (!t)
1020 return STATUS_NO_MEMORY;
1021
1022 t->q = q;
1023 t->runcount = 0;
1024 t->callback = Callback;
1025 t->param = Parameter;
1026 t->period = Period;
1027 t->flags = Flags;
1028 t->destroy = FALSE;
1029 t->event = NULL;
1030
1031 status = STATUS_SUCCESS;
1032 RtlEnterCriticalSection(&q->cs);
1033 if (q->quit)
1034 status = STATUS_INVALID_HANDLE;
1035 else
1036 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1037 RtlLeaveCriticalSection(&q->cs);
1038
1039 if (status == STATUS_SUCCESS)
1040 *NewTimer = t;
1041 else
1042 RtlFreeHeap(GetProcessHeap(), 0, t);
1043
1044 return status;
1045 }
1046
1047 /***********************************************************************
1048 * RtlUpdateTimer (NTDLL.@)
1049 *
1050 * Changes the time at which a timer expires.
1051 *
1052 * PARAMS
1053 * TimerQueue [I] The queue that holds the timer.
1054 * Timer [I] The timer to update.
1055 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1056 * Period [I] The period, in milliseconds, at which to fire the timer
1057 * after the first callback. If zero, the timer will not
1058 * refire once. It still needs to be deleted with
1059 * RtlDeleteTimer.
1060 *
1061 * RETURNS
1062 * Success: STATUS_SUCCESS.
1063 * Failure: Any NTSTATUS code.
1064 */
RtlUpdateTimer(HANDLE TimerQueue,HANDLE Timer,DWORD DueTime,DWORD Period)1065 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1066 DWORD DueTime, DWORD Period)
1067 {
1068 struct queue_timer *t = Timer;
1069 struct timer_queue *q = t->q;
1070
1071 RtlEnterCriticalSection(&q->cs);
1072 /* Can't change a timer if it was once-only or destroyed. */
1073 if (t->expire != EXPIRE_NEVER)
1074 {
1075 t->period = Period;
1076 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1077 }
1078 RtlLeaveCriticalSection(&q->cs);
1079
1080 return STATUS_SUCCESS;
1081 }
1082
1083 /***********************************************************************
1084 * RtlDeleteTimer (NTDLL.@)
1085 *
1086 * Cancels a timer-queue timer.
1087 *
1088 * PARAMS
1089 * TimerQueue [I] The queue that holds the timer.
1090 * Timer [I] The timer to update.
1091 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1092 * wait until the timer is finished firing all pending
1093 * callbacks before returning. Otherwise, return
1094 * immediately and set the timer is done.
1095 *
1096 * RETURNS
1097 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1098 or if the completion event is NULL.
1099 * Failure: Any NTSTATUS code.
1100 */
RtlDeleteTimer(HANDLE TimerQueue,HANDLE Timer,HANDLE CompletionEvent)1101 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1102 HANDLE CompletionEvent)
1103 {
1104 struct queue_timer *t = Timer;
1105 struct timer_queue *q;
1106 NTSTATUS status = STATUS_PENDING;
1107 HANDLE event = NULL;
1108
1109 if (!Timer)
1110 return STATUS_INVALID_PARAMETER_1;
1111 q = t->q;
1112 if (CompletionEvent == INVALID_HANDLE_VALUE)
1113 {
1114 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1115 if (status == STATUS_SUCCESS)
1116 status = STATUS_PENDING;
1117 }
1118 else if (CompletionEvent)
1119 event = CompletionEvent;
1120
1121 RtlEnterCriticalSection(&q->cs);
1122 t->event = event;
1123 if (t->runcount == 0 && event)
1124 status = STATUS_SUCCESS;
1125 queue_destroy_timer(t);
1126 RtlLeaveCriticalSection(&q->cs);
1127
1128 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1129 {
1130 if (status == STATUS_PENDING)
1131 {
1132 NtWaitForSingleObject(event, FALSE, NULL);
1133 status = STATUS_SUCCESS;
1134 }
1135 NtClose(event);
1136 }
1137
1138 return status;
1139 }
1140
1141 /***********************************************************************
1142 * timerqueue_thread_proc (internal)
1143 */
1144 #ifdef __REACTOS__
timerqueue_thread_proc(PVOID param)1145 ULONG NTAPI timerqueue_thread_proc(PVOID param )
1146 #else
1147 static void CALLBACK timerqueue_thread_proc( void *param )
1148 #endif
1149 {
1150 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1151 struct threadpool_object *other_timer;
1152 LARGE_INTEGER now, timeout;
1153 struct list *ptr;
1154
1155 TRACE( "starting timer queue thread\n" );
1156 set_thread_name(L"wine_threadpool_timerqueue");
1157
1158 RtlEnterCriticalSection( &timerqueue.cs );
1159 for (;;)
1160 {
1161 NtQuerySystemTime( &now );
1162
1163 /* Check for expired timers. */
1164 while ((ptr = list_head( &timerqueue.pending_timers )))
1165 {
1166 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1167 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1168 assert( timer->u.timer.timer_pending );
1169 if (timer->u.timer.timeout > now.QuadPart)
1170 break;
1171
1172 /* Queue a new callback in one of the worker threads. */
1173 list_remove( &timer->u.timer.timer_entry );
1174 timer->u.timer.timer_pending = FALSE;
1175 tp_object_submit( timer, FALSE );
1176
1177 /* Insert the timer back into the queue, except it's marked for shutdown. */
1178 if (timer->u.timer.period && !timer->shutdown)
1179 {
1180 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1181 if (timer->u.timer.timeout <= now.QuadPart)
1182 timer->u.timer.timeout = now.QuadPart + 1;
1183
1184 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1185 struct threadpool_object, u.timer.timer_entry )
1186 {
1187 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1188 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1189 break;
1190 }
1191 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1192 timer->u.timer.timer_pending = TRUE;
1193 }
1194 }
1195
1196 timeout_lower = timeout_upper = MAXLONGLONG;
1197
1198 /* Determine next timeout and use the window length to optimize wakeup times. */
1199 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1200 struct threadpool_object, u.timer.timer_entry )
1201 {
1202 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1203 if (other_timer->u.timer.timeout >= timeout_upper)
1204 break;
1205
1206 timeout_lower = other_timer->u.timer.timeout;
1207 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1208 if (new_timeout < timeout_upper)
1209 timeout_upper = new_timeout;
1210 }
1211
1212 /* Wait for timer update events or until the next timer expires. */
1213 if (timerqueue.objcount)
1214 {
1215 timeout.QuadPart = timeout_lower;
1216 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1217 continue;
1218 }
1219
1220 /* All timers have been destroyed, if no new timers are created
1221 * within some amount of time, then we can shutdown this thread. */
1222 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1223 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1224 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1225 {
1226 break;
1227 }
1228 }
1229
1230 timerqueue.thread_running = FALSE;
1231 RtlLeaveCriticalSection( &timerqueue.cs );
1232
1233 TRACE( "terminating timer queue thread\n" );
1234 RtlExitUserThread( 0 );
1235 #ifdef __REACTOS__
1236 return STATUS_SUCCESS;
1237 #endif
1238 }
1239
1240 /***********************************************************************
1241 * tp_new_worker_thread (internal)
1242 *
1243 * Create and account a new worker thread for the desired pool.
1244 */
tp_new_worker_thread(struct threadpool * pool)1245 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1246 {
1247 HANDLE thread;
1248 NTSTATUS status;
1249
1250 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0,
1251 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1252 threadpool_worker_proc, pool, &thread, NULL );
1253 if (status == STATUS_SUCCESS)
1254 {
1255 InterlockedIncrement( &pool->refcount );
1256 pool->num_workers++;
1257 NtClose( thread );
1258 }
1259 return status;
1260 }
1261
1262 /***********************************************************************
1263 * tp_timerqueue_lock (internal)
1264 *
1265 * Acquires a lock on the global timerqueue. When the lock is acquired
1266 * successfully, it is guaranteed that the timer thread is running.
1267 */
tp_timerqueue_lock(struct threadpool_object * timer)1268 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1269 {
1270 NTSTATUS status = STATUS_SUCCESS;
1271 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1272
1273 timer->u.timer.timer_initialized = FALSE;
1274 timer->u.timer.timer_pending = FALSE;
1275 timer->u.timer.timer_set = FALSE;
1276 timer->u.timer.timeout = 0;
1277 timer->u.timer.period = 0;
1278 timer->u.timer.window_length = 0;
1279
1280 RtlEnterCriticalSection( &timerqueue.cs );
1281
1282 /* Make sure that the timerqueue thread is running. */
1283 if (!timerqueue.thread_running)
1284 {
1285 HANDLE thread;
1286 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1287 timerqueue_thread_proc, NULL, &thread, NULL );
1288 if (status == STATUS_SUCCESS)
1289 {
1290 timerqueue.thread_running = TRUE;
1291 NtClose( thread );
1292 }
1293 }
1294
1295 if (status == STATUS_SUCCESS)
1296 {
1297 timer->u.timer.timer_initialized = TRUE;
1298 timerqueue.objcount++;
1299 }
1300
1301 RtlLeaveCriticalSection( &timerqueue.cs );
1302 return status;
1303 }
1304
1305 /***********************************************************************
1306 * tp_timerqueue_unlock (internal)
1307 *
1308 * Releases a lock on the global timerqueue.
1309 */
tp_timerqueue_unlock(struct threadpool_object * timer)1310 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1311 {
1312 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1313
1314 RtlEnterCriticalSection( &timerqueue.cs );
1315 if (timer->u.timer.timer_initialized)
1316 {
1317 /* If timer was pending, remove it. */
1318 if (timer->u.timer.timer_pending)
1319 {
1320 list_remove( &timer->u.timer.timer_entry );
1321 timer->u.timer.timer_pending = FALSE;
1322 }
1323
1324 /* If the last timer object was destroyed, then wake up the thread. */
1325 if (!--timerqueue.objcount)
1326 {
1327 assert( list_empty( &timerqueue.pending_timers ) );
1328 RtlWakeAllConditionVariable( &timerqueue.update_event );
1329 }
1330
1331 timer->u.timer.timer_initialized = FALSE;
1332 }
1333 RtlLeaveCriticalSection( &timerqueue.cs );
1334 }
1335
1336 /***********************************************************************
1337 * waitqueue_thread_proc (internal)
1338 */
1339 #ifdef __REACTOS__
waitqueue_thread_proc(PVOID param)1340 void NTAPI waitqueue_thread_proc(PVOID param )
1341 #else
1342 static void CALLBACK waitqueue_thread_proc( void *param )
1343 #endif
1344 {
1345 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1346 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1347 struct waitqueue_bucket *bucket = param;
1348 struct threadpool_object *wait, *next;
1349 LARGE_INTEGER now, timeout;
1350 DWORD num_handles;
1351 NTSTATUS status;
1352
1353 TRACE( "starting wait queue thread\n" );
1354 set_thread_name(L"wine_threadpool_waitqueue");
1355
1356 RtlEnterCriticalSection( &waitqueue.cs );
1357
1358 for (;;)
1359 {
1360 NtQuerySystemTime( &now );
1361 timeout.QuadPart = MAXLONGLONG;
1362 num_handles = 0;
1363
1364 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1365 u.wait.wait_entry )
1366 {
1367 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1368 if (wait->u.wait.timeout <= now.QuadPart)
1369 {
1370 /* Wait object timed out. */
1371 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1372 {
1373 list_remove( &wait->u.wait.wait_entry );
1374 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1375 }
1376 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1377 {
1378 InterlockedIncrement( &wait->refcount );
1379 wait->num_pending_callbacks++;
1380 RtlEnterCriticalSection( &wait->pool->cs );
1381 tp_object_execute( wait, TRUE );
1382 RtlLeaveCriticalSection( &wait->pool->cs );
1383 tp_object_release( wait );
1384 }
1385 else tp_object_submit( wait, FALSE );
1386 }
1387 else
1388 {
1389 if (wait->u.wait.timeout < timeout.QuadPart)
1390 timeout.QuadPart = wait->u.wait.timeout;
1391
1392 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1393 InterlockedIncrement( &wait->refcount );
1394 objects[num_handles] = wait;
1395 handles[num_handles] = wait->u.wait.handle;
1396 num_handles++;
1397 }
1398 }
1399
1400 if (!bucket->objcount)
1401 {
1402 /* All wait objects have been destroyed, if no new wait objects are created
1403 * within some amount of time, then we can shutdown this thread. */
1404 assert( num_handles == 0 );
1405 RtlLeaveCriticalSection( &waitqueue.cs );
1406 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1407 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1408 RtlEnterCriticalSection( &waitqueue.cs );
1409
1410 if (status == STATUS_TIMEOUT && !bucket->objcount)
1411 break;
1412 }
1413 else
1414 {
1415 handles[num_handles] = bucket->update_event;
1416 RtlLeaveCriticalSection( &waitqueue.cs );
1417 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1418 RtlEnterCriticalSection( &waitqueue.cs );
1419
1420 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1421 {
1422 wait = objects[status - STATUS_WAIT_0];
1423 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1424 if (wait->u.wait.bucket)
1425 {
1426 /* Wait object signaled. */
1427 assert( wait->u.wait.bucket == bucket );
1428 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1429 {
1430 list_remove( &wait->u.wait.wait_entry );
1431 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1432 }
1433 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1434 {
1435 wait->u.wait.signaled++;
1436 wait->num_pending_callbacks++;
1437 RtlEnterCriticalSection( &wait->pool->cs );
1438 tp_object_execute( wait, TRUE );
1439 RtlLeaveCriticalSection( &wait->pool->cs );
1440 }
1441 else tp_object_submit( wait, TRUE );
1442 }
1443 else
1444 WARN("wait object %p triggered while object was destroyed\n", wait);
1445 }
1446
1447 /* Release temporary references to wait objects. */
1448 while (num_handles)
1449 {
1450 wait = objects[--num_handles];
1451 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1452 tp_object_release( wait );
1453 }
1454 }
1455
1456 /* Try to merge bucket with other threads. */
1457 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1458 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1459 {
1460 struct waitqueue_bucket *other_bucket;
1461 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1462 {
1463 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1464 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1465 {
1466 other_bucket->objcount += bucket->objcount;
1467 bucket->objcount = 0;
1468
1469 /* Update reserved list. */
1470 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1471 {
1472 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1473 wait->u.wait.bucket = other_bucket;
1474 }
1475 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1476
1477 /* Update waiting list. */
1478 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1479 {
1480 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1481 wait->u.wait.bucket = other_bucket;
1482 }
1483 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1484
1485 /* Move bucket to the end, to keep the probability of
1486 * newly added wait objects as small as possible. */
1487 list_remove( &bucket->bucket_entry );
1488 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1489
1490 NtSetEvent( other_bucket->update_event, NULL );
1491 break;
1492 }
1493 }
1494 }
1495 }
1496
1497 /* Remove this bucket from the list. */
1498 list_remove( &bucket->bucket_entry );
1499 if (!--waitqueue.num_buckets)
1500 assert( list_empty( &waitqueue.buckets ) );
1501
1502 RtlLeaveCriticalSection( &waitqueue.cs );
1503
1504 TRACE( "terminating wait queue thread\n" );
1505
1506 assert( bucket->objcount == 0 );
1507 assert( list_empty( &bucket->reserved ) );
1508 assert( list_empty( &bucket->waiting ) );
1509 NtClose( bucket->update_event );
1510
1511 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1512 RtlExitUserThread( 0 );
1513 }
1514
1515 /***********************************************************************
1516 * tp_waitqueue_lock (internal)
1517 */
tp_waitqueue_lock(struct threadpool_object * wait)1518 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1519 {
1520 struct waitqueue_bucket *bucket;
1521 NTSTATUS status;
1522 HANDLE thread;
1523 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1524 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1525
1526 wait->u.wait.signaled = 0;
1527 wait->u.wait.bucket = NULL;
1528 wait->u.wait.wait_pending = FALSE;
1529 wait->u.wait.timeout = 0;
1530 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1531
1532 RtlEnterCriticalSection( &waitqueue.cs );
1533
1534 /* Try to assign to existing bucket if possible. */
1535 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1536 {
1537 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1538 {
1539 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1540 wait->u.wait.bucket = bucket;
1541 bucket->objcount++;
1542
1543 status = STATUS_SUCCESS;
1544 goto out;
1545 }
1546 }
1547
1548 /* Create a new bucket and corresponding worker thread. */
1549 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1550 if (!bucket)
1551 {
1552 status = STATUS_NO_MEMORY;
1553 goto out;
1554 }
1555
1556 bucket->objcount = 0;
1557 bucket->alertable = alertable;
1558 list_init( &bucket->reserved );
1559 list_init( &bucket->waiting );
1560
1561 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1562 NULL, SynchronizationEvent, FALSE );
1563 if (status)
1564 {
1565 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1566 goto out;
1567 }
1568
1569 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1570 (PTHREAD_START_ROUTINE)waitqueue_thread_proc, bucket, &thread, NULL );
1571 if (status == STATUS_SUCCESS)
1572 {
1573 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1574 waitqueue.num_buckets++;
1575
1576 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1577 wait->u.wait.bucket = bucket;
1578 bucket->objcount++;
1579
1580 NtClose( thread );
1581 }
1582 else
1583 {
1584 NtClose( bucket->update_event );
1585 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1586 }
1587
1588 out:
1589 RtlLeaveCriticalSection( &waitqueue.cs );
1590 return status;
1591 }
1592
1593 /***********************************************************************
1594 * tp_waitqueue_unlock (internal)
1595 */
tp_waitqueue_unlock(struct threadpool_object * wait)1596 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1597 {
1598 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1599
1600 RtlEnterCriticalSection( &waitqueue.cs );
1601 if (wait->u.wait.bucket)
1602 {
1603 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1604 assert( bucket->objcount > 0 );
1605
1606 list_remove( &wait->u.wait.wait_entry );
1607 wait->u.wait.bucket = NULL;
1608 bucket->objcount--;
1609
1610 NtSetEvent( bucket->update_event, NULL );
1611 }
1612 RtlLeaveCriticalSection( &waitqueue.cs );
1613 }
1614
1615 #ifdef __REACTOS__
ioqueue_thread_proc(PVOID param)1616 ULONG NTAPI ioqueue_thread_proc(PVOID param )
1617 #else
1618 static void CALLBACK ioqueue_thread_proc( void *param )
1619 #endif
1620 {
1621 struct io_completion *completion;
1622 struct threadpool_object *io;
1623 IO_STATUS_BLOCK iosb;
1624 #ifdef __REACTOS__
1625 PVOID key, value;
1626 #else
1627 ULONG_PTR key, value;
1628 #endif
1629 BOOL destroy, skip;
1630 NTSTATUS status;
1631
1632 TRACE( "starting I/O completion thread\n" );
1633 set_thread_name(L"wine_threadpool_ioqueue");
1634
1635 RtlEnterCriticalSection( &ioqueue.cs );
1636
1637 for (;;)
1638 {
1639 RtlLeaveCriticalSection( &ioqueue.cs );
1640 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1641 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1642 RtlEnterCriticalSection( &ioqueue.cs );
1643
1644 destroy = skip = FALSE;
1645 io = (struct threadpool_object *)key;
1646
1647 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1648
1649 if (io && (io->shutdown || io->u.io.shutting_down))
1650 {
1651 RtlEnterCriticalSection( &io->pool->cs );
1652 if (!io->u.io.pending_count)
1653 {
1654 if (io->u.io.skipped_count)
1655 --io->u.io.skipped_count;
1656
1657 if (io->u.io.skipped_count)
1658 skip = TRUE;
1659 else
1660 destroy = TRUE;
1661 }
1662 RtlLeaveCriticalSection( &io->pool->cs );
1663 if (skip) continue;
1664 }
1665
1666 if (destroy)
1667 {
1668 --ioqueue.objcount;
1669 TRACE( "Releasing io %p.\n", io );
1670 io->shutdown = TRUE;
1671 tp_object_release( io );
1672 }
1673 else if (io)
1674 {
1675 RtlEnterCriticalSection( &io->pool->cs );
1676
1677 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1678
1679 if (io->u.io.pending_count)
1680 {
1681 --io->u.io.pending_count;
1682 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1683 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1684 {
1685 ERR( "Failed to allocate memory.\n" );
1686 RtlLeaveCriticalSection( &io->pool->cs );
1687 continue;
1688 }
1689
1690 completion = &io->u.io.completions[io->u.io.completion_count++];
1691 completion->iosb = iosb;
1692 #ifdef __REACTOS__
1693 completion->cvalue = (ULONG_PTR)value;
1694 #else
1695 completion->cvalue = value;
1696 #endif
1697
1698 tp_object_submit( io, FALSE );
1699 }
1700 RtlLeaveCriticalSection( &io->pool->cs );
1701 }
1702
1703 if (!ioqueue.objcount)
1704 {
1705 /* All I/O objects have been destroyed; if no new objects are
1706 * created within some amount of time, then we can shutdown this
1707 * thread. */
1708 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1709 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1710 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1711 break;
1712 }
1713 }
1714
1715 ioqueue.thread_running = FALSE;
1716 RtlLeaveCriticalSection( &ioqueue.cs );
1717
1718 TRACE( "terminating I/O completion thread\n" );
1719
1720 RtlExitUserThread( 0 );
1721
1722 #ifdef __REACTOS__
1723 return STATUS_SUCCESS;
1724 #endif
1725 }
1726
tp_ioqueue_lock(struct threadpool_object * io,HANDLE file)1727 static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1728 {
1729 NTSTATUS status = STATUS_SUCCESS;
1730
1731 assert( io->type == TP_OBJECT_TYPE_IO );
1732
1733 RtlEnterCriticalSection( &ioqueue.cs );
1734
1735 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1736 IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1737 {
1738 RtlLeaveCriticalSection( &ioqueue.cs );
1739 return status;
1740 }
1741
1742 if (!ioqueue.thread_running)
1743 {
1744 HANDLE thread;
1745
1746 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1747 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1748 {
1749 ioqueue.thread_running = TRUE;
1750 NtClose( thread );
1751 }
1752 }
1753
1754 if (status == STATUS_SUCCESS)
1755 {
1756 FILE_COMPLETION_INFORMATION info;
1757 IO_STATUS_BLOCK iosb;
1758
1759 #ifdef __REACTOS__
1760 info.Port = ioqueue.port;
1761 info.Key = io;
1762 #else
1763 info.CompletionPort = ioqueue.port;
1764 info.CompletionKey = (ULONG_PTR)io;
1765 #endif
1766
1767 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1768 }
1769
1770 if (status == STATUS_SUCCESS)
1771 {
1772 if (!ioqueue.objcount++)
1773 RtlWakeConditionVariable( &ioqueue.update_event );
1774 }
1775
1776 RtlLeaveCriticalSection( &ioqueue.cs );
1777 return status;
1778 }
1779
1780 /***********************************************************************
1781 * tp_threadpool_alloc (internal)
1782 *
1783 * Allocates a new threadpool object.
1784 */
tp_threadpool_alloc(struct threadpool ** out)1785 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1786 {
1787 #ifdef __REACTOS__
1788 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1789 #else
1790 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1791 #endif
1792 struct threadpool *pool;
1793 unsigned int i;
1794
1795 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1796 if (!pool)
1797 return STATUS_NO_MEMORY;
1798
1799 pool->refcount = 1;
1800 pool->objcount = 0;
1801 pool->shutdown = FALSE;
1802
1803 #ifdef __REACTOS__
1804 RtlInitializeCriticalSection( &pool->cs );
1805 #else
1806 RtlInitializeCriticalSectionEx( &pool->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
1807
1808 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1809 #endif
1810
1811 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1812 list_init( &pool->pools[i] );
1813 RtlInitializeConditionVariable( &pool->update_event );
1814
1815 pool->max_workers = 500;
1816 pool->min_workers = 0;
1817 pool->num_workers = 0;
1818 pool->num_busy_workers = 0;
1819 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1820 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1821
1822 TRACE( "allocated threadpool %p\n", pool );
1823
1824 *out = pool;
1825 return STATUS_SUCCESS;
1826 }
1827
1828 /***********************************************************************
1829 * tp_threadpool_shutdown (internal)
1830 *
1831 * Prepares the shutdown of a threadpool object and notifies all worker
1832 * threads to terminate (after all remaining work items have been
1833 * processed).
1834 */
tp_threadpool_shutdown(struct threadpool * pool)1835 static void tp_threadpool_shutdown( struct threadpool *pool )
1836 {
1837 assert( pool != default_threadpool );
1838
1839 pool->shutdown = TRUE;
1840 RtlWakeAllConditionVariable( &pool->update_event );
1841 }
1842
1843 /***********************************************************************
1844 * tp_threadpool_release (internal)
1845 *
1846 * Releases a reference to a threadpool object.
1847 */
tp_threadpool_release(struct threadpool * pool)1848 static BOOL tp_threadpool_release( struct threadpool *pool )
1849 {
1850 unsigned int i;
1851
1852 if (InterlockedDecrement( &pool->refcount ))
1853 return FALSE;
1854
1855 TRACE( "destroying threadpool %p\n", pool );
1856
1857 assert( pool->shutdown );
1858 assert( !pool->objcount );
1859 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1860 assert( list_empty( &pool->pools[i] ) );
1861 #ifndef __REACTOS__
1862 pool->cs.DebugInfo->Spare[0] = 0;
1863 #endif
1864 RtlDeleteCriticalSection( &pool->cs );
1865
1866 RtlFreeHeap( GetProcessHeap(), 0, pool );
1867 return TRUE;
1868 }
1869
1870 /***********************************************************************
1871 * tp_threadpool_lock (internal)
1872 *
1873 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1874 * block. When the lock is acquired successfully, it is guaranteed that
1875 * there is at least one worker thread to process tasks.
1876 */
tp_threadpool_lock(struct threadpool ** out,TP_CALLBACK_ENVIRON * environment)1877 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1878 {
1879 struct threadpool *pool = NULL;
1880 NTSTATUS status = STATUS_SUCCESS;
1881
1882 if (environment)
1883 {
1884 #ifndef __REACTOS__ //Windows 7 stuff
1885 /* Validate environment parameters. */
1886 if (environment->Version == 3)
1887 {
1888 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1889
1890 switch (environment3->CallbackPriority)
1891 {
1892 case TP_CALLBACK_PRIORITY_HIGH:
1893 case TP_CALLBACK_PRIORITY_NORMAL:
1894 case TP_CALLBACK_PRIORITY_LOW:
1895 break;
1896 default:
1897 return STATUS_INVALID_PARAMETER;
1898 }
1899 }
1900 #endif
1901 pool = (struct threadpool *)environment->Pool;
1902 }
1903
1904 if (!pool)
1905 {
1906 if (!default_threadpool)
1907 {
1908 status = tp_threadpool_alloc( &pool );
1909 if (status != STATUS_SUCCESS)
1910 return status;
1911
1912 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1913 {
1914 tp_threadpool_shutdown( pool );
1915 tp_threadpool_release( pool );
1916 }
1917 }
1918
1919 pool = default_threadpool;
1920 }
1921
1922 RtlEnterCriticalSection( &pool->cs );
1923
1924 /* Make sure that the threadpool has at least one thread. */
1925 if (!pool->num_workers)
1926 status = tp_new_worker_thread( pool );
1927
1928 /* Keep a reference, and increment objcount to ensure that the
1929 * last thread doesn't terminate. */
1930 if (status == STATUS_SUCCESS)
1931 {
1932 InterlockedIncrement( &pool->refcount );
1933 pool->objcount++;
1934 }
1935
1936 RtlLeaveCriticalSection( &pool->cs );
1937
1938 if (status != STATUS_SUCCESS)
1939 return status;
1940
1941 *out = pool;
1942 return STATUS_SUCCESS;
1943 }
1944
1945 /***********************************************************************
1946 * tp_threadpool_unlock (internal)
1947 *
1948 * Releases a lock on a threadpool.
1949 */
tp_threadpool_unlock(struct threadpool * pool)1950 static void tp_threadpool_unlock( struct threadpool *pool )
1951 {
1952 RtlEnterCriticalSection( &pool->cs );
1953 pool->objcount--;
1954 RtlLeaveCriticalSection( &pool->cs );
1955 tp_threadpool_release( pool );
1956 }
1957
1958 /***********************************************************************
1959 * tp_group_alloc (internal)
1960 *
1961 * Allocates a new threadpool group object.
1962 */
tp_group_alloc(struct threadpool_group ** out)1963 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1964 {
1965 struct threadpool_group *group;
1966
1967 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1968 if (!group)
1969 return STATUS_NO_MEMORY;
1970
1971 group->refcount = 1;
1972 group->shutdown = FALSE;
1973
1974 #ifdef __REACTOS__
1975 RtlInitializeCriticalSection( &group->cs );
1976 #else
1977 RtlInitializeCriticalSectionEx( &group->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
1978
1979 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1980 #endif
1981
1982 list_init( &group->members );
1983
1984 TRACE( "allocated group %p\n", group );
1985
1986 *out = group;
1987 return STATUS_SUCCESS;
1988 }
1989
1990 /***********************************************************************
1991 * tp_group_shutdown (internal)
1992 *
1993 * Marks the group object for shutdown.
1994 */
tp_group_shutdown(struct threadpool_group * group)1995 static void tp_group_shutdown( struct threadpool_group *group )
1996 {
1997 group->shutdown = TRUE;
1998 }
1999
2000 /***********************************************************************
2001 * tp_group_release (internal)
2002 *
2003 * Releases a reference to a group object.
2004 */
tp_group_release(struct threadpool_group * group)2005 static BOOL tp_group_release( struct threadpool_group *group )
2006 {
2007 if (InterlockedDecrement( &group->refcount ))
2008 return FALSE;
2009
2010 TRACE( "destroying group %p\n", group );
2011
2012 assert( group->shutdown );
2013 assert( list_empty( &group->members ) );
2014
2015 #ifndef __REACTOS__
2016 group->cs.DebugInfo->Spare[0] = 0;
2017 #endif
2018 RtlDeleteCriticalSection( &group->cs );
2019
2020 RtlFreeHeap( GetProcessHeap(), 0, group );
2021 return TRUE;
2022 }
2023
2024 /***********************************************************************
2025 * tp_object_initialize (internal)
2026 *
2027 * Initializes members of a threadpool object.
2028 */
tp_object_initialize(struct threadpool_object * object,struct threadpool * pool,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2029 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
2030 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
2031 {
2032 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2033
2034 object->refcount = 1;
2035 object->shutdown = FALSE;
2036
2037 object->pool = pool;
2038 object->group = NULL;
2039 object->userdata = userdata;
2040 object->group_cancel_callback = NULL;
2041 object->finalization_callback = NULL;
2042 object->may_run_long = 0;
2043 object->race_dll = NULL;
2044 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2045
2046 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2047 object->is_group_member = FALSE;
2048
2049 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2050 RtlInitializeConditionVariable( &object->finished_event );
2051 RtlInitializeConditionVariable( &object->group_finished_event );
2052 object->completed_event = NULL;
2053 object->num_pending_callbacks = 0;
2054 object->num_running_callbacks = 0;
2055 object->num_associated_callbacks = 0;
2056
2057 if (environment)
2058 {
2059 if (environment->Version != 1 && environment->Version != 3)
2060 FIXME( "unsupported environment version %lu\n", environment->Version );
2061
2062 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2063 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2064 object->finalization_callback = environment->FinalizationCallback;
2065 object->may_run_long = environment->u.s.LongFunction != 0;
2066 object->race_dll = environment->RaceDll;
2067 #ifndef __REACTOS__ //Windows 7 stuff
2068 if (environment->Version == 3)
2069 {
2070 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2071
2072 object->priority = environment_v3->CallbackPriority;
2073 assert( object->priority < ARRAY_SIZE(pool->pools) );
2074 }
2075 #endif
2076 if (environment->ActivationContext)
2077 FIXME( "activation context not supported yet\n" );
2078
2079 if (environment->u.s.Persistent)
2080 FIXME( "persistent threads not supported yet\n" );
2081 }
2082
2083 if (object->race_dll)
2084 LdrAddRefDll( 0, object->race_dll );
2085
2086 TRACE( "allocated object %p of type %u\n", object, object->type );
2087
2088 /* For simple callbacks we have to run tp_object_submit before adding this object
2089 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2090 * will be set, and tp_object_submit would fail with an assertion. */
2091
2092 if (is_simple_callback)
2093 tp_object_submit( object, FALSE );
2094
2095 if (object->group)
2096 {
2097 struct threadpool_group *group = object->group;
2098 InterlockedIncrement( &group->refcount );
2099
2100 RtlEnterCriticalSection( &group->cs );
2101 list_add_tail( &group->members, &object->group_entry );
2102 object->is_group_member = TRUE;
2103 RtlLeaveCriticalSection( &group->cs );
2104 }
2105
2106 if (is_simple_callback)
2107 tp_object_release( object );
2108 }
2109
tp_object_prio_queue(struct threadpool_object * object)2110 static void tp_object_prio_queue( struct threadpool_object *object )
2111 {
2112 ++object->pool->num_busy_workers;
2113 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2114 }
2115
2116 /***********************************************************************
2117 * tp_object_submit (internal)
2118 *
2119 * Submits a threadpool object to the associated threadpool. This
2120 * function has to be VOID because TpPostWork can never fail on Windows.
2121 */
tp_object_submit(struct threadpool_object * object,BOOL signaled)2122 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
2123 {
2124 struct threadpool *pool = object->pool;
2125 NTSTATUS status = STATUS_UNSUCCESSFUL;
2126
2127 assert( !object->shutdown );
2128 assert( !pool->shutdown );
2129
2130 RtlEnterCriticalSection( &pool->cs );
2131
2132 /* Start new worker threads if required. */
2133 if (pool->num_busy_workers >= pool->num_workers &&
2134 pool->num_workers < pool->max_workers)
2135 status = tp_new_worker_thread( pool );
2136
2137 /* Queue work item and increment refcount. */
2138 InterlockedIncrement( &object->refcount );
2139 if (!object->num_pending_callbacks++)
2140 tp_object_prio_queue( object );
2141
2142 /* Count how often the object was signaled. */
2143 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2144 object->u.wait.signaled++;
2145
2146 /* No new thread started - wake up one existing thread. */
2147 if (status != STATUS_SUCCESS)
2148 {
2149 assert( pool->num_workers > 0 );
2150 RtlWakeConditionVariable( &pool->update_event );
2151 }
2152
2153 RtlLeaveCriticalSection( &pool->cs );
2154 }
2155
2156 /***********************************************************************
2157 * tp_object_cancel (internal)
2158 *
2159 * Cancels all currently pending callbacks for a specific object.
2160 */
tp_object_cancel(struct threadpool_object * object)2161 static void tp_object_cancel( struct threadpool_object *object )
2162 {
2163 struct threadpool *pool = object->pool;
2164 LONG pending_callbacks = 0;
2165
2166 RtlEnterCriticalSection( &pool->cs );
2167 if (object->num_pending_callbacks)
2168 {
2169 pending_callbacks = object->num_pending_callbacks;
2170 object->num_pending_callbacks = 0;
2171 list_remove( &object->pool_entry );
2172
2173 if (object->type == TP_OBJECT_TYPE_WAIT)
2174 object->u.wait.signaled = 0;
2175 }
2176 if (object->type == TP_OBJECT_TYPE_IO)
2177 {
2178 object->u.io.skipped_count += object->u.io.pending_count;
2179 object->u.io.pending_count = 0;
2180 }
2181 RtlLeaveCriticalSection( &pool->cs );
2182
2183 while (pending_callbacks--)
2184 tp_object_release( object );
2185 }
2186
object_is_finished(struct threadpool_object * object,BOOL group)2187 static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2188 {
2189 if (object->num_pending_callbacks)
2190 return FALSE;
2191 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2192 return FALSE;
2193
2194 if (group)
2195 return !object->num_running_callbacks;
2196 else
2197 return !object->num_associated_callbacks;
2198 }
2199
2200 /***********************************************************************
2201 * tp_object_wait (internal)
2202 *
2203 * Waits until all pending and running callbacks of a specific object
2204 * have been processed.
2205 */
tp_object_wait(struct threadpool_object * object,BOOL group_wait)2206 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2207 {
2208 struct threadpool *pool = object->pool;
2209
2210 RtlEnterCriticalSection( &pool->cs );
2211 while (!object_is_finished( object, group_wait ))
2212 {
2213 if (group_wait)
2214 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2215 else
2216 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2217 }
2218 RtlLeaveCriticalSection( &pool->cs );
2219 }
2220
tp_ioqueue_unlock(struct threadpool_object * io)2221 static void tp_ioqueue_unlock( struct threadpool_object *io )
2222 {
2223 assert( io->type == TP_OBJECT_TYPE_IO );
2224
2225 RtlEnterCriticalSection( &ioqueue.cs );
2226
2227 assert(ioqueue.objcount);
2228
2229 if (!io->shutdown && !--ioqueue.objcount)
2230 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2231
2232 RtlLeaveCriticalSection( &ioqueue.cs );
2233 }
2234
2235 /***********************************************************************
2236 * tp_object_prepare_shutdown (internal)
2237 *
2238 * Prepares a threadpool object for shutdown.
2239 */
tp_object_prepare_shutdown(struct threadpool_object * object)2240 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2241 {
2242 if (object->type == TP_OBJECT_TYPE_TIMER)
2243 tp_timerqueue_unlock( object );
2244 else if (object->type == TP_OBJECT_TYPE_WAIT)
2245 tp_waitqueue_unlock( object );
2246 else if (object->type == TP_OBJECT_TYPE_IO)
2247 tp_ioqueue_unlock( object );
2248 }
2249
2250 /***********************************************************************
2251 * tp_object_release (internal)
2252 *
2253 * Releases a reference to a threadpool object.
2254 */
tp_object_release(struct threadpool_object * object)2255 static BOOL tp_object_release( struct threadpool_object *object )
2256 {
2257 if (InterlockedDecrement( &object->refcount ))
2258 return FALSE;
2259
2260 TRACE( "destroying object %p of type %u\n", object, object->type );
2261
2262 assert( object->shutdown );
2263 assert( !object->num_pending_callbacks );
2264 assert( !object->num_running_callbacks );
2265 assert( !object->num_associated_callbacks );
2266
2267 /* release reference to the group */
2268 if (object->group)
2269 {
2270 struct threadpool_group *group = object->group;
2271
2272 RtlEnterCriticalSection( &group->cs );
2273 if (object->is_group_member)
2274 {
2275 list_remove( &object->group_entry );
2276 object->is_group_member = FALSE;
2277 }
2278 RtlLeaveCriticalSection( &group->cs );
2279
2280 tp_group_release( group );
2281 }
2282
2283 tp_threadpool_unlock( object->pool );
2284
2285 if (object->race_dll)
2286 LdrUnloadDll( object->race_dll );
2287
2288 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2289 NtSetEvent( object->completed_event, NULL );
2290
2291 RtlFreeHeap( GetProcessHeap(), 0, object );
2292 return TRUE;
2293 }
2294
threadpool_get_next_item(const struct threadpool * pool)2295 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2296 {
2297 struct list *ptr;
2298 unsigned int i;
2299
2300 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2301 {
2302 if ((ptr = list_head( &pool->pools[i] )))
2303 break;
2304 }
2305
2306 return ptr;
2307 }
2308
2309 /***********************************************************************
2310 * tp_object_execute (internal)
2311 *
2312 * Executes a threadpool object callback, object->pool->cs has to be
2313 * held.
2314 */
tp_object_execute(struct threadpool_object * object,BOOL wait_thread)2315 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2316 {
2317 TP_CALLBACK_INSTANCE *callback_instance;
2318 struct threadpool_instance instance;
2319 struct io_completion completion;
2320 struct threadpool *pool = object->pool;
2321 TP_WAIT_RESULT wait_result = 0;
2322 NTSTATUS status;
2323
2324 object->num_pending_callbacks--;
2325
2326 /* For wait objects check if they were signaled or have timed out. */
2327 if (object->type == TP_OBJECT_TYPE_WAIT)
2328 {
2329 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2330 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2331 }
2332 else if (object->type == TP_OBJECT_TYPE_IO)
2333 {
2334 assert( object->u.io.completion_count );
2335 completion = object->u.io.completions[--object->u.io.completion_count];
2336 }
2337
2338 /* Leave critical section and do the actual callback. */
2339 object->num_associated_callbacks++;
2340 object->num_running_callbacks++;
2341 RtlLeaveCriticalSection( &pool->cs );
2342 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2343
2344 /* Initialize threadpool instance struct. */
2345 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2346 instance.object = object;
2347 instance.threadid = GetCurrentThreadId();
2348 instance.associated = TRUE;
2349 instance.may_run_long = object->may_run_long;
2350 instance.cleanup.critical_section = NULL;
2351 instance.cleanup.mutex = NULL;
2352 instance.cleanup.semaphore = NULL;
2353 instance.cleanup.semaphore_count = 0;
2354 instance.cleanup.event = NULL;
2355 instance.cleanup.library = NULL;
2356
2357 switch (object->type)
2358 {
2359 case TP_OBJECT_TYPE_SIMPLE:
2360 {
2361 TRACE( "executing simple callback %p(%p, %p)\n",
2362 object->u.simple.callback, callback_instance, object->userdata );
2363 object->u.simple.callback( callback_instance, object->userdata );
2364 TRACE( "callback %p returned\n", object->u.simple.callback );
2365 break;
2366 }
2367
2368 case TP_OBJECT_TYPE_WORK:
2369 {
2370 TRACE( "executing work callback %p(%p, %p, %p)\n",
2371 object->u.work.callback, callback_instance, object->userdata, object );
2372 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2373 TRACE( "callback %p returned\n", object->u.work.callback );
2374 break;
2375 }
2376
2377 case TP_OBJECT_TYPE_TIMER:
2378 {
2379 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2380 object->u.timer.callback, callback_instance, object->userdata, object );
2381 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2382 TRACE( "callback %p returned\n", object->u.timer.callback );
2383 break;
2384 }
2385
2386 case TP_OBJECT_TYPE_WAIT:
2387 {
2388 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2389 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2390 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2391 TRACE( "callback %p returned\n", object->u.wait.callback );
2392 break;
2393 }
2394
2395 case TP_OBJECT_TYPE_IO:
2396 {
2397 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2398 object->u.io.callback, callback_instance, object->userdata,
2399 completion.cvalue, &completion.iosb, (TP_IO *)object );
2400 object->u.io.callback( callback_instance, object->userdata,
2401 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2402 TRACE( "callback %p returned\n", object->u.io.callback );
2403 break;
2404 }
2405
2406 default:
2407 assert(0);
2408 break;
2409 }
2410
2411 /* Execute finalization callback. */
2412 if (object->finalization_callback)
2413 {
2414 TRACE( "executing finalization callback %p(%p, %p)\n",
2415 object->finalization_callback, callback_instance, object->userdata );
2416 object->finalization_callback( callback_instance, object->userdata );
2417 TRACE( "callback %p returned\n", object->finalization_callback );
2418 }
2419
2420 /* Execute cleanup tasks. */
2421 if (instance.cleanup.critical_section)
2422 {
2423 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2424 }
2425 if (instance.cleanup.mutex)
2426 {
2427 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2428 if (status != STATUS_SUCCESS) goto skip_cleanup;
2429 }
2430 if (instance.cleanup.semaphore)
2431 {
2432 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2433 if (status != STATUS_SUCCESS) goto skip_cleanup;
2434 }
2435 if (instance.cleanup.event)
2436 {
2437 status = NtSetEvent( instance.cleanup.event, NULL );
2438 if (status != STATUS_SUCCESS) goto skip_cleanup;
2439 }
2440 if (instance.cleanup.library)
2441 {
2442 LdrUnloadDll( instance.cleanup.library );
2443 }
2444
2445 skip_cleanup:
2446 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2447 RtlEnterCriticalSection( &pool->cs );
2448
2449 /* Simple callbacks are automatically shutdown after execution. */
2450 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2451 {
2452 tp_object_prepare_shutdown( object );
2453 object->shutdown = TRUE;
2454 }
2455
2456 object->num_running_callbacks--;
2457 if (object_is_finished( object, TRUE ))
2458 RtlWakeAllConditionVariable( &object->group_finished_event );
2459
2460 if (instance.associated)
2461 {
2462 object->num_associated_callbacks--;
2463 if (object_is_finished( object, FALSE ))
2464 RtlWakeAllConditionVariable( &object->finished_event );
2465 }
2466 }
2467
2468 /***********************************************************************
2469 * threadpool_worker_proc (internal)
2470 */
2471 #ifdef __REACTOS__
threadpool_worker_proc(PVOID param)2472 ULONG NTAPI threadpool_worker_proc(PVOID param )
2473 #else
2474 static void CALLBACK threadpool_worker_proc( void *param )
2475 #endif
2476 {
2477 struct threadpool *pool = param;
2478 LARGE_INTEGER timeout;
2479 struct list *ptr;
2480
2481 TRACE( "starting worker thread for pool %p\n", pool );
2482 set_thread_name(L"wine_threadpool_worker");
2483
2484 RtlEnterCriticalSection( &pool->cs );
2485 for (;;)
2486 {
2487 while ((ptr = threadpool_get_next_item( pool )))
2488 {
2489 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2490 assert( object->num_pending_callbacks > 0 );
2491
2492 /* If further pending callbacks are queued, move the work item to
2493 * the end of the pool list. Otherwise remove it from the pool. */
2494 list_remove( &object->pool_entry );
2495 if (object->num_pending_callbacks > 1)
2496 tp_object_prio_queue( object );
2497
2498 tp_object_execute( object, FALSE );
2499
2500 assert(pool->num_busy_workers);
2501 pool->num_busy_workers--;
2502
2503 tp_object_release( object );
2504 }
2505
2506 /* Shutdown worker thread if requested. */
2507 if (pool->shutdown)
2508 break;
2509
2510 /* Wait for new tasks or until the timeout expires. A thread only terminates
2511 * when no new tasks are available, and the number of threads can be
2512 * decreased without violating the min_workers limit. An exception is when
2513 * min_workers == 0, then objcount is used to detect if the last thread
2514 * can be terminated. */
2515 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2516 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2517 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2518 (!pool->min_workers && !pool->objcount)))
2519 {
2520 break;
2521 }
2522 }
2523 pool->num_workers--;
2524 RtlLeaveCriticalSection( &pool->cs );
2525
2526 TRACE( "terminating worker thread for pool %p\n", pool );
2527 tp_threadpool_release( pool );
2528 RtlExitUserThread( 0 );
2529 #ifdef __REACTOS__
2530 return STATUS_SUCCESS;
2531 #endif
2532 }
2533
2534 /***********************************************************************
2535 * TpAllocCleanupGroup (NTDLL.@)
2536 */
TpAllocCleanupGroup(TP_CLEANUP_GROUP ** out)2537 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2538 {
2539 TRACE( "%p\n", out );
2540
2541 return tp_group_alloc( (struct threadpool_group **)out );
2542 }
2543
2544 /***********************************************************************
2545 * TpAllocIoCompletion (NTDLL.@)
2546 */
TpAllocIoCompletion(TP_IO ** out,HANDLE file,PTP_IO_CALLBACK callback,void * userdata,TP_CALLBACK_ENVIRON * environment)2547 NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2548 void *userdata, TP_CALLBACK_ENVIRON *environment )
2549 {
2550 struct threadpool_object *object;
2551 struct threadpool *pool;
2552 NTSTATUS status;
2553
2554 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2555
2556 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2557 return STATUS_NO_MEMORY;
2558
2559 if ((status = tp_threadpool_lock( &pool, environment )))
2560 {
2561 RtlFreeHeap( GetProcessHeap(), 0, object );
2562 return status;
2563 }
2564
2565 object->type = TP_OBJECT_TYPE_IO;
2566 object->u.io.callback = callback;
2567 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2568 {
2569 tp_threadpool_unlock( pool );
2570 RtlFreeHeap( GetProcessHeap(), 0, object );
2571 return status;
2572 }
2573
2574 if ((status = tp_ioqueue_lock( object, file )))
2575 {
2576 tp_threadpool_unlock( pool );
2577 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2578 RtlFreeHeap( GetProcessHeap(), 0, object );
2579 return status;
2580 }
2581
2582 tp_object_initialize( object, pool, userdata, environment );
2583
2584 *out = (TP_IO *)object;
2585 return STATUS_SUCCESS;
2586 }
2587
2588 /***********************************************************************
2589 * TpAllocPool (NTDLL.@)
2590 */
TpAllocPool(TP_POOL ** out,PVOID reserved)2591 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2592 {
2593 TRACE( "%p %p\n", out, reserved );
2594
2595 if (reserved)
2596 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2597
2598 return tp_threadpool_alloc( (struct threadpool **)out );
2599 }
2600
2601 /***********************************************************************
2602 * TpAllocTimer (NTDLL.@)
2603 */
TpAllocTimer(TP_TIMER ** out,PTP_TIMER_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2604 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2605 TP_CALLBACK_ENVIRON *environment )
2606 {
2607 struct threadpool_object *object;
2608 struct threadpool *pool;
2609 NTSTATUS status;
2610
2611 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2612
2613 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2614 if (!object)
2615 return STATUS_NO_MEMORY;
2616
2617 status = tp_threadpool_lock( &pool, environment );
2618 if (status)
2619 {
2620 RtlFreeHeap( GetProcessHeap(), 0, object );
2621 return status;
2622 }
2623
2624 object->type = TP_OBJECT_TYPE_TIMER;
2625 object->u.timer.callback = callback;
2626
2627 status = tp_timerqueue_lock( object );
2628 if (status)
2629 {
2630 tp_threadpool_unlock( pool );
2631 RtlFreeHeap( GetProcessHeap(), 0, object );
2632 return status;
2633 }
2634
2635 tp_object_initialize( object, pool, userdata, environment );
2636
2637 *out = (TP_TIMER *)object;
2638 return STATUS_SUCCESS;
2639 }
2640
tp_alloc_wait(TP_WAIT ** out,PTP_WAIT_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment,DWORD flags)2641 static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2642 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2643 {
2644 struct threadpool_object *object;
2645 struct threadpool *pool;
2646 NTSTATUS status;
2647
2648 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2649 if (!object)
2650 return STATUS_NO_MEMORY;
2651
2652 status = tp_threadpool_lock( &pool, environment );
2653 if (status)
2654 {
2655 RtlFreeHeap( GetProcessHeap(), 0, object );
2656 return status;
2657 }
2658
2659 object->type = TP_OBJECT_TYPE_WAIT;
2660 object->u.wait.callback = callback;
2661 object->u.wait.flags = flags;
2662
2663 status = tp_waitqueue_lock( object );
2664 if (status)
2665 {
2666 tp_threadpool_unlock( pool );
2667 RtlFreeHeap( GetProcessHeap(), 0, object );
2668 return status;
2669 }
2670
2671 tp_object_initialize( object, pool, userdata, environment );
2672
2673 *out = (TP_WAIT *)object;
2674 return STATUS_SUCCESS;
2675 }
2676
2677 /***********************************************************************
2678 * TpAllocWait (NTDLL.@)
2679 */
TpAllocWait(TP_WAIT ** out,PTP_WAIT_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2680 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2681 TP_CALLBACK_ENVIRON *environment )
2682 {
2683 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2684 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2685 }
2686
2687 /***********************************************************************
2688 * TpAllocWork (NTDLL.@)
2689 */
TpAllocWork(TP_WORK ** out,PTP_WORK_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)2690 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2691 TP_CALLBACK_ENVIRON *environment )
2692 {
2693 struct threadpool_object *object;
2694 struct threadpool *pool;
2695 NTSTATUS status;
2696
2697 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2698
2699 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2700 if (!object)
2701 return STATUS_NO_MEMORY;
2702
2703 status = tp_threadpool_lock( &pool, environment );
2704 if (status)
2705 {
2706 RtlFreeHeap( GetProcessHeap(), 0, object );
2707 return status;
2708 }
2709
2710 object->type = TP_OBJECT_TYPE_WORK;
2711 object->u.work.callback = callback;
2712 tp_object_initialize( object, pool, userdata, environment );
2713
2714 *out = (TP_WORK *)object;
2715 return STATUS_SUCCESS;
2716 }
2717
2718 /***********************************************************************
2719 * TpCancelAsyncIoOperation (NTDLL.@)
2720 */
TpCancelAsyncIoOperation(TP_IO * io)2721 void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2722 {
2723 struct threadpool_object *this = impl_from_TP_IO( io );
2724
2725 TRACE( "%p\n", io );
2726
2727 RtlEnterCriticalSection( &this->pool->cs );
2728
2729 TRACE("pending_count %u.\n", this->u.io.pending_count);
2730
2731 this->u.io.pending_count--;
2732 if (object_is_finished( this, TRUE ))
2733 RtlWakeAllConditionVariable( &this->group_finished_event );
2734 if (object_is_finished( this, FALSE ))
2735 RtlWakeAllConditionVariable( &this->finished_event );
2736
2737 RtlLeaveCriticalSection( &this->pool->cs );
2738 }
2739
2740 /***********************************************************************
2741 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2742 */
TpCallbackLeaveCriticalSectionOnCompletion(TP_CALLBACK_INSTANCE * instance,CRITICAL_SECTION * crit)2743 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2744 {
2745 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2746
2747 TRACE( "%p %p\n", instance, crit );
2748
2749 if (!this->cleanup.critical_section)
2750 this->cleanup.critical_section = crit;
2751 }
2752
2753 /***********************************************************************
2754 * TpCallbackMayRunLong (NTDLL.@)
2755 */
TpCallbackMayRunLong(TP_CALLBACK_INSTANCE * instance)2756 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2757 {
2758 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2759 struct threadpool_object *object = this->object;
2760 struct threadpool *pool;
2761 NTSTATUS status = STATUS_SUCCESS;
2762
2763 TRACE( "%p\n", instance );
2764
2765 if (this->threadid != GetCurrentThreadId())
2766 {
2767 ERR("called from wrong thread, ignoring\n");
2768 return STATUS_UNSUCCESSFUL; /* FIXME */
2769 }
2770
2771 if (this->may_run_long)
2772 return STATUS_SUCCESS;
2773
2774 pool = object->pool;
2775 RtlEnterCriticalSection( &pool->cs );
2776
2777 /* Start new worker threads if required. */
2778 if (pool->num_busy_workers >= pool->num_workers)
2779 {
2780 if (pool->num_workers < pool->max_workers)
2781 {
2782 status = tp_new_worker_thread( pool );
2783 }
2784 else
2785 {
2786 status = STATUS_TOO_MANY_THREADS;
2787 }
2788 }
2789
2790 RtlLeaveCriticalSection( &pool->cs );
2791 this->may_run_long = TRUE;
2792 return status;
2793 }
2794
2795 /***********************************************************************
2796 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2797 */
TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE mutex)2798 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2799 {
2800 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2801
2802 TRACE( "%p %p\n", instance, mutex );
2803
2804 if (!this->cleanup.mutex)
2805 this->cleanup.mutex = mutex;
2806 }
2807
2808 /***********************************************************************
2809 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2810 */
TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE semaphore,DWORD count)2811 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2812 {
2813 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2814
2815 TRACE( "%p %p %lu\n", instance, semaphore, count );
2816
2817 if (!this->cleanup.semaphore)
2818 {
2819 this->cleanup.semaphore = semaphore;
2820 this->cleanup.semaphore_count = count;
2821 }
2822 }
2823
2824 /***********************************************************************
2825 * TpCallbackSetEventOnCompletion (NTDLL.@)
2826 */
TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE * instance,HANDLE event)2827 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2828 {
2829 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2830
2831 TRACE( "%p %p\n", instance, event );
2832
2833 if (!this->cleanup.event)
2834 this->cleanup.event = event;
2835 }
2836
2837 /***********************************************************************
2838 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2839 */
TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE * instance,HMODULE module)2840 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2841 {
2842 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2843
2844 TRACE( "%p %p\n", instance, module );
2845
2846 if (!this->cleanup.library)
2847 this->cleanup.library = module;
2848 }
2849
2850 /***********************************************************************
2851 * TpDisassociateCallback (NTDLL.@)
2852 */
TpDisassociateCallback(TP_CALLBACK_INSTANCE * instance)2853 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2854 {
2855 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2856 struct threadpool_object *object = this->object;
2857 struct threadpool *pool;
2858
2859 TRACE( "%p\n", instance );
2860
2861 if (this->threadid != GetCurrentThreadId())
2862 {
2863 ERR("called from wrong thread, ignoring\n");
2864 return;
2865 }
2866
2867 if (!this->associated)
2868 return;
2869
2870 pool = object->pool;
2871 RtlEnterCriticalSection( &pool->cs );
2872
2873 object->num_associated_callbacks--;
2874 if (object_is_finished( object, FALSE ))
2875 RtlWakeAllConditionVariable( &object->finished_event );
2876
2877 RtlLeaveCriticalSection( &pool->cs );
2878 this->associated = FALSE;
2879 }
2880
2881 /***********************************************************************
2882 * TpIsTimerSet (NTDLL.@)
2883 */
TpIsTimerSet(TP_TIMER * timer)2884 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2885 {
2886 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2887
2888 TRACE( "%p\n", timer );
2889
2890 return this->u.timer.timer_set;
2891 }
2892
2893 /***********************************************************************
2894 * TpPostWork (NTDLL.@)
2895 */
TpPostWork(TP_WORK * work)2896 VOID WINAPI TpPostWork( TP_WORK *work )
2897 {
2898 struct threadpool_object *this = impl_from_TP_WORK( work );
2899
2900 TRACE( "%p\n", work );
2901
2902 tp_object_submit( this, FALSE );
2903 }
2904
2905 /***********************************************************************
2906 * TpReleaseCleanupGroup (NTDLL.@)
2907 */
TpReleaseCleanupGroup(TP_CLEANUP_GROUP * group)2908 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2909 {
2910 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2911
2912 TRACE( "%p\n", group );
2913
2914 tp_group_shutdown( this );
2915 tp_group_release( this );
2916 }
2917
2918 /***********************************************************************
2919 * TpReleaseCleanupGroupMembers (NTDLL.@)
2920 */
TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP * group,BOOL cancel_pending,PVOID userdata)2921 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2922 {
2923 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2924 struct threadpool_object *object, *next;
2925 struct list members;
2926
2927 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2928
2929 RtlEnterCriticalSection( &this->cs );
2930
2931 /* Unset group, increase references, and mark objects for shutdown */
2932 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2933 {
2934 assert( object->group == this );
2935 assert( object->is_group_member );
2936
2937 if (InterlockedIncrement( &object->refcount ) == 1)
2938 {
2939 /* Object is basically already destroyed, but group reference
2940 * was not deleted yet. We can safely ignore this object. */
2941 InterlockedDecrement( &object->refcount );
2942 list_remove( &object->group_entry );
2943 object->is_group_member = FALSE;
2944 continue;
2945 }
2946
2947 object->is_group_member = FALSE;
2948 tp_object_prepare_shutdown( object );
2949 }
2950
2951 /* Move members to a new temporary list */
2952 list_init( &members );
2953 list_move_tail( &members, &this->members );
2954
2955 RtlLeaveCriticalSection( &this->cs );
2956
2957 /* Cancel pending callbacks if requested */
2958 if (cancel_pending)
2959 {
2960 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2961 {
2962 tp_object_cancel( object );
2963 }
2964 }
2965
2966 /* Wait for remaining callbacks to finish */
2967 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2968 {
2969 tp_object_wait( object, TRUE );
2970
2971 if (!object->shutdown)
2972 {
2973 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2974 if (cancel_pending && object->group_cancel_callback)
2975 {
2976 TRACE( "executing group cancel callback %p(%p, %p)\n",
2977 object->group_cancel_callback, object->userdata, userdata );
2978 object->group_cancel_callback( object->userdata, userdata );
2979 TRACE( "callback %p returned\n", object->group_cancel_callback );
2980 }
2981
2982 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2983 tp_object_release( object );
2984 }
2985
2986 object->shutdown = TRUE;
2987 tp_object_release( object );
2988 }
2989 }
2990
2991 /***********************************************************************
2992 * TpReleaseIoCompletion (NTDLL.@)
2993 */
TpReleaseIoCompletion(TP_IO * io)2994 void WINAPI TpReleaseIoCompletion( TP_IO *io )
2995 {
2996 struct threadpool_object *this = impl_from_TP_IO( io );
2997 BOOL can_destroy;
2998
2999 TRACE( "%p\n", io );
3000
3001 RtlEnterCriticalSection( &this->pool->cs );
3002 this->u.io.shutting_down = TRUE;
3003 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3004 RtlLeaveCriticalSection( &this->pool->cs );
3005
3006 if (can_destroy)
3007 {
3008 tp_object_prepare_shutdown( this );
3009 this->shutdown = TRUE;
3010 tp_object_release( this );
3011 }
3012 }
3013
3014 /***********************************************************************
3015 * TpReleasePool (NTDLL.@)
3016 */
TpReleasePool(TP_POOL * pool)3017 VOID WINAPI TpReleasePool( TP_POOL *pool )
3018 {
3019 struct threadpool *this = impl_from_TP_POOL( pool );
3020
3021 TRACE( "%p\n", pool );
3022
3023 tp_threadpool_shutdown( this );
3024 tp_threadpool_release( this );
3025 }
3026
3027 /***********************************************************************
3028 * TpReleaseTimer (NTDLL.@)
3029 */
TpReleaseTimer(TP_TIMER * timer)3030 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
3031 {
3032 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3033
3034 TRACE( "%p\n", timer );
3035
3036 tp_object_prepare_shutdown( this );
3037 this->shutdown = TRUE;
3038 tp_object_release( this );
3039 }
3040
3041 /***********************************************************************
3042 * TpReleaseWait (NTDLL.@)
3043 */
TpReleaseWait(TP_WAIT * wait)3044 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
3045 {
3046 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3047
3048 TRACE( "%p\n", wait );
3049
3050 tp_object_prepare_shutdown( this );
3051 this->shutdown = TRUE;
3052 tp_object_release( this );
3053 }
3054
3055 /***********************************************************************
3056 * TpReleaseWork (NTDLL.@)
3057 */
TpReleaseWork(TP_WORK * work)3058 VOID WINAPI TpReleaseWork( TP_WORK *work )
3059 {
3060 struct threadpool_object *this = impl_from_TP_WORK( work );
3061
3062 TRACE( "%p\n", work );
3063
3064 tp_object_prepare_shutdown( this );
3065 this->shutdown = TRUE;
3066 tp_object_release( this );
3067 }
3068
3069 /***********************************************************************
3070 * TpSetPoolMaxThreads (NTDLL.@)
3071 */
TpSetPoolMaxThreads(TP_POOL * pool,DWORD maximum)3072 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
3073 {
3074 struct threadpool *this = impl_from_TP_POOL( pool );
3075
3076 TRACE( "%p %lu\n", pool, maximum );
3077
3078 RtlEnterCriticalSection( &this->cs );
3079 this->max_workers = max( maximum, 1 );
3080 this->min_workers = min( this->min_workers, this->max_workers );
3081 RtlLeaveCriticalSection( &this->cs );
3082 }
3083
3084 /***********************************************************************
3085 * TpSetPoolMinThreads (NTDLL.@)
3086 */
TpSetPoolMinThreads(TP_POOL * pool,DWORD minimum)3087 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
3088 {
3089 struct threadpool *this = impl_from_TP_POOL( pool );
3090 NTSTATUS status = STATUS_SUCCESS;
3091
3092 TRACE( "%p %lu\n", pool, minimum );
3093
3094 RtlEnterCriticalSection( &this->cs );
3095
3096 while (this->num_workers < minimum)
3097 {
3098 status = tp_new_worker_thread( this );
3099 if (status != STATUS_SUCCESS)
3100 break;
3101 }
3102
3103 if (status == STATUS_SUCCESS)
3104 {
3105 this->min_workers = minimum;
3106 this->max_workers = max( this->min_workers, this->max_workers );
3107 }
3108
3109 RtlLeaveCriticalSection( &this->cs );
3110 return !status;
3111 }
3112
3113 /***********************************************************************
3114 * TpSetTimer (NTDLL.@)
3115 */
TpSetTimer(TP_TIMER * timer,LARGE_INTEGER * timeout,LONG period,LONG window_length)3116 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
3117 {
3118 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3119 struct threadpool_object *other_timer;
3120 BOOL submit_timer = FALSE;
3121 ULONGLONG timestamp;
3122
3123 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3124
3125 RtlEnterCriticalSection( &timerqueue.cs );
3126
3127 assert( this->u.timer.timer_initialized );
3128 this->u.timer.timer_set = timeout != NULL;
3129
3130 /* Convert relative timeout to absolute timestamp and handle a timeout
3131 * of zero, which means that the timer is submitted immediately. */
3132 if (timeout)
3133 {
3134 timestamp = timeout->QuadPart;
3135 if ((LONGLONG)timestamp < 0)
3136 {
3137 LARGE_INTEGER now;
3138 NtQuerySystemTime( &now );
3139 timestamp = now.QuadPart - timestamp;
3140 }
3141 else if (!timestamp)
3142 {
3143 if (!period)
3144 timeout = NULL;
3145 else
3146 {
3147 LARGE_INTEGER now;
3148 NtQuerySystemTime( &now );
3149 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3150 }
3151 submit_timer = TRUE;
3152 }
3153 }
3154
3155 /* First remove existing timeout. */
3156 if (this->u.timer.timer_pending)
3157 {
3158 list_remove( &this->u.timer.timer_entry );
3159 this->u.timer.timer_pending = FALSE;
3160 }
3161
3162 /* If the timer was enabled, then add it back to the queue. */
3163 if (timeout)
3164 {
3165 this->u.timer.timeout = timestamp;
3166 this->u.timer.period = period;
3167 this->u.timer.window_length = window_length;
3168
3169 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3170 struct threadpool_object, u.timer.timer_entry )
3171 {
3172 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3173 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3174 break;
3175 }
3176 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3177
3178 /* Wake up the timer thread when the timeout has to be updated. */
3179 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3180 RtlWakeAllConditionVariable( &timerqueue.update_event );
3181
3182 this->u.timer.timer_pending = TRUE;
3183 }
3184
3185 RtlLeaveCriticalSection( &timerqueue.cs );
3186
3187 if (submit_timer)
3188 tp_object_submit( this, FALSE );
3189 }
3190
3191 /***********************************************************************
3192 * TpSetWait (NTDLL.@)
3193 */
TpSetWait(TP_WAIT * wait,HANDLE handle,LARGE_INTEGER * timeout)3194 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
3195 {
3196 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3197 ULONGLONG timestamp = MAXLONGLONG;
3198
3199 TRACE( "%p %p %p\n", wait, handle, timeout );
3200
3201 RtlEnterCriticalSection( &waitqueue.cs );
3202
3203 assert( this->u.wait.bucket );
3204 this->u.wait.handle = handle;
3205
3206 if (handle || this->u.wait.wait_pending)
3207 {
3208 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3209 list_remove( &this->u.wait.wait_entry );
3210
3211 /* Convert relative timeout to absolute timestamp. */
3212 if (handle && timeout)
3213 {
3214 timestamp = timeout->QuadPart;
3215 if ((LONGLONG)timestamp < 0)
3216 {
3217 LARGE_INTEGER now;
3218 NtQuerySystemTime( &now );
3219 timestamp = now.QuadPart - timestamp;
3220 }
3221 }
3222
3223 /* Add wait object back into one of the queues. */
3224 if (handle)
3225 {
3226 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3227 this->u.wait.wait_pending = TRUE;
3228 this->u.wait.timeout = timestamp;
3229 }
3230 else
3231 {
3232 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3233 this->u.wait.wait_pending = FALSE;
3234 }
3235
3236 /* Wake up the wait queue thread. */
3237 NtSetEvent( bucket->update_event, NULL );
3238 }
3239
3240 RtlLeaveCriticalSection( &waitqueue.cs );
3241 }
3242
3243 /***********************************************************************
3244 * TpSimpleTryPost (NTDLL.@)
3245 */
TpSimpleTryPost(PTP_SIMPLE_CALLBACK callback,PVOID userdata,TP_CALLBACK_ENVIRON * environment)3246 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3247 TP_CALLBACK_ENVIRON *environment )
3248 {
3249 struct threadpool_object *object;
3250 struct threadpool *pool;
3251 NTSTATUS status;
3252
3253 TRACE( "%p %p %p\n", callback, userdata, environment );
3254
3255 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3256 if (!object)
3257 return STATUS_NO_MEMORY;
3258
3259 status = tp_threadpool_lock( &pool, environment );
3260 if (status)
3261 {
3262 RtlFreeHeap( GetProcessHeap(), 0, object );
3263 return status;
3264 }
3265
3266 object->type = TP_OBJECT_TYPE_SIMPLE;
3267 object->u.simple.callback = callback;
3268 tp_object_initialize( object, pool, userdata, environment );
3269
3270 return STATUS_SUCCESS;
3271 }
3272
3273 /***********************************************************************
3274 * TpStartAsyncIoOperation (NTDLL.@)
3275 */
TpStartAsyncIoOperation(TP_IO * io)3276 void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3277 {
3278 struct threadpool_object *this = impl_from_TP_IO( io );
3279
3280 TRACE( "%p\n", io );
3281
3282 RtlEnterCriticalSection( &this->pool->cs );
3283
3284 this->u.io.pending_count++;
3285
3286 RtlLeaveCriticalSection( &this->pool->cs );
3287 }
3288
3289 /***********************************************************************
3290 * TpWaitForIoCompletion (NTDLL.@)
3291 */
TpWaitForIoCompletion(TP_IO * io,BOOL cancel_pending)3292 void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3293 {
3294 struct threadpool_object *this = impl_from_TP_IO( io );
3295
3296 TRACE( "%p %d\n", io, cancel_pending );
3297
3298 if (cancel_pending)
3299 tp_object_cancel( this );
3300 tp_object_wait( this, FALSE );
3301 }
3302
3303 /***********************************************************************
3304 * TpWaitForTimer (NTDLL.@)
3305 */
TpWaitForTimer(TP_TIMER * timer,BOOL cancel_pending)3306 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3307 {
3308 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3309
3310 TRACE( "%p %d\n", timer, cancel_pending );
3311
3312 if (cancel_pending)
3313 tp_object_cancel( this );
3314 tp_object_wait( this, FALSE );
3315 }
3316
3317 /***********************************************************************
3318 * TpWaitForWait (NTDLL.@)
3319 */
TpWaitForWait(TP_WAIT * wait,BOOL cancel_pending)3320 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3321 {
3322 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3323
3324 TRACE( "%p %d\n", wait, cancel_pending );
3325
3326 if (cancel_pending)
3327 tp_object_cancel( this );
3328 tp_object_wait( this, FALSE );
3329 }
3330
3331 /***********************************************************************
3332 * TpWaitForWork (NTDLL.@)
3333 */
TpWaitForWork(TP_WORK * work,BOOL cancel_pending)3334 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3335 {
3336 struct threadpool_object *this = impl_from_TP_WORK( work );
3337
3338 TRACE( "%p %u\n", work, cancel_pending );
3339
3340 if (cancel_pending)
3341 tp_object_cancel( this );
3342 tp_object_wait( this, FALSE );
3343 }
3344
3345 /***********************************************************************
3346 * TpSetPoolStackInformation (NTDLL.@)
3347 */
TpSetPoolStackInformation(TP_POOL * pool,TP_POOL_STACK_INFORMATION * stack_info)3348 NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3349 {
3350 struct threadpool *this = impl_from_TP_POOL( pool );
3351
3352 TRACE( "%p %p\n", pool, stack_info );
3353
3354 if (!stack_info)
3355 return STATUS_INVALID_PARAMETER;
3356
3357 RtlEnterCriticalSection( &this->cs );
3358 this->stack_info = *stack_info;
3359 RtlLeaveCriticalSection( &this->cs );
3360
3361 return STATUS_SUCCESS;
3362 }
3363
3364 /***********************************************************************
3365 * TpQueryPoolStackInformation (NTDLL.@)
3366 */
TpQueryPoolStackInformation(TP_POOL * pool,TP_POOL_STACK_INFORMATION * stack_info)3367 NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3368 {
3369 struct threadpool *this = impl_from_TP_POOL( pool );
3370
3371 TRACE( "%p %p\n", pool, stack_info );
3372
3373 if (!stack_info)
3374 return STATUS_INVALID_PARAMETER;
3375
3376 RtlEnterCriticalSection( &this->cs );
3377 *stack_info = this->stack_info;
3378 RtlLeaveCriticalSection( &this->cs );
3379
3380 return STATUS_SUCCESS;
3381 }
3382
rtl_wait_callback(TP_CALLBACK_INSTANCE * instance,void * userdata,TP_WAIT * wait,TP_WAIT_RESULT result)3383 static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3384 {
3385 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3386 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3387 }
3388
3389 /***********************************************************************
3390 * RtlRegisterWait (NTDLL.@)
3391 *
3392 * Registers a wait for a handle to become signaled.
3393 *
3394 * PARAMS
3395 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3396 * Object [I] Object to wait to become signaled.
3397 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3398 * Context [I] Context to pass to the callback function when it is executed.
3399 * Milliseconds [I] Number of milliseconds to wait before timing out.
3400 * Flags [I] Flags. See notes.
3401 *
3402 * RETURNS
3403 * Success: STATUS_SUCCESS.
3404 * Failure: Any NTSTATUS code.
3405 *
3406 * NOTES
3407 * Flags can be one or more of the following:
3408 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3409 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3410 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3411 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3412 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3413 */
RtlRegisterWait(HANDLE * out,HANDLE handle,RTL_WAITORTIMERCALLBACKFUNC callback,void * context,ULONG milliseconds,ULONG flags)3414 NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3415 void *context, ULONG milliseconds, ULONG flags )
3416 {
3417 struct threadpool_object *object;
3418 TP_CALLBACK_ENVIRON environment;
3419 LARGE_INTEGER timeout;
3420 NTSTATUS status;
3421 TP_WAIT *wait;
3422
3423 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3424 out, handle, callback, context, milliseconds, flags );
3425
3426 memset( &environment, 0, sizeof(environment) );
3427 environment.Version = 1;
3428 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3429 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3430
3431 flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3432 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3433 return status;
3434
3435 object = impl_from_TP_WAIT(wait);
3436 object->u.wait.rtl_callback = callback;
3437
3438 RtlEnterCriticalSection( &waitqueue.cs );
3439 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3440
3441 *out = object;
3442 RtlLeaveCriticalSection( &waitqueue.cs );
3443
3444 return STATUS_SUCCESS;
3445 }
3446
3447 /***********************************************************************
3448 * RtlDeregisterWaitEx (NTDLL.@)
3449 *
3450 * Cancels a wait operation and frees the resources associated with calling
3451 * RtlRegisterWait().
3452 *
3453 * PARAMS
3454 * WaitObject [I] Handle to the wait object to free.
3455 *
3456 * RETURNS
3457 * Success: STATUS_SUCCESS.
3458 * Failure: Any NTSTATUS code.
3459 */
RtlDeregisterWaitEx(HANDLE handle,HANDLE event)3460 NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3461 {
3462 struct threadpool_object *object = handle;
3463 NTSTATUS status;
3464
3465 TRACE( "handle %p, event %p\n", handle, event );
3466
3467 if (!object) return STATUS_INVALID_HANDLE;
3468
3469 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3470
3471 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3472 else
3473 {
3474 assert( object->completed_event == NULL );
3475 object->completed_event = event;
3476 }
3477
3478 RtlEnterCriticalSection( &object->pool->cs );
3479 if (object->num_pending_callbacks + object->num_running_callbacks
3480 + object->num_associated_callbacks) status = STATUS_PENDING;
3481 else status = STATUS_SUCCESS;
3482 RtlLeaveCriticalSection( &object->pool->cs );
3483
3484 TpReleaseWait( (TP_WAIT *)object );
3485 return status;
3486 }
3487
3488 /***********************************************************************
3489 * RtlDeregisterWait (NTDLL.@)
3490 *
3491 * Cancels a wait operation and frees the resources associated with calling
3492 * RtlRegisterWait().
3493 *
3494 * PARAMS
3495 * WaitObject [I] Handle to the wait object to free.
3496 *
3497 * RETURNS
3498 * Success: STATUS_SUCCESS.
3499 * Failure: Any NTSTATUS code.
3500 */
RtlDeregisterWait(HANDLE WaitHandle)3501 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3502 {
3503 return RtlDeregisterWaitEx(WaitHandle, NULL);
3504 }
3505
3506 #ifdef __REACTOS__
3507 VOID
3508 NTAPI
RtlpInitializeThreadPooling(VOID)3509 RtlpInitializeThreadPooling(
3510 VOID)
3511 {
3512 RtlInitializeCriticalSection(&old_threadpool.threadpool_compl_cs);
3513 RtlInitializeCriticalSection(&timerqueue.cs);
3514 RtlInitializeCriticalSection(&waitqueue.cs);
3515 RtlInitializeCriticalSection(&ioqueue.cs);
3516 }
3517 #endif
3518