1 /* $Id: ns_cmds.cpp 607703 2020-05-06 16:57:56Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitry Kazimirov
27 *
28 * File Description: NetSchedule-specific commands of the grid_cli application.
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "ns_cmd_impl.hpp"
35 #include "util.hpp"
36
37 #include <connect/services/remote_app.hpp>
38 #include <connect/services/grid_rw_impl.hpp>
39 #include <connect/services/ns_job_serializer.hpp>
40
41 #include <corelib/rwstream.hpp>
42
43 #include <ctype.h>
44
45 BEGIN_NCBI_SCOPE
46
SetUp_NetScheduleCmd(CGridCommandLineInterfaceApp::EAPIClass api_class,CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity,bool require_queue)47 void CGridCommandLineInterfaceApp::SetUp_NetScheduleCmd(
48 CGridCommandLineInterfaceApp::EAPIClass api_class,
49 CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity, bool require_queue)
50 {
51 if ((api_class == eNetScheduleSubmitter) || (api_class == eNetScheduleExecutor))
52 SetUp_NetCacheCmd(false, false, false);
53
54 m_APIClass = api_class;
55
56 string service = m_Opts.ns_service;
57 string queue = m_Opts.queue;
58
59 const bool job_provided = IsOptionSet(eID) || IsOptionSet(eJobId);
60 const bool service_provided = IsOptionExplicitlySet(eNetSchedule);
61
62 if (IsOptionSet(eWorkerNode)) {
63 m_NetScheduleAPI = CNetScheduleAPIExt::CreateWnCompat(service, m_Opts.auth);
64
65 } else {
66 if (job_provided) {
67 CNetScheduleKey key(m_Opts.id, m_CompoundIDPool);
68
69 if (queue.empty())
70 queue = key.queue;
71
72 if (!service_provided) {
73 key.host.push_back(':');
74 key.host.append(NStr::NumericToString(key.port));
75 service = key.host;
76 }
77 } else if (!IsOptionSet(eNetSchedule)) {
78 NCBI_THROW(CArgException, eNoValue, "'--" NETSCHEDULE_OPTION "' option is required.");
79
80 } else if (!IsOptionSet(eQueue) && !IsOptionSet(eTargetQueueArg) && require_queue) {
81 NCBI_THROW(CArgException, eNoValue, "'--" QUEUE_OPTION "' option is required.");
82
83 }
84
85 m_NetScheduleAPI = CNetScheduleAPIExt::CreateNoCfgLoad(service, m_Opts.auth, queue);
86 }
87
88 if (job_provided && service_provided) {
89 if (auto address = SSocketAddress::Parse(m_Opts.ns_service)) {
90 m_NetScheduleAPI.GetService().GetServerPool().StickToServer(move(address));
91 } else {
92 NCBI_THROW(CArgException, eInvalidArg,
93 "When job ID is given, '--" NETSCHEDULE_OPTION "' "
94 "must be a host:port server address.");
95 }
96 }
97
98 if (m_AdminMode)
99 m_NetScheduleAPI.SetClientType(CNetScheduleAPI::eCT_Admin);
100
101 auto warning_handler = [&](const string& m, CNetServer s) {
102 return OnWarning(m_APIClass != eWorkerNodeAdmin, m, s);
103 };
104
105 m_NetScheduleAPI.GetService().SetWarningHandler(warning_handler);
106
107 if (IsOptionSet(eCompatMode)) {
108 m_NetScheduleAPI.UseOldStyleAuth();
109 }
110
111 // Specialize NetSchedule API.
112 switch (api_class) {
113 case eNetScheduleAPI:
114 break;
115 case eNetScheduleAdmin:
116 if (cmd_severity != eReadOnlyAdminCmd) {
117 if (!service_provided) {
118 NCBI_THROW(CArgException, eNoValue, "'--" NETSCHEDULE_OPTION
119 "' must be explicitly specified.");
120 }
121 if (IsOptionAcceptedAndSetImplicitly(eQueue)) {
122 NCBI_THROW(CArgException, eNoValue, "'--" QUEUE_OPTION
123 "' must be specified explicitly (not via $"
124 LOGIN_TOKEN_ENV ").");
125 }
126 }
127 /* FALL THROUGH */
128 case eWorkerNodeAdmin:
129 m_NetScheduleAdmin = m_NetScheduleAPI.GetAdmin();
130 break;
131 case eNetScheduleExecutor:
132 m_NetScheduleExecutor = m_NetScheduleAPI.GetExecutor();
133
134 if (!IsOptionSet(eLoginToken) &&
135 !IsOptionSet(eClientNode) && !IsOptionSet(eClientSession)) {
136 NCBI_THROW(CArgException, eNoArg, "client identification required "
137 "(see the '" LOGIN_COMMAND "' command).");
138 }
139
140 if (IsOptionSet(eJobGroup))
141 m_NetScheduleExecutor.SetJobGroup(m_Opts.job_group);
142
143 /* FALL THROUGH */
144 case eNetScheduleSubmitter:
145 m_NetScheduleSubmitter = m_NetScheduleAPI.GetSubmitter();
146
147 m_GridClient.reset(new CGridClient(m_NetScheduleSubmitter,
148 m_NetCacheAPI, CGridClient::eManualCleanup,
149 CGridClient::eProgressMsgOn));
150 break;
151 default:
152 _ASSERT(0);
153 break;
154 }
155
156 if (IsOptionSet(eClientNode)) {
157 m_NetScheduleAPI.ReSetClientNode(m_Opts.client_node);
158 }
159
160 if (IsOptionSet(eClientSession)) {
161 m_NetScheduleAPI.ReSetClientSession(m_Opts.client_session);
162 }
163 }
164
JobInfo_PrintStatus(CNetScheduleAPI::EJobStatus status)165 void CGridCommandLineInterfaceApp::JobInfo_PrintStatus(
166 CNetScheduleAPI::EJobStatus status)
167 {
168 string job_status(CNetScheduleAPI::StatusToString(status));
169
170 if (m_Opts.output_format == eJSON)
171 printf("{\"status\": \"%s\"}\n", job_status.c_str());
172 else
173 PrintLine(job_status);
174 }
175
Cmd_JobInfo()176 int CGridCommandLineInterfaceApp::Cmd_JobInfo()
177 {
178 SetUp_NetScheduleCmd(eNetScheduleAdmin, eReadOnlyAdminCmd);
179
180 if (IsOptionSet(eDeferExpiration)) {
181 CNetScheduleAPI::EJobStatus status =
182 m_NetScheduleAPI.GetSubmitter().GetJobStatus(m_Opts.id);
183 if (IsOptionSet(eStatusOnly)) {
184 JobInfo_PrintStatus(status);
185 return 0;
186 }
187 } else if (IsOptionSet(eStatusOnly)) {
188 CNetScheduleJob job;
189 job.job_id = m_Opts.id;
190 JobInfo_PrintStatus(m_NetScheduleAPI.GetExecutor().GetJobStatus(job));
191 return 0;
192 }
193
194 if (IsOptionSet(eProgressMessageOnly)) {
195 CNetScheduleJob job;
196 job.job_id = m_Opts.id;
197 m_NetScheduleAPI.GetProgressMsg(job);
198 if (m_Opts.output_format == eJSON)
199 printf("{\"progress_message\": \"%s\"}\n",
200 job.progress_msg.c_str());
201 else
202 if (!job.progress_msg.empty())
203 PrintLine(job.progress_msg);
204 return 0;
205 }
206
207 switch (m_Opts.output_format) {
208 case eRaw:
209 if (IsOptionSet(eBrief)) {
210 fprintf(stderr, GRID_APP_NAME " " JOBINFO_COMMAND ": option '--"
211 BRIEF_OPTION "' cannot be used with '"
212 RAW_OUTPUT_FORMAT "' output format.\n");
213 return 2;
214 }
215 m_NetScheduleAdmin.DumpJob(NcbiCout, m_Opts.id);
216 break;
217
218 case eJSON:
219 {
220 CJobInfoToJSON job_info_to_json;
221 g_ProcessJobInfo(m_NetScheduleAPI, m_Opts.id,
222 &job_info_to_json, !IsOptionSet(eBrief), m_CompoundIDPool);
223 g_PrintJSON(stdout, job_info_to_json.GetRootNode());
224 }
225 break;
226
227 default:
228 {
229 CPrintJobInfo print_job_info;
230 g_ProcessJobInfo(m_NetScheduleAPI, m_Opts.id,
231 &print_job_info, !IsOptionSet(eBrief), m_CompoundIDPool);
232 }
233 }
234
235 return 0;
236 }
237
238 class CBatchSubmitAttrParser
239 {
240 public:
CBatchSubmitAttrParser(istream * input_stream)241 CBatchSubmitAttrParser(istream* input_stream) :
242 m_InputStream(input_stream),
243 m_LineNumber(0)
244 {
245 }
246 bool NextLine();
247 bool NextAttribute();
GetAttributeType() const248 EOption GetAttributeType() const {return m_JobAttribute;}
GetAttributeValue() const249 const string& GetAttributeValue() const {return m_JobAttributeValue;}
GetLineNumber() const250 size_t GetLineNumber() const {return m_LineNumber;}
251
252 private:
253 istream* m_InputStream;
254 size_t m_LineNumber;
255 string m_Line;
256 CAttrListParser m_AttrParser;
257 EOption m_JobAttribute;
258 string m_JobAttributeValue;
259 };
260
NextLine()261 bool CBatchSubmitAttrParser::NextLine()
262 {
263 if (m_InputStream == NULL)
264 return false;
265
266 ++m_LineNumber;
267
268 getline(*m_InputStream, m_Line);
269
270 if (m_InputStream->fail()) {
271 m_InputStream = NULL;
272 return false;
273 }
274
275 if (m_InputStream->eof()) {
276 m_InputStream = NULL;
277 if (m_Line.empty())
278 return false;
279 }
280
281 m_AttrParser.Reset(m_Line);
282 return true;
283 }
284
NextAttribute()285 bool CBatchSubmitAttrParser::NextAttribute()
286 {
287 m_JobAttribute = eUntypedArg;
288
289 #define ATTR_CHECK_SET(name, type) \
290 if (attr_name.length() == sizeof(name) - 1 && \
291 memcmp(attr_name.data(), name, sizeof(name) - 1) == 0) { \
292 m_JobAttribute = type; \
293 break; \
294 }
295
296 CTempString attr_name;
297 size_t attr_column;
298
299 CAttrListParser::ENextAttributeType next_attr_type =
300 m_AttrParser.NextAttribute(&attr_name,
301 &m_JobAttributeValue, &attr_column);
302
303 if (next_attr_type == CAttrListParser::eNoMoreAttributes)
304 return false;
305
306 switch (attr_name[0]) {
307 case 'i':
308 ATTR_CHECK_SET("input", eInput);
309 break;
310 case 'a':
311 ATTR_CHECK_SET("args", eRemoteAppArgs);
312 ATTR_CHECK_SET("affinity", eAffinity);
313 break;
314 case 'e':
315 ATTR_CHECK_SET("exclusive", eExclusiveJob);
316 }
317
318 #define ATTR_POS " at line " << m_LineNumber << ", column " << attr_column
319
320 switch (m_JobAttribute) {
321 case eUntypedArg:
322 NCBI_THROW_FMT(CArgException, eInvalidArg,
323 "unknown attribute " << attr_name << ATTR_POS);
324
325 case eExclusiveJob:
326 break;
327
328 default:
329 if (next_attr_type != CAttrListParser::eAttributeWithValue) {
330 NCBI_THROW_FMT(CArgException, eInvalidArg,
331 "attribute " << attr_name << " requires a value" ATTR_POS);
332 }
333 }
334
335 return true;
336 }
337
338 static const string s_NotificationTimestampFormat("Y/M/D h:m:s.l");
339
PrintJobStatusNotification(CNetScheduleNotificationHandler & submit_job_handler,const string & job_key,const string & server_host)340 void CGridCommandLineInterfaceApp::PrintJobStatusNotification(
341 CNetScheduleNotificationHandler& submit_job_handler,
342 const string& job_key,
343 const string& server_host)
344 {
345 CNetScheduleAPI::EJobStatus job_status = CNetScheduleAPI::eJobNotFound;
346 int last_event_index = -1;
347
348 const char* format = "%s \"%s\" from %s [invalid]\n";
349
350 if (submit_job_handler.CheckJobStatusNotification(job_key,
351 &job_status, &last_event_index))
352 format = "%s \"%s\" from %s [valid, "
353 "job_status=%s, last_event_index=%d]\n";
354
355 printf(format, GetFastLocalTime().
356 AsString(s_NotificationTimestampFormat).c_str(),
357 submit_job_handler.GetMessage().c_str(),
358 server_host.c_str(),
359 CNetScheduleAPI::StatusToString(job_status).c_str(),
360 last_event_index);
361 }
362
CheckJobInputStream(CNcbiOstream & job_input_ostream)363 void CGridCommandLineInterfaceApp::CheckJobInputStream(
364 CNcbiOstream& job_input_ostream)
365 {
366 if (job_input_ostream.fail()) {
367 NCBI_THROW(CIOException, eWrite, "Error while writing job input");
368 }
369 }
370
PrepareRemoteAppJobInput(size_t max_embedded_input_size,const string & args,CNcbiIstream & remote_app_stdin,CNcbiOstream & job_input_ostream)371 void CGridCommandLineInterfaceApp::PrepareRemoteAppJobInput(
372 size_t max_embedded_input_size,
373 const string& args,
374 CNcbiIstream& remote_app_stdin,
375 CNcbiOstream& job_input_ostream)
376 {
377 CRemoteAppRequest request(m_GridClient->GetNetCacheAPI());
378
379 // Roughly estimate the maximum embedded input size.
380 request.SetMaxInlineSize(max_embedded_input_size == 0 ?
381 numeric_limits<size_t>().max() :
382 max_embedded_input_size - max_embedded_input_size / 10);
383
384 request.SetCmdLine(args);
385
386 remote_app_stdin.peek();
387 if (!remote_app_stdin.eof())
388 NcbiStreamCopy(request.GetStdIn(), remote_app_stdin);
389
390 request.Send(job_input_ostream);
391 }
392
393 struct SBatchSubmitRecord {
394 CBatchSubmitAttrParser attr_parser;
395
396 string job_input;
397 bool job_input_defined;
398 string remote_app_args;
399 bool remote_app_args_defined;
400 string affinity;
401 bool exclusive_job;
402
SBatchSubmitRecordSBatchSubmitRecord403 SBatchSubmitRecord(istream* input_stream) : attr_parser(input_stream) {}
404
405 bool LoadNextRecord();
406 };
407
LoadNextRecord()408 bool SBatchSubmitRecord::LoadNextRecord()
409 {
410 if (!attr_parser.NextLine())
411 return false;
412
413 job_input = kEmptyStr;
414 remote_app_args = kEmptyStr;
415 affinity = kEmptyStr;
416 job_input_defined = remote_app_args_defined = exclusive_job = false;
417
418 while (attr_parser.NextAttribute())
419 switch (attr_parser.GetAttributeType()) {
420 case eInput:
421 if (job_input_defined) {
422 NCBI_THROW_FMT(CArgException, eInvalidArg,
423 "More than one \"input\" attribute is defined "
424 "at line " << attr_parser.GetLineNumber());
425 }
426 job_input = attr_parser.GetAttributeValue();
427 job_input_defined = true;
428 break;
429 case eRemoteAppArgs:
430 if (remote_app_args_defined) {
431 NCBI_THROW_FMT(CArgException, eInvalidArg,
432 "More than one \"args\" attribute is defined "
433 "at line " << attr_parser.GetLineNumber());
434 }
435 remote_app_args = attr_parser.GetAttributeValue();
436 remote_app_args_defined = true;
437 break;
438 case eAffinity:
439 affinity = attr_parser.GetAttributeValue();
440 break;
441 case eExclusiveJob:
442 exclusive_job = true;
443 break;
444 default:
445 _ASSERT(0);
446 break;
447 }
448
449 if (!job_input_defined && !remote_app_args_defined) {
450 NCBI_THROW_FMT(CArgException, eInvalidArg, "\"input\" "
451 "(and/or \"args\") attribute is required "
452 "at line " << attr_parser.GetLineNumber());
453 }
454
455 return true;
456 }
457
x_LoadJobInput(size_t max_embedded_input_size,CNcbiOstream & job_input_ostream)458 void CGridCommandLineInterfaceApp::x_LoadJobInput(
459 size_t max_embedded_input_size, CNcbiOstream &job_input_ostream)
460 {
461 if (IsOptionSet(eRemoteAppArgs)) {
462 if (IsOptionSet(eInputFile))
463 PrepareRemoteAppJobInput(max_embedded_input_size,
464 m_Opts.remote_app_args,
465 *m_Opts.input_stream, job_input_ostream);
466 else {
467 CNcbiStrstream remote_app_stdin;
468 remote_app_stdin.write(m_Opts.input.data(),
469 m_Opts.input.length());
470 PrepareRemoteAppJobInput(max_embedded_input_size,
471 m_Opts.remote_app_args,
472 remote_app_stdin, job_input_ostream);
473 }
474 } else if (IsOptionSet(eInput))
475 job_input_ostream.write(m_Opts.input.data(), m_Opts.input.length());
476 else
477 NcbiStreamCopy(job_input_ostream, *m_Opts.input_stream);
478
479 CheckJobInputStream(job_input_ostream);
480 }
481
SubmitJob_Batch()482 void CGridCommandLineInterfaceApp::SubmitJob_Batch()
483 {
484 SBatchSubmitRecord job_input_record(m_Opts.input_stream);
485
486 size_t max_embedded_input_size = m_GridClient->GetMaxServerInputSize();
487
488 if (m_Opts.batch_size <= 1) {
489 while (job_input_record.LoadNextRecord()) {
490 CNcbiOstream& job_input_ostream(m_GridClient->GetOStream());
491
492 if (job_input_record.remote_app_args_defined) {
493 CNcbiStrstream remote_app_stdin;
494 remote_app_stdin.write(job_input_record.job_input.data(),
495 job_input_record.job_input.length());
496 PrepareRemoteAppJobInput(max_embedded_input_size,
497 job_input_record.remote_app_args,
498 remote_app_stdin, job_input_ostream);
499 } else {
500 job_input_ostream.write(job_input_record.job_input.data(),
501 job_input_record.job_input.length());
502 }
503
504 CheckJobInputStream(job_input_ostream);
505
506 if (!job_input_record.affinity.empty())
507 m_GridClient->SetJobAffinity(job_input_record.affinity);
508
509 if (job_input_record.exclusive_job)
510 m_GridClient->SetJobMask(CNetScheduleAPI::eExclusiveJob);
511
512 if (IsOptionSet(eJobGroup))
513 m_GridClient->SetJobGroup(m_Opts.job_group);
514
515 fprintf(m_Opts.output_stream,
516 "%s\n", m_GridClient->Submit(m_Opts.affinity).c_str());
517 }
518 } else {
519 CGridJobBatchSubmitter& batch_submitter(
520 m_GridClient->GetJobBatchSubmitter());
521 unsigned remaining_batch_size = m_Opts.batch_size;
522
523 while (job_input_record.LoadNextRecord()) {
524 if (remaining_batch_size == 0) {
525 batch_submitter.Submit(m_Opts.job_group);
526 const vector<CNetScheduleJob>& jobs =
527 batch_submitter.GetBatch();
528 ITERATE(vector<CNetScheduleJob>, it, jobs)
529 fprintf(m_Opts.output_stream,
530 "%s\n", it->job_id.c_str());
531 batch_submitter.Reset();
532 remaining_batch_size = m_Opts.batch_size;
533 }
534 batch_submitter.PrepareNextJob();
535
536 CNcbiOstream& job_input_ostream(batch_submitter.GetOStream());
537
538 if (job_input_record.remote_app_args_defined) {
539 CNcbiStrstream remote_app_stdin;
540 remote_app_stdin.write(job_input_record.job_input.data(),
541 job_input_record.job_input.length());
542 PrepareRemoteAppJobInput(max_embedded_input_size,
543 job_input_record.remote_app_args,
544 remote_app_stdin, job_input_ostream);
545 } else
546 job_input_ostream.write(job_input_record.job_input.data(),
547 job_input_record.job_input.length());
548
549 CheckJobInputStream(job_input_ostream);
550
551 if (!job_input_record.affinity.empty())
552 batch_submitter.SetJobAffinity(job_input_record.affinity);
553
554 if (job_input_record.exclusive_job)
555 batch_submitter.SetJobMask(CNetScheduleAPI::eExclusiveJob);
556
557 --remaining_batch_size;
558 }
559 if (remaining_batch_size < m_Opts.batch_size) {
560 batch_submitter.Submit(m_Opts.job_group);
561 const vector<CNetScheduleJob>& jobs =
562 batch_submitter.GetBatch();
563 ITERATE(vector<CNetScheduleJob>, it, jobs)
564 fprintf(m_Opts.output_stream, "%s\n", it->job_id.c_str());
565 batch_submitter.Reset();
566 }
567 }
568 }
569
Cmd_SubmitJob()570 int CGridCommandLineInterfaceApp::Cmd_SubmitJob()
571 {
572 SetUp_NetScheduleCmd(eNetScheduleSubmitter);
573
574 if (IsOptionSet(eBatch)) {
575 if (IsOptionSet(eJobInputDir)) {
576 NCBI_THROW(CArgException, eInvalidArg, "'--" JOB_INPUT_DIR_OPTION
577 "' option is not supported in batch mode");
578 }
579 SubmitJob_Batch();
580 } else if (IsOptionSet(eJobInputDir)) {
581 CNetScheduleJob job;
582
583 job.affinity = m_Opts.affinity;
584 job.group = m_Opts.job_group;
585
586 CCompoundID job_key = m_CompoundIDPool.NewID(eCIC_GenericID);
587 job_key.AppendCurrentTime();
588 job_key.AppendRandom();
589 job.job_id = job_key.ToString();
590
591 {{
592 CStringOrBlobStorageWriter job_input_writer(
593 numeric_limits<size_t>().max(), NULL, job.input);
594 CWStream job_input_ostream(&job_input_writer, 0, NULL);
595
596 x_LoadJobInput(0, job_input_ostream);
597 }}
598
599 CNetScheduleJobSerializer job_serializer(job, m_CompoundIDPool);
600 job_serializer.SaveJobInput(m_Opts.job_input_dir, m_NetCacheAPI);
601
602 PrintLine(job.job_id);
603 } else {
604 CNcbiOstream& job_input_ostream = m_GridClient->GetOStream();
605
606 size_t max_embedded_input_size = m_GridClient->GetMaxServerInputSize();
607
608 x_LoadJobInput(max_embedded_input_size, job_input_ostream);
609
610 m_GridClient->SetJobGroup(m_Opts.job_group);
611 m_GridClient->SetJobAffinity(m_Opts.affinity);
612
613 if (IsOptionSet(eExclusiveJob))
614 m_GridClient->SetJobMask(CNetScheduleAPI::eExclusiveJob);
615
616 if (!IsOptionSet(eWaitTimeout))
617 PrintLine(m_GridClient->Submit());
618 else {
619 m_GridClient->CloseStream();
620
621 CNetScheduleJob& job = m_GridClient->GetJob();
622
623 CDeadline deadline(m_Opts.timeout, 0);
624
625 CNetScheduleNotificationHandler submit_job_handler;
626
627 submit_job_handler.SubmitJob(m_NetScheduleSubmitter,
628 job, m_Opts.timeout);
629
630 PrintLine(job.job_id);
631
632 if (!IsOptionSet(eDumpNSNotifications)) {
633 CNetScheduleAPI::EJobStatus status =
634 submit_job_handler.WaitForJobCompletion
635 (job, deadline, m_NetScheduleAPI);
636
637 PrintLine(CNetScheduleAPI::StatusToString(status));
638
639 if (status == CNetScheduleAPI::eDone) {
640 if (IsOptionSet(eRemoteAppArgs))
641 MarkOptionAsSet(eRemoteAppStdOut);
642 DumpJobInputOutput(job.output);
643 }
644 } else {
645 submit_job_handler.PrintPortNumber();
646
647 string server_host;
648
649 if (submit_job_handler.WaitForNotification(deadline,
650 &server_host))
651 PrintJobStatusNotification(submit_job_handler,
652 job.job_id, server_host);
653 }
654 }
655 }
656
657 return 0;
658 }
659
Cmd_WatchJob()660 int CGridCommandLineInterfaceApp::Cmd_WatchJob()
661 {
662 SetUp_NetScheduleCmd(eNetScheduleSubmitter);
663
664 if (!IsOptionSet(eWaitTimeout)) {
665 fprintf(stderr, GRID_APP_NAME " " WATCHJOB_COMMAND
666 ": option '--" WAIT_TIMEOUT_OPTION "' is required.\n");
667 return 2;
668 }
669
670 if (!IsOptionSet(eWaitForJobStatus) && !IsOptionSet(eWaitForJobEventAfter))
671 m_Opts.job_status_mask = ~((1 << CNetScheduleAPI::ePending) |
672 (1 << CNetScheduleAPI::eRunning));
673
674 CDeadline deadline(m_Opts.timeout, 0);
675
676 CNetScheduleNotificationHandler submit_job_handler;
677
678 CNetScheduleAPI::EJobStatus job_status;
679 int last_event_index = -1;
680
681 tie(job_status, last_event_index, ignore) =
682 submit_job_handler.RequestJobWatching(m_NetScheduleAPI, m_Opts.id, deadline);
683
684 if (job_status == CNetScheduleAPI::eJobNotFound) {
685 fprintf(stderr, GRID_APP_NAME ": unexpected error while "
686 "setting up a job event listener.\n");
687 return 3;
688 }
689
690 if (!IsOptionSet(eDumpNSNotifications)) {
691 if (last_event_index <= m_Opts.last_event_index &&
692 (m_Opts.job_status_mask & (1 << job_status)) == 0)
693 job_status = submit_job_handler.WaitForJobEvent(m_Opts.id,
694 deadline, m_NetScheduleAPI, m_Opts.job_status_mask,
695 m_Opts.last_event_index, &last_event_index);
696
697 printf("%d\n%s\n", last_event_index,
698 CNetScheduleAPI::StatusToString(job_status).c_str());
699 } else {
700 if (last_event_index > m_Opts.last_event_index) {
701 fprintf(stderr, "Job event index (%d) has already "
702 "exceeded %d; won't wait.\n",
703 last_event_index, m_Opts.last_event_index);
704 return 6;
705 }
706 if ((m_Opts.job_status_mask & (1 << job_status)) != 0) {
707 fprintf(stderr, "Job is already '%s'; won't wait.\n",
708 CNetScheduleAPI::StatusToString(job_status).c_str());
709 return 6;
710 }
711
712 submit_job_handler.PrintPortNumber();
713
714 string server_host;
715
716 while (submit_job_handler.WaitForNotification(deadline,
717 &server_host))
718 PrintJobStatusNotification(submit_job_handler,
719 m_Opts.id, server_host);
720 }
721
722 return 0;
723 }
724
s_DumpStdStream(CNcbiIstream & std_stream,FILE * output_stream)725 static bool s_DumpStdStream(CNcbiIstream& std_stream, FILE* output_stream)
726 {
727 char buffer[IO_BUFFER_SIZE];
728 size_t bytes_read;
729
730 std_stream.exceptions((ios::iostate) 0);
731
732 while (!std_stream.eof()) {
733 std_stream.read(buffer, sizeof(buffer));
734 if (std_stream.fail() && !std_stream.eof())
735 return false;
736 bytes_read = (size_t) std_stream.gcount();
737
738 // bytes_read could be zero due to EoF reported after read
739 if (bytes_read &&
740 fwrite(buffer, bytes_read, 1, output_stream) != 1)
741 return false;
742 }
743
744 return true;
745 }
746
DumpJobInputOutput(const string & data_or_blob_id)747 int CGridCommandLineInterfaceApp::DumpJobInputOutput(
748 const string& data_or_blob_id)
749 {
750 const auto kStreamFlags = CRWStreambuf::fOwnReader | CRWStreambuf::fLeakExceptions;
751 unique_ptr<IReader> reader;
752
753 try {
754 reader.reset(new CStringOrBlobStorageReader(data_or_blob_id,
755 m_NetCacheAPI));
756 }
757 catch (CStringOrBlobStorageRWException& e) {
758 // Allow the special case of jobs submitted bypassing the Grid API.
759
760 if (e.GetErrCode() != CStringOrBlobStorageRWException::eInvalidFlag)
761 throw;
762
763 if (IsOptionSet(eRemoteAppStdIn) ||
764 IsOptionSet(eRemoteAppStdOut) ||
765 IsOptionSet(eRemoteAppStdErr))
766 throw;
767
768 if (fwrite(data_or_blob_id.data(), data_or_blob_id.length(), 1,
769 m_Opts.output_stream) != 1)
770 goto Error;
771
772 return 0;
773 }
774
775 if (IsOptionSet(eRemoteAppStdIn)) {
776 CRemoteAppRequest request(m_NetCacheAPI);
777
778 try {
779 CRStream input_stream(reader.release(), 0, 0, kStreamFlags);
780
781 request.Deserialize(input_stream);
782 }
783 catch (exception&) {
784 fprintf(stderr, GRID_APP_NAME
785 ": Cannot deserialize remote_app job input.\n");
786 return 3;
787 }
788
789 if (!s_DumpStdStream(request.GetStdInForRead(), m_Opts.output_stream))
790 goto Error;
791
792 return 0;
793 }
794
795 if (IsOptionSet(eRemoteAppStdOut) || IsOptionSet(eRemoteAppStdErr)) {
796 CRStream input_stream(reader.release(), 0, 0, kStreamFlags);
797
798 CRemoteAppResult remote_app_result(m_NetCacheAPI);
799 remote_app_result.Receive(input_stream);
800
801 CNcbiIstream& std_stream = IsOptionSet(eRemoteAppStdOut) ?
802 remote_app_result.GetStdOut() : remote_app_result.GetStdErr();
803
804 if (!s_DumpStdStream(std_stream, m_Opts.output_stream))
805 goto Error;
806
807 return 0;
808 }
809
810 char buffer[IO_BUFFER_SIZE];
811 size_t bytes_read;
812
813 while (reader->Read(buffer, sizeof(buffer), &bytes_read) != eRW_Eof)
814 if (fwrite(buffer, bytes_read, 1, m_Opts.output_stream) != 1)
815 goto Error;
816
817 return 0;
818
819 Error:
820 fprintf(stderr, GRID_APP_NAME ": I/O error.\n");
821 return 3;
822 }
823
PrintJobAttrsAndDumpInput(const CNetScheduleJob & job)824 int CGridCommandLineInterfaceApp::PrintJobAttrsAndDumpInput(
825 const CNetScheduleJob& job)
826 {
827 PrintLine(job.job_id);
828 if (!job.auth_token.empty())
829 printf("%s ", job.auth_token.c_str());
830 if (!job.affinity.empty()) {
831 string affinity(NStr::PrintableString(job.affinity));
832 printf(job.mask & CNetScheduleAPI::eExclusiveJob ?
833 "affinity=\"%s\" exclusive\n" : "affinity=\"%s\"\n",
834 affinity.c_str());
835 } else
836 printf(job.mask & CNetScheduleAPI::eExclusiveJob ?
837 "exclusive\n" : "\n");
838 return DumpJobInputOutput(job.input);
839 }
840
Cmd_GetJobInput()841 int CGridCommandLineInterfaceApp::Cmd_GetJobInput()
842 {
843 SetUp_NetScheduleCmd(eNetScheduleSubmitter);
844
845 CNetScheduleJob job;
846 job.job_id = m_Opts.id;
847
848 if (m_NetScheduleAPI.GetJobDetails(job) == CNetScheduleAPI::eJobNotFound) {
849 fprintf(stderr, GRID_APP_NAME ": job %s has expired.\n",
850 job.job_id.c_str());
851 return 3;
852 }
853
854 return DumpJobInputOutput(job.input);
855 }
856
Cmd_GetJobOutput()857 int CGridCommandLineInterfaceApp::Cmd_GetJobOutput()
858 {
859 SetUp_NetScheduleCmd(eNetScheduleSubmitter);
860
861 CNetScheduleJob job;
862 job.job_id = m_Opts.id;
863 CNetScheduleAPI::EJobStatus status = m_NetScheduleAPI.GetJobDetails(job);
864
865 switch (status) {
866 case CNetScheduleAPI::eDone:
867 case CNetScheduleAPI::eReading:
868 case CNetScheduleAPI::eConfirmed:
869 case CNetScheduleAPI::eReadFailed:
870 break;
871
872 default:
873 fprintf(stderr, "Warning: job is in %s status.\n",
874 CNetScheduleAPI::StatusToString(status).c_str());
875 }
876
877 return DumpJobInputOutput(job.output);
878 }
879
Cmd_ReadJob()880 int CGridCommandLineInterfaceApp::Cmd_ReadJob()
881 {
882 SetUp_NetScheduleCmd(eNetScheduleSubmitter);
883
884 if (!IsOptionSet(eConfirmRead) && !IsOptionSet(eFailRead) &&
885 !IsOptionSet(eRollbackRead)) {
886 CNetScheduleJob job;
887 CNetScheduleAPI::EJobStatus job_status;
888
889 CNetScheduleJobReader job_reader(m_NetScheduleAPI.GetJobReader(
890 m_Opts.job_group, m_Opts.affinity));
891
892 CNetScheduleJobReader::EReadNextJobResult rnj_result;
893
894 if (!IsOptionSet(eWaitTimeout))
895 rnj_result = job_reader.ReadNextJob(&job, &job_status);
896 else {
897 CTimeout timeout(m_Opts.timeout);
898 rnj_result = job_reader.ReadNextJob(&job, &job_status, &timeout);
899 }
900
901 switch (rnj_result) {
902 case CNetScheduleJobReader::eRNJ_JobReady:
903 PrintLine(job.job_id);
904 PrintLine(CNetScheduleAPI::StatusToString(job_status));
905
906 if (IsOptionSet(eReliableRead))
907 PrintLine(job.auth_token);
908 else {
909 if (job_status == CNetScheduleAPI::eDone) {
910 m_NetScheduleSubmitter.GetJobDetails(job);
911 int ret_code = DumpJobInputOutput(job.output);
912 if (ret_code != 0)
913 return ret_code;
914 }
915 m_NetScheduleSubmitter.ReadConfirm(job.job_id, job.auth_token);
916 }
917 break;
918
919 case CNetScheduleJobReader::eRNJ_Interrupt:
920 return 3;
921
922 case CNetScheduleJobReader::eRNJ_NotReady:
923 if (IsOptionSet(eWaitTimeout)) {
924 PrintLine("TIMEOUT");
925 }
926
927 return 0;
928
929 case CNetScheduleJobReader::eRNJ_NoMoreJobs:
930 PrintLine("NOJOBS");
931 return 0;
932 }
933 } else {
934 if (!IsOptionSet(eJobId)) {
935 fprintf(stderr, GRID_APP_NAME " " READJOB_COMMAND
936 ": option '--" JOB_ID_OPTION "' is required.\n");
937 return 2;
938 }
939
940 if (IsOptionSet(eConfirmRead))
941 m_NetScheduleSubmitter.ReadConfirm(m_Opts.id, m_Opts.auth_token);
942 else if (IsOptionSet(eFailRead))
943 m_NetScheduleSubmitter.ReadFail(m_Opts.id, m_Opts.auth_token,
944 m_Opts.error_message);
945 else
946 m_NetScheduleSubmitter.ReadRollback(m_Opts.id, m_Opts.auth_token);
947 }
948
949 return 0;
950 }
951
Cmd_CancelJob()952 int CGridCommandLineInterfaceApp::Cmd_CancelJob()
953 {
954 if (IsOptionSet(eJobGroup)) {
955 SetUp_NetScheduleCmd(eNetScheduleAPI);
956
957 m_NetScheduleAPI.GetSubmitter().CancelJobGroup(m_Opts.job_group,
958 m_Opts.job_statuses);
959 } else if (IsOptionSet(eAllJobs) || !m_Opts.job_statuses.empty()) {
960 SetUp_NetScheduleCmd(eNetScheduleAdmin, eAdminCmdWithSideEffects);
961
962 m_NetScheduleAdmin.CancelAllJobs(m_Opts.job_statuses);
963 } else {
964 SetUp_NetScheduleCmd(eNetScheduleAPI);
965
966 m_NetScheduleAPI.GetSubmitter().CancelJob(m_Opts.id);
967 }
968
969 return 0;
970 }
971
Cmd_RequestJob()972 int CGridCommandLineInterfaceApp::Cmd_RequestJob()
973 {
974 SetUp_NetScheduleCmd(eNetScheduleExecutor);
975
976 CNetScheduleExecutor::EJobAffinityPreference affinity_preference =
977 CNetScheduleExecutor::eExplicitAffinitiesOnly;
978
979 switch (IsOptionSet(eUsePreferredAffinities, OPTION_N(0)) |
980 IsOptionSet(eClaimNewAffinities, OPTION_N(1)) |
981 IsOptionSet(eAnyAffinity, OPTION_N(2))) {
982 case OPTION_N(2) + OPTION_N(0):
983 affinity_preference = CNetScheduleExecutor::ePreferredAffsOrAnyJob;
984 break;
985
986 case OPTION_N(1):
987 case OPTION_N(1) + OPTION_N(0):
988 affinity_preference = CNetScheduleExecutor::eClaimNewPreferredAffs;
989 break;
990
991 case OPTION_N(0):
992 affinity_preference = CNetScheduleExecutor::ePreferredAffinities;
993 break;
994
995 case 0:
996 if (IsOptionSet(eAffinityList))
997 break;
998 /* FALL THROUGH */
999
1000 case OPTION_N(2):
1001 affinity_preference = CNetScheduleExecutor::eAnyJob;
1002 break;
1003
1004 default:
1005 fprintf(stderr, GRID_APP_NAME ": options '--"
1006 CLAIM_NEW_AFFINITIES_OPTION "' and '--" ANY_AFFINITY_OPTION
1007 "' are mutually exclusive.\n");
1008 return 2;
1009 }
1010
1011 m_NetScheduleExecutor.SetAffinityPreference(affinity_preference);
1012
1013 CNetScheduleJob job;
1014
1015 if (!IsOptionSet(eDumpNSNotifications)) {
1016 if (m_NetScheduleExecutor.GetJob(job, m_Opts.timeout, m_Opts.affinity))
1017 return PrintJobAttrsAndDumpInput(job);
1018 } else {
1019 CDeadline deadline(m_Opts.timeout, 0);
1020
1021 CNetScheduleNotificationHandler wait_job_handler;
1022
1023 wait_job_handler.PrintPortNumber();
1024
1025 string cmd(CNetScheduleNotificationHandler::MkBaseGETCmd(
1026 affinity_preference, m_Opts.affinity));
1027
1028 wait_job_handler.CmdAppendTimeoutGroupAndClientInfo(cmd,
1029 &deadline, m_Opts.job_group);
1030
1031 if (wait_job_handler.RequestJob(m_NetScheduleExecutor, job, cmd)) {
1032 fprintf(stderr, "%s\nA job has been returned; won't wait.\n",
1033 job.job_id.c_str());
1034 return 6;
1035 }
1036
1037 string server_host;
1038 CNetServer server;
1039 string server_address;
1040
1041 while (wait_job_handler.WaitForNotification(deadline,
1042 &server_host)) {
1043 const char* format = "%s \"%s\" from %s [invalid]\n";
1044
1045 if (wait_job_handler.CheckRequestJobNotification(
1046 m_NetScheduleExecutor, &server)) {
1047 server_address = server.GetServerAddress();
1048 format = "%s \"%s\" from %s [valid, server=%s]\n";
1049 }
1050
1051 printf(format, GetFastLocalTime().AsString(
1052 s_NotificationTimestampFormat).c_str(),
1053 wait_job_handler.GetMessage().c_str(),
1054 server_host.c_str(),
1055 server_address.c_str());
1056 }
1057 }
1058
1059 return 0;
1060 }
1061
Cmd_CommitJob()1062 int CGridCommandLineInterfaceApp::Cmd_CommitJob()
1063 {
1064 SetUp_NetScheduleCmd(eNetScheduleExecutor);
1065
1066 CNetScheduleJob job;
1067
1068 job.job_id = m_Opts.id;
1069 job.ret_code = m_Opts.return_code;
1070 job.auth_token = m_Opts.auth_token;
1071
1072 if (IsOptionSet(eJobOutputBlob))
1073 job.output = "K " + m_Opts.job_output_blob;
1074 else {
1075 const auto kMaxOutputSize = m_NetScheduleAPI.GetServerParams().max_output_size;
1076 CStringOrBlobStorageWriter writer(kMaxOutputSize, m_NetCacheAPI, job.output);
1077
1078 if (!IsOptionSet(eJobOutput)) {
1079 char buffer[IO_BUFFER_SIZE];
1080
1081 do {
1082 m_Opts.input_stream->read(buffer, sizeof(buffer));
1083 if (m_Opts.input_stream->fail() &&
1084 !m_Opts.input_stream->eof()) {
1085 NCBI_THROW(CIOException, eRead,
1086 "Error while reading job output data");
1087 }
1088 if (writer.Write(buffer,
1089 (size_t) m_Opts.input_stream->gcount()) != eRW_Success)
1090 goto ErrorExit;
1091 } while (!m_Opts.input_stream->eof());
1092 } else
1093 if (writer.Write(m_Opts.job_output.data(),
1094 m_Opts.job_output.length()) != eRW_Success)
1095 goto ErrorExit;
1096 }
1097
1098 if (!IsOptionSet(eFailJob))
1099 m_NetScheduleExecutor.PutResult(job);
1100 else {
1101 job.error_msg = m_Opts.error_message;
1102 m_NetScheduleExecutor.PutFailure(job);
1103 }
1104
1105 return 0;
1106
1107 ErrorExit:
1108 fprintf(stderr, GRID_APP_NAME ": error while sending job output.\n");
1109 return 3;
1110 }
1111
Cmd_ReturnJob()1112 int CGridCommandLineInterfaceApp::Cmd_ReturnJob()
1113 {
1114 SetUp_NetScheduleCmd(eNetScheduleExecutor);
1115
1116 CNetScheduleJob job;
1117 job.job_id = m_Opts.id;
1118 job.auth_token = m_Opts.auth_token;
1119
1120 m_NetScheduleExecutor.ReturnJob(job);
1121
1122 return 0;
1123 }
1124
Cmd_ClearNode()1125 int CGridCommandLineInterfaceApp::Cmd_ClearNode()
1126 {
1127 SetUp_NetScheduleCmd(eNetScheduleExecutor);
1128
1129 m_NetScheduleExecutor.ClearNode();
1130
1131 return 0;
1132 }
1133
Cmd_UpdateJob()1134 int CGridCommandLineInterfaceApp::Cmd_UpdateJob()
1135 {
1136 SetUp_NetScheduleCmd(eNetScheduleAPI);
1137
1138 if (IsOptionSet(eExtendLifetime) ||
1139 IsOptionSet(eProgressMessage)) {
1140 CNetScheduleExecutor executor(m_NetScheduleAPI.GetExecutor());
1141
1142 CNetScheduleJob job;
1143 job.job_id = m_Opts.id;
1144
1145 if (IsOptionSet(eExtendLifetime))
1146 executor.JobDelayExpiration(job,
1147 (unsigned) m_Opts.extend_lifetime_by);
1148
1149 if (IsOptionSet(eProgressMessage)) {
1150 job.progress_msg = m_Opts.progress_message;
1151 executor.PutProgressMsg(job);
1152 }
1153 }
1154
1155 return 0;
1156 }
1157
Cmd_QueueInfo()1158 int CGridCommandLineInterfaceApp::Cmd_QueueInfo()
1159 {
1160 SetUp_NetScheduleCmd(eNetScheduleAdmin, eReadOnlyAdminCmd, false);
1161
1162 if (!IsOptionSet(eQueueClasses)) {
1163 if ((IsOptionSet(eQueueArg) ^ IsOptionSet(eAllQueues)) == 0) {
1164 fprintf(stderr, GRID_APP_NAME " " QUEUEINFO_COMMAND
1165 ": either the '" QUEUE_ARG "' argument or the '--"
1166 ALL_QUEUES_OPTION "' option must be specified.\n");
1167 return 1;
1168 }
1169 if (m_Opts.output_format == eJSON)
1170 g_PrintJSON(stdout, g_QueueInfoToJson(m_NetScheduleAPI,
1171 IsOptionSet(eQueueArg) ? m_Opts.queue : kEmptyStr));
1172 else if (!IsOptionSet(eAllQueues))
1173 m_NetScheduleAdmin.PrintQueueInfo(m_Opts.queue, NcbiCout);
1174 else {
1175 m_NetScheduleAPI.GetService().PrintCmdOutput("STAT QUEUES",
1176 NcbiCout, CNetService::eMultilineOutput);
1177 }
1178 } else if (m_Opts.output_format == eJSON)
1179 g_PrintJSON(stdout, g_QueueClassInfoToJson(m_NetScheduleAPI));
1180 else
1181 m_NetScheduleAPI.GetService().PrintCmdOutput("STAT QCLASSES",
1182 NcbiCout, CNetService::eMultilineOutput);
1183
1184 return 0;
1185 }
1186
Cmd_DumpQueue()1187 int CGridCommandLineInterfaceApp::Cmd_DumpQueue()
1188 {
1189 SetUp_NetScheduleCmd(eNetScheduleAdmin, eReadOnlyAdminCmd);
1190
1191 m_NetScheduleAdmin.DumpQueue(NcbiCout, m_Opts.start_after_job,
1192 m_Opts.job_count, m_Opts.job_statuses, m_Opts.job_group);
1193
1194 return 0;
1195 }
1196
Cmd_CreateQueue()1197 int CGridCommandLineInterfaceApp::Cmd_CreateQueue()
1198 {
1199 SetUp_NetScheduleCmd(eNetScheduleAdmin, eAdminCmdWithSideEffects);
1200
1201 m_NetScheduleAdmin.CreateQueue(m_Opts.id,
1202 m_Opts.queue_class, m_Opts.queue_description);
1203
1204 return 0;
1205 }
1206
Cmd_GetQueueList()1207 int CGridCommandLineInterfaceApp::Cmd_GetQueueList()
1208 {
1209 SetUp_NetScheduleCmd(eNetScheduleAdmin, eReadOnlyAdminCmd, false);
1210
1211 CNetScheduleAdmin::TQueueList queues;
1212
1213 m_NetScheduleAdmin.GetQueueList(queues);
1214
1215 typedef set<string> TServerSet;
1216 typedef map<string, TServerSet> TQueueRegister;
1217
1218 TQueueRegister queue_register;
1219
1220 ITERATE (CNetScheduleAdmin::TQueueList, it, queues) {
1221 string server_address = it->server.GetServerAddress();
1222
1223 ITERATE(std::list<std::string>, queue, it->queues) {
1224 queue_register[*queue].insert(server_address);
1225 }
1226 }
1227
1228 ITERATE(TQueueRegister, it, queue_register) {
1229 NcbiCout << it->first;
1230 if (it->second.size() != queues.size()) {
1231 const char* sep = " (limited to ";
1232 ITERATE(TServerSet, server, it->second) {
1233 NcbiCout << sep << *server;
1234 sep = ", ";
1235 }
1236 NcbiCout << ")";
1237 }
1238 NcbiCout << NcbiEndl;
1239 }
1240
1241 return 0;
1242 }
1243
Cmd_DeleteQueue()1244 int CGridCommandLineInterfaceApp::Cmd_DeleteQueue()
1245 {
1246 SetUp_NetScheduleCmd(eNetScheduleAdmin, eAdminCmdWithSideEffects);
1247
1248 m_NetScheduleAdmin.DeleteQueue(m_Opts.id);
1249
1250 return 0;
1251 }
1252
1253 END_NCBI_SCOPE
1254