1 /* $Id: queue_clean_thread.cpp 573282 2018-10-25 15:29:28Z satskyse $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Anatoliy Kuznetsov, Victor Joukov
27 *
28 * File Description: Queue cleaning threads.
29 *
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34 #include <corelib/request_ctx.hpp>
35
36 #include "queue_database.hpp"
37 #include "queue_clean_thread.hpp"
38 #include "ns_handler.hpp"
39
40 BEGIN_NCBI_SCOPE
41
42 class CQueueDataBase;
43
44
45
CJobQueueCleanerThread(CBackgroundHost & host,CQueueDataBase & qdb,unsigned int sec_delay,unsigned int nanosec_delay,const bool & logging)46 CJobQueueCleanerThread::CJobQueueCleanerThread(
47 CBackgroundHost & host,
48 CQueueDataBase & qdb,
49 unsigned int sec_delay,
50 unsigned int nanosec_delay,
51 const bool & logging) :
52 m_Host(host),
53 m_QueueDB(qdb),
54 m_CleaningLogging(logging),
55 m_SecDelay(sec_delay),
56 m_NanosecDelay(nanosec_delay),
57 m_StopSignal(0, 10000000)
58 {}
59
60
~CJobQueueCleanerThread()61 CJobQueueCleanerThread::~CJobQueueCleanerThread()
62 {}
63
64
RequestStop(void)65 void CJobQueueCleanerThread::RequestStop(void)
66 {
67 m_StopFlag.Add(1);
68 m_StopSignal.Post();
69 }
70
71
Main(void)72 void * CJobQueueCleanerThread::Main(void)
73 {
74 SetCurrentThreadName("netscheduled_gc");
75 while (1) {
76 x_DoJob();
77
78 if (m_StopSignal.TryWait(m_SecDelay, m_NanosecDelay))
79 if (m_StopFlag.Get() != 0)
80 break;
81 } // while (1)
82
83 return 0;
84 }
85
86
x_DoJob(void)87 void CJobQueueCleanerThread::x_DoJob(void)
88 {
89 if (!m_Host.ShouldRun())
90 return;
91
92
93 bool is_log = m_CleaningLogging;
94 CRef<CRequestContext> ctx;
95
96 if (is_log) {
97 ctx.Reset(new CRequestContext());
98 ctx->SetRequestID();
99 GetDiagContext().SetRequestContext(ctx);
100 GetDiagContext().PrintRequestStart()
101 .Print("_type", "job_cleaner_thread");
102 ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
103 }
104
105 try {
106 m_QueueDB.Purge();
107 m_QueueDB.PurgeAffinities();
108 m_QueueDB.PurgeGroups();
109 m_QueueDB.StaleWNodes();
110 m_QueueDB.PurgeBlacklistedJobs();
111 m_QueueDB.PurgeClientRegistry();
112 }
113 catch(exception & ex) {
114 RequestStop();
115 ERR_POST("Error while cleaning queue: " << ex.what() <<
116 " Cleaning thread has been stopped.");
117 if (is_log)
118 ctx->SetRequestStatus(
119 CNetScheduleHandler::eStatus_ServerError);
120 }
121 catch (...) {
122 RequestStop();
123 ERR_POST("Unknown error while cleaning queue. "
124 "Cleaning thread has been stopped.");
125 if (is_log)
126 ctx->SetRequestStatus(
127 CNetScheduleHandler::eStatus_ServerError);
128 }
129
130 if (is_log) {
131 GetDiagContext().PrintRequestStop();
132 ctx.Reset();
133 GetDiagContext().SetRequestContext(NULL);
134 }
135 }
136
137
CJobQueueExecutionWatcherThread(CBackgroundHost & host,CQueueDataBase & qdb,unsigned int sec_delay,unsigned int nanosec_delay,const bool & logging)138 CJobQueueExecutionWatcherThread::CJobQueueExecutionWatcherThread(
139 CBackgroundHost & host,
140 CQueueDataBase & qdb,
141 unsigned int sec_delay,
142 unsigned int nanosec_delay,
143 const bool & logging) :
144 m_Host(host),
145 m_QueueDB(qdb),
146 m_ExecutionLogging(logging),
147 m_SecDelay(sec_delay),
148 m_NanosecDelay(nanosec_delay),
149 m_StopSignal(0, 10000000)
150 {}
151
152
~CJobQueueExecutionWatcherThread()153 CJobQueueExecutionWatcherThread::~CJobQueueExecutionWatcherThread()
154 {}
155
156
RequestStop(void)157 void CJobQueueExecutionWatcherThread::RequestStop(void)
158 {
159 m_StopFlag.Add(1);
160 m_StopSignal.Post();
161 }
162
163
Main(void)164 void * CJobQueueExecutionWatcherThread::Main(void)
165 {
166 SetCurrentThreadName("netscheduled_ew");
167 while (1) {
168 x_DoJob();
169
170 if (m_StopSignal.TryWait(m_SecDelay, m_NanosecDelay))
171 if (m_StopFlag.Get() != 0)
172 break;
173 } // while (1)
174
175 return 0;
176 }
177
178
179
x_DoJob(void)180 void CJobQueueExecutionWatcherThread::x_DoJob(void)
181 {
182 if (!m_Host.ShouldRun())
183 return;
184
185
186 bool is_log = m_ExecutionLogging;
187 CRef<CRequestContext> ctx;
188
189 if (is_log) {
190 ctx.Reset(new CRequestContext());
191 ctx->SetRequestID();
192 GetDiagContext().SetRequestContext(ctx);
193 GetDiagContext().PrintRequestStart()
194 .Print("_type", "job_execution_watcher_thread");
195 ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
196 }
197
198
199 try {
200 m_QueueDB.CheckExecutionTimeout(is_log);
201 }
202 catch (exception & ex) {
203 RequestStop();
204 ERR_POST("Error in execution watcher: " << ex.what() <<
205 " watcher thread has been stopped.");
206 if (is_log)
207 ctx->SetRequestStatus(
208 CNetScheduleHandler::eStatus_ServerError);
209 }
210 catch (...) {
211 RequestStop();
212 ERR_POST("Unknown error in execution watcher. "
213 "Watched thread has been stopped.");
214 if (is_log)
215 ctx->SetRequestStatus(
216 CNetScheduleHandler::eStatus_ServerError);
217 }
218
219 if (is_log) {
220 GetDiagContext().PrintRequestStop();
221 ctx.Reset();
222 GetDiagContext().SetRequestContext(NULL);
223 }
224 }
225
226
227 END_NCBI_SCOPE
228
229