1 /* $Id: wn_main_loop.cpp 637904 2021-09-20 19:52:17Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Maxim Didenko, Anatoliy Kuznetsov, Dmitry Kazimirov
27 *
28 * File Description:
29 * NetSchedule Worker Node implementation
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "wn_commit_thread.hpp"
35 #include "wn_cleanup.hpp"
36 #include "grid_worker_impl.hpp"
37 #include "netschedule_api_impl.hpp"
38
39 #include <connect/services/grid_globals.hpp>
40 #include <connect/services/grid_worker_app.hpp>
41 #include <connect/services/ns_job_serializer.hpp>
42
43
44 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
45
46 BEGIN_NCBI_SCOPE
47
GetJob() const48 const CNetScheduleJob& CWorkerNodeJobContext::GetJob() const
49 {
50 return m_Impl->m_Job;
51 }
52
GetJob()53 CNetScheduleJob& CWorkerNodeJobContext::GetJob()
54 {
55 return m_Impl->m_Job;
56 }
57
GetJobKey() const58 const string& CWorkerNodeJobContext::GetJobKey() const
59 {
60 return m_Impl->m_Job.job_id;
61 }
62
GetJobInput() const63 const string& CWorkerNodeJobContext::GetJobInput() const
64 {
65 return m_Impl->m_Job.input;
66 }
67
SetJobOutput(const string & output)68 void CWorkerNodeJobContext::SetJobOutput(const string& output)
69 {
70 m_Impl->m_Job.output = output;
71 }
72
SetJobRetCode(int ret_code)73 void CWorkerNodeJobContext::SetJobRetCode(int ret_code)
74 {
75 m_Impl->m_Job.ret_code = ret_code;
76 }
77
GetInputBlobSize() const78 size_t CWorkerNodeJobContext::GetInputBlobSize() const
79 {
80 return m_Impl->m_InputBlobSize;
81 }
82
GetJobOutput() const83 const string& CWorkerNodeJobContext::GetJobOutput() const
84 {
85 return m_Impl->m_Job.output;
86 }
87
GetJobMask() const88 CNetScheduleAPI::TJobMask CWorkerNodeJobContext::GetJobMask() const
89 {
90 return m_Impl->m_Job.mask;
91 }
92
GetJobNumber() const93 unsigned int CWorkerNodeJobContext::GetJobNumber() const
94 {
95 return m_Impl->m_JobNumber;
96 }
97
IsJobCommitted() const98 bool CWorkerNodeJobContext::IsJobCommitted() const
99 {
100 return m_Impl->m_JobCommitStatus != eCS_NotCommitted;
101 }
102
103 CWorkerNodeJobContext::ECommitStatus
GetCommitStatus() const104 CWorkerNodeJobContext::GetCommitStatus() const
105 {
106 return m_Impl->m_JobCommitStatus;
107 }
108
IsJobLost() const109 bool CWorkerNodeJobContext::IsJobLost() const
110 {
111 return m_Impl->m_JobCommitStatus == eCS_JobIsLost;
112 }
113
GetCleanupEventSource()114 IWorkerNodeCleanupEventSource* CWorkerNodeJobContext::GetCleanupEventSource()
115 {
116 return m_Impl->m_CleanupEventSource;
117 }
118
GetWorkerNode() const119 CGridWorkerNode CWorkerNodeJobContext::GetWorkerNode() const
120 {
121 return m_Impl->m_WorkerNode;
122 }
123
/// Construct the per-job state object for one job slot of a worker node.
///
/// Only values that come from the owning worker node are captured here.
/// The per-job fields (job number, commit status, retry flag, input blob
/// size, exclusive flag, job generation) are (re)initialized by
/// ResetJobContext() before each job is run.
///
/// @param worker_node
///   The owning worker node. Its cleanup event source, status-check period,
///   NetSchedule executor and NetCache API are used to initialize members.
SWorkerNodeJobContextImpl::SWorkerNodeJobContextImpl(
        SGridWorkerNodeImpl* worker_node) :
    m_WorkerNode(worker_node),
    // Per-job cleanup event source, constructed from the node-wide one.
    m_CleanupEventSource(
            new CWorkerNodeJobCleanup(worker_node->m_CleanupEventSource)),
    m_RequestContext(new CRequestContext),
    // Rate limiter for job-status queries in GetShutdownLevel(); presumably
    // one query per m_CheckStatusPeriod seconds.
    m_StatusThrottler(1, CTimeSpan(worker_node->m_CheckStatusPeriod, 0)),
    // Rate limiter for non-immediate progress messages (PutProgressMessage()).
    m_ProgressMsgThrottler(1),
    m_NetScheduleExecutor(worker_node->m_NSExecutor),
    m_NetCacheAPI(worker_node->m_NetCacheAPI),
    m_JobGeneration(0),
    m_CommitExpiration(0, 0),
    m_Deadline(0, 0)
{
}
139
GetQueueName() const140 const string& CWorkerNodeJobContext::GetQueueName() const
141 {
142 return m_Impl->m_WorkerNode->GetQueueName();
143 }
GetClientName() const144 const string& CWorkerNodeJobContext::GetClientName() const
145 {
146 return m_Impl->m_WorkerNode->GetClientName();
147 }
148
GetIStream()149 CNcbiIstream& SWorkerNodeJobContextImpl::GetIStream()
150 {
151 return m_GridRead(m_NetCacheAPI, m_Job.input, &m_InputBlobSize);
152 }
153
GetIStream()154 CNcbiIstream& CWorkerNodeJobContext::GetIStream()
155 {
156 return m_Impl->GetIStream();
157 }
158
GetOStream()159 CNcbiOstream& SWorkerNodeJobContextImpl::GetOStream()
160 {
161 return m_GridWrite(m_NetCacheAPI, m_WorkerNode->m_QueueEmbeddedOutputSize, m_Job.output);
162 }
163
GetOStream()164 CNcbiOstream& CWorkerNodeJobContext::GetOStream()
165 {
166 return m_Impl->GetOStream();
167 }
168
CloseStreams()169 void CWorkerNodeJobContext::CloseStreams()
170 {
171 try {
172 m_Impl->m_ProgressMsgThrottler.Reset(1);
173 m_Impl->m_StatusThrottler.Reset(1,
174 CTimeSpan(m_Impl->m_WorkerNode->m_CheckStatusPeriod, 0));
175
176 m_Impl->m_GridRead.Reset();
177 m_Impl->m_GridWrite.Reset();
178 }
179 NCBI_CATCH_ALL_X(61, "Could not close IO streams");
180 }
181
CommitJob()182 void CWorkerNodeJobContext::CommitJob()
183 {
184 m_Impl->CheckIfJobIsLost();
185 m_Impl->m_JobCommitStatus = eCS_Done;
186 }
187
CommitJobWithFailure(const string & err_msg,bool no_retries)188 void CWorkerNodeJobContext::CommitJobWithFailure(const string& err_msg,
189 bool no_retries)
190 {
191 m_Impl->CheckIfJobIsLost();
192 m_Impl->m_JobCommitStatus = eCS_Failure;
193 m_Impl->m_DisableRetries = no_retries;
194 m_Impl->m_Job.error_msg = err_msg;
195 }
196
ReturnJob()197 void CWorkerNodeJobContext::ReturnJob()
198 {
199 m_Impl->CheckIfJobIsLost();
200 m_Impl->m_JobCommitStatus = eCS_Return;
201 }
202
RescheduleJob(const string & affinity,const string & group)203 void CWorkerNodeJobContext::RescheduleJob(
204 const string& affinity, const string& group)
205 {
206 m_Impl->CheckIfJobIsLost();
207 m_Impl->m_JobCommitStatus = eCS_Reschedule;
208 m_Impl->m_Job.affinity = affinity;
209 m_Impl->m_Job.group = group;
210 }
211
PutProgressMessage(const string & msg,bool send_immediately,bool overwrite)212 void CWorkerNodeJobContext::PutProgressMessage(const string& msg,
213 bool send_immediately,
214 bool overwrite)
215 {
216 m_Impl->PutProgressMessage(msg, send_immediately, overwrite);
217 }
218
/// Record a progress message for the current job.
///
/// The message text is stored in a NetCache blob whose key is kept in
/// m_Job.progress_msg and registered with NetSchedule.
///
/// @param msg
///   Progress message text.
/// @param send_immediately
///   If false, the message goes through m_ProgressMsgThrottler. When the
///   throttler does not approve it, the message is dropped with a warning.
/// @param overwrite
///   Applies when the job already has a progress-message blob. If true, the
///   blob content is replaced with msg. If false, the existing blob is left
///   unchanged and msg is not stored.
///
/// CheckIfJobIsLost() is called outside the try block, so its
/// CGridWorkerNodeException (eJobIsLost) propagates to the caller. Errors
/// from NetSchedule/NetCache are logged (error subcode 6) and swallowed.
void SWorkerNodeJobContextImpl::PutProgressMessage(const string& msg,
        bool send_immediately,
        bool overwrite)
{
    CheckIfJobIsLost();
    if (!send_immediately &&
            !m_ProgressMsgThrottler.Approve(CRequestRateControl::eErrCode)) {
        ERR_POST(Warning << "Progress message \"" <<
                msg << "\" has been suppressed.");
        return;
    }

    if (m_WorkerNode->m_ProgressLogRequested) {
        LOG_POST(m_Job.job_id << " progress: " <<
                NStr::TruncateSpaces(msg, NStr::eTrunc_End));
    }

    try {
        if (m_Job.progress_msg.empty()) {
            // No blob key known locally. Ask NetSchedule whether one has
            // already been registered for this job (GetProgressMsg()
            // presumably fills m_Job.progress_msg).
            m_NetScheduleExecutor.GetProgressMsg(m_Job);

            if (m_Job.progress_msg.empty()) {
                // First progress message for this job: create a new blob
                // holding the text and register its key with NetSchedule.
                m_Job.progress_msg =
                    m_NetCacheAPI.PutData(msg.data(), msg.length());

                m_NetScheduleExecutor.PutProgressMsg(m_Job);

                return;
            }
        }

        // A blob already exists: replace its contents only when requested.
        if (overwrite) {
            m_NetCacheAPI.PutData(m_Job.progress_msg, msg.data(), msg.length());
        }
    }
    catch (exception& ex) {
        ERR_POST_X(6, "Couldn't send a progress message: " << ex.what());
    }
}
258
JobDelayExpiration(unsigned runtime_inc)259 void CWorkerNodeJobContext::JobDelayExpiration(unsigned runtime_inc)
260 {
261 m_Impl->CheckIfJobIsLost();
262 m_Impl->JobDelayExpiration(runtime_inc);
263 }
264
JobDelayExpiration(unsigned runtime_inc)265 void SWorkerNodeJobContextImpl::JobDelayExpiration(unsigned runtime_inc)
266 {
267 try {
268 m_NetScheduleExecutor.JobDelayExpiration(m_Job, runtime_inc);
269 }
270 catch (exception& ex) {
271 ERR_POST_X(8, "CWorkerNodeJobContext::JobDelayExpiration: " <<
272 ex.what());
273 }
274 }
275
IsLogRequested() const276 bool CWorkerNodeJobContext::IsLogRequested() const
277 {
278 return m_Impl->m_WorkerNode->m_LogRequested;
279 }
280
GetShutdownLevel()281 CNetScheduleAdmin::EShutdownLevel CWorkerNodeJobContext::GetShutdownLevel()
282 {
283 return m_Impl->GetShutdownLevel();
284 }
285
/// Decide whether the currently running job should stop.
///
/// When m_StatusThrottler approves, the job status is queried from
/// NetSchedule:
///  - eRunning: if the queue is paused with pullback, the worker node's
///    pullback timer is set to the default pullback timeout. Then it falls
///    through to ePending.
///  - ePending: the job can still be committed; nothing else happens.
///  - eCanceled or any other status: the job is marked as lost and
///    eShutdownImmediate is returned.
/// Errors while querying the status are logged and otherwise ignored.
///
/// Afterwards, eShutdownImmediate is returned if CheckForPullback() reports
/// a pullback for this job's generation. Otherwise the global shutdown
/// level from CGridGlobals is returned.
CNetScheduleAdmin::EShutdownLevel SWorkerNodeJobContextImpl::GetShutdownLevel()
{
    // Note: the try/catch below is the (unbraced) body of this 'if'.
    if (m_StatusThrottler.Approve(CRequestRateControl::eErrCode))
        try {
            ENetScheduleQueuePauseMode pause_mode;
            CNetScheduleAPI::EJobStatus job_status =
                m_NetScheduleExecutor.GetJobStatus(m_Job, NULL, &pause_mode);
            switch (job_status) {
            case CNetScheduleAPI::eRunning:
                if (pause_mode == eNSQ_WithPullback) {
                    m_WorkerNode->SetJobPullbackTimer(
                        m_WorkerNode->m_DefaultPullbackTimeout);
                    LOG_POST("Pullback request from the server, "
                        "(default) pullback timeout=" <<
                        m_WorkerNode->m_DefaultPullbackTimeout);
                }
                /* FALL THROUGH */

            case CNetScheduleAPI::ePending:
                // NetSchedule will still allow to commit this job.
                break;

            case CNetScheduleAPI::eCanceled:
                LOG_POST(Warning << "Job " << m_Job.job_id << " has been canceled");
                MarkJobAsLost();
                return CNetScheduleAdmin::eShutdownImmediate;

            default:
                // The worker node does not "own" the job any longer.
                ERR_POST("Cannot proceed with job processing: job '" <<
                    m_Job.job_id << "' changed status to '" <<
                    CNetScheduleAPI::StatusToString(job_status) << "'.");
                MarkJobAsLost();
                return CNetScheduleAdmin::eShutdownImmediate;
            }
        }
        catch(exception& ex) {
            ERR_POST("Cannot retrieve job status for " << m_Job.job_id <<
                ": " << ex.what());
        }

    // Pullback timer expiry (as reported for this job's generation) forces
    // an immediate stop.
    if (m_WorkerNode->CheckForPullback(m_JobGeneration)) {
        LOG_POST("Pullback timeout for " << m_Job.job_id);
        return CNetScheduleAdmin::eShutdownImmediate;
    }

    return CGridGlobals::GetInstance().GetShutdownLevel();
}
334
CheckIfJobIsLost()335 void SWorkerNodeJobContextImpl::CheckIfJobIsLost()
336 {
337 if (m_JobCommitStatus == CWorkerNodeJobContext::eCS_JobIsLost) {
338 NCBI_THROW_FMT(CGridWorkerNodeException, eJobIsLost,
339 "Job " << m_Job.job_id << " has been canceled");
340 }
341 }
342
ResetJobContext()343 void SWorkerNodeJobContextImpl::ResetJobContext()
344 {
345 m_JobNumber = CGridGlobals::GetInstance().GetNewJobNumber();
346
347 m_JobCommitStatus = CWorkerNodeJobContext::eCS_NotCommitted;
348 m_DisableRetries = false;
349 m_InputBlobSize = 0;
350 m_ExclusiveJob =
351 (m_Job.mask & CNetScheduleAPI::eExclusiveJob) != 0;
352
353 m_RequestContext->Reset();
354 m_JobGeneration = m_WorkerNode->m_CurrentJobGeneration;
355 }
356
RequestExclusiveMode()357 void CWorkerNodeJobContext::RequestExclusiveMode()
358 {
359 if (!m_Impl->m_ExclusiveJob) {
360 if (!m_Impl->m_WorkerNode->EnterExclusiveMode()) {
361 NCBI_THROW(CGridWorkerNodeException,
362 eExclusiveModeIsAlreadySet, "");
363 }
364 m_Impl->m_ExclusiveJob = true;
365 }
366 }
367
GetCommitStatusDescription(CWorkerNodeJobContext::ECommitStatus commit_status)368 const char* CWorkerNodeJobContext::GetCommitStatusDescription(
369 CWorkerNodeJobContext::ECommitStatus commit_status)
370 {
371 switch (commit_status) {
372 case eCS_Done:
373 return "done";
374 case eCS_Failure:
375 return "failed";
376 case eCS_Return:
377 return "returned";
378 case eCS_Reschedule:
379 return "rescheduled";
380 case eCS_JobIsLost:
381 return "lost";
382 default:
383 return "not committed";
384 }
385 }
386
x_PrintRequestStop()387 void SWorkerNodeJobContextImpl::x_PrintRequestStop()
388 {
389 m_RequestContext->SetAppState(eDiagAppState_RequestEnd);
390
391 if (!m_RequestContext->IsSetRequestStatus())
392 m_RequestContext->SetRequestStatus(
393 m_JobCommitStatus == CWorkerNodeJobContext::eCS_Done &&
394 m_Job.ret_code == 0 ? 200 : 500);
395
396 if (m_RequestContext->GetAppState() == eDiagAppState_Request)
397 m_RequestContext->SetAppState(eDiagAppState_RequestEnd);
398
399 if (g_IsRequestStopEventEnabled())
400 GetDiagContext().PrintRequestStop();
401 }
402
/// Run the job currently loaded into this context, then hand the context
/// over for committing.
///
/// Steps:
///  1. Fill in the job's diagnostics request context (request ID = job
///     number; client IP, session ID and hit ID when present) and print the
///     request-start record if enabled.
///  2. Enforce the per-client-IP and per-session-ID job limits. A job that
///     exceeds either limit is marked eCS_Return without being run.
///  3. Otherwise call the job processor's Do(), translate exceptions into a
///     commit status, close the job's streams and notify job watchers of
///     the outcome.
///  4. Leave exclusive mode if this job held it, run the per-job cleanup
///     handlers unless the node is shutting down, and pass this context to
///     the committer thread via RecycleJobContextAndCommitJob().
void SWorkerNodeJobContextImpl::x_RunJob()
{
    CWorkerNodeJobContext this_job_context(this);

    m_RequestContext->SetRequestID((int) this_job_context.GetJobNumber());

    if (!m_Job.client_ip.empty())
        m_RequestContext->SetClientIP(m_Job.client_ip);

    if (!m_Job.session_id.empty())
        m_RequestContext->SetSessionID(m_Job.session_id);

    if (!m_Job.page_hit_id.empty())
        m_RequestContext->SetHitID(m_Job.page_hit_id);

    m_RequestContext->SetAppState(eDiagAppState_RequestBegin);

    // Presumably makes this job's request context current for the rest of
    // this call. The guard is later handed to
    // RecycleJobContextAndCommitJob().
    CRequestContextSwitcher request_state_guard(m_RequestContext);

    if (g_IsRequestStartEventEnabled()) {
        auto extra = GetDiagContext().PrintRequestStart();
        extra.Print("jid", m_Job.job_id);

        CNetScheduleKey key;
        if (key.ParseJobKey(m_Job.job_id)) extra.Print("_queue", key.queue);
    }

    m_RequestContext->SetAppState(eDiagAppState_Request);

    // Registrations with the per-client-IP / per-session-ID job counters.
    // NOTE(review): presumably released when these objects go out of scope
    // at the end of this function.
    CJobRunRegistration client_ip_registration, session_id_registration;

    if (!m_Job.client_ip.empty() &&
            !m_WorkerNode->m_JobsPerClientIP.CountJob(m_Job.client_ip,
                    &client_ip_registration)) {
        ERR_POST("Too many jobs with client IP \"" <<
                 m_Job.client_ip << "\"; job " <<
                 m_Job.job_id << " will be returned.");
        m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
    } else if (!m_Job.session_id.empty() &&
            !m_WorkerNode->m_JobsPerSessionID.CountJob(m_Job.session_id,
                    &session_id_registration)) {
        ERR_POST("Too many jobs with session ID \"" <<
                 m_Job.session_id << "\"; job " <<
                 m_Job.job_id << " will be returned.");
        m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
    } else {
        m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                IWorkerNodeJobWatcher::eJobStarted);

        try {
            m_Job.ret_code =
                    m_WorkerNode->GetJobProcessor()->Do(this_job_context);
        }
        catch (CGridWorkerNodeException& ex) {
            switch (ex.GetErrCode()) {
            case CGridWorkerNodeException::eJobIsLost:
                // CheckIfJobIsLost() throws this only when the commit status
                // is already eCS_JobIsLost, so it is left as is.
                break;

            case CGridWorkerNodeException::eExclusiveModeIsAlreadySet:
                if (m_WorkerNode->m_LogRequested) {
                    LOG_POST_X(21, "Job " << m_Job.job_id <<
                        " will be returned back to the queue "
                        "because it requested exclusive mode while "
                        "another exclusive job is already running.");
                }
                if (m_JobCommitStatus ==
                        CWorkerNodeJobContext::eCS_NotCommitted)
                    m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
                break;

            default:
                ERR_POST_X(62, ex);
                if (m_JobCommitStatus ==
                        CWorkerNodeJobContext::eCS_NotCommitted)
                    m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
            }
        }
        catch (CNetScheduleException& e) {
            ERR_POST_X(20, "job " << m_Job.job_id << " failed: " << e);
            if (e.GetErrCode() == CNetScheduleException::eJobNotFound) {
                ERR_POST("Cannot proceed with job processing: job '" <<
                        m_Job.job_id << "' has expired.");
                MarkJobAsLost();
            } else if (m_JobCommitStatus ==
                    CWorkerNodeJobContext::eCS_NotCommitted) {
                m_JobCommitStatus = CWorkerNodeJobContext::eCS_Failure;
                m_Job.error_msg = e.what();
            }
        }
        catch (exception& e) {
            ERR_POST_X(18, "job " << m_Job.job_id << " failed: " << e.what());
            if (m_JobCommitStatus == CWorkerNodeJobContext::eCS_NotCommitted) {
                m_JobCommitStatus = CWorkerNodeJobContext::eCS_Failure;
                m_Job.error_msg = e.what();
            }
        }

        this_job_context.CloseStreams();

        // Report the final outcome to the job watchers.
        switch (m_JobCommitStatus) {
        case CWorkerNodeJobContext::eCS_Done:
            m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                    IWorkerNodeJobWatcher::eJobSucceeded);
            break;

        case CWorkerNodeJobContext::eCS_NotCommitted:
            // Not committed explicitly: return the job if implicit return
            // is allowed or the node is shutting down. Otherwise treat it
            // as a failure.
            if (TWorkerNode_AllowImplicitJobReturn::GetDefault() ||
                    this_job_context.GetShutdownLevel() !=
                            CNetScheduleAdmin::eNoShutdown) {
                m_JobCommitStatus = CWorkerNodeJobContext::eCS_Return;
                m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                        IWorkerNodeJobWatcher::eJobReturned);
                break;
            }

            m_JobCommitStatus = CWorkerNodeJobContext::eCS_Failure;
            m_Job.error_msg = "Job was not explicitly committed";
            /* FALL THROUGH */

        case CWorkerNodeJobContext::eCS_Failure:
            m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                    IWorkerNodeJobWatcher::eJobFailed);
            break;

        case CWorkerNodeJobContext::eCS_Return:
            m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                    IWorkerNodeJobWatcher::eJobReturned);
            break;

        case CWorkerNodeJobContext::eCS_Reschedule:
            m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                    IWorkerNodeJobWatcher::eJobRescheduled);
            break;

        default: // eCS_JobIsLost - no action needed.
            // This object is recycled by RecycleJobContextAndCommitJob()
            // at the end of this function.
            break;
        }

        m_WorkerNode->x_NotifyJobWatchers(this_job_context,
                IWorkerNodeJobWatcher::eJobStopped);
    }

    // Release exclusive mode if this job held it.
    if (m_WorkerNode->IsExclusiveMode() && m_ExclusiveJob)
        m_WorkerNode->LeaveExclusiveMode();

    if (!CGridGlobals::GetInstance().IsShuttingDown())
        m_CleanupEventSource->CallEventHandlers();

    m_WorkerNode->m_JobCommitterThread->RecycleJobContextAndCommitJob(this,
            request_state_guard);
}
555
/// Main loop of the job-fetching thread.
///
/// Until shutdown is requested, the loop repeatedly waits for room in the
/// thread pool, fetches the next job and submits it to the pool as a
/// CWorkerNodeRequest. It also handles the following:
///  - Total runtime limit (m_TotalTimeLimit, 0 = unlimited): when no job
///    was obtained and the limit has expired, a normal shutdown with exit
///    code 100 is requested and the loop ends.
///  - CNetSrvConnException: after kRetryDelay ms, eConnectionFailure is
///    retried until max_wait_for_servers expires. That deadline is re-armed
///    after every iteration that completes normally. Any other
///    CNetSrvConnException, or expiry of the deadline, requests an
///    immediate shutdown.
///  - Other CNetServiceException: retried after kRetryDelay ms, up to
///    TServConn_ConnMaxRetries consecutive times. After that an immediate
///    shutdown is requested.
///  - Any other exception: logged. An immediate shutdown is requested only
///    if TWorkerNode_StopOnJobErrors is set.
///
/// @return NULL.
void* CMainLoopThread::Main()
{
    // Retry delay converted to milliseconds for SleepMilliSec().
    const auto kRetryDelay = static_cast<unsigned long>(TServConn_RetryDelay::GetDefault() * kMilliSecondsPerSecond);
    SetCurrentThreadName(m_ThreadName);
    CDeadline max_wait_for_servers(
        TWorkerNode_MaxWaitForServers::GetDefault());

    // Pre-allocated context for the next job to be fetched.
    CWorkerNodeJobContext job_context(
        m_WorkerNode->m_JobCommitterThread->AllocJobContext());

    const auto total_time_limit = m_WorkerNode->m_TotalTimeLimit;
    CDeadline deadline(total_time_limit ? CDeadline(total_time_limit) : CDeadline::eInfinite);
    // Default-constructed switcher, passed when recycling a context whose
    // job was never started.
    CRequestContextSwitcher no_op;
    // Consecutive CNetServiceException count; reset after each iteration
    // that does not 'continue'.
    unsigned try_count = 0;
    while (!CGridGlobals::GetInstance().IsShuttingDown()) {
        try {
            try {
                m_WorkerNode->m_ThreadPool->WaitForRoom(
                    m_WorkerNode->m_ThreadPoolTimeout);
            }
            catch (CBlockingQueueException&) {
                // threaded pool is busy
                continue;
            }

            if (x_GetNextJob(job_context->m_Job, deadline)) {
                job_context->ResetJobContext();

                try {
                    m_WorkerNode->m_ThreadPool->AcceptRequest(CRef<CStdRequest>(
                        new CWorkerNodeRequest(job_context)));
                }
                catch (CBlockingQueueException& ex) {
                    ERR_POST_X(28, ex);
                    // that must not happen after CBlockingQueue is fixed
                    _ASSERT(0);
                    job_context->m_JobCommitStatus =
                        CWorkerNodeJobContext::eCS_Return;
                    m_WorkerNode->m_JobCommitterThread->
                        RecycleJobContextAndCommitJob(job_context, no_op);
                }
                // The previous context was either submitted or recycled;
                // take a fresh one for the next job.
                job_context =
                    m_WorkerNode->m_JobCommitterThread->AllocJobContext();

            } else if (deadline.IsExpired()) {
                LOG_POST("The total runtime limit (" << total_time_limit << " seconds) has been reached");
                const auto kExitCode = 100; // See also one in grid_globals.cpp
                CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eNormalShutdown, kExitCode);
                break;

            }
            max_wait_for_servers =
                CDeadline(TWorkerNode_MaxWaitForServers::GetDefault());
        }
        catch (CNetSrvConnException& e) {
            SleepMilliSec(kRetryDelay);
            if (e.GetErrCode() == CNetSrvConnException::eConnectionFailure &&
                    !max_wait_for_servers.GetRemainingTime().IsZero())
                continue;
            ERR_POST(Critical << "Could not connect to the "
                "configured servers, exiting...");
            CGridGlobals::GetInstance().RequestShutdown(
                CNetScheduleAdmin::eShutdownImmediate);
        }
        catch (CNetServiceException& ex) {
            ERR_POST_X(40, ex);
            if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) {
                CGridGlobals::GetInstance().RequestShutdown(
                    CNetScheduleAdmin::eShutdownImmediate);
            } else {
                SleepMilliSec(kRetryDelay);
                continue;
            }
        }
        catch (exception& ex) {
            ERR_POST_X(29, ex.what());
            if (TWorkerNode_StopOnJobErrors::GetDefault()) {
                CGridGlobals::GetInstance().RequestShutdown(
                    CNetScheduleAdmin::eShutdownImmediate);
            }
        }
        try_count = 0;
    }

    return NULL;
}
642
643
/// Process pending suspend/resume requests. While the worker node is
/// suspended, block waiting for notifications.
///
/// @return
///   eWorking   - not suspended, and no suspend took effect in this call.
///   eRestarted - a suspend took effect during this call and the node has
///                since been resumed.
///   eStopped   - shutdown was detected (checked before every iteration).
CNetScheduleGetJob::EState CMainLoopThread::CImpl::CheckState()
{
    EState ret = eWorking;

    while (!CGridGlobals::GetInstance().IsShuttingDown()) {
        void* event;

        // Drain the single-pointer suspend/resume slot. Each SwapPointers()
        // takes the pending event and leaves NO_EVENT in its place.
        while ((event = SwapPointers(&m_WorkerNode->m_SuspendResumeEvent,
                NO_EVENT)) != NO_EVENT) {
            if (event == SUSPEND_EVENT) {
                if (!m_WorkerNode->m_TimelineIsSuspended) {
                    // Stop the timeline.
                    m_WorkerNode->m_TimelineIsSuspended = true;
                    ret = eRestarted;
                }
            } else { /* event == RESUME_EVENT */
                if (m_WorkerNode->m_TimelineIsSuspended) {
                    // Resume the timeline.
                    m_WorkerNode->m_TimelineIsSuspended = false;
                }
            }
        }

        if (!m_WorkerNode->m_TimelineIsSuspended) {
            return ret;
        }

        // Still suspended: wait up to m_Timeout for a notification, then
        // check for resume/shutdown again.
        m_WorkerNode->m_NSExecutor->
            m_NotificationHandler.WaitForNotification(m_Timeout);
    }

    return eStopped;
}
677
ReadNotifications()678 CNetServer CMainLoopThread::CImpl::ReadNotifications()
679 {
680 if (m_WorkerNode->m_NSExecutor->
681 m_NotificationHandler.ReceiveNotification())
682 return x_ProcessRequestJobNotification();
683
684 return CNetServer();
685 }
686
WaitForNotifications(const CDeadline & deadline)687 CNetServer CMainLoopThread::CImpl::WaitForNotifications(const CDeadline& deadline)
688 {
689 if (m_WorkerNode->m_NSExecutor->
690 m_NotificationHandler.WaitForNotification(deadline)) {
691 return x_ProcessRequestJobNotification();
692 }
693
694 return CNetServer();
695 }
696
x_ProcessRequestJobNotification()697 CNetServer CMainLoopThread::CImpl::x_ProcessRequestJobNotification()
698 {
699 CNetServer server;
700
701 // No need to check state here, it will be checked before entry processing
702 m_WorkerNode->m_NSExecutor->
703 m_NotificationHandler.CheckRequestJobNotification(
704 m_WorkerNode->m_NSExecutor, &server);
705
706 return server;
707 }
708
MoreJobs(const SEntry &)709 bool CMainLoopThread::CImpl::MoreJobs(const SEntry& /*entry*/)
710 {
711 return true;
712 }
713
CheckEntry(SEntry & entry,const string & prio_aff_list,bool any_affinity,CNetScheduleJob & job,CNetScheduleAPI::EJobStatus *)714 bool CMainLoopThread::CImpl::CheckEntry(
715 SEntry& entry,
716 const string& prio_aff_list,
717 bool any_affinity,
718 CNetScheduleJob& job,
719 CNetScheduleAPI::EJobStatus* /*job_status*/)
720 {
721 CNetServer server(m_API.GetService()->GetServer(entry.server_address));
722 return m_WorkerNode->m_NSExecutor->x_GetJobWithAffinityLadder(server,
723 m_Timeout, prio_aff_list, any_affinity, job);
724 }
725
ReturnJob(CNetScheduleJob & job)726 void CMainLoopThread::CImpl::ReturnJob(CNetScheduleJob& job)
727 {
728 m_WorkerNode->m_NSExecutor->ReturnJob(job, false);
729 }
730
/// Try to obtain the next job to execute.
///
/// @param job
///   Receives the job when one is obtained.
/// @param deadline
///   Passed to the timeline's GetJob(). Main() passes the total-runtime
///   deadline here.
/// @return
///   true if a job was obtained and should be run. false otherwise: no job;
///   x_AreMastersBusy() returned false (after sleeping m_NSTimeout seconds);
///   WaitForExclusiveJobToFinish() failed; the job is already in progress;
///   exclusive mode could not be entered; or shutdown has started.
///
/// On success the job has been added to m_WorkerNode->m_JobsInProgress. For
/// jobs with eExclusiveJob in their mask, exclusive mode has also been
/// entered.
bool CMainLoopThread::x_GetNextJob(CNetScheduleJob& job, const CDeadline& deadline)
{
    if (!m_WorkerNode->x_AreMastersBusy()) {
        SleepSec(m_WorkerNode->m_NSTimeout);
        return false;
    }

    if (!m_WorkerNode->WaitForExclusiveJobToFinish())
        return false;

    const bool any_affinity = m_Impl.m_API->m_AffinityLadder.empty();

    if (m_Timeline.GetJob(deadline, job, NULL, any_affinity) != CNetScheduleGetJob::eJob) {
        return false;
    }

    // Already executing this job, so do nothing
    // (and rely on that execution to report its result later)
    // Order matters: this duplicate check comes before the exclusive-mode
    // check below. Otherwise a duplicate of a running exclusive job could be
    // handed back via ReturnJob() while it is still being executed.
    if (!m_WorkerNode->m_JobsInProgress.Add(job)) {
        ERR_POST(Warning << "Got already processing job " << job.job_id);
        return false;
    }

    // NOTE(review): on the two ReturnJob() paths below, the job stays in
    // m_JobsInProgress; no removal is visible here. If removal happens only
    // on the normal commit path, a later re-delivery of the same job to this
    // node would be rejected as "already processing". Also, the shutdown
    // path does not leave exclusive mode entered just above. Verify both and
    // undo them before returning if confirmed.
    if (job.mask & CNetScheduleAPI::eExclusiveJob) {
        if (!m_WorkerNode->EnterExclusiveMode()) {
            m_WorkerNode->m_NSExecutor.ReturnJob(job);
            return false;
        }
    }

    // No need to check for idleness here, running jobs won't be stopped anyway
    if (CGridGlobals::GetInstance().IsShuttingDown()) {
        m_WorkerNode->m_NSExecutor.ReturnJob(job);
        return false;
    }

    return true;
}
769
GetServerOutputSize()770 size_t CGridWorkerNode::GetServerOutputSize()
771 {
772 return m_Impl->m_QueueEmbeddedOutputSize;
773 }
774
775 END_NCBI_SCOPE
776