1 /**
2  * Orthanc - A Lightweight, RESTful DICOM Store
3  * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4  * Department, University Hospital of Liege, Belgium
5  * Copyright (C) 2017-2021 Osimis S.A., Belgium
6  *
7  * This program is free software: you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation, either version 3 of
10  * the License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this program. If not, see
19  * <http://www.gnu.org/licenses/>.
20  **/
21 
22 
23 #include "../PrecompiledHeaders.h"
24 #include "JobsEngine.h"
25 
26 #include "../Logging.h"
27 #include "../OrthancException.h"
28 #include "../Toolbox.h"
29 
30 
31 namespace Orthanc
32 {
IsRunning()33   bool JobsEngine::IsRunning()
34   {
35     boost::mutex::scoped_lock lock(stateMutex_);
36     return (state_ == State_Running);
37   }
38 
39 
ExecuteStep(JobsRegistry::RunningJob & running,size_t workerIndex)40   bool JobsEngine::ExecuteStep(JobsRegistry::RunningJob& running,
41                                size_t workerIndex)
42   {
43     assert(running.IsValid());
44 
45     if (running.IsPauseScheduled())
46     {
47       running.GetJob().Stop(JobStopReason_Paused);
48       running.MarkPause();
49       return false;
50     }
51 
52     if (running.IsCancelScheduled())
53     {
54       running.GetJob().Stop(JobStopReason_Canceled);
55       running.MarkCanceled();
56       return false;
57     }
58 
59     JobStepResult result;
60 
61     try
62     {
63       result = running.GetJob().Step(running.GetId());
64     }
65     catch (OrthancException& e)
66     {
67       result = JobStepResult::Failure(e);
68     }
69     catch (boost::bad_lexical_cast&)
70     {
71       result = JobStepResult::Failure(ErrorCode_BadFileFormat, NULL);
72     }
73     catch (...)
74     {
75       result = JobStepResult::Failure(ErrorCode_InternalError, NULL);
76     }
77 
78     switch (result.GetCode())
79     {
80       case JobStepCode_Success:
81         running.GetJob().Stop(JobStopReason_Success);
82         running.UpdateStatus(ErrorCode_Success, "");
83         running.MarkSuccess();
84         return false;
85 
86       case JobStepCode_Failure:
87         running.GetJob().Stop(JobStopReason_Failure);
88         running.UpdateStatus(result.GetFailureCode(), result.GetFailureDetails());
89         running.MarkFailure();
90         return false;
91 
92       case JobStepCode_Retry:
93         running.GetJob().Stop(JobStopReason_Retry);
94         running.UpdateStatus(ErrorCode_Success, "");
95         running.MarkRetry(result.GetRetryTimeout());
96         return false;
97 
98       case JobStepCode_Continue:
99         running.UpdateStatus(ErrorCode_Success, "");
100         return true;
101 
102       default:
103         throw OrthancException(ErrorCode_InternalError);
104     }
105   }
106 
107 
RetryHandler(JobsEngine * engine)108   void JobsEngine::RetryHandler(JobsEngine* engine)
109   {
110     assert(engine != NULL);
111 
112     while (engine->IsRunning())
113     {
114       boost::this_thread::sleep(boost::posix_time::milliseconds(engine->threadSleep_));
115       engine->GetRegistry().ScheduleRetries();
116     }
117   }
118 
119 
Worker(JobsEngine * engine,size_t workerIndex)120   void JobsEngine::Worker(JobsEngine* engine,
121                           size_t workerIndex)
122   {
123     assert(engine != NULL);
124 
125     CLOG(INFO, JOBS) << "Worker thread " << workerIndex << " has started";
126 
127     while (engine->IsRunning())
128     {
129       JobsRegistry::RunningJob running(engine->GetRegistry(), engine->threadSleep_);
130 
131       if (running.IsValid())
132       {
133         CLOG(INFO, JOBS) << "Executing job with priority " << running.GetPriority()
134                          << " in worker thread " << workerIndex << ": " << running.GetId();
135 
136         while (engine->IsRunning())
137         {
138           if (!engine->ExecuteStep(running, workerIndex))
139           {
140             break;
141           }
142         }
143       }
144     }
145   }
146 
147 
JobsEngine(size_t maxCompletedJobs)148   JobsEngine::JobsEngine(size_t maxCompletedJobs) :
149     state_(State_Setup),
150     registry_(new JobsRegistry(maxCompletedJobs)),
151     threadSleep_(200),
152     workers_(1)
153   {
154   }
155 
156 
~JobsEngine()157   JobsEngine::~JobsEngine()
158   {
159     if (state_ != State_Setup &&
160         state_ != State_Done)
161     {
162       CLOG(ERROR, JOBS) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!";
163       Stop();
164     }
165   }
166 
167 
GetRegistry()168   JobsRegistry& JobsEngine::GetRegistry()
169   {
170     if (registry_.get() == NULL)
171     {
172       throw OrthancException(ErrorCode_InternalError);
173     }
174 
175     return *registry_;
176   }
177 
178 
LoadRegistryFromJson(IJobUnserializer & unserializer,const Json::Value & serialized)179   void JobsEngine::LoadRegistryFromJson(IJobUnserializer& unserializer,
180                                         const Json::Value& serialized)
181   {
182     boost::mutex::scoped_lock lock(stateMutex_);
183 
184     if (state_ != State_Setup)
185     {
186       // Can only be invoked before calling "Start()"
187       throw OrthancException(ErrorCode_BadSequenceOfCalls);
188     }
189 
190     assert(registry_.get() != NULL);
191     const size_t maxCompletedJobs = registry_->GetMaxCompletedJobs();
192     registry_.reset(new JobsRegistry(unserializer, serialized, maxCompletedJobs));
193   }
194 
195 
LoadRegistryFromString(IJobUnserializer & unserializer,const std::string & serialized)196   void JobsEngine::LoadRegistryFromString(IJobUnserializer& unserializer,
197                                           const std::string& serialized)
198   {
199     Json::Value value;
200     if (Toolbox::ReadJson(value, serialized))
201     {
202       LoadRegistryFromJson(unserializer, value);
203     }
204     else
205     {
206       throw OrthancException(ErrorCode_BadFileFormat);
207     }
208   }
209 
210 
SetWorkersCount(size_t count)211   void JobsEngine::SetWorkersCount(size_t count)
212   {
213     boost::mutex::scoped_lock lock(stateMutex_);
214 
215     if (state_ != State_Setup)
216     {
217       // Can only be invoked before calling "Start()"
218       throw OrthancException(ErrorCode_BadSequenceOfCalls);
219     }
220 
221     workers_.resize(count);
222   }
223 
224 
SetThreadSleep(unsigned int sleep)225   void JobsEngine::SetThreadSleep(unsigned int sleep)
226   {
227     boost::mutex::scoped_lock lock(stateMutex_);
228 
229     if (state_ != State_Setup)
230     {
231       // Can only be invoked before calling "Start()"
232       throw OrthancException(ErrorCode_BadSequenceOfCalls);
233     }
234 
235     threadSleep_ = sleep;
236   }
237 
238 
Start()239   void JobsEngine::Start()
240   {
241     boost::mutex::scoped_lock lock(stateMutex_);
242 
243     if (state_ != State_Setup)
244     {
245       throw OrthancException(ErrorCode_BadSequenceOfCalls);
246     }
247 
248     retryHandler_ = boost::thread(RetryHandler, this);
249 
250     if (workers_.size() == 0)
251     {
252       // Use all the available CPUs
253       size_t n = boost::thread::hardware_concurrency();
254 
255       if (n == 0)
256       {
257         n = 1;
258       }
259 
260       workers_.resize(n);
261     }
262 
263     for (size_t i = 0; i < workers_.size(); i++)
264     {
265       assert(workers_[i] == NULL);
266       workers_[i] = new boost::thread(Worker, this, i);
267     }
268 
269     state_ = State_Running;
270 
271     CLOG(WARNING, JOBS) << "The jobs engine has started with " << workers_.size() << " threads";
272   }
273 
274 
Stop()275   void JobsEngine::Stop()
276   {
277     {
278       boost::mutex::scoped_lock lock(stateMutex_);
279 
280       if (state_ != State_Running)
281       {
282         return;
283       }
284 
285       state_ = State_Stopping;
286     }
287 
288     CLOG(INFO, JOBS) << "Stopping the jobs engine";
289 
290     if (retryHandler_.joinable())
291     {
292       retryHandler_.join();
293     }
294 
295     for (size_t i = 0; i < workers_.size(); i++)
296     {
297       assert(workers_[i] != NULL);
298 
299       if (workers_[i]->joinable())
300       {
301         workers_[i]->join();
302       }
303 
304       delete workers_[i];
305     }
306 
307     {
308       boost::mutex::scoped_lock lock(stateMutex_);
309       state_ = State_Done;
310     }
311 
312     CLOG(WARNING, JOBS) << "The jobs engine has stopped";
313   }
314 }
315