1 #ifndef NETSCHEDULE_QUEUE_DATABASE__HPP
2 #define NETSCHEDULE_QUEUE_DATABASE__HPP
3 
4 
5 /*  $Id: queue_database.hpp 575173 2018-11-26 13:25:46Z satskyse $
6  * ===========================================================================
7  *
8  *                            PUBLIC DOMAIN NOTICE
9  *               National Center for Biotechnology Information
10  *
11  *  This software/database is a "United States Government Work" under the
12  *  terms of the United States Copyright Act.  It was written as part of
13  *  the author's official duties as a United States Government employee and
14  *  thus cannot be copyrighted.  This software/database is freely available
15  *  to the public for use. The National Library of Medicine and the U.S.
16  *  Government have not placed any restriction on its use or reproduction.
17  *
18  *  Although all reasonable efforts have been taken to ensure the accuracy
19  *  and reliability of the software and data, the NLM and the U.S.
20  *  Government do not and cannot warrant the performance or results that
21  *  may be obtained by using this software or data. The NLM and the U.S.
22  *  Government disclaim all warranties, express or implied, including
23  *  warranties of performance, merchantability or fitness for any particular
24  *  purpose.
25  *
26  *  Please cite the author in any work or product based on this material.
27  *
28  * ===========================================================================
29  *
30  * Authors:  Anatoliy Kuznetsov, Victor Joukov
31  *
32  * File Description:
33  *   Top level queue database (Thread-Safe, synchronized).
34  *
35  */
36 
37 
38 #include <corelib/ncbimtx.hpp>
39 #include <corelib/ncbicntr.hpp>
40 
41 #include <utility>
42 
43 #include <connect/services/netschedule_api.hpp>
44 #include <connect/services/json_over_uttp.hpp>
45 #include "ns_util.hpp"
46 #include "job_status.hpp"
47 #include "queue_clean_thread.hpp"
48 #include "ns_notifications.hpp"
49 #include "ns_queue.hpp"
50 #include "queue_vc.hpp"
51 #include "background_host.hpp"
52 #include "ns_service_thread.hpp"
53 #include "ns_precise_time.hpp"
54 
55 BEGIN_NCBI_SCOPE
56 
57 class CNetScheduleServer;
58 
59 
60 // Holds parameters together with a queue instance
61 typedef map<string,
62             pair<SQueueParameters, CRef<CQueue> >,
63             PNocase >   TQueueInfo;
64 
65 
66 // Top level queue database. (Thread-Safe, synchronized.)
67 class CQueueDataBase
68 {
69 public:
70     CQueueDataBase(CNetScheduleServer *  server,
71                    const string &  path,
72                    unsigned int  max_queues,
73                    bool  diskless,
74                    bool  reinit);
75     ~CQueueDataBase();
76 
77     // Read queue information from registry and configure queues
78     // accordingly.
79     // returns minimum run timeout, necessary for watcher thread
80     time_t  Configure(const IRegistry &  reg,
81                       CJsonNode &        diff);
82 
83     // Count Pending and Running jobs in all queues
84     unsigned int  CountActiveJobs(void) const;
85     unsigned int  CountAllJobs(void) const;
86     bool  AnyJobs(void) const;
87 
88     CRef<CQueue> OpenQueue(const string &  name);
89 
90     void CreateDynamicQueue(const CNSClientId &  client,
91                             const string &  qname, const string &  qclass,
92                             const string &  description = "");
93     void DeleteDynamicQueue(const CNSClientId &  client,
94                             const string &  qname);
95     SQueueParameters QueueInfo(const string &  qname) const;
96     string GetQueueNames(const string &  sep) const;
97 
98     void Close(void);
99     bool QueueExists(const string &  qname) const;
100 
101     // Remove old jobs
102     void Purge(void);
103     void StopPurge(void);
104     void RunPurgeThread(void);
105     void StopPurgeThread(void);
106 
107     // Collect garbage from affinities
108     void PurgeAffinities(void);
109     void PurgeGroups(void);
110     void StaleWNodes(void);
111     void PurgeBlacklistedJobs(void);
112     void PurgeClientRegistry(void);
113 
114     // Notify all listeners
115     void NotifyListeners(void);
116     void RunNotifThread(void);
117     void StopNotifThread(void);
118     void WakeupNotifThread(void);
119     CNSPreciseTime SendExactNotifications(void);
120 
121     // Print statistics
122     void PrintStatistics(size_t &  aff_count);
123     void PrintJobCounters(void);
124     void RunServiceThread(void);
125     void StopServiceThread(void);
126 
127     void CheckExecutionTimeout(bool  logging);
128     void RunExecutionWatcherThread(const CNSPreciseTime &  run_delay);
129     void StopExecutionWatcherThread(void);
130 
131     string PrintTransitionCounters(void);
132     string PrintJobsStat(const CNSClientId &  client);
133     string GetQueueClassesInfo(void) const;
134     string GetQueueClassesConfig(void) const;
135     string GetQueueInfo(void) const;
136     string GetQueueConfig(void) const;
137     string GetLinkedSectionConfig(void) const;
138 
139     map<string, string>  GetLinkedSection(const string &  section_name) const;
140 
141     // map: queue name -> pause state (integer, CQueue::EPauseState)
142     // the only paused queues are reported
143     map<string, int> GetPauseQueues(void) const;
144     vector<string> GetRefuseSubmitQueues(void) const;
GetDataPath(void) const145     string GetDataPath(void) const
146     { return m_DataPath; }
147 
148 private:
149     // No copy
150     CQueueDataBase(const CQueueDataBase&);
151     CQueueDataBase& operator=(const CQueueDataBase&);
152 
153 protected:
154     // get next job id (counter increment)
155     unsigned int  GetNextId();
156 
157     // Returns first id for the batch
158     unsigned int  GetNextIdBatch(unsigned int  count);
159 
160 private:
161     void x_Open(bool  reinit);
162     void x_CreateAndMountQueue(const string &            qname,
163                                const SQueueParameters &  params);
164 
165     unsigned x_PurgeUnconditional(void);
166     void     x_OptimizeStatusMatrix(const CNSPreciseTime &  current_time);
167     bool     x_CheckStopPurge(void);
168     SQueueParameters x_SingleQueueInfo(TQueueInfo::const_iterator  found) const;
169 
170     CBackgroundHost &    m_Host;
171     string               m_DataPath;
172     string               m_DumpPath;
173     unsigned int         m_MaxQueues;
174     bool                 m_Diskless;
175 
176     mutable CFastMutex   m_ConfigureLock;
177 
178     // Effective queue classes
179     TQueueParams         m_QueueClasses;
180     // Effective queues
181     TQueueInfo           m_Queues;
182 
183     bool                 m_StopPurge;         // Purge stop flag
184     CFastMutex           m_PurgeLock;
185     unsigned int         m_FreeStatusMemCnt;  // Free memory counter
186     time_t               m_LastFreeMem;       // time of the last memory opt
187 
188     CRef<CJobQueueCleanerThread>            m_PurgeThread;
189     CRef<CServiceThread>                    m_ServiceThread;
190     CRef<CGetJobNotificationThread>         m_NotifThread;
191     CRef<CJobQueueExecutionWatcherThread>   m_ExeWatchThread;
192 
193     CNetScheduleServer *                    m_Server;
194 
195 private:
196     // Last scan attributes
197     string              m_PurgeQueue;           // The queue name
198     size_t              m_PurgeStatusIndex;     // Scanned status index
199     unsigned int        m_PurgeJobScanned;      // Scanned job ID within status
200 
201     // Linked sections support
202     mutable CFastMutex                      m_LinkedSectionsGuard;
203     // Section name -> section values
204     map< string, map< string, string > >    m_LinkedSections;
205 
206     bool x_PurgeQueue(CQueue &                queue,
207                       size_t                  status_to_start,
208                       size_t                  status_to_end,
209                       unsigned int            start_job_id,
210                       unsigned int            end_job_id,
211                       size_t                  max_scanned,
212                       size_t                  max_mark_deleted,
213                       const CNSPreciseTime &  current_time,
214                       size_t &                total_scanned,
215                       size_t &                total_mark_deleted);
216     void  x_DeleteQueuesAndClasses(void);
217     CRef<CQueue>  x_GetLastPurged(void);
218     CRef<CQueue>  x_GetFirst(void);
219     CRef<CQueue>  x_GetNext(const string &  current_name);
220 
221     // Crash detect support:
222     // - upon start the server creates CRASH_FLAG file
223     // - when gracefully finished the file is deleted
224     // - at the start it is checked if the file is there. If it is then
225     //   it means the server crashed
226     void  x_CreateCrashFlagFile(void);
227     bool  x_DoesCrashFlagFileExist(void) const;
228     void  x_RemoveCrashFlagFile(void);
229 
230     // Dump problem detect support:
231     // - upon dtart the server creates DUMP_ERROR_FLAG file
232     // - if all the NS info was dumped successfully the file is deleted
233     // - at the start it is checked if the file is there. If it is then
234     //   it means the previous instance had problems dumping something
235     void  x_CreateDumpErrorFlagFile(void);
236     bool  x_DoesDumpErrorFlagFileExist(void) const;
237     void  x_RemoveDumpErrorFlagFile(void);
238 
239 
240     bool  x_ConfigureQueueClasses(const TQueueParams &  classes_from_ini,
241                                   CJsonNode &           diff);
242     bool  x_ConfigureQueues(const TQueueParams &  queues_from_ini,
243                             CJsonNode &           diff);
244 
245     TQueueParams  x_ReadIniFileQueueClassDescriptions(const IRegistry &   reg);
246     TQueueParams  x_ReadIniFileQueueDescriptions(const IRegistry &     reg,
247                                                  const TQueueParams &  classes);
248     void  x_ReadLinkedSections(const IRegistry &  reg,
249                                CJsonNode &        diff);
250     CJsonNode  x_DetectChangesInLinkedSection(
251                                 const map<string, string> &  old_values,
252                                 const map<string, string> &  new_values);
253 
254     void  x_ValidateConfiguration(const TQueueParams &  queues_from_ini) const;
255     unsigned int
256     x_CountQueuesToAdd(const TQueueParams &  queues_from_ini) const;
257 
258     CRef<CQueue>  x_GetQueueAt(unsigned int  index);
259 
260     void x_Dump(void);
261     void x_DumpQueueOrClass(FILE *  f,
262                             const string &  qname, const string &  qclass,
263                             bool  is_queue,
264                             const SQueueParameters &  params);
265     void x_DumpLinkedSection(FILE *  f, const string &  sname,
266                              const map<string, string> &  values);
267     void x_RemoveDump(void);
268     void x_RemoveDataFiles(void);
269     void x_CreateStorageVersionFile(void);
270 
271     bool x_CheckOpenPreconditions(bool  reinit);
272     void x_ReadDumpQueueDesrc(set<string, PNocase> &  dump_static_queues,
273                               map<string, string,
274                                   PNocase> &  dump_dynamic_queues,
275                               TQueueParams &  dump_queue_classes);
276     set<string, PNocase> x_GetConfigQueues(void);
277     void x_AppendDumpLinkedSections(void);
278     CNSPreciseTime CalculateRuntimePrecision(void) const;
279     void x_BackupDump(void);
280     void x_CreateSpaceReserveFile(void);
281     bool x_RemoveSpaceReserveFile(void);
282     string x_GetDumpSpaceFileName(void) const;
283     void x_RestorePauseState(const map<string, int> &  paused_queues);
284     void x_RestoreRefuseSubmitState(const vector<string> &  refuse_submit_queues);
285 }; // CQueueDataBase
286 
287 
288 END_NCBI_SCOPE
289 
290 #endif /* NETSCHEDULE_QUEUE_DATABASE__HPP */
291 
292