1 /*  $Id: ns_server.cpp 601644 2020-02-11 19:47:28Z satskyse $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Anatoliy Kuznetsov, Victor Joukov
27  *
28  * File Description: NetScheduler threaded server
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "ns_server.hpp"
35 #include "ns_ini_params.hpp"
36 #include "queue_database.hpp"
37 #include "ns_types.hpp"
38 
39 #include <corelib/request_ctx.hpp>
40 
41 USING_NCBI_SCOPE;
42 
43 
44 CNetScheduleServer* CNetScheduleServer::sm_netschedule_server = 0;
45 
46 //////////////////////////////////////////////////////////////////////////
47 /// NetScheduler threaded server implementation
CNetScheduleServer(const string & dbpath,bool diskless)48 CNetScheduleServer::CNetScheduleServer(const string &  dbpath,
49                                        bool  diskless)
50     : m_BackgroundHost(this),
51       m_Port(0),
52       m_HostNetAddr(0),
53       m_Shutdown(false),
54       m_SigNum(0),
55       m_InactivityTimeout(default_network_timeout),
56       m_QueueDB(NULL),
57       m_StartTime(GetFastLocalTime()),
58       m_LogFlag(default_is_log),
59       m_LogBatchEachJobFlag(default_log_batch_each_job),
60       m_LogNotificationThreadFlag(default_log_notification_thread),
61       m_LogCleaningThreadFlag(default_log_cleaning_thread),
62       m_LogExecutionWatcherThreadFlag(default_log_execution_watcher_thread),
63       m_LogStatisticsThreadFlag(default_log_statistics_thread),
64       m_RefuseSubmits(false),
65       m_UseHostname(default_use_hostname),
66       m_Diskless(diskless),
67       m_DBDrained(false),
68       m_DeleteBatchSize(default_del_batch_size),
69       m_MarkdelBatchSize(default_markdel_batch_size),
70       m_ScanBatchSize(default_scan_batch_size),
71       m_PurgeTimeout(default_purge_timeout),
72       m_StatInterval(default_stat_interval),
73       m_JobCountersInterval(default_job_counters_interval),
74       m_MaxClientData(default_max_client_data),
75       m_NodeID("not_initialized"),
76       m_SessionID("s" + x_GenerateGUID()),
77       m_StartIDs(dbpath, diskless),
78       m_AnybodyCanReconfigure(false),
79       m_ReserveDumpSpace(default_reserve_dump_space)
80 {
81     m_CurrentSubmitsCounter.Set(kSubmitCounterInitialValue);
82     sm_netschedule_server = this;
83 }
84 
85 
~CNetScheduleServer()86 CNetScheduleServer::~CNetScheduleServer()
87 {
88     delete m_QueueDB;
89 }
90 
91 
AddDefaultListener(IServer_ConnectionFactory * factory)92 void CNetScheduleServer::AddDefaultListener(IServer_ConnectionFactory* factory)
93 {
94     // port must be set before listening
95     _ASSERT(m_Port);
96     AddListener(factory, m_Port);
97 }
98 
99 
100 // Returns what was changed
SetNSParameters(const SNS_Parameters & params,bool limited)101 CJsonNode CNetScheduleServer::SetNSParameters(const SNS_Parameters &  params,
102                                               bool                    limited)
103 {
104     CJsonNode   what_changed = CJsonNode::NewObjectNode();
105     CJsonNode   changes = CJsonNode::NewObjectNode();
106     bool        new_val;
107 
108     if (m_LogFlag != params.is_log) {
109         CJsonNode       values = CJsonNode::NewArrayNode();
110         values.AppendBoolean(m_LogFlag);
111         values.AppendBoolean(params.is_log);
112         changes.SetByKey("log", values);
113     }
114     m_LogFlag = params.is_log;
115 
116     new_val = (params.log_batch_each_job && m_LogFlag);
117     if (m_LogBatchEachJobFlag != new_val) {
118         CJsonNode       values = CJsonNode::NewArrayNode();
119         values.AppendBoolean(m_LogBatchEachJobFlag);
120         values.AppendBoolean(new_val);
121         changes.SetByKey("log_batch_each_job", values);
122     }
123     m_LogBatchEachJobFlag = new_val;
124 
125     new_val = (params.log_notification_thread && m_LogFlag);
126     if (m_LogNotificationThreadFlag != new_val) {
127         CJsonNode       values = CJsonNode::NewArrayNode();
128         values.AppendBoolean(m_LogNotificationThreadFlag);
129         values.AppendBoolean(new_val);
130         changes.SetByKey("log_notification_thread", values);
131     }
132     m_LogNotificationThreadFlag = new_val;
133 
134     new_val = (params.log_cleaning_thread && m_LogFlag);
135     if (m_LogCleaningThreadFlag != new_val) {
136         CJsonNode       values = CJsonNode::NewArrayNode();
137         values.AppendBoolean(m_LogCleaningThreadFlag);
138         values.AppendBoolean(new_val);
139         changes.SetByKey("log_cleaning_thread", values);
140     }
141     m_LogCleaningThreadFlag = new_val;
142 
143     new_val = (params.log_execution_watcher_thread&& m_LogFlag);
144     if (m_LogExecutionWatcherThreadFlag != new_val) {
145         CJsonNode       values = CJsonNode::NewArrayNode();
146         values.AppendBoolean(m_LogExecutionWatcherThreadFlag);
147         values.AppendBoolean(new_val);
148         changes.SetByKey("log_execution_watcher_thread", values);
149     }
150     m_LogExecutionWatcherThreadFlag = new_val;
151 
152     new_val = (params.log_statistics_thread && m_LogFlag);
153     if (m_LogStatisticsThreadFlag != new_val) {
154         CJsonNode       values = CJsonNode::NewArrayNode();
155         values.AppendBoolean(m_LogStatisticsThreadFlag);
156         values.AppendBoolean(new_val);
157         changes.SetByKey("log_statistics_thread", values);
158     }
159     m_LogStatisticsThreadFlag = new_val;
160 
161     if (m_StatInterval != params.stat_interval) {
162         CJsonNode       values = CJsonNode::NewArrayNode();
163         values.AppendInteger(m_StatInterval);
164         values.AppendInteger(params.stat_interval);
165         changes.SetByKey("stat_interval", values);
166     }
167     m_StatInterval = params.stat_interval;
168 
169     if (m_JobCountersInterval != params.job_counters_interval) {
170         CJsonNode       values = CJsonNode::NewArrayNode();
171         values.AppendInteger(m_JobCountersInterval);
172         values.AppendInteger(params.job_counters_interval);
173         changes.SetByKey("job_counters_interval", values);
174     }
175     m_JobCountersInterval = params.job_counters_interval;
176 
177     if (m_MaxClientData != params.max_client_data) {
178         CJsonNode       values = CJsonNode::NewArrayNode();
179         values.AppendInteger(m_MaxClientData);
180         values.AppendInteger(params.max_client_data);
181         changes.SetByKey("max_client_data", values);
182     }
183     m_MaxClientData = params.max_client_data;
184 
185     CJsonNode   accepted_hosts = m_AdminHosts.SetHosts(params.admin_hosts);
186     if (accepted_hosts.GetSize() > 0)
187         changes.SetByKey("admin_host", accepted_hosts);
188 
189     CJsonNode   admin_diff = x_SetFromList(params.admin_client_names,
190                                            m_AdminClientNames,
191                                            m_AdminClientsLock);
192     if (admin_diff.GetSize() > 0)
193         changes.SetByKey("admin_client_name", admin_diff);
194 
195     CJsonNode   state_transition_perf_log_queues_diff =
196                         x_SetFromList(params.state_transition_perf_log_queues,
197                                       m_StateTransitionPerfLogQueues,
198                                       m_STPerfLogQCLock);
199     if (state_transition_perf_log_queues_diff.GetSize() > 0)
200         changes.SetByKey("state_transition_perf_log_queues",
201                          state_transition_perf_log_queues_diff);
202 
203     CJsonNode   state_transition_perf_log_classes_diff =
204                         x_SetFromList(params.state_transition_perf_log_classes,
205                                       m_StateTransitionPerfLogClasses,
206                                       m_STPerfLogQCLock);
207     if (state_transition_perf_log_classes_diff.GetSize() > 0)
208         changes.SetByKey("state_transition_perf_log_classes",
209                          state_transition_perf_log_classes_diff);
210 
211     if (m_InactivityTimeout != params.network_timeout) {
212         CJsonNode       values = CJsonNode::NewArrayNode();
213         values.AppendInteger(m_InactivityTimeout);
214         values.AppendInteger(params.network_timeout);
215         changes.SetByKey("network_timeout", values);
216     }
217     m_InactivityTimeout = params.network_timeout;
218 
219     if (m_ReserveDumpSpace != params.reserve_dump_space) {
220         CJsonNode       values = CJsonNode::NewArrayNode();
221         values.AppendInteger(m_ReserveDumpSpace);
222         values.AppendInteger(params.reserve_dump_space);
223         changes.SetByKey("reserve_dump_space", values);
224     }
225     m_ReserveDumpSpace = params.reserve_dump_space;
226 
227 
228     if (limited) {
229         // max_connections CServer parameter could be changed on the fly via
230         // the RECO command. So check if it was changed.
231         SServer_Parameters      current_params;
232         CServer::GetParameters(&current_params);
233         if (current_params.max_connections != params.max_connections) {
234             CJsonNode       values = CJsonNode::NewArrayNode();
235             values.AppendInteger(current_params.max_connections);
236             values.AppendInteger(params.max_connections);
237             changes.SetByKey("max_connections", values);
238 
239             current_params.max_connections = params.max_connections;
240             CServer::SetParameters(current_params);
241         }
242     }
243 
244 
245     if (changes.GetSize() > 0)
246         what_changed.SetByKey("server_changes", changes);
247 
248     #if defined(_DEBUG) && !defined(NDEBUG)
249     debug_fd_count = params.debug_fd_count;
250     debug_mem_count = params.debug_mem_count;
251     debug_write_delay = params.debug_write_delay;
252     debug_conn_drop_before_write = params.debug_conn_drop_before_write;
253     debug_conn_drop_after_write = params.debug_conn_drop_after_write;
254     debug_reply_with_garbage = params.debug_reply_with_garbage;
255     debug_garbage = params.debug_garbage;
256     #endif
257 
258     if (limited)
259         return what_changed;
260 
261 
262     CServer::SetParameters(params);
263     m_Port = params.port;
264     m_HostNetAddr = CSocketAPI::gethostbyname(kEmptyStr);
265     m_UseHostname = params.use_hostname;
266     if (m_UseHostname)
267         m_Host = CSocketAPI::gethostname();
268     else
269         m_Host = CSocketAPI::ntoa(m_HostNetAddr);
270 
271     // Purge related parameters
272     m_DeleteBatchSize = params.del_batch_size;
273     m_MarkdelBatchSize = params.markdel_batch_size;
274     m_ScanBatchSize = params.scan_batch_size;
275     m_PurgeTimeout = params.purge_timeout;
276 
277     m_AffRegistrySettings = params.affinity_reg;
278     m_GroupRegistrySettings = params.group_reg;
279     m_ScopeRegistrySettings = params.scope_reg;
280 
281     // The difference is required only at the stage of RECO.
282     // RECO always calls the function with the limited flag so there is no need
283     // to provide the difference here
284     return CJsonNode::NewObjectNode();
285 }
286 
287 
288 // Return: difference
289 // Note: it is always called after the configuration is validated so
290 //       no messages should be logged
ReadServicesConfig(const CNcbiRegistry & reg)291 CJsonNode CNetScheduleServer::ReadServicesConfig(const CNcbiRegistry &  reg)
292 {
293     CJsonNode               diff = CJsonNode::NewObjectNode();
294     const string            section = "service_to_queue";
295     map< string, string >   new_services;
296 
297     // Read the new list -- new alerts if so
298     list<string>            entries;
299     reg.EnumerateEntries(section, &entries);
300 
301 
302     for (list<string>::const_iterator  k = entries.begin();
303          k != entries.end(); ++k) {
304         string      service_name = *k;
305         string      qname = reg.Get("service_to_queue", service_name);
306         if (qname.empty())
307             continue;
308 
309         // Check that the queue name has been provided
310         if (!m_QueueDB->QueueExists(qname))
311             continue;
312 
313         // Config line is fine, memorize it
314         new_services[service_name] = qname;
315     }
316 
317     // Compare with the old list -- combine report string
318     CJsonNode   new_items = CJsonNode::NewArrayNode();
319     CJsonNode   deleted_items = CJsonNode::NewArrayNode();
320     CJsonNode   changed_items = CJsonNode::NewObjectNode();
321 
322     for (map< string, string>::const_iterator  k = new_services.begin();
323          k != new_services.end(); ++k) {
324         map< string, string>::const_iterator    found;
325         for (found = m_Services.begin(); found != m_Services.end(); ++found)
326             if (NStr::CompareNocase(found->first, k->first) == 0)
327                 break;
328 
329         if (found == m_Services.end()) {
330             new_items.AppendString(k->first);
331             continue;
332         }
333         if (found->second != k->second) {
334             CJsonNode       vals = CJsonNode::NewArrayNode();
335             vals.AppendString(found->second);
336             vals.AppendString(k->second);
337             changed_items.SetByKey(k->first, vals);
338         }
339     }
340 
341     for (map< string, string>::const_iterator  k = m_Services.begin();
342          k != m_Services.end(); ++k) {
343         map< string, string>::const_iterator    found;
344         for (found = new_services.begin(); found != new_services.end(); ++found)
345             if (NStr::CompareNocase(found->first, k->first) == 0)
346                 break;
347 
348         if (found == new_services.end())
349             deleted_items.AppendString(k->first);
350     }
351 
352 
353     if (new_items.GetSize() > 0)
354         diff.SetByKey("services_added", new_items);
355 
356     if (deleted_items.GetSize() > 0)
357         diff.SetByKey("services_deleted", deleted_items);
358 
359     if (changed_items.GetSize() > 0)
360         diff.SetByKey("services_changed", changed_items);
361 
362     // Set the current as the new one
363     CFastMutexGuard     guard(m_ServicesLock);
364     m_Services = new_services;
365     return diff;
366 }
367 
368 
ShutdownRequested(void)369 bool CNetScheduleServer::ShutdownRequested(void)
370 {
371     return m_Shutdown;
372 }
373 
374 
SetQueueDB(CQueueDataBase * qdb)375 void CNetScheduleServer::SetQueueDB(CQueueDataBase* qdb)
376 {
377     delete m_QueueDB;
378     m_QueueDB = qdb;
379 }
380 
381 
SetShutdownFlag(int signum,bool db_was_drained)382 void CNetScheduleServer::SetShutdownFlag(int  signum, bool  db_was_drained)
383 {
384     if (!m_Shutdown) {
385         m_Shutdown = true;
386         m_SigNum = signum;
387         m_DBDrained = db_was_drained;
388     }
389 }
390 
391 
392 // Queue handling
Configure(const IRegistry & reg,CJsonNode & diff)393 unsigned int  CNetScheduleServer::Configure(const IRegistry &  reg,
394                                             CJsonNode &        diff)
395 {
396     return m_QueueDB->Configure(reg, diff);
397 }
398 
CountActiveJobs() const399 unsigned CNetScheduleServer::CountActiveJobs() const
400 {
401     return m_QueueDB->CountActiveJobs();
402 }
403 
404 
OpenQueue(const std::string & name)405 CRef<CQueue> CNetScheduleServer::OpenQueue(const std::string& name)
406 {
407     return m_QueueDB->OpenQueue(name);
408 }
409 
410 
CreateDynamicQueue(const CNSClientId & client,const string & qname,const string & qclass,const string & description)411 void CNetScheduleServer::CreateDynamicQueue(const CNSClientId &  client,
412                                             const string &  qname,
413                                             const string &  qclass,
414                                             const string &  description)
415 {
416     m_QueueDB->CreateDynamicQueue(client, qname, qclass, description);
417 }
418 
419 
DeleteDynamicQueue(const CNSClientId & client,const std::string & qname)420 void CNetScheduleServer::DeleteDynamicQueue(const CNSClientId &  client,
421                                             const std::string& qname)
422 {
423     m_QueueDB->DeleteDynamicQueue(client, qname);
424 }
425 
426 
427 SQueueParameters
QueueInfo(const string & qname) const428 CNetScheduleServer::QueueInfo(const string &  qname) const
429 {
430     return m_QueueDB->QueueInfo(qname);
431 }
432 
433 
GetQueueNames(const string & sep) const434 std::string CNetScheduleServer::GetQueueNames(const string &  sep) const
435 {
436     return m_QueueDB->GetQueueNames(sep);
437 }
438 
439 
PrintTransitionCounters(void)440 string CNetScheduleServer::PrintTransitionCounters(void)
441 {
442     return m_QueueDB->PrintTransitionCounters();
443 }
444 
445 
PrintJobsStat(const CNSClientId & client)446 string CNetScheduleServer::PrintJobsStat(const CNSClientId &  client)
447 {
448     return m_QueueDB->PrintJobsStat(client);
449 }
450 
451 
GetQueueClassesInfo(void) const452 string CNetScheduleServer::GetQueueClassesInfo(void) const
453 {
454     return m_QueueDB->GetQueueClassesInfo();
455 }
456 
457 
GetQueueClassesConfig(void) const458 string CNetScheduleServer::GetQueueClassesConfig(void) const
459 {
460     return m_QueueDB->GetQueueClassesConfig();
461 }
462 
463 
GetQueueInfo(void) const464 string CNetScheduleServer::GetQueueInfo(void) const
465 {
466     return m_QueueDB->GetQueueInfo();
467 }
468 
469 
GetQueueConfig(void) const470 string CNetScheduleServer::GetQueueConfig(void) const
471 {
472     return m_QueueDB->GetQueueConfig();
473 }
474 
475 
GetLinkedSectionConfig(void) const476 string CNetScheduleServer::GetLinkedSectionConfig(void) const
477 {
478     return m_QueueDB->GetLinkedSectionConfig();
479 }
480 
481 
GetServiceToQueueSectionConfig(void) const482 string CNetScheduleServer::GetServiceToQueueSectionConfig(void) const
483 {
484     string                                  output;
485     map< string, string >::const_iterator   k;
486     CFastMutexGuard                         guard(m_ServicesLock);
487 
488     if (m_Services.empty())
489         return output;
490 
491     output = "\n[service_to_queue]";
492     for (k = m_Services.begin(); k != m_Services.end(); ++k) {
493         if (!output.empty())
494             output += "\n";
495         output += k->first + "=\"" + k->second + "\"";
496     }
497 
498     return output;
499 }
500 
501 
// Translates a service name into a queue name.
// The service name is compared case insensitively; the map is read under
// m_ServicesLock.
//
// Returns the queue name or an empty string if the service is not found.
string CNetScheduleServer::ResolveService(const string &  service) const
{
    CFastMutexGuard     guard(m_ServicesLock);

    for (const auto &  item : m_Services) {
        const bool  same_name =
                        (NStr::CompareNocase(item.first, service) == 0);
        if (same_name)
            return item.second;
    }
    return "";
}
513 
514 
GetServices(map<string,string> & services) const515 void CNetScheduleServer::GetServices(map<string, string> &  services) const
516 {
517     CFastMutexGuard     guard(m_ServicesLock);
518     services = m_Services;
519 }
520 
521 
AdminHostValid(unsigned host) const522 bool CNetScheduleServer::AdminHostValid(unsigned host) const
523 {
524     return m_AdminHosts.IsAllowed(host);
525 }
526 
527 
IsAdminClientName(const string & name) const528 bool CNetScheduleServer::IsAdminClientName(const string &  name) const
529 {
530     CReadLockGuard      guard(m_AdminClientsLock);
531 
532     for (vector<string>::const_iterator  k(m_AdminClientNames.begin());
533          k != m_AdminClientNames.end(); ++k)
534         if (*k == name)
535             return true;
536     return false;
537 }
538 
GetAdminClientNames(void) const539 string CNetScheduleServer::GetAdminClientNames(void) const
540 {
541     string              ret;
542     CReadLockGuard      guard(m_AdminClientsLock);
543 
544     for (vector<string>::const_iterator  k(m_AdminClientNames.begin());
545          k != m_AdminClientNames.end(); ++k) {
546         if (!ret.empty())
547             ret += ", ";
548         ret += *k;
549     }
550     return ret;
551 }
552 
553 
GetStateTransitionPerfLogQueues(void) const554 string CNetScheduleServer::GetStateTransitionPerfLogQueues(void) const
555 {
556     string              ret;
557     CReadLockGuard      guard(m_STPerfLogQCLock);
558 
559     for (vector<string>::const_iterator
560             k(m_StateTransitionPerfLogQueues.begin());
561             k != m_StateTransitionPerfLogQueues.end(); ++k) {
562         if (!ret.empty())
563             ret += ", ";
564         ret += *k;
565     }
566     return ret;
567 }
568 
569 
GetStateTransitionPerfLogClasses(void) const570 string CNetScheduleServer::GetStateTransitionPerfLogClasses(void) const
571 {
572     string              ret;
573     CReadLockGuard      guard(m_STPerfLogQCLock);
574 
575     for (vector<string>::const_iterator
576             k(m_StateTransitionPerfLogClasses.begin());
577             k != m_StateTransitionPerfLogClasses.end(); ++k) {
578         if (!ret.empty())
579             ret += ", ";
580         ret += *k;
581     }
582     return ret;
583 }
584 
585 
586 bool
ShouldPerfLogTransitions(const string & queue_name,const string & class_name) const587 CNetScheduleServer::ShouldPerfLogTransitions(const string &  queue_name,
588                                              const string &  class_name) const
589 {
590     CReadLockGuard      guard(m_STPerfLogQCLock);
591     for (vector<string>::const_iterator
592             k(m_StateTransitionPerfLogQueues.begin());
593             k != m_StateTransitionPerfLogQueues.end(); ++k) {
594         if (*k == "*")
595             return true;
596         if (NStr::CompareNocase(*k, queue_name) == 0)
597             return true;
598     }
599 
600     if (!class_name.empty()) {
601         for (vector<string>::const_iterator
602                 k(m_StateTransitionPerfLogClasses.begin());
603                 k != m_StateTransitionPerfLogClasses.end(); ++k) {
604             if (*k == "*")
605                 return true;
606             if (NStr::CompareNocase(*k, class_name) == 0)
607                 return true;
608         }
609     }
610 
611     return false;
612 }
613 
614 
GetAlerts(void) const615 string CNetScheduleServer::GetAlerts(void) const
616 {
617     return m_Alerts.GetURLEncoded();
618 }
619 
620 
SerializeAlerts(void) const621 string CNetScheduleServer::SerializeAlerts(void) const
622 {
623     return m_Alerts.Serialize();
624 }
625 
626 
627 enum EAlertAckResult
AcknowledgeAlert(const string & id,const string & user)628 CNetScheduleServer::AcknowledgeAlert(const string &  id,
629                                      const string &  user)
630 {
631     return m_Alerts.Acknowledge(id, user);
632 }
633 
634 
635 enum EAlertAckResult
AcknowledgeAlert(EAlertType alert_type,const string & user)636 CNetScheduleServer::AcknowledgeAlert(EAlertType      alert_type,
637                                      const string &  user)
638 {
639     return m_Alerts.Acknowledge(alert_type, user);
640 }
641 
642 
RegisterAlert(EAlertType alert_type,const string & message)643 void CNetScheduleServer::RegisterAlert(EAlertType  alert_type,
644                                        const string &  message)
645 {
646     m_Alerts.Register(alert_type, message);
647 }
648 
649 
650 // The method is called after the database is created or loaded.
651 // This guarantees that the directory is there.
652 // The file with an identifier could be read or created safely.
653 // Returns: true if everything is fine.
InitNodeID(const string & db_path)654 void CNetScheduleServer::InitNodeID(const string &  db_path)
655 {
656     if (m_Diskless) {
657         m_NodeID = "n" + x_GenerateGUID();
658         return;
659     }
660 
661     CFile   node_id_file(CFile::MakePath(
662                             CDirEntry::AddTrailingPathSeparator(db_path),
663                             kNodeIDFileName));
664 
665     if (node_id_file.Exists()) {
666         // File exists, read the ID from it
667         CFileIO     f;
668         char        buffer[64];
669 
670         f.Open(node_id_file.GetPath(), CFileIO_Base::eOpen,
671                                        CFileIO_Base::eRead);
672         size_t      n = f.Read(buffer, sizeof(buffer));
673 
674         m_NodeID = string(buffer, n);
675         NStr::TruncateSpacesInPlace(m_NodeID, NStr::eTrunc_End);
676         f.Close();
677     } else {
678         // No file, need to be created
679         m_NodeID = "n" + x_GenerateGUID();
680 
681         CFileIO     f;
682         f.Open(node_id_file.GetPath(), CFileIO_Base::eCreate,
683                                        CFileIO_Base::eReadWrite);
684         f.Write(m_NodeID.data(), m_NodeID.size());
685         f.Close();
686     }
687 }
688 
689 
GetInstance(void)690 CNetScheduleServer*  CNetScheduleServer::GetInstance(void)
691 {
692     return sm_netschedule_server;
693 }
694 
695 
Exit()696 void CNetScheduleServer::Exit()
697 {
698     // This method is called by the CServer::Run() when all the threads are
699     // shut down but the port is still posessed
700 
701     // If it was a signal initiated shutdown then it is a good idea to log it.
702     // The signal number is set to non-zero value only in case of an OS signal
703     if (m_SigNum != 0) {
704         if (IsLog()) {
705             // Imitate the SHUTDOWN command logging with an extra information
706             // about the signal number
707             CRef<CRequestContext>   ctx;
708             ctx.Reset(new CRequestContext());
709             ctx->SetRequestID();
710 
711             CDiagContext &      diag_context = GetDiagContext();
712             diag_context.SetRequestContext(ctx);
713             CDiagContext_Extra      extra = diag_context.PrintRequestStart();
714 
715             extra.Print("_type", "cmd")
716                  .Print("_queue", "")
717                  .Print("cmd", "SHUTDOWN")
718                  .Print("signum", m_SigNum);
719             extra.Flush();
720 
721             ctx->SetRequestStatus(200);
722             diag_context.PrintRequestStop();
723             ctx.Reset();
724             diag_context.SetRequestContext(NULL);
725         }
726     }
727 
728     m_QueueDB->Close();
729 }
730 
731 
x_GenerateGUID(void) const732 string  CNetScheduleServer::x_GenerateGUID(void) const
733 {
734     // Naive implementation of the unique identifier.
735     Int8        pid = CCurrentProcess::GetPid();
736     Int8        current_time = time(0);
737 
738     return to_string((pid << 32) | current_time);
739 }
740 
741 
742 CJsonNode
x_SetFromList(const string & from,vector<string> & to,CRWLock & lock)743 CNetScheduleServer::x_SetFromList(const string &  from,
744                                   vector<string> &  to,
745                                   CRWLock &  lock)
746 {
747     CJsonNode           diff = CJsonNode::NewArrayNode();
748     CWriteLockGuard     guard(lock);
749     vector<string>      old = to;
750 
751     to.clear();
752     NStr::Split(from, ";, \n\r", to,
753                 NStr::fSplit_MergeDelimiters | NStr::fSplit_Truncate);
754     sort(to.begin(), to.end());
755 
756     if (old != to) {
757         CJsonNode       old_vals = CJsonNode::NewArrayNode();
758         CJsonNode       new_vals = CJsonNode::NewArrayNode();
759 
760         for (vector<string>::const_iterator  k = old.begin();
761                 k != old.end(); ++k)
762             old_vals.AppendString(*k);
763         for (vector<string>::const_iterator  k = to.begin();
764                 k != to.end(); ++k)
765             new_vals.AppendString(*k);
766 
767         diff.Append(old_vals);
768         diff.Append(new_vals);
769     }
770     return diff;
771 }
772 
773 
774 
SetRAMConfigFileChecksum(const string & checksum)775 void CNetScheduleServer::SetRAMConfigFileChecksum(const string &  checksum)
776 {
777     m_RAMConfigFileChecksum = checksum;
778 }
779 
780 
SetDiskConfigFileChecksum(const string & checksum)781 void CNetScheduleServer::SetDiskConfigFileChecksum(const string &  checksum)
782 {
783     m_DiskConfigFileChecksum = checksum;
784 }
785 
786 
GetPauseQueues(void) const787 map<string, int> CNetScheduleServer::GetPauseQueues(void) const
788 {
789     return m_QueueDB->GetPauseQueues();
790 }
791 
792 
GetRefuseSubmitQueues(void) const793 vector<string> CNetScheduleServer::GetRefuseSubmitQueues(void) const
794 {
795     return m_QueueDB->GetRefuseSubmitQueues();
796 }
797 
798 
GetDataPath(void) const799 string CNetScheduleServer::GetDataPath(void) const
800 {
801     return m_QueueDB->GetDataPath();
802 }
803