1 #ifndef CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP
2 #define CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP
3 
4 /*  $Id: thread_pool_for_server.hpp 508624 2016-07-29 15:32:34Z satskyse $
5 * ===========================================================================
6 *
7 *                            PUBLIC DOMAIN NOTICE
8 *               National Center for Biotechnology Information
9 *
10 *  This software/database is a "United States Government Work" under the
11 *  terms of the United States Copyright Act.  It was written as part of
12 *  the author's official duties as a United States Government employee and
13 *  thus cannot be copyrighted.  This software/database is freely available
14 *  to the public for use. The National Library of Medicine and the U.S.
15 *  Government have not placed any restriction on its use or reproduction.
16 *
17 *  Although all reasonable efforts have been taken to ensure the accuracy
18 *  and reliability of the software and data, the NLM and the U.S.
19 *  Government do not and cannot warrant the performance or results that
20 *  may be obtained by using this software or data. The NLM and the U.S.
21 *  Government disclaim all warranties, express or implied, including
22 *  warranties of performance, merchantability or fitness for any particular
23 *  purpose.
24 *
25 *  Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Author:  Pavel Ivanov
30 *
31 * File Description: Request queue, worker thread and thread pool classes (the ..._ForServer variants).
32 */
33 
34 
35 #include <util/thread_pool_old.hpp>
36 
37 
38 /** @addtogroup ThreadedPools
39  *
40  * @{
41  */
42 
43 BEGIN_NCBI_SCOPE
44 
45 
46 class CQueueItemBase_ForServer : public CObject {
47 public:
48     typedef CQueueItemBase::EStatus EStatus;
49 
CQueueItemBase_ForServer(void)50     CQueueItemBase_ForServer(void)
51         : m_Status(CQueueItemBase::ePending)
52     {}
53 
GetStatus(void) const54     const EStatus&   GetStatus(void) const       { return m_Status; }
55 
MarkAsComplete(void)56     void MarkAsComplete(void)        { x_SetStatus(CQueueItemBase::eComplete); }
MarkAsForciblyCaught(void)57     void MarkAsForciblyCaught(void)  { x_SetStatus(CQueueItemBase::eForciblyCaught); }
58 
59 protected:
60     EStatus   m_Status;
61 
x_SetStatus(EStatus new_status)62     virtual void x_SetStatus(EStatus new_status)
63     { m_Status = new_status; }
64 };
65 
66 
67 class CBlockingQueue_ForServer
68 {
69 public:
70     class CQueueItem;
71     typedef CRef<CQueueItem> TItemHandle;
72     typedef CRef<CStdRequest> TRequest;
73 
74     /// It may be desirable to store handles obtained from GetHandle() in
75     /// instances of CCompletingHandle to ensure that they are marked as
76     /// complete when all is said and done, even in the face of exceptions.
77     class CCompletingHandle : public TItemHandle
78     {
79     public:
CCompletingHandle(const TItemHandle & h)80         CCompletingHandle(const TItemHandle& h)
81         : TItemHandle(h)
82         {}
83 
~CCompletingHandle()84         ~CCompletingHandle()
85         {
86             if (this->NotEmpty()) {
87                 this->GetObject().MarkAsComplete();
88             }
89         }
90     };
91 
92     /// Constructor
CBlockingQueue_ForServer(void)93     CBlockingQueue_ForServer(void)
94     {}
95 
96     /// Put a request into the queue.  If the queue remains full for
97     /// the duration of the (optional) timeout, throw an exception.
98     ///
99     /// @param request
100     ///   Request
101     TItemHandle Put(const TRequest& request);
102 
103     /// Get the first available request from the queue, and return a
104     /// handle to it.
105     /// Blocks politely if empty.
106     TItemHandle  GetHandle(void);
107 
108     class CQueueItem : public CQueueItemBase_ForServer
109     {
110     public:
111         // typedef CBlockingQueue<TRequest> TQueue;
CQueueItem(TRequest request)112         CQueueItem(TRequest request)
113             : m_Request(request)
114         {}
115 
GetRequest(void) const116         const TRequest& GetRequest(void) const { return m_Request; }
117 
118     protected:
119         // Specialized for CRef<CStdRequest> in thread_pool.cpp
x_SetStatus(EStatus new_status)120         void x_SetStatus(EStatus new_status)
121         {
122             EStatus old_status = GetStatus();
123             CQueueItemBase_ForServer::x_SetStatus(new_status);
124             m_Request->OnStatusChange(old_status, new_status);
125         }
126 
127     private:
128         friend class CBlockingQueue_ForServer;
129 
130         TRequest  m_Request;
131     };
132 
133 protected:
134     /// The type of the queue
135     typedef deque<TItemHandle> TRealQueue;
136 
137     // Derived classes should take care to use these members properly.
138     TRealQueue          m_Queue;     ///< The queue
139 
140     CConditionVariable  m_GetCond;
141     mutable CMutex      m_Mutex;     ///< Guards access to queue
142 
143 private:
144     /// forbidden
145     CBlockingQueue_ForServer(const CBlockingQueue_ForServer&);
146     CBlockingQueue_ForServer& operator=(const CBlockingQueue_ForServer&);
147 };
148 
149 
150 class CPoolOfThreads_ForServer;
151 
152 class CThreadInPool_ForServer : public CThread
153 {
154 public:
155     typedef CPoolOfThreads_ForServer TPool;
156     typedef CBlockingQueue_ForServer::TItemHandle TItemHandle;
157     typedef CBlockingQueue_ForServer::CCompletingHandle TCompletingHandle;
158     typedef CBlockingQueue_ForServer::TRequest TRequest;
159 
160     /// Constructor
161     ///
162     /// @param pool
163     ///   A pool where this thead is placed
164     /// @param mode
165     ///   A running mode of this thread
CThreadInPool_ForServer(TPool * pool)166     CThreadInPool_ForServer(TPool* pool)
167         : m_Pool(pool), m_Counted(false)
168     {}
169     void CountSelf(void);
170 
171 protected:
172     /// Destructor
173     virtual ~CThreadInPool_ForServer(void);
174 
175     /// Process a request.
176     /// It is called from Main() for each request this thread handles
177     ///
178     /// @param
179     ///   A request for processing
180     void ProcessRequest(TItemHandle handle);
181 
182     /// Older interface (still delegated to by default)
ProcessRequest(const TRequest & req)183     void ProcessRequest(const TRequest& req)
184     { req.GetNCPointerOrNull()->Process(); }
185 
186 private:
187     // to prevent overriding; inherited from CThread
188     virtual void* Main(void);
189 
190     void x_HandleOneRequest(bool catch_all);
191     void x_UnregisterThread(void);
192 
193     class CAutoUnregGuard
194     {
195     public:
196         typedef CThreadInPool_ForServer TThread;
197         CAutoUnregGuard(TThread* thr);
198         ~CAutoUnregGuard(void);
199 
200     private:
201         TThread* m_Thread;
202     };
203 
204     friend class CAutoUnregGuard;
205 
206 
207     TPool*   m_Pool;     ///< The pool that holds this thread
208     bool     m_Counted;
209 };
210 
211 
212 class CPoolOfThreads_ForServer
213 {
214 public:
215     typedef CThreadInPool_ForServer TThread;
216 
217     typedef CBlockingQueue_ForServer TQueue;
218     typedef TQueue::TItemHandle TItemHandle;
219     typedef TQueue::TRequest TRequest;
220 
221     /// Constructor
222     ///
223     /// @param max_threads
224     ///   The maximum number of threads that this pool can run
225     CPoolOfThreads_ForServer(unsigned int max_threads, const string& thr_suffix);
226 
227     /// Destructor
228     virtual ~CPoolOfThreads_ForServer(void);
229 
230     /// Start processing threads
231     ///
232     /// @param num_threads
233     ///    The number of threads to start
234     void Spawn(unsigned int num_threads);
235 
236     /// Put a request in the queue with a given priority
237     ///
238     /// @param request
239     ///   A request
240     void AcceptRequest(const TRequest& request);
241     TItemHandle GetHandle(void);
242 
243     /// Causes all threads in the pool to exit cleanly after finishing
244     /// all pending requests, optionally waiting for them to die.
245     ///
246     /// @param wait
247     ///    If true will wait until all thread in the pool finish their job
248     void KillAllThreads(bool wait);
249 
250 private:
251     friend class CThreadInPool_ForServer;
252 
253     /// Create a new thread
NewThread(void)254     TThread* NewThread(void)
255     { return new CThreadInPool_ForServer(this); }
256 
257     /// Register a thread. It is called by TThread::Main.
258     ///
259     /// @param thread
260     ///   A thread to register
261     /// @param return
262     ///   Whether registration succeeded.  (KillAllThreads disables it.)
263     bool Register(TThread& thread);
264 
265     /// Unregister a thread
266     ///
267     /// @param thread
268     ///   A thread to unregister
269     void UnRegister(TThread&);
270 
271 
272     typedef CAtomicCounter::TValue TACValue;
273 
274     /// The maximum number of threads the pool can hold
275     volatile TACValue       m_MaxThreads;
276     /// The current number of threads in the pool
277     CAtomicCounter          m_ThreadCount;
278     /// The guard for m_MaxThreads and m_MaxUrgentThreads
279     CMutex                  m_Mutex;
280 
281     TQueue                  m_Queue;
282     string                  m_ThrSuffix;
283 
284     typedef list<CRef<TThread> > TThreads;
285     TThreads                m_Threads;
286     bool                    m_KilledAll;
287 };
288 
289 
290 END_NCBI_SCOPE
291 
292 
293 /* @} */
294 
295 #endif  /* CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP */
296