1 #ifndef WN_COMMIT_THREAD__HPP 2 #define WN_COMMIT_THREAD__HPP 3 4 5 /* $Id: wn_commit_thread.hpp 622435 2020-12-23 17:57:37Z ivanov $ 6 * =========================================================================== 7 * 8 * PUBLIC DOMAIN NOTICE 9 * National Center for Biotechnology Information 10 * 11 * This software/database is a "United States Government Work" under the 12 * terms of the United States Copyright Act. It was written as part of 13 * the author's official duties as a United States Government employee and 14 * thus cannot be copyrighted. This software/database is freely available 15 * to the public for use. The National Library of Medicine and the U.S. 16 * Government have not placed any restriction on its use or reproduction. 17 * 18 * Although all reasonable efforts have been taken to ensure the accuracy 19 * and reliability of the software and data, the NLM and the U.S. 20 * Government do not and cannot warrant the performance or results that 21 * may be obtained by using this software or data. The NLM and the U.S. 22 * Government disclaim all warranties, express or implied, including 23 * warranties of performance, merchantability or fitness for any particular 24 * purpose. 25 * 26 * Please cite the author in any work or product based on this material. 27 * 28 * =========================================================================== 29 * 30 * Authors: Dmitry Kazimirov 31 * 32 * File Description: 33 * NetSchedule Worker Node - job committer thread, declarations. 34 */ 35 36 #include <connect/services/grid_worker.hpp> 37 38 #include <deque> 39 40 BEGIN_NCBI_SCOPE 41 42 ///////////////////////////////////////////////////////////////////////////// 43 // 44 /// @internal 45 class CRequestContextSwitcher 46 { 47 public: CRequestContextSwitcher()48 CRequestContextSwitcher() {} 49 CRequestContextSwitcher(CRequestContext * new_request_context)50 CRequestContextSwitcher(CRequestContext* new_request_context) 51 : m_SavedRequestContext(&CDiagContext::GetRequestContext()) 52 { 53 CDiagContext::SetRequestContext(new_request_context); 54 } 55 Release()56 void Release() 57 { 58 if (m_SavedRequestContext) { 59 CDiagContext::SetRequestContext(m_SavedRequestContext); 60 m_SavedRequestContext.Reset(); 61 } 62 } 63 ~CRequestContextSwitcher()64 ~CRequestContextSwitcher() 65 { 66 Release(); 67 } 68 69 private: 70 CRef<CRequestContext> m_SavedRequestContext; 71 }; 72 73 ///////////////////////////////////////////////////////////////////////////// 74 // 75 /// @internal 76 class CJobCommitterThread : public CThread 77 { 78 public: 79 CJobCommitterThread(SGridWorkerNodeImpl* worker_node); 80 81 CWorkerNodeJobContext AllocJobContext(); 82 83 void RecycleJobContextAndCommitJob(SWorkerNodeJobContextImpl* job_context, 84 CRequestContextSwitcher& rctx_switcher); 85 86 void Stop(); 87 88 private: 89 typedef CRef<SWorkerNodeJobContextImpl> TEntry; 90 typedef deque<TEntry> TCommitJobTimeline; 91 92 virtual void* Main(); 93 94 bool WaitForTimeout(); 95 bool x_CommitJob(SWorkerNodeJobContextImpl* job_context); 96 WakeUp()97 void WakeUp() 98 { 99 if (m_ImmediateActions.empty()) 100 m_Semaphore.Post(); 101 } 102 103 SGridWorkerNodeImpl* m_WorkerNode; 104 CSemaphore m_Semaphore; 105 TCommitJobTimeline m_ImmediateActions, m_Timeline, m_JobContextPool; 106 CFastMutex m_TimelineMutex; 107 const string m_ThreadName; 108 bool m_IsShuttingDown = false; 109 110 typedef CGuard<CFastMutex, SSimpleUnlock<CFastMutex>, 111 SSimpleLock<CFastMutex> > TFastMutexUnlockGuard; 112 }; 113 114 END_NCBI_SCOPE 115 116 #endif // WN_COMMIT_THREAD__HPP 117