1 /* $Id: exec_helpers.cpp 599189 2019-12-20 15:57:19Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Maxim Didenko, Dmitry Kazimirov, Rafael Sadyrov
27 *
28 * File Description: NetSchedule worker node sample
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include <sstream>
35
36 #include <connect/ncbi_pipe.hpp>
37
38 #include <corelib/rwstream.hpp>
39 #include <corelib/request_ctx.hpp>
40
41 #if defined(NCBI_OS_UNIX)
42 #include <fcntl.h>
43 #endif
44
45 #include "exec_helpers.hpp"
46 #include "async_task.hpp"
47
// Pipe buffer size passed to CPipe::ExecWait.
// Parenthesized so the macro expands safely inside any expression.
#define PIPE_SIZE (64 * 1024)
49
50 BEGIN_NCBI_SCOPE
51
52 // A record for a child process
53 struct CRemoteAppReaperTask
54 {
55 const CProcess process;
56
CRemoteAppReaperTaskCRemoteAppReaperTask57 CRemoteAppReaperTask(TProcessHandle handle) : process(handle) {}
58
59 bool operator()(int current, int max_attempts);
60 };
61
operator ()(int current,int max_attempts)62 bool CRemoteAppReaperTask::operator()(int current, int max_attempts)
63 {
64 CProcess::CExitInfo exitinfo;
65 const bool first_attempt = current == 1;
66
67 if (process.Wait(0, &exitinfo) != -1 || exitinfo.IsExited() || exitinfo.IsSignaled()) {
68 // Log a message for those that had failed to be killed before
69 if (!first_attempt) {
70 LOG_POST(Note << "Successfully waited for a process: " << process.GetHandle());
71 }
72
73 return true;
74 }
75
76 if (first_attempt) {
77 if (process.KillGroup()) return true;
78
79 ERR_POST(Warning << "Failed to kill a process: " << process.GetHandle() << ", will wait for it");
80 return false;
81 }
82
83 if (current > max_attempts) {
84 // Give up if there are too many attempts to wait for a process
85 ERR_POST("Gave up waiting for a process: " << process.GetHandle());
86 return true;
87 }
88
89 return false;
90 }
91
92 // This class is responsibe for the whole process of reaping child processes
93 class CRemoteAppReaper : public CAsyncTaskProcessor<CRemoteAppReaperTask>
94 {
95 using CAsyncTaskProcessor<CRemoteAppReaperTask>::CAsyncTaskProcessor;
96 };
97
98 struct CRemoteAppRemoverTask
99 {
100 const string path;
101
102 CRemoteAppRemoverTask(string p);
103
104 bool operator()(int current, int max_attempts) const;
105 };
106
CRemoteAppRemoverTask(string p)107 CRemoteAppRemoverTask::CRemoteAppRemoverTask(string p) :
108 path(move(p))
109 {
110 if (path.empty()) return;
111
112 CDir dir(path);
113
114 if (dir.Exists()) return;
115
116 dir.CreatePath();
117 }
118
operator ()(int current,int max_attempts) const119 bool CRemoteAppRemoverTask::operator()(int current, int max_attempts) const
120 {
121 if (path.empty()) return true;
122
123 const bool first_attempt = current == 1;
124
125 try {
126 if (CDir(path).Remove(CDirEntry::eRecursiveIgnoreMissing)) {
127 // Log a message for those that had failed to be removed before
128 if (!first_attempt) {
129 LOG_POST(Note << "Successfully removed a path: " << path);
130 }
131
132 return true;
133 }
134 }
135 catch (...) {
136 }
137
138 if (current > max_attempts) {
139 // Give up if there are too many attempts to remove a path
140 ERR_POST("Gave up removing a path: " << path);
141 return true;
142 }
143
144 if (first_attempt) {
145 ERR_POST(Warning << "Failed to remove a path: " << path << ", will try later");
146 return false;
147 }
148
149 return false;
150 }
151
152 // This class is responsibe for removing tmp directories
153 class CRemoteAppRemover : public CAsyncTaskProcessor<CRemoteAppRemoverTask>
154 {
155 public:
156 struct SGuard;
157
158 using CAsyncTaskProcessor<CRemoteAppRemoverTask>::CAsyncTaskProcessor;
159 };
160
161 struct CRemoteAppRemover::SGuard
162 {
SGuardCRemoteAppRemover::SGuard163 SGuard(CRemoteAppRemover* remover, CRemoteAppRemoverTask task, bool remove_tmp_dir) :
164 m_Scheduler(remover ? &remover->GetScheduler() : nullptr),
165 m_Task(move(task)),
166 m_RemoveTmpDir(remove_tmp_dir)
167 {}
168
~SGuardCRemoteAppRemover::SGuard169 ~SGuard()
170 {
171 // Do not remove
172 if (!m_RemoveTmpDir) return;
173
174 // Remove asynchronously
175 if (m_Scheduler && (*m_Scheduler)(m_Task)) return;
176
177 // Remove synchronously
178 m_Task(1, 0);
179 }
180
181 private:
182 CRemoteAppRemover::CScheduler* m_Scheduler;
183 CRemoteAppRemoverTask m_Task;
184 bool m_RemoveTmpDir;
185 };
186
187 class CTimer
188 {
189 public:
CTimer(const CTimeout & timeout)190 CTimer(const CTimeout& timeout) :
191 m_Deadline(timeout),
192 m_Timeout(timeout)
193 {}
194
Restart()195 void Restart() { m_Deadline = m_Timeout; }
IsExpired() const196 bool IsExpired() const { return m_Deadline.IsExpired(); }
PresetSeconds() const197 unsigned PresetSeconds() const { return (unsigned)m_Timeout.GetAsDouble(); }
198
199 private:
200 CDeadline m_Deadline;
201 CTimeout m_Timeout;
202 };
203
204 class CTimedProcessWatcher : public CPipe::IProcessWatcher
205 {
206 public:
207 struct SParams
208 {
209 string process_type;
210 const CTimeout& run_timeout;
211 CRemoteAppReaper::CScheduler& process_manager;
212
SParamsCTimedProcessWatcher::SParams213 SParams(string pt, const CTimeout& rt, CRemoteAppReaper::CScheduler& pm) :
214 process_type(move(pt)),
215 run_timeout(rt),
216 process_manager(pm)
217 {}
218 };
219
CTimedProcessWatcher(SParams & p)220 CTimedProcessWatcher(SParams& p)
221 : m_ProcessManager(p.process_manager),
222 m_ProcessType(p.process_type),
223 m_Deadline(p.run_timeout)
224 {
225 }
226
Watch(TProcessHandle pid)227 virtual EAction Watch(TProcessHandle pid)
228 {
229 if (m_Deadline.IsExpired()) {
230 ERR_POST(m_ProcessType << " run time exceeded "
231 << m_Deadline.PresetSeconds()
232 <<" second(s), stopping the child: " << pid);
233 return m_ProcessManager(pid) ? eExit : eStop;
234 }
235
236 return eContinue;
237 }
238
239 protected:
240 CRemoteAppReaper::CScheduler& m_ProcessManager;
241 const string m_ProcessType;
242 const CTimer m_Deadline;
243 };
244
245 // This class is responsible for reporting app/cgi version run by this app
246 class CRemoteAppVersion
247 {
248 public:
CRemoteAppVersion(const string & app,const vector<string> & args)249 CRemoteAppVersion(const string& app, const vector<string>& args)
250 : m_App(app), m_Args(args)
251 {}
252
253 string Get(CTimedProcessWatcher::SParams& p, const string& v) const;
254
255 private:
256 const string m_App;
257 const vector<string> m_Args;
258 };
259
Get(CTimedProcessWatcher::SParams & p,const string & v) const260 string CRemoteAppVersion::Get(CTimedProcessWatcher::SParams& p, const string& v) const
261 {
262 CTimedProcessWatcher wait_one_second(p);
263 istringstream in;
264 ostringstream out;
265 ostringstream err;
266 int exit_code;
267
268 if (CPipe::ExecWait(m_App, m_Args, in, out, err, exit_code,
269 kEmptyStr, 0, &wait_one_second) == CPipe::eDone) {
270 // Restrict version string to 1024 chars
271 string app_ver(out.str(), 0, 1024);
272 return NStr::Sanitize(app_ver) + " / " + v;
273 }
274
275 return v;
276 }
277
278 // This class is responsible for reporting clients about apps timing out
279 class CRemoteAppTimeoutReporter
280 {
281 public:
CRemoteAppTimeoutReporter(const string & mode)282 CRemoteAppTimeoutReporter(const string& mode) : m_Mode(Get(mode)) {}
283
Report(CWorkerNodeJobContext & job_context,unsigned seconds)284 void Report(CWorkerNodeJobContext& job_context, unsigned seconds)
285 {
286 if (m_Mode != eNever) {
287 job_context.PutProgressMessage("Job run time exceeded " +
288 NStr::UIntToString(seconds) + " seconds.", true,
289 m_Mode == eAlways);
290 }
291 }
292
293 private:
294 enum EMode { eSmart, eAlways, eNever };
295
Get(const string & mode)296 static EMode Get(const string& mode)
297 {
298 if (!NStr::CompareNocase(mode, "smart"))
299 return eSmart;
300
301 else if (!NStr::CompareNocase(mode, "always"))
302 return eAlways;
303
304 else if (!NStr::CompareNocase(mode, "never"))
305 return eNever;
306 else
307 ERR_POST("Unknown parameter value: "
308 "parameter \"progress_message_on_timeout\", "
309 "value: \"" << mode << "\". "
310 "Allowed values: smart, always, never");
311
312 return eSmart;
313 }
314
315 EMode m_Mode;
316 };
317
/// Class representing (non overlapping) integer ranges.
///
/// The ranges are kept as a sorted list of boundaries: an element at an even
/// index starts an included range, the next element is the first number
/// after it. An odd number of boundaries means the last range is open-ended.
///
class CRanges
{
public:
    /// Reads integer ranges from an input stream.
    ///
    /// The data must be in the following format (in ascending order):
    ///     [!] R1, R2, ..., Rn
    ///
    /// Where:
    ///     ! - negation, makes all provided ranges be excluded
    ///         (not included).
    ///     R1 ... Rn - integer closed ranges specified either as
    ///         FROM - TO (corresponds to [FROM, TO] range) or as
    ///         NUMBER (corresponds to [NUMBER, NUMBER] range).
    /// Example:
    ///     4, 6 - 9, 16 - 40, 64
    ///
    /// @throw invalid_argument on malformed or unsorted input.
    ///
    CRanges(istream& is);

    /// Checks whether provided number belongs to one of the ranges.
    ///
    bool Contain(int n) const;

private:
    vector<int> m_Ranges;
};

CRanges::CRanges(istream& is)
{
    char ch;

    if (!(is >> skipws >> ch)) return; // EoF

    // Char '!' makes provided ranges to be excluded (not included)
    if (ch == '!')
        m_Ranges.push_back(numeric_limits<int>::min());
    else
        is.putback(ch);

    bool interval = false;

    // Set once a range ends at INT_MAX: its end boundary (INT_MAX + 1) is not
    // representable, so the range is left open-ended and nothing may follow.
    bool open_ended = false;

    do {
        int n;

        if (!(is >> n)) break;
        if (!(is >> ch)) ch = ',';

        if (open_ended || (!m_Ranges.empty() && n < m_Ranges.back())) {
            ostringstream err;
            err << n << " is less or equal than previous number, "
                "intervals must be sorted and not overlapping";
            throw invalid_argument(err.str());
        }

        if (ch == ',') {
            // If it's only one number in interval, add both start and end of it
            if (!interval) m_Ranges.push_back(n);

            // n + 1 would overflow (UB) for INT_MAX
            if (n == numeric_limits<int>::max())
                open_ended = true;
            else
                m_Ranges.push_back(n + 1);

            interval = false;

        } else if (ch == '-' && !interval) {
            interval = true;
            m_Ranges.push_back(n);

        } else {
            ostringstream err;
            err << "Unexpected char '" << ch << "'";
            throw invalid_argument(err.str());
        }
    } while (is);

    if (interval) {
        ostringstream err;
        err << "Missing interval end";
        throw invalid_argument(err.str());
    }

    if (!is.eof()) {
        ostringstream err;
        err << "Not a number near: " << is.rdbuf();
        throw invalid_argument(err.str());
    }
}

bool CRanges::Contain(int n) const
{
    auto range = upper_bound(m_Ranges.begin(), m_Ranges.end(), n);

    // Every number starts a range, every second range is excluded
    return (range - m_Ranges.begin()) % 2 != 0;
}
416
s_ReadRanges(const IRegistry & reg,const string & sec,string param)417 CRanges* s_ReadRanges(const IRegistry& reg, const string& sec, string param)
418 {
419 if (reg.HasEntry(sec, param)) {
420 istringstream iss(reg.GetString(sec, param, kEmptyStr));
421
422 try {
423 return new CRanges(iss);
424 }
425 catch (invalid_argument& ex) {
426 NCBI_THROW_FMT(CInvalidParamException, eInvalidCharacter,
427 "Parameter '" << param << "' parsing error: " << ex.what());
428 }
429 }
430
431 return nullptr;
432 }
433
s_ToTimeout(unsigned sec)434 CTimeout s_ToTimeout(unsigned sec)
435 {
436 // Zero counts as infinite timeout
437 return sec ? CTimeout(sec, 0) : CTimeout::eInfinite;
438 }
439
440 struct SSection
441 {
442 const IRegistry& reg;
443 const string& name;
444
SSectionSSection445 SSection(const IRegistry& r, const string& n) : reg(r), name(n) {}
446
GetSSection447 int Get(const string& param, int def) const
448 {
449 return reg.GetInt(name, param, def, 0, IRegistry::eReturn);
450 }
451
GetSSection452 bool Get(const string& param, bool def) const
453 {
454 return reg.GetBool(name, param, def, 0, IRegistry::eReturn);
455 }
456 };
457
458 //////////////////////////////////////////////////////////////////////////////
459 ///
CRemoteAppLauncher(const string & sec_name,const IRegistry & reg)460 CRemoteAppLauncher::CRemoteAppLauncher(const string& sec_name,
461 const IRegistry& reg) :
462 m_NonZeroExitAction(eDoneOnNonZeroExit),
463 m_RemoveTempDir(true),
464 m_CacheStdOutErr(true)
465 {
466 const SSection sec(reg, sec_name);
467
468 m_AppRunTimeout = s_ToTimeout(sec.Get("max_app_run_time", 0));
469 m_KeepAlivePeriod = s_ToTimeout(sec.Get("keep_alive_period", 0));
470
471 if (reg.HasEntry(sec_name, "non_zero_exit_action") ) {
472 string val = reg.GetString(sec_name, "non_zero_exit_action", "");
473 if (NStr::CompareNocase(val, "fail") == 0 )
474 m_NonZeroExitAction = eFailOnNonZeroExit;
475 else if (NStr::CompareNocase(val, "return") == 0 )
476 m_NonZeroExitAction = eReturnOnNonZeroExit;
477 else if (NStr::CompareNocase(val, "done") == 0 )
478 m_NonZeroExitAction = eDoneOnNonZeroExit;
479 else {
480 ERR_POST("Unknown parameter value: "
481 "section [" << sec_name << "], "
482 "parameter \"non_zero_exit_action\", "
483 "value: \"" << val << "\". "
484 "Allowed values: fail, return, done");
485 }
486 } else if (sec.Get("fail_on_non_zero_exit", false))
487 m_NonZeroExitAction = eFailOnNonZeroExit;
488
489 m_MustFailNoRetries.reset(
490 s_ReadRanges(reg, sec_name, "fail_no_retries_if_exit_code"));
491
492 const string name = CNcbiApplication::Instance()->GetProgramDisplayName();
493
494 if (sec.Get("run_in_separate_dir", false)) {
495 if (reg.HasEntry(sec_name, "tmp_dir"))
496 m_TempDir = reg.GetString(sec_name, "tmp_dir", "." );
497 else
498 m_TempDir = reg.GetString(sec_name, "tmp_path", "." );
499
500 if (!CDirEntry::IsAbsolutePath(m_TempDir)) {
501 string tmp = CDir::GetCwd()
502 + CDirEntry::GetPathSeparator()
503 + m_TempDir;
504 m_TempDir = CDirEntry::NormalizePath(tmp);
505 }
506 if (reg.HasEntry(sec_name, "remove_tmp_dir"))
507 m_RemoveTempDir = sec.Get("remove_tmp_dir", true);
508 else
509 m_RemoveTempDir = sec.Get("remove_tmp_path", true);
510
511 int sleep = sec.Get("sleep_between_remove_tmp_attempts", 60);
512 int max_attempts = sec.Get("max_remove_tmp_attempts", 60);
513 m_Remover.reset(new CRemoteAppRemover(sleep, max_attempts, name + "_rm"));
514
515 m_CacheStdOutErr = sec.Get("cache_std_out_err", true);
516 }
517
518 m_AppPath = reg.GetString(sec_name, "app_path", kEmptyStr);
519 if (m_AppPath.empty()) {
520 NCBI_THROW_FMT(CConfigException, eParameterMissing,
521 "Missing configuration parameter [" << sec_name <<
522 "].app_path");
523 }
524 if (!CDirEntry::IsAbsolutePath(m_AppPath)) {
525 string tmp = CDir::GetCwd()
526 + CDirEntry::GetPathSeparator()
527 + m_AppPath;
528 m_AppPath = CDirEntry::NormalizePath(tmp);
529 }
530
531 m_MonitorAppPath = reg.GetString(sec_name, "monitor_app_path", kEmptyStr);
532 if (!m_MonitorAppPath.empty()) {
533 if (!CDirEntry::IsAbsolutePath(m_MonitorAppPath)) {
534 string tmp = CDir::GetCwd()
535 + CDirEntry::GetPathSeparator()
536 + m_MonitorAppPath;
537 m_MonitorAppPath = CDirEntry::NormalizePath(tmp);
538 }
539 CFile f(m_MonitorAppPath);
540 if (!f.Exists() || !CanExec(f)) {
541 ERR_POST("Can not execute \"" << m_MonitorAppPath
542 << "\". The Monitor application will not run!");
543 m_MonitorAppPath = kEmptyStr;
544 }
545 }
546
547 m_MonitorRunTimeout = s_ToTimeout(sec.Get("max_monitor_running_time", 5));
548 m_MonitorPeriod = s_ToTimeout(sec.Get("monitor_period", 5));
549 m_KillTimeout.sec = sec.Get("kill_timeout", 1);
550 m_KillTimeout.usec = 0;
551
552 m_ExcludeEnv.clear();
553 m_IncludeEnv.clear();
554 m_AddedEnv.clear();
555
556 NStr::Split(reg.GetString("env_inherit", "exclude", "")," ;,", m_ExcludeEnv,
557 NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
558 NStr::Split(reg.GetString("env_inherit", "include", "")," ;,", m_IncludeEnv,
559 NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
560
561 list<string> added_env;
562 reg.EnumerateEntries("env_set", &added_env);
563
564 ITERATE(list<string>, it, added_env) {
565 const string& s = *it;
566 m_AddedEnv[s] = reg.GetString("env_set", s, "");
567 }
568
569 int sleep = sec.Get("sleep_between_reap_attempts", 60);
570 int max_attempts = sec.Get("max_reap_attempts_after_kill", 60);
571 m_Reaper.reset(new CRemoteAppReaper(sleep, max_attempts, name + "_cl"));
572
573 const string cmd = reg.GetString(sec_name, "version_cmd", m_AppPath);
574 const string args = reg.GetString(sec_name, "version_args", "-version");
575 vector<string> v;
576 m_Version.reset(new CRemoteAppVersion(cmd,
577 NStr::Split(args, " ", v)));
578
579 const string mode = reg.GetString(sec_name, "progress_message_on_timeout",
580 "smart");
581 m_TimeoutReporter.reset(new CRemoteAppTimeoutReporter(mode));
582 }
583
584 // We need this explicit empty destructor,
585 // so it could destruct CRemoteAppReaper and CRemoteAppVersion instances.
586 // Otherwise, there would be implicit inline destructor
587 // that could be placed where these classes are incomplete.
~CRemoteAppLauncher()588 CRemoteAppLauncher::~CRemoteAppLauncher()
589 {
590 }
591
592 //////////////////////////////////////////////////////////////////////////////
593 ///
CanExec(const CFile & file)594 bool CRemoteAppLauncher::CanExec(const CFile& file)
595 {
596 CDirEntry::TMode user_mode = 0;
597 if (!file.GetMode(&user_mode))
598 return false;
599 if (user_mode & CDirEntry::fExecute)
600 return true;
601 return false;
602 }
603
604 //////////////////////////////////////////////////////////////////////////////
605 ///
606 class CJobContextProcessWatcher : public CTimedProcessWatcher
607 {
608 public:
609 struct SParams : CTimedProcessWatcher::SParams
610 {
611 CWorkerNodeJobContext& job_context;
612 const CTimeout& keep_alive_period;
613 CRemoteAppTimeoutReporter& timeout_reporter;
614
SParamsCJobContextProcessWatcher::SParams615 SParams(CWorkerNodeJobContext& jc,
616 string pt,
617 const CTimeout& rt,
618 const CTimeout& kap,
619 CRemoteAppTimeoutReporter& tr,
620 CRemoteAppReaper::CScheduler& pm)
621 : CTimedProcessWatcher::SParams(move(pt), rt, pm),
622 job_context(jc),
623 keep_alive_period(kap),
624 timeout_reporter(tr)
625 {}
626 };
627
CJobContextProcessWatcher(SParams & p)628 CJobContextProcessWatcher(SParams& p)
629 : CTimedProcessWatcher(p),
630 m_JobContext(p.job_context), m_KeepAlive(p.keep_alive_period),
631 m_TimeoutReporter(p.timeout_reporter)
632 {
633 }
634
OnStart(TProcessHandle pid)635 virtual EAction OnStart(TProcessHandle pid)
636 {
637 if (m_JobContext.GetShutdownLevel() ==
638 CNetScheduleAdmin::eShutdownImmediate) {
639 return eStop;
640 }
641
642 LOG_POST(Note << "Child PID: " << NStr::UInt8ToString((Uint8) pid));
643
644 return CTimedProcessWatcher::OnStart(pid);
645 }
646
Watch(TProcessHandle pid)647 virtual EAction Watch(TProcessHandle pid)
648 {
649 if (m_JobContext.GetShutdownLevel() ==
650 CNetScheduleAdmin::eShutdownImmediate) {
651 m_JobContext.ReturnJob();
652 return eStop;
653 }
654
655 EAction action = CTimedProcessWatcher::Watch(pid);
656
657 if (action != eContinue) {
658 m_TimeoutReporter.Report(m_JobContext, m_Deadline.PresetSeconds());
659 return action;
660 }
661
662 if (m_KeepAlive.IsExpired()) {
663 m_JobContext.JobDelayExpiration(m_KeepAlive.PresetSeconds() + 10);
664 m_KeepAlive.Restart();
665 }
666
667 return eContinue;
668 }
669
670 protected:
671 CWorkerNodeJobContext& m_JobContext;
672
673 private:
674 CTimer m_KeepAlive;
675 CRemoteAppTimeoutReporter& m_TimeoutReporter;
676 };
677
678 //////////////////////////////////////////////////////////////////////////////
679 ///
680 class CMonitoredProcessWatcher : public CJobContextProcessWatcher
681 {
682 public:
CMonitoredProcessWatcher(SParams & p,const string & job_wdir,const string & path,const char * const * env,CTimeout run_period,CTimeout run_timeout)683 CMonitoredProcessWatcher(SParams& p, const string& job_wdir,
684 const string& path, const char* const* env,
685 CTimeout run_period, CTimeout run_timeout)
686 : CJobContextProcessWatcher(p),
687 m_MonitorWatch(run_period),
688 m_JobWDir(job_wdir),
689 m_Path(path),
690 m_Env(env),
691 m_RunTimeout(run_timeout)
692 {
693 }
694
Watch(TProcessHandle pid)695 virtual EAction Watch(TProcessHandle pid)
696 {
697 EAction action = CJobContextProcessWatcher::Watch(pid);
698
699 if (action != eContinue)
700 return action;
701
702 if (m_MonitorWatch.IsExpired()) {
703 action = MonitorRun(pid);
704 m_MonitorWatch.Restart();
705 }
706
707 return action;
708 }
709
710 private:
711 // The exit code of the monitor program is interpreted as follows
712 // (any exit code not listed below is treated as eInternalError)
713 enum EResult {
714 // The job is running as expected.
715 // The monitor's stdout is interpreted as a job progress message.
716 // The stderr goes to the log file if logging is enabled.
717 eJobRunning = 0,
718 // The monitor detected an inconsistency with the job run;
719 // the job must be returned back to the queue.
720 // The monitor's stderr goes to the log file
721 // regardless of whether logging is enabled or not.
722 eJobToReturn = 1,
723 // The job must be failed.
724 // The monitor's stdout is interpreted as the error message;
725 // stderr goes to the log file regardless of whether
726 // logging is enabled or not.
727 eJobFailed = 2,
728 // There's a problem with the monitor application itself.
729 // The job continues to run and the monitor's stderr goes
730 // to the log file unconditionally.
731 eInternalError = 3,
732 };
733
MonitorRun(TProcessHandle pid)734 EAction MonitorRun(TProcessHandle pid)
735 {
736 CNcbiStrstream in;
737 CNcbiOstrstream out;
738 CNcbiOstrstream err;
739 vector<string> args;
740 args.push_back("-pid");
741 args.push_back(NStr::UInt8ToString((Uint8) pid));
742 args.push_back("-jid");
743 args.push_back(m_JobContext.GetJobKey());
744 args.push_back("-jwdir");
745 args.push_back(m_JobWDir);
746
747 CTimedProcessWatcher::SParams params("Monitor", m_RunTimeout, m_ProcessManager);
748 CTimedProcessWatcher callback(params);
749 int exit_value = eInternalError;
750 try {
751 if (CPipe::eDone != CPipe::ExecWait(m_Path, args, in,
752 out, err, exit_value,
753 kEmptyStr, m_Env,
754 &callback,
755 NULL,
756 PIPE_SIZE)) {
757 exit_value = eInternalError;
758 }
759 }
760 catch (exception& ex) {
761 err << ex.what();
762 }
763 catch (...) {
764 err << "Unknown error";
765 }
766
767 switch (exit_value) {
768 case eJobRunning:
769 {
770 bool non_empty_output = !IsOssEmpty(out);
771 if (non_empty_output) {
772 m_JobContext.PutProgressMessage
773 (CNcbiOstrstreamToString(out), true);
774 }
775 if (m_JobContext.IsLogRequested() &&
776 ( !non_empty_output || !IsOssEmpty(err) ))
777 x_Log("exited with zero return code", err);
778 }
779 return eContinue;
780
781 case eJobToReturn:
782 m_JobContext.ReturnJob();
783 x_Log("job is returned", err);
784 return eStop;
785
786 case eJobFailed:
787 {
788 x_Log("job failed", err);
789 string errmsg;
790 if ( !IsOssEmpty(out) ) {
791 errmsg = CNcbiOstrstreamToString(out);
792 } else
793 errmsg = "Monitor requested job termination";
794 throw runtime_error(errmsg);
795 }
796 return eContinue;
797 }
798
799 x_Log("internal error", err);
800 return eContinue;
801 }
802
x_Log(const string & what,CNcbiOstrstream & sstream)803 inline void x_Log(const string& what, CNcbiOstrstream& sstream)
804 {
805 if ( !IsOssEmpty(sstream) ) {
806 ERR_POST(m_JobContext.GetJobKey() << " (monitor) " << what <<
807 ": " << (string)CNcbiOstrstreamToString(sstream));
808 } else {
809 ERR_POST(m_JobContext.GetJobKey() << " (monitor) " << what << ".");
810 }
811 }
812
813 CTimer m_MonitorWatch;
814 string m_JobWDir;
815 string m_Path;
816 const char* const* m_Env;
817 CTimeout m_RunTimeout;
818 };
819
820
821 //////////////////////////////////////////////////////////////////////////////
822 ///
823
824 class CTmpStreamGuard
825 {
826 public:
CTmpStreamGuard(const string & tmp_dir,const string & name,CNcbiOstream & orig_stream,bool cache_std_out_err)827 CTmpStreamGuard(const string& tmp_dir,
828 const string& name,
829 CNcbiOstream& orig_stream,
830 bool cache_std_out_err) : m_OrigStream(orig_stream), m_Stream(NULL)
831 {
832 if (!tmp_dir.empty() && cache_std_out_err) {
833 m_Name = tmp_dir + CDirEntry::GetPathSeparator();
834 m_Name += name;
835 }
836 if (!m_Name.empty()) {
837 try {
838 m_ReaderWriter.reset(new CFileReaderWriter(m_Name,
839 CFileIO_Base::eCreate));
840 } catch (CFileException& ex) {
841 ERR_POST("Could not create a temporary file " <<
842 m_Name << " :" << ex.what() << " the data will be "
843 "written directly to the original stream");
844 m_Name.erase();
845 m_Stream = &m_OrigStream;
846 return;
847 }
848 #if defined(NCBI_OS_UNIX)
849 // If the file is created on an NFS file system, the CLOEXEC
850 // flag needs to be set, otherwise deleting the temporary
851 // directory will not succeed.
852 TFileHandle fd = m_ReaderWriter->GetFileIO().GetFileHandle();
853 fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
854 #endif
855 m_StreamGuard.reset(new CWStream(m_ReaderWriter.get()));
856 m_Stream = m_StreamGuard.get();
857 } else {
858 m_Stream = &m_OrigStream;
859 }
860 }
~CTmpStreamGuard()861 ~CTmpStreamGuard()
862 {
863 try {
864 Close();
865 }
866 catch (exception& ex) {
867 ERR_POST("CTmpStreamGuard::~CTmpStreamGuard(): " <<
868 m_Name << " --> " << ex.what());
869 }
870 }
871
GetOStream()872 CNcbiOstream& GetOStream() { return *m_Stream; }
873
Close()874 void Close()
875 {
876 if (!m_Name.empty() && m_StreamGuard.get()) {
877 m_StreamGuard.reset();
878 m_ReaderWriter->Flush();
879 m_ReaderWriter->GetFileIO().SetFilePos(0, CFileIO_Base::eBegin);
880 {
881 CRStream rstm(m_ReaderWriter.get());
882 if (!rstm.good()
883 || !NcbiStreamCopy(m_OrigStream, rstm))
884 ERR_POST( "Cannot copy \"" << m_Name << "\" file.");
885 }
886 m_ReaderWriter.reset();
887 }
888 }
889
890 private:
891 CNcbiOstream& m_OrigStream;
892 unique_ptr<CFileReaderWriter> m_ReaderWriter;
893 unique_ptr<CNcbiOstream> m_StreamGuard;
894 CNcbiOstream* m_Stream;
895 string m_Name;
896 };
897
898
ExecRemoteApp(const vector<string> & args,CNcbiIstream & in,CNcbiOstream & out,CNcbiOstream & err,int & exit_value,CWorkerNodeJobContext & job_context,unsigned app_run_timeout,const char * const env[]) const899 bool CRemoteAppLauncher::ExecRemoteApp(const vector<string>& args,
900 CNcbiIstream& in, CNcbiOstream& out, CNcbiOstream& err,
901 int& exit_value,
902 CWorkerNodeJobContext& job_context,
903 unsigned app_run_timeout,
904 const char* const env[]) const
905 {
906 string tmp_path = m_TempDir;
907 if (!tmp_path.empty()) {
908 CFastLocalTime lt;
909 bool substitution_found = false;
910 size_t subst_pos;
911 while ((subst_pos = tmp_path.find('%')) != string::npos) {
912 if (subst_pos + 1 >= tmp_path.length())
913 break;
914 switch (tmp_path[subst_pos + 1]) {
915 case '%':
916 tmp_path.replace(subst_pos, 2, 1, '%');
917 continue;
918 case 'q':
919 tmp_path.replace(subst_pos, 2, job_context.GetQueueName());
920 break;
921 case 'j':
922 tmp_path.replace(subst_pos, 2, job_context.GetJobKey());
923 break;
924 case 'r':
925 tmp_path.replace(subst_pos, 2, NStr::UInt8ToString(
926 GetDiagContext().GetRequestContext().GetRequestID()));
927 break;
928 case 't':
929 tmp_path.replace(subst_pos, 2, NStr::UIntToString(
930 (unsigned) lt.GetLocalTime().GetTimeT()));
931 break;
932 default:
933 tmp_path.erase(subst_pos, 2);
934 }
935 substitution_found = true;
936 }
937 if (!substitution_found)
938 tmp_path += CDirEntry::GetPathSeparator() +
939 job_context.GetQueueName() + "_" +
940 job_context.GetJobKey() + "_" +
941 NStr::UIntToString((unsigned) lt.GetLocalTime().GetTimeT());
942 }
943
944 CRemoteAppRemover::SGuard guard(m_Remover.get(), tmp_path, m_RemoveTempDir);
945 {
946 CTmpStreamGuard std_out_guard(tmp_path, "std.out", out,
947 m_CacheStdOutErr);
948 CTmpStreamGuard std_err_guard(tmp_path, "std.err", err,
949 m_CacheStdOutErr);
950
951 CTimeout run_timeout = min(s_ToTimeout(app_run_timeout), m_AppRunTimeout);
952 string working_dir(tmp_path.empty() ? CDir::GetCwd() : tmp_path);
953
954 #ifdef NCBI_OS_MSWIN
955 NStr::ReplaceInPlace(working_dir, "\\", "/");
956 #endif
957
958 CJobContextProcessWatcher::SParams params(job_context,
959 "Job",
960 run_timeout,
961 m_KeepAlivePeriod,
962 *m_TimeoutReporter,
963 m_Reaper->GetScheduler());
964
965 bool monitor = !m_MonitorAppPath.empty() && m_MonitorPeriod.IsFinite();
966
967 unique_ptr<CPipe::IProcessWatcher> watcher(monitor ?
968 new CMonitoredProcessWatcher(params, working_dir,
969 m_MonitorAppPath, env, m_MonitorPeriod, m_MonitorRunTimeout) :
970 new CJobContextProcessWatcher(params));
971
972 bool result = CPipe::ExecWait(GetAppPath(), args, in,
973 std_out_guard.GetOStream(),
974 std_err_guard.GetOStream(),
975 exit_value,
976 tmp_path, env, watcher.get(),
977 &m_KillTimeout,
978 PIPE_SIZE) == CPipe::eDone;
979
980 std_err_guard.Close();
981 std_out_guard.Close();
982
983 return result;
984 }
985 }
986
FinishJob(bool finished_ok,int ret,CWorkerNodeJobContext & context) const987 void CRemoteAppLauncher::FinishJob(bool finished_ok, int ret,
988 CWorkerNodeJobContext& context) const
989 {
990 if (!finished_ok) {
991 if (!context.IsJobCommitted())
992 context.CommitJobWithFailure("Job has been canceled");
993 } else
994 // Check whether retries are disabled for the specified exit code.
995 if (m_MustFailNoRetries && m_MustFailNoRetries->Contain(ret))
996 context.CommitJobWithFailure(
997 "Exited with return code " + NStr::IntToString(ret) +
998 " - will not be rerun",
999 true /* no retries */);
1000 else if (ret == 0 || m_NonZeroExitAction == eDoneOnNonZeroExit)
1001 context.CommitJob();
1002 else if (m_NonZeroExitAction == eReturnOnNonZeroExit)
1003 context.ReturnJob();
1004 else
1005 context.CommitJobWithFailure(
1006 "Exited with return code " + NStr::IntToString(ret));
1007 }
1008
// Returns the app version reported by the version command, appended with v
// (or just v if the command does not complete within one second).
string CRemoteAppLauncher::GetAppVersion(const string& v) const
{
    // SParams may hold the run timeout by reference, so keep it in a named
    // local that outlives params. A temporary would be destroyed right
    // after params is constructed.
    const CTimeout run_timeout(1.0);
    CTimedProcessWatcher::SParams params("Version", run_timeout, m_Reaper->GetScheduler());
    return m_Version->Get(params, v);
}
1014
OnGridWorkerStart()1015 void CRemoteAppLauncher::OnGridWorkerStart()
1016 {
1017 m_Reaper->StartExecutor();
1018 if (m_Remover) m_Remover->StartExecutor();
1019 }
1020
Run(CWorkerNodeIdleTaskContext &)1021 void CRemoteAppIdleTask::Run(CWorkerNodeIdleTaskContext&)
1022 {
1023 if (!m_AppCmd.empty())
1024 CExec::System(m_AppCmd.c_str());
1025 }
1026
1027 END_NCBI_SCOPE
1028