1 /*  $Id: remote_app_wn.cpp 574078 2018-11-05 20:43:42Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:  NetSchedule worker node sample
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "exec_helpers.hpp"
35 
36 #include <connect/services/remote_app.hpp>
37 
38 USING_NCBI_SCOPE;
39 
40 
41 class CAppEnvHolder
42 {
43 public:
CAppEnvHolder(const CRemoteAppLauncher & remote_app_launcher)44     CAppEnvHolder(const CRemoteAppLauncher& remote_app_launcher) :
45         m_RemoteAppLauncher(remote_app_launcher)
46     {}
47 
48     const char* const* GetEnv(const CWorkerNodeJobContext&);
49 
50 private:
51     const CRemoteAppLauncher& m_RemoteAppLauncher;
52     list<string> m_EnvValues;
53     list<string> m_CtxEnvValues;
54     vector<const char*> m_Env;
55 };
56 
GetEnv(const CWorkerNodeJobContext & context)57 const char* const* CAppEnvHolder::GetEnv(
58         const CWorkerNodeJobContext& context)
59 {
60     // If called for the first time
61     if (m_Env.empty()) {
62         const CRemoteAppLauncher::TEnvMap& added_env =
63                 m_RemoteAppLauncher.GetAddedEnv();
64 
65         ITERATE(CRemoteAppLauncher::TEnvMap, it, added_env) {
66             m_EnvValues.push_back(it->first + "=" +it->second);
67         }
68         list<string> names;
69         const CNcbiEnvironment& env = m_RemoteAppLauncher.GetLocalEnv();
70         env.Enumerate(names);
71         ITERATE(list<string>, it, names) {
72             if (added_env.find(*it) == added_env.end())
73                 m_EnvValues.push_back(*it + "=" + env.Get(*it));
74         }
75     } else {
76         m_Env.clear();
77     }
78 
79     // m_CtxEnvValues cannot be replaced with a local variable,
80     // as it holds actual values (m_Env holds only pointers to them)
81     m_CtxEnvValues.clear();
82     m_CtxEnvValues.push_back("NCBI_NS_SERVICE=" +
83             context.GetWorkerNode().GetServiceName());
84 
85     m_CtxEnvValues.push_back("NCBI_NS_QUEUE=" + context.GetQueueName());
86 
87     const CNetScheduleJob& job = context.GetJob();
88 
89     m_CtxEnvValues.push_back("NCBI_NS_JID=" + job.job_id);
90     m_CtxEnvValues.push_back("NCBI_JOB_AFFINITY=" + job.affinity);
91 
92     if (!job.client_ip.empty()) {
93         m_CtxEnvValues.push_back("NCBI_LOG_CLIENT_IP=" + job.client_ip);
94     }
95 
96     if (!job.session_id.empty()) {
97         m_CtxEnvValues.push_back("NCBI_LOG_SESSION_ID=" + job.session_id);
98     }
99 
100     if (!job.page_hit_id.empty()) {
101         m_CtxEnvValues.push_back("NCBI_LOG_HIT_ID=" + job.page_hit_id);
102     }
103 
104     ITERATE(list<string>, it, m_CtxEnvValues) {
105         m_Env.push_back(it->c_str());
106     }
107 
108     ITERATE(list<string>, it, m_EnvValues) {
109         m_Env.push_back(it->c_str());
110     }
111 
112     m_Env.push_back(NULL);
113     return &m_Env[0];
114 }
115 
116 ///////////////////////////////////////////////////////////////////////
117 
118 /// The remote_app NetSchedule job.
119 ///
120 /// This class performs demarshalling of the command line arguments
121 /// and then executes the specified program.
122 ///
123 class CRemoteAppJob : public IWorkerNodeJob
124 {
125 public:
126     CRemoteAppJob(const IWorkerNodeInitContext& context,
127             const CRemoteAppLauncher& remote_app_launcher);
128 
~CRemoteAppJob()129     virtual ~CRemoteAppJob() {}
130 
131     int Do(CWorkerNodeJobContext& context);
132 
133 private:
134     CNetCacheAPI m_NetCacheAPI;
135     const CRemoteAppLauncher& m_RemoteAppLauncher;
136     CAppEnvHolder m_AppEnvHolder;
137 };
138 
Do(CWorkerNodeJobContext & context)139 int CRemoteAppJob::Do(CWorkerNodeJobContext& context)
140 {
141     if (context.IsLogRequested()) {
142         LOG_POST(Note << context.GetJobKey() << " is received.");
143     }
144 
145     CRemoteAppRequest request(m_NetCacheAPI);
146     CRemoteAppResult result(m_NetCacheAPI);
147 
148     try {
149         request.Deserialize(context.GetIStream());
150     }
151     catch (exception&) {
152         ERR_POST("Cannot deserialize remote_app job");
153         context.CommitJobWithFailure("Error while "
154                 "unpacking remote_app arguments");
155         return -1;
156     }
157 
158     result.SetStdOutErrFileNames(request.GetStdOutFileName(),
159                                  request.GetStdErrFileName(),
160                                  request.GetStdOutErrStorageType());
161 
162     size_t output_size = context.GetWorkerNode().GetServerOutputSize();
163     if (output_size == 0) {
164         // NetSchedule internal storage is not supported; all
165         // output will be stored in NetCache anyway, so it can
166         // be put into one blob.
167         output_size = kMax_UInt;
168     } else {
169         // Empiric estimation of the maximum output size
170         // (reduction by 10%).
171         output_size = output_size - output_size / 10;
172     }
173     result.SetMaxInlineSize(output_size);
174 
175     if (context.IsLogRequested()) {
176         if (!request.GetInBlobIdOrData().empty()) {
177             LOG_POST(Note << context.GetJobKey()
178                 << " Input data: " << request.GetInBlobIdOrData());
179         }
180         LOG_POST(Note << context.GetJobKey()
181             << " Args: " << request.GetCmdLine());
182         if (!request.GetStdOutFileName().empty()) {
183             LOG_POST(Note << context.GetJobKey()
184                 << " StdOutFile: " << request.GetStdOutFileName());
185         }
186         if (!request.GetStdErrFileName().empty()) {
187             LOG_POST(Note << context.GetJobKey()
188                 << " StdErrFile: " << request.GetStdErrFileName());
189         }
190     }
191 
192     vector<string> args;
193     TokenizeCmdLine(request.GetCmdLine(), args);
194 
195     int ret = -1;
196     bool finished_ok = m_RemoteAppLauncher.ExecRemoteApp(args,
197                                     request.GetStdInForRead(),
198                                     result.GetStdOutForWrite(),
199                                     result.GetStdErrForWrite(),
200                                     ret,
201                                     context,
202                                     request.GetAppRunTimeout(),
203                                     m_AppEnvHolder.GetEnv(context));
204 
205     result.SetRetCode(ret);
206     result.Serialize(context.GetOStream());
207 
208     m_RemoteAppLauncher.FinishJob(finished_ok, ret, context);
209 
210     if (context.IsLogRequested()) {
211         LOG_POST(Note << "Job " << context.GetJobKey() <<
212                 " is " << context.GetCommitStatusDescription(
213                         context.GetCommitStatus()) <<
214                 ". Exit code: " << ret);
215         if (!result.GetErrBlobIdOrData().empty()) {
216             LOG_POST(Note << context.GetJobKey() << " Err data: " <<
217                 result.GetErrBlobIdOrData());
218         }
219     }
220     request.Reset();
221     result.Reset();
222     return ret;
223 }
224 
CRemoteAppJob(const IWorkerNodeInitContext & context,const CRemoteAppLauncher & remote_app_launcher)225 CRemoteAppJob::CRemoteAppJob(const IWorkerNodeInitContext& context,
226         const CRemoteAppLauncher& remote_app_launcher) :
227     m_NetCacheAPI(context.GetNetCacheAPI()),
228     m_RemoteAppLauncher(remote_app_launcher),
229     m_AppEnvHolder(remote_app_launcher)
230 {
231     CGridGlobals::GetInstance().SetReuseJobObject(true);
232 }
233 
234 class CRemoteAppListener : public CRemoteAppBaseListener
235 {
236 public:
CRemoteAppListener(const TLauncherPtr & launcher)237     CRemoteAppListener(const TLauncherPtr& launcher)
238         : CRemoteAppBaseListener(launcher) {}
239 
240     virtual void OnInit(IWorkerNodeInitBaseContext* ctx);
241 };
242 
OnInit(IWorkerNodeInitBaseContext * ctx)243 void CRemoteAppListener::OnInit(IWorkerNodeInitBaseContext* ctx)
244 {
245     if (!ctx->GetConfig().HasEntry("log", "merge_lines")) {
246         SetDiagPostFlag(eDPF_PreMergeLines);
247         SetDiagPostFlag(eDPF_MergeLines);
248     }
249 }
250 
251 
252 /////////////////////////////////////////////////////////////////////////////
253 
254 #define GRID_APP_NAME "remote_app"
255 extern const char kGridAppName[] = GRID_APP_NAME;
256 
257 using TRemoteAppJobFactory = CRemoteAppJobFactory<CRemoteAppJob, CRemoteAppListener, kGridAppName>;
258 
main(int argc,const char * argv[])259 int main(int argc, const char* argv[])
260 {
261     GRID_APP_CHECK_VERSION_ARGS();
262     return Main<TRemoteAppJobFactory, CRemoteAppListener>(argc, argv);
263 }
264