xref: /reactos/ntoskrnl/ke/queue.c (revision 34593d93)
1 /*
2  * PROJECT:         ReactOS Kernel
3  * LICENSE:         GPL - See COPYING in the top level directory
4  * FILE:            ntoskrnl/ke/queue.c
5  * PURPOSE:         Implements kernel queues
6  * PROGRAMMERS:     Alex Ionescu (alex.ionescu@reactos.org)
7  *                  Gunnar Dalsnes
8  *                  Eric Kohl
9  */
10 
11 /* INCLUDES ******************************************************************/
12 
13 #include <ntoskrnl.h>
14 #define NDEBUG
15 #include <debug.h>
16 
17 /* PRIVATE FUNCTIONS *********************************************************/
18 
19 /*
20  * Called when a thread which has a queue entry is entering a wait state
21  */
22 VOID
23 FASTCALL
KiActivateWaiterQueue(IN PKQUEUE Queue)24 KiActivateWaiterQueue(IN PKQUEUE Queue)
25 {
26     PLIST_ENTRY QueueEntry;
27     PLIST_ENTRY WaitEntry;
28     PKWAIT_BLOCK WaitBlock;
29     PKTHREAD Thread;
30     ASSERT_QUEUE(Queue);
31 
32     /* Decrement the number of active threads */
33     Queue->CurrentCount--;
34 
35     /* Make sure the counts are OK */
36     if (Queue->CurrentCount < Queue->MaximumCount)
37     {
38         /* Get the Queue Entry */
39         QueueEntry = Queue->EntryListHead.Flink;
40 
41         /* Get the Wait Entry */
42         WaitEntry = Queue->Header.WaitListHead.Blink;
43 
44         /* Make sure that the Queue entries are not part of empty lists */
45         if ((WaitEntry != &Queue->Header.WaitListHead) &&
46             (QueueEntry != &Queue->EntryListHead))
47         {
48             /* Remove this entry */
49             RemoveEntryList(QueueEntry);
50             QueueEntry->Flink = NULL;
51 
52             /* Decrease the Signal State */
53             Queue->Header.SignalState--;
54 
55             /* Unwait the Thread */
56             WaitBlock = CONTAINING_RECORD(WaitEntry,
57                                           KWAIT_BLOCK,
58                                           WaitListEntry);
59             Thread = WaitBlock->Thread;
60             KiUnwaitThread(Thread, (LONG_PTR)QueueEntry, IO_NO_INCREMENT);
61         }
62     }
63 }
64 
65 /*
66  * Returns the previous number of entries in the queue
67  */
68 LONG
69 NTAPI
KiInsertQueue(IN PKQUEUE Queue,IN PLIST_ENTRY Entry,IN BOOLEAN Head)70 KiInsertQueue(IN PKQUEUE Queue,
71               IN PLIST_ENTRY Entry,
72               IN BOOLEAN Head)
73 {
74     ULONG InitialState;
75     PKTHREAD Thread = KeGetCurrentThread();
76     PKWAIT_BLOCK WaitBlock;
77     PLIST_ENTRY WaitEntry;
78     PKTIMER Timer;
79     ASSERT_QUEUE(Queue);
80 
81     /* Save the old state */
82     InitialState = Queue->Header.SignalState;
83 
84     /* Get the Entry */
85     WaitEntry = Queue->Header.WaitListHead.Blink;
86 
87     /*
88      * Why the KeGetCurrentThread()->Queue != Queue?
89      * KiInsertQueue might be called from an APC for the current thread.
90      * -Gunnar
91      */
92     if ((Queue->CurrentCount < Queue->MaximumCount) &&
93         (WaitEntry != &Queue->Header.WaitListHead) &&
94         ((Thread->Queue != Queue) ||
95          (Thread->WaitReason != WrQueue)))
96     {
97         /* Remove the wait entry */
98         RemoveEntryList(WaitEntry);
99 
100         /* Get the Wait Block and Thread */
101         WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
102         Thread = WaitBlock->Thread;
103 
104         /* Remove the queue from the thread's wait list */
105         Thread->WaitStatus = (LONG_PTR)Entry;
106         if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry);
107 
108         /* Increase the active threads and remove any wait reason */
109         Queue->CurrentCount++;
110         Thread->WaitReason = 0;
111 
112         /* Check if there's a Thread Timer */
113         Timer = &Thread->Timer;
114         if (Timer->Header.Inserted) KxRemoveTreeTimer(Timer);
115 
116         /* Reschedule the Thread */
117         KiReadyThread(Thread);
118     }
119     else
120     {
121         /* Increase the Entries */
122         Queue->Header.SignalState++;
123 
124         /* Check which mode we're using */
125         if (Head)
126         {
127             /* Insert in the head */
128             InsertHeadList(&Queue->EntryListHead, Entry);
129         }
130         else
131         {
132             /* Insert at the end */
133             InsertTailList(&Queue->EntryListHead, Entry);
134         }
135     }
136 
137     /* Return the previous state */
138     return InitialState;
139 }
140 
141 /* PUBLIC FUNCTIONS **********************************************************/
142 
143 /*
144  * @implemented
145  */
146 VOID
147 NTAPI
KeInitializeQueue(IN PKQUEUE Queue,IN ULONG Count OPTIONAL)148 KeInitializeQueue(IN PKQUEUE Queue,
149                   IN ULONG Count OPTIONAL)
150 {
151     /* Initialize the Header */
152     Queue->Header.Type = QueueObject;
153     Queue->Header.Abandoned = 0;
154     Queue->Header.Size = sizeof(KQUEUE) / sizeof(ULONG);
155     Queue->Header.SignalState = 0;
156     InitializeListHead(&(Queue->Header.WaitListHead));
157 
158     /* Initialize the Lists */
159     InitializeListHead(&Queue->EntryListHead);
160     InitializeListHead(&Queue->ThreadListHead);
161 
162     /* Set the Current and Maximum Count */
163     Queue->CurrentCount = 0;
164     Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
165 }
166 
167 /*
168  * @implemented
169  */
170 LONG
171 NTAPI
KeInsertHeadQueue(IN PKQUEUE Queue,IN PLIST_ENTRY Entry)172 KeInsertHeadQueue(IN PKQUEUE Queue,
173                   IN PLIST_ENTRY Entry)
174 {
175     LONG PreviousState;
176     KIRQL OldIrql;
177     ASSERT_QUEUE(Queue);
178     ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
179 
180     /* Lock the Dispatcher Database */
181     OldIrql = KiAcquireDispatcherLock();
182 
183     /* Insert the Queue */
184     PreviousState = KiInsertQueue(Queue, Entry, TRUE);
185 
186     /* Release the Dispatcher Lock */
187     KiReleaseDispatcherLock(OldIrql);
188 
189     /* Return previous State */
190     return PreviousState;
191 }
192 
193 /*
194  * @implemented
195  */
196 LONG
197 NTAPI
KeInsertQueue(IN PKQUEUE Queue,IN PLIST_ENTRY Entry)198 KeInsertQueue(IN PKQUEUE Queue,
199               IN PLIST_ENTRY Entry)
200 {
201     LONG PreviousState;
202     KIRQL OldIrql;
203     ASSERT_QUEUE(Queue);
204     ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
205 
206     /* Lock the Dispatcher Database */
207     OldIrql = KiAcquireDispatcherLock();
208 
209     /* Insert the Queue */
210     PreviousState = KiInsertQueue(Queue, Entry, FALSE);
211 
212     /* Release the Dispatcher Lock */
213     KiReleaseDispatcherLock(OldIrql);
214 
215     /* Return previous State */
216     return PreviousState;
217 }
218 
219 /*
220  * @implemented
221  *
222  * Returns number of entries in the queue
223  */
224 LONG
225 NTAPI
KeReadStateQueue(IN PKQUEUE Queue)226 KeReadStateQueue(IN PKQUEUE Queue)
227 {
228     /* Returns the Signal State */
229     ASSERT_QUEUE(Queue);
230     return Queue->Header.SignalState;
231 }
232 
/*
 * @implemented
 *
 * Takes the entry at the head of Queue's entry list on behalf of the current
 * thread, waiting on the queue when no entry can be taken immediately.
 *
 * Queue    - Queue to remove from. The current thread is associated with it
 *            (Thread->Queue) and, if it was associated with another queue,
 *            moved off that queue's thread list first.
 * WaitMode - When not KernelMode, a pending user APC ends the call with
 *            STATUS_USER_APC instead of waiting.
 * Timeout  - Optional timeout. When present and the thread timer's due time
 *            has already passed, the call ends with STATUS_TIMEOUT instead
 *            of waiting.
 *
 * Returns a pointer to the removed entry, or a status value cast to
 * PLIST_ENTRY: STATUS_USER_APC, STATUS_TIMEOUT, or whatever non-
 * STATUS_KERNEL_APC value KiSwapThread returned.
 *
 * NOTE(review): Swappable, TimerBlock, Hand and DueTime are never assigned
 * or otherwise used by any statement visible in this function other than
 * through KxQueueThreadWait()/KxInsertTimer(); presumably KxQueueThreadWait
 * is a macro that reads and writes these locals by name. Confirm before
 * renaming or removing any local here.
 */
PLIST_ENTRY
NTAPI
KeRemoveQueue(IN PKQUEUE Queue,
              IN KPROCESSOR_MODE WaitMode,
              IN PLARGE_INTEGER Timeout OPTIONAL)
{
    PLIST_ENTRY QueueEntry;
    LONG_PTR Status;
    PKTHREAD Thread = KeGetCurrentThread();
    PKQUEUE PreviousQueue;
    PKWAIT_BLOCK WaitBlock = &Thread->WaitBlock[0];
    PKWAIT_BLOCK TimerBlock = &Thread->WaitBlock[TIMER_WAIT_BLOCK];
    PKTIMER Timer = &Thread->Timer;
    BOOLEAN Swappable;
    PLARGE_INTEGER OriginalDueTime = Timeout;
    LARGE_INTEGER DueTime = {{0}}, NewDueTime, InterruptTime;
    ULONG Hand = 0;
    ASSERT_QUEUE(Queue);
    ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);

    /* Check if the Lock is already held */
    if (Thread->WaitNext)
    {
        /* It is, so don't expect this next time; only prepare the wait */
        Thread->WaitNext = FALSE;
        KxQueueThreadWait();
    }
    else
    {
        /* Raise IRQL to synch, prepare the wait, then lock the database */
        Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
        KxQueueThreadWait();
        KiAcquireDispatcherLockAtSynchLevel();
    }

    /*
     * This is needed so that we can set the new queue right here,
     * before additional processing
     */
    PreviousQueue = Thread->Queue;
    Thread->Queue = Queue;

    /* Check if this is a different queue */
    if (Queue != PreviousQueue)
    {
        /* Get the thread's link in the per-queue thread list */
        QueueEntry = &Thread->QueueListEntry;
        if (PreviousQueue)
        {
            /* Remove the thread from the previous queue's thread list */
            RemoveEntryList(QueueEntry);

            /* This thread no longer runs for the old queue; let it wake a waiter */
            KiActivateWaiterQueue(PreviousQueue);
        }

        /* Insert in this new Queue */
        InsertTailList(&Queue->ThreadListHead, QueueEntry);
    }
    else
    {
        /* Same queue: this thread stops counting as active while it asks for work */
        Queue->CurrentCount--;
    }

    /* Loop until the queue is processed */
    while (TRUE)
    {
        /* Check if the counts are valid and if there is still a queued entry */
        QueueEntry = Queue->EntryListHead.Flink;
        if ((Queue->CurrentCount < Queue->MaximumCount) &&
            (QueueEntry != &Queue->EntryListHead))
        {
            /* Decrease the number of entries */
            Queue->Header.SignalState--;

            /* Increase number of running threads */
            Queue->CurrentCount++;

            /* Check if the entry is valid. If not, bugcheck */
            if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
            {
                /* Invalid item: report the entry, the queue and its worker routine */
                KeBugCheckEx(INVALID_WORK_QUEUE_ITEM,
                             (ULONG_PTR)QueueEntry,
                             (ULONG_PTR)Queue,
                             (ULONG_PTR)NULL,
                             (ULONG_PTR)((PWORK_QUEUE_ITEM)QueueEntry)->
                                         WorkerRoutine);
            }

            /* Remove the Entry; a NULL Flink marks it as no longer queued */
            RemoveEntryList(QueueEntry);
            QueueEntry->Flink = NULL;

            /* Nothing to wait on */
            break;
        }
        else
        {
            /* Check if a kernel APC is pending and we're below APC_LEVEL */
            if ((Thread->ApcState.KernelApcPending) &&
                !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL))
            {
                /*
                 * Increment the count and unlock the dispatcher; the wait is
                 * then restarted at the bottom of the loop
                 */
                Queue->CurrentCount++;
                KiReleaseDispatcherLockFromSynchLevel();
                KiExitDispatcher(Thread->WaitIrql);
            }
            else
            {
                /* Fail if there's a User APC Pending */
                if ((WaitMode != KernelMode) &&
                    (Thread->ApcState.UserApcPending))
                {
                    /* Return the status and count the thread as active again */
                    QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
                    Queue->CurrentCount++;
                    break;
                }

                /* Enable the Timeout Timer if there was any specified */
                if (Timeout)
                {
                    /* Check if the timer expired */
                    InterruptTime.QuadPart = KeQueryInterruptTime();
                    if ((ULONG64)InterruptTime.QuadPart >= Timer->DueTime.QuadPart)
                    {
                        /* It did, so we don't need to wait */
                        QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
                        Queue->CurrentCount++;
                        break;
                    }

                    /* It didn't, so activate it */
                    Timer->Header.Inserted = TRUE;
                }

                /* Insert the wait block at the tail of the queue's wait list */
                InsertTailList(&Queue->Header.WaitListHead,
                               &WaitBlock->WaitListEntry);

                /* Setup the wait information */
                Thread->State = Waiting;

                /* Add the thread to the wait list */
                KiAddThreadToWaitList(Thread, Swappable);

                /* Activate thread swap */
                ASSERT(Thread->WaitIrql <= DISPATCH_LEVEL);
                KiSetThreadSwapBusy(Thread);

                /* Check if we have a timer */
                if (Timeout)
                {
                    /*
                     * Insert it.
                     * NOTE(review): no explicit unlock on this path, so
                     * presumably KxInsertTimer releases the dispatcher
                     * lock itself - confirm.
                     */
                    KxInsertTimer(Timer, Hand);
                }
                else
                {
                    /* Otherwise, unlock the dispatcher */
                    KiReleaseDispatcherLockFromSynchLevel();
                }

                /* Do the actual swap */
                Status = KiSwapThread(Thread, KeGetCurrentPrcb());

                /* Reset the wait reason */
                Thread->WaitReason = 0;

                /*
                 * Unless the wait was interrupted by a kernel APC, the wait
                 * status is the result; return it directly, bypassing the
                 * unlock at the bottom of the function
                 */
                if (Status != STATUS_KERNEL_APC) return (PLIST_ENTRY)Status;

                /* Check if we had a timeout */
                if (Timeout)
                {
                    /* Recalculate due times */
                    Timeout = KiRecalculateDueTime(OriginalDueTime,
                                                   &DueTime,
                                                   &NewDueTime);
                }
            }

            /* Start another wait: raise IRQL, prepare the wait, relock */
            Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
            KxQueueThreadWait();
            KiAcquireDispatcherLockAtSynchLevel();
            Queue->CurrentCount--;
        }
    }

    /* Unlock Database and return */
    KiReleaseDispatcherLockFromSynchLevel();
    KiExitDispatcher(Thread->WaitIrql);
    return QueueEntry;
}
432 
433 /*
434  * @implemented
435  */
436 PLIST_ENTRY
437 NTAPI
KeRundownQueue(IN PKQUEUE Queue)438 KeRundownQueue(IN PKQUEUE Queue)
439 {
440     PLIST_ENTRY FirstEntry, NextEntry;
441     PKTHREAD Thread;
442     KIRQL OldIrql;
443     ASSERT_QUEUE(Queue);
444     ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
445     ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
446 
447     /* Get the Dispatcher Lock */
448     OldIrql = KiAcquireDispatcherLock();
449 
450     /* Check if the list is empty */
451     FirstEntry = Queue->EntryListHead.Flink;
452     if (FirstEntry == &Queue->EntryListHead)
453     {
454         /* We won't return anything */
455         FirstEntry = NULL;
456     }
457     else
458     {
459         /* Remove this entry */
460         RemoveEntryList(&Queue->EntryListHead);
461     }
462 
463     /* Loop the list */
464     while (!IsListEmpty(&Queue->ThreadListHead))
465     {
466         /* Get the next entry */
467         NextEntry = Queue->ThreadListHead.Flink;
468 
469         /* Get the associated thread */
470         Thread = CONTAINING_RECORD(NextEntry, KTHREAD, QueueListEntry);
471 
472         /* Clear its queue */
473         Thread->Queue = NULL;
474 
475         /* Remove this entry */
476         RemoveEntryList(NextEntry);
477     }
478 
479     /* Release the dispatcher lock */
480     KiReleaseDispatcherLockFromSynchLevel();
481 
482     /* Exit the dispatcher and return the first entry (if any) */
483     KiExitDispatcher(OldIrql);
484     return FirstEntry;
485 }
486 
487 /* EOF */
488