1 /*  $Id: remote_app.cpp 633611 2021-06-22 17:38:16Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <connect/services/grid_rw_impl.hpp>
35 #include <connect/services/remote_app.hpp>
36 #include <connect/services/error_codes.hpp>
37 
38 #include <corelib/ncbifile.hpp>
39 
40 
41 #define NCBI_USE_ERRCODE_X   ConnServ_Remote
42 
43 BEGIN_NCBI_SCOPE
44 
45 //////////////////////////////////////////////////////////////////////////////
46 //
WriteStrWithLen(CNcbiOstream & os,const string & str)47 inline CNcbiOstream& WriteStrWithLen(CNcbiOstream& os, const string& str)
48 {
49     os << str.size() << ' ' << str;
50     return os;
51 }
52 
ReadStrWithLen(CNcbiIstream & is,string & str)53 inline CNcbiIstream& ReadStrWithLen(CNcbiIstream& is, string& str)
54 {
55     string::size_type len;
56     if (!is.good()) return is;
57     is >> len;
58     if (!is.good()) return is;
59     vector<char> buf(len+1);
60     is.read(&buf[0], len+1);
61     str.assign(buf.begin()+1, buf.end());
62     return is;
63 }
64 
65 //////////////////////////////////////////////////////////////////////////////
66 //
67 
~CBlobStreamHelper()68 CBlobStreamHelper::~CBlobStreamHelper()
69 {
70     try {
71         Reset();
72     } NCBI_CATCH_ALL_X(14, "CBlobStreamHelper::~CBlobStreamHelper()");
73 }
74 
GetOStream(const string & fname,EStdOutErrStorageType type,size_t max_inline_size)75 CNcbiOstream& CBlobStreamHelper::GetOStream(const string& fname /*= ""*/,
76     EStdOutErrStorageType type /*= eBlobStorage*/,
77     size_t max_inline_size /*= kMaxBlobInlineSize*/)
78 {
79     if (!m_GridWrite.stream) {
80         _ASSERT(!m_GridRead.stream);
81 
82         m_GridWrite(m_Storage, max_inline_size, *m_Data);
83         *m_GridWrite.stream << (int) type << " ";
84         WriteStrWithLen(*m_GridWrite.stream, fname);
85         if (!fname.empty() && type == eLocalFile) {
86             m_GridWrite.stream.reset(new CNcbiOfstream(fname.c_str()));
87             m_GridWrite.writer.reset();
88             if (!m_GridWrite.stream->good()) {
89                 NCBI_THROW(CFileException, eRelativePath,
90                     "Cannot open " + fname + " for output");
91             }
92             m_GridWrite.stream->exceptions(IOS_BASE::badbit | IOS_BASE::failbit);
93         }
94     }
95     return *m_GridWrite.stream;
96 }
97 
x_GetTypeAndName(CNcbiIstream & istream,string & name)98 int CBlobStreamHelper::x_GetTypeAndName(CNcbiIstream& istream,
99         string& name)
100 {
101     int res = eBlobStorage;
102     if (istream.good()) istream >> res;
103     if (istream.good()) ReadStrWithLen(istream, name);
104     return res;
105 }
106 
GetIStream(string * fname,EStdOutErrStorageType * type)107 CNcbiIstream& CBlobStreamHelper::GetIStream(string* fname /*= NULL*/,
108     EStdOutErrStorageType* type /*= NULL*/)
109 {
110     if (!m_GridRead.stream) {
111         _ASSERT(!m_GridWrite.stream);
112 
113         m_GridRead(m_Storage, *m_Data, m_DataSize);
114         string name;
115         int tmp = (int)eBlobStorage;
116         try {
117             tmp = x_GetTypeAndName(*m_GridRead.stream, name);
118         } catch (...) {
119             if (!m_GridRead.stream->eof()) {
120                 string msg =
121                         "Job output does not match remote_app output format";
122                 ERR_POST_X(1, msg);
123                 m_GridRead.stream.reset(new CNcbiIstrstream(msg));
124             }
125             return *m_GridRead.stream.get();
126         }
127 
128         if (fname) *fname = name;
129         if (type) *type = (EStdOutErrStorageType)tmp;
130         if (!name.empty() && (EStdOutErrStorageType)tmp == eLocalFile) {
131             m_GridRead.stream.reset(new CNcbiIfstream(name.c_str()));
132             if (m_GridRead.stream->good()) {
133                 m_GridRead.stream->exceptions(IOS_BASE::badbit | IOS_BASE::failbit);
134             } else {
135                 string msg = "Can not open " + name;
136                 msg += " for reading";
137                 ERR_POST_X(2, msg);
138                 m_GridRead.stream.reset(new CNcbiIstrstream(msg));
139             }
140         }
141     }
142     return *m_GridRead.stream;
143 }
144 
Reset()145 void CBlobStreamHelper::Reset()
146 {
147     m_GridRead.Reset();
148     m_GridWrite.Reset(true);
149 }
150 //////////////////////////////////////////////////////////////////////////////
151 //
152 
153 CAtomicCounter CRemoteAppRequest::sm_DirCounter;
154 
155 const string kLocalFSSign = "LFS";
156 
~CRemoteAppRequest()157 CRemoteAppRequest::~CRemoteAppRequest()
158 {
159     try {
160         Reset();
161     } NCBI_CATCH_ALL_X(15, "CRemoteAppRequest::~CRemoteAppRequest()");
162 }
163 
Send(CNcbiOstream & os)164 void CRemoteAppRequest::Send(CNcbiOstream& os)
165 {
166     m_StdIn.Reset();
167     typedef map<string,string> TFmap;
168     TFmap file_map;
169     ITERATE(TFiles, it, GetFileNames()) {
170         const string& fname = it->first;
171         if (it->second == eLocalFile) {
172             file_map[fname] = kLocalFSSign;
173             continue;
174         }
175         CFile file(fname);
176         string blobid;
177         if (!file.Exists()) {
178             LOG_POST_X(3, Warning << "File :\"" << fname << "\" does not exist.");
179             continue;
180         }
181         if (NStr::Find(GetCmdLine(), fname) == NPOS) {
182             LOG_POST_X(4, Warning << "File :\"" << fname << "\" is not found in cmdline. Skipping.");
183             continue;
184         }
185 
186         CNcbiIfstream inf(fname.c_str());
187         if (inf.good()) {
188             unique_ptr<CNcbiOstream> of(GetNetCacheAPI().CreateOStream(blobid));
189             *of << inf.rdbuf();
190             file_map[fname] = blobid;
191         }
192     }
193 
194     WriteStrWithLen(os, GetCmdLine());
195     WriteStrWithLen(os, m_InBlobIdOrData);
196 
197     os << file_map.size() << ' ';
198     ITERATE(TFmap, itf, file_map) {
199         WriteStrWithLen(os, itf->first);
200         WriteStrWithLen(os, itf->second);
201     }
202     WriteStrWithLen(os, m_StdOutFileName);
203     WriteStrWithLen(os, m_StdErrFileName);
204     os << (int)m_StorageType << " ";
205     os << GetAppRunTimeout() << " ";
206     os << (int)m_ExlusiveMode;
207     Reset();
208 }
209 
/// Point command-line arguments that name @a old_fname at @a new_fname.
///
/// An argument matches when it ends with @a old_fname and the character
/// just before the match (if any) is not alphanumeric.  So "in.fa",
/// "-i=in.fa" and "dir/in.fa" match "in.fa", while "main.fa" and
/// "in.fa.bak" do not.  Only the matching suffix is replaced.  Arguments
/// that do not match are skipped and the scan continues with the next one.
/// An empty @a old_fname matches nothing.
static void s_ReplaceArg(std::vector<std::string>& args,
                         const std::string& old_fname,
                         const std::string& new_fname)
{
    if (old_fname.empty())
        return;

    for (std::string& arg : args) {
        if (arg.size() < old_fname.size())
            continue;

        const std::string::size_type pos = arg.size() - old_fname.size();
        if (arg.compare(pos, old_fname.size(), old_fname) != 0)
            continue;

        // Require a name boundary so "main.fa" is not taken for "in.fa".
        if (pos > 0 && isalnum(static_cast<unsigned char>(arg[pos - 1])))
            continue;

        arg.replace(pos, old_fname.size(), new_fname);
    }
}
224 
x_Deserialize(CNcbiIstream & is,TStoredFiles * files)225 bool CRemoteAppRequest::x_Deserialize(CNcbiIstream& is, TStoredFiles* files)
226 {
227     // Partial deserialization doesn't create working dir and deserialize files,
228     // but fills the "files" map with deserialized filenames and blob IDs.
229     const bool partial_deserialization = files;
230 
231     if (partial_deserialization)
232         files->clear();
233 
234     Reset();
235 
236     string cmdline;
237     ReadStrWithLen(is, cmdline);
238     SetCmdLine(cmdline);
239     ReadStrWithLen(is, m_InBlobIdOrData);
240 
241     int fcount = 0;
242     vector<string> args;
243     if (!is.good()) return false;
244     is >> fcount;
245     if ( fcount > 0 && !partial_deserialization) {
246         TokenizeCmdLine(GetCmdLine(), args);
247         x_CreateWDir();
248     }
249 
250     for( int i = 0; i < fcount; ++i) {
251         string blobid, fname;
252         ReadStrWithLen(is, fname);
253         ReadStrWithLen(is, blobid);
254         if (!is.good()) return false;
255 
256         const bool is_blob = blobid != kLocalFSSign;
257         if (partial_deserialization) {
258             files->insert(make_pair(fname, is_blob ? blobid : kEmptyStr));
259         } else if (is_blob) {
260             string nfname = GetWorkingDir() + CDirEntry::GetPathSeparator()
261                 + blobid;
262             CNcbiOfstream of(nfname.c_str());
263             if (of.good()) {
264                 unique_ptr<CNcbiIstream> blob_is(GetNetCacheAPI().GetIStream(blobid));
265                 of << blob_is->rdbuf();
266                 blob_is.reset();
267                 s_ReplaceArg(args, fname, nfname);
268             }
269         }
270     }
271     if ( fcount > 0 && !partial_deserialization) {
272         SetCmdLine(JoinCmdLine(args));
273     }
274 
275     ReadStrWithLen(is, m_StdOutFileName);
276     ReadStrWithLen(is, m_StdErrFileName);
277     if (!is.good()) return false;
278     int tmp;
279     is >> tmp;
280     m_StorageType = (EStdOutErrStorageType)tmp;
281     if (!is.good()) return false;
282     is >> tmp; SetAppRunTimeout(tmp);
283     if (!is.good()) return false;
284     is >> tmp;
285     m_ExlusiveMode = tmp != 0;
286     return !is.fail();
287 }
288 
Reset()289 void CRemoteAppRequest::Reset()
290 {
291     m_CmdLine = "";
292     m_Files.clear();
293     m_AppRunTimeout = 0;
294     x_RemoveWDir();
295 
296     m_StdIn.Reset();
297     m_InBlobIdOrData = "";
298     m_StdInDataSize = 0;
299     m_ExlusiveMode = false;
300 }
301 
x_CreateWDir()302 void CRemoteAppRequest::x_CreateWDir()
303 {
304     if (!m_TmpDirName.empty())
305         return;
306     m_TmpDirName = m_TmpDirPath + NStr::NumericToString(sm_DirCounter.Add(1));
307     CDir wdir(m_TmpDirName);
308     if (wdir.Exists())
309         wdir.Remove();
310     CDir(m_TmpDirName).CreatePath();
311 }
312 
x_RemoveWDir()313 void CRemoteAppRequest::x_RemoveWDir()
314 {
315     if (m_TmpDirName.empty())
316         return;
317     CDir dir(m_TmpDirName);
318     if (dir.Exists())
319         dir.Remove();
320     m_TmpDirName = "";
321 }
322 
323 
324 //////////////////////////////////////////////////////////////////////////////
325 //
326 
CRemoteAppRequest(CNetCacheAPI::TInstance storage,size_t max_inline_size)327 CRemoteAppRequest::CRemoteAppRequest(
328         CNetCacheAPI::TInstance storage, size_t max_inline_size) :
329     m_NetCacheAPI(storage),
330     m_AppRunTimeout(0),
331     m_TmpDirPath(CDir::GetCwd() + CDirEntry::GetPathSeparator()),
332     m_StdIn(storage, m_InBlobIdOrData, m_StdInDataSize),
333     m_StdInDataSize(0),
334     m_StorageType(eBlobStorage),
335     m_ExlusiveMode(false),
336     m_MaxInlineSize(max_inline_size)
337 {
338 }
339 
~CRemoteAppResult()340 CRemoteAppResult::~CRemoteAppResult()
341 {
342     try {
343         Reset();
344     } NCBI_CATCH_ALL_X(16, "CRemoteAppResult::~CRemoteAppResult()");
345 }
346 
Serialize(CNcbiOstream & os)347 void CRemoteAppResult::Serialize(CNcbiOstream& os)
348 {
349     m_StdOut.Reset();
350     m_StdErr.Reset();
351     WriteStrWithLen(os, m_OutBlobIdOrData);
352     WriteStrWithLen(os, m_ErrBlobIdOrData);
353     os << GetRetCode();
354 }
Receive(CNcbiIstream & is)355 void CRemoteAppResult::Receive(CNcbiIstream& is)
356 {
357     Reset();
358     ReadStrWithLen(is, m_OutBlobIdOrData);
359     ReadStrWithLen(is, m_ErrBlobIdOrData);
360     int ret = -1; is >> ret; SetRetCode(ret);
361 }
362 
Reset()363 void CRemoteAppResult::Reset()
364 {
365     m_RetCode = -1;
366 
367     m_OutBlobIdOrData = "";
368     m_OutBlobSize = 0;
369     m_StdOut.Reset();
370 
371     m_ErrBlobIdOrData = "";
372     m_ErrBlobSize = 0;
373     m_StdErr.Reset();
374 
375     m_StdOutFileName = "";
376     m_StdErrFileName = "";
377     m_StorageType = eBlobStorage;
378 }
379 
380 
/// Split @a cmdline into arguments and append them to @a args.
///
/// - Spaces separate arguments; runs of spaces yield no empty arguments.
/// - A section in single or double quotes is copied verbatim up to the
///   matching quote (or the end of input).  It is appended to any text
///   already collected and always emitted, even if empty.
/// - @a args is not cleared first.
void TokenizeCmdLine(const string& cmdline, vector<string>& args)
{
    const size_t length = cmdline.size();
    string current;
    size_t pos = 0;

    while (pos < length) {
        const char ch = cmdline[pos];

        if (ch == ' ') {
            if (!current.empty()) {
                args.push_back(current);
                current.clear();
            }
            ++pos;
        } else if (ch == '\'' || ch == '"') {
            const size_t closing = cmdline.find(ch, pos + 1);
            const size_t stop = closing == string::npos ? length : closing;
            current.append(cmdline, pos + 1, stop - pos - 1);
            args.push_back(current);
            current.clear();
            pos = stop + 1;
        } else {
            current += ch;
            ++pos;
        }
    }

    if (!current.empty())
        args.push_back(current);
}
410 
411 
/// Join @a args into one command line that TokenizeCmdLine() splits back
/// into the same arguments.
///
/// An argument is quoted when it is empty or contains a space or a quote
/// character.  Double quotes are used unless the argument contains a double
/// quote but no single quote, in which case single quotes are used.  An
/// argument containing both quote characters cannot be expressed in the
/// quoting understood by TokenizeCmdLine(); it is double-quoted.
/// Arguments needing no quotes are emitted unchanged, separated by spaces.
std::string JoinCmdLine(const std::vector<std::string>& args)
{
    std::string cmd_line;
    bool first = true;

    for (const std::string& arg : args) {
        if (!first)
            cmd_line += ' ';
        first = false;

        const bool has_dquote = arg.find('"') != std::string::npos;
        const bool has_squote = arg.find('\'') != std::string::npos;
        const bool needs_quotes = arg.empty() ||
            arg.find(' ') != std::string::npos || has_dquote || has_squote;

        if (!needs_quotes) {
            cmd_line += arg;
            continue;
        }

        const char quote = (has_dquote && !has_squote) ? '\'' : '"';
        cmd_line += quote;
        cmd_line += arg;
        cmd_line += quote;
    }
    return cmd_line;
}
428 
429 
430 END_NCBI_SCOPE
431