1 #ifndef UTIL__THREAD_POOL_OLD__HPP
2 #define UTIL__THREAD_POOL_OLD__HPP
3
4 /* $Id: thread_pool_old.hpp 489095 2016-01-08 13:02:41Z ivanov $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Author: Aaron Ucko
30 *
31 * File Description:
32 * Pools of generic request-handling threads.
33 *
34 * TEMPLATES:
35 * CBlockingQueue<> -- queue of requests, with efficiently blocking Get()
36 * CThreadInPool<> -- abstract request-handling thread
37 * CPoolOfThreads<> -- abstract pool of threads sharing a request queue
38 *
39 * SPECIALIZATIONS:
40 * CStdRequest -- abstract request type
41 * CStdThreadInPool -- thread handling CStdRequest
42 * CStdPoolOfThreads -- pool of threads handling CStdRequest
43 */
44
45 #include <corelib/ncbistd.hpp>
46 #include <corelib/ncbithr.hpp>
47 #include <corelib/ncbitime.hpp>
48 #include <corelib/ncbi_limits.hpp>
49 #include <corelib/ncbi_param.hpp>
50 #include <util/util_exception.hpp>
51 #include <util/error_codes.hpp>
52
53 #include <set>
54
55
56 /** @addtogroup ThreadedPools
57 *
58 * @{
59 */
60
61 BEGIN_NCBI_SCOPE
62
63
64 /////////////////////////////////////////////////////////////////////////////
65 ///
66 /// CQueueItemBase -- skeleton blocking-queue item, sans actual request
67
68 class CQueueItemBase : public CObject {
69 public:
70 enum EStatus {
71 ePending, ///< still in the queue
72 eActive, ///< extracted but not yet released
73 eComplete, ///< extracted and released
74 eWithdrawn, ///< dropped by submitter's request
75 eForciblyCaught ///< let an exception escape
76 };
77
78 /// Every request has an associated 32-bit priority field, but
79 /// only the top eight bits are under direct user control. (The
80 /// rest are a counter.)
81 typedef Uint4 TPriority;
82 typedef Uint1 TUserPriority;
83
CQueueItemBase(TPriority priority)84 CQueueItemBase(TPriority priority)
85 : m_Priority(priority), m_Status(ePending)
86 { }
87
operator >(const CQueueItemBase & item) const88 bool operator> (const CQueueItemBase& item) const
89 { return m_Priority > item.m_Priority; }
90
GetPriority(void) const91 const TPriority& GetPriority(void) const { return m_Priority; }
GetStatus(void) const92 const EStatus& GetStatus(void) const { return m_Status; }
GetUserPriority(void) const93 TUserPriority GetUserPriority(void) const { return TUserPriority(m_Priority >> 24); }
94
MarkAsComplete(void)95 void MarkAsComplete(void) { x_SetStatus(eComplete); }
MarkAsForciblyCaught(void)96 void MarkAsForciblyCaught(void) { x_SetStatus(eForciblyCaught); }
97
98 protected:
99 TPriority m_Priority;
100 EStatus m_Status;
101
x_SetStatus(EStatus new_status)102 virtual void x_SetStatus(EStatus new_status)
103 { m_Status = new_status; }
104 };
105
106
107 /////////////////////////////////////////////////////////////////////////////
108 ///
109 /// CBlockingQueue<> -- queue of requests, with efficiently blocking Get()
110
111 template <typename TRequest>
112 class CBlockingQueue
113 {
114 public:
115 typedef CQueueItemBase::TPriority TPriority;
116 typedef CQueueItemBase::TUserPriority TUserPriority;
117
118 class CQueueItem;
119 typedef CRef<CQueueItem> TItemHandle;
120
121 /// It may be desirable to store handles obtained from GetHandle() in
122 /// instances of CCompletingHandle to ensure that they are marked as
123 /// complete when all is said and done, even in the face of exceptions.
124 class CCompletingHandle : public TItemHandle
125 {
126 public:
CCompletingHandle(const TItemHandle & h)127 CCompletingHandle(const TItemHandle& h)
128 : TItemHandle(h)
129 { }
130
~CCompletingHandle()131 ~CCompletingHandle() {
132 if (this->NotEmpty()) {
133 this->GetObject().MarkAsComplete();
134 }
135 }
136 };
137
138 /// Constructor
139 ///
140 /// @param max_size
141 /// The maximum size of the queue (may not be zero!)
CBlockingQueue(size_t max_size=kMax_UInt)142 CBlockingQueue(size_t max_size = kMax_UInt)
143 : m_GetSem(0,1), m_PutSem(1,1), m_HungerSem(0,1), m_HungerCnt(0),
144 m_MaxSize(min(max_size, size_t(0xFFFFFF))),
145 m_RequestCounter(0xFFFFFF)
146 { _ASSERT(max_size > 0); }
147
148 /// Put a request into the queue. If the queue remains full for
149 /// the duration of the (optional) timeout, throw an exception.
150 ///
151 /// @param request
152 /// Request
153 /// @param priority
154 /// The priority of the request. The higher the priority
155 /// the sooner the request will be processed.
156 /// @param timeout_sec
157 /// Number of whole seconds in timeout
158 /// @param timeout_nsec
159 /// Number of additional nanoseconds in timeout
160 TItemHandle Put(const TRequest& request, TUserPriority priority = 0,
161 unsigned int timeout_sec = 0,
162 unsigned int timeout_nsec = 0);
163
164 /// Wait for room in the queue for up to
165 /// timeout_sec + timeout_nsec/1E9 seconds.
166 ///
167 /// @param timeout_sec
168 /// Number of seconds
169 /// @param timeout_nsec
170 /// Number of nanoseconds
171 void WaitForRoom(unsigned int timeout_sec = kMax_UInt,
172 unsigned int timeout_nsec = 0) const;
173
174 /// Wait for the queue to have waiting readers, for up to
175 /// timeout_sec + timeout_nsec/1E9 seconds.
176 ///
177 /// @param timeout_sec
178 /// Number of seconds
179 /// @param timeout_nsec
180 /// Number of nanoseconds
181 void WaitForHunger(unsigned int timeout_sec = kMax_UInt,
182 unsigned int timeout_nsec = 0) const;
183
184 /// Get the first available request from the queue, and return a
185 /// handle to it.
186 /// Blocks politely if empty.
187 /// Waits up to timeout_sec + timeout_nsec/1E9 seconds.
188 ///
189 /// @param timeout_sec
190 /// Number of seconds
191 /// @param timeout_nsec
192 /// Number of nanoseconds
193 TItemHandle GetHandle(unsigned int timeout_sec = kMax_UInt,
194 unsigned int timeout_nsec = 0);
195
196 /// Get the first available request from the queue, and return
197 /// just the request.
198 /// Blocks politely if empty.
199 /// Waits up to timeout_sec + timeout_nsec/1E9 seconds.
200 ///
201 /// @param timeout_sec
202 /// Number of seconds
203 /// @param timeout_nsec
204 /// Number of nanoseconds
205 NCBI_DEPRECATED
206 TRequest Get(unsigned int timeout_sec = kMax_UInt,
207 unsigned int timeout_nsec = 0);
208
209 /// Get the number of requests in the queue
210 size_t GetSize (void) const;
211
212 /// Get the maximun number of requests that can be put into the queue
GetMaxSize(void) const213 size_t GetMaxSize (void) const { return m_MaxSize; }
214
215 /// Check if the queue is empty
IsEmpty(void) const216 bool IsEmpty (void) const { return GetSize() == 0; }
217
218 /// Check if the queue is full
IsFull(void) const219 bool IsFull (void) const { return GetSize() == GetMaxSize(); }
220
221 /// Adjust a pending request's priority.
222 void SetUserPriority(TItemHandle handle, TUserPriority priority);
223
224 /// Withdraw a pending request from consideration.
225 void Withdraw(TItemHandle handle);
226
227 /// Get the number of threads waiting for requests, for debugging
228 /// purposes only.
GetHunger(void) const229 size_t GetHunger(void) const { return m_HungerCnt; }
230
231 class CQueueItem : public CQueueItemBase
232 {
233 public:
234 // typedef CBlockingQueue<TRequest> TQueue;
CQueueItem(Uint4 priority,TRequest request)235 CQueueItem(Uint4 priority, TRequest request)
236 : CQueueItemBase(priority), m_Request(request)
237 { }
238
GetRequest(void) const239 const TRequest& GetRequest(void) const { return m_Request; }
SetRequest(void)240 TRequest& SetRequest(void) { return m_Request; }
241 // void SetUserPriority(TUserPriority p);
242 // void Withdraw(void);
243
244 protected:
245 // Specialized for CRef<CStdRequest> in thread_pool.cpp
x_SetStatus(EStatus new_status)246 void x_SetStatus(EStatus new_status)
247 { CQueueItemBase::x_SetStatus(new_status); }
248
249 private:
250 friend class CBlockingQueue<TRequest>;
251
252 // TQueue& m_Queue;
253 TRequest m_Request;
254 };
255
256 protected:
257 struct SItemHandleGreater {
operator ()CBlockingQueue::SItemHandleGreater258 bool operator()(const TItemHandle& i1, const TItemHandle& i2) const
259 { return static_cast<CQueueItemBase>(*i1)
260 > static_cast<CQueueItemBase>(*i2); }
261 };
262
263 /// The type of the queue
264 typedef set<TItemHandle, SItemHandleGreater> TRealQueue;
265
266 // Derived classes should take care to use these members properly.
267 volatile TRealQueue m_Queue; ///< The queue
268 CSemaphore m_GetSem; ///< Raised if the queue contains data
269 mutable CSemaphore m_PutSem; ///< Raised if the queue has room
270 mutable CSemaphore m_HungerSem; ///< Raised if Get[Handle] has to wait
271 mutable CMutex m_Mutex; ///< Guards access to queue
272 size_t m_HungerCnt; ///< Number of threads waiting for data
273
274 private:
275 size_t m_MaxSize; ///< The maximum size of the queue
276 Uint4 m_RequestCounter; ///
277
278 typedef bool (CBlockingQueue::*TQueuePredicate)(const TRealQueue& q) const;
279
x_GetSemPred(const TRealQueue & q) const280 bool x_GetSemPred(const TRealQueue& q) const
281 { return !q.empty(); }
x_PutSemPred(const TRealQueue & q) const282 bool x_PutSemPred(const TRealQueue& q) const
283 { return q.size() < m_MaxSize; }
x_HungerSemPred(const TRealQueue & q) const284 bool x_HungerSemPred(const TRealQueue& q) const
285 { return m_HungerCnt > q.size(); }
286
287 bool x_WaitForPredicate(TQueuePredicate pred, CSemaphore& sem,
288 CMutexGuard& guard, unsigned int timeout_sec,
289 unsigned int timeout_nsec) const;
290
291 private:
292 /// forbidden
293 CBlockingQueue(const CBlockingQueue&);
294 CBlockingQueue& operator=(const CBlockingQueue&);
295 };
296
297
298 /////////////////////////////////////////////////////////////////////////////
299 ///
300 /// CThreadInPool<> -- abstract request-handling thread
301
302 template <typename TRequest> class CPoolOfThreads;
303
304 template <typename TRequest>
305 class CThreadInPool : public CThread
306 {
307 public:
308 typedef CPoolOfThreads<TRequest> TPool;
309 typedef typename CBlockingQueue<TRequest>::TItemHandle TItemHandle;
310 typedef typename CBlockingQueue<TRequest>::CCompletingHandle
311 TCompletingHandle;
312
313 /// Thread run mode
314 enum ERunMode {
315 eNormal, ///< Process request and stay in the pool
316 eRunOnce ///< Process request and die
317 };
318
319 /// Constructor
320 ///
321 /// @param pool
322 /// A pool where this thead is placed
323 /// @param mode
324 /// A running mode of this thread
CThreadInPool(TPool * pool,ERunMode mode=eNormal)325 CThreadInPool(TPool* pool, ERunMode mode = eNormal)
326 : m_Pool(pool), m_RunMode(mode), m_Counter(NULL) {}
327
328 void CountSelf(CAtomicCounter* counter);
329
330 protected:
331 /// Destructor
332 virtual ~CThreadInPool(void);
333
334 /// Intit this thread. It is called at beginning of Main()
Init(void)335 virtual void Init(void) {}
336
337 /// Process a request.
338 /// It is called from Main() for each request this thread handles
339 ///
340 /// @param
341 /// A request for processing
342 virtual void ProcessRequest(TItemHandle handle);
343
344 /// Older interface (still delegated to by default)
345 virtual void ProcessRequest(const TRequest& req) = 0;
346
347 /// Clean up. It is called by OnExit()
x_OnExit(void)348 virtual void x_OnExit(void) {}
349
350 /// Get run mode
GetRunMode(void) const351 ERunMode GetRunMode(void) const { return m_RunMode; }
352
353 private:
354 // to prevent overriding; inherited from CThread
355 virtual void* Main(void);
356 virtual void OnExit(void);
357
358 void x_HandleOneRequest(bool catch_all);
359 void x_UnregisterThread(void);
360
361 class CAutoUnregGuard
362 {
363 public:
364 typedef CThreadInPool<TRequest> TThread;
365 CAutoUnregGuard(TThread* thr);
366 ~CAutoUnregGuard(void);
367
368 private:
369 TThread* m_Thread;
370 };
371
372 friend class CAutoUnregGuard;
373
374
375 TPool* m_Pool; ///< The pool that holds this thread
376 ERunMode m_RunMode; ///< How long to keep running
377 CAtomicCounter* m_Counter;
378 };
379
380
381 /////////////////////////////////////////////////////////////////////////////
382 ///
383 /// CPoolOfThreads<> -- abstract pool of threads sharing a request queue
384
385 template <typename TRequest>
386 class CPoolOfThreads
387 {
388 public:
389 typedef CThreadInPool<TRequest> TThread;
390 typedef typename TThread::ERunMode ERunMode;
391
392 typedef CBlockingQueue<TRequest> TQueue;
393 typedef typename TQueue::TUserPriority TUserPriority;
394 typedef typename TQueue::TItemHandle TItemHandle;
395
396 /// Constructor
397 ///
398 /// @param max_threads
399 /// The maximum number of threads that this pool can run
400 /// @param queue_size
401 /// The maximum number of requests in the queue
402 /// @param spawn_threashold
403 /// The number of requests in the queue after which
404 /// a new thread is started
405 /// @param max_urgent_threads
406 /// The maximum number of urgent threads running simultaneously
407 CPoolOfThreads(unsigned int max_threads, unsigned int queue_size,
408 unsigned int spawn_threshold = 1,
409 unsigned int max_urgent_threads = kMax_UInt,
410 const string& thread_name = kEmptyStr);
411
412 /// Destructor
413 virtual ~CPoolOfThreads(void);
414
415 /// Start processing threads
416 ///
417 /// @param num_threads
418 /// The number of threads to start
419 void Spawn(unsigned int num_threads);
420
421 /// Put a request in the queue with a given priority
422 ///
423 /// @param request
424 /// A request
425 /// @param priority
426 /// The priority of the request. The higher the priority
427 /// the sooner the request will be processed.
428 TItemHandle AcceptRequest(const TRequest& request,
429 TUserPriority priority = 0,
430 unsigned int timeout_sec = 0,
431 unsigned int timeout_nsec = 0);
432
433 /// Puts a request in the queue with the highest priority
434 /// It will run a new thread even if the maximum of allowed threads
435 /// has been already reached
436 ///
437 /// @param request
438 /// A request
439 TItemHandle AcceptUrgentRequest(const TRequest& request,
440 unsigned int timeout_sec = 0,
441 unsigned int timeout_nsec = 0);
442
443 /// Wait for the room in the queue up to
444 /// timeout_sec + timeout_nsec/1E9 seconds.
445 ///
446 /// @param timeout_sec
447 /// Number of seconds
448 /// @param timeout_nsec
449 /// Number of nanoseconds
450 void WaitForRoom(unsigned int timeout_sec = kMax_UInt,
451 unsigned int timeout_nsec = 0);
452
453 /// Check if the queue is full
IsFull(void) const454 bool IsFull(void) const { return m_Queue.IsFull(); }
455
456 /// Check if the queue is empty
IsEmpty(void) const457 bool IsEmpty(void) const { return m_Queue.IsEmpty(); }
458
459 /// Check whether a new request could be immediately processed
460 ///
461 /// @param urgent
462 /// Whether the request would be urgent.
463 bool HasImmediateRoom(bool urgent = false) const;
464
465 /// Adjust a pending request's priority.
466 void SetUserPriority(TItemHandle handle, TUserPriority priority);
467
468 /// Withdraw a pending request from consideration.
Withdraw(TItemHandle handle)469 void Withdraw(TItemHandle handle)
470 { m_Queue.Withdraw(handle); }
471
472 /// Get the number of requests in the queue
GetQueueSize(void) const473 size_t GetQueueSize(void) const
474 { return m_Queue.GetSize(); }
475
476
477 protected:
478
479 /// Create a new thread
480 ///
481 /// @param mode
482 /// How long the thread should stay around
483 virtual TThread* NewThread(ERunMode mode) = 0;
484
485 /// Register a thread. It is called by TThread::Main.
486 /// It should detach a thread if not tracking
487 ///
488 /// @param thread
489 /// A thread to register
Register(TThread & thread)490 virtual void Register(TThread& thread) { thread.Detach(); }
491
492 /// Unregister a thread
493 ///
494 /// @param thread
495 /// A thread to unregister
UnRegister(TThread &)496 virtual void UnRegister(TThread&) {}
497
498
499 typedef CAtomicCounter::TValue TACValue;
500
501 /// The maximum number of threads the pool can hold
502 volatile TACValue m_MaxThreads;
503 /// The maximum number of urgent threads running simultaneously
504 volatile TACValue m_MaxUrgentThreads;
505 int m_Threshold; ///< for delta
506 /// The current number of threads in the pool
507 CAtomicCounter m_ThreadCount;
508 /// The current number of urgent threads running now
509 CAtomicCounter m_UrgentThreadCount;
510 /// The difference between the number of unfinished requests and
511 /// the total number of threads in the pool.
512 volatile int m_Delta;
513 /// The guard for m_MaxThreads, m_MaxUrgentThreads, and m_Delta.
514 mutable CMutex m_Mutex;
515 /// The request queue
516 TQueue m_Queue;
517 bool m_QueuingForbidden;
518
519 const string m_ThreadName;
520
521 private:
522 friend class CThreadInPool<TRequest>;
523 TItemHandle x_AcceptRequest(const TRequest& req,
524 TUserPriority priority,
525 bool urgent,
526 unsigned int timeout_sec = 0,
527 unsigned int timeout_nsec = 0);
528
529 void x_RunNewThread(ERunMode mode, CAtomicCounter* counter);
530 };
531
532 /////////////////////////////////////////////////////////////////////////////
533 //
534 // SPECIALIZATIONS:
535 //
536
537 /////////////////////////////////////////////////////////////////////////////
538 //
539 // CStdRequest -- abstract request type
540
541 class CStdRequest : public CObject
542 {
543 public:
544 ///Destructor
~CStdRequest(void)545 virtual ~CStdRequest(void) {}
546
547 /// Do the actual job
548 /// Called by whichever thread handles this request.
549 virtual void Process(void) = 0;
550
551 typedef CQueueItemBase::EStatus EStatus;
552
553 /// Callback for status changes
OnStatusChange(EStatus,EStatus)554 virtual void OnStatusChange(EStatus /* old */, EStatus /* new */) {}
555 };
556
557
558 EMPTY_TEMPLATE
559 inline
x_SetStatus(EStatus new_status)560 void CBlockingQueue<CRef<CStdRequest> >::CQueueItem::x_SetStatus
561 (EStatus new_status)
562 {
563 EStatus old_status = GetStatus();
564 CQueueItemBase::x_SetStatus(new_status);
565 m_Request->OnStatusChange(old_status, new_status);
566 }
567
568
569
570 /////////////////////////////////////////////////////////////////////////////
571 //
572 // CStdThreadInPool -- thread handling CStdRequest
573
574 class NCBI_XUTIL_EXPORT CStdThreadInPool
575 : public CThreadInPool< CRef< CStdRequest > >
576 {
577 public:
578 typedef CThreadInPool< CRef< CStdRequest > > TParent;
579
580 /// Constructor
581 ///
582 /// @param pool
583 /// A pool where this thead is placed
584 /// @param mode
585 /// A running mode of this thread
CStdThreadInPool(TPool * pool,ERunMode mode=eNormal)586 CStdThreadInPool(TPool* pool, ERunMode mode = eNormal)
587 : TParent(pool, mode) {}
588
589 protected:
590 /// Process a request.
591 ///
592 /// @param
593 /// A request for processing
ProcessRequest(const CRef<CStdRequest> & req)594 virtual void ProcessRequest(const CRef<CStdRequest>& req)
595 { const_cast<CStdRequest&>(*req).Process(); }
596
597 // Avoid shadowing the handle-based version.
ProcessRequest(TItemHandle handle)598 virtual void ProcessRequest(TItemHandle handle)
599 { TParent::ProcessRequest(handle); }
600 };
601
602 /////////////////////////////////////////////////////////////////////////////
603 //
604 // CStdPoolOfThreads -- pool of threads handling CStdRequest
605
606 class NCBI_XUTIL_EXPORT CStdPoolOfThreads
607 : public CPoolOfThreads< CRef< CStdRequest > >
608 {
609 public:
610 typedef CPoolOfThreads< CRef< CStdRequest > > TParent;
611
612 /// Constructor
613 ///
614 /// @param max_threads
615 /// The maximum number of threads that this pool can run
616 /// @param queue_size
617 /// The maximum number of requests in the queue
618 /// @param spawn_threshold
619 /// The number of requests in the queue after which
620 /// a new thread is started
621 /// @param max_urgent_threads
622 /// The maximum number of urgent threads running simultaneously
CStdPoolOfThreads(unsigned int max_threads,unsigned int queue_size,unsigned int spawn_threshold=1,unsigned int max_urgent_threads=kMax_UInt,const string & thread_name=kEmptyStr)623 CStdPoolOfThreads(unsigned int max_threads, unsigned int queue_size,
624 unsigned int spawn_threshold = 1,
625 unsigned int max_urgent_threads = kMax_UInt,
626 const string& thread_name = kEmptyStr)
627 : TParent(max_threads, queue_size, spawn_threshold, max_urgent_threads,
628 thread_name)
629 {}
630
631 virtual ~CStdPoolOfThreads();
632
633 enum EKillFlags {
634 fKill_Wait = 0x1, ///< Wait for all threads in the pool to finish.
635 fKill_Reopen = 0x2 ///< Allow a fresh batch of worker threads.
636 };
637 typedef int TKillFlags; ///< binary OR of EKillFlags
638
639 /// Causes all threads in the pool to exit cleanly after finishing
640 /// all pending requests, optionally waiting for them to die.
641 ///
642 /// @param flags
643 /// Governs optional behavior
644 virtual void KillAllThreads(TKillFlags flags);
645
646 /// Causes all threads in the pool to exit cleanly after finishing
647 /// all pending requests, optionally waiting for them to die.
648 ///
649 /// @param wait
650 /// If true will wait until all thread in the pool finish their job
KillAllThreads(bool wait)651 virtual void KillAllThreads(bool wait)
652 { KillAllThreads(wait ? (fKill_Wait | fKill_Reopen) : fKill_Reopen); }
653
654 /// Register a thread.
655 ///
656 /// @param thread
657 /// A thread to register
658 virtual void Register(TThread& thread);
659
660 /// Unregister a thread
661 ///
662 /// @param thread
663 /// A thread to unregister
664 virtual void UnRegister(TThread& thread);
665
666 protected:
667 /// Create a new thread
668 ///
669 /// @param mode
670 /// A thread's running mode
NewThread(TThread::ERunMode mode)671 virtual TThread* NewThread(TThread::ERunMode mode)
672 { return new CStdThreadInPool(this, mode); }
673
674 private:
675 typedef list<CRef<TThread> > TThreads;
676 TThreads m_Threads;
677 };
678
679
680 NCBI_PARAM_DECL(bool, ThreadPool, Catch_Unhandled_Exceptions);
681 typedef NCBI_PARAM_TYPE(ThreadPool, Catch_Unhandled_Exceptions) TParamThreadPoolCatchExceptions;
682
683
684
685 /////////////////////////////////////////////////////////////////////////////
686
687 /////////////////////////////////////////////////////////////////////////////
688 // IMPLEMENTATION of INLINE functions
689 /////////////////////////////////////////////////////////////////////////////
690
691
692 /////////////////////////////////////////////////////////////////////////////
693 // CBlockingQueue<>::
694 //
695
696 template <typename TRequest>
697 typename CBlockingQueue<TRequest>::TItemHandle
Put(const TRequest & data,TUserPriority priority,unsigned int timeout_sec,unsigned int timeout_nsec)698 CBlockingQueue<TRequest>::Put(const TRequest& data, TUserPriority priority,
699 unsigned int timeout_sec,
700 unsigned int timeout_nsec)
701 {
702 CMutexGuard guard(m_Mutex);
703 // Having the mutex, we can safely drop "volatile"
704 TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
705 if ( !x_WaitForPredicate(&CBlockingQueue::x_PutSemPred, m_PutSem, guard,
706 timeout_sec, timeout_nsec) ) {
707 NCBI_THROW(CBlockingQueueException, eFull,
708 "CBlockingQueue<>::Put: "
709 "attempt to insert into a full queue");
710 }
711 if (m_RequestCounter == 0) {
712 m_RequestCounter = 0xFFFFFF;
713 NON_CONST_ITERATE (typename TRealQueue, it, q) {
714 CQueueItem& val = const_cast<CQueueItem&>(**it);
715 val.m_Priority = (val.m_Priority & 0xFF000000) | m_RequestCounter--;
716 }
717 }
718 /// Structure of the internal priority:
719 /// The highest byte is a user specified priority;
720 /// the other three bytes are a counter which ensures that
721 /// requests with the same user-specified priority are processed
722 /// in FIFO order
723 TPriority real_priority = (priority << 24) | m_RequestCounter--;
724 TItemHandle handle(new CQueueItem(real_priority, data));
725 q.insert(handle);
726 m_GetSem.TryWait();
727 m_GetSem.Post();
728 if (q.size() == m_MaxSize) {
729 m_PutSem.TryWait();
730 }
731 return handle;
732 }
733
734
735 template <typename TRequest>
WaitForRoom(unsigned int timeout_sec,unsigned int timeout_nsec) const736 void CBlockingQueue<TRequest>::WaitForRoom(unsigned int timeout_sec,
737 unsigned int timeout_nsec) const
738 {
739 // Make sure there's room, but don't actually change any state
740 CMutexGuard guard(m_Mutex);
741 if (x_WaitForPredicate(&CBlockingQueue::x_PutSemPred, m_PutSem, guard,
742 timeout_sec, timeout_nsec)) {
743 m_PutSem.Post(); // signal that the room still exists
744 } else {
745 NCBI_THROW(CBlockingQueueException, eTimedOut,
746 "CBlockingQueue<>::WaitForRoom: timed out");
747 }
748 }
749
750 template <typename TRequest>
WaitForHunger(unsigned int timeout_sec,unsigned int timeout_nsec) const751 void CBlockingQueue<TRequest>::WaitForHunger(unsigned int timeout_sec,
752 unsigned int timeout_nsec) const
753 {
754 CMutexGuard guard(m_Mutex);
755 if (x_WaitForPredicate(&CBlockingQueue::x_HungerSemPred, m_HungerSem, guard,
756 timeout_sec, timeout_nsec)) {
757 m_HungerSem.Post();
758 } else {
759 NCBI_THROW(CBlockingQueueException, eTimedOut,
760 "CBlockingQueue<>::WaitForHunger: timed out");
761 }
762 }
763
764
765 template <typename TRequest>
766 typename CBlockingQueue<TRequest>::TItemHandle
GetHandle(unsigned int timeout_sec,unsigned int timeout_nsec)767 CBlockingQueue<TRequest>::GetHandle(unsigned int timeout_sec,
768 unsigned int timeout_nsec)
769 {
770 CMutexGuard guard(m_Mutex);
771 // Having the mutex, we can safely drop "volatile"
772 TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
773
774 if (q.empty()) {
775 _VERIFY(++m_HungerCnt);
776 m_HungerSem.TryWait();
777 m_HungerSem.Post();
778
779 bool ok = x_WaitForPredicate(&CBlockingQueue::x_GetSemPred, m_GetSem,
780 guard, timeout_sec, timeout_nsec);
781
782 if (--m_HungerCnt <= q.size()) {
783 m_HungerSem.TryWait();
784 }
785
786 if ( !ok ) {
787 NCBI_THROW(CBlockingQueueException, eTimedOut,
788 "CBlockingQueue<>::Get[Handle]: timed out");
789 }
790 }
791
792 TItemHandle handle(*q.begin());
793 q.erase(q.begin());
794 if (m_HungerCnt > q.size()) {
795 m_HungerSem.TryWait();
796 m_HungerSem.Post();
797 }
798 if ( ! q.empty() ) {
799 m_GetSem.TryWait();
800 m_GetSem.Post();
801 }
802
803 // Get the attention of WaitForRoom() or the like; do this
804 // regardless of queue size because derived classes may want
805 // to insert multiple objects atomically.
806 m_PutSem.TryWait();
807 m_PutSem.Post();
808
809 guard.Release(); // avoid possible deadlocks from x_SetStatus
810 handle->x_SetStatus(CQueueItem::eActive);
811 return handle;
812 }
813
814 template <typename TRequest>
Get(unsigned int timeout_sec,unsigned int timeout_nsec)815 TRequest CBlockingQueue<TRequest>::Get(unsigned int timeout_sec,
816 unsigned int timeout_nsec)
817 {
818 TItemHandle handle = GetHandle(timeout_sec, timeout_nsec);
819 handle->MarkAsComplete(); // almost certainly premature, but our last chance
820 return handle->GetRequest();
821 }
822
823
824 template <typename TRequest>
GetSize(void) const825 size_t CBlockingQueue<TRequest>::GetSize(void) const
826 {
827 CMutexGuard guard(m_Mutex);
828 return const_cast<const TRealQueue&>(m_Queue).size();
829 }
830
831
832 template <typename TRequest>
SetUserPriority(TItemHandle handle,TUserPriority priority)833 void CBlockingQueue<TRequest>::SetUserPriority(TItemHandle handle,
834 TUserPriority priority)
835 {
836 if (handle->GetUserPriority() == priority
837 || handle->GetStatus() != CQueueItem::ePending) {
838 return;
839 }
840 CMutexGuard guard(m_Mutex);
841 // Having the mutex, we can safely drop "volatile"
842 TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
843 typename TRealQueue::iterator it = q.find(handle);
844 // These sanity checks protect against race conditions and
845 // accidental use of handles from other queues.
846 if (it != q.end() && *it == handle) {
847 q.erase(it);
848 TPriority counter = handle->m_Priority & 0xFFFFFF;
849 handle->m_Priority = (priority << 24) | counter;
850 q.insert(handle);
851 }
852 }
853
854
855 template <typename TRequest>
Withdraw(TItemHandle handle)856 void CBlockingQueue<TRequest>::Withdraw(TItemHandle handle)
857 {
858 if (handle->GetStatus() != CQueueItem::ePending) {
859 return;
860 }
861 {{
862 CMutexGuard guard(m_Mutex);
863 // Having the mutex, we can safely drop "volatile"
864 TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
865 typename TRealQueue::iterator it = q.find(handle);
866 // These sanity checks protect against race conditions and
867 // accidental use of handles from other queues.
868 if (it != q.end() && *it == handle) {
869 q.erase(it);
870
871 if(q.empty()) {
872 // m_GetSem may be signaled - clear it
873 m_GetSem.TryWait();
874 }
875 } else {
876 return;
877 }
878 }}
879 // run outside the guard to avoid possible deadlocks from x_SetStatus
880 handle->x_SetStatus(CQueueItem::eWithdrawn);
881 }
882
883 template <typename TRequest>
x_WaitForPredicate(TQueuePredicate pred,CSemaphore & sem,CMutexGuard & guard,unsigned int timeout_sec,unsigned int timeout_nsec) const884 bool CBlockingQueue<TRequest>::x_WaitForPredicate(TQueuePredicate pred,
885 CSemaphore& sem,
886 CMutexGuard& guard,
887 unsigned int timeout_sec,
888 unsigned int timeout_nsec)
889 const
890 {
891 const TRealQueue& q = const_cast<const TRealQueue&>(m_Queue);
892 if ( !(this->*pred)(q) ) {
893 #if SIZEOF_INT == SIZEOF_LONG
894 // If long is larger, converting from unsigned int to (signed)
895 // long for CTimeSpan will automatically be safe.
896 unsigned int extra_sec = timeout_nsec / kNanoSecondsPerSecond;
897 timeout_nsec %= kNanoSecondsPerSecond;
898 // Do the comparison this way to avoid overflow.
899 if (timeout_sec >= kMax_Int - extra_sec) {
900 timeout_sec = kMax_Int; // clamp
901 } else {
902 timeout_sec += extra_sec;
903 }
904 #endif
905 // _ASSERT(timeout_nsec <= (unsigned long)kMax_Long);
906 CTimeSpan span(timeout_sec, timeout_nsec);
907 while (span.GetSign() == ePositive && !(this->*pred)(q) ) {
908 CTime start(CTime::eCurrent, CTime::eGmt);
909 // Temporarily release the mutex while waiting, to avoid deadlock.
910 guard.Release();
911 sem.TryWait((unsigned int)span.GetCompleteSeconds(),
912 (unsigned int)span.GetNanoSecondsAfterSecond());
913 guard.Guard(m_Mutex);
914 span -= CurrentTime(CTime::eGmt) - start;
915 }
916 }
917 sem.TryWait();
918 return (this->*pred)(q);
919 }
920
921 /////////////////////////////////////////////////////////////////////////////
922 // CThreadInPool<>::
923 //
924
925 template <typename TRequest>
CountSelf(CAtomicCounter * counter)926 void CThreadInPool<TRequest>::CountSelf(CAtomicCounter* counter)
927 {
928 _ASSERT(m_Counter == NULL);
929 counter->Add(1);
930 m_Counter = counter;
931 }
932
933 template <typename TRequest>
~CThreadInPool()934 CThreadInPool<TRequest>::~CThreadInPool()
935 {
936 if (m_Counter != NULL) {
937 m_Counter->Add(-1);
938 }
939 }
940
941 template <typename TRequest>
CAutoUnregGuard(TThread * thr)942 CThreadInPool<TRequest>::CAutoUnregGuard::CAutoUnregGuard(TThread* thr)
943 : m_Thread(thr)
944 {}
945
946 template <typename TRequest>
~CAutoUnregGuard(void)947 CThreadInPool<TRequest>::CAutoUnregGuard::~CAutoUnregGuard(void)
948 {
949 m_Thread->x_UnregisterThread();
950 }
951
952
953 template <typename TRequest>
x_UnregisterThread(void)954 void CThreadInPool<TRequest>::x_UnregisterThread(void)
955 {
956 if (m_Counter != NULL) {
957 m_Counter->Add(-1);
958 m_Counter = NULL;
959 }
960 m_Pool->UnRegister(*this);
961 }
962
963 template <typename TRequest>
x_HandleOneRequest(bool catch_all)964 void CThreadInPool<TRequest>::x_HandleOneRequest(bool catch_all)
965 {
966 TItemHandle handle;
967 {{
968 CMutexGuard guard(m_Pool->m_Mutex);
969 --m_Pool->m_Delta;
970 }}
971 try {
972 handle.Reset(m_Pool->m_Queue.GetHandle());
973 } catch (CBlockingQueueException& e) {
974 // work around "impossible" timeouts
975 NCBI_REPORT_EXCEPTION_XX(Util_Thread, 1, "Unexpected timeout", e);
976 CMutexGuard guard(m_Pool->m_Mutex);
977 ++m_Pool->m_Delta;
978 return;
979 }
980 if (catch_all) {
981 try {
982 ProcessRequest(handle);
983 } catch (std::exception& e) {
984 handle->MarkAsForciblyCaught();
985 NCBI_REPORT_EXCEPTION_XX(Util_Thread, 2,
986 "Exception from thread in pool: ", e);
987 // throw;
988 } catch (...) {
989 handle->MarkAsForciblyCaught();
990 // silently propagate non-standard exceptions because they're
991 // likely to be CExitThreadException.
992 // ERR_POST_XX(Util_Thread, 3,
993 // "Thread in pool threw non-standard exception.");
994 throw;
995 }
996 }
997 else {
998 ProcessRequest(handle);
999 }
1000 }
1001
1002 template <typename TRequest>
Main(void)1003 void* CThreadInPool<TRequest>::Main(void)
1004 {
1005 _ASSERT(m_Pool);
1006
1007 const string& name = m_Pool->m_ThreadName;
1008
1009 if (!name.empty()) {
1010 SetCurrentThreadName(name);
1011 }
1012
1013 try {
1014 m_Pool->Register(*this);
1015 } catch (CThreadException&) {
1016 ERR_POST(Warning << "New worker thread blocked at the last minute.");
1017 return 0;
1018 }
1019 CAutoUnregGuard guard(this);
1020
1021 Init();
1022 bool catch_all = TParamThreadPoolCatchExceptions::GetDefault();
1023
1024 for (;;) {
1025 x_HandleOneRequest(catch_all);
1026 if (m_RunMode == eRunOnce)
1027 break;
1028 }
1029
1030 return 0;
1031 }
1032
1033
1034 template <typename TRequest>
OnExit(void)1035 void CThreadInPool<TRequest>::OnExit(void)
1036 {
1037 try {
1038 x_OnExit();
1039 } STD_CATCH_ALL_XX(Util_Thread, 6, "x_OnExit")
1040 }
1041
1042 template <typename TRequest>
ProcessRequest(TItemHandle handle)1043 void CThreadInPool<TRequest>::ProcessRequest(TItemHandle handle)
1044 {
1045 TCompletingHandle completer = handle;
1046 ProcessRequest(completer->GetRequest());
1047 }
1048
1049
1050 /////////////////////////////////////////////////////////////////////////////
1051 // CPoolOfThreads<>::
1052 //
1053
1054 template <typename TRequest>
CPoolOfThreads(unsigned int max_threads,unsigned int queue_size,unsigned int spawn_threshold,unsigned int max_urgent_threads,const string & thread_name)1055 CPoolOfThreads<TRequest>::CPoolOfThreads(unsigned int max_threads,
1056 unsigned int queue_size,
1057 unsigned int spawn_threshold,
1058 unsigned int max_urgent_threads,
1059 const string& thread_name)
1060 : m_MaxThreads(max_threads), m_MaxUrgentThreads(max_urgent_threads),
1061 m_Threshold(spawn_threshold), m_Delta(0),
1062 m_Queue(queue_size > 0 ? queue_size : max_threads),
1063 m_QueuingForbidden(queue_size == 0),
1064 m_ThreadName(thread_name)
1065 {
1066 m_ThreadCount.Set(0);
1067 m_UrgentThreadCount.Set(0);
1068 }
1069
1070
1071 template <typename TRequest>
~CPoolOfThreads(void)1072 CPoolOfThreads<TRequest>::~CPoolOfThreads(void)
1073 {
1074 CAtomicCounter::TValue n = m_ThreadCount.Get() + m_UrgentThreadCount.Get();
1075 if (n) {
1076 ERR_POST_XX(Util_Thread, 4,
1077 Warning << "CPoolOfThreads<>::~CPoolOfThreads: "
1078 << n << " thread(s) still active");
1079 }
1080 }
1081
1082 template <typename TRequest>
Spawn(unsigned int num_threads)1083 void CPoolOfThreads<TRequest>::Spawn(unsigned int num_threads)
1084 {
1085 for (unsigned int i = 0; i < num_threads; i++)
1086 {
1087 x_RunNewThread(TThread::eNormal, &m_ThreadCount);
1088 }
1089 }
1090
1091
1092 template <typename TRequest>
1093 inline
1094 typename CPoolOfThreads<TRequest>::TItemHandle
AcceptRequest(const TRequest & req,TUserPriority priority,unsigned int timeout_sec,unsigned int timeout_nsec)1095 CPoolOfThreads<TRequest>::AcceptRequest(const TRequest& req,
1096 TUserPriority priority,
1097 unsigned int timeout_sec,
1098 unsigned int timeout_nsec)
1099 {
1100 return x_AcceptRequest(req, priority, false, timeout_sec, timeout_nsec);
1101 }
1102
1103 template <typename TRequest>
1104 inline
1105 typename CPoolOfThreads<TRequest>::TItemHandle
AcceptUrgentRequest(const TRequest & req,unsigned int timeout_sec,unsigned int timeout_nsec)1106 CPoolOfThreads<TRequest>::AcceptUrgentRequest(const TRequest& req,
1107 unsigned int timeout_sec,
1108 unsigned int timeout_nsec)
1109 {
1110 return x_AcceptRequest(req, 0xFF, true, timeout_sec, timeout_nsec);
1111 }
1112
1113 template <typename TRequest>
1114 inline
HasImmediateRoom(bool urgent) const1115 bool CPoolOfThreads<TRequest>::HasImmediateRoom(bool urgent) const
1116 {
1117 CMutexGuard guard(m_Mutex);
1118
1119 if (m_Queue.IsFull()) {
1120 return false; // temporary blockage
1121 } else if (m_Delta < 0) {
1122 return true;
1123 } else if (m_ThreadCount.Get() < m_MaxThreads) {
1124 return true;
1125 } else if (urgent && m_UrgentThreadCount.Get() < m_MaxUrgentThreads) {
1126 return true;
1127 } else {
1128 try {
1129 m_Queue.WaitForHunger(0);
1130 // This should be redundant with the m_Delta < 0 case, now that
1131 // m_Mutex guards it.
1132 ERR_POST_XX(Util_Thread, 5,
1133 "Possible thread pool bug. delta: "
1134 << const_cast<int&>(m_Delta)
1135 << "; hunger: " << m_Queue.GetHunger());
1136 return true;
1137 } catch (...) {
1138 }
1139 return false;
1140 }
1141 }
1142
1143 template <typename TRequest>
1144 inline
WaitForRoom(unsigned int timeout_sec,unsigned int timeout_nsec)1145 void CPoolOfThreads<TRequest>::WaitForRoom(unsigned int timeout_sec,
1146 unsigned int timeout_nsec)
1147 {
1148 if (HasImmediateRoom()) {
1149 return;
1150 } else if (m_QueuingForbidden) {
1151 m_Queue.WaitForHunger(timeout_sec, timeout_nsec);
1152 } else {
1153 m_Queue.WaitForRoom(timeout_sec, timeout_nsec);
1154 }
1155 }
1156
1157 template <typename TRequest>
1158 inline
1159 typename CPoolOfThreads<TRequest>::TItemHandle
x_AcceptRequest(const TRequest & req,TUserPriority priority,bool urgent,unsigned int timeout_sec,unsigned int timeout_nsec)1160 CPoolOfThreads<TRequest>::x_AcceptRequest(const TRequest& req,
1161 TUserPriority priority,
1162 bool urgent,
1163 unsigned int timeout_sec,
1164 unsigned int timeout_nsec)
1165 {
1166 bool new_thread = false;
1167 TItemHandle handle;
1168 {{
1169 CMutexGuard guard(m_Mutex);
1170 // we reserved 0xFF priority for urgent requests
1171 if( priority == 0xFF && !urgent )
1172 --priority;
1173 if (m_QueuingForbidden && !HasImmediateRoom(urgent) ) {
1174 NCBI_THROW(CBlockingQueueException, eFull,
1175 "CPoolOfThreads<>::x_AcceptRequest: "
1176 "attempt to insert into a full queue");
1177 }
1178 handle = m_Queue.Put(req, priority, timeout_sec, timeout_nsec);
1179 if (++m_Delta >= m_Threshold
1180 && m_ThreadCount.Get() < m_MaxThreads) {
1181 // Add another thread to the pool because they're all busy.
1182 new_thread = true;
1183 } else if (urgent && m_UrgentThreadCount.Get() >= m_MaxUrgentThreads) {
1184 // Prevent from running a new urgent thread if we have reached
1185 // the maximum number of urgent threads
1186 urgent = false;
1187 }
1188 }}
1189
1190 if (new_thread) {
1191 x_RunNewThread(TThread::eNormal, &m_ThreadCount);
1192 } else if (urgent) {
1193 x_RunNewThread(TThread::eRunOnce, &m_UrgentThreadCount);
1194 }
1195
1196 return handle;
1197 }
1198
1199 template <typename TRequest>
1200 inline
x_RunNewThread(ERunMode mode,CAtomicCounter * counter)1201 void CPoolOfThreads<TRequest>::x_RunNewThread(ERunMode mode,
1202 CAtomicCounter* counter)
1203 {
1204 try {
1205 CRef<TThread> thr(NewThread(mode));
1206 thr->CountSelf(counter);
1207 thr->Run();
1208 }
1209 catch (CThreadException& ex) {
1210 ERR_POST_XX(Util_Thread, 13,
1211 Critical << "Ignoring error while starting new thread: "
1212 << ex);
1213 }
1214 }
1215
1216 template <typename TRequest>
1217 inline
SetUserPriority(TItemHandle handle,TUserPriority priority)1218 void CPoolOfThreads<TRequest>::SetUserPriority(TItemHandle handle,
1219 TUserPriority priority)
1220 {
1221 // Maintain segregation between urgent and non-urgent requests
1222 if (handle->GetUserPriority() == 0xFF) {
1223 return;
1224 } else if (priority == 0xFF) {
1225 priority = 0xFE;
1226 }
1227 m_Queue.SetUserPriority(handle, priority);
1228 }
1229
1230 END_NCBI_SCOPE
1231
1232
1233 /* @} */
1234
1235 #endif /* UTIL__THREAD_POOL_OLD__HPP */
1236