1 #ifndef CONNECT_SERVICES__GRID_WORKER_IMPL__HPP 2 #define CONNECT_SERVICES__GRID_WORKER_IMPL__HPP 3 4 /* $Id: grid_worker_impl.hpp 607687 2020-05-06 16:16:59Z sadyrovr $ 5 * =========================================================================== 6 * 7 * PUBLIC DOMAIN NOTICE 8 * National Center for Biotechnology Information 9 * 10 * This software/database is a "United States Government Work" under the 11 * terms of the United States Copyright Act. It was written as part of 12 * the author's official duties as a United States Government employee and 13 * thus cannot be copyrighted. This software/database is freely available 14 * to the public for use. The National Library of Medicine and the U.S. 15 * Government have not placed any restriction on its use or reproduction. 16 * 17 * Although all reasonable efforts have been taken to ensure the accuracy 18 * and reliability of the software and data, the NLM and the U.S. 19 * Government do not and cannot warrant the performance or results that 20 * may be obtained by using this software or data. The NLM and the U.S. 21 * Government disclaim all warranties, express or implied, including 22 * warranties of performance, merchantability or fitness for any particular 23 * purpose. 24 * 25 * Please cite the author in any work or product based on this material. 26 * 27 * =========================================================================== 28 * 29 * Authors: Dmitry Kazimirov 30 * 31 * File Description: 32 * Common NetSchedule Worker Node declarations 33 */ 34 35 36 #include "wn_commit_thread.hpp" 37 #include "wn_cleanup.hpp" 38 #include "netschedule_api_impl.hpp" 39 40 #include <connect/services/grid_rw_impl.hpp> 41 42 #include <unordered_map> 43 44 BEGIN_NCBI_SCOPE 45 46 ///////////////////////////////////////////////////////////////////////////// 47 // 48 49 ///@internal 50 struct SWorkerNodeJobContextImpl : public CObject 51 { 52 SWorkerNodeJobContextImpl(SGridWorkerNodeImpl* worker_node); 53 54 void ResetJobContext(); 55 MarkJobAsLostSWorkerNodeJobContextImpl56 void MarkJobAsLost() 57 { 58 m_JobCommitStatus = CWorkerNodeJobContext::eCS_JobIsLost; 59 } 60 void CheckIfJobIsLost(); 61 62 void x_PrintRequestStop(); 63 64 virtual void PutProgressMessage(const string& msg, 65 bool send_immediately, bool overwrite); 66 virtual CNetScheduleAdmin::EShutdownLevel GetShutdownLevel(); 67 virtual void JobDelayExpiration(unsigned runtime_inc); 68 virtual void x_RunJob(); 69 GetTimeoutSWorkerNodeJobContextImpl70 const CDeadline GetTimeout() const { return m_Deadline; } ResetTimeoutSWorkerNodeJobContextImpl71 void ResetTimeout(unsigned seconds) { m_Deadline = CDeadline(seconds, 0); } 72 73 CNcbiIstream& GetIStream(); 74 CNcbiOstream& GetOStream(); 75 76 SGridWorkerNodeImpl* m_WorkerNode; 77 CNetScheduleJob m_Job; 78 CWorkerNodeJobContext::ECommitStatus m_JobCommitStatus; 79 bool m_DisableRetries; 80 size_t m_InputBlobSize; 81 unsigned int m_JobNumber; 82 bool m_ExclusiveJob; 83 84 CRef<CWorkerNodeCleanup> m_CleanupEventSource; 85 86 CRef<CRequestContext> m_RequestContext; 87 CRequestRateControl m_StatusThrottler; 88 CRequestRateControl m_ProgressMsgThrottler; 89 CNetScheduleExecutor m_NetScheduleExecutor; 90 CNetCacheAPI m_NetCacheAPI; 91 SGridRead m_GridRead; 92 SGridWrite m_GridWrite; 93 94 // Used for the job "pullback" mechanism. 95 unsigned m_JobGeneration; 96 97 CDeadline m_CommitExpiration; 98 bool m_FirstCommitAttempt; 99 100 private: 101 CDeadline m_Deadline; 102 }; 103 104 class CJobRunRegistration; 105 106 class CRunningJobLimit 107 { 108 public: CRunningJobLimit()109 CRunningJobLimit() : m_MaxNumber(0) {} 110 ResetJobCounter(unsigned max_number)111 void ResetJobCounter(unsigned max_number) {m_MaxNumber = max_number;} 112 113 bool CountJob(const string& job_group, 114 CJobRunRegistration* job_run_registration); 115 116 private: 117 friend class CJobRunRegistration; 118 119 unsigned m_MaxNumber; 120 121 CFastMutex m_Mutex; 122 123 typedef map<string, unsigned> TJobCounter; 124 TJobCounter m_Counter; 125 }; 126 127 class CJobRunRegistration 128 { 129 public: CJobRunRegistration()130 CJobRunRegistration() : m_RunRegistered(false) {} 131 RegisterRun(CRunningJobLimit * job_counter,CRunningJobLimit::TJobCounter::iterator job_group_it)132 void RegisterRun(CRunningJobLimit* job_counter, 133 CRunningJobLimit::TJobCounter::iterator job_group_it) 134 { 135 m_JobCounter = job_counter; 136 m_JobGroupCounterIt = job_group_it; 137 m_RunRegistered = true; 138 } 139 ~CJobRunRegistration()140 ~CJobRunRegistration() 141 { 142 if (m_RunRegistered) { 143 CFastMutexGuard guard(m_JobCounter->m_Mutex); 144 145 if (--m_JobGroupCounterIt->second == 0) 146 m_JobCounter->m_Counter.erase(m_JobGroupCounterIt); 147 } 148 } 149 150 private: 151 CRunningJobLimit* m_JobCounter; 152 CRunningJobLimit::TJobCounter::iterator m_JobGroupCounterIt; 153 bool m_RunRegistered; 154 }; 155 156 ///@internal 157 struct SGridWorkerNodeImpl : public CObject, IWorkerNodeInitContext 158 { 159 SGridWorkerNodeImpl(CNcbiApplicationAPI& app, 160 IWorkerNodeJobFactory* job_factory); 161 162 void AddJobWatcher(IWorkerNodeJobWatcher& job_watcher, 163 EOwnership owner = eNoOwnership); 164 165 void Init(); 166 167 int Run( 168 #ifdef NCBI_OS_UNIX 169 ESwitch daemonize, 170 #endif 171 string procinfo_file_name); 172 173 void x_WNCoreInit(); 174 void x_StartWorkerThreads(); 175 void x_StopWorkerThreads(); 176 void x_ClearNode(); 177 int x_WNCleanUp(); 178 GetQueueNameSGridWorkerNodeImpl179 const string& GetQueueName() const 180 { 181 return m_NetScheduleAPI.GetQueueName(); 182 } GetClientNameSGridWorkerNodeImpl183 const string& GetClientName() const 184 { 185 return m_NetScheduleAPI->m_Service->GetClientName(); 186 } GetServiceNameSGridWorkerNodeImpl187 const string& GetServiceName() const 188 { 189 return m_NetScheduleAPI->m_Service->m_ServiceName; 190 } 191 GetAppNameSGridWorkerNodeImpl192 string GetAppName() const { return m_App.GetProgramDisplayName(); } 193 194 bool EnterExclusiveMode(); 195 void LeaveExclusiveMode(); IsExclusiveModeSGridWorkerNodeImpl196 bool IsExclusiveMode() const {return m_IsProcessingExclusiveJob;} 197 bool WaitForExclusiveJobToFinish(); 198 199 void SetJobPullbackTimer(unsigned seconds); 200 bool CheckForPullback(unsigned job_generation); 201 202 int OfflineRun(); 203 204 // IWorkerNodeInitContext implementation 205 const IRegistry& GetConfig() const override; 206 const CArgs& GetArgs() const override; 207 const CNcbiEnvironment& GetEnvironment() const override; 208 IWorkerNodeCleanupEventSource* GetCleanupEventSource() const override; 209 CNetScheduleAPI GetNetScheduleAPI() const override; 210 CNetCacheAPI GetNetCacheAPI() const override; 211 212 unique_ptr<IWorkerNodeJobFactory> m_JobProcessorFactory; 213 214 CNetCacheAPI m_NetCacheAPI; 215 CNetScheduleAPI m_NetScheduleAPI; 216 CNetScheduleExecutor m_NSExecutor; 217 CStdPoolOfThreads* m_ThreadPool; 218 219 unsigned int m_MaxThreads; 220 unsigned int m_NSTimeout; 221 mutable CFastMutex m_JobProcessorMutex; 222 unsigned m_CommitJobInterval; 223 unsigned m_CheckStatusPeriod; 224 CSemaphore m_ExclusiveJobSemaphore; 225 bool m_IsProcessingExclusiveJob; 226 Uint8 m_TotalMemoryLimit; 227 unsigned m_TotalTimeLimit; 228 time_t m_StartupTime; 229 unsigned m_QueueTimeout; 230 231 typedef map<IWorkerNodeJobWatcher*, 232 AutoPtr<IWorkerNodeJobWatcher> > TJobWatchers; 233 CFastMutex m_JobWatcherMutex; 234 TJobWatchers m_Watchers; 235 236 CRunningJobLimit m_JobsPerClientIP; 237 CRunningJobLimit m_JobsPerSessionID; 238 239 CRef<CWorkerNodeCleanup> m_CleanupEventSource; 240 241 IWorkerNodeJob* GetJobProcessor(); 242 243 void x_NotifyJobWatchers(const CWorkerNodeJobContext& job_context, 244 IWorkerNodeJobWatcher::EEvent event); 245 246 set<SSocketAddress> m_Masters; 247 set<unsigned int> m_AdminHosts; 248 249 void* volatile m_SuspendResumeEvent; 250 bool m_TimelineIsSuspended; 251 // Support for the job "pullback" mechanism. 252 CFastMutex m_JobPullbackMutex; 253 unsigned m_CurrentJobGeneration; 254 unsigned m_DefaultPullbackTimeout; 255 CDeadline m_JobPullbackTime; 256 257 bool x_AreMastersBusy() const; 258 259 CRef<CJobCommitterThread> m_JobCommitterThread; 260 CRef<CWorkerNodeIdleThread> m_IdleThread; 261 262 unique_ptr<IGridWorkerNodeApp_Listener> m_Listener; 263 264 CNcbiApplicationAPI& m_App; 265 CSynRegistry::TPtr m_SynRegistry; 266 CRef<IRegistry> m_Registry; 267 bool m_SingleThreadForced; 268 bool m_LogRequested; 269 bool m_ProgressLogRequested; 270 size_t m_QueueEmbeddedOutputSize; 271 unsigned m_ThreadPoolTimeout; 272 273 /// Bookkeeping of jobs being executed (to prevent simultaneous runs of the same job) 274 struct SJobsInProgress 275 { AddSGridWorkerNodeImpl::SJobsInProgress276 bool Add(const CNetScheduleJob& job) 277 { 278 TFastMutexGuard lock(m_Mutex); 279 auto it = m_Jobs.find(job.job_id); 280 281 if (it == m_Jobs.end()) { 282 return m_Jobs.emplace(job.job_id, job.auth_token).second; 283 } 284 285 it->second = job.auth_token; 286 return false; 287 } 288 UpdateSGridWorkerNodeImpl::SJobsInProgress289 void Update(CNetScheduleJob& job) 290 { 291 TFastMutexGuard lock(m_Mutex); 292 auto it = m_Jobs.find(job.job_id); 293 294 _ASSERT(it != m_Jobs.end()); 295 job.auth_token = it->second; 296 } 297 RemoveSGridWorkerNodeImpl::SJobsInProgress298 void Remove(const CNetScheduleJob& job) 299 { 300 TFastMutexGuard lock(m_Mutex); 301 auto it = m_Jobs.find(job.job_id); 302 303 _ASSERT(it != m_Jobs.end()); 304 m_Jobs.erase(it); 305 } 306 307 private: 308 CFastMutex m_Mutex; 309 unordered_map<string, string> m_Jobs; 310 }; 311 312 SJobsInProgress m_JobsInProgress; 313 }; 314 315 316 ///@internal 317 class CWorkerNodeRequest : public CStdRequest 318 { 319 public: CWorkerNodeRequest(SWorkerNodeJobContextImpl * context)320 CWorkerNodeRequest(SWorkerNodeJobContextImpl* context) : 321 m_JobContext(context) 322 { 323 } 324 325 virtual void Process(); 326 327 private: 328 CWorkerNodeJobContext m_JobContext; 329 }; 330 331 332 ///////////////////////////////////////////////////////////////////////////// 333 // 334 335 #define NO_EVENT ((void*) 0) 336 #define SUSPEND_EVENT ((void*) 1) 337 #define RESUME_EVENT ((void*) 2) 338 339 ///////////////////////////////////////////////////////////////////////////// 340 // 341 342 bool g_IsRequestStartEventEnabled(); 343 bool g_IsRequestStopEventEnabled(); 344 345 ///////////////////////////////////////////////////////////////////////////// 346 // 347 /// @internal 348 class CMainLoopThread : public CThread 349 { 350 public: CMainLoopThread(SGridWorkerNodeImpl * worker_node)351 CMainLoopThread(SGridWorkerNodeImpl* worker_node) : 352 m_WorkerNode(worker_node), 353 m_Impl(worker_node), 354 m_Timeline(m_Impl), 355 m_ThreadName(worker_node->GetAppName() + "_mn") 356 { 357 } 358 359 virtual void* Main(); 360 361 private: 362 class CImpl : public CNetScheduleGetJob 363 { 364 public: CImpl(SGridWorkerNodeImpl * worker_node)365 CImpl(SGridWorkerNodeImpl* worker_node) : 366 m_API(worker_node->m_NetScheduleAPI), 367 m_Timeout(worker_node->m_NSTimeout), 368 m_WorkerNode(worker_node) 369 { 370 } 371 372 void Main(); 373 374 EState CheckState(); 375 CNetServer ReadNotifications(); 376 CNetServer WaitForNotifications(const CDeadline& deadline); 377 bool MoreJobs(const SEntry& entry); 378 bool CheckEntry( 379 SEntry& entry, 380 const string& prio_aff_list, 381 bool any_affinity, 382 CNetScheduleJob& job, 383 CNetScheduleAPI::EJobStatus* job_status); 384 void ReturnJob(CNetScheduleJob& job); 385 386 CNetScheduleAPI m_API; 387 const unsigned m_Timeout; 388 389 private: 390 SGridWorkerNodeImpl* m_WorkerNode; 391 392 CNetServer x_ProcessRequestJobNotification(); 393 }; 394 395 bool x_GetNextJob(CNetScheduleJob& job, const CDeadline& deadline); 396 397 SGridWorkerNodeImpl* m_WorkerNode; 398 CImpl m_Impl; 399 CNetScheduleGetJobImpl<CImpl> m_Timeline; 400 const string m_ThreadName; 401 }; 402 403 END_NCBI_SCOPE 404 405 #endif /* CONNECT_SERVICES__GRID_WORKER_IMPL__HPP */ 406