1 /* $Id: netschedule_api_admin.cpp 590486 2019-07-30 15:47:21Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Anatoliy Kuznetsov, Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description:
29 * Implementation of NetSchedule API.
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34
35 #include "netschedule_api_impl.hpp"
36
37 #include <connect/services/netschedule_api.hpp>
38
39
40 BEGIN_NCBI_SCOPE
41
42 using namespace grid::netschedule;
43
SwitchToDrainMode(ESwitch on_off)44 void CNetScheduleAdmin::SwitchToDrainMode(ESwitch on_off)
45 {
46 string cmd(on_off != eOff ?
47 "REFUSESUBMITS mode=1" : "REFUSESUBMITS mode=0");
48 g_AppendClientIPSessionIDHitID(cmd);
49 m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
50 }
51
ShutdownServer(CNetScheduleAdmin::EShutdownLevel level)52 void CNetScheduleAdmin::ShutdownServer(
53 CNetScheduleAdmin::EShutdownLevel level)
54 {
55 const auto die = level == eDie;
56 string cmd(die ? "SHUTDOWN SUICIDE" :
57 level == eShutdownImmediate ? "SHUTDOWN IMMEDIATE" :
58 level == eDrain ? "SHUTDOWN drain=1" : "SHUTDOWN");
59 g_AppendClientIPSessionIDHitID(cmd);
60
61 const auto retry_mode = die ? SNetServiceImpl::SRetry::eNoRetry : SNetServiceImpl::SRetry::eDefault;
62 auto retry_guard = m_Impl->m_API->m_Service->CreateRetryGuard(retry_mode);
63
64 try {
65 m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
66 }
67 catch (CNetSrvConnException& ex)
68 {
69 if ((ex.GetErrCode() != CNetSrvConnException::eConnClosedByServer) || !die) throw;
70 }
71 }
72
73
ReloadServerConfig()74 void CNetScheduleAdmin::ReloadServerConfig()
75 {
76 string cmd("RECO");
77 g_AppendClientIPSessionIDHitID(cmd);
78 m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
79 }
80
CreateQueue(const string & qname,const string & qclass,const string & description)81 void CNetScheduleAdmin::CreateQueue(const string& qname, const string& qclass,
82 const string& description)
83 {
84 limits::Check<limits::SQueueName>(qname);
85
86 string cmd = "QCRE " + qname;
87 cmd += ' ';
88 cmd += qclass;
89
90 if (!description.empty()) {
91 cmd += " \"";
92 cmd += description;
93 cmd += '"';
94 }
95
96 g_AppendClientIPSessionIDHitID(cmd);
97
98 m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
99 }
100
DeleteQueue(const string & qname)101 void CNetScheduleAdmin::DeleteQueue(const string& qname)
102 {
103 limits::Check<limits::SQueueName>(qname);
104
105 string cmd("QDEL " + qname);
106 g_AppendClientIPSessionIDHitID(cmd);
107 m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
108 }
109
DumpJob(CNcbiOstream & out,const string & job_key)110 void CNetScheduleAdmin::DumpJob(CNcbiOstream& out, const string& job_key)
111 {
112 CNetServerMultilineCmdOutput output(DumpJob(job_key));
113
114 string line;
115
116 while (output.ReadLine(line))
117 out << line << "\n";
118 }
119
DumpJob(const string & job_key)120 CNetServerMultilineCmdOutput CNetScheduleAdmin::DumpJob(const string& job_key)
121 {
122 string cmd("DUMP " + job_key);
123 g_AppendClientIPSessionIDHitID(cmd);
124 return m_Impl->m_API->GetServer(job_key).ExecWithRetry(cmd, true);
125 }
126
CancelAllJobs(const string & job_statuses)127 void CNetScheduleAdmin::CancelAllJobs(const string& job_statuses)
128 {
129 string cmd;
130 if (job_statuses.empty()) {
131 cmd.assign("CANCELQ");
132 } else {
133 cmd.assign("CANCEL status=");
134 cmd.append(job_statuses);
135 }
136 g_AppendClientIPSessionIDHitID(cmd);
137 m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
138 }
139
140
PrintServerVersion(CNcbiOstream & output_stream)141 void CNetScheduleAdmin::PrintServerVersion(CNcbiOstream& output_stream)
142 {
143 string cmd("VERSION");
144 g_AppendClientIPSessionIDHitID(cmd);
145 m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
146 output_stream, CNetService::eSingleLineOutput);
147 }
148
149
DumpQueue(CNcbiOstream & output_stream,const string & start_after_job,size_t job_count,const string & job_statuses,const string & job_group)150 void CNetScheduleAdmin::DumpQueue(
151 CNcbiOstream& output_stream,
152 const string& start_after_job,
153 size_t job_count,
154 const string& job_statuses,
155 const string& job_group)
156 {
157 string cmd("DUMP");
158 if (!job_statuses.empty()) {
159 cmd.append(" status=");
160 cmd.append(job_statuses);
161 }
162 if (!start_after_job.empty()) {
163 cmd.append(" start_after=");
164 cmd.append(start_after_job);
165 }
166 if (job_count > 0) {
167 cmd.append(" count=");
168 cmd.append(NStr::NumericToString(job_count));
169 }
170 if (!job_group.empty()) {
171 limits::Check<limits::SJobGroup>(job_group);
172 cmd.append(" group=");
173 cmd.append(job_group);
174 }
175 g_AppendClientIPSessionIDHitID(cmd);
176 m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
177 output_stream, CNetService::eMultilineOutput);
178 }
179
DumpQueue(CNcbiOstream & output_stream,const string & start_after_job,size_t job_count,CNetScheduleAPI::EJobStatus status,const string & job_group)180 void CNetScheduleAdmin::DumpQueue(
181 CNcbiOstream& output_stream,
182 const string& start_after_job,
183 size_t job_count,
184 CNetScheduleAPI::EJobStatus status,
185 const string& job_group)
186 {
187 string job_statuses(CNetScheduleAPI::StatusToString(status));
188
189 // Must be a rare case
190 if (status == CNetScheduleAPI::eJobNotFound) {
191 job_statuses.clear();
192 }
193
194 DumpQueue(output_stream, start_after_job, job_count, job_statuses, job_group);
195 }
196
197
/// Build the "QINF2 <queue_name>" request, with the client context
/// appended by g_AppendClientIPSessionIDHitID.
static string s_MkQINFCmd(const string& queue_name)
{
    string request("QINF2 ");
    request += queue_name;
    g_AppendClientIPSessionIDHitID(request);
    return request;
}
204
s_ParseQueueInfo(const string & server_output,CNetScheduleAdmin::TQueueInfo & queue_info)205 static void s_ParseQueueInfo(const string& server_output,
206 CNetScheduleAdmin::TQueueInfo& queue_info)
207 {
208 CUrlArgs url_parser(server_output);
209
210 ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
211 queue_info[field->name] = field->value;
212 }
213 }
214
GetQueueInfo(CNetServer server,const string & queue_name,CNetScheduleAdmin::TQueueInfo & queue_info)215 void CNetScheduleAdmin::GetQueueInfo(CNetServer server,
216 const string& queue_name, CNetScheduleAdmin::TQueueInfo& queue_info)
217 {
218 CNetServer::SExecResult exec_result;
219
220 server->ConnectAndExec(s_MkQINFCmd(queue_name), false, exec_result);
221
222 s_ParseQueueInfo(exec_result.response, queue_info);
223 }
224
GetQueueInfo(const string & queue_name,CNetScheduleAdmin::TQueueInfo & queue_info)225 void CNetScheduleAdmin::GetQueueInfo(const string& queue_name,
226 CNetScheduleAdmin::TQueueInfo& queue_info)
227 {
228 GetQueueInfo(m_Impl->m_API->m_Service.Iterate().GetServer(),
229 queue_name, queue_info);
230 }
231
GetQueueInfo(CNetServer server,CNetScheduleAdmin::TQueueInfo & queue_info)232 void CNetScheduleAdmin::GetQueueInfo(CNetServer server,
233 CNetScheduleAdmin::TQueueInfo& queue_info)
234 {
235 GetQueueInfo(server, m_Impl->m_API->m_Queue, queue_info);
236 }
237
GetQueueInfo(CNetScheduleAdmin::TQueueInfo & queue_info)238 void CNetScheduleAdmin::GetQueueInfo(CNetScheduleAdmin::TQueueInfo& queue_info)
239 {
240 GetQueueInfo(m_Impl->m_API->m_Queue, queue_info);
241 }
242
PrintQueueInfo(const string & queue_name,CNcbiOstream & output_stream)243 void CNetScheduleAdmin::PrintQueueInfo(const string& queue_name,
244 CNcbiOstream& output_stream)
245 {
246 bool print_headers = m_Impl->m_API->m_Service.IsLoadBalanced();
247
248 for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
249 CNetService::eIncludePenalized); it; ++it) {
250 if (print_headers)
251 output_stream << '[' << (*it).GetServerAddress() << ']' << NcbiEndl;
252
253 TQueueInfo queue_info;
254
255 GetQueueInfo(*it, queue_name, queue_info);
256
257 ITERATE(TQueueInfo, qi, queue_info) {
258 output_stream << qi->first << ": " << qi->second << NcbiEndl;
259 }
260
261 if (print_headers)
262 output_stream << NcbiEndl;
263 }
264 }
265
266 void g_GetWorkerNodes(CNetScheduleAPI api, list<CNetScheduleAdmin::SWorkerNodeInfo>& worker_nodes);
267
GetWorkerNodes(list<SWorkerNodeInfo> & worker_nodes)268 void CNetScheduleAdmin::GetWorkerNodes(
269 list<SWorkerNodeInfo>& worker_nodes)
270 {
271 g_GetWorkerNodes(m_Impl->m_API, worker_nodes);
272 }
273
PrintConf(CNcbiOstream & output_stream)274 void CNetScheduleAdmin::PrintConf(CNcbiOstream& output_stream)
275 {
276 string cmd("GETCONF");
277 g_AppendClientIPSessionIDHitID(cmd);
278 m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
279 output_stream, CNetService::eMultilineOutput);
280 }
281
PrintServerStatistics(CNcbiOstream & output_stream,EStatisticsOptions opt)282 void CNetScheduleAdmin::PrintServerStatistics(CNcbiOstream& output_stream,
283 EStatisticsOptions opt)
284 {
285 string cmd(opt == eStatisticsBrief ? "STAT" :
286 opt == eStatisticsClients ? "STAT CLIENTS" : "STAT ALL");
287 g_AppendClientIPSessionIDHitID(cmd);
288 m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
289 output_stream, CNetService::eMultilineOutput_NetCacheStyle);
290 }
291
PrintHealth(CNcbiOstream & output_stream)292 void CNetScheduleAdmin::PrintHealth(CNcbiOstream& output_stream)
293 {
294 string cmd("HEALTH");
295 g_AppendClientIPSessionIDHitID(cmd);
296 m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
297 output_stream, CNetService::eUrlEncodedOutput);
298 }
299
GetQueueList(TQueueList & qlist)300 void CNetScheduleAdmin::GetQueueList(TQueueList& qlist)
301 {
302 string cmd("STAT QUEUES");
303 g_AppendClientIPSessionIDHitID(cmd);
304
305 string output_line;
306
307 for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
308 CNetService::eIncludePenalized); it; ++it) {
309 CNetServer server = *it;
310
311 qlist.push_back(SServerQueueList(server));
312
313 CNetServerMultilineCmdOutput cmd_output((*it).ExecWithRetry(cmd, true));
314 while (cmd_output.ReadLine(output_line))
315 if (NStr::StartsWith(output_line, "[queue ") &&
316 output_line.length() > sizeof("[queue "))
317 qlist.back().queues.push_back(output_line.substr(
318 sizeof("[queue ") - 1,
319 output_line.length() - sizeof("[queue ")));
320 }
321 }
322
StatusSnapshot(CNetScheduleAdmin::TStatusMap & status_map,const string & affinity_token,const string & job_group)323 void CNetScheduleAdmin::StatusSnapshot(
324 CNetScheduleAdmin::TStatusMap& status_map,
325 const string& affinity_token,
326 const string& job_group)
327 {
328 string cmd = "STAT JOBS";
329
330 if (!affinity_token.empty()) {
331 limits::Check<limits::SAffinity>(affinity_token);
332 cmd.append(" aff=");
333 cmd.append(affinity_token);
334 }
335
336 if (!job_group.empty()) {
337 limits::Check<limits::SJobGroup>(job_group);
338 cmd.append(" group=");
339 cmd.append(job_group);
340 }
341
342 g_AppendClientIPSessionIDHitID(cmd);
343
344 string output_line;
345 CTempString st_str, cnt_str;
346
347 try {
348 for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
349 CNetService::eIncludePenalized); it; ++it) {
350 CNetServerMultilineCmdOutput cmd_output(
351 (*it).ExecWithRetry(cmd, true));
352
353 while (cmd_output.ReadLine(output_line))
354 if (NStr::SplitInTwo(output_line, ":", st_str, cnt_str))
355 status_map[st_str] +=
356 NStr::StringToUInt(NStr::TruncateSpaces_Unsafe
357 (cnt_str, NStr::eTrunc_Begin));
358 }
359 }
360 catch (CStringException& ex)
361 {
362 NCBI_RETHROW(ex, CNetScheduleException, eProtocolSyntaxError,
363 "Error while parsing STAT JOBS response");
364 }
365 }
366
367 END_NCBI_SCOPE
368