1 /* $Id: ns_server.cpp 601644 2020-02-11 19:47:28Z satskyse $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Anatoliy Kuznetsov, Victor Joukov
27 *
28 * File Description: NetScheduler threaded server
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "ns_server.hpp"
35 #include "ns_ini_params.hpp"
36 #include "queue_database.hpp"
37 #include "ns_types.hpp"
38
39 #include <corelib/request_ctx.hpp>
40
41 USING_NCBI_SCOPE;
42
43
44 CNetScheduleServer* CNetScheduleServer::sm_netschedule_server = 0;
45
46 //////////////////////////////////////////////////////////////////////////
47 /// NetScheduler threaded server implementation
CNetScheduleServer(const string & dbpath,bool diskless)48 CNetScheduleServer::CNetScheduleServer(const string & dbpath,
49 bool diskless)
50 : m_BackgroundHost(this),
51 m_Port(0),
52 m_HostNetAddr(0),
53 m_Shutdown(false),
54 m_SigNum(0),
55 m_InactivityTimeout(default_network_timeout),
56 m_QueueDB(NULL),
57 m_StartTime(GetFastLocalTime()),
58 m_LogFlag(default_is_log),
59 m_LogBatchEachJobFlag(default_log_batch_each_job),
60 m_LogNotificationThreadFlag(default_log_notification_thread),
61 m_LogCleaningThreadFlag(default_log_cleaning_thread),
62 m_LogExecutionWatcherThreadFlag(default_log_execution_watcher_thread),
63 m_LogStatisticsThreadFlag(default_log_statistics_thread),
64 m_RefuseSubmits(false),
65 m_UseHostname(default_use_hostname),
66 m_Diskless(diskless),
67 m_DBDrained(false),
68 m_DeleteBatchSize(default_del_batch_size),
69 m_MarkdelBatchSize(default_markdel_batch_size),
70 m_ScanBatchSize(default_scan_batch_size),
71 m_PurgeTimeout(default_purge_timeout),
72 m_StatInterval(default_stat_interval),
73 m_JobCountersInterval(default_job_counters_interval),
74 m_MaxClientData(default_max_client_data),
75 m_NodeID("not_initialized"),
76 m_SessionID("s" + x_GenerateGUID()),
77 m_StartIDs(dbpath, diskless),
78 m_AnybodyCanReconfigure(false),
79 m_ReserveDumpSpace(default_reserve_dump_space)
80 {
81 m_CurrentSubmitsCounter.Set(kSubmitCounterInitialValue);
82 sm_netschedule_server = this;
83 }
84
85
~CNetScheduleServer()86 CNetScheduleServer::~CNetScheduleServer()
87 {
88 delete m_QueueDB;
89 }
90
91
AddDefaultListener(IServer_ConnectionFactory * factory)92 void CNetScheduleServer::AddDefaultListener(IServer_ConnectionFactory* factory)
93 {
94 // port must be set before listening
95 _ASSERT(m_Port);
96 AddListener(factory, m_Port);
97 }
98
99
100 // Returns what was changed
SetNSParameters(const SNS_Parameters & params,bool limited)101 CJsonNode CNetScheduleServer::SetNSParameters(const SNS_Parameters & params,
102 bool limited)
103 {
104 CJsonNode what_changed = CJsonNode::NewObjectNode();
105 CJsonNode changes = CJsonNode::NewObjectNode();
106 bool new_val;
107
108 if (m_LogFlag != params.is_log) {
109 CJsonNode values = CJsonNode::NewArrayNode();
110 values.AppendBoolean(m_LogFlag);
111 values.AppendBoolean(params.is_log);
112 changes.SetByKey("log", values);
113 }
114 m_LogFlag = params.is_log;
115
116 new_val = (params.log_batch_each_job && m_LogFlag);
117 if (m_LogBatchEachJobFlag != new_val) {
118 CJsonNode values = CJsonNode::NewArrayNode();
119 values.AppendBoolean(m_LogBatchEachJobFlag);
120 values.AppendBoolean(new_val);
121 changes.SetByKey("log_batch_each_job", values);
122 }
123 m_LogBatchEachJobFlag = new_val;
124
125 new_val = (params.log_notification_thread && m_LogFlag);
126 if (m_LogNotificationThreadFlag != new_val) {
127 CJsonNode values = CJsonNode::NewArrayNode();
128 values.AppendBoolean(m_LogNotificationThreadFlag);
129 values.AppendBoolean(new_val);
130 changes.SetByKey("log_notification_thread", values);
131 }
132 m_LogNotificationThreadFlag = new_val;
133
134 new_val = (params.log_cleaning_thread && m_LogFlag);
135 if (m_LogCleaningThreadFlag != new_val) {
136 CJsonNode values = CJsonNode::NewArrayNode();
137 values.AppendBoolean(m_LogCleaningThreadFlag);
138 values.AppendBoolean(new_val);
139 changes.SetByKey("log_cleaning_thread", values);
140 }
141 m_LogCleaningThreadFlag = new_val;
142
143 new_val = (params.log_execution_watcher_thread&& m_LogFlag);
144 if (m_LogExecutionWatcherThreadFlag != new_val) {
145 CJsonNode values = CJsonNode::NewArrayNode();
146 values.AppendBoolean(m_LogExecutionWatcherThreadFlag);
147 values.AppendBoolean(new_val);
148 changes.SetByKey("log_execution_watcher_thread", values);
149 }
150 m_LogExecutionWatcherThreadFlag = new_val;
151
152 new_val = (params.log_statistics_thread && m_LogFlag);
153 if (m_LogStatisticsThreadFlag != new_val) {
154 CJsonNode values = CJsonNode::NewArrayNode();
155 values.AppendBoolean(m_LogStatisticsThreadFlag);
156 values.AppendBoolean(new_val);
157 changes.SetByKey("log_statistics_thread", values);
158 }
159 m_LogStatisticsThreadFlag = new_val;
160
161 if (m_StatInterval != params.stat_interval) {
162 CJsonNode values = CJsonNode::NewArrayNode();
163 values.AppendInteger(m_StatInterval);
164 values.AppendInteger(params.stat_interval);
165 changes.SetByKey("stat_interval", values);
166 }
167 m_StatInterval = params.stat_interval;
168
169 if (m_JobCountersInterval != params.job_counters_interval) {
170 CJsonNode values = CJsonNode::NewArrayNode();
171 values.AppendInteger(m_JobCountersInterval);
172 values.AppendInteger(params.job_counters_interval);
173 changes.SetByKey("job_counters_interval", values);
174 }
175 m_JobCountersInterval = params.job_counters_interval;
176
177 if (m_MaxClientData != params.max_client_data) {
178 CJsonNode values = CJsonNode::NewArrayNode();
179 values.AppendInteger(m_MaxClientData);
180 values.AppendInteger(params.max_client_data);
181 changes.SetByKey("max_client_data", values);
182 }
183 m_MaxClientData = params.max_client_data;
184
185 CJsonNode accepted_hosts = m_AdminHosts.SetHosts(params.admin_hosts);
186 if (accepted_hosts.GetSize() > 0)
187 changes.SetByKey("admin_host", accepted_hosts);
188
189 CJsonNode admin_diff = x_SetFromList(params.admin_client_names,
190 m_AdminClientNames,
191 m_AdminClientsLock);
192 if (admin_diff.GetSize() > 0)
193 changes.SetByKey("admin_client_name", admin_diff);
194
195 CJsonNode state_transition_perf_log_queues_diff =
196 x_SetFromList(params.state_transition_perf_log_queues,
197 m_StateTransitionPerfLogQueues,
198 m_STPerfLogQCLock);
199 if (state_transition_perf_log_queues_diff.GetSize() > 0)
200 changes.SetByKey("state_transition_perf_log_queues",
201 state_transition_perf_log_queues_diff);
202
203 CJsonNode state_transition_perf_log_classes_diff =
204 x_SetFromList(params.state_transition_perf_log_classes,
205 m_StateTransitionPerfLogClasses,
206 m_STPerfLogQCLock);
207 if (state_transition_perf_log_classes_diff.GetSize() > 0)
208 changes.SetByKey("state_transition_perf_log_classes",
209 state_transition_perf_log_classes_diff);
210
211 if (m_InactivityTimeout != params.network_timeout) {
212 CJsonNode values = CJsonNode::NewArrayNode();
213 values.AppendInteger(m_InactivityTimeout);
214 values.AppendInteger(params.network_timeout);
215 changes.SetByKey("network_timeout", values);
216 }
217 m_InactivityTimeout = params.network_timeout;
218
219 if (m_ReserveDumpSpace != params.reserve_dump_space) {
220 CJsonNode values = CJsonNode::NewArrayNode();
221 values.AppendInteger(m_ReserveDumpSpace);
222 values.AppendInteger(params.reserve_dump_space);
223 changes.SetByKey("reserve_dump_space", values);
224 }
225 m_ReserveDumpSpace = params.reserve_dump_space;
226
227
228 if (limited) {
229 // max_connections CServer parameter could be changed on the fly via
230 // the RECO command. So check if it was changed.
231 SServer_Parameters current_params;
232 CServer::GetParameters(¤t_params);
233 if (current_params.max_connections != params.max_connections) {
234 CJsonNode values = CJsonNode::NewArrayNode();
235 values.AppendInteger(current_params.max_connections);
236 values.AppendInteger(params.max_connections);
237 changes.SetByKey("max_connections", values);
238
239 current_params.max_connections = params.max_connections;
240 CServer::SetParameters(current_params);
241 }
242 }
243
244
245 if (changes.GetSize() > 0)
246 what_changed.SetByKey("server_changes", changes);
247
248 #if defined(_DEBUG) && !defined(NDEBUG)
249 debug_fd_count = params.debug_fd_count;
250 debug_mem_count = params.debug_mem_count;
251 debug_write_delay = params.debug_write_delay;
252 debug_conn_drop_before_write = params.debug_conn_drop_before_write;
253 debug_conn_drop_after_write = params.debug_conn_drop_after_write;
254 debug_reply_with_garbage = params.debug_reply_with_garbage;
255 debug_garbage = params.debug_garbage;
256 #endif
257
258 if (limited)
259 return what_changed;
260
261
262 CServer::SetParameters(params);
263 m_Port = params.port;
264 m_HostNetAddr = CSocketAPI::gethostbyname(kEmptyStr);
265 m_UseHostname = params.use_hostname;
266 if (m_UseHostname)
267 m_Host = CSocketAPI::gethostname();
268 else
269 m_Host = CSocketAPI::ntoa(m_HostNetAddr);
270
271 // Purge related parameters
272 m_DeleteBatchSize = params.del_batch_size;
273 m_MarkdelBatchSize = params.markdel_batch_size;
274 m_ScanBatchSize = params.scan_batch_size;
275 m_PurgeTimeout = params.purge_timeout;
276
277 m_AffRegistrySettings = params.affinity_reg;
278 m_GroupRegistrySettings = params.group_reg;
279 m_ScopeRegistrySettings = params.scope_reg;
280
281 // The difference is required only at the stage of RECO.
282 // RECO always calls the function with the limited flag so there is no need
283 // to provide the difference here
284 return CJsonNode::NewObjectNode();
285 }
286
287
288 // Return: difference
289 // Note: it is always called after the configuration is validated so
290 // no messages should be logged
ReadServicesConfig(const CNcbiRegistry & reg)291 CJsonNode CNetScheduleServer::ReadServicesConfig(const CNcbiRegistry & reg)
292 {
293 CJsonNode diff = CJsonNode::NewObjectNode();
294 const string section = "service_to_queue";
295 map< string, string > new_services;
296
297 // Read the new list -- new alerts if so
298 list<string> entries;
299 reg.EnumerateEntries(section, &entries);
300
301
302 for (list<string>::const_iterator k = entries.begin();
303 k != entries.end(); ++k) {
304 string service_name = *k;
305 string qname = reg.Get("service_to_queue", service_name);
306 if (qname.empty())
307 continue;
308
309 // Check that the queue name has been provided
310 if (!m_QueueDB->QueueExists(qname))
311 continue;
312
313 // Config line is fine, memorize it
314 new_services[service_name] = qname;
315 }
316
317 // Compare with the old list -- combine report string
318 CJsonNode new_items = CJsonNode::NewArrayNode();
319 CJsonNode deleted_items = CJsonNode::NewArrayNode();
320 CJsonNode changed_items = CJsonNode::NewObjectNode();
321
322 for (map< string, string>::const_iterator k = new_services.begin();
323 k != new_services.end(); ++k) {
324 map< string, string>::const_iterator found;
325 for (found = m_Services.begin(); found != m_Services.end(); ++found)
326 if (NStr::CompareNocase(found->first, k->first) == 0)
327 break;
328
329 if (found == m_Services.end()) {
330 new_items.AppendString(k->first);
331 continue;
332 }
333 if (found->second != k->second) {
334 CJsonNode vals = CJsonNode::NewArrayNode();
335 vals.AppendString(found->second);
336 vals.AppendString(k->second);
337 changed_items.SetByKey(k->first, vals);
338 }
339 }
340
341 for (map< string, string>::const_iterator k = m_Services.begin();
342 k != m_Services.end(); ++k) {
343 map< string, string>::const_iterator found;
344 for (found = new_services.begin(); found != new_services.end(); ++found)
345 if (NStr::CompareNocase(found->first, k->first) == 0)
346 break;
347
348 if (found == new_services.end())
349 deleted_items.AppendString(k->first);
350 }
351
352
353 if (new_items.GetSize() > 0)
354 diff.SetByKey("services_added", new_items);
355
356 if (deleted_items.GetSize() > 0)
357 diff.SetByKey("services_deleted", deleted_items);
358
359 if (changed_items.GetSize() > 0)
360 diff.SetByKey("services_changed", changed_items);
361
362 // Set the current as the new one
363 CFastMutexGuard guard(m_ServicesLock);
364 m_Services = new_services;
365 return diff;
366 }
367
368
ShutdownRequested(void)369 bool CNetScheduleServer::ShutdownRequested(void)
370 {
371 return m_Shutdown;
372 }
373
374
SetQueueDB(CQueueDataBase * qdb)375 void CNetScheduleServer::SetQueueDB(CQueueDataBase* qdb)
376 {
377 delete m_QueueDB;
378 m_QueueDB = qdb;
379 }
380
381
SetShutdownFlag(int signum,bool db_was_drained)382 void CNetScheduleServer::SetShutdownFlag(int signum, bool db_was_drained)
383 {
384 if (!m_Shutdown) {
385 m_Shutdown = true;
386 m_SigNum = signum;
387 m_DBDrained = db_was_drained;
388 }
389 }
390
391
392 // Queue handling
Configure(const IRegistry & reg,CJsonNode & diff)393 unsigned int CNetScheduleServer::Configure(const IRegistry & reg,
394 CJsonNode & diff)
395 {
396 return m_QueueDB->Configure(reg, diff);
397 }
398
CountActiveJobs() const399 unsigned CNetScheduleServer::CountActiveJobs() const
400 {
401 return m_QueueDB->CountActiveJobs();
402 }
403
404
OpenQueue(const std::string & name)405 CRef<CQueue> CNetScheduleServer::OpenQueue(const std::string& name)
406 {
407 return m_QueueDB->OpenQueue(name);
408 }
409
410
CreateDynamicQueue(const CNSClientId & client,const string & qname,const string & qclass,const string & description)411 void CNetScheduleServer::CreateDynamicQueue(const CNSClientId & client,
412 const string & qname,
413 const string & qclass,
414 const string & description)
415 {
416 m_QueueDB->CreateDynamicQueue(client, qname, qclass, description);
417 }
418
419
DeleteDynamicQueue(const CNSClientId & client,const std::string & qname)420 void CNetScheduleServer::DeleteDynamicQueue(const CNSClientId & client,
421 const std::string& qname)
422 {
423 m_QueueDB->DeleteDynamicQueue(client, qname);
424 }
425
426
427 SQueueParameters
QueueInfo(const string & qname) const428 CNetScheduleServer::QueueInfo(const string & qname) const
429 {
430 return m_QueueDB->QueueInfo(qname);
431 }
432
433
GetQueueNames(const string & sep) const434 std::string CNetScheduleServer::GetQueueNames(const string & sep) const
435 {
436 return m_QueueDB->GetQueueNames(sep);
437 }
438
439
PrintTransitionCounters(void)440 string CNetScheduleServer::PrintTransitionCounters(void)
441 {
442 return m_QueueDB->PrintTransitionCounters();
443 }
444
445
PrintJobsStat(const CNSClientId & client)446 string CNetScheduleServer::PrintJobsStat(const CNSClientId & client)
447 {
448 return m_QueueDB->PrintJobsStat(client);
449 }
450
451
GetQueueClassesInfo(void) const452 string CNetScheduleServer::GetQueueClassesInfo(void) const
453 {
454 return m_QueueDB->GetQueueClassesInfo();
455 }
456
457
GetQueueClassesConfig(void) const458 string CNetScheduleServer::GetQueueClassesConfig(void) const
459 {
460 return m_QueueDB->GetQueueClassesConfig();
461 }
462
463
GetQueueInfo(void) const464 string CNetScheduleServer::GetQueueInfo(void) const
465 {
466 return m_QueueDB->GetQueueInfo();
467 }
468
469
GetQueueConfig(void) const470 string CNetScheduleServer::GetQueueConfig(void) const
471 {
472 return m_QueueDB->GetQueueConfig();
473 }
474
475
GetLinkedSectionConfig(void) const476 string CNetScheduleServer::GetLinkedSectionConfig(void) const
477 {
478 return m_QueueDB->GetLinkedSectionConfig();
479 }
480
481
GetServiceToQueueSectionConfig(void) const482 string CNetScheduleServer::GetServiceToQueueSectionConfig(void) const
483 {
484 string output;
485 map< string, string >::const_iterator k;
486 CFastMutexGuard guard(m_ServicesLock);
487
488 if (m_Services.empty())
489 return output;
490
491 output = "\n[service_to_queue]";
492 for (k = m_Services.begin(); k != m_Services.end(); ++k) {
493 if (!output.empty())
494 output += "\n";
495 output += k->first + "=\"" + k->second + "\"";
496 }
497
498 return output;
499 }
500
501
ResolveService(const string & service) const502 string CNetScheduleServer::ResolveService(const string & service) const
503 {
504 map< string, string >::const_iterator k;
505 CFastMutexGuard guard(m_ServicesLock);
506
507 for (k = m_Services.begin(); k != m_Services.end(); ++k) {
508 if (NStr::CompareNocase(k->first, service) == 0)
509 return k->second;
510 }
511 return "";
512 }
513
514
GetServices(map<string,string> & services) const515 void CNetScheduleServer::GetServices(map<string, string> & services) const
516 {
517 CFastMutexGuard guard(m_ServicesLock);
518 services = m_Services;
519 }
520
521
AdminHostValid(unsigned host) const522 bool CNetScheduleServer::AdminHostValid(unsigned host) const
523 {
524 return m_AdminHosts.IsAllowed(host);
525 }
526
527
IsAdminClientName(const string & name) const528 bool CNetScheduleServer::IsAdminClientName(const string & name) const
529 {
530 CReadLockGuard guard(m_AdminClientsLock);
531
532 for (vector<string>::const_iterator k(m_AdminClientNames.begin());
533 k != m_AdminClientNames.end(); ++k)
534 if (*k == name)
535 return true;
536 return false;
537 }
538
GetAdminClientNames(void) const539 string CNetScheduleServer::GetAdminClientNames(void) const
540 {
541 string ret;
542 CReadLockGuard guard(m_AdminClientsLock);
543
544 for (vector<string>::const_iterator k(m_AdminClientNames.begin());
545 k != m_AdminClientNames.end(); ++k) {
546 if (!ret.empty())
547 ret += ", ";
548 ret += *k;
549 }
550 return ret;
551 }
552
553
GetStateTransitionPerfLogQueues(void) const554 string CNetScheduleServer::GetStateTransitionPerfLogQueues(void) const
555 {
556 string ret;
557 CReadLockGuard guard(m_STPerfLogQCLock);
558
559 for (vector<string>::const_iterator
560 k(m_StateTransitionPerfLogQueues.begin());
561 k != m_StateTransitionPerfLogQueues.end(); ++k) {
562 if (!ret.empty())
563 ret += ", ";
564 ret += *k;
565 }
566 return ret;
567 }
568
569
GetStateTransitionPerfLogClasses(void) const570 string CNetScheduleServer::GetStateTransitionPerfLogClasses(void) const
571 {
572 string ret;
573 CReadLockGuard guard(m_STPerfLogQCLock);
574
575 for (vector<string>::const_iterator
576 k(m_StateTransitionPerfLogClasses.begin());
577 k != m_StateTransitionPerfLogClasses.end(); ++k) {
578 if (!ret.empty())
579 ret += ", ";
580 ret += *k;
581 }
582 return ret;
583 }
584
585
586 bool
ShouldPerfLogTransitions(const string & queue_name,const string & class_name) const587 CNetScheduleServer::ShouldPerfLogTransitions(const string & queue_name,
588 const string & class_name) const
589 {
590 CReadLockGuard guard(m_STPerfLogQCLock);
591 for (vector<string>::const_iterator
592 k(m_StateTransitionPerfLogQueues.begin());
593 k != m_StateTransitionPerfLogQueues.end(); ++k) {
594 if (*k == "*")
595 return true;
596 if (NStr::CompareNocase(*k, queue_name) == 0)
597 return true;
598 }
599
600 if (!class_name.empty()) {
601 for (vector<string>::const_iterator
602 k(m_StateTransitionPerfLogClasses.begin());
603 k != m_StateTransitionPerfLogClasses.end(); ++k) {
604 if (*k == "*")
605 return true;
606 if (NStr::CompareNocase(*k, class_name) == 0)
607 return true;
608 }
609 }
610
611 return false;
612 }
613
614
GetAlerts(void) const615 string CNetScheduleServer::GetAlerts(void) const
616 {
617 return m_Alerts.GetURLEncoded();
618 }
619
620
SerializeAlerts(void) const621 string CNetScheduleServer::SerializeAlerts(void) const
622 {
623 return m_Alerts.Serialize();
624 }
625
626
627 enum EAlertAckResult
AcknowledgeAlert(const string & id,const string & user)628 CNetScheduleServer::AcknowledgeAlert(const string & id,
629 const string & user)
630 {
631 return m_Alerts.Acknowledge(id, user);
632 }
633
634
635 enum EAlertAckResult
AcknowledgeAlert(EAlertType alert_type,const string & user)636 CNetScheduleServer::AcknowledgeAlert(EAlertType alert_type,
637 const string & user)
638 {
639 return m_Alerts.Acknowledge(alert_type, user);
640 }
641
642
RegisterAlert(EAlertType alert_type,const string & message)643 void CNetScheduleServer::RegisterAlert(EAlertType alert_type,
644 const string & message)
645 {
646 m_Alerts.Register(alert_type, message);
647 }
648
649
650 // The method is called after the database is created or loaded.
651 // This guarantees that the directory is there.
652 // The file with an identifier could be read or created safely.
653 // Returns: true if everything is fine.
InitNodeID(const string & db_path)654 void CNetScheduleServer::InitNodeID(const string & db_path)
655 {
656 if (m_Diskless) {
657 m_NodeID = "n" + x_GenerateGUID();
658 return;
659 }
660
661 CFile node_id_file(CFile::MakePath(
662 CDirEntry::AddTrailingPathSeparator(db_path),
663 kNodeIDFileName));
664
665 if (node_id_file.Exists()) {
666 // File exists, read the ID from it
667 CFileIO f;
668 char buffer[64];
669
670 f.Open(node_id_file.GetPath(), CFileIO_Base::eOpen,
671 CFileIO_Base::eRead);
672 size_t n = f.Read(buffer, sizeof(buffer));
673
674 m_NodeID = string(buffer, n);
675 NStr::TruncateSpacesInPlace(m_NodeID, NStr::eTrunc_End);
676 f.Close();
677 } else {
678 // No file, need to be created
679 m_NodeID = "n" + x_GenerateGUID();
680
681 CFileIO f;
682 f.Open(node_id_file.GetPath(), CFileIO_Base::eCreate,
683 CFileIO_Base::eReadWrite);
684 f.Write(m_NodeID.data(), m_NodeID.size());
685 f.Close();
686 }
687 }
688
689
GetInstance(void)690 CNetScheduleServer* CNetScheduleServer::GetInstance(void)
691 {
692 return sm_netschedule_server;
693 }
694
695
Exit()696 void CNetScheduleServer::Exit()
697 {
698 // This method is called by the CServer::Run() when all the threads are
699 // shut down but the port is still posessed
700
701 // If it was a signal initiated shutdown then it is a good idea to log it.
702 // The signal number is set to non-zero value only in case of an OS signal
703 if (m_SigNum != 0) {
704 if (IsLog()) {
705 // Imitate the SHUTDOWN command logging with an extra information
706 // about the signal number
707 CRef<CRequestContext> ctx;
708 ctx.Reset(new CRequestContext());
709 ctx->SetRequestID();
710
711 CDiagContext & diag_context = GetDiagContext();
712 diag_context.SetRequestContext(ctx);
713 CDiagContext_Extra extra = diag_context.PrintRequestStart();
714
715 extra.Print("_type", "cmd")
716 .Print("_queue", "")
717 .Print("cmd", "SHUTDOWN")
718 .Print("signum", m_SigNum);
719 extra.Flush();
720
721 ctx->SetRequestStatus(200);
722 diag_context.PrintRequestStop();
723 ctx.Reset();
724 diag_context.SetRequestContext(NULL);
725 }
726 }
727
728 m_QueueDB->Close();
729 }
730
731
x_GenerateGUID(void) const732 string CNetScheduleServer::x_GenerateGUID(void) const
733 {
734 // Naive implementation of the unique identifier.
735 Int8 pid = CCurrentProcess::GetPid();
736 Int8 current_time = time(0);
737
738 return to_string((pid << 32) | current_time);
739 }
740
741
742 CJsonNode
x_SetFromList(const string & from,vector<string> & to,CRWLock & lock)743 CNetScheduleServer::x_SetFromList(const string & from,
744 vector<string> & to,
745 CRWLock & lock)
746 {
747 CJsonNode diff = CJsonNode::NewArrayNode();
748 CWriteLockGuard guard(lock);
749 vector<string> old = to;
750
751 to.clear();
752 NStr::Split(from, ";, \n\r", to,
753 NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
754 sort(to.begin(), to.end());
755
756 if (old != to) {
757 CJsonNode old_vals = CJsonNode::NewArrayNode();
758 CJsonNode new_vals = CJsonNode::NewArrayNode();
759
760 for (vector<string>::const_iterator k = old.begin();
761 k != old.end(); ++k)
762 old_vals.AppendString(*k);
763 for (vector<string>::const_iterator k = to.begin();
764 k != to.end(); ++k)
765 new_vals.AppendString(*k);
766
767 diff.Append(old_vals);
768 diff.Append(new_vals);
769 }
770 return diff;
771 }
772
773
774
SetRAMConfigFileChecksum(const string & checksum)775 void CNetScheduleServer::SetRAMConfigFileChecksum(const string & checksum)
776 {
777 m_RAMConfigFileChecksum = checksum;
778 }
779
780
SetDiskConfigFileChecksum(const string & checksum)781 void CNetScheduleServer::SetDiskConfigFileChecksum(const string & checksum)
782 {
783 m_DiskConfigFileChecksum = checksum;
784 }
785
786
GetPauseQueues(void) const787 map<string, int> CNetScheduleServer::GetPauseQueues(void) const
788 {
789 return m_QueueDB->GetPauseQueues();
790 }
791
792
GetRefuseSubmitQueues(void) const793 vector<string> CNetScheduleServer::GetRefuseSubmitQueues(void) const
794 {
795 return m_QueueDB->GetRefuseSubmitQueues();
796 }
797
798
GetDataPath(void) const799 string CNetScheduleServer::GetDataPath(void) const
800 {
801 return m_QueueDB->GetDataPath();
802 }
803