1 #ifndef WN_COMMIT_THREAD__HPP
2 #define WN_COMMIT_THREAD__HPP
3 
4 
5 /*  $Id: wn_commit_thread.hpp 622435 2020-12-23 17:57:37Z ivanov $
6  * ===========================================================================
7  *
8  *                            PUBLIC DOMAIN NOTICE
9  *               National Center for Biotechnology Information
10  *
11  *  This software/database is a "United States Government Work" under the
12  *  terms of the United States Copyright Act.  It was written as part of
13  *  the author's official duties as a United States Government employee and
14  *  thus cannot be copyrighted.  This software/database is freely available
15  *  to the public for use. The National Library of Medicine and the U.S.
16  *   Government have not placed any restriction on its use or reproduction.
17  *
18  *  Although all reasonable efforts have been taken to ensure the accuracy
19  *  and reliability of the software and data, the NLM and the U.S.
20  *  Government do not and cannot warrant the performance or results that
21  *  may be obtained by using this software or data. The NLM and the U.S.
22  *  Government disclaim all warranties, express or implied, including
23  *  warranties of performance, merchantability or fitness for any particular
24  *  purpose.
25  *
26  *  Please cite the author in any work or product based on this material.
27  *
28  * ===========================================================================
29  *
30  * Authors:  Dmitry Kazimirov
31  *
32  * File Description:
33  *    NetSchedule Worker Node - job committer thread, declarations.
34  */
35 
36 #include <connect/services/grid_worker.hpp>
37 
38 #include <deque>
39 
40 BEGIN_NCBI_SCOPE
41 
42 /////////////////////////////////////////////////////////////////////////////
43 //
44 /// @internal
45 class CRequestContextSwitcher
46 {
47 public:
CRequestContextSwitcher()48     CRequestContextSwitcher() {}
49 
CRequestContextSwitcher(CRequestContext * new_request_context)50     CRequestContextSwitcher(CRequestContext* new_request_context)
51         : m_SavedRequestContext(&CDiagContext::GetRequestContext())
52     {
53         CDiagContext::SetRequestContext(new_request_context);
54     }
55 
Release()56     void Release()
57     {
58         if (m_SavedRequestContext) {
59             CDiagContext::SetRequestContext(m_SavedRequestContext);
60             m_SavedRequestContext.Reset();
61         }
62     }
63 
~CRequestContextSwitcher()64     ~CRequestContextSwitcher()
65     {
66         Release();
67     }
68 
69 private:
70     CRef<CRequestContext> m_SavedRequestContext;
71 };
72 
73 /////////////////////////////////////////////////////////////////////////////
74 //
75 /// @internal
76 class CJobCommitterThread : public CThread
77 {
78 public:
79     CJobCommitterThread(SGridWorkerNodeImpl* worker_node);
80 
81     CWorkerNodeJobContext AllocJobContext();
82 
83     void RecycleJobContextAndCommitJob(SWorkerNodeJobContextImpl* job_context,
84             CRequestContextSwitcher& rctx_switcher);
85 
86     void Stop();
87 
88 private:
89     typedef CRef<SWorkerNodeJobContextImpl> TEntry;
90     typedef deque<TEntry> TCommitJobTimeline;
91 
92     virtual void* Main();
93 
94     bool WaitForTimeout();
95     bool x_CommitJob(SWorkerNodeJobContextImpl* job_context);
96 
WakeUp()97     void WakeUp()
98     {
99         if (m_ImmediateActions.empty())
100             m_Semaphore.Post();
101     }
102 
103     SGridWorkerNodeImpl* m_WorkerNode;
104     CSemaphore m_Semaphore;
105     TCommitJobTimeline m_ImmediateActions, m_Timeline, m_JobContextPool;
106     CFastMutex m_TimelineMutex;
107     const string m_ThreadName;
108     bool m_IsShuttingDown = false;
109 
110     typedef CGuard<CFastMutex, SSimpleUnlock<CFastMutex>,
111             SSimpleLock<CFastMutex> > TFastMutexUnlockGuard;
112 };
113 
114 END_NCBI_SCOPE
115 
116 #endif // WN_COMMIT_THREAD__HPP
117