/*  $Id: wn_commit_thread.cpp 622435 2020-12-23 17:57:37Z ivanov $
 * ===========================================================================
 *
 *                            PUBLIC DOMAIN NOTICE
 *               National Center for Biotechnology Information
 *
 *  This software/database is a "United States Government Work" under the
 *  terms of the United States Copyright Act.  It was written as part of
 *  the author's official duties as a United States Government employee and
 *  thus cannot be copyrighted.  This software/database is freely available
 *  to the public for use. The National Library of Medicine and the U.S.
 *  Government have not placed any restriction on its use or reproduction.
 *
 *  Although all reasonable efforts have been taken to ensure the accuracy
 *  and reliability of the software and data, the NLM and the U.S.
 *  Government do not and cannot warrant the performance or results that
 *  may be obtained by using this software or data. The NLM and the U.S.
 *  Government disclaim all warranties, express or implied, including
 *  warranties of performance, merchantability or fitness for any particular
 *  purpose.
 *
 *  Please cite the author in any work or product based on this material.
 *
 * ===========================================================================
 *
 * Authors:  Maxim Didenko, Dmitry Kazimirov
 *
 * File Description:
 *    NetSchedule Worker Node implementation
 */

#include <ncbi_pch.hpp>

#include "wn_commit_thread.hpp"
#include "grid_worker_impl.hpp"

#include <connect/services/grid_globals.hpp>
#include <connect/services/error_codes.hpp>

#include <corelib/ncbiexpt.hpp>
#include <corelib/ncbi_system.hpp>


#define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode


BEGIN_NCBI_SCOPE

/////////////////////////////////////////////////////////////////////////////
//
/// @internal
static void s_TlsCleanup(IWorkerNodeJob* p_value, void* /* data */)
{
    if (p_value != NULL)
        p_value->RemoveReference();
}
/// @internal
static CStaticTls<IWorkerNodeJob> s_tls;

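// Note: judging by the arguments, the semaphore starts at count 0 with a
// maximum count of 1, acting as a binary wake-up signal for the committer
// thread; the "_cm" suffix makes the thread identifiable in process listings.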
CJobCommitterThread::CJobCommitterThread(SGridWorkerNodeImpl* worker_node) :
    m_WorkerNode(worker_node),
    m_Semaphore(0, 1),
    m_ThreadName(worker_node->GetAppName() + "_cm")
{
}

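// Hands out a job context for a new job, reusing an object from
// m_JobContextPool when one is available instead of allocating afresh.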
CWorkerNodeJobContext CJobCommitterThread::AllocJobContext()
{
    TFastMutexGuard mutex_lock(m_TimelineMutex);

    if (m_JobContextPool.empty())
        return new SWorkerNodeJobContextImpl(m_WorkerNode);

    CWorkerNodeJobContext job_context(m_JobContextPool.front());
    m_JobContextPool.pop_front();

    job_context->m_Job.Reset();
    job_context->m_JobGeneration = m_WorkerNode->m_CurrentJobGeneration;

    return job_context;
}

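// Called from a worker thread once job execution has finished: re-arms the
// context for its first commit attempt and queues it on m_ImmediateActions
// for the committer thread to process.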
void CJobCommitterThread::RecycleJobContextAndCommitJob(
        SWorkerNodeJobContextImpl* job_context,
        CRequestContextSwitcher& rctx_switcher)
{
    job_context->m_FirstCommitAttempt = true;

    TFastMutexGuard mutex_lock(m_TimelineMutex);

    // Must be called prior to adding the job context to
    // m_ImmediateActions: when empty, m_ImmediateActions
    // indicates that the committer thread is waiting and
    // the semaphore must be incremented.
    WakeUp();

    m_ImmediateActions.push_back(TEntry(job_context));

    // We must do it here, before m_TimelineMutex is unlocked.
    rctx_switcher.Release();
}

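// Requests shutdown of the committer thread. Main() checks the flag only
// after draining m_ImmediateActions, so pending commits are not lost.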
void CJobCommitterThread::Stop()
{
    TFastMutexGuard mutex_lock(m_TimelineMutex);

    m_IsShuttingDown = true;
    WakeUp();
}

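// Sleeps until the front timeline entry is due for a commit retry.
// Returns true once that entry's timeout has expired; returns false if the
// semaphore is posted first, i.e. the thread is woken early by new work or
// by Stop().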
bool CJobCommitterThread::WaitForTimeout()
{
    CNanoTimeout timeout = m_Timeline.front()->GetTimeout().GetRemainingTime();

    if (timeout.IsZero()) {
        return true;
    }

    TFastMutexUnlockGuard mutex_unlock(m_TimelineMutex);
    return !m_Semaphore.TryWait(timeout);
}

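// Committer thread entry point: wait for work (or for the next retry
// deadline), then process every queued entry, recycling each context on a
// final outcome or pushing it back onto the timeline for a later retry.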
void* CJobCommitterThread::Main()
{
    SetCurrentThreadName(m_ThreadName);
    TFastMutexGuard mutex_lock(m_TimelineMutex);

    do {
        if (m_Timeline.empty()) {
            TFastMutexUnlockGuard mutex_unlock(m_TimelineMutex);

            m_Semaphore.Wait();
        } else if (WaitForTimeout()) {
            m_ImmediateActions.push_back(m_Timeline.front());
            m_Timeline.pop_front();
        }

        while (!m_ImmediateActions.empty()) {
            TEntry& entry = m_ImmediateActions.front();

            // Do not remove the job context from m_ImmediateActions
            // prior to calling x_CommitJob() to avoid race conditions
            // (otherwise, the semaphore can be Post()'ed multiple times
            // by the worker threads while this thread is in x_CommitJob()).
            if (x_CommitJob(entry)) {
                m_JobContextPool.push_back(entry);
            } else {
                m_Timeline.push_back(entry);
            }

            m_ImmediateActions.pop_front();
        }

    // Cannot use CGridGlobals::GetInstance().IsShuttingDown() here;
    // otherwise, this thread could exit before committing all pending jobs.
    } while (!m_IsShuttingDown);

    return NULL;
}

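// Reports the job's outcome to NetSchedule with m_TimelineMutex temporarily
// released. Returns true when the context can be recycled (the commit
// succeeded or failed for good), or false when the commit should be
// retried after the commit interval elapses.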
bool CJobCommitterThread::x_CommitJob(SWorkerNodeJobContextImpl* job_context)
{
    TFastMutexUnlockGuard mutex_unlock(m_TimelineMutex);

    CRequestContextSwitcher request_state_guard(job_context->m_RequestContext);

    bool recycle_job_context = false;
    m_WorkerNode->m_JobsInProgress.Update(job_context->m_Job);

    try {
        switch (job_context->m_JobCommitStatus) {
        case CWorkerNodeJobContext::eCS_Done:
            m_WorkerNode->m_NSExecutor.PutResult(job_context->m_Job);
            break;

        case CWorkerNodeJobContext::eCS_Failure:
            m_WorkerNode->m_NSExecutor.PutFailure(job_context->m_Job,
                    job_context->m_DisableRetries);
            break;

        default: /* eCS_NotCommitted */
            // In the unlikely event of eCS_NotCommitted, return the job.
            /* FALL THROUGH */

        case CWorkerNodeJobContext::eCS_Return:
            m_WorkerNode->m_NSExecutor.ReturnJob(job_context->m_Job);
            break;

        case CWorkerNodeJobContext::eCS_Reschedule:
            m_WorkerNode->m_NSExecutor.Reschedule(job_context->m_Job);
            break;

        case CWorkerNodeJobContext::eCS_JobIsLost:
            // The job has been canceled or otherwise taken away from the
            // worker node. Whatever the cause, it has already been reported.
            break;
        }

        recycle_job_context = true;
    }
    catch (CNetScheduleException& e) {
        if ((e.GetErrCode() == CNetScheduleException::eInvalidJobStatus) &&
                e.GetMsg().find("job is in Canceled state") != string::npos) {
            LOG_POST(Warning << "Could not commit " <<
                    job_context->m_Job.job_id << ": " << e.what());
        } else {
            ERR_POST_X(65, "Could not commit " <<
                    job_context->m_Job.job_id << ": " << e.what());
        }

        recycle_job_context = true;
    }
    catch (exception& e) {
        unsigned commit_interval = m_WorkerNode->m_CommitJobInterval;
        job_context->ResetTimeout(commit_interval);
        if (job_context->m_FirstCommitAttempt) {
            job_context->m_FirstCommitAttempt = false;
            job_context->m_CommitExpiration =
                    CDeadline(m_WorkerNode->m_QueueTimeout, 0);
        } else if (job_context->m_CommitExpiration <
                job_context->GetTimeout()) {
            ERR_POST_X(64, "Could not commit " <<
                    job_context->m_Job.job_id << ": " << e.what());
            recycle_job_context = true;
        }
        if (!recycle_job_context) {
            ERR_POST_X(63, "Error while committing " <<
                    job_context->m_Job.job_id << ": " << e.what() <<
                    "; will retry in " << commit_interval << " seconds.");
        }
    }

    if (recycle_job_context) {
        m_WorkerNode->m_JobsInProgress.Remove(job_context->m_Job);
        job_context->x_PrintRequestStop();
    }

    return recycle_job_context;
}

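// Returns the calling thread's IWorkerNodeJob instance. When job object
// reuse is enabled, the factory-created instance is cached in s_tls under
// a reference count; otherwise a new instance is created on every call.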
/// @internal
IWorkerNodeJob* SGridWorkerNodeImpl::GetJobProcessor()
{
    IWorkerNodeJob* ret = s_tls.GetValue();
    if (ret == NULL) {
        try {
            CFastMutexGuard guard(m_JobProcessorMutex);
            ret = m_JobProcessorFactory->CreateInstance();
        }
        catch (exception& e) {
            ERR_POST_X(9, "Could not create an instance of the "
                    "job processor class: " << e.what());
            CGridGlobals::GetInstance().RequestShutdown(
                    CNetScheduleAdmin::eShutdownImmediate);
            throw;
        }
        if (ret == NULL) {
            CGridGlobals::GetInstance().RequestShutdown(
                    CNetScheduleAdmin::eShutdownImmediate);
            NCBI_THROW(CException, eUnknown,
                    "Could not create an instance of the job processor class.");
        }
        if (CGridGlobals::GetInstance().ReuseJobObject()) {
            s_tls.SetValue(ret, s_TlsCleanup);
            ret->AddReference();
        }
    }
    return ret;
}

END_NCBI_SCOPE