1 /*  $Id: queue_database.cpp 601644 2020-02-11 19:47:28Z satskyse $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Anatoliy Kuznetsov, Victor Joukov
27  *
28  * File Description: NetSchedule queue collection and database management.
29  *
30  */
31 #include <ncbi_pch.hpp>
32 #include <unistd.h>
33 
34 #include <corelib/ncbi_system.hpp>
35 #include <corelib/ncbireg.hpp>
36 
37 #include <connect/services/netschedule_api.hpp>
38 #include <connect/ncbi_socket.hpp>
39 
40 #include <util/time_line.hpp>
41 
42 #include "queue_database.hpp"
43 #include "ns_util.hpp"
44 #include "netschedule_version.hpp"
45 #include "ns_server.hpp"
46 #include "ns_handler.hpp"
47 #include "ns_db_dump.hpp"
48 #include "ns_types.hpp"
49 #include "ns_restore_state.hpp"
50 
51 
52 BEGIN_NCBI_SCOPE
53 
54 
55 
56 /////////////////////////////////////////////////////////////////////////////
57 // CQueueDataBase implementation
58 
CQueueDataBase(CNetScheduleServer * server,const string & path,unsigned int max_queues,bool diskless,bool reinit)59 CQueueDataBase::CQueueDataBase(CNetScheduleServer *  server,
60                                const string &  path,
61                                unsigned int  max_queues,
62                                bool  diskless,
63                                bool  reinit)
64 : m_Host(server->GetBackgroundHost()),
65   m_MaxQueues(max_queues),
66   m_Diskless(diskless),
67   m_StopPurge(false),
68   m_FreeStatusMemCnt(0),
69   m_LastFreeMem(time(0)),
70   m_Server(server),
71   m_PurgeQueue(""),
72   m_PurgeStatusIndex(0),
73   m_PurgeJobScanned(0)
74 {
75     m_DataPath = CDirEntry::AddTrailingPathSeparator(path);
76     m_DumpPath = CDirEntry::AddTrailingPathSeparator(m_DataPath +
77                                                      kDumpSubdirName);
78 
79     // First, load the previous session start job IDs if file existed
80     // The diskless flag will be considered when IDs are loaded.
81     m_Server->LoadJobsStartIDs();
82 
83     // Load the content of the queue state files
84     map<string, int>    paused_queues = DeserializePauseState(m_DataPath,
85                                                               m_Diskless);
86     vector<string>      refuse_submit_queues =
87                                 DeserializeRefuseSubmitState(m_DataPath,
88                                                              m_Diskless);
89 
90     // Old instance data files are not needed (even if they survived)
91     if (!m_Diskless)
92         x_RemoveDataFiles();
93 
94     // Creates the queues from the ini file and loads jobs from the dump
95     x_Open(reinit);
96 
97     if (!m_Diskless) {
98         // Restore the queue state
99         x_RestorePauseState(paused_queues);
100         x_RestoreRefuseSubmitState(refuse_submit_queues);
101 
102         // The files with queue state could be broken or the queues may be removed
103         // from the configuration file so update the state files to keep them
104         // consistent with the up to date configuration
105         SerializePauseState(m_DataPath, GetPauseQueues());
106         SerializeRefuseSubmitState(m_DataPath, server->GetRefuseSubmits(),
107                                    GetRefuseSubmitQueues());
108     }
109 }
110 
111 
~CQueueDataBase()112 CQueueDataBase::~CQueueDataBase()
113 {
114     try {
115         Close();
116     } catch (...) {}
117 }
118 
119 
x_Open(bool reinit)120 void  CQueueDataBase::x_Open(bool  reinit)
121 {
122     // Checks preconditions and provides the final reinit value
123     // It sets alerts and throws exceptions if needed.
124     if (m_Diskless) {
125         reinit = false;
126     } else {
127         reinit = x_CheckOpenPreconditions(reinit);
128     }
129 
130     CDir    data_dir(m_DataPath);
131     if (reinit)
132         data_dir.Remove();
133     if (m_Diskless) {
134         // Try to remove th dir if it exists
135         if (data_dir.Exists()) {
136             ERR_POST(Warning << "The configuration has the [server]/diskless "
137                                 "value set to true but the previous run "
138                                 "directory is on the disk. It will be removed.");
139             if (!data_dir.Remove()) {
140                 ERR_POST(Critical << "Error removing the previous run data "
141                                      "directory " << m_DataPath);
142                 m_Server->RegisterAlert(eDataDirRemoveError,
143                                         "Error removing the previous run data "
144                                         "directory " + m_DataPath +
145                                         " (due to the cofiguration file sets "
146                                         "the [server]/diskless value to true)");
147             }
148         }
149     } else if (!data_dir.Exists()) {
150         data_dir.Create();
151     }
152 
153     // The initialization must be done before the queues are created but after
154     // the directory is possibly re-created
155     m_Server->InitNodeID(m_DataPath);
156 
157     // Detect what queues need to be loaded. It depends on the configuration
158     // file and on the dumped queues. It might be that the saved queues +
159     // the config file queues excced the configured max number of queues.
160     set<string, PNocase>    dump_static_queues;
161     map<string, string,
162         PNocase>            dump_dynamic_queues;    // qname -> qclass
163     TQueueParams            dump_queue_classes;
164 
165     if (!m_Diskless)
166         x_ReadDumpQueueDesrc(dump_static_queues, dump_dynamic_queues,
167                              dump_queue_classes);
168 
169     set<string, PNocase>    config_static_queues = x_GetConfigQueues();
170     string                  last_queue_load_error;
171     size_t                  queue_load_error_count = 0;
172 
173     // Exclude number of queues will be the static queues from the config
174     // plus the dumped dynamic queues
175     size_t      final_dynamic_count = 0;
176     for (map<string, string, PNocase>::const_iterator
177             k = dump_dynamic_queues.begin();
178             k != dump_dynamic_queues.end(); ++k)
179         if (config_static_queues.find(k->first) == config_static_queues.end())
180             ++final_dynamic_count;
181 
182     size_t      total_queues = final_dynamic_count +
183                                config_static_queues.size();
184     if (total_queues > m_MaxQueues) {
185         string  msg = "The initial number of queues on the server exceeds the "
186                       "configured max number of queues. Configured: " +
187                       to_string(m_MaxQueues) + ". Real: " +
188                       to_string(total_queues) + ". The limit will "
189                       "be extended to accomodate all the queues.";
190         LOG_POST(Note << msg);
191         m_Server->RegisterAlert(eMaxQueues, msg);
192         m_MaxQueues = total_queues;
193     }
194 
195     try {
196         // Here: we can start restoring what was saved. The first step is
197         //       to get the linked sections. Linked sections have two sources:
198         //       - config file
199         //       - dumped sections
200         // The config file sections must override the dumped ones
201         CJsonNode       unused_diff = CJsonNode::NewObjectNode();
202         x_ReadLinkedSections(CNcbiApplication::Instance()->GetConfig(),
203                              unused_diff);
204 
205         if (!m_Diskless)
206             x_AppendDumpLinkedSections();
207 
208         // Read the queue classes from the config file and append those which
209         // come from the dump
210         m_QueueClasses = x_ReadIniFileQueueClassDescriptions(
211                                     CNcbiApplication::Instance()->GetConfig());
212         for (TQueueParams::const_iterator  k = dump_queue_classes.begin();
213                 k != dump_queue_classes.end(); ++k) {
214             if (m_QueueClasses.find(k->first) == m_QueueClasses.end())
215                 m_QueueClasses[k->first] = k->second;
216         }
217 
218         // Read the queues from the config file
219         TQueueParams    queues_from_ini =
220                             x_ReadIniFileQueueDescriptions(
221                                     CNcbiApplication::Instance()->GetConfig(),
222                                     m_QueueClasses);
223         x_ConfigureQueues(queues_from_ini, unused_diff);
224 
225         // Add the queues from the dump
226         for (map<string, string, PNocase>::const_iterator
227                 k = dump_dynamic_queues.begin();
228                 k != dump_dynamic_queues.end(); ++k) {
229             string      qname = k->first;
230             if (config_static_queues.find(qname) != config_static_queues.end())
231                 continue;
232 
233             // Here: the dumped queue has not been changed from a dynamic one
234             //       to a static. So, it needs to be added.
235             string      qclass = k->second;
236 
237             SQueueParameters    params = m_QueueClasses[qclass];
238 
239             params.kind = CQueue::eKindDynamic;
240             params.delete_request = false;
241             params.qclass = qclass;
242 
243             // Lost parameter: description. The only dynamic queue classes are
244             // dumped so the description is lost.
245             // params.description = ...
246 
247             x_CreateAndMountQueue(qname, params);
248         }
249 
250         // All the structures are ready to upload the jobs from the dump
251         if (!m_Diskless) {
252             for (TQueueInfo::iterator  k = m_Queues.begin();
253                     k != m_Queues.end(); ++k) {
254                 try {
255                     unsigned int   records =
256                                         k->second.second->LoadFromDump(m_DumpPath);
257                     GetDiagContext().Extra()
258                         .Print("_type", "startup")
259                         .Print("_queue", k->first)
260                         .Print("info", "load_from_dump")
261                         .Print("records", records);
262                 } catch (const exception &  ex) {
263                     ERR_POST(ex.what());
264                     last_queue_load_error = ex.what();
265                     ++queue_load_error_count;
266                 } catch (...) {
267                     last_queue_load_error = "Unknown error loading queue " +
268                                             k->first + " from dump";
269                     ERR_POST(last_queue_load_error);
270                     ++queue_load_error_count;
271                 }
272             }
273         }
274     } catch (const exception &  ex) {
275         ERR_POST(ex.what());
276         last_queue_load_error = ex.what();
277         ++queue_load_error_count;
278     } catch (...) {
279         last_queue_load_error = "Unknown error loading queues from dump";
280         ERR_POST(last_queue_load_error);
281         ++queue_load_error_count;
282     }
283 
284     if (!m_Diskless) {
285         x_CreateCrashFlagFile();
286         x_CreateDumpErrorFlagFile();
287         x_CreateStorageVersionFile();
288 
289         // Serialize the start job IDs file if it was deleted during the
290         // database initialization. Even if it is overwriting an existing file
291         // there is no performance issue at this point (it's done once anyway).
292         m_Server->SerializeJobsStartIDs();
293     }
294 
295     if (queue_load_error_count > 0) {
296         m_Server->RegisterAlert(eDumpLoadError,
297                                 "There were error(s) loading the previous "
298                                 "instance dump. Number of errors: " +
299                                 to_string(queue_load_error_count) +
300                                 ". See log for all the loading errors. "
301                                 "Last error: " + last_queue_load_error);
302         if (!m_Diskless)
303             x_BackupDump();
304     } else {
305         if (!m_Diskless)
306             x_RemoveDump();
307     }
308 
309     if (!m_Diskless)
310         x_CreateSpaceReserveFile();
311 }
312 
313 
314 TQueueParams
x_ReadIniFileQueueClassDescriptions(const IRegistry & reg)315 CQueueDataBase::x_ReadIniFileQueueClassDescriptions(const IRegistry &  reg)
316 {
317     TQueueParams        queue_classes;
318     list<string>        sections;
319 
320     reg.EnumerateSections(&sections);
321 
322     ITERATE(list<string>, it, sections) {
323         string              queue_class;
324         string              prefix;
325         const string &      section_name = *it;
326 
327         NStr::SplitInTwo(section_name, "_", prefix, queue_class);
328         if (NStr::CompareNocase(prefix, "qclass") != 0)
329            continue;
330         if (queue_class.empty())
331             continue;
332         if (queue_class.size() > kMaxQueueNameSize - 1)
333             continue;
334 
335         // Warnings are ignored here. At this point they are not of interest
336         // because they have already been collected at the startup (allowed)
337         // or at RECO - a file with warnings is not allowed
338         SQueueParameters    params;
339         vector<string>      warnings;
340         if (params.ReadQueueClass(reg, section_name, warnings)) {
341             // false => problems with linked sections; see CXX-2617
342             // The same sections cannot appear twice
343             queue_classes[queue_class] = params;
344         }
345     }
346 
347     return queue_classes;
348 }
349 
350 
351 // Reads the queues from ini file and respects inheriting queue classes
352 // parameters
353 TQueueParams
x_ReadIniFileQueueDescriptions(const IRegistry & reg,const TQueueParams & classes)354 CQueueDataBase::x_ReadIniFileQueueDescriptions(const IRegistry &     reg,
355                                                const TQueueParams &  classes)
356 {
357     TQueueParams        queues;
358     list<string>        sections;
359 
360     reg.EnumerateSections(&sections);
361     ITERATE(list<string>, it, sections) {
362         string              queue_name;
363         string              prefix;
364         const string &      section_name = *it;
365 
366         NStr::SplitInTwo(section_name, "_", prefix, queue_name);
367         if (NStr::CompareNocase(prefix, "queue") != 0)
368            continue;
369         if (queue_name.empty())
370             continue;
371         if (queue_name.size() > kMaxQueueNameSize - 1)
372             continue;
373 
374         // Warnings are ignored here. At this point they are not of interest
375         // because they have already been collected at the startup (allowed)
376         // or at RECO - a file with warnings is not allowed
377         SQueueParameters    params;
378         vector<string>      warnings;
379 
380         if (params.ReadQueue(reg, section_name, classes, warnings)) {
381             // false => problems with linked sections; see CXX-2617
382             queues[queue_name] = params;
383         }
384     }
385 
386     return queues;
387 }
388 
389 
x_ReadLinkedSections(const IRegistry & reg,CJsonNode & diff)390 void  CQueueDataBase::x_ReadLinkedSections(const IRegistry &  reg,
391                                            CJsonNode &        diff)
392 {
393     // Read the new content
394     typedef map< string, map< string, string > >    section_container;
395     section_container   new_values;
396     list<string>        sections;
397     reg.EnumerateSections(&sections);
398 
399     ITERATE(list<string>, it, sections) {
400         string              queue_or_class;
401         string              prefix;
402         const string &      section_name = *it;
403 
404         NStr::SplitInTwo(section_name, "_", prefix, queue_or_class);
405         if (queue_or_class.empty())
406             continue;
407         if (NStr::CompareNocase(prefix, "qclass") != 0 &&
408             NStr::CompareNocase(prefix, "queue") != 0)
409             continue;
410         if (queue_or_class.size() > kMaxQueueNameSize - 1)
411             continue;
412 
413         list<string>    entries;
414         reg.EnumerateEntries(section_name, &entries);
415 
416         ITERATE(list<string>, k, entries) {
417             const string &  entry = *k;
418             string          ref_section = reg.GetString(section_name,
419                                                         entry, kEmptyStr);
420 
421             if (!NStr::StartsWith(entry, "linked_section_", NStr::eCase))
422                 continue;
423 
424             if (entry == "linked_section_")
425                 continue;   // Malformed values prefix
426 
427             if (ref_section.empty())
428                 continue;   // Malformed section name
429 
430             if (find(sections.begin(), sections.end(), ref_section) ==
431                                                             sections.end())
432                 continue;   // Non-existing section
433 
434             if (new_values.find(ref_section) != new_values.end())
435                 continue;   // Has already been read
436 
437             // Read the linked section values
438             list<string>    linked_section_entries;
439             reg.EnumerateEntries(ref_section, &linked_section_entries);
440             map<string, string> values;
441             for (list<string>::const_iterator j = linked_section_entries.begin();
442                  j != linked_section_entries.end(); ++j)
443                 values[*j] = reg.GetString(ref_section, *j, kEmptyStr);
444 
445             new_values[ref_section] = values;
446         }
447     }
448 
449     CFastMutexGuard     guard(m_LinkedSectionsGuard);
450 
451     // Identify those sections which were deleted
452     vector<string>  deleted;
453     for (section_container::const_iterator  k(m_LinkedSections.begin());
454          k != m_LinkedSections.end(); ++k)
455         if (new_values.find(k->first) == new_values.end())
456             deleted.push_back(k->first);
457 
458     if (!deleted.empty()) {
459         CJsonNode   deletedSections = CJsonNode::NewArrayNode();
460         for (vector<string>::const_iterator  k(deleted.begin());
461                 k != deleted.end(); ++k)
462             deletedSections.AppendString( *k );
463         diff.SetByKey( "linked_section_deleted", deletedSections );
464     }
465 
466     // Identify those sections which were added
467     vector<string>  added;
468     for (section_container::const_iterator  k(new_values.begin());
469          k != new_values.end(); ++k)
470         if (m_LinkedSections.find(k->first) == m_LinkedSections.end())
471             added.push_back(k->first);
472 
473     if (!added.empty()) {
474         CJsonNode   addedSections = CJsonNode::NewArrayNode();
475         for (vector<string>::const_iterator  k(added.begin());
476                 k != added.end(); ++k)
477             addedSections.AppendString( *k );
478         diff.SetByKey( "linked_section_added", addedSections );
479     }
480 
481     // Deal with changed sections: what was added/deleted/modified
482     vector<string>  changed;
483     for (section_container::const_iterator  k(new_values.begin());
484         k != new_values.end(); ++k) {
485         if (find(added.begin(), added.end(), k->first) != added.end())
486             continue;
487         if (new_values[k->first] == m_LinkedSections[k->first])
488             continue;
489         changed.push_back(k->first);
490     }
491 
492     if (!changed.empty()) {
493         CJsonNode       changedSections = CJsonNode::NewObjectNode();
494         for (vector<string>::const_iterator  k(changed.begin());
495                 k != changed.end(); ++k)
496             changedSections.SetByKey( *k,
497                         x_DetectChangesInLinkedSection( m_LinkedSections[*k],
498                                                         new_values[*k]) );
499         diff.SetByKey( "linked_section_changed", changedSections );
500     }
501 
502     // Finally, save the new configuration
503     m_LinkedSections = new_values;
504 }
505 
506 
507 CJsonNode
x_DetectChangesInLinkedSection(const map<string,string> & old_values,const map<string,string> & new_values)508 CQueueDataBase::x_DetectChangesInLinkedSection(
509                         const map<string, string> &  old_values,
510                         const map<string, string> &  new_values)
511 {
512     CJsonNode       diff = CJsonNode::NewObjectNode();
513 
514     // Deal with deleted items
515     vector<string>  deleted;
516     for (map<string, string>::const_iterator  k(old_values.begin());
517          k != old_values.end(); ++k)
518         if (new_values.find(k->first) == new_values.end())
519             deleted.push_back(k->first);
520     if (!deleted.empty()) {
521         CJsonNode   deletedValues = CJsonNode::NewArrayNode();
522         for (vector<string>::const_iterator  k(deleted.begin());
523                 k != deleted.end(); ++k)
524             deletedValues.AppendString( *k );
525         diff.SetByKey( "deleted", deletedValues );
526     }
527 
528     // Deal with added items
529     vector<string>  added;
530     for (map<string, string>::const_iterator  k(new_values.begin());
531          k != new_values.end(); ++k)
532         if (old_values.find(k->first) == old_values.end())
533             added.push_back(k->first);
534     if (!added.empty()) {
535         CJsonNode   addedValues = CJsonNode::NewArrayNode();
536         for (vector<string>::const_iterator  k(added.begin());
537                 k != added.end(); ++k)
538             addedValues.AppendString( *k );
539         diff.SetByKey( "added", addedValues );
540     }
541 
542     // Deal with changed values
543     vector<string>  changed;
544     for (map<string, string>::const_iterator  k(new_values.begin());
545          k != new_values.end(); ++k) {
546         if (old_values.find(k->first) == old_values.end())
547             continue;
548         if (old_values.find(k->first)->second ==
549             new_values.find(k->first)->second)
550             continue;
551         changed.push_back(k->first);
552     }
553     if (!changed.empty()) {
554         CJsonNode   changedValues = CJsonNode::NewObjectNode();
555         for (vector<string>::const_iterator  k(changed.begin());
556                 k != changed.end(); ++k) {
557             CJsonNode       values = CJsonNode::NewArrayNode();
558             values.AppendString( old_values.find(*k)->second );
559             values.AppendString( new_values.find(*k)->second );
560             changedValues.SetByKey( *k, values );
561         }
562         diff.SetByKey( "changed", changedValues );
563     }
564 
565     return diff;
566 }
567 
568 
569 // Validates the config from an ini file for the following:
570 // - a static queue redefines existed dynamic queue
571 void
x_ValidateConfiguration(const TQueueParams & queues_from_ini) const572 CQueueDataBase::x_ValidateConfiguration(
573                         const TQueueParams &  queues_from_ini) const
574 {
575     // Check that static queues do not mess with existing dynamic queues
576     for (TQueueParams::const_iterator  k = queues_from_ini.begin();
577          k != queues_from_ini.end(); ++k) {
578         TQueueInfo::const_iterator  existing = m_Queues.find(k->first);
579 
580         if (existing == m_Queues.end())
581             continue;
582         if (existing->second.first.kind == CQueue::eKindDynamic)
583             NCBI_THROW(CNetScheduleException, eInvalidParameter,
584                        "Configuration error. The queue '" + k->first +
585                        "' clashes with a currently existing "
586                        "dynamic queue of the same name.");
587     }
588 
589     // Config file is OK for the current configuration
590 }
591 
592 
593 unsigned int
x_CountQueuesToAdd(const TQueueParams & queues_from_ini) const594 CQueueDataBase::x_CountQueuesToAdd(const TQueueParams &  queues_from_ini) const
595 {
596     unsigned int        add_count = 0;
597 
598     for (TQueueParams::const_iterator  k = queues_from_ini.begin();
599          k != queues_from_ini.end(); ++k) {
600 
601         if (m_Queues.find(k->first) == m_Queues.end())
602             ++add_count;
603     }
604 
605     return add_count;
606 }
607 
608 
609 // Updates what is stored in memory.
610 // Forms the diff string. Tells if there were changes.
611 bool
x_ConfigureQueueClasses(const TQueueParams & classes_from_ini,CJsonNode & diff)612 CQueueDataBase::x_ConfigureQueueClasses(const TQueueParams &  classes_from_ini,
613                                         CJsonNode &           diff)
614 {
615     bool            has_changes = false;
616     vector<string>  classes;    // Used to store added and deleted classes
617 
618     // Delete from existed what was not found in the new classes
619     for (TQueueParams::iterator    k = m_QueueClasses.begin();
620          k != m_QueueClasses.end(); ++k) {
621         string      old_queue_class = k->first;
622 
623         if (classes_from_ini.find(old_queue_class) != classes_from_ini.end())
624             continue;
625 
626         // The queue class is not in the configuration any more however it
627         // still could be in use for a dynamic queue. Leave it for the GC
628         // to check if the class has no reference to it and delete it
629         // accordingly.
630         // So, just mark it as for removal.
631 
632         if (k->second.delete_request)
633             continue;   // Has already been marked for deletion
634 
635         k->second.delete_request = true;
636         classes.push_back(old_queue_class);
637     }
638 
639     if (!classes.empty()) {
640         has_changes = true;
641         CJsonNode       deleted_classes = CJsonNode::NewArrayNode();
642         for (vector<string>::const_iterator  k = classes.begin();
643                 k != classes.end(); ++k)
644             deleted_classes.AppendString( *k );
645         diff.SetByKey( "deleted_queue_classes", deleted_classes );
646     }
647 
648 
649     // Check the updates in the classes
650     classes.clear();
651 
652     CJsonNode           class_changes = CJsonNode::NewObjectNode();
653     for (TQueueParams::iterator    k = m_QueueClasses.begin();
654          k != m_QueueClasses.end(); ++k) {
655 
656         string                        queue_class = k->first;
657         TQueueParams::const_iterator  new_class =
658                                 classes_from_ini.find(queue_class);
659 
660         if (new_class == classes_from_ini.end())
661             continue;   // It is a candidate for deletion, so no diff
662 
663         // The same class found in the new configuration
664         if (k->second.delete_request) {
665             // The class was restored before GC deleted it. Update the flag
666             // and parameters
667             k->second = new_class->second;
668             classes.push_back(queue_class);
669             continue;
670         }
671 
672         // That's the same class which possibly was updated
673         // Do not compare class name here, this is a class itself
674         // Description should be compared
675         CJsonNode   class_diff = k->second.Diff(new_class->second,
676                                                 false, true);
677 
678         if (class_diff.GetSize() > 0) {
679             // There is a difference, update the class info
680             k->second = new_class->second;
681 
682             class_changes.SetByKey(queue_class, class_diff);
683             has_changes = true;
684         }
685     }
686     if (class_changes.GetSize() > 0)
687         diff.SetByKey("queue_class_changes", class_changes);
688 
689     // Check what was added
690     for (TQueueParams::const_iterator  k = classes_from_ini.begin();
691          k != classes_from_ini.end(); ++k) {
692         string      new_queue_class = k->first;
693 
694         if (m_QueueClasses.find(new_queue_class) == m_QueueClasses.end()) {
695             m_QueueClasses[new_queue_class] = k->second;
696             classes.push_back(new_queue_class);
697         }
698     }
699 
700     if (!classes.empty()) {
701         has_changes = true;
702         CJsonNode       added_classes = CJsonNode::NewArrayNode();
703         for (vector<string>::const_iterator  k = classes.begin();
704                 k != classes.end(); ++k)
705             added_classes.AppendString(*k);
706         diff.SetByKey("added_queue_classes", added_classes);
707     }
708 
709     return has_changes;
710 }
711 
712 
713 // Updates the queue info in memory and creates/marks for deletion
714 // queues if necessary.
715 bool
x_ConfigureQueues(const TQueueParams & queues_from_ini,CJsonNode & diff)716 CQueueDataBase::x_ConfigureQueues(const TQueueParams &  queues_from_ini,
717                                   CJsonNode &           diff)
718 {
719     bool            has_changes = false;
720     vector<string>  deleted_queues;
721 
722     // Mark for deletion what disappeared
723     for (TQueueInfo::iterator    k = m_Queues.begin();
724          k != m_Queues.end(); ++k) {
725         if (k->second.first.kind == CQueue::eKindDynamic)
726             continue;   // It's not the config business to deal
727                         // with dynamic queues
728 
729         string      old_queue = k->first;
730         if (queues_from_ini.find(old_queue) != queues_from_ini.end())
731             continue;
732 
733         // The queue is not in the configuration any more. It could
734         // still be in use or jobs could be still there. So mark it
735         // for deletion and forbid submits for the queue.
736         // GC will later delete it.
737 
738         if (k->second.first.delete_request)
739             continue;   // Has already been marked for deletion
740 
741         CRef<CQueue>    queue = k->second.second;
742         queue->SetRefuseSubmits(true);
743 
744         k->second.first.delete_request = true;
745         deleted_queues.push_back(k->first);
746     }
747 
748     if (!deleted_queues.empty()) {
749         has_changes = true;
750         CJsonNode       deleted = CJsonNode::NewArrayNode();
751         for (vector<string>::const_iterator  k = deleted_queues.begin();
752                 k != deleted_queues.end(); ++k)
753             deleted.AppendString(*k);
754         diff.SetByKey("deleted_queues", deleted);
755     }
756 
757 
758     // Check the updates in the queue parameters
759     vector< pair<string,
760                  string> >  added_queues;
761     CJsonNode               section_changes = CJsonNode::NewObjectNode();
762 
763     for (TQueueInfo::iterator    k = m_Queues.begin();
764          k != m_Queues.end(); ++k) {
765 
766         // The server configuration which affects the performance logging for
767         // queues could have been changed, so let the queues update the flag
768         k->second.second->UpdatePerfLoggingSettings(k->second.first.qclass);
769 
770 
771         if (k->second.first.kind == CQueue::eKindDynamic)
772             continue;   // It's not the config business to deal
773                         // with dynamic queues
774 
775         string                        queue_name = k->first;
776         TQueueParams::const_iterator  new_queue =
777                                         queues_from_ini.find(queue_name);
778 
779         if (new_queue == queues_from_ini.end())
780             continue;   // It is a candidate for deletion, or a dynamic queue;
781                         // So no diff
782 
783         // The same queue is in the new configuration
784         if (k->second.first.delete_request) {
785             // The queue was restored before GC deleted it. Update the flag,
786             // parameters and allows submits and update parameters if so.
787             CRef<CQueue>    queue = k->second.second;
788             queue->SetParameters(new_queue->second);
789             queue->SetRefuseSubmits(false);
790 
791             // The queue kind could not be changed here.
792             // The delete request is just checked.
793             k->second.first = new_queue->second;
794             added_queues.push_back(make_pair(queue_name,
795                                              k->second.first.qclass));
796             continue;
797         }
798 
799 
800         // That's the same queue which possibly was updated
801         // Class name should also be compared here
802         // Description should be compared here
803         CJsonNode   queue_diff = k->second.first.Diff(new_queue->second,
804                                                       true, true);
805 
806         if (queue_diff.GetSize() > 0) {
807             // There is a difference, update the queue info and the queue
808             CRef<CQueue>    queue = k->second.second;
809             queue->SetParameters(new_queue->second);
810 
811             // The queue kind could not be changed here.
812             // The queue delete request could not be changed here.
813             k->second.first = new_queue->second;
814 
815             section_changes.SetByKey(queue_name, queue_diff);
816             has_changes = true;
817         }
818     }
819 
820     // Check dynamic queues classes. They might be updated.
821     for (TQueueInfo::iterator    k = m_Queues.begin();
822          k != m_Queues.end(); ++k) {
823 
824         if (k->second.first.kind != CQueue::eKindDynamic)
825             continue;
826         if (k->second.first.delete_request == true)
827             continue;
828 
829         // OK, this is dynamic queue, alive and not set for deletion
830         // Check if its class parameters have been  updated/
831         TQueueParams::const_iterator    queue_class =
832                             m_QueueClasses.find(k->second.first.qclass);
833         if (queue_class == m_QueueClasses.end()) {
834             ERR_POST("Cannot find class '" + k->second.first.qclass +
835                      "' for dynamic queue '" + k->first +
836                      "'. Unexpected internal data inconsistency.");
837             continue;
838         }
839 
840         // Do not compare classes
841         // Do not compare description
842         // They do not make sense for dynamic queues because they are created
843         // with their own descriptions and the class does not have the 'class'
844         // field
845         CJsonNode   class_diff = k->second.first.Diff(queue_class->second,
846                                                       false, false);
847         if (class_diff.GetSize() > 0) {
848             // There is a difference in the queue class - update the
849             // parameters.
850             string      old_class = k->second.first.qclass;
851             string      old_description = k->second.first.description;
852 
853             CRef<CQueue>    queue = k->second.second;
854             queue->SetParameters(queue_class->second);
855 
856             k->second.first = queue_class->second;
857             k->second.first.qclass = old_class;
858             k->second.first.description = old_description;
859             k->second.first.kind = CQueue::eKindDynamic;
860 
861             section_changes.SetByKey(k->first, class_diff);
862             has_changes = true;
863         }
864     }
865 
866     if (section_changes.GetSize() > 0)
867         diff.SetByKey("queue_changes", section_changes);
868 
869 
870     // Check what was added
871     for (TQueueParams::const_iterator  k = queues_from_ini.begin();
872          k != queues_from_ini.end(); ++k) {
873         string      new_queue_name = k->first;
874 
875         if (m_Queues.find(new_queue_name) == m_Queues.end()) {
876             x_CreateAndMountQueue(new_queue_name, k->second);
877             added_queues.push_back(make_pair(new_queue_name, k->second.qclass));
878         }
879     }
880 
881     if (!added_queues.empty()) {
882         has_changes = true;
883         CJsonNode       added = CJsonNode::NewObjectNode();
884         for (vector< pair<string, string> >::const_iterator
885                 k = added_queues.begin();
886                 k != added_queues.end(); ++k)
887             added.SetByKey(k->first, CJsonNode::NewStringNode(k->second));
888         diff.SetByKey("added_queues", added);
889     }
890 
891     return has_changes;
892 }
893 
894 
Configure(const IRegistry & reg,CJsonNode & diff)895 time_t  CQueueDataBase::Configure(const IRegistry &  reg,
896                                   CJsonNode &        diff)
897 {
898     CFastMutexGuard     guard(m_ConfigureLock);
899 
900     // Read the configured queues and classes from the ini file
901     TQueueParams        classes_from_ini =
902                             x_ReadIniFileQueueClassDescriptions(reg);
903     TQueueParams        queues_from_ini =
904                             x_ReadIniFileQueueDescriptions(reg,
905                                                            classes_from_ini);
906 
907     // Validate basic consistency of the incoming configuration
908     x_ValidateConfiguration(queues_from_ini);
909 
910     x_ReadLinkedSections(reg, diff);
911 
912     // Check that the there are enough slots for the new queues if so
913     // configured
914     unsigned int        to_add_count = x_CountQueuesToAdd(queues_from_ini);
915     unsigned int        available_count = m_MaxQueues - m_Queues.size();
916 
917     if (to_add_count > available_count)
918         NCBI_THROW(CNetScheduleException, eInvalidParameter,
919                    "New configuration slots requirement: " +
920                    to_string(to_add_count) +
921                    ". Number of available slots: " +
922                    to_string(available_count) + ".");
923 
924     // Here: validation is finished. There is enough resources for the new
925     // configuration.
926     x_ConfigureQueueClasses(classes_from_ini, diff);
927     x_ConfigureQueues(queues_from_ini, diff);
928     return CalculateRuntimePrecision();
929 }
930 
931 
CalculateRuntimePrecision(void) const932 CNSPreciseTime CQueueDataBase::CalculateRuntimePrecision(void) const
933 {
934     // Calculate the new min_run_timeout: required at the time of loading
935     // NetSchedule and not used while reconfiguring on the fly
936     CNSPreciseTime      min_precision = kTimeNever;
937     for (TQueueParams::const_iterator  k = m_QueueClasses.begin();
938          k != m_QueueClasses.end(); ++k)
939         min_precision = std::min(min_precision,
940                                  k->second.CalculateRuntimePrecision());
941     for (TQueueInfo::const_iterator  k = m_Queues.begin();
942          k != m_Queues.end(); ++k)
943         min_precision = std::min(min_precision,
944                                  k->second.first.CalculateRuntimePrecision());
945     return min_precision;
946 }
947 
948 
CountActiveJobs(void) const949 unsigned int  CQueueDataBase::CountActiveJobs(void) const
950 {
951     unsigned int        cnt = 0;
952     CFastMutexGuard     guard(m_ConfigureLock);
953 
954     for (TQueueInfo::const_iterator  k = m_Queues.begin();
955          k != m_Queues.end(); ++k)
956         cnt += k->second.second->CountActiveJobs();
957 
958     return cnt;
959 }
960 
961 
CountAllJobs(void) const962 unsigned int  CQueueDataBase::CountAllJobs(void) const
963 {
964     unsigned int        cnt = 0;
965     CFastMutexGuard     guard(m_ConfigureLock);
966 
967     for (TQueueInfo::const_iterator  k = m_Queues.begin();
968          k != m_Queues.end(); ++k)
969         cnt += k->second.second->CountAllJobs();
970 
971     return cnt;
972 }
973 
974 
AnyJobs(void) const975 bool  CQueueDataBase::AnyJobs(void) const
976 {
977     CFastMutexGuard     guard(m_ConfigureLock);
978 
979     for (TQueueInfo::const_iterator  k = m_Queues.begin();
980          k != m_Queues.end(); ++k)
981         if (k->second.second->AnyJobs())
982             return true;
983 
984     return false;
985 }
986 
987 
OpenQueue(const string & name)988 CRef<CQueue> CQueueDataBase::OpenQueue(const string &  name)
989 {
990     CFastMutexGuard             guard(m_ConfigureLock);
991     TQueueInfo::const_iterator  found = m_Queues.find(name);
992 
993     if (found == m_Queues.end())
994         NCBI_THROW(CNetScheduleException, eUnknownQueue,
995                    "Queue '" + name + "' is not found.");
996 
997     return found->second.second;
998 }
999 
1000 
1001 void
x_CreateAndMountQueue(const string & qname,const SQueueParameters & params)1002 CQueueDataBase::x_CreateAndMountQueue(const string &            qname,
1003                                       const SQueueParameters &  params)
1004 {
1005     unique_ptr<CQueue>    q(new CQueue(qname, params.kind, m_Server, *this));
1006 
1007     q->Attach();
1008     q->SetParameters(params);
1009 
1010     m_Queues[qname] = make_pair(params, q.release());
1011 
1012     GetDiagContext().Extra()
1013         .Print("_type", "startup")
1014         .Print("_queue", qname)
1015         .Print("qclass", params.qclass)
1016         .Print("info", "mount");
1017 }
1018 
1019 
QueueExists(const string & qname) const1020 bool CQueueDataBase::QueueExists(const string &  qname) const
1021 {
1022     CFastMutexGuard     guard(m_ConfigureLock);
1023     return m_Queues.find(qname) != m_Queues.end();
1024 }
1025 
1026 
CreateDynamicQueue(const CNSClientId & client,const string & qname,const string & qclass,const string & description)1027 void CQueueDataBase::CreateDynamicQueue(const CNSClientId &  client,
1028                                         const string &  qname,
1029                                         const string &  qclass,
1030                                         const string &  description)
1031 {
1032     CFastMutexGuard     guard(m_ConfigureLock);
1033 
1034     // Queue name clashes
1035     if (m_Queues.find(qname) != m_Queues.end())
1036         NCBI_THROW(CNetScheduleException, eDuplicateName,
1037                    "Queue '" + qname + "' already exists.");
1038 
1039     // Queue class existance
1040     TQueueParams::const_iterator  queue_class = m_QueueClasses.find(qclass);
1041     if (queue_class == m_QueueClasses.end())
1042         NCBI_THROW(CNetScheduleException, eUnknownQueueClass,
1043                    "Queue class '" + qclass +
1044                    "' for queue '" + qname + "' is not found.");
1045 
1046     // And class is not marked for deletion
1047     if (queue_class->second.delete_request)
1048         NCBI_THROW(CNetScheduleException, eUnknownQueueClass,
1049                    "Queue class '" + qclass +
1050                    "' for queue '" + qname + "' is marked for deletion.");
1051 
1052     // Slot availability
1053     if ((int)m_MaxQueues - (int)m_Queues.size() <= 0)
1054         NCBI_THROW(CNetScheduleException, eUnknownQueue,
1055                    "Cannot allocate queue '" + qname +
1056                    "'. max_queues limit reached.");
1057 
1058     // submitter and program restrictions must be checked
1059     // for the class
1060     if (!client.IsAdmin()) {
1061         if (!queue_class->second.subm_hosts.empty()) {
1062             CNetScheduleAccessList  acl;
1063             acl.SetHosts(queue_class->second.subm_hosts);
1064             if (!acl.IsAllowed(client.GetAddress())) {
1065                 m_Server->RegisterAlert(eAccess, "submitter privileges required"
1066                                         " to create a dynamic queue");
1067                 NCBI_THROW(CNetScheduleException, eAccessDenied,
1068                            "Access denied: submitter privileges required");
1069             }
1070         }
1071         if (!queue_class->second.program_name.empty()) {
1072             CQueueClientInfoList    acl;
1073             bool                    ok = false;
1074 
1075             acl.AddClientInfo(queue_class->second.program_name);
1076             try {
1077                 CQueueClientInfo    auth_prog_info;
1078                 ParseVersionString(client.GetProgramName(),
1079                                    &auth_prog_info.client_name,
1080                                    &auth_prog_info.version_info);
1081                 ok = acl.IsMatchingClient(auth_prog_info);
1082             } catch (...) {
1083                 // Parsing errors
1084                 ok = false;
1085             }
1086 
1087             if (!ok) {
1088                 m_Server->RegisterAlert(eAccess, "program privileges required "
1089                                         "to create a dynamic queue");
1090                 NCBI_THROW(CNetScheduleException, eAccessDenied,
1091                            "Access denied: program privileges required");
1092             }
1093         }
1094     }
1095 
1096 
1097     // All the preconditions are met. Create the queue
1098     SQueueParameters    params = queue_class->second;
1099 
1100     params.kind = CQueue::eKindDynamic;
1101     params.delete_request = false;
1102     params.qclass = qclass;
1103     params.description = description;
1104 
1105     x_CreateAndMountQueue(qname, params);
1106 }
1107 
1108 
DeleteDynamicQueue(const CNSClientId & client,const string & qname)1109 void  CQueueDataBase::DeleteDynamicQueue(const CNSClientId &  client,
1110                                          const string &  qname)
1111 {
1112     CFastMutexGuard         guard(m_ConfigureLock);
1113     TQueueInfo::iterator    found_queue = m_Queues.find(qname);
1114 
1115     if (found_queue == m_Queues.end())
1116         NCBI_THROW(CNetScheduleException, eUnknownQueue,
1117                    "Queue '" + qname + "' is not found." );
1118 
1119     if (found_queue->second.first.kind != CQueue::eKindDynamic)
1120         NCBI_THROW(CNetScheduleException, eInvalidParameter,
1121                    "Queue '" + qname + "' is static and cannot be deleted.");
1122 
1123     // submitter and program restrictions must be checked
1124     CRef<CQueue>    queue = found_queue->second.second;
1125     if (!client.IsAdmin()) {
1126         if (!queue->IsSubmitAllowed(client.GetAddress()))
1127             NCBI_THROW(CNetScheduleException, eAccessDenied,
1128                        "Access denied: submitter privileges required");
1129         if (!queue->IsProgramAllowed(client.GetProgramName()))
1130             NCBI_THROW(CNetScheduleException, eAccessDenied,
1131                        "Access denied: program privileges required");
1132     }
1133 
1134     found_queue->second.first.delete_request = true;
1135     queue->SetRefuseSubmits(true);
1136 }
1137 
1138 
QueueInfo(const string & qname) const1139 SQueueParameters CQueueDataBase::QueueInfo(const string &  qname) const
1140 {
1141     CFastMutexGuard             guard(m_ConfigureLock);
1142     TQueueInfo::const_iterator  found_queue = m_Queues.find(qname);
1143 
1144     if (found_queue == m_Queues.end())
1145         NCBI_THROW(CNetScheduleException, eUnknownQueue,
1146                    "Queue '" + qname + "' is not found." );
1147 
1148     return x_SingleQueueInfo(found_queue);
1149 }
1150 
1151 
1152 /* Note: this member must be called under a lock, it's not thread safe */
1153 SQueueParameters
x_SingleQueueInfo(TQueueInfo::const_iterator found) const1154 CQueueDataBase::x_SingleQueueInfo(TQueueInfo::const_iterator  found) const
1155 {
1156     SQueueParameters    params = found->second.first;
1157 
1158     // The fields below are used as a transport.
1159     // Usually used by QINF2 and STAT QUEUES
1160     params.refuse_submits = found->second.second->GetRefuseSubmits();
1161     params.pause_status = found->second.second->GetPauseStatus();
1162     params.max_aff_slots = m_Server->GetAffRegistrySettings().max_records;
1163     params.aff_slots_used = found->second.second->GetAffSlotsUsed();
1164     params.max_group_slots = m_Server->GetGroupRegistrySettings().max_records;
1165     params.group_slots_used = found->second.second->GetGroupSlotsUsed();
1166     params.max_scope_slots = m_Server->GetScopeRegistrySettings().max_records;
1167     params.scope_slots_used = found->second.second->GetScopeSlotsUsed();
1168     params.clients = found->second.second->GetClientsCount();
1169     params.groups = found->second.second->GetGroupsCount();
1170     params.gc_backlog = found->second.second->GetGCBacklogCount();
1171     params.notif_count = found->second.second->GetNotifCount();
1172     return params;
1173 }
1174 
1175 
// Provides the names of all the queues as a single string. Each name,
// the last one included, is followed by the separator.
// The queue collection is iterated under m_ConfigureLock.
string CQueueDataBase::GetQueueNames(const string &  sep) const
{
    string              joined;
    CFastMutexGuard     guard(m_ConfigureLock);

    for (const auto &  item : m_Queues) {
        joined += item.first;
        joined += sep;
    }

    return joined;
}
1187 
1188 
Close(void)1189 void CQueueDataBase::Close(void)
1190 {
1191     StopNotifThread();
1192     StopPurgeThread();
1193     StopServiceThread();
1194     StopExecutionWatcherThread();
1195 
1196     // Print the statistics counters last time
1197     if (m_Server->IsLogStatisticsThread()) {
1198         size_t      aff_count = 0;
1199 
1200         PrintStatistics(aff_count);
1201         CStatisticsCounters::PrintServerWide(aff_count);
1202     }
1203 
1204     if (m_Server->IsDrainShutdown() && m_Server->WasDBDrained()) {
1205         // That was a not interrupted drain shutdown so there is no
1206         // need to dump anything
1207         LOG_POST("Drained shutdown: the DB has been successfully drained");
1208         x_RemoveDumpErrorFlagFile();
1209     } else {
1210         // That was either:
1211         // - hard shutdown
1212         // - hard shutdown when drained shutdown is in process
1213         if (m_Server->IsDrainShutdown())
1214             ERR_POST(Warning <<
1215                      "Drained shutdown: DB draining has not been completed "
1216                      "when a hard shutdown is received. "
1217                      "Shutting down immediately.");
1218 
1219         // Dump all the queues/queue classes/queue parameters to flat files
1220         if (!m_Diskless)
1221             x_Dump();
1222 
1223         m_QueueClasses.clear();
1224 
1225         // CQueue objects destructors are called from here because the last
1226         // reference to the object has gone
1227         m_Queues.clear();
1228     }
1229 
1230     if (!m_Diskless) {
1231         x_RemoveDataFiles();
1232         x_RemoveCrashFlagFile();
1233     }
1234 }
1235 
1236 
PrintTransitionCounters(void)1237 string CQueueDataBase::PrintTransitionCounters(void)
1238 {
1239     string                      result;
1240     CFastMutexGuard             guard(m_ConfigureLock);
1241     for (TQueueInfo::const_iterator  k = m_Queues.begin();
1242          k != m_Queues.end(); ++k)
1243         result += "OK:[queue " + k->first + "]\n" +
1244                   k->second.second->PrintTransitionCounters();
1245     return result;
1246 }
1247 
1248 
PrintJobsStat(const CNSClientId & client)1249 string CQueueDataBase::PrintJobsStat(const CNSClientId &  client)
1250 {
1251     string                      result;
1252     vector<string>              warnings;
1253     CFastMutexGuard             guard(m_ConfigureLock);
1254     for (TQueueInfo::const_iterator  k = m_Queues.begin();
1255          k != m_Queues.end(); ++k)
1256         // Group and affinity tokens make no sense for the server,
1257         // so they are both "".
1258         result += "OK:[queue " + k->first + "]\n" +
1259                   k->second.second->PrintJobsStat(client, "", "", warnings);
1260     return result;
1261 }
1262 
1263 
GetQueueClassesInfo(void) const1264 string CQueueDataBase::GetQueueClassesInfo(void) const
1265 {
1266     string                  output;
1267     output.reserve(16384);
1268 
1269     CFastMutexGuard         guard(m_ConfigureLock);
1270 
1271     for (TQueueParams::const_iterator  k = m_QueueClasses.begin();
1272          k != m_QueueClasses.end(); ++k) {
1273         if (!output.empty())
1274             output.append(kNewLine);
1275 
1276         // false - not to include qclass
1277         // false - not URL encoded format
1278         output.append("OK:[qclass ")
1279               .append(k->first)
1280               .append(1, ']')
1281               .append(kNewLine)
1282               .append(k->second.GetPrintableParameters(false, false));
1283 
1284         for (map<string, string>::const_iterator
1285              j = k->second.linked_sections.begin();
1286              j != k->second.linked_sections.end(); ++j) {
1287             string  prefix((j->first).c_str() + strlen("linked_section_"));
1288             string  section_name = j->second;
1289 
1290             map<string, string> values = GetLinkedSection(section_name);
1291             for (map<string, string>::const_iterator m = values.begin();
1292                  m != values.end(); ++m)
1293                 output.append(kNewLine)
1294                       .append("OK:")
1295                       .append(prefix)
1296                       .append(1, '.')
1297                       .append(m->first)
1298                       .append(": ")
1299                       .append(m->second);
1300         }
1301     }
1302     return output;
1303 }
1304 
1305 
GetQueueClassesConfig(void) const1306 string CQueueDataBase::GetQueueClassesConfig(void) const
1307 {
1308     string              output;
1309     CFastMutexGuard     guard(m_ConfigureLock);
1310     for (TQueueParams::const_iterator  k = m_QueueClasses.begin();
1311          k != m_QueueClasses.end(); ++k) {
1312         if (!output.empty())
1313             output.append(kNewLine);
1314         output.append("[qclass_")
1315               .append(k->first)
1316               .append(1, ']')
1317               .append(kNewLine)
1318               .append(k->second.ConfigSection(true));
1319     }
1320     return output;
1321 }
1322 
1323 
GetQueueInfo(void) const1324 string CQueueDataBase::GetQueueInfo(void) const
1325 {
1326     string                  output;
1327     CFastMutexGuard         guard(m_ConfigureLock);
1328 
1329     for (TQueueInfo::const_iterator  k = m_Queues.begin();
1330          k != m_Queues.end(); ++k) {
1331         if (!output.empty())
1332             output += "\n";
1333 
1334         // true - include qclass
1335         // false - not URL encoded format
1336         output += "OK:[queue " + k->first + "]\n" +
1337                   x_SingleQueueInfo(k).GetPrintableParameters(true, false);
1338 
1339         for (map<string, string>::const_iterator
1340              j = k->second.first.linked_sections.begin();
1341              j != k->second.first.linked_sections.end(); ++j) {
1342             string  prefix((j->first).c_str() + strlen("linked_section_"));
1343             string  section_name = j->second;
1344 
1345             map<string, string> values = GetLinkedSection(section_name);
1346             for (map<string, string>::const_iterator m = values.begin();
1347                  m != values.end(); ++m)
1348                 output += "\nOK:" + prefix + "." + m->first + ": " + m->second;
1349         }
1350     }
1351     return output;
1352 }
1353 
GetQueueConfig(void) const1354 string CQueueDataBase::GetQueueConfig(void) const
1355 {
1356     string              output;
1357     CFastMutexGuard     guard(m_ConfigureLock);
1358     for (TQueueInfo::const_iterator  k = m_Queues.begin();
1359          k != m_Queues.end(); ++k) {
1360         if (!output.empty())
1361             output += "\n";
1362         output += "[queue_" + k->first + "]\n" +
1363                   k->second.first.ConfigSection(false);
1364     }
1365     return output;
1366 }
1367 
1368 
GetLinkedSectionConfig(void) const1369 string CQueueDataBase::GetLinkedSectionConfig(void) const
1370 {
1371     string              output;
1372     CFastMutexGuard     guard(m_LinkedSectionsGuard);
1373 
1374     for (map< string, map< string, string > >::const_iterator
1375             k = m_LinkedSections.begin();
1376             k != m_LinkedSections.end(); ++k) {
1377         if (!output.empty())
1378             output += "\n";
1379         output += "[" + k->first + "]\n";
1380         for (map< string, string >::const_iterator j = k->second.begin();
1381              j != k->second.end(); ++j) {
1382             output += j->first + "=\"" + j->second + "\"\n";
1383         }
1384     }
1385     return output;
1386 }
1387 
1388 
1389 map<string, string>
GetLinkedSection(const string & section_name) const1390 CQueueDataBase::GetLinkedSection(const string &  section_name) const
1391 {
1392     CFastMutexGuard     guard(m_LinkedSectionsGuard);
1393     map< string, map<string, string> >::const_iterator  found =
1394         m_LinkedSections.find(section_name);
1395     if (found == m_LinkedSections.end())
1396         return map<string, string>();
1397     return found->second;
1398 }
1399 
1400 
1401 map<string, int>
GetPauseQueues(void) const1402 CQueueDataBase::GetPauseQueues(void) const
1403 {
1404     map<string, int>    pause_states;
1405     CFastMutexGuard     guard(m_ConfigureLock);
1406 
1407     for (TQueueInfo::const_iterator  k = m_Queues.begin();
1408          k != m_Queues.end(); ++k) {
1409         int  pause_state = k->second.second->GetPauseStatus();
1410         if (pause_state != CQueue::eNoPause)
1411             pause_states[k->first] = pause_state;
1412     }
1413     return pause_states;
1414 }
1415 
1416 
1417 vector<string>
GetRefuseSubmitQueues(void) const1418 CQueueDataBase::GetRefuseSubmitQueues(void) const
1419 {
1420     vector<string>      refuse_submit_queues;
1421     CFastMutexGuard     guard(m_ConfigureLock);
1422 
1423     for (TQueueInfo::const_iterator  k = m_Queues.begin();
1424          k != m_Queues.end(); ++k) {
1425         if (k->second.second->GetRefuseSubmits())
1426             refuse_submit_queues.push_back(k->first);
1427     }
1428     return refuse_submit_queues;
1429 }
1430 
1431 
NotifyListeners(void)1432 void CQueueDataBase::NotifyListeners(void)
1433 {
1434     CNSPreciseTime  current_time = CNSPreciseTime::Current();
1435     for (unsigned int  index = 0; ; ++index) {
1436         CRef<CQueue>  queue = x_GetQueueAt(index);
1437         if (queue.IsNull())
1438             break;
1439         queue->NotifyListenersPeriodically(current_time);
1440     }
1441 }
1442 
1443 
PrintStatistics(size_t & aff_count)1444 void CQueueDataBase::PrintStatistics(size_t &  aff_count)
1445 {
1446     CFastMutexGuard             guard(m_ConfigureLock);
1447 
1448     for (TQueueInfo::const_iterator  k = m_Queues.begin();
1449          k != m_Queues.end(); ++k)
1450         k->second.second->PrintStatistics(aff_count);
1451 }
1452 
1453 
PrintJobCounters(void)1454 void CQueueDataBase::PrintJobCounters(void)
1455 {
1456     CFastMutexGuard             guard(m_ConfigureLock);
1457 
1458     for (TQueueInfo::const_iterator  k = m_Queues.begin();
1459             k != m_Queues.end(); ++k)
1460         k->second.second->PrintJobCounters();
1461 }
1462 
1463 
CheckExecutionTimeout(bool logging)1464 void CQueueDataBase::CheckExecutionTimeout(bool  logging)
1465 {
1466     for (unsigned int  index = 0; ; ++index) {
1467         CRef<CQueue>  queue = x_GetQueueAt(index);
1468         if (queue.IsNull())
1469             break;
1470         queue->CheckExecutionTimeout(logging);
1471     }
1472 }
1473 
1474 
1475 // Checks if the queues marked for deletion could be really deleted
1476 // and deletes them if so. Deletes queue classes which are marked
1477 // for deletion if there are no links to them.
x_DeleteQueuesAndClasses(void)1478 void  CQueueDataBase::x_DeleteQueuesAndClasses(void)
1479 {
1480     // It's better to avoid quering the queues under a lock so
1481     // let's first build a list of CRefs to the candidate queues.
1482     list< pair< string, CRef< CQueue > > >    candidates;
1483 
1484     {{
1485         CFastMutexGuard             guard(m_ConfigureLock);
1486         for (TQueueInfo::const_iterator  k = m_Queues.begin();
1487              k != m_Queues.end(); ++k)
1488             if (k->second.first.delete_request)
1489                 candidates.push_back(make_pair(k->first, k->second.second));
1490     }}
1491 
1492     // Now the queues are queiried if they are empty without a lock
1493     list< pair< string, CRef< CQueue > > >::iterator
1494                                             k = candidates.begin();
1495     while (k != candidates.end()) {
1496         if (k->second->IsEmpty() == false)
1497             k = candidates.erase(k);
1498         else
1499             ++k;
1500     }
1501 
1502     if (candidates.empty())
1503         return;
1504 
1505     // Here we have a list of the queues which can be deleted
1506     // Take a lock and delete the queues plus check queue classes
1507     CFastMutexGuard             guard(m_ConfigureLock);
1508     for (k = candidates.begin(); k != candidates.end(); ++k) {
1509         // It's only here where a queue can be deleted so it's safe not
1510         // to check the iterator
1511         TQueueInfo::iterator    queue = m_Queues.find(k->first);
1512 
1513         // Deallocation of the DB block will be done later when the queue
1514         // is actually deleted
1515         // queue->second.second->MarkForTruncating();
1516         m_Queues.erase(queue);
1517     }
1518 
1519     // Now, while still holding the lock, let's check queue classes
1520     vector< string >    classes_to_delete;
1521     for (TQueueParams::const_iterator  j = m_QueueClasses.begin();
1522          j != m_QueueClasses.end(); ++j) {
1523         if (j->second.delete_request) {
1524             bool    in_use = false;
1525             for (TQueueInfo::const_iterator m = m_Queues.begin();
1526                 m != m_Queues.end(); ++m) {
1527                 if (m->second.first.qclass == j->first) {
1528                     in_use = true;
1529                     break;
1530                 }
1531             }
1532             if (in_use == false)
1533                 classes_to_delete.push_back(j->first);
1534         }
1535     }
1536 
1537     for (vector< string >::const_iterator  k = classes_to_delete.begin();
1538          k != classes_to_delete.end(); ++k) {
1539         // It's only here where a queue class can be deleted so
1540         // it's safe not  to check the iterator
1541         m_QueueClasses.erase(m_QueueClasses.find(*k));
1542     }
1543 }
1544 
1545 
1546 /* Data used in CQueueDataBase::Purge() only */
1547 static CNetScheduleAPI::EJobStatus statuses_to_delete_from[] = {
1548     CNetScheduleAPI::eFailed,
1549     CNetScheduleAPI::eCanceled,
1550     CNetScheduleAPI::eDone,
1551     CNetScheduleAPI::ePending,
1552     CNetScheduleAPI::eReadFailed,
1553     CNetScheduleAPI::eConfirmed
1554 };
1555 const size_t kStatusesSize = sizeof(statuses_to_delete_from) /
1556                              sizeof(CNetScheduleAPI::EJobStatus);
1557 
Purge(void)1558 void CQueueDataBase::Purge(void)
1559 {
1560     size_t              max_mark_deleted = m_Server->GetMarkdelBatchSize();
1561     size_t              max_scanned = m_Server->GetScanBatchSize();
1562     size_t              total_scanned = 0;
1563     size_t              total_mark_deleted = 0;
1564     CNSPreciseTime      current_time = CNSPreciseTime::Current();
1565     bool                limit_reached = false;
1566 
1567     // Cleanup the queues and classes if possible
1568     x_DeleteQueuesAndClasses();
1569 
1570     // Part I: from the last end till the end of the list
1571     CRef<CQueue>        start_queue = x_GetLastPurged();
1572     CRef<CQueue>        current_queue = start_queue;
1573     size_t              start_status_index = m_PurgeStatusIndex;
1574     unsigned int        start_job_id = m_PurgeJobScanned;
1575 
1576     while (current_queue.IsNull() == false) {
1577         m_PurgeQueue = current_queue->GetQueueName();
1578         if (x_PurgeQueue(current_queue.GetObject(),
1579                          m_PurgeStatusIndex, kStatusesSize - 1,
1580                          m_PurgeJobScanned, 0,
1581                          max_scanned, max_mark_deleted,
1582                          current_time,
1583                          total_scanned, total_mark_deleted) == true)
1584             return;
1585 
1586         if (total_mark_deleted >= max_mark_deleted ||
1587             total_scanned >= max_scanned) {
1588             limit_reached = true;
1589             break;
1590         }
1591         current_queue = x_GetNext(m_PurgeQueue);
1592     }
1593 
1594 
1595     // Part II: from the beginning of the list till the last end
1596     if (limit_reached == false) {
1597         current_queue = x_GetFirst();
1598         while (current_queue.IsNull() == false) {
1599             if (current_queue->GetQueueName() == start_queue->GetQueueName())
1600                 break;
1601 
1602             m_PurgeQueue = current_queue->GetQueueName();
1603             if (x_PurgeQueue(current_queue.GetObject(),
1604                              m_PurgeStatusIndex, kStatusesSize - 1,
1605                              m_PurgeJobScanned, 0,
1606                              max_scanned, max_mark_deleted,
1607                              current_time,
1608                              total_scanned, total_mark_deleted) == true)
1609                 return;
1610 
1611             if (total_mark_deleted >= max_mark_deleted ||
1612                 total_scanned >= max_scanned) {
1613                 limit_reached = true;
1614                 break;
1615             }
1616             current_queue = x_GetNext(m_PurgeQueue);
1617         }
1618     }
1619 
1620     // Part III: it might need to check the statuses in the queue we started
1621     // with if the start status was not the first one.
1622     if (limit_reached == false) {
1623         if (start_queue.IsNull() == false) {
1624             m_PurgeQueue = start_queue->GetQueueName();
1625             if (start_status_index > 0) {
1626                 if (x_PurgeQueue(start_queue.GetObject(),
1627                                  0, start_status_index - 1,
1628                                  m_PurgeJobScanned, 0,
1629                                  max_scanned, max_mark_deleted,
1630                                  current_time,
1631                                  total_scanned, total_mark_deleted) == true)
1632                     return;
1633             }
1634         }
1635     }
1636 
1637     if (limit_reached == false) {
1638         if (start_queue.IsNull() == false) {
1639             m_PurgeQueue = start_queue->GetQueueName();
1640             if (x_PurgeQueue(start_queue.GetObject(),
1641                              start_status_index, start_status_index,
1642                              0, start_job_id,
1643                              max_scanned, max_mark_deleted,
1644                              current_time,
1645                              total_scanned, total_mark_deleted) == true)
1646                 return;
1647         }
1648     }
1649 
1650 
1651     // Part IV: purge the found candidates and optimize the memory if required
1652     m_FreeStatusMemCnt += x_PurgeUnconditional();
1653 
1654     x_OptimizeStatusMatrix(current_time);
1655 }
1656 
1657 
x_GetLastPurged(void)1658 CRef<CQueue>  CQueueDataBase::x_GetLastPurged(void)
1659 {
1660     CFastMutexGuard             guard(m_ConfigureLock);
1661 
1662     if (m_PurgeQueue.empty()) {
1663         if (m_Queues.empty())
1664             return CRef<CQueue>(NULL);
1665         return m_Queues.begin()->second.second;
1666     }
1667 
1668     for (TQueueInfo::iterator  it = m_Queues.begin();
1669          it != m_Queues.end(); ++it)
1670         if (it->first == m_PurgeQueue)
1671             return it->second.second;
1672 
1673     // Not found, which means the queue was deleted. Pick a random one
1674     m_PurgeStatusIndex = 0;
1675     m_PurgeJobScanned = 0;
1676 
1677     int     queue_num = ((rand() * 1.0) / RAND_MAX) * m_Queues.size();
1678     int     k = 1;
1679     for (TQueueInfo::iterator  it = m_Queues.begin();
1680          it != m_Queues.end(); ++it) {
1681         if (k >= queue_num)
1682             return it->second.second;
1683         ++k;
1684     }
1685 
1686     // Cannot happened, so just be consistent with the return value
1687     return m_Queues.begin()->second.second;
1688 }
1689 
1690 
x_GetFirst(void)1691 CRef<CQueue>  CQueueDataBase::x_GetFirst(void)
1692 {
1693     CFastMutexGuard             guard(m_ConfigureLock);
1694 
1695     if (m_Queues.empty())
1696         return CRef<CQueue>(NULL);
1697     return m_Queues.begin()->second.second;
1698 }
1699 
1700 
x_GetNext(const string & current_name)1701 CRef<CQueue>  CQueueDataBase::x_GetNext(const string &  current_name)
1702 {
1703     CFastMutexGuard             guard(m_ConfigureLock);
1704 
1705     if (m_Queues.empty())
1706         return CRef<CQueue>(NULL);
1707 
1708     for (TQueueInfo::iterator  it = m_Queues.begin();
1709          it != m_Queues.end(); ++it) {
1710         if (it->first == current_name) {
1711             ++it;
1712             if (it == m_Queues.end())
1713                 return CRef<CQueue>(NULL);
1714             return it->second.second;
1715         }
1716     }
1717 
1718     // May not really happen. Let's have just in case.
1719     return CRef<CQueue>(NULL);
1720 }
1721 
1722 
1723 // Purges jobs from a queue starting from the given status.
1724 // Returns true if the purge should be stopped.
1725 // The status argument is a status to start from
x_PurgeQueue(CQueue & queue,size_t status,size_t status_to_end,unsigned int start_job_id,unsigned int end_job_id,size_t max_scanned,size_t max_mark_deleted,const CNSPreciseTime & current_time,size_t & total_scanned,size_t & total_mark_deleted)1726 bool  CQueueDataBase::x_PurgeQueue(CQueue &                queue,
1727                                    size_t                  status,
1728                                    size_t                  status_to_end,
1729                                    unsigned int            start_job_id,
1730                                    unsigned int            end_job_id,
1731                                    size_t                  max_scanned,
1732                                    size_t                  max_mark_deleted,
1733                                    const CNSPreciseTime &  current_time,
1734                                    size_t &                total_scanned,
1735                                    size_t &                total_mark_deleted)
1736 {
1737     SPurgeAttributes    purge_io;
1738 
1739     for (; status <= status_to_end; ++status) {
1740         purge_io.scans = max_scanned - total_scanned;
1741         purge_io.deleted = max_mark_deleted - total_mark_deleted;
1742         purge_io.job_id = start_job_id;
1743 
1744         purge_io = queue.CheckJobsExpiry(current_time, purge_io,
1745                                          end_job_id,
1746                                          statuses_to_delete_from[status]);
1747         total_scanned += purge_io.scans;
1748         total_mark_deleted += purge_io.deleted;
1749         m_PurgeJobScanned = purge_io.job_id;
1750 
1751         if (x_CheckStopPurge())
1752             return true;
1753 
1754         if (total_mark_deleted >= max_mark_deleted ||
1755             total_scanned >= max_scanned) {
1756             m_PurgeStatusIndex = status;
1757             return false;
1758         }
1759     }
1760     m_PurgeStatusIndex = 0;
1761     m_PurgeJobScanned = 0;
1762     return false;
1763 }
1764 
1765 
x_CreateCrashFlagFile(void)1766 void  CQueueDataBase::x_CreateCrashFlagFile(void)
1767 {
1768     try {
1769         CFile       crash_file(CFile::MakePath(m_DataPath,
1770                                                kCrashFlagFileName));
1771         if (!crash_file.Exists()) {
1772             CFileIO     f;
1773             f.Open(CFile::MakePath(m_DataPath, kCrashFlagFileName),
1774                    CFileIO_Base::eCreate,
1775                    CFileIO_Base::eReadWrite);
1776             f.Close();
1777         }
1778     }
1779     catch (...) {
1780         ERR_POST("Error creating crash detection file.");
1781     }
1782 }
1783 
1784 
x_RemoveCrashFlagFile(void)1785 void  CQueueDataBase::x_RemoveCrashFlagFile(void)
1786 {
1787     try {
1788         CFile       crash_file(CFile::MakePath(m_DataPath,
1789                                                kCrashFlagFileName));
1790         if (crash_file.Exists())
1791             crash_file.Remove();
1792     }
1793     catch (...) {
1794         ERR_POST("Error removing crash detection file. When the server "
1795                  "restarts it will re-initialize the database.");
1796     }
1797 }
1798 
1799 
x_DoesCrashFlagFileExist(void) const1800 bool  CQueueDataBase::x_DoesCrashFlagFileExist(void) const
1801 {
1802     return CFile(CFile::MakePath(m_DataPath, kCrashFlagFileName)).Exists();
1803 }
1804 
1805 
x_CreateDumpErrorFlagFile(void)1806 void  CQueueDataBase::x_CreateDumpErrorFlagFile(void)
1807 {
1808     try {
1809         CFile       crash_file(CFile::MakePath(m_DataPath,
1810                                                kDumpErrorFlagFileName));
1811         if (!crash_file.Exists()) {
1812             CFileIO     f;
1813             f.Open(CFile::MakePath(m_DataPath, kDumpErrorFlagFileName),
1814                    CFileIO_Base::eCreate,
1815                    CFileIO_Base::eReadWrite);
1816             f.Close();
1817         }
1818     }
1819     catch (...) {
1820         ERR_POST("Error creating dump error  detection file.");
1821     }
1822 }
1823 
1824 
x_DoesDumpErrorFlagFileExist(void) const1825 bool  CQueueDataBase::x_DoesDumpErrorFlagFileExist(void) const
1826 {
1827     return CFile(CFile::MakePath(m_DataPath, kDumpErrorFlagFileName)).Exists();
1828 }
1829 
1830 
x_RemoveDumpErrorFlagFile(void)1831 void  CQueueDataBase::x_RemoveDumpErrorFlagFile(void)
1832 {
1833     if (!m_Diskless) {
1834         try {
1835             CFile       crash_file(CFile::MakePath(m_DataPath,
1836                                                    kDumpErrorFlagFileName));
1837             if (crash_file.Exists())
1838                 crash_file.Remove();
1839         }
1840         catch (...) {
1841             ERR_POST("Error removing dump error detection file. When the server "
1842                      "restarts it will set an alert.");
1843         }
1844     }
1845 }
1846 
1847 
x_PurgeUnconditional(void)1848 unsigned int  CQueueDataBase::x_PurgeUnconditional(void) {
1849     // Purge unconditional jobs
1850     unsigned int        del_rec = 0;
1851     unsigned int        max_deleted = m_Server->GetDeleteBatchSize();
1852 
1853 
1854     for (unsigned int  index = 0; ; ++index) {
1855         CRef<CQueue>  queue = x_GetQueueAt(index);
1856         if (queue.IsNull())
1857             break;
1858         del_rec += queue->DeleteBatch(max_deleted - del_rec);
1859         if (del_rec >= max_deleted)
1860             break;
1861     }
1862     return del_rec;
1863 }
1864 
1865 
1866 void
x_OptimizeStatusMatrix(const CNSPreciseTime & current_time)1867 CQueueDataBase::x_OptimizeStatusMatrix(const CNSPreciseTime &  current_time)
1868 {
1869     // optimize memory every 15 min. or after 1 million of deleted records
1870     static const int        kMemFree_Delay = 15 * 60;
1871     static const unsigned   kRecordThreshold = 1000000;
1872 
1873     if ((m_FreeStatusMemCnt > kRecordThreshold) ||
1874         (m_LastFreeMem + kMemFree_Delay <= current_time)) {
1875         m_FreeStatusMemCnt = 0;
1876         m_LastFreeMem = current_time;
1877 
1878         for (unsigned int  index = 0; ; ++index) {
1879             CRef<CQueue>  queue = x_GetQueueAt(index);
1880             if (queue.IsNull())
1881                 break;
1882             queue->OptimizeMem();
1883             if (x_CheckStopPurge())
1884                 break;
1885         }
1886     }
1887 }
1888 
1889 
StopPurge(void)1890 void CQueueDataBase::StopPurge(void)
1891 {
1892     // No need to guard, this operation is thread safe
1893     m_StopPurge = true;
1894 }
1895 
1896 
x_CheckStopPurge(void)1897 bool CQueueDataBase::x_CheckStopPurge(void)
1898 {
1899     CFastMutexGuard     guard(m_PurgeLock);
1900     bool                stop_purge = m_StopPurge;
1901 
1902     m_StopPurge = false;
1903     return stop_purge;
1904 }
1905 
1906 
RunPurgeThread(void)1907 void CQueueDataBase::RunPurgeThread(void)
1908 {
1909     double              purge_timeout = m_Server->GetPurgeTimeout();
1910     unsigned int        sec_delay = purge_timeout;
1911     unsigned int        nanosec_delay = (purge_timeout - sec_delay)*1000000000;
1912 
1913     m_PurgeThread.Reset(new CJobQueueCleanerThread(
1914                                 m_Host, *this, sec_delay, nanosec_delay,
1915                                 m_Server->IsLogCleaningThread()));
1916     m_PurgeThread->Run();
1917 }
1918 
1919 
StopPurgeThread(void)1920 void CQueueDataBase::StopPurgeThread(void)
1921 {
1922     if (!m_PurgeThread.Empty()) {
1923         StopPurge();
1924         m_PurgeThread->RequestStop();
1925         m_PurgeThread->Join();
1926         m_PurgeThread.Reset(0);
1927     }
1928 }
1929 
1930 
PurgeAffinities(void)1931 void CQueueDataBase::PurgeAffinities(void)
1932 {
1933     for (unsigned int  index = 0; ; ++index) {
1934         CRef<CQueue>  queue = x_GetQueueAt(index);
1935         if (queue.IsNull())
1936             break;
1937         queue->PurgeAffinities();
1938         if (x_CheckStopPurge())
1939             break;
1940     }
1941 }
1942 
1943 
PurgeGroups(void)1944 void CQueueDataBase::PurgeGroups(void)
1945 {
1946     for (unsigned int  index = 0; ; ++index) {
1947         CRef<CQueue>  queue = x_GetQueueAt(index);
1948         if (queue.IsNull())
1949             break;
1950         queue->PurgeGroups();
1951         if (x_CheckStopPurge())
1952             break;
1953     }
1954 }
1955 
1956 
StaleWNodes(void)1957 void CQueueDataBase::StaleWNodes(void)
1958 {
1959     // Worker nodes have the last access time in seconds since 1970
1960     // so there is no need to purge them more often than once a second
1961     static CNSPreciseTime   last_purge(0, 0);
1962     CNSPreciseTime          current_time = CNSPreciseTime::Current();
1963 
1964     if (current_time.Sec() == last_purge.Sec())
1965         return;
1966     last_purge = current_time;
1967 
1968     for (unsigned int  index = 0; ; ++index) {
1969         CRef<CQueue>  queue = x_GetQueueAt(index);
1970         if (queue.IsNull())
1971             break;
1972         queue->StaleNodes(current_time);
1973         if (x_CheckStopPurge())
1974             break;
1975     }
1976 }
1977 
1978 
PurgeBlacklistedJobs(void)1979 void CQueueDataBase::PurgeBlacklistedJobs(void)
1980 {
1981     static CNSPreciseTime   period(30, 0);
1982     static CNSPreciseTime   last_time(0, 0);
1983     CNSPreciseTime          current_time = CNSPreciseTime::Current();
1984 
1985     // Run this check once in ten seconds
1986     if (current_time - last_time < period)
1987         return;
1988 
1989     last_time = current_time;
1990 
1991     for (unsigned int  index = 0; ; ++index) {
1992         CRef<CQueue>  queue = x_GetQueueAt(index);
1993         if (queue.IsNull())
1994             break;
1995         queue->PurgeBlacklistedJobs();
1996         if (x_CheckStopPurge())
1997             break;
1998     }
1999 }
2000 
2001 
PurgeClientRegistry(void)2002 void CQueueDataBase::PurgeClientRegistry(void)
2003 {
2004     static CNSPreciseTime   period(5, 0);
2005     static CNSPreciseTime   last_time(0, 0);
2006     CNSPreciseTime          current_time = CNSPreciseTime::Current();
2007 
2008     // Run this check once in five seconds
2009     if (current_time - last_time < period)
2010         return;
2011 
2012     last_time = current_time;
2013 
2014     for (unsigned int  index = 0; ; ++index) {
2015         CRef<CQueue>  queue = x_GetQueueAt(index);
2016         if (queue.IsNull())
2017             break;
2018         queue->PurgeClientRegistry(current_time);
2019         if (x_CheckStopPurge())
2020             break;
2021     }
2022 }
2023 
2024 
2025 // Safely provides a queue at the given index
x_GetQueueAt(unsigned int index)2026 CRef<CQueue>  CQueueDataBase::x_GetQueueAt(unsigned int  index)
2027 {
2028     unsigned int                current_index = 0;
2029     CFastMutexGuard             guard(m_ConfigureLock);
2030 
2031     for (TQueueInfo::iterator  k = m_Queues.begin();
2032          k != m_Queues.end(); ++k) {
2033         if (current_index == index)
2034             return k->second.second;
2035         ++current_index;
2036     }
2037     return CRef<CQueue>(NULL);
2038 }
2039 
2040 
RunNotifThread(void)2041 void CQueueDataBase::RunNotifThread(void)
2042 {
2043     // 10 times per second
2044     m_NotifThread.Reset(new CGetJobNotificationThread(
2045                                 *this, 0, 100000000,
2046                                 m_Server->IsLogNotificationThread()));
2047     m_NotifThread->Run();
2048 }
2049 
2050 
StopNotifThread(void)2051 void CQueueDataBase::StopNotifThread(void)
2052 {
2053     if (!m_NotifThread.Empty()) {
2054         m_NotifThread->RequestStop();
2055         m_NotifThread->Join();
2056         m_NotifThread.Reset(0);
2057     }
2058 }
2059 
2060 
WakeupNotifThread(void)2061 void CQueueDataBase::WakeupNotifThread(void)
2062 {
2063     if (!m_NotifThread.Empty())
2064         m_NotifThread->WakeUp();
2065 }
2066 
2067 
2068 CNSPreciseTime
SendExactNotifications(void)2069 CQueueDataBase::SendExactNotifications(void)
2070 {
2071     CNSPreciseTime      next = kTimeNever;
2072     CNSPreciseTime      from_queue;
2073 
2074     for (unsigned int  index = 0; ; ++index) {
2075         CRef<CQueue>  queue = x_GetQueueAt(index);
2076         if (queue.IsNull())
2077             break;
2078         from_queue = queue->NotifyExactListeners();
2079         if (from_queue < next )
2080             next = from_queue;
2081     }
2082     return next;
2083 }
2084 
2085 
RunServiceThread(void)2086 void CQueueDataBase::RunServiceThread(void)
2087 {
2088     // Once in 100 seconds
2089     m_ServiceThread.Reset(new CServiceThread(
2090                                 *m_Server, m_Host, *this,
2091                                 m_Server->IsLogStatisticsThread(),
2092                                 m_Server->GetStatInterval(),
2093                                 m_Server->GetJobCountersInterval()));
2094     m_ServiceThread->Run();
2095 }
2096 
2097 
StopServiceThread(void)2098 void CQueueDataBase::StopServiceThread(void)
2099 {
2100     if (!m_ServiceThread.Empty()) {
2101         m_ServiceThread->RequestStop();
2102         m_ServiceThread->Join();
2103         m_ServiceThread.Reset(0);
2104     }
2105 }
2106 
2107 
RunExecutionWatcherThread(const CNSPreciseTime & run_delay)2108 void CQueueDataBase::RunExecutionWatcherThread(const CNSPreciseTime & run_delay)
2109 {
2110     m_ExeWatchThread.Reset(new CJobQueueExecutionWatcherThread(
2111                                     m_Host, *this,
2112                                     run_delay.Sec(), run_delay.NSec(),
2113                                     m_Server->IsLogExecutionWatcherThread()));
2114     m_ExeWatchThread->Run();
2115 }
2116 
2117 
StopExecutionWatcherThread(void)2118 void CQueueDataBase::StopExecutionWatcherThread(void)
2119 {
2120     if (!m_ExeWatchThread.Empty()) {
2121         m_ExeWatchThread->RequestStop();
2122         m_ExeWatchThread->Join();
2123         m_ExeWatchThread.Reset(0);
2124     }
2125 }
2126 
2127 
x_Dump()2128 void CQueueDataBase::x_Dump()
2129 {
2130     LOG_POST(Note << "Start dumping jobs");
2131 
2132     // Create the directory if needed
2133     CDir        dump_dir(m_DumpPath);
2134     if (!dump_dir.Exists())
2135         dump_dir.Create();
2136 
2137     // Remove the file which reserves the disk space
2138     x_RemoveSpaceReserveFile();
2139 
2140     // Walk all the queues and dump them
2141     // Note: no need for a lock because it is a shutdown time
2142     bool                dump_error = false;
2143     set<string>         dumped_queues;
2144     const string        lbsm_test_queue("LBSMDTestQueue");
2145     for (TQueueInfo::iterator  k = m_Queues.begin();
2146             k != m_Queues.end(); ++k) {
2147         if (NStr::CompareNocase(k->first, lbsm_test_queue) != 0) {
2148             try {
2149                 k->second.second->Dump(m_DumpPath);
2150                 dumped_queues.insert(k->first);
2151             } catch (const exception &  ex) {
2152                 dump_error = true;
2153                 ERR_POST("Error dumping queue " << k->first << ": " <<
2154                          ex.what());
2155             }
2156         }
2157     }
2158 
2159     // Dump the required queue classes. The only classes required are those
2160     // which were used by dynamic queues. The dynamic queue classes may also
2161     // use linked sections
2162     set<string>     classes_to_dump;
2163     set<string>     linked_sections_to_dump;
2164     set<string>     dynamic_queues_to_dump;
2165     for (TQueueInfo::iterator  k = m_Queues.begin();
2166             k != m_Queues.end(); ++k) {
2167         if (NStr::CompareNocase(k->first, lbsm_test_queue) == 0)
2168             continue;
2169         if (k->second.second->GetQueueKind() == CQueue::eKindStatic)
2170             continue;
2171         if (dumped_queues.find(k->first) == dumped_queues.end())
2172             continue;   // There was a dumping error
2173 
2174         classes_to_dump.insert(k->second.first.qclass);
2175         for (map<string, string>::const_iterator
2176                 j = k->second.first.linked_sections.begin();
2177                 j != k->second.first.linked_sections.end(); ++j)
2178             linked_sections_to_dump.insert(j->second);
2179         dynamic_queues_to_dump.insert(k->first);
2180     }
2181 
2182 
2183     // Dump classes if so and linked sections if so
2184     if (!classes_to_dump.empty()) {
2185         string      qclasses_dump_file_name = m_DumpPath +
2186                                             kQClassDescriptionFileName;
2187         string      linked_sections_dump_file_name = m_DumpPath +
2188                                             kLinkedSectionsFileName;
2189         FILE *      qclasses_dump_file = NULL;
2190         FILE *      linked_sections_dump_file = NULL;
2191         try {
2192             qclasses_dump_file = fopen(qclasses_dump_file_name.c_str(), "wb");
2193             if (qclasses_dump_file == NULL)
2194                 throw runtime_error("Cannot open file " +
2195                                     qclasses_dump_file_name);
2196             setbuf(qclasses_dump_file, NULL);
2197 
2198             if (classes_to_dump.size() > 0 ||
2199                 dynamic_queues_to_dump.size() > 0) {
2200                 SOneStructDumpHeader    header;
2201                 header.fixed_size = sizeof(SQueueDescriptionDump);
2202                 header.Write(qclasses_dump_file);
2203             }
2204 
2205             // Dump dynamic queue classes
2206             for (set<string>::const_iterator  k = classes_to_dump.begin();
2207                     k != classes_to_dump.end(); ++k) {
2208                 TQueueParams::const_iterator  queue_class =
2209                                                     m_QueueClasses.find(*k);
2210                 x_DumpQueueOrClass(qclasses_dump_file, "", *k, false,
2211                                    queue_class->second);
2212             }
2213 
2214             // Dump dynamic queues: qname and its class.
2215             for (set<string>::const_iterator
2216                     k = dynamic_queues_to_dump.begin();
2217                     k != dynamic_queues_to_dump.end(); ++k) {
2218                 TQueueInfo::const_iterator  q = m_Queues.find(*k);
2219                 x_DumpQueueOrClass(qclasses_dump_file, *k,
2220                                    q->second.first.qclass, true,
2221                                    q->second.first);
2222             }
2223 
2224             fclose(qclasses_dump_file);
2225             qclasses_dump_file = NULL;
2226 
2227             // Dump linked sections if so
2228             if (!linked_sections_to_dump.empty()) {
2229                 linked_sections_dump_file = fopen(
2230                                 linked_sections_dump_file_name.c_str(), "wb");
2231                 if (linked_sections_dump_file == NULL)
2232                     throw runtime_error("Cannot open file " +
2233                                          linked_sections_dump_file_name);
2234                 setbuf(linked_sections_dump_file, NULL);
2235 
2236                 SOneStructDumpHeader    header;
2237                 header.fixed_size = sizeof(SLinkedSectionDump);
2238                 header.Write(linked_sections_dump_file);
2239 
2240                 for (set<string>::const_iterator
2241                         k = linked_sections_to_dump.begin();
2242                         k != linked_sections_to_dump.end(); ++k) {
2243                     map<string, map<string, string> >::const_iterator
2244                         j = m_LinkedSections.find(*k);
2245                     x_DumpLinkedSection(linked_sections_dump_file, *k,
2246                                         j->second);
2247                 }
2248                 fclose(linked_sections_dump_file);
2249                 linked_sections_dump_file = NULL;
2250             }
2251         } catch (const exception &  ex) {
2252             dump_error = true;
2253             ERR_POST("Error dumping dynamic queue classes and "
2254                      "their linked sections. Dynamic queue dumps are lost.");
2255             if (qclasses_dump_file != NULL)
2256                 fclose(qclasses_dump_file);
2257             if (linked_sections_dump_file != NULL)
2258                 fclose(linked_sections_dump_file);
2259 
2260             // Remove the classes and linked sections files
2261             if (access(qclasses_dump_file_name.c_str(), F_OK) != -1)
2262                 remove(qclasses_dump_file_name.c_str());
2263             if (access(linked_sections_dump_file_name.c_str(), F_OK) != -1)
2264                 remove(linked_sections_dump_file_name.c_str());
2265 
2266             // Remove dynamic queues dumps
2267             for (set<string>::const_iterator
2268                     k = dynamic_queues_to_dump.begin();
2269                     k != dynamic_queues_to_dump.end(); ++k) {
2270                 m_Queues[*k].second->RemoveDump(m_DumpPath);
2271             }
2272         }
2273     }
2274 
2275     if (!dump_error)
2276         x_RemoveDumpErrorFlagFile();
2277 
2278     LOG_POST(Note << "Dumping jobs finished");
2279 }
2280 
2281 
x_DumpQueueOrClass(FILE * f,const string & qname,const string & qclass,bool is_queue,const SQueueParameters & params)2282 void CQueueDataBase::x_DumpQueueOrClass(FILE *  f,
2283                                         const string &  qname,
2284                                         const string &  qclass,
2285                                         bool  is_queue,
2286                                         const SQueueParameters &  params)
2287 {
2288     SQueueDescriptionDump       descr_dump;
2289 
2290     descr_dump.is_queue = is_queue;
2291     descr_dump.qname_size = qname.size();
2292     memcpy(descr_dump.qname, qname.data(), qname.size());
2293     descr_dump.qclass_size = qclass.size();
2294     memcpy(descr_dump.qclass, qclass.data(), qclass.size());
2295 
2296     if (!is_queue) {
2297         // The other parameters are required for the queue classes only
2298         descr_dump.timeout = (double)params.timeout;
2299         descr_dump.notif_hifreq_interval = (double)params.notif_hifreq_interval;
2300         descr_dump.notif_hifreq_period = (double)params.notif_hifreq_period;
2301         descr_dump.notif_lofreq_mult = params.notif_lofreq_mult;
2302         descr_dump.notif_handicap = (double)params.notif_handicap;
2303         descr_dump.dump_buffer_size = params.dump_buffer_size;
2304         descr_dump.dump_client_buffer_size = params.dump_client_buffer_size;
2305         descr_dump.dump_aff_buffer_size = params.dump_aff_buffer_size;
2306         descr_dump.dump_group_buffer_size = params.dump_group_buffer_size;
2307         descr_dump.run_timeout = (double)params.run_timeout;
2308         descr_dump.read_timeout = (double)params.read_timeout;
2309         descr_dump.program_name_size = params.program_name.size();
2310         memcpy(descr_dump.program_name, params.program_name.data(),
2311                                         params.program_name.size());
2312         descr_dump.failed_retries = params.failed_retries;
2313         descr_dump.read_failed_retries = params.read_failed_retries;
2314         descr_dump.max_jobs_per_client = params.max_jobs_per_client;
2315         descr_dump.blacklist_time = (double)params.blacklist_time;
2316         descr_dump.read_blacklist_time = (double)params.read_blacklist_time;
2317         descr_dump.max_input_size = params.max_input_size;
2318         descr_dump.max_output_size = params.max_output_size;
2319         descr_dump.subm_hosts_size = params.subm_hosts.size();
2320         memcpy(descr_dump.subm_hosts, params.subm_hosts.data(),
2321                                       params.subm_hosts.size());
2322         descr_dump.wnode_hosts_size = params.wnode_hosts.size();
2323         memcpy(descr_dump.wnode_hosts, params.wnode_hosts.data(),
2324                                        params.wnode_hosts.size());
2325         descr_dump.reader_hosts_size = params.reader_hosts.size();
2326         memcpy(descr_dump.reader_hosts, params.reader_hosts.data(),
2327                                         params.reader_hosts.size());
2328         descr_dump.wnode_timeout = (double)params.wnode_timeout;
2329         descr_dump.reader_timeout = (double)params.reader_timeout;
2330         descr_dump.pending_timeout = (double)params.pending_timeout;
2331         descr_dump.max_pending_wait_timeout =
2332                                 (double)params.max_pending_wait_timeout;
2333         descr_dump.max_pending_read_wait_timeout =
2334                                 (double)params.max_pending_read_wait_timeout;
2335         descr_dump.description_size = params.description.size();
2336         memcpy(descr_dump.description, params.description.data(),
2337                                        params.description.size());
2338         descr_dump.scramble_job_keys = params.scramble_job_keys;
2339         descr_dump.client_registry_timeout_worker_node =
2340                         (double)params.client_registry_timeout_worker_node;
2341         descr_dump.client_registry_min_worker_nodes =
2342                         params.client_registry_min_worker_nodes;
2343         descr_dump.client_registry_timeout_admin =
2344                         (double)params.client_registry_timeout_admin;
2345         descr_dump.client_registry_min_admins =
2346                         params.client_registry_min_admins;
2347         descr_dump.client_registry_timeout_submitter =
2348                         (double)params.client_registry_timeout_submitter;
2349         descr_dump.client_registry_min_submitters =
2350                         params.client_registry_min_submitters;
2351         descr_dump.client_registry_timeout_reader =
2352                         (double)params.client_registry_timeout_reader;
2353         descr_dump.client_registry_min_readers =
2354                         params.client_registry_min_readers;
2355         descr_dump.client_registry_timeout_unknown =
2356                         (double)params.client_registry_timeout_unknown;
2357         descr_dump.client_registry_min_unknowns =
2358                         params.client_registry_min_unknowns;
2359 
2360         // Dump the linked sections prefixes and names in the same order
2361         string      prefixes;
2362         string      names;
2363         for (map<string, string>::const_iterator
2364                 k = params.linked_sections.begin();
2365                 k != params.linked_sections.end(); ++k) {
2366             if (!prefixes.empty()) {
2367                 prefixes += ",";
2368                 names += ",";
2369             }
2370             prefixes += k->first;
2371             names += k->second;
2372         }
2373         descr_dump.linked_section_prefixes_size = prefixes.size();
2374         memcpy(descr_dump.linked_section_prefixes, prefixes.data(),
2375                                                    prefixes.size());
2376         descr_dump.linked_section_names_size = names.size();
2377         memcpy(descr_dump.linked_section_names, names.data(), names.size());
2378     }
2379 
2380     try {
2381         descr_dump.Write(f);
2382     } catch (const exception &  ex) {
2383         string      msg = "Writing error while dumping queue ";
2384         if (is_queue)
2385             msg += qname;
2386         else
2387             msg += "class " + qclass;
2388         msg += string(": ") + ex.what();
2389         throw runtime_error(msg);
2390     }
2391 }
2392 
2393 
x_DumpLinkedSection(FILE * f,const string & sname,const map<string,string> & values)2394 void CQueueDataBase::x_DumpLinkedSection(FILE *  f, const string &  sname,
2395                                          const map<string, string> &  values)
2396 {
2397     for (map<string, string>::const_iterator  k = values.begin();
2398             k != values.end(); ++k) {
2399         SLinkedSectionDump      section_dump;
2400 
2401         section_dump.section_size = sname.size();
2402         memcpy(section_dump.section, sname.data(), sname.size());
2403         section_dump.value_name_size = k->first.size();
2404         memcpy(section_dump.value_name, k->first.data(), k->first.size());
2405         section_dump.value_size = k->second.size();
2406         memcpy(section_dump.value, k->second.data(), k->second.size());
2407 
2408         try {
2409             section_dump.Write(f);
2410         } catch (const exception &  ex) {
2411             throw runtime_error("Writing error while dumping linked section " +
2412                                 sname + " values: " + ex.what());
2413         }
2414     }
2415 }
2416 
2417 
x_RemoveDump(void)2418 void CQueueDataBase::x_RemoveDump(void)
2419 {
2420     try {
2421         CDir    dump_dir(m_DumpPath);
2422         if (dump_dir.Exists())
2423             dump_dir.Remove();
2424     } catch (const exception &  ex) {
2425         ERR_POST("Error removing the dump directory: " << ex.what());
2426     } catch (...) {
2427         ERR_POST("Unknown error removing the dump directory");
2428     }
2429 }
2430 
2431 
2432 // Removes unnecessary files in the data directory
x_RemoveDataFiles(void)2433 void CQueueDataBase::x_RemoveDataFiles(void)
2434 {
2435     CDir        data_dir(m_DataPath);
2436     if (!data_dir.Exists())
2437         return;
2438 
2439     CDir::TEntries      entries = data_dir.GetEntries(
2440                                     kEmptyStr, CDir::fIgnoreRecursive);
2441     for (CDir::TEntries::const_iterator  k = entries.begin();
2442             k != entries.end(); ++k) {
2443         if ((*k)->IsDir())
2444             continue;
2445         if ((*k)->IsLink())
2446             continue;
2447         string      entryName = (*k)->GetName();
2448         if (entryName == kDBStorageVersionFileName ||
2449             entryName == kNodeIDFileName ||
2450             entryName == kStartJobIDsFileName ||
2451             entryName == kCrashFlagFileName ||
2452             entryName == kDumpErrorFlagFileName ||
2453             entryName == kPausedQueuesFilesName ||
2454             entryName == kRefuseSubmitFileName)
2455             continue;
2456 
2457         CFile   f(m_DataPath + entryName);
2458         try {
2459             f.Remove();
2460         } catch (...) {}
2461     }
2462 }
2463 
2464 
2465 // Logs the corresponding message if needed and provides the overall reinit
2466 // status.
x_CheckOpenPreconditions(bool reinit)2467 bool CQueueDataBase::x_CheckOpenPreconditions(bool  reinit)
2468 {
2469     if (x_DoesCrashFlagFileExist()) {
2470         ERR_POST("Reinitialization due to the server "
2471                  "did not stop gracefully last time. "
2472                  << m_DataPath << " removed.");
2473         m_Server->RegisterAlert(eStartAfterCrash, "Database has been "
2474                                 "reinitialized due to the server did not "
2475                                 "stop gracefully last time");
2476         return true;
2477     }
2478 
2479     if (reinit) {
2480         LOG_POST(Note << "Reinitialization due to a command line option. "
2481                       << m_DataPath << " removed.");
2482         m_Server->RegisterAlert(eReinit, "Database has been reinitialized due "
2483                                          "to a command line option");
2484         return true;
2485     }
2486 
2487     if (CDir(m_DataPath).Exists()) {
2488         bool    ver_file_exists = CFile(m_DataPath +
2489                                         kDBStorageVersionFileName).Exists();
2490         bool    dump_dir_exists = CDir(m_DumpPath).Exists();
2491 
2492         if (dump_dir_exists && !ver_file_exists) {
2493             // Strange. Some service file exist while the storage version
2494             // does not. It might be that the data dir has been altered
2495             // manually. Let's not start.
2496             NCBI_THROW(CNetScheduleException, eInternalError,
2497                        "Error detected: Storage version file is not found "
2498                        "while the dump directory exists.");
2499         }
2500 
2501         if (dump_dir_exists && ver_file_exists) {
2502             CFileIO     f;
2503             char        buf[1024];
2504             size_t      read_count = 0;
2505 
2506             // Expected 1 or 2 lines.
2507             // The first one is a storage version
2508             // The second is a server version (used starting from 4.31.0)
2509             f.Open(m_DataPath + kDBStorageVersionFileName,
2510                    CFileIO_Base::eOpen, CFileIO_Base::eRead);
2511             read_count = f.Read(buf, sizeof(buf) - 1);
2512             f.Close();
2513 
2514             buf[read_count] = '\0';
2515             vector<string>  lines;
2516             NStr::Split(buf, "\n", lines);
2517 
2518             if (lines.size() > 2) {
2519                 ERR_POST(Message << Warning <<
2520                          "Unexpected format of the storage version file: more "
2521                          "than 2 lines found");
2522             } else if (lines.size() == 0) {
2523                 ERR_POST(Message << Warning <<
2524                          "Unexpected format of the storage version file: no "
2525                          "lines found");
2526             }
2527 
2528             CNcbiApplication *  app = CNcbiApplication::Instance();
2529             CVersionInfo        ver_info = app->GetVersion();
2530             string              server_version = ver_info.Print();
2531 
2532             if (lines.size() >= 2) {
2533                 if (server_version != lines[1]) {
2534                     ERR_POST(Message << Warning <<
2535                              "NetSchedule version has changed. "
2536                              "Previous server version: " << lines[1] <<
2537                              " Current version: " << server_version);
2538                 }
2539             } else if (lines.size() == 1) {
2540                 ERR_POST(Message << Warning <<
2541                          "NetSchedule version has changed. "
2542                          "Previous server version is unknown. "
2543                          " Current version: " << server_version);
2544             }
2545 
2546             if (lines.size() >= 1) {
2547                 if (lines[0] != NETSCHEDULED_STORAGE_VERSION) {
2548                     ERR_POST(Message << Warning <<
2549                              "Storage version mismatch detected. "
2550                              "Dumped version: " << lines[0] <<
2551                              " Current version: " NETSCHEDULED_STORAGE_VERSION);
2552                 }
2553             }
2554         }
2555 
2556         if (!dump_dir_exists && ver_file_exists) {
2557             LOG_POST(Note << "Non-empty data directory exists however the "
2558                           << kDumpSubdirName
2559                           << " subdirectory is not found");
2560             m_Server->RegisterAlert(eNoDump,
2561                                     "Non-empty data directory exists "
2562                                     "however the " + kDumpSubdirName +
2563                                     " subdirectory is not found");
2564         }
2565     }
2566 
2567     if (x_DoesDumpErrorFlagFileExist()) {
2568         string  msg = "The previous instance of the server had problems with "
2569                       "dumping the information on the disk. Some queues may be "
2570                       "not restored. "
2571                       "See the previous instance log for details.";
2572         LOG_POST(Note << msg);
2573         m_Server->RegisterAlert(eDumpError, msg);
2574     }
2575 
2576     return false;
2577 }
2578 
2579 
x_CreateStorageVersionFile(void)2580 void CQueueDataBase::x_CreateStorageVersionFile(void)
2581 {
2582     CNcbiApplication *  app = CNcbiApplication::Instance();
2583     CVersionInfo        ver_info = app->GetVersion();
2584     string              server_version = ver_info.Print();
2585 
2586     CFileIO     f;
2587     f.Open(m_DataPath + kDBStorageVersionFileName, CFileIO_Base::eCreate,
2588                                                    CFileIO_Base::eReadWrite);
2589     f.Write(NETSCHEDULED_STORAGE_VERSION, strlen(NETSCHEDULED_STORAGE_VERSION));
2590     f.Write("\n", 1);
2591     f.Write(server_version.data(), server_version.size());
2592     f.Close();
2593 }
2594 
2595 
2596 void
x_ReadDumpQueueDesrc(set<string,PNocase> & dump_static_queues,map<string,string,PNocase> & dump_dynamic_queues,TQueueParams & dump_queue_classes)2597 CQueueDataBase::x_ReadDumpQueueDesrc(set<string, PNocase> &  dump_static_queues,
2598                                      map<string, string,
2599                                          PNocase> &  dump_dynamic_queues,
2600                                      TQueueParams &  dump_queue_classes)
2601 {
2602     CDir        dump_dir(m_DumpPath);
2603     if (!dump_dir.Exists())
2604         return;
2605 
2606     // The file contains dynamic queue classes and dynamic queue names
2607     string      queue_desrc_file_name = m_DumpPath + kQClassDescriptionFileName;
2608     CFile       queue_desrc_file(queue_desrc_file_name);
2609     if (queue_desrc_file.Exists()) {
2610         FILE *      f = fopen(queue_desrc_file_name.c_str(), "rb");
2611         if (f == NULL)
2612             throw runtime_error("Cannot open the existing dump file "
2613                                 "for reading: " + queue_desrc_file_name);
2614 
2615         SQueueDescriptionDump   dump_struct;
2616         try {
2617             SOneStructDumpHeader    header;
2618             header.Read(f);
2619 
2620             while (dump_struct.Read(f, header.fixed_size) == 0) {
2621                 if (dump_struct.is_queue) {
2622                     string  qname(dump_struct.qname, dump_struct.qname_size);
2623                     string  qclass(dump_struct.qclass, dump_struct.qclass_size);
2624                     dump_dynamic_queues[qname] = qclass;
2625                 } else {
2626                     SQueueParameters    p;
2627                     p.qclass = "";
2628                     p.timeout = CNSPreciseTime(dump_struct.timeout);
2629                     p.notif_hifreq_interval =
2630                         CNSPreciseTime(dump_struct.notif_hifreq_interval);
2631                     p.notif_hifreq_period =
2632                         CNSPreciseTime(dump_struct.notif_hifreq_period);
2633                     p.notif_lofreq_mult = dump_struct.notif_lofreq_mult;
2634                     p.notif_handicap =
2635                         CNSPreciseTime(dump_struct.notif_handicap);
2636                     p.dump_buffer_size = dump_struct.dump_buffer_size;
2637                     p.dump_client_buffer_size =
2638                         dump_struct.dump_client_buffer_size;
2639                     p.dump_aff_buffer_size = dump_struct.dump_aff_buffer_size;
2640                     p.dump_group_buffer_size =
2641                         dump_struct.dump_group_buffer_size;
2642                     p.run_timeout = CNSPreciseTime(dump_struct.run_timeout);
2643                     p.read_timeout = CNSPreciseTime(dump_struct.read_timeout);
2644                     p.program_name = string(dump_struct.program_name,
2645                                             dump_struct.program_name_size);
2646                     p.failed_retries = dump_struct.failed_retries;
2647                     p.read_failed_retries = dump_struct.read_failed_retries;
2648                     p.max_jobs_per_client = dump_struct.max_jobs_per_client;
2649                     p.blacklist_time =
2650                         CNSPreciseTime(dump_struct.blacklist_time);
2651                     p.read_blacklist_time =
2652                         CNSPreciseTime(dump_struct.read_blacklist_time);
2653                     p.max_input_size = dump_struct.max_input_size;
2654                     p.max_output_size = dump_struct.max_output_size;
2655                     p.subm_hosts = string(dump_struct.subm_hosts,
2656                                           dump_struct.subm_hosts_size);
2657                     p.wnode_hosts = string(dump_struct.wnode_hosts,
2658                                            dump_struct.wnode_hosts_size);
2659                     p.reader_hosts = string(dump_struct.reader_hosts,
2660                                             dump_struct.reader_hosts_size);
2661                     p.wnode_timeout =
2662                         CNSPreciseTime(dump_struct.wnode_timeout);
2663                     p.reader_timeout =
2664                         CNSPreciseTime(dump_struct.reader_timeout);
2665                     p.pending_timeout =
2666                         CNSPreciseTime(dump_struct.pending_timeout);
2667                     p.max_pending_wait_timeout =
2668                         CNSPreciseTime(dump_struct.max_pending_wait_timeout);
2669                     p.max_pending_read_wait_timeout =
2670                         CNSPreciseTime(dump_struct.
2671                                 max_pending_read_wait_timeout);
2672                     p.description = string(dump_struct.description,
2673                                            dump_struct.description_size);
2674                     p.scramble_job_keys = dump_struct.scramble_job_keys;
2675                     p.client_registry_timeout_worker_node =
2676                         CNSPreciseTime(dump_struct.
2677                                 client_registry_timeout_worker_node);
2678                     p.client_registry_min_worker_nodes =
2679                         dump_struct.client_registry_min_worker_nodes;
2680                     p.client_registry_timeout_admin =
2681                         CNSPreciseTime(dump_struct.
2682                                 client_registry_timeout_admin);
2683                     p.client_registry_min_admins =
2684                         dump_struct.client_registry_min_admins;
2685                     p.client_registry_timeout_submitter =
2686                         CNSPreciseTime(dump_struct.
2687                                 client_registry_timeout_submitter);
2688                     p.client_registry_min_submitters =
2689                         dump_struct.client_registry_min_submitters;
2690                     p.client_registry_timeout_reader =
2691                         CNSPreciseTime(dump_struct.
2692                                 client_registry_timeout_reader);
2693                     p.client_registry_min_readers =
2694                         dump_struct.client_registry_min_readers;
2695                     p.client_registry_timeout_unknown =
2696                         CNSPreciseTime(dump_struct.
2697                                 client_registry_timeout_unknown);
2698                     p.client_registry_min_unknowns =
2699                         dump_struct.client_registry_min_unknowns;;
2700 
2701                     // Unpack linked sections
2702                     string          dump_prefs(dump_struct.
2703                                                 linked_section_prefixes,
2704                                                dump_struct.
2705                                                 linked_section_prefixes_size);
2706                     string          dump_names(dump_struct.
2707                                                 linked_section_names,
2708                                                dump_struct.
2709                                                 linked_section_names_size);
2710                     list<string>    prefixes;
2711                     list<string>    names;
2712                     NStr::Split(dump_prefs, ",", prefixes,
2713                                 NStr::fSplit_MergeDelimiters |
2714                                 NStr::fSplit_Truncate);
2715                     NStr::Split(dump_names, ",", names,
2716                                 NStr::fSplit_MergeDelimiters |
2717                                 NStr::fSplit_Truncate);
2718                     list<string>::const_iterator pref_it = prefixes.begin();
2719                     list<string>::const_iterator names_it = names.begin();
2720                     for ( ; pref_it != prefixes.end() &&
2721                             names_it != names.end(); ++pref_it, ++names_it)
2722                         p.linked_sections[*pref_it] = *names_it;
2723 
2724                     string  qclass(dump_struct.qclass, dump_struct.qclass_size);
2725                     dump_queue_classes[qclass] = p;
2726                 }
2727             }
2728         } catch (const exception &  ex) {
2729             fclose(f);
2730             throw;
2731         }
2732         fclose(f);
2733     }
2734 
2735 
2736     CDir::TEntries      entries = dump_dir.GetEntries(
2737                                     kEmptyStr, CDir::fIgnoreRecursive);
2738     for (CDir::TEntries::const_iterator  k = entries.begin();
2739             k != entries.end(); ++k) {
2740         if ((*k)->IsDir())
2741             continue;
2742         if ((*k)->IsLink())
2743             continue;
2744         string      entry_name = (*k)->GetName();
2745         if (!NStr::StartsWith(entry_name, "db_dump."))
2746             continue;
2747 
2748         string  prefix;
2749         string  qname;
2750         NStr::SplitInTwo(entry_name, ".", prefix, qname);
2751         if (dump_dynamic_queues.find(qname) == dump_dynamic_queues.end())
2752             dump_static_queues.insert(qname);
2753     }
2754 }
2755 
2756 
x_GetConfigQueues(void)2757 set<string, PNocase> CQueueDataBase::x_GetConfigQueues(void)
2758 {
2759     const CNcbiRegistry &   reg = CNcbiApplication::Instance()->GetConfig();
2760     set<string, PNocase>    queues;
2761     list<string>            sections;
2762 
2763     reg.EnumerateSections(&sections);
2764     for (list<string>::const_iterator  k = sections.begin();
2765             k != sections.end(); ++k) {
2766         string              queue_name;
2767         string              prefix;
2768         const string &      section_name = *k;
2769 
2770         NStr::SplitInTwo(section_name, "_", prefix, queue_name);
2771         if (NStr::CompareNocase(prefix, "queue") != 0)
2772             continue;
2773         if (queue_name.empty())
2774             continue;
2775         if (queue_name.size() > kMaxQueueNameSize - 1)
2776             continue;
2777         queues.insert(queue_name);
2778     }
2779 
2780     return queues;
2781 }
2782 
2783 
x_AppendDumpLinkedSections(void)2784 void CQueueDataBase::x_AppendDumpLinkedSections(void)
2785 {
2786     // Here: the m_LinkedSections has already been read from the configuration
2787     //       file. Let's append the sections from the dump.
2788     CDir        dump_dir(m_DumpPath);
2789     if (!dump_dir.Exists())
2790         return;
2791 
2792     string  linked_sections_file_name = m_DumpPath + kLinkedSectionsFileName;
2793     CFile   linked_sections_file(linked_sections_file_name);
2794 
2795     if (linked_sections_file.Exists()) {
2796         FILE *      f = fopen(linked_sections_file_name.c_str(), "rb");
2797         if (f == NULL)
2798             throw runtime_error("Cannot open the existing dump file "
2799                                 "for reading: " + linked_sections_file_name);
2800         SLinkedSectionDump                  dump_struct;
2801         map<string, map<string, string> >   dump_sections;
2802         try {
2803             SOneStructDumpHeader    header;
2804             header.Read(f);
2805 
2806             while (dump_struct.Read(f, header.fixed_size) == 0) {
2807                 if (m_LinkedSections.find(dump_struct.section) !=
2808                         m_LinkedSections.end())
2809                     continue;
2810                 string  sname(dump_struct.section, dump_struct.section_size);
2811                 string  vname(dump_struct.value_name,
2812                               dump_struct.value_name_size);
2813                 string  val(dump_struct.value, dump_struct.value_size);
2814                 dump_sections[sname][vname] = val;
2815             }
2816         } catch (const exception &  ex) {
2817             fclose(f);
2818             throw;
2819         }
2820         fclose(f);
2821 
2822         m_LinkedSections.insert(dump_sections.begin(), dump_sections.end());
2823     }
2824 }
2825 
2826 
x_BackupDump(void)2827 void CQueueDataBase::x_BackupDump(void)
2828 {
2829     CDir    dump_dir(m_DumpPath);
2830     if (!dump_dir.Exists())
2831         return;
2832 
2833     size_t      backup_number = 0;
2834     string      backup_dir_name;
2835     for ( ; ; ) {
2836         backup_dir_name = CDirEntry::DeleteTrailingPathSeparator(m_DumpPath) +
2837                           "." + to_string(backup_number);
2838         if (!CDir(backup_dir_name).Exists())
2839             break;
2840         ++backup_number;
2841     }
2842 
2843     try {
2844         dump_dir.Rename(backup_dir_name);
2845     } catch (const exception &  ex) {
2846         ERR_POST("Error renaming the dump directory: " << ex.what());
2847     } catch (...) {
2848         ERR_POST("Unknown error renaming the dump directory");
2849     }
2850 }
2851 
2852 
x_GetDumpSpaceFileName(void) const2853 string CQueueDataBase::x_GetDumpSpaceFileName(void) const
2854 {
2855     return m_DumpPath + kDumpReservedSpaceFileName;
2856 }
2857 
2858 
x_RemoveSpaceReserveFile(void)2859 bool CQueueDataBase::x_RemoveSpaceReserveFile(void)
2860 {
2861     CFile       space_file(x_GetDumpSpaceFileName());
2862     if (space_file.Exists()) {
2863         try {
2864             space_file.Remove();
2865         } catch (const exception &  ex) {
2866             string  msg = "Error removing reserving dump space file: " +
2867                           string(ex.what());
2868             ERR_POST(msg);
2869             m_Server->RegisterAlert(eDumpSpaceError, msg);
2870             return false;
2871         }
2872     }
2873     return true;
2874 }
2875 
2876 
x_CreateSpaceReserveFile(void)2877 void CQueueDataBase::x_CreateSpaceReserveFile(void)
2878 {
2879     unsigned int        space = m_Server->GetReserveDumpSpace();
2880     if (space == 0)
2881         return;
2882 
2883     CDir    dump_dir(m_DumpPath);
2884     if (!dump_dir.Exists()) {
2885         try {
2886             dump_dir.Create();
2887         } catch (const exception &  ex) {
2888             string  msg = "Error creating dump directory: " + string(ex.what());
2889             ERR_POST(msg);
2890             m_Server->RegisterAlert(eDumpSpaceError, msg);
2891             return;
2892         }
2893     }
2894 
2895     // This will truncate the file if it existed
2896     FILE *      space_file = fopen(x_GetDumpSpaceFileName().c_str(), "w");
2897     if (space_file == NULL) {
2898         string  msg = "Error opening reserving dump space file " +
2899                       x_GetDumpSpaceFileName();
2900         ERR_POST(msg);
2901         m_Server->RegisterAlert(eDumpSpaceError, msg);
2902         return;
2903     }
2904 
2905     void *      buffer = malloc(kDumpReservedSpaceFileBuffer);
2906     if (buffer == NULL) {
2907         fclose(space_file);
2908         string  msg = "Error creating a memory buffer to write into the "
2909                       "reserving dump space file";
2910         ERR_POST(msg);
2911         m_Server->RegisterAlert(eDumpSpaceError, msg);
2912         return;
2913     }
2914 
2915     memset(buffer, 0, kDumpReservedSpaceFileBuffer);
2916     while (space > kDumpReservedSpaceFileBuffer) {
2917         errno = 0;
2918         if (fwrite(buffer, kDumpReservedSpaceFileBuffer, 1, space_file) != 1) {
2919             free(buffer);
2920             fclose(space_file);
2921             string  msg = "Error writing into the reserving dump space file: " +
2922                           string(strerror(errno));
2923             ERR_POST(msg);
2924             m_Server->RegisterAlert(eDumpSpaceError, msg);
2925             return;
2926         }
2927         space -= kDumpReservedSpaceFileBuffer;
2928     }
2929 
2930     if (space > 0) {
2931         errno = 0;
2932         if (fwrite(buffer, space, 1, space_file) != 1) {
2933             string  msg = "Error writing into the reserving dump space file: " +
2934                           string(strerror(errno));
2935             ERR_POST(msg);
2936             m_Server->RegisterAlert(eDumpSpaceError, msg);
2937         }
2938     }
2939 
2940     free(buffer);
2941     fclose(space_file);
2942 }
2943 
2944 
2945 void
x_RestorePauseState(const map<string,int> & paused_queues)2946 CQueueDataBase::x_RestorePauseState(const map<string, int> &  paused_queues)
2947 {
2948     CFastMutexGuard     guard(m_ConfigureLock);
2949 
2950     for (map<string, int>::const_iterator k = paused_queues.begin();
2951          k != paused_queues.end(); ++k) {
2952         string  queue_name = k->first;
2953         int     pause_mode = k->second;
2954 
2955 
2956         TQueueInfo::iterator  existing = m_Queues.find(queue_name);
2957         if (existing == m_Queues.end()) {
2958             ERR_POST("Cannot restore the pause state of the " <<
2959                      queue_name << " queue. The queue is not configured.");
2960         } else {
2961             existing->second.second->RestorePauseStatus(pause_mode);
2962         }
2963     }
2964 }
2965 
2966 
2967 void
x_RestoreRefuseSubmitState(const vector<string> & refuse_submit_queues)2968 CQueueDataBase::x_RestoreRefuseSubmitState(
2969                                 const vector<string> &  refuse_submit_queues)
2970 {
2971     CFastMutexGuard     guard(m_ConfigureLock);
2972 
2973     for (vector<string>::const_iterator k = refuse_submit_queues.begin();
2974          k != refuse_submit_queues.end(); ++k) {
2975         string  queue_name = *k;
2976 
2977         if (NStr::CompareNocase(queue_name, "[server]") == 0) {
2978             m_Server->SetRefuseSubmits(true);
2979             continue;
2980         }
2981 
2982         TQueueInfo::iterator  existing = m_Queues.find(queue_name);
2983         if (existing == m_Queues.end()) {
2984             ERR_POST("Cannot restore the refuse submit state of the " <<
2985                      queue_name << " queue. The queue is not configured.");
2986         } else {
2987             existing->second.second->SetRefuseSubmits(true);
2988         }
2989     }
2990 }
2991 
2992 END_NCBI_SCOPE
2993