1 /* 2 * PROJECT: ReactOS Kernel 3 * LICENSE: GPL - See COPYING in the top level directory 4 * FILE: ntoskrnl/ke/queue.c 5 * PURPOSE: Implements kernel queues 6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org) 7 * Gunnar Dalsnes 8 * Eric Kohl 9 */ 10 11 /* INCLUDES ******************************************************************/ 12 13 #include <ntoskrnl.h> 14 #define NDEBUG 15 #include <debug.h> 16 17 /* PRIVATE FUNCTIONS *********************************************************/ 18 19 /* 20 * Called when a thread which has a queue entry is entering a wait state 21 */ 22 VOID 23 FASTCALL 24 KiActivateWaiterQueue(IN PKQUEUE Queue) 25 { 26 PLIST_ENTRY QueueEntry; 27 PLIST_ENTRY WaitEntry; 28 PKWAIT_BLOCK WaitBlock; 29 PKTHREAD Thread; 30 ASSERT_QUEUE(Queue); 31 32 /* Decrement the number of active threads */ 33 Queue->CurrentCount--; 34 35 /* Make sure the counts are OK */ 36 if (Queue->CurrentCount < Queue->MaximumCount) 37 { 38 /* Get the Queue Entry */ 39 QueueEntry = Queue->EntryListHead.Flink; 40 41 /* Get the Wait Entry */ 42 WaitEntry = Queue->Header.WaitListHead.Blink; 43 44 /* Make sure that the Queue entries are not part of empty lists */ 45 if ((WaitEntry != &Queue->Header.WaitListHead) && 46 (QueueEntry != &Queue->EntryListHead)) 47 { 48 /* Remove this entry */ 49 RemoveEntryList(QueueEntry); 50 QueueEntry->Flink = NULL; 51 52 /* Decrease the Signal State */ 53 Queue->Header.SignalState--; 54 55 /* Unwait the Thread */ 56 WaitBlock = CONTAINING_RECORD(WaitEntry, 57 KWAIT_BLOCK, 58 WaitListEntry); 59 Thread = WaitBlock->Thread; 60 KiUnwaitThread(Thread, (LONG_PTR)QueueEntry, IO_NO_INCREMENT); 61 } 62 } 63 } 64 65 /* 66 * Returns the previous number of entries in the queue 67 */ 68 LONG 69 NTAPI 70 KiInsertQueue(IN PKQUEUE Queue, 71 IN PLIST_ENTRY Entry, 72 IN BOOLEAN Head) 73 { 74 ULONG InitialState; 75 PKTHREAD Thread = KeGetCurrentThread(); 76 PKWAIT_BLOCK WaitBlock; 77 PLIST_ENTRY WaitEntry; 78 PKTIMER Timer; 79 ASSERT_QUEUE(Queue); 80 81 /* Save the old state */ 82 InitialState = Queue->Header.SignalState; 83 84 /* Get the Entry */ 85 WaitEntry = Queue->Header.WaitListHead.Blink; 86 87 /* 88 * Why the KeGetCurrentThread()->Queue != Queue? 89 * KiInsertQueue might be called from an APC for the current thread. 90 * -Gunnar 91 */ 92 if ((Queue->CurrentCount < Queue->MaximumCount) && 93 (WaitEntry != &Queue->Header.WaitListHead) && 94 ((Thread->Queue != Queue) || 95 (Thread->WaitReason != WrQueue))) 96 { 97 /* Remove the wait entry */ 98 RemoveEntryList(WaitEntry); 99 100 /* Get the Wait Block and Thread */ 101 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry); 102 Thread = WaitBlock->Thread; 103 104 /* Remove the queue from the thread's wait list */ 105 Thread->WaitStatus = (LONG_PTR)Entry; 106 if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry); 107 108 /* Increase the active threads and remove any wait reason */ 109 Queue->CurrentCount++; 110 Thread->WaitReason = 0; 111 112 /* Check if there's a Thread Timer */ 113 Timer = &Thread->Timer; 114 if (Timer->Header.Inserted) KxRemoveTreeTimer(Timer); 115 116 /* Reschedule the Thread */ 117 KiReadyThread(Thread); 118 } 119 else 120 { 121 /* Increase the Entries */ 122 Queue->Header.SignalState++; 123 124 /* Check which mode we're using */ 125 if (Head) 126 { 127 /* Insert in the head */ 128 InsertHeadList(&Queue->EntryListHead, Entry); 129 } 130 else 131 { 132 /* Insert at the end */ 133 InsertTailList(&Queue->EntryListHead, Entry); 134 } 135 } 136 137 /* Return the previous state */ 138 return InitialState; 139 } 140 141 /* PUBLIC FUNCTIONS **********************************************************/ 142 143 /* 144 * @implemented 145 */ 146 VOID 147 NTAPI 148 KeInitializeQueue(IN PKQUEUE Queue, 149 IN ULONG Count OPTIONAL) 150 { 151 /* Initialize the Header */ 152 Queue->Header.Type = QueueObject; 153 Queue->Header.Abandoned = 0; 154 Queue->Header.Size = sizeof(KQUEUE) / sizeof(ULONG); 155 Queue->Header.SignalState = 0; 156 InitializeListHead(&(Queue->Header.WaitListHead)); 157 158 /* Initialize the Lists */ 159 InitializeListHead(&Queue->EntryListHead); 160 InitializeListHead(&Queue->ThreadListHead); 161 162 /* Set the Current and Maximum Count */ 163 Queue->CurrentCount = 0; 164 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count; 165 } 166 167 /* 168 * @implemented 169 */ 170 LONG 171 NTAPI 172 KeInsertHeadQueue(IN PKQUEUE Queue, 173 IN PLIST_ENTRY Entry) 174 { 175 LONG PreviousState; 176 KIRQL OldIrql; 177 ASSERT_QUEUE(Queue); 178 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL); 179 180 /* Lock the Dispatcher Database */ 181 OldIrql = KiAcquireDispatcherLock(); 182 183 /* Insert the Queue */ 184 PreviousState = KiInsertQueue(Queue, Entry, TRUE); 185 186 /* Release the Dispatcher Lock */ 187 KiReleaseDispatcherLock(OldIrql); 188 189 /* Return previous State */ 190 return PreviousState; 191 } 192 193 /* 194 * @implemented 195 */ 196 LONG 197 NTAPI 198 KeInsertQueue(IN PKQUEUE Queue, 199 IN PLIST_ENTRY Entry) 200 { 201 LONG PreviousState; 202 KIRQL OldIrql; 203 ASSERT_QUEUE(Queue); 204 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL); 205 206 /* Lock the Dispatcher Database */ 207 OldIrql = KiAcquireDispatcherLock(); 208 209 /* Insert the Queue */ 210 PreviousState = KiInsertQueue(Queue, Entry, FALSE); 211 212 /* Release the Dispatcher Lock */ 213 KiReleaseDispatcherLock(OldIrql); 214 215 /* Return previous State */ 216 return PreviousState; 217 } 218 219 /* 220 * @implemented 221 * 222 * Returns number of entries in the queue 223 */ 224 LONG 225 NTAPI 226 KeReadStateQueue(IN PKQUEUE Queue) 227 { 228 /* Returns the Signal State */ 229 ASSERT_QUEUE(Queue); 230 return Queue->Header.SignalState; 231 } 232 233 /* 234 * @implemented 235 */ 236 PLIST_ENTRY 237 NTAPI 238 KeRemoveQueue(IN PKQUEUE Queue, 239 IN KPROCESSOR_MODE WaitMode, 240 IN PLARGE_INTEGER Timeout OPTIONAL) 241 { 242 PLIST_ENTRY QueueEntry; 243 LONG_PTR Status; 244 PKTHREAD Thread = KeGetCurrentThread(); 245 PKQUEUE PreviousQueue; 246 PKWAIT_BLOCK WaitBlock = &Thread->WaitBlock[0]; 247 PKWAIT_BLOCK TimerBlock = &Thread->WaitBlock[TIMER_WAIT_BLOCK]; 248 PKTIMER Timer = &Thread->Timer; 249 BOOLEAN Swappable; 250 PLARGE_INTEGER OriginalDueTime = Timeout; 251 LARGE_INTEGER DueTime = {{0}}, NewDueTime, InterruptTime; 252 ULONG Hand = 0; 253 ASSERT_QUEUE(Queue); 254 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL); 255 256 /* Check if the Lock is already held */ 257 if (Thread->WaitNext) 258 { 259 /* It is, so next time don't do expect this */ 260 Thread->WaitNext = FALSE; 261 KxQueueThreadWait(); 262 } 263 else 264 { 265 /* Raise IRQL to synch, prepare the wait, then lock the database */ 266 Thread->WaitIrql = KeRaiseIrqlToSynchLevel(); 267 KxQueueThreadWait(); 268 KiAcquireDispatcherLockAtSynchLevel(); 269 } 270 271 /* 272 * This is needed so that we can set the new queue right here, 273 * before additional processing 274 */ 275 PreviousQueue = Thread->Queue; 276 Thread->Queue = Queue; 277 278 /* Check if this is a different queue */ 279 if (Queue != PreviousQueue) 280 { 281 /* Get the current entry */ 282 QueueEntry = &Thread->QueueListEntry; 283 if (PreviousQueue) 284 { 285 /* Remove from this list */ 286 RemoveEntryList(QueueEntry); 287 288 /* Wake the queue */ 289 KiActivateWaiterQueue(PreviousQueue); 290 } 291 292 /* Insert in this new Queue */ 293 InsertTailList(&Queue->ThreadListHead, QueueEntry); 294 } 295 else 296 { 297 /* Same queue, decrement waiting threads */ 298 Queue->CurrentCount--; 299 } 300 301 /* Loop until the queue is processed */ 302 while (TRUE) 303 { 304 /* Check if the counts are valid and if there is still a queued entry */ 305 QueueEntry = Queue->EntryListHead.Flink; 306 if ((Queue->CurrentCount < Queue->MaximumCount) && 307 (QueueEntry != &Queue->EntryListHead)) 308 { 309 /* Decrease the number of entries */ 310 Queue->Header.SignalState--; 311 312 /* Increase numbef of running threads */ 313 Queue->CurrentCount++; 314 315 /* Check if the entry is valid. If not, bugcheck */ 316 if (!(QueueEntry->Flink) || !(QueueEntry->Blink)) 317 { 318 /* Invalid item */ 319 KeBugCheckEx(INVALID_WORK_QUEUE_ITEM, 320 (ULONG_PTR)QueueEntry, 321 (ULONG_PTR)Queue, 322 (ULONG_PTR)NULL, 323 (ULONG_PTR)((PWORK_QUEUE_ITEM)QueueEntry)-> 324 WorkerRoutine); 325 } 326 327 /* Remove the Entry */ 328 RemoveEntryList(QueueEntry); 329 QueueEntry->Flink = NULL; 330 331 /* Nothing to wait on */ 332 break; 333 } 334 else 335 { 336 /* Check if a kernel APC is pending and we're below APC_LEVEL */ 337 if ((Thread->ApcState.KernelApcPending) && 338 !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL)) 339 { 340 /* Increment the count and unlock the dispatcher */ 341 Queue->CurrentCount++; 342 KiReleaseDispatcherLockFromSynchLevel(); 343 KiExitDispatcher(Thread->WaitIrql); 344 } 345 else 346 { 347 /* Fail if there's a User APC Pending */ 348 if ((WaitMode != KernelMode) && 349 (Thread->ApcState.UserApcPending)) 350 { 351 /* Return the status and increase the pending threads */ 352 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC; 353 Queue->CurrentCount++; 354 break; 355 } 356 357 /* Enable the Timeout Timer if there was any specified */ 358 if (Timeout) 359 { 360 /* Check if the timer expired */ 361 InterruptTime.QuadPart = KeQueryInterruptTime(); 362 if ((ULONG64)InterruptTime.QuadPart >= Timer->DueTime.QuadPart) 363 { 364 /* It did, so we don't need to wait */ 365 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT; 366 Queue->CurrentCount++; 367 break; 368 } 369 370 /* It didn't, so activate it */ 371 Timer->Header.Inserted = TRUE; 372 } 373 374 /* Insert the wait block in the list */ 375 InsertTailList(&Queue->Header.WaitListHead, 376 &WaitBlock->WaitListEntry); 377 378 /* Setup the wait information */ 379 Thread->State = Waiting; 380 381 /* Add the thread to the wait list */ 382 KiAddThreadToWaitList(Thread, Swappable); 383 384 /* Activate thread swap */ 385 ASSERT(Thread->WaitIrql <= DISPATCH_LEVEL); 386 KiSetThreadSwapBusy(Thread); 387 388 /* Check if we have a timer */ 389 if (Timeout) 390 { 391 /* Insert it */ 392 KxInsertTimer(Timer, Hand); 393 } 394 else 395 { 396 /* Otherwise, unlock the dispatcher */ 397 KiReleaseDispatcherLockFromSynchLevel(); 398 } 399 400 /* Do the actual swap */ 401 Status = KiSwapThread(Thread, KeGetCurrentPrcb()); 402 403 /* Reset the wait reason */ 404 Thread->WaitReason = 0; 405 406 /* Check if we were executing an APC */ 407 if (Status != STATUS_KERNEL_APC) return (PLIST_ENTRY)Status; 408 409 /* Check if we had a timeout */ 410 if (Timeout) 411 { 412 /* Recalculate due times */ 413 Timeout = KiRecalculateDueTime(OriginalDueTime, 414 &DueTime, 415 &NewDueTime); 416 } 417 } 418 419 /* Start another wait */ 420 Thread->WaitIrql = KeRaiseIrqlToSynchLevel(); 421 KxQueueThreadWait(); 422 KiAcquireDispatcherLockAtSynchLevel(); 423 Queue->CurrentCount--; 424 } 425 } 426 427 /* Unlock Database and return */ 428 KiReleaseDispatcherLockFromSynchLevel(); 429 KiExitDispatcher(Thread->WaitIrql); 430 return QueueEntry; 431 } 432 433 /* 434 * @implemented 435 */ 436 PLIST_ENTRY 437 NTAPI 438 KeRundownQueue(IN PKQUEUE Queue) 439 { 440 PLIST_ENTRY FirstEntry, NextEntry; 441 PKTHREAD Thread; 442 KIRQL OldIrql; 443 ASSERT_QUEUE(Queue); 444 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL); 445 ASSERT(IsListEmpty(&Queue->Header.WaitListHead)); 446 447 /* Get the Dispatcher Lock */ 448 OldIrql = KiAcquireDispatcherLock(); 449 450 /* Check if the list is empty */ 451 FirstEntry = Queue->EntryListHead.Flink; 452 if (FirstEntry == &Queue->EntryListHead) 453 { 454 /* We won't return anything */ 455 FirstEntry = NULL; 456 } 457 else 458 { 459 /* Remove this entry */ 460 RemoveEntryList(&Queue->EntryListHead); 461 } 462 463 /* Loop the list */ 464 while (!IsListEmpty(&Queue->ThreadListHead)) 465 { 466 /* Get the next entry */ 467 NextEntry = Queue->ThreadListHead.Flink; 468 469 /* Get the associated thread */ 470 Thread = CONTAINING_RECORD(NextEntry, KTHREAD, QueueListEntry); 471 472 /* Clear its queue */ 473 Thread->Queue = NULL; 474 475 /* Remove this entry */ 476 RemoveEntryList(NextEntry); 477 } 478 479 /* Release the dispatcher lock */ 480 KiReleaseDispatcherLockFromSynchLevel(); 481 482 /* Exit the dispatcher and return the first entry (if any) */ 483 KiExitDispatcher(OldIrql); 484 return FirstEntry; 485 } 486 487 /* EOF */ 488