1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4
5 #include <Ice/ThreadPool.h>
6 #include <Ice/EventHandler.h>
7 #include <Ice/Network.h>
8 #include <Ice/LocalException.h>
9 #include <Ice/Instance.h>
10 #include <Ice/LoggerUtil.h>
11 #include <Ice/Protocol.h>
12 #include <Ice/ObjectAdapterFactory.h>
13 #include <Ice/Properties.h>
14 #include <Ice/TraceLevels.h>
15
16 #if defined(ICE_OS_UWP)
17 # include <Ice/StringConverter.h>
18 #endif
19
20 using namespace std;
21 using namespace Ice;
22 using namespace Ice::Instrumentation;
23 using namespace IceInternal;
24
// Smart-pointer upcast helper required by the C++98 mapping's Handle machinery.
ICE_API IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; }
26
27 namespace
28 {
29
30 class ShutdownWorkItem : public ThreadPoolWorkItem
31 {
32 public:
33
ShutdownWorkItem(const InstancePtr & instance)34 ShutdownWorkItem(const InstancePtr& instance) : _instance(instance)
35 {
36 }
37
38 virtual void
execute(ThreadPoolCurrent & current)39 execute(ThreadPoolCurrent& current)
40 {
41 current.ioCompleted();
42 try
43 {
44 _instance->objectAdapterFactory()->shutdown();
45 }
46 catch(const CommunicatorDestroyedException&)
47 {
48 }
49 }
50
51 private:
52
53 const InstancePtr _instance;
54 };
55
56 class FinishedWorkItem : public ThreadPoolWorkItem
57 {
58 public:
59
FinishedWorkItem(const EventHandlerPtr & handler,bool close)60 FinishedWorkItem(const EventHandlerPtr& handler, bool close) : _handler(handler), _close(close)
61 {
62 }
63
64 virtual void
execute(ThreadPoolCurrent & current)65 execute(ThreadPoolCurrent& current)
66 {
67 _handler->finished(current, _close);
68
69 //
70 // Break cyclic reference count.
71 //
72 if(_handler->getNativeInfo())
73 {
74 _handler->getNativeInfo()->setReadyCallback(0);
75 }
76 }
77
78 private:
79
80 const EventHandlerPtr _handler;
81 const bool _close;
82 };
83
84 class JoinThreadWorkItem : public ThreadPoolWorkItem
85 {
86 public:
87
JoinThreadWorkItem(const IceUtil::ThreadPtr & thread)88 JoinThreadWorkItem(const IceUtil::ThreadPtr& thread) : _thread(thread)
89 {
90 }
91
92 virtual void
execute(ThreadPoolCurrent &)93 execute(ThreadPoolCurrent&)
94 {
95 // No call to ioCompleted, this shouldn't block (and we don't want to cause
96 // a new thread to be started).
97 _thread->getThreadControl().join();
98 }
99
100 private:
101
102 IceUtil::ThreadPtr _thread;
103 };
104
105 //
106 // Exception raised by the thread pool work queue when the thread pool
107 // is destroyed.
108 //
class ThreadPoolDestroyedException
{
    // Stateless marker type: thrown by ThreadPoolWorkQueue::message() once the
    // queue is destroyed and drained, and caught in ThreadPool::run() and
    // DispatchWorkItem execution paths to terminate the calling pool thread.
};
112
113 }
114
Ice::DispatcherCall::~DispatcherCall()
{
    // Out of line to avoid weak vtable (the class would otherwise emit its
    // vtable in every translation unit that includes the header).
}
119
Ice::Dispatcher::~Dispatcher()
{
    // Out of line to avoid weak vtable (see ~DispatcherCall above).
}
124
// Work item not associated with any connection (no connection is passed
// to a custom dispatcher).
IceInternal::DispatchWorkItem::DispatchWorkItem()
{
}
128
// Work item associated with a connection; the connection is forwarded to
// a custom dispatcher via getConnection().
IceInternal::DispatchWorkItem::DispatchWorkItem(const Ice::ConnectionPtr& connection) : _connection(connection)
{
}
132
// Executed by a pool thread: promote a follower (this item may block in user
// code) and then run the item, going through the custom dispatcher if one
// is configured.
void
IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current)
{
    current.ioCompleted(); // Promote follower
    current.dispatchFromThisThread(this);
}
139
// The work queue is itself an event handler registered with the pool's
// selector; queued items are signaled through a (virtual) read operation.
IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool& threadPool) :
    _threadPool(threadPool),
    _destroyed(false)
{
    // _registered is an EventHandler base member, hence assigned in the body.
    _registered = SocketOperationRead;
}
146
// Mark the queue destroyed and wake up a pool thread so that message()
// eventually throws ThreadPoolDestroyedException.
//
// Precondition: the thread pool mutex is held by the caller (see the
// commented-out lock below) and destroy() is called at most once.
void
IceInternal::ThreadPoolWorkQueue::destroy()
{
    //Lock sync(*this); Called with the thread pool locked
    assert(!_destroyed);
    _destroyed = true;
#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
    _threadPool._selector.completed(this, SocketOperationRead);
#else
    _threadPool._selector.ready(this, SocketOperationRead, true);
#endif
}
159
// Append a work item and signal the selector so a pool thread picks it up.
//
// Precondition: the thread pool mutex is held by the caller.
void
IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item)
{
    //Lock sync(*this); Called with the thread pool locked
    _workItems.push_back(item);
#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
    // One completion per item with IOCP.
    _threadPool._selector.completed(this, SocketOperationRead);
#else
    // With the select-based selector the ready flag is level-triggered:
    // only the first queued item needs to raise it, message() lowers it
    // again once the queue is drained.
    if(_workItems.size() == 1)
    {
        _threadPool._selector.ready(this, SocketOperationRead, true);
    }
#endif
}
174
#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
// The work queue never initiates asynchronous I/O; these EventHandler
// overrides must not be reached.
bool
IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation)
{
    assert(false);
    return false;
}

bool
IceInternal::ThreadPoolWorkQueue::finishAsync(SocketOperation)
{
    assert(false);
    return false;
}
#endif
190
// Called by a pool thread when the work queue is signaled: dequeue and
// execute one work item, or throw ThreadPoolDestroyedException once the
// queue is destroyed and empty (terminating the calling thread).
void
IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
{
    ThreadPoolWorkItemPtr workItem;
    {
        // Synchronize on the pool's monitor; the queue has no mutex of its own.
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_threadPool);
        if(!_workItems.empty())
        {
            workItem = _workItems.front();
            _workItems.pop_front();
        }
#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
        else
        {
            // No item: can only happen for the destroy() completion. Repost
            // the completion so every remaining pool thread also wakes up.
            assert(_destroyed);
            _threadPool._selector.completed(this, SocketOperationRead);
        }
#else
        // Lower the level-triggered ready flag once the queue is drained,
        // unless destroyed (the flag then stays up to wake all threads).
        if(_workItems.empty() && !_destroyed)
        {
            _threadPool._selector.ready(this, SocketOperationRead, false);
        }
#endif
    }

    // Execute outside the lock: work items may block or re-enter the pool.
    if(workItem)
    {
        workItem->execute(current);
    }
    else
    {
        assert(_destroyed);
        current.ioCompleted();
        throw ThreadPoolDestroyedException();
    }
}
227
// The work queue is never removed from the selector, so finished() must
// not be reached.
void
IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&, bool)
{
    assert(false);
}
233
// Identifier used in thread pool tracing and error messages.
string
IceInternal::ThreadPoolWorkQueue::toString() const
{
    return "work queue";
}
239
// The work queue has no underlying socket/native handle.
NativeInfoPtr
IceInternal::ThreadPoolWorkQueue::getNativeInfo()
{
    return 0;
}
245
// Construct the thread pool named by `prefix` (e.g. "Ice.ThreadPool.Server"),
// reading its configuration from the <prefix>.* properties, creating the
// selector/work queue and starting the initial _size threads.
//
// `timeout` is the server idle time in seconds (0 = no idle shutdown).
IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) :
    _instance(instance),
    _dispatcher(_instance->initializationData().dispatcher),
    _destroyed(false),
    _prefix(prefix),
    _selector(instance),
    _nextThreadId(0),
    _size(0),
    _sizeIO(0),
    _sizeMax(0),
    _sizeWarn(0),
    _serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0),
    _hasPriority(false),
    _priority(0),
    _serverIdleTime(timeout),
    _threadIdleTime(0),
    _stackSize(0),
    _inUse(0),
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
    _inUseIO(0),
    _nextHandler(_handlers.end()),
#endif
    _promote(true)
{
    PropertiesPtr properties = _instance->initializationData().properties;
#ifndef ICE_OS_UWP
# ifdef _WIN32
    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    int nProcessors = sysInfo.dwNumberOfProcessors;
# else
    int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
# endif
#endif

    //
    // We use just one thread as the default. This is the fastest
    // possible setting, still allows one level of nesting, and
    // doesn't require to make the servants thread safe.
    //
    int size = properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1);
    if(size < 1)
    {
        Warning out(_instance->initializationData().logger);
        out << _prefix << ".Size < 1; Size adjusted to 1";
        size = 1;
    }

    // SizeMax defaults to Size; -1 means "number of processors".
    int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
#ifndef ICE_OS_UWP
    if(sizeMax == -1)
    {
        sizeMax = nProcessors;
    }
#endif
    if(sizeMax < size)
    {
        Warning out(_instance->initializationData().logger);
        out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")";
        sizeMax = size;
    }

    // SizeWarn of 0 (the default) disables the "running low on threads" warning.
    int sizeWarn = properties->getPropertyAsInt(_prefix + ".SizeWarn");
    if(sizeWarn != 0 && sizeWarn < size)
    {
        Warning out(_instance->initializationData().logger);
        out << _prefix << ".SizeWarn < " << _prefix << ".Size; adjusted SizeWarn to Size (" << size << ")";
        sizeWarn = size;
    }
    else if(sizeWarn > sizeMax)
    {
        Warning out(_instance->initializationData().logger);
        out << _prefix << ".SizeWarn > " << _prefix << ".SizeMax; adjusted SizeWarn to SizeMax (" << sizeMax << ")";
        sizeWarn = sizeMax;
    }

    // Idle time in seconds before a surplus thread exits (0 = never shrink).
    int threadIdleTime = properties->getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
    if(threadIdleTime < 0)
    {
        Warning out(_instance->initializationData().logger);
        out << _prefix << ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
        threadIdleTime = 0;
    }

    // The members are logically immutable after construction (declared const
    // in the class); const_cast is used to set their final values here.
    const_cast<int&>(_size) = size;
    const_cast<int&>(_sizeMax) = sizeMax;
    const_cast<int&>(_sizeWarn) = sizeWarn;
#ifndef ICE_OS_UWP
    // Cap the number of threads concurrently performing IO at the processor count.
    const_cast<int&>(_sizeIO) = min(sizeMax, nProcessors);
#else
    const_cast<int&>(_sizeIO) = sizeMax;
#endif
    const_cast<int&>(_threadIdleTime) = threadIdleTime;

#ifdef ICE_USE_IOCP
    _selector.setup(_sizeIO);
#endif

#if defined(__APPLE__)
    //
    // We use a default stack size of 1MB on macOS and the new C++11 mapping to allow transmitting
    // class graphs with a depth of 100 (maximum default), 512KB is not enough otherwise.
    //
    int defaultStackSize = 1024 * 1024; // 1MB
#else
    int defaultStackSize = 0;
#endif
    int stackSize = properties->getPropertyAsIntWithDefault(_prefix + ".StackSize", defaultStackSize);
    if(stackSize < 0)
    {
        Warning out(_instance->initializationData().logger);
        out << _prefix << ".StackSize < 0; Size adjusted to OS default";
        stackSize = 0;
    }
    const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize);

    // Per-pool thread priority, falling back to the communicator-wide setting.
    const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != "";
    const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority");
    if(!_hasPriority)
    {
        const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != "";
        const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority");
    }

    _workQueue = ICE_MAKE_SHARED(ThreadPoolWorkQueue, *this);
    _selector.initialize(_workQueue.get());

    if(_instance->traceLevels()->threadPool >= 1)
    {
        Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
        out << "creating " << _prefix << ": Size = " << _size << ", SizeMax = " << _sizeMax << ", SizeWarn = "
            << _sizeWarn;
    }

    // Guard against the started threads dropping the last reference to this
    // object while we're still in the constructor.
    __setNoDelete(true);
    try
    {
        for(int i = 0 ; i < _size ; ++i)
        {
            EventHandlerThreadPtr thread = new EventHandlerThread(this, nextThreadId());
            if(_hasPriority)
            {
                thread->start(_stackSize, _priority);
            }
            else
            {
                thread->start(_stackSize);
            }
            _threads.insert(thread);
        }
    }
    catch(const IceUtil::Exception& ex)
    {
        {
            Error out(_instance->initializationData().logger);
            out << "cannot create thread for `" << _prefix << "':\n" << ex;
        }

        // Tear down any threads that did start before re-throwing.
        destroy();
        joinWithAllThreads();
        __setNoDelete(false);
        throw;
    }
    catch(...)
    {
        __setNoDelete(false);
        throw;
    }
    __setNoDelete(false);
}
416
IceInternal::ThreadPool::~ThreadPool()
{
    // destroy() must have been called before the last reference is dropped.
    assert(_destroyed);
}
421
422 void
destroy()423 IceInternal::ThreadPool::destroy()
424 {
425 Lock sync(*this);
426 if(_destroyed)
427 {
428 return;
429 }
430 _destroyed = true;
431 _workQueue->destroy();
432 }
433
434 void
updateObservers()435 IceInternal::ThreadPool::updateObservers()
436 {
437 Lock sync(*this);
438 for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
439 {
440 (*p)->updateObserver();
441 }
442 }
443
// Register a new event handler with the selector and install a ready
// callback through which the handler (e.g. a transceiver) can mark socket
// operations as artificially ready.
//
// NOTE: the callback holds a ThreadPoolPtr and the handler, creating a
// reference cycle that is broken in FinishedWorkItem::execute() by
// resetting the callback to 0.
void
IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler)
{
    Lock sync(*this);
    assert(!_destroyed);
    _selector.initialize(handler.get());

    class ReadyCallbackI : public ReadyCallback
    {
    public:

        ReadyCallbackI(const ThreadPoolPtr& threadPool, const EventHandlerPtr& handler) :
            _threadPool(threadPool), _handler(handler)
        {
        }

        virtual void
        ready(SocketOperation op, bool value)
        {
            // Forward to ThreadPool::ready(), which synchronizes and
            // ignores the call once the pool is destroyed.
            _threadPool->ready(_handler, op, value);
        }

    private:

        const ThreadPoolPtr _threadPool;
        const EventHandlerPtr _handler;
    };
    handler->getNativeInfo()->setReadyCallback(new ReadyCallbackI(this, handler));
}
473
// Change the set of socket operations the handler is registered for:
// `remove` operations are unregistered, `add` operations are registered.
void
IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation remove, SocketOperation add)
{
    Lock sync(*this);
    assert(!_destroyed);

    // Don't remove what needs to be added
    remove = static_cast<SocketOperation>(remove & ~add);

    // Don't remove/add if already un-registered or registered
    remove = static_cast<SocketOperation>(handler->_registered & remove);
    add = static_cast<SocketOperation>(~handler->_registered & add);
    // After the masking above remove and add are disjoint, so they can only
    // compare equal when both are SocketOperationNone: nothing to do.
    if(remove == add)
    {
        return;
    }

    _selector.update(handler.get(), remove, add);
}
493
// Remove the handler from the pool. Returns whether the caller should close
// the connection/socket immediately.
bool
IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow)
{
    Lock sync(*this);
    assert(!_destroyed);
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
    // Unregister from the selector first, then queue the finished()
    // notification; the ordering matters (see selector implementation).
    closeNow = _selector.finish(handler.get(), closeNow); // This must be called before!
    _workQueue->queue(new FinishedWorkItem(handler, !closeNow));
    return closeNow;
#else
    UNREFERENCED_PARAMETER(closeNow);

    // If there are no pending asynchronous operations, we can call finish on the handler now.
    if(!handler->_pending)
    {
        _workQueue->queue(new FinishedWorkItem(handler, false));
        _selector.finish(handler.get());
    }
    else
    {
        // Defer: startMessage()/finishMessage() will queue the finished
        // notification once the last pending async operation completes.
        handler->_finish = true;
    }
    return true; // Always close now to interrupt the pending call.
#endif
}
519
520 void
ready(const EventHandlerPtr & handler,SocketOperation op,bool value)521 IceInternal::ThreadPool::ready(const EventHandlerPtr& handler, SocketOperation op, bool value)
522 {
523 Lock sync(*this);
524 if(_destroyed)
525 {
526 return;
527 }
528 _selector.ready(handler.get(), op, value);
529 }
530
// Run the work item on the calling thread, going through the user-supplied
// dispatcher if one was configured. Exceptions raised by the dispatcher are
// logged (when Ice.Warn.Dispatch > 1) but never propagated.
void
IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workItem)
{
    if(_dispatcher)
    {
        try
        {
#ifdef ICE_CPP11_MAPPING
            // C++11 mapping: the dispatcher is a std::function.
            _dispatcher([workItem]()
                        {
                            workItem->run();
                        },
                        workItem->getConnection());
#else
            // C++98 mapping: the dispatcher is a Dispatcher implementation.
            _dispatcher->dispatch(workItem, workItem->getConnection());
#endif
        }
        catch(const std::exception& ex)
        {
            // NOTE: the warning is only printed when the property is > 1,
            // so the default setting of 1 does not warn here.
            if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
            {
                Warning out(_instance->initializationData().logger);
                out << "dispatch exception:\n" << ex;
            }
        }
        catch(...)
        {
            if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
            {
                Warning out(_instance->initializationData().logger);
                out << "dispatch exception:\nunknown c++ exception";
            }
        }
    }
    else
    {
        // No custom dispatcher: execute directly.
        workItem->run();
    }
}
570
571 void
dispatch(const DispatchWorkItemPtr & workItem)572 IceInternal::ThreadPool::dispatch(const DispatchWorkItemPtr& workItem)
573 {
574 Lock sync(*this);
575 if(_destroyed)
576 {
577 throw CommunicatorDestroyedException(__FILE__, __LINE__);
578 }
579 _workQueue->queue(workItem);
580 }
581
// Join with every pool thread and then destroy the selector. Must only be
// called after destroy().
void
IceInternal::ThreadPool::joinWithAllThreads()
{
    assert(_destroyed);

    //
    // _threads is immutable after destroy() has been called,
    // therefore no synchronization is needed. (Synchronization
    // wouldn't be possible here anyway, because otherwise the other
    // threads would never terminate.)
    //
    for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
    {
        (*p)->getThreadControl().join();
    }
    // Safe only once no thread can touch the selector anymore.
    _selector.destroy();
}
599
// Property prefix / name of this pool (e.g. "Ice.ThreadPool.Server").
string
IceInternal::ThreadPool::prefix() const
{
    return _prefix;
}
605
// Main loop of every pool thread.
//
// Select-based build: implements the leader/followers pattern — one leader
// thread runs select(), hands ready handlers to followers, and a follower is
// promoted to leader when the current leader starts IO or dispatching.
//
// IOCP/UWP build: each thread independently dequeues completions from the
// selector and processes them; the pool shrinks when a thread times out
// waiting and is not the last idle thread.
//
// The thread exits by returning: on ThreadPoolDestroyedException (pool
// destroyed), or when its idle wait times out and the pool shrinks.
void
IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
{
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
    ThreadPoolCurrent current(_instance, this, thread);
    bool select = false;
    while(true)
    {
        // Phase 1 (unlocked): process the handler picked up on the previous
        // iteration, or run select() if this thread is the leader.
        if(current._handler)
        {
            try
            {
                current._handler->message(current);
            }
            catch(const ThreadPoolDestroyedException&)
            {
                return;
            }
            catch(const exception& ex)
            {
                Error out(_instance->initializationData().logger);
                out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: "
                    << current._handler->toString();
            }
            catch(...)
            {
                Error out(_instance->initializationData().logger);
                out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
            }
        }
        else if(select)
        {
            try
            {
                _selector.select(_serverIdleTime);
            }
            catch(const SelectorTimeoutException&)
            {
                // Server idle for _serverIdleTime with no thread in use:
                // request communicator shutdown.
                Lock sync(*this);
                if(!_destroyed && _inUse == 0)
                {
                    _workQueue->queue(new ShutdownWorkItem(_instance)); // Select timed-out.
                }
                continue;
            }
        }

        // Phase 2 (locked): update leader/follower bookkeeping and pick the
        // next ready handler.
        {
            Lock sync(*this);
            if(!current._handler)
            {
                if(select)
                {
                    // The leader collected ready handlers from select().
                    _selector.finishSelect(_handlers);
                    _nextHandler = _handlers.begin();
                    select = false;
                }
                else if(!current._leader && followerWait(current))
                {
                    return; // Wait timed-out.
                }
            }
            else if(_sizeMax > 1)
            {
                if(!current._ioCompleted)
                {
                    //
                    // The handler didn't call ioCompleted() so we take care of decreasing
                    // the IO thread count now.
                    //
                    --_inUseIO;
                }
                else
                {
                    //
                    // If the handler called ioCompleted(), we re-enable the handler in
                    // case it was disabled and we decrease the number of thread in use.
                    //
                    if(_serialize && current._handler.get() != _workQueue.get())
                    {
                        _selector.enable(current._handler.get(), current.operation);
                    }
                    assert(_inUse > 0);
                    --_inUse;
                }

                if(!current._leader && followerWait(current))
                {
                    return; // Wait timed-out.
                }
            }

            //
            // Get the next ready handler.
            //
            while(_nextHandler != _handlers.end() &&
                  !(_nextHandler->second & ~_nextHandler->first->_disabled & _nextHandler->first->_registered))
            {
                ++_nextHandler;
            }
            if(_nextHandler != _handlers.end())
            {
                current._ioCompleted = false;
                current._handler = ICE_GET_SHARED_FROM_THIS(_nextHandler->first);
                current.operation = _nextHandler->second;
                ++_nextHandler;
                thread->setState(ICE_ENUM(ThreadState, ThreadStateInUseForIO));
            }
            else
            {
                current._handler = 0;
            }

            if(!current._handler)
            {
                //
                // If there are no more ready handlers and there are still threads busy performing
                // IO, we give up leadership and promote another follower (which will perform the
                // select() only once all the IOs are completed). Otherwise, if there are no more
                // threads peforming IOs, it's time to do another select().
                //
                if(_inUseIO > 0)
                {
                    promoteFollower(current);
                }
                else
                {
                    _handlers.clear();
                    _selector.startSelect();
                    select = true;
                    thread->setState(ICE_ENUM(ThreadState, ThreadStateIdle));
                }
            }
            else if(_sizeMax > 1)
            {
                //
                // Increment the IO thread count and if there are still threads available
                // to perform IO and more handlers ready, we promote a follower.
                //
                ++_inUseIO;
                if(_nextHandler != _handlers.end() && _inUseIO < _sizeIO)
                {
                    promoteFollower(current);
                }
            }
        }
    }
#else
    ThreadPoolCurrent current(_instance, this, thread);
    while(true)
    {
        // Wait up to _threadIdleTime for the next IO completion.
        try
        {
            current._ioCompleted = false;
#ifdef ICE_OS_UWP
            current._handler = ICE_GET_SHARED_FROM_THIS(_selector.getNextHandler(current.operation, _threadIdleTime));
#else
            current._handler = ICE_GET_SHARED_FROM_THIS(_selector.getNextHandler(current.operation, current._count,
                                                                                 current._error, _threadIdleTime));
#endif
        }
        catch(const SelectorTimeoutException&)
        {
            if(_sizeMax > 1)
            {
                Lock sync(*this);

                if(_destroyed)
                {
                    continue;
                }
                else if(_inUse < static_cast<int>(_threads.size() - 1)) // If not the last idle thread, we can exit.
                {
#ifndef ICE_OS_UWP
                    // Don't exit if this thread still owns pending async IO:
                    // its completions would be lost.
                    BOOL hasIO = false;
                    GetThreadIOPendingFlag(GetCurrentThread(), &hasIO);
                    if(hasIO)
                    {
                        continue;
                    }
#endif
                    if(_instance->traceLevels()->threadPool >= 1)
                    {
                        Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
                        out << "shrinking " << _prefix << ": Size = " << (_threads.size() - 1);
                    }
                    _threads.erase(thread);
                    _workQueue->queue(new JoinThreadWorkItem(thread));
                    return;
                }
                else if(_inUse > 0)
                {
                    //
                    // If this is the last idle thread but there are still other threads
                    // busy dispatching, we go back waiting with _threadIdleTime. We only
                    // wait with _serverIdleTime when there's only one thread left.
                    //
                    continue;
                }
                assert(_threads.size() == 1);
            }

            // Last thread: wait with the server idle time; on a second
            // timeout, request communicator shutdown.
            try
            {
#ifdef ICE_OS_UWP
                current._handler = ICE_GET_SHARED_FROM_THIS(_selector.getNextHandler(current.operation, _serverIdleTime));
#else

                current._handler = ICE_GET_SHARED_FROM_THIS(_selector.getNextHandler(current.operation, current._count,
                                                                                     current._error, _serverIdleTime));
#endif
            }
            catch(const SelectorTimeoutException&)
            {
                Lock sync(*this);
                if(!_destroyed)
                {
                    _workQueue->queue(new ShutdownWorkItem(_instance));
                }
                continue;
            }
        }

        {
            IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
            thread->setState(ICE_ENUM(ThreadState, ThreadStateInUseForIO));
        }

        try
        {
            assert(current._handler);
            current._handler->message(current);
        }
        catch(const ThreadPoolDestroyedException&)
        {
            return;
        }
        catch(const exception& ex)
        {
            Error out(_instance->initializationData().logger);
            out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString();
        }
#ifdef ICE_OS_UWP
        catch(Platform::Exception^ ex)
        {
            //
            // We don't need to pass the wide string converter in the call to wstringToString
            // because the wide string is using the platform default encoding.
            //
            Error out(_instance->initializationData().logger);
            out << "exception in `" << _prefix << "':\n"
                << wstringToString(ex->Message->Data(), _instance->getStringConverter())
                << "\nevent handler: " << current._handler->toString();
        }
#endif
        catch(...)
        {
            Error out(_instance->initializationData().logger);
            out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
        }

        {
            Lock sync(*this);
            // Undo the _inUse increment done by ioCompleted(), if it ran.
            if(_sizeMax > 1 && current._ioCompleted)
            {
                assert(_inUse > 0);
                --_inUse;
            }
            thread->setState(ICE_ENUM(ThreadState, ThreadStateIdle));
        }
    }
#endif
}
879
// Called by an event handler when it is done with IO and about to start
// user-code dispatching. Updates the IO/in-use thread counts, promotes a
// follower (select-based build), grows the pool up to SizeMax if all
// threads are busy, and returns whether the handler's message dispatching
// must be serialized (<prefix>.Serialize > 0).
bool
IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);

    current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called.

    current._thread->setState(ICE_ENUM(ThreadState, ThreadStateInUseForUser));

    if(_sizeMax > 1)
    {

#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
        // This thread no longer counts as performing IO.
        --_inUseIO;

        if(!_destroyed)
        {
            // With serialization, disable the handler until its dispatch
            // finishes (re-enabled in run()); the work queue is never disabled.
            if(_serialize && current._handler.get() != _workQueue.get())
            {
                _selector.disable(current._handler.get(), current.operation);
            }
        }

        if(current._leader)
        {
            //
            // If this thread is still the leader, it's time to promote a new leader.
            //
            promoteFollower(current);
        }
        else if(_promote && (_nextHandler != _handlers.end() || _inUseIO == 0))
        {
            // A promotion is pending and there's work (or no IO thread left):
            // wake up a waiting follower.
            notify();
        }
#endif

        assert(_inUse >= 0);
        ++_inUse;

        // Warn once each time the in-use count reaches the SizeWarn threshold.
        if(_inUse == _sizeWarn)
        {
            Warning out(_instance->initializationData().logger);
            out << "thread pool `" << _prefix << "' is running low on threads\n"
                << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
        }

        if(!_destroyed)
        {
            assert(_inUse <= static_cast<int>(_threads.size()));
            // Every thread is busy and we're below SizeMax: start a new one.
            if(_inUse < _sizeMax && _inUse == static_cast<int>(_threads.size()))
            {
                if(_instance->traceLevels()->threadPool >= 1)
                {
                    Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
                    out << "growing " << _prefix << ": Size=" << _threads.size() + 1;
                }

                try
                {
                    EventHandlerThreadPtr thread = new EventHandlerThread(this, nextThreadId());
                    if(_hasPriority)
                    {
                        thread->start(_stackSize, _priority);
                    }
                    else
                    {
                        thread->start(_stackSize);
                    }
                    _threads.insert(thread);
                }
                catch(const IceUtil::Exception& ex)
                {
                    // Growing is best-effort: log and keep running with the
                    // current number of threads.
                    Error out(_instance->initializationData().logger);
                    out << "cannot create thread for `" << _prefix << "':\n" << ex;
                }
            }
        }
    }

    return _serialize && current._handler.get() != _workQueue.get();
}
961
#if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
// Async (IOCP/UWP) state machine, entry side: called by the handler from
// message() before reading/writing. Returns true when the handler can
// process the operation synchronously now; false when an async operation
// was started (or the handler is being finished) and the handler must wait
// for the next completion.
bool
IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
{
    assert(current._handler->_pending & current.operation);

    if(current._handler->_started & current.operation)
    {
        // A previously started async operation has completed: record its
        // result and let the handler consume it.
        assert(!(current._handler->_completed & current.operation));
        current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed | current.operation);
        current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation);

#ifndef ICE_OS_UWP
        AsyncInfo* info = current._handler->getNativeInfo()->getAsyncInfo(current.operation);
        info->count = current._count;
        info->error = current._error;
#endif

        if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished.
        {
            current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
            if(!current._handler->_pending && current._handler->_finish)
            {
                // Last pending operation of a finishing handler: deliver
                // the deferred finished() notification.
                Lock sync(*this);
                _workQueue->queue(new FinishedWorkItem(current._handler, false));
                _selector.finish(current._handler.get());
            }
            return false;
        }
    }
    else if(!(current._handler->_completed & current.operation) && (current._handler->_registered & current.operation))
    {
        assert(!(current._handler->_started & current.operation));
        if(current._handler->_ready & current.operation)
        {
            // Operation artificially marked ready: no async call needed.
            return true;
        }
        else if(!current._handler->startAsync(current.operation))
        {
            current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
            if(!current._handler->_pending && current._handler->_finish)
            {
                Lock sync(*this);
                _workQueue->queue(new FinishedWorkItem(current._handler, false));
                _selector.finish(current._handler.get());
            }
            return false;
        }
        else
        {
            // Async operation in flight; its completion will re-enter here.
            current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation);
            return false;
        }
    }

    if(current._handler->_registered & current.operation)
    {
        // Consume the completed flag and let the handler process the result.
        assert(current._handler->_completed & current.operation);
        current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed & ~current.operation);
        return true;
    }
    else
    {
        current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
        if(!current._handler->_pending && current._handler->_finish)
        {
            Lock sync(*this);
            _workQueue->queue(new FinishedWorkItem(current._handler, false));
            _selector.finish(current._handler.get());
        }
        return false;
    }
}
1035
// Async (IOCP/UWP) state machine, exit side: called by the handler from
// message() once it has processed the operation. Re-arms the operation
// (posts a completion if artificially ready, otherwise starts the next
// async call), and delivers the deferred finished() notification when the
// handler is finishing and no operations remain pending.
void
IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
{
    if(current._handler->_registered & current.operation && !current._handler->_finish)
    {
        assert(!(current._handler->_completed & current.operation));
        if(current._handler->_ready & current.operation)
        {
            // Still artificially ready: post a completion directly.
            _selector.completed(current._handler.get(), current.operation);
        }
        else if(!current._handler->startAsync(current.operation))
        {
            current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
        }
        else
        {
            assert(current._handler->_pending & current.operation);
            current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation);
        }
    }
    else
    {
        current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
    }

    if(!current._handler->_pending && current._handler->_finish)
    {
        // There are no more pending async operations, it's time to call finish.
        Lock sync(*this);
        _workQueue->queue(new FinishedWorkItem(current._handler, false));
        _selector.finish(current._handler.get());
    }
}
1069 #else
// Give up leadership: mark a promotion pending and wake a follower if one
// could actually make progress (an IO slot is free and there is either a
// ready handler or no IO in progress). Must be called by the current leader
// with the pool mutex locked.
void
IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current)
{
    assert(!_promote && current._leader);
    _promote = true;
    if(_inUseIO < _sizeIO && (_nextHandler != _handlers.end() || _inUseIO == 0))
    {
        notify();
    }
    current._leader = false;
}
1081
// Wait (as a follower) until this thread is promoted to leader. Returns true
// if the idle wait timed out and the thread removed itself from the pool
// (the caller must then return from run()); false once promoted. Must be
// called with the pool mutex locked.
bool
IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current)
{
    assert(!current._leader);

    current._thread->setState(ICE_ENUM(ThreadState, ThreadStateIdle));

    //
    // It's important to clear the handler before waiting to make sure that
    // resources for the handler are released now if it's finished. We also
    // clear the per-thread stream.
    //
    current._handler = 0;
    current.stream.clear();
    current.stream.b.clear();

    //
    // Wait to be promoted and for all the IO threads to be done.
    //
    while(!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0))
    {
        if(_threadIdleTime)
        {
            if(!timedWait(IceUtil::Time::seconds(_threadIdleTime)))
            {
                // Timed out; re-check the predicate under the lock: only
                // shrink if there's still genuinely nothing to do.
                if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
                                   (_nextHandler == _handlers.end() && _inUseIO > 0)))
                {
                    if(_instance->traceLevels()->threadPool >= 1)
                    {
                        Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
                        out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1);
                    }
                    assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
                    _threads.erase(current._thread);
                    _workQueue->queue(new JoinThreadWorkItem(current._thread));
                    return true;
                }
            }
        }
        else
        {
            wait();
        }
    }
    current._leader = true; // The current thread has become the leader.
    _promote = false;
    return false;
}
1131 #endif
1132
1133 string
nextThreadId()1134 IceInternal::ThreadPool::nextThreadId()
1135 {
1136 ostringstream os;
1137 os << _prefix << "-" << _nextThreadId++;
1138 return os.str();
1139 }
1140
// A pool thread; holds a reference to its pool (released at the end of
// run() to break the cycle) and an instrumentation observer.
IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool, const string& name) :
    IceUtil::Thread(name),
    _pool(pool),
    _state(ICE_ENUM(ThreadState, ThreadStateIdle))
{
    updateObserver();
}
1148
// (Re-)attach this thread's instrumentation observer from the communicator
// observer, preserving the previous observer for delegation.
void
IceInternal::ThreadPool::EventHandlerThread::updateObserver()
{
    // Must be called with the thread pool mutex locked
    const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer;
    if(obsv)
    {
        _observer.attach(obsv->getThreadObserver(_pool->_prefix, name(), _state, _observer.get()));
    }
}
1159
1160 void
setState(Ice::Instrumentation::ThreadState s)1161 IceInternal::ThreadPool::EventHandlerThread::setState(Ice::Instrumentation::ThreadState s)
1162 {
1163 // Must be called with the thread pool mutex locked
1164 if(_observer)
1165 {
1166 if(_state != s)
1167 {
1168 _observer->stateChanged(_state, s);
1169 }
1170 }
1171 _state = s;
1172 }
1173
// Thread entry point: invoke the configured thread start hook, run the pool
// loop, then invoke the stop hook. Hook and pool-loop exceptions are logged
// and never propagated out of the thread.
void
IceInternal::ThreadPool::EventHandlerThread::run()
{
#ifdef ICE_CPP11_MAPPING
    if(_pool->_instance->initializationData().threadStart)
#else
    if(_pool->_instance->initializationData().threadHook)
#endif
    {
        try
        {
#ifdef ICE_CPP11_MAPPING
            _pool->_instance->initializationData().threadStart();
#else
            _pool->_instance->initializationData().threadHook->start();
#endif
        }
        catch(const exception& ex)
        {
            Error out(_pool->_instance->initializationData().logger);
            out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex;
        }
        catch(...)
        {
            Error out(_pool->_instance->initializationData().logger);
            out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "'";
        }
    }

    try
    {
        _pool->run(this);
    }
    catch(const exception& ex)
    {
        Error out(_pool->_instance->initializationData().logger);
        out << "exception in `" << _pool->_prefix << "':\n" << ex;
    }
    catch(...)
    {
        Error out(_pool->_instance->initializationData().logger);
        out << "unknown exception in `" << _pool->_prefix << "'";
    }

    _observer.detach();

#ifdef ICE_CPP11_MAPPING
    if(_pool->_instance->initializationData().threadStop)
#else
    if(_pool->_instance->initializationData().threadHook)
#endif
    {
        try
        {
#ifdef ICE_CPP11_MAPPING
            _pool->_instance->initializationData().threadStop();
#else
            _pool->_instance->initializationData().threadHook->stop();
#endif
        }
        catch(const exception& ex)
        {
            Error out(_pool->_instance->initializationData().logger);
            out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex;
        }
        catch(...)
        {
            Error out(_pool->_instance->initializationData().logger);
            out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "'";
        }
    }

    _pool = 0; // Break cyclic dependency.
}
1248
// Per-thread dispatch context passed to event handlers; holds the current
// handler/operation, a reusable per-thread stream, and the leader/follower
// bookkeeping flags. Keeps a raw pointer to the pool (the pool outlives its
// threads).
ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance,
                                     const ThreadPoolPtr& threadPool,
                                     const ThreadPool::EventHandlerThreadPtr& thread) :
    operation(SocketOperationNone),
    stream(instance.get(), Ice::currentProtocolEncoding),
    _threadPool(threadPool.get()),
    _thread(thread),
    _ioCompleted(false)
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_UWP)
    , _leader(false)
#endif
{
}
1262