1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 #include <Ice/ThreadPool.h>
6 #include <Ice/EventHandler.h>
7 #include <Ice/Network.h>
8 #include <Ice/LocalException.h>
9 #include <Ice/Instance.h>
10 #include <Ice/LoggerUtil.h>
11 #include <Ice/Protocol.h>
12 #include <Ice/ObjectAdapterFactory.h>
13 #include <Ice/Properties.h>
14 #include <Ice/TraceLevels.h>
15 
16 #if defined(ICE_OS_UWP)
17 #   include <Ice/StringConverter.h>
18 #endif
19 
20 using namespace std;
21 using namespace Ice;
22 using namespace Ice::Instrumentation;
23 using namespace IceInternal;
24 
upCast(ThreadPool * p)25 ICE_API IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; }
26 
27 namespace
28 {
29 
30 class ShutdownWorkItem : public ThreadPoolWorkItem
31 {
32 public:
33 
ShutdownWorkItem(const InstancePtr & instance)34     ShutdownWorkItem(const InstancePtr& instance) : _instance(instance)
35     {
36     }
37 
38     virtual void
execute(ThreadPoolCurrent & current)39     execute(ThreadPoolCurrent& current)
40     {
41         current.ioCompleted();
42         try
43         {
44             _instance->objectAdapterFactory()->shutdown();
45         }
46         catch(const CommunicatorDestroyedException&)
47         {
48         }
49     }
50 
51 private:
52 
53     const InstancePtr _instance;
54 };
55 
56 class FinishedWorkItem : public ThreadPoolWorkItem
57 {
58 public:
59 
FinishedWorkItem(const EventHandlerPtr & handler,bool close)60     FinishedWorkItem(const EventHandlerPtr& handler, bool close) : _handler(handler), _close(close)
61     {
62     }
63 
64     virtual void
execute(ThreadPoolCurrent & current)65     execute(ThreadPoolCurrent& current)
66     {
67         _handler->finished(current, _close);
68 
69         //
70         // Break cyclic reference count.
71         //
72         if(_handler->getNativeInfo())
73         {
74             _handler->getNativeInfo()->setReadyCallback(0);
75         }
76     }
77 
78 private:
79 
80     const EventHandlerPtr _handler;
81     const bool _close;
82 };
83 
84 class JoinThreadWorkItem : public ThreadPoolWorkItem
85 {
86 public:
87 
JoinThreadWorkItem(const IceUtil::ThreadPtr & thread)88     JoinThreadWorkItem(const IceUtil::ThreadPtr& thread) : _thread(thread)
89     {
90     }
91 
92     virtual void
execute(ThreadPoolCurrent &)93     execute(ThreadPoolCurrent&)
94     {
95         // No call to ioCompleted, this shouldn't block (and we don't want to cause
96         // a new thread to be started).
97         _thread->getThreadControl().join();
98     }
99 
100 private:
101 
102     IceUtil::ThreadPtr _thread;
103 };
104 
105 //
106 // Exception raised by the thread pool work queue when the thread pool
107 // is destroyed.
108 //
109 class ThreadPoolDestroyedException
110 {
111 };
112 
113 }
114 
~DispatcherCall()115 Ice::DispatcherCall::~DispatcherCall()
116 {
117     // Out of line to avoid weak vtable
118 }
119 
~Dispatcher()120 Ice::Dispatcher::~Dispatcher()
121 {
122     // Out of line to avoid weak vtable
123 }
124 
DispatchWorkItem()125 IceInternal::DispatchWorkItem::DispatchWorkItem()
126 {
127 }
128 
DispatchWorkItem(const Ice::ConnectionPtr & connection)129 IceInternal::DispatchWorkItem::DispatchWorkItem(const Ice::ConnectionPtr& connection) : _connection(connection)
130 {
131 }
132 
133 void
execute(ThreadPoolCurrent & current)134 IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current)
135 {
136     current.ioCompleted(); // Promote follower
137     current.dispatchFromThisThread(this);
138 }
139 
ThreadPoolWorkQueue(ThreadPool & threadPool)140 IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool& threadPool) :
141     _threadPool(threadPool),
142     _destroyed(false)
143 {
144     _registered = SocketOperationRead;
145 }
146 
147 void
destroy()148 IceInternal::ThreadPoolWorkQueue::destroy()
149 {
150     //Lock sync(*this); Called with the thread pool locked
151     assert(!_destroyed);
152     _destroyed = true;
153 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
154     _threadPool._selector.completed(this, SocketOperationRead);
155 #else
156     _threadPool._selector.ready(this, SocketOperationRead, true);
157 #endif
158 }
159 
160 void
queue(const ThreadPoolWorkItemPtr & item)161 IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item)
162 {
163     //Lock sync(*this); Called with the thread pool locked
164     _workItems.push_back(item);
165 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
166     _threadPool._selector.completed(this, SocketOperationRead);
167 #else
168     if(_workItems.size() == 1)
169     {
170         _threadPool._selector.ready(this, SocketOperationRead, true);
171     }
172 #endif
173 }
174 
175 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
176 bool
startAsync(SocketOperation)177 IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation)
178 {
179     assert(false);
180     return false;
181 }
182 
183 bool
finishAsync(SocketOperation)184 IceInternal::ThreadPoolWorkQueue::finishAsync(SocketOperation)
185 {
186     assert(false);
187     return false;
188 }
189 #endif
190 
191 void
message(ThreadPoolCurrent & current)192 IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
193 {
194     ThreadPoolWorkItemPtr workItem;
195     {
196         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_threadPool);
197         if(!_workItems.empty())
198         {
199             workItem = _workItems.front();
200             _workItems.pop_front();
201         }
202 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
203         else
204         {
205             assert(_destroyed);
206             _threadPool._selector.completed(this, SocketOperationRead);
207         }
208 #else
209         if(_workItems.empty() && !_destroyed)
210         {
211             _threadPool._selector.ready(this, SocketOperationRead, false);
212         }
213 #endif
214     }
215 
216     if(workItem)
217     {
218         workItem->execute(current);
219     }
220     else
221     {
222         assert(_destroyed);
223         current.ioCompleted();
224         throw ThreadPoolDestroyedException();
225     }
226 }
227 
228 void
finished(ThreadPoolCurrent &,bool)229 IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&, bool)
230 {
231     assert(false);
232 }
233 
234 string
toString() const235 IceInternal::ThreadPoolWorkQueue::toString() const
236 {
237     return "work queue";
238 }
239 
240 NativeInfoPtr
getNativeInfo()241 IceInternal::ThreadPoolWorkQueue::getNativeInfo()
242 {
243     return 0;
244 }
245 
ThreadPool(const InstancePtr & instance,const string & prefix,int timeout)246 IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) :
247     _instance(instance),
248     _dispatcher(_instance->initializationData().dispatcher),
249     _destroyed(false),
250     _prefix(prefix),
251     _selector(instance),
252     _nextThreadId(0),
253     _size(0),
254     _sizeIO(0),
255     _sizeMax(0),
256     _sizeWarn(0),
257     _serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0),
258     _hasPriority(false),
259     _priority(0),
260     _serverIdleTime(timeout),
261     _threadIdleTime(0),
262     _stackSize(0),
263     _inUse(0),
264 #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
265     _inUseIO(0),
266     _nextHandler(_handlers.end()),
267 #endif
268     _promote(true)
269 {
270     PropertiesPtr properties = _instance->initializationData().properties;
271 #ifndef ICE_OS_UWP
272 #   ifdef _WIN32
273     SYSTEM_INFO sysInfo;
274     GetSystemInfo(&sysInfo);
275     int nProcessors = sysInfo.dwNumberOfProcessors;
276 #   else
277     int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
278 #   endif
279 #endif
280 
281     //
282     // We use just one thread as the default. This is the fastest
283     // possible setting, still allows one level of nesting, and
284     // doesn't require to make the servants thread safe.
285     //
286     int size = properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1);
287     if(size < 1)
288     {
289         Warning out(_instance->initializationData().logger);
290         out << _prefix << ".Size < 1; Size adjusted to 1";
291         size = 1;
292     }
293 
294     int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
295 #ifndef ICE_OS_UWP
296     if(sizeMax == -1)
297     {
298         sizeMax = nProcessors;
299     }
300 #endif
301     if(sizeMax < size)
302     {
303         Warning out(_instance->initializationData().logger);
304         out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")";
305         sizeMax = size;
306     }
307 
308     int sizeWarn = properties->getPropertyAsInt(_prefix + ".SizeWarn");
309     if(sizeWarn != 0 && sizeWarn < size)
310     {
311         Warning out(_instance->initializationData().logger);
312         out << _prefix << ".SizeWarn < " << _prefix << ".Size; adjusted SizeWarn to Size (" << size << ")";
313         sizeWarn = size;
314     }
315     else if(sizeWarn > sizeMax)
316     {
317         Warning out(_instance->initializationData().logger);
318         out << _prefix << ".SizeWarn > " << _prefix << ".SizeMax; adjusted SizeWarn to SizeMax (" << sizeMax << ")";
319         sizeWarn = sizeMax;
320     }
321 
322     int threadIdleTime = properties->getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
323     if(threadIdleTime < 0)
324     {
325         Warning out(_instance->initializationData().logger);
326         out << _prefix << ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
327         threadIdleTime = 0;
328     }
329 
330     const_cast<int&>(_size) = size;
331     const_cast<int&>(_sizeMax) = sizeMax;
332     const_cast<int&>(_sizeWarn) = sizeWarn;
333 #ifndef ICE_OS_UWP
334     const_cast<int&>(_sizeIO) = min(sizeMax, nProcessors);
335 #else
336     const_cast<int&>(_sizeIO) = sizeMax;
337 #endif
338     const_cast<int&>(_threadIdleTime) = threadIdleTime;
339 
340 #ifdef ICE_USE_IOCP
341     _selector.setup(_sizeIO);
342 #endif
343 
344 #if defined(__APPLE__)
345     //
346     // We use a default stack size of 1MB on macOS and the new C++11 mapping to allow transmitting
347     // class graphs with a depth of 100 (maximum default), 512KB is not enough otherwise.
348     //
349     int defaultStackSize = 1024 * 1024; // 1MB
350 #else
351     int defaultStackSize = 0;
352 #endif
353     int stackSize = properties->getPropertyAsIntWithDefault(_prefix + ".StackSize", defaultStackSize);
354     if(stackSize < 0)
355     {
356         Warning out(_instance->initializationData().logger);
357         out << _prefix << ".StackSize < 0; Size adjusted to OS default";
358         stackSize = 0;
359     }
360     const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize);
361 
362     const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != "";
363     const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority");
364     if(!_hasPriority)
365     {
366         const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != "";
367         const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority");
368     }
369 
370     _workQueue = ICE_MAKE_SHARED(ThreadPoolWorkQueue, *this);
371     _selector.initialize(_workQueue.get());
372 
373     if(_instance->traceLevels()->threadPool >= 1)
374     {
375         Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
376         out << "creating " << _prefix << ": Size = " << _size << ", SizeMax = " << _sizeMax << ", SizeWarn = "
377             << _sizeWarn;
378     }
379 
380     __setNoDelete(true);
381     try
382     {
383         for(int i = 0 ; i < _size ; ++i)
384         {
385             EventHandlerThreadPtr thread = new EventHandlerThread(this, nextThreadId());
386             if(_hasPriority)
387             {
388                 thread->start(_stackSize, _priority);
389             }
390             else
391             {
392                 thread->start(_stackSize);
393             }
394             _threads.insert(thread);
395         }
396     }
397     catch(const IceUtil::Exception& ex)
398     {
399         {
400             Error out(_instance->initializationData().logger);
401             out << "cannot create thread for `" << _prefix << "':\n" << ex;
402         }
403 
404         destroy();
405         joinWithAllThreads();
406         __setNoDelete(false);
407         throw;
408     }
409     catch(...)
410     {
411         __setNoDelete(false);
412         throw;
413     }
414     __setNoDelete(false);
415 }
416 
~ThreadPool()417 IceInternal::ThreadPool::~ThreadPool()
418 {
419     assert(_destroyed);
420 }
421 
422 void
destroy()423 IceInternal::ThreadPool::destroy()
424 {
425     Lock sync(*this);
426     if(_destroyed)
427     {
428         return;
429     }
430     _destroyed = true;
431     _workQueue->destroy();
432 }
433 
434 void
updateObservers()435 IceInternal::ThreadPool::updateObservers()
436 {
437     Lock sync(*this);
438     for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
439     {
440         (*p)->updateObserver();
441     }
442 }
443 
444 void
initialize(const EventHandlerPtr & handler)445 IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler)
446 {
447     Lock sync(*this);
448     assert(!_destroyed);
449     _selector.initialize(handler.get());
450 
451     class ReadyCallbackI : public ReadyCallback
452     {
453     public:
454 
455         ReadyCallbackI(const ThreadPoolPtr& threadPool, const EventHandlerPtr& handler) :
456             _threadPool(threadPool), _handler(handler)
457         {
458         }
459 
460         virtual void
461         ready(SocketOperation op, bool value)
462         {
463             _threadPool->ready(_handler, op, value);
464         }
465 
466     private:
467 
468         const ThreadPoolPtr _threadPool;
469         const EventHandlerPtr _handler;
470     };
471     handler->getNativeInfo()->setReadyCallback(new ReadyCallbackI(this, handler));
472 }
473 
474 void
update(const EventHandlerPtr & handler,SocketOperation remove,SocketOperation add)475 IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation remove, SocketOperation add)
476 {
477     Lock sync(*this);
478     assert(!_destroyed);
479 
480     // Don't remove what needs to be added
481     remove = static_cast<SocketOperation>(remove & ~add);
482 
483     // Don't remove/add if already un-registered or registered
484     remove  = static_cast<SocketOperation>(handler->_registered & remove);
485     add  = static_cast<SocketOperation>(~handler->_registered & add);
486     if(remove == add)
487     {
488         return;
489     }
490 
491     _selector.update(handler.get(), remove, add);
492 }
493 
494 bool
finish(const EventHandlerPtr & handler,bool closeNow)495 IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow)
496 {
497     Lock sync(*this);
498     assert(!_destroyed);
499 #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
500     closeNow = _selector.finish(handler.get(), closeNow); // This must be called before!
501     _workQueue->queue(new FinishedWorkItem(handler, !closeNow));
502     return closeNow;
503 #else
504     UNREFERENCED_PARAMETER(closeNow);
505 
506     // If there are no pending asynchronous operations, we can call finish on the handler now.
507     if(!handler->_pending)
508     {
509         _workQueue->queue(new FinishedWorkItem(handler, false));
510         _selector.finish(handler.get());
511     }
512     else
513     {
514         handler->_finish = true;
515     }
516     return true; // Always close now to interrupt the pending call.
517 #endif
518 }
519 
520 void
ready(const EventHandlerPtr & handler,SocketOperation op,bool value)521 IceInternal::ThreadPool::ready(const EventHandlerPtr& handler, SocketOperation op, bool value)
522 {
523     Lock sync(*this);
524     if(_destroyed)
525     {
526         return;
527     }
528     _selector.ready(handler.get(), op, value);
529 }
530 
531 void
dispatchFromThisThread(const DispatchWorkItemPtr & workItem)532 IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workItem)
533 {
534     if(_dispatcher)
535     {
536         try
537         {
538 #ifdef ICE_CPP11_MAPPING
539             _dispatcher([workItem]()
540             {
541                 workItem->run();
542             },
543             workItem->getConnection());
544 #else
545             _dispatcher->dispatch(workItem, workItem->getConnection());
546 #endif
547         }
548         catch(const std::exception& ex)
549         {
550             if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
551             {
552                 Warning out(_instance->initializationData().logger);
553                 out << "dispatch exception:\n" << ex;
554             }
555         }
556         catch(...)
557         {
558             if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
559             {
560                 Warning out(_instance->initializationData().logger);
561                 out << "dispatch exception:\nunknown c++ exception";
562             }
563         }
564     }
565     else
566     {
567         workItem->run();
568     }
569 }
570 
571 void
dispatch(const DispatchWorkItemPtr & workItem)572 IceInternal::ThreadPool::dispatch(const DispatchWorkItemPtr& workItem)
573 {
574     Lock sync(*this);
575     if(_destroyed)
576     {
577         throw CommunicatorDestroyedException(__FILE__, __LINE__);
578     }
579     _workQueue->queue(workItem);
580 }
581 
582 void
joinWithAllThreads()583 IceInternal::ThreadPool::joinWithAllThreads()
584 {
585     assert(_destroyed);
586 
587     //
588     // _threads is immutable after destroy() has been called,
589     // therefore no synchronization is needed. (Synchronization
590     // wouldn't be possible here anyway, because otherwise the other
591     // threads would never terminate.)
592     //
593     for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
594     {
595         (*p)->getThreadControl().join();
596     }
597     _selector.destroy();
598 }
599 
600 string
prefix() const601 IceInternal::ThreadPool::prefix() const
602 {
603     return _prefix;
604 }
605 
606 void
run(const EventHandlerThreadPtr & thread)607 IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
608 {
609 #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
610     ThreadPoolCurrent current(_instance, this, thread);
611     bool select = false;
612     while(true)
613     {
614         if(current._handler)
615         {
616             try
617             {
618                 current._handler->message(current);
619             }
620             catch(const ThreadPoolDestroyedException&)
621             {
622                 return;
623             }
624             catch(const exception& ex)
625             {
626                 Error out(_instance->initializationData().logger);
627                 out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: "
628                     << current._handler->toString();
629             }
630             catch(...)
631             {
632                 Error out(_instance->initializationData().logger);
633                 out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
634             }
635         }
636         else if(select)
637         {
638             try
639             {
640                 _selector.select(_serverIdleTime);
641             }
642             catch(const SelectorTimeoutException&)
643             {
644                 Lock sync(*this);
645                 if(!_destroyed && _inUse == 0)
646                 {
647                     _workQueue->queue(new ShutdownWorkItem(_instance)); // Select timed-out.
648                 }
649                 continue;
650             }
651         }
652 
653         {
654             Lock sync(*this);
655             if(!current._handler)
656             {
657                 if(select)
658                 {
659                     _selector.finishSelect(_handlers);
660                     _nextHandler = _handlers.begin();
661                     select = false;
662                 }
663                 else if(!current._leader && followerWait(current))
664                 {
665                     return; // Wait timed-out.
666                 }
667             }
668             else if(_sizeMax > 1)
669             {
670                 if(!current._ioCompleted)
671                 {
672                     //
673                     // The handler didn't call ioCompleted() so we take care of decreasing
674                     // the IO thread count now.
675                     //
676                     --_inUseIO;
677                 }
678                 else
679                 {
680                     //
681                     // If the handler called ioCompleted(), we re-enable the handler in
682                     // case it was disabled and we decrease the number of thread in use.
683                     //
684                     if(_serialize && current._handler.get() != _workQueue.get())
685                     {
686                         _selector.enable(current._handler.get(), current.operation);
687                     }
688                     assert(_inUse > 0);
689                     --_inUse;
690                 }
691 
692                 if(!current._leader && followerWait(current))
693                 {
694                     return; // Wait timed-out.
695                 }
696             }
697 
698             //
699             // Get the next ready handler.
700             //
701             while(_nextHandler != _handlers.end() &&
702                     !(_nextHandler->second & ~_nextHandler->first->_disabled & _nextHandler->first->_registered))
703             {
704                 ++_nextHandler;
705             }
706             if(_nextHandler != _handlers.end())
707             {
708                 current._ioCompleted = false;
709                 current._handler = ICE_GET_SHARED_FROM_THIS(_nextHandler->first);
710                 current.operation = _nextHandler->second;
711                 ++_nextHandler;
712                 thread->setState(ICE_ENUM(ThreadState, ThreadStateInUseForIO));
713             }
714             else
715             {
716                 current._handler = 0;
717             }
718 
719             if(!current._handler)
720             {
721                 //
722                 // If there are no more ready handlers and there are still threads busy performing
723                 // IO, we give up leadership and promote another follower (which will perform the
724                 // select() only once all the IOs are completed). Otherwise, if there are no more
725                 // threads peforming IOs, it's time to do another select().
726                 //
727                 if(_inUseIO > 0)
728                 {
729                     promoteFollower(current);
730                 }
731                 else
732                 {
733                     _handlers.clear();
734                     _selector.startSelect();
735                     select = true;
736                     thread->setState(ICE_ENUM(ThreadState, ThreadStateIdle));
737                 }
738             }
739             else if(_sizeMax > 1)
740             {
741                 //
742                 // Increment the IO thread count and if there are still threads available
743                 // to perform IO and more handlers ready, we promote a follower.
744                 //
745                 ++_inUseIO;
746                 if(_nextHandler != _handlers.end() && _inUseIO < _sizeIO)
747                 {
748                     promoteFollower(current);
749                 }
750             }
751         }
752     }
753 #else
754     ThreadPoolCurrent current(_instance, this, thread);
755     while(true)
756     {
757         try
758         {
759             current._ioCompleted = false;
760 #ifdef ICE_OS_UWP
761             current._handler = ICE_GET_SHARED_FROM_THIS(_selector.getNextHandler(current.operation, _threadIdleTime));
762 #else
763             current._handler = ICE_GET_SHARED_FROM_THIS(_selector.getNextHandler(current.operation, current._count,
764                                                                                  current._error, _threadIdleTime));
765 #endif
766         }
767         catch(const SelectorTimeoutException&)
768         {
769             if(_sizeMax > 1)
770             {
771                 Lock sync(*this);
772 
773                 if(_destroyed)
774                 {
775                     continue;
776                 }
777                 else if(_inUse < static_cast<int>(_threads.size() - 1)) // If not the last idle thread, we can exit.
778                 {
779 #ifndef ICE_OS_UWP
780                     BOOL hasIO = false;
781                     GetThreadIOPendingFlag(GetCurrentThread(), &hasIO);
782                     if(hasIO)
783                     {
784                         continue;
785                     }
786 #endif
787                     if(_instance->traceLevels()->threadPool >= 1)
788                     {
789                         Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
790                         out << "shrinking " << _prefix << ": Size = " << (_threads.size() - 1);
791                     }
792                     _threads.erase(thread);
793                     _workQueue->queue(new JoinThreadWorkItem(thread));
794                     return;
795                 }
796                 else if(_inUse > 0)
797                 {
798                     //
799                     // If this is the last idle thread but there are still other threads
800                     // busy dispatching, we go back waiting with _threadIdleTime. We only
801                     // wait with _serverIdleTime when there's only one thread left.
802                     //
803                     continue;
804                 }
805                 assert(_threads.size() == 1);
806             }
807 
808             try
809             {
810 #ifdef ICE_OS_UWP
811                 current._handler = ICE_GET_SHARED_FROM_THIS(_selector.getNextHandler(current.operation, _serverIdleTime));
812 #else
813 
814                 current._handler = ICE_GET_SHARED_FROM_THIS(_selector.getNextHandler(current.operation, current._count,
815                                                             current._error, _serverIdleTime));
816 #endif
817             }
818             catch(const SelectorTimeoutException&)
819             {
820                 Lock sync(*this);
821                 if(!_destroyed)
822                 {
823                     _workQueue->queue(new ShutdownWorkItem(_instance));
824                 }
825                 continue;
826             }
827         }
828 
829         {
830             IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
831             thread->setState(ICE_ENUM(ThreadState, ThreadStateInUseForIO));
832         }
833 
834         try
835         {
836             assert(current._handler);
837             current._handler->message(current);
838         }
839         catch(const ThreadPoolDestroyedException&)
840         {
841             return;
842         }
843         catch(const exception& ex)
844         {
845             Error out(_instance->initializationData().logger);
846             out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString();
847         }
848 #ifdef ICE_OS_UWP
849         catch(Platform::Exception^ ex)
850         {
851             //
852             // We don't need to pass the wide string converter in the call to wstringToString
853             // because the wide string is using the platform default encoding.
854             //
855             Error out(_instance->initializationData().logger);
856             out << "exception in `" << _prefix << "':\n"
857                 << wstringToString(ex->Message->Data(), _instance->getStringConverter())
858                 << "\nevent handler: " << current._handler->toString();
859         }
860 #endif
861         catch(...)
862         {
863             Error out(_instance->initializationData().logger);
864             out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
865         }
866 
867         {
868             Lock sync(*this);
869             if(_sizeMax > 1 && current._ioCompleted)
870             {
871                 assert(_inUse > 0);
872                 --_inUse;
873             }
874             thread->setState(ICE_ENUM(ThreadState, ThreadStateIdle));
875         }
876     }
877 #endif
878 }
879 
880 bool
ioCompleted(ThreadPoolCurrent & current)881 IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
882 {
883     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
884 
885     current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called.
886 
887     current._thread->setState(ICE_ENUM(ThreadState, ThreadStateInUseForUser));
888 
889     if(_sizeMax > 1)
890     {
891 
892 #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
893         --_inUseIO;
894 
895         if(!_destroyed)
896         {
897             if(_serialize && current._handler.get() != _workQueue.get())
898             {
899                 _selector.disable(current._handler.get(), current.operation);
900             }
901         }
902 
903         if(current._leader)
904         {
905             //
906             // If this thread is still the leader, it's time to promote a new leader.
907             //
908             promoteFollower(current);
909         }
910         else if(_promote && (_nextHandler != _handlers.end() || _inUseIO == 0))
911         {
912             notify();
913         }
914 #endif
915 
916         assert(_inUse >= 0);
917         ++_inUse;
918 
919         if(_inUse == _sizeWarn)
920         {
921             Warning out(_instance->initializationData().logger);
922             out << "thread pool `" << _prefix << "' is running low on threads\n"
923                 << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
924         }
925 
926         if(!_destroyed)
927         {
928             assert(_inUse <= static_cast<int>(_threads.size()));
929             if(_inUse < _sizeMax && _inUse == static_cast<int>(_threads.size()))
930             {
931                 if(_instance->traceLevels()->threadPool >= 1)
932                 {
933                     Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
934                     out << "growing " << _prefix << ": Size=" << _threads.size() + 1;
935                 }
936 
937                 try
938                 {
939                     EventHandlerThreadPtr thread = new EventHandlerThread(this, nextThreadId());
940                     if(_hasPriority)
941                     {
942                         thread->start(_stackSize, _priority);
943                     }
944                     else
945                     {
946                         thread->start(_stackSize);
947                     }
948                     _threads.insert(thread);
949                 }
950                 catch(const IceUtil::Exception& ex)
951                 {
952                     Error out(_instance->initializationData().logger);
953                     out << "cannot create thread for `" << _prefix << "':\n" << ex;
954                 }
955             }
956         }
957     }
958 
959     return _serialize && current._handler.get() != _workQueue.get();
960 }
961 
962 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
963 bool
startMessage(ThreadPoolCurrent & current)964 IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
965 {
966     assert(current._handler->_pending & current.operation);
967 
968     if(current._handler->_started & current.operation)
969     {
970         assert(!(current._handler->_completed & current.operation));
971         current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed | current.operation);
972         current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation);
973 
974 #ifndef ICE_OS_UWP
975         AsyncInfo* info = current._handler->getNativeInfo()->getAsyncInfo(current.operation);
976         info->count = current._count;
977         info->error = current._error;
978 #endif
979 
980         if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished.
981         {
982             current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
983             if(!current._handler->_pending && current._handler->_finish)
984             {
985                 Lock sync(*this);
986                 _workQueue->queue(new FinishedWorkItem(current._handler, false));
987                 _selector.finish(current._handler.get());
988             }
989             return false;
990         }
991     }
992     else if(!(current._handler->_completed & current.operation) && (current._handler->_registered & current.operation))
993     {
994         assert(!(current._handler->_started & current.operation));
995         if(current._handler->_ready & current.operation)
996         {
997             return true;
998         }
999         else if(!current._handler->startAsync(current.operation))
1000         {
1001             current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
1002             if(!current._handler->_pending && current._handler->_finish)
1003             {
1004                 Lock sync(*this);
1005                 _workQueue->queue(new FinishedWorkItem(current._handler, false));
1006                 _selector.finish(current._handler.get());
1007             }
1008             return false;
1009         }
1010         else
1011         {
1012             current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation);
1013             return false;
1014         }
1015     }
1016 
1017     if(current._handler->_registered & current.operation)
1018     {
1019         assert(current._handler->_completed & current.operation);
1020         current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed & ~current.operation);
1021         return true;
1022     }
1023     else
1024     {
1025         current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
1026         if(!current._handler->_pending && current._handler->_finish)
1027         {
1028             Lock sync(*this);
1029             _workQueue->queue(new FinishedWorkItem(current._handler, false));
1030             _selector.finish(current._handler.get());
1031         }
1032         return false;
1033     }
1034 }
1035 
1036 void
finishMessage(ThreadPoolCurrent & current)1037 IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
1038 {
1039     if(current._handler->_registered & current.operation && !current._handler->_finish)
1040     {
1041         assert(!(current._handler->_completed & current.operation));
1042         if(current._handler->_ready & current.operation)
1043         {
1044             _selector.completed(current._handler.get(), current.operation);
1045         }
1046         else if(!current._handler->startAsync(current.operation))
1047         {
1048             current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
1049         }
1050         else
1051         {
1052             assert(current._handler->_pending & current.operation);
1053             current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation);
1054         }
1055     }
1056     else
1057     {
1058         current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
1059     }
1060 
1061     if(!current._handler->_pending && current._handler->_finish)
1062     {
1063         // There are no more pending async operations, it's time to call finish.
1064         Lock sync(*this);
1065         _workQueue->queue(new FinishedWorkItem(current._handler, false));
1066         _selector.finish(current._handler.get());
1067     }
1068 }
1069 #else
1070 void
promoteFollower(ThreadPoolCurrent & current)1071 IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current)
1072 {
1073     assert(!_promote && current._leader);
1074     _promote = true;
1075     if(_inUseIO < _sizeIO && (_nextHandler != _handlers.end() || _inUseIO == 0))
1076     {
1077         notify();
1078     }
1079     current._leader = false;
1080 }
1081 
1082 bool
followerWait(ThreadPoolCurrent & current)1083 IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current)
1084 {
1085     assert(!current._leader);
1086 
1087     current._thread->setState(ICE_ENUM(ThreadState, ThreadStateIdle));
1088 
1089     //
1090     // It's important to clear the handler before waiting to make sure that
1091     // resources for the handler are released now if it's finished. We also
1092     // clear the per-thread stream.
1093     //
1094     current._handler = 0;
1095     current.stream.clear();
1096     current.stream.b.clear();
1097 
1098     //
1099     // Wait to be promoted and for all the IO threads to be done.
1100     //
1101     while(!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0))
1102     {
1103         if(_threadIdleTime)
1104         {
1105             if(!timedWait(IceUtil::Time::seconds(_threadIdleTime)))
1106             {
1107                 if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
1108                                    (_nextHandler == _handlers.end() && _inUseIO > 0)))
1109                 {
1110                     if(_instance->traceLevels()->threadPool >= 1)
1111                     {
1112                         Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
1113                         out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1);
1114                     }
1115                     assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
1116                     _threads.erase(current._thread);
1117                     _workQueue->queue(new JoinThreadWorkItem(current._thread));
1118                     return true;
1119                 }
1120             }
1121         }
1122         else
1123         {
1124             wait();
1125         }
1126     }
1127     current._leader = true; // The current thread has become the leader.
1128     _promote = false;
1129     return false;
1130 }
1131 #endif
1132 
1133 string
nextThreadId()1134 IceInternal::ThreadPool::nextThreadId()
1135 {
1136     ostringstream os;
1137     os << _prefix << "-" << _nextThreadId++;
1138     return os.str();
1139 }
1140 
EventHandlerThread(const ThreadPoolPtr & pool,const string & name)1141 IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool, const string& name) :
1142     IceUtil::Thread(name),
1143     _pool(pool),
1144     _state(ICE_ENUM(ThreadState, ThreadStateIdle))
1145 {
1146     updateObserver();
1147 }
1148 
1149 void
updateObserver()1150 IceInternal::ThreadPool::EventHandlerThread::updateObserver()
1151 {
1152     // Must be called with the thread pool mutex locked
1153     const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer;
1154     if(obsv)
1155     {
1156         _observer.attach(obsv->getThreadObserver(_pool->_prefix, name(), _state, _observer.get()));
1157     }
1158 }
1159 
1160 void
setState(Ice::Instrumentation::ThreadState s)1161 IceInternal::ThreadPool::EventHandlerThread::setState(Ice::Instrumentation::ThreadState s)
1162 {
1163     // Must be called with the thread pool mutex locked
1164     if(_observer)
1165     {
1166         if(_state != s)
1167         {
1168             _observer->stateChanged(_state, s);
1169         }
1170     }
1171     _state = s;
1172 }
1173 
1174 void
run()1175 IceInternal::ThreadPool::EventHandlerThread::run()
1176 {
1177 #ifdef ICE_CPP11_MAPPING
1178     if(_pool->_instance->initializationData().threadStart)
1179 #else
1180     if(_pool->_instance->initializationData().threadHook)
1181 #endif
1182     {
1183         try
1184         {
1185 #ifdef ICE_CPP11_MAPPING
1186             _pool->_instance->initializationData().threadStart();
1187 #else
1188             _pool->_instance->initializationData().threadHook->start();
1189 #endif
1190         }
1191         catch(const exception& ex)
1192         {
1193             Error out(_pool->_instance->initializationData().logger);
1194             out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex;
1195         }
1196         catch(...)
1197         {
1198             Error out(_pool->_instance->initializationData().logger);
1199             out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "'";
1200         }
1201     }
1202 
1203     try
1204     {
1205         _pool->run(this);
1206     }
1207     catch(const exception& ex)
1208     {
1209         Error out(_pool->_instance->initializationData().logger);
1210         out << "exception in `" << _pool->_prefix << "':\n" << ex;
1211     }
1212     catch(...)
1213     {
1214         Error out(_pool->_instance->initializationData().logger);
1215         out << "unknown exception in `" << _pool->_prefix << "'";
1216     }
1217 
1218     _observer.detach();
1219 
1220 #ifdef ICE_CPP11_MAPPING
1221     if(_pool->_instance->initializationData().threadStop)
1222 #else
1223     if(_pool->_instance->initializationData().threadHook)
1224 #endif
1225     {
1226         try
1227         {
1228 #ifdef ICE_CPP11_MAPPING
1229             _pool->_instance->initializationData().threadStop();
1230 #else
1231             _pool->_instance->initializationData().threadHook->stop();
1232 #endif
1233         }
1234         catch(const exception& ex)
1235         {
1236             Error out(_pool->_instance->initializationData().logger);
1237             out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex;
1238         }
1239         catch(...)
1240         {
1241             Error out(_pool->_instance->initializationData().logger);
1242             out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "'";
1243         }
1244     }
1245 
1246     _pool = 0; // Break cyclic dependency.
1247 }
1248 
ThreadPoolCurrent(const InstancePtr & instance,const ThreadPoolPtr & threadPool,const ThreadPool::EventHandlerThreadPtr & thread)1249 ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance,
1250                                      const ThreadPoolPtr& threadPool,
1251                                      const ThreadPool::EventHandlerThreadPtr& thread) :
1252     operation(SocketOperationNone),
1253     stream(instance.get(), Ice::currentProtocolEncoding),
1254     _threadPool(threadPool.get()),
1255     _thread(thread),
1256     _ioCompleted(false)
1257 #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
1258     , _leader(false)
1259 #endif
1260 {
1261 }
1262