1 /* $Id: grid_control_thread.cpp 593248 2019-09-16 12:18:31Z grichenk $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description:
29 * NetSchedule Worker Node implementation
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "netschedule_api_impl.hpp"
35 #include "grid_worker_impl.hpp"
36 #include "grid_control_thread.hpp"
37
38 #include <connect/services/grid_globals.hpp>
39 #include <connect/services/grid_worker.hpp>
40
41 #include <corelib/ncbistre.hpp>
42 #include <corelib/ncbi_process.hpp>
43
44 #include <math.h>
45
46
47 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
48
49 BEGIN_NCBI_SCOPE
50
51 /////////////////////////////////////////////////////////////////////////////
52 //
53 ///@internal
54
55 class CGetVersionProcessor : public CWorkerNodeControlServer::IRequestProcessor
56 {
57 public:
Process(const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)58 virtual void Process(const string& /*request*/,
59 CNcbiOstream& os,
60 CWorkerNodeControlServer* control_server)
61 {
62 CGridWorkerNode node(control_server->GetWorkerNode());
63 const CGridWorkerNode::TVersion version(node.GetAppVersion());
64
65 os << "OK:version=" << NStr::URLEncode(version.first) <<
66 "&build_date=" << NStr::URLEncode(version.second.date) <<
67 "&build_tag=" << NStr::URLEncode(version.second.tag) << "\n";
68 }
69 };
70
71 class CAdminCmdProcessor : public CWorkerNodeControlServer::IRequestProcessor
72 {
73 public:
Authenticate(const string & host,const string &,const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)74 virtual bool Authenticate(const string& host,
75 const string& /*auth*/,
76 const string& /*queue*/,
77 CNcbiOstream& os,
78 CWorkerNodeControlServer* control_server)
79 {
80 m_Host = host;
81 size_t pos = m_Host.find_first_of(':');
82 if (pos != string::npos) {
83 m_Host = m_Host.substr(0, pos);
84 }
85 if (control_server->GetWorkerNode().IsHostInAdminHostsList(m_Host)) {
86 return true;
87 }
88 os << "ERR:Shutdown access denied.\n";
89 LOG_POST_X(10, Warning << "Shutdown access denied for host " << m_Host);
90 return false;
91 }
92
93 protected:
94 string m_Host;
95 };
96
97 class CShutdownProcessor : public CAdminCmdProcessor
98 {
99 public:
Process(const string & request,CNcbiOstream & os,CWorkerNodeControlServer *)100 virtual void Process(const string& request,
101 CNcbiOstream& os,
102 CWorkerNodeControlServer* /*control_server*/)
103 {
104 if (request.find("SUICIDE") != NPOS) {
105 LOG_POST_X(11, Warning <<
106 "Shutdown request has been received from host: " << m_Host);
107 LOG_POST_X(12, Warning << "Server is shutting down");
108 CGridGlobals::GetInstance().KillNode();
109 } else {
110 CNetScheduleAdmin::EShutdownLevel level =
111 CNetScheduleAdmin::eNormalShutdown;
112 if (request.find("IMMEDIATE") != NPOS)
113 level = CNetScheduleAdmin::eShutdownImmediate;
114 os << "OK:\n";
115 CGridGlobals::GetInstance().RequestShutdown(level);
116 LOG_POST_X(13, "Shutdown request has been received from host " <<
117 m_Host);
118 }
119 }
120 };
121
122 class CSuspendProcessor : public CAdminCmdProcessor
123 {
124 public:
Process(const string & request,CNcbiOstream & os,CWorkerNodeControlServer * control_server)125 virtual void Process(const string& request, CNcbiOstream& os,
126 CWorkerNodeControlServer* control_server)
127 {
128 bool pullback = NStr::Find(request.c_str(), "pullback") != NPOS;
129
130 unsigned timeout = 0;
131 SIZE_TYPE timeout_str = NStr::Find(request.c_str(), "timeout=");
132 if (timeout_str != NPOS)
133 timeout = NStr::StringToUInt(request.c_str() +
134 timeout_str + sizeof("timeout=") - 1,
135 NStr::fConvErr_NoThrow | NStr::fAllowTrailingSymbols);
136
137 LOG_POST("Received SUSPEND request from " << m_Host <<
138 " (pullback=" << (pullback ? "ON" : "OFF") <<
139 ", timeout=" << timeout << ')');
140
141 control_server->GetWorkerNode().Suspend(pullback, timeout);
142
143 os << "OK:\n";
144 }
145 };
146
147 class CResumeProcessor : public CAdminCmdProcessor
148 {
149 public:
Process(const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)150 virtual void Process(const string& /*request*/, CNcbiOstream& os,
151 CWorkerNodeControlServer* control_server)
152 {
153 control_server->GetWorkerNode().Resume();
154 LOG_POST("Received RESUME request from " << m_Host);
155 os << "OK:\n";
156 }
157 };
158
159 class CGetStatisticsProcessor :
160 public CWorkerNodeControlServer::IRequestProcessor
161 {
162 public:
Process(const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)163 virtual void Process(const string& /*request*/,
164 CNcbiOstream& os,
165 CWorkerNodeControlServer* control_server)
166 {
167 CGridWorkerNode node(control_server->GetWorkerNode());
168 const CGridWorkerNode::TVersion version(node.GetAppVersion());
169
170 os << "OK:Application: " << node.GetAppName() <<
171 "\nVersion: " << version.first <<
172 "\nBuild date: " << version.second.date <<
173 "\nBuild tag: " << version.second.tag << "\n";
174
175 for (const auto& p : version.second.m_extra) {
176 os << SBuildInfo::ExtraName(p.first) << ": " << p.second << '\n';
177 }
178
179 {{
180 CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
181 if (app)
182 os << "Executable path: " << app->GetProgramExecutablePath()
183 << "\nPID: " << CCurrentProcess::GetPid() << "\n";
184 }}
185
186 CNetScheduleAPI ns_api(node.GetNetScheduleAPI());
187
188 os << "Host name: " << CSocketAPI::gethostname() <<
189 "\nControl port: " << control_server->GetControlPort() <<
190 "\nUser name: " << GetDiagContext().GetUsername() <<
191 "\nNetCache client name: " << node.GetNetCacheAPI().
192 GetService()->GetClientName() <<
193 "\nNetSchedule client name: " << node->GetClientName() <<
194 "\nQueue name: " << node->GetQueueName() <<
195 "\nNode ID: " << ns_api->m_ClientNode <<
196 "\nNode session: " << ns_api->m_ClientSession <<
197 "\nMaximum job threads: " << node.GetMaxThreads() << "\n";
198
199 if (node.IsSuspended())
200 os << "The node is suspended\n";
201
202 if (CGridGlobals::GetInstance().IsShuttingDown())
203 os << "The node is shutting down\n";
204
205 if (node->IsExclusiveMode())
206 os << "The node is processing an exclusive job\n";
207
208 CGridGlobals::GetInstance().GetJobWatcher().Print(os);
209
210 os << "NetSchedule service: " <<
211 ns_api.GetService().GetServiceName() << "\n";
212
213 os << "NetSchedule servers:";
214 try {
215 for (CNetServiceIterator it = ns_api.GetService().
216 Iterate(CNetService::eIncludePenalized); it; ++it)
217 os << ' ' << (*it).GetServerAddress();
218 os << "\n";
219 }
220 catch (CNetSrvConnException&) {
221 os << " N/A\n";
222 }
223
224 os << "Preferred affinities:";
225 CNetScheduleExecutor ns_executor(node.GetNSExecutor());
226 CFastMutexGuard guard(ns_executor->m_PreferredAffMutex);
227 ITERATE(set<string>, aff, ns_executor->m_PreferredAffinities) {
228 os << ' ' << *aff;
229 }
230 os << "\n";
231
232 auto registry = node->m_SynRegistry;
233 _ASSERT(registry);
234
235 registry->Alerts(os);
236
237 os << "OK:END\n";
238 }
239 };
240
241 class CGetLoadProcessor : public CWorkerNodeControlServer::IRequestProcessor
242 {
243 public:
Authenticate(const string &,const string & auth,const string & queue,CNcbiOstream & os,CWorkerNodeControlServer * control_server)244 virtual bool Authenticate(const string& /*host*/,
245 const string& auth,
246 const string& queue,
247 CNcbiOstream& os,
248 CWorkerNodeControlServer* control_server)
249 {
250 CGridWorkerNode node(control_server->GetWorkerNode());
251
252 if (NStr::FindCase(auth, node->GetClientName()) == NPOS) {
253 os <<"ERR:Wrong client name. Required: " <<
254 node->GetClientName() << "\n";
255 return false;
256 }
257
258 CTempString qname, connection_info;
259 NStr::SplitInTwo(queue, ";", qname, connection_info);
260 if (qname != node->GetQueueName()) {
261 os << "ERR:Wrong queue name. Required: " <<
262 node->GetQueueName() << "\n";
263 return false;
264 }
265
266 return true;
267 }
268
Process(const string &,CNcbiOstream & os,CWorkerNodeControlServer * control_server)269 virtual void Process(const string& /*request*/,
270 CNcbiOstream& os,
271 CWorkerNodeControlServer* control_server)
272 {
273 int load = control_server->GetWorkerNode().GetMaxThreads() -
274 CGridGlobals::GetInstance().GetJobWatcher().GetJobsRunningNumber();
275 os << "OK:" << load << "\n";
276 }
277 };
278
279 class CGetConfProcessor : public CWorkerNodeControlServer::IRequestProcessor
280 {
281 public:
282 void Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server) override;
283 };
284
Process(const string &,CNcbiOstream & reply,CWorkerNodeControlServer * control_server)285 void CGetConfProcessor::Process(const string&, CNcbiOstream& reply, CWorkerNodeControlServer* control_server)
286 {
287 _ASSERT(control_server);
288
289 auto node = static_cast<SGridWorkerNodeImpl*>(control_server->GetWorkerNode());
290 _ASSERT(node);
291
292 auto registry = node->m_SynRegistry;
293 _ASSERT(registry);
294
295 registry->Report(reply);
296 reply << "OK:END\n";
297 }
298
299 class CAckAlertProcessor : public CWorkerNodeControlServer::IRequestProcessor
300 {
301 public:
302 void Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server) override;
303 };
304
Process(const string & request,CNcbiOstream & reply,CWorkerNodeControlServer * control_server)305 void CAckAlertProcessor::Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server)
306 {
307 _ASSERT(control_server);
308
309 auto node = static_cast<SGridWorkerNodeImpl*>(control_server->GetWorkerNode());
310 _ASSERT(node);
311
312 auto registry = node->m_SynRegistry;
313 _ASSERT(registry);
314
315 const string kAlertIDPrefix = " alert_";
316 auto pos = NStr::Find(request, kAlertIDPrefix, NStr::eNocase);
317
318 if (pos == NPOS) {
319 reply << "ERR:Alert ID is required\n";
320 return;
321 }
322
323 const auto kFlags = NStr::fConvErr_NoThrow | NStr::fAllowTrailingSymbols;
324 auto id = NStr::StringToUInt(request.c_str() + pos + kAlertIDPrefix.size(), kFlags);
325
326 if (!registry->AckAlert(id)) {
327 reply << "ERR:Failed to find an alert with such ID (" << id << ")\n";
328 } else {
329 reply << "OK:\n";
330 }
331 }
332
333 class CUnknownProcessor : public CWorkerNodeControlServer::IRequestProcessor
334 {
335 public:
Process(const string & request,CNcbiOstream & os,CWorkerNodeControlServer *)336 virtual void Process(const string& request,
337 CNcbiOstream& os,
338 CWorkerNodeControlServer* /*control_server*/)
339 {
340 os << "ERR:Unknown command -- " << request << "\n";
341 }
342 };
343
344 /////////////////////////////////////////////////////////////////////////////
345 //
346 ///@internal
347
348 /* static */
349 CWorkerNodeControlServer::IRequestProcessor*
MakeProcessor(const string & cmd)350 CWorkerNodeControlServer::MakeProcessor(const string& cmd)
351 {
352 if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("VERSION")))
353 return new CGetVersionProcessor;
354
355 if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("STAT")))
356 return new CGetStatisticsProcessor;
357
358 if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("SHUTDOWN")))
359 return new CShutdownProcessor;
360
361 if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("SUSPEND")))
362 return new CSuspendProcessor;
363
364 if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("RESUME")))
365 return new CResumeProcessor;
366
367 if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("GETLOAD")))
368 return new CGetLoadProcessor;
369
370 if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("GETCONF")))
371 return new CGetConfProcessor;
372
373 if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("ACKALERT")))
374 return new CAckAlertProcessor;
375
376 return new CUnknownProcessor;
377 }
378
379 class CWNCTConnectionFactory : public IServer_ConnectionFactory
380 {
381 public:
CWNCTConnectionFactory(CWorkerNodeControlServer & server,unsigned short & start_port,unsigned short end_port)382 CWNCTConnectionFactory(CWorkerNodeControlServer& server,
383 unsigned short& start_port, unsigned short end_port)
384 : m_Server(server), m_Port(start_port), m_EndPort(end_port)
385 {}
Create(void)386 virtual IServer_ConnectionHandler* Create(void) {
387 return new CWNCTConnectionHandler(m_Server);
388 }
OnFailure(unsigned short * port)389 virtual EListenAction OnFailure(unsigned short* port)
390 {
391 if (*port >= m_EndPort)
392 return eLAFail;
393 m_Port = ++(*port);
394 return eLARetry;
395 }
396
397 private:
398 CWorkerNodeControlServer& m_Server;
399 unsigned short& m_Port;
400 unsigned short m_EndPort;
401 };
402
403 static STimeout kAcceptTimeout = {1,0};
404
CWorkerNodeControlServer(SGridWorkerNodeImpl * worker_node,unsigned short start_port,unsigned short end_port)405 CWorkerNodeControlServer::CWorkerNodeControlServer(
406 SGridWorkerNodeImpl* worker_node,
407 unsigned short start_port,
408 unsigned short end_port) :
409 m_WorkerNode(worker_node),
410 m_ShutdownRequested(false),
411 m_Port(start_port)
412 {
413 SServer_Parameters params;
414 params.init_threads = 1;
415 params.max_threads = 3;
416 params.accept_timeout = &kAcceptTimeout;
417 SetParameters(params);
418 AddListener(new CWNCTConnectionFactory(*this, m_Port, end_port), m_Port);
419 }
420
~CWorkerNodeControlServer()421 CWorkerNodeControlServer::~CWorkerNodeControlServer()
422 {
423 LOG_POST_X(14, Info << "Control server stopped.");
424 }
ShutdownRequested(void)425 bool CWorkerNodeControlServer::ShutdownRequested(void)
426 {
427 return m_ShutdownRequested;
428 }
429
ProcessTimeout(void)430 void CWorkerNodeControlServer::ProcessTimeout(void)
431 {
432 CGridGlobals::GetInstance().GetJobWatcher().CheckForInfiniteLoop();
433 }
434
435
436
437 ////////////////////////////////////////////////
s_ReadStrFromBUF(BUF buf)438 static string s_ReadStrFromBUF(BUF buf)
439 {
440 size_t size = BUF_Size(buf);
441 string ret(size, '\0');
442 if (size > 0)
443 BUF_Read(buf, &ret[0], size);
444 return ret;
445 }
446
CWNCTConnectionHandler(CWorkerNodeControlServer & server)447 CWNCTConnectionHandler::CWNCTConnectionHandler(CWorkerNodeControlServer& server)
448 : m_Server(server)
449 {}
450
~CWNCTConnectionHandler()451 CWNCTConnectionHandler::~CWNCTConnectionHandler()
452 {}
453
OnOpen(void)454 void CWNCTConnectionHandler::OnOpen(void)
455 {
456 CSocket& socket = GetSocket();
457 socket.DisableOSSendDelay();
458 m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessAuth;
459
460 }
461
s_HandleError(CSocket & socket,const string & msg)462 static void s_HandleError(CSocket& socket, const string& msg)
463 {
464 ERR_POST_X(15, "Exception in the control server: " << msg);
465 string err = "ERR:" + NStr::PrintableString(msg);
466 socket.Write(&err[0], err.size());
467 }
OnMessage(BUF buffer)468 void CWNCTConnectionHandler::OnMessage(BUF buffer)
469 {
470 try {
471 (this->*m_ProcessMessage)(buffer);
472 } catch(exception& ex) {
473 s_HandleError(GetSocket(), ex.what());
474 } catch(...) {
475 s_HandleError(GetSocket(), "Unknown Error");
476 }
477 }
478
x_ProcessAuth(BUF buffer)479 void CWNCTConnectionHandler::x_ProcessAuth(BUF buffer)
480 {
481 m_Auth = s_ReadStrFromBUF(buffer);
482 m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessQueue;
483 }
x_ProcessQueue(BUF buffer)484 void CWNCTConnectionHandler::x_ProcessQueue(BUF buffer)
485 {
486 m_Queue = s_ReadStrFromBUF(buffer);
487 m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessRequest;
488 }
x_ProcessRequest(BUF buffer)489 void CWNCTConnectionHandler::x_ProcessRequest(BUF buffer)
490 {
491 string request = s_ReadStrFromBUF(buffer);
492
493 CSocket& socket = GetSocket();
494 string host = socket.GetPeerAddress();
495
496 CNcbiOstrstream os;
497
498 unique_ptr<CWorkerNodeControlServer::IRequestProcessor>
499 processor(m_Server.MakeProcessor(request));
500
501 if (processor->Authenticate(host, m_Auth, m_Queue, os, &m_Server))
502 processor->Process(request, os, &m_Server);
503
504 string s = CNcbiOstrstreamToString(os);
505 socket.Write(s.data(), s.size());
506 }
507
508 END_NCBI_SCOPE
509