1 /* $Id: wn_commit_thread.cpp 622435 2020-12-23 17:57:37Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description:
29 * NetSchedule Worker Node implementation
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "wn_commit_thread.hpp"
35 #include "grid_worker_impl.hpp"
36
37 #include <connect/services/grid_globals.hpp>
38 #include <connect/services/grid_globals.hpp>
39 #include <connect/services/error_codes.hpp>
40
41 #include <corelib/ncbiexpt.hpp>
42 #include <corelib/ncbi_system.hpp>
43
44
45 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
46
47
48 BEGIN_NCBI_SCOPE
49
50 /////////////////////////////////////////////////////////////////////////////
51 //
52 ///@internal
s_TlsCleanup(IWorkerNodeJob * p_value,void *)53 static void s_TlsCleanup(IWorkerNodeJob* p_value, void* /* data */ )
54 {
55 if (p_value != NULL)
56 p_value->RemoveReference();
57 }
58 /// @internal
59 static CStaticTls<IWorkerNodeJob> s_tls;
60
CJobCommitterThread(SGridWorkerNodeImpl * worker_node)61 CJobCommitterThread::CJobCommitterThread(SGridWorkerNodeImpl* worker_node) :
62 m_WorkerNode(worker_node),
63 m_Semaphore(0, 1),
64 m_ThreadName(worker_node->GetAppName() + "_cm")
65 {
66 }
67
AllocJobContext()68 CWorkerNodeJobContext CJobCommitterThread::AllocJobContext()
69 {
70 TFastMutexGuard mutex_lock(m_TimelineMutex);
71
72 if (m_JobContextPool.empty())
73 return new SWorkerNodeJobContextImpl(m_WorkerNode);
74
75 CWorkerNodeJobContext job_context(m_JobContextPool.front());
76 m_JobContextPool.pop_front();
77
78 job_context->m_Job.Reset();
79 job_context->m_JobGeneration = m_WorkerNode->m_CurrentJobGeneration;
80
81 return job_context;
82 }
83
RecycleJobContextAndCommitJob(SWorkerNodeJobContextImpl * job_context,CRequestContextSwitcher & rctx_switcher)84 void CJobCommitterThread::RecycleJobContextAndCommitJob(
85 SWorkerNodeJobContextImpl* job_context,
86 CRequestContextSwitcher& rctx_switcher)
87 {
88 job_context->m_FirstCommitAttempt = true;
89
90 TFastMutexGuard mutex_lock(m_TimelineMutex);
91
92 // Must be called prior to adding the job context to
93 // m_ImmediateActions: when empty, m_ImmediateActions
94 // indicates that the committer thread is waiting and
95 // the semaphore must be incremented.
96 WakeUp();
97
98 m_ImmediateActions.push_back(TEntry(job_context));
99
100 // We must do it here, before m_TimelineMutex is unlocked
101 rctx_switcher.Release();
102 }
103
Stop()104 void CJobCommitterThread::Stop()
105 {
106 TFastMutexGuard mutex_lock(m_TimelineMutex);
107
108 m_IsShuttingDown = true;
109 WakeUp();
110 }
111
WaitForTimeout()112 bool CJobCommitterThread::WaitForTimeout()
113 {
114 CNanoTimeout timeout = m_Timeline.front()->GetTimeout().GetRemainingTime();
115
116 if (timeout.IsZero()) {
117 return true;
118 }
119
120 TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
121 return !m_Semaphore.TryWait(timeout);
122 }
123
Main()124 void* CJobCommitterThread::Main()
125 {
126 SetCurrentThreadName(m_ThreadName);
127 TFastMutexGuard mutex_lock(m_TimelineMutex);
128
129 do {
130 if (m_Timeline.empty()) {
131 TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
132
133 m_Semaphore.Wait();
134 } else if (WaitForTimeout()) {
135 m_ImmediateActions.push_back(m_Timeline.front());
136 m_Timeline.pop_front();
137 }
138
139 while (!m_ImmediateActions.empty()) {
140 TEntry& entry = m_ImmediateActions.front();
141
142 // Do not remove the job context from m_ImmediateActions
143 // prior to calling x_CommitJob() to avoid race conditions
144 // (otherwise, the semaphore can be Post()'ed multiple times
145 // by the worker threads while this thread is in x_CommitJob()).
146 if (x_CommitJob(entry)) {
147 m_JobContextPool.push_back(entry);
148 } else {
149 m_Timeline.push_back(entry);
150 }
151
152 m_ImmediateActions.pop_front();
153 }
154
155 // Cannot use CGridGlobals::GetInstance().IsShuttingDown()) below,
156 // this thread could exit before committing all pending jobs otherwise.
157 } while (!m_IsShuttingDown);
158
159 return NULL;
160 }
161
x_CommitJob(SWorkerNodeJobContextImpl * job_context)162 bool CJobCommitterThread::x_CommitJob(SWorkerNodeJobContextImpl* job_context)
163 {
164 TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
165
166 CRequestContextSwitcher request_state_guard(job_context->m_RequestContext);
167
168 bool recycle_job_context = false;
169 m_WorkerNode->m_JobsInProgress.Update(job_context->m_Job);
170
171 try {
172 switch (job_context->m_JobCommitStatus) {
173 case CWorkerNodeJobContext::eCS_Done:
174 m_WorkerNode->m_NSExecutor.PutResult(job_context->m_Job);
175 break;
176
177 case CWorkerNodeJobContext::eCS_Failure:
178 m_WorkerNode->m_NSExecutor.PutFailure(job_context->m_Job,
179 job_context->m_DisableRetries);
180 break;
181
182 default: /* eCS_NotCommitted */
183 // In the unlikely event of eCS_NotCommitted, return the job.
184 /* FALL THROUGH */
185
186 case CWorkerNodeJobContext::eCS_Return:
187 m_WorkerNode->m_NSExecutor.ReturnJob(job_context->m_Job);
188 break;
189
190 case CWorkerNodeJobContext::eCS_Reschedule:
191 m_WorkerNode->m_NSExecutor.Reschedule(job_context->m_Job);
192 break;
193
194 case CWorkerNodeJobContext::eCS_JobIsLost:
195 // Job is cancelled or otherwise taken away from the worker
196 // node. Whatever the cause is, it has been reported already.
197 break;
198 }
199
200 recycle_job_context = true;
201 }
202 catch (CNetScheduleException& e) {
203 if ((e.GetErrCode() == CNetScheduleException::eInvalidJobStatus) &&
204 e.GetMsg().find("job is in Canceled state") != string::npos) {
205 LOG_POST(Warning << "Could not commit " << job_context->m_Job.job_id << ": " << e.what());
206 } else {
207 ERR_POST_X(65, "Could not commit " << job_context->m_Job.job_id << ": " << e.what());
208 }
209
210 recycle_job_context = true;
211 }
212 catch (exception& e) {
213 unsigned commit_interval = m_WorkerNode->m_CommitJobInterval;
214 job_context->ResetTimeout(commit_interval);
215 if (job_context->m_FirstCommitAttempt) {
216 job_context->m_FirstCommitAttempt = false;
217 job_context->m_CommitExpiration =
218 CDeadline(m_WorkerNode->m_QueueTimeout, 0);
219 } else if (job_context->m_CommitExpiration <
220 job_context->GetTimeout()) {
221 ERR_POST_X(64, "Could not commit " <<
222 job_context->m_Job.job_id << ": " << e.what());
223 recycle_job_context = true;
224 }
225 if (!recycle_job_context) {
226 ERR_POST_X(63, "Error while committing " <<
227 job_context->m_Job.job_id << ": " << e.what() <<
228 "; will retry in " << commit_interval << " seconds.");
229 }
230 }
231
232 if (recycle_job_context) {
233 m_WorkerNode->m_JobsInProgress.Remove(job_context->m_Job);
234 job_context->x_PrintRequestStop();
235 }
236
237 return recycle_job_context;
238 }
239
240 /// @internal
GetJobProcessor()241 IWorkerNodeJob* SGridWorkerNodeImpl::GetJobProcessor()
242 {
243 IWorkerNodeJob* ret = s_tls.GetValue();
244 if (ret == NULL) {
245 try {
246 CFastMutexGuard guard(m_JobProcessorMutex);
247 ret = m_JobProcessorFactory->CreateInstance();
248 }
249 catch (exception& e) {
250 ERR_POST_X(9, "Could not create an instance of the "
251 "job processor class." << e.what());
252 CGridGlobals::GetInstance().RequestShutdown(
253 CNetScheduleAdmin::eShutdownImmediate);
254 throw;
255 }
256 if (ret == NULL) {
257 CGridGlobals::GetInstance().RequestShutdown(
258 CNetScheduleAdmin::eShutdownImmediate);
259 NCBI_THROW(CException, eUnknown,
260 "Could not create an instance of the job processor class.");
261 }
262 if (CGridGlobals::GetInstance().ReuseJobObject()) {
263 s_tls.SetValue(ret, s_TlsCleanup);
264 ret->AddReference();
265 }
266 }
267 return ret;
268 }
269
270 END_NCBI_SCOPE
271