1 /*  $Id: wn_main_loop.cpp 637904 2021-09-20 19:52:17Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *   Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Maxim Didenko, Anatoliy Kuznetsov, Dmitry Kazimirov
27  *
28  * File Description:
29  *    NetSchedule Worker Node implementation
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "wn_commit_thread.hpp"
35 #include "wn_cleanup.hpp"
36 #include "grid_worker_impl.hpp"
37 #include "netschedule_api_impl.hpp"
38 
39 #include <connect/services/grid_globals.hpp>
40 #include <connect/services/grid_worker_app.hpp>
41 #include <connect/services/ns_job_serializer.hpp>
42 
43 
44 #define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode
45 
46 BEGIN_NCBI_SCOPE
47 
GetJob() const48 const CNetScheduleJob& CWorkerNodeJobContext::GetJob() const
49 {
50     return m_Impl->m_Job;
51 }
52 
GetJob()53 CNetScheduleJob& CWorkerNodeJobContext::GetJob()
54 {
55     return m_Impl->m_Job;
56 }
57 
GetJobKey() const58 const string& CWorkerNodeJobContext::GetJobKey() const
59 {
60     return m_Impl->m_Job.job_id;
61 }
62 
GetJobInput() const63 const string& CWorkerNodeJobContext::GetJobInput() const
64 {
65     return m_Impl->m_Job.input;
66 }
67 
SetJobOutput(const string & output)68 void CWorkerNodeJobContext::SetJobOutput(const string& output)
69 {
70     m_Impl->m_Job.output = output;
71 }
72 
SetJobRetCode(int ret_code)73 void CWorkerNodeJobContext::SetJobRetCode(int ret_code)
74 {
75     m_Impl->m_Job.ret_code = ret_code;
76 }
77 
GetInputBlobSize() const78 size_t CWorkerNodeJobContext::GetInputBlobSize() const
79 {
80     return m_Impl->m_InputBlobSize;
81 }
82 
GetJobOutput() const83 const string& CWorkerNodeJobContext::GetJobOutput() const
84 {
85     return m_Impl->m_Job.output;
86 }
87 
GetJobMask() const88 CNetScheduleAPI::TJobMask CWorkerNodeJobContext::GetJobMask() const
89 {
90     return m_Impl->m_Job.mask;
91 }
92 
GetJobNumber() const93 unsigned int CWorkerNodeJobContext::GetJobNumber() const
94 {
95     return m_Impl->m_JobNumber;
96 }
97 
IsJobCommitted() const98 bool CWorkerNodeJobContext::IsJobCommitted() const
99 {
100     return m_Impl->m_JobCommitStatus != eCS_NotCommitted;
101 }
102 
103 CWorkerNodeJobContext::ECommitStatus
GetCommitStatus() const104         CWorkerNodeJobContext::GetCommitStatus() const
105 {
106     return m_Impl->m_JobCommitStatus;
107 }
108 
IsJobLost() const109 bool CWorkerNodeJobContext::IsJobLost() const
110 {
111     return m_Impl->m_JobCommitStatus == eCS_JobIsLost;
112 }
113 
GetCleanupEventSource()114 IWorkerNodeCleanupEventSource* CWorkerNodeJobContext::GetCleanupEventSource()
115 {
116     return m_Impl->m_CleanupEventSource;
117 }
118 
GetWorkerNode() const119 CGridWorkerNode CWorkerNodeJobContext::GetWorkerNode() const
120 {
121     return m_Impl->m_WorkerNode;
122 }
123 
// Create a job context bound to the given worker node.
//
// The context is recycled across jobs (see RecycleJobContextAndCommitJob());
// per-job fields such as the job number, commit status, exclusive-job flag,
// input blob size and job generation are (re)initialized by ResetJobContext()
// before each job, so only long-lived collaborators are set up here.
SWorkerNodeJobContextImpl::SWorkerNodeJobContextImpl(
        SGridWorkerNodeImpl* worker_node) :
    m_WorkerNode(worker_node),
    // Per-job cleanup source chained to the worker node's own cleanup source.
    m_CleanupEventSource(
            new CWorkerNodeJobCleanup(worker_node->m_CleanupEventSource)),
    m_RequestContext(new CRequestContext),
    // Limits server-side job status checks in GetShutdownLevel(); presumably
    // one check per m_CheckStatusPeriod seconds — confirm CRequestRateControl
    // semantics. Reset to the same settings in CloseStreams().
    m_StatusThrottler(1, CTimeSpan(worker_node->m_CheckStatusPeriod, 0)),
    // Rate limiter for non-immediate progress messages (see
    // PutProgressMessage()); reset to the same setting in CloseStreams().
    m_ProgressMsgThrottler(1),
    m_NetScheduleExecutor(worker_node->m_NSExecutor),
    m_NetCacheAPI(worker_node->m_NetCacheAPI),
    m_JobGeneration(0),
    m_CommitExpiration(0, 0),
    m_Deadline(0, 0)
{
}
139 
GetQueueName() const140 const string& CWorkerNodeJobContext::GetQueueName() const
141 {
142     return m_Impl->m_WorkerNode->GetQueueName();
143 }
GetClientName() const144 const string& CWorkerNodeJobContext::GetClientName() const
145 {
146     return m_Impl->m_WorkerNode->GetClientName();
147 }
148 
GetIStream()149 CNcbiIstream& SWorkerNodeJobContextImpl::GetIStream()
150 {
151     return m_GridRead(m_NetCacheAPI, m_Job.input, &m_InputBlobSize);
152 }
153 
GetIStream()154 CNcbiIstream& CWorkerNodeJobContext::GetIStream()
155 {
156     return m_Impl->GetIStream();
157 }
158 
GetOStream()159 CNcbiOstream& SWorkerNodeJobContextImpl::GetOStream()
160 {
161     return m_GridWrite(m_NetCacheAPI, m_WorkerNode->m_QueueEmbeddedOutputSize, m_Job.output);
162 }
163 
GetOStream()164 CNcbiOstream& CWorkerNodeJobContext::GetOStream()
165 {
166     return m_Impl->GetOStream();
167 }
168 
CloseStreams()169 void CWorkerNodeJobContext::CloseStreams()
170 {
171     try {
172         m_Impl->m_ProgressMsgThrottler.Reset(1);
173         m_Impl->m_StatusThrottler.Reset(1,
174                 CTimeSpan(m_Impl->m_WorkerNode->m_CheckStatusPeriod, 0));
175 
176         m_Impl->m_GridRead.Reset();
177         m_Impl->m_GridWrite.Reset();
178     }
179     NCBI_CATCH_ALL_X(61, "Could not close IO streams");
180 }
181 
CommitJob()182 void CWorkerNodeJobContext::CommitJob()
183 {
184     m_Impl->CheckIfJobIsLost();
185     m_Impl->m_JobCommitStatus = eCS_Done;
186 }
187 
CommitJobWithFailure(const string & err_msg,bool no_retries)188 void CWorkerNodeJobContext::CommitJobWithFailure(const string& err_msg,
189         bool no_retries)
190 {
191     m_Impl->CheckIfJobIsLost();
192     m_Impl->m_JobCommitStatus = eCS_Failure;
193     m_Impl->m_DisableRetries = no_retries;
194     m_Impl->m_Job.error_msg = err_msg;
195 }
196 
ReturnJob()197 void CWorkerNodeJobContext::ReturnJob()
198 {
199     m_Impl->CheckIfJobIsLost();
200     m_Impl->m_JobCommitStatus = eCS_Return;
201 }
202 
RescheduleJob(const string & affinity,const string & group)203 void CWorkerNodeJobContext::RescheduleJob(
204         const string& affinity, const string& group)
205 {
206     m_Impl->CheckIfJobIsLost();
207     m_Impl->m_JobCommitStatus = eCS_Reschedule;
208     m_Impl->m_Job.affinity = affinity;
209     m_Impl->m_Job.group = group;
210 }
211 
PutProgressMessage(const string & msg,bool send_immediately,bool overwrite)212 void CWorkerNodeJobContext::PutProgressMessage(const string& msg,
213                                                bool send_immediately,
214                                                bool overwrite)
215 {
216     m_Impl->PutProgressMessage(msg, send_immediately, overwrite);
217 }
218 
PutProgressMessage(const string & msg,bool send_immediately,bool overwrite)219 void SWorkerNodeJobContextImpl::PutProgressMessage(const string& msg,
220                                                bool send_immediately,
221                                                bool overwrite)
222 {
223     CheckIfJobIsLost();
224     if (!send_immediately &&
225             !m_ProgressMsgThrottler.Approve(CRequestRateControl::eErrCode)) {
226         ERR_POST(Warning << "Progress message \"" <<
227                 msg << "\" has been suppressed.");
228         return;
229     }
230 
231     if (m_WorkerNode->m_ProgressLogRequested) {
232         LOG_POST(m_Job.job_id << " progress: " <<
233                 NStr::TruncateSpaces(msg, NStr::eTrunc_End));
234     }
235 
236     try {
237         if (m_Job.progress_msg.empty()) {
238             m_NetScheduleExecutor.GetProgressMsg(m_Job);
239 
240             if (m_Job.progress_msg.empty()) {
241                 m_Job.progress_msg =
242                         m_NetCacheAPI.PutData(msg.data(), msg.length());
243 
244                 m_NetScheduleExecutor.PutProgressMsg(m_Job);
245 
246                 return;
247             }
248         }
249 
250         if (overwrite) {
251             m_NetCacheAPI.PutData(m_Job.progress_msg, msg.data(), msg.length());
252         }
253     }
254     catch (exception& ex) {
255         ERR_POST_X(6, "Couldn't send a progress message: " << ex.what());
256     }
257 }
258 
JobDelayExpiration(unsigned runtime_inc)259 void CWorkerNodeJobContext::JobDelayExpiration(unsigned runtime_inc)
260 {
261     m_Impl->CheckIfJobIsLost();
262     m_Impl->JobDelayExpiration(runtime_inc);
263 }
264 
JobDelayExpiration(unsigned runtime_inc)265 void SWorkerNodeJobContextImpl::JobDelayExpiration(unsigned runtime_inc)
266 {
267     try {
268         m_NetScheduleExecutor.JobDelayExpiration(m_Job, runtime_inc);
269     }
270     catch (exception& ex) {
271         ERR_POST_X(8, "CWorkerNodeJobContext::JobDelayExpiration: " <<
272                 ex.what());
273     }
274 }
275 
IsLogRequested() const276 bool CWorkerNodeJobContext::IsLogRequested() const
277 {
278     return m_Impl->m_WorkerNode->m_LogRequested;
279 }
280 
GetShutdownLevel()281 CNetScheduleAdmin::EShutdownLevel CWorkerNodeJobContext::GetShutdownLevel()
282 {
283     return m_Impl->GetShutdownLevel();
284 }
285 
GetShutdownLevel()286 CNetScheduleAdmin::EShutdownLevel SWorkerNodeJobContextImpl::GetShutdownLevel()
287 {
288     if (m_StatusThrottler.Approve(CRequestRateControl::eErrCode))
289         try {
290             ENetScheduleQueuePauseMode pause_mode;
291             CNetScheduleAPI::EJobStatus job_status =
292                 m_NetScheduleExecutor.GetJobStatus(m_Job, NULL, &pause_mode);
293             switch (job_status) {
294             case CNetScheduleAPI::eRunning:
295                 if (pause_mode == eNSQ_WithPullback) {
296                     m_WorkerNode->SetJobPullbackTimer(
297                             m_WorkerNode->m_DefaultPullbackTimeout);
298                     LOG_POST("Pullback request from the server, "
299                             "(default) pullback timeout=" <<
300                             m_WorkerNode->m_DefaultPullbackTimeout);
301                 }
302                 /* FALL THROUGH */
303 
304             case CNetScheduleAPI::ePending:
305                 // NetSchedule will still allow to commit this job.
306                 break;
307 
308             case CNetScheduleAPI::eCanceled:
309                 LOG_POST(Warning << "Job " << m_Job.job_id << " has been canceled");
310                 MarkJobAsLost();
311                 return CNetScheduleAdmin::eShutdownImmediate;
312 
313             default:
314                 // The worker node does not "own" the job any longer.
315                 ERR_POST("Cannot proceed with job processing: job '" <<
316                         m_Job.job_id << "' changed status to '" <<
317                         CNetScheduleAPI::StatusToString(job_status) << "'.");
318                 MarkJobAsLost();
319                 return CNetScheduleAdmin::eShutdownImmediate;
320             }
321         }
322         catch(exception& ex) {
323             ERR_POST("Cannot retrieve job status for " << m_Job.job_id <<
324                     ": " << ex.what());
325         }
326 
327     if (m_WorkerNode->CheckForPullback(m_JobGeneration)) {
328         LOG_POST("Pullback timeout for " << m_Job.job_id);
329         return CNetScheduleAdmin::eShutdownImmediate;
330     }
331 
332     return CGridGlobals::GetInstance().GetShutdownLevel();
333 }
334 
CheckIfJobIsLost()335 void SWorkerNodeJobContextImpl::CheckIfJobIsLost()
336 {
337     if (m_JobCommitStatus == CWorkerNodeJobContext::eCS_JobIsLost) {
338         NCBI_THROW_FMT(CGridWorkerNodeException, eJobIsLost,
339             "Job " << m_Job.job_id << " has been canceled");
340     }
341 }
342 
ResetJobContext()343 void SWorkerNodeJobContextImpl::ResetJobContext()
344 {
345     m_JobNumber = CGridGlobals::GetInstance().GetNewJobNumber();
346 
347     m_JobCommitStatus = CWorkerNodeJobContext::eCS_NotCommitted;
348     m_DisableRetries = false;
349     m_InputBlobSize = 0;
350     m_ExclusiveJob =
351             (m_Job.mask & CNetScheduleAPI::eExclusiveJob) != 0;
352 
353     m_RequestContext->Reset();
354     m_JobGeneration = m_WorkerNode->m_CurrentJobGeneration;
355 }
356 
RequestExclusiveMode()357 void CWorkerNodeJobContext::RequestExclusiveMode()
358 {
359     if (!m_Impl->m_ExclusiveJob) {
360         if (!m_Impl->m_WorkerNode->EnterExclusiveMode()) {
361             NCBI_THROW(CGridWorkerNodeException,
362                 eExclusiveModeIsAlreadySet, "");
363         }
364         m_Impl->m_ExclusiveJob = true;
365     }
366 }
367 
GetCommitStatusDescription(CWorkerNodeJobContext::ECommitStatus commit_status)368 const char* CWorkerNodeJobContext::GetCommitStatusDescription(
369         CWorkerNodeJobContext::ECommitStatus commit_status)
370 {
371     switch (commit_status) {
372     case eCS_Done:
373         return "done";
374     case eCS_Failure:
375         return "failed";
376     case eCS_Return:
377         return "returned";
378     case eCS_Reschedule:
379         return "rescheduled";
380     case eCS_JobIsLost:
381         return "lost";
382     default:
383         return "not committed";
384     }
385 }
386 
x_PrintRequestStop()387 void SWorkerNodeJobContextImpl::x_PrintRequestStop()
388 {
389     m_RequestContext->SetAppState(eDiagAppState_RequestEnd);
390 
391     if (!m_RequestContext->IsSetRequestStatus())
392         m_RequestContext->SetRequestStatus(
393             m_JobCommitStatus == CWorkerNodeJobContext::eCS_Done &&
394                 m_Job.ret_code == 0 ? 200 : 500);
395 
396     if (m_RequestContext->GetAppState() == eDiagAppState_Request)
397         m_RequestContext->SetAppState(eDiagAppState_RequestEnd);
398 
399     if (g_IsRequestStopEventEnabled())
400         GetDiagContext().PrintRequestStop();
401 }
402 
// Execute the job held by this context and then hand the context over for
// committing and recycling.
//
// Steps, in order:
//   1. Fill the request context (request ID = job number, plus client IP,
//      session ID and hit ID when present), make it current for the rest of
//      this function, and print a request-start record ("jid", "_queue")
//      when start events are enabled.
//   2. Enforce the per-client-IP and per-session-ID job limits; a job over a
//      limit is not run and is marked eCS_Return.
//   3. Otherwise run the job processor's Do(), translate the exceptions
//      caught below into a commit status, close the job's streams and notify
//      the job watchers of the outcome.
//   4. Leave exclusive mode if this job held it, run the per-job cleanup
//      handlers unless the node is shutting down, and pass the context
//      together with the request-context switcher to
//      RecycleJobContextAndCommitJob().
void SWorkerNodeJobContextImpl::x_RunJob()
{
    CWorkerNodeJobContext this_job_context(this);

    m_RequestContext->SetRequestID((int) this_job_context.GetJobNumber());

    if (!m_Job.client_ip.empty())
        m_RequestContext->SetClientIP(m_Job.client_ip);

    if (!m_Job.session_id.empty())
        m_RequestContext->SetSessionID(m_Job.session_id);

    if (!m_Job.page_hit_id.empty())
        m_RequestContext->SetHitID(m_Job.page_hit_id);

    m_RequestContext->SetAppState(eDiagAppState_RequestBegin);

    CRequestContextSwitcher request_state_guard(m_RequestContext);

    if (g_IsRequestStartEventEnabled()) {
        auto extra = GetDiagContext().PrintRequestStart();
        extra.Print("jid", m_Job.job_id);

        CNetScheduleKey key;
        if (key.ParseJobKey(m_Job.job_id)) extra.Print("_queue", key.queue);
    }

    m_RequestContext->SetAppState(eDiagAppState_Request);

    // Filled in by CountJob() below. Presumably they release the per-IP /
    // per-session counts when they go out of scope at the end of this
    // function — TODO confirm (CJobRunRegistration is declared elsewhere).
    CJobRunRegistration client_ip_registration, session_id_registration;

    if (!m_Job.client_ip.empty() &&
            !m_WorkerNode->m_JobsPerClientIP.CountJob(m_Job.client_ip,
                    &client_ip_registration)) {
        ERR_POST("Too many jobs with client IP \"" <<
                 m_Job.client_ip << "\"; job " <<
                 m_Job.job_id << " will be returned.");
        m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
    } else if (!m_Job.session_id.empty() &&
            !m_WorkerNode->m_JobsPerSessionID.CountJob(m_Job.session_id,
                    &session_id_registration)) {
        ERR_POST("Too many jobs with session ID \"" <<
                 m_Job.session_id << "\"; job " <<
                 m_Job.job_id << " will be returned.");
        m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
    } else {
        m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                IWorkerNodeJobWatcher::eJobStarted);

        try {
            m_Job.ret_code =
                    m_WorkerNode->GetJobProcessor()->Do(this_job_context);
        }
        catch (CGridWorkerNodeException& ex) {
            switch (ex.GetErrCode()) {
            case CGridWorkerNodeException::eJobIsLost:
                // Thrown by CheckIfJobIsLost(); the status is already "lost".
                break;

            case CGridWorkerNodeException::eExclusiveModeIsAlreadySet:
                if (m_WorkerNode->m_LogRequested) {
                    LOG_POST_X(21, "Job " << m_Job.job_id <<
                        " will be returned back to the queue "
                        "because it requested exclusive mode while "
                        "another exclusive job is already running.");
                }
                if (m_JobCommitStatus ==
                        CWorkerNodeJobContext::eCS_NotCommitted)
                    m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
                break;

            default:
                ERR_POST_X(62, ex);
                if (m_JobCommitStatus ==
                        CWorkerNodeJobContext::eCS_NotCommitted)
                    m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
            }
        }
        catch (CNetScheduleException& e) {
            ERR_POST_X(20, "job " << m_Job.job_id << " failed: " << e);
            if (e.GetErrCode() == CNetScheduleException::eJobNotFound) {
                ERR_POST("Cannot proceed with job processing: job '" <<
                        m_Job.job_id << "' has expired.");
                MarkJobAsLost();
            } else if (m_JobCommitStatus ==
                    CWorkerNodeJobContext::eCS_NotCommitted) {
                m_JobCommitStatus = CWorkerNodeJobContext::eCS_Failure;
                m_Job.error_msg = e.what();
            }
        }
        catch (exception& e) {
            ERR_POST_X(18, "job " << m_Job.job_id << " failed: " << e.what());
            if (m_JobCommitStatus == CWorkerNodeJobContext::eCS_NotCommitted) {
                m_JobCommitStatus = CWorkerNodeJobContext::eCS_Failure;
                m_Job.error_msg = e.what();
            }
        }
        // NOTE(review): anything thrown by Do() that is not derived from
        // std::exception is not caught above. It would propagate out of this
        // function and skip the exclusive-mode release and the
        // RecycleJobContextAndCommitJob() call below — confirm this is
        // acceptable.

        this_job_context.CloseStreams();

        switch (m_JobCommitStatus) {
        case CWorkerNodeJobContext::eCS_Done:
            m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                    IWorkerNodeJobWatcher::eJobSucceeded);
            break;

        case CWorkerNodeJobContext::eCS_NotCommitted:
            // The job processor did not commit the job. Return it if
            // implicit returns are allowed or a shutdown is in progress;
            // otherwise report it as failed.
            // NOTE(review): GetShutdownLevel() can call MarkJobAsLost()
            // (e.g. for a canceled job) and then returns
            // eShutdownImmediate. The assignment below would then replace
            // the "lost" status (presumably set by MarkJobAsLost()) with
            // eCS_Return — confirm this is intended.
            if (TWorkerNode_AllowImplicitJobReturn::GetDefault() ||
                    this_job_context.GetShutdownLevel() !=
                            CNetScheduleAdmin::eNoShutdown) {
                m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
                m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                        IWorkerNodeJobWatcher::eJobReturned);
                break;
            }

            m_JobCommitStatus = CWorkerNodeJobContext::eCS_Failure;
            m_Job.error_msg = "Job was not explicitly committed";
            /* FALL THROUGH */

        case CWorkerNodeJobContext::eCS_Failure:
            m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                    IWorkerNodeJobWatcher::eJobFailed);
            break;

        case CWorkerNodeJobContext::eCS_Return:
            m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                    IWorkerNodeJobWatcher::eJobReturned);
            break;

        case CWorkerNodeJobContext::eCS_Reschedule:
            m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                    IWorkerNodeJobWatcher::eJobRescheduled);
            break;

        default: // eCanceled - no action needed.
            // This object will be recycled in x_CommitJob().
            break;
        }

        m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                IWorkerNodeJobWatcher::eJobStopped);
    }

    // Release exclusive mode if this job held it (it may have been entered
    // in x_GetNextJob() or via RequestExclusiveMode()).
    if (m_WorkerNode->IsExclusiveMode() && m_ExclusiveJob)
        m_WorkerNode->LeaveExclusiveMode();

    if (!CGridGlobals::GetInstance().IsShuttingDown())
        m_CleanupEventSource->CallEventHandlers();

    m_WorkerNode->m_JobCommitterThread->RecycleJobContextAndCommitJob(this,
            request_state_guard);
}
555 
// Main loop of the job-fetching thread.
//
// Until a shutdown is requested, each iteration:
//   - waits for free room in the thread pool (a busy pool simply retries);
//   - asks x_GetNextJob() for a job; if one is obtained, resets the
//     pre-allocated job context, submits it to the thread pool and allocates
//     a fresh context for the next job;
//   - when no job was obtained and the total runtime limit (if configured)
//     has expired, requests a normal shutdown with exit code 100 and exits.
// Error handling:
//   - CNetSrvConnException: after a retry delay, connection failures are
//     retried until max_wait_for_servers expires. That deadline is refreshed
//     after every iteration completed without an exception. Any other case
//     requests an immediate shutdown.
//   - CNetServiceException: retried after a delay; once try_count reaches
//     TServConn_ConnMaxRetries, an immediate shutdown is requested. The
//     counter is reset at the end of any iteration that does not `continue`.
//   - other std::exception: logged; shuts down only if
//     TWorkerNode_StopOnJobErrors is set.
void* CMainLoopThread::Main()
{
    // Retry delay converted from seconds to milliseconds for SleepMilliSec().
    const auto kRetryDelay = static_cast<unsigned long>(TServConn_RetryDelay::GetDefault() * kMilliSecondsPerSecond);
    SetCurrentThreadName(m_ThreadName);
    CDeadline max_wait_for_servers(
            TWorkerNode_MaxWaitForServers::GetDefault());

    // Context ready for the next job; replaced after each job is submitted.
    CWorkerNodeJobContext job_context(
            m_WorkerNode->m_JobCommitterThread->AllocJobContext());

    // A zero total time limit means "run without a time limit".
    const auto total_time_limit = m_WorkerNode->m_TotalTimeLimit;
    CDeadline deadline(total_time_limit ? CDeadline(total_time_limit) : CDeadline::eInfinite);
    // Passed to RecycleJobContextAndCommitJob() when no request context was
    // switched for the job (the job never reached x_RunJob()).
    CRequestContextSwitcher no_op;
    unsigned try_count = 0;
    while (!CGridGlobals::GetInstance().IsShuttingDown()) {
        try {
            try {
                m_WorkerNode->m_ThreadPool->WaitForRoom(
                        m_WorkerNode->m_ThreadPoolTimeout);
            }
            catch (CBlockingQueueException&) {
                // threaded pool is busy
                continue;
            }

            if (x_GetNextJob(job_context->m_Job, deadline)) {
                job_context->ResetJobContext();

                try {
                    m_WorkerNode->m_ThreadPool->AcceptRequest(CRef<CStdRequest>(
                            new CWorkerNodeRequest(job_context)));
                }
                catch (CBlockingQueueException& ex) {
                    ERR_POST_X(28, ex);
                    // that must not happen after CBlockingQueue is fixed
                    _ASSERT(0);
                    // Could not submit: hand the job back via the committer.
                    job_context->m_JobCommitStatus =
                            CWorkerNodeJobContext::eCS_Return;
                    m_WorkerNode->m_JobCommitterThread->
                            RecycleJobContextAndCommitJob(job_context, no_op);
                }
                job_context =
                        m_WorkerNode->m_JobCommitterThread->AllocJobContext();

            } else if (deadline.IsExpired()) {
                LOG_POST("The total runtime limit (" << total_time_limit << " seconds) has been reached");
                const auto kExitCode = 100; // See also one in grid_globals.cpp
                CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eNormalShutdown, kExitCode);
                break;

            }
            // A full iteration without exceptions: restart the server wait.
            max_wait_for_servers =
                CDeadline(TWorkerNode_MaxWaitForServers::GetDefault());
        }
        catch (CNetSrvConnException& e) {
            SleepMilliSec(kRetryDelay);
            if (e.GetErrCode() == CNetSrvConnException::eConnectionFailure &&
                    !max_wait_for_servers.GetRemainingTime().IsZero())
                continue;
            ERR_POST(Critical << "Could not connect to the "
                    "configured servers, exiting...");
            CGridGlobals::GetInstance().RequestShutdown(
                    CNetScheduleAdmin::eShutdownImmediate);
        }
        catch (CNetServiceException& ex) {
            ERR_POST_X(40, ex);
            if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) {
                CGridGlobals::GetInstance().RequestShutdown(
                    CNetScheduleAdmin::eShutdownImmediate);
            } else {
                SleepMilliSec(kRetryDelay);
                // Skips the reset below, so try_count counts consecutive
                // CNetServiceException failures.
                continue;
            }
        }
        catch (exception& ex) {
            ERR_POST_X(29, ex.what());
            if (TWorkerNode_StopOnJobErrors::GetDefault()) {
                CGridGlobals::GetInstance().RequestShutdown(
                    CNetScheduleAdmin::eShutdownImmediate);
            }
        }
        try_count = 0;
    }

    return NULL;
}
642 
643 
// Apply pending suspend/resume requests to the job-fetching timeline.
//
// Drains m_SuspendResumeEvent: each SwapPointers() call stores NO_EVENT and
// yields the previous value. While the worker node stays suspended, it waits
// for a NetSchedule notification (bounded by m_Timeout) and checks again.
//
// Returns:
//   eWorking   - the timeline is active and no suspension started in this call;
//   eRestarted - a suspend event took effect during this call and the
//                timeline has since been resumed;
//   eStopped   - CGridGlobals reports a shutdown at the top of the loop
//                (including immediately on entry).
CNetScheduleGetJob::EState CMainLoopThread::CImpl::CheckState()
{
    EState ret = eWorking;

    while (!CGridGlobals::GetInstance().IsShuttingDown()) {
        void* event;

        // Consume every pending suspend/resume event.
        while ((event = SwapPointers(&m_WorkerNode->m_SuspendResumeEvent,
                NO_EVENT)) != NO_EVENT) {
            if (event == SUSPEND_EVENT) {
                if (!m_WorkerNode->m_TimelineIsSuspended) {
                    // Stop the timeline.
                    m_WorkerNode->m_TimelineIsSuspended = true;
                    ret = eRestarted;
                }
            } else { /* event == RESUME_EVENT */
                if (m_WorkerNode->m_TimelineIsSuspended) {
                    // Resume the timeline.
                    m_WorkerNode->m_TimelineIsSuspended = false;
                }
            }
        }

        if (!m_WorkerNode->m_TimelineIsSuspended) {
            return ret;
        }

        // Still suspended: wait for server activity, then re-check.
        m_WorkerNode->m_NSExecutor->
            m_NotificationHandler.WaitForNotification(m_Timeout);
    }

    return eStopped;
}
677 
ReadNotifications()678 CNetServer CMainLoopThread::CImpl::ReadNotifications()
679 {
680     if (m_WorkerNode->m_NSExecutor->
681             m_NotificationHandler.ReceiveNotification())
682         return x_ProcessRequestJobNotification();
683 
684     return CNetServer();
685 }
686 
WaitForNotifications(const CDeadline & deadline)687 CNetServer CMainLoopThread::CImpl::WaitForNotifications(const CDeadline& deadline)
688 {
689      if (m_WorkerNode->m_NSExecutor->
690              m_NotificationHandler.WaitForNotification(deadline)) {
691         return x_ProcessRequestJobNotification();
692      }
693 
694      return CNetServer();
695 }
696 
x_ProcessRequestJobNotification()697 CNetServer CMainLoopThread::CImpl::x_ProcessRequestJobNotification()
698 {
699     CNetServer server;
700 
701     // No need to check state here, it will be checked before entry processing
702     m_WorkerNode->m_NSExecutor->
703         m_NotificationHandler.CheckRequestJobNotification(
704                 m_WorkerNode->m_NSExecutor, &server);
705 
706     return server;
707 }
708 
MoreJobs(const SEntry &)709 bool CMainLoopThread::CImpl::MoreJobs(const SEntry& /*entry*/)
710 {
711     return true;
712 }
713 
CheckEntry(SEntry & entry,const string & prio_aff_list,bool any_affinity,CNetScheduleJob & job,CNetScheduleAPI::EJobStatus *)714 bool CMainLoopThread::CImpl::CheckEntry(
715         SEntry& entry,
716         const string& prio_aff_list,
717         bool any_affinity,
718         CNetScheduleJob& job,
719         CNetScheduleAPI::EJobStatus* /*job_status*/)
720 {
721     CNetServer server(m_API.GetService()->GetServer(entry.server_address));
722     return m_WorkerNode->m_NSExecutor->x_GetJobWithAffinityLadder(server,
723             m_Timeout, prio_aff_list, any_affinity, job);
724 }
725 
ReturnJob(CNetScheduleJob & job)726 void CMainLoopThread::CImpl::ReturnJob(CNetScheduleJob& job)
727 {
728     m_WorkerNode->m_NSExecutor->ReturnJob(job, false);
729 }
730 
x_GetNextJob(CNetScheduleJob & job,const CDeadline & deadline)731 bool CMainLoopThread::x_GetNextJob(CNetScheduleJob& job, const CDeadline& deadline)
732 {
733     if (!m_WorkerNode->x_AreMastersBusy()) {
734         SleepSec(m_WorkerNode->m_NSTimeout);
735         return false;
736     }
737 
738     if (!m_WorkerNode->WaitForExclusiveJobToFinish())
739         return false;
740 
741     const bool any_affinity = m_Impl.m_API->m_AffinityLadder.empty();
742 
743     if (m_Timeline.GetJob(deadline, job, NULL, any_affinity) != CNetScheduleGetJob::eJob) {
744         return false;
745     }
746 
747     // Already executing this job, so do nothing
748     // (and rely on that execution to report its result later)
749     if (!m_WorkerNode->m_JobsInProgress.Add(job)) {
750         ERR_POST(Warning << "Got already processing job " << job.job_id);
751         return false;
752     }
753 
754     if (job.mask & CNetScheduleAPI::eExclusiveJob) {
755         if (!m_WorkerNode->EnterExclusiveMode()) {
756             m_WorkerNode->m_NSExecutor.ReturnJob(job);
757             return false;
758         }
759     }
760 
761     // No need to check for idleness here, running jobs won't be stopped anyway
762     if (CGridGlobals::GetInstance().IsShuttingDown()) {
763         m_WorkerNode->m_NSExecutor.ReturnJob(job);
764         return false;
765     }
766 
767     return true;
768 }
769 
GetServerOutputSize()770 size_t CGridWorkerNode::GetServerOutputSize()
771 {
772     return m_Impl->m_QueueEmbeddedOutputSize;
773 }
774 
775 END_NCBI_SCOPE
776