1 /*  $Id: grid_control_thread.cpp 593248 2019-09-16 12:18:31Z grichenk $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *   Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *    NetSchedule Worker Node implementation
30  */
31 
#include <ncbi_pch.hpp>

#include "netschedule_api_impl.hpp"
#include "grid_worker_impl.hpp"
#include "grid_control_thread.hpp"

#include <connect/services/grid_globals.hpp>
#include <connect/services/grid_worker.hpp>

#include <corelib/ncbistre.hpp>
#include <corelib/ncbi_process.hpp>

#include <errno.h>
#include <math.h>
45 
46 
47 #define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode
48 
49 BEGIN_NCBI_SCOPE
50 
51 /////////////////////////////////////////////////////////////////////////////
52 //
53 ///@internal
54 
55 class CGetVersionProcessor : public CWorkerNodeControlServer::IRequestProcessor
56 {
57 public:
Process(const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)58     virtual void Process(const string& /*request*/,
59                          CNcbiOstream& os,
60                          CWorkerNodeControlServer* control_server)
61     {
62         CGridWorkerNode node(control_server->GetWorkerNode());
63         const CGridWorkerNode::TVersion version(node.GetAppVersion());
64 
65         os << "OK:version=" << NStr::URLEncode(version.first) <<
66                 "&build_date=" << NStr::URLEncode(version.second.date) <<
67                 "&build_tag=" << NStr::URLEncode(version.second.tag) << "\n";
68     }
69 };
70 
71 class CAdminCmdProcessor : public CWorkerNodeControlServer::IRequestProcessor
72 {
73 public:
Authenticate(const string & host,const string &,const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)74     virtual bool Authenticate(const string& host,
75                               const string& /*auth*/,
76                               const string& /*queue*/,
77                               CNcbiOstream& os,
78                               CWorkerNodeControlServer* control_server)
79     {
80         m_Host = host;
81         size_t pos = m_Host.find_first_of(':');
82         if (pos != string::npos) {
83             m_Host = m_Host.substr(0, pos);
84         }
85         if (control_server->GetWorkerNode().IsHostInAdminHostsList(m_Host)) {
86             return true;
87         }
88         os << "ERR:Shutdown access denied.\n";
89         LOG_POST_X(10, Warning << "Shutdown access denied for host " << m_Host);
90         return false;
91     }
92 
93 protected:
94     string m_Host;
95 };
96 
97 class CShutdownProcessor : public CAdminCmdProcessor
98 {
99 public:
Process(const string & request,CNcbiOstream & os,CWorkerNodeControlServer *)100     virtual void Process(const string& request,
101                          CNcbiOstream& os,
102                          CWorkerNodeControlServer* /*control_server*/)
103     {
104         if (request.find("SUICIDE") != NPOS) {
105             LOG_POST_X(11, Warning <<
106                 "Shutdown request has been received from host: " << m_Host);
107             LOG_POST_X(12, Warning << "Server is shutting down");
108             CGridGlobals::GetInstance().KillNode();
109         } else {
110             CNetScheduleAdmin::EShutdownLevel level =
111                 CNetScheduleAdmin::eNormalShutdown;
112             if (request.find("IMMEDIATE") != NPOS)
113                 level = CNetScheduleAdmin::eShutdownImmediate;
114             os << "OK:\n";
115             CGridGlobals::GetInstance().RequestShutdown(level);
116             LOG_POST_X(13, "Shutdown request has been received from host " <<
117                 m_Host);
118         }
119     }
120 };
121 
122 class CSuspendProcessor : public CAdminCmdProcessor
123 {
124 public:
Process(const string & request,CNcbiOstream & os,CWorkerNodeControlServer * control_server)125     virtual void Process(const string& request, CNcbiOstream& os,
126         CWorkerNodeControlServer* control_server)
127     {
128         bool pullback = NStr::Find(request.c_str(), "pullback") != NPOS;
129 
130         unsigned timeout = 0;
131         SIZE_TYPE timeout_str = NStr::Find(request.c_str(), "timeout=");
132         if (timeout_str != NPOS)
133             timeout = NStr::StringToUInt(request.c_str() +
134                     timeout_str + sizeof("timeout=") - 1,
135                     NStr::fConvErr_NoThrow | NStr::fAllowTrailingSymbols);
136 
137         LOG_POST("Received SUSPEND request from " << m_Host <<
138                 " (pullback=" << (pullback ? "ON" : "OFF") <<
139                 ", timeout=" << timeout << ')');
140 
141         control_server->GetWorkerNode().Suspend(pullback, timeout);
142 
143         os << "OK:\n";
144     }
145 };
146 
147 class CResumeProcessor : public CAdminCmdProcessor
148 {
149 public:
Process(const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)150     virtual void Process(const string& /*request*/, CNcbiOstream& os,
151         CWorkerNodeControlServer* control_server)
152     {
153         control_server->GetWorkerNode().Resume();
154         LOG_POST("Received RESUME request from " << m_Host);
155         os << "OK:\n";
156     }
157 };
158 
159 class CGetStatisticsProcessor :
160         public CWorkerNodeControlServer::IRequestProcessor
161 {
162 public:
Process(const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)163     virtual void Process(const string& /*request*/,
164                          CNcbiOstream& os,
165                          CWorkerNodeControlServer* control_server)
166     {
167         CGridWorkerNode node(control_server->GetWorkerNode());
168         const CGridWorkerNode::TVersion version(node.GetAppVersion());
169 
170         os << "OK:Application: " << node.GetAppName() <<
171                 "\nVersion: " << version.first <<
172                 "\nBuild date: " << version.second.date <<
173                 "\nBuild tag: " << version.second.tag << "\n";
174 
175         for (const auto& p : version.second.m_extra) {
176             os << SBuildInfo::ExtraName(p.first) << ": " << p.second << '\n';
177         }
178 
179         {{
180             CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
181             if (app)
182                 os << "Executable path: " << app->GetProgramExecutablePath()
183                         << "\nPID: " << CCurrentProcess::GetPid() << "\n";
184         }}
185 
186         CNetScheduleAPI ns_api(node.GetNetScheduleAPI());
187 
188         os << "Host name: " << CSocketAPI::gethostname() <<
189                 "\nControl port: " << control_server->GetControlPort() <<
190                 "\nUser name: " << GetDiagContext().GetUsername() <<
191                 "\nNetCache client name: " << node.GetNetCacheAPI().
192                         GetService()->GetClientName() <<
193                 "\nNetSchedule client name: " << node->GetClientName() <<
194                 "\nQueue name: " << node->GetQueueName() <<
195                 "\nNode ID: " << ns_api->m_ClientNode <<
196                 "\nNode session: " << ns_api->m_ClientSession <<
197                 "\nMaximum job threads: " << node.GetMaxThreads() << "\n";
198 
199         if (node.IsSuspended())
200             os << "The node is suspended\n";
201 
202         if (CGridGlobals::GetInstance().IsShuttingDown())
203             os << "The node is shutting down\n";
204 
205         if (node->IsExclusiveMode())
206             os << "The node is processing an exclusive job\n";
207 
208         CGridGlobals::GetInstance().GetJobWatcher().Print(os);
209 
210         os << "NetSchedule service: " <<
211                 ns_api.GetService().GetServiceName() << "\n";
212 
213         os << "NetSchedule servers:";
214         try {
215             for (CNetServiceIterator it = ns_api.GetService().
216                     Iterate(CNetService::eIncludePenalized); it; ++it)
217                 os << ' ' << (*it).GetServerAddress();
218             os << "\n";
219         }
220         catch (CNetSrvConnException&) {
221             os << " N/A\n";
222         }
223 
224         os << "Preferred affinities:";
225         CNetScheduleExecutor ns_executor(node.GetNSExecutor());
226         CFastMutexGuard guard(ns_executor->m_PreferredAffMutex);
227         ITERATE(set<string>, aff, ns_executor->m_PreferredAffinities) {
228             os << ' ' << *aff;
229         }
230         os << "\n";
231 
232         auto registry = node->m_SynRegistry;
233         _ASSERT(registry);
234 
235         registry->Alerts(os);
236 
237         os << "OK:END\n";
238     }
239 };
240 
241 class CGetLoadProcessor : public CWorkerNodeControlServer::IRequestProcessor
242 {
243 public:
Authenticate(const string &,const string & auth,const string & queue,CNcbiOstream & os,CWorkerNodeControlServer * control_server)244     virtual bool Authenticate(const string& /*host*/,
245                               const string& auth,
246                               const string& queue,
247                               CNcbiOstream& os,
248                               CWorkerNodeControlServer* control_server)
249     {
250         CGridWorkerNode node(control_server->GetWorkerNode());
251 
252         if (NStr::FindCase(auth, node->GetClientName()) == NPOS) {
253             os <<"ERR:Wrong client name. Required: " <<
254                     node->GetClientName() << "\n";
255             return false;
256         }
257 
258         CTempString qname, connection_info;
259         NStr::SplitInTwo(queue, ";", qname, connection_info);
260         if (qname != node->GetQueueName()) {
261             os << "ERR:Wrong queue name. Required: " <<
262                     node->GetQueueName() << "\n";
263             return false;
264         }
265 
266         return true;
267     }
268 
Process(const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)269     virtual void Process(const string& /*request*/,
270                          CNcbiOstream& os,
271                          CWorkerNodeControlServer* control_server)
272     {
273         int load = control_server->GetWorkerNode().GetMaxThreads() -
274             CGridGlobals::GetInstance().GetJobWatcher().GetJobsRunningNumber();
275         os << "OK:" << load << "\n";
276     }
277 };
278 
279 class CGetConfProcessor : public CWorkerNodeControlServer::IRequestProcessor
280 {
281 public:
282     void Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server) override;
283 };
284 
Process(const string &,CNcbiOstream & reply,CWorkerNodeControlServer * control_server)285 void CGetConfProcessor::Process(const string&, CNcbiOstream& reply, CWorkerNodeControlServer* control_server)
286 {
287     _ASSERT(control_server);
288 
289     auto node = static_cast<SGridWorkerNodeImpl*>(control_server->GetWorkerNode());
290     _ASSERT(node);
291 
292     auto registry = node->m_SynRegistry;
293     _ASSERT(registry);
294 
295     registry->Report(reply);
296     reply << "OK:END\n";
297 }
298 
299 class CAckAlertProcessor : public CWorkerNodeControlServer::IRequestProcessor
300 {
301 public:
302     void Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server) override;
303 };
304 
Process(const string & request,CNcbiOstream & reply,CWorkerNodeControlServer * control_server)305 void CAckAlertProcessor::Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server)
306 {
307     _ASSERT(control_server);
308 
309     auto node = static_cast<SGridWorkerNodeImpl*>(control_server->GetWorkerNode());
310     _ASSERT(node);
311 
312     auto registry = node->m_SynRegistry;
313     _ASSERT(registry);
314 
315     const string kAlertIDPrefix = " alert_";
316     auto pos = NStr::Find(request, kAlertIDPrefix, NStr::eNocase);
317 
318     if (pos == NPOS) {
319         reply << "ERR:Alert ID is required\n";
320         return;
321     }
322 
323     const auto kFlags = NStr::fConvErr_NoThrow | NStr::fAllowTrailingSymbols;
324     auto id = NStr::StringToUInt(request.c_str() + pos + kAlertIDPrefix.size(), kFlags);
325 
326     if (!registry->AckAlert(id)) {
327         reply << "ERR:Failed to find an alert with such ID (" << id << ")\n";
328     } else {
329         reply << "OK:\n";
330     }
331 }
332 
333 class CUnknownProcessor : public CWorkerNodeControlServer::IRequestProcessor
334 {
335 public:
Process(const string & request,CNcbiOstream & os,CWorkerNodeControlServer *)336     virtual void Process(const string& request,
337                          CNcbiOstream& os,
338                          CWorkerNodeControlServer* /*control_server*/)
339     {
340         os << "ERR:Unknown command -- " << request << "\n";
341     }
342 };
343 
344 /////////////////////////////////////////////////////////////////////////////
345 //
346 ///@internal
347 
348 /* static */
349 CWorkerNodeControlServer::IRequestProcessor*
MakeProcessor(const string & cmd)350     CWorkerNodeControlServer::MakeProcessor(const string& cmd)
351 {
352     if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("VERSION")))
353         return new CGetVersionProcessor;
354 
355     if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("STAT")))
356         return new CGetStatisticsProcessor;
357 
358     if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("SHUTDOWN")))
359         return new CShutdownProcessor;
360 
361     if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("SUSPEND")))
362         return new CSuspendProcessor;
363 
364     if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("RESUME")))
365         return new CResumeProcessor;
366 
367     if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("GETLOAD")))
368         return new CGetLoadProcessor;
369 
370     if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("GETCONF")))
371         return new CGetConfProcessor;
372 
373     if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("ACKALERT")))
374         return new CAckAlertProcessor;
375 
376     return new CUnknownProcessor;
377 }
378 
379 class CWNCTConnectionFactory : public IServer_ConnectionFactory
380 {
381 public:
CWNCTConnectionFactory(CWorkerNodeControlServer & server,unsigned short & start_port,unsigned short end_port)382     CWNCTConnectionFactory(CWorkerNodeControlServer& server,
383         unsigned short& start_port, unsigned short end_port)
384         : m_Server(server), m_Port(start_port), m_EndPort(end_port)
385     {}
Create(void)386     virtual IServer_ConnectionHandler* Create(void) {
387         return new CWNCTConnectionHandler(m_Server);
388     }
OnFailure(unsigned short * port)389     virtual EListenAction OnFailure(unsigned short* port)
390     {
391         if (*port >= m_EndPort)
392             return eLAFail;
393         m_Port = ++(*port);
394         return eLARetry;
395     }
396 
397 private:
398     CWorkerNodeControlServer& m_Server;
399     unsigned short& m_Port;
400     unsigned short m_EndPort;
401 };
402 
403 static STimeout kAcceptTimeout = {1,0};
404 
CWorkerNodeControlServer(SGridWorkerNodeImpl * worker_node,unsigned short start_port,unsigned short end_port)405 CWorkerNodeControlServer::CWorkerNodeControlServer(
406     SGridWorkerNodeImpl* worker_node,
407     unsigned short start_port,
408     unsigned short end_port) :
409         m_WorkerNode(worker_node),
410         m_ShutdownRequested(false),
411         m_Port(start_port)
412 {
413     SServer_Parameters params;
414     params.init_threads = 1;
415     params.max_threads = 3;
416     params.accept_timeout = &kAcceptTimeout;
417     SetParameters(params);
418     AddListener(new CWNCTConnectionFactory(*this, m_Port, end_port), m_Port);
419 }
420 
~CWorkerNodeControlServer()421 CWorkerNodeControlServer::~CWorkerNodeControlServer()
422 {
423     LOG_POST_X(14, Info << "Control server stopped.");
424 }
ShutdownRequested(void)425 bool CWorkerNodeControlServer::ShutdownRequested(void)
426 {
427     return m_ShutdownRequested;
428 }
429 
ProcessTimeout(void)430 void CWorkerNodeControlServer::ProcessTimeout(void)
431 {
432     CGridGlobals::GetInstance().GetJobWatcher().CheckForInfiniteLoop();
433 }
434 
435 
436 
437 ////////////////////////////////////////////////
s_ReadStrFromBUF(BUF buf)438 static string s_ReadStrFromBUF(BUF buf)
439 {
440     size_t size = BUF_Size(buf);
441     string ret(size, '\0');
442     if (size > 0)
443         BUF_Read(buf, &ret[0], size);
444     return ret;
445 }
446 
CWNCTConnectionHandler(CWorkerNodeControlServer & server)447 CWNCTConnectionHandler::CWNCTConnectionHandler(CWorkerNodeControlServer& server)
448     : m_Server(server)
449 {}
450 
~CWNCTConnectionHandler()451 CWNCTConnectionHandler::~CWNCTConnectionHandler()
452 {}
453 
OnOpen(void)454 void CWNCTConnectionHandler::OnOpen(void)
455 {
456     CSocket& socket = GetSocket();
457     socket.DisableOSSendDelay();
458     m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessAuth;
459 
460 }
461 
s_HandleError(CSocket & socket,const string & msg)462 static void s_HandleError(CSocket& socket, const string& msg)
463 {
464     ERR_POST_X(15, "Exception in the control server: " << msg);
465     string err = "ERR:" + NStr::PrintableString(msg);
466     socket.Write(&err[0], err.size());
467 }
OnMessage(BUF buffer)468 void CWNCTConnectionHandler::OnMessage(BUF buffer)
469 {
470     try {
471         (this->*m_ProcessMessage)(buffer);
472     } catch(exception& ex) {
473         s_HandleError(GetSocket(), ex.what());
474     } catch(...) {
475         s_HandleError(GetSocket(), "Unknown Error");
476     }
477 }
478 
x_ProcessAuth(BUF buffer)479 void CWNCTConnectionHandler::x_ProcessAuth(BUF buffer)
480 {
481     m_Auth = s_ReadStrFromBUF(buffer);
482     m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessQueue;
483 }
x_ProcessQueue(BUF buffer)484 void CWNCTConnectionHandler::x_ProcessQueue(BUF buffer)
485 {
486     m_Queue = s_ReadStrFromBUF(buffer);
487     m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessRequest;
488 }
x_ProcessRequest(BUF buffer)489 void CWNCTConnectionHandler::x_ProcessRequest(BUF buffer)
490 {
491     string request = s_ReadStrFromBUF(buffer);
492 
493     CSocket& socket = GetSocket();
494     string host = socket.GetPeerAddress();
495 
496     CNcbiOstrstream os;
497 
498     unique_ptr<CWorkerNodeControlServer::IRequestProcessor>
499             processor(m_Server.MakeProcessor(request));
500 
501     if (processor->Authenticate(host, m_Auth, m_Queue, os, &m_Server))
502         processor->Process(request, os, &m_Server);
503 
504     string s = CNcbiOstrstreamToString(os);
505     socket.Write(s.data(), s.size());
506 }
507 
508 END_NCBI_SCOPE
509