1 /* 2 * COPYRIGHT: See COPYING in the top level directory 3 * PROJECT: ReactOS system libraries 4 * PURPOSE: Work Item implementation 5 * FILE: lib/rtl/workitem.c 6 * PROGRAMMER: 7 */ 8 9 /* INCLUDES *****************************************************************/ 10 11 #include <rtl.h> 12 13 #define NDEBUG 14 #include <debug.h> 15 16 /* FUNCTIONS ***************************************************************/ 17 18 NTSTATUS 19 NTAPI 20 RtlpStartThread(IN PTHREAD_START_ROUTINE Function, 21 IN PVOID Parameter, 22 OUT PHANDLE ThreadHandle) 23 { 24 /* Create a native worker thread -- used for SMSS, CSRSS, etc... */ 25 return RtlCreateUserThread(NtCurrentProcess(), 26 NULL, 27 TRUE, 28 0, 29 0, 30 0, 31 Function, 32 Parameter, 33 ThreadHandle, 34 NULL); 35 } 36 37 NTSTATUS 38 NTAPI 39 RtlpExitThread(IN NTSTATUS ExitStatus) 40 { 41 /* Kill a native worker thread -- used for SMSS, CSRSS, etc... */ 42 return NtTerminateThread(NtCurrentThread(), ExitStatus); 43 } 44 45 PRTL_START_POOL_THREAD RtlpStartThreadFunc = RtlpStartThread; 46 PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc = RtlpExitThread; 47 48 #define MAX_WORKERTHREADS 0x100 49 #define WORKERTHREAD_CREATION_THRESHOLD 0x5 50 51 typedef struct _RTLP_IOWORKERTHREAD 52 { 53 LIST_ENTRY ListEntry; 54 HANDLE ThreadHandle; 55 ULONG Flags; 56 } RTLP_IOWORKERTHREAD, *PRTLP_IOWORKERTHREAD; 57 58 typedef struct _RTLP_WORKITEM 59 { 60 WORKERCALLBACKFUNC Function; 61 PVOID Context; 62 ULONG Flags; 63 HANDLE TokenHandle; 64 } RTLP_WORKITEM, *PRTLP_WORKITEM; 65 66 static LONG ThreadPoolInitialized = 0; 67 static RTL_CRITICAL_SECTION ThreadPoolLock; 68 static PRTLP_IOWORKERTHREAD PersistentIoThread; 69 static LIST_ENTRY ThreadPoolIOWorkerThreadsList; 70 static HANDLE ThreadPoolCompletionPort; 71 static LONG ThreadPoolWorkerThreads; 72 static LONG ThreadPoolWorkerThreadsRequests; 73 static LONG ThreadPoolWorkerThreadsLongRequests; 74 static LONG ThreadPoolIOWorkerThreads; 75 static LONG ThreadPoolIOWorkerThreadsRequests; 76 static LONG ThreadPoolIOWorkerThreadsLongRequests; 77 78 #define IsThreadPoolInitialized() (*((volatile LONG*)&ThreadPoolInitialized) == 1) 79 80 static NTSTATUS 81 RtlpInitializeThreadPool(VOID) 82 { 83 NTSTATUS Status = STATUS_SUCCESS; 84 LONG InitStatus; 85 86 do 87 { 88 InitStatus = InterlockedCompareExchange(&ThreadPoolInitialized, 89 2, 90 0); 91 if (InitStatus == 0) 92 { 93 /* We're the first thread to initialize the thread pool */ 94 95 InitializeListHead(&ThreadPoolIOWorkerThreadsList); 96 97 PersistentIoThread = NULL; 98 99 ThreadPoolWorkerThreads = 0; 100 ThreadPoolWorkerThreadsRequests = 0; 101 ThreadPoolWorkerThreadsLongRequests = 0; 102 ThreadPoolIOWorkerThreads = 0; 103 ThreadPoolIOWorkerThreadsRequests = 0; 104 ThreadPoolIOWorkerThreadsLongRequests = 0; 105 106 /* Initialize the lock */ 107 Status = RtlInitializeCriticalSection(&ThreadPoolLock); 108 if (!NT_SUCCESS(Status)) 109 goto Finish; 110 111 /* Create the complection port */ 112 Status = NtCreateIoCompletion(&ThreadPoolCompletionPort, 113 IO_COMPLETION_ALL_ACCESS, 114 NULL, 115 0); 116 if (!NT_SUCCESS(Status)) 117 { 118 RtlDeleteCriticalSection(&ThreadPoolLock); 119 goto Finish; 120 } 121 122 Finish: 123 /* Initialization done */ 124 InterlockedExchange(&ThreadPoolInitialized, 125 1); 126 break; 127 } 128 else if (InitStatus == 2) 129 { 130 LARGE_INTEGER Timeout; 131 132 /* Another thread is currently initializing the thread pool! 133 Poll after a short period of time to see if the initialization 134 was completed */ 135 136 Timeout.QuadPart = -10000000LL; /* Wait for a second */ 137 NtDelayExecution(FALSE, 138 &Timeout); 139 } 140 } while (InitStatus != 1); 141 142 return Status; 143 } 144 145 static NTSTATUS 146 RtlpGetImpersonationToken(OUT PHANDLE TokenHandle) 147 { 148 NTSTATUS Status; 149 150 Status = NtOpenThreadToken(NtCurrentThread(), 151 TOKEN_IMPERSONATE, 152 TRUE, 153 TokenHandle); 154 if (Status == STATUS_NO_TOKEN || Status == STATUS_CANT_OPEN_ANONYMOUS) 155 { 156 *TokenHandle = NULL; 157 Status = STATUS_SUCCESS; 158 } 159 160 return Status; 161 } 162 163 static NTSTATUS 164 RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine) 165 { 166 NTSTATUS Status; 167 HANDLE ThreadHandle; 168 LARGE_INTEGER Timeout; 169 volatile LONG WorkerInitialized = 0; 170 171 Timeout.QuadPart = -10000LL; /* Wait for 100ms */ 172 173 /* Start the thread */ 174 Status = RtlpStartThreadFunc(StartRoutine, (PVOID)&WorkerInitialized, &ThreadHandle); 175 if (NT_SUCCESS(Status)) 176 { 177 NtResumeThread(ThreadHandle, NULL); 178 179 /* Poll until the thread got a chance to initialize */ 180 while (WorkerInitialized == 0) 181 { 182 NtDelayExecution(FALSE, 183 &Timeout); 184 } 185 186 NtClose(ThreadHandle); 187 } 188 189 return Status; 190 } 191 192 static VOID 193 NTAPI 194 RtlpExecuteWorkItem(IN OUT PVOID NormalContext, 195 IN OUT PVOID SystemArgument1, 196 IN OUT PVOID SystemArgument2) 197 { 198 NTSTATUS Status; 199 BOOLEAN Impersonated = FALSE; 200 RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2; 201 202 RtlFreeHeap(RtlGetProcessHeap(), 203 0, 204 SystemArgument2); 205 206 if (WorkItem.TokenHandle != NULL) 207 { 208 Status = NtSetInformationThread(NtCurrentThread(), 209 ThreadImpersonationToken, 210 &WorkItem.TokenHandle, 211 sizeof(HANDLE)); 212 213 NtClose(WorkItem.TokenHandle); 214 215 if (NT_SUCCESS(Status)) 216 { 217 Impersonated = TRUE; 218 } 219 } 220 221 _SEH2_TRY 222 { 223 DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle); 224 225 /* Execute the function */ 226 WorkItem.Function(WorkItem.Context); 227 } 228 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER) 229 { 230 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function); 231 } 232 _SEH2_END; 233 234 if (Impersonated) 235 { 236 WorkItem.TokenHandle = NULL; 237 Status = NtSetInformationThread(NtCurrentThread(), 238 ThreadImpersonationToken, 239 &WorkItem.TokenHandle, 240 sizeof(HANDLE)); 241 if (!NT_SUCCESS(Status)) 242 { 243 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status); 244 } 245 } 246 247 /* update the requests counter */ 248 InterlockedDecrement(&ThreadPoolWorkerThreadsRequests); 249 250 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION) 251 { 252 InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests); 253 } 254 } 255 256 257 static NTSTATUS 258 RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem) 259 { 260 NTSTATUS Status = STATUS_SUCCESS; 261 262 InterlockedIncrement(&ThreadPoolWorkerThreadsRequests); 263 264 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION) 265 { 266 InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests); 267 } 268 269 if (WorkItem->Flags & WT_EXECUTEINPERSISTENTTHREAD) 270 { 271 Status = RtlpInitializeTimerThread(); 272 273 if (NT_SUCCESS(Status)) 274 { 275 /* Queue an APC in the timer thread */ 276 Status = NtQueueApcThread(TimerThreadHandle, 277 RtlpExecuteWorkItem, 278 NULL, 279 NULL, 280 WorkItem); 281 } 282 } 283 else 284 { 285 /* Queue an IO completion message */ 286 Status = NtSetIoCompletion(ThreadPoolCompletionPort, 287 RtlpExecuteWorkItem, 288 WorkItem, 289 STATUS_SUCCESS, 290 0); 291 } 292 293 if (!NT_SUCCESS(Status)) 294 { 295 InterlockedDecrement(&ThreadPoolWorkerThreadsRequests); 296 297 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION) 298 { 299 InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests); 300 } 301 } 302 303 return Status; 304 } 305 306 static VOID 307 NTAPI 308 RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext, 309 IN OUT PVOID SystemArgument1, 310 IN OUT PVOID SystemArgument2) 311 { 312 NTSTATUS Status; 313 BOOLEAN Impersonated = FALSE; 314 PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext; 315 RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2; 316 317 ASSERT(IoThread != NULL); 318 319 RtlFreeHeap(RtlGetProcessHeap(), 320 0, 321 SystemArgument2); 322 323 if (WorkItem.TokenHandle != NULL) 324 { 325 Status = NtSetInformationThread(NtCurrentThread(), 326 ThreadImpersonationToken, 327 &WorkItem.TokenHandle, 328 sizeof(HANDLE)); 329 330 NtClose(WorkItem.TokenHandle); 331 332 if (NT_SUCCESS(Status)) 333 { 334 Impersonated = TRUE; 335 } 336 } 337 338 _SEH2_TRY 339 { 340 DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle); 341 342 /* Execute the function */ 343 WorkItem.Function(WorkItem.Context); 344 } 345 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER) 346 { 347 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function); 348 } 349 _SEH2_END; 350 351 if (Impersonated) 352 { 353 WorkItem.TokenHandle = NULL; 354 Status = NtSetInformationThread(NtCurrentThread(), 355 ThreadImpersonationToken, 356 &WorkItem.TokenHandle, 357 sizeof(HANDLE)); 358 if (!NT_SUCCESS(Status)) 359 { 360 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status); 361 } 362 } 363 364 /* remove the long function flag */ 365 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION) 366 { 367 Status = RtlEnterCriticalSection(&ThreadPoolLock); 368 if (NT_SUCCESS(Status)) 369 { 370 IoThread->Flags &= ~WT_EXECUTELONGFUNCTION; 371 RtlLeaveCriticalSection(&ThreadPoolLock); 372 } 373 } 374 375 /* update the requests counter */ 376 InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests); 377 378 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION) 379 { 380 InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests); 381 } 382 } 383 384 static NTSTATUS 385 RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem) 386 { 387 PLIST_ENTRY CurrentEntry; 388 PRTLP_IOWORKERTHREAD IoThread = NULL; 389 NTSTATUS Status = STATUS_SUCCESS; 390 391 if (WorkItem->Flags & WT_EXECUTEINPERSISTENTIOTHREAD) 392 { 393 if (PersistentIoThread != NULL) 394 { 395 /* We already have a persistent IO worker thread */ 396 IoThread = PersistentIoThread; 397 } 398 else 399 { 400 /* We're not aware of any persistent IO worker thread. Search for a unused 401 worker thread that doesn't have a long function queued */ 402 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink; 403 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList) 404 { 405 IoThread = CONTAINING_RECORD(CurrentEntry, 406 RTLP_IOWORKERTHREAD, 407 ListEntry); 408 409 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION)) 410 break; 411 412 CurrentEntry = CurrentEntry->Flink; 413 } 414 415 if (CurrentEntry != &ThreadPoolIOWorkerThreadsList) 416 { 417 /* Found a worker thread we can use. */ 418 ASSERT(IoThread != NULL); 419 420 IoThread->Flags |= WT_EXECUTEINPERSISTENTIOTHREAD; 421 PersistentIoThread = IoThread; 422 } 423 else 424 { 425 DPRINT1("Failed to find a worker thread for the persistent IO thread!\n"); 426 return STATUS_NO_MEMORY; 427 } 428 } 429 } 430 else 431 { 432 /* Find a worker thread that is not currently executing a long function */ 433 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink; 434 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList) 435 { 436 IoThread = CONTAINING_RECORD(CurrentEntry, 437 RTLP_IOWORKERTHREAD, 438 ListEntry); 439 440 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION)) 441 { 442 /* if we're trying to queue a long function then make sure we're not dealing 443 with the persistent thread */ 444 if ((WorkItem->Flags & WT_EXECUTELONGFUNCTION) && !(IoThread->Flags & WT_EXECUTEINPERSISTENTIOTHREAD)) 445 { 446 /* found a candidate */ 447 break; 448 } 449 } 450 451 CurrentEntry = CurrentEntry->Flink; 452 } 453 454 if (CurrentEntry == &ThreadPoolIOWorkerThreadsList) 455 { 456 /* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */ 457 if (ThreadPoolIOWorkerThreads == 0) 458 { 459 DPRINT1("Failed to find a worker thread for the work item 0x%p!\n", WorkItem); 460 ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList)); 461 return STATUS_NO_MEMORY; 462 } 463 else 464 { 465 /* pick the first worker thread */ 466 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink; 467 IoThread = CONTAINING_RECORD(CurrentEntry, 468 RTLP_IOWORKERTHREAD, 469 ListEntry); 470 471 /* Since this might be the persistent worker thread, don't run as a 472 long function */ 473 WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION; 474 } 475 } 476 477 /* Move the picked thread to the end of the list. Since we're always searching 478 from the beginning, this improves distribution of work items */ 479 RemoveEntryList(&IoThread->ListEntry); 480 InsertTailList(&ThreadPoolIOWorkerThreadsList, 481 &IoThread->ListEntry); 482 } 483 484 ASSERT(IoThread != NULL); 485 486 InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests); 487 488 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION) 489 { 490 /* We're about to queue a long function, mark the thread */ 491 IoThread->Flags |= WT_EXECUTELONGFUNCTION; 492 493 InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests); 494 } 495 496 /* It's time to queue the work item */ 497 Status = NtQueueApcThread(IoThread->ThreadHandle, 498 RtlpExecuteIoWorkItem, 499 IoThread, 500 NULL, 501 WorkItem); 502 if (!NT_SUCCESS(Status)) 503 { 504 DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem->Function); 505 InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests); 506 507 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION) 508 { 509 InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests); 510 } 511 } 512 513 return Status; 514 } 515 516 static BOOLEAN 517 RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL) 518 { 519 NTSTATUS Status; 520 ULONG IoPending; 521 BOOLEAN CreatedHandle = FALSE; 522 BOOLEAN IsIoPending = TRUE; 523 524 if (ThreadHandle == NULL) 525 { 526 Status = NtDuplicateObject(NtCurrentProcess(), 527 NtCurrentThread(), 528 NtCurrentProcess(), 529 &ThreadHandle, 530 0, 531 0, 532 DUPLICATE_SAME_ACCESS); 533 if (!NT_SUCCESS(Status)) 534 { 535 return IsIoPending; 536 } 537 538 CreatedHandle = TRUE; 539 } 540 541 Status = NtQueryInformationThread(ThreadHandle, 542 ThreadIsIoPending, 543 &IoPending, 544 sizeof(IoPending), 545 NULL); 546 if (NT_SUCCESS(Status) && IoPending == 0) 547 { 548 IsIoPending = FALSE; 549 } 550 551 if (CreatedHandle) 552 { 553 NtClose(ThreadHandle); 554 } 555 556 return IsIoPending; 557 } 558 559 static ULONG 560 NTAPI 561 RtlpIoWorkerThreadProc(IN PVOID Parameter) 562 { 563 volatile RTLP_IOWORKERTHREAD ThreadInfo; 564 LARGE_INTEGER Timeout; 565 BOOLEAN Terminate; 566 NTSTATUS Status = STATUS_SUCCESS; 567 568 if (InterlockedIncrement(&ThreadPoolIOWorkerThreads) > MAX_WORKERTHREADS) 569 { 570 /* Oops, too many worker threads... */ 571 goto InitFailed; 572 } 573 574 /* Get a thread handle to ourselves */ 575 Status = NtDuplicateObject(NtCurrentProcess(), 576 NtCurrentThread(), 577 NtCurrentProcess(), 578 (PHANDLE)&ThreadInfo.ThreadHandle, 579 0, 580 0, 581 DUPLICATE_SAME_ACCESS); 582 if (!NT_SUCCESS(Status)) 583 { 584 DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status); 585 586 InitFailed: 587 InterlockedDecrement(&ThreadPoolIOWorkerThreads); 588 589 /* Signal initialization completion */ 590 InterlockedExchange((PLONG)Parameter, 591 1); 592 593 RtlpExitThreadFunc(Status); 594 return 0; 595 } 596 597 ThreadInfo.Flags = 0; 598 599 /* Insert the thread into the list */ 600 InsertHeadList((PLIST_ENTRY)&ThreadPoolIOWorkerThreadsList, 601 (PLIST_ENTRY)&ThreadInfo.ListEntry); 602 603 /* Signal initialization completion */ 604 InterlockedExchange((PLONG)Parameter, 605 1); 606 607 for (;;) 608 { 609 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */ 610 611 Wait: 612 do 613 { 614 /* Perform an alertable wait, the work items are going to be executed as APCs */ 615 Status = NtDelayExecution(TRUE, 616 &Timeout); 617 618 /* Loop as long as we executed an APC */ 619 } while (Status != STATUS_SUCCESS); 620 621 /* We timed out, let's see if we're allowed to terminate */ 622 Terminate = FALSE; 623 624 Status = RtlEnterCriticalSection(&ThreadPoolLock); 625 if (NT_SUCCESS(Status)) 626 { 627 if (ThreadInfo.Flags & WT_EXECUTEINPERSISTENTIOTHREAD) 628 { 629 /* This thread is supposed to be persistent. Don't terminate! */ 630 RtlLeaveCriticalSection(&ThreadPoolLock); 631 632 Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL; 633 goto Wait; 634 } 635 636 /* FIXME - figure out an effective method to determine if it's appropriate to 637 lower the number of threads. For now let's always terminate if there's 638 at least one thread and no queued items. */ 639 Terminate = (*((volatile LONG*)&ThreadPoolIOWorkerThreads) - *((volatile LONG*)&ThreadPoolIOWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD) && 640 (*((volatile LONG*)&ThreadPoolIOWorkerThreadsRequests) == 0); 641 642 if (Terminate) 643 { 644 /* Prevent termination as long as IO is pending */ 645 Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle); 646 } 647 648 if (Terminate) 649 { 650 /* Rundown the thread and unlink it from the list */ 651 InterlockedDecrement(&ThreadPoolIOWorkerThreads); 652 RemoveEntryList((PLIST_ENTRY)&ThreadInfo.ListEntry); 653 } 654 655 RtlLeaveCriticalSection(&ThreadPoolLock); 656 657 if (Terminate) 658 { 659 /* Break the infinite loop and terminate */ 660 Status = STATUS_SUCCESS; 661 break; 662 } 663 } 664 else 665 { 666 DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status); 667 break; 668 } 669 } 670 671 NtClose(ThreadInfo.ThreadHandle); 672 RtlpExitThreadFunc(Status); 673 return 0; 674 } 675 676 static ULONG 677 NTAPI 678 RtlpWorkerThreadProc(IN PVOID Parameter) 679 { 680 LARGE_INTEGER Timeout; 681 BOOLEAN Terminate; 682 PVOID SystemArgument2; 683 IO_STATUS_BLOCK IoStatusBlock; 684 ULONG TimeoutCount = 0; 685 PKNORMAL_ROUTINE ApcRoutine; 686 NTSTATUS Status = STATUS_SUCCESS; 687 688 if (InterlockedIncrement(&ThreadPoolWorkerThreads) > MAX_WORKERTHREADS) 689 { 690 /* Signal initialization completion */ 691 InterlockedExchange((PLONG)Parameter, 692 1); 693 694 /* Oops, too many worker threads... */ 695 RtlpExitThreadFunc(Status); 696 return 0; 697 } 698 699 /* Signal initialization completion */ 700 InterlockedExchange((PLONG)Parameter, 701 1); 702 703 for (;;) 704 { 705 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */ 706 707 /* Dequeue a completion message */ 708 Status = NtRemoveIoCompletion(ThreadPoolCompletionPort, 709 (PVOID*)&ApcRoutine, 710 &SystemArgument2, 711 &IoStatusBlock, 712 &Timeout); 713 714 if (Status == STATUS_SUCCESS) 715 { 716 TimeoutCount = 0; 717 718 _SEH2_TRY 719 { 720 /* Call the APC routine */ 721 ApcRoutine(NULL, 722 (PVOID)IoStatusBlock.Information, 723 SystemArgument2); 724 } 725 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER) 726 { 727 (void)0; 728 } 729 _SEH2_END; 730 } 731 else 732 { 733 Terminate = FALSE; 734 735 if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock))) 736 continue; 737 738 /* FIXME - this should be optimized, check if there's requests, etc */ 739 740 if (Status == STATUS_TIMEOUT) 741 { 742 /* FIXME - we might want to optimize this */ 743 if (TimeoutCount++ > 2 && 744 *((volatile LONG*)&ThreadPoolWorkerThreads) - *((volatile LONG*)&ThreadPoolWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD) 745 { 746 Terminate = TRUE; 747 } 748 } 749 else 750 Terminate = TRUE; 751 752 RtlLeaveCriticalSection(&ThreadPoolLock); 753 754 if (Terminate) 755 { 756 /* Prevent termination as long as IO is pending */ 757 Terminate = !RtlpIsIoPending(NULL); 758 } 759 760 if (Terminate) 761 { 762 InterlockedDecrement(&ThreadPoolWorkerThreads); 763 Status = STATUS_SUCCESS; 764 break; 765 } 766 } 767 } 768 769 RtlpExitThreadFunc(Status); 770 return 0; 771 772 } 773 774 /* 775 * @implemented 776 */ 777 NTSTATUS 778 NTAPI 779 RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function, 780 IN PVOID Context OPTIONAL, 781 IN ULONG Flags) 782 { 783 LONG FreeWorkers; 784 NTSTATUS Status; 785 PRTLP_WORKITEM WorkItem; 786 787 DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags); 788 789 /* Initialize the thread pool if not already initialized */ 790 if (!IsThreadPoolInitialized()) 791 { 792 Status = RtlpInitializeThreadPool(); 793 794 if (!NT_SUCCESS(Status)) 795 return Status; 796 } 797 798 /* Allocate a work item */ 799 WorkItem = RtlAllocateHeap(RtlGetProcessHeap(), 800 0, 801 sizeof(RTLP_WORKITEM)); 802 if (WorkItem == NULL) 803 return STATUS_NO_MEMORY; 804 805 WorkItem->Function = Function; 806 WorkItem->Context = Context; 807 WorkItem->Flags = Flags; 808 809 if (Flags & WT_TRANSFER_IMPERSONATION) 810 { 811 Status = RtlpGetImpersonationToken(&WorkItem->TokenHandle); 812 813 if (!NT_SUCCESS(Status)) 814 { 815 DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status); 816 goto Cleanup; 817 } 818 } 819 else 820 WorkItem->TokenHandle = NULL; 821 822 Status = RtlEnterCriticalSection(&ThreadPoolLock); 823 if (NT_SUCCESS(Status)) 824 { 825 if (Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINUITHREAD | WT_EXECUTEINPERSISTENTIOTHREAD)) 826 { 827 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */ 828 829 FreeWorkers = ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsLongRequests; 830 831 if (((Flags & (WT_EXECUTEINPERSISTENTIOTHREAD | WT_EXECUTELONGFUNCTION)) == WT_EXECUTELONGFUNCTION) && 832 PersistentIoThread != NULL) 833 { 834 /* We shouldn't queue a long function into the persistent IO thread */ 835 FreeWorkers--; 836 } 837 838 /* See if it's a good idea to grow the pool */ 839 if (ThreadPoolIOWorkerThreads < MAX_WORKERTHREADS && 840 (FreeWorkers <= 0 || ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD)) 841 { 842 /* Grow the thread pool */ 843 Status = RtlpStartWorkerThread(RtlpIoWorkerThreadProc); 844 845 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolIOWorkerThreads) != 0) 846 { 847 /* We failed to create the thread, but there's at least one there so 848 we can at least queue the request */ 849 Status = STATUS_SUCCESS; 850 } 851 } 852 853 if (NT_SUCCESS(Status)) 854 { 855 /* Queue a IO worker thread */ 856 Status = RtlpQueueIoWorkerThread(WorkItem); 857 } 858 } 859 else 860 { 861 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */ 862 863 FreeWorkers = ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsLongRequests; 864 865 /* See if it's a good idea to grow the pool */ 866 if (ThreadPoolWorkerThreads < MAX_WORKERTHREADS && 867 (FreeWorkers <= 0 || ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD)) 868 { 869 /* Grow the thread pool */ 870 Status = RtlpStartWorkerThread(RtlpWorkerThreadProc); 871 872 if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolWorkerThreads) != 0) 873 { 874 /* We failed to create the thread, but there's at least one there so 875 we can at least queue the request */ 876 Status = STATUS_SUCCESS; 877 } 878 } 879 880 if (NT_SUCCESS(Status)) 881 { 882 /* Queue a normal worker thread */ 883 Status = RtlpQueueWorkerThread(WorkItem); 884 } 885 } 886 887 RtlLeaveCriticalSection(&ThreadPoolLock); 888 } 889 890 if (!NT_SUCCESS(Status)) 891 { 892 if (WorkItem->TokenHandle != NULL) 893 { 894 NtClose(WorkItem->TokenHandle); 895 } 896 897 Cleanup: 898 RtlFreeHeap(RtlGetProcessHeap(), 899 0, 900 WorkItem); 901 } 902 903 return Status; 904 } 905 906 /* 907 * @unimplemented 908 */ 909 NTSTATUS 910 NTAPI 911 RtlSetIoCompletionCallback(IN HANDLE FileHandle, 912 IN PIO_APC_ROUTINE Callback, 913 IN ULONG Flags) 914 { 915 IO_STATUS_BLOCK IoStatusBlock; 916 FILE_COMPLETION_INFORMATION FileCompletionInfo; 917 NTSTATUS Status; 918 919 DPRINT("RtlSetIoCompletionCallback(0x%p, 0x%p, 0x%x)\n", FileHandle, Callback, Flags); 920 921 /* Initialize the thread pool if not already initialized */ 922 if (!IsThreadPoolInitialized()) 923 { 924 Status = RtlpInitializeThreadPool(); 925 if (!NT_SUCCESS(Status)) 926 return Status; 927 } 928 929 FileCompletionInfo.Port = ThreadPoolCompletionPort; 930 FileCompletionInfo.Key = (PVOID)Callback; 931 932 Status = NtSetInformationFile(FileHandle, 933 &IoStatusBlock, 934 &FileCompletionInfo, 935 sizeof(FileCompletionInfo), 936 FileCompletionInformation); 937 938 return Status; 939 } 940 941 /* 942 * @implemented 943 */ 944 NTSTATUS 945 NTAPI 946 RtlSetThreadPoolStartFunc(IN PRTL_START_POOL_THREAD StartPoolThread, 947 IN PRTL_EXIT_POOL_THREAD ExitPoolThread) 948 { 949 RtlpStartThreadFunc = StartPoolThread; 950 RtlpExitThreadFunc = ExitPoolThread; 951 return STATUS_SUCCESS; 952 } 953