1 /* $Id: grid_globals.cpp 606785 2020-04-27 16:12:51Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description:
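 *   Grid worker node globals (CGridGlobals) and the job watcher
 *   (CWNJobWatcher) that keeps job statistics and detects stuck jobs.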
30 */
31
#include <ncbi_pch.hpp>

#include <connect/services/grid_globals.hpp>
#include <connect/services/error_codes.hpp>

#include <corelib/ncbi_system.hpp>
#include <corelib/ncbimtx.hpp>
#include <corelib/ncbidiag.hpp>
#include <corelib/ncbi_safe_static.hpp>

#include <exception>
41
42 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
43
44 BEGIN_NCBI_SCOPE
45
46 /////////////////////////////////////////////////////////////////////////////
47 //
// CWNJobWatcher -- worker node job statistics and stuck-job detection
49 /// @internal
CWNJobWatcher()50 CWNJobWatcher::CWNJobWatcher()
51 : m_JobsStarted(0), m_JobsSucceeded(0), m_JobsFailed(0), m_JobsReturned(0),
52 m_JobsRescheduled(0), m_JobsCanceled(0), m_JobsLost(0),
53 m_MaxJobsAllowed(0), m_MaxFailuresAllowed(0),
54 m_InfiniteLoopTime(0)
55 {
56 }
~CWNJobWatcher()57 CWNJobWatcher::~CWNJobWatcher()
58 {
59 }
60
Notify(const CWorkerNodeJobContext & job_context,EEvent event)61 void CWNJobWatcher::Notify(const CWorkerNodeJobContext& job_context,
62 EEvent event)
63 {
64 auto& grid_globals = CGridGlobals::GetInstance();
65
66 switch (event) {
67 case eJobStarted:
68 {
69 CMutexGuard guard(m_ActiveJobsMutex);
70 m_ActiveJobs[const_cast<CWorkerNodeJobContext*>(&job_context)] =
71 SJobActivity();
72 ++m_JobsStarted;
73 if (m_MaxJobsAllowed > 0 && m_JobsStarted > m_MaxJobsAllowed - 1 && !grid_globals.IsShuttingDown()) {
74 LOG_POST_X(1, "The maximum number of allowed jobs (" <<
75 m_MaxJobsAllowed << ") has been reached. "
76 "Sending the shutdown request." );
77 grid_globals.RequestShutdown(CNetScheduleAdmin::eNormalShutdown);
78 }
79 }
80 break;
81 case eJobStopped:
82 {
83 CMutexGuard guard(m_ActiveJobsMutex);
84 m_ActiveJobs.erase(
85 const_cast<CWorkerNodeJobContext*>(&job_context));
86 }
87 break;
88 case eJobFailed:
89 ++m_JobsFailed;
90 if (m_MaxFailuresAllowed > 0 && m_JobsFailed > m_MaxFailuresAllowed - 1 &&
91 grid_globals.GetShutdownLevel() < CNetScheduleAdmin::eShutdownImmediate) {
92 ERR_POST_X(2, Warning << "The maximum number of failed jobs (" <<
93 m_MaxFailuresAllowed << ") has been reached. "
94 "Shutting down..." );
95 grid_globals.RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
96 }
97 break;
98 case eJobSucceeded:
99 ++m_JobsSucceeded;
100 break;
101 case eJobReturned:
102 ++m_JobsReturned;
103 break;
104 case eJobRescheduled:
105 ++m_JobsRescheduled;
106 break;
107 case eJobCanceled:
108 ++m_JobsCanceled;
109 break;
110 case eJobLost:
111 ++m_JobsLost;
112 break;
113 }
114
115 if (event != eJobStarted && !grid_globals.IsShuttingDown()) {
116 CGridWorkerNode worker_node(job_context.GetWorkerNode());
117 Uint8 total_memory_limit = worker_node.GetTotalMemoryLimit();
118 if (total_memory_limit > 0) { // memory check requested
119 CCurrentProcess::SMemoryUsage memory_usage;
120 if (!CCurrentProcess::GetMemoryUsage(memory_usage)) {
121 ERR_POST("Could not check self memory usage" );
122 } else if (memory_usage.total > total_memory_limit) {
123 ERR_POST(Warning << "Memory usage (" << memory_usage.total <<
124 ") is above the configured limit (" <<
125 total_memory_limit << ")");
126 const auto kExitCode = 100; // See also one in wn_main_loop.cpp
127 grid_globals.RequestShutdown(CNetScheduleAdmin::eNormalShutdown, kExitCode);
128 }
129 }
130 }
131 }
132
Print(CNcbiOstream & os) const133 void CWNJobWatcher::Print(CNcbiOstream& os) const
134 {
135 os << "Started: " <<
136 CGridGlobals::GetInstance().GetStartTime().AsString() <<
137 "\nJobs Succeeded: " << m_JobsSucceeded <<
138 "\nJobs Failed: " << m_JobsFailed <<
139 "\nJobs Returned: " << m_JobsReturned <<
140 "\nJobs Rescheduled: " << m_JobsRescheduled <<
141 "\nJobs Canceled: " << m_JobsCanceled <<
142 "\nJobs Lost: " << m_JobsLost << "\n";
143
144 CMutexGuard guard(m_ActiveJobsMutex);
145 os << "Jobs Running: " << m_ActiveJobs.size() << "\n";
146 ITERATE(TActiveJobs, it, m_ActiveJobs) {
147 os << it->first->GetJobKey() << " \"" <<
148 NStr::PrintableString(it->first->GetJobInput()) <<
149 "\" -- running for " <<
150 (int) it->second.elasped_time.Elapsed() << " seconds.";
151 if (it->second.is_stuck)
152 os << "!!! LONG RUNNING JOB !!!";
153 os << "\n";
154 }
155 }
156
CheckForInfiniteLoop()157 void CWNJobWatcher::CheckForInfiniteLoop()
158 {
159 if (m_InfiniteLoopTime > 0) {
160 size_t count = 0;
161 CMutexGuard guard(m_ActiveJobsMutex);
162 NON_CONST_ITERATE(TActiveJobs, it, m_ActiveJobs) {
163 if (!it->second.is_stuck) {
164 if ( it->second.elasped_time.Elapsed() > m_InfiniteLoopTime) {
165 const auto job_key = it->first->GetJobKey();
166 ERR_POST_X(3, "An infinite loop is detected in job " << job_key);
167 GetDiagContext().Extra().Print("job_key", job_key);
168
169 it->second.is_stuck = true;
170 CGridGlobals::GetInstance().
171 RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
172 }
173 } else
174 ++count;
175 }
176 if (count > 0 && count == m_ActiveJobs.size()) {
177 ERR_POST_X(4, "All jobs are in infinite loops. "
178 "Server is shutting down.");
179 CGridGlobals::GetInstance().KillNode();
180 }
181 }
182 }
183
x_KillNode(CGridWorkerNode worker)184 void CWNJobWatcher::x_KillNode(CGridWorkerNode worker)
185 {
186 CMutexGuard guard(m_ActiveJobsMutex);
187 NON_CONST_ITERATE(TActiveJobs, it, m_ActiveJobs) {
188 CNetScheduleJob& job = it->first->GetJob();
189 if (!it->second.is_stuck)
190 worker.GetNSExecutor().ReturnJob(job);
191 else {
192 job.error_msg = "Job execution time exceeded " +
193 NStr::NumericToString(
194 unsigned(it->second.elasped_time.Elapsed()));
195 job.error_msg += " seconds.";
196 worker.GetNSExecutor().PutFailure(job);
197 }
198 }
199 TPid cpid = CCurrentProcess::GetPid();
200 CProcess(cpid).Kill();
201 }
202
203
204 /////////////////////////////////////////////////////////////////////////////
// CGridGlobals -- process-wide state of a grid worker node
CGridGlobals()206 CGridGlobals::CGridGlobals() :
207 m_ReuseJobObject(false),
208 m_ShutdownLevel(CNetScheduleAdmin::eNoShutdown),
209 m_ExitCode(0),
210 m_StartTime(GetFastLocalTime()),
211 m_Worker(NULL),
212 m_UDPPort(0)
213 {
214 }
215
~CGridGlobals()216 CGridGlobals::~CGridGlobals()
217 {
218 }
219
220 /* static */
GetInstance()221 CGridGlobals& CGridGlobals::GetInstance()
222 {
223 static CSafeStatic<CGridGlobals> global_instance;
224
225 return global_instance.Get();
226 }
227
228
GetNewJobNumber()229 unsigned int CGridGlobals::GetNewJobNumber()
230 {
231 return (unsigned) m_JobsStarted.Add(1);
232 }
233
GetJobWatcher()234 CWNJobWatcher& CGridGlobals::GetJobWatcher()
235 {
236 if (!m_JobWatcher.get())
237 m_JobWatcher.reset(new CWNJobWatcher);
238 return *m_JobWatcher;
239 }
240
KillNode()241 void CGridGlobals::KillNode()
242 {
243 _ASSERT(m_Worker);
244 if (m_Worker)
245 GetJobWatcher().x_KillNode(m_Worker);
246 }
247
InterruptUDPPortListening()248 void CGridGlobals::InterruptUDPPortListening()
249 {
250 if (m_UDPPort != 0)
251 CDatagramSocket().Send("INTERRUPT", sizeof("INTERRUPT"),
252 "127.0.0.1", m_UDPPort);
253 }
254
255 END_NCBI_SCOPE
256