1 /*  $Id: ns_cmds.cpp 607703 2020-05-06 16:57:56Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *   Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Dmitry Kazimirov
27  *
28  * File Description: NetSchedule-specific commands of the grid_cli application.
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "ns_cmd_impl.hpp"
35 #include "util.hpp"
36 
37 #include <connect/services/remote_app.hpp>
38 #include <connect/services/grid_rw_impl.hpp>
39 #include <connect/services/ns_job_serializer.hpp>
40 
41 #include <corelib/rwstream.hpp>
42 
43 #include <ctype.h>
44 
45 BEGIN_NCBI_SCOPE
46 
SetUp_NetScheduleCmd(CGridCommandLineInterfaceApp::EAPIClass api_class,CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity,bool require_queue)47 void CGridCommandLineInterfaceApp::SetUp_NetScheduleCmd(
48         CGridCommandLineInterfaceApp::EAPIClass api_class,
49         CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity, bool require_queue)
50 {
51     if ((api_class == eNetScheduleSubmitter) || (api_class == eNetScheduleExecutor))
52         SetUp_NetCacheCmd(false, false, false);
53 
54     m_APIClass = api_class;
55 
56     string service = m_Opts.ns_service;
57     string queue = m_Opts.queue;
58 
59     const bool job_provided = IsOptionSet(eID) || IsOptionSet(eJobId);
60     const bool service_provided = IsOptionExplicitlySet(eNetSchedule);
61 
62     if (IsOptionSet(eWorkerNode)) {
63         m_NetScheduleAPI = CNetScheduleAPIExt::CreateWnCompat(service, m_Opts.auth);
64 
65     } else {
66         if (job_provided) {
67             CNetScheduleKey key(m_Opts.id, m_CompoundIDPool);
68 
69             if (queue.empty())
70                 queue = key.queue;
71 
72             if (!service_provided) {
73                 key.host.push_back(':');
74                 key.host.append(NStr::NumericToString(key.port));
75                 service = key.host;
76             }
77         } else if (!IsOptionSet(eNetSchedule)) {
78             NCBI_THROW(CArgException, eNoValue, "'--" NETSCHEDULE_OPTION "' option is required.");
79 
80         } else if (!IsOptionSet(eQueue) && !IsOptionSet(eTargetQueueArg) && require_queue) {
81             NCBI_THROW(CArgException, eNoValue, "'--" QUEUE_OPTION "' option is required.");
82 
83         }
84 
85         m_NetScheduleAPI = CNetScheduleAPIExt::CreateNoCfgLoad(service, m_Opts.auth, queue);
86     }
87 
88     if (job_provided && service_provided) {
89         if (auto address = SSocketAddress::Parse(m_Opts.ns_service)) {
90             m_NetScheduleAPI.GetService().GetServerPool().StickToServer(move(address));
91         } else {
92             NCBI_THROW(CArgException, eInvalidArg,
93                     "When job ID is given, '--" NETSCHEDULE_OPTION "' "
94                     "must be a host:port server address.");
95         }
96     }
97 
98     if (m_AdminMode)
99         m_NetScheduleAPI.SetClientType(CNetScheduleAPI::eCT_Admin);
100 
101     auto warning_handler = [&](const string& m, CNetServer s) {
102         return OnWarning(m_APIClass != eWorkerNodeAdmin, m, s);
103     };
104 
105     m_NetScheduleAPI.GetService().SetWarningHandler(warning_handler);
106 
107     if (IsOptionSet(eCompatMode)) {
108         m_NetScheduleAPI.UseOldStyleAuth();
109     }
110 
111     // Specialize NetSchedule API.
112     switch (api_class) {
113     case eNetScheduleAPI:
114         break;
115     case eNetScheduleAdmin:
116         if (cmd_severity != eReadOnlyAdminCmd) {
117             if (!service_provided) {
118                 NCBI_THROW(CArgException, eNoValue, "'--" NETSCHEDULE_OPTION
119                         "' must be explicitly specified.");
120             }
121             if (IsOptionAcceptedAndSetImplicitly(eQueue)) {
122                 NCBI_THROW(CArgException, eNoValue, "'--" QUEUE_OPTION
123                         "' must be specified explicitly (not via $"
124                         LOGIN_TOKEN_ENV ").");
125             }
126         }
127         /* FALL THROUGH */
128     case eWorkerNodeAdmin:
129         m_NetScheduleAdmin = m_NetScheduleAPI.GetAdmin();
130         break;
131     case eNetScheduleExecutor:
132         m_NetScheduleExecutor = m_NetScheduleAPI.GetExecutor();
133 
134         if (!IsOptionSet(eLoginToken) &&
135                 !IsOptionSet(eClientNode) && !IsOptionSet(eClientSession)) {
136             NCBI_THROW(CArgException, eNoArg, "client identification required "
137                     "(see the '" LOGIN_COMMAND "' command).");
138         }
139 
140         if (IsOptionSet(eJobGroup))
141             m_NetScheduleExecutor.SetJobGroup(m_Opts.job_group);
142 
143         /* FALL THROUGH */
144     case eNetScheduleSubmitter:
145         m_NetScheduleSubmitter = m_NetScheduleAPI.GetSubmitter();
146 
147         m_GridClient.reset(new CGridClient(m_NetScheduleSubmitter,
148                 m_NetCacheAPI, CGridClient::eManualCleanup,
149                         CGridClient::eProgressMsgOn));
150         break;
151     default:
152         _ASSERT(0);
153         break;
154     }
155 
156     if (IsOptionSet(eClientNode)) {
157         m_NetScheduleAPI.ReSetClientNode(m_Opts.client_node);
158     }
159 
160     if (IsOptionSet(eClientSession)) {
161         m_NetScheduleAPI.ReSetClientSession(m_Opts.client_session);
162     }
163 }
164 
JobInfo_PrintStatus(CNetScheduleAPI::EJobStatus status)165 void CGridCommandLineInterfaceApp::JobInfo_PrintStatus(
166         CNetScheduleAPI::EJobStatus status)
167 {
168     string job_status(CNetScheduleAPI::StatusToString(status));
169 
170     if (m_Opts.output_format == eJSON)
171         printf("{\"status\": \"%s\"}\n", job_status.c_str());
172     else
173         PrintLine(job_status);
174 }
175 
Cmd_JobInfo()176 int CGridCommandLineInterfaceApp::Cmd_JobInfo()
177 {
178     SetUp_NetScheduleCmd(eNetScheduleAdmin, eReadOnlyAdminCmd);
179 
180     if (IsOptionSet(eDeferExpiration)) {
181         CNetScheduleAPI::EJobStatus status =
182                 m_NetScheduleAPI.GetSubmitter().GetJobStatus(m_Opts.id);
183         if (IsOptionSet(eStatusOnly)) {
184             JobInfo_PrintStatus(status);
185             return 0;
186         }
187     } else if (IsOptionSet(eStatusOnly)) {
188         CNetScheduleJob job;
189         job.job_id = m_Opts.id;
190         JobInfo_PrintStatus(m_NetScheduleAPI.GetExecutor().GetJobStatus(job));
191         return 0;
192     }
193 
194     if (IsOptionSet(eProgressMessageOnly)) {
195         CNetScheduleJob job;
196         job.job_id = m_Opts.id;
197         m_NetScheduleAPI.GetProgressMsg(job);
198         if (m_Opts.output_format == eJSON)
199             printf("{\"progress_message\": \"%s\"}\n",
200                     job.progress_msg.c_str());
201         else
202             if (!job.progress_msg.empty())
203                 PrintLine(job.progress_msg);
204         return 0;
205     }
206 
207     switch (m_Opts.output_format) {
208     case eRaw:
209         if (IsOptionSet(eBrief)) {
210             fprintf(stderr, GRID_APP_NAME " " JOBINFO_COMMAND ": option '--"
211                     BRIEF_OPTION "' cannot be used with '"
212                     RAW_OUTPUT_FORMAT "' output format.\n");
213             return 2;
214         }
215         m_NetScheduleAdmin.DumpJob(NcbiCout, m_Opts.id);
216         break;
217 
218     case eJSON:
219         {
220             CJobInfoToJSON job_info_to_json;
221             g_ProcessJobInfo(m_NetScheduleAPI, m_Opts.id,
222                     &job_info_to_json, !IsOptionSet(eBrief), m_CompoundIDPool);
223             g_PrintJSON(stdout, job_info_to_json.GetRootNode());
224         }
225         break;
226 
227     default:
228         {
229             CPrintJobInfo print_job_info;
230             g_ProcessJobInfo(m_NetScheduleAPI, m_Opts.id,
231                     &print_job_info, !IsOptionSet(eBrief), m_CompoundIDPool);
232         }
233     }
234 
235     return 0;
236 }
237 
238 class CBatchSubmitAttrParser
239 {
240 public:
CBatchSubmitAttrParser(istream * input_stream)241     CBatchSubmitAttrParser(istream* input_stream) :
242         m_InputStream(input_stream),
243         m_LineNumber(0)
244     {
245     }
246     bool NextLine();
247     bool NextAttribute();
GetAttributeType() const248     EOption GetAttributeType() const {return m_JobAttribute;}
GetAttributeValue() const249     const string& GetAttributeValue() const {return m_JobAttributeValue;}
GetLineNumber() const250     size_t GetLineNumber() const {return m_LineNumber;}
251 
252 private:
253     istream* m_InputStream;
254     size_t m_LineNumber;
255     string m_Line;
256     CAttrListParser m_AttrParser;
257     EOption m_JobAttribute;
258     string m_JobAttributeValue;
259 };
260 
NextLine()261 bool CBatchSubmitAttrParser::NextLine()
262 {
263     if (m_InputStream == NULL)
264         return false;
265 
266     ++m_LineNumber;
267 
268     getline(*m_InputStream, m_Line);
269 
270     if (m_InputStream->fail()) {
271         m_InputStream = NULL;
272         return false;
273     }
274 
275     if (m_InputStream->eof()) {
276         m_InputStream = NULL;
277         if (m_Line.empty())
278             return false;
279     }
280 
281     m_AttrParser.Reset(m_Line);
282     return true;
283 }
284 
NextAttribute()285 bool CBatchSubmitAttrParser::NextAttribute()
286 {
287     m_JobAttribute = eUntypedArg;
288 
289 #define ATTR_CHECK_SET(name, type) \
290     if (attr_name.length() == sizeof(name) - 1 && \
291             memcmp(attr_name.data(), name, sizeof(name) - 1) == 0) { \
292         m_JobAttribute = type; \
293         break; \
294     }
295 
296     CTempString attr_name;
297     size_t attr_column;
298 
299     CAttrListParser::ENextAttributeType next_attr_type =
300             m_AttrParser.NextAttribute(&attr_name,
301                     &m_JobAttributeValue, &attr_column);
302 
303     if (next_attr_type == CAttrListParser::eNoMoreAttributes)
304         return false;
305 
306     switch (attr_name[0]) {
307     case 'i':
308         ATTR_CHECK_SET("input", eInput);
309         break;
310     case 'a':
311         ATTR_CHECK_SET("args", eRemoteAppArgs);
312         ATTR_CHECK_SET("affinity", eAffinity);
313         break;
314     case 'e':
315         ATTR_CHECK_SET("exclusive", eExclusiveJob);
316     }
317 
318 #define ATTR_POS " at line " << m_LineNumber << ", column " << attr_column
319 
320     switch (m_JobAttribute) {
321     case eUntypedArg:
322         NCBI_THROW_FMT(CArgException, eInvalidArg,
323             "unknown attribute " << attr_name << ATTR_POS);
324 
325     case eExclusiveJob:
326         break;
327 
328     default:
329         if (next_attr_type != CAttrListParser::eAttributeWithValue) {
330             NCBI_THROW_FMT(CArgException, eInvalidArg,
331                 "attribute " << attr_name << " requires a value" ATTR_POS);
332         }
333     }
334 
335     return true;
336 }
337 
338 static const string s_NotificationTimestampFormat("Y/M/D h:m:s.l");
339 
PrintJobStatusNotification(CNetScheduleNotificationHandler & submit_job_handler,const string & job_key,const string & server_host)340 void CGridCommandLineInterfaceApp::PrintJobStatusNotification(
341         CNetScheduleNotificationHandler& submit_job_handler,
342         const string& job_key,
343         const string& server_host)
344 {
345     CNetScheduleAPI::EJobStatus job_status = CNetScheduleAPI::eJobNotFound;
346     int last_event_index = -1;
347 
348     const char* format = "%s \"%s\" from %s [invalid]\n";
349 
350     if (submit_job_handler.CheckJobStatusNotification(job_key,
351             &job_status, &last_event_index))
352         format = "%s \"%s\" from %s [valid, "
353                 "job_status=%s, last_event_index=%d]\n";
354 
355     printf(format, GetFastLocalTime().
356             AsString(s_NotificationTimestampFormat).c_str(),
357             submit_job_handler.GetMessage().c_str(),
358             server_host.c_str(),
359             CNetScheduleAPI::StatusToString(job_status).c_str(),
360             last_event_index);
361 }
362 
CheckJobInputStream(CNcbiOstream & job_input_ostream)363 void CGridCommandLineInterfaceApp::CheckJobInputStream(
364         CNcbiOstream& job_input_ostream)
365 {
366     if (job_input_ostream.fail()) {
367         NCBI_THROW(CIOException, eWrite, "Error while writing job input");
368     }
369 }
370 
PrepareRemoteAppJobInput(size_t max_embedded_input_size,const string & args,CNcbiIstream & remote_app_stdin,CNcbiOstream & job_input_ostream)371 void CGridCommandLineInterfaceApp::PrepareRemoteAppJobInput(
372         size_t max_embedded_input_size,
373         const string& args,
374         CNcbiIstream& remote_app_stdin,
375         CNcbiOstream& job_input_ostream)
376 {
377     CRemoteAppRequest request(m_GridClient->GetNetCacheAPI());
378 
379     // Roughly estimate the maximum embedded input size.
380     request.SetMaxInlineSize(max_embedded_input_size == 0 ?
381             numeric_limits<size_t>().max() :
382             max_embedded_input_size - max_embedded_input_size / 10);
383 
384     request.SetCmdLine(args);
385 
386     remote_app_stdin.peek();
387     if (!remote_app_stdin.eof())
388         NcbiStreamCopy(request.GetStdIn(), remote_app_stdin);
389 
390     request.Send(job_input_ostream);
391 }
392 
393 struct SBatchSubmitRecord {
394     CBatchSubmitAttrParser attr_parser;
395 
396     string job_input;
397     bool job_input_defined;
398     string remote_app_args;
399     bool remote_app_args_defined;
400     string affinity;
401     bool exclusive_job;
402 
SBatchSubmitRecordSBatchSubmitRecord403     SBatchSubmitRecord(istream* input_stream) : attr_parser(input_stream) {}
404 
405     bool LoadNextRecord();
406 };
407 
LoadNextRecord()408 bool SBatchSubmitRecord::LoadNextRecord()
409 {
410     if (!attr_parser.NextLine())
411         return false;
412 
413     job_input = kEmptyStr;
414     remote_app_args = kEmptyStr;
415     affinity = kEmptyStr;
416     job_input_defined = remote_app_args_defined = exclusive_job = false;
417 
418     while (attr_parser.NextAttribute())
419         switch (attr_parser.GetAttributeType()) {
420         case eInput:
421             if (job_input_defined) {
422                 NCBI_THROW_FMT(CArgException, eInvalidArg,
423                         "More than one \"input\" attribute is defined "
424                         "at line " << attr_parser.GetLineNumber());
425             }
426             job_input = attr_parser.GetAttributeValue();
427             job_input_defined = true;
428             break;
429         case eRemoteAppArgs:
430             if (remote_app_args_defined) {
431                 NCBI_THROW_FMT(CArgException, eInvalidArg,
432                         "More than one \"args\" attribute is defined "
433                         "at line " << attr_parser.GetLineNumber());
434             }
435             remote_app_args = attr_parser.GetAttributeValue();
436             remote_app_args_defined = true;
437             break;
438         case eAffinity:
439             affinity = attr_parser.GetAttributeValue();
440             break;
441         case eExclusiveJob:
442             exclusive_job = true;
443             break;
444         default:
445             _ASSERT(0);
446             break;
447         }
448 
449     if (!job_input_defined && !remote_app_args_defined) {
450         NCBI_THROW_FMT(CArgException, eInvalidArg, "\"input\" "
451                 "(and/or \"args\") attribute is required "
452                 "at line " << attr_parser.GetLineNumber());
453     }
454 
455     return true;
456 }
457 
x_LoadJobInput(size_t max_embedded_input_size,CNcbiOstream & job_input_ostream)458 void CGridCommandLineInterfaceApp::x_LoadJobInput(
459         size_t max_embedded_input_size, CNcbiOstream &job_input_ostream)
460 {
461     if (IsOptionSet(eRemoteAppArgs)) {
462         if (IsOptionSet(eInputFile))
463             PrepareRemoteAppJobInput(max_embedded_input_size,
464                     m_Opts.remote_app_args,
465                     *m_Opts.input_stream, job_input_ostream);
466         else {
467             CNcbiStrstream remote_app_stdin;
468             remote_app_stdin.write(m_Opts.input.data(),
469                     m_Opts.input.length());
470             PrepareRemoteAppJobInput(max_embedded_input_size,
471                     m_Opts.remote_app_args,
472                     remote_app_stdin, job_input_ostream);
473         }
474     } else if (IsOptionSet(eInput))
475         job_input_ostream.write(m_Opts.input.data(), m_Opts.input.length());
476     else
477         NcbiStreamCopy(job_input_ostream, *m_Opts.input_stream);
478 
479     CheckJobInputStream(job_input_ostream);
480 }
481 
SubmitJob_Batch()482 void CGridCommandLineInterfaceApp::SubmitJob_Batch()
483 {
484     SBatchSubmitRecord job_input_record(m_Opts.input_stream);
485 
486     size_t max_embedded_input_size = m_GridClient->GetMaxServerInputSize();
487 
488     if (m_Opts.batch_size <= 1) {
489         while (job_input_record.LoadNextRecord()) {
490             CNcbiOstream& job_input_ostream(m_GridClient->GetOStream());
491 
492             if (job_input_record.remote_app_args_defined) {
493                 CNcbiStrstream remote_app_stdin;
494                 remote_app_stdin.write(job_input_record.job_input.data(),
495                         job_input_record.job_input.length());
496                 PrepareRemoteAppJobInput(max_embedded_input_size,
497                         job_input_record.remote_app_args,
498                         remote_app_stdin, job_input_ostream);
499             } else {
500                 job_input_ostream.write(job_input_record.job_input.data(),
501                         job_input_record.job_input.length());
502             }
503 
504             CheckJobInputStream(job_input_ostream);
505 
506             if (!job_input_record.affinity.empty())
507                 m_GridClient->SetJobAffinity(job_input_record.affinity);
508 
509             if (job_input_record.exclusive_job)
510                 m_GridClient->SetJobMask(CNetScheduleAPI::eExclusiveJob);
511 
512             if (IsOptionSet(eJobGroup))
513                 m_GridClient->SetJobGroup(m_Opts.job_group);
514 
515             fprintf(m_Opts.output_stream,
516                 "%s\n", m_GridClient->Submit(m_Opts.affinity).c_str());
517         }
518     } else {
519         CGridJobBatchSubmitter& batch_submitter(
520             m_GridClient->GetJobBatchSubmitter());
521         unsigned remaining_batch_size = m_Opts.batch_size;
522 
523         while (job_input_record.LoadNextRecord()) {
524             if (remaining_batch_size == 0) {
525                 batch_submitter.Submit(m_Opts.job_group);
526                 const vector<CNetScheduleJob>& jobs =
527                     batch_submitter.GetBatch();
528                 ITERATE(vector<CNetScheduleJob>, it, jobs)
529                     fprintf(m_Opts.output_stream,
530                         "%s\n", it->job_id.c_str());
531                 batch_submitter.Reset();
532                 remaining_batch_size = m_Opts.batch_size;
533             }
534             batch_submitter.PrepareNextJob();
535 
536             CNcbiOstream& job_input_ostream(batch_submitter.GetOStream());
537 
538             if (job_input_record.remote_app_args_defined) {
539                 CNcbiStrstream remote_app_stdin;
540                 remote_app_stdin.write(job_input_record.job_input.data(),
541                         job_input_record.job_input.length());
542                 PrepareRemoteAppJobInput(max_embedded_input_size,
543                         job_input_record.remote_app_args,
544                         remote_app_stdin, job_input_ostream);
545             } else
546                 job_input_ostream.write(job_input_record.job_input.data(),
547                         job_input_record.job_input.length());
548 
549             CheckJobInputStream(job_input_ostream);
550 
551             if (!job_input_record.affinity.empty())
552                 batch_submitter.SetJobAffinity(job_input_record.affinity);
553 
554             if (job_input_record.exclusive_job)
555                 batch_submitter.SetJobMask(CNetScheduleAPI::eExclusiveJob);
556 
557             --remaining_batch_size;
558         }
559         if (remaining_batch_size < m_Opts.batch_size) {
560             batch_submitter.Submit(m_Opts.job_group);
561             const vector<CNetScheduleJob>& jobs =
562                 batch_submitter.GetBatch();
563             ITERATE(vector<CNetScheduleJob>, it, jobs)
564                 fprintf(m_Opts.output_stream, "%s\n", it->job_id.c_str());
565             batch_submitter.Reset();
566         }
567     }
568 }
569 
Cmd_SubmitJob()570 int CGridCommandLineInterfaceApp::Cmd_SubmitJob()
571 {
572     SetUp_NetScheduleCmd(eNetScheduleSubmitter);
573 
574     if (IsOptionSet(eBatch)) {
575         if (IsOptionSet(eJobInputDir)) {
576             NCBI_THROW(CArgException, eInvalidArg, "'--" JOB_INPUT_DIR_OPTION
577                 "' option is not supported in batch mode");
578         }
579         SubmitJob_Batch();
580     } else if (IsOptionSet(eJobInputDir)) {
581         CNetScheduleJob job;
582 
583         job.affinity = m_Opts.affinity;
584         job.group = m_Opts.job_group;
585 
586         CCompoundID job_key = m_CompoundIDPool.NewID(eCIC_GenericID);
587         job_key.AppendCurrentTime();
588         job_key.AppendRandom();
589         job.job_id = job_key.ToString();
590 
591         {{
592             CStringOrBlobStorageWriter job_input_writer(
593                     numeric_limits<size_t>().max(), NULL, job.input);
594             CWStream job_input_ostream(&job_input_writer, 0, NULL);
595 
596             x_LoadJobInput(0, job_input_ostream);
597         }}
598 
599         CNetScheduleJobSerializer job_serializer(job, m_CompoundIDPool);
600         job_serializer.SaveJobInput(m_Opts.job_input_dir, m_NetCacheAPI);
601 
602         PrintLine(job.job_id);
603     } else {
604         CNcbiOstream& job_input_ostream = m_GridClient->GetOStream();
605 
606         size_t max_embedded_input_size = m_GridClient->GetMaxServerInputSize();
607 
608         x_LoadJobInput(max_embedded_input_size, job_input_ostream);
609 
610         m_GridClient->SetJobGroup(m_Opts.job_group);
611         m_GridClient->SetJobAffinity(m_Opts.affinity);
612 
613         if (IsOptionSet(eExclusiveJob))
614             m_GridClient->SetJobMask(CNetScheduleAPI::eExclusiveJob);
615 
616         if (!IsOptionSet(eWaitTimeout))
617             PrintLine(m_GridClient->Submit());
618         else {
619             m_GridClient->CloseStream();
620 
621             CNetScheduleJob& job = m_GridClient->GetJob();
622 
623             CDeadline deadline(m_Opts.timeout, 0);
624 
625             CNetScheduleNotificationHandler submit_job_handler;
626 
627             submit_job_handler.SubmitJob(m_NetScheduleSubmitter,
628                 job, m_Opts.timeout);
629 
630             PrintLine(job.job_id);
631 
632             if (!IsOptionSet(eDumpNSNotifications)) {
633                 CNetScheduleAPI::EJobStatus status =
634                     submit_job_handler.WaitForJobCompletion
635                     (job, deadline, m_NetScheduleAPI);
636 
637                 PrintLine(CNetScheduleAPI::StatusToString(status));
638 
639                 if (status == CNetScheduleAPI::eDone) {
640                     if (IsOptionSet(eRemoteAppArgs))
641                         MarkOptionAsSet(eRemoteAppStdOut);
642                     DumpJobInputOutput(job.output);
643                 }
644             } else {
645                 submit_job_handler.PrintPortNumber();
646 
647                 string server_host;
648 
649                 if (submit_job_handler.WaitForNotification(deadline,
650                                                            &server_host))
651                     PrintJobStatusNotification(submit_job_handler,
652                             job.job_id, server_host);
653             }
654         }
655     }
656 
657     return 0;
658 }
659 
Cmd_WatchJob()660 int CGridCommandLineInterfaceApp::Cmd_WatchJob()
661 {
662     SetUp_NetScheduleCmd(eNetScheduleSubmitter);
663 
664     if (!IsOptionSet(eWaitTimeout)) {
665         fprintf(stderr, GRID_APP_NAME " " WATCHJOB_COMMAND
666             ": option '--" WAIT_TIMEOUT_OPTION "' is required.\n");
667         return 2;
668     }
669 
670     if (!IsOptionSet(eWaitForJobStatus) && !IsOptionSet(eWaitForJobEventAfter))
671         m_Opts.job_status_mask = ~((1 << CNetScheduleAPI::ePending) |
672                 (1 << CNetScheduleAPI::eRunning));
673 
674     CDeadline deadline(m_Opts.timeout, 0);
675 
676     CNetScheduleNotificationHandler submit_job_handler;
677 
678     CNetScheduleAPI::EJobStatus job_status;
679     int last_event_index = -1;
680 
681     tie(job_status, last_event_index, ignore) =
682         submit_job_handler.RequestJobWatching(m_NetScheduleAPI, m_Opts.id, deadline);
683 
684     if (job_status == CNetScheduleAPI::eJobNotFound) {
685         fprintf(stderr, GRID_APP_NAME ": unexpected error while "
686                 "setting up a job event listener.\n");
687         return 3;
688     }
689 
690     if (!IsOptionSet(eDumpNSNotifications)) {
691         if (last_event_index <= m_Opts.last_event_index &&
692                 (m_Opts.job_status_mask & (1 << job_status)) == 0)
693             job_status = submit_job_handler.WaitForJobEvent(m_Opts.id,
694                     deadline, m_NetScheduleAPI, m_Opts.job_status_mask,
695                     m_Opts.last_event_index, &last_event_index);
696 
697         printf("%d\n%s\n", last_event_index,
698                 CNetScheduleAPI::StatusToString(job_status).c_str());
699     } else {
700         if (last_event_index > m_Opts.last_event_index) {
701             fprintf(stderr, "Job event index (%d) has already "
702                     "exceeded %d; won't wait.\n",
703                     last_event_index, m_Opts.last_event_index);
704             return 6;
705         }
706         if ((m_Opts.job_status_mask & (1 << job_status)) != 0) {
707             fprintf(stderr, "Job is already '%s'; won't wait.\n",
708                     CNetScheduleAPI::StatusToString(job_status).c_str());
709             return 6;
710         }
711 
712         submit_job_handler.PrintPortNumber();
713 
714         string server_host;
715 
716         while (submit_job_handler.WaitForNotification(deadline,
717                                                       &server_host))
718             PrintJobStatusNotification(submit_job_handler,
719                     m_Opts.id, server_host);
720     }
721 
722     return 0;
723 }
724 
s_DumpStdStream(CNcbiIstream & std_stream,FILE * output_stream)725 static bool s_DumpStdStream(CNcbiIstream& std_stream, FILE* output_stream)
726 {
727     char buffer[IO_BUFFER_SIZE];
728     size_t bytes_read;
729 
730     std_stream.exceptions((ios::iostate) 0);
731 
732     while (!std_stream.eof()) {
733         std_stream.read(buffer, sizeof(buffer));
734         if (std_stream.fail() && !std_stream.eof())
735             return false;
736         bytes_read = (size_t) std_stream.gcount();
737 
738         // bytes_read could be zero due to EoF reported after read
739         if (bytes_read &&
740                 fwrite(buffer, bytes_read, 1, output_stream) != 1)
741             return false;
742     }
743 
744     return true;
745 }
746 
DumpJobInputOutput(const string & data_or_blob_id)747 int CGridCommandLineInterfaceApp::DumpJobInputOutput(
748     const string& data_or_blob_id)
749 {
750     const auto kStreamFlags = CRWStreambuf::fOwnReader | CRWStreambuf::fLeakExceptions;
751     unique_ptr<IReader> reader;
752 
753     try {
754         reader.reset(new CStringOrBlobStorageReader(data_or_blob_id,
755             m_NetCacheAPI));
756     }
757     catch (CStringOrBlobStorageRWException& e) {
758         // Allow the special case of jobs submitted bypassing the Grid API.
759 
760         if (e.GetErrCode() != CStringOrBlobStorageRWException::eInvalidFlag)
761             throw;
762 
763         if (IsOptionSet(eRemoteAppStdIn) ||
764                 IsOptionSet(eRemoteAppStdOut) ||
765                 IsOptionSet(eRemoteAppStdErr))
766             throw;
767 
768         if (fwrite(data_or_blob_id.data(), data_or_blob_id.length(), 1,
769                 m_Opts.output_stream) != 1)
770             goto Error;
771 
772         return 0;
773     }
774 
775     if (IsOptionSet(eRemoteAppStdIn)) {
776         CRemoteAppRequest request(m_NetCacheAPI);
777 
778         try {
779             CRStream input_stream(reader.release(), 0, 0, kStreamFlags);
780 
781             request.Deserialize(input_stream);
782         }
783         catch (exception&) {
784             fprintf(stderr, GRID_APP_NAME
785                     ": Cannot deserialize remote_app job input.\n");
786             return 3;
787         }
788 
789         if (!s_DumpStdStream(request.GetStdInForRead(), m_Opts.output_stream))
790             goto Error;
791 
792         return 0;
793     }
794 
795     if (IsOptionSet(eRemoteAppStdOut) || IsOptionSet(eRemoteAppStdErr)) {
796         CRStream input_stream(reader.release(), 0, 0, kStreamFlags);
797 
798         CRemoteAppResult remote_app_result(m_NetCacheAPI);
799         remote_app_result.Receive(input_stream);
800 
801         CNcbiIstream& std_stream = IsOptionSet(eRemoteAppStdOut) ?
802                 remote_app_result.GetStdOut() : remote_app_result.GetStdErr();
803 
804         if (!s_DumpStdStream(std_stream, m_Opts.output_stream))
805             goto Error;
806 
807         return 0;
808     }
809 
810     char buffer[IO_BUFFER_SIZE];
811     size_t bytes_read;
812 
813     while (reader->Read(buffer, sizeof(buffer), &bytes_read) != eRW_Eof)
814         if (fwrite(buffer, bytes_read, 1, m_Opts.output_stream) != 1)
815             goto Error;
816 
817     return 0;
818 
819 Error:
820     fprintf(stderr, GRID_APP_NAME ": I/O error.\n");
821     return 3;
822 }
823 
PrintJobAttrsAndDumpInput(const CNetScheduleJob & job)824 int CGridCommandLineInterfaceApp::PrintJobAttrsAndDumpInput(
825     const CNetScheduleJob& job)
826 {
827     PrintLine(job.job_id);
828     if (!job.auth_token.empty())
829         printf("%s ", job.auth_token.c_str());
830     if (!job.affinity.empty()) {
831         string affinity(NStr::PrintableString(job.affinity));
832         printf(job.mask & CNetScheduleAPI::eExclusiveJob ?
833             "affinity=\"%s\" exclusive\n" : "affinity=\"%s\"\n",
834             affinity.c_str());
835     } else
836         printf(job.mask & CNetScheduleAPI::eExclusiveJob ?
837             "exclusive\n" : "\n");
838     return DumpJobInputOutput(job.input);
839 }
840 
Cmd_GetJobInput()841 int CGridCommandLineInterfaceApp::Cmd_GetJobInput()
842 {
843     SetUp_NetScheduleCmd(eNetScheduleSubmitter);
844 
845     CNetScheduleJob job;
846     job.job_id = m_Opts.id;
847 
848     if (m_NetScheduleAPI.GetJobDetails(job) == CNetScheduleAPI::eJobNotFound) {
849         fprintf(stderr, GRID_APP_NAME ": job %s has expired.\n",
850                 job.job_id.c_str());
851         return 3;
852     }
853 
854     return DumpJobInputOutput(job.input);
855 }
856 
Cmd_GetJobOutput()857 int CGridCommandLineInterfaceApp::Cmd_GetJobOutput()
858 {
859     SetUp_NetScheduleCmd(eNetScheduleSubmitter);
860 
861     CNetScheduleJob job;
862     job.job_id = m_Opts.id;
863     CNetScheduleAPI::EJobStatus status = m_NetScheduleAPI.GetJobDetails(job);
864 
865     switch (status) {
866     case CNetScheduleAPI::eDone:
867     case CNetScheduleAPI::eReading:
868     case CNetScheduleAPI::eConfirmed:
869     case CNetScheduleAPI::eReadFailed:
870         break;
871 
872     default:
873         fprintf(stderr, "Warning: job is in %s status.\n",
874             CNetScheduleAPI::StatusToString(status).c_str());
875     }
876 
877     return DumpJobInputOutput(job.output);
878 }
879 
Cmd_ReadJob()880 int CGridCommandLineInterfaceApp::Cmd_ReadJob()
881 {
882     SetUp_NetScheduleCmd(eNetScheduleSubmitter);
883 
884     if (!IsOptionSet(eConfirmRead) && !IsOptionSet(eFailRead) &&
885             !IsOptionSet(eRollbackRead)) {
886         CNetScheduleJob job;
887         CNetScheduleAPI::EJobStatus job_status;
888 
889         CNetScheduleJobReader job_reader(m_NetScheduleAPI.GetJobReader(
890                     m_Opts.job_group, m_Opts.affinity));
891 
892         CNetScheduleJobReader::EReadNextJobResult rnj_result;
893 
894         if (!IsOptionSet(eWaitTimeout))
895             rnj_result = job_reader.ReadNextJob(&job, &job_status);
896         else {
897             CTimeout timeout(m_Opts.timeout);
898             rnj_result = job_reader.ReadNextJob(&job, &job_status, &timeout);
899         }
900 
901         switch (rnj_result) {
902         case CNetScheduleJobReader::eRNJ_JobReady:
903             PrintLine(job.job_id);
904             PrintLine(CNetScheduleAPI::StatusToString(job_status));
905 
906             if (IsOptionSet(eReliableRead))
907                 PrintLine(job.auth_token);
908             else {
909                 if (job_status == CNetScheduleAPI::eDone) {
910                     m_NetScheduleSubmitter.GetJobDetails(job);
911                     int ret_code = DumpJobInputOutput(job.output);
912                     if (ret_code != 0)
913                         return ret_code;
914                 }
915                 m_NetScheduleSubmitter.ReadConfirm(job.job_id, job.auth_token);
916             }
917             break;
918 
919         case CNetScheduleJobReader::eRNJ_Interrupt:
920             return 3;
921 
922         case CNetScheduleJobReader::eRNJ_NotReady:
923             if (IsOptionSet(eWaitTimeout)) {
924                 PrintLine("TIMEOUT");
925             }
926 
927             return 0;
928 
929         case CNetScheduleJobReader::eRNJ_NoMoreJobs:
930             PrintLine("NOJOBS");
931             return 0;
932         }
933     } else {
934         if (!IsOptionSet(eJobId)) {
935             fprintf(stderr, GRID_APP_NAME " " READJOB_COMMAND
936                 ": option '--" JOB_ID_OPTION "' is required.\n");
937             return 2;
938         }
939 
940         if (IsOptionSet(eConfirmRead))
941             m_NetScheduleSubmitter.ReadConfirm(m_Opts.id, m_Opts.auth_token);
942         else if (IsOptionSet(eFailRead))
943             m_NetScheduleSubmitter.ReadFail(m_Opts.id, m_Opts.auth_token,
944                     m_Opts.error_message);
945         else
946             m_NetScheduleSubmitter.ReadRollback(m_Opts.id, m_Opts.auth_token);
947     }
948 
949     return 0;
950 }
951 
Cmd_CancelJob()952 int CGridCommandLineInterfaceApp::Cmd_CancelJob()
953 {
954     if (IsOptionSet(eJobGroup)) {
955         SetUp_NetScheduleCmd(eNetScheduleAPI);
956 
957         m_NetScheduleAPI.GetSubmitter().CancelJobGroup(m_Opts.job_group,
958                 m_Opts.job_statuses);
959     } else if (IsOptionSet(eAllJobs) || !m_Opts.job_statuses.empty()) {
960         SetUp_NetScheduleCmd(eNetScheduleAdmin, eAdminCmdWithSideEffects);
961 
962         m_NetScheduleAdmin.CancelAllJobs(m_Opts.job_statuses);
963     } else {
964         SetUp_NetScheduleCmd(eNetScheduleAPI);
965 
966         m_NetScheduleAPI.GetSubmitter().CancelJob(m_Opts.id);
967     }
968 
969     return 0;
970 }
971 
Cmd_RequestJob()972 int CGridCommandLineInterfaceApp::Cmd_RequestJob()
973 {
974     SetUp_NetScheduleCmd(eNetScheduleExecutor);
975 
976     CNetScheduleExecutor::EJobAffinityPreference affinity_preference =
977             CNetScheduleExecutor::eExplicitAffinitiesOnly;
978 
979     switch (IsOptionSet(eUsePreferredAffinities, OPTION_N(0)) |
980             IsOptionSet(eClaimNewAffinities, OPTION_N(1)) |
981             IsOptionSet(eAnyAffinity, OPTION_N(2))) {
982     case OPTION_N(2) + OPTION_N(0):
983         affinity_preference = CNetScheduleExecutor::ePreferredAffsOrAnyJob;
984         break;
985 
986     case OPTION_N(1):
987     case OPTION_N(1) + OPTION_N(0):
988         affinity_preference = CNetScheduleExecutor::eClaimNewPreferredAffs;
989         break;
990 
991     case OPTION_N(0):
992         affinity_preference = CNetScheduleExecutor::ePreferredAffinities;
993         break;
994 
995     case 0:
996         if (IsOptionSet(eAffinityList))
997             break;
998         /* FALL THROUGH */
999 
1000     case OPTION_N(2):
1001         affinity_preference = CNetScheduleExecutor::eAnyJob;
1002         break;
1003 
1004     default:
1005         fprintf(stderr, GRID_APP_NAME ": options '--"
1006             CLAIM_NEW_AFFINITIES_OPTION "' and '--" ANY_AFFINITY_OPTION
1007             "' are mutually exclusive.\n");
1008         return 2;
1009     }
1010 
1011     m_NetScheduleExecutor.SetAffinityPreference(affinity_preference);
1012 
1013     CNetScheduleJob job;
1014 
1015     if (!IsOptionSet(eDumpNSNotifications)) {
1016         if (m_NetScheduleExecutor.GetJob(job, m_Opts.timeout, m_Opts.affinity))
1017             return PrintJobAttrsAndDumpInput(job);
1018     } else {
1019         CDeadline deadline(m_Opts.timeout, 0);
1020 
1021         CNetScheduleNotificationHandler wait_job_handler;
1022 
1023         wait_job_handler.PrintPortNumber();
1024 
1025         string cmd(CNetScheduleNotificationHandler::MkBaseGETCmd(
1026                 affinity_preference, m_Opts.affinity));
1027 
1028         wait_job_handler.CmdAppendTimeoutGroupAndClientInfo(cmd,
1029                 &deadline, m_Opts.job_group);
1030 
1031         if (wait_job_handler.RequestJob(m_NetScheduleExecutor, job, cmd)) {
1032             fprintf(stderr, "%s\nA job has been returned; won't wait.\n",
1033                     job.job_id.c_str());
1034             return 6;
1035         }
1036 
1037         string server_host;
1038         CNetServer server;
1039         string server_address;
1040 
1041         while (wait_job_handler.WaitForNotification(deadline,
1042                                                     &server_host)) {
1043             const char* format = "%s \"%s\" from %s [invalid]\n";
1044 
1045             if (wait_job_handler.CheckRequestJobNotification(
1046                     m_NetScheduleExecutor, &server)) {
1047                 server_address = server.GetServerAddress();
1048                 format = "%s \"%s\" from %s [valid, server=%s]\n";
1049             }
1050 
1051             printf(format, GetFastLocalTime().AsString(
1052                     s_NotificationTimestampFormat).c_str(),
1053                     wait_job_handler.GetMessage().c_str(),
1054                     server_host.c_str(),
1055                     server_address.c_str());
1056         }
1057     }
1058 
1059     return 0;
1060 }
1061 
Cmd_CommitJob()1062 int CGridCommandLineInterfaceApp::Cmd_CommitJob()
1063 {
1064     SetUp_NetScheduleCmd(eNetScheduleExecutor);
1065 
1066     CNetScheduleJob job;
1067 
1068     job.job_id = m_Opts.id;
1069     job.ret_code = m_Opts.return_code;
1070     job.auth_token = m_Opts.auth_token;
1071 
1072     if (IsOptionSet(eJobOutputBlob))
1073         job.output = "K " + m_Opts.job_output_blob;
1074     else {
1075         const auto kMaxOutputSize = m_NetScheduleAPI.GetServerParams().max_output_size;
1076         CStringOrBlobStorageWriter writer(kMaxOutputSize, m_NetCacheAPI, job.output);
1077 
1078         if (!IsOptionSet(eJobOutput)) {
1079             char buffer[IO_BUFFER_SIZE];
1080 
1081             do {
1082                 m_Opts.input_stream->read(buffer, sizeof(buffer));
1083                 if (m_Opts.input_stream->fail() &&
1084                         !m_Opts.input_stream->eof()) {
1085                     NCBI_THROW(CIOException, eRead,
1086                             "Error while reading job output data");
1087                 }
1088                 if (writer.Write(buffer,
1089                         (size_t) m_Opts.input_stream->gcount()) != eRW_Success)
1090                     goto ErrorExit;
1091             } while (!m_Opts.input_stream->eof());
1092         } else
1093             if (writer.Write(m_Opts.job_output.data(),
1094                     m_Opts.job_output.length()) != eRW_Success)
1095                 goto ErrorExit;
1096     }
1097 
1098     if (!IsOptionSet(eFailJob))
1099         m_NetScheduleExecutor.PutResult(job);
1100     else {
1101         job.error_msg = m_Opts.error_message;
1102         m_NetScheduleExecutor.PutFailure(job);
1103     }
1104 
1105     return 0;
1106 
1107 ErrorExit:
1108     fprintf(stderr, GRID_APP_NAME ": error while sending job output.\n");
1109     return 3;
1110 }
1111 
Cmd_ReturnJob()1112 int CGridCommandLineInterfaceApp::Cmd_ReturnJob()
1113 {
1114     SetUp_NetScheduleCmd(eNetScheduleExecutor);
1115 
1116     CNetScheduleJob job;
1117     job.job_id = m_Opts.id;
1118     job.auth_token = m_Opts.auth_token;
1119 
1120     m_NetScheduleExecutor.ReturnJob(job);
1121 
1122     return 0;
1123 }
1124 
Cmd_ClearNode()1125 int CGridCommandLineInterfaceApp::Cmd_ClearNode()
1126 {
1127     SetUp_NetScheduleCmd(eNetScheduleExecutor);
1128 
1129     m_NetScheduleExecutor.ClearNode();
1130 
1131     return 0;
1132 }
1133 
Cmd_UpdateJob()1134 int CGridCommandLineInterfaceApp::Cmd_UpdateJob()
1135 {
1136     SetUp_NetScheduleCmd(eNetScheduleAPI);
1137 
1138     if (IsOptionSet(eExtendLifetime) ||
1139             IsOptionSet(eProgressMessage)) {
1140         CNetScheduleExecutor executor(m_NetScheduleAPI.GetExecutor());
1141 
1142         CNetScheduleJob job;
1143         job.job_id = m_Opts.id;
1144 
1145         if (IsOptionSet(eExtendLifetime))
1146             executor.JobDelayExpiration(job,
1147                     (unsigned) m_Opts.extend_lifetime_by);
1148 
1149         if (IsOptionSet(eProgressMessage)) {
1150             job.progress_msg = m_Opts.progress_message;
1151             executor.PutProgressMsg(job);
1152         }
1153     }
1154 
1155     return 0;
1156 }
1157 
Cmd_QueueInfo()1158 int CGridCommandLineInterfaceApp::Cmd_QueueInfo()
1159 {
1160     SetUp_NetScheduleCmd(eNetScheduleAdmin, eReadOnlyAdminCmd, false);
1161 
1162     if (!IsOptionSet(eQueueClasses)) {
1163         if ((IsOptionSet(eQueueArg) ^ IsOptionSet(eAllQueues)) == 0) {
1164             fprintf(stderr, GRID_APP_NAME " " QUEUEINFO_COMMAND
1165                     ": either the '" QUEUE_ARG "' argument or the '--"
1166                     ALL_QUEUES_OPTION "' option must be specified.\n");
1167             return 1;
1168         }
1169         if (m_Opts.output_format == eJSON)
1170             g_PrintJSON(stdout, g_QueueInfoToJson(m_NetScheduleAPI,
1171                     IsOptionSet(eQueueArg) ? m_Opts.queue : kEmptyStr));
1172         else if (!IsOptionSet(eAllQueues))
1173             m_NetScheduleAdmin.PrintQueueInfo(m_Opts.queue, NcbiCout);
1174         else {
1175             m_NetScheduleAPI.GetService().PrintCmdOutput("STAT QUEUES",
1176                     NcbiCout, CNetService::eMultilineOutput);
1177         }
1178     } else if (m_Opts.output_format == eJSON)
1179         g_PrintJSON(stdout, g_QueueClassInfoToJson(m_NetScheduleAPI));
1180     else
1181         m_NetScheduleAPI.GetService().PrintCmdOutput("STAT QCLASSES",
1182                 NcbiCout, CNetService::eMultilineOutput);
1183 
1184     return 0;
1185 }
1186 
Cmd_DumpQueue()1187 int CGridCommandLineInterfaceApp::Cmd_DumpQueue()
1188 {
1189     SetUp_NetScheduleCmd(eNetScheduleAdmin, eReadOnlyAdminCmd);
1190 
1191     m_NetScheduleAdmin.DumpQueue(NcbiCout, m_Opts.start_after_job,
1192             m_Opts.job_count, m_Opts.job_statuses, m_Opts.job_group);
1193 
1194     return 0;
1195 }
1196 
Cmd_CreateQueue()1197 int CGridCommandLineInterfaceApp::Cmd_CreateQueue()
1198 {
1199     SetUp_NetScheduleCmd(eNetScheduleAdmin, eAdminCmdWithSideEffects);
1200 
1201     m_NetScheduleAdmin.CreateQueue(m_Opts.id,
1202             m_Opts.queue_class, m_Opts.queue_description);
1203 
1204     return 0;
1205 }
1206 
Cmd_GetQueueList()1207 int CGridCommandLineInterfaceApp::Cmd_GetQueueList()
1208 {
1209     SetUp_NetScheduleCmd(eNetScheduleAdmin, eReadOnlyAdminCmd, false);
1210 
1211     CNetScheduleAdmin::TQueueList queues;
1212 
1213     m_NetScheduleAdmin.GetQueueList(queues);
1214 
1215     typedef set<string> TServerSet;
1216     typedef map<string, TServerSet> TQueueRegister;
1217 
1218     TQueueRegister queue_register;
1219 
1220     ITERATE (CNetScheduleAdmin::TQueueList, it, queues) {
1221         string server_address = it->server.GetServerAddress();
1222 
1223         ITERATE(std::list<std::string>, queue, it->queues) {
1224             queue_register[*queue].insert(server_address);
1225         }
1226     }
1227 
1228     ITERATE(TQueueRegister, it, queue_register) {
1229         NcbiCout << it->first;
1230         if (it->second.size() != queues.size()) {
1231             const char* sep = " (limited to ";
1232             ITERATE(TServerSet, server, it->second) {
1233                 NcbiCout << sep << *server;
1234                 sep = ", ";
1235             }
1236             NcbiCout << ")";
1237         }
1238         NcbiCout << NcbiEndl;
1239     }
1240 
1241     return 0;
1242 }
1243 
Cmd_DeleteQueue()1244 int CGridCommandLineInterfaceApp::Cmd_DeleteQueue()
1245 {
1246     SetUp_NetScheduleCmd(eNetScheduleAdmin, eAdminCmdWithSideEffects);
1247 
1248     m_NetScheduleAdmin.DeleteQueue(m_Opts.id);
1249 
1250     return 0;
1251 }
1252 
1253 END_NCBI_SCOPE
1254