xref: /reactos/sdk/lib/rtl/workitem.c (revision c2c66aff)
1 /*
2  * COPYRIGHT:         See COPYING in the top level directory
3  * PROJECT:           ReactOS system libraries
4  * PURPOSE:           Work Item implementation
5  * FILE:              lib/rtl/workitem.c
6  * PROGRAMMER:
7  */
8 
9 /* INCLUDES *****************************************************************/
10 
11 #include <rtl.h>
12 
13 #define NDEBUG
14 #include <debug.h>
15 
16 /* FUNCTIONS ***************************************************************/
17 
18 NTSTATUS
19 NTAPI
RtlpStartThread(IN PTHREAD_START_ROUTINE Function,IN PVOID Parameter,OUT PHANDLE ThreadHandle)20 RtlpStartThread(IN PTHREAD_START_ROUTINE Function,
21                 IN PVOID Parameter,
22                 OUT PHANDLE ThreadHandle)
23 {
24     /* Create a native worker thread -- used for SMSS, CSRSS, etc... */
25     return RtlCreateUserThread(NtCurrentProcess(),
26                                NULL,
27                                TRUE,
28                                0,
29                                0,
30                                0,
31                                Function,
32                                Parameter,
33                                ThreadHandle,
34                                NULL);
35 }
36 
37 NTSTATUS
38 NTAPI
RtlpExitThread(IN NTSTATUS ExitStatus)39 RtlpExitThread(IN NTSTATUS ExitStatus)
40 {
41     /* Kill a native worker thread -- used for SMSS, CSRSS, etc... */
42     return NtTerminateThread(NtCurrentThread(), ExitStatus);
43 }
44 
45 PRTL_START_POOL_THREAD RtlpStartThreadFunc = RtlpStartThread;
46 PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc = RtlpExitThread;
47 
48 #define MAX_WORKERTHREADS   0x100
49 #define WORKERTHREAD_CREATION_THRESHOLD 0x5
50 
51 typedef struct _RTLP_IOWORKERTHREAD
52 {
53     LIST_ENTRY ListEntry;
54     HANDLE ThreadHandle;
55     ULONG Flags;
56 } RTLP_IOWORKERTHREAD, *PRTLP_IOWORKERTHREAD;
57 
58 typedef struct _RTLP_WORKITEM
59 {
60     WORKERCALLBACKFUNC Function;
61     PVOID Context;
62     ULONG Flags;
63     HANDLE TokenHandle;
64 } RTLP_WORKITEM, *PRTLP_WORKITEM;
65 
66 static LONG ThreadPoolInitialized = 0;
67 static RTL_CRITICAL_SECTION ThreadPoolLock;
68 static PRTLP_IOWORKERTHREAD PersistentIoThread;
69 static LIST_ENTRY ThreadPoolIOWorkerThreadsList;
70 static HANDLE ThreadPoolCompletionPort;
71 static LONG ThreadPoolWorkerThreads;
72 static LONG ThreadPoolWorkerThreadsRequests;
73 static LONG ThreadPoolWorkerThreadsLongRequests;
74 static LONG ThreadPoolIOWorkerThreads;
75 static LONG ThreadPoolIOWorkerThreadsRequests;
76 static LONG ThreadPoolIOWorkerThreadsLongRequests;
77 
78 #define IsThreadPoolInitialized() (*((volatile LONG*)&ThreadPoolInitialized) == 1)
79 
80 static NTSTATUS
RtlpInitializeThreadPool(VOID)81 RtlpInitializeThreadPool(VOID)
82 {
83     NTSTATUS Status = STATUS_SUCCESS;
84     LONG InitStatus;
85 
86     do
87     {
88         InitStatus = InterlockedCompareExchange(&ThreadPoolInitialized,
89                                                  2,
90                                                  0);
91         if (InitStatus == 0)
92         {
93             /* We're the first thread to initialize the thread pool */
94 
95             InitializeListHead(&ThreadPoolIOWorkerThreadsList);
96 
97             PersistentIoThread = NULL;
98 
99             ThreadPoolWorkerThreads = 0;
100             ThreadPoolWorkerThreadsRequests = 0;
101             ThreadPoolWorkerThreadsLongRequests = 0;
102             ThreadPoolIOWorkerThreads = 0;
103             ThreadPoolIOWorkerThreadsRequests = 0;
104             ThreadPoolIOWorkerThreadsLongRequests = 0;
105 
106             /* Initialize the lock */
107             Status = RtlInitializeCriticalSection(&ThreadPoolLock);
108             if (!NT_SUCCESS(Status))
109                 goto Finish;
110 
111             /* Create the complection port */
112             Status = NtCreateIoCompletion(&ThreadPoolCompletionPort,
113                                           IO_COMPLETION_ALL_ACCESS,
114                                           NULL,
115                                           0);
116             if (!NT_SUCCESS(Status))
117             {
118                 RtlDeleteCriticalSection(&ThreadPoolLock);
119                 goto Finish;
120             }
121 
122 Finish:
123             /* Initialization done */
124             InterlockedExchange(&ThreadPoolInitialized,
125                                  1);
126             break;
127         }
128         else if (InitStatus == 2)
129         {
130             LARGE_INTEGER Timeout;
131 
132             /* Another thread is currently initializing the thread pool!
133                Poll after a short period of time to see if the initialization
134                was completed */
135 
136             Timeout.QuadPart = -10000000LL; /* Wait for a second */
137             NtDelayExecution(FALSE,
138                              &Timeout);
139         }
140     } while (InitStatus != 1);
141 
142     return Status;
143 }
144 
145 static NTSTATUS
RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)146 RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)
147 {
148     NTSTATUS Status;
149 
150     Status = NtOpenThreadToken(NtCurrentThread(),
151                                TOKEN_IMPERSONATE,
152                                TRUE,
153                                TokenHandle);
154     if (Status == STATUS_NO_TOKEN || Status == STATUS_CANT_OPEN_ANONYMOUS)
155     {
156         *TokenHandle = NULL;
157         Status = STATUS_SUCCESS;
158     }
159 
160     return Status;
161 }
162 
163 static NTSTATUS
RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)164 RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)
165 {
166     NTSTATUS Status;
167     HANDLE ThreadHandle;
168     LARGE_INTEGER Timeout;
169     volatile LONG WorkerInitialized = 0;
170 
171     Timeout.QuadPart = -10000LL; /* Wait for 100ms */
172 
173     /* Start the thread */
174     Status = RtlpStartThreadFunc(StartRoutine, (PVOID)&WorkerInitialized, &ThreadHandle);
175     if (NT_SUCCESS(Status))
176     {
177         NtResumeThread(ThreadHandle, NULL);
178 
179         /* Poll until the thread got a chance to initialize */
180         while (WorkerInitialized == 0)
181         {
182             NtDelayExecution(FALSE,
183                              &Timeout);
184         }
185 
186         NtClose(ThreadHandle);
187     }
188 
189     return Status;
190 }
191 
192 static VOID
193 NTAPI
RtlpExecuteWorkItem(IN OUT PVOID NormalContext,IN OUT PVOID SystemArgument1,IN OUT PVOID SystemArgument2)194 RtlpExecuteWorkItem(IN OUT PVOID NormalContext,
195                     IN OUT PVOID SystemArgument1,
196                     IN OUT PVOID SystemArgument2)
197 {
198     NTSTATUS Status;
199     BOOLEAN Impersonated = FALSE;
200     RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
201 
202     RtlFreeHeap(RtlGetProcessHeap(),
203                 0,
204                 SystemArgument2);
205 
206     if (WorkItem.TokenHandle != NULL)
207     {
208         Status = NtSetInformationThread(NtCurrentThread(),
209                                         ThreadImpersonationToken,
210                                         &WorkItem.TokenHandle,
211                                         sizeof(HANDLE));
212 
213         NtClose(WorkItem.TokenHandle);
214 
215         if (NT_SUCCESS(Status))
216         {
217             Impersonated = TRUE;
218         }
219     }
220 
221     _SEH2_TRY
222     {
223         DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
224 
225         /* Execute the function */
226         WorkItem.Function(WorkItem.Context);
227     }
228     _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
229     {
230         DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
231     }
232     _SEH2_END;
233 
234     if (Impersonated)
235     {
236         WorkItem.TokenHandle = NULL;
237         Status = NtSetInformationThread(NtCurrentThread(),
238                                         ThreadImpersonationToken,
239                                         &WorkItem.TokenHandle,
240                                         sizeof(HANDLE));
241         if (!NT_SUCCESS(Status))
242         {
243             DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
244         }
245     }
246 
247     /* update the requests counter */
248     InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
249 
250     if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
251     {
252         InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
253     }
254 }
255 
256 
257 static NTSTATUS
RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)258 RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
259 {
260     NTSTATUS Status = STATUS_SUCCESS;
261 
262     InterlockedIncrement(&ThreadPoolWorkerThreadsRequests);
263 
264     if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
265     {
266         InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests);
267     }
268 
269     if (WorkItem->Flags & WT_EXECUTEINPERSISTENTTHREAD)
270     {
271         Status = RtlpInitializeTimerThread();
272 
273         if (NT_SUCCESS(Status))
274         {
275             /* Queue an APC in the timer thread */
276             Status = NtQueueApcThread(TimerThreadHandle,
277                                       RtlpExecuteWorkItem,
278                                       NULL,
279                                       NULL,
280                                       WorkItem);
281         }
282     }
283     else
284     {
285         /* Queue an IO completion message */
286         Status = NtSetIoCompletion(ThreadPoolCompletionPort,
287                                    RtlpExecuteWorkItem,
288                                    WorkItem,
289                                    STATUS_SUCCESS,
290                                    0);
291     }
292 
293     if (!NT_SUCCESS(Status))
294     {
295         InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
296 
297         if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
298         {
299             InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
300         }
301     }
302 
303     return Status;
304 }
305 
306 static VOID
307 NTAPI
RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext,IN OUT PVOID SystemArgument1,IN OUT PVOID SystemArgument2)308 RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext,
309                       IN OUT PVOID SystemArgument1,
310                       IN OUT PVOID SystemArgument2)
311 {
312     NTSTATUS Status;
313     BOOLEAN Impersonated = FALSE;
314     PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext;
315     RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
316 
317     ASSERT(IoThread != NULL);
318 
319     RtlFreeHeap(RtlGetProcessHeap(),
320                 0,
321                 SystemArgument2);
322 
323     if (WorkItem.TokenHandle != NULL)
324     {
325         Status = NtSetInformationThread(NtCurrentThread(),
326                                         ThreadImpersonationToken,
327                                         &WorkItem.TokenHandle,
328                                         sizeof(HANDLE));
329 
330         NtClose(WorkItem.TokenHandle);
331 
332         if (NT_SUCCESS(Status))
333         {
334             Impersonated = TRUE;
335         }
336     }
337 
338     _SEH2_TRY
339     {
340         DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
341 
342         /* Execute the function */
343         WorkItem.Function(WorkItem.Context);
344     }
345     _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
346     {
347         DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
348     }
349     _SEH2_END;
350 
351     if (Impersonated)
352     {
353         WorkItem.TokenHandle = NULL;
354         Status = NtSetInformationThread(NtCurrentThread(),
355                                         ThreadImpersonationToken,
356                                         &WorkItem.TokenHandle,
357                                         sizeof(HANDLE));
358         if (!NT_SUCCESS(Status))
359         {
360             DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
361         }
362     }
363 
364     /* remove the long function flag */
365     if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
366     {
367         Status = RtlEnterCriticalSection(&ThreadPoolLock);
368         if (NT_SUCCESS(Status))
369         {
370             IoThread->Flags &= ~WT_EXECUTELONGFUNCTION;
371             RtlLeaveCriticalSection(&ThreadPoolLock);
372         }
373     }
374 
375     /* update the requests counter */
376     InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
377 
378     if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
379     {
380         InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
381     }
382 }
383 
384 static NTSTATUS
RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)385 RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
386 {
387     PLIST_ENTRY CurrentEntry;
388     PRTLP_IOWORKERTHREAD IoThread = NULL;
389     NTSTATUS Status = STATUS_SUCCESS;
390 
391     if (WorkItem->Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
392     {
393         if (PersistentIoThread != NULL)
394         {
395             /* We already have a persistent IO worker thread */
396             IoThread = PersistentIoThread;
397         }
398         else
399         {
400             /* We're not aware of any persistent IO worker thread. Search for a unused
401                worker thread that doesn't have a long function queued */
402             CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
403             while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
404             {
405                 IoThread = CONTAINING_RECORD(CurrentEntry,
406                                              RTLP_IOWORKERTHREAD,
407                                              ListEntry);
408 
409                 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
410                     break;
411 
412                 CurrentEntry = CurrentEntry->Flink;
413             }
414 
415             if (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
416             {
417                 /* Found a worker thread we can use. */
418                 ASSERT(IoThread != NULL);
419 
420                 IoThread->Flags |= WT_EXECUTEINPERSISTENTIOTHREAD;
421                 PersistentIoThread = IoThread;
422             }
423             else
424             {
425                 DPRINT1("Failed to find a worker thread for the persistent IO thread!\n");
426                 return STATUS_NO_MEMORY;
427             }
428         }
429     }
430     else
431     {
432         /* Find a worker thread that is not currently executing a long function */
433         CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
434         while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
435         {
436             IoThread = CONTAINING_RECORD(CurrentEntry,
437                                          RTLP_IOWORKERTHREAD,
438                                          ListEntry);
439 
440             if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
441             {
442                 /* if we're trying to queue a long function then make sure we're not dealing
443                    with the persistent thread */
444                 if ((WorkItem->Flags & WT_EXECUTELONGFUNCTION) && !(IoThread->Flags & WT_EXECUTEINPERSISTENTIOTHREAD))
445                 {
446                     /* found a candidate */
447                     break;
448                 }
449             }
450 
451             CurrentEntry = CurrentEntry->Flink;
452         }
453 
454         if (CurrentEntry == &ThreadPoolIOWorkerThreadsList)
455         {
456             /* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */
457             if (ThreadPoolIOWorkerThreads == 0)
458             {
459                 DPRINT1("Failed to find a worker thread for the work item 0x%p!\n", WorkItem);
460                 ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList));
461                 return STATUS_NO_MEMORY;
462             }
463             else
464             {
465                 /* pick the first worker thread */
466                 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
467                 IoThread = CONTAINING_RECORD(CurrentEntry,
468                                              RTLP_IOWORKERTHREAD,
469                                              ListEntry);
470 
471                 /* Since this might be the persistent worker thread, don't run as a
472                    long function */
473                 WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION;
474             }
475         }
476 
477         /* Move the picked thread to the end of the list. Since we're always searching
478            from the beginning, this improves distribution of work items */
479         RemoveEntryList(&IoThread->ListEntry);
480         InsertTailList(&ThreadPoolIOWorkerThreadsList,
481                        &IoThread->ListEntry);
482     }
483 
484     ASSERT(IoThread != NULL);
485 
486     InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests);
487 
488     if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
489     {
490         /* We're about to queue a long function, mark the thread */
491         IoThread->Flags |= WT_EXECUTELONGFUNCTION;
492 
493         InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests);
494     }
495 
496     /* It's time to queue the work item */
497     Status = NtQueueApcThread(IoThread->ThreadHandle,
498                               RtlpExecuteIoWorkItem,
499                               IoThread,
500                               NULL,
501                               WorkItem);
502     if (!NT_SUCCESS(Status))
503     {
504         DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem->Function);
505         InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
506 
507         if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
508         {
509             InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
510         }
511     }
512 
513     return Status;
514 }
515 
516 static BOOLEAN
RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL)517 RtlpIsIoPending(IN HANDLE ThreadHandle  OPTIONAL)
518 {
519     NTSTATUS Status;
520     ULONG IoPending;
521     BOOLEAN CreatedHandle = FALSE;
522     BOOLEAN IsIoPending = TRUE;
523 
524     if (ThreadHandle == NULL)
525     {
526         Status = NtDuplicateObject(NtCurrentProcess(),
527                                    NtCurrentThread(),
528                                    NtCurrentProcess(),
529                                    &ThreadHandle,
530                                    0,
531                                    0,
532                                    DUPLICATE_SAME_ACCESS);
533         if (!NT_SUCCESS(Status))
534         {
535             return IsIoPending;
536         }
537 
538         CreatedHandle = TRUE;
539     }
540 
541     Status = NtQueryInformationThread(ThreadHandle,
542                                       ThreadIsIoPending,
543                                       &IoPending,
544                                       sizeof(IoPending),
545                                       NULL);
546     if (NT_SUCCESS(Status) && IoPending == 0)
547     {
548         IsIoPending = FALSE;
549     }
550 
551     if (CreatedHandle)
552     {
553         NtClose(ThreadHandle);
554     }
555 
556     return IsIoPending;
557 }
558 
559 static ULONG
560 NTAPI
RtlpIoWorkerThreadProc(IN PVOID Parameter)561 RtlpIoWorkerThreadProc(IN PVOID Parameter)
562 {
563     volatile RTLP_IOWORKERTHREAD ThreadInfo;
564     LARGE_INTEGER Timeout;
565     BOOLEAN Terminate;
566     NTSTATUS Status = STATUS_SUCCESS;
567 
568     if (InterlockedIncrement(&ThreadPoolIOWorkerThreads) > MAX_WORKERTHREADS)
569     {
570         /* Oops, too many worker threads... */
571         goto InitFailed;
572     }
573 
574     /* Get a thread handle to ourselves */
575     Status = NtDuplicateObject(NtCurrentProcess(),
576                                NtCurrentThread(),
577                                NtCurrentProcess(),
578                                (PHANDLE)&ThreadInfo.ThreadHandle,
579                                0,
580                                0,
581                                DUPLICATE_SAME_ACCESS);
582     if (!NT_SUCCESS(Status))
583     {
584         DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status);
585 
586 InitFailed:
587         InterlockedDecrement(&ThreadPoolIOWorkerThreads);
588 
589         /* Signal initialization completion */
590         InterlockedExchange((PLONG)Parameter,
591                             1);
592 
593         RtlpExitThreadFunc(Status);
594         return 0;
595     }
596 
597     ThreadInfo.Flags = 0;
598 
599     /* Insert the thread into the list */
600     InsertHeadList((PLIST_ENTRY)&ThreadPoolIOWorkerThreadsList,
601                    (PLIST_ENTRY)&ThreadInfo.ListEntry);
602 
603     /* Signal initialization completion */
604     InterlockedExchange((PLONG)Parameter,
605                          1);
606 
607     for (;;)
608     {
609         Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
610 
611 Wait:
612         do
613         {
614             /* Perform an alertable wait, the work items are going to be executed as APCs */
615             Status = NtDelayExecution(TRUE,
616                                       &Timeout);
617 
618             /* Loop as long as we executed an APC */
619         } while (Status != STATUS_SUCCESS);
620 
621         /* We timed out, let's see if we're allowed to terminate */
622         Terminate = FALSE;
623 
624         Status = RtlEnterCriticalSection(&ThreadPoolLock);
625         if (NT_SUCCESS(Status))
626         {
627             if (ThreadInfo.Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
628             {
629                 /* This thread is supposed to be persistent. Don't terminate! */
630                 RtlLeaveCriticalSection(&ThreadPoolLock);
631 
632                 Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL;
633                 goto Wait;
634             }
635 
636             /* FIXME - figure out an effective method to determine if it's appropriate to
637                        lower the number of threads. For now let's always terminate if there's
638                        at least one thread and no queued items. */
639             Terminate = (*((volatile LONG*)&ThreadPoolIOWorkerThreads) - *((volatile LONG*)&ThreadPoolIOWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD) &&
640                         (*((volatile LONG*)&ThreadPoolIOWorkerThreadsRequests) == 0);
641 
642             if (Terminate)
643             {
644                 /* Prevent termination as long as IO is pending */
645                 Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle);
646             }
647 
648             if (Terminate)
649             {
650                 /* Rundown the thread and unlink it from the list */
651                 InterlockedDecrement(&ThreadPoolIOWorkerThreads);
652                 RemoveEntryList((PLIST_ENTRY)&ThreadInfo.ListEntry);
653             }
654 
655             RtlLeaveCriticalSection(&ThreadPoolLock);
656 
657             if (Terminate)
658             {
659                 /* Break the infinite loop and terminate */
660                 Status = STATUS_SUCCESS;
661                 break;
662             }
663         }
664         else
665         {
666             DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status);
667             break;
668         }
669     }
670 
671     NtClose(ThreadInfo.ThreadHandle);
672     RtlpExitThreadFunc(Status);
673     return 0;
674 }
675 
676 static ULONG
677 NTAPI
RtlpWorkerThreadProc(IN PVOID Parameter)678 RtlpWorkerThreadProc(IN PVOID Parameter)
679 {
680     LARGE_INTEGER Timeout;
681     BOOLEAN Terminate;
682     PVOID SystemArgument2;
683     IO_STATUS_BLOCK IoStatusBlock;
684     ULONG TimeoutCount = 0;
685     PKNORMAL_ROUTINE ApcRoutine;
686     NTSTATUS Status = STATUS_SUCCESS;
687 
688     if (InterlockedIncrement(&ThreadPoolWorkerThreads) > MAX_WORKERTHREADS)
689     {
690         /* Signal initialization completion */
691         InterlockedExchange((PLONG)Parameter,
692                              1);
693 
694         /* Oops, too many worker threads... */
695         RtlpExitThreadFunc(Status);
696         return 0;
697     }
698 
699     /* Signal initialization completion */
700     InterlockedExchange((PLONG)Parameter,
701                          1);
702 
703     for (;;)
704     {
705         Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
706 
707         /* Dequeue a completion message */
708         Status = NtRemoveIoCompletion(ThreadPoolCompletionPort,
709                                       (PVOID*)&ApcRoutine,
710                                       &SystemArgument2,
711                                       &IoStatusBlock,
712                                       &Timeout);
713 
714         if (Status == STATUS_SUCCESS)
715         {
716             TimeoutCount = 0;
717 
718             _SEH2_TRY
719             {
720                 /* Call the APC routine */
721                 ApcRoutine(NULL,
722                            (PVOID)IoStatusBlock.Information,
723                            SystemArgument2);
724             }
725             _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
726             {
727                 (void)0;
728             }
729             _SEH2_END;
730         }
731         else
732         {
733             Terminate = FALSE;
734 
735             if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock)))
736                 continue;
737 
738             /* FIXME - this should be optimized, check if there's requests, etc */
739 
740             if (Status == STATUS_TIMEOUT)
741             {
742                 /* FIXME - we might want to optimize this */
743                 if (TimeoutCount++ > 2 &&
744                     *((volatile LONG*)&ThreadPoolWorkerThreads) - *((volatile LONG*)&ThreadPoolWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD)
745                 {
746                     Terminate = TRUE;
747                 }
748             }
749             else
750                 Terminate = TRUE;
751 
752             RtlLeaveCriticalSection(&ThreadPoolLock);
753 
754             if (Terminate)
755             {
756                 /* Prevent termination as long as IO is pending */
757                 Terminate = !RtlpIsIoPending(NULL);
758             }
759 
760             if (Terminate)
761             {
762                 InterlockedDecrement(&ThreadPoolWorkerThreads);
763                 Status = STATUS_SUCCESS;
764                 break;
765             }
766         }
767     }
768 
769     RtlpExitThreadFunc(Status);
770     return 0;
771 
772 }
773 
774 /*
775  * @implemented
776  */
777 NTSTATUS
778 NTAPI
RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function,IN PVOID Context OPTIONAL,IN ULONG Flags)779 RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function,
780                  IN PVOID Context  OPTIONAL,
781                  IN ULONG Flags)
782 {
783     LONG FreeWorkers;
784     NTSTATUS Status;
785     PRTLP_WORKITEM WorkItem;
786 
787     DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags);
788 
789     /* Initialize the thread pool if not already initialized */
790     if (!IsThreadPoolInitialized())
791     {
792         Status = RtlpInitializeThreadPool();
793 
794         if (!NT_SUCCESS(Status))
795             return Status;
796     }
797 
798     /* Allocate a work item */
799     WorkItem = RtlAllocateHeap(RtlGetProcessHeap(),
800                                0,
801                                sizeof(RTLP_WORKITEM));
802     if (WorkItem == NULL)
803         return STATUS_NO_MEMORY;
804 
805     WorkItem->Function = Function;
806     WorkItem->Context = Context;
807     WorkItem->Flags = Flags;
808 
809     if (Flags & WT_TRANSFER_IMPERSONATION)
810     {
811         Status = RtlpGetImpersonationToken(&WorkItem->TokenHandle);
812 
813         if (!NT_SUCCESS(Status))
814         {
815             DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status);
816             goto Cleanup;
817         }
818     }
819     else
820         WorkItem->TokenHandle = NULL;
821 
822     Status = RtlEnterCriticalSection(&ThreadPoolLock);
823     if (NT_SUCCESS(Status))
824     {
825         if (Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINUITHREAD | WT_EXECUTEINPERSISTENTIOTHREAD))
826         {
827             /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
828 
829             FreeWorkers = ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsLongRequests;
830 
831             if (((Flags & (WT_EXECUTEINPERSISTENTIOTHREAD | WT_EXECUTELONGFUNCTION)) == WT_EXECUTELONGFUNCTION) &&
832                 PersistentIoThread != NULL)
833             {
834                 /* We shouldn't queue a long function into the persistent IO thread */
835                 FreeWorkers--;
836             }
837 
838             /* See if it's a good idea to grow the pool */
839             if (ThreadPoolIOWorkerThreads < MAX_WORKERTHREADS &&
840                 (FreeWorkers <= 0 || ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
841             {
842                 /* Grow the thread pool */
843                 Status = RtlpStartWorkerThread(RtlpIoWorkerThreadProc);
844 
845                 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolIOWorkerThreads) != 0)
846                 {
847                     /* We failed to create the thread, but there's at least one there so
848                        we can at least queue the request */
849                     Status = STATUS_SUCCESS;
850                 }
851             }
852 
853             if (NT_SUCCESS(Status))
854             {
855                 /* Queue a IO worker thread */
856                 Status = RtlpQueueIoWorkerThread(WorkItem);
857             }
858         }
859         else
860         {
861             /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
862 
863             FreeWorkers = ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsLongRequests;
864 
865             /* See if it's a good idea to grow the pool */
866             if (ThreadPoolWorkerThreads < MAX_WORKERTHREADS &&
867                 (FreeWorkers <= 0 || ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
868             {
869                 /* Grow the thread pool */
870                 Status = RtlpStartWorkerThread(RtlpWorkerThreadProc);
871 
872                 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolWorkerThreads) != 0)
873                 {
874                     /* We failed to create the thread, but there's at least one there so
875                        we can at least queue the request */
876                     Status = STATUS_SUCCESS;
877                 }
878             }
879 
880             if (NT_SUCCESS(Status))
881             {
882                 /* Queue a normal worker thread */
883                 Status = RtlpQueueWorkerThread(WorkItem);
884             }
885         }
886 
887         RtlLeaveCriticalSection(&ThreadPoolLock);
888     }
889 
890     if (!NT_SUCCESS(Status))
891     {
892         if (WorkItem->TokenHandle != NULL)
893         {
894             NtClose(WorkItem->TokenHandle);
895         }
896 
897 Cleanup:
898         RtlFreeHeap(RtlGetProcessHeap(),
899                     0,
900                     WorkItem);
901     }
902 
903     return Status;
904 }
905 
906 /*
907  * @unimplemented
908  */
909 NTSTATUS
910 NTAPI
RtlSetIoCompletionCallback(IN HANDLE FileHandle,IN PIO_APC_ROUTINE Callback,IN ULONG Flags)911 RtlSetIoCompletionCallback(IN HANDLE FileHandle,
912                            IN PIO_APC_ROUTINE Callback,
913                            IN ULONG Flags)
914 {
915     IO_STATUS_BLOCK IoStatusBlock;
916     FILE_COMPLETION_INFORMATION FileCompletionInfo;
917     NTSTATUS Status;
918 
919     DPRINT("RtlSetIoCompletionCallback(0x%p, 0x%p, 0x%x)\n", FileHandle, Callback, Flags);
920 
921     /* Initialize the thread pool if not already initialized */
922     if (!IsThreadPoolInitialized())
923     {
924         Status = RtlpInitializeThreadPool();
925         if (!NT_SUCCESS(Status))
926             return Status;
927     }
928 
929     FileCompletionInfo.Port = ThreadPoolCompletionPort;
930     FileCompletionInfo.Key = (PVOID)Callback;
931 
932     Status = NtSetInformationFile(FileHandle,
933                                   &IoStatusBlock,
934                                   &FileCompletionInfo,
935                                   sizeof(FileCompletionInfo),
936                                   FileCompletionInformation);
937 
938     return Status;
939 }
940 
941 /*
942  * @implemented
943  */
944 NTSTATUS
945 NTAPI
RtlSetThreadPoolStartFunc(IN PRTL_START_POOL_THREAD StartPoolThread,IN PRTL_EXIT_POOL_THREAD ExitPoolThread)946 RtlSetThreadPoolStartFunc(IN PRTL_START_POOL_THREAD StartPoolThread,
947                           IN PRTL_EXIT_POOL_THREAD ExitPoolThread)
948 {
949     RtlpStartThreadFunc = StartPoolThread;
950     RtlpExitThreadFunc = ExitPoolThread;
951     return STATUS_SUCCESS;
952 }
953