1 // 2 // Copyright (c) ZeroC, Inc. All rights reserved. 3 // 4 5 namespace IceInternal 6 { 7 using System.Collections.Generic; 8 using System.Diagnostics; 9 using System.Threading; 10 using System.Threading.Tasks; 11 ThreadPoolWorkItem()12 public delegate void ThreadPoolWorkItem(); AsyncCallback(object state)13 public delegate void AsyncCallback(object state); 14 15 // 16 // Thread pool threads set a custom synchronization context to ensure that 17 // continuations from awaited methods continue executing on the thread pool 18 // and not on the thread that notifies the awaited task. 19 // 20 sealed class ThreadPoolSynchronizationContext : SynchronizationContext 21 { ThreadPoolSynchronizationContext(ThreadPool threadPool)22 public ThreadPoolSynchronizationContext(ThreadPool threadPool) 23 { 24 _threadPool = threadPool; 25 } 26 Post(SendOrPostCallback d, object state)27 public override void Post(SendOrPostCallback d, object state) 28 { 29 // 30 // Dispatch the continuation on the thread pool if this isn't called 31 // already from a thread pool thread. We don't use the dispatcher 32 // for the continuations, the dispatcher is only used when the 33 // call is initialy invoked (e.g.: a servant dispatch after being 34 // received is dispatched using the dispatcher which might dispatch 35 // the call on the UI thread which will then use its own synchronization 36 // context to execute continuations). 37 // 38 var ctx = Current as ThreadPoolSynchronizationContext; 39 if(ctx != this) 40 { 41 _threadPool.dispatch(() => { d(state); }, null, false); 42 } 43 else 44 { 45 d(state); 46 } 47 } 48 Send(SendOrPostCallback d, object state)49 public override void Send(SendOrPostCallback d, object state) 50 { 51 throw new System.NotSupportedException("the thread pool doesn't support synchronous calls"); 52 } 53 54 private ThreadPool _threadPool; 55 } 56 57 internal struct ThreadPoolMessage 58 { ThreadPoolMessageIceInternal.ThreadPoolMessage59 public ThreadPoolMessage(object mutex) 60 { 61 _mutex = mutex; 62 _finish = false; 63 _finishWithIO = false; 64 } 65 startIOScopeIceInternal.ThreadPoolMessage66 public bool startIOScope(ref ThreadPoolCurrent current) 67 { 68 // This must be called with the handler locked. 69 _finishWithIO = current.startMessage(); 70 return _finishWithIO; 71 } 72 finishIOScopeIceInternal.ThreadPoolMessage73 public void finishIOScope(ref ThreadPoolCurrent current) 74 { 75 if(_finishWithIO) 76 { 77 lock(_mutex) 78 { 79 current.finishMessage(true); 80 } 81 } 82 } 83 completedIceInternal.ThreadPoolMessage84 public void completed(ref ThreadPoolCurrent current) 85 { 86 // 87 // Call finishMessage once IO is completed only if serialization is not enabled. 88 // Otherwise, finishMessage will be called when the event handler is done with 89 // the message (it will be called from destroy below). 90 // 91 Debug.Assert(_finishWithIO); 92 if(current.ioCompleted()) 93 { 94 _finishWithIO = false; 95 _finish = true; 96 } 97 } 98 destroyIceInternal.ThreadPoolMessage99 public void destroy(ref ThreadPoolCurrent current) 100 { 101 if(_finish) 102 { 103 // 104 // A ThreadPoolMessage instance must be created outside the synchronization 105 // of the event handler. We need to lock the event handler here to call 106 // finishMessage. 107 // 108 lock(_mutex) 109 { 110 current.finishMessage(false); 111 Debug.Assert(!current.completedSynchronously); 112 } 113 } 114 } 115 116 private object _mutex; 117 private bool _finish; 118 private bool _finishWithIO; 119 } 120 121 public struct ThreadPoolCurrent 122 { ThreadPoolCurrentIceInternal.ThreadPoolCurrent123 public ThreadPoolCurrent(ThreadPool threadPool, EventHandler handler, int op) 124 { 125 _threadPool = threadPool; 126 _handler = handler; 127 operation = op; 128 completedSynchronously = false; 129 } 130 131 public readonly int operation; 132 public bool completedSynchronously; 133 ioCompletedIceInternal.ThreadPoolCurrent134 public bool ioCompleted() 135 { 136 return _threadPool.serialize(); 137 } 138 startMessageIceInternal.ThreadPoolCurrent139 public bool startMessage() 140 { 141 return _threadPool.startMessage(ref this); 142 } 143 finishMessageIceInternal.ThreadPoolCurrent144 public void finishMessage(bool fromIOThread) 145 { 146 _threadPool.finishMessage(ref this, fromIOThread); 147 } 148 149 internal readonly ThreadPool _threadPool; 150 internal readonly EventHandler _handler; 151 } 152 153 public sealed class ThreadPool : System.Threading.Tasks.TaskScheduler 154 { ThreadPool(Instance instance, string prefix, int timeout)155 public ThreadPool(Instance instance, string prefix, int timeout) 156 { 157 Ice.Properties properties = instance.initializationData().properties; 158 159 _instance = instance; 160 _dispatcher = instance.initializationData().dispatcher; 161 _destroyed = false; 162 _prefix = prefix; 163 _threadIndex = 0; 164 _inUse = 0; 165 _serialize = properties.getPropertyAsInt(_prefix + ".Serialize") > 0; 166 _serverIdleTime = timeout; 167 168 string programName = properties.getProperty("Ice.ProgramName"); 169 if(programName.Length > 0) 170 { 171 _threadPrefix = programName + "-" + _prefix; 172 } 173 else 174 { 175 _threadPrefix = _prefix; 176 } 177 178 // 179 // We use just one thread as the default. This is the fastest 180 // possible setting, still allows one level of nesting, and 181 // doesn't require to make the servants thread safe. 182 // 183 int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1); 184 if(size < 1) 185 { 186 string s = _prefix + ".Size < 1; Size adjusted to 1"; 187 _instance.initializationData().logger.warning(s); 188 size = 1; 189 } 190 191 int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); 192 if(sizeMax < size) 193 { 194 string s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")"; 195 _instance.initializationData().logger.warning(s); 196 sizeMax = size; 197 } 198 199 int sizeWarn = properties.getPropertyAsInt(_prefix + ".SizeWarn"); 200 if(sizeWarn != 0 && sizeWarn < size) 201 { 202 string s = _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")"; 203 _instance.initializationData().logger.warning(s); 204 sizeWarn = size; 205 } 206 else if(sizeWarn > sizeMax) 207 { 208 string s = _prefix + ".SizeWarn > " + _prefix + ".SizeMax; adjusted SizeWarn to SizeMax (" 209 + sizeMax + ")"; 210 _instance.initializationData().logger.warning(s); 211 sizeWarn = sizeMax; 212 } 213 214 int threadIdleTime = properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60); 215 if(threadIdleTime < 0) 216 { 217 string s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0"; 218 _instance.initializationData().logger.warning(s); 219 threadIdleTime = 0; 220 } 221 222 _size = size; 223 _sizeMax = sizeMax; 224 _sizeWarn = sizeWarn; 225 _threadIdleTime = threadIdleTime; 226 227 int stackSize = properties.getPropertyAsInt(_prefix + ".StackSize"); 228 if(stackSize < 0) 229 { 230 string s = _prefix + ".StackSize < 0; Size adjusted to OS default"; 231 _instance.initializationData().logger.warning(s); 232 stackSize = 0; 233 } 234 _stackSize = stackSize; 235 236 _priority = properties.getProperty(_prefix + ".ThreadPriority").Length > 0 ? 237 Util.stringToThreadPriority(properties.getProperty(_prefix + ".ThreadPriority")) : 238 Util.stringToThreadPriority(properties.getProperty("Ice.ThreadPriority")); 239 240 if(_instance.traceLevels().threadPool >= 1) 241 { 242 string s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " + 243 _sizeWarn; 244 _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); 245 } 246 247 _workItems = new Queue<ThreadPoolWorkItem>(); 248 249 try 250 { 251 _threads = new List<WorkerThread>(); 252 for(int i = 0; i < _size; ++i) 253 { 254 WorkerThread thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); 255 thread.start(_priority); 256 _threads.Add(thread); 257 } 258 } 259 catch(System.Exception ex) 260 { 261 string s = "cannot create thread for `" + _prefix + "':\n" + ex; 262 _instance.initializationData().logger.error(s); 263 264 destroy(); 265 joinWithAllThreads(); 266 throw; 267 } 268 } 269 destroy()270 public void destroy() 271 { 272 lock(this) 273 { 274 if(_destroyed) 275 { 276 return; 277 } 278 _destroyed = true; 279 Monitor.PulseAll(this); 280 } 281 } 282 updateObservers()283 public void updateObservers() 284 { 285 lock(this) 286 { 287 foreach(WorkerThread t in _threads) 288 { 289 t.updateObserver(); 290 } 291 } 292 } 293 initialize(EventHandler handler)294 public void initialize(EventHandler handler) 295 { 296 handler._ready = 0; 297 handler._pending = 0; 298 handler._started = 0; 299 handler._finish = false; 300 handler._hasMoreData = false; 301 handler._registered = 0; 302 } 303 register(EventHandler handler, int op)304 public void register(EventHandler handler, int op) 305 { 306 update(handler, SocketOperation.None, op); 307 } 308 update(EventHandler handler, int remove, int add)309 public void update(EventHandler handler, int remove, int add) 310 { 311 lock(this) 312 { 313 Debug.Assert(!_destroyed); 314 315 // Don't remove what needs to be added 316 remove &= ~add; 317 318 // Don't remove/add if already un-registered or registered 319 remove &= handler._registered; 320 add &= ~handler._registered; 321 if(remove == add) 322 { 323 return; 324 } 325 326 handler._registered &= ~remove; 327 handler._registered |= add; 328 329 if((add & SocketOperation.Read) != 0 && (handler._pending & SocketOperation.Read) == 0) 330 { 331 handler._pending |= SocketOperation.Read; 332 executeNonBlocking(() => 333 { 334 messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Read)); 335 }); 336 } 337 else if((add & SocketOperation.Write) != 0 && (handler._pending & SocketOperation.Write) == 0) 338 { 339 handler._pending |= SocketOperation.Write; 340 executeNonBlocking(() => 341 { 342 messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Write)); 343 }); 344 } 345 } 346 } 347 unregister(EventHandler handler, int op)348 public void unregister(EventHandler handler, int op) 349 { 350 update(handler, op, SocketOperation.None); 351 } 352 finish(EventHandler handler)353 public void finish(EventHandler handler) 354 { 355 lock(this) 356 { 357 Debug.Assert(!_destroyed); 358 359 handler._registered = SocketOperation.None; 360 361 // 362 // If there are no pending asynchronous operations, we can call finish on the handler now. 363 // 364 if(handler._pending == 0) 365 { 366 executeNonBlocking(() => 367 { 368 ThreadPoolCurrent current = new ThreadPoolCurrent(this, handler, SocketOperation.None); 369 handler.finished(ref current); 370 }); 371 } 372 else 373 { 374 handler._finish = true; 375 } 376 } 377 } 378 dispatchFromThisThread(System.Action call, Ice.Connection con)379 public void dispatchFromThisThread(System.Action call, Ice.Connection con) 380 { 381 if(_dispatcher != null) 382 { 383 try 384 { 385 _dispatcher(call, con); 386 } 387 catch(System.Exception ex) 388 { 389 if(_instance.initializationData().properties.getPropertyAsIntWithDefault( 390 "Ice.Warn.Dispatch", 1) > 1) 391 { 392 _instance.initializationData().logger.warning("dispatch exception:\n" + ex); 393 } 394 } 395 } 396 else 397 { 398 call(); 399 } 400 } 401 dispatch(System.Action call, Ice.Connection con, bool useDispatcher = true)402 public void dispatch(System.Action call, Ice.Connection con, bool useDispatcher = true) 403 { 404 lock(this) 405 { 406 if(_destroyed) 407 { 408 throw new Ice.CommunicatorDestroyedException(); 409 } 410 411 if(useDispatcher) 412 { 413 _workItems.Enqueue(() => { dispatchFromThisThread(call, con); }); 414 } 415 else 416 { 417 _workItems.Enqueue(() => { call(); }); 418 } 419 Monitor.Pulse(this); 420 421 // 422 // If this is a dynamic thread pool which can still grow and if all threads are 423 // currently busy dispatching or about to dispatch, we spawn a new thread to 424 // execute this new work item right away. 425 // 426 if(_threads.Count < _sizeMax && 427 (_inUse + _workItems.Count) > _threads.Count && 428 !_destroyed) 429 { 430 if(_instance.traceLevels().threadPool >= 1) 431 { 432 string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1); 433 _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); 434 } 435 436 try 437 { 438 WorkerThread t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); 439 t.start(_priority); 440 _threads.Add(t); 441 } 442 catch(System.Exception ex) 443 { 444 string s = "cannot create thread for `" + _prefix + "':\n" + ex; 445 _instance.initializationData().logger.error(s); 446 } 447 } 448 } 449 } 450 executeNonBlocking(ThreadPoolWorkItem workItem)451 public void executeNonBlocking(ThreadPoolWorkItem workItem) 452 { 453 lock(this) 454 { 455 Debug.Assert(!_destroyed); 456 _instance.asyncIOThread().queue(workItem); 457 } 458 } 459 joinWithAllThreads()460 public void joinWithAllThreads() 461 { 462 // 463 // _threads is immutable after destroy() has been called, 464 // therefore no synchronization is needed. (Synchronization 465 // wouldn't be possible here anyway, because otherwise the 466 // other threads would never terminate.) 467 // 468 Debug.Assert(_destroyed); 469 foreach(WorkerThread thread in _threads) 470 { 471 thread.join(); 472 } 473 } 474 prefix()475 public string prefix() 476 { 477 return _prefix; 478 } 479 serialize()480 public bool serialize() 481 { 482 return _serialize; 483 } 484 QueueTask(System.Threading.Tasks.Task task)485 protected sealed override void QueueTask(System.Threading.Tasks.Task task) 486 { 487 dispatch(() => { TryExecuteTask(task); }, null, _dispatcher != null); 488 } 489 TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued)490 protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) 491 { 492 if(!taskWasPreviouslyQueued) 493 { 494 dispatchFromThisThread(() => { TryExecuteTask(task); }, null); 495 return true; 496 } 497 return false; 498 } 499 TryDequeue(System.Threading.Tasks.Task task)500 protected sealed override bool TryDequeue(System.Threading.Tasks.Task task) 501 { 502 return false; 503 } 504 GetScheduledTasks()505 protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks() 506 { 507 return new System.Threading.Tasks.Task[0]; 508 } 509 run(WorkerThread thread)510 private void run(WorkerThread thread) 511 { 512 ThreadPoolWorkItem workItem = null; 513 while(true) 514 { 515 lock(this) 516 { 517 if(workItem != null) 518 { 519 Debug.Assert(_inUse > 0); 520 --_inUse; 521 if(_workItems.Count == 0) 522 { 523 thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); 524 } 525 } 526 527 workItem = null; 528 529 while(_workItems.Count == 0) 530 { 531 if(_destroyed) 532 { 533 return; 534 } 535 536 if(_threadIdleTime > 0) 537 { 538 if(!Monitor.Wait(this, _threadIdleTime * 1000) && _workItems.Count == 0) // If timeout 539 { 540 if(_destroyed) 541 { 542 return; 543 } 544 else if(_serverIdleTime == 0 || _threads.Count > 1) 545 { 546 // 547 // If not the last thread or if server idle time isn't configured, 548 // we can exit. Unlike C++/Java, there's no need to have a thread 549 // always spawned in the thread pool because all the IO is done 550 // by the .NET thread pool threads. Instead, we'll just spawn a 551 // new thread when needed (i.e.: when a new work item is queued). 552 // 553 if(_instance.traceLevels().threadPool >= 1) 554 { 555 string s = "shrinking " + _prefix + ": Size=" + (_threads.Count - 1); 556 _instance.initializationData().logger.trace( 557 _instance.traceLevels().threadPoolCat, s); 558 } 559 560 _threads.Remove(thread); 561 _instance.asyncIOThread().queue(() => 562 { 563 thread.join(); 564 }); 565 return; 566 } 567 else 568 { 569 Debug.Assert(_serverIdleTime > 0 && _inUse == 0 && _threads.Count == 1); 570 if(!Monitor.Wait(this, _serverIdleTime * 1000) && 571 _workItems.Count == 0) 572 { 573 if(!_destroyed) 574 { 575 _workItems.Enqueue(() => 576 { 577 try 578 { 579 _instance.objectAdapterFactory().shutdown(); 580 } 581 catch(Ice.CommunicatorDestroyedException) 582 { 583 } 584 }); 585 } 586 } 587 } 588 } 589 } 590 else 591 { 592 Monitor.Wait(this); 593 } 594 } 595 596 Debug.Assert(_workItems.Count > 0); 597 workItem = _workItems.Dequeue(); 598 599 Debug.Assert(_inUse >= 0); 600 ++_inUse; 601 602 thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser); 603 604 if(_sizeMax > 1 && _inUse == _sizeWarn) 605 { 606 string s = "thread pool `" + _prefix + "' is running low on threads\n" 607 + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; 608 _instance.initializationData().logger.warning(s); 609 } 610 } 611 612 try 613 { 614 workItem(); 615 } 616 catch(System.Exception ex) 617 { 618 string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n'; 619 _instance.initializationData().logger.error(s); 620 } 621 } 622 } 623 startMessage(ref ThreadPoolCurrent current)624 public bool startMessage(ref ThreadPoolCurrent current) 625 { 626 Debug.Assert((current._handler._pending & current.operation) != 0); 627 628 if((current._handler._started & current.operation) != 0) 629 { 630 Debug.Assert((current._handler._ready & current.operation) == 0); 631 current._handler._ready |= current.operation; 632 current._handler._started &= ~current.operation; 633 if(!current._handler.finishAsync(current.operation)) // Returns false if the handler is finished. 634 { 635 current._handler._pending &= ~current.operation; 636 if(current._handler._pending == 0 && current._handler._finish) 637 { 638 finish(current._handler); 639 } 640 return false; 641 } 642 } 643 else if((current._handler._ready & current.operation) == 0 && 644 (current._handler._registered & current.operation) != 0) 645 { 646 Debug.Assert((current._handler._started & current.operation) == 0); 647 bool completed = false; 648 if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed)) 649 { 650 current._handler._pending &= ~current.operation; 651 if(current._handler._pending == 0 && current._handler._finish) 652 { 653 finish(current._handler); 654 } 655 return false; 656 } 657 else 658 { 659 current.completedSynchronously = completed; 660 current._handler._started |= current.operation; 661 return false; 662 } 663 } 664 665 if((current._handler._registered & current.operation) != 0) 666 { 667 Debug.Assert((current._handler._ready & current.operation) != 0); 668 current._handler._ready &= ~current.operation; 669 return true; 670 } 671 else 672 { 673 current._handler._pending &= ~current.operation; 674 if(current._handler._pending == 0 && current._handler._finish) 675 { 676 finish(current._handler); 677 } 678 return false; 679 } 680 } 681 finishMessage(ref ThreadPoolCurrent current, bool fromIOThread)682 public void finishMessage(ref ThreadPoolCurrent current, bool fromIOThread) 683 { 684 if((current._handler._registered & current.operation) != 0) 685 { 686 if(fromIOThread) 687 { 688 Debug.Assert((current._handler._ready & current.operation) == 0); 689 bool completed = false; 690 if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed)) 691 { 692 current._handler._pending &= ~current.operation; 693 } 694 else 695 { 696 Debug.Assert((current._handler._pending & current.operation) != 0); 697 current.completedSynchronously = completed; 698 current._handler._started |= current.operation; 699 } 700 } 701 else 702 { 703 ThreadPoolCurrent c = current; 704 executeNonBlocking(() => 705 { 706 messageCallback(c); 707 }); 708 } 709 } 710 else 711 { 712 current._handler._pending &= ~current.operation; 713 } 714 715 if(current._handler._pending == 0 && current._handler._finish) 716 { 717 // There are no more pending async operations, it's time to call finish. 718 finish(current._handler); 719 } 720 } 721 asyncReadCallback(object state)722 public void asyncReadCallback(object state) 723 { 724 messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Read)); 725 } 726 asyncWriteCallback(object state)727 public void asyncWriteCallback(object state) 728 { 729 messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Write)); 730 } 731 messageCallback(ThreadPoolCurrent current)732 public void messageCallback(ThreadPoolCurrent current) 733 { 734 try 735 { 736 do 737 { 738 current.completedSynchronously = false; 739 current._handler.message(ref current); 740 } 741 while(current.completedSynchronously); 742 } 743 catch(System.Exception ex) 744 { 745 string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + current._handler.ToString(); 746 _instance.initializationData().logger.error(s); 747 } 748 } 749 getCallback(int operation)750 private AsyncCallback getCallback(int operation) 751 { 752 switch(operation) 753 { 754 case SocketOperation.Read: 755 return asyncReadCallback; 756 case SocketOperation.Write: 757 return asyncWriteCallback; 758 default: 759 Debug.Assert(false); 760 return null; 761 } 762 } 763 764 private Instance _instance; 765 private System.Action<System.Action, Ice.Connection> _dispatcher; 766 private bool _destroyed; 767 private readonly string _prefix; 768 private readonly string _threadPrefix; 769 770 private sealed class WorkerThread 771 { 772 private ThreadPool _threadPool; 773 private Ice.Instrumentation.ThreadObserver _observer; 774 private Ice.Instrumentation.ThreadState _state; 775 WorkerThread(ThreadPool threadPool, string name)776 internal WorkerThread(ThreadPool threadPool, string name) : base() 777 { 778 _threadPool = threadPool; 779 _name = name; 780 _state = Ice.Instrumentation.ThreadState.ThreadStateIdle; 781 updateObserver(); 782 } 783 updateObserver()784 public void updateObserver() 785 { 786 // Must be called with the thread pool mutex locked 787 Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.initializationData().observer; 788 if(obsv != null) 789 { 790 _observer = obsv.getThreadObserver(_threadPool._prefix, _name, _state, _observer); 791 if(_observer != null) 792 { 793 _observer.attach(); 794 } 795 } 796 } 797 setState(Ice.Instrumentation.ThreadState s)798 public void setState(Ice.Instrumentation.ThreadState s) 799 { 800 // Must be called with the thread pool mutex locked 801 if(_observer != null) 802 { 803 if(_state != s) 804 { 805 _observer.stateChanged(_state, s); 806 } 807 } 808 _state = s; 809 } 810 getThread()811 public Thread getThread() 812 { 813 return _thread; 814 } 815 join()816 public void join() 817 { 818 _thread.Join(); 819 } 820 start(ThreadPriority priority)821 public void start(ThreadPriority priority) 822 { 823 if(_threadPool._stackSize == 0) 824 { 825 _thread = new Thread(new ThreadStart(Run)); 826 } 827 else 828 { 829 _thread = new Thread(new ThreadStart(Run), _threadPool._stackSize); 830 } 831 _thread.IsBackground = true; 832 _thread.Name = _name; 833 _thread.Priority = priority; 834 _thread.Start(); 835 } 836 Run()837 public void Run() 838 { 839 // 840 // Set the default synchronization context to allow async/await to run 841 // continuations on the thread pool. 842 // 843 SynchronizationContext.SetSynchronizationContext(new ThreadPoolSynchronizationContext(_threadPool)); 844 845 if(_threadPool._instance.initializationData().threadStart != null) 846 { 847 try 848 { 849 _threadPool._instance.initializationData().threadStart(); 850 } 851 catch(System.Exception ex) 852 { 853 string s = "thread hook start() method raised an unexpected exception in `"; 854 s += _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex; 855 _threadPool._instance.initializationData().logger.error(s); 856 } 857 } 858 859 try 860 { 861 _threadPool.run(this); 862 } 863 catch(System.Exception ex) 864 { 865 string s = "exception in `" + _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex; 866 _threadPool._instance.initializationData().logger.error(s); 867 } 868 869 if(_observer != null) 870 { 871 _observer.detach(); 872 } 873 874 if(_threadPool._instance.initializationData().threadStop != null) 875 { 876 try 877 { 878 _threadPool._instance.initializationData().threadStop(); 879 } 880 catch(System.Exception ex) 881 { 882 string s = "thread hook stop() method raised an unexpected exception in `"; 883 s += _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex; 884 _threadPool._instance.initializationData().logger.error(s); 885 } 886 } 887 } 888 889 private readonly string _name; 890 private Thread _thread; 891 } 892 893 private readonly int _size; // Number of threads that are pre-created. 894 private readonly int _sizeMax; // Maximum number of threads. 895 private readonly int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. 896 private readonly bool _serialize; // True if requests need to be serialized over the connection. 897 private readonly ThreadPriority _priority; 898 private readonly int _serverIdleTime; 899 private readonly int _threadIdleTime; 900 private readonly int _stackSize; 901 902 private List<WorkerThread> _threads; // All threads, running or not. 903 private int _threadIndex; // For assigning thread names. 904 private int _inUse; // Number of threads that are currently in use. 905 906 private Queue<ThreadPoolWorkItem> _workItems; 907 } 908 } 909