1 /*  $Id: grid_globals.cpp 606785 2020-04-27 16:12:51Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *   Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <connect/services/grid_globals.hpp>
35 #include <connect/services/error_codes.hpp>
36 
37 #include <corelib/ncbi_system.hpp>
38 #include <corelib/ncbimtx.hpp>
39 #include <corelib/ncbidiag.hpp>
40 #include <corelib/ncbi_safe_static.hpp>
41 
42 #define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode
43 
44 BEGIN_NCBI_SCOPE
45 
46 /////////////////////////////////////////////////////////////////////////////
47 //
//     CWNJobWatcher (worker node job statistics)
49 /// @internal
CWNJobWatcher()50 CWNJobWatcher::CWNJobWatcher()
51     : m_JobsStarted(0), m_JobsSucceeded(0), m_JobsFailed(0), m_JobsReturned(0),
52       m_JobsRescheduled(0), m_JobsCanceled(0), m_JobsLost(0),
53       m_MaxJobsAllowed(0), m_MaxFailuresAllowed(0),
54       m_InfiniteLoopTime(0)
55 {
56 }
~CWNJobWatcher()57 CWNJobWatcher::~CWNJobWatcher()
58 {
59 }
60 
Notify(const CWorkerNodeJobContext & job_context,EEvent event)61 void CWNJobWatcher::Notify(const CWorkerNodeJobContext& job_context,
62                             EEvent event)
63 {
64     auto& grid_globals = CGridGlobals::GetInstance();
65 
66     switch (event) {
67     case eJobStarted:
68         {
69             CMutexGuard guard(m_ActiveJobsMutex);
70             m_ActiveJobs[const_cast<CWorkerNodeJobContext*>(&job_context)] =
71                     SJobActivity();
72             ++m_JobsStarted;
73             if (m_MaxJobsAllowed > 0 && m_JobsStarted > m_MaxJobsAllowed - 1 && !grid_globals.IsShuttingDown()) {
74                 LOG_POST_X(1, "The maximum number of allowed jobs (" <<
75                               m_MaxJobsAllowed << ") has been reached. "
76                               "Sending the shutdown request." );
77                 grid_globals.RequestShutdown(CNetScheduleAdmin::eNormalShutdown);
78             }
79         }
80         break;
81     case eJobStopped:
82         {
83             CMutexGuard guard(m_ActiveJobsMutex);
84             m_ActiveJobs.erase(
85                     const_cast<CWorkerNodeJobContext*>(&job_context));
86         }
87         break;
88     case eJobFailed:
89         ++m_JobsFailed;
90         if (m_MaxFailuresAllowed > 0 && m_JobsFailed > m_MaxFailuresAllowed - 1 &&
91                 grid_globals.GetShutdownLevel() < CNetScheduleAdmin::eShutdownImmediate) {
92             ERR_POST_X(2, Warning << "The maximum number of failed jobs (" <<
93                           m_MaxFailuresAllowed << ") has been reached. "
94                           "Shutting down..." );
95             grid_globals.RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
96         }
97         break;
98     case eJobSucceeded:
99         ++m_JobsSucceeded;
100         break;
101     case eJobReturned:
102         ++m_JobsReturned;
103         break;
104     case eJobRescheduled:
105         ++m_JobsRescheduled;
106         break;
107     case eJobCanceled:
108         ++m_JobsCanceled;
109         break;
110     case eJobLost:
111         ++m_JobsLost;
112         break;
113     }
114 
115     if (event != eJobStarted && !grid_globals.IsShuttingDown()) {
116         CGridWorkerNode worker_node(job_context.GetWorkerNode());
117         Uint8 total_memory_limit = worker_node.GetTotalMemoryLimit();
118         if (total_memory_limit > 0) {  // memory check requested
119             CCurrentProcess::SMemoryUsage memory_usage;
120             if (!CCurrentProcess::GetMemoryUsage(memory_usage)) {
121                 ERR_POST("Could not check self memory usage" );
122             } else if (memory_usage.total > total_memory_limit) {
123                 ERR_POST(Warning << "Memory usage (" << memory_usage.total <<
124                     ") is above the configured limit (" <<
125                     total_memory_limit << ")");
126                 const auto kExitCode = 100; // See also one in wn_main_loop.cpp
127                 grid_globals.RequestShutdown(CNetScheduleAdmin::eNormalShutdown, kExitCode);
128             }
129         }
130     }
131 }
132 
Print(CNcbiOstream & os) const133 void CWNJobWatcher::Print(CNcbiOstream& os) const
134 {
135     os << "Started: " <<
136                     CGridGlobals::GetInstance().GetStartTime().AsString() <<
137             "\nJobs Succeeded: " << m_JobsSucceeded <<
138             "\nJobs Failed: " << m_JobsFailed  <<
139             "\nJobs Returned: " << m_JobsReturned <<
140             "\nJobs Rescheduled: " << m_JobsRescheduled <<
141             "\nJobs Canceled: " << m_JobsCanceled <<
142             "\nJobs Lost: " << m_JobsLost << "\n";
143 
144     CMutexGuard guard(m_ActiveJobsMutex);
145     os << "Jobs Running: " << m_ActiveJobs.size() << "\n";
146     ITERATE(TActiveJobs, it, m_ActiveJobs) {
147         os << it->first->GetJobKey() << " \"" <<
148             NStr::PrintableString(it->first->GetJobInput()) <<
149             "\" -- running for " <<
150             (int) it->second.elasped_time.Elapsed() << " seconds.";
151         if (it->second.is_stuck)
152             os << "!!! LONG RUNNING JOB !!!";
153         os << "\n";
154     }
155 }
156 
CheckForInfiniteLoop()157 void CWNJobWatcher::CheckForInfiniteLoop()
158 {
159     if (m_InfiniteLoopTime > 0) {
160         size_t count = 0;
161         CMutexGuard guard(m_ActiveJobsMutex);
162         NON_CONST_ITERATE(TActiveJobs, it, m_ActiveJobs) {
163             if (!it->second.is_stuck) {
164                 if ( it->second.elasped_time.Elapsed() > m_InfiniteLoopTime) {
165                     const auto job_key = it->first->GetJobKey();
166                     ERR_POST_X(3, "An infinite loop is detected in job " << job_key);
167                     GetDiagContext().Extra().Print("job_key", job_key);
168 
169                     it->second.is_stuck = true;
170                     CGridGlobals::GetInstance().
171                         RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
172                 }
173             } else
174                 ++count;
175         }
176         if (count > 0 && count == m_ActiveJobs.size()) {
177             ERR_POST_X(4, "All jobs are in infinite loops. "
178                           "Server is shutting down.");
179             CGridGlobals::GetInstance().KillNode();
180         }
181     }
182 }
183 
x_KillNode(CGridWorkerNode worker)184 void CWNJobWatcher::x_KillNode(CGridWorkerNode worker)
185 {
186     CMutexGuard guard(m_ActiveJobsMutex);
187     NON_CONST_ITERATE(TActiveJobs, it, m_ActiveJobs) {
188         CNetScheduleJob& job = it->first->GetJob();
189         if (!it->second.is_stuck)
190             worker.GetNSExecutor().ReturnJob(job);
191         else {
192             job.error_msg = "Job execution time exceeded " +
193                     NStr::NumericToString(
194                             unsigned(it->second.elasped_time.Elapsed()));
195             job.error_msg += " seconds.";
196             worker.GetNSExecutor().PutFailure(job);
197         }
198     }
199     TPid cpid = CCurrentProcess::GetPid();
200     CProcess(cpid).Kill();
201 }
202 
203 
204 /////////////////////////////////////////////////////////////////////////////
//     CGridGlobals
CGridGlobals()206 CGridGlobals::CGridGlobals() :
207     m_ReuseJobObject(false),
208     m_ShutdownLevel(CNetScheduleAdmin::eNoShutdown),
209     m_ExitCode(0),
210     m_StartTime(GetFastLocalTime()),
211     m_Worker(NULL),
212     m_UDPPort(0)
213 {
214 }
215 
~CGridGlobals()216 CGridGlobals::~CGridGlobals()
217 {
218 }
219 
220 /* static */
GetInstance()221 CGridGlobals& CGridGlobals::GetInstance()
222 {
223     static CSafeStatic<CGridGlobals> global_instance;
224 
225     return global_instance.Get();
226 }
227 
228 
GetNewJobNumber()229 unsigned int CGridGlobals::GetNewJobNumber()
230 {
231     return (unsigned) m_JobsStarted.Add(1);
232 }
233 
GetJobWatcher()234 CWNJobWatcher& CGridGlobals::GetJobWatcher()
235 {
236     if (!m_JobWatcher.get())
237         m_JobWatcher.reset(new CWNJobWatcher);
238     return *m_JobWatcher;
239 }
240 
KillNode()241 void CGridGlobals::KillNode()
242 {
243     _ASSERT(m_Worker);
244     if (m_Worker)
245         GetJobWatcher().x_KillNode(m_Worker);
246 }
247 
InterruptUDPPortListening()248 void CGridGlobals::InterruptUDPPortListening()
249 {
250     if (m_UDPPort != 0)
251         CDatagramSocket().Send("INTERRUPT", sizeof("INTERRUPT"),
252                 "127.0.0.1", m_UDPPort);
253 }
254 
255 END_NCBI_SCOPE
256