1 /** 2 * Orthanc - A Lightweight, RESTful DICOM Store 3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics 4 * Department, University Hospital of Liege, Belgium 5 * Copyright (C) 2017-2021 Osimis S.A., Belgium 6 * 7 * This program is free software: you can redistribute it and/or 8 * modify it under the terms of the GNU Lesser General Public License 9 * as published by the Free Software Foundation, either version 3 of 10 * the License, or (at your option) any later version. 11 * 12 * This program is distributed in the hope that it will be useful, but 13 * WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 * Lesser General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public 18 * License along with this program. If not, see 19 * <http://www.gnu.org/licenses/>. 20 **/ 21 22 23 #include "../PrecompiledHeaders.h" 24 #include "JobsEngine.h" 25 26 #include "../Logging.h" 27 #include "../OrthancException.h" 28 #include "../Toolbox.h" 29 30 31 namespace Orthanc 32 { IsRunning()33 bool JobsEngine::IsRunning() 34 { 35 boost::mutex::scoped_lock lock(stateMutex_); 36 return (state_ == State_Running); 37 } 38 39 ExecuteStep(JobsRegistry::RunningJob & running,size_t workerIndex)40 bool JobsEngine::ExecuteStep(JobsRegistry::RunningJob& running, 41 size_t workerIndex) 42 { 43 assert(running.IsValid()); 44 45 if (running.IsPauseScheduled()) 46 { 47 running.GetJob().Stop(JobStopReason_Paused); 48 running.MarkPause(); 49 return false; 50 } 51 52 if (running.IsCancelScheduled()) 53 { 54 running.GetJob().Stop(JobStopReason_Canceled); 55 running.MarkCanceled(); 56 return false; 57 } 58 59 JobStepResult result; 60 61 try 62 { 63 result = running.GetJob().Step(running.GetId()); 64 } 65 catch (OrthancException& e) 66 { 67 result = JobStepResult::Failure(e); 68 } 69 catch (boost::bad_lexical_cast&) 70 { 71 result = JobStepResult::Failure(ErrorCode_BadFileFormat, NULL); 72 } 73 catch (...) 74 { 75 result = JobStepResult::Failure(ErrorCode_InternalError, NULL); 76 } 77 78 switch (result.GetCode()) 79 { 80 case JobStepCode_Success: 81 running.GetJob().Stop(JobStopReason_Success); 82 running.UpdateStatus(ErrorCode_Success, ""); 83 running.MarkSuccess(); 84 return false; 85 86 case JobStepCode_Failure: 87 running.GetJob().Stop(JobStopReason_Failure); 88 running.UpdateStatus(result.GetFailureCode(), result.GetFailureDetails()); 89 running.MarkFailure(); 90 return false; 91 92 case JobStepCode_Retry: 93 running.GetJob().Stop(JobStopReason_Retry); 94 running.UpdateStatus(ErrorCode_Success, ""); 95 running.MarkRetry(result.GetRetryTimeout()); 96 return false; 97 98 case JobStepCode_Continue: 99 running.UpdateStatus(ErrorCode_Success, ""); 100 return true; 101 102 default: 103 throw OrthancException(ErrorCode_InternalError); 104 } 105 } 106 107 RetryHandler(JobsEngine * engine)108 void JobsEngine::RetryHandler(JobsEngine* engine) 109 { 110 assert(engine != NULL); 111 112 while (engine->IsRunning()) 113 { 114 boost::this_thread::sleep(boost::posix_time::milliseconds(engine->threadSleep_)); 115 engine->GetRegistry().ScheduleRetries(); 116 } 117 } 118 119 Worker(JobsEngine * engine,size_t workerIndex)120 void JobsEngine::Worker(JobsEngine* engine, 121 size_t workerIndex) 122 { 123 assert(engine != NULL); 124 125 CLOG(INFO, JOBS) << "Worker thread " << workerIndex << " has started"; 126 127 while (engine->IsRunning()) 128 { 129 JobsRegistry::RunningJob running(engine->GetRegistry(), engine->threadSleep_); 130 131 if (running.IsValid()) 132 { 133 CLOG(INFO, JOBS) << "Executing job with priority " << running.GetPriority() 134 << " in worker thread " << workerIndex << ": " << running.GetId(); 135 136 while (engine->IsRunning()) 137 { 138 if (!engine->ExecuteStep(running, workerIndex)) 139 { 140 break; 141 } 142 } 143 } 144 } 145 } 146 147 JobsEngine(size_t maxCompletedJobs)148 JobsEngine::JobsEngine(size_t maxCompletedJobs) : 149 state_(State_Setup), 150 registry_(new JobsRegistry(maxCompletedJobs)), 151 threadSleep_(200), 152 workers_(1) 153 { 154 } 155 156 ~JobsEngine()157 JobsEngine::~JobsEngine() 158 { 159 if (state_ != State_Setup && 160 state_ != State_Done) 161 { 162 CLOG(ERROR, JOBS) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!"; 163 Stop(); 164 } 165 } 166 167 GetRegistry()168 JobsRegistry& JobsEngine::GetRegistry() 169 { 170 if (registry_.get() == NULL) 171 { 172 throw OrthancException(ErrorCode_InternalError); 173 } 174 175 return *registry_; 176 } 177 178 LoadRegistryFromJson(IJobUnserializer & unserializer,const Json::Value & serialized)179 void JobsEngine::LoadRegistryFromJson(IJobUnserializer& unserializer, 180 const Json::Value& serialized) 181 { 182 boost::mutex::scoped_lock lock(stateMutex_); 183 184 if (state_ != State_Setup) 185 { 186 // Can only be invoked before calling "Start()" 187 throw OrthancException(ErrorCode_BadSequenceOfCalls); 188 } 189 190 assert(registry_.get() != NULL); 191 const size_t maxCompletedJobs = registry_->GetMaxCompletedJobs(); 192 registry_.reset(new JobsRegistry(unserializer, serialized, maxCompletedJobs)); 193 } 194 195 LoadRegistryFromString(IJobUnserializer & unserializer,const std::string & serialized)196 void JobsEngine::LoadRegistryFromString(IJobUnserializer& unserializer, 197 const std::string& serialized) 198 { 199 Json::Value value; 200 if (Toolbox::ReadJson(value, serialized)) 201 { 202 LoadRegistryFromJson(unserializer, value); 203 } 204 else 205 { 206 throw OrthancException(ErrorCode_BadFileFormat); 207 } 208 } 209 210 SetWorkersCount(size_t count)211 void JobsEngine::SetWorkersCount(size_t count) 212 { 213 boost::mutex::scoped_lock lock(stateMutex_); 214 215 if (state_ != State_Setup) 216 { 217 // Can only be invoked before calling "Start()" 218 throw OrthancException(ErrorCode_BadSequenceOfCalls); 219 } 220 221 workers_.resize(count); 222 } 223 224 SetThreadSleep(unsigned int sleep)225 void JobsEngine::SetThreadSleep(unsigned int sleep) 226 { 227 boost::mutex::scoped_lock lock(stateMutex_); 228 229 if (state_ != State_Setup) 230 { 231 // Can only be invoked before calling "Start()" 232 throw OrthancException(ErrorCode_BadSequenceOfCalls); 233 } 234 235 threadSleep_ = sleep; 236 } 237 238 Start()239 void JobsEngine::Start() 240 { 241 boost::mutex::scoped_lock lock(stateMutex_); 242 243 if (state_ != State_Setup) 244 { 245 throw OrthancException(ErrorCode_BadSequenceOfCalls); 246 } 247 248 retryHandler_ = boost::thread(RetryHandler, this); 249 250 if (workers_.size() == 0) 251 { 252 // Use all the available CPUs 253 size_t n = boost::thread::hardware_concurrency(); 254 255 if (n == 0) 256 { 257 n = 1; 258 } 259 260 workers_.resize(n); 261 } 262 263 for (size_t i = 0; i < workers_.size(); i++) 264 { 265 assert(workers_[i] == NULL); 266 workers_[i] = new boost::thread(Worker, this, i); 267 } 268 269 state_ = State_Running; 270 271 CLOG(WARNING, JOBS) << "The jobs engine has started with " << workers_.size() << " threads"; 272 } 273 274 Stop()275 void JobsEngine::Stop() 276 { 277 { 278 boost::mutex::scoped_lock lock(stateMutex_); 279 280 if (state_ != State_Running) 281 { 282 return; 283 } 284 285 state_ = State_Stopping; 286 } 287 288 CLOG(INFO, JOBS) << "Stopping the jobs engine"; 289 290 if (retryHandler_.joinable()) 291 { 292 retryHandler_.join(); 293 } 294 295 for (size_t i = 0; i < workers_.size(); i++) 296 { 297 assert(workers_[i] != NULL); 298 299 if (workers_[i]->joinable()) 300 { 301 workers_[i]->join(); 302 } 303 304 delete workers_[i]; 305 } 306 307 { 308 boost::mutex::scoped_lock lock(stateMutex_); 309 state_ = State_Done; 310 } 311 312 CLOG(WARNING, JOBS) << "The jobs engine has stopped"; 313 } 314 } 315