1 /*  $Id: netschedule_api_admin.cpp 590486 2019-07-30 15:47:21Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author:  Anatoliy Kuznetsov, Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *   Implementation of NetSchedule API.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "netschedule_api_impl.hpp"
36 
37 #include <connect/services/netschedule_api.hpp>
38 
39 
40 BEGIN_NCBI_SCOPE
41 
42 using namespace grid::netschedule;
43 
SwitchToDrainMode(ESwitch on_off)44 void CNetScheduleAdmin::SwitchToDrainMode(ESwitch on_off)
45 {
46     string cmd(on_off != eOff ?
47             "REFUSESUBMITS mode=1" : "REFUSESUBMITS mode=0");
48     g_AppendClientIPSessionIDHitID(cmd);
49     m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
50 }
51 
ShutdownServer(CNetScheduleAdmin::EShutdownLevel level)52 void CNetScheduleAdmin::ShutdownServer(
53     CNetScheduleAdmin::EShutdownLevel level)
54 {
55     const auto die = level == eDie;
56     string cmd(die ? "SHUTDOWN SUICIDE" :
57             level == eShutdownImmediate ? "SHUTDOWN IMMEDIATE" :
58                     level == eDrain ? "SHUTDOWN drain=1" : "SHUTDOWN");
59     g_AppendClientIPSessionIDHitID(cmd);
60 
61     const auto retry_mode = die ? SNetServiceImpl::SRetry::eNoRetry : SNetServiceImpl::SRetry::eDefault;
62     auto retry_guard = m_Impl->m_API->m_Service->CreateRetryGuard(retry_mode);
63 
64     try {
65         m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
66     }
67     catch (CNetSrvConnException& ex)
68     {
69         if ((ex.GetErrCode() != CNetSrvConnException::eConnClosedByServer) || !die) throw;
70     }
71 }
72 
73 
ReloadServerConfig()74 void CNetScheduleAdmin::ReloadServerConfig()
75 {
76     string cmd("RECO");
77     g_AppendClientIPSessionIDHitID(cmd);
78     m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
79 }
80 
CreateQueue(const string & qname,const string & qclass,const string & description)81 void CNetScheduleAdmin::CreateQueue(const string& qname, const string& qclass,
82                                     const string& description)
83 {
84     limits::Check<limits::SQueueName>(qname);
85 
86     string cmd = "QCRE " + qname;
87     cmd += ' ';
88     cmd += qclass;
89 
90     if (!description.empty()) {
91         cmd += " \"";
92         cmd += description;
93         cmd += '"';
94     }
95 
96     g_AppendClientIPSessionIDHitID(cmd);
97 
98     m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
99 }
100 
DeleteQueue(const string & qname)101 void CNetScheduleAdmin::DeleteQueue(const string& qname)
102 {
103     limits::Check<limits::SQueueName>(qname);
104 
105     string cmd("QDEL " + qname);
106     g_AppendClientIPSessionIDHitID(cmd);
107     m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
108 }
109 
DumpJob(CNcbiOstream & out,const string & job_key)110 void CNetScheduleAdmin::DumpJob(CNcbiOstream& out, const string& job_key)
111 {
112     CNetServerMultilineCmdOutput output(DumpJob(job_key));
113 
114     string line;
115 
116     while (output.ReadLine(line))
117         out << line << "\n";
118 }
119 
DumpJob(const string & job_key)120 CNetServerMultilineCmdOutput CNetScheduleAdmin::DumpJob(const string& job_key)
121 {
122     string cmd("DUMP " + job_key);
123     g_AppendClientIPSessionIDHitID(cmd);
124     return m_Impl->m_API->GetServer(job_key).ExecWithRetry(cmd, true);
125 }
126 
CancelAllJobs(const string & job_statuses)127 void CNetScheduleAdmin::CancelAllJobs(const string& job_statuses)
128 {
129     string cmd;
130     if (job_statuses.empty()) {
131         cmd.assign("CANCELQ");
132     } else {
133         cmd.assign("CANCEL status=");
134         cmd.append(job_statuses);
135     }
136     g_AppendClientIPSessionIDHitID(cmd);
137     m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
138 }
139 
140 
PrintServerVersion(CNcbiOstream & output_stream)141 void CNetScheduleAdmin::PrintServerVersion(CNcbiOstream& output_stream)
142 {
143     string cmd("VERSION");
144     g_AppendClientIPSessionIDHitID(cmd);
145     m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
146         output_stream, CNetService::eSingleLineOutput);
147 }
148 
149 
DumpQueue(CNcbiOstream & output_stream,const string & start_after_job,size_t job_count,const string & job_statuses,const string & job_group)150 void CNetScheduleAdmin::DumpQueue(
151         CNcbiOstream& output_stream,
152         const string& start_after_job,
153         size_t job_count,
154         const string& job_statuses,
155         const string& job_group)
156 {
157     string cmd("DUMP");
158     if (!job_statuses.empty()) {
159         cmd.append(" status=");
160         cmd.append(job_statuses);
161     }
162     if (!start_after_job.empty()) {
163         cmd.append(" start_after=");
164         cmd.append(start_after_job);
165     }
166     if (job_count > 0) {
167         cmd.append(" count=");
168         cmd.append(NStr::NumericToString(job_count));
169     }
170     if (!job_group.empty()) {
171         limits::Check<limits::SJobGroup>(job_group);
172         cmd.append(" group=");
173         cmd.append(job_group);
174     }
175     g_AppendClientIPSessionIDHitID(cmd);
176     m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
177         output_stream, CNetService::eMultilineOutput);
178 }
179 
DumpQueue(CNcbiOstream & output_stream,const string & start_after_job,size_t job_count,CNetScheduleAPI::EJobStatus status,const string & job_group)180 void CNetScheduleAdmin::DumpQueue(
181         CNcbiOstream& output_stream,
182         const string& start_after_job,
183         size_t job_count,
184         CNetScheduleAPI::EJobStatus status,
185         const string& job_group)
186 {
187     string job_statuses(CNetScheduleAPI::StatusToString(status));
188 
189     // Must be a rare case
190     if (status == CNetScheduleAPI::eJobNotFound) {
191         job_statuses.clear();
192     }
193 
194     DumpQueue(output_stream, start_after_job, job_count, job_statuses, job_group);
195 }
196 
197 
// Return "QINF2 <queue_name>" after passing it through
// g_AppendClientIPSessionIDHitID().
static string s_MkQINFCmd(const string& queue_name)
{
    string cmd = "QINF2 ";
    cmd += queue_name;
    g_AppendClientIPSessionIDHitID(cmd);
    return cmd;
}
204 
s_ParseQueueInfo(const string & server_output,CNetScheduleAdmin::TQueueInfo & queue_info)205 static void s_ParseQueueInfo(const string& server_output,
206         CNetScheduleAdmin::TQueueInfo& queue_info)
207 {
208     CUrlArgs url_parser(server_output);
209 
210     ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
211         queue_info[field->name] = field->value;
212     }
213 }
214 
GetQueueInfo(CNetServer server,const string & queue_name,CNetScheduleAdmin::TQueueInfo & queue_info)215 void CNetScheduleAdmin::GetQueueInfo(CNetServer server,
216         const string& queue_name, CNetScheduleAdmin::TQueueInfo& queue_info)
217 {
218     CNetServer::SExecResult exec_result;
219 
220     server->ConnectAndExec(s_MkQINFCmd(queue_name), false, exec_result);
221 
222     s_ParseQueueInfo(exec_result.response, queue_info);
223 }
224 
GetQueueInfo(const string & queue_name,CNetScheduleAdmin::TQueueInfo & queue_info)225 void CNetScheduleAdmin::GetQueueInfo(const string& queue_name,
226         CNetScheduleAdmin::TQueueInfo& queue_info)
227 {
228     GetQueueInfo(m_Impl->m_API->m_Service.Iterate().GetServer(),
229             queue_name, queue_info);
230 }
231 
GetQueueInfo(CNetServer server,CNetScheduleAdmin::TQueueInfo & queue_info)232 void CNetScheduleAdmin::GetQueueInfo(CNetServer server,
233         CNetScheduleAdmin::TQueueInfo& queue_info)
234 {
235     GetQueueInfo(server, m_Impl->m_API->m_Queue, queue_info);
236 }
237 
GetQueueInfo(CNetScheduleAdmin::TQueueInfo & queue_info)238 void CNetScheduleAdmin::GetQueueInfo(CNetScheduleAdmin::TQueueInfo& queue_info)
239 {
240     GetQueueInfo(m_Impl->m_API->m_Queue, queue_info);
241 }
242 
PrintQueueInfo(const string & queue_name,CNcbiOstream & output_stream)243 void CNetScheduleAdmin::PrintQueueInfo(const string& queue_name,
244         CNcbiOstream& output_stream)
245 {
246     bool print_headers = m_Impl->m_API->m_Service.IsLoadBalanced();
247 
248     for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
249             CNetService::eIncludePenalized); it; ++it) {
250         if (print_headers)
251             output_stream << '[' << (*it).GetServerAddress() << ']' << NcbiEndl;
252 
253         TQueueInfo queue_info;
254 
255         GetQueueInfo(*it, queue_name, queue_info);
256 
257         ITERATE(TQueueInfo, qi, queue_info) {
258             output_stream << qi->first << ": " << qi->second << NcbiEndl;
259         }
260 
261         if (print_headers)
262             output_stream << NcbiEndl;
263     }
264 }
265 
266 void g_GetWorkerNodes(CNetScheduleAPI api, list<CNetScheduleAdmin::SWorkerNodeInfo>& worker_nodes);
267 
GetWorkerNodes(list<SWorkerNodeInfo> & worker_nodes)268 void CNetScheduleAdmin::GetWorkerNodes(
269     list<SWorkerNodeInfo>& worker_nodes)
270 {
271     g_GetWorkerNodes(m_Impl->m_API, worker_nodes);
272 }
273 
PrintConf(CNcbiOstream & output_stream)274 void CNetScheduleAdmin::PrintConf(CNcbiOstream& output_stream)
275 {
276     string cmd("GETCONF");
277     g_AppendClientIPSessionIDHitID(cmd);
278     m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
279         output_stream, CNetService::eMultilineOutput);
280 }
281 
PrintServerStatistics(CNcbiOstream & output_stream,EStatisticsOptions opt)282 void CNetScheduleAdmin::PrintServerStatistics(CNcbiOstream& output_stream,
283     EStatisticsOptions opt)
284 {
285     string cmd(opt == eStatisticsBrief ? "STAT" :
286             opt == eStatisticsClients ? "STAT CLIENTS" : "STAT ALL");
287     g_AppendClientIPSessionIDHitID(cmd);
288     m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
289         output_stream, CNetService::eMultilineOutput_NetCacheStyle);
290 }
291 
PrintHealth(CNcbiOstream & output_stream)292 void CNetScheduleAdmin::PrintHealth(CNcbiOstream& output_stream)
293 {
294     string cmd("HEALTH");
295     g_AppendClientIPSessionIDHitID(cmd);
296     m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
297             output_stream, CNetService::eUrlEncodedOutput);
298 }
299 
GetQueueList(TQueueList & qlist)300 void CNetScheduleAdmin::GetQueueList(TQueueList& qlist)
301 {
302     string cmd("STAT QUEUES");
303     g_AppendClientIPSessionIDHitID(cmd);
304 
305     string output_line;
306 
307     for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
308             CNetService::eIncludePenalized); it; ++it) {
309         CNetServer server = *it;
310 
311         qlist.push_back(SServerQueueList(server));
312 
313         CNetServerMultilineCmdOutput cmd_output((*it).ExecWithRetry(cmd, true));
314         while (cmd_output.ReadLine(output_line))
315             if (NStr::StartsWith(output_line, "[queue ") &&
316                     output_line.length() > sizeof("[queue "))
317                 qlist.back().queues.push_back(output_line.substr(
318                         sizeof("[queue ") - 1,
319                         output_line.length() - sizeof("[queue ")));
320     }
321 }
322 
StatusSnapshot(CNetScheduleAdmin::TStatusMap & status_map,const string & affinity_token,const string & job_group)323 void CNetScheduleAdmin::StatusSnapshot(
324         CNetScheduleAdmin::TStatusMap& status_map,
325         const string& affinity_token,
326         const string& job_group)
327 {
328     string cmd = "STAT JOBS";
329 
330     if (!affinity_token.empty()) {
331         limits::Check<limits::SAffinity>(affinity_token);
332         cmd.append(" aff=");
333         cmd.append(affinity_token);
334     }
335 
336     if (!job_group.empty()) {
337         limits::Check<limits::SJobGroup>(job_group);
338         cmd.append(" group=");
339         cmd.append(job_group);
340     }
341 
342     g_AppendClientIPSessionIDHitID(cmd);
343 
344     string output_line;
345     CTempString st_str, cnt_str;
346 
347     try {
348         for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
349                 CNetService::eIncludePenalized); it; ++it) {
350             CNetServerMultilineCmdOutput cmd_output(
351                     (*it).ExecWithRetry(cmd, true));
352 
353             while (cmd_output.ReadLine(output_line))
354                 if (NStr::SplitInTwo(output_line, ":", st_str, cnt_str))
355                     status_map[st_str] +=
356                         NStr::StringToUInt(NStr::TruncateSpaces_Unsafe
357                                            (cnt_str, NStr::eTrunc_Begin));
358         }
359     }
360     catch (CStringException& ex)
361     {
362         NCBI_RETHROW(ex, CNetScheduleException, eProtocolSyntaxError,
363                 "Error while parsing STAT JOBS response");
364     }
365 }
366 
367 END_NCBI_SCOPE
368