1 /*  $Id: exec_helpers.cpp 599189 2019-12-20 15:57:19Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Maxim Didenko, Dmitry Kazimirov, Rafael Sadyrov
27  *
28  * File Description:  NetSchedule worker node sample
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <sstream>
35 
36 #include <connect/ncbi_pipe.hpp>
37 
38 #include <corelib/rwstream.hpp>
39 #include <corelib/request_ctx.hpp>
40 
41 #if defined(NCBI_OS_UNIX)
42 #include <fcntl.h>
43 #endif
44 
45 #include "exec_helpers.hpp"
46 #include "async_task.hpp"
47 
// Pipe size handed to CPipe::ExecWait() when the monitor application is run.
// The value is parenthesized so that the macro expands as a single operand
// inside any larger expression (e.g. "x / PIPE_SIZE").
#define PIPE_SIZE (64 * 1024)
49 
50 BEGIN_NCBI_SCOPE
51 
52 // A record for a child process
53 struct CRemoteAppReaperTask
54 {
55     const CProcess process;
56 
CRemoteAppReaperTaskCRemoteAppReaperTask57     CRemoteAppReaperTask(TProcessHandle handle) : process(handle) {}
58 
59     bool operator()(int current, int max_attempts);
60 };
61 
operator ()(int current,int max_attempts)62 bool CRemoteAppReaperTask::operator()(int current, int max_attempts)
63 {
64     CProcess::CExitInfo exitinfo;
65     const bool first_attempt = current == 1;
66 
67     if (process.Wait(0, &exitinfo) != -1 || exitinfo.IsExited() || exitinfo.IsSignaled()) {
68         // Log a message for those that had failed to be killed before
69         if (!first_attempt) {
70             LOG_POST(Note << "Successfully waited for a process: " << process.GetHandle());
71         }
72 
73         return true;
74     }
75 
76     if (first_attempt) {
77         if (process.KillGroup()) return true;
78 
79         ERR_POST(Warning << "Failed to kill a process: " << process.GetHandle() << ", will wait for it");
80         return false;
81     }
82 
83     if (current > max_attempts) {
84         // Give up if there are too many attempts to wait for a process
85         ERR_POST("Gave up waiting for a process: " << process.GetHandle());
86         return true;
87     }
88 
89     return false;
90 }
91 
92 // This class is responsibe for the whole process of reaping child processes
93 class CRemoteAppReaper : public CAsyncTaskProcessor<CRemoteAppReaperTask>
94 {
95     using CAsyncTaskProcessor<CRemoteAppReaperTask>::CAsyncTaskProcessor;
96 };
97 
98 struct CRemoteAppRemoverTask
99 {
100     const string path;
101 
102     CRemoteAppRemoverTask(string p);
103 
104     bool operator()(int current, int max_attempts) const;
105 };
106 
CRemoteAppRemoverTask(string p)107 CRemoteAppRemoverTask::CRemoteAppRemoverTask(string p) :
108     path(move(p))
109 {
110     if (path.empty()) return;
111 
112     CDir dir(path);
113 
114     if (dir.Exists()) return;
115 
116     dir.CreatePath();
117 }
118 
operator ()(int current,int max_attempts) const119 bool CRemoteAppRemoverTask::operator()(int current, int max_attempts) const
120 {
121     if (path.empty()) return true;
122 
123     const bool first_attempt = current == 1;
124 
125     try {
126         if (CDir(path).Remove(CDirEntry::eRecursiveIgnoreMissing)) {
127             // Log a message for those that had failed to be removed before
128             if (!first_attempt) {
129                 LOG_POST(Note << "Successfully removed a path: " << path);
130             }
131 
132             return true;
133         }
134     }
135     catch (...) {
136     }
137 
138     if (current > max_attempts) {
139         // Give up if there are too many attempts to remove a path
140         ERR_POST("Gave up removing a path: " << path);
141         return true;
142     }
143 
144     if (first_attempt) {
145         ERR_POST(Warning << "Failed to remove a path: " << path << ", will try later");
146         return false;
147     }
148 
149     return false;
150 }
151 
152 // This class is responsibe for removing tmp directories
153 class CRemoteAppRemover : public CAsyncTaskProcessor<CRemoteAppRemoverTask>
154 {
155 public:
156     struct SGuard;
157 
158     using CAsyncTaskProcessor<CRemoteAppRemoverTask>::CAsyncTaskProcessor;
159 };
160 
161 struct CRemoteAppRemover::SGuard
162 {
SGuardCRemoteAppRemover::SGuard163     SGuard(CRemoteAppRemover* remover, CRemoteAppRemoverTask task, bool remove_tmp_dir) :
164         m_Scheduler(remover ? &remover->GetScheduler() : nullptr),
165         m_Task(move(task)),
166         m_RemoveTmpDir(remove_tmp_dir)
167     {}
168 
~SGuardCRemoteAppRemover::SGuard169     ~SGuard()
170     {
171         // Do not remove
172         if (!m_RemoveTmpDir) return;
173 
174         // Remove asynchronously
175         if (m_Scheduler && (*m_Scheduler)(m_Task)) return;
176 
177         // Remove synchronously
178         m_Task(1, 0);
179     }
180 
181 private:
182     CRemoteAppRemover::CScheduler* m_Scheduler;
183     CRemoteAppRemoverTask m_Task;
184     bool m_RemoveTmpDir;
185 };
186 
187 class CTimer
188 {
189 public:
CTimer(const CTimeout & timeout)190     CTimer(const CTimeout& timeout) :
191         m_Deadline(timeout),
192         m_Timeout(timeout)
193     {}
194 
Restart()195     void Restart() { m_Deadline = m_Timeout; }
IsExpired() const196     bool IsExpired() const { return m_Deadline.IsExpired(); }
PresetSeconds() const197     unsigned PresetSeconds() const { return (unsigned)m_Timeout.GetAsDouble(); }
198 
199 private:
200     CDeadline m_Deadline;
201     CTimeout m_Timeout;
202 };
203 
204 class CTimedProcessWatcher : public CPipe::IProcessWatcher
205 {
206 public:
207     struct SParams
208     {
209         string process_type;
210         const CTimeout& run_timeout;
211         CRemoteAppReaper::CScheduler& process_manager;
212 
SParamsCTimedProcessWatcher::SParams213         SParams(string pt, const CTimeout& rt, CRemoteAppReaper::CScheduler& pm) :
214             process_type(move(pt)),
215             run_timeout(rt),
216             process_manager(pm)
217         {}
218     };
219 
CTimedProcessWatcher(SParams & p)220     CTimedProcessWatcher(SParams& p)
221         : m_ProcessManager(p.process_manager),
222           m_ProcessType(p.process_type),
223           m_Deadline(p.run_timeout)
224     {
225     }
226 
Watch(TProcessHandle pid)227     virtual EAction Watch(TProcessHandle pid)
228     {
229         if (m_Deadline.IsExpired()) {
230             ERR_POST(m_ProcessType << " run time exceeded "
231                      << m_Deadline.PresetSeconds()
232                      <<" second(s), stopping the child: " << pid);
233             return m_ProcessManager(pid) ? eExit : eStop;
234         }
235 
236         return eContinue;
237     }
238 
239 protected:
240     CRemoteAppReaper::CScheduler& m_ProcessManager;
241     const string m_ProcessType;
242     const CTimer m_Deadline;
243 };
244 
245 // This class is responsible for reporting app/cgi version run by this app
246 class CRemoteAppVersion
247 {
248 public:
CRemoteAppVersion(const string & app,const vector<string> & args)249     CRemoteAppVersion(const string& app, const vector<string>& args)
250         : m_App(app), m_Args(args)
251     {}
252 
253     string Get(CTimedProcessWatcher::SParams& p, const string& v) const;
254 
255 private:
256     const string m_App;
257     const vector<string> m_Args;
258 };
259 
Get(CTimedProcessWatcher::SParams & p,const string & v) const260 string CRemoteAppVersion::Get(CTimedProcessWatcher::SParams& p, const string& v) const
261 {
262     CTimedProcessWatcher  wait_one_second(p);
263     istringstream in;
264     ostringstream out;
265     ostringstream err;
266     int exit_code;
267 
268     if (CPipe::ExecWait(m_App, m_Args, in, out, err, exit_code,
269                 kEmptyStr, 0, &wait_one_second) == CPipe::eDone) {
270         // Restrict version string to 1024 chars
271         string app_ver(out.str(), 0, 1024);
272         return NStr::Sanitize(app_ver) + " / " + v;
273     }
274 
275     return v;
276 }
277 
278 // This class is responsible for reporting clients about apps timing out
279 class CRemoteAppTimeoutReporter
280 {
281 public:
CRemoteAppTimeoutReporter(const string & mode)282     CRemoteAppTimeoutReporter(const string& mode) : m_Mode(Get(mode)) {}
283 
Report(CWorkerNodeJobContext & job_context,unsigned seconds)284     void Report(CWorkerNodeJobContext& job_context, unsigned seconds)
285     {
286         if (m_Mode != eNever) {
287             job_context.PutProgressMessage("Job run time exceeded " +
288                     NStr::UIntToString(seconds) + " seconds.", true,
289                     m_Mode == eAlways);
290         }
291     }
292 
293 private:
294     enum EMode { eSmart, eAlways, eNever };
295 
Get(const string & mode)296     static EMode Get(const string& mode)
297     {
298         if (!NStr::CompareNocase(mode, "smart"))
299             return eSmart;
300 
301         else if (!NStr::CompareNocase(mode, "always"))
302             return eAlways;
303 
304         else if (!NStr::CompareNocase(mode, "never"))
305             return eNever;
306         else
307             ERR_POST("Unknown parameter value: "
308                     "parameter \"progress_message_on_timeout\", "
309                     "value: \"" << mode << "\". "
310                     "Allowed values: smart, always, never");
311 
312         return eSmart;
313     }
314 
315     EMode m_Mode;
316 };
317 
/// Class representing (non overlapping) integer ranges.
///
class CRanges
{
public:
    /// Reads integer ranges from an input stream.
    ///
    /// The data must be in the following format (in ascending order):
    ///     [!] R1, R2, ..., Rn
    ///
    /// Where:
    ///     !         - negation, makes all provided ranges be excluded
    ///                 (not included).
    ///     R1 ... Rn - integer closed ranges specified either as
    ///                 FROM - TO (corresponds to [FROM, TO] range) or as
    ///                 NUMBER (corresponds to [NUMBER, NUMBER] range).
    /// Example:
    ///     4, 6 - 9, 16 - 40, 64
    ///
    /// An empty stream yields an empty set of ranges.
    ///
    /// @throw invalid_argument
    ///     If the ranges are not sorted or overlap, an unexpected character
    ///     is met, an interval has no end, or unparsed data is left.
    ///
    CRanges(istream& is);

    /// Checks whether provided number belongs to one of the ranges.
    ///
    bool Contain(int n) const;

private:
    /// Sorted range boundaries: elements at even positions start a range,
    /// elements at odd positions are the first values past a range
    /// (i.e. TO + 1). The element type is wider than int on purpose,
    /// so that "TO + 1" does not overflow when TO is the largest int.
    vector<long long> m_Ranges;
};

CRanges::CRanges(istream& is)
{
    char ch;

    if (!(is >> skipws >> ch)) return; // EoF

    // Char '!' makes provided ranges to be excluded (not included):
    // an artificial range start at the lowest possible value
    // shifts the meaning of all boundaries that follow
    if (ch == '!')
        m_Ranges.push_back(numeric_limits<int>::min());
    else
        is.putback(ch);

    // True while the start of "FROM - TO" has been read but the end has not
    bool interval = false;

    do {
        int n;

        if (!(is >> n)) break;

        // End of data after a number acts as a closing comma
        if (!(is >> ch)) ch = ',';

        if (!m_Ranges.empty()  &&  n < m_Ranges.back()) {
            ostringstream err;
            err << n << " is less or equal than previous number, "
                "intervals must be sorted and not overlapping";
            throw invalid_argument(err.str());
        }

        if (ch == ',') {
            // If it's only one number in interval, add both start and end of it
            if (!interval) m_Ranges.push_back(n);

            // Widen before adding one, "n + 1" would overflow for INT_MAX
            m_Ranges.push_back(static_cast<long long>(n) + 1);
            interval = false;

        } else if (ch == '-' && !interval) {
            interval = true;
            m_Ranges.push_back(n);

        } else {
            ostringstream err;
            err << "Unexpected char '" << ch << "'";
            throw invalid_argument(err.str());
        }
    } while (is);

    if (interval) {
        ostringstream err;
        err << "Missing interval end";
        throw invalid_argument(err.str());
    }

    // Reading stopped before the end of data: something is not a number
    if (!is.eof()) {
        ostringstream err;
        err << "Not a number near: " << is.rdbuf();
        throw invalid_argument(err.str());
    }
}

bool CRanges::Contain(int n) const
{
    // Number of boundaries that are less or equal to n
    const auto passed = upper_bound(m_Ranges.begin(), m_Ranges.end(),
            static_cast<long long>(n)) - m_Ranges.begin();

    // Every number starts a range, every second range is excluded:
    // n is inside a range if an odd number of boundaries has been passed
    return passed % 2 != 0;
}
416 
s_ReadRanges(const IRegistry & reg,const string & sec,string param)417 CRanges* s_ReadRanges(const IRegistry& reg, const string& sec, string param)
418 {
419     if (reg.HasEntry(sec, param)) {
420         istringstream iss(reg.GetString(sec, param, kEmptyStr));
421 
422         try {
423             return new CRanges(iss);
424         }
425         catch (invalid_argument& ex) {
426             NCBI_THROW_FMT(CInvalidParamException, eInvalidCharacter,
427                 "Parameter '" << param << "' parsing error: " << ex.what());
428         }
429     }
430 
431     return nullptr;
432 }
433 
s_ToTimeout(unsigned sec)434 CTimeout s_ToTimeout(unsigned sec)
435 {
436     // Zero counts as infinite timeout
437     return sec ? CTimeout(sec, 0) : CTimeout::eInfinite;
438 }
439 
440 struct SSection
441 {
442     const IRegistry& reg;
443     const string& name;
444 
SSectionSSection445     SSection(const IRegistry& r, const string& n) : reg(r), name(n) {}
446 
GetSSection447     int Get(const string& param, int def) const
448     {
449         return reg.GetInt(name, param, def, 0, IRegistry::eReturn);
450     }
451 
GetSSection452     bool Get(const string& param, bool def) const
453     {
454         return reg.GetBool(name, param, def, 0, IRegistry::eReturn);
455     }
456 };
457 
458 //////////////////////////////////////////////////////////////////////////////
459 ///
CRemoteAppLauncher(const string & sec_name,const IRegistry & reg)460 CRemoteAppLauncher::CRemoteAppLauncher(const string& sec_name,
461         const IRegistry& reg) :
462       m_NonZeroExitAction(eDoneOnNonZeroExit),
463       m_RemoveTempDir(true),
464       m_CacheStdOutErr(true)
465 {
466     const SSection sec(reg, sec_name);
467 
468     m_AppRunTimeout = s_ToTimeout(sec.Get("max_app_run_time", 0));
469     m_KeepAlivePeriod = s_ToTimeout(sec.Get("keep_alive_period", 0));
470 
471     if (reg.HasEntry(sec_name, "non_zero_exit_action") ) {
472         string val = reg.GetString(sec_name, "non_zero_exit_action", "");
473         if (NStr::CompareNocase(val, "fail") == 0 )
474             m_NonZeroExitAction = eFailOnNonZeroExit;
475         else if (NStr::CompareNocase(val, "return") == 0 )
476             m_NonZeroExitAction = eReturnOnNonZeroExit;
477         else if (NStr::CompareNocase(val, "done") == 0 )
478             m_NonZeroExitAction = eDoneOnNonZeroExit;
479         else {
480            ERR_POST("Unknown parameter value: "
481                    "section [" << sec_name << "], "
482                    "parameter \"non_zero_exit_action\", "
483                    "value: \"" << val << "\". "
484                    "Allowed values: fail, return, done");
485         }
486     } else if (sec.Get("fail_on_non_zero_exit", false))
487         m_NonZeroExitAction = eFailOnNonZeroExit;
488 
489     m_MustFailNoRetries.reset(
490             s_ReadRanges(reg, sec_name, "fail_no_retries_if_exit_code"));
491 
492     const string name = CNcbiApplication::Instance()->GetProgramDisplayName();
493 
494     if (sec.Get("run_in_separate_dir", false)) {
495         if (reg.HasEntry(sec_name, "tmp_dir"))
496             m_TempDir = reg.GetString(sec_name, "tmp_dir", "." );
497         else
498             m_TempDir = reg.GetString(sec_name, "tmp_path", "." );
499 
500         if (!CDirEntry::IsAbsolutePath(m_TempDir)) {
501             string tmp = CDir::GetCwd()
502                 + CDirEntry::GetPathSeparator()
503                 + m_TempDir;
504             m_TempDir = CDirEntry::NormalizePath(tmp);
505         }
506         if (reg.HasEntry(sec_name, "remove_tmp_dir"))
507             m_RemoveTempDir = sec.Get("remove_tmp_dir", true);
508         else
509             m_RemoveTempDir = sec.Get("remove_tmp_path", true);
510 
511         int sleep = sec.Get("sleep_between_remove_tmp_attempts", 60);
512         int max_attempts = sec.Get("max_remove_tmp_attempts", 60);
513         m_Remover.reset(new CRemoteAppRemover(sleep, max_attempts, name + "_rm"));
514 
515         m_CacheStdOutErr = sec.Get("cache_std_out_err", true);
516     }
517 
518     m_AppPath = reg.GetString(sec_name, "app_path", kEmptyStr);
519     if (m_AppPath.empty()) {
520         NCBI_THROW_FMT(CConfigException, eParameterMissing,
521                 "Missing configuration parameter [" << sec_name <<
522                 "].app_path");
523     }
524     if (!CDirEntry::IsAbsolutePath(m_AppPath)) {
525         string tmp = CDir::GetCwd()
526             + CDirEntry::GetPathSeparator()
527             + m_AppPath;
528         m_AppPath = CDirEntry::NormalizePath(tmp);
529     }
530 
531     m_MonitorAppPath = reg.GetString(sec_name, "monitor_app_path", kEmptyStr);
532     if (!m_MonitorAppPath.empty()) {
533         if (!CDirEntry::IsAbsolutePath(m_MonitorAppPath)) {
534             string tmp = CDir::GetCwd()
535                 + CDirEntry::GetPathSeparator()
536                 + m_MonitorAppPath;
537             m_MonitorAppPath = CDirEntry::NormalizePath(tmp);
538         }
539         CFile f(m_MonitorAppPath);
540         if (!f.Exists() || !CanExec(f)) {
541             ERR_POST("Can not execute \"" << m_MonitorAppPath
542                      << "\". The Monitor application will not run!");
543             m_MonitorAppPath = kEmptyStr;
544         }
545     }
546 
547     m_MonitorRunTimeout = s_ToTimeout(sec.Get("max_monitor_running_time", 5));
548     m_MonitorPeriod = s_ToTimeout(sec.Get("monitor_period", 5));
549     m_KillTimeout.sec = sec.Get("kill_timeout", 1);
550     m_KillTimeout.usec = 0;
551 
552     m_ExcludeEnv.clear();
553     m_IncludeEnv.clear();
554     m_AddedEnv.clear();
555 
556     NStr::Split(reg.GetString("env_inherit", "exclude", "")," ;,", m_ExcludeEnv,
557             NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
558     NStr::Split(reg.GetString("env_inherit", "include", "")," ;,", m_IncludeEnv,
559             NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
560 
561     list<string> added_env;
562     reg.EnumerateEntries("env_set", &added_env);
563 
564     ITERATE(list<string>, it, added_env) {
565         const string& s = *it;
566          m_AddedEnv[s] = reg.GetString("env_set", s, "");
567     }
568 
569     int sleep = sec.Get("sleep_between_reap_attempts", 60);
570     int max_attempts = sec.Get("max_reap_attempts_after_kill", 60);
571     m_Reaper.reset(new CRemoteAppReaper(sleep, max_attempts, name + "_cl"));
572 
573     const string cmd = reg.GetString(sec_name, "version_cmd", m_AppPath);
574     const string args = reg.GetString(sec_name, "version_args", "-version");
575     vector<string> v;
576     m_Version.reset(new CRemoteAppVersion(cmd,
577                 NStr::Split(args, " ", v)));
578 
579     const string mode = reg.GetString(sec_name, "progress_message_on_timeout",
580             "smart");
581     m_TimeoutReporter.reset(new CRemoteAppTimeoutReporter(mode));
582 }
583 
584 // We need this explicit empty destructor,
585 // so it could destruct CRemoteAppReaper and CRemoteAppVersion instances.
586 // Otherwise, there would be implicit inline destructor
587 // that could be placed where these classes are incomplete.
~CRemoteAppLauncher()588 CRemoteAppLauncher::~CRemoteAppLauncher()
589 {
590 }
591 
592 //////////////////////////////////////////////////////////////////////////////
593 ///
CanExec(const CFile & file)594 bool CRemoteAppLauncher::CanExec(const CFile& file)
595 {
596     CDirEntry::TMode user_mode  = 0;
597     if (!file.GetMode(&user_mode))
598         return false;
599     if (user_mode & CDirEntry::fExecute)
600         return true;
601     return false;
602 }
603 
604 //////////////////////////////////////////////////////////////////////////////
605 ///
606 class CJobContextProcessWatcher : public CTimedProcessWatcher
607 {
608 public:
609     struct SParams : CTimedProcessWatcher::SParams
610     {
611         CWorkerNodeJobContext& job_context;
612         const CTimeout& keep_alive_period;
613         CRemoteAppTimeoutReporter& timeout_reporter;
614 
SParamsCJobContextProcessWatcher::SParams615         SParams(CWorkerNodeJobContext& jc,
616                 string pt,
617                 const CTimeout& rt,
618                 const CTimeout& kap,
619                 CRemoteAppTimeoutReporter& tr,
620                 CRemoteAppReaper::CScheduler& pm)
621             : CTimedProcessWatcher::SParams(move(pt), rt, pm),
622                 job_context(jc),
623                 keep_alive_period(kap),
624                 timeout_reporter(tr)
625         {}
626     };
627 
CJobContextProcessWatcher(SParams & p)628     CJobContextProcessWatcher(SParams& p)
629         : CTimedProcessWatcher(p),
630           m_JobContext(p.job_context), m_KeepAlive(p.keep_alive_period),
631           m_TimeoutReporter(p.timeout_reporter)
632     {
633     }
634 
OnStart(TProcessHandle pid)635     virtual EAction OnStart(TProcessHandle pid)
636     {
637         if (m_JobContext.GetShutdownLevel() ==
638             CNetScheduleAdmin::eShutdownImmediate) {
639             return eStop;
640         }
641 
642         LOG_POST(Note << "Child PID: " << NStr::UInt8ToString((Uint8) pid));
643 
644         return CTimedProcessWatcher::OnStart(pid);
645     }
646 
Watch(TProcessHandle pid)647     virtual EAction Watch(TProcessHandle pid)
648     {
649         if (m_JobContext.GetShutdownLevel() ==
650                 CNetScheduleAdmin::eShutdownImmediate) {
651             m_JobContext.ReturnJob();
652             return eStop;
653         }
654 
655         EAction action = CTimedProcessWatcher::Watch(pid);
656 
657         if (action != eContinue) {
658             m_TimeoutReporter.Report(m_JobContext, m_Deadline.PresetSeconds());
659             return action;
660         }
661 
662         if (m_KeepAlive.IsExpired()) {
663             m_JobContext.JobDelayExpiration(m_KeepAlive.PresetSeconds() + 10);
664             m_KeepAlive.Restart();
665         }
666 
667         return eContinue;
668     }
669 
670 protected:
671     CWorkerNodeJobContext& m_JobContext;
672 
673 private:
674     CTimer m_KeepAlive;
675     CRemoteAppTimeoutReporter& m_TimeoutReporter;
676 };
677 
678 //////////////////////////////////////////////////////////////////////////////
679 ///
680 class CMonitoredProcessWatcher : public CJobContextProcessWatcher
681 {
682 public:
CMonitoredProcessWatcher(SParams & p,const string & job_wdir,const string & path,const char * const * env,CTimeout run_period,CTimeout run_timeout)683     CMonitoredProcessWatcher(SParams& p, const string& job_wdir,
684             const string& path, const char* const* env,
685             CTimeout run_period, CTimeout run_timeout)
686         : CJobContextProcessWatcher(p),
687           m_MonitorWatch(run_period),
688           m_JobWDir(job_wdir),
689           m_Path(path),
690           m_Env(env),
691           m_RunTimeout(run_timeout)
692     {
693     }
694 
Watch(TProcessHandle pid)695     virtual EAction Watch(TProcessHandle pid)
696     {
697         EAction action = CJobContextProcessWatcher::Watch(pid);
698 
699         if (action != eContinue)
700             return action;
701 
702         if (m_MonitorWatch.IsExpired()) {
703             action = MonitorRun(pid);
704             m_MonitorWatch.Restart();
705         }
706 
707         return action;
708     }
709 
710 private:
711     // The exit code of the monitor program is interpreted as follows
712     // (any exit code not listed below is treated as eInternalError)
713     enum EResult {
714         // The job is running as expected.
715         // The monitor's stdout is interpreted as a job progress message.
716         // The stderr goes to the log file if logging is enabled.
717         eJobRunning = 0,
718         // The monitor detected an inconsistency with the job run;
719         // the job must be returned back to the queue.
720         // The monitor's stderr goes to the log file
721         // regardless of whether logging is enabled or not.
722         eJobToReturn = 1,
723         // The job must be failed.
724         // The monitor's stdout is interpreted as the error message;
725         // stderr goes to the log file regardless of whether
726         // logging is enabled or not.
727         eJobFailed = 2,
728         // There's a problem with the monitor application itself.
729         // The job continues to run and the monitor's stderr goes
730         // to the log file unconditionally.
731         eInternalError = 3,
732     };
733 
MonitorRun(TProcessHandle pid)734     EAction MonitorRun(TProcessHandle pid)
735     {
736         CNcbiStrstream in;
737         CNcbiOstrstream out;
738         CNcbiOstrstream err;
739         vector<string> args;
740         args.push_back("-pid");
741         args.push_back(NStr::UInt8ToString((Uint8) pid));
742         args.push_back("-jid");
743         args.push_back(m_JobContext.GetJobKey());
744         args.push_back("-jwdir");
745         args.push_back(m_JobWDir);
746 
747         CTimedProcessWatcher::SParams params("Monitor", m_RunTimeout, m_ProcessManager);
748         CTimedProcessWatcher callback(params);
749         int exit_value = eInternalError;
750         try {
751             if (CPipe::eDone != CPipe::ExecWait(m_Path, args, in,
752                                   out, err, exit_value,
753                                   kEmptyStr, m_Env,
754                                   &callback,
755                                   NULL,
756                                   PIPE_SIZE)) {
757                 exit_value = eInternalError;
758             }
759         }
760         catch (exception& ex) {
761             err << ex.what();
762         }
763         catch (...) {
764             err << "Unknown error";
765         }
766 
767         switch (exit_value) {
768         case eJobRunning:
769             {
770                 bool non_empty_output = !IsOssEmpty(out);
771                 if (non_empty_output) {
772                     m_JobContext.PutProgressMessage
773                         (CNcbiOstrstreamToString(out), true);
774                 }
775                 if (m_JobContext.IsLogRequested() &&
776                     ( !non_empty_output || !IsOssEmpty(err) ))
777                     x_Log("exited with zero return code", err);
778             }
779             return eContinue;
780 
781         case eJobToReturn:
782             m_JobContext.ReturnJob();
783             x_Log("job is returned", err);
784             return eStop;
785 
786         case eJobFailed:
787             {
788                 x_Log("job failed", err);
789                 string errmsg;
790                 if ( !IsOssEmpty(out) ) {
791                     errmsg = CNcbiOstrstreamToString(out);
792                 } else
793                     errmsg = "Monitor requested job termination";
794                 throw runtime_error(errmsg);
795             }
796             return eContinue;
797         }
798 
799         x_Log("internal error", err);
800         return eContinue;
801     }
802 
x_Log(const string & what,CNcbiOstrstream & sstream)803     inline void x_Log(const string& what, CNcbiOstrstream& sstream)
804     {
805         if ( !IsOssEmpty(sstream) ) {
806             ERR_POST(m_JobContext.GetJobKey() << " (monitor) " << what <<
807                      ": " << (string)CNcbiOstrstreamToString(sstream));
808         } else {
809             ERR_POST(m_JobContext.GetJobKey() << " (monitor) " << what << ".");
810         }
811     }
812 
813     CTimer m_MonitorWatch;
814     string m_JobWDir;
815     string m_Path;
816     const char* const* m_Env;
817     CTimeout m_RunTimeout;
818 };
819 
820 
821 //////////////////////////////////////////////////////////////////////////////
822 ///
823 
824 class CTmpStreamGuard
825 {
826 public:
CTmpStreamGuard(const string & tmp_dir,const string & name,CNcbiOstream & orig_stream,bool cache_std_out_err)827     CTmpStreamGuard(const string& tmp_dir,
828         const string& name,
829         CNcbiOstream& orig_stream,
830         bool cache_std_out_err) : m_OrigStream(orig_stream), m_Stream(NULL)
831     {
832         if (!tmp_dir.empty() && cache_std_out_err) {
833             m_Name = tmp_dir + CDirEntry::GetPathSeparator();
834             m_Name += name;
835         }
836         if (!m_Name.empty()) {
837             try {
838                m_ReaderWriter.reset(new CFileReaderWriter(m_Name,
839                    CFileIO_Base::eCreate));
840             } catch (CFileException& ex) {
841                 ERR_POST("Could not create a temporary file " <<
842                     m_Name << " :" << ex.what() << " the data will be "
843                     "written directly to the original stream");
844                 m_Name.erase();
845                 m_Stream = &m_OrigStream;
846                 return;
847             }
848 #if defined(NCBI_OS_UNIX)
849             // If the file is created on an NFS file system, the CLOEXEC
850             // flag needs to be set, otherwise deleting the temporary
851             // directory will not succeed.
852             TFileHandle fd = m_ReaderWriter->GetFileIO().GetFileHandle();
853             fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
854 #endif
855             m_StreamGuard.reset(new CWStream(m_ReaderWriter.get()));
856             m_Stream = m_StreamGuard.get();
857         } else {
858             m_Stream = &m_OrigStream;
859         }
860     }
~CTmpStreamGuard()861     ~CTmpStreamGuard()
862     {
863         try {
864             Close();
865         }
866         catch (exception& ex) {
867             ERR_POST("CTmpStreamGuard::~CTmpStreamGuard(): " <<
868                 m_Name << " --> " << ex.what());
869         }
870     }
871 
GetOStream()872     CNcbiOstream& GetOStream() { return *m_Stream; }
873 
Close()874     void Close()
875     {
876         if (!m_Name.empty() && m_StreamGuard.get()) {
877             m_StreamGuard.reset();
878             m_ReaderWriter->Flush();
879             m_ReaderWriter->GetFileIO().SetFilePos(0, CFileIO_Base::eBegin);
880             {
881             CRStream rstm(m_ReaderWriter.get());
882             if (!rstm.good()
883                 || !NcbiStreamCopy(m_OrigStream, rstm))
884                 ERR_POST( "Cannot copy \"" << m_Name << "\" file.");
885             }
886             m_ReaderWriter.reset();
887         }
888     }
889 
890 private:
    // Destination stream supplied by the caller; Close() copies the
    // temporary file into it, and it is used directly as a fallback.
    CNcbiOstream& m_OrigStream;
    // Reader/writer over the temporary file (set only while caching).
    unique_ptr<CFileReaderWriter> m_ReaderWriter;
    // Owns the CWStream that writes through m_ReaderWriter.
    unique_ptr<CNcbiOstream> m_StreamGuard;
    // Stream returned by GetOStream(): points either at *m_StreamGuard or
    // at m_OrigStream.
    CNcbiOstream* m_Stream;
    // Name of the temporary file; erased when the file could not be created.
    string m_Name;
896 };
897 
898 
ExecRemoteApp(const vector<string> & args,CNcbiIstream & in,CNcbiOstream & out,CNcbiOstream & err,int & exit_value,CWorkerNodeJobContext & job_context,unsigned app_run_timeout,const char * const env[]) const899 bool CRemoteAppLauncher::ExecRemoteApp(const vector<string>& args,
900     CNcbiIstream& in, CNcbiOstream& out, CNcbiOstream& err,
901     int& exit_value,
902     CWorkerNodeJobContext& job_context,
903     unsigned app_run_timeout,
904     const char* const env[]) const
905 {
906     string tmp_path = m_TempDir;
907     if (!tmp_path.empty()) {
908         CFastLocalTime lt;
909         bool substitution_found = false;
910         size_t subst_pos;
911         while ((subst_pos = tmp_path.find('%')) != string::npos) {
912             if (subst_pos + 1 >= tmp_path.length())
913                 break;
914             switch (tmp_path[subst_pos + 1]) {
915             case '%':
916                 tmp_path.replace(subst_pos, 2, 1, '%');
917                 continue;
918             case 'q':
919                 tmp_path.replace(subst_pos, 2, job_context.GetQueueName());
920                 break;
921             case 'j':
922                 tmp_path.replace(subst_pos, 2, job_context.GetJobKey());
923                 break;
924             case 'r':
925                 tmp_path.replace(subst_pos, 2, NStr::UInt8ToString(
926                     GetDiagContext().GetRequestContext().GetRequestID()));
927                 break;
928             case 't':
929                 tmp_path.replace(subst_pos, 2, NStr::UIntToString(
930                     (unsigned) lt.GetLocalTime().GetTimeT()));
931                 break;
932             default:
933                 tmp_path.erase(subst_pos, 2);
934             }
935             substitution_found = true;
936         }
937         if (!substitution_found)
938             tmp_path += CDirEntry::GetPathSeparator() +
939                 job_context.GetQueueName() + "_"  +
940                 job_context.GetJobKey() + "_" +
941                 NStr::UIntToString((unsigned) lt.GetLocalTime().GetTimeT());
942     }
943 
944     CRemoteAppRemover::SGuard guard(m_Remover.get(), tmp_path, m_RemoveTempDir);
945     {
946         CTmpStreamGuard std_out_guard(tmp_path, "std.out", out,
947             m_CacheStdOutErr);
948         CTmpStreamGuard std_err_guard(tmp_path, "std.err", err,
949             m_CacheStdOutErr);
950 
951         CTimeout run_timeout = min(s_ToTimeout(app_run_timeout), m_AppRunTimeout);
952         string working_dir(tmp_path.empty() ? CDir::GetCwd() : tmp_path);
953 
954 #ifdef NCBI_OS_MSWIN
955         NStr::ReplaceInPlace(working_dir, "\\", "/");
956 #endif
957 
958         CJobContextProcessWatcher::SParams params(job_context,
959                 "Job",
960                 run_timeout,
961                 m_KeepAlivePeriod,
962                 *m_TimeoutReporter,
963                 m_Reaper->GetScheduler());
964 
965         bool monitor = !m_MonitorAppPath.empty() && m_MonitorPeriod.IsFinite();
966 
967         unique_ptr<CPipe::IProcessWatcher> watcher(monitor ?
968                 new CMonitoredProcessWatcher(params, working_dir,
969                     m_MonitorAppPath, env, m_MonitorPeriod, m_MonitorRunTimeout) :
970                 new CJobContextProcessWatcher(params));
971 
972         bool result = CPipe::ExecWait(GetAppPath(), args, in,
973                                std_out_guard.GetOStream(),
974                                std_err_guard.GetOStream(),
975                                exit_value,
976                                tmp_path, env, watcher.get(),
977                                &m_KillTimeout,
978                                PIPE_SIZE) == CPipe::eDone;
979 
980         std_err_guard.Close();
981         std_out_guard.Close();
982 
983         return result;
984     }
985 }
986 
FinishJob(bool finished_ok,int ret,CWorkerNodeJobContext & context) const987 void CRemoteAppLauncher::FinishJob(bool finished_ok, int ret,
988         CWorkerNodeJobContext& context) const
989 {
990     if (!finished_ok) {
991         if (!context.IsJobCommitted())
992             context.CommitJobWithFailure("Job has been canceled");
993     } else
994         // Check whether retries are disabled for the specified exit code.
995         if (m_MustFailNoRetries && m_MustFailNoRetries->Contain(ret))
996             context.CommitJobWithFailure(
997                     "Exited with return code " + NStr::IntToString(ret) +
998                     " - will not be rerun",
999                     true /* no retries */);
1000         else if (ret == 0 || m_NonZeroExitAction == eDoneOnNonZeroExit)
1001             context.CommitJob();
1002         else if (m_NonZeroExitAction == eReturnOnNonZeroExit)
1003             context.ReturnJob();
1004         else
1005             context.CommitJobWithFailure(
1006                     "Exited with return code " + NStr::IntToString(ret));
1007 }
1008 
// Obtain the application version through m_Version, passing 'v' along.
// The process watcher is labelled "Version" and given a timeout of
// CTimeout(1.0).
string CRemoteAppLauncher::GetAppVersion(const string& v) const
{
    CTimedProcessWatcher::SParams watcher_params(
            "Version",
            CTimeout(1.0),
            m_Reaper->GetScheduler());

    return m_Version->Get(watcher_params, v);
}
1014 
OnGridWorkerStart()1015 void CRemoteAppLauncher::OnGridWorkerStart()
1016 {
1017     m_Reaper->StartExecutor();
1018     if (m_Remover) m_Remover->StartExecutor();
1019 }
1020 
Run(CWorkerNodeIdleTaskContext &)1021 void CRemoteAppIdleTask::Run(CWorkerNodeIdleTaskContext&)
1022 {
1023     if (!m_AppCmd.empty())
1024         CExec::System(m_AppCmd.c_str());
1025 }
1026 
1027 END_NCBI_SCOPE
1028