1 /* $Id: remote_app_wn.cpp 574078 2018-11-05 20:43:42Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description: NetSchedule worker node sample
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "exec_helpers.hpp"
35
36 #include <connect/services/remote_app.hpp>
37
38 USING_NCBI_SCOPE;
39
40
41 class CAppEnvHolder
42 {
43 public:
CAppEnvHolder(const CRemoteAppLauncher & remote_app_launcher)44 CAppEnvHolder(const CRemoteAppLauncher& remote_app_launcher) :
45 m_RemoteAppLauncher(remote_app_launcher)
46 {}
47
48 const char* const* GetEnv(const CWorkerNodeJobContext&);
49
50 private:
51 const CRemoteAppLauncher& m_RemoteAppLauncher;
52 list<string> m_EnvValues;
53 list<string> m_CtxEnvValues;
54 vector<const char*> m_Env;
55 };
56
GetEnv(const CWorkerNodeJobContext & context)57 const char* const* CAppEnvHolder::GetEnv(
58 const CWorkerNodeJobContext& context)
59 {
60 // If called for the first time
61 if (m_Env.empty()) {
62 const CRemoteAppLauncher::TEnvMap& added_env =
63 m_RemoteAppLauncher.GetAddedEnv();
64
65 ITERATE(CRemoteAppLauncher::TEnvMap, it, added_env) {
66 m_EnvValues.push_back(it->first + "=" +it->second);
67 }
68 list<string> names;
69 const CNcbiEnvironment& env = m_RemoteAppLauncher.GetLocalEnv();
70 env.Enumerate(names);
71 ITERATE(list<string>, it, names) {
72 if (added_env.find(*it) == added_env.end())
73 m_EnvValues.push_back(*it + "=" + env.Get(*it));
74 }
75 } else {
76 m_Env.clear();
77 }
78
79 // m_CtxEnvValues cannot be replaced with a local variable,
80 // as it holds actual values (m_Env holds only pointers to them)
81 m_CtxEnvValues.clear();
82 m_CtxEnvValues.push_back("NCBI_NS_SERVICE=" +
83 context.GetWorkerNode().GetServiceName());
84
85 m_CtxEnvValues.push_back("NCBI_NS_QUEUE=" + context.GetQueueName());
86
87 const CNetScheduleJob& job = context.GetJob();
88
89 m_CtxEnvValues.push_back("NCBI_NS_JID=" + job.job_id);
90 m_CtxEnvValues.push_back("NCBI_JOB_AFFINITY=" + job.affinity);
91
92 if (!job.client_ip.empty()) {
93 m_CtxEnvValues.push_back("NCBI_LOG_CLIENT_IP=" + job.client_ip);
94 }
95
96 if (!job.session_id.empty()) {
97 m_CtxEnvValues.push_back("NCBI_LOG_SESSION_ID=" + job.session_id);
98 }
99
100 if (!job.page_hit_id.empty()) {
101 m_CtxEnvValues.push_back("NCBI_LOG_HIT_ID=" + job.page_hit_id);
102 }
103
104 ITERATE(list<string>, it, m_CtxEnvValues) {
105 m_Env.push_back(it->c_str());
106 }
107
108 ITERATE(list<string>, it, m_EnvValues) {
109 m_Env.push_back(it->c_str());
110 }
111
112 m_Env.push_back(NULL);
113 return &m_Env[0];
114 }
115
116 ///////////////////////////////////////////////////////////////////////
117
118 /// The remote_app NetSchedule job.
119 ///
120 /// This class performs demarshalling of the command line arguments
121 /// and then executes the specified program.
122 ///
123 class CRemoteAppJob : public IWorkerNodeJob
124 {
125 public:
126 CRemoteAppJob(const IWorkerNodeInitContext& context,
127 const CRemoteAppLauncher& remote_app_launcher);
128
~CRemoteAppJob()129 virtual ~CRemoteAppJob() {}
130
131 int Do(CWorkerNodeJobContext& context);
132
133 private:
134 CNetCacheAPI m_NetCacheAPI;
135 const CRemoteAppLauncher& m_RemoteAppLauncher;
136 CAppEnvHolder m_AppEnvHolder;
137 };
138
Do(CWorkerNodeJobContext & context)139 int CRemoteAppJob::Do(CWorkerNodeJobContext& context)
140 {
141 if (context.IsLogRequested()) {
142 LOG_POST(Note << context.GetJobKey() << " is received.");
143 }
144
145 CRemoteAppRequest request(m_NetCacheAPI);
146 CRemoteAppResult result(m_NetCacheAPI);
147
148 try {
149 request.Deserialize(context.GetIStream());
150 }
151 catch (exception&) {
152 ERR_POST("Cannot deserialize remote_app job");
153 context.CommitJobWithFailure("Error while "
154 "unpacking remote_app arguments");
155 return -1;
156 }
157
158 result.SetStdOutErrFileNames(request.GetStdOutFileName(),
159 request.GetStdErrFileName(),
160 request.GetStdOutErrStorageType());
161
162 size_t output_size = context.GetWorkerNode().GetServerOutputSize();
163 if (output_size == 0) {
164 // NetSchedule internal storage is not supported; all
165 // output will be stored in NetCache anyway, so it can
166 // be put into one blob.
167 output_size = kMax_UInt;
168 } else {
169 // Empiric estimation of the maximum output size
170 // (reduction by 10%).
171 output_size = output_size - output_size / 10;
172 }
173 result.SetMaxInlineSize(output_size);
174
175 if (context.IsLogRequested()) {
176 if (!request.GetInBlobIdOrData().empty()) {
177 LOG_POST(Note << context.GetJobKey()
178 << " Input data: " << request.GetInBlobIdOrData());
179 }
180 LOG_POST(Note << context.GetJobKey()
181 << " Args: " << request.GetCmdLine());
182 if (!request.GetStdOutFileName().empty()) {
183 LOG_POST(Note << context.GetJobKey()
184 << " StdOutFile: " << request.GetStdOutFileName());
185 }
186 if (!request.GetStdErrFileName().empty()) {
187 LOG_POST(Note << context.GetJobKey()
188 << " StdErrFile: " << request.GetStdErrFileName());
189 }
190 }
191
192 vector<string> args;
193 TokenizeCmdLine(request.GetCmdLine(), args);
194
195 int ret = -1;
196 bool finished_ok = m_RemoteAppLauncher.ExecRemoteApp(args,
197 request.GetStdInForRead(),
198 result.GetStdOutForWrite(),
199 result.GetStdErrForWrite(),
200 ret,
201 context,
202 request.GetAppRunTimeout(),
203 m_AppEnvHolder.GetEnv(context));
204
205 result.SetRetCode(ret);
206 result.Serialize(context.GetOStream());
207
208 m_RemoteAppLauncher.FinishJob(finished_ok, ret, context);
209
210 if (context.IsLogRequested()) {
211 LOG_POST(Note << "Job " << context.GetJobKey() <<
212 " is " << context.GetCommitStatusDescription(
213 context.GetCommitStatus()) <<
214 ". Exit code: " << ret);
215 if (!result.GetErrBlobIdOrData().empty()) {
216 LOG_POST(Note << context.GetJobKey() << " Err data: " <<
217 result.GetErrBlobIdOrData());
218 }
219 }
220 request.Reset();
221 result.Reset();
222 return ret;
223 }
224
CRemoteAppJob(const IWorkerNodeInitContext & context,const CRemoteAppLauncher & remote_app_launcher)225 CRemoteAppJob::CRemoteAppJob(const IWorkerNodeInitContext& context,
226 const CRemoteAppLauncher& remote_app_launcher) :
227 m_NetCacheAPI(context.GetNetCacheAPI()),
228 m_RemoteAppLauncher(remote_app_launcher),
229 m_AppEnvHolder(remote_app_launcher)
230 {
231 CGridGlobals::GetInstance().SetReuseJobObject(true);
232 }
233
234 class CRemoteAppListener : public CRemoteAppBaseListener
235 {
236 public:
CRemoteAppListener(const TLauncherPtr & launcher)237 CRemoteAppListener(const TLauncherPtr& launcher)
238 : CRemoteAppBaseListener(launcher) {}
239
240 virtual void OnInit(IWorkerNodeInitBaseContext* ctx);
241 };
242
OnInit(IWorkerNodeInitBaseContext * ctx)243 void CRemoteAppListener::OnInit(IWorkerNodeInitBaseContext* ctx)
244 {
245 if (!ctx->GetConfig().HasEntry("log", "merge_lines")) {
246 SetDiagPostFlag(eDPF_PreMergeLines);
247 SetDiagPostFlag(eDPF_MergeLines);
248 }
249 }
250
251
252 /////////////////////////////////////////////////////////////////////////////
253
254 #define GRID_APP_NAME "remote_app"
255 extern const char kGridAppName[] = GRID_APP_NAME;
256
257 using TRemoteAppJobFactory = CRemoteAppJobFactory<CRemoteAppJob, CRemoteAppListener, kGridAppName>;
258
main(int argc,const char * argv[])259 int main(int argc, const char* argv[])
260 {
261 GRID_APP_CHECK_VERSION_ARGS();
262 return Main<TRemoteAppJobFactory, CRemoteAppListener>(argc, argv);
263 }
264