1 #ifndef CONNECT_SERVICES__GRID_WORKER_IMPL__HPP
2 #define CONNECT_SERVICES__GRID_WORKER_IMPL__HPP
3 
4 /*  $Id: grid_worker_impl.hpp 607687 2020-05-06 16:16:59Z sadyrovr $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *   Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors:  Dmitry Kazimirov
30  *
31  * File Description:
32  *    Common NetSchedule Worker Node declarations
33  */
34 
35 
36 #include "wn_commit_thread.hpp"
37 #include "wn_cleanup.hpp"
38 #include "netschedule_api_impl.hpp"
39 
40 #include <connect/services/grid_rw_impl.hpp>
41 
42 #include <unordered_map>
43 
44 BEGIN_NCBI_SCOPE
45 
46 /////////////////////////////////////////////////////////////////////////////
47 //
48 
49 ///@internal
50 struct SWorkerNodeJobContextImpl : public CObject
51 {
52     SWorkerNodeJobContextImpl(SGridWorkerNodeImpl* worker_node);
53 
54     void ResetJobContext();
55 
MarkJobAsLostSWorkerNodeJobContextImpl56     void MarkJobAsLost()
57     {
58         m_JobCommitStatus = CWorkerNodeJobContext::eCS_JobIsLost;
59     }
60     void CheckIfJobIsLost();
61 
62     void x_PrintRequestStop();
63 
64     virtual void PutProgressMessage(const string& msg,
65         bool send_immediately, bool overwrite);
66     virtual CNetScheduleAdmin::EShutdownLevel GetShutdownLevel();
67     virtual void JobDelayExpiration(unsigned runtime_inc);
68     virtual void x_RunJob();
69 
GetTimeoutSWorkerNodeJobContextImpl70     const CDeadline GetTimeout() const { return m_Deadline; }
ResetTimeoutSWorkerNodeJobContextImpl71     void ResetTimeout(unsigned seconds) { m_Deadline = CDeadline(seconds, 0); }
72 
73     CNcbiIstream& GetIStream();
74     CNcbiOstream& GetOStream();
75 
76     SGridWorkerNodeImpl* m_WorkerNode;
77     CNetScheduleJob m_Job;
78     CWorkerNodeJobContext::ECommitStatus m_JobCommitStatus;
79     bool m_DisableRetries;
80     size_t m_InputBlobSize;
81     unsigned int m_JobNumber;
82     bool m_ExclusiveJob;
83 
84     CRef<CWorkerNodeCleanup> m_CleanupEventSource;
85 
86     CRef<CRequestContext> m_RequestContext;
87     CRequestRateControl m_StatusThrottler;
88     CRequestRateControl m_ProgressMsgThrottler;
89     CNetScheduleExecutor m_NetScheduleExecutor;
90     CNetCacheAPI m_NetCacheAPI;
91     SGridRead m_GridRead;
92     SGridWrite m_GridWrite;
93 
94     // Used for the job "pullback" mechanism.
95     unsigned m_JobGeneration;
96 
97     CDeadline m_CommitExpiration;
98     bool      m_FirstCommitAttempt;
99 
100 private:
101     CDeadline m_Deadline;
102 };
103 
104 class CJobRunRegistration;
105 
106 class CRunningJobLimit
107 {
108 public:
CRunningJobLimit()109     CRunningJobLimit() : m_MaxNumber(0) {}
110 
ResetJobCounter(unsigned max_number)111     void ResetJobCounter(unsigned max_number) {m_MaxNumber = max_number;}
112 
113     bool CountJob(const string& job_group,
114             CJobRunRegistration* job_run_registration);
115 
116 private:
117     friend class CJobRunRegistration;
118 
119     unsigned m_MaxNumber;
120 
121     CFastMutex m_Mutex;
122 
123     typedef map<string, unsigned> TJobCounter;
124     TJobCounter m_Counter;
125 };
126 
127 class CJobRunRegistration
128 {
129 public:
CJobRunRegistration()130     CJobRunRegistration() : m_RunRegistered(false) {}
131 
RegisterRun(CRunningJobLimit * job_counter,CRunningJobLimit::TJobCounter::iterator job_group_it)132     void RegisterRun(CRunningJobLimit* job_counter,
133             CRunningJobLimit::TJobCounter::iterator job_group_it)
134     {
135         m_JobCounter = job_counter;
136         m_JobGroupCounterIt = job_group_it;
137         m_RunRegistered = true;
138     }
139 
~CJobRunRegistration()140     ~CJobRunRegistration()
141     {
142         if (m_RunRegistered) {
143             CFastMutexGuard guard(m_JobCounter->m_Mutex);
144 
145             if (--m_JobGroupCounterIt->second == 0)
146                 m_JobCounter->m_Counter.erase(m_JobGroupCounterIt);
147         }
148     }
149 
150 private:
151     CRunningJobLimit* m_JobCounter;
152     CRunningJobLimit::TJobCounter::iterator m_JobGroupCounterIt;
153     bool m_RunRegistered;
154 };
155 
156 ///@internal
157 struct SGridWorkerNodeImpl : public CObject, IWorkerNodeInitContext
158 {
159     SGridWorkerNodeImpl(CNcbiApplicationAPI& app,
160             IWorkerNodeJobFactory* job_factory);
161 
162     void AddJobWatcher(IWorkerNodeJobWatcher& job_watcher,
163                           EOwnership owner = eNoOwnership);
164 
165     void Init();
166 
167     int Run(
168 #ifdef NCBI_OS_UNIX
169             ESwitch daemonize,
170 #endif
171             string procinfo_file_name);
172 
173     void x_WNCoreInit();
174     void x_StartWorkerThreads();
175     void x_StopWorkerThreads();
176     void x_ClearNode();
177     int x_WNCleanUp();
178 
GetQueueNameSGridWorkerNodeImpl179     const string& GetQueueName() const
180     {
181         return m_NetScheduleAPI.GetQueueName();
182     }
GetClientNameSGridWorkerNodeImpl183     const string& GetClientName() const
184     {
185         return m_NetScheduleAPI->m_Service->GetClientName();
186     }
GetServiceNameSGridWorkerNodeImpl187     const string& GetServiceName() const
188     {
189         return m_NetScheduleAPI->m_Service->m_ServiceName;
190     }
191 
GetAppNameSGridWorkerNodeImpl192     string GetAppName() const { return m_App.GetProgramDisplayName(); }
193 
194     bool EnterExclusiveMode();
195     void LeaveExclusiveMode();
IsExclusiveModeSGridWorkerNodeImpl196     bool IsExclusiveMode() const {return m_IsProcessingExclusiveJob;}
197     bool WaitForExclusiveJobToFinish();
198 
199     void SetJobPullbackTimer(unsigned seconds);
200     bool CheckForPullback(unsigned job_generation);
201 
202     int OfflineRun();
203 
204     // IWorkerNodeInitContext implementation
205     const IRegistry&               GetConfig()             const override;
206     const CArgs&                   GetArgs()               const override;
207     const CNcbiEnvironment&        GetEnvironment()        const override;
208     IWorkerNodeCleanupEventSource* GetCleanupEventSource() const override;
209     CNetScheduleAPI                GetNetScheduleAPI()     const override;
210     CNetCacheAPI                   GetNetCacheAPI()        const override;
211 
212     unique_ptr<IWorkerNodeJobFactory> m_JobProcessorFactory;
213 
214     CNetCacheAPI m_NetCacheAPI;
215     CNetScheduleAPI m_NetScheduleAPI;
216     CNetScheduleExecutor m_NSExecutor;
217     CStdPoolOfThreads* m_ThreadPool;
218 
219     unsigned int                 m_MaxThreads;
220     unsigned int                 m_NSTimeout;
221     mutable CFastMutex           m_JobProcessorMutex;
222     unsigned                     m_CommitJobInterval;
223     unsigned                     m_CheckStatusPeriod;
224     CSemaphore                   m_ExclusiveJobSemaphore;
225     bool                         m_IsProcessingExclusiveJob;
226     Uint8                        m_TotalMemoryLimit;
227     unsigned                     m_TotalTimeLimit;
228     time_t                       m_StartupTime;
229     unsigned                     m_QueueTimeout;
230 
231     typedef map<IWorkerNodeJobWatcher*,
232             AutoPtr<IWorkerNodeJobWatcher> > TJobWatchers;
233     CFastMutex m_JobWatcherMutex;
234     TJobWatchers m_Watchers;
235 
236     CRunningJobLimit m_JobsPerClientIP;
237     CRunningJobLimit m_JobsPerSessionID;
238 
239     CRef<CWorkerNodeCleanup> m_CleanupEventSource;
240 
241     IWorkerNodeJob* GetJobProcessor();
242 
243     void x_NotifyJobWatchers(const CWorkerNodeJobContext& job_context,
244                             IWorkerNodeJobWatcher::EEvent event);
245 
246     set<SSocketAddress> m_Masters;
247     set<unsigned int> m_AdminHosts;
248 
249     void* volatile m_SuspendResumeEvent;
250     bool m_TimelineIsSuspended;
251     // Support for the job "pullback" mechanism.
252     CFastMutex m_JobPullbackMutex;
253     unsigned m_CurrentJobGeneration;
254     unsigned m_DefaultPullbackTimeout;
255     CDeadline m_JobPullbackTime;
256 
257     bool x_AreMastersBusy() const;
258 
259     CRef<CJobCommitterThread> m_JobCommitterThread;
260     CRef<CWorkerNodeIdleThread>  m_IdleThread;
261 
262     unique_ptr<IGridWorkerNodeApp_Listener> m_Listener;
263 
264     CNcbiApplicationAPI& m_App;
265     CSynRegistry::TPtr m_SynRegistry;
266     CRef<IRegistry> m_Registry;
267     bool m_SingleThreadForced;
268     bool m_LogRequested;
269     bool m_ProgressLogRequested;
270     size_t m_QueueEmbeddedOutputSize;
271     unsigned m_ThreadPoolTimeout;
272 
273     /// Bookkeeping of jobs being executed (to prevent simultaneous runs of the same job)
274     struct SJobsInProgress
275     {
AddSGridWorkerNodeImpl::SJobsInProgress276         bool Add(const CNetScheduleJob& job)
277         {
278             TFastMutexGuard lock(m_Mutex);
279             auto it = m_Jobs.find(job.job_id);
280 
281             if (it == m_Jobs.end()) {
282                 return m_Jobs.emplace(job.job_id, job.auth_token).second;
283             }
284 
285             it->second = job.auth_token;
286             return false;
287         }
288 
UpdateSGridWorkerNodeImpl::SJobsInProgress289         void Update(CNetScheduleJob& job)
290         {
291             TFastMutexGuard lock(m_Mutex);
292             auto it = m_Jobs.find(job.job_id);
293 
294             _ASSERT(it != m_Jobs.end());
295             job.auth_token = it->second;
296         }
297 
RemoveSGridWorkerNodeImpl::SJobsInProgress298         void Remove(const CNetScheduleJob& job)
299         {
300             TFastMutexGuard lock(m_Mutex);
301             auto it = m_Jobs.find(job.job_id);
302 
303             _ASSERT(it != m_Jobs.end());
304             m_Jobs.erase(it);
305         }
306 
307     private:
308         CFastMutex m_Mutex;
309         unordered_map<string, string> m_Jobs;
310     };
311 
312     SJobsInProgress m_JobsInProgress;
313 };
314 
315 
316 ///@internal
317 class CWorkerNodeRequest : public CStdRequest
318 {
319 public:
CWorkerNodeRequest(SWorkerNodeJobContextImpl * context)320     CWorkerNodeRequest(SWorkerNodeJobContextImpl* context) :
321         m_JobContext(context)
322     {
323     }
324 
325     virtual void Process();
326 
327 private:
328     CWorkerNodeJobContext m_JobContext;
329 };
330 
331 
332 /////////////////////////////////////////////////////////////////////////////
333 //
334 
// Sentinel values for the worker node's suspend/resume event pointer:
// null means "no pending event"; 1 and 2 are distinct non-null markers.
#define NO_EVENT      (static_cast<void*>(0))
#define SUSPEND_EVENT (reinterpret_cast<void*>(1))
#define RESUME_EVENT  (reinterpret_cast<void*>(2))
338 
339 /////////////////////////////////////////////////////////////////////////////
340 //
341 
// Defined out of line. Presumably they report whether request start/stop
// events should be emitted -- confirm against the definitions.
bool g_IsRequestStartEventEnabled(void);
bool g_IsRequestStopEventEnabled(void);
344 
345 /////////////////////////////////////////////////////////////////////////////
346 //
347 /// @internal
348 class CMainLoopThread : public CThread
349 {
350 public:
CMainLoopThread(SGridWorkerNodeImpl * worker_node)351     CMainLoopThread(SGridWorkerNodeImpl* worker_node) :
352         m_WorkerNode(worker_node),
353         m_Impl(worker_node),
354         m_Timeline(m_Impl),
355         m_ThreadName(worker_node->GetAppName() + "_mn")
356     {
357     }
358 
359     virtual void* Main();
360 
361 private:
362     class CImpl : public CNetScheduleGetJob
363     {
364     public:
CImpl(SGridWorkerNodeImpl * worker_node)365         CImpl(SGridWorkerNodeImpl* worker_node) :
366             m_API(worker_node->m_NetScheduleAPI),
367             m_Timeout(worker_node->m_NSTimeout),
368             m_WorkerNode(worker_node)
369         {
370         }
371 
372         void Main();
373 
374         EState CheckState();
375         CNetServer ReadNotifications();
376         CNetServer WaitForNotifications(const CDeadline& deadline);
377         bool MoreJobs(const SEntry& entry);
378         bool CheckEntry(
379                 SEntry& entry,
380                 const string& prio_aff_list,
381                 bool any_affinity,
382                 CNetScheduleJob& job,
383                 CNetScheduleAPI::EJobStatus* job_status);
384         void ReturnJob(CNetScheduleJob& job);
385 
386         CNetScheduleAPI m_API;
387         const unsigned m_Timeout;
388 
389     private:
390         SGridWorkerNodeImpl* m_WorkerNode;
391 
392         CNetServer x_ProcessRequestJobNotification();
393     };
394 
395     bool x_GetNextJob(CNetScheduleJob& job, const CDeadline& deadline);
396 
397     SGridWorkerNodeImpl* m_WorkerNode;
398     CImpl m_Impl;
399     CNetScheduleGetJobImpl<CImpl> m_Timeline;
400     const string m_ThreadName;
401 };
402 
403 END_NCBI_SCOPE
404 
405 #endif /* CONNECT_SERVICES__GRID_WORKER_IMPL__HPP */
406