xref: /reactos/ntoskrnl/ex/work.c (revision cf1a95a5)
1 /*
2  * COPYRIGHT:          See COPYING in the top level directory
3  * PROJECT:            ReactOS Kernel
4  * FILE:               ntoskrnl/ex/work.c
5  * PURPOSE:            Manage system work queues and worker threads
6  * PROGRAMMER:         Alex Ionescu (alex@relsoft.net)
7  */
8 
9 /* INCLUDES ******************************************************************/
10 
11 #include <ntoskrnl.h>
12 #define NDEBUG
13 #include <debug.h>
14 
15 /* DATA **********************************************************************/
16 
17 /* Number of worker threads for each Queue */
18 #define EX_HYPERCRITICAL_WORK_THREADS               1
19 #define EX_DELAYED_WORK_THREADS                     3
20 #define EX_CRITICAL_WORK_THREADS                    5
21 
22 /* Magic flag for dynamic worker threads */
23 #define EX_DYNAMIC_WORK_THREAD                      0x80000000
24 
25 /* Worker thread priority increments (added to base priority) */
26 #define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT   7
27 #define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT        5
28 #define EX_DELAYED_QUEUE_PRIORITY_INCREMENT         4
29 
30 /* The actual worker queue array */
31 EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
32 
33 /* Accounting of the total threads and registry hacked threads */
34 ULONG ExCriticalWorkerThreads;
35 ULONG ExDelayedWorkerThreads;
36 ULONG ExpAdditionalCriticalWorkerThreads;
37 ULONG ExpAdditionalDelayedWorkerThreads;
38 
39 /* Future support for stack swapping worker threads */
40 BOOLEAN ExpWorkersCanSwap;
41 LIST_ENTRY ExpWorkerListHead;
42 FAST_MUTEX ExpWorkerSwapinMutex;
43 
44 /* The worker balance set manager events */
45 KEVENT ExpThreadSetManagerEvent;
46 KEVENT ExpThreadSetManagerShutdownEvent;
47 
48 /* Thread pointers for future worker thread shutdown support */
49 PETHREAD ExpWorkerThreadBalanceManagerPtr;
50 PETHREAD ExpLastWorkerThread;
51 
52 /* PRIVATE FUNCTIONS *********************************************************/
53 
54 /*++
55  * @name ExpWorkerThreadEntryPoint
56  *
57  *     The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
58  *     worker thread created by teh system.
59  *
60  * @param Context
61  *        Contains the work queue type masked with a flag specifing whether the
62  *        thread is dynamic or not.
63  *
64  * @return None.
65  *
66  * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
67  *          while a static thread will never timeout.
68  *
69  *          Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
70  *          active impersonation info, and must not have disabled APCs.
71  *
72  *          NB: We will re-enable APCs for broken threads but all other cases
73  *              will generate a bugcheck.
74  *
75  *--*/
76 VOID
77 NTAPI
ExpWorkerThreadEntryPoint(IN PVOID Context)78 ExpWorkerThreadEntryPoint(IN PVOID Context)
79 {
80     PWORK_QUEUE_ITEM WorkItem;
81     PLIST_ENTRY QueueEntry;
82     WORK_QUEUE_TYPE WorkQueueType;
83     PEX_WORK_QUEUE WorkQueue;
84     LARGE_INTEGER Timeout;
85     PLARGE_INTEGER TimeoutPointer = NULL;
86     PETHREAD Thread = PsGetCurrentThread();
87     KPROCESSOR_MODE WaitMode;
88     EX_QUEUE_WORKER_INFO OldValue, NewValue;
89 
90     /* Check if this is a dyamic thread */
91     if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD)
92     {
93         /* It is, which means we will eventually time out after 10 minutes */
94         Timeout.QuadPart = Int32x32To64(10, -10000000 * 60);
95         TimeoutPointer = &Timeout;
96     }
97 
98     /* Get Queue Type and Worker Queue */
99     WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context &
100                                       ~EX_DYNAMIC_WORK_THREAD);
101     WorkQueue = &ExWorkerQueue[WorkQueueType];
102 
103     /* Select the wait mode */
104     WaitMode = (UCHAR)WorkQueue->Info.WaitMode;
105 
106     /* Nobody should have initialized this yet, do it now */
107     ASSERT(Thread->ExWorkerCanWaitUser == 0);
108     if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE;
109 
110     /* If we shouldn't swap, disable that feature */
111     if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE);
112 
113     /* Set the worker flags */
114     do
115     {
116         /* Check if the queue is being disabled */
117         if (WorkQueue->Info.QueueDisabled)
118         {
119             /* Re-enable stack swapping and kill us */
120             KeSetKernelStackSwapEnable(TRUE);
121             PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
122         }
123 
124         /* Increase the worker count */
125         OldValue = WorkQueue->Info;
126         NewValue = OldValue;
127         NewValue.WorkerCount++;
128     }
129     while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
130                                       *(PLONG)&NewValue,
131                                       *(PLONG)&OldValue) != *(PLONG)&OldValue);
132 
133     /* Success, you are now officially a worker thread! */
134     Thread->ActiveExWorker = TRUE;
135 
136     /* Loop forever */
137 ProcessLoop:
138     for (;;)
139     {
140         /* Wait for something to happen on the queue */
141         QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue,
142                                    WaitMode,
143                                    TimeoutPointer);
144 
145         /* Check if we timed out and quit this loop in that case */
146         if ((NTSTATUS)(ULONG_PTR)QueueEntry == STATUS_TIMEOUT) break;
147 
148         /* Increment Processed Work Items */
149         InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
150 
151         /* Get the Work Item */
152         WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
153 
154         /* Make sure nobody is trying to play smart with us */
155         ASSERT((ULONG_PTR)WorkItem->WorkerRoutine > MmUserProbeAddress);
156 
157         /* Call the Worker Routine */
158         WorkItem->WorkerRoutine(WorkItem->Parameter);
159 
160         /* Make sure APCs are not disabled */
161         if (Thread->Tcb.CombinedApcDisable != 0)
162         {
163             /* We're nice and do it behind your back */
164             DPRINT1("Warning: Broken Worker Thread: %p %p %p came back "
165                     "with APCs disabled!\n",
166                     WorkItem->WorkerRoutine,
167                     WorkItem->Parameter,
168                     WorkItem);
169             ASSERT(Thread->Tcb.CombinedApcDisable == 0);
170             Thread->Tcb.CombinedApcDisable = 0;
171         }
172 
173         /* Make sure it returned at right IRQL */
174         if (KeGetCurrentIrql() != PASSIVE_LEVEL)
175         {
176             /* It didn't, bugcheck! */
177             KeBugCheckEx(WORKER_THREAD_RETURNED_AT_BAD_IRQL,
178                          (ULONG_PTR)WorkItem->WorkerRoutine,
179                          KeGetCurrentIrql(),
180                          (ULONG_PTR)WorkItem->Parameter,
181                          (ULONG_PTR)WorkItem);
182         }
183 
184         /* Make sure it returned with Impersionation Disabled */
185         if (Thread->ActiveImpersonationInfo)
186         {
187             /* It didn't, bugcheck! */
188             KeBugCheckEx(IMPERSONATING_WORKER_THREAD,
189                          (ULONG_PTR)WorkItem->WorkerRoutine,
190                          (ULONG_PTR)WorkItem->Parameter,
191                          (ULONG_PTR)WorkItem,
192                          0);
193         }
194     }
195 
196     /* This is a dynamic thread. Terminate it unless IRPs are pending */
197     if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop;
198 
199     /* Don't terminate it if the queue is disabled either */
200     if (WorkQueue->Info.QueueDisabled) goto ProcessLoop;
201 
202     /* Set the worker flags */
203     do
204     {
205         /* Decrease the worker count */
206         OldValue = WorkQueue->Info;
207         NewValue = OldValue;
208         NewValue.WorkerCount--;
209     }
210     while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
211                                       *(PLONG)&NewValue,
212                                       *(PLONG)&OldValue) != *(PLONG)&OldValue);
213 
214     /* Decrement dynamic thread count */
215     InterlockedDecrement(&WorkQueue->DynamicThreadCount);
216 
217     /* We're not a worker thread anymore */
218     Thread->ActiveExWorker = FALSE;
219 
220     /* Re-enable the stack swap */
221     KeSetKernelStackSwapEnable(TRUE);
222     return;
223 }
224 
225 /*++
226  * @name ExpCreateWorkerThread
227  *
228  *     The ExpCreateWorkerThread routine creates a new worker thread for the
229  *     specified queue.
230  *
231  * @param QueueType
232  *        Type of the queue to use for this thread. Valid values are:
233  *          - DelayedWorkQueue
234  *          - CriticalWorkQueue
235  *          - HyperCriticalWorkQueue
236  *
237  * @param Dynamic
238  *        Specifies whether or not this thread is a dynamic thread.
239  *
240  * @return None.
241  *
242  * @remarks HyperCritical work threads run at priority 7; Critical work threads
243  *          run at priority 5, and delayed work threads run at priority 4.
244  *
245  *          This, worker threads cannot pre-empty a normal user-mode thread.
246  *
247  *--*/
248 VOID
249 NTAPI
ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType,IN BOOLEAN Dynamic)250 ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType,
251                       IN BOOLEAN Dynamic)
252 {
253     PETHREAD Thread;
254     HANDLE hThread;
255     ULONG Context;
256     KPRIORITY Priority;
257     NTSTATUS Status;
258 
259     /* Check if this is going to be a dynamic thread */
260     Context = WorkQueueType;
261 
262     /* Add the dynamic mask */
263     if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD;
264 
265     /* Create the System Thread */
266     Status = PsCreateSystemThread(&hThread,
267                                   THREAD_ALL_ACCESS,
268                                   NULL,
269                                   NULL,
270                                   NULL,
271                                   ExpWorkerThreadEntryPoint,
272                                   UlongToPtr(Context));
273     if (!NT_SUCCESS(Status))
274     {
275         /* Well... */
276         DPRINT1("Failed to create worker thread: 0x%08x\n", Status);
277         return;
278     }
279 
280     /* If the thread is dynamic */
281     if (Dynamic)
282     {
283         /* Increase the count */
284         InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount);
285     }
286 
287     /* Set the priority */
288     if (WorkQueueType == DelayedWorkQueue)
289     {
290         /* Priority == 4 */
291         Priority = EX_DELAYED_QUEUE_PRIORITY_INCREMENT;
292     }
293     else if (WorkQueueType == CriticalWorkQueue)
294     {
295         /* Priority == 5 */
296         Priority = EX_CRITICAL_QUEUE_PRIORITY_INCREMENT;
297     }
298     else
299     {
300         /* Priority == 7 */
301         Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT;
302     }
303 
304     /* Get the Thread */
305     ObReferenceObjectByHandle(hThread,
306                               THREAD_SET_INFORMATION,
307                               PsThreadType,
308                               KernelMode,
309                               (PVOID*)&Thread,
310                               NULL);
311 
312     /* Set the Priority */
313     KeSetBasePriorityThread(&Thread->Tcb, Priority);
314 
315     /* Dereference and close handle */
316     ObDereferenceObject(Thread);
317     ObCloseHandle(hThread, KernelMode);
318 }
319 
320 /*++
321  * @name ExpDetectWorkerThreadDeadlock
322  *
323  *     The ExpDetectWorkerThreadDeadlock routine checks every queue and creates
324  *     a dynamic thread if the queue seems to be deadlocked.
325  *
326  * @param None
327  *
328  * @return None.
329  *
330  * @remarks The algorithm for deciding if a new thread must be created is based
331  *          on whether the queue has processed no new items in the last second,
332  *          and new items are still enqueued.
333  *
334  *--*/
335 VOID
336 NTAPI
ExpDetectWorkerThreadDeadlock(VOID)337 ExpDetectWorkerThreadDeadlock(VOID)
338 {
339     ULONG i;
340     PEX_WORK_QUEUE Queue;
341 
342     /* Loop the 3 queues */
343     for (i = 0; i < MaximumWorkQueue; i++)
344     {
345         /* Get the queue */
346         Queue = &ExWorkerQueue[i];
347         ASSERT(Queue->DynamicThreadCount <= 16);
348 
349         /* Check if stuff is on the queue that still is unprocessed */
350         if ((Queue->QueueDepthLastPass) &&
351             (Queue->WorkItemsProcessed == Queue->WorkItemsProcessedLastPass) &&
352             (Queue->DynamicThreadCount < 16))
353         {
354             /* Stuff is still on the queue and nobody did anything about it */
355             DPRINT1("EX: Work Queue Deadlock detected: %lu\n", i);
356             ExpCreateWorkerThread(i, TRUE);
357             DPRINT1("Dynamic threads queued %d\n", Queue->DynamicThreadCount);
358         }
359 
360         /* Update our data */
361         Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed;
362         Queue->QueueDepthLastPass = KeReadStateQueue(&Queue->WorkerQueue);
363     }
364 }
365 
366 /*++
367  * @name ExpCheckDynamicThreadCount
368  *
369  *     The ExpCheckDynamicThreadCount routine checks every queue and creates
370  *     a dynamic thread if the queue requires one.
371  *
372  * @param None
373  *
374  * @return None.
375  *
376  * @remarks The algorithm for deciding if a new thread must be created is
377  *          documented in the ExQueueWorkItem routine.
378  *
379  *--*/
380 VOID
381 NTAPI
ExpCheckDynamicThreadCount(VOID)382 ExpCheckDynamicThreadCount(VOID)
383 {
384     ULONG i;
385     PEX_WORK_QUEUE Queue;
386 
387     /* Loop the 3 queues */
388     for (i = 0; i < MaximumWorkQueue; i++)
389     {
390         /* Get the queue */
391         Queue = &ExWorkerQueue[i];
392 
393         /* Check if still need a new thread. See ExQueueWorkItem */
394         if ((Queue->Info.MakeThreadsAsNecessary) &&
395             (!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) &&
396             (Queue->WorkerQueue.CurrentCount <
397              Queue->WorkerQueue.MaximumCount) &&
398             (Queue->DynamicThreadCount < 16))
399         {
400             /* Create a new thread */
401             DPRINT1("EX: Creating new dynamic thread as requested\n");
402             ExpCreateWorkerThread(i, TRUE);
403         }
404     }
405 }
406 
407 /*++
408  * @name ExpWorkerThreadBalanceManager
409  *
410  *     The ExpWorkerThreadBalanceManager routine is the entrypoint for the
411  *     worker thread balance set manager.
412  *
413  * @param Context
414  *        Unused.
415  *
416  * @return None.
417  *
418  * @remarks The worker thread balance set manager listens every second, but can
419  *          also be woken up by an event when a new thread is needed, or by the
420  *          special shutdown event. This thread runs at priority 7.
421  *
422  *          This routine must run at IRQL == PASSIVE_LEVEL.
423  *
424  *--*/
425 VOID
426 NTAPI
ExpWorkerThreadBalanceManager(IN PVOID Context)427 ExpWorkerThreadBalanceManager(IN PVOID Context)
428 {
429     KTIMER Timer;
430     LARGE_INTEGER Timeout;
431     NTSTATUS Status;
432     PVOID WaitEvents[3];
433     PAGED_CODE();
434     UNREFERENCED_PARAMETER(Context);
435 
436     /* Raise our priority above all other worker threads */
437     KeSetBasePriorityThread(KeGetCurrentThread(),
438                             EX_CRITICAL_QUEUE_PRIORITY_INCREMENT + 1);
439 
440     /* Setup the timer */
441     KeInitializeTimer(&Timer);
442     Timeout.QuadPart = Int32x32To64(-1, 10000000);
443 
444     /* We'll wait on the periodic timer and also the emergency event */
445     WaitEvents[0] = &Timer;
446     WaitEvents[1] = &ExpThreadSetManagerEvent;
447     WaitEvents[2] = &ExpThreadSetManagerShutdownEvent;
448 
449     /* Start wait loop */
450     for (;;)
451     {
452         /* Wait for the timer */
453         KeSetTimer(&Timer, Timeout, NULL);
454         Status = KeWaitForMultipleObjects(3,
455                                           WaitEvents,
456                                           WaitAny,
457                                           Executive,
458                                           KernelMode,
459                                           FALSE,
460                                           NULL,
461                                           NULL);
462         if (Status == 0)
463         {
464             /* Our timer expired. Check for deadlocks */
465             ExpDetectWorkerThreadDeadlock();
466         }
467         else if (Status == 1)
468         {
469             /* Someone notified us, verify if we should create a new thread */
470             ExpCheckDynamicThreadCount();
471         }
472         else if (Status == 2)
473         {
474             /* We are shutting down. Cancel the timer */
475             DPRINT1("System shutdown\n");
476             KeCancelTimer(&Timer);
477 
478             /* Make sure we have a final thread */
479             ASSERT(ExpLastWorkerThread);
480 
481             /* Wait for it */
482             KeWaitForSingleObject(ExpLastWorkerThread,
483                                   Executive,
484                                   KernelMode,
485                                   FALSE,
486                                   NULL);
487 
488             /* Dereference it and kill us */
489             ObDereferenceObject(ExpLastWorkerThread);
490             PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
491         }
492 
493         /*
494          * If WinDBG wants to attach or kill a user-mode process, and/or
495          * page-in an address region, queue a debugger worker thread.
496          */
497         if (ExpDebuggerWork == WinKdWorkerStart)
498         {
499              ExInitializeWorkItem(&ExpDebuggerWorkItem, ExpDebuggerWorker, NULL);
500              ExpDebuggerWork = WinKdWorkerInitialized;
501              ExQueueWorkItem(&ExpDebuggerWorkItem, DelayedWorkQueue);
502         }
503     }
504 }
505 
506 /*++
507  * @name ExpInitializeWorkerThreads
508  *
509  *     The ExpInitializeWorkerThreads routine initializes worker thread and
510  *     work queue support.
511  *
512  * @param None.
513  *
514  * @return None.
515  *
516  * @remarks This routine is only called once during system initialization.
517  *
518  *--*/
519 CODE_SEG("INIT")
520 VOID
521 NTAPI
ExpInitializeWorkerThreads(VOID)522 ExpInitializeWorkerThreads(VOID)
523 {
524     ULONG WorkQueueType;
525     ULONG CriticalThreads, DelayedThreads;
526     HANDLE ThreadHandle;
527     PETHREAD Thread;
528     ULONG i;
529     NTSTATUS Status;
530 
531     /* Setup the stack swap support */
532     ExInitializeFastMutex(&ExpWorkerSwapinMutex);
533     InitializeListHead(&ExpWorkerListHead);
534     ExpWorkersCanSwap = TRUE;
535 
536     /* Set the number of critical and delayed threads. We shouldn't hardcode */
537     DelayedThreads = EX_DELAYED_WORK_THREADS;
538     CriticalThreads = EX_CRITICAL_WORK_THREADS;
539 
540     /* Protect against greedy registry modifications */
541     ExpAdditionalDelayedWorkerThreads =
542         min(ExpAdditionalDelayedWorkerThreads, 16);
543     ExpAdditionalCriticalWorkerThreads =
544         min(ExpAdditionalCriticalWorkerThreads, 16);
545 
546     /* Calculate final count */
547     DelayedThreads += ExpAdditionalDelayedWorkerThreads;
548     CriticalThreads += ExpAdditionalCriticalWorkerThreads;
549 
550     /* Initialize the Array */
551     for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++)
552     {
553         /* Clear the structure and initialize the queue */
554         RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE));
555         KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0);
556     }
557 
558     /* Dynamic threads are only used for the critical queue */
559     ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary = TRUE;
560 
561     /* Initialize the balance set manager events */
562     KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent, FALSE);
563     KeInitializeEvent(&ExpThreadSetManagerShutdownEvent,
564                       NotificationEvent,
565                       FALSE);
566 
567     /* Create the built-in worker threads for the critical queue */
568     for (i = 0; i < CriticalThreads; i++)
569     {
570         /* Create the thread */
571         ExpCreateWorkerThread(CriticalWorkQueue, FALSE);
572         ExCriticalWorkerThreads++;
573     }
574 
575     /* Create the built-in worker threads for the delayed queue */
576     for (i = 0; i < DelayedThreads; i++)
577     {
578         /* Create the thread */
579         ExpCreateWorkerThread(DelayedWorkQueue, FALSE);
580         ExDelayedWorkerThreads++;
581     }
582 
583     /* Create the built-in worker thread for the hypercritical queue */
584     ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE);
585 
586     /* Create the balance set manager thread */
587     Status = PsCreateSystemThread(&ThreadHandle,
588                                   THREAD_ALL_ACCESS,
589                                   NULL,
590                                   0,
591                                   NULL,
592                                   ExpWorkerThreadBalanceManager,
593                                   NULL);
594     if (!NT_SUCCESS(Status))
595     {
596         KeBugCheckEx(PHASE1_INITIALIZATION_FAILED, Status, 0, 0, 0);
597     }
598 
599     /* Get a pointer to it for the shutdown process */
600     ObReferenceObjectByHandle(ThreadHandle,
601                               THREAD_ALL_ACCESS,
602                               NULL,
603                               KernelMode,
604                               (PVOID*)&Thread,
605                               NULL);
606     ExpWorkerThreadBalanceManagerPtr = Thread;
607 
608     /* Close the handle and return */
609     ObCloseHandle(ThreadHandle, KernelMode);
610 }
611 
612 VOID
613 NTAPI
ExpSetSwappingKernelApc(IN PKAPC Apc,OUT PKNORMAL_ROUTINE * NormalRoutine,IN OUT PVOID * NormalContext,IN OUT PVOID * SystemArgument1,IN OUT PVOID * SystemArgument2)614 ExpSetSwappingKernelApc(IN PKAPC Apc,
615                         OUT PKNORMAL_ROUTINE *NormalRoutine,
616                         IN OUT PVOID *NormalContext,
617                         IN OUT PVOID *SystemArgument1,
618                         IN OUT PVOID *SystemArgument2)
619 {
620     PBOOLEAN AllowSwap;
621     PKEVENT Event = (PKEVENT)*SystemArgument1;
622 
623     /* Make sure it's an active worker */
624     if (PsGetCurrentThread()->ActiveExWorker)
625     {
626         /* Read the setting from the context flag */
627         AllowSwap = (PBOOLEAN)NormalContext;
628         KeSetKernelStackSwapEnable(*AllowSwap);
629     }
630 
631     /* Let caller know that we're done */
632     KeSetEvent(Event, 0, FALSE);
633 }
634 
635 VOID
636 NTAPI
ExSwapinWorkerThreads(IN BOOLEAN AllowSwap)637 ExSwapinWorkerThreads(IN BOOLEAN AllowSwap)
638 {
639     KEVENT Event;
640     PETHREAD CurrentThread = PsGetCurrentThread(), Thread;
641     PEPROCESS Process = PsInitialSystemProcess;
642     KAPC Apc;
643     PAGED_CODE();
644 
645     /* Initialize an event so we know when we're done */
646     KeInitializeEvent(&Event, NotificationEvent, FALSE);
647 
648     /* Lock this routine */
649     ExAcquireFastMutex(&ExpWorkerSwapinMutex);
650 
651     /* New threads cannot swap anymore */
652     ExpWorkersCanSwap = AllowSwap;
653 
654     /* Loop all threads in the system process */
655     Thread = PsGetNextProcessThread(Process, NULL);
656     while (Thread)
657     {
658         /* Skip threads with explicit permission to do this */
659         if (Thread->ExWorkerCanWaitUser) goto Next;
660 
661         /* Check if we reached ourselves */
662         if (Thread == CurrentThread)
663         {
664             /* Do it inline */
665             KeSetKernelStackSwapEnable(AllowSwap);
666         }
667         else
668         {
669             /* Queue an APC */
670             KeInitializeApc(&Apc,
671                             &Thread->Tcb,
672                             InsertApcEnvironment,
673                             ExpSetSwappingKernelApc,
674                             NULL,
675                             NULL,
676                             KernelMode,
677                             &AllowSwap);
678             if (KeInsertQueueApc(&Apc, &Event, NULL, 3))
679             {
680                 /* Wait for the APC to run */
681                 KeWaitForSingleObject(&Event, Executive, KernelMode, FALSE, NULL);
682                 KeClearEvent(&Event);
683             }
684         }
685 
686         /* Next thread */
687 Next:
688         Thread = PsGetNextProcessThread(Process, Thread);
689     }
690 
691     /* Release the lock */
692     ExReleaseFastMutex(&ExpWorkerSwapinMutex);
693 }
694 
695 /* PUBLIC FUNCTIONS **********************************************************/
696 
697 /*++
698  * @name ExQueueWorkItem
699  * @implemented NT4
700  *
701  *     The ExQueueWorkItem routine acquires rundown protection for
702  *     the specified descriptor.
703  *
704  * @param WorkItem
705  *        Pointer to an initialized Work Queue Item structure. This structure
706  *        must be located in nonpaged pool memory.
707  *
708  * @param QueueType
709  *        Type of the queue to use for this item. Can be one of the following:
710  *          - DelayedWorkQueue
711  *          - CriticalWorkQueue
712  *          - HyperCriticalWorkQueue
713  *
714  * @return None.
715  *
716  * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
717  *
718  *          Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
719  *
720  *--*/
721 VOID
722 NTAPI
ExQueueWorkItem(IN PWORK_QUEUE_ITEM WorkItem,IN WORK_QUEUE_TYPE QueueType)723 ExQueueWorkItem(IN PWORK_QUEUE_ITEM WorkItem,
724                 IN WORK_QUEUE_TYPE QueueType)
725 {
726     PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType];
727     ASSERT(QueueType < MaximumWorkQueue);
728     ASSERT(WorkItem->List.Flink == NULL);
729 
730     /* Don't try to trick us */
731     if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress)
732     {
733         /* Bugcheck the system */
734         KeBugCheckEx(WORKER_INVALID,
735                      1,
736                      (ULONG_PTR)WorkItem,
737                      (ULONG_PTR)WorkItem->WorkerRoutine,
738                      0);
739     }
740 
741     /* Insert the Queue */
742     KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List);
743     ASSERT(!WorkQueue->Info.QueueDisabled);
744 
745     /*
746      * Check if we need a new thread. Our decision is as follows:
747      *  - This queue type must support Dynamic Threads (duh!)
748      *  - It actually has to have unprocessed items
749      *  - We have CPUs which could be handling another thread
750      *  - We haven't abused our usage of dynamic threads.
751      */
752     if ((WorkQueue->Info.MakeThreadsAsNecessary) &&
753         (!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) &&
754         (WorkQueue->WorkerQueue.CurrentCount <
755          WorkQueue->WorkerQueue.MaximumCount) &&
756         (WorkQueue->DynamicThreadCount < 16))
757     {
758         /* Let the balance manager know about it */
759         DPRINT1("Requesting a new thread. CurrentCount: %lu. MaxCount: %lu\n",
760                 WorkQueue->WorkerQueue.CurrentCount,
761                 WorkQueue->WorkerQueue.MaximumCount);
762         KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE);
763     }
764 }
765 
766 /* EOF */
767