1 /*  $Id: wn_commit_thread.cpp 637904 2021-09-20 19:52:17Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *   Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *    NetSchedule Worker Node implementation
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "wn_commit_thread.hpp"
35 #include "grid_worker_impl.hpp"
36 
37 #include <connect/services/grid_globals.hpp>
38 #include <connect/services/grid_globals.hpp>
39 #include <connect/services/error_codes.hpp>
40 
41 #include <corelib/ncbiexpt.hpp>
42 #include <corelib/ncbi_system.hpp>
43 
44 
45 #define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode
46 
47 
48 BEGIN_NCBI_SCOPE
49 
50 /////////////////////////////////////////////////////////////////////////////
51 //
52 ///@internal
s_TlsCleanup(IWorkerNodeJob * p_value,void *)53 static void s_TlsCleanup(IWorkerNodeJob* p_value, void* /* data */ )
54 {
55     if (p_value != NULL)
56         p_value->RemoveReference();
57 }
/// @internal
/// Per-thread cache of the job processor instance used when
/// CGridGlobals::ReuseJobObject() is enabled; entries are released
/// by s_TlsCleanup on thread exit.
static CStaticTls<IWorkerNodeJob> s_tls;
60 
CJobCommitterThread(SGridWorkerNodeImpl * worker_node)61 CJobCommitterThread::CJobCommitterThread(SGridWorkerNodeImpl* worker_node) :
62     m_WorkerNode(worker_node),
63     m_Semaphore(0, 1),
64     m_ThreadName(worker_node->GetAppName() + "_cm")
65 {
66 }
67 
AllocJobContext()68 CWorkerNodeJobContext CJobCommitterThread::AllocJobContext()
69 {
70     TFastMutexGuard mutex_lock(m_TimelineMutex);
71 
72     if (m_JobContextPool.empty())
73         return new SWorkerNodeJobContextImpl(m_WorkerNode);
74 
75     CWorkerNodeJobContext job_context(m_JobContextPool.front());
76     m_JobContextPool.pop_front();
77 
78     job_context->m_Job.Reset();
79     return job_context;
80 }
81 
RecycleJobContextAndCommitJob(SWorkerNodeJobContextImpl * job_context,CRequestContextSwitcher & rctx_switcher)82 void CJobCommitterThread::RecycleJobContextAndCommitJob(
83         SWorkerNodeJobContextImpl* job_context,
84         CRequestContextSwitcher& rctx_switcher)
85 {
86     job_context->m_FirstCommitAttempt = true;
87 
88     TFastMutexGuard mutex_lock(m_TimelineMutex);
89 
90     // Must be called prior to adding the job context to
91     // m_ImmediateActions: when empty, m_ImmediateActions
92     // indicates that the committer thread is waiting and
93     // the semaphore must be incremented.
94     WakeUp();
95 
96     m_ImmediateActions.push_back(TEntry(job_context));
97 
98     // We must do it here, before m_TimelineMutex is unlocked
99     rctx_switcher.Release();
100 }
101 
Stop()102 void CJobCommitterThread::Stop()
103 {
104     TFastMutexGuard mutex_lock(m_TimelineMutex);
105 
106     m_IsShuttingDown = true;
107     WakeUp();
108 }
109 
WaitForTimeout()110 bool CJobCommitterThread::WaitForTimeout()
111 {
112     CNanoTimeout timeout = m_Timeline.front()->GetTimeout().GetRemainingTime();
113 
114     if (timeout.IsZero()) {
115         return true;
116     }
117 
118     TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
119     return !m_Semaphore.TryWait(timeout);
120 }
121 
Main()122 void* CJobCommitterThread::Main()
123 {
124     SetCurrentThreadName(m_ThreadName);
125     TFastMutexGuard mutex_lock(m_TimelineMutex);
126 
127     do {
128         if (m_Timeline.empty()) {
129             TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
130 
131             m_Semaphore.Wait();
132         } else if (WaitForTimeout()) {
133             m_ImmediateActions.push_back(m_Timeline.front());
134             m_Timeline.pop_front();
135         }
136 
137         while (!m_ImmediateActions.empty()) {
138             TEntry& entry = m_ImmediateActions.front();
139 
140             // Do not remove the job context from m_ImmediateActions
141             // prior to calling x_CommitJob() to avoid race conditions
142             // (otherwise, the semaphore can be Post()'ed multiple times
143             // by the worker threads while this thread is in x_CommitJob()).
144             if (x_CommitJob(entry)) {
145                 m_JobContextPool.push_back(entry);
146             } else {
147                 m_Timeline.push_back(entry);
148             }
149 
150             m_ImmediateActions.pop_front();
151         }
152 
153     // Cannot use CGridGlobals::GetInstance().IsShuttingDown()) below,
154     // this thread could exit before committing all pending jobs otherwise.
155     } while (!m_IsShuttingDown);
156 
157     return NULL;
158 }
159 
x_CommitJob(SWorkerNodeJobContextImpl * job_context)160 bool CJobCommitterThread::x_CommitJob(SWorkerNodeJobContextImpl* job_context)
161 {
162     TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
163 
164     CRequestContextSwitcher request_state_guard(job_context->m_RequestContext);
165 
166     bool recycle_job_context = false;
167     m_WorkerNode->m_JobsInProgress.Update(job_context->m_Job);
168 
169     try {
170         switch (job_context->m_JobCommitStatus) {
171         case CWorkerNodeJobContext::eCS_Done:
172             m_WorkerNode->m_NSExecutor.PutResult(job_context->m_Job);
173             break;
174 
175         case CWorkerNodeJobContext::eCS_Failure:
176             m_WorkerNode->m_NSExecutor.PutFailure(job_context->m_Job,
177                     job_context->m_DisableRetries);
178             break;
179 
180         default: /* eCS_NotCommitted */
181             // In the unlikely event of eCS_NotCommitted, return the job.
182             /* FALL THROUGH */
183 
184         case CWorkerNodeJobContext::eCS_Return:
185             m_WorkerNode->m_NSExecutor.ReturnJob(job_context->m_Job);
186             break;
187 
188         case CWorkerNodeJobContext::eCS_Reschedule:
189             m_WorkerNode->m_NSExecutor.Reschedule(job_context->m_Job);
190             break;
191 
192         case CWorkerNodeJobContext::eCS_JobIsLost:
193             // Job is cancelled or otherwise taken away from the worker
194             // node. Whatever the cause is, it has been reported already.
195             break;
196         }
197 
198         recycle_job_context = true;
199     }
200     catch (CNetScheduleException& e) {
201         if ((e.GetErrCode() == CNetScheduleException::eInvalidJobStatus) &&
202                 e.GetMsg().find("job is in Canceled state") != string::npos) {
203             LOG_POST(Warning << "Could not commit " << job_context->m_Job.job_id << ": " << e.what());
204         } else {
205             ERR_POST_X(65, "Could not commit " << job_context->m_Job.job_id << ": " << e.what());
206         }
207 
208         recycle_job_context = true;
209     }
210     catch (exception& e) {
211         unsigned commit_interval = m_WorkerNode->m_CommitJobInterval;
212         job_context->ResetTimeout(commit_interval);
213         if (job_context->m_FirstCommitAttempt) {
214             job_context->m_FirstCommitAttempt = false;
215             job_context->m_CommitExpiration =
216                     CDeadline(m_WorkerNode->m_QueueTimeout, 0);
217         } else if (job_context->m_CommitExpiration <
218                 job_context->GetTimeout()) {
219             ERR_POST_X(64, "Could not commit " <<
220                     job_context->m_Job.job_id << ": " << e.what());
221             recycle_job_context = true;
222         }
223         if (!recycle_job_context) {
224             ERR_POST_X(63, "Error while committing " <<
225                     job_context->m_Job.job_id << ": " << e.what() <<
226                     "; will retry in " << commit_interval << " seconds.");
227         }
228     }
229 
230     if (recycle_job_context) {
231         m_WorkerNode->m_JobsInProgress.Remove(job_context->m_Job);
232         job_context->x_PrintRequestStop();
233     }
234 
235     return recycle_job_context;
236 }
237 
238 /// @internal
GetJobProcessor()239 IWorkerNodeJob* SGridWorkerNodeImpl::GetJobProcessor()
240 {
241     IWorkerNodeJob* ret = s_tls.GetValue();
242     if (ret == NULL) {
243         try {
244             CFastMutexGuard guard(m_JobProcessorMutex);
245             ret = m_JobProcessorFactory->CreateInstance();
246         }
247         catch (exception& e) {
248             ERR_POST_X(9, "Could not create an instance of the "
249                     "job processor class." << e.what());
250             CGridGlobals::GetInstance().RequestShutdown(
251                     CNetScheduleAdmin::eShutdownImmediate);
252             throw;
253         }
254         if (ret == NULL) {
255             CGridGlobals::GetInstance().RequestShutdown(
256                     CNetScheduleAdmin::eShutdownImmediate);
257             NCBI_THROW(CException, eUnknown,
258                     "Could not create an instance of the job processor class.");
259         }
260         if (CGridGlobals::GetInstance().ReuseJobObject()) {
261             s_tls.SetValue(ret, s_TlsCleanup);
262             ret->AddReference();
263         }
264     }
265     return ret;
266 }
267 
268 END_NCBI_SCOPE
269