1 #ifndef UTIL__THREAD_POOL_OLD__HPP
2 #define UTIL__THREAD_POOL_OLD__HPP
3 
4 /*  $Id: thread_pool_old.hpp 489095 2016-01-08 13:02:41Z ivanov $
5 * ===========================================================================
6 *
7 *                            PUBLIC DOMAIN NOTICE
8 *               National Center for Biotechnology Information
9 *
10 *  This software/database is a "United States Government Work" under the
11 *  terms of the United States Copyright Act.  It was written as part of
12 *  the author's official duties as a United States Government employee and
13 *  thus cannot be copyrighted.  This software/database is freely available
14 *  to the public for use. The National Library of Medicine and the U.S.
15 *  Government have not placed any restriction on its use or reproduction.
16 *
17 *  Although all reasonable efforts have been taken to ensure the accuracy
18 *  and reliability of the software and data, the NLM and the U.S.
19 *  Government do not and cannot warrant the performance or results that
20 *  may be obtained by using this software or data. The NLM and the U.S.
21 *  Government disclaim all warranties, express or implied, including
22 *  warranties of performance, merchantability or fitness for any particular
23 *  purpose.
24 *
25 *  Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Author:  Aaron Ucko
30 *
31 * File Description:
32 *   Pools of generic request-handling threads.
33 *
34 *   TEMPLATES:
35 *      CBlockingQueue<>  -- queue of requests, with efficiently blocking Get()
36 *      CThreadInPool<>   -- abstract request-handling thread
37 *      CPoolOfThreads<>  -- abstract pool of threads sharing a request queue
38 *
39 *   SPECIALIZATIONS:
40 *      CStdRequest       -- abstract request type
41 *      CStdThreadInPool  -- thread handling CStdRequest
42 *      CStdPoolOfThreads -- pool of threads handling CStdRequest
43 */
44 
45 #include <corelib/ncbistd.hpp>
46 #include <corelib/ncbithr.hpp>
47 #include <corelib/ncbitime.hpp>
48 #include <corelib/ncbi_limits.hpp>
49 #include <corelib/ncbi_param.hpp>
50 #include <util/util_exception.hpp>
51 #include <util/error_codes.hpp>
52 
53 #include <set>
54 
55 
56 /** @addtogroup ThreadedPools
57  *
58  * @{
59  */
60 
61 BEGIN_NCBI_SCOPE
62 
63 
64 /////////////////////////////////////////////////////////////////////////////
65 ///
66 ///     CQueueItemBase -- skeleton blocking-queue item, sans actual request
67 
68 class CQueueItemBase : public CObject {
69 public:
70     enum EStatus {
71         ePending,       ///< still in the queue
72         eActive,        ///< extracted but not yet released
73         eComplete,      ///< extracted and released
74         eWithdrawn,     ///< dropped by submitter's request
75         eForciblyCaught ///< let an exception escape
76     };
77 
78     /// Every request has an associated 32-bit priority field, but
79     /// only the top eight bits are under direct user control.  (The
80     /// rest are a counter.)
81     typedef Uint4 TPriority;
82     typedef Uint1 TUserPriority;
83 
CQueueItemBase(TPriority priority)84     CQueueItemBase(TPriority priority)
85         : m_Priority(priority), m_Status(ePending)
86         { }
87 
operator >(const CQueueItemBase & item) const88     bool operator> (const CQueueItemBase& item) const
89         { return m_Priority > item.m_Priority; }
90 
GetPriority(void) const91     const TPriority& GetPriority(void) const     { return m_Priority; }
GetStatus(void) const92     const EStatus&   GetStatus(void) const       { return m_Status; }
GetUserPriority(void) const93     TUserPriority    GetUserPriority(void) const { return TUserPriority(m_Priority >> 24); }
94 
MarkAsComplete(void)95     void MarkAsComplete(void)        { x_SetStatus(eComplete); }
MarkAsForciblyCaught(void)96     void MarkAsForciblyCaught(void)  { x_SetStatus(eForciblyCaught); }
97 
98 protected:
99     TPriority m_Priority;
100     EStatus   m_Status;
101 
x_SetStatus(EStatus new_status)102     virtual void x_SetStatus(EStatus new_status)
103         { m_Status = new_status; }
104 };
105 
106 
107 /////////////////////////////////////////////////////////////////////////////
108 ///
109 ///     CBlockingQueue<>  -- queue of requests, with efficiently blocking Get()
110 
111 template <typename TRequest>
112 class CBlockingQueue
113 {
114 public:
115     typedef CQueueItemBase::TPriority     TPriority;
116     typedef CQueueItemBase::TUserPriority TUserPriority;
117 
118     class CQueueItem;
119     typedef CRef<CQueueItem> TItemHandle;
120 
121     /// It may be desirable to store handles obtained from GetHandle() in
122     /// instances of CCompletingHandle to ensure that they are marked as
123     /// complete when all is said and done, even in the face of exceptions.
124     class CCompletingHandle : public TItemHandle
125     {
126     public:
CCompletingHandle(const TItemHandle & h)127         CCompletingHandle(const TItemHandle& h)
128             : TItemHandle(h)
129             { }
130 
~CCompletingHandle()131         ~CCompletingHandle() {
132             if (this->NotEmpty()) {
133                 this->GetObject().MarkAsComplete();
134             }
135         }
136     };
137 
138     /// Constructor
139     ///
140     /// @param max_size
141     ///   The maximum size of the queue (may not be zero!)
CBlockingQueue(size_t max_size=kMax_UInt)142     CBlockingQueue(size_t max_size = kMax_UInt)
143         : m_GetSem(0,1), m_PutSem(1,1), m_HungerSem(0,1), m_HungerCnt(0),
144           m_MaxSize(min(max_size, size_t(0xFFFFFF))),
145           m_RequestCounter(0xFFFFFF)
146         { _ASSERT(max_size > 0); }
147 
148     /// Put a request into the queue.  If the queue remains full for
149     /// the duration of the (optional) timeout, throw an exception.
150     ///
151     /// @param request
152     ///   Request
153     /// @param priority
154     ///   The priority of the request. The higher the priority
155     ///   the sooner the request will be processed.
156     /// @param timeout_sec
157     ///   Number of whole seconds in timeout
158     /// @param timeout_nsec
159     ///   Number of additional nanoseconds in timeout
160     TItemHandle  Put(const TRequest& request, TUserPriority priority = 0,
161                      unsigned int timeout_sec = 0,
162                      unsigned int timeout_nsec = 0);
163 
164     /// Wait for room in the queue for up to
165     /// timeout_sec + timeout_nsec/1E9 seconds.
166     ///
167     /// @param timeout_sec
168     ///   Number of seconds
169     /// @param timeout_nsec
170     ///   Number of nanoseconds
171     void         WaitForRoom(unsigned int timeout_sec  = kMax_UInt,
172                              unsigned int timeout_nsec = 0) const;
173 
174     /// Wait for the queue to have waiting readers, for up to
175     /// timeout_sec + timeout_nsec/1E9 seconds.
176     ///
177     /// @param timeout_sec
178     ///   Number of seconds
179     /// @param timeout_nsec
180     ///   Number of nanoseconds
181     void         WaitForHunger(unsigned int timeout_sec  = kMax_UInt,
182                                unsigned int timeout_nsec = 0) const;
183 
184     /// Get the first available request from the queue, and return a
185     /// handle to it.
186     /// Blocks politely if empty.
187     /// Waits up to timeout_sec + timeout_nsec/1E9 seconds.
188     ///
189     /// @param timeout_sec
190     ///   Number of seconds
191     /// @param timeout_nsec
192     ///   Number of nanoseconds
193     TItemHandle  GetHandle(unsigned int timeout_sec  = kMax_UInt,
194                            unsigned int timeout_nsec = 0);
195 
196     /// Get the first available request from the queue, and return
197     /// just the request.
198     /// Blocks politely if empty.
199     /// Waits up to timeout_sec + timeout_nsec/1E9 seconds.
200     ///
201     /// @param timeout_sec
202     ///   Number of seconds
203     /// @param timeout_nsec
204     ///   Number of nanoseconds
205     NCBI_DEPRECATED
206     TRequest     Get(unsigned int timeout_sec  = kMax_UInt,
207                      unsigned int timeout_nsec = 0);
208 
209     /// Get the number of requests in the queue
210     size_t       GetSize    (void) const;
211 
212     /// Get the maximun number of requests that can be put into the queue
GetMaxSize(void) const213     size_t       GetMaxSize (void) const { return m_MaxSize; }
214 
215     /// Check if the queue is empty
IsEmpty(void) const216     bool         IsEmpty    (void) const { return GetSize() == 0; }
217 
218     /// Check if the queue is full
IsFull(void) const219     bool         IsFull     (void) const { return GetSize() == GetMaxSize(); }
220 
221     /// Adjust a pending request's priority.
222     void         SetUserPriority(TItemHandle handle, TUserPriority priority);
223 
224     /// Withdraw a pending request from consideration.
225     void         Withdraw(TItemHandle handle);
226 
227     /// Get the number of threads waiting for requests, for debugging
228     /// purposes only.
GetHunger(void) const229     size_t       GetHunger(void) const { return m_HungerCnt; }
230 
231     class CQueueItem : public CQueueItemBase
232     {
233     public:
234         // typedef CBlockingQueue<TRequest> TQueue;
CQueueItem(Uint4 priority,TRequest request)235         CQueueItem(Uint4 priority, TRequest request)
236             : CQueueItemBase(priority), m_Request(request)
237             { }
238 
GetRequest(void) const239         const TRequest& GetRequest(void) const { return m_Request; }
SetRequest(void)240         TRequest&       SetRequest(void)       { return m_Request; }
241         // void SetUserPriority(TUserPriority p);
242         // void Withdraw(void);
243 
244     protected:
245         // Specialized for CRef<CStdRequest> in thread_pool.cpp
x_SetStatus(EStatus new_status)246         void x_SetStatus(EStatus new_status)
247             { CQueueItemBase::x_SetStatus(new_status); }
248 
249     private:
250         friend class CBlockingQueue<TRequest>;
251 
252         // TQueue&   m_Queue;
253         TRequest  m_Request;
254     };
255 
256 protected:
257     struct SItemHandleGreater {
operator ()CBlockingQueue::SItemHandleGreater258         bool operator()(const TItemHandle& i1, const TItemHandle& i2) const
259             { return static_cast<CQueueItemBase>(*i1)
260                     > static_cast<CQueueItemBase>(*i2); }
261     };
262 
263     /// The type of the queue
264     typedef set<TItemHandle, SItemHandleGreater> TRealQueue;
265 
266     // Derived classes should take care to use these members properly.
267     volatile TRealQueue m_Queue;     ///< The queue
268     CSemaphore          m_GetSem;    ///< Raised if the queue contains data
269     mutable CSemaphore  m_PutSem;    ///< Raised if the queue has room
270     mutable CSemaphore  m_HungerSem; ///< Raised if Get[Handle] has to wait
271     mutable CMutex      m_Mutex;     ///< Guards access to queue
272     size_t              m_HungerCnt; ///< Number of threads waiting for data
273 
274 private:
275     size_t              m_MaxSize;        ///< The maximum size of the queue
276     Uint4               m_RequestCounter; ///
277 
278     typedef bool (CBlockingQueue::*TQueuePredicate)(const TRealQueue& q) const;
279 
x_GetSemPred(const TRealQueue & q) const280     bool x_GetSemPred(const TRealQueue& q) const
281         { return !q.empty(); }
x_PutSemPred(const TRealQueue & q) const282     bool x_PutSemPred(const TRealQueue& q) const
283         { return q.size() < m_MaxSize; }
x_HungerSemPred(const TRealQueue & q) const284     bool x_HungerSemPred(const TRealQueue& q) const
285         { return m_HungerCnt > q.size(); }
286 
287     bool x_WaitForPredicate(TQueuePredicate pred, CSemaphore& sem,
288                             CMutexGuard& guard, unsigned int timeout_sec,
289                             unsigned int timeout_nsec) const;
290 
291 private:
292     /// forbidden
293     CBlockingQueue(const CBlockingQueue&);
294     CBlockingQueue& operator=(const CBlockingQueue&);
295 };
296 
297 
298 /////////////////////////////////////////////////////////////////////////////
299 ///
300 /// CThreadInPool<>   -- abstract request-handling thread
301 
302 template <typename TRequest> class CPoolOfThreads;
303 
304 template <typename TRequest>
305 class CThreadInPool : public CThread
306 {
307 public:
308     typedef CPoolOfThreads<TRequest> TPool;
309     typedef typename CBlockingQueue<TRequest>::TItemHandle TItemHandle;
310     typedef typename CBlockingQueue<TRequest>::CCompletingHandle
311         TCompletingHandle;
312 
313     /// Thread run mode
314     enum ERunMode {
315         eNormal,   ///< Process request and stay in the pool
316         eRunOnce   ///< Process request and die
317     };
318 
319     /// Constructor
320     ///
321     /// @param pool
322     ///   A pool where this thead is placed
323     /// @param mode
324     ///   A running mode of this thread
CThreadInPool(TPool * pool,ERunMode mode=eNormal)325     CThreadInPool(TPool* pool, ERunMode mode = eNormal)
326         : m_Pool(pool), m_RunMode(mode), m_Counter(NULL) {}
327 
328     void CountSelf(CAtomicCounter* counter);
329 
330 protected:
331     /// Destructor
332     virtual ~CThreadInPool(void);
333 
334     /// Intit this thread. It is called at beginning of Main()
Init(void)335     virtual void Init(void) {}
336 
337     /// Process a request.
338     /// It is called from Main() for each request this thread handles
339     ///
340     /// @param
341     ///   A request for processing
342     virtual void ProcessRequest(TItemHandle handle);
343 
344     /// Older interface (still delegated to by default)
345     virtual void ProcessRequest(const TRequest& req) = 0;
346 
347     /// Clean up. It is called by OnExit()
x_OnExit(void)348     virtual void x_OnExit(void) {}
349 
350     /// Get run mode
GetRunMode(void) const351     ERunMode GetRunMode(void) const { return m_RunMode; }
352 
353 private:
354     // to prevent overriding; inherited from CThread
355     virtual void* Main(void);
356     virtual void OnExit(void);
357 
358     void x_HandleOneRequest(bool catch_all);
359     void x_UnregisterThread(void);
360 
361     class CAutoUnregGuard
362     {
363     public:
364         typedef CThreadInPool<TRequest> TThread;
365         CAutoUnregGuard(TThread* thr);
366         ~CAutoUnregGuard(void);
367 
368     private:
369         TThread* m_Thread;
370     };
371 
372     friend class CAutoUnregGuard;
373 
374 
375     TPool*          m_Pool;     ///< The pool that holds this thread
376     ERunMode        m_RunMode;  ///< How long to keep running
377     CAtomicCounter* m_Counter;
378 };
379 
380 
381 /////////////////////////////////////////////////////////////////////////////
382 ///
383 ///     CPoolOfThreads<>  -- abstract pool of threads sharing a request queue
384 
385 template <typename TRequest>
386 class CPoolOfThreads
387 {
388 public:
389     typedef CThreadInPool<TRequest> TThread;
390     typedef typename TThread::ERunMode ERunMode;
391 
392     typedef CBlockingQueue<TRequest> TQueue;
393     typedef typename TQueue::TUserPriority TUserPriority;
394     typedef typename TQueue::TItemHandle   TItemHandle;
395 
396     /// Constructor
397     ///
398     /// @param max_threads
399     ///   The maximum number of threads that this pool can run
400     /// @param queue_size
401     ///   The maximum number of requests in the queue
402     /// @param spawn_threashold
403     ///   The number of requests in the queue after which
404     ///   a new thread is started
405     /// @param max_urgent_threads
406     ///   The maximum number of urgent threads running simultaneously
407     CPoolOfThreads(unsigned int max_threads, unsigned int queue_size,
408                    unsigned int spawn_threshold = 1,
409                    unsigned int max_urgent_threads = kMax_UInt,
410                    const string& thread_name = kEmptyStr);
411 
412     /// Destructor
413     virtual ~CPoolOfThreads(void);
414 
415     /// Start processing threads
416     ///
417     /// @param num_threads
418     ///    The number of threads to start
419     void Spawn(unsigned int num_threads);
420 
421     /// Put a request in the queue with a given priority
422     ///
423     /// @param request
424     ///   A request
425     /// @param priority
426     ///   The priority of the request. The higher the priority
427     ///   the sooner the request will be processed.
428     TItemHandle AcceptRequest(const TRequest& request,
429                               TUserPriority priority = 0,
430                               unsigned int timeout_sec = 0,
431                               unsigned int timeout_nsec = 0);
432 
433     /// Puts a request in the queue with the highest priority
434     /// It will run a new thread even if the maximum of allowed threads
435     /// has been already reached
436     ///
437     /// @param request
438     ///   A request
439     TItemHandle AcceptUrgentRequest(const TRequest& request,
440                                     unsigned int timeout_sec = 0,
441                                     unsigned int timeout_nsec = 0);
442 
443     /// Wait for the room in the queue up to
444     /// timeout_sec + timeout_nsec/1E9 seconds.
445     ///
446     /// @param timeout_sec
447     ///   Number of seconds
448     /// @param timeout_nsec
449     ///   Number of nanoseconds
450     void WaitForRoom(unsigned int timeout_sec  = kMax_UInt,
451                      unsigned int timeout_nsec = 0);
452 
453     /// Check if the queue is full
IsFull(void) const454     bool IsFull(void) const { return m_Queue.IsFull(); }
455 
456     /// Check if the queue is empty
IsEmpty(void) const457     bool IsEmpty(void) const { return m_Queue.IsEmpty(); }
458 
459     /// Check whether a new request could be immediately processed
460     ///
461     /// @param urgent
462     ///  Whether the request would be urgent.
463     bool HasImmediateRoom(bool urgent = false) const;
464 
465     /// Adjust a pending request's priority.
466     void         SetUserPriority(TItemHandle handle, TUserPriority priority);
467 
468     /// Withdraw a pending request from consideration.
Withdraw(TItemHandle handle)469     void         Withdraw(TItemHandle handle)
470         { m_Queue.Withdraw(handle); }
471 
472     /// Get the number of requests in the queue
GetQueueSize(void) const473     size_t       GetQueueSize(void) const
474         { return m_Queue.GetSize(); }
475 
476 
477 protected:
478 
479     /// Create a new thread
480     ///
481     /// @param mode
482     ///   How long the thread should stay around
483     virtual TThread* NewThread(ERunMode mode) = 0;
484 
485     /// Register a thread. It is called by TThread::Main.
486     /// It should detach a thread if not tracking
487     ///
488     /// @param thread
489     ///   A thread to register
Register(TThread & thread)490     virtual void Register(TThread& thread) { thread.Detach(); }
491 
492     /// Unregister a thread
493     ///
494     /// @param thread
495     ///   A thread to unregister
UnRegister(TThread &)496     virtual void UnRegister(TThread&) {}
497 
498 
499     typedef CAtomicCounter::TValue TACValue;
500 
501     /// The maximum number of threads the pool can hold
502     volatile TACValue        m_MaxThreads;
503     /// The maximum number of urgent threads running simultaneously
504     volatile TACValue        m_MaxUrgentThreads;
505     int                      m_Threshold; ///< for delta
506     /// The current number of threads in the pool
507     CAtomicCounter           m_ThreadCount;
508     /// The current number of urgent threads running now
509     CAtomicCounter           m_UrgentThreadCount;
510     /// The difference between the number of unfinished requests and
511     /// the total number of threads in the pool.
512     volatile int             m_Delta;
513     /// The guard for m_MaxThreads, m_MaxUrgentThreads, and m_Delta.
514     mutable CMutex           m_Mutex;
515     /// The request queue
516     TQueue                   m_Queue;
517     bool                     m_QueuingForbidden;
518 
519     const string             m_ThreadName;
520 
521 private:
522     friend class CThreadInPool<TRequest>;
523     TItemHandle x_AcceptRequest(const TRequest& req,
524                                 TUserPriority priority,
525                                 bool urgent,
526                                 unsigned int timeout_sec = 0,
527                                 unsigned int timeout_nsec = 0);
528 
529     void x_RunNewThread(ERunMode mode, CAtomicCounter* counter);
530 };
531 
532 /////////////////////////////////////////////////////////////////////////////
533 //
534 //  SPECIALIZATIONS:
535 //
536 
537 /////////////////////////////////////////////////////////////////////////////
538 //
539 //     CStdRequest       -- abstract request type
540 
541 class CStdRequest : public CObject
542 {
543 public:
544     ///Destructor
~CStdRequest(void)545     virtual ~CStdRequest(void) {}
546 
547     /// Do the actual job
548     /// Called by whichever thread handles this request.
549     virtual void Process(void) = 0;
550 
551     typedef CQueueItemBase::EStatus EStatus;
552 
553     /// Callback for status changes
OnStatusChange(EStatus,EStatus)554     virtual void OnStatusChange(EStatus /* old */, EStatus /* new */) {}
555 };
556 
557 
558 EMPTY_TEMPLATE
559 inline
x_SetStatus(EStatus new_status)560 void CBlockingQueue<CRef<CStdRequest> >::CQueueItem::x_SetStatus
561 (EStatus new_status)
562 {
563     EStatus old_status = GetStatus();
564     CQueueItemBase::x_SetStatus(new_status);
565     m_Request->OnStatusChange(old_status, new_status);
566 }
567 
568 
569 
570 /////////////////////////////////////////////////////////////////////////////
571 //
572 //     CStdThreadInPool  -- thread handling CStdRequest
573 
574 class NCBI_XUTIL_EXPORT CStdThreadInPool
575     : public CThreadInPool< CRef< CStdRequest > >
576 {
577 public:
578     typedef CThreadInPool< CRef< CStdRequest > > TParent;
579 
580     /// Constructor
581     ///
582     /// @param pool
583     ///   A pool where this thead is placed
584     /// @param mode
585     ///   A running mode of this thread
CStdThreadInPool(TPool * pool,ERunMode mode=eNormal)586     CStdThreadInPool(TPool* pool, ERunMode mode = eNormal)
587         : TParent(pool, mode) {}
588 
589 protected:
590     /// Process a request.
591     ///
592     /// @param
593     ///   A request for processing
ProcessRequest(const CRef<CStdRequest> & req)594     virtual void ProcessRequest(const CRef<CStdRequest>& req)
595     { const_cast<CStdRequest&>(*req).Process(); }
596 
597     // Avoid shadowing the handle-based version.
ProcessRequest(TItemHandle handle)598     virtual void ProcessRequest(TItemHandle handle)
599     { TParent::ProcessRequest(handle); }
600 };
601 
602 /////////////////////////////////////////////////////////////////////////////
603 //
604 //     CStdPoolOfThreads -- pool of threads handling CStdRequest
605 
606 class NCBI_XUTIL_EXPORT CStdPoolOfThreads
607     : public CPoolOfThreads< CRef< CStdRequest > >
608 {
609 public:
610     typedef CPoolOfThreads< CRef< CStdRequest > > TParent;
611 
612     /// Constructor
613     ///
614     /// @param max_threads
615     ///   The maximum number of threads that this pool can run
616     /// @param queue_size
617     ///   The maximum number of requests in the queue
618     /// @param spawn_threshold
619     ///   The number of requests in the queue after which
620     ///   a new thread is started
621     /// @param max_urgent_threads
622     ///   The maximum number of urgent threads running simultaneously
CStdPoolOfThreads(unsigned int max_threads,unsigned int queue_size,unsigned int spawn_threshold=1,unsigned int max_urgent_threads=kMax_UInt,const string & thread_name=kEmptyStr)623     CStdPoolOfThreads(unsigned int max_threads, unsigned int queue_size,
624                       unsigned int spawn_threshold = 1,
625                       unsigned int max_urgent_threads = kMax_UInt,
626                       const string& thread_name = kEmptyStr)
627         : TParent(max_threads, queue_size, spawn_threshold, max_urgent_threads,
628                 thread_name)
629         {}
630 
631     virtual ~CStdPoolOfThreads();
632 
633     enum EKillFlags {
634         fKill_Wait   = 0x1, ///< Wait for all threads in the pool to finish.
635         fKill_Reopen = 0x2  ///< Allow a fresh batch of worker threads.
636     };
637     typedef int TKillFlags; ///< binary OR of EKillFlags
638 
639     /// Causes all threads in the pool to exit cleanly after finishing
640     /// all pending requests, optionally waiting for them to die.
641     ///
642     /// @param flags
643     ///    Governs optional behavior
644     virtual void KillAllThreads(TKillFlags flags);
645 
646     /// Causes all threads in the pool to exit cleanly after finishing
647     /// all pending requests, optionally waiting for them to die.
648     ///
649     /// @param wait
650     ///    If true will wait until all thread in the pool finish their job
KillAllThreads(bool wait)651     virtual void KillAllThreads(bool wait)
652         { KillAllThreads(wait ? (fKill_Wait | fKill_Reopen) : fKill_Reopen); }
653 
654     /// Register a thread.
655     ///
656     /// @param thread
657     ///   A thread to register
658     virtual void Register(TThread& thread);
659 
660     /// Unregister a thread
661     ///
662     /// @param thread
663     ///   A thread to unregister
664     virtual void UnRegister(TThread& thread);
665 
666 protected:
667     /// Create a new thread
668     ///
669     /// @param mode
670     ///   A thread's running mode
NewThread(TThread::ERunMode mode)671     virtual TThread* NewThread(TThread::ERunMode mode)
672         { return new CStdThreadInPool(this, mode); }
673 
674 private:
675     typedef list<CRef<TThread> > TThreads;
676     TThreads                     m_Threads;
677 };
678 
679 
/// Declaration of the boolean parameter ThreadPool / Catch_Unhandled_Exceptions,
/// followed by a convenience typedef for the corresponding parameter type.
/// NOTE(review): judging by the name, this presumably controls whether pool
/// threads catch otherwise unhandled exceptions -- confirm against the
/// implementation, which is not visible in this header section.
NCBI_PARAM_DECL(bool, ThreadPool, Catch_Unhandled_Exceptions);
typedef NCBI_PARAM_TYPE(ThreadPool, Catch_Unhandled_Exceptions) TParamThreadPoolCatchExceptions;
682 
683 
684 
685 /////////////////////////////////////////////////////////////////////////////
686 
687 /////////////////////////////////////////////////////////////////////////////
688 //  IMPLEMENTATION of INLINE functions
689 /////////////////////////////////////////////////////////////////////////////
690 
691 
692 /////////////////////////////////////////////////////////////////////////////
693 //   CBlockingQueue<>::
694 //
695 
696 template <typename TRequest>
697 typename CBlockingQueue<TRequest>::TItemHandle
Put(const TRequest & data,TUserPriority priority,unsigned int timeout_sec,unsigned int timeout_nsec)698 CBlockingQueue<TRequest>::Put(const TRequest& data, TUserPriority priority,
699                               unsigned int timeout_sec,
700                               unsigned int timeout_nsec)
701 {
702     CMutexGuard guard(m_Mutex);
703     // Having the mutex, we can safely drop "volatile"
704     TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
705     if ( !x_WaitForPredicate(&CBlockingQueue::x_PutSemPred, m_PutSem, guard,
706                              timeout_sec, timeout_nsec) ) {
707         NCBI_THROW(CBlockingQueueException, eFull,
708                    "CBlockingQueue<>::Put: "
709                    "attempt to insert into a full queue");
710     }
711     if (m_RequestCounter == 0) {
712         m_RequestCounter = 0xFFFFFF;
713         NON_CONST_ITERATE (typename TRealQueue, it, q) {
714             CQueueItem& val = const_cast<CQueueItem&>(**it);
715             val.m_Priority = (val.m_Priority & 0xFF000000) | m_RequestCounter--;
716         }
717     }
718     /// Structure of the internal priority:
719     /// The highest byte is a user specified priority;
720     /// the other three bytes are a counter which ensures that
721     /// requests with the same user-specified priority are processed
722     /// in FIFO order
723     TPriority real_priority = (priority << 24) | m_RequestCounter--;
724     TItemHandle handle(new CQueueItem(real_priority, data));
725     q.insert(handle);
726     m_GetSem.TryWait();
727     m_GetSem.Post();
728     if (q.size() == m_MaxSize) {
729         m_PutSem.TryWait();
730     }
731     return handle;
732 }
733 
734 
735 template <typename TRequest>
WaitForRoom(unsigned int timeout_sec,unsigned int timeout_nsec) const736 void CBlockingQueue<TRequest>::WaitForRoom(unsigned int timeout_sec,
737                                            unsigned int timeout_nsec) const
738 {
739     // Make sure there's room, but don't actually change any state
740     CMutexGuard guard(m_Mutex);
741     if (x_WaitForPredicate(&CBlockingQueue::x_PutSemPred, m_PutSem, guard,
742                            timeout_sec, timeout_nsec)) {
743         m_PutSem.Post(); // signal that the room still exists
744     } else {
745         NCBI_THROW(CBlockingQueueException, eTimedOut,
746                    "CBlockingQueue<>::WaitForRoom: timed out");
747     }
748 }
749 
750 template <typename TRequest>
WaitForHunger(unsigned int timeout_sec,unsigned int timeout_nsec) const751 void CBlockingQueue<TRequest>::WaitForHunger(unsigned int timeout_sec,
752                                              unsigned int timeout_nsec) const
753 {
754     CMutexGuard guard(m_Mutex);
755     if (x_WaitForPredicate(&CBlockingQueue::x_HungerSemPred, m_HungerSem, guard,
756                            timeout_sec, timeout_nsec)) {
757         m_HungerSem.Post();
758     } else {
759         NCBI_THROW(CBlockingQueueException, eTimedOut,
760                    "CBlockingQueue<>::WaitForHunger: timed out");
761     }
762 }
763 
764 
765 template <typename TRequest>
766 typename CBlockingQueue<TRequest>::TItemHandle
GetHandle(unsigned int timeout_sec,unsigned int timeout_nsec)767 CBlockingQueue<TRequest>::GetHandle(unsigned int timeout_sec,
768                                     unsigned int timeout_nsec)
769 {
770     CMutexGuard guard(m_Mutex);
771     // Having the mutex, we can safely drop "volatile"
772     TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
773 
774     if (q.empty()) {
775         _VERIFY(++m_HungerCnt);
776         m_HungerSem.TryWait();
777         m_HungerSem.Post();
778 
779         bool ok = x_WaitForPredicate(&CBlockingQueue::x_GetSemPred, m_GetSem,
780                                      guard, timeout_sec, timeout_nsec);
781 
782         if (--m_HungerCnt <= q.size()) {
783             m_HungerSem.TryWait();
784         }
785 
786         if ( !ok ) {
787             NCBI_THROW(CBlockingQueueException, eTimedOut,
788                        "CBlockingQueue<>::Get[Handle]: timed out");
789         }
790     }
791 
792     TItemHandle handle(*q.begin());
793     q.erase(q.begin());
794     if (m_HungerCnt > q.size()) {
795         m_HungerSem.TryWait();
796         m_HungerSem.Post();
797     }
798     if ( ! q.empty() ) {
799         m_GetSem.TryWait();
800         m_GetSem.Post();
801     }
802 
803     // Get the attention of WaitForRoom() or the like; do this
804     // regardless of queue size because derived classes may want
805     // to insert multiple objects atomically.
806     m_PutSem.TryWait();
807     m_PutSem.Post();
808 
809     guard.Release(); // avoid possible deadlocks from x_SetStatus
810     handle->x_SetStatus(CQueueItem::eActive);
811     return handle;
812 }
813 
814 template <typename TRequest>
Get(unsigned int timeout_sec,unsigned int timeout_nsec)815 TRequest CBlockingQueue<TRequest>::Get(unsigned int timeout_sec,
816                                        unsigned int timeout_nsec)
817 {
818     TItemHandle handle = GetHandle(timeout_sec, timeout_nsec);
819     handle->MarkAsComplete(); // almost certainly premature, but our last chance
820     return handle->GetRequest();
821 }
822 
823 
824 template <typename TRequest>
GetSize(void) const825 size_t CBlockingQueue<TRequest>::GetSize(void) const
826 {
827     CMutexGuard guard(m_Mutex);
828     return const_cast<const TRealQueue&>(m_Queue).size();
829 }
830 
831 
832 template <typename TRequest>
SetUserPriority(TItemHandle handle,TUserPriority priority)833 void CBlockingQueue<TRequest>::SetUserPriority(TItemHandle handle,
834                                                TUserPriority priority)
835 {
836     if (handle->GetUserPriority() == priority
837         ||  handle->GetStatus() != CQueueItem::ePending) {
838         return;
839     }
840     CMutexGuard guard(m_Mutex);
841     // Having the mutex, we can safely drop "volatile"
842     TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
843     typename TRealQueue::iterator it = q.find(handle);
844     // These sanity checks protect against race conditions and
845     // accidental use of handles from other queues.
846     if (it != q.end()  &&  *it == handle) {
847         q.erase(it);
848         TPriority counter = handle->m_Priority & 0xFFFFFF;
849         handle->m_Priority = (priority << 24) | counter;
850         q.insert(handle);
851     }
852 }
853 
854 
855 template <typename TRequest>
Withdraw(TItemHandle handle)856 void CBlockingQueue<TRequest>::Withdraw(TItemHandle handle)
857 {
858     if (handle->GetStatus() != CQueueItem::ePending) {
859         return;
860     }
861     {{
862         CMutexGuard guard(m_Mutex);
863         // Having the mutex, we can safely drop "volatile"
864         TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
865         typename TRealQueue::iterator it = q.find(handle);
866         // These sanity checks protect against race conditions and
867         // accidental use of handles from other queues.
868         if (it != q.end()  &&  *it == handle) {
869             q.erase(it);
870 
871             if(q.empty())   {
872                 // m_GetSem may be signaled - clear it
873                 m_GetSem.TryWait();
874             }
875         } else {
876             return;
877         }
878     }}
879     // run outside the guard to avoid possible deadlocks from x_SetStatus
880     handle->x_SetStatus(CQueueItem::eWithdrawn);
881 }
882 
883 template <typename TRequest>
x_WaitForPredicate(TQueuePredicate pred,CSemaphore & sem,CMutexGuard & guard,unsigned int timeout_sec,unsigned int timeout_nsec) const884 bool CBlockingQueue<TRequest>::x_WaitForPredicate(TQueuePredicate pred,
885                                                   CSemaphore& sem,
886                                                   CMutexGuard& guard,
887                                                   unsigned int timeout_sec,
888                                                   unsigned int timeout_nsec)
889     const
890 {
891     const TRealQueue& q = const_cast<const TRealQueue&>(m_Queue);
892     if ( !(this->*pred)(q) ) {
893 #if SIZEOF_INT == SIZEOF_LONG
894         // If long is larger, converting from unsigned int to (signed)
895         // long for CTimeSpan will automatically be safe.
896         unsigned int extra_sec = timeout_nsec / kNanoSecondsPerSecond;
897         timeout_nsec %= kNanoSecondsPerSecond;
898         // Do the comparison this way to avoid overflow.
899         if (timeout_sec >= kMax_Int - extra_sec) {
900             timeout_sec = kMax_Int; // clamp
901         } else {
902             timeout_sec += extra_sec;
903         }
904 #endif
905         // _ASSERT(timeout_nsec <= (unsigned long)kMax_Long);
906         CTimeSpan span(timeout_sec, timeout_nsec);
907         while (span.GetSign() == ePositive  &&  !(this->*pred)(q) ) {
908             CTime start(CTime::eCurrent, CTime::eGmt);
909             // Temporarily release the mutex while waiting, to avoid deadlock.
910             guard.Release();
911             sem.TryWait((unsigned int)span.GetCompleteSeconds(),
912                         (unsigned int)span.GetNanoSecondsAfterSecond());
913             guard.Guard(m_Mutex);
914             span -= CurrentTime(CTime::eGmt) - start;
915         }
916     }
917     sem.TryWait();
918     return (this->*pred)(q);
919 }
920 
921 /////////////////////////////////////////////////////////////////////////////
922 //   CThreadInPool<>::
923 //
924 
925 template <typename TRequest>
CountSelf(CAtomicCounter * counter)926 void CThreadInPool<TRequest>::CountSelf(CAtomicCounter* counter)
927 {
928     _ASSERT(m_Counter == NULL);
929     counter->Add(1);
930     m_Counter = counter;
931 }
932 
933 template <typename TRequest>
~CThreadInPool()934 CThreadInPool<TRequest>::~CThreadInPool()
935 {
936     if (m_Counter != NULL) {
937         m_Counter->Add(-1);
938     }
939 }
940 
// Remember the thread whose x_UnregisterThread() is to be called when this
// guard is destroyed.
template <typename TRequest>
CThreadInPool<TRequest>::CAutoUnregGuard::CAutoUnregGuard(TThread* thr)
    : m_Thread(thr)
{}
945 
// Unregister the guarded thread.  Running this from a destructor means it
// happens on every exit from the guarded scope, including exits caused by
// an exception.
template <typename TRequest>
CThreadInPool<TRequest>::CAutoUnregGuard::~CAutoUnregGuard(void)
{
    m_Thread->x_UnregisterThread();
}
951 
952 
953 template <typename TRequest>
x_UnregisterThread(void)954 void CThreadInPool<TRequest>::x_UnregisterThread(void)
955 {
956     if (m_Counter != NULL) {
957         m_Counter->Add(-1);
958         m_Counter = NULL;
959     }
960     m_Pool->UnRegister(*this);
961 }
962 
963 template <typename TRequest>
x_HandleOneRequest(bool catch_all)964 void CThreadInPool<TRequest>::x_HandleOneRequest(bool catch_all)
965 {
966     TItemHandle handle;
967     {{
968         CMutexGuard guard(m_Pool->m_Mutex);
969         --m_Pool->m_Delta;
970     }}
971     try {
972         handle.Reset(m_Pool->m_Queue.GetHandle());
973     } catch (CBlockingQueueException& e) {
974         // work around "impossible" timeouts
975         NCBI_REPORT_EXCEPTION_XX(Util_Thread, 1, "Unexpected timeout", e);
976         CMutexGuard guard(m_Pool->m_Mutex);
977         ++m_Pool->m_Delta;
978         return;
979     }
980     if (catch_all) {
981         try {
982             ProcessRequest(handle);
983         } catch (std::exception& e) {
984             handle->MarkAsForciblyCaught();
985             NCBI_REPORT_EXCEPTION_XX(Util_Thread, 2,
986                                      "Exception from thread in pool: ", e);
987             // throw;
988         } catch (...) {
989             handle->MarkAsForciblyCaught();
990             // silently propagate non-standard exceptions because they're
991             // likely to be CExitThreadException.
992             // ERR_POST_XX(Util_Thread, 3,
993             //             "Thread in pool threw non-standard exception.");
994             throw;
995         }
996     }
997     else {
998         ProcessRequest(handle);
999     }
1000 }
1001 
1002 template <typename TRequest>
Main(void)1003 void* CThreadInPool<TRequest>::Main(void)
1004 {
1005     _ASSERT(m_Pool);
1006 
1007     const string& name = m_Pool->m_ThreadName;
1008 
1009     if (!name.empty()) {
1010         SetCurrentThreadName(name);
1011     }
1012 
1013     try {
1014         m_Pool->Register(*this);
1015     } catch (CThreadException&) {
1016         ERR_POST(Warning << "New worker thread blocked at the last minute.");
1017         return 0;
1018     }
1019     CAutoUnregGuard guard(this);
1020 
1021     Init();
1022     bool catch_all = TParamThreadPoolCatchExceptions::GetDefault();
1023 
1024     for (;;) {
1025         x_HandleOneRequest(catch_all);
1026         if (m_RunMode == eRunOnce)
1027             break;
1028     }
1029 
1030     return 0;
1031 }
1032 
1033 
// Run x_OnExit() inside a try block whose handlers are supplied by the
// STD_CATCH_ALL_XX macro (error code Util_Thread/6, message "x_OnExit").
// NOTE(review): the macro presumably reports and swallows whatever is
// thrown, so that nothing escapes from here -- confirm against its
// definition.
template <typename TRequest>
void CThreadInPool<TRequest>::OnExit(void)
{
    try {
        x_OnExit();
    } STD_CATCH_ALL_XX(Util_Thread, 6, "x_OnExit")
}
1041 
1042 template <typename TRequest>
ProcessRequest(TItemHandle handle)1043 void CThreadInPool<TRequest>::ProcessRequest(TItemHandle handle)
1044 {
1045     TCompletingHandle completer = handle;
1046     ProcessRequest(completer->GetRequest());
1047 }
1048 
1049 
1050 /////////////////////////////////////////////////////////////////////////////
1051 //   CPoolOfThreads<>::
1052 //
1053 
1054 template <typename TRequest>
CPoolOfThreads(unsigned int max_threads,unsigned int queue_size,unsigned int spawn_threshold,unsigned int max_urgent_threads,const string & thread_name)1055 CPoolOfThreads<TRequest>::CPoolOfThreads(unsigned int max_threads,
1056                                          unsigned int queue_size,
1057                                          unsigned int spawn_threshold,
1058                                          unsigned int max_urgent_threads,
1059                                          const string& thread_name)
1060     : m_MaxThreads(max_threads), m_MaxUrgentThreads(max_urgent_threads),
1061       m_Threshold(spawn_threshold), m_Delta(0),
1062       m_Queue(queue_size > 0 ? queue_size : max_threads),
1063       m_QueuingForbidden(queue_size == 0),
1064       m_ThreadName(thread_name)
1065 {
1066     m_ThreadCount.Set(0);
1067     m_UrgentThreadCount.Set(0);
1068 }
1069 
1070 
1071 template <typename TRequest>
~CPoolOfThreads(void)1072 CPoolOfThreads<TRequest>::~CPoolOfThreads(void)
1073 {
1074     CAtomicCounter::TValue n = m_ThreadCount.Get() + m_UrgentThreadCount.Get();
1075     if (n) {
1076         ERR_POST_XX(Util_Thread, 4,
1077                     Warning << "CPoolOfThreads<>::~CPoolOfThreads: "
1078                             << n << " thread(s) still active");
1079     }
1080 }
1081 
1082 template <typename TRequest>
Spawn(unsigned int num_threads)1083 void CPoolOfThreads<TRequest>::Spawn(unsigned int num_threads)
1084 {
1085     for (unsigned int i = 0; i < num_threads; i++)
1086     {
1087         x_RunNewThread(TThread::eNormal, &m_ThreadCount);
1088     }
1089 }
1090 
1091 
1092 template <typename TRequest>
1093 inline
1094 typename CPoolOfThreads<TRequest>::TItemHandle
AcceptRequest(const TRequest & req,TUserPriority priority,unsigned int timeout_sec,unsigned int timeout_nsec)1095 CPoolOfThreads<TRequest>::AcceptRequest(const TRequest& req,
1096                                         TUserPriority priority,
1097                                         unsigned int timeout_sec,
1098                                         unsigned int timeout_nsec)
1099 {
1100     return x_AcceptRequest(req, priority, false, timeout_sec, timeout_nsec);
1101 }
1102 
1103 template <typename TRequest>
1104 inline
1105 typename CPoolOfThreads<TRequest>::TItemHandle
AcceptUrgentRequest(const TRequest & req,unsigned int timeout_sec,unsigned int timeout_nsec)1106 CPoolOfThreads<TRequest>::AcceptUrgentRequest(const TRequest& req,
1107                                               unsigned int timeout_sec,
1108                                               unsigned int timeout_nsec)
1109 {
1110     return x_AcceptRequest(req, 0xFF, true, timeout_sec, timeout_nsec);
1111 }
1112 
1113 template <typename TRequest>
1114 inline
HasImmediateRoom(bool urgent) const1115 bool CPoolOfThreads<TRequest>::HasImmediateRoom(bool urgent) const
1116 {
1117     CMutexGuard guard(m_Mutex);
1118 
1119     if (m_Queue.IsFull()) {
1120         return false; // temporary blockage
1121     } else if (m_Delta < 0) {
1122         return true;
1123     } else if (m_ThreadCount.Get() < m_MaxThreads) {
1124         return true;
1125     } else if (urgent  &&  m_UrgentThreadCount.Get() < m_MaxUrgentThreads) {
1126         return true;
1127     } else {
1128         try {
1129             m_Queue.WaitForHunger(0);
1130             // This should be redundant with the m_Delta < 0 case, now that
1131             // m_Mutex guards it.
1132             ERR_POST_XX(Util_Thread, 5,
1133                         "Possible thread pool bug.  delta: "
1134                           << const_cast<int&>(m_Delta)
1135                           << "; hunger: " << m_Queue.GetHunger());
1136             return true;
1137         } catch (...) {
1138         }
1139         return false;
1140     }
1141 }
1142 
1143 template <typename TRequest>
1144 inline
WaitForRoom(unsigned int timeout_sec,unsigned int timeout_nsec)1145 void CPoolOfThreads<TRequest>::WaitForRoom(unsigned int timeout_sec,
1146                                            unsigned int timeout_nsec)
1147 {
1148     if (HasImmediateRoom()) {
1149         return;
1150     } else if (m_QueuingForbidden) {
1151         m_Queue.WaitForHunger(timeout_sec, timeout_nsec);
1152     } else {
1153         m_Queue.WaitForRoom(timeout_sec, timeout_nsec);
1154     }
1155 }
1156 
1157 template <typename TRequest>
1158 inline
1159 typename CPoolOfThreads<TRequest>::TItemHandle
x_AcceptRequest(const TRequest & req,TUserPriority priority,bool urgent,unsigned int timeout_sec,unsigned int timeout_nsec)1160 CPoolOfThreads<TRequest>::x_AcceptRequest(const TRequest& req,
1161                                           TUserPriority priority,
1162                                           bool urgent,
1163                                           unsigned int timeout_sec,
1164                                           unsigned int timeout_nsec)
1165 {
1166     bool new_thread = false;
1167     TItemHandle handle;
1168     {{
1169         CMutexGuard guard(m_Mutex);
1170         // we reserved 0xFF priority for urgent requests
1171         if( priority == 0xFF && !urgent )
1172             --priority;
1173         if (m_QueuingForbidden  &&  !HasImmediateRoom(urgent) ) {
1174             NCBI_THROW(CBlockingQueueException, eFull,
1175                        "CPoolOfThreads<>::x_AcceptRequest: "
1176                        "attempt to insert into a full queue");
1177         }
1178         handle = m_Queue.Put(req, priority, timeout_sec, timeout_nsec);
1179         if (++m_Delta >= m_Threshold
1180             &&  m_ThreadCount.Get() < m_MaxThreads) {
1181             // Add another thread to the pool because they're all busy.
1182             new_thread = true;
1183         } else if (urgent && m_UrgentThreadCount.Get() >= m_MaxUrgentThreads) {
1184             // Prevent from running a new urgent thread if we have reached
1185             // the maximum number of urgent threads
1186             urgent = false;
1187         }
1188     }}
1189 
1190     if (new_thread) {
1191         x_RunNewThread(TThread::eNormal, &m_ThreadCount);
1192     } else if (urgent) {
1193         x_RunNewThread(TThread::eRunOnce, &m_UrgentThreadCount);
1194     }
1195 
1196     return handle;
1197 }
1198 
1199 template <typename TRequest>
1200 inline
x_RunNewThread(ERunMode mode,CAtomicCounter * counter)1201 void CPoolOfThreads<TRequest>::x_RunNewThread(ERunMode mode,
1202                                               CAtomicCounter* counter)
1203 {
1204     try {
1205         CRef<TThread> thr(NewThread(mode));
1206         thr->CountSelf(counter);
1207         thr->Run();
1208     }
1209     catch (CThreadException& ex) {
1210         ERR_POST_XX(Util_Thread, 13,
1211                     Critical << "Ignoring error while starting new thread: "
1212                     << ex);
1213     }
1214 }
1215 
1216 template <typename TRequest>
1217 inline
SetUserPriority(TItemHandle handle,TUserPriority priority)1218 void CPoolOfThreads<TRequest>::SetUserPriority(TItemHandle handle,
1219                                                TUserPriority priority)
1220 {
1221     // Maintain segregation between urgent and non-urgent requests
1222     if (handle->GetUserPriority() == 0xFF) {
1223         return;
1224     } else if (priority == 0xFF) {
1225         priority = 0xFE;
1226     }
1227     m_Queue.SetUserPriority(handle, priority);
1228 }
1229 
1230 END_NCBI_SCOPE
1231 
1232 
1233 /* @} */
1234 
1235 #endif  /* UTIL__THREAD_POOL_OLD__HPP */
1236