1 #ifndef NETSCHEDULE_QUEUE_DATABASE__HPP 2 #define NETSCHEDULE_QUEUE_DATABASE__HPP 3 4 5 /* $Id: queue_database.hpp 575173 2018-11-26 13:25:46Z satskyse $ 6 * =========================================================================== 7 * 8 * PUBLIC DOMAIN NOTICE 9 * National Center for Biotechnology Information 10 * 11 * This software/database is a "United States Government Work" under the 12 * terms of the United States Copyright Act. It was written as part of 13 * the author's official duties as a United States Government employee and 14 * thus cannot be copyrighted. This software/database is freely available 15 * to the public for use. The National Library of Medicine and the U.S. 16 * Government have not placed any restriction on its use or reproduction. 17 * 18 * Although all reasonable efforts have been taken to ensure the accuracy 19 * and reliability of the software and data, the NLM and the U.S. 20 * Government do not and cannot warrant the performance or results that 21 * may be obtained by using this software or data. The NLM and the U.S. 22 * Government disclaim all warranties, express or implied, including 23 * warranties of performance, merchantability or fitness for any particular 24 * purpose. 25 * 26 * Please cite the author in any work or product based on this material. 27 * 28 * =========================================================================== 29 * 30 * Authors: Anatoliy Kuznetsov, Victor Joukov 31 * 32 * File Description: 33 * Top level queue database (Thread-Safe, synchronized). 34 * 35 */ 36 37 38 #include <corelib/ncbimtx.hpp> 39 #include <corelib/ncbicntr.hpp> 40 41 #include <utility> 42 43 #include <connect/services/netschedule_api.hpp> 44 #include <connect/services/json_over_uttp.hpp> 45 #include "ns_util.hpp" 46 #include "job_status.hpp" 47 #include "queue_clean_thread.hpp" 48 #include "ns_notifications.hpp" 49 #include "ns_queue.hpp" 50 #include "queue_vc.hpp" 51 #include "background_host.hpp" 52 #include "ns_service_thread.hpp" 53 #include "ns_precise_time.hpp" 54 55 BEGIN_NCBI_SCOPE 56 57 class CNetScheduleServer; 58 59 60 // Holds parameters together with a queue instance 61 typedef map<string, 62 pair<SQueueParameters, CRef<CQueue> >, 63 PNocase > TQueueInfo; 64 65 66 // Top level queue database. (Thread-Safe, synchronized.) 67 class CQueueDataBase 68 { 69 public: 70 CQueueDataBase(CNetScheduleServer * server, 71 const string & path, 72 unsigned int max_queues, 73 bool diskless, 74 bool reinit); 75 ~CQueueDataBase(); 76 77 // Read queue information from registry and configure queues 78 // accordingly. 79 // returns minimum run timeout, necessary for watcher thread 80 time_t Configure(const IRegistry & reg, 81 CJsonNode & diff); 82 83 // Count Pending and Running jobs in all queues 84 unsigned int CountActiveJobs(void) const; 85 unsigned int CountAllJobs(void) const; 86 bool AnyJobs(void) const; 87 88 CRef<CQueue> OpenQueue(const string & name); 89 90 void CreateDynamicQueue(const CNSClientId & client, 91 const string & qname, const string & qclass, 92 const string & description = ""); 93 void DeleteDynamicQueue(const CNSClientId & client, 94 const string & qname); 95 SQueueParameters QueueInfo(const string & qname) const; 96 string GetQueueNames(const string & sep) const; 97 98 void Close(void); 99 bool QueueExists(const string & qname) const; 100 101 // Remove old jobs 102 void Purge(void); 103 void StopPurge(void); 104 void RunPurgeThread(void); 105 void StopPurgeThread(void); 106 107 // Collect garbage from affinities 108 void PurgeAffinities(void); 109 void PurgeGroups(void); 110 void StaleWNodes(void); 111 void PurgeBlacklistedJobs(void); 112 void PurgeClientRegistry(void); 113 114 // Notify all listeners 115 void NotifyListeners(void); 116 void RunNotifThread(void); 117 void StopNotifThread(void); 118 void WakeupNotifThread(void); 119 CNSPreciseTime SendExactNotifications(void); 120 121 // Print statistics 122 void PrintStatistics(size_t & aff_count); 123 void PrintJobCounters(void); 124 void RunServiceThread(void); 125 void StopServiceThread(void); 126 127 void CheckExecutionTimeout(bool logging); 128 void RunExecutionWatcherThread(const CNSPreciseTime & run_delay); 129 void StopExecutionWatcherThread(void); 130 131 string PrintTransitionCounters(void); 132 string PrintJobsStat(const CNSClientId & client); 133 string GetQueueClassesInfo(void) const; 134 string GetQueueClassesConfig(void) const; 135 string GetQueueInfo(void) const; 136 string GetQueueConfig(void) const; 137 string GetLinkedSectionConfig(void) const; 138 139 map<string, string> GetLinkedSection(const string & section_name) const; 140 141 // map: queue name -> pause state (integer, CQueue::EPauseState) 142 // the only paused queues are reported 143 map<string, int> GetPauseQueues(void) const; 144 vector<string> GetRefuseSubmitQueues(void) const; GetDataPath(void) const145 string GetDataPath(void) const 146 { return m_DataPath; } 147 148 private: 149 // No copy 150 CQueueDataBase(const CQueueDataBase&); 151 CQueueDataBase& operator=(const CQueueDataBase&); 152 153 protected: 154 // get next job id (counter increment) 155 unsigned int GetNextId(); 156 157 // Returns first id for the batch 158 unsigned int GetNextIdBatch(unsigned int count); 159 160 private: 161 void x_Open(bool reinit); 162 void x_CreateAndMountQueue(const string & qname, 163 const SQueueParameters & params); 164 165 unsigned x_PurgeUnconditional(void); 166 void x_OptimizeStatusMatrix(const CNSPreciseTime & current_time); 167 bool x_CheckStopPurge(void); 168 SQueueParameters x_SingleQueueInfo(TQueueInfo::const_iterator found) const; 169 170 CBackgroundHost & m_Host; 171 string m_DataPath; 172 string m_DumpPath; 173 unsigned int m_MaxQueues; 174 bool m_Diskless; 175 176 mutable CFastMutex m_ConfigureLock; 177 178 // Effective queue classes 179 TQueueParams m_QueueClasses; 180 // Effective queues 181 TQueueInfo m_Queues; 182 183 bool m_StopPurge; // Purge stop flag 184 CFastMutex m_PurgeLock; 185 unsigned int m_FreeStatusMemCnt; // Free memory counter 186 time_t m_LastFreeMem; // time of the last memory opt 187 188 CRef<CJobQueueCleanerThread> m_PurgeThread; 189 CRef<CServiceThread> m_ServiceThread; 190 CRef<CGetJobNotificationThread> m_NotifThread; 191 CRef<CJobQueueExecutionWatcherThread> m_ExeWatchThread; 192 193 CNetScheduleServer * m_Server; 194 195 private: 196 // Last scan attributes 197 string m_PurgeQueue; // The queue name 198 size_t m_PurgeStatusIndex; // Scanned status index 199 unsigned int m_PurgeJobScanned; // Scanned job ID within status 200 201 // Linked sections support 202 mutable CFastMutex m_LinkedSectionsGuard; 203 // Section name -> section values 204 map< string, map< string, string > > m_LinkedSections; 205 206 bool x_PurgeQueue(CQueue & queue, 207 size_t status_to_start, 208 size_t status_to_end, 209 unsigned int start_job_id, 210 unsigned int end_job_id, 211 size_t max_scanned, 212 size_t max_mark_deleted, 213 const CNSPreciseTime & current_time, 214 size_t & total_scanned, 215 size_t & total_mark_deleted); 216 void x_DeleteQueuesAndClasses(void); 217 CRef<CQueue> x_GetLastPurged(void); 218 CRef<CQueue> x_GetFirst(void); 219 CRef<CQueue> x_GetNext(const string & current_name); 220 221 // Crash detect support: 222 // - upon start the server creates CRASH_FLAG file 223 // - when gracefully finished the file is deleted 224 // - at the start it is checked if the file is there. If it is then 225 // it means the server crashed 226 void x_CreateCrashFlagFile(void); 227 bool x_DoesCrashFlagFileExist(void) const; 228 void x_RemoveCrashFlagFile(void); 229 230 // Dump problem detect support: 231 // - upon dtart the server creates DUMP_ERROR_FLAG file 232 // - if all the NS info was dumped successfully the file is deleted 233 // - at the start it is checked if the file is there. If it is then 234 // it means the previous instance had problems dumping something 235 void x_CreateDumpErrorFlagFile(void); 236 bool x_DoesDumpErrorFlagFileExist(void) const; 237 void x_RemoveDumpErrorFlagFile(void); 238 239 240 bool x_ConfigureQueueClasses(const TQueueParams & classes_from_ini, 241 CJsonNode & diff); 242 bool x_ConfigureQueues(const TQueueParams & queues_from_ini, 243 CJsonNode & diff); 244 245 TQueueParams x_ReadIniFileQueueClassDescriptions(const IRegistry & reg); 246 TQueueParams x_ReadIniFileQueueDescriptions(const IRegistry & reg, 247 const TQueueParams & classes); 248 void x_ReadLinkedSections(const IRegistry & reg, 249 CJsonNode & diff); 250 CJsonNode x_DetectChangesInLinkedSection( 251 const map<string, string> & old_values, 252 const map<string, string> & new_values); 253 254 void x_ValidateConfiguration(const TQueueParams & queues_from_ini) const; 255 unsigned int 256 x_CountQueuesToAdd(const TQueueParams & queues_from_ini) const; 257 258 CRef<CQueue> x_GetQueueAt(unsigned int index); 259 260 void x_Dump(void); 261 void x_DumpQueueOrClass(FILE * f, 262 const string & qname, const string & qclass, 263 bool is_queue, 264 const SQueueParameters & params); 265 void x_DumpLinkedSection(FILE * f, const string & sname, 266 const map<string, string> & values); 267 void x_RemoveDump(void); 268 void x_RemoveDataFiles(void); 269 void x_CreateStorageVersionFile(void); 270 271 bool x_CheckOpenPreconditions(bool reinit); 272 void x_ReadDumpQueueDesrc(set<string, PNocase> & dump_static_queues, 273 map<string, string, 274 PNocase> & dump_dynamic_queues, 275 TQueueParams & dump_queue_classes); 276 set<string, PNocase> x_GetConfigQueues(void); 277 void x_AppendDumpLinkedSections(void); 278 CNSPreciseTime CalculateRuntimePrecision(void) const; 279 void x_BackupDump(void); 280 void x_CreateSpaceReserveFile(void); 281 bool x_RemoveSpaceReserveFile(void); 282 string x_GetDumpSpaceFileName(void) const; 283 void x_RestorePauseState(const map<string, int> & paused_queues); 284 void x_RestoreRefuseSubmitState(const vector<string> & refuse_submit_queues); 285 }; // CQueueDataBase 286 287 288 END_NCBI_SCOPE 289 290 #endif /* NETSCHEDULE_QUEUE_DATABASE__HPP */ 291 292