1 /*
2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
7 * Gunnar Dalsnes
8 * Eric Kohl
9 */
10
11 /* INCLUDES ******************************************************************/
12
13 #include <ntoskrnl.h>
14 #define NDEBUG
15 #include <debug.h>
16
17 /* PRIVATE FUNCTIONS *********************************************************/
18
19 /*
20 * Called when a thread which has a queue entry is entering a wait state
21 */
22 VOID
23 FASTCALL
KiActivateWaiterQueue(IN PKQUEUE Queue)24 KiActivateWaiterQueue(IN PKQUEUE Queue)
25 {
26 PLIST_ENTRY QueueEntry;
27 PLIST_ENTRY WaitEntry;
28 PKWAIT_BLOCK WaitBlock;
29 PKTHREAD Thread;
30 ASSERT_QUEUE(Queue);
31
32 /* Decrement the number of active threads */
33 Queue->CurrentCount--;
34
35 /* Make sure the counts are OK */
36 if (Queue->CurrentCount < Queue->MaximumCount)
37 {
38 /* Get the Queue Entry */
39 QueueEntry = Queue->EntryListHead.Flink;
40
41 /* Get the Wait Entry */
42 WaitEntry = Queue->Header.WaitListHead.Blink;
43
44 /* Make sure that the Queue entries are not part of empty lists */
45 if ((WaitEntry != &Queue->Header.WaitListHead) &&
46 (QueueEntry != &Queue->EntryListHead))
47 {
48 /* Remove this entry */
49 RemoveEntryList(QueueEntry);
50 QueueEntry->Flink = NULL;
51
52 /* Decrease the Signal State */
53 Queue->Header.SignalState--;
54
55 /* Unwait the Thread */
56 WaitBlock = CONTAINING_RECORD(WaitEntry,
57 KWAIT_BLOCK,
58 WaitListEntry);
59 Thread = WaitBlock->Thread;
60 KiUnwaitThread(Thread, (LONG_PTR)QueueEntry, IO_NO_INCREMENT);
61 }
62 }
63 }
64
65 /*
66 * Returns the previous number of entries in the queue
67 */
68 LONG
69 NTAPI
KiInsertQueue(IN PKQUEUE Queue,IN PLIST_ENTRY Entry,IN BOOLEAN Head)70 KiInsertQueue(IN PKQUEUE Queue,
71 IN PLIST_ENTRY Entry,
72 IN BOOLEAN Head)
73 {
74 ULONG InitialState;
75 PKTHREAD Thread = KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock;
77 PLIST_ENTRY WaitEntry;
78 PKTIMER Timer;
79 ASSERT_QUEUE(Queue);
80
81 /* Save the old state */
82 InitialState = Queue->Header.SignalState;
83
84 /* Get the Entry */
85 WaitEntry = Queue->Header.WaitListHead.Blink;
86
87 /*
88 * Why the KeGetCurrentThread()->Queue != Queue?
89 * KiInsertQueue might be called from an APC for the current thread.
90 * -Gunnar
91 */
92 if ((Queue->CurrentCount < Queue->MaximumCount) &&
93 (WaitEntry != &Queue->Header.WaitListHead) &&
94 ((Thread->Queue != Queue) ||
95 (Thread->WaitReason != WrQueue)))
96 {
97 /* Remove the wait entry */
98 RemoveEntryList(WaitEntry);
99
100 /* Get the Wait Block and Thread */
101 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
102 Thread = WaitBlock->Thread;
103
104 /* Remove the queue from the thread's wait list */
105 Thread->WaitStatus = (LONG_PTR)Entry;
106 if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry);
107
108 /* Increase the active threads and remove any wait reason */
109 Queue->CurrentCount++;
110 Thread->WaitReason = 0;
111
112 /* Check if there's a Thread Timer */
113 Timer = &Thread->Timer;
114 if (Timer->Header.Inserted) KxRemoveTreeTimer(Timer);
115
116 /* Reschedule the Thread */
117 KiReadyThread(Thread);
118 }
119 else
120 {
121 /* Increase the Entries */
122 Queue->Header.SignalState++;
123
124 /* Check which mode we're using */
125 if (Head)
126 {
127 /* Insert in the head */
128 InsertHeadList(&Queue->EntryListHead, Entry);
129 }
130 else
131 {
132 /* Insert at the end */
133 InsertTailList(&Queue->EntryListHead, Entry);
134 }
135 }
136
137 /* Return the previous state */
138 return InitialState;
139 }
140
141 /* PUBLIC FUNCTIONS **********************************************************/
142
143 /*
144 * @implemented
145 */
146 VOID
147 NTAPI
KeInitializeQueue(IN PKQUEUE Queue,IN ULONG Count OPTIONAL)148 KeInitializeQueue(IN PKQUEUE Queue,
149 IN ULONG Count OPTIONAL)
150 {
151 /* Initialize the Header */
152 Queue->Header.Type = QueueObject;
153 Queue->Header.Abandoned = 0;
154 Queue->Header.Size = sizeof(KQUEUE) / sizeof(ULONG);
155 Queue->Header.SignalState = 0;
156 InitializeListHead(&(Queue->Header.WaitListHead));
157
158 /* Initialize the Lists */
159 InitializeListHead(&Queue->EntryListHead);
160 InitializeListHead(&Queue->ThreadListHead);
161
162 /* Set the Current and Maximum Count */
163 Queue->CurrentCount = 0;
164 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
165 }
166
167 /*
168 * @implemented
169 */
170 LONG
171 NTAPI
KeInsertHeadQueue(IN PKQUEUE Queue,IN PLIST_ENTRY Entry)172 KeInsertHeadQueue(IN PKQUEUE Queue,
173 IN PLIST_ENTRY Entry)
174 {
175 LONG PreviousState;
176 KIRQL OldIrql;
177 ASSERT_QUEUE(Queue);
178 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
179
180 /* Lock the Dispatcher Database */
181 OldIrql = KiAcquireDispatcherLock();
182
183 /* Insert the Queue */
184 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
185
186 /* Release the Dispatcher Lock */
187 KiReleaseDispatcherLock(OldIrql);
188
189 /* Return previous State */
190 return PreviousState;
191 }
192
193 /*
194 * @implemented
195 */
196 LONG
197 NTAPI
KeInsertQueue(IN PKQUEUE Queue,IN PLIST_ENTRY Entry)198 KeInsertQueue(IN PKQUEUE Queue,
199 IN PLIST_ENTRY Entry)
200 {
201 LONG PreviousState;
202 KIRQL OldIrql;
203 ASSERT_QUEUE(Queue);
204 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
205
206 /* Lock the Dispatcher Database */
207 OldIrql = KiAcquireDispatcherLock();
208
209 /* Insert the Queue */
210 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
211
212 /* Release the Dispatcher Lock */
213 KiReleaseDispatcherLock(OldIrql);
214
215 /* Return previous State */
216 return PreviousState;
217 }
218
219 /*
220 * @implemented
221 *
222 * Returns number of entries in the queue
223 */
224 LONG
225 NTAPI
KeReadStateQueue(IN PKQUEUE Queue)226 KeReadStateQueue(IN PKQUEUE Queue)
227 {
228 /* Returns the Signal State */
229 ASSERT_QUEUE(Queue);
230 return Queue->Header.SignalState;
231 }
232
233 /*
234 * @implemented
235 */
236 PLIST_ENTRY
237 NTAPI
KeRemoveQueue(IN PKQUEUE Queue,IN KPROCESSOR_MODE WaitMode,IN PLARGE_INTEGER Timeout OPTIONAL)238 KeRemoveQueue(IN PKQUEUE Queue,
239 IN KPROCESSOR_MODE WaitMode,
240 IN PLARGE_INTEGER Timeout OPTIONAL)
241 {
242 PLIST_ENTRY QueueEntry;
243 LONG_PTR Status;
244 PKTHREAD Thread = KeGetCurrentThread();
245 PKQUEUE PreviousQueue;
246 PKWAIT_BLOCK WaitBlock = &Thread->WaitBlock[0];
247 PKWAIT_BLOCK TimerBlock = &Thread->WaitBlock[TIMER_WAIT_BLOCK];
248 PKTIMER Timer = &Thread->Timer;
249 BOOLEAN Swappable;
250 PLARGE_INTEGER OriginalDueTime = Timeout;
251 LARGE_INTEGER DueTime = {{0}}, NewDueTime, InterruptTime;
252 ULONG Hand = 0;
253 ASSERT_QUEUE(Queue);
254 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
255
256 /* Check if the Lock is already held */
257 if (Thread->WaitNext)
258 {
259 /* It is, so next time don't do expect this */
260 Thread->WaitNext = FALSE;
261 KxQueueThreadWait();
262 }
263 else
264 {
265 /* Raise IRQL to synch, prepare the wait, then lock the database */
266 Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
267 KxQueueThreadWait();
268 KiAcquireDispatcherLockAtSynchLevel();
269 }
270
271 /*
272 * This is needed so that we can set the new queue right here,
273 * before additional processing
274 */
275 PreviousQueue = Thread->Queue;
276 Thread->Queue = Queue;
277
278 /* Check if this is a different queue */
279 if (Queue != PreviousQueue)
280 {
281 /* Get the current entry */
282 QueueEntry = &Thread->QueueListEntry;
283 if (PreviousQueue)
284 {
285 /* Remove from this list */
286 RemoveEntryList(QueueEntry);
287
288 /* Wake the queue */
289 KiActivateWaiterQueue(PreviousQueue);
290 }
291
292 /* Insert in this new Queue */
293 InsertTailList(&Queue->ThreadListHead, QueueEntry);
294 }
295 else
296 {
297 /* Same queue, decrement waiting threads */
298 Queue->CurrentCount--;
299 }
300
301 /* Loop until the queue is processed */
302 while (TRUE)
303 {
304 /* Check if the counts are valid and if there is still a queued entry */
305 QueueEntry = Queue->EntryListHead.Flink;
306 if ((Queue->CurrentCount < Queue->MaximumCount) &&
307 (QueueEntry != &Queue->EntryListHead))
308 {
309 /* Decrease the number of entries */
310 Queue->Header.SignalState--;
311
312 /* Increase numbef of running threads */
313 Queue->CurrentCount++;
314
315 /* Check if the entry is valid. If not, bugcheck */
316 if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
317 {
318 /* Invalid item */
319 KeBugCheckEx(INVALID_WORK_QUEUE_ITEM,
320 (ULONG_PTR)QueueEntry,
321 (ULONG_PTR)Queue,
322 (ULONG_PTR)NULL,
323 (ULONG_PTR)((PWORK_QUEUE_ITEM)QueueEntry)->
324 WorkerRoutine);
325 }
326
327 /* Remove the Entry */
328 RemoveEntryList(QueueEntry);
329 QueueEntry->Flink = NULL;
330
331 /* Nothing to wait on */
332 break;
333 }
334 else
335 {
336 /* Check if a kernel APC is pending and we're below APC_LEVEL */
337 if ((Thread->ApcState.KernelApcPending) &&
338 !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL))
339 {
340 /* Increment the count and unlock the dispatcher */
341 Queue->CurrentCount++;
342 KiReleaseDispatcherLockFromSynchLevel();
343 KiExitDispatcher(Thread->WaitIrql);
344 }
345 else
346 {
347 /* Fail if there's a User APC Pending */
348 if ((WaitMode != KernelMode) &&
349 (Thread->ApcState.UserApcPending))
350 {
351 /* Return the status and increase the pending threads */
352 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
353 Queue->CurrentCount++;
354 break;
355 }
356
357 /* Enable the Timeout Timer if there was any specified */
358 if (Timeout)
359 {
360 /* Check if the timer expired */
361 InterruptTime.QuadPart = KeQueryInterruptTime();
362 if ((ULONG64)InterruptTime.QuadPart >= Timer->DueTime.QuadPart)
363 {
364 /* It did, so we don't need to wait */
365 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
366 Queue->CurrentCount++;
367 break;
368 }
369
370 /* It didn't, so activate it */
371 Timer->Header.Inserted = TRUE;
372 }
373
374 /* Insert the wait block in the list */
375 InsertTailList(&Queue->Header.WaitListHead,
376 &WaitBlock->WaitListEntry);
377
378 /* Setup the wait information */
379 Thread->State = Waiting;
380
381 /* Add the thread to the wait list */
382 KiAddThreadToWaitList(Thread, Swappable);
383
384 /* Activate thread swap */
385 ASSERT(Thread->WaitIrql <= DISPATCH_LEVEL);
386 KiSetThreadSwapBusy(Thread);
387
388 /* Check if we have a timer */
389 if (Timeout)
390 {
391 /* Insert it */
392 KxInsertTimer(Timer, Hand);
393 }
394 else
395 {
396 /* Otherwise, unlock the dispatcher */
397 KiReleaseDispatcherLockFromSynchLevel();
398 }
399
400 /* Do the actual swap */
401 Status = KiSwapThread(Thread, KeGetCurrentPrcb());
402
403 /* Reset the wait reason */
404 Thread->WaitReason = 0;
405
406 /* Check if we were executing an APC */
407 if (Status != STATUS_KERNEL_APC) return (PLIST_ENTRY)Status;
408
409 /* Check if we had a timeout */
410 if (Timeout)
411 {
412 /* Recalculate due times */
413 Timeout = KiRecalculateDueTime(OriginalDueTime,
414 &DueTime,
415 &NewDueTime);
416 }
417 }
418
419 /* Start another wait */
420 Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
421 KxQueueThreadWait();
422 KiAcquireDispatcherLockAtSynchLevel();
423 Queue->CurrentCount--;
424 }
425 }
426
427 /* Unlock Database and return */
428 KiReleaseDispatcherLockFromSynchLevel();
429 KiExitDispatcher(Thread->WaitIrql);
430 return QueueEntry;
431 }
432
433 /*
434 * @implemented
435 */
436 PLIST_ENTRY
437 NTAPI
KeRundownQueue(IN PKQUEUE Queue)438 KeRundownQueue(IN PKQUEUE Queue)
439 {
440 PLIST_ENTRY FirstEntry, NextEntry;
441 PKTHREAD Thread;
442 KIRQL OldIrql;
443 ASSERT_QUEUE(Queue);
444 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
445 ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
446
447 /* Get the Dispatcher Lock */
448 OldIrql = KiAcquireDispatcherLock();
449
450 /* Check if the list is empty */
451 FirstEntry = Queue->EntryListHead.Flink;
452 if (FirstEntry == &Queue->EntryListHead)
453 {
454 /* We won't return anything */
455 FirstEntry = NULL;
456 }
457 else
458 {
459 /* Remove this entry */
460 RemoveEntryList(&Queue->EntryListHead);
461 }
462
463 /* Loop the list */
464 while (!IsListEmpty(&Queue->ThreadListHead))
465 {
466 /* Get the next entry */
467 NextEntry = Queue->ThreadListHead.Flink;
468
469 /* Get the associated thread */
470 Thread = CONTAINING_RECORD(NextEntry, KTHREAD, QueueListEntry);
471
472 /* Clear its queue */
473 Thread->Queue = NULL;
474
475 /* Remove this entry */
476 RemoveEntryList(NextEntry);
477 }
478
479 /* Release the dispatcher lock */
480 KiReleaseDispatcherLockFromSynchLevel();
481
482 /* Exit the dispatcher and return the first entry (if any) */
483 KiExitDispatcher(OldIrql);
484 return FirstEntry;
485 }
486
487 /* EOF */
488