1 #ifndef CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP 2 #define CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP 3 4 /* $Id: thread_pool_for_server.hpp 508624 2016-07-29 15:32:34Z satskyse $ 5 * =========================================================================== 6 * 7 * PUBLIC DOMAIN NOTICE 8 * National Center for Biotechnology Information 9 * 10 * This software/database is a "United States Government Work" under the 11 * terms of the United States Copyright Act. It was written as part of 12 * the author's official duties as a United States Government employee and 13 * thus cannot be copyrighted. This software/database is freely available 14 * to the public for use. The National Library of Medicine and the U.S. 15 * Government have not placed any restriction on its use or reproduction. 16 * 17 * Although all reasonable efforts have been taken to ensure the accuracy 18 * and reliability of the software and data, the NLM and the U.S. 19 * Government do not and cannot warrant the performance or results that 20 * may be obtained by using this software or data. The NLM and the U.S. 21 * Government disclaim all warranties, express or implied, including 22 * warranties of performance, merchantability or fitness for any particular 23 * purpose. 24 * 25 * Please cite the author in any work or product based on this material. 26 * 27 * =========================================================================== 28 * 29 * Author: Pavel Ivanov 30 * 31 * File Description: 32 */ 33 34 35 #include <util/thread_pool_old.hpp> 36 37 38 /** @addtogroup ThreadedPools 39 * 40 * @{ 41 */ 42 43 BEGIN_NCBI_SCOPE 44 45 46 class CQueueItemBase_ForServer : public CObject { 47 public: 48 typedef CQueueItemBase::EStatus EStatus; 49 CQueueItemBase_ForServer(void)50 CQueueItemBase_ForServer(void) 51 : m_Status(CQueueItemBase::ePending) 52 {} 53 GetStatus(void) const54 const EStatus& GetStatus(void) const { return m_Status; } 55 MarkAsComplete(void)56 void MarkAsComplete(void) { x_SetStatus(CQueueItemBase::eComplete); } MarkAsForciblyCaught(void)57 void MarkAsForciblyCaught(void) { x_SetStatus(CQueueItemBase::eForciblyCaught); } 58 59 protected: 60 EStatus m_Status; 61 x_SetStatus(EStatus new_status)62 virtual void x_SetStatus(EStatus new_status) 63 { m_Status = new_status; } 64 }; 65 66 67 class CBlockingQueue_ForServer 68 { 69 public: 70 class CQueueItem; 71 typedef CRef<CQueueItem> TItemHandle; 72 typedef CRef<CStdRequest> TRequest; 73 74 /// It may be desirable to store handles obtained from GetHandle() in 75 /// instances of CCompletingHandle to ensure that they are marked as 76 /// complete when all is said and done, even in the face of exceptions. 77 class CCompletingHandle : public TItemHandle 78 { 79 public: CCompletingHandle(const TItemHandle & h)80 CCompletingHandle(const TItemHandle& h) 81 : TItemHandle(h) 82 {} 83 ~CCompletingHandle()84 ~CCompletingHandle() 85 { 86 if (this->NotEmpty()) { 87 this->GetObject().MarkAsComplete(); 88 } 89 } 90 }; 91 92 /// Constructor CBlockingQueue_ForServer(void)93 CBlockingQueue_ForServer(void) 94 {} 95 96 /// Put a request into the queue. If the queue remains full for 97 /// the duration of the (optional) timeout, throw an exception. 98 /// 99 /// @param request 100 /// Request 101 TItemHandle Put(const TRequest& request); 102 103 /// Get the first available request from the queue, and return a 104 /// handle to it. 105 /// Blocks politely if empty. 106 TItemHandle GetHandle(void); 107 108 class CQueueItem : public CQueueItemBase_ForServer 109 { 110 public: 111 // typedef CBlockingQueue<TRequest> TQueue; CQueueItem(TRequest request)112 CQueueItem(TRequest request) 113 : m_Request(request) 114 {} 115 GetRequest(void) const116 const TRequest& GetRequest(void) const { return m_Request; } 117 118 protected: 119 // Specialized for CRef<CStdRequest> in thread_pool.cpp x_SetStatus(EStatus new_status)120 void x_SetStatus(EStatus new_status) 121 { 122 EStatus old_status = GetStatus(); 123 CQueueItemBase_ForServer::x_SetStatus(new_status); 124 m_Request->OnStatusChange(old_status, new_status); 125 } 126 127 private: 128 friend class CBlockingQueue_ForServer; 129 130 TRequest m_Request; 131 }; 132 133 protected: 134 /// The type of the queue 135 typedef deque<TItemHandle> TRealQueue; 136 137 // Derived classes should take care to use these members properly. 138 TRealQueue m_Queue; ///< The queue 139 140 CConditionVariable m_GetCond; 141 mutable CMutex m_Mutex; ///< Guards access to queue 142 143 private: 144 /// forbidden 145 CBlockingQueue_ForServer(const CBlockingQueue_ForServer&); 146 CBlockingQueue_ForServer& operator=(const CBlockingQueue_ForServer&); 147 }; 148 149 150 class CPoolOfThreads_ForServer; 151 152 class CThreadInPool_ForServer : public CThread 153 { 154 public: 155 typedef CPoolOfThreads_ForServer TPool; 156 typedef CBlockingQueue_ForServer::TItemHandle TItemHandle; 157 typedef CBlockingQueue_ForServer::CCompletingHandle TCompletingHandle; 158 typedef CBlockingQueue_ForServer::TRequest TRequest; 159 160 /// Constructor 161 /// 162 /// @param pool 163 /// A pool where this thead is placed 164 /// @param mode 165 /// A running mode of this thread CThreadInPool_ForServer(TPool * pool)166 CThreadInPool_ForServer(TPool* pool) 167 : m_Pool(pool), m_Counted(false) 168 {} 169 void CountSelf(void); 170 171 protected: 172 /// Destructor 173 virtual ~CThreadInPool_ForServer(void); 174 175 /// Process a request. 176 /// It is called from Main() for each request this thread handles 177 /// 178 /// @param 179 /// A request for processing 180 void ProcessRequest(TItemHandle handle); 181 182 /// Older interface (still delegated to by default) ProcessRequest(const TRequest & req)183 void ProcessRequest(const TRequest& req) 184 { req.GetNCPointerOrNull()->Process(); } 185 186 private: 187 // to prevent overriding; inherited from CThread 188 virtual void* Main(void); 189 190 void x_HandleOneRequest(bool catch_all); 191 void x_UnregisterThread(void); 192 193 class CAutoUnregGuard 194 { 195 public: 196 typedef CThreadInPool_ForServer TThread; 197 CAutoUnregGuard(TThread* thr); 198 ~CAutoUnregGuard(void); 199 200 private: 201 TThread* m_Thread; 202 }; 203 204 friend class CAutoUnregGuard; 205 206 207 TPool* m_Pool; ///< The pool that holds this thread 208 bool m_Counted; 209 }; 210 211 212 class CPoolOfThreads_ForServer 213 { 214 public: 215 typedef CThreadInPool_ForServer TThread; 216 217 typedef CBlockingQueue_ForServer TQueue; 218 typedef TQueue::TItemHandle TItemHandle; 219 typedef TQueue::TRequest TRequest; 220 221 /// Constructor 222 /// 223 /// @param max_threads 224 /// The maximum number of threads that this pool can run 225 CPoolOfThreads_ForServer(unsigned int max_threads, const string& thr_suffix); 226 227 /// Destructor 228 virtual ~CPoolOfThreads_ForServer(void); 229 230 /// Start processing threads 231 /// 232 /// @param num_threads 233 /// The number of threads to start 234 void Spawn(unsigned int num_threads); 235 236 /// Put a request in the queue with a given priority 237 /// 238 /// @param request 239 /// A request 240 void AcceptRequest(const TRequest& request); 241 TItemHandle GetHandle(void); 242 243 /// Causes all threads in the pool to exit cleanly after finishing 244 /// all pending requests, optionally waiting for them to die. 245 /// 246 /// @param wait 247 /// If true will wait until all thread in the pool finish their job 248 void KillAllThreads(bool wait); 249 250 private: 251 friend class CThreadInPool_ForServer; 252 253 /// Create a new thread NewThread(void)254 TThread* NewThread(void) 255 { return new CThreadInPool_ForServer(this); } 256 257 /// Register a thread. It is called by TThread::Main. 258 /// 259 /// @param thread 260 /// A thread to register 261 /// @param return 262 /// Whether registration succeeded. (KillAllThreads disables it.) 263 bool Register(TThread& thread); 264 265 /// Unregister a thread 266 /// 267 /// @param thread 268 /// A thread to unregister 269 void UnRegister(TThread&); 270 271 272 typedef CAtomicCounter::TValue TACValue; 273 274 /// The maximum number of threads the pool can hold 275 volatile TACValue m_MaxThreads; 276 /// The current number of threads in the pool 277 CAtomicCounter m_ThreadCount; 278 /// The guard for m_MaxThreads and m_MaxUrgentThreads 279 CMutex m_Mutex; 280 281 TQueue m_Queue; 282 string m_ThrSuffix; 283 284 typedef list<CRef<TThread> > TThreads; 285 TThreads m_Threads; 286 bool m_KilledAll; 287 }; 288 289 290 END_NCBI_SCOPE 291 292 293 /* @} */ 294 295 #endif /* CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP */ 296