1 /*  $Id: queue_clean_thread.cpp 573282 2018-10-25 15:29:28Z satskyse $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Anatoliy Kuznetsov, Victor Joukov
27  *
28  * File Description: Queue cleaning threads.
29  *
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 #include <corelib/request_ctx.hpp>
35 
36 #include "queue_database.hpp"
37 #include "queue_clean_thread.hpp"
38 #include "ns_handler.hpp"
39 
40 BEGIN_NCBI_SCOPE
41 
42 class CQueueDataBase;
43 
44 
45 
CJobQueueCleanerThread(CBackgroundHost & host,CQueueDataBase & qdb,unsigned int sec_delay,unsigned int nanosec_delay,const bool & logging)46 CJobQueueCleanerThread::CJobQueueCleanerThread(
47         CBackgroundHost &    host,
48         CQueueDataBase &     qdb,
49         unsigned int         sec_delay,
50         unsigned int         nanosec_delay,
51         const bool &         logging) :
52     m_Host(host),
53     m_QueueDB(qdb),
54     m_CleaningLogging(logging),
55     m_SecDelay(sec_delay),
56     m_NanosecDelay(nanosec_delay),
57     m_StopSignal(0, 10000000)
58 {}
59 
60 
~CJobQueueCleanerThread()61 CJobQueueCleanerThread::~CJobQueueCleanerThread()
62 {}
63 
64 
RequestStop(void)65 void CJobQueueCleanerThread::RequestStop(void)
66 {
67     m_StopFlag.Add(1);
68     m_StopSignal.Post();
69 }
70 
71 
Main(void)72 void *  CJobQueueCleanerThread::Main(void)
73 {
74     SetCurrentThreadName("netscheduled_gc");
75     while (1) {
76         x_DoJob();
77 
78         if (m_StopSignal.TryWait(m_SecDelay, m_NanosecDelay))
79             if (m_StopFlag.Get() != 0)
80                 break;
81     } // while (1)
82 
83     return 0;
84 }
85 
86 
x_DoJob(void)87 void CJobQueueCleanerThread::x_DoJob(void)
88 {
89     if (!m_Host.ShouldRun())
90         return;
91 
92 
93     bool                    is_log = m_CleaningLogging;
94     CRef<CRequestContext>   ctx;
95 
96     if (is_log) {
97         ctx.Reset(new CRequestContext());
98         ctx->SetRequestID();
99         GetDiagContext().SetRequestContext(ctx);
100         GetDiagContext().PrintRequestStart()
101                         .Print("_type", "job_cleaner_thread");
102         ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
103     }
104 
105     try {
106         m_QueueDB.Purge();
107         m_QueueDB.PurgeAffinities();
108         m_QueueDB.PurgeGroups();
109         m_QueueDB.StaleWNodes();
110         m_QueueDB.PurgeBlacklistedJobs();
111         m_QueueDB.PurgeClientRegistry();
112     }
113     catch(exception &  ex) {
114         RequestStop();
115         ERR_POST("Error while cleaning queue: " << ex.what() <<
116                  " Cleaning thread has been stopped.");
117         if (is_log)
118             ctx->SetRequestStatus(
119                             CNetScheduleHandler::eStatus_ServerError);
120     }
121     catch (...) {
122         RequestStop();
123         ERR_POST("Unknown error while cleaning queue. "
124                  "Cleaning thread has been stopped.");
125         if (is_log)
126             ctx->SetRequestStatus(
127                             CNetScheduleHandler::eStatus_ServerError);
128     }
129 
130     if (is_log) {
131         GetDiagContext().PrintRequestStop();
132         ctx.Reset();
133         GetDiagContext().SetRequestContext(NULL);
134     }
135 }
136 
137 
CJobQueueExecutionWatcherThread(CBackgroundHost & host,CQueueDataBase & qdb,unsigned int sec_delay,unsigned int nanosec_delay,const bool & logging)138 CJobQueueExecutionWatcherThread::CJobQueueExecutionWatcherThread(
139             CBackgroundHost &       host,
140             CQueueDataBase &        qdb,
141             unsigned int            sec_delay,
142             unsigned int            nanosec_delay,
143             const bool &            logging) :
144     m_Host(host),
145     m_QueueDB(qdb),
146     m_ExecutionLogging(logging),
147     m_SecDelay(sec_delay),
148     m_NanosecDelay(nanosec_delay),
149     m_StopSignal(0, 10000000)
150 {}
151 
152 
~CJobQueueExecutionWatcherThread()153 CJobQueueExecutionWatcherThread::~CJobQueueExecutionWatcherThread()
154 {}
155 
156 
RequestStop(void)157 void CJobQueueExecutionWatcherThread::RequestStop(void)
158 {
159     m_StopFlag.Add(1);
160     m_StopSignal.Post();
161 }
162 
163 
Main(void)164 void *  CJobQueueExecutionWatcherThread::Main(void)
165 {
166     SetCurrentThreadName("netscheduled_ew");
167     while (1) {
168         x_DoJob();
169 
170         if (m_StopSignal.TryWait(m_SecDelay, m_NanosecDelay))
171             if (m_StopFlag.Get() != 0)
172                 break;
173     } // while (1)
174 
175     return 0;
176 }
177 
178 
179 
x_DoJob(void)180 void CJobQueueExecutionWatcherThread::x_DoJob(void)
181 {
182     if (!m_Host.ShouldRun())
183         return;
184 
185 
186     bool                    is_log = m_ExecutionLogging;
187     CRef<CRequestContext>   ctx;
188 
189     if (is_log) {
190         ctx.Reset(new CRequestContext());
191         ctx->SetRequestID();
192         GetDiagContext().SetRequestContext(ctx);
193         GetDiagContext().PrintRequestStart()
194                         .Print("_type", "job_execution_watcher_thread");
195         ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
196     }
197 
198 
199     try {
200         m_QueueDB.CheckExecutionTimeout(is_log);
201     }
202     catch (exception &  ex) {
203         RequestStop();
204         ERR_POST("Error in execution watcher: " << ex.what() <<
205                  " watcher thread has been stopped.");
206         if (is_log)
207             ctx->SetRequestStatus(
208                             CNetScheduleHandler::eStatus_ServerError);
209     }
210     catch (...) {
211         RequestStop();
212         ERR_POST("Unknown error in execution watcher. "
213                  "Watched thread has been stopped.");
214         if (is_log)
215             ctx->SetRequestStatus(
216                             CNetScheduleHandler::eStatus_ServerError);
217     }
218 
219     if (is_log) {
220         GetDiagContext().PrintRequestStop();
221         ctx.Reset();
222         GetDiagContext().SetRequestContext(NULL);
223     }
224 }
225 
226 
227 END_NCBI_SCOPE
228 
229