1 /** 2 * Orthanc - A Lightweight, RESTful DICOM Store 3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics 4 * Department, University Hospital of Liege, Belgium 5 * Copyright (C) 2017-2021 Osimis S.A., Belgium 6 * 7 * This program is free software: you can redistribute it and/or 8 * modify it under the terms of the GNU Lesser General Public License 9 * as published by the Free Software Foundation, either version 3 of 10 * the License, or (at your option) any later version. 11 * 12 * This program is distributed in the hope that it will be useful, but 13 * WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 * Lesser General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public 18 * License along with this program. If not, see 19 * <http://www.gnu.org/licenses/>. 20 **/ 21 22 23 #pragma once 24 25 #if !defined(ORTHANC_SANDBOXED) 26 # error The macro ORTHANC_SANDBOXED must be defined 27 #endif 28 29 #if ORTHANC_SANDBOXED == 1 30 # error The job engine cannot be used in sandboxed environments 31 #endif 32 33 #include "JobInfo.h" 34 #include "IJobUnserializer.h" 35 36 #include <list> 37 #include <set> 38 #include <queue> 39 #include <boost/thread/mutex.hpp> 40 #include <boost/thread/condition_variable.hpp> 41 42 namespace Orthanc 43 { 44 // This class handles the state machine of the jobs engine 45 class ORTHANC_PUBLIC JobsRegistry : public boost::noncopyable 46 { 47 public: 48 class ORTHANC_PUBLIC IObserver : public boost::noncopyable 49 { 50 public: ~IObserver()51 virtual ~IObserver() 52 { 53 } 54 55 virtual void SignalJobSubmitted(const std::string& jobId) = 0; 56 57 virtual void SignalJobSuccess(const std::string& jobId) = 0; 58 59 virtual void SignalJobFailure(const std::string& jobId) = 0; 60 }; 61 62 private: 63 enum CompletedReason 64 { 65 CompletedReason_Success, 66 CompletedReason_Failure, 67 CompletedReason_Canceled 68 }; 69 70 class JobHandler; 71 72 struct PriorityComparator 73 { 74 bool operator() (JobHandler* const& a, 75 JobHandler* const& b) const; 76 }; 77 78 typedef std::map<std::string, JobHandler*> JobsIndex; 79 typedef std::list<JobHandler*> CompletedJobs; 80 typedef std::set<JobHandler*> RetryJobs; 81 typedef std::priority_queue<JobHandler*, 82 std::vector<JobHandler*>, // Could be a "std::deque" 83 PriorityComparator> PendingJobs; 84 85 boost::mutex mutex_; 86 JobsIndex jobsIndex_; 87 PendingJobs pendingJobs_; 88 CompletedJobs completedJobs_; 89 RetryJobs retryJobs_; 90 91 boost::condition_variable pendingJobAvailable_; 92 boost::condition_variable someJobComplete_; 93 size_t maxCompletedJobs_; 94 95 IObserver* observer_; 96 97 98 #ifndef NDEBUG 99 bool IsPendingJob(const JobHandler& job) const; 100 101 bool IsCompletedJob(JobHandler& job) const; 102 103 bool IsRetryJob(JobHandler& job) const; 104 #endif 105 106 void CheckInvariants() const; 107 108 void ForgetOldCompletedJobs(); 109 110 void SetCompletedJob(JobHandler& job, 111 bool success); 112 113 void MarkRunningAsCompleted(JobHandler& job, 114 CompletedReason reason); 115 116 void MarkRunningAsRetry(JobHandler& job, 117 unsigned int timeout); 118 119 void MarkRunningAsPaused(JobHandler& job); 120 121 bool GetStateInternal(JobState& state, 122 const std::string& id); 123 124 void RemovePendingJob(const std::string& id); 125 126 void RemoveRetryJob(JobHandler* handler); 127 128 void SubmitInternal(std::string& id, 129 JobHandler* handler); 130 131 public: 132 explicit JobsRegistry(size_t maxCompletedJobs); 133 134 JobsRegistry(IJobUnserializer& unserializer, 135 const Json::Value& s, 136 size_t maxCompletedJobs); 137 138 ~JobsRegistry(); 139 140 void SetMaxCompletedJobs(size_t i); 141 142 size_t GetMaxCompletedJobs(); 143 144 void ListJobs(std::set<std::string>& target); 145 146 bool GetJobInfo(JobInfo& target, 147 const std::string& id); 148 149 bool GetJobOutput(std::string& output, 150 MimeType& mime, 151 const std::string& job, 152 const std::string& key); 153 154 void Serialize(Json::Value& target); 155 156 void Submit(std::string& id, 157 IJob* job, // Takes ownership 158 int priority); 159 160 void Submit(IJob* job, // Takes ownership 161 int priority); 162 163 void SubmitAndWait(Json::Value& successContent, 164 IJob* job, // Takes ownership 165 int priority); 166 167 bool SetPriority(const std::string& id, 168 int priority); 169 170 bool Pause(const std::string& id); 171 172 bool Resume(const std::string& id); 173 174 bool Resubmit(const std::string& id); 175 176 bool Cancel(const std::string& id); 177 178 void ScheduleRetries(); 179 180 bool GetState(JobState& state, 181 const std::string& id); 182 183 void SetObserver(IObserver& observer); 184 185 void ResetObserver(); 186 187 void GetStatistics(unsigned int& pending, 188 unsigned int& running, 189 unsigned int& success, 190 unsigned int& errors); 191 192 class ORTHANC_PUBLIC RunningJob : public boost::noncopyable 193 { 194 private: 195 JobsRegistry& registry_; 196 JobHandler* handler_; // Can only be accessed if the 197 // registry mutex is locked! 198 IJob* job_; // Will by design be in mutual exclusion, 199 // because only one RunningJob can be 200 // executed at a time on a JobHandler 201 202 std::string id_; 203 int priority_; 204 JobState targetState_; 205 unsigned int targetRetryTimeout_; 206 bool canceled_; 207 208 public: 209 RunningJob(JobsRegistry& registry, 210 unsigned int timeout); 211 212 ~RunningJob(); 213 214 bool IsValid() const; 215 216 const std::string& GetId() const; 217 218 int GetPriority() const; 219 220 IJob& GetJob(); 221 222 bool IsPauseScheduled(); 223 224 bool IsCancelScheduled(); 225 226 void MarkSuccess(); 227 228 void MarkFailure(); 229 230 void MarkPause(); 231 232 void MarkCanceled(); 233 234 void MarkRetry(unsigned int timeout); 235 236 void UpdateStatus(ErrorCode code, 237 const std::string& details); 238 }; 239 }; 240 } 241