1 /* $Id: queue_database.cpp 601644 2020-02-11 19:47:28Z satskyse $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Anatoliy Kuznetsov, Victor Joukov
27 *
 * File Description: NetSchedule queue collection and database management.
29 *
30 */
31 #include <ncbi_pch.hpp>
32 #include <unistd.h>
33
34 #include <corelib/ncbi_system.hpp>
35 #include <corelib/ncbireg.hpp>
36
37 #include <connect/services/netschedule_api.hpp>
38 #include <connect/ncbi_socket.hpp>
39
40 #include <util/time_line.hpp>
41
42 #include "queue_database.hpp"
43 #include "ns_util.hpp"
44 #include "netschedule_version.hpp"
45 #include "ns_server.hpp"
46 #include "ns_handler.hpp"
47 #include "ns_db_dump.hpp"
48 #include "ns_types.hpp"
49 #include "ns_restore_state.hpp"
50
51
52 BEGIN_NCBI_SCOPE
53
54
55
56 /////////////////////////////////////////////////////////////////////////////
57 // CQueueDataBase implementation
58
CQueueDataBase(CNetScheduleServer * server,const string & path,unsigned int max_queues,bool diskless,bool reinit)59 CQueueDataBase::CQueueDataBase(CNetScheduleServer * server,
60 const string & path,
61 unsigned int max_queues,
62 bool diskless,
63 bool reinit)
64 : m_Host(server->GetBackgroundHost()),
65 m_MaxQueues(max_queues),
66 m_Diskless(diskless),
67 m_StopPurge(false),
68 m_FreeStatusMemCnt(0),
69 m_LastFreeMem(time(0)),
70 m_Server(server),
71 m_PurgeQueue(""),
72 m_PurgeStatusIndex(0),
73 m_PurgeJobScanned(0)
74 {
75 m_DataPath = CDirEntry::AddTrailingPathSeparator(path);
76 m_DumpPath = CDirEntry::AddTrailingPathSeparator(m_DataPath +
77 kDumpSubdirName);
78
79 // First, load the previous session start job IDs if file existed
80 // The diskless flag will be considered when IDs are loaded.
81 m_Server->LoadJobsStartIDs();
82
83 // Load the content of the queue state files
84 map<string, int> paused_queues = DeserializePauseState(m_DataPath,
85 m_Diskless);
86 vector<string> refuse_submit_queues =
87 DeserializeRefuseSubmitState(m_DataPath,
88 m_Diskless);
89
90 // Old instance data files are not needed (even if they survived)
91 if (!m_Diskless)
92 x_RemoveDataFiles();
93
94 // Creates the queues from the ini file and loads jobs from the dump
95 x_Open(reinit);
96
97 if (!m_Diskless) {
98 // Restore the queue state
99 x_RestorePauseState(paused_queues);
100 x_RestoreRefuseSubmitState(refuse_submit_queues);
101
102 // The files with queue state could be broken or the queues may be removed
103 // from the configuration file so update the state files to keep them
104 // consistent with the up to date configuration
105 SerializePauseState(m_DataPath, GetPauseQueues());
106 SerializeRefuseSubmitState(m_DataPath, server->GetRefuseSubmits(),
107 GetRefuseSubmitQueues());
108 }
109 }
110
111
~CQueueDataBase()112 CQueueDataBase::~CQueueDataBase()
113 {
114 try {
115 Close();
116 } catch (...) {}
117 }
118
119
// Opens the queue database.
// Prepares the data directory (or removes it in diskless mode), creates the
// queues and loads the previous instance dump into them. Static queues come
// from the configuration file; dynamic queues come from the dump.
// Exceptions raised while creating the queues and loading the dump are
// caught here, logged and counted. If any occurred, the eDumpLoadError alert
// is registered and the dump is backed up instead of being removed.
// reinit - request to wipe the data directory. It is forced to false in
//          diskless mode and otherwise replaced by the value returned by
//          x_CheckOpenPreconditions() (which may throw).
void CQueueDataBase::x_Open(bool reinit)
{
    // Checks preconditions and provides the final reinit value
    // It sets alerts and throws exceptions if needed.
    if (m_Diskless) {
        reinit = false;
    } else {
        reinit = x_CheckOpenPreconditions(reinit);
    }

    CDir data_dir(m_DataPath);
    if (reinit)
        data_dir.Remove();
    if (m_Diskless) {
        // Try to remove the dir if it exists: a diskless server must not
        // keep a previous run directory
        if (data_dir.Exists()) {
            ERR_POST(Warning << "The configuration has the [server]/diskless "
                                "value set to true but the previous run "
                                "directory is on the disk. It will be removed.");
            if (!data_dir.Remove()) {
                // Not fatal: an alert is raised and the start continues
                ERR_POST(Critical << "Error removing the previous run data "
                                     "directory " << m_DataPath);
                m_Server->RegisterAlert(eDataDirRemoveError,
                                        "Error removing the previous run data "
                                        "directory " + m_DataPath +
                                        " (due to the cofiguration file sets "
                                        "the [server]/diskless value to true)");
            }
        }
    } else if (!data_dir.Exists()) {
        data_dir.Create();
    }

    // The initialization must be done before the queues are created but after
    // the directory is possibly re-created
    m_Server->InitNodeID(m_DataPath);

    // Detect what queues need to be loaded. It depends on the configuration
    // file and on the dumped queues. It might be that the saved queues +
    // the config file queues exceed the configured max number of queues.
    set<string, PNocase>            dump_static_queues;
    map<string, string, PNocase>    dump_dynamic_queues;    // qname -> qclass
    TQueueParams                    dump_queue_classes;

    // NOTE(review): this call is outside the try block below, so an
    // exception from it propagates out of x_Open()
    if (!m_Diskless)
        x_ReadDumpQueueDesrc(dump_static_queues, dump_dynamic_queues,
                             dump_queue_classes);

    set<string, PNocase>    config_static_queues = x_GetConfigQueues();
    string                  last_queue_load_error;
    size_t                  queue_load_error_count = 0;

    // The final number of queues is the static queues from the config file
    // plus those dumped dynamic queues which are not static in the config
    size_t final_dynamic_count = 0;
    for (map<string, string, PNocase>::const_iterator
            k = dump_dynamic_queues.begin();
            k != dump_dynamic_queues.end(); ++k)
        if (config_static_queues.find(k->first) == config_static_queues.end())
            ++final_dynamic_count;

    size_t total_queues = final_dynamic_count +
                          config_static_queues.size();
    if (total_queues > m_MaxQueues) {
        // Do not lose queues at startup: extend the limit instead
        string msg = "The initial number of queues on the server exceeds the "
                     "configured max number of queues. Configured: " +
                     to_string(m_MaxQueues) + ". Real: " +
                     to_string(total_queues) + ". The limit will "
                     "be extended to accomodate all the queues.";
        LOG_POST(Note << msg);
        m_Server->RegisterAlert(eMaxQueues, msg);
        m_MaxQueues = total_queues;
    }

    try {
        // Here: we can start restoring what was saved. The first step is
        // to get the linked sections. Linked sections have two sources:
        // - config file
        // - dumped sections
        // The config file sections must override the dumped ones
        CJsonNode unused_diff = CJsonNode::NewObjectNode();
        x_ReadLinkedSections(CNcbiApplication::Instance()->GetConfig(),
                             unused_diff);

        if (!m_Diskless)
            x_AppendDumpLinkedSections();

        // Read the queue classes from the config file and append those which
        // come from the dump (the config file wins on a name clash)
        m_QueueClasses = x_ReadIniFileQueueClassDescriptions(
                                CNcbiApplication::Instance()->GetConfig());
        for (TQueueParams::const_iterator k = dump_queue_classes.begin();
             k != dump_queue_classes.end(); ++k) {
            if (m_QueueClasses.find(k->first) == m_QueueClasses.end())
                m_QueueClasses[k->first] = k->second;
        }

        // Read the queues from the config file
        TQueueParams queues_from_ini =
                        x_ReadIniFileQueueDescriptions(
                                CNcbiApplication::Instance()->GetConfig(),
                                m_QueueClasses);
        x_ConfigureQueues(queues_from_ini, unused_diff);

        // Add the queues from the dump
        for (map<string, string, PNocase>::const_iterator
                k = dump_dynamic_queues.begin();
                k != dump_dynamic_queues.end(); ++k) {
            string qname = k->first;
            if (config_static_queues.find(qname) != config_static_queues.end())
                continue;

            // Here: the dumped queue has not been changed from a dynamic one
            // to a static. So, it needs to be added.
            string qclass = k->second;

            // NOTE(review): operator[] default-constructs and inserts an
            // SQueueParameters entry into m_QueueClasses if qclass is neither
            // in the config file nor in the dump. Presumably the dump always
            // carries the class of every dumped dynamic queue - confirm.
            SQueueParameters params = m_QueueClasses[qclass];

            params.kind = CQueue::eKindDynamic;
            params.delete_request = false;
            params.qclass = qclass;

            // Lost parameter: description. The only dynamic queue classes are
            // dumped so the description is lost.
            // params.description = ...

            x_CreateAndMountQueue(qname, params);
        }

        // All the structures are ready to upload the jobs from the dump.
        // A failure in one queue does not prevent loading the others.
        if (!m_Diskless) {
            for (TQueueInfo::iterator k = m_Queues.begin();
                    k != m_Queues.end(); ++k) {
                try {
                    unsigned int records =
                            k->second.second->LoadFromDump(m_DumpPath);
                    GetDiagContext().Extra()
                        .Print("_type", "startup")
                        .Print("_queue", k->first)
                        .Print("info", "load_from_dump")
                        .Print("records", records);
                } catch (const exception & ex) {
                    ERR_POST(ex.what());
                    last_queue_load_error = ex.what();
                    ++queue_load_error_count;
                } catch (...) {
                    last_queue_load_error = "Unknown error loading queue " +
                                            k->first + " from dump";
                    ERR_POST(last_queue_load_error);
                    ++queue_load_error_count;
                }
            }
        }
    } catch (const exception & ex) {
        ERR_POST(ex.what());
        last_queue_load_error = ex.what();
        ++queue_load_error_count;
    } catch (...) {
        last_queue_load_error = "Unknown error loading queues from dump";
        ERR_POST(last_queue_load_error);
        ++queue_load_error_count;
    }

    if (!m_Diskless) {
        x_CreateCrashFlagFile();
        x_CreateDumpErrorFlagFile();
        x_CreateStorageVersionFile();

        // Serialize the start job IDs file if it was deleted during the
        // database initialization. Even if it is overwriting an existing file
        // there is no performance issue at this point (it's done once anyway).
        m_Server->SerializeJobsStartIDs();
    }

    // A dump which could not be loaded cleanly is kept (backed up) for
    // investigation; otherwise it is not needed any more
    if (queue_load_error_count > 0) {
        m_Server->RegisterAlert(eDumpLoadError,
                                "There were error(s) loading the previous "
                                "instance dump. Number of errors: " +
                                to_string(queue_load_error_count) +
                                ". See log for all the loading errors. "
                                "Last error: " + last_queue_load_error);
        if (!m_Diskless)
            x_BackupDump();
    } else {
        if (!m_Diskless)
            x_RemoveDump();
    }

    if (!m_Diskless)
        x_CreateSpaceReserveFile();
}
312
313
314 TQueueParams
x_ReadIniFileQueueClassDescriptions(const IRegistry & reg)315 CQueueDataBase::x_ReadIniFileQueueClassDescriptions(const IRegistry & reg)
316 {
317 TQueueParams queue_classes;
318 list<string> sections;
319
320 reg.EnumerateSections(§ions);
321
322 ITERATE(list<string>, it, sections) {
323 string queue_class;
324 string prefix;
325 const string & section_name = *it;
326
327 NStr::SplitInTwo(section_name, "_", prefix, queue_class);
328 if (NStr::CompareNocase(prefix, "qclass") != 0)
329 continue;
330 if (queue_class.empty())
331 continue;
332 if (queue_class.size() > kMaxQueueNameSize - 1)
333 continue;
334
335 // Warnings are ignored here. At this point they are not of interest
336 // because they have already been collected at the startup (allowed)
337 // or at RECO - a file with warnings is not allowed
338 SQueueParameters params;
339 vector<string> warnings;
340 if (params.ReadQueueClass(reg, section_name, warnings)) {
341 // false => problems with linked sections; see CXX-2617
342 // The same sections cannot appear twice
343 queue_classes[queue_class] = params;
344 }
345 }
346
347 return queue_classes;
348 }
349
350
351 // Reads the queues from ini file and respects inheriting queue classes
352 // parameters
353 TQueueParams
x_ReadIniFileQueueDescriptions(const IRegistry & reg,const TQueueParams & classes)354 CQueueDataBase::x_ReadIniFileQueueDescriptions(const IRegistry & reg,
355 const TQueueParams & classes)
356 {
357 TQueueParams queues;
358 list<string> sections;
359
360 reg.EnumerateSections(§ions);
361 ITERATE(list<string>, it, sections) {
362 string queue_name;
363 string prefix;
364 const string & section_name = *it;
365
366 NStr::SplitInTwo(section_name, "_", prefix, queue_name);
367 if (NStr::CompareNocase(prefix, "queue") != 0)
368 continue;
369 if (queue_name.empty())
370 continue;
371 if (queue_name.size() > kMaxQueueNameSize - 1)
372 continue;
373
374 // Warnings are ignored here. At this point they are not of interest
375 // because they have already been collected at the startup (allowed)
376 // or at RECO - a file with warnings is not allowed
377 SQueueParameters params;
378 vector<string> warnings;
379
380 if (params.ReadQueue(reg, section_name, classes, warnings)) {
381 // false => problems with linked sections; see CXX-2617
382 queues[queue_name] = params;
383 }
384 }
385
386 return queues;
387 }
388
389
x_ReadLinkedSections(const IRegistry & reg,CJsonNode & diff)390 void CQueueDataBase::x_ReadLinkedSections(const IRegistry & reg,
391 CJsonNode & diff)
392 {
393 // Read the new content
394 typedef map< string, map< string, string > > section_container;
395 section_container new_values;
396 list<string> sections;
397 reg.EnumerateSections(§ions);
398
399 ITERATE(list<string>, it, sections) {
400 string queue_or_class;
401 string prefix;
402 const string & section_name = *it;
403
404 NStr::SplitInTwo(section_name, "_", prefix, queue_or_class);
405 if (queue_or_class.empty())
406 continue;
407 if (NStr::CompareNocase(prefix, "qclass") != 0 &&
408 NStr::CompareNocase(prefix, "queue") != 0)
409 continue;
410 if (queue_or_class.size() > kMaxQueueNameSize - 1)
411 continue;
412
413 list<string> entries;
414 reg.EnumerateEntries(section_name, &entries);
415
416 ITERATE(list<string>, k, entries) {
417 const string & entry = *k;
418 string ref_section = reg.GetString(section_name,
419 entry, kEmptyStr);
420
421 if (!NStr::StartsWith(entry, "linked_section_", NStr::eCase))
422 continue;
423
424 if (entry == "linked_section_")
425 continue; // Malformed values prefix
426
427 if (ref_section.empty())
428 continue; // Malformed section name
429
430 if (find(sections.begin(), sections.end(), ref_section) ==
431 sections.end())
432 continue; // Non-existing section
433
434 if (new_values.find(ref_section) != new_values.end())
435 continue; // Has already been read
436
437 // Read the linked section values
438 list<string> linked_section_entries;
439 reg.EnumerateEntries(ref_section, &linked_section_entries);
440 map<string, string> values;
441 for (list<string>::const_iterator j = linked_section_entries.begin();
442 j != linked_section_entries.end(); ++j)
443 values[*j] = reg.GetString(ref_section, *j, kEmptyStr);
444
445 new_values[ref_section] = values;
446 }
447 }
448
449 CFastMutexGuard guard(m_LinkedSectionsGuard);
450
451 // Identify those sections which were deleted
452 vector<string> deleted;
453 for (section_container::const_iterator k(m_LinkedSections.begin());
454 k != m_LinkedSections.end(); ++k)
455 if (new_values.find(k->first) == new_values.end())
456 deleted.push_back(k->first);
457
458 if (!deleted.empty()) {
459 CJsonNode deletedSections = CJsonNode::NewArrayNode();
460 for (vector<string>::const_iterator k(deleted.begin());
461 k != deleted.end(); ++k)
462 deletedSections.AppendString( *k );
463 diff.SetByKey( "linked_section_deleted", deletedSections );
464 }
465
466 // Identify those sections which were added
467 vector<string> added;
468 for (section_container::const_iterator k(new_values.begin());
469 k != new_values.end(); ++k)
470 if (m_LinkedSections.find(k->first) == m_LinkedSections.end())
471 added.push_back(k->first);
472
473 if (!added.empty()) {
474 CJsonNode addedSections = CJsonNode::NewArrayNode();
475 for (vector<string>::const_iterator k(added.begin());
476 k != added.end(); ++k)
477 addedSections.AppendString( *k );
478 diff.SetByKey( "linked_section_added", addedSections );
479 }
480
481 // Deal with changed sections: what was added/deleted/modified
482 vector<string> changed;
483 for (section_container::const_iterator k(new_values.begin());
484 k != new_values.end(); ++k) {
485 if (find(added.begin(), added.end(), k->first) != added.end())
486 continue;
487 if (new_values[k->first] == m_LinkedSections[k->first])
488 continue;
489 changed.push_back(k->first);
490 }
491
492 if (!changed.empty()) {
493 CJsonNode changedSections = CJsonNode::NewObjectNode();
494 for (vector<string>::const_iterator k(changed.begin());
495 k != changed.end(); ++k)
496 changedSections.SetByKey( *k,
497 x_DetectChangesInLinkedSection( m_LinkedSections[*k],
498 new_values[*k]) );
499 diff.SetByKey( "linked_section_changed", changedSections );
500 }
501
502 // Finally, save the new configuration
503 m_LinkedSections = new_values;
504 }
505
506
507 CJsonNode
x_DetectChangesInLinkedSection(const map<string,string> & old_values,const map<string,string> & new_values)508 CQueueDataBase::x_DetectChangesInLinkedSection(
509 const map<string, string> & old_values,
510 const map<string, string> & new_values)
511 {
512 CJsonNode diff = CJsonNode::NewObjectNode();
513
514 // Deal with deleted items
515 vector<string> deleted;
516 for (map<string, string>::const_iterator k(old_values.begin());
517 k != old_values.end(); ++k)
518 if (new_values.find(k->first) == new_values.end())
519 deleted.push_back(k->first);
520 if (!deleted.empty()) {
521 CJsonNode deletedValues = CJsonNode::NewArrayNode();
522 for (vector<string>::const_iterator k(deleted.begin());
523 k != deleted.end(); ++k)
524 deletedValues.AppendString( *k );
525 diff.SetByKey( "deleted", deletedValues );
526 }
527
528 // Deal with added items
529 vector<string> added;
530 for (map<string, string>::const_iterator k(new_values.begin());
531 k != new_values.end(); ++k)
532 if (old_values.find(k->first) == old_values.end())
533 added.push_back(k->first);
534 if (!added.empty()) {
535 CJsonNode addedValues = CJsonNode::NewArrayNode();
536 for (vector<string>::const_iterator k(added.begin());
537 k != added.end(); ++k)
538 addedValues.AppendString( *k );
539 diff.SetByKey( "added", addedValues );
540 }
541
542 // Deal with changed values
543 vector<string> changed;
544 for (map<string, string>::const_iterator k(new_values.begin());
545 k != new_values.end(); ++k) {
546 if (old_values.find(k->first) == old_values.end())
547 continue;
548 if (old_values.find(k->first)->second ==
549 new_values.find(k->first)->second)
550 continue;
551 changed.push_back(k->first);
552 }
553 if (!changed.empty()) {
554 CJsonNode changedValues = CJsonNode::NewObjectNode();
555 for (vector<string>::const_iterator k(changed.begin());
556 k != changed.end(); ++k) {
557 CJsonNode values = CJsonNode::NewArrayNode();
558 values.AppendString( old_values.find(*k)->second );
559 values.AppendString( new_values.find(*k)->second );
560 changedValues.SetByKey( *k, values );
561 }
562 diff.SetByKey( "changed", changedValues );
563 }
564
565 return diff;
566 }
567
568
569 // Validates the config from an ini file for the following:
570 // - a static queue redefines existed dynamic queue
571 void
x_ValidateConfiguration(const TQueueParams & queues_from_ini) const572 CQueueDataBase::x_ValidateConfiguration(
573 const TQueueParams & queues_from_ini) const
574 {
575 // Check that static queues do not mess with existing dynamic queues
576 for (TQueueParams::const_iterator k = queues_from_ini.begin();
577 k != queues_from_ini.end(); ++k) {
578 TQueueInfo::const_iterator existing = m_Queues.find(k->first);
579
580 if (existing == m_Queues.end())
581 continue;
582 if (existing->second.first.kind == CQueue::eKindDynamic)
583 NCBI_THROW(CNetScheduleException, eInvalidParameter,
584 "Configuration error. The queue '" + k->first +
585 "' clashes with a currently existing "
586 "dynamic queue of the same name.");
587 }
588
589 // Config file is OK for the current configuration
590 }
591
592
593 unsigned int
x_CountQueuesToAdd(const TQueueParams & queues_from_ini) const594 CQueueDataBase::x_CountQueuesToAdd(const TQueueParams & queues_from_ini) const
595 {
596 unsigned int add_count = 0;
597
598 for (TQueueParams::const_iterator k = queues_from_ini.begin();
599 k != queues_from_ini.end(); ++k) {
600
601 if (m_Queues.find(k->first) == m_Queues.end())
602 ++add_count;
603 }
604
605 return add_count;
606 }
607
608
609 // Updates what is stored in memory.
610 // Forms the diff string. Tells if there were changes.
611 bool
x_ConfigureQueueClasses(const TQueueParams & classes_from_ini,CJsonNode & diff)612 CQueueDataBase::x_ConfigureQueueClasses(const TQueueParams & classes_from_ini,
613 CJsonNode & diff)
614 {
615 bool has_changes = false;
616 vector<string> classes; // Used to store added and deleted classes
617
618 // Delete from existed what was not found in the new classes
619 for (TQueueParams::iterator k = m_QueueClasses.begin();
620 k != m_QueueClasses.end(); ++k) {
621 string old_queue_class = k->first;
622
623 if (classes_from_ini.find(old_queue_class) != classes_from_ini.end())
624 continue;
625
626 // The queue class is not in the configuration any more however it
627 // still could be in use for a dynamic queue. Leave it for the GC
628 // to check if the class has no reference to it and delete it
629 // accordingly.
630 // So, just mark it as for removal.
631
632 if (k->second.delete_request)
633 continue; // Has already been marked for deletion
634
635 k->second.delete_request = true;
636 classes.push_back(old_queue_class);
637 }
638
639 if (!classes.empty()) {
640 has_changes = true;
641 CJsonNode deleted_classes = CJsonNode::NewArrayNode();
642 for (vector<string>::const_iterator k = classes.begin();
643 k != classes.end(); ++k)
644 deleted_classes.AppendString( *k );
645 diff.SetByKey( "deleted_queue_classes", deleted_classes );
646 }
647
648
649 // Check the updates in the classes
650 classes.clear();
651
652 CJsonNode class_changes = CJsonNode::NewObjectNode();
653 for (TQueueParams::iterator k = m_QueueClasses.begin();
654 k != m_QueueClasses.end(); ++k) {
655
656 string queue_class = k->first;
657 TQueueParams::const_iterator new_class =
658 classes_from_ini.find(queue_class);
659
660 if (new_class == classes_from_ini.end())
661 continue; // It is a candidate for deletion, so no diff
662
663 // The same class found in the new configuration
664 if (k->second.delete_request) {
665 // The class was restored before GC deleted it. Update the flag
666 // and parameters
667 k->second = new_class->second;
668 classes.push_back(queue_class);
669 continue;
670 }
671
672 // That's the same class which possibly was updated
673 // Do not compare class name here, this is a class itself
674 // Description should be compared
675 CJsonNode class_diff = k->second.Diff(new_class->second,
676 false, true);
677
678 if (class_diff.GetSize() > 0) {
679 // There is a difference, update the class info
680 k->second = new_class->second;
681
682 class_changes.SetByKey(queue_class, class_diff);
683 has_changes = true;
684 }
685 }
686 if (class_changes.GetSize() > 0)
687 diff.SetByKey("queue_class_changes", class_changes);
688
689 // Check what was added
690 for (TQueueParams::const_iterator k = classes_from_ini.begin();
691 k != classes_from_ini.end(); ++k) {
692 string new_queue_class = k->first;
693
694 if (m_QueueClasses.find(new_queue_class) == m_QueueClasses.end()) {
695 m_QueueClasses[new_queue_class] = k->second;
696 classes.push_back(new_queue_class);
697 }
698 }
699
700 if (!classes.empty()) {
701 has_changes = true;
702 CJsonNode added_classes = CJsonNode::NewArrayNode();
703 for (vector<string>::const_iterator k = classes.begin();
704 k != classes.end(); ++k)
705 added_classes.AppendString(*k);
706 diff.SetByKey("added_queue_classes", added_classes);
707 }
708
709 return has_changes;
710 }
711
712
713 // Updates the queue info in memory and creates/marks for deletion
714 // queues if necessary.
715 bool
x_ConfigureQueues(const TQueueParams & queues_from_ini,CJsonNode & diff)716 CQueueDataBase::x_ConfigureQueues(const TQueueParams & queues_from_ini,
717 CJsonNode & diff)
718 {
719 bool has_changes = false;
720 vector<string> deleted_queues;
721
722 // Mark for deletion what disappeared
723 for (TQueueInfo::iterator k = m_Queues.begin();
724 k != m_Queues.end(); ++k) {
725 if (k->second.first.kind == CQueue::eKindDynamic)
726 continue; // It's not the config business to deal
727 // with dynamic queues
728
729 string old_queue = k->first;
730 if (queues_from_ini.find(old_queue) != queues_from_ini.end())
731 continue;
732
733 // The queue is not in the configuration any more. It could
734 // still be in use or jobs could be still there. So mark it
735 // for deletion and forbid submits for the queue.
736 // GC will later delete it.
737
738 if (k->second.first.delete_request)
739 continue; // Has already been marked for deletion
740
741 CRef<CQueue> queue = k->second.second;
742 queue->SetRefuseSubmits(true);
743
744 k->second.first.delete_request = true;
745 deleted_queues.push_back(k->first);
746 }
747
748 if (!deleted_queues.empty()) {
749 has_changes = true;
750 CJsonNode deleted = CJsonNode::NewArrayNode();
751 for (vector<string>::const_iterator k = deleted_queues.begin();
752 k != deleted_queues.end(); ++k)
753 deleted.AppendString(*k);
754 diff.SetByKey("deleted_queues", deleted);
755 }
756
757
758 // Check the updates in the queue parameters
759 vector< pair<string,
760 string> > added_queues;
761 CJsonNode section_changes = CJsonNode::NewObjectNode();
762
763 for (TQueueInfo::iterator k = m_Queues.begin();
764 k != m_Queues.end(); ++k) {
765
766 // The server configuration which affects the performance logging for
767 // queues could have been changed, so let the queues update the flag
768 k->second.second->UpdatePerfLoggingSettings(k->second.first.qclass);
769
770
771 if (k->second.first.kind == CQueue::eKindDynamic)
772 continue; // It's not the config business to deal
773 // with dynamic queues
774
775 string queue_name = k->first;
776 TQueueParams::const_iterator new_queue =
777 queues_from_ini.find(queue_name);
778
779 if (new_queue == queues_from_ini.end())
780 continue; // It is a candidate for deletion, or a dynamic queue;
781 // So no diff
782
783 // The same queue is in the new configuration
784 if (k->second.first.delete_request) {
785 // The queue was restored before GC deleted it. Update the flag,
786 // parameters and allows submits and update parameters if so.
787 CRef<CQueue> queue = k->second.second;
788 queue->SetParameters(new_queue->second);
789 queue->SetRefuseSubmits(false);
790
791 // The queue kind could not be changed here.
792 // The delete request is just checked.
793 k->second.first = new_queue->second;
794 added_queues.push_back(make_pair(queue_name,
795 k->second.first.qclass));
796 continue;
797 }
798
799
800 // That's the same queue which possibly was updated
801 // Class name should also be compared here
802 // Description should be compared here
803 CJsonNode queue_diff = k->second.first.Diff(new_queue->second,
804 true, true);
805
806 if (queue_diff.GetSize() > 0) {
807 // There is a difference, update the queue info and the queue
808 CRef<CQueue> queue = k->second.second;
809 queue->SetParameters(new_queue->second);
810
811 // The queue kind could not be changed here.
812 // The queue delete request could not be changed here.
813 k->second.first = new_queue->second;
814
815 section_changes.SetByKey(queue_name, queue_diff);
816 has_changes = true;
817 }
818 }
819
820 // Check dynamic queues classes. They might be updated.
821 for (TQueueInfo::iterator k = m_Queues.begin();
822 k != m_Queues.end(); ++k) {
823
824 if (k->second.first.kind != CQueue::eKindDynamic)
825 continue;
826 if (k->second.first.delete_request == true)
827 continue;
828
829 // OK, this is dynamic queue, alive and not set for deletion
830 // Check if its class parameters have been updated/
831 TQueueParams::const_iterator queue_class =
832 m_QueueClasses.find(k->second.first.qclass);
833 if (queue_class == m_QueueClasses.end()) {
834 ERR_POST("Cannot find class '" + k->second.first.qclass +
835 "' for dynamic queue '" + k->first +
836 "'. Unexpected internal data inconsistency.");
837 continue;
838 }
839
840 // Do not compare classes
841 // Do not compare description
842 // They do not make sense for dynamic queues because they are created
843 // with their own descriptions and the class does not have the 'class'
844 // field
845 CJsonNode class_diff = k->second.first.Diff(queue_class->second,
846 false, false);
847 if (class_diff.GetSize() > 0) {
848 // There is a difference in the queue class - update the
849 // parameters.
850 string old_class = k->second.first.qclass;
851 string old_description = k->second.first.description;
852
853 CRef<CQueue> queue = k->second.second;
854 queue->SetParameters(queue_class->second);
855
856 k->second.first = queue_class->second;
857 k->second.first.qclass = old_class;
858 k->second.first.description = old_description;
859 k->second.first.kind = CQueue::eKindDynamic;
860
861 section_changes.SetByKey(k->first, class_diff);
862 has_changes = true;
863 }
864 }
865
866 if (section_changes.GetSize() > 0)
867 diff.SetByKey("queue_changes", section_changes);
868
869
870 // Check what was added
871 for (TQueueParams::const_iterator k = queues_from_ini.begin();
872 k != queues_from_ini.end(); ++k) {
873 string new_queue_name = k->first;
874
875 if (m_Queues.find(new_queue_name) == m_Queues.end()) {
876 x_CreateAndMountQueue(new_queue_name, k->second);
877 added_queues.push_back(make_pair(new_queue_name, k->second.qclass));
878 }
879 }
880
881 if (!added_queues.empty()) {
882 has_changes = true;
883 CJsonNode added = CJsonNode::NewObjectNode();
884 for (vector< pair<string, string> >::const_iterator
885 k = added_queues.begin();
886 k != added_queues.end(); ++k)
887 added.SetByKey(k->first, CJsonNode::NewStringNode(k->second));
888 diff.SetByKey("added_queues", added);
889 }
890
891 return has_changes;
892 }
893
894
Configure(const IRegistry & reg,CJsonNode & diff)895 time_t CQueueDataBase::Configure(const IRegistry & reg,
896 CJsonNode & diff)
897 {
898 CFastMutexGuard guard(m_ConfigureLock);
899
900 // Read the configured queues and classes from the ini file
901 TQueueParams classes_from_ini =
902 x_ReadIniFileQueueClassDescriptions(reg);
903 TQueueParams queues_from_ini =
904 x_ReadIniFileQueueDescriptions(reg,
905 classes_from_ini);
906
907 // Validate basic consistency of the incoming configuration
908 x_ValidateConfiguration(queues_from_ini);
909
910 x_ReadLinkedSections(reg, diff);
911
912 // Check that the there are enough slots for the new queues if so
913 // configured
914 unsigned int to_add_count = x_CountQueuesToAdd(queues_from_ini);
915 unsigned int available_count = m_MaxQueues - m_Queues.size();
916
917 if (to_add_count > available_count)
918 NCBI_THROW(CNetScheduleException, eInvalidParameter,
919 "New configuration slots requirement: " +
920 to_string(to_add_count) +
921 ". Number of available slots: " +
922 to_string(available_count) + ".");
923
924 // Here: validation is finished. There is enough resources for the new
925 // configuration.
926 x_ConfigureQueueClasses(classes_from_ini, diff);
927 x_ConfigureQueues(queues_from_ini, diff);
928 return CalculateRuntimePrecision();
929 }
930
931
CalculateRuntimePrecision(void) const932 CNSPreciseTime CQueueDataBase::CalculateRuntimePrecision(void) const
933 {
934 // Calculate the new min_run_timeout: required at the time of loading
935 // NetSchedule and not used while reconfiguring on the fly
936 CNSPreciseTime min_precision = kTimeNever;
937 for (TQueueParams::const_iterator k = m_QueueClasses.begin();
938 k != m_QueueClasses.end(); ++k)
939 min_precision = std::min(min_precision,
940 k->second.CalculateRuntimePrecision());
941 for (TQueueInfo::const_iterator k = m_Queues.begin();
942 k != m_Queues.end(); ++k)
943 min_precision = std::min(min_precision,
944 k->second.first.CalculateRuntimePrecision());
945 return min_precision;
946 }
947
948
CountActiveJobs(void) const949 unsigned int CQueueDataBase::CountActiveJobs(void) const
950 {
951 unsigned int cnt = 0;
952 CFastMutexGuard guard(m_ConfigureLock);
953
954 for (TQueueInfo::const_iterator k = m_Queues.begin();
955 k != m_Queues.end(); ++k)
956 cnt += k->second.second->CountActiveJobs();
957
958 return cnt;
959 }
960
961
CountAllJobs(void) const962 unsigned int CQueueDataBase::CountAllJobs(void) const
963 {
964 unsigned int cnt = 0;
965 CFastMutexGuard guard(m_ConfigureLock);
966
967 for (TQueueInfo::const_iterator k = m_Queues.begin();
968 k != m_Queues.end(); ++k)
969 cnt += k->second.second->CountAllJobs();
970
971 return cnt;
972 }
973
974
AnyJobs(void) const975 bool CQueueDataBase::AnyJobs(void) const
976 {
977 CFastMutexGuard guard(m_ConfigureLock);
978
979 for (TQueueInfo::const_iterator k = m_Queues.begin();
980 k != m_Queues.end(); ++k)
981 if (k->second.second->AnyJobs())
982 return true;
983
984 return false;
985 }
986
987
OpenQueue(const string & name)988 CRef<CQueue> CQueueDataBase::OpenQueue(const string & name)
989 {
990 CFastMutexGuard guard(m_ConfigureLock);
991 TQueueInfo::const_iterator found = m_Queues.find(name);
992
993 if (found == m_Queues.end())
994 NCBI_THROW(CNetScheduleException, eUnknownQueue,
995 "Queue '" + name + "' is not found.");
996
997 return found->second.second;
998 }
999
1000
1001 void
x_CreateAndMountQueue(const string & qname,const SQueueParameters & params)1002 CQueueDataBase::x_CreateAndMountQueue(const string & qname,
1003 const SQueueParameters & params)
1004 {
1005 unique_ptr<CQueue> q(new CQueue(qname, params.kind, m_Server, *this));
1006
1007 q->Attach();
1008 q->SetParameters(params);
1009
1010 m_Queues[qname] = make_pair(params, q.release());
1011
1012 GetDiagContext().Extra()
1013 .Print("_type", "startup")
1014 .Print("_queue", qname)
1015 .Print("qclass", params.qclass)
1016 .Print("info", "mount");
1017 }
1018
1019
QueueExists(const string & qname) const1020 bool CQueueDataBase::QueueExists(const string & qname) const
1021 {
1022 CFastMutexGuard guard(m_ConfigureLock);
1023 return m_Queues.find(qname) != m_Queues.end();
1024 }
1025
1026
CreateDynamicQueue(const CNSClientId & client,const string & qname,const string & qclass,const string & description)1027 void CQueueDataBase::CreateDynamicQueue(const CNSClientId & client,
1028 const string & qname,
1029 const string & qclass,
1030 const string & description)
1031 {
1032 CFastMutexGuard guard(m_ConfigureLock);
1033
1034 // Queue name clashes
1035 if (m_Queues.find(qname) != m_Queues.end())
1036 NCBI_THROW(CNetScheduleException, eDuplicateName,
1037 "Queue '" + qname + "' already exists.");
1038
1039 // Queue class existance
1040 TQueueParams::const_iterator queue_class = m_QueueClasses.find(qclass);
1041 if (queue_class == m_QueueClasses.end())
1042 NCBI_THROW(CNetScheduleException, eUnknownQueueClass,
1043 "Queue class '" + qclass +
1044 "' for queue '" + qname + "' is not found.");
1045
1046 // And class is not marked for deletion
1047 if (queue_class->second.delete_request)
1048 NCBI_THROW(CNetScheduleException, eUnknownQueueClass,
1049 "Queue class '" + qclass +
1050 "' for queue '" + qname + "' is marked for deletion.");
1051
1052 // Slot availability
1053 if ((int)m_MaxQueues - (int)m_Queues.size() <= 0)
1054 NCBI_THROW(CNetScheduleException, eUnknownQueue,
1055 "Cannot allocate queue '" + qname +
1056 "'. max_queues limit reached.");
1057
1058 // submitter and program restrictions must be checked
1059 // for the class
1060 if (!client.IsAdmin()) {
1061 if (!queue_class->second.subm_hosts.empty()) {
1062 CNetScheduleAccessList acl;
1063 acl.SetHosts(queue_class->second.subm_hosts);
1064 if (!acl.IsAllowed(client.GetAddress())) {
1065 m_Server->RegisterAlert(eAccess, "submitter privileges required"
1066 " to create a dynamic queue");
1067 NCBI_THROW(CNetScheduleException, eAccessDenied,
1068 "Access denied: submitter privileges required");
1069 }
1070 }
1071 if (!queue_class->second.program_name.empty()) {
1072 CQueueClientInfoList acl;
1073 bool ok = false;
1074
1075 acl.AddClientInfo(queue_class->second.program_name);
1076 try {
1077 CQueueClientInfo auth_prog_info;
1078 ParseVersionString(client.GetProgramName(),
1079 &auth_prog_info.client_name,
1080 &auth_prog_info.version_info);
1081 ok = acl.IsMatchingClient(auth_prog_info);
1082 } catch (...) {
1083 // Parsing errors
1084 ok = false;
1085 }
1086
1087 if (!ok) {
1088 m_Server->RegisterAlert(eAccess, "program privileges required "
1089 "to create a dynamic queue");
1090 NCBI_THROW(CNetScheduleException, eAccessDenied,
1091 "Access denied: program privileges required");
1092 }
1093 }
1094 }
1095
1096
1097 // All the preconditions are met. Create the queue
1098 SQueueParameters params = queue_class->second;
1099
1100 params.kind = CQueue::eKindDynamic;
1101 params.delete_request = false;
1102 params.qclass = qclass;
1103 params.description = description;
1104
1105 x_CreateAndMountQueue(qname, params);
1106 }
1107
1108
DeleteDynamicQueue(const CNSClientId & client,const string & qname)1109 void CQueueDataBase::DeleteDynamicQueue(const CNSClientId & client,
1110 const string & qname)
1111 {
1112 CFastMutexGuard guard(m_ConfigureLock);
1113 TQueueInfo::iterator found_queue = m_Queues.find(qname);
1114
1115 if (found_queue == m_Queues.end())
1116 NCBI_THROW(CNetScheduleException, eUnknownQueue,
1117 "Queue '" + qname + "' is not found." );
1118
1119 if (found_queue->second.first.kind != CQueue::eKindDynamic)
1120 NCBI_THROW(CNetScheduleException, eInvalidParameter,
1121 "Queue '" + qname + "' is static and cannot be deleted.");
1122
1123 // submitter and program restrictions must be checked
1124 CRef<CQueue> queue = found_queue->second.second;
1125 if (!client.IsAdmin()) {
1126 if (!queue->IsSubmitAllowed(client.GetAddress()))
1127 NCBI_THROW(CNetScheduleException, eAccessDenied,
1128 "Access denied: submitter privileges required");
1129 if (!queue->IsProgramAllowed(client.GetProgramName()))
1130 NCBI_THROW(CNetScheduleException, eAccessDenied,
1131 "Access denied: program privileges required");
1132 }
1133
1134 found_queue->second.first.delete_request = true;
1135 queue->SetRefuseSubmits(true);
1136 }
1137
1138
QueueInfo(const string & qname) const1139 SQueueParameters CQueueDataBase::QueueInfo(const string & qname) const
1140 {
1141 CFastMutexGuard guard(m_ConfigureLock);
1142 TQueueInfo::const_iterator found_queue = m_Queues.find(qname);
1143
1144 if (found_queue == m_Queues.end())
1145 NCBI_THROW(CNetScheduleException, eUnknownQueue,
1146 "Queue '" + qname + "' is not found." );
1147
1148 return x_SingleQueueInfo(found_queue);
1149 }
1150
1151
1152 /* Note: this member must be called under a lock, it's not thread safe */
1153 SQueueParameters
x_SingleQueueInfo(TQueueInfo::const_iterator found) const1154 CQueueDataBase::x_SingleQueueInfo(TQueueInfo::const_iterator found) const
1155 {
1156 SQueueParameters params = found->second.first;
1157
1158 // The fields below are used as a transport.
1159 // Usually used by QINF2 and STAT QUEUES
1160 params.refuse_submits = found->second.second->GetRefuseSubmits();
1161 params.pause_status = found->second.second->GetPauseStatus();
1162 params.max_aff_slots = m_Server->GetAffRegistrySettings().max_records;
1163 params.aff_slots_used = found->second.second->GetAffSlotsUsed();
1164 params.max_group_slots = m_Server->GetGroupRegistrySettings().max_records;
1165 params.group_slots_used = found->second.second->GetGroupSlotsUsed();
1166 params.max_scope_slots = m_Server->GetScopeRegistrySettings().max_records;
1167 params.scope_slots_used = found->second.second->GetScopeSlotsUsed();
1168 params.clients = found->second.second->GetClientsCount();
1169 params.groups = found->second.second->GetGroupsCount();
1170 params.gc_backlog = found->second.second->GetGCBacklogCount();
1171 params.notif_count = found->second.second->GetNotifCount();
1172 return params;
1173 }
1174
1175
// Returns the names of all mounted queues, each followed by 'sep'
// (so the result also ends with 'sep' unless there are no queues).
string CQueueDataBase::GetQueueNames(const string & sep) const
{
    CFastMutexGuard     guard(m_ConfigureLock);
    string              joined;

    for (const auto &  entry : m_Queues) {
        joined += entry.first;
        joined += sep;
    }
    return joined;
}
1187
1188
// Shuts the queue database down.
// 1. Stops the notification, purge, service and execution watcher threads.
// 2. If the statistics thread logging is enabled, prints the statistics
//    counters one last time.
// 3. After a completed (uninterrupted) drained shutdown nothing is dumped
//    and only the dump error flag file is removed. Otherwise (a hard
//    shutdown, possibly interrupting a drained one) the queues, queue
//    classes and queue parameters are dumped to flat files unless running
//    diskless, and m_QueueClasses / m_Queues are cleared. Note that the
//    containers are NOT cleared in the drained branch.
// 4. Unless running diskless, calls x_RemoveDataFiles() and removes the
//    crash flag file.
void CQueueDataBase::Close(void)
{
    // Stop all the background threads first
    StopNotifThread();
    StopPurgeThread();
    StopServiceThread();
    StopExecutionWatcherThread();

    // Print the statistics counters last time
    if (m_Server->IsLogStatisticsThread()) {
        size_t aff_count = 0;

        PrintStatistics(aff_count);
        CStatisticsCounters::PrintServerWide(aff_count);
    }

    if (m_Server->IsDrainShutdown() && m_Server->WasDBDrained()) {
        // That was an uninterrupted drained shutdown so there is no
        // need to dump anything
        LOG_POST("Drained shutdown: the DB has been successfully drained");
        x_RemoveDumpErrorFlagFile();
    } else {
        // That was either:
        // - hard shutdown
        // - hard shutdown when drained shutdown is in process
        if (m_Server->IsDrainShutdown())
            ERR_POST(Warning <<
                     "Drained shutdown: DB draining has not been completed "
                     "when a hard shutdown is received. "
                     "Shutting down immediately.");

        // Dump all the queues/queue classes/queue parameters to flat files
        if (!m_Diskless)
            x_Dump();

        m_QueueClasses.clear();

        // CQueue objects destructors are called from here because the last
        // reference to the object has gone
        m_Queues.clear();
    }

    if (!m_Diskless) {
        x_RemoveDataFiles();
        // A removed crash flag file tells the next start that this
        // shutdown was orderly (see x_RemoveCrashFlagFile's error message)
        x_RemoveCrashFlagFile();
    }
}
1235
1236
PrintTransitionCounters(void)1237 string CQueueDataBase::PrintTransitionCounters(void)
1238 {
1239 string result;
1240 CFastMutexGuard guard(m_ConfigureLock);
1241 for (TQueueInfo::const_iterator k = m_Queues.begin();
1242 k != m_Queues.end(); ++k)
1243 result += "OK:[queue " + k->first + "]\n" +
1244 k->second.second->PrintTransitionCounters();
1245 return result;
1246 }
1247
1248
PrintJobsStat(const CNSClientId & client)1249 string CQueueDataBase::PrintJobsStat(const CNSClientId & client)
1250 {
1251 string result;
1252 vector<string> warnings;
1253 CFastMutexGuard guard(m_ConfigureLock);
1254 for (TQueueInfo::const_iterator k = m_Queues.begin();
1255 k != m_Queues.end(); ++k)
1256 // Group and affinity tokens make no sense for the server,
1257 // so they are both "".
1258 result += "OK:[queue " + k->first + "]\n" +
1259 k->second.second->PrintJobsStat(client, "", "", warnings);
1260 return result;
1261 }
1262
1263
GetQueueClassesInfo(void) const1264 string CQueueDataBase::GetQueueClassesInfo(void) const
1265 {
1266 string output;
1267 output.reserve(16384);
1268
1269 CFastMutexGuard guard(m_ConfigureLock);
1270
1271 for (TQueueParams::const_iterator k = m_QueueClasses.begin();
1272 k != m_QueueClasses.end(); ++k) {
1273 if (!output.empty())
1274 output.append(kNewLine);
1275
1276 // false - not to include qclass
1277 // false - not URL encoded format
1278 output.append("OK:[qclass ")
1279 .append(k->first)
1280 .append(1, ']')
1281 .append(kNewLine)
1282 .append(k->second.GetPrintableParameters(false, false));
1283
1284 for (map<string, string>::const_iterator
1285 j = k->second.linked_sections.begin();
1286 j != k->second.linked_sections.end(); ++j) {
1287 string prefix((j->first).c_str() + strlen("linked_section_"));
1288 string section_name = j->second;
1289
1290 map<string, string> values = GetLinkedSection(section_name);
1291 for (map<string, string>::const_iterator m = values.begin();
1292 m != values.end(); ++m)
1293 output.append(kNewLine)
1294 .append("OK:")
1295 .append(prefix)
1296 .append(1, '.')
1297 .append(m->first)
1298 .append(": ")
1299 .append(m->second);
1300 }
1301 }
1302 return output;
1303 }
1304
1305
GetQueueClassesConfig(void) const1306 string CQueueDataBase::GetQueueClassesConfig(void) const
1307 {
1308 string output;
1309 CFastMutexGuard guard(m_ConfigureLock);
1310 for (TQueueParams::const_iterator k = m_QueueClasses.begin();
1311 k != m_QueueClasses.end(); ++k) {
1312 if (!output.empty())
1313 output.append(kNewLine);
1314 output.append("[qclass_")
1315 .append(k->first)
1316 .append(1, ']')
1317 .append(kNewLine)
1318 .append(k->second.ConfigSection(true));
1319 }
1320 return output;
1321 }
1322
1323
GetQueueInfo(void) const1324 string CQueueDataBase::GetQueueInfo(void) const
1325 {
1326 string output;
1327 CFastMutexGuard guard(m_ConfigureLock);
1328
1329 for (TQueueInfo::const_iterator k = m_Queues.begin();
1330 k != m_Queues.end(); ++k) {
1331 if (!output.empty())
1332 output += "\n";
1333
1334 // true - include qclass
1335 // false - not URL encoded format
1336 output += "OK:[queue " + k->first + "]\n" +
1337 x_SingleQueueInfo(k).GetPrintableParameters(true, false);
1338
1339 for (map<string, string>::const_iterator
1340 j = k->second.first.linked_sections.begin();
1341 j != k->second.first.linked_sections.end(); ++j) {
1342 string prefix((j->first).c_str() + strlen("linked_section_"));
1343 string section_name = j->second;
1344
1345 map<string, string> values = GetLinkedSection(section_name);
1346 for (map<string, string>::const_iterator m = values.begin();
1347 m != values.end(); ++m)
1348 output += "\nOK:" + prefix + "." + m->first + ": " + m->second;
1349 }
1350 }
1351 return output;
1352 }
1353
GetQueueConfig(void) const1354 string CQueueDataBase::GetQueueConfig(void) const
1355 {
1356 string output;
1357 CFastMutexGuard guard(m_ConfigureLock);
1358 for (TQueueInfo::const_iterator k = m_Queues.begin();
1359 k != m_Queues.end(); ++k) {
1360 if (!output.empty())
1361 output += "\n";
1362 output += "[queue_" + k->first + "]\n" +
1363 k->second.first.ConfigSection(false);
1364 }
1365 return output;
1366 }
1367
1368
GetLinkedSectionConfig(void) const1369 string CQueueDataBase::GetLinkedSectionConfig(void) const
1370 {
1371 string output;
1372 CFastMutexGuard guard(m_LinkedSectionsGuard);
1373
1374 for (map< string, map< string, string > >::const_iterator
1375 k = m_LinkedSections.begin();
1376 k != m_LinkedSections.end(); ++k) {
1377 if (!output.empty())
1378 output += "\n";
1379 output += "[" + k->first + "]\n";
1380 for (map< string, string >::const_iterator j = k->second.begin();
1381 j != k->second.end(); ++j) {
1382 output += j->first + "=\"" + j->second + "\"\n";
1383 }
1384 }
1385 return output;
1386 }
1387
1388
1389 map<string, string>
GetLinkedSection(const string & section_name) const1390 CQueueDataBase::GetLinkedSection(const string & section_name) const
1391 {
1392 CFastMutexGuard guard(m_LinkedSectionsGuard);
1393 map< string, map<string, string> >::const_iterator found =
1394 m_LinkedSections.find(section_name);
1395 if (found == m_LinkedSections.end())
1396 return map<string, string>();
1397 return found->second;
1398 }
1399
1400
1401 map<string, int>
GetPauseQueues(void) const1402 CQueueDataBase::GetPauseQueues(void) const
1403 {
1404 map<string, int> pause_states;
1405 CFastMutexGuard guard(m_ConfigureLock);
1406
1407 for (TQueueInfo::const_iterator k = m_Queues.begin();
1408 k != m_Queues.end(); ++k) {
1409 int pause_state = k->second.second->GetPauseStatus();
1410 if (pause_state != CQueue::eNoPause)
1411 pause_states[k->first] = pause_state;
1412 }
1413 return pause_states;
1414 }
1415
1416
1417 vector<string>
GetRefuseSubmitQueues(void) const1418 CQueueDataBase::GetRefuseSubmitQueues(void) const
1419 {
1420 vector<string> refuse_submit_queues;
1421 CFastMutexGuard guard(m_ConfigureLock);
1422
1423 for (TQueueInfo::const_iterator k = m_Queues.begin();
1424 k != m_Queues.end(); ++k) {
1425 if (k->second.second->GetRefuseSubmits())
1426 refuse_submit_queues.push_back(k->first);
1427 }
1428 return refuse_submit_queues;
1429 }
1430
1431
NotifyListeners(void)1432 void CQueueDataBase::NotifyListeners(void)
1433 {
1434 CNSPreciseTime current_time = CNSPreciseTime::Current();
1435 for (unsigned int index = 0; ; ++index) {
1436 CRef<CQueue> queue = x_GetQueueAt(index);
1437 if (queue.IsNull())
1438 break;
1439 queue->NotifyListenersPeriodically(current_time);
1440 }
1441 }
1442
1443
PrintStatistics(size_t & aff_count)1444 void CQueueDataBase::PrintStatistics(size_t & aff_count)
1445 {
1446 CFastMutexGuard guard(m_ConfigureLock);
1447
1448 for (TQueueInfo::const_iterator k = m_Queues.begin();
1449 k != m_Queues.end(); ++k)
1450 k->second.second->PrintStatistics(aff_count);
1451 }
1452
1453
PrintJobCounters(void)1454 void CQueueDataBase::PrintJobCounters(void)
1455 {
1456 CFastMutexGuard guard(m_ConfigureLock);
1457
1458 for (TQueueInfo::const_iterator k = m_Queues.begin();
1459 k != m_Queues.end(); ++k)
1460 k->second.second->PrintJobCounters();
1461 }
1462
1463
CheckExecutionTimeout(bool logging)1464 void CQueueDataBase::CheckExecutionTimeout(bool logging)
1465 {
1466 for (unsigned int index = 0; ; ++index) {
1467 CRef<CQueue> queue = x_GetQueueAt(index);
1468 if (queue.IsNull())
1469 break;
1470 queue->CheckExecutionTimeout(logging);
1471 }
1472 }
1473
1474
1475 // Checks if the queues marked for deletion could be really deleted
1476 // and deletes them if so. Deletes queue classes which are marked
1477 // for deletion if there are no links to them.
x_DeleteQueuesAndClasses(void)1478 void CQueueDataBase::x_DeleteQueuesAndClasses(void)
1479 {
1480 // It's better to avoid quering the queues under a lock so
1481 // let's first build a list of CRefs to the candidate queues.
1482 list< pair< string, CRef< CQueue > > > candidates;
1483
1484 {{
1485 CFastMutexGuard guard(m_ConfigureLock);
1486 for (TQueueInfo::const_iterator k = m_Queues.begin();
1487 k != m_Queues.end(); ++k)
1488 if (k->second.first.delete_request)
1489 candidates.push_back(make_pair(k->first, k->second.second));
1490 }}
1491
1492 // Now the queues are queiried if they are empty without a lock
1493 list< pair< string, CRef< CQueue > > >::iterator
1494 k = candidates.begin();
1495 while (k != candidates.end()) {
1496 if (k->second->IsEmpty() == false)
1497 k = candidates.erase(k);
1498 else
1499 ++k;
1500 }
1501
1502 if (candidates.empty())
1503 return;
1504
1505 // Here we have a list of the queues which can be deleted
1506 // Take a lock and delete the queues plus check queue classes
1507 CFastMutexGuard guard(m_ConfigureLock);
1508 for (k = candidates.begin(); k != candidates.end(); ++k) {
1509 // It's only here where a queue can be deleted so it's safe not
1510 // to check the iterator
1511 TQueueInfo::iterator queue = m_Queues.find(k->first);
1512
1513 // Deallocation of the DB block will be done later when the queue
1514 // is actually deleted
1515 // queue->second.second->MarkForTruncating();
1516 m_Queues.erase(queue);
1517 }
1518
1519 // Now, while still holding the lock, let's check queue classes
1520 vector< string > classes_to_delete;
1521 for (TQueueParams::const_iterator j = m_QueueClasses.begin();
1522 j != m_QueueClasses.end(); ++j) {
1523 if (j->second.delete_request) {
1524 bool in_use = false;
1525 for (TQueueInfo::const_iterator m = m_Queues.begin();
1526 m != m_Queues.end(); ++m) {
1527 if (m->second.first.qclass == j->first) {
1528 in_use = true;
1529 break;
1530 }
1531 }
1532 if (in_use == false)
1533 classes_to_delete.push_back(j->first);
1534 }
1535 }
1536
1537 for (vector< string >::const_iterator k = classes_to_delete.begin();
1538 k != classes_to_delete.end(); ++k) {
1539 // It's only here where a queue class can be deleted so
1540 // it's safe not to check the iterator
1541 m_QueueClasses.erase(m_QueueClasses.find(*k));
1542 }
1543 }
1544
1545
1546 /* Data used in CQueueDataBase::Purge() only */
1547 static CNetScheduleAPI::EJobStatus statuses_to_delete_from[] = {
1548 CNetScheduleAPI::eFailed,
1549 CNetScheduleAPI::eCanceled,
1550 CNetScheduleAPI::eDone,
1551 CNetScheduleAPI::ePending,
1552 CNetScheduleAPI::eReadFailed,
1553 CNetScheduleAPI::eConfirmed
1554 };
1555 const size_t kStatusesSize = sizeof(statuses_to_delete_from) /
1556 sizeof(CNetScheduleAPI::EJobStatus);
1557
Purge(void)1558 void CQueueDataBase::Purge(void)
1559 {
1560 size_t max_mark_deleted = m_Server->GetMarkdelBatchSize();
1561 size_t max_scanned = m_Server->GetScanBatchSize();
1562 size_t total_scanned = 0;
1563 size_t total_mark_deleted = 0;
1564 CNSPreciseTime current_time = CNSPreciseTime::Current();
1565 bool limit_reached = false;
1566
1567 // Cleanup the queues and classes if possible
1568 x_DeleteQueuesAndClasses();
1569
1570 // Part I: from the last end till the end of the list
1571 CRef<CQueue> start_queue = x_GetLastPurged();
1572 CRef<CQueue> current_queue = start_queue;
1573 size_t start_status_index = m_PurgeStatusIndex;
1574 unsigned int start_job_id = m_PurgeJobScanned;
1575
1576 while (current_queue.IsNull() == false) {
1577 m_PurgeQueue = current_queue->GetQueueName();
1578 if (x_PurgeQueue(current_queue.GetObject(),
1579 m_PurgeStatusIndex, kStatusesSize - 1,
1580 m_PurgeJobScanned, 0,
1581 max_scanned, max_mark_deleted,
1582 current_time,
1583 total_scanned, total_mark_deleted) == true)
1584 return;
1585
1586 if (total_mark_deleted >= max_mark_deleted ||
1587 total_scanned >= max_scanned) {
1588 limit_reached = true;
1589 break;
1590 }
1591 current_queue = x_GetNext(m_PurgeQueue);
1592 }
1593
1594
1595 // Part II: from the beginning of the list till the last end
1596 if (limit_reached == false) {
1597 current_queue = x_GetFirst();
1598 while (current_queue.IsNull() == false) {
1599 if (current_queue->GetQueueName() == start_queue->GetQueueName())
1600 break;
1601
1602 m_PurgeQueue = current_queue->GetQueueName();
1603 if (x_PurgeQueue(current_queue.GetObject(),
1604 m_PurgeStatusIndex, kStatusesSize - 1,
1605 m_PurgeJobScanned, 0,
1606 max_scanned, max_mark_deleted,
1607 current_time,
1608 total_scanned, total_mark_deleted) == true)
1609 return;
1610
1611 if (total_mark_deleted >= max_mark_deleted ||
1612 total_scanned >= max_scanned) {
1613 limit_reached = true;
1614 break;
1615 }
1616 current_queue = x_GetNext(m_PurgeQueue);
1617 }
1618 }
1619
1620 // Part III: it might need to check the statuses in the queue we started
1621 // with if the start status was not the first one.
1622 if (limit_reached == false) {
1623 if (start_queue.IsNull() == false) {
1624 m_PurgeQueue = start_queue->GetQueueName();
1625 if (start_status_index > 0) {
1626 if (x_PurgeQueue(start_queue.GetObject(),
1627 0, start_status_index - 1,
1628 m_PurgeJobScanned, 0,
1629 max_scanned, max_mark_deleted,
1630 current_time,
1631 total_scanned, total_mark_deleted) == true)
1632 return;
1633 }
1634 }
1635 }
1636
1637 if (limit_reached == false) {
1638 if (start_queue.IsNull() == false) {
1639 m_PurgeQueue = start_queue->GetQueueName();
1640 if (x_PurgeQueue(start_queue.GetObject(),
1641 start_status_index, start_status_index,
1642 0, start_job_id,
1643 max_scanned, max_mark_deleted,
1644 current_time,
1645 total_scanned, total_mark_deleted) == true)
1646 return;
1647 }
1648 }
1649
1650
1651 // Part IV: purge the found candidates and optimize the memory if required
1652 m_FreeStatusMemCnt += x_PurgeUnconditional();
1653
1654 x_OptimizeStatusMatrix(current_time);
1655 }
1656
1657
x_GetLastPurged(void)1658 CRef<CQueue> CQueueDataBase::x_GetLastPurged(void)
1659 {
1660 CFastMutexGuard guard(m_ConfigureLock);
1661
1662 if (m_PurgeQueue.empty()) {
1663 if (m_Queues.empty())
1664 return CRef<CQueue>(NULL);
1665 return m_Queues.begin()->second.second;
1666 }
1667
1668 for (TQueueInfo::iterator it = m_Queues.begin();
1669 it != m_Queues.end(); ++it)
1670 if (it->first == m_PurgeQueue)
1671 return it->second.second;
1672
1673 // Not found, which means the queue was deleted. Pick a random one
1674 m_PurgeStatusIndex = 0;
1675 m_PurgeJobScanned = 0;
1676
1677 int queue_num = ((rand() * 1.0) / RAND_MAX) * m_Queues.size();
1678 int k = 1;
1679 for (TQueueInfo::iterator it = m_Queues.begin();
1680 it != m_Queues.end(); ++it) {
1681 if (k >= queue_num)
1682 return it->second.second;
1683 ++k;
1684 }
1685
1686 // Cannot happened, so just be consistent with the return value
1687 return m_Queues.begin()->second.second;
1688 }
1689
1690
x_GetFirst(void)1691 CRef<CQueue> CQueueDataBase::x_GetFirst(void)
1692 {
1693 CFastMutexGuard guard(m_ConfigureLock);
1694
1695 if (m_Queues.empty())
1696 return CRef<CQueue>(NULL);
1697 return m_Queues.begin()->second.second;
1698 }
1699
1700
x_GetNext(const string & current_name)1701 CRef<CQueue> CQueueDataBase::x_GetNext(const string & current_name)
1702 {
1703 CFastMutexGuard guard(m_ConfigureLock);
1704
1705 if (m_Queues.empty())
1706 return CRef<CQueue>(NULL);
1707
1708 for (TQueueInfo::iterator it = m_Queues.begin();
1709 it != m_Queues.end(); ++it) {
1710 if (it->first == current_name) {
1711 ++it;
1712 if (it == m_Queues.end())
1713 return CRef<CQueue>(NULL);
1714 return it->second.second;
1715 }
1716 }
1717
1718 // May not really happen. Let's have just in case.
1719 return CRef<CQueue>(NULL);
1720 }
1721
1722
1723 // Purges jobs from a queue starting from the given status.
1724 // Returns true if the purge should be stopped.
1725 // The status argument is a status to start from
x_PurgeQueue(CQueue & queue,size_t status,size_t status_to_end,unsigned int start_job_id,unsigned int end_job_id,size_t max_scanned,size_t max_mark_deleted,const CNSPreciseTime & current_time,size_t & total_scanned,size_t & total_mark_deleted)1726 bool CQueueDataBase::x_PurgeQueue(CQueue & queue,
1727 size_t status,
1728 size_t status_to_end,
1729 unsigned int start_job_id,
1730 unsigned int end_job_id,
1731 size_t max_scanned,
1732 size_t max_mark_deleted,
1733 const CNSPreciseTime & current_time,
1734 size_t & total_scanned,
1735 size_t & total_mark_deleted)
1736 {
1737 SPurgeAttributes purge_io;
1738
1739 for (; status <= status_to_end; ++status) {
1740 purge_io.scans = max_scanned - total_scanned;
1741 purge_io.deleted = max_mark_deleted - total_mark_deleted;
1742 purge_io.job_id = start_job_id;
1743
1744 purge_io = queue.CheckJobsExpiry(current_time, purge_io,
1745 end_job_id,
1746 statuses_to_delete_from[status]);
1747 total_scanned += purge_io.scans;
1748 total_mark_deleted += purge_io.deleted;
1749 m_PurgeJobScanned = purge_io.job_id;
1750
1751 if (x_CheckStopPurge())
1752 return true;
1753
1754 if (total_mark_deleted >= max_mark_deleted ||
1755 total_scanned >= max_scanned) {
1756 m_PurgeStatusIndex = status;
1757 return false;
1758 }
1759 }
1760 m_PurgeStatusIndex = 0;
1761 m_PurgeJobScanned = 0;
1762 return false;
1763 }
1764
1765
x_CreateCrashFlagFile(void)1766 void CQueueDataBase::x_CreateCrashFlagFile(void)
1767 {
1768 try {
1769 CFile crash_file(CFile::MakePath(m_DataPath,
1770 kCrashFlagFileName));
1771 if (!crash_file.Exists()) {
1772 CFileIO f;
1773 f.Open(CFile::MakePath(m_DataPath, kCrashFlagFileName),
1774 CFileIO_Base::eCreate,
1775 CFileIO_Base::eReadWrite);
1776 f.Close();
1777 }
1778 }
1779 catch (...) {
1780 ERR_POST("Error creating crash detection file.");
1781 }
1782 }
1783
1784
x_RemoveCrashFlagFile(void)1785 void CQueueDataBase::x_RemoveCrashFlagFile(void)
1786 {
1787 try {
1788 CFile crash_file(CFile::MakePath(m_DataPath,
1789 kCrashFlagFileName));
1790 if (crash_file.Exists())
1791 crash_file.Remove();
1792 }
1793 catch (...) {
1794 ERR_POST("Error removing crash detection file. When the server "
1795 "restarts it will re-initialize the database.");
1796 }
1797 }
1798
1799
x_DoesCrashFlagFileExist(void) const1800 bool CQueueDataBase::x_DoesCrashFlagFileExist(void) const
1801 {
1802 return CFile(CFile::MakePath(m_DataPath, kCrashFlagFileName)).Exists();
1803 }
1804
1805
x_CreateDumpErrorFlagFile(void)1806 void CQueueDataBase::x_CreateDumpErrorFlagFile(void)
1807 {
1808 try {
1809 CFile crash_file(CFile::MakePath(m_DataPath,
1810 kDumpErrorFlagFileName));
1811 if (!crash_file.Exists()) {
1812 CFileIO f;
1813 f.Open(CFile::MakePath(m_DataPath, kDumpErrorFlagFileName),
1814 CFileIO_Base::eCreate,
1815 CFileIO_Base::eReadWrite);
1816 f.Close();
1817 }
1818 }
1819 catch (...) {
1820 ERR_POST("Error creating dump error detection file.");
1821 }
1822 }
1823
1824
x_DoesDumpErrorFlagFileExist(void) const1825 bool CQueueDataBase::x_DoesDumpErrorFlagFileExist(void) const
1826 {
1827 return CFile(CFile::MakePath(m_DataPath, kDumpErrorFlagFileName)).Exists();
1828 }
1829
1830
x_RemoveDumpErrorFlagFile(void)1831 void CQueueDataBase::x_RemoveDumpErrorFlagFile(void)
1832 {
1833 if (!m_Diskless) {
1834 try {
1835 CFile crash_file(CFile::MakePath(m_DataPath,
1836 kDumpErrorFlagFileName));
1837 if (crash_file.Exists())
1838 crash_file.Remove();
1839 }
1840 catch (...) {
1841 ERR_POST("Error removing dump error detection file. When the server "
1842 "restarts it will set an alert.");
1843 }
1844 }
1845 }
1846
1847
x_PurgeUnconditional(void)1848 unsigned int CQueueDataBase::x_PurgeUnconditional(void) {
1849 // Purge unconditional jobs
1850 unsigned int del_rec = 0;
1851 unsigned int max_deleted = m_Server->GetDeleteBatchSize();
1852
1853
1854 for (unsigned int index = 0; ; ++index) {
1855 CRef<CQueue> queue = x_GetQueueAt(index);
1856 if (queue.IsNull())
1857 break;
1858 del_rec += queue->DeleteBatch(max_deleted - del_rec);
1859 if (del_rec >= max_deleted)
1860 break;
1861 }
1862 return del_rec;
1863 }
1864
1865
1866 void
x_OptimizeStatusMatrix(const CNSPreciseTime & current_time)1867 CQueueDataBase::x_OptimizeStatusMatrix(const CNSPreciseTime & current_time)
1868 {
1869 // optimize memory every 15 min. or after 1 million of deleted records
1870 static const int kMemFree_Delay = 15 * 60;
1871 static const unsigned kRecordThreshold = 1000000;
1872
1873 if ((m_FreeStatusMemCnt > kRecordThreshold) ||
1874 (m_LastFreeMem + kMemFree_Delay <= current_time)) {
1875 m_FreeStatusMemCnt = 0;
1876 m_LastFreeMem = current_time;
1877
1878 for (unsigned int index = 0; ; ++index) {
1879 CRef<CQueue> queue = x_GetQueueAt(index);
1880 if (queue.IsNull())
1881 break;
1882 queue->OptimizeMem();
1883 if (x_CheckStopPurge())
1884 break;
1885 }
1886 }
1887 }
1888
1889
StopPurge(void)1890 void CQueueDataBase::StopPurge(void)
1891 {
1892 // No need to guard, this operation is thread safe
1893 m_StopPurge = true;
1894 }
1895
1896
x_CheckStopPurge(void)1897 bool CQueueDataBase::x_CheckStopPurge(void)
1898 {
1899 CFastMutexGuard guard(m_PurgeLock);
1900 bool stop_purge = m_StopPurge;
1901
1902 m_StopPurge = false;
1903 return stop_purge;
1904 }
1905
1906
RunPurgeThread(void)1907 void CQueueDataBase::RunPurgeThread(void)
1908 {
1909 double purge_timeout = m_Server->GetPurgeTimeout();
1910 unsigned int sec_delay = purge_timeout;
1911 unsigned int nanosec_delay = (purge_timeout - sec_delay)*1000000000;
1912
1913 m_PurgeThread.Reset(new CJobQueueCleanerThread(
1914 m_Host, *this, sec_delay, nanosec_delay,
1915 m_Server->IsLogCleaningThread()));
1916 m_PurgeThread->Run();
1917 }
1918
1919
StopPurgeThread(void)1920 void CQueueDataBase::StopPurgeThread(void)
1921 {
1922 if (!m_PurgeThread.Empty()) {
1923 StopPurge();
1924 m_PurgeThread->RequestStop();
1925 m_PurgeThread->Join();
1926 m_PurgeThread.Reset(0);
1927 }
1928 }
1929
1930
PurgeAffinities(void)1931 void CQueueDataBase::PurgeAffinities(void)
1932 {
1933 for (unsigned int index = 0; ; ++index) {
1934 CRef<CQueue> queue = x_GetQueueAt(index);
1935 if (queue.IsNull())
1936 break;
1937 queue->PurgeAffinities();
1938 if (x_CheckStopPurge())
1939 break;
1940 }
1941 }
1942
1943
PurgeGroups(void)1944 void CQueueDataBase::PurgeGroups(void)
1945 {
1946 for (unsigned int index = 0; ; ++index) {
1947 CRef<CQueue> queue = x_GetQueueAt(index);
1948 if (queue.IsNull())
1949 break;
1950 queue->PurgeGroups();
1951 if (x_CheckStopPurge())
1952 break;
1953 }
1954 }
1955
1956
StaleWNodes(void)1957 void CQueueDataBase::StaleWNodes(void)
1958 {
1959 // Worker nodes have the last access time in seconds since 1970
1960 // so there is no need to purge them more often than once a second
1961 static CNSPreciseTime last_purge(0, 0);
1962 CNSPreciseTime current_time = CNSPreciseTime::Current();
1963
1964 if (current_time.Sec() == last_purge.Sec())
1965 return;
1966 last_purge = current_time;
1967
1968 for (unsigned int index = 0; ; ++index) {
1969 CRef<CQueue> queue = x_GetQueueAt(index);
1970 if (queue.IsNull())
1971 break;
1972 queue->StaleNodes(current_time);
1973 if (x_CheckStopPurge())
1974 break;
1975 }
1976 }
1977
1978
PurgeBlacklistedJobs(void)1979 void CQueueDataBase::PurgeBlacklistedJobs(void)
1980 {
1981 static CNSPreciseTime period(30, 0);
1982 static CNSPreciseTime last_time(0, 0);
1983 CNSPreciseTime current_time = CNSPreciseTime::Current();
1984
1985 // Run this check once in ten seconds
1986 if (current_time - last_time < period)
1987 return;
1988
1989 last_time = current_time;
1990
1991 for (unsigned int index = 0; ; ++index) {
1992 CRef<CQueue> queue = x_GetQueueAt(index);
1993 if (queue.IsNull())
1994 break;
1995 queue->PurgeBlacklistedJobs();
1996 if (x_CheckStopPurge())
1997 break;
1998 }
1999 }
2000
2001
PurgeClientRegistry(void)2002 void CQueueDataBase::PurgeClientRegistry(void)
2003 {
2004 static CNSPreciseTime period(5, 0);
2005 static CNSPreciseTime last_time(0, 0);
2006 CNSPreciseTime current_time = CNSPreciseTime::Current();
2007
2008 // Run this check once in five seconds
2009 if (current_time - last_time < period)
2010 return;
2011
2012 last_time = current_time;
2013
2014 for (unsigned int index = 0; ; ++index) {
2015 CRef<CQueue> queue = x_GetQueueAt(index);
2016 if (queue.IsNull())
2017 break;
2018 queue->PurgeClientRegistry(current_time);
2019 if (x_CheckStopPurge())
2020 break;
2021 }
2022 }
2023
2024
2025 // Safely provides a queue at the given index
x_GetQueueAt(unsigned int index)2026 CRef<CQueue> CQueueDataBase::x_GetQueueAt(unsigned int index)
2027 {
2028 unsigned int current_index = 0;
2029 CFastMutexGuard guard(m_ConfigureLock);
2030
2031 for (TQueueInfo::iterator k = m_Queues.begin();
2032 k != m_Queues.end(); ++k) {
2033 if (current_index == index)
2034 return k->second.second;
2035 ++current_index;
2036 }
2037 return CRef<CQueue>(NULL);
2038 }
2039
2040
RunNotifThread(void)2041 void CQueueDataBase::RunNotifThread(void)
2042 {
2043 // 10 times per second
2044 m_NotifThread.Reset(new CGetJobNotificationThread(
2045 *this, 0, 100000000,
2046 m_Server->IsLogNotificationThread()));
2047 m_NotifThread->Run();
2048 }
2049
2050
StopNotifThread(void)2051 void CQueueDataBase::StopNotifThread(void)
2052 {
2053 if (!m_NotifThread.Empty()) {
2054 m_NotifThread->RequestStop();
2055 m_NotifThread->Join();
2056 m_NotifThread.Reset(0);
2057 }
2058 }
2059
2060
WakeupNotifThread(void)2061 void CQueueDataBase::WakeupNotifThread(void)
2062 {
2063 if (!m_NotifThread.Empty())
2064 m_NotifThread->WakeUp();
2065 }
2066
2067
2068 CNSPreciseTime
SendExactNotifications(void)2069 CQueueDataBase::SendExactNotifications(void)
2070 {
2071 CNSPreciseTime next = kTimeNever;
2072 CNSPreciseTime from_queue;
2073
2074 for (unsigned int index = 0; ; ++index) {
2075 CRef<CQueue> queue = x_GetQueueAt(index);
2076 if (queue.IsNull())
2077 break;
2078 from_queue = queue->NotifyExactListeners();
2079 if (from_queue < next )
2080 next = from_queue;
2081 }
2082 return next;
2083 }
2084
2085
RunServiceThread(void)2086 void CQueueDataBase::RunServiceThread(void)
2087 {
2088 // Once in 100 seconds
2089 m_ServiceThread.Reset(new CServiceThread(
2090 *m_Server, m_Host, *this,
2091 m_Server->IsLogStatisticsThread(),
2092 m_Server->GetStatInterval(),
2093 m_Server->GetJobCountersInterval()));
2094 m_ServiceThread->Run();
2095 }
2096
2097
StopServiceThread(void)2098 void CQueueDataBase::StopServiceThread(void)
2099 {
2100 if (!m_ServiceThread.Empty()) {
2101 m_ServiceThread->RequestStop();
2102 m_ServiceThread->Join();
2103 m_ServiceThread.Reset(0);
2104 }
2105 }
2106
2107
RunExecutionWatcherThread(const CNSPreciseTime & run_delay)2108 void CQueueDataBase::RunExecutionWatcherThread(const CNSPreciseTime & run_delay)
2109 {
2110 m_ExeWatchThread.Reset(new CJobQueueExecutionWatcherThread(
2111 m_Host, *this,
2112 run_delay.Sec(), run_delay.NSec(),
2113 m_Server->IsLogExecutionWatcherThread()));
2114 m_ExeWatchThread->Run();
2115 }
2116
2117
StopExecutionWatcherThread(void)2118 void CQueueDataBase::StopExecutionWatcherThread(void)
2119 {
2120 if (!m_ExeWatchThread.Empty()) {
2121 m_ExeWatchThread->RequestStop();
2122 m_ExeWatchThread->Join();
2123 m_ExeWatchThread.Reset(0);
2124 }
2125 }
2126
2127
x_Dump()2128 void CQueueDataBase::x_Dump()
2129 {
2130 LOG_POST(Note << "Start dumping jobs");
2131
2132 // Create the directory if needed
2133 CDir dump_dir(m_DumpPath);
2134 if (!dump_dir.Exists())
2135 dump_dir.Create();
2136
2137 // Remove the file which reserves the disk space
2138 x_RemoveSpaceReserveFile();
2139
2140 // Walk all the queues and dump them
2141 // Note: no need for a lock because it is a shutdown time
2142 bool dump_error = false;
2143 set<string> dumped_queues;
2144 const string lbsm_test_queue("LBSMDTestQueue");
2145 for (TQueueInfo::iterator k = m_Queues.begin();
2146 k != m_Queues.end(); ++k) {
2147 if (NStr::CompareNocase(k->first, lbsm_test_queue) != 0) {
2148 try {
2149 k->second.second->Dump(m_DumpPath);
2150 dumped_queues.insert(k->first);
2151 } catch (const exception & ex) {
2152 dump_error = true;
2153 ERR_POST("Error dumping queue " << k->first << ": " <<
2154 ex.what());
2155 }
2156 }
2157 }
2158
2159 // Dump the required queue classes. The only classes required are those
2160 // which were used by dynamic queues. The dynamic queue classes may also
2161 // use linked sections
2162 set<string> classes_to_dump;
2163 set<string> linked_sections_to_dump;
2164 set<string> dynamic_queues_to_dump;
2165 for (TQueueInfo::iterator k = m_Queues.begin();
2166 k != m_Queues.end(); ++k) {
2167 if (NStr::CompareNocase(k->first, lbsm_test_queue) == 0)
2168 continue;
2169 if (k->second.second->GetQueueKind() == CQueue::eKindStatic)
2170 continue;
2171 if (dumped_queues.find(k->first) == dumped_queues.end())
2172 continue; // There was a dumping error
2173
2174 classes_to_dump.insert(k->second.first.qclass);
2175 for (map<string, string>::const_iterator
2176 j = k->second.first.linked_sections.begin();
2177 j != k->second.first.linked_sections.end(); ++j)
2178 linked_sections_to_dump.insert(j->second);
2179 dynamic_queues_to_dump.insert(k->first);
2180 }
2181
2182
2183 // Dump classes if so and linked sections if so
2184 if (!classes_to_dump.empty()) {
2185 string qclasses_dump_file_name = m_DumpPath +
2186 kQClassDescriptionFileName;
2187 string linked_sections_dump_file_name = m_DumpPath +
2188 kLinkedSectionsFileName;
2189 FILE * qclasses_dump_file = NULL;
2190 FILE * linked_sections_dump_file = NULL;
2191 try {
2192 qclasses_dump_file = fopen(qclasses_dump_file_name.c_str(), "wb");
2193 if (qclasses_dump_file == NULL)
2194 throw runtime_error("Cannot open file " +
2195 qclasses_dump_file_name);
2196 setbuf(qclasses_dump_file, NULL);
2197
2198 if (classes_to_dump.size() > 0 ||
2199 dynamic_queues_to_dump.size() > 0) {
2200 SOneStructDumpHeader header;
2201 header.fixed_size = sizeof(SQueueDescriptionDump);
2202 header.Write(qclasses_dump_file);
2203 }
2204
2205 // Dump dynamic queue classes
2206 for (set<string>::const_iterator k = classes_to_dump.begin();
2207 k != classes_to_dump.end(); ++k) {
2208 TQueueParams::const_iterator queue_class =
2209 m_QueueClasses.find(*k);
2210 x_DumpQueueOrClass(qclasses_dump_file, "", *k, false,
2211 queue_class->second);
2212 }
2213
2214 // Dump dynamic queues: qname and its class.
2215 for (set<string>::const_iterator
2216 k = dynamic_queues_to_dump.begin();
2217 k != dynamic_queues_to_dump.end(); ++k) {
2218 TQueueInfo::const_iterator q = m_Queues.find(*k);
2219 x_DumpQueueOrClass(qclasses_dump_file, *k,
2220 q->second.first.qclass, true,
2221 q->second.first);
2222 }
2223
2224 fclose(qclasses_dump_file);
2225 qclasses_dump_file = NULL;
2226
2227 // Dump linked sections if so
2228 if (!linked_sections_to_dump.empty()) {
2229 linked_sections_dump_file = fopen(
2230 linked_sections_dump_file_name.c_str(), "wb");
2231 if (linked_sections_dump_file == NULL)
2232 throw runtime_error("Cannot open file " +
2233 linked_sections_dump_file_name);
2234 setbuf(linked_sections_dump_file, NULL);
2235
2236 SOneStructDumpHeader header;
2237 header.fixed_size = sizeof(SLinkedSectionDump);
2238 header.Write(linked_sections_dump_file);
2239
2240 for (set<string>::const_iterator
2241 k = linked_sections_to_dump.begin();
2242 k != linked_sections_to_dump.end(); ++k) {
2243 map<string, map<string, string> >::const_iterator
2244 j = m_LinkedSections.find(*k);
2245 x_DumpLinkedSection(linked_sections_dump_file, *k,
2246 j->second);
2247 }
2248 fclose(linked_sections_dump_file);
2249 linked_sections_dump_file = NULL;
2250 }
2251 } catch (const exception & ex) {
2252 dump_error = true;
2253 ERR_POST("Error dumping dynamic queue classes and "
2254 "their linked sections. Dynamic queue dumps are lost.");
2255 if (qclasses_dump_file != NULL)
2256 fclose(qclasses_dump_file);
2257 if (linked_sections_dump_file != NULL)
2258 fclose(linked_sections_dump_file);
2259
2260 // Remove the classes and linked sections files
2261 if (access(qclasses_dump_file_name.c_str(), F_OK) != -1)
2262 remove(qclasses_dump_file_name.c_str());
2263 if (access(linked_sections_dump_file_name.c_str(), F_OK) != -1)
2264 remove(linked_sections_dump_file_name.c_str());
2265
2266 // Remove dynamic queues dumps
2267 for (set<string>::const_iterator
2268 k = dynamic_queues_to_dump.begin();
2269 k != dynamic_queues_to_dump.end(); ++k) {
2270 m_Queues[*k].second->RemoveDump(m_DumpPath);
2271 }
2272 }
2273 }
2274
2275 if (!dump_error)
2276 x_RemoveDumpErrorFlagFile();
2277
2278 LOG_POST(Note << "Dumping jobs finished");
2279 }
2280
2281
x_DumpQueueOrClass(FILE * f,const string & qname,const string & qclass,bool is_queue,const SQueueParameters & params)2282 void CQueueDataBase::x_DumpQueueOrClass(FILE * f,
2283 const string & qname,
2284 const string & qclass,
2285 bool is_queue,
2286 const SQueueParameters & params)
2287 {
2288 SQueueDescriptionDump descr_dump;
2289
2290 descr_dump.is_queue = is_queue;
2291 descr_dump.qname_size = qname.size();
2292 memcpy(descr_dump.qname, qname.data(), qname.size());
2293 descr_dump.qclass_size = qclass.size();
2294 memcpy(descr_dump.qclass, qclass.data(), qclass.size());
2295
2296 if (!is_queue) {
2297 // The other parameters are required for the queue classes only
2298 descr_dump.timeout = (double)params.timeout;
2299 descr_dump.notif_hifreq_interval = (double)params.notif_hifreq_interval;
2300 descr_dump.notif_hifreq_period = (double)params.notif_hifreq_period;
2301 descr_dump.notif_lofreq_mult = params.notif_lofreq_mult;
2302 descr_dump.notif_handicap = (double)params.notif_handicap;
2303 descr_dump.dump_buffer_size = params.dump_buffer_size;
2304 descr_dump.dump_client_buffer_size = params.dump_client_buffer_size;
2305 descr_dump.dump_aff_buffer_size = params.dump_aff_buffer_size;
2306 descr_dump.dump_group_buffer_size = params.dump_group_buffer_size;
2307 descr_dump.run_timeout = (double)params.run_timeout;
2308 descr_dump.read_timeout = (double)params.read_timeout;
2309 descr_dump.program_name_size = params.program_name.size();
2310 memcpy(descr_dump.program_name, params.program_name.data(),
2311 params.program_name.size());
2312 descr_dump.failed_retries = params.failed_retries;
2313 descr_dump.read_failed_retries = params.read_failed_retries;
2314 descr_dump.max_jobs_per_client = params.max_jobs_per_client;
2315 descr_dump.blacklist_time = (double)params.blacklist_time;
2316 descr_dump.read_blacklist_time = (double)params.read_blacklist_time;
2317 descr_dump.max_input_size = params.max_input_size;
2318 descr_dump.max_output_size = params.max_output_size;
2319 descr_dump.subm_hosts_size = params.subm_hosts.size();
2320 memcpy(descr_dump.subm_hosts, params.subm_hosts.data(),
2321 params.subm_hosts.size());
2322 descr_dump.wnode_hosts_size = params.wnode_hosts.size();
2323 memcpy(descr_dump.wnode_hosts, params.wnode_hosts.data(),
2324 params.wnode_hosts.size());
2325 descr_dump.reader_hosts_size = params.reader_hosts.size();
2326 memcpy(descr_dump.reader_hosts, params.reader_hosts.data(),
2327 params.reader_hosts.size());
2328 descr_dump.wnode_timeout = (double)params.wnode_timeout;
2329 descr_dump.reader_timeout = (double)params.reader_timeout;
2330 descr_dump.pending_timeout = (double)params.pending_timeout;
2331 descr_dump.max_pending_wait_timeout =
2332 (double)params.max_pending_wait_timeout;
2333 descr_dump.max_pending_read_wait_timeout =
2334 (double)params.max_pending_read_wait_timeout;
2335 descr_dump.description_size = params.description.size();
2336 memcpy(descr_dump.description, params.description.data(),
2337 params.description.size());
2338 descr_dump.scramble_job_keys = params.scramble_job_keys;
2339 descr_dump.client_registry_timeout_worker_node =
2340 (double)params.client_registry_timeout_worker_node;
2341 descr_dump.client_registry_min_worker_nodes =
2342 params.client_registry_min_worker_nodes;
2343 descr_dump.client_registry_timeout_admin =
2344 (double)params.client_registry_timeout_admin;
2345 descr_dump.client_registry_min_admins =
2346 params.client_registry_min_admins;
2347 descr_dump.client_registry_timeout_submitter =
2348 (double)params.client_registry_timeout_submitter;
2349 descr_dump.client_registry_min_submitters =
2350 params.client_registry_min_submitters;
2351 descr_dump.client_registry_timeout_reader =
2352 (double)params.client_registry_timeout_reader;
2353 descr_dump.client_registry_min_readers =
2354 params.client_registry_min_readers;
2355 descr_dump.client_registry_timeout_unknown =
2356 (double)params.client_registry_timeout_unknown;
2357 descr_dump.client_registry_min_unknowns =
2358 params.client_registry_min_unknowns;
2359
2360 // Dump the linked sections prefixes and names in the same order
2361 string prefixes;
2362 string names;
2363 for (map<string, string>::const_iterator
2364 k = params.linked_sections.begin();
2365 k != params.linked_sections.end(); ++k) {
2366 if (!prefixes.empty()) {
2367 prefixes += ",";
2368 names += ",";
2369 }
2370 prefixes += k->first;
2371 names += k->second;
2372 }
2373 descr_dump.linked_section_prefixes_size = prefixes.size();
2374 memcpy(descr_dump.linked_section_prefixes, prefixes.data(),
2375 prefixes.size());
2376 descr_dump.linked_section_names_size = names.size();
2377 memcpy(descr_dump.linked_section_names, names.data(), names.size());
2378 }
2379
2380 try {
2381 descr_dump.Write(f);
2382 } catch (const exception & ex) {
2383 string msg = "Writing error while dumping queue ";
2384 if (is_queue)
2385 msg += qname;
2386 else
2387 msg += "class " + qclass;
2388 msg += string(": ") + ex.what();
2389 throw runtime_error(msg);
2390 }
2391 }
2392
2393
x_DumpLinkedSection(FILE * f,const string & sname,const map<string,string> & values)2394 void CQueueDataBase::x_DumpLinkedSection(FILE * f, const string & sname,
2395 const map<string, string> & values)
2396 {
2397 for (map<string, string>::const_iterator k = values.begin();
2398 k != values.end(); ++k) {
2399 SLinkedSectionDump section_dump;
2400
2401 section_dump.section_size = sname.size();
2402 memcpy(section_dump.section, sname.data(), sname.size());
2403 section_dump.value_name_size = k->first.size();
2404 memcpy(section_dump.value_name, k->first.data(), k->first.size());
2405 section_dump.value_size = k->second.size();
2406 memcpy(section_dump.value, k->second.data(), k->second.size());
2407
2408 try {
2409 section_dump.Write(f);
2410 } catch (const exception & ex) {
2411 throw runtime_error("Writing error while dumping linked section " +
2412 sname + " values: " + ex.what());
2413 }
2414 }
2415 }
2416
2417
x_RemoveDump(void)2418 void CQueueDataBase::x_RemoveDump(void)
2419 {
2420 try {
2421 CDir dump_dir(m_DumpPath);
2422 if (dump_dir.Exists())
2423 dump_dir.Remove();
2424 } catch (const exception & ex) {
2425 ERR_POST("Error removing the dump directory: " << ex.what());
2426 } catch (...) {
2427 ERR_POST("Unknown error removing the dump directory");
2428 }
2429 }
2430
2431
2432 // Removes unnecessary files in the data directory
x_RemoveDataFiles(void)2433 void CQueueDataBase::x_RemoveDataFiles(void)
2434 {
2435 CDir data_dir(m_DataPath);
2436 if (!data_dir.Exists())
2437 return;
2438
2439 CDir::TEntries entries = data_dir.GetEntries(
2440 kEmptyStr, CDir::fIgnoreRecursive);
2441 for (CDir::TEntries::const_iterator k = entries.begin();
2442 k != entries.end(); ++k) {
2443 if ((*k)->IsDir())
2444 continue;
2445 if ((*k)->IsLink())
2446 continue;
2447 string entryName = (*k)->GetName();
2448 if (entryName == kDBStorageVersionFileName ||
2449 entryName == kNodeIDFileName ||
2450 entryName == kStartJobIDsFileName ||
2451 entryName == kCrashFlagFileName ||
2452 entryName == kDumpErrorFlagFileName ||
2453 entryName == kPausedQueuesFilesName ||
2454 entryName == kRefuseSubmitFileName)
2455 continue;
2456
2457 CFile f(m_DataPath + entryName);
2458 try {
2459 f.Remove();
2460 } catch (...) {}
2461 }
2462 }
2463
2464
2465 // Logs the corresponding message if needed and provides the overall reinit
2466 // status.
x_CheckOpenPreconditions(bool reinit)2467 bool CQueueDataBase::x_CheckOpenPreconditions(bool reinit)
2468 {
2469 if (x_DoesCrashFlagFileExist()) {
2470 ERR_POST("Reinitialization due to the server "
2471 "did not stop gracefully last time. "
2472 << m_DataPath << " removed.");
2473 m_Server->RegisterAlert(eStartAfterCrash, "Database has been "
2474 "reinitialized due to the server did not "
2475 "stop gracefully last time");
2476 return true;
2477 }
2478
2479 if (reinit) {
2480 LOG_POST(Note << "Reinitialization due to a command line option. "
2481 << m_DataPath << " removed.");
2482 m_Server->RegisterAlert(eReinit, "Database has been reinitialized due "
2483 "to a command line option");
2484 return true;
2485 }
2486
2487 if (CDir(m_DataPath).Exists()) {
2488 bool ver_file_exists = CFile(m_DataPath +
2489 kDBStorageVersionFileName).Exists();
2490 bool dump_dir_exists = CDir(m_DumpPath).Exists();
2491
2492 if (dump_dir_exists && !ver_file_exists) {
2493 // Strange. Some service file exist while the storage version
2494 // does not. It might be that the data dir has been altered
2495 // manually. Let's not start.
2496 NCBI_THROW(CNetScheduleException, eInternalError,
2497 "Error detected: Storage version file is not found "
2498 "while the dump directory exists.");
2499 }
2500
2501 if (dump_dir_exists && ver_file_exists) {
2502 CFileIO f;
2503 char buf[1024];
2504 size_t read_count = 0;
2505
2506 // Expected 1 or 2 lines.
2507 // The first one is a storage version
2508 // The second is a server version (used starting from 4.31.0)
2509 f.Open(m_DataPath + kDBStorageVersionFileName,
2510 CFileIO_Base::eOpen, CFileIO_Base::eRead);
2511 read_count = f.Read(buf, sizeof(buf) - 1);
2512 f.Close();
2513
2514 buf[read_count] = '\0';
2515 vector<string> lines;
2516 NStr::Split(buf, "\n", lines);
2517
2518 if (lines.size() > 2) {
2519 ERR_POST(Message << Warning <<
2520 "Unexpected format of the storage version file: more "
2521 "than 2 lines found");
2522 } else if (lines.size() == 0) {
2523 ERR_POST(Message << Warning <<
2524 "Unexpected format of the storage version file: no "
2525 "lines found");
2526 }
2527
2528 CNcbiApplication * app = CNcbiApplication::Instance();
2529 CVersionInfo ver_info = app->GetVersion();
2530 string server_version = ver_info.Print();
2531
2532 if (lines.size() >= 2) {
2533 if (server_version != lines[1]) {
2534 ERR_POST(Message << Warning <<
2535 "NetSchedule version has changed. "
2536 "Previous server version: " << lines[1] <<
2537 " Current version: " << server_version);
2538 }
2539 } else if (lines.size() == 1) {
2540 ERR_POST(Message << Warning <<
2541 "NetSchedule version has changed. "
2542 "Previous server version is unknown. "
2543 " Current version: " << server_version);
2544 }
2545
2546 if (lines.size() >= 1) {
2547 if (lines[0] != NETSCHEDULED_STORAGE_VERSION) {
2548 ERR_POST(Message << Warning <<
2549 "Storage version mismatch detected. "
2550 "Dumped version: " << lines[0] <<
2551 " Current version: " NETSCHEDULED_STORAGE_VERSION);
2552 }
2553 }
2554 }
2555
2556 if (!dump_dir_exists && ver_file_exists) {
2557 LOG_POST(Note << "Non-empty data directory exists however the "
2558 << kDumpSubdirName
2559 << " subdirectory is not found");
2560 m_Server->RegisterAlert(eNoDump,
2561 "Non-empty data directory exists "
2562 "however the " + kDumpSubdirName +
2563 " subdirectory is not found");
2564 }
2565 }
2566
2567 if (x_DoesDumpErrorFlagFileExist()) {
2568 string msg = "The previous instance of the server had problems with "
2569 "dumping the information on the disk. Some queues may be "
2570 "not restored. "
2571 "See the previous instance log for details.";
2572 LOG_POST(Note << msg);
2573 m_Server->RegisterAlert(eDumpError, msg);
2574 }
2575
2576 return false;
2577 }
2578
2579
x_CreateStorageVersionFile(void)2580 void CQueueDataBase::x_CreateStorageVersionFile(void)
2581 {
2582 CNcbiApplication * app = CNcbiApplication::Instance();
2583 CVersionInfo ver_info = app->GetVersion();
2584 string server_version = ver_info.Print();
2585
2586 CFileIO f;
2587 f.Open(m_DataPath + kDBStorageVersionFileName, CFileIO_Base::eCreate,
2588 CFileIO_Base::eReadWrite);
2589 f.Write(NETSCHEDULED_STORAGE_VERSION, strlen(NETSCHEDULED_STORAGE_VERSION));
2590 f.Write("\n", 1);
2591 f.Write(server_version.data(), server_version.size());
2592 f.Close();
2593 }
2594
2595
2596 void
x_ReadDumpQueueDesrc(set<string,PNocase> & dump_static_queues,map<string,string,PNocase> & dump_dynamic_queues,TQueueParams & dump_queue_classes)2597 CQueueDataBase::x_ReadDumpQueueDesrc(set<string, PNocase> & dump_static_queues,
2598 map<string, string,
2599 PNocase> & dump_dynamic_queues,
2600 TQueueParams & dump_queue_classes)
2601 {
2602 CDir dump_dir(m_DumpPath);
2603 if (!dump_dir.Exists())
2604 return;
2605
2606 // The file contains dynamic queue classes and dynamic queue names
2607 string queue_desrc_file_name = m_DumpPath + kQClassDescriptionFileName;
2608 CFile queue_desrc_file(queue_desrc_file_name);
2609 if (queue_desrc_file.Exists()) {
2610 FILE * f = fopen(queue_desrc_file_name.c_str(), "rb");
2611 if (f == NULL)
2612 throw runtime_error("Cannot open the existing dump file "
2613 "for reading: " + queue_desrc_file_name);
2614
2615 SQueueDescriptionDump dump_struct;
2616 try {
2617 SOneStructDumpHeader header;
2618 header.Read(f);
2619
2620 while (dump_struct.Read(f, header.fixed_size) == 0) {
2621 if (dump_struct.is_queue) {
2622 string qname(dump_struct.qname, dump_struct.qname_size);
2623 string qclass(dump_struct.qclass, dump_struct.qclass_size);
2624 dump_dynamic_queues[qname] = qclass;
2625 } else {
2626 SQueueParameters p;
2627 p.qclass = "";
2628 p.timeout = CNSPreciseTime(dump_struct.timeout);
2629 p.notif_hifreq_interval =
2630 CNSPreciseTime(dump_struct.notif_hifreq_interval);
2631 p.notif_hifreq_period =
2632 CNSPreciseTime(dump_struct.notif_hifreq_period);
2633 p.notif_lofreq_mult = dump_struct.notif_lofreq_mult;
2634 p.notif_handicap =
2635 CNSPreciseTime(dump_struct.notif_handicap);
2636 p.dump_buffer_size = dump_struct.dump_buffer_size;
2637 p.dump_client_buffer_size =
2638 dump_struct.dump_client_buffer_size;
2639 p.dump_aff_buffer_size = dump_struct.dump_aff_buffer_size;
2640 p.dump_group_buffer_size =
2641 dump_struct.dump_group_buffer_size;
2642 p.run_timeout = CNSPreciseTime(dump_struct.run_timeout);
2643 p.read_timeout = CNSPreciseTime(dump_struct.read_timeout);
2644 p.program_name = string(dump_struct.program_name,
2645 dump_struct.program_name_size);
2646 p.failed_retries = dump_struct.failed_retries;
2647 p.read_failed_retries = dump_struct.read_failed_retries;
2648 p.max_jobs_per_client = dump_struct.max_jobs_per_client;
2649 p.blacklist_time =
2650 CNSPreciseTime(dump_struct.blacklist_time);
2651 p.read_blacklist_time =
2652 CNSPreciseTime(dump_struct.read_blacklist_time);
2653 p.max_input_size = dump_struct.max_input_size;
2654 p.max_output_size = dump_struct.max_output_size;
2655 p.subm_hosts = string(dump_struct.subm_hosts,
2656 dump_struct.subm_hosts_size);
2657 p.wnode_hosts = string(dump_struct.wnode_hosts,
2658 dump_struct.wnode_hosts_size);
2659 p.reader_hosts = string(dump_struct.reader_hosts,
2660 dump_struct.reader_hosts_size);
2661 p.wnode_timeout =
2662 CNSPreciseTime(dump_struct.wnode_timeout);
2663 p.reader_timeout =
2664 CNSPreciseTime(dump_struct.reader_timeout);
2665 p.pending_timeout =
2666 CNSPreciseTime(dump_struct.pending_timeout);
2667 p.max_pending_wait_timeout =
2668 CNSPreciseTime(dump_struct.max_pending_wait_timeout);
2669 p.max_pending_read_wait_timeout =
2670 CNSPreciseTime(dump_struct.
2671 max_pending_read_wait_timeout);
2672 p.description = string(dump_struct.description,
2673 dump_struct.description_size);
2674 p.scramble_job_keys = dump_struct.scramble_job_keys;
2675 p.client_registry_timeout_worker_node =
2676 CNSPreciseTime(dump_struct.
2677 client_registry_timeout_worker_node);
2678 p.client_registry_min_worker_nodes =
2679 dump_struct.client_registry_min_worker_nodes;
2680 p.client_registry_timeout_admin =
2681 CNSPreciseTime(dump_struct.
2682 client_registry_timeout_admin);
2683 p.client_registry_min_admins =
2684 dump_struct.client_registry_min_admins;
2685 p.client_registry_timeout_submitter =
2686 CNSPreciseTime(dump_struct.
2687 client_registry_timeout_submitter);
2688 p.client_registry_min_submitters =
2689 dump_struct.client_registry_min_submitters;
2690 p.client_registry_timeout_reader =
2691 CNSPreciseTime(dump_struct.
2692 client_registry_timeout_reader);
2693 p.client_registry_min_readers =
2694 dump_struct.client_registry_min_readers;
2695 p.client_registry_timeout_unknown =
2696 CNSPreciseTime(dump_struct.
2697 client_registry_timeout_unknown);
2698 p.client_registry_min_unknowns =
2699 dump_struct.client_registry_min_unknowns;;
2700
2701 // Unpack linked sections
2702 string dump_prefs(dump_struct.
2703 linked_section_prefixes,
2704 dump_struct.
2705 linked_section_prefixes_size);
2706 string dump_names(dump_struct.
2707 linked_section_names,
2708 dump_struct.
2709 linked_section_names_size);
2710 list<string> prefixes;
2711 list<string> names;
2712 NStr::Split(dump_prefs, ",", prefixes,
2713 NStr::fSplit_MergeDelimiters |
2714 NStr::fSplit_Truncate);
2715 NStr::Split(dump_names, ",", names,
2716 NStr::fSplit_MergeDelimiters |
2717 NStr::fSplit_Truncate);
2718 list<string>::const_iterator pref_it = prefixes.begin();
2719 list<string>::const_iterator names_it = names.begin();
2720 for ( ; pref_it != prefixes.end() &&
2721 names_it != names.end(); ++pref_it, ++names_it)
2722 p.linked_sections[*pref_it] = *names_it;
2723
2724 string qclass(dump_struct.qclass, dump_struct.qclass_size);
2725 dump_queue_classes[qclass] = p;
2726 }
2727 }
2728 } catch (const exception & ex) {
2729 fclose(f);
2730 throw;
2731 }
2732 fclose(f);
2733 }
2734
2735
2736 CDir::TEntries entries = dump_dir.GetEntries(
2737 kEmptyStr, CDir::fIgnoreRecursive);
2738 for (CDir::TEntries::const_iterator k = entries.begin();
2739 k != entries.end(); ++k) {
2740 if ((*k)->IsDir())
2741 continue;
2742 if ((*k)->IsLink())
2743 continue;
2744 string entry_name = (*k)->GetName();
2745 if (!NStr::StartsWith(entry_name, "db_dump."))
2746 continue;
2747
2748 string prefix;
2749 string qname;
2750 NStr::SplitInTwo(entry_name, ".", prefix, qname);
2751 if (dump_dynamic_queues.find(qname) == dump_dynamic_queues.end())
2752 dump_static_queues.insert(qname);
2753 }
2754 }
2755
2756
x_GetConfigQueues(void)2757 set<string, PNocase> CQueueDataBase::x_GetConfigQueues(void)
2758 {
2759 const CNcbiRegistry & reg = CNcbiApplication::Instance()->GetConfig();
2760 set<string, PNocase> queues;
2761 list<string> sections;
2762
2763 reg.EnumerateSections(§ions);
2764 for (list<string>::const_iterator k = sections.begin();
2765 k != sections.end(); ++k) {
2766 string queue_name;
2767 string prefix;
2768 const string & section_name = *k;
2769
2770 NStr::SplitInTwo(section_name, "_", prefix, queue_name);
2771 if (NStr::CompareNocase(prefix, "queue") != 0)
2772 continue;
2773 if (queue_name.empty())
2774 continue;
2775 if (queue_name.size() > kMaxQueueNameSize - 1)
2776 continue;
2777 queues.insert(queue_name);
2778 }
2779
2780 return queues;
2781 }
2782
2783
x_AppendDumpLinkedSections(void)2784 void CQueueDataBase::x_AppendDumpLinkedSections(void)
2785 {
2786 // Here: the m_LinkedSections has already been read from the configuration
2787 // file. Let's append the sections from the dump.
2788 CDir dump_dir(m_DumpPath);
2789 if (!dump_dir.Exists())
2790 return;
2791
2792 string linked_sections_file_name = m_DumpPath + kLinkedSectionsFileName;
2793 CFile linked_sections_file(linked_sections_file_name);
2794
2795 if (linked_sections_file.Exists()) {
2796 FILE * f = fopen(linked_sections_file_name.c_str(), "rb");
2797 if (f == NULL)
2798 throw runtime_error("Cannot open the existing dump file "
2799 "for reading: " + linked_sections_file_name);
2800 SLinkedSectionDump dump_struct;
2801 map<string, map<string, string> > dump_sections;
2802 try {
2803 SOneStructDumpHeader header;
2804 header.Read(f);
2805
2806 while (dump_struct.Read(f, header.fixed_size) == 0) {
2807 if (m_LinkedSections.find(dump_struct.section) !=
2808 m_LinkedSections.end())
2809 continue;
2810 string sname(dump_struct.section, dump_struct.section_size);
2811 string vname(dump_struct.value_name,
2812 dump_struct.value_name_size);
2813 string val(dump_struct.value, dump_struct.value_size);
2814 dump_sections[sname][vname] = val;
2815 }
2816 } catch (const exception & ex) {
2817 fclose(f);
2818 throw;
2819 }
2820 fclose(f);
2821
2822 m_LinkedSections.insert(dump_sections.begin(), dump_sections.end());
2823 }
2824 }
2825
2826
x_BackupDump(void)2827 void CQueueDataBase::x_BackupDump(void)
2828 {
2829 CDir dump_dir(m_DumpPath);
2830 if (!dump_dir.Exists())
2831 return;
2832
2833 size_t backup_number = 0;
2834 string backup_dir_name;
2835 for ( ; ; ) {
2836 backup_dir_name = CDirEntry::DeleteTrailingPathSeparator(m_DumpPath) +
2837 "." + to_string(backup_number);
2838 if (!CDir(backup_dir_name).Exists())
2839 break;
2840 ++backup_number;
2841 }
2842
2843 try {
2844 dump_dir.Rename(backup_dir_name);
2845 } catch (const exception & ex) {
2846 ERR_POST("Error renaming the dump directory: " << ex.what());
2847 } catch (...) {
2848 ERR_POST("Unknown error renaming the dump directory");
2849 }
2850 }
2851
2852
x_GetDumpSpaceFileName(void) const2853 string CQueueDataBase::x_GetDumpSpaceFileName(void) const
2854 {
2855 return m_DumpPath + kDumpReservedSpaceFileName;
2856 }
2857
2858
x_RemoveSpaceReserveFile(void)2859 bool CQueueDataBase::x_RemoveSpaceReserveFile(void)
2860 {
2861 CFile space_file(x_GetDumpSpaceFileName());
2862 if (space_file.Exists()) {
2863 try {
2864 space_file.Remove();
2865 } catch (const exception & ex) {
2866 string msg = "Error removing reserving dump space file: " +
2867 string(ex.what());
2868 ERR_POST(msg);
2869 m_Server->RegisterAlert(eDumpSpaceError, msg);
2870 return false;
2871 }
2872 }
2873 return true;
2874 }
2875
2876
x_CreateSpaceReserveFile(void)2877 void CQueueDataBase::x_CreateSpaceReserveFile(void)
2878 {
2879 unsigned int space = m_Server->GetReserveDumpSpace();
2880 if (space == 0)
2881 return;
2882
2883 CDir dump_dir(m_DumpPath);
2884 if (!dump_dir.Exists()) {
2885 try {
2886 dump_dir.Create();
2887 } catch (const exception & ex) {
2888 string msg = "Error creating dump directory: " + string(ex.what());
2889 ERR_POST(msg);
2890 m_Server->RegisterAlert(eDumpSpaceError, msg);
2891 return;
2892 }
2893 }
2894
2895 // This will truncate the file if it existed
2896 FILE * space_file = fopen(x_GetDumpSpaceFileName().c_str(), "w");
2897 if (space_file == NULL) {
2898 string msg = "Error opening reserving dump space file " +
2899 x_GetDumpSpaceFileName();
2900 ERR_POST(msg);
2901 m_Server->RegisterAlert(eDumpSpaceError, msg);
2902 return;
2903 }
2904
2905 void * buffer = malloc(kDumpReservedSpaceFileBuffer);
2906 if (buffer == NULL) {
2907 fclose(space_file);
2908 string msg = "Error creating a memory buffer to write into the "
2909 "reserving dump space file";
2910 ERR_POST(msg);
2911 m_Server->RegisterAlert(eDumpSpaceError, msg);
2912 return;
2913 }
2914
2915 memset(buffer, 0, kDumpReservedSpaceFileBuffer);
2916 while (space > kDumpReservedSpaceFileBuffer) {
2917 errno = 0;
2918 if (fwrite(buffer, kDumpReservedSpaceFileBuffer, 1, space_file) != 1) {
2919 free(buffer);
2920 fclose(space_file);
2921 string msg = "Error writing into the reserving dump space file: " +
2922 string(strerror(errno));
2923 ERR_POST(msg);
2924 m_Server->RegisterAlert(eDumpSpaceError, msg);
2925 return;
2926 }
2927 space -= kDumpReservedSpaceFileBuffer;
2928 }
2929
2930 if (space > 0) {
2931 errno = 0;
2932 if (fwrite(buffer, space, 1, space_file) != 1) {
2933 string msg = "Error writing into the reserving dump space file: " +
2934 string(strerror(errno));
2935 ERR_POST(msg);
2936 m_Server->RegisterAlert(eDumpSpaceError, msg);
2937 }
2938 }
2939
2940 free(buffer);
2941 fclose(space_file);
2942 }
2943
2944
2945 void
x_RestorePauseState(const map<string,int> & paused_queues)2946 CQueueDataBase::x_RestorePauseState(const map<string, int> & paused_queues)
2947 {
2948 CFastMutexGuard guard(m_ConfigureLock);
2949
2950 for (map<string, int>::const_iterator k = paused_queues.begin();
2951 k != paused_queues.end(); ++k) {
2952 string queue_name = k->first;
2953 int pause_mode = k->second;
2954
2955
2956 TQueueInfo::iterator existing = m_Queues.find(queue_name);
2957 if (existing == m_Queues.end()) {
2958 ERR_POST("Cannot restore the pause state of the " <<
2959 queue_name << " queue. The queue is not configured.");
2960 } else {
2961 existing->second.second->RestorePauseStatus(pause_mode);
2962 }
2963 }
2964 }
2965
2966
2967 void
x_RestoreRefuseSubmitState(const vector<string> & refuse_submit_queues)2968 CQueueDataBase::x_RestoreRefuseSubmitState(
2969 const vector<string> & refuse_submit_queues)
2970 {
2971 CFastMutexGuard guard(m_ConfigureLock);
2972
2973 for (vector<string>::const_iterator k = refuse_submit_queues.begin();
2974 k != refuse_submit_queues.end(); ++k) {
2975 string queue_name = *k;
2976
2977 if (NStr::CompareNocase(queue_name, "[server]") == 0) {
2978 m_Server->SetRefuseSubmits(true);
2979 continue;
2980 }
2981
2982 TQueueInfo::iterator existing = m_Queues.find(queue_name);
2983 if (existing == m_Queues.end()) {
2984 ERR_POST("Cannot restore the refuse submit state of the " <<
2985 queue_name << " queue. The queue is not configured.");
2986 } else {
2987 existing->second.second->SetRefuseSubmits(true);
2988 }
2989 }
2990 }
2991
2992 END_NCBI_SCOPE
2993