1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS system libraries
4 * PURPOSE: Work Item implementation
5 * FILE: lib/rtl/workitem.c
6 * PROGRAMMER:
7 */
8
9 /* INCLUDES *****************************************************************/
10
11 #include <rtl.h>
12
13 #define NDEBUG
14 #include <debug.h>
15
16 /* FUNCTIONS ***************************************************************/
17
18 NTSTATUS
19 NTAPI
RtlpStartThread(IN PTHREAD_START_ROUTINE Function,IN PVOID Parameter,OUT PHANDLE ThreadHandle)20 RtlpStartThread(IN PTHREAD_START_ROUTINE Function,
21 IN PVOID Parameter,
22 OUT PHANDLE ThreadHandle)
23 {
24 /* Create a native worker thread -- used for SMSS, CSRSS, etc... */
25 return RtlCreateUserThread(NtCurrentProcess(),
26 NULL,
27 TRUE,
28 0,
29 0,
30 0,
31 Function,
32 Parameter,
33 ThreadHandle,
34 NULL);
35 }
36
37 NTSTATUS
38 NTAPI
RtlpExitThread(IN NTSTATUS ExitStatus)39 RtlpExitThread(IN NTSTATUS ExitStatus)
40 {
41 /* Kill a native worker thread -- used for SMSS, CSRSS, etc... */
42 return NtTerminateThread(NtCurrentThread(), ExitStatus);
43 }
44
45 PRTL_START_POOL_THREAD RtlpStartThreadFunc = RtlpStartThread;
46 PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc = RtlpExitThread;
47
48 #define MAX_WORKERTHREADS 0x100
49 #define WORKERTHREAD_CREATION_THRESHOLD 0x5
50
51 typedef struct _RTLP_IOWORKERTHREAD
52 {
53 LIST_ENTRY ListEntry;
54 HANDLE ThreadHandle;
55 ULONG Flags;
56 } RTLP_IOWORKERTHREAD, *PRTLP_IOWORKERTHREAD;
57
58 typedef struct _RTLP_WORKITEM
59 {
60 WORKERCALLBACKFUNC Function;
61 PVOID Context;
62 ULONG Flags;
63 HANDLE TokenHandle;
64 } RTLP_WORKITEM, *PRTLP_WORKITEM;
65
66 static LONG ThreadPoolInitialized = 0;
67 static RTL_CRITICAL_SECTION ThreadPoolLock;
68 static PRTLP_IOWORKERTHREAD PersistentIoThread;
69 static LIST_ENTRY ThreadPoolIOWorkerThreadsList;
70 static HANDLE ThreadPoolCompletionPort;
71 static LONG ThreadPoolWorkerThreads;
72 static LONG ThreadPoolWorkerThreadsRequests;
73 static LONG ThreadPoolWorkerThreadsLongRequests;
74 static LONG ThreadPoolIOWorkerThreads;
75 static LONG ThreadPoolIOWorkerThreadsRequests;
76 static LONG ThreadPoolIOWorkerThreadsLongRequests;
77
78 #define IsThreadPoolInitialized() (*((volatile LONG*)&ThreadPoolInitialized) == 1)
79
80 static NTSTATUS
RtlpInitializeThreadPool(VOID)81 RtlpInitializeThreadPool(VOID)
82 {
83 NTSTATUS Status = STATUS_SUCCESS;
84 LONG InitStatus;
85
86 do
87 {
88 InitStatus = InterlockedCompareExchange(&ThreadPoolInitialized,
89 2,
90 0);
91 if (InitStatus == 0)
92 {
93 /* We're the first thread to initialize the thread pool */
94
95 InitializeListHead(&ThreadPoolIOWorkerThreadsList);
96
97 PersistentIoThread = NULL;
98
99 ThreadPoolWorkerThreads = 0;
100 ThreadPoolWorkerThreadsRequests = 0;
101 ThreadPoolWorkerThreadsLongRequests = 0;
102 ThreadPoolIOWorkerThreads = 0;
103 ThreadPoolIOWorkerThreadsRequests = 0;
104 ThreadPoolIOWorkerThreadsLongRequests = 0;
105
106 /* Initialize the lock */
107 Status = RtlInitializeCriticalSection(&ThreadPoolLock);
108 if (!NT_SUCCESS(Status))
109 goto Finish;
110
111 /* Create the complection port */
112 Status = NtCreateIoCompletion(&ThreadPoolCompletionPort,
113 IO_COMPLETION_ALL_ACCESS,
114 NULL,
115 0);
116 if (!NT_SUCCESS(Status))
117 {
118 RtlDeleteCriticalSection(&ThreadPoolLock);
119 goto Finish;
120 }
121
122 Finish:
123 /* Initialization done */
124 InterlockedExchange(&ThreadPoolInitialized,
125 1);
126 break;
127 }
128 else if (InitStatus == 2)
129 {
130 LARGE_INTEGER Timeout;
131
132 /* Another thread is currently initializing the thread pool!
133 Poll after a short period of time to see if the initialization
134 was completed */
135
136 Timeout.QuadPart = -10000000LL; /* Wait for a second */
137 NtDelayExecution(FALSE,
138 &Timeout);
139 }
140 } while (InitStatus != 1);
141
142 return Status;
143 }
144
145 static NTSTATUS
RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)146 RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)
147 {
148 NTSTATUS Status;
149
150 Status = NtOpenThreadToken(NtCurrentThread(),
151 TOKEN_IMPERSONATE,
152 TRUE,
153 TokenHandle);
154 if (Status == STATUS_NO_TOKEN || Status == STATUS_CANT_OPEN_ANONYMOUS)
155 {
156 *TokenHandle = NULL;
157 Status = STATUS_SUCCESS;
158 }
159
160 return Status;
161 }
162
163 static NTSTATUS
RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)164 RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)
165 {
166 NTSTATUS Status;
167 HANDLE ThreadHandle;
168 LARGE_INTEGER Timeout;
169 volatile LONG WorkerInitialized = 0;
170
171 Timeout.QuadPart = -10000LL; /* Wait for 100ms */
172
173 /* Start the thread */
174 Status = RtlpStartThreadFunc(StartRoutine, (PVOID)&WorkerInitialized, &ThreadHandle);
175 if (NT_SUCCESS(Status))
176 {
177 NtResumeThread(ThreadHandle, NULL);
178
179 /* Poll until the thread got a chance to initialize */
180 while (WorkerInitialized == 0)
181 {
182 NtDelayExecution(FALSE,
183 &Timeout);
184 }
185
186 NtClose(ThreadHandle);
187 }
188
189 return Status;
190 }
191
192 static VOID
193 NTAPI
RtlpExecuteWorkItem(IN OUT PVOID NormalContext,IN OUT PVOID SystemArgument1,IN OUT PVOID SystemArgument2)194 RtlpExecuteWorkItem(IN OUT PVOID NormalContext,
195 IN OUT PVOID SystemArgument1,
196 IN OUT PVOID SystemArgument2)
197 {
198 NTSTATUS Status;
199 BOOLEAN Impersonated = FALSE;
200 RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
201
202 RtlFreeHeap(RtlGetProcessHeap(),
203 0,
204 SystemArgument2);
205
206 if (WorkItem.TokenHandle != NULL)
207 {
208 Status = NtSetInformationThread(NtCurrentThread(),
209 ThreadImpersonationToken,
210 &WorkItem.TokenHandle,
211 sizeof(HANDLE));
212
213 NtClose(WorkItem.TokenHandle);
214
215 if (NT_SUCCESS(Status))
216 {
217 Impersonated = TRUE;
218 }
219 }
220
221 _SEH2_TRY
222 {
223 DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
224
225 /* Execute the function */
226 WorkItem.Function(WorkItem.Context);
227 }
228 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
229 {
230 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
231 }
232 _SEH2_END;
233
234 if (Impersonated)
235 {
236 WorkItem.TokenHandle = NULL;
237 Status = NtSetInformationThread(NtCurrentThread(),
238 ThreadImpersonationToken,
239 &WorkItem.TokenHandle,
240 sizeof(HANDLE));
241 if (!NT_SUCCESS(Status))
242 {
243 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
244 }
245 }
246
247 /* update the requests counter */
248 InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
249
250 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
251 {
252 InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
253 }
254 }
255
256
257 static NTSTATUS
RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)258 RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
259 {
260 NTSTATUS Status = STATUS_SUCCESS;
261
262 InterlockedIncrement(&ThreadPoolWorkerThreadsRequests);
263
264 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
265 {
266 InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests);
267 }
268
269 if (WorkItem->Flags & WT_EXECUTEINPERSISTENTTHREAD)
270 {
271 Status = RtlpInitializeTimerThread();
272
273 if (NT_SUCCESS(Status))
274 {
275 /* Queue an APC in the timer thread */
276 Status = NtQueueApcThread(TimerThreadHandle,
277 RtlpExecuteWorkItem,
278 NULL,
279 NULL,
280 WorkItem);
281 }
282 }
283 else
284 {
285 /* Queue an IO completion message */
286 Status = NtSetIoCompletion(ThreadPoolCompletionPort,
287 RtlpExecuteWorkItem,
288 WorkItem,
289 STATUS_SUCCESS,
290 0);
291 }
292
293 if (!NT_SUCCESS(Status))
294 {
295 InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
296
297 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
298 {
299 InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
300 }
301 }
302
303 return Status;
304 }
305
306 static VOID
307 NTAPI
RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext,IN OUT PVOID SystemArgument1,IN OUT PVOID SystemArgument2)308 RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext,
309 IN OUT PVOID SystemArgument1,
310 IN OUT PVOID SystemArgument2)
311 {
312 NTSTATUS Status;
313 BOOLEAN Impersonated = FALSE;
314 PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext;
315 RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
316
317 ASSERT(IoThread != NULL);
318
319 RtlFreeHeap(RtlGetProcessHeap(),
320 0,
321 SystemArgument2);
322
323 if (WorkItem.TokenHandle != NULL)
324 {
325 Status = NtSetInformationThread(NtCurrentThread(),
326 ThreadImpersonationToken,
327 &WorkItem.TokenHandle,
328 sizeof(HANDLE));
329
330 NtClose(WorkItem.TokenHandle);
331
332 if (NT_SUCCESS(Status))
333 {
334 Impersonated = TRUE;
335 }
336 }
337
338 _SEH2_TRY
339 {
340 DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
341
342 /* Execute the function */
343 WorkItem.Function(WorkItem.Context);
344 }
345 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
346 {
347 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
348 }
349 _SEH2_END;
350
351 if (Impersonated)
352 {
353 WorkItem.TokenHandle = NULL;
354 Status = NtSetInformationThread(NtCurrentThread(),
355 ThreadImpersonationToken,
356 &WorkItem.TokenHandle,
357 sizeof(HANDLE));
358 if (!NT_SUCCESS(Status))
359 {
360 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
361 }
362 }
363
364 /* remove the long function flag */
365 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
366 {
367 Status = RtlEnterCriticalSection(&ThreadPoolLock);
368 if (NT_SUCCESS(Status))
369 {
370 IoThread->Flags &= ~WT_EXECUTELONGFUNCTION;
371 RtlLeaveCriticalSection(&ThreadPoolLock);
372 }
373 }
374
375 /* update the requests counter */
376 InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
377
378 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
379 {
380 InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
381 }
382 }
383
384 static NTSTATUS
RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)385 RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
386 {
387 PLIST_ENTRY CurrentEntry;
388 PRTLP_IOWORKERTHREAD IoThread = NULL;
389 NTSTATUS Status = STATUS_SUCCESS;
390
391 if (WorkItem->Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
392 {
393 if (PersistentIoThread != NULL)
394 {
395 /* We already have a persistent IO worker thread */
396 IoThread = PersistentIoThread;
397 }
398 else
399 {
400 /* We're not aware of any persistent IO worker thread. Search for a unused
401 worker thread that doesn't have a long function queued */
402 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
403 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
404 {
405 IoThread = CONTAINING_RECORD(CurrentEntry,
406 RTLP_IOWORKERTHREAD,
407 ListEntry);
408
409 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
410 break;
411
412 CurrentEntry = CurrentEntry->Flink;
413 }
414
415 if (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
416 {
417 /* Found a worker thread we can use. */
418 ASSERT(IoThread != NULL);
419
420 IoThread->Flags |= WT_EXECUTEINPERSISTENTIOTHREAD;
421 PersistentIoThread = IoThread;
422 }
423 else
424 {
425 DPRINT1("Failed to find a worker thread for the persistent IO thread!\n");
426 return STATUS_NO_MEMORY;
427 }
428 }
429 }
430 else
431 {
432 /* Find a worker thread that is not currently executing a long function */
433 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
434 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
435 {
436 IoThread = CONTAINING_RECORD(CurrentEntry,
437 RTLP_IOWORKERTHREAD,
438 ListEntry);
439
440 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
441 {
442 /* if we're trying to queue a long function then make sure we're not dealing
443 with the persistent thread */
444 if ((WorkItem->Flags & WT_EXECUTELONGFUNCTION) && !(IoThread->Flags & WT_EXECUTEINPERSISTENTIOTHREAD))
445 {
446 /* found a candidate */
447 break;
448 }
449 }
450
451 CurrentEntry = CurrentEntry->Flink;
452 }
453
454 if (CurrentEntry == &ThreadPoolIOWorkerThreadsList)
455 {
456 /* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */
457 if (ThreadPoolIOWorkerThreads == 0)
458 {
459 DPRINT1("Failed to find a worker thread for the work item 0x%p!\n", WorkItem);
460 ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList));
461 return STATUS_NO_MEMORY;
462 }
463 else
464 {
465 /* pick the first worker thread */
466 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
467 IoThread = CONTAINING_RECORD(CurrentEntry,
468 RTLP_IOWORKERTHREAD,
469 ListEntry);
470
471 /* Since this might be the persistent worker thread, don't run as a
472 long function */
473 WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION;
474 }
475 }
476
477 /* Move the picked thread to the end of the list. Since we're always searching
478 from the beginning, this improves distribution of work items */
479 RemoveEntryList(&IoThread->ListEntry);
480 InsertTailList(&ThreadPoolIOWorkerThreadsList,
481 &IoThread->ListEntry);
482 }
483
484 ASSERT(IoThread != NULL);
485
486 InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests);
487
488 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
489 {
490 /* We're about to queue a long function, mark the thread */
491 IoThread->Flags |= WT_EXECUTELONGFUNCTION;
492
493 InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests);
494 }
495
496 /* It's time to queue the work item */
497 Status = NtQueueApcThread(IoThread->ThreadHandle,
498 RtlpExecuteIoWorkItem,
499 IoThread,
500 NULL,
501 WorkItem);
502 if (!NT_SUCCESS(Status))
503 {
504 DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem->Function);
505 InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
506
507 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
508 {
509 InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
510 }
511 }
512
513 return Status;
514 }
515
516 static BOOLEAN
RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL)517 RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL)
518 {
519 NTSTATUS Status;
520 ULONG IoPending;
521 BOOLEAN CreatedHandle = FALSE;
522 BOOLEAN IsIoPending = TRUE;
523
524 if (ThreadHandle == NULL)
525 {
526 Status = NtDuplicateObject(NtCurrentProcess(),
527 NtCurrentThread(),
528 NtCurrentProcess(),
529 &ThreadHandle,
530 0,
531 0,
532 DUPLICATE_SAME_ACCESS);
533 if (!NT_SUCCESS(Status))
534 {
535 return IsIoPending;
536 }
537
538 CreatedHandle = TRUE;
539 }
540
541 Status = NtQueryInformationThread(ThreadHandle,
542 ThreadIsIoPending,
543 &IoPending,
544 sizeof(IoPending),
545 NULL);
546 if (NT_SUCCESS(Status) && IoPending == 0)
547 {
548 IsIoPending = FALSE;
549 }
550
551 if (CreatedHandle)
552 {
553 NtClose(ThreadHandle);
554 }
555
556 return IsIoPending;
557 }
558
559 static ULONG
560 NTAPI
RtlpIoWorkerThreadProc(IN PVOID Parameter)561 RtlpIoWorkerThreadProc(IN PVOID Parameter)
562 {
563 volatile RTLP_IOWORKERTHREAD ThreadInfo;
564 LARGE_INTEGER Timeout;
565 BOOLEAN Terminate;
566 NTSTATUS Status = STATUS_SUCCESS;
567
568 if (InterlockedIncrement(&ThreadPoolIOWorkerThreads) > MAX_WORKERTHREADS)
569 {
570 /* Oops, too many worker threads... */
571 goto InitFailed;
572 }
573
574 /* Get a thread handle to ourselves */
575 Status = NtDuplicateObject(NtCurrentProcess(),
576 NtCurrentThread(),
577 NtCurrentProcess(),
578 (PHANDLE)&ThreadInfo.ThreadHandle,
579 0,
580 0,
581 DUPLICATE_SAME_ACCESS);
582 if (!NT_SUCCESS(Status))
583 {
584 DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status);
585
586 InitFailed:
587 InterlockedDecrement(&ThreadPoolIOWorkerThreads);
588
589 /* Signal initialization completion */
590 InterlockedExchange((PLONG)Parameter,
591 1);
592
593 RtlpExitThreadFunc(Status);
594 return 0;
595 }
596
597 ThreadInfo.Flags = 0;
598
599 /* Insert the thread into the list */
600 InsertHeadList((PLIST_ENTRY)&ThreadPoolIOWorkerThreadsList,
601 (PLIST_ENTRY)&ThreadInfo.ListEntry);
602
603 /* Signal initialization completion */
604 InterlockedExchange((PLONG)Parameter,
605 1);
606
607 for (;;)
608 {
609 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
610
611 Wait:
612 do
613 {
614 /* Perform an alertable wait, the work items are going to be executed as APCs */
615 Status = NtDelayExecution(TRUE,
616 &Timeout);
617
618 /* Loop as long as we executed an APC */
619 } while (Status != STATUS_SUCCESS);
620
621 /* We timed out, let's see if we're allowed to terminate */
622 Terminate = FALSE;
623
624 Status = RtlEnterCriticalSection(&ThreadPoolLock);
625 if (NT_SUCCESS(Status))
626 {
627 if (ThreadInfo.Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
628 {
629 /* This thread is supposed to be persistent. Don't terminate! */
630 RtlLeaveCriticalSection(&ThreadPoolLock);
631
632 Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL;
633 goto Wait;
634 }
635
636 /* FIXME - figure out an effective method to determine if it's appropriate to
637 lower the number of threads. For now let's always terminate if there's
638 at least one thread and no queued items. */
639 Terminate = (*((volatile LONG*)&ThreadPoolIOWorkerThreads) - *((volatile LONG*)&ThreadPoolIOWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD) &&
640 (*((volatile LONG*)&ThreadPoolIOWorkerThreadsRequests) == 0);
641
642 if (Terminate)
643 {
644 /* Prevent termination as long as IO is pending */
645 Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle);
646 }
647
648 if (Terminate)
649 {
650 /* Rundown the thread and unlink it from the list */
651 InterlockedDecrement(&ThreadPoolIOWorkerThreads);
652 RemoveEntryList((PLIST_ENTRY)&ThreadInfo.ListEntry);
653 }
654
655 RtlLeaveCriticalSection(&ThreadPoolLock);
656
657 if (Terminate)
658 {
659 /* Break the infinite loop and terminate */
660 Status = STATUS_SUCCESS;
661 break;
662 }
663 }
664 else
665 {
666 DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status);
667 break;
668 }
669 }
670
671 NtClose(ThreadInfo.ThreadHandle);
672 RtlpExitThreadFunc(Status);
673 return 0;
674 }
675
676 static ULONG
677 NTAPI
RtlpWorkerThreadProc(IN PVOID Parameter)678 RtlpWorkerThreadProc(IN PVOID Parameter)
679 {
680 LARGE_INTEGER Timeout;
681 BOOLEAN Terminate;
682 PVOID SystemArgument2;
683 IO_STATUS_BLOCK IoStatusBlock;
684 ULONG TimeoutCount = 0;
685 PKNORMAL_ROUTINE ApcRoutine;
686 NTSTATUS Status = STATUS_SUCCESS;
687
688 if (InterlockedIncrement(&ThreadPoolWorkerThreads) > MAX_WORKERTHREADS)
689 {
690 /* Signal initialization completion */
691 InterlockedExchange((PLONG)Parameter,
692 1);
693
694 /* Oops, too many worker threads... */
695 RtlpExitThreadFunc(Status);
696 return 0;
697 }
698
699 /* Signal initialization completion */
700 InterlockedExchange((PLONG)Parameter,
701 1);
702
703 for (;;)
704 {
705 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
706
707 /* Dequeue a completion message */
708 Status = NtRemoveIoCompletion(ThreadPoolCompletionPort,
709 (PVOID*)&ApcRoutine,
710 &SystemArgument2,
711 &IoStatusBlock,
712 &Timeout);
713
714 if (Status == STATUS_SUCCESS)
715 {
716 TimeoutCount = 0;
717
718 _SEH2_TRY
719 {
720 /* Call the APC routine */
721 ApcRoutine(NULL,
722 (PVOID)IoStatusBlock.Information,
723 SystemArgument2);
724 }
725 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
726 {
727 (void)0;
728 }
729 _SEH2_END;
730 }
731 else
732 {
733 Terminate = FALSE;
734
735 if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock)))
736 continue;
737
738 /* FIXME - this should be optimized, check if there's requests, etc */
739
740 if (Status == STATUS_TIMEOUT)
741 {
742 /* FIXME - we might want to optimize this */
743 if (TimeoutCount++ > 2 &&
744 *((volatile LONG*)&ThreadPoolWorkerThreads) - *((volatile LONG*)&ThreadPoolWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD)
745 {
746 Terminate = TRUE;
747 }
748 }
749 else
750 Terminate = TRUE;
751
752 RtlLeaveCriticalSection(&ThreadPoolLock);
753
754 if (Terminate)
755 {
756 /* Prevent termination as long as IO is pending */
757 Terminate = !RtlpIsIoPending(NULL);
758 }
759
760 if (Terminate)
761 {
762 InterlockedDecrement(&ThreadPoolWorkerThreads);
763 Status = STATUS_SUCCESS;
764 break;
765 }
766 }
767 }
768
769 RtlpExitThreadFunc(Status);
770 return 0;
771
772 }
773
774 /*
775 * @implemented
776 */
777 NTSTATUS
778 NTAPI
RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function,IN PVOID Context OPTIONAL,IN ULONG Flags)779 RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function,
780 IN PVOID Context OPTIONAL,
781 IN ULONG Flags)
782 {
783 LONG FreeWorkers;
784 NTSTATUS Status;
785 PRTLP_WORKITEM WorkItem;
786
787 DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags);
788
789 /* Initialize the thread pool if not already initialized */
790 if (!IsThreadPoolInitialized())
791 {
792 Status = RtlpInitializeThreadPool();
793
794 if (!NT_SUCCESS(Status))
795 return Status;
796 }
797
798 /* Allocate a work item */
799 WorkItem = RtlAllocateHeap(RtlGetProcessHeap(),
800 0,
801 sizeof(RTLP_WORKITEM));
802 if (WorkItem == NULL)
803 return STATUS_NO_MEMORY;
804
805 WorkItem->Function = Function;
806 WorkItem->Context = Context;
807 WorkItem->Flags = Flags;
808
809 if (Flags & WT_TRANSFER_IMPERSONATION)
810 {
811 Status = RtlpGetImpersonationToken(&WorkItem->TokenHandle);
812
813 if (!NT_SUCCESS(Status))
814 {
815 DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status);
816 goto Cleanup;
817 }
818 }
819 else
820 WorkItem->TokenHandle = NULL;
821
822 Status = RtlEnterCriticalSection(&ThreadPoolLock);
823 if (NT_SUCCESS(Status))
824 {
825 if (Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINUITHREAD | WT_EXECUTEINPERSISTENTIOTHREAD))
826 {
827 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
828
829 FreeWorkers = ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsLongRequests;
830
831 if (((Flags & (WT_EXECUTEINPERSISTENTIOTHREAD | WT_EXECUTELONGFUNCTION)) == WT_EXECUTELONGFUNCTION) &&
832 PersistentIoThread != NULL)
833 {
834 /* We shouldn't queue a long function into the persistent IO thread */
835 FreeWorkers--;
836 }
837
838 /* See if it's a good idea to grow the pool */
839 if (ThreadPoolIOWorkerThreads < MAX_WORKERTHREADS &&
840 (FreeWorkers <= 0 || ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
841 {
842 /* Grow the thread pool */
843 Status = RtlpStartWorkerThread(RtlpIoWorkerThreadProc);
844
845 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolIOWorkerThreads) != 0)
846 {
847 /* We failed to create the thread, but there's at least one there so
848 we can at least queue the request */
849 Status = STATUS_SUCCESS;
850 }
851 }
852
853 if (NT_SUCCESS(Status))
854 {
855 /* Queue a IO worker thread */
856 Status = RtlpQueueIoWorkerThread(WorkItem);
857 }
858 }
859 else
860 {
861 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
862
863 FreeWorkers = ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsLongRequests;
864
865 /* See if it's a good idea to grow the pool */
866 if (ThreadPoolWorkerThreads < MAX_WORKERTHREADS &&
867 (FreeWorkers <= 0 || ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
868 {
869 /* Grow the thread pool */
870 Status = RtlpStartWorkerThread(RtlpWorkerThreadProc);
871
872 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolWorkerThreads) != 0)
873 {
874 /* We failed to create the thread, but there's at least one there so
875 we can at least queue the request */
876 Status = STATUS_SUCCESS;
877 }
878 }
879
880 if (NT_SUCCESS(Status))
881 {
882 /* Queue a normal worker thread */
883 Status = RtlpQueueWorkerThread(WorkItem);
884 }
885 }
886
887 RtlLeaveCriticalSection(&ThreadPoolLock);
888 }
889
890 if (!NT_SUCCESS(Status))
891 {
892 if (WorkItem->TokenHandle != NULL)
893 {
894 NtClose(WorkItem->TokenHandle);
895 }
896
897 Cleanup:
898 RtlFreeHeap(RtlGetProcessHeap(),
899 0,
900 WorkItem);
901 }
902
903 return Status;
904 }
905
906 /*
907 * @unimplemented
908 */
909 NTSTATUS
910 NTAPI
RtlSetIoCompletionCallback(IN HANDLE FileHandle,IN PIO_APC_ROUTINE Callback,IN ULONG Flags)911 RtlSetIoCompletionCallback(IN HANDLE FileHandle,
912 IN PIO_APC_ROUTINE Callback,
913 IN ULONG Flags)
914 {
915 IO_STATUS_BLOCK IoStatusBlock;
916 FILE_COMPLETION_INFORMATION FileCompletionInfo;
917 NTSTATUS Status;
918
919 DPRINT("RtlSetIoCompletionCallback(0x%p, 0x%p, 0x%x)\n", FileHandle, Callback, Flags);
920
921 /* Initialize the thread pool if not already initialized */
922 if (!IsThreadPoolInitialized())
923 {
924 Status = RtlpInitializeThreadPool();
925 if (!NT_SUCCESS(Status))
926 return Status;
927 }
928
929 FileCompletionInfo.Port = ThreadPoolCompletionPort;
930 FileCompletionInfo.Key = (PVOID)Callback;
931
932 Status = NtSetInformationFile(FileHandle,
933 &IoStatusBlock,
934 &FileCompletionInfo,
935 sizeof(FileCompletionInfo),
936 FileCompletionInformation);
937
938 return Status;
939 }
940
941 /*
942 * @implemented
943 */
944 NTSTATUS
945 NTAPI
RtlSetThreadPoolStartFunc(IN PRTL_START_POOL_THREAD StartPoolThread,IN PRTL_EXIT_POOL_THREAD ExitPoolThread)946 RtlSetThreadPoolStartFunc(IN PRTL_START_POOL_THREAD StartPoolThread,
947 IN PRTL_EXIT_POOL_THREAD ExitPoolThread)
948 {
949 RtlpStartThreadFunc = StartPoolThread;
950 RtlpExitThreadFunc = ExitPoolThread;
951 return STATUS_SUCCESS;
952 }
953