1 /* $Id: remote_app.cpp 633611 2021-06-22 17:38:16Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Maxim Didenko, Dmitry Kazimirov
27 *
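 * File Description:
 *   Implementation of CBlobStreamHelper, CRemoteAppRequest and
 *   CRemoteAppResult (serialization of remote_app job input and output)
 *   and of the TokenizeCmdLine()/JoinCmdLine() command-line helpers.
 *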
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include <connect/services/grid_rw_impl.hpp>
35 #include <connect/services/remote_app.hpp>
36 #include <connect/services/error_codes.hpp>
37
38 #include <corelib/ncbifile.hpp>
39
40
41 #define NCBI_USE_ERRCODE_X ConnServ_Remote
42
43 BEGIN_NCBI_SCOPE
44
45 //////////////////////////////////////////////////////////////////////////////
46 //
WriteStrWithLen(CNcbiOstream & os,const string & str)47 inline CNcbiOstream& WriteStrWithLen(CNcbiOstream& os, const string& str)
48 {
49 os << str.size() << ' ' << str;
50 return os;
51 }
52
ReadStrWithLen(CNcbiIstream & is,string & str)53 inline CNcbiIstream& ReadStrWithLen(CNcbiIstream& is, string& str)
54 {
55 string::size_type len;
56 if (!is.good()) return is;
57 is >> len;
58 if (!is.good()) return is;
59 vector<char> buf(len+1);
60 is.read(&buf[0], len+1);
61 str.assign(buf.begin()+1, buf.end());
62 return is;
63 }
64
65 //////////////////////////////////////////////////////////////////////////////
66 //
67
~CBlobStreamHelper()68 CBlobStreamHelper::~CBlobStreamHelper()
69 {
70 try {
71 Reset();
72 } NCBI_CATCH_ALL_X(14, "CBlobStreamHelper::~CBlobStreamHelper()");
73 }
74
GetOStream(const string & fname,EStdOutErrStorageType type,size_t max_inline_size)75 CNcbiOstream& CBlobStreamHelper::GetOStream(const string& fname /*= ""*/,
76 EStdOutErrStorageType type /*= eBlobStorage*/,
77 size_t max_inline_size /*= kMaxBlobInlineSize*/)
78 {
79 if (!m_GridWrite.stream) {
80 _ASSERT(!m_GridRead.stream);
81
82 m_GridWrite(m_Storage, max_inline_size, *m_Data);
83 *m_GridWrite.stream << (int) type << " ";
84 WriteStrWithLen(*m_GridWrite.stream, fname);
85 if (!fname.empty() && type == eLocalFile) {
86 m_GridWrite.stream.reset(new CNcbiOfstream(fname.c_str()));
87 m_GridWrite.writer.reset();
88 if (!m_GridWrite.stream->good()) {
89 NCBI_THROW(CFileException, eRelativePath,
90 "Cannot open " + fname + " for output");
91 }
92 m_GridWrite.stream->exceptions(IOS_BASE::badbit | IOS_BASE::failbit);
93 }
94 }
95 return *m_GridWrite.stream;
96 }
97
x_GetTypeAndName(CNcbiIstream & istream,string & name)98 int CBlobStreamHelper::x_GetTypeAndName(CNcbiIstream& istream,
99 string& name)
100 {
101 int res = eBlobStorage;
102 if (istream.good()) istream >> res;
103 if (istream.good()) ReadStrWithLen(istream, name);
104 return res;
105 }
106
/// Return the input stream for data produced through GetOStream(),
/// creating it on the first call.
///
/// On creation the header (storage type and file name written by
/// GetOStream()) is parsed. If it names a local file (eLocalFile), the
/// returned stream reads that file instead of the grid data.
///
/// @param fname  If not NULL, receives the file name from the header.
/// @param type   If not NULL, receives the storage type from the header.
/// @return The reader stream. If header parsing throws while the stream is
///         not at EOF, or the local file cannot be opened, the error is
///         logged and the stream is replaced by one whose content is the
///         error message.
CNcbiIstream& CBlobStreamHelper::GetIStream(string* fname /*= NULL*/,
        EStdOutErrStorageType* type /*= NULL*/)
{
    if (!m_GridRead.stream) {
        _ASSERT(!m_GridWrite.stream);

        m_GridRead(m_Storage, *m_Data, m_DataSize);
        string name;
        int tmp = (int)eBlobStorage;
        try {
            tmp = x_GetTypeAndName(*m_GridRead.stream, name);
        } catch (...) {
            // Unless the data simply ended, the content is not in the
            // expected format: expose the error text as the content.
            // NOTE: *fname / *type are not assigned on this path.
            if (!m_GridRead.stream->eof()) {
                string msg =
                    "Job output does not match remote_app output format";
                ERR_POST_X(1, msg);
                m_GridRead.stream.reset(new CNcbiIstrstream(msg));
            }
            return *m_GridRead.stream.get();
        }

        if (fname) *fname = name;
        if (type) *type = (EStdOutErrStorageType)tmp;
        // The writer redirected the payload to a local file; read it there.
        if (!name.empty() && (EStdOutErrStorageType)tmp == eLocalFile) {
            m_GridRead.stream.reset(new CNcbiIfstream(name.c_str()));
            if (m_GridRead.stream->good()) {
                m_GridRead.stream->exceptions(IOS_BASE::badbit | IOS_BASE::failbit);
            } else {
                string msg = "Can not open " + name;
                msg += " for reading";
                ERR_POST_X(2, msg);
                m_GridRead.stream.reset(new CNcbiIstrstream(msg));
            }
        }
    }
    return *m_GridRead.stream;
}
144
/// Release the reader and writer state: the reader is reset first, then the
/// writer with Reset(true). The meaning of the flag is defined by the writer
/// type, which is not visible in this file.
void CBlobStreamHelper::Reset()
{
    m_GridRead.Reset();
    m_GridWrite.Reset(true);
}
150 //////////////////////////////////////////////////////////////////////////////
151 //
152
153 CAtomicCounter CRemoteAppRequest::sm_DirCounter;
154
155 const string kLocalFSSign = "LFS";
156
~CRemoteAppRequest()157 CRemoteAppRequest::~CRemoteAppRequest()
158 {
159 try {
160 Reset();
161 } NCBI_CATCH_ALL_X(15, "CRemoteAppRequest::~CRemoteAppRequest()");
162 }
163
Send(CNcbiOstream & os)164 void CRemoteAppRequest::Send(CNcbiOstream& os)
165 {
166 m_StdIn.Reset();
167 typedef map<string,string> TFmap;
168 TFmap file_map;
169 ITERATE(TFiles, it, GetFileNames()) {
170 const string& fname = it->first;
171 if (it->second == eLocalFile) {
172 file_map[fname] = kLocalFSSign;
173 continue;
174 }
175 CFile file(fname);
176 string blobid;
177 if (!file.Exists()) {
178 LOG_POST_X(3, Warning << "File :\"" << fname << "\" does not exist.");
179 continue;
180 }
181 if (NStr::Find(GetCmdLine(), fname) == NPOS) {
182 LOG_POST_X(4, Warning << "File :\"" << fname << "\" is not found in cmdline. Skipping.");
183 continue;
184 }
185
186 CNcbiIfstream inf(fname.c_str());
187 if (inf.good()) {
188 unique_ptr<CNcbiOstream> of(GetNetCacheAPI().CreateOStream(blobid));
189 *of << inf.rdbuf();
190 file_map[fname] = blobid;
191 }
192 }
193
194 WriteStrWithLen(os, GetCmdLine());
195 WriteStrWithLen(os, m_InBlobIdOrData);
196
197 os << file_map.size() << ' ';
198 ITERATE(TFmap, itf, file_map) {
199 WriteStrWithLen(os, itf->first);
200 WriteStrWithLen(os, itf->second);
201 }
202 WriteStrWithLen(os, m_StdOutFileName);
203 WriteStrWithLen(os, m_StdErrFileName);
204 os << (int)m_StorageType << " ";
205 os << GetAppRunTimeout() << " ";
206 os << (int)m_ExlusiveMode;
207 Reset();
208 }
209
/// Rewrite command-line arguments that refer to @a old_fname so that they
/// refer to @a new_fname instead.
///
/// An argument is rewritten when the first occurrence of @a old_fname in it
/// ends the argument and is either the whole argument or preceded by a
/// non-alphanumeric character. For example, "data.txt" and "--in=data.txt"
/// match, while "mydata.txt" and "data.txt.bak" do not. Every argument is
/// examined.
static void s_ReplaceArg( vector<string>& args, const string& old_fname,
                          const string& new_fname)
{
    // An empty name cannot identify a file; nothing to replace.
    if (old_fname.empty())
        return;

    for (string& arg : args) {
        const string::size_type pos = arg.find(old_fname);
        // Keep scanning: the file name is normally not in the first argument
        // (the program name), so stopping here would skip every replacement.
        if (pos == string::npos)
            continue;

        const bool at_end = pos + old_fname.size() == arg.size();
        const bool at_boundary =
            pos == 0 || !isalnum((unsigned char) arg[pos - 1]);
        if (at_end && at_boundary) {
            // Ending the argument, this is its only occurrence.
            arg.replace(pos, old_fname.size(), new_fname);
        }
    }
}
224
/// Read a request written by Send() from @a is.
///
/// Field order, as written by Send():
/// - command line
/// - m_InBlobIdOrData
/// - file count, followed by (file name, blob ID or kLocalFSSign) pairs
/// - stdout file name
/// - stderr file name
/// - storage type
/// - run timeout
/// - exclusive-mode flag
///
/// @param is     Serialized request.
/// @param files  If NULL, a full deserialization is performed:
///               - a working directory is created;
///               - every file sent as a blob is downloaded into it, named
///                 after its blob ID;
///               - the command line is re-tokenized, passed through
///                 s_ReplaceArg() for each downloaded file, and re-joined.
///               If not NULL, nothing is downloaded; the map is filled with
///               file name -> blob ID, using an empty string for
///               kLocalFSSign entries.
/// @return false if the stream was not good() before one of the checked
///         reads, otherwise !is.fail() after the last field.
bool CRemoteAppRequest::x_Deserialize(CNcbiIstream& is, TStoredFiles* files)
{
    // Partial deserialization doesn't create working dir and deserialize files,
    // but fills the "files" map with deserialized filenames and blob IDs.
    const bool partial_deserialization = files;

    if (partial_deserialization)
        files->clear();

    Reset();

    string cmdline;
    ReadStrWithLen(is, cmdline);
    SetCmdLine(cmdline);
    ReadStrWithLen(is, m_InBlobIdOrData);

    int fcount = 0;
    vector<string> args;
    if (!is.good()) return false;
    is >> fcount;
    // Tokenize now so that file references can be rewritten in place.
    if ( fcount > 0 && !partial_deserialization) {
        TokenizeCmdLine(GetCmdLine(), args);
        x_CreateWDir();
    }

    for( int i = 0; i < fcount; ++i) {
        string blobid, fname;
        ReadStrWithLen(is, fname);
        ReadStrWithLen(is, blobid);
        if (!is.good()) return false;

        const bool is_blob = blobid != kLocalFSSign;
        if (partial_deserialization) {
            files->insert(make_pair(fname, is_blob ? blobid : kEmptyStr));
        } else if (is_blob) {
            // Download into <working dir>/<blob ID>. If the local file
            // cannot be created, the argument is left unchanged.
            string nfname = GetWorkingDir() + CDirEntry::GetPathSeparator()
                + blobid;
            CNcbiOfstream of(nfname.c_str());
            if (of.good()) {
                unique_ptr<CNcbiIstream> blob_is(GetNetCacheAPI().GetIStream(blobid));
                of << blob_is->rdbuf();
                blob_is.reset();
                s_ReplaceArg(args, fname, nfname);
            }
        }
        // kLocalFSSign entries in a full deserialization are neither
        // downloaded nor rewritten.
    }
    if ( fcount > 0 && !partial_deserialization) {
        SetCmdLine(JoinCmdLine(args));
    }

    ReadStrWithLen(is, m_StdOutFileName);
    ReadStrWithLen(is, m_StdErrFileName);
    if (!is.good()) return false;
    int tmp;
    is >> tmp;
    m_StorageType = (EStdOutErrStorageType)tmp;
    if (!is.good()) return false;
    is >> tmp; SetAppRunTimeout(tmp);
    if (!is.good()) return false;
    is >> tmp;
    m_ExlusiveMode = tmp != 0;
    // The flag is the last field and is not followed by a separator, so EOF
    // is expected here: test fail() rather than good().
    return !is.fail();
}
288
Reset()289 void CRemoteAppRequest::Reset()
290 {
291 m_CmdLine = "";
292 m_Files.clear();
293 m_AppRunTimeout = 0;
294 x_RemoveWDir();
295
296 m_StdIn.Reset();
297 m_InBlobIdOrData = "";
298 m_StdInDataSize = 0;
299 m_ExlusiveMode = false;
300 }
301
x_CreateWDir()302 void CRemoteAppRequest::x_CreateWDir()
303 {
304 if (!m_TmpDirName.empty())
305 return;
306 m_TmpDirName = m_TmpDirPath + NStr::NumericToString(sm_DirCounter.Add(1));
307 CDir wdir(m_TmpDirName);
308 if (wdir.Exists())
309 wdir.Remove();
310 CDir(m_TmpDirName).CreatePath();
311 }
312
x_RemoveWDir()313 void CRemoteAppRequest::x_RemoveWDir()
314 {
315 if (m_TmpDirName.empty())
316 return;
317 CDir dir(m_TmpDirName);
318 if (dir.Exists())
319 dir.Remove();
320 m_TmpDirName = "";
321 }
322
323
324 //////////////////////////////////////////////////////////////////////////////
325 //
326
/// Construct an empty request.
///
/// @param storage          NetCache service; stored in m_NetCacheAPI and
///                         passed to the stdin stream helper.
/// @param max_inline_size  Stored in m_MaxInlineSize (presumably the size
///                         below which data is kept inline rather than in a
///                         blob, cf. GetOStream() - confirm in the header).
///
/// Per-request working directories are created under the current working
/// directory at construction time (m_TmpDirPath).
CRemoteAppRequest::CRemoteAppRequest(
        CNetCacheAPI::TInstance storage, size_t max_inline_size) :
    m_NetCacheAPI(storage),
    m_AppRunTimeout(0),
    m_TmpDirPath(CDir::GetCwd() + CDirEntry::GetPathSeparator()),
    // NOTE(review): m_StdIn presumably keeps references to the two members
    // below. Members are initialized in header declaration order, not in
    // the order listed here - confirm before reordering.
    m_StdIn(storage, m_InBlobIdOrData, m_StdInDataSize),
    m_StdInDataSize(0),
    m_StorageType(eBlobStorage),
    m_ExlusiveMode(false),
    m_MaxInlineSize(max_inline_size)
{
}
339
~CRemoteAppResult()340 CRemoteAppResult::~CRemoteAppResult()
341 {
342 try {
343 Reset();
344 } NCBI_CATCH_ALL_X(16, "CRemoteAppResult::~CRemoteAppResult()");
345 }
346
Serialize(CNcbiOstream & os)347 void CRemoteAppResult::Serialize(CNcbiOstream& os)
348 {
349 m_StdOut.Reset();
350 m_StdErr.Reset();
351 WriteStrWithLen(os, m_OutBlobIdOrData);
352 WriteStrWithLen(os, m_ErrBlobIdOrData);
353 os << GetRetCode();
354 }
Receive(CNcbiIstream & is)355 void CRemoteAppResult::Receive(CNcbiIstream& is)
356 {
357 Reset();
358 ReadStrWithLen(is, m_OutBlobIdOrData);
359 ReadStrWithLen(is, m_ErrBlobIdOrData);
360 int ret = -1; is >> ret; SetRetCode(ret);
361 }
362
/// Return the result to its initial state:
/// - return code -1;
/// - no stdout/stderr ID/data and sizes of 0;
/// - no output file names;
/// - eBlobStorage.
/// Each stream helper is reset right after its backing ID/data string is
/// cleared. NOTE(review): m_StdOut/m_StdErr presumably refer to those
/// strings; keep this order unless that interaction has been checked.
void CRemoteAppResult::Reset()
{
    m_RetCode = -1;

    m_OutBlobIdOrData = "";
    m_OutBlobSize = 0;
    m_StdOut.Reset();

    m_ErrBlobIdOrData = "";
    m_ErrBlobSize = 0;
    m_StdErr.Reset();

    m_StdOutFileName = "";
    m_StdErrFileName = "";
    m_StorageType = eBlobStorage;
}
379
380
/// Split @a cmdline into arguments and append them to @a args (which is not
/// cleared first).
///
/// Arguments are separated by runs of blanks (' ' only). A single- or
/// double-quoted section is copied literally up to the matching quote, or
/// to the end of the line if unterminated, and appended to the argument
/// being built. That argument is then emitted at once, even if empty. There
/// is no escape character.
void TokenizeCmdLine(const string& cmdline, vector<string>& args)
{
    const size_t length = cmdline.size();
    string current;
    size_t pos = 0;

    while (pos < length) {
        const char ch = cmdline[pos];

        if (ch == ' ') {
            if (!current.empty()) {
                args.push_back(current);
                current.clear();
            }
            ++pos;
        } else if (ch == '\'' || ch == '"') {
            const size_t closing = cmdline.find(ch, pos + 1);
            const size_t stop = closing == string::npos ? length : closing;
            current.append(cmdline, pos + 1, stop - pos - 1);
            args.push_back(current);
            current.clear();
            pos = stop + 1;
        } else {
            current += ch;
            ++pos;
        }
    }

    if (!current.empty())
        args.push_back(current);
}
410
411
/// Join @a args into a single blank-separated command line that
/// TokenizeCmdLine() splits back into the same arguments.
///
/// Arguments that are empty or contain a blank or a quote character are
/// quoted; all others are emitted as-is. TokenizeCmdLine() has no escape
/// syntax, so the quote must not occur inside the argument: double quotes
/// are used normally, single quotes when the argument contains only double
/// quotes. An argument containing both quote characters cannot be
/// represented exactly and is wrapped in double quotes.
string JoinCmdLine(const vector<string>& args)
{
    string cmd_line;
    bool first = true;

    for (const string& arg : args) {
        if (!first)
            cmd_line += ' ';
        first = false;

        const bool has_dquote = arg.find('"') != string::npos;
        const bool has_squote = arg.find('\'') != string::npos;
        const bool needs_quotes = arg.empty() || has_dquote || has_squote ||
            arg.find(' ') != string::npos;

        if (!needs_quotes) {
            cmd_line += arg;
            continue;
        }

        const char quote = (has_dquote && !has_squote) ? '\'' : '"';
        cmd_line += quote;
        cmd_line += arg;
        cmd_line += quote;
    }
    return cmd_line;
}
428
429
430 END_NCBI_SCOPE
431