1 /* $Id: wn_commit_thread.cpp 637904 2021-09-20 19:52:17Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description:
29 * NetSchedule Worker Node implementation
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "wn_commit_thread.hpp"
35 #include "grid_worker_impl.hpp"
36
37 #include <connect/services/grid_globals.hpp>
38 #include <connect/services/grid_globals.hpp>
39 #include <connect/services/error_codes.hpp>
40
41 #include <corelib/ncbiexpt.hpp>
42 #include <corelib/ncbi_system.hpp>
43
44
45 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
46
47
48 BEGIN_NCBI_SCOPE
49
50 /////////////////////////////////////////////////////////////////////////////
51 //
52 ///@internal
s_TlsCleanup(IWorkerNodeJob * p_value,void *)53 static void s_TlsCleanup(IWorkerNodeJob* p_value, void* /* data */ )
54 {
55 if (p_value != NULL)
56 p_value->RemoveReference();
57 }
/// @internal
/// File-local TLS slot (CStaticTls) that caches an IWorkerNodeJob pointer.
/// SGridWorkerNodeImpl::GetJobProcessor() reads it with GetValue() and, when
/// job object reuse is enabled, fills it with SetValue() together with the
/// s_TlsCleanup callback.
static CStaticTls<IWorkerNodeJob> s_tls;
60
CJobCommitterThread(SGridWorkerNodeImpl * worker_node)61 CJobCommitterThread::CJobCommitterThread(SGridWorkerNodeImpl* worker_node) :
62 m_WorkerNode(worker_node),
63 m_Semaphore(0, 1),
64 m_ThreadName(worker_node->GetAppName() + "_cm")
65 {
66 }
67
AllocJobContext()68 CWorkerNodeJobContext CJobCommitterThread::AllocJobContext()
69 {
70 TFastMutexGuard mutex_lock(m_TimelineMutex);
71
72 if (m_JobContextPool.empty())
73 return new SWorkerNodeJobContextImpl(m_WorkerNode);
74
75 CWorkerNodeJobContext job_context(m_JobContextPool.front());
76 m_JobContextPool.pop_front();
77
78 job_context->m_Job.Reset();
79 return job_context;
80 }
81
RecycleJobContextAndCommitJob(SWorkerNodeJobContextImpl * job_context,CRequestContextSwitcher & rctx_switcher)82 void CJobCommitterThread::RecycleJobContextAndCommitJob(
83 SWorkerNodeJobContextImpl* job_context,
84 CRequestContextSwitcher& rctx_switcher)
85 {
86 job_context->m_FirstCommitAttempt = true;
87
88 TFastMutexGuard mutex_lock(m_TimelineMutex);
89
90 // Must be called prior to adding the job context to
91 // m_ImmediateActions: when empty, m_ImmediateActions
92 // indicates that the committer thread is waiting and
93 // the semaphore must be incremented.
94 WakeUp();
95
96 m_ImmediateActions.push_back(TEntry(job_context));
97
98 // We must do it here, before m_TimelineMutex is unlocked
99 rctx_switcher.Release();
100 }
101
Stop()102 void CJobCommitterThread::Stop()
103 {
104 TFastMutexGuard mutex_lock(m_TimelineMutex);
105
106 m_IsShuttingDown = true;
107 WakeUp();
108 }
109
WaitForTimeout()110 bool CJobCommitterThread::WaitForTimeout()
111 {
112 CNanoTimeout timeout = m_Timeline.front()->GetTimeout().GetRemainingTime();
113
114 if (timeout.IsZero()) {
115 return true;
116 }
117
118 TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
119 return !m_Semaphore.TryWait(timeout);
120 }
121
Main()122 void* CJobCommitterThread::Main()
123 {
124 SetCurrentThreadName(m_ThreadName);
125 TFastMutexGuard mutex_lock(m_TimelineMutex);
126
127 do {
128 if (m_Timeline.empty()) {
129 TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
130
131 m_Semaphore.Wait();
132 } else if (WaitForTimeout()) {
133 m_ImmediateActions.push_back(m_Timeline.front());
134 m_Timeline.pop_front();
135 }
136
137 while (!m_ImmediateActions.empty()) {
138 TEntry& entry = m_ImmediateActions.front();
139
140 // Do not remove the job context from m_ImmediateActions
141 // prior to calling x_CommitJob() to avoid race conditions
142 // (otherwise, the semaphore can be Post()'ed multiple times
143 // by the worker threads while this thread is in x_CommitJob()).
144 if (x_CommitJob(entry)) {
145 m_JobContextPool.push_back(entry);
146 } else {
147 m_Timeline.push_back(entry);
148 }
149
150 m_ImmediateActions.pop_front();
151 }
152
153 // Cannot use CGridGlobals::GetInstance().IsShuttingDown()) below,
154 // this thread could exit before committing all pending jobs otherwise.
155 } while (!m_IsShuttingDown);
156
157 return NULL;
158 }
159
x_CommitJob(SWorkerNodeJobContextImpl * job_context)160 bool CJobCommitterThread::x_CommitJob(SWorkerNodeJobContextImpl* job_context)
161 {
162 TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
163
164 CRequestContextSwitcher request_state_guard(job_context->m_RequestContext);
165
166 bool recycle_job_context = false;
167 m_WorkerNode->m_JobsInProgress.Update(job_context->m_Job);
168
169 try {
170 switch (job_context->m_JobCommitStatus) {
171 case CWorkerNodeJobContext::eCS_Done:
172 m_WorkerNode->m_NSExecutor.PutResult(job_context->m_Job);
173 break;
174
175 case CWorkerNodeJobContext::eCS_Failure:
176 m_WorkerNode->m_NSExecutor.PutFailure(job_context->m_Job,
177 job_context->m_DisableRetries);
178 break;
179
180 default: /* eCS_NotCommitted */
181 // In the unlikely event of eCS_NotCommitted, return the job.
182 /* FALL THROUGH */
183
184 case CWorkerNodeJobContext::eCS_Return:
185 m_WorkerNode->m_NSExecutor.ReturnJob(job_context->m_Job);
186 break;
187
188 case CWorkerNodeJobContext::eCS_Reschedule:
189 m_WorkerNode->m_NSExecutor.Reschedule(job_context->m_Job);
190 break;
191
192 case CWorkerNodeJobContext::eCS_JobIsLost:
193 // Job is cancelled or otherwise taken away from the worker
194 // node. Whatever the cause is, it has been reported already.
195 break;
196 }
197
198 recycle_job_context = true;
199 }
200 catch (CNetScheduleException& e) {
201 if ((e.GetErrCode() == CNetScheduleException::eInvalidJobStatus) &&
202 e.GetMsg().find("job is in Canceled state") != string::npos) {
203 LOG_POST(Warning << "Could not commit " << job_context->m_Job.job_id << ": " << e.what());
204 } else {
205 ERR_POST_X(65, "Could not commit " << job_context->m_Job.job_id << ": " << e.what());
206 }
207
208 recycle_job_context = true;
209 }
210 catch (exception& e) {
211 unsigned commit_interval = m_WorkerNode->m_CommitJobInterval;
212 job_context->ResetTimeout(commit_interval);
213 if (job_context->m_FirstCommitAttempt) {
214 job_context->m_FirstCommitAttempt = false;
215 job_context->m_CommitExpiration =
216 CDeadline(m_WorkerNode->m_QueueTimeout, 0);
217 } else if (job_context->m_CommitExpiration <
218 job_context->GetTimeout()) {
219 ERR_POST_X(64, "Could not commit " <<
220 job_context->m_Job.job_id << ": " << e.what());
221 recycle_job_context = true;
222 }
223 if (!recycle_job_context) {
224 ERR_POST_X(63, "Error while committing " <<
225 job_context->m_Job.job_id << ": " << e.what() <<
226 "; will retry in " << commit_interval << " seconds.");
227 }
228 }
229
230 if (recycle_job_context) {
231 m_WorkerNode->m_JobsInProgress.Remove(job_context->m_Job);
232 job_context->x_PrintRequestStop();
233 }
234
235 return recycle_job_context;
236 }
237
238 /// @internal
GetJobProcessor()239 IWorkerNodeJob* SGridWorkerNodeImpl::GetJobProcessor()
240 {
241 IWorkerNodeJob* ret = s_tls.GetValue();
242 if (ret == NULL) {
243 try {
244 CFastMutexGuard guard(m_JobProcessorMutex);
245 ret = m_JobProcessorFactory->CreateInstance();
246 }
247 catch (exception& e) {
248 ERR_POST_X(9, "Could not create an instance of the "
249 "job processor class." << e.what());
250 CGridGlobals::GetInstance().RequestShutdown(
251 CNetScheduleAdmin::eShutdownImmediate);
252 throw;
253 }
254 if (ret == NULL) {
255 CGridGlobals::GetInstance().RequestShutdown(
256 CNetScheduleAdmin::eShutdownImmediate);
257 NCBI_THROW(CException, eUnknown,
258 "Could not create an instance of the job processor class.");
259 }
260 if (CGridGlobals::GetInstance().ReuseJobObject()) {
261 s_tls.SetValue(ret, s_TlsCleanup);
262 ret->AddReference();
263 }
264 }
265 return ret;
266 }
267
268 END_NCBI_SCOPE
269