1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 namespace IceInternal
6 {
7     using System.Collections.Generic;
8     using System.Diagnostics;
9     using System.Threading;
10     using System.Threading.Tasks;
11 
//
// A parameterless unit of work. Work items are queued on the thread pool's
// internal queue or handed to the instance's asynchronous IO thread.
//
public delegate void ThreadPoolWorkItem();

//
// Completion callback passed to EventHandler.startAsync. The thread pool's
// own callbacks expect the state argument to be the event handler.
//
public delegate void AsyncCallback(object state);
14 
//
// Custom synchronization context set by thread pool threads. It ensures that
// continuations of awaited methods keep executing on the thread pool instead
// of on the thread that completes the awaited task.
//
sealed class ThreadPoolSynchronizationContext : SynchronizationContext
{
    private ThreadPool _threadPool;

    public ThreadPoolSynchronizationContext(ThreadPool threadPool)
    {
        _threadPool = threadPool;
    }

    //
    // Synchronous sends are not supported by this context.
    //
    public override void Send(SendOrPostCallback d, object state)
    {
        throw new System.NotSupportedException("the thread pool doesn't support synchronous calls");
    }

    //
    // Runs the continuation inline when the calling thread already uses this
    // context; otherwise queues it on the thread pool.
    //
    public override void Post(SendOrPostCallback d, object state)
    {
        if(ReferenceEquals(Current, this))
        {
            d(state);
            return;
        }

        //
        // The dispatcher is deliberately bypassed for continuations: it is only
        // used when a call is initially invoked (e.g. a servant dispatch may be
        // handed to the UI thread by the dispatcher, and that thread then uses
        // its own synchronization context for its continuations).
        //
        _threadPool.dispatch(() => { d(state); }, null, false);
    }
}
56 
//
// Tracks the progress of one message so that ThreadPoolCurrent.finishMessage
// is called exactly where it should be: at the end of the IO scope, or from
// destroy() once completed() has taken over responsibility for the message.
//
internal struct ThreadPoolMessage
{
    //
    // mutex is the object locked around the finishMessage calls.
    //
    public ThreadPoolMessage(object mutex)
    {
        _finishWithIO = false;
        _finish = false;
        _mutex = mutex;
    }

    //
    // Starts the message. Returns true if the message is ready to be
    // processed, in which case finishIOScope will call finishMessage.
    //
    public bool startIOScope(ref ThreadPoolCurrent current)
    {
        // The caller must hold the handler lock.
        bool started = current.startMessage();
        _finishWithIO = started;
        return started;
    }

    //
    // Ends the IO scope, finishing the message if it is still owned by it.
    //
    public void finishIOScope(ref ThreadPoolCurrent current)
    {
        if(!_finishWithIO)
        {
            return;
        }

        lock(_mutex)
        {
            current.finishMessage(true);
        }
    }

    //
    // Called once the IO for the message is completed. When ioCompleted()
    // reports true (the pool serializes), finishMessage is postponed until
    // destroy(); otherwise it is still called at the end of the IO scope.
    //
    public void completed(ref ThreadPoolCurrent current)
    {
        Debug.Assert(_finishWithIO);
        if(!current.ioCompleted())
        {
            return;
        }

        _finish = true;
        _finishWithIO = false;
    }

    //
    // Finishes the message if completed() postponed it.
    //
    public void destroy(ref ThreadPoolCurrent current)
    {
        if(!_finish)
        {
            return;
        }

        //
        // ThreadPoolMessage instances are created outside the synchronization
        // of the event handler, so the handler has to be locked here before
        // finishMessage can be called.
        //
        lock(_mutex)
        {
            current.finishMessage(false);
            Debug.Assert(!current.completedSynchronously);
        }
    }

    private object _mutex;
    private bool _finish;
    private bool _finishWithIO;
}
120 
//
// Identifies the operation being processed for an event handler and forwards
// the message life-cycle calls to the owning thread pool.
//
public struct ThreadPoolCurrent
{
    public ThreadPoolCurrent(ThreadPool threadPool, EventHandler handler, int op)
    {
        completedSynchronously = false;
        operation = op;
        _handler = handler;
        _threadPool = threadPool;
    }

    // The socket operation this instance was created for.
    public readonly int operation;

    // Set from the result of startAsync; messageCallback loops while it is true.
    public bool completedSynchronously;

    //
    // Returns whether the thread pool serializes.
    //
    public bool ioCompleted()
    {
        bool serialized = _threadPool.serialize();
        return serialized;
    }

    //
    // Forwards to ThreadPool.startMessage for this instance.
    //
    public bool startMessage()
    {
        bool ready = _threadPool.startMessage(ref this);
        return ready;
    }

    //
    // Forwards to ThreadPool.finishMessage for this instance.
    //
    public void finishMessage(bool fromIOThread)
    {
        _threadPool.finishMessage(ref this, fromIOThread);
    }

    internal readonly ThreadPool _threadPool;
    internal readonly EventHandler _handler;
}
152 
153     public sealed class ThreadPool : System.Threading.Tasks.TaskScheduler
154     {
//
// Creates the thread pool identified by prefix. The pool settings are read
// from the properties named prefix + ".Serialize", ".Size", ".SizeMax",
// ".SizeWarn", ".ThreadIdleTime", ".StackSize" and ".ThreadPriority";
// out-of-range values are adjusted and a warning is logged. timeout is kept
// as the server idle time. The initial Size worker threads are started
// before returning; if that fails the error is logged, the pool is
// destroyed and joined, and the exception is rethrown.
//
public ThreadPool(Instance instance, string prefix, int timeout)
{
    Ice.Properties props = instance.initializationData().properties;

    _instance = instance;
    _dispatcher = instance.initializationData().dispatcher;
    _prefix = prefix;
    _destroyed = false;
    _threadIndex = 0;
    _inUse = 0;
    _serialize = props.getPropertyAsInt(_prefix + ".Serialize") > 0;
    _serverIdleTime = timeout;

    // Worker thread names are prefixed with the program name when one is set.
    string programName = props.getProperty("Ice.ProgramName");
    _threadPrefix = programName.Length > 0 ? programName + "-" + _prefix : _prefix;

    //
    // A single thread is the default: it is the fastest possible setting,
    // still allows one level of nesting, and doesn't require the servants
    // to be thread safe.
    //
    int size = props.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
    if(size < 1)
    {
        _instance.initializationData().logger.warning(_prefix + ".Size < 1; Size adjusted to 1");
        size = 1;
    }

    int sizeMax = props.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
    if(sizeMax < size)
    {
        _instance.initializationData().logger.warning(
            _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")");
        sizeMax = size;
    }

    // A SizeWarn of 0 is left alone; other values are forced into [Size, SizeMax].
    int sizeWarn = props.getPropertyAsInt(_prefix + ".SizeWarn");
    if(sizeWarn != 0 && sizeWarn < size)
    {
        _instance.initializationData().logger.warning(
            _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")");
        sizeWarn = size;
    }
    else if(sizeWarn > sizeMax)
    {
        _instance.initializationData().logger.warning(
            _prefix + ".SizeWarn > " + _prefix + ".SizeMax; adjusted SizeWarn to SizeMax (" + sizeMax + ")");
        sizeWarn = sizeMax;
    }

    int idleTime = props.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
    if(idleTime < 0)
    {
        _instance.initializationData().logger.warning(
            _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0");
        idleTime = 0;
    }

    _size = size;
    _sizeMax = sizeMax;
    _sizeWarn = sizeWarn;
    _threadIdleTime = idleTime;

    int stackSize = props.getPropertyAsInt(_prefix + ".StackSize");
    if(stackSize < 0)
    {
        _instance.initializationData().logger.warning(_prefix + ".StackSize < 0; Size adjusted to OS default");
        stackSize = 0;
    }
    _stackSize = stackSize;

    // The pool-specific priority wins over the global Ice.ThreadPriority.
    string priority = props.getProperty(_prefix + ".ThreadPriority");
    if(priority.Length == 0)
    {
        priority = props.getProperty("Ice.ThreadPriority");
    }
    _priority = Util.stringToThreadPriority(priority);

    if(_instance.traceLevels().threadPool >= 1)
    {
        _instance.initializationData().logger.trace(
            _instance.traceLevels().threadPoolCat,
            "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " + _sizeWarn);
    }

    _workItems = new Queue<ThreadPoolWorkItem>();

    try
    {
        _threads = new List<WorkerThread>();
        for(int started = 0; started < _size; ++started)
        {
            string name = _threadPrefix + "-" + _threadIndex;
            ++_threadIndex;

            WorkerThread worker = new WorkerThread(this, name);
            worker.start(_priority);
            _threads.Add(worker);
        }
    }
    catch(System.Exception ex)
    {
        _instance.initializationData().logger.error("cannot create thread for `" + _prefix + "':\n" + ex);

        // Stop and wait for the threads that did start before propagating the failure.
        destroy();
        joinWithAllThreads();
        throw;
    }
}
269 
//
// Marks the pool as destroyed and wakes up every thread waiting on the pool
// monitor. Calling it again has no effect.
//
public void destroy()
{
    lock(this)
    {
        if(!_destroyed)
        {
            _destroyed = true;
            Monitor.PulseAll(this);
        }
    }
}
282 
//
// Asks every worker thread to refresh its observer, with the pool locked.
//
public void updateObservers()
{
    lock(this)
    {
        _threads.ForEach(worker => worker.updateObserver());
    }
}
293 
//
// Resets the thread pool bookkeeping stored on the event handler: nothing
// is registered, ready, pending or started, and no finish is requested.
//
public void initialize(EventHandler handler)
{
    // Operation bit masks.
    handler._registered = 0;
    handler._ready = 0;
    handler._pending = 0;
    handler._started = 0;

    // Flags.
    handler._finish = false;
    handler._hasMoreData = false;
}
303 
//
// Registers the handler for the given operation(s); shorthand for an
// update() that removes nothing.
//
public void register(EventHandler handler, int op)
{
    int nothingToRemove = SocketOperation.None;
    update(handler, nothingToRemove, op);
}
308 
//
// Changes the set of operations the handler is registered for. When Read
// (or otherwise Write) gets added and isn't pending yet, it is marked
// pending and a message callback for it is queued on the asynchronous IO
// thread.
//
public void update(EventHandler handler, int remove, int add)
{
    lock(this)
    {
        Debug.Assert(!_destroyed);

        //
        // An operation that must be added is never removed. Operations that
        // are already unregistered, respectively registered, are ignored.
        //
        remove = remove & ~add & handler._registered;
        add = add & ~handler._registered;
        if(remove == add)
        {
            return;
        }

        handler._registered = (handler._registered & ~remove) | add;

        // Read is considered first; Write is only looked at when Read isn't started here.
        int op;
        if((add & SocketOperation.Read) != 0 && (handler._pending & SocketOperation.Read) == 0)
        {
            op = SocketOperation.Read;
        }
        else if((add & SocketOperation.Write) != 0 && (handler._pending & SocketOperation.Write) == 0)
        {
            op = SocketOperation.Write;
        }
        else
        {
            return;
        }

        handler._pending |= op;
        executeNonBlocking(() =>
            {
                messageCallback(new ThreadPoolCurrent(this, handler, op));
            });
    }
}
347 
//
// Unregisters the handler from the given operation(s); shorthand for an
// update() that adds nothing.
//
public void unregister(EventHandler handler, int op)
{
    int nothingToAdd = SocketOperation.None;
    update(handler, op, nothingToAdd);
}
352 
//
// Unregisters the handler from all operations and arranges for its
// finished() method to be called: right away (on the asynchronous IO thread)
// if no operation is pending, or later, once the last pending operation is
// done, by setting the handler's _finish flag.
//
public void finish(EventHandler handler)
{
    lock(this)
    {
        Debug.Assert(!_destroyed);

        handler._registered = SocketOperation.None;

        if(handler._pending != 0)
        {
            // Asynchronous operations are still pending, finish is deferred.
            handler._finish = true;
            return;
        }

        executeNonBlocking(() =>
            {
                ThreadPoolCurrent current = new ThreadPoolCurrent(this, handler, SocketOperation.None);
                handler.finished(ref current);
            });
    }
}
378 
//
// Executes the call from the calling thread: directly when no dispatcher is
// configured, through the dispatcher otherwise. Exceptions thrown by the
// dispatcher are not propagated; they are logged as a warning only when the
// Ice.Warn.Dispatch property is greater than 1.
//
public void dispatchFromThisThread(System.Action call, Ice.Connection con)
{
    if(_dispatcher == null)
    {
        call();
        return;
    }

    try
    {
        _dispatcher(call, con);
    }
    catch(System.Exception ex)
    {
        int warnLevel = _instance.initializationData().properties.getPropertyAsIntWithDefault(
            "Ice.Warn.Dispatch", 1);
        if(warnLevel > 1)
        {
            _instance.initializationData().logger.warning("dispatch exception:\n" + ex);
        }
    }
}
401 
//
// Queues the call for execution by a worker thread and wakes one up. With
// useDispatcher the worker goes through dispatchFromThisThread, otherwise it
// invokes the call directly. The pool grows by one thread when it is allowed
// to and all its threads are busy; a failure to create that thread is only
// logged. Throws Ice.CommunicatorDestroyedException if the pool is destroyed.
//
public void dispatch(System.Action call, Ice.Connection con, bool useDispatcher = true)
{
    lock(this)
    {
        if(_destroyed)
        {
            throw new Ice.CommunicatorDestroyedException();
        }

        ThreadPoolWorkItem item;
        if(useDispatcher)
        {
            item = () => { dispatchFromThisThread(call, con); };
        }
        else
        {
            item = () => { call(); };
        }
        _workItems.Enqueue(item);
        Monitor.Pulse(this);

        //
        // A dynamic thread pool which can still grow spawns a new thread to
        // execute the new work item right away when all its threads are
        // currently busy dispatching or about to dispatch.
        //
        bool canGrow = _threads.Count < _sizeMax;
        bool allBusy = (_inUse + _workItems.Count) > _threads.Count;
        if(!canGrow || !allBusy || _destroyed)
        {
            return;
        }

        if(_instance.traceLevels().threadPool >= 1)
        {
            _instance.initializationData().logger.trace(
                _instance.traceLevels().threadPoolCat,
                "growing " + _prefix + ": Size = " + (_threads.Count + 1));
        }

        try
        {
            string name = _threadPrefix + "-" + _threadIndex;
            ++_threadIndex;

            WorkerThread worker = new WorkerThread(this, name);
            worker.start(_priority);
            _threads.Add(worker);
        }
        catch(System.Exception ex)
        {
            _instance.initializationData().logger.error("cannot create thread for `" + _prefix + "':\n" + ex);
        }
    }
}
450 
//
// Hands the work item to the instance's asynchronous IO thread instead of
// the pool's own worker threads. The pool must not be destroyed.
//
public void executeNonBlocking(ThreadPoolWorkItem workItem)
{
    lock(this)
    {
        Debug.Assert(!_destroyed);

        var ioThread = _instance.asyncIOThread();
        ioThread.queue(workItem);
    }
}
459 
//
// Waits for all the worker threads to terminate. Must be called after
// destroy().
//
public void joinWithAllThreads()
{
    Debug.Assert(_destroyed);

    //
    // No synchronization here: _threads is immutable once destroy() has been
    // called, and holding the lock would prevent the other threads from ever
    // terminating.
    //
    foreach(WorkerThread worker in _threads)
    {
        worker.join();
    }
}
474 
//
// Returns the prefix this thread pool was created with.
//
public string prefix()
{
    return this._prefix;
}
479 
//
// Returns true if the Serialize property of this thread pool was set to a
// value greater than 0.
//
public bool serialize()
{
    return this._serialize;
}
484 
//
// TaskScheduler implementation: queues the task on this thread pool. The
// dispatcher is used to execute it if one is configured.
//
protected sealed override void QueueTask(System.Threading.Tasks.Task task)
{
    bool useDispatcher = _dispatcher != null;
    dispatch(() => { TryExecuteTask(task); }, null, useDispatcher);
}
489 
//
// TaskScheduler implementation: attempts to execute the task from the
// calling thread. Tasks which were previously queued are never inlined.
//
// Returns true only if the task was actually executed by this call. The
// result of TryExecuteTask is propagated rather than assuming success:
// when a dispatcher is configured it may defer the call to another thread
// or fail before invoking it (dispatchFromThisThread swallows dispatcher
// exceptions), and the task scheduler contract requires false in that case.
//
protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued)
{
    if(taskWasPreviouslyQueued)
    {
        return false;
    }

    bool executed = false;
    dispatchFromThisThread(() => { executed = TryExecuteTask(task); }, null);
    return executed;
}
499 
//
// TaskScheduler implementation: queued tasks can't be removed from this
// thread pool, so this always reports failure.
//
protected sealed override bool TryDequeue(System.Threading.Tasks.Task task)
{
    const bool dequeued = false;
    return dequeued;
}
504 
//
// TaskScheduler implementation: the queued tasks aren't exposed, an empty
// array is always returned.
//
protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()
{
    Task[] none = new Task[0];
    return none;
}
509 
//
// Main loop of a worker thread: waits for work items and executes them
// until the pool is destroyed or the thread is retired after being idle for
// the thread idle time. Exceptions raised by a work item are logged and
// don't terminate the thread.
//
// The idle times are kept in seconds. They are converted to milliseconds
// with 64-bit arithmetic and clamped to int.MaxValue, because an int
// multiplication overflows for large values and Monitor.Wait throws
// ArgumentOutOfRangeException for a negative timeout other than -1.
//
private void run(WorkerThread thread)
{
    ThreadPoolWorkItem workItem = null;
    while(true)
    {
        lock(this)
        {
            if(workItem != null)
            {
                // The previous work item is done, this thread is no longer in use.
                Debug.Assert(_inUse > 0);
                --_inUse;
                if(_workItems.Count == 0)
                {
                    thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
                }
            }

            workItem = null;

            while(_workItems.Count == 0)
            {
                if(_destroyed)
                {
                    return;
                }

                if(_threadIdleTime > 0)
                {
                    int threadIdleMs = (int)System.Math.Min(_threadIdleTime * 1000L, int.MaxValue);
                    if(!Monitor.Wait(this, threadIdleMs) && _workItems.Count == 0) // If timeout
                    {
                        if(_destroyed)
                        {
                            return;
                        }
                        else if(_serverIdleTime == 0 || _threads.Count > 1)
                        {
                            //
                            // If not the last thread or if server idle time isn't configured,
                            // we can exit. Unlike C++/Java, there's no need to have a thread
                            // always spawned in the thread pool because all the IO is done
                            // by the .NET thread pool threads. Instead, we'll just spawn a
                            // new thread when needed (i.e.: when a new work item is queued).
                            //
                            if(_instance.traceLevels().threadPool >= 1)
                            {
                                string s = "shrinking " + _prefix + ": Size=" + (_threads.Count - 1);
                                _instance.initializationData().logger.trace(
                                    _instance.traceLevels().threadPoolCat, s);
                            }

                            // The retired thread is joined from the asynchronous IO thread.
                            _threads.Remove(thread);
                            _instance.asyncIOThread().queue(() =>
                                {
                                    thread.join();
                                });
                            return;
                        }
                        else
                        {
                            //
                            // Last thread with a server idle time configured: wait for
                            // that additional time and queue a shutdown of the object
                            // adapters if there's still nothing to do.
                            //
                            Debug.Assert(_serverIdleTime > 0 && _inUse == 0 && _threads.Count == 1);
                            int serverIdleMs = (int)System.Math.Min(_serverIdleTime * 1000L, int.MaxValue);
                            if(!Monitor.Wait(this, serverIdleMs) &&
                               _workItems.Count == 0)
                            {
                                if(!_destroyed)
                                {
                                    _workItems.Enqueue(() =>
                                       {
                                           try
                                           {
                                               _instance.objectAdapterFactory().shutdown();
                                           }
                                           catch(Ice.CommunicatorDestroyedException)
                                           {
                                           }
                                       });
                                }
                            }
                        }
                    }
                }
                else
                {
                    // No idle time configured: wait until signaled.
                    Monitor.Wait(this);
                }
            }

            Debug.Assert(_workItems.Count > 0);
            workItem = _workItems.Dequeue();

            Debug.Assert(_inUse >= 0);
            ++_inUse;

            thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser);

            if(_sizeMax > 1 && _inUse == _sizeWarn)
            {
                string s = "thread pool `" + _prefix + "' is running low on threads\n"
                    + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
                _instance.initializationData().logger.warning(s);
            }
        }

        // The work item is executed outside the lock.
        try
        {
            workItem();
        }
        catch(System.Exception ex)
        {
            string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n';
            _instance.initializationData().logger.error(s);
        }
    }
}
623 
//
// Starts the processing of the operation identified by current. Returns
// true if the handler has to process the message now. Returns false if an
// asynchronous operation was just started (its callback will call back
// later) or if the operation is over for this handler; in the latter case
// the pending bit is cleared and a deferred finish is triggered when nothing
// else is pending.
//
public bool startMessage(ref ThreadPoolCurrent current)
{
    EventHandler handler = current._handler;
    int op = current.operation;

    Debug.Assert((handler._pending & op) != 0);

    bool deliver;
    if((handler._started & op) != 0)
    {
        // The asynchronous operation started earlier is completing now.
        Debug.Assert((handler._ready & op) == 0);
        handler._ready |= op;
        handler._started &= ~op;

        // finishAsync returns false if the handler is finished.
        deliver = handler.finishAsync(op) && (handler._registered & op) != 0;
    }
    else if((handler._ready & op) == 0 && (handler._registered & op) != 0)
    {
        // Nothing started and nothing ready yet: start the asynchronous operation.
        Debug.Assert((handler._started & op) == 0);
        bool completed = false;
        if(handler.startAsync(op, getCallback(op), ref completed))
        {
            current.completedSynchronously = completed;
            handler._started |= op;
            return false;
        }
        deliver = false;
    }
    else
    {
        deliver = (handler._registered & op) != 0;
    }

    if(deliver)
    {
        Debug.Assert((handler._ready & op) != 0);
        handler._ready &= ~op;
        return true;
    }

    // The operation is no longer pending; finish the handler if it was waiting for that.
    handler._pending &= ~op;
    if(handler._pending == 0 && handler._finish)
    {
        finish(handler);
    }
    return false;
}
681 
//
// Ends the processing of a message. If the handler is still registered for
// the operation, the next asynchronous operation is started directly when
// called from the IO thread, otherwise a new message callback is queued on
// the asynchronous IO thread. If it isn't registered anymore the operation
// stops being pending. A deferred finish is triggered once nothing is
// pending.
//
public void finishMessage(ref ThreadPoolCurrent current, bool fromIOThread)
{
    EventHandler handler = current._handler;
    int op = current.operation;

    if((handler._registered & op) == 0)
    {
        handler._pending &= ~op;
    }
    else if(fromIOThread)
    {
        Debug.Assert((handler._ready & op) == 0);
        bool completed = false;
        if(handler.startAsync(op, getCallback(op), ref completed))
        {
            Debug.Assert((handler._pending & op) != 0);
            current.completedSynchronously = completed;
            handler._started |= op;
        }
        else
        {
            handler._pending &= ~op;
        }
    }
    else
    {
        // A ref parameter can't be captured by the lambda, so a copy is passed along.
        ThreadPoolCurrent copy = current;
        executeNonBlocking(() =>
            {
                messageCallback(copy);
            });
    }

    if(handler._pending == 0 && handler._finish)
    {
        // There are no more pending async operations, it's time to call finish.
        finish(handler);
    }
}
721 
//
// Callback for read operations; state must be the event handler.
//
public void asyncReadCallback(object state)
{
    EventHandler handler = (EventHandler)state;
    messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Read));
}
726 
//
// Callback for write operations; state must be the event handler.
//
public void asyncWriteCallback(object state)
{
    EventHandler handler = (EventHandler)state;
    messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Write));
}
731 
//
// Delivers the message to the event handler, again and again for as long as
// the handler leaves completedSynchronously set. Exceptions are logged as
// errors and not propagated.
//
public void messageCallback(ThreadPoolCurrent current)
{
    try
    {
        while(true)
        {
            current.completedSynchronously = false;
            current._handler.message(ref current);
            if(!current.completedSynchronously)
            {
                break;
            }
        }
    }
    catch(System.Exception ex)
    {
        _instance.initializationData().logger.error(
            "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + current._handler.ToString());
    }
}
749 
//
// Returns the callback matching the operation: asyncReadCallback for Read,
// asyncWriteCallback for Write. Any other operation is a programming error
// (asserts) and yields null.
//
private AsyncCallback getCallback(int operation)
{
    if(operation == SocketOperation.Read)
    {
        return asyncReadCallback;
    }

    if(operation == SocketOperation.Write)
    {
        return asyncWriteCallback;
    }

    Debug.Assert(false);
    return null;
}
763 
// The instance this thread pool was created for.
private Instance _instance;

// The dispatcher from the instance's initialization data; may be null.
private System.Action<System.Action, Ice.Connection> _dispatcher;

// Set by destroy(); read and written with the pool locked.
private bool _destroyed;

// The property prefix of this thread pool.
private readonly string _prefix;

// Prefix for worker thread names: the program name, if set, followed by the pool prefix.
private readonly string _threadPrefix;
769 
770         private sealed class WorkerThread
771         {
772             private ThreadPool _threadPool;
773             private Ice.Instrumentation.ThreadObserver _observer;
774             private Ice.Instrumentation.ThreadState _state;
775 
WorkerThread(ThreadPool threadPool, string name)776             internal WorkerThread(ThreadPool threadPool, string name) : base()
777             {
778                 _threadPool = threadPool;
779                 _name = name;
780                 _state = Ice.Instrumentation.ThreadState.ThreadStateIdle;
781                 updateObserver();
782             }
783 
updateObserver()784             public void updateObserver()
785             {
786                 // Must be called with the thread pool mutex locked
787                 Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.initializationData().observer;
788                 if(obsv != null)
789                 {
790                     _observer = obsv.getThreadObserver(_threadPool._prefix, _name, _state, _observer);
791                     if(_observer != null)
792                     {
793                         _observer.attach();
794                     }
795                 }
796             }
797 
setState(Ice.Instrumentation.ThreadState s)798             public void setState(Ice.Instrumentation.ThreadState s)
799             {
800                 // Must be called with the thread pool mutex locked
801                 if(_observer != null)
802                 {
803                     if(_state != s)
804                     {
805                         _observer.stateChanged(_state, s);
806                     }
807                 }
808                 _state = s;
809             }
810 
getThread()811             public Thread getThread()
812             {
813                 return _thread;
814             }
815 
join()816             public void join()
817             {
818                 _thread.Join();
819             }
820 
start(ThreadPriority priority)821             public void start(ThreadPriority priority)
822             {
823                 if(_threadPool._stackSize == 0)
824                 {
825                     _thread = new Thread(new ThreadStart(Run));
826                 }
827                 else
828                 {
829                     _thread = new Thread(new ThreadStart(Run), _threadPool._stackSize);
830                 }
831                 _thread.IsBackground = true;
832                 _thread.Name = _name;
833                 _thread.Priority = priority;
834                 _thread.Start();
835             }
836 
Run()837             public void Run()
838             {
839                 //
840                 // Set the default synchronization context to allow async/await to run
841                 // continuations on the thread pool.
842                 //
843                 SynchronizationContext.SetSynchronizationContext(new ThreadPoolSynchronizationContext(_threadPool));
844 
845                 if(_threadPool._instance.initializationData().threadStart != null)
846                 {
847                     try
848                     {
849                         _threadPool._instance.initializationData().threadStart();
850                     }
851                     catch(System.Exception ex)
852                     {
853                         string s = "thread hook start() method raised an unexpected exception in `";
854                         s += _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex;
855                         _threadPool._instance.initializationData().logger.error(s);
856                     }
857                 }
858 
859                 try
860                 {
861                     _threadPool.run(this);
862                 }
863                 catch(System.Exception ex)
864                 {
865                     string s = "exception in `" + _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex;
866                     _threadPool._instance.initializationData().logger.error(s);
867                 }
868 
869                 if(_observer != null)
870                 {
871                     _observer.detach();
872                 }
873 
874                 if(_threadPool._instance.initializationData().threadStop != null)
875                 {
876                     try
877                     {
878                         _threadPool._instance.initializationData().threadStop();
879                     }
880                     catch(System.Exception ex)
881                     {
882                         string s = "thread hook stop() method raised an unexpected exception in `";
883                         s += _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex;
884                         _threadPool._instance.initializationData().logger.error(s);
885                     }
886                 }
887             }
888 
889             private readonly string _name;
890             private Thread _thread;
891         }
892 
        private readonly int _size; // Number of threads that are pre-created.
        private readonly int _sizeMax; // Maximum number of threads.
        private readonly int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
        private readonly bool _serialize; // True if requests need to be serialized over the connection.
        private readonly ThreadPriority _priority; // NOTE(review): presumably the priority given to worker threads -- confirm.
        private readonly int _serverIdleTime; // NOTE(review): idle timeout, units and exact semantics not visible here -- confirm.
        private readonly int _threadIdleTime; // NOTE(review): idle timeout, units and exact semantics not visible here -- confirm.
        private readonly int _stackSize; // Passed to the Thread constructor by WorkerThread.start(); 0 uses the constructor without a stack size.

        private List<WorkerThread> _threads; // All threads, running or not.
        private int _threadIndex; // For assigning thread names.
        private int _inUse; // Number of threads that are currently in use.

        // NOTE(review): presumably the work items waiting to be executed by the pool -- confirm.
        private Queue<ThreadPoolWorkItem> _workItems;
907     }
908 }
909