/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2021 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/>.
 **/


23 #pragma once
24 
25 #if !defined(ORTHANC_SANDBOXED)
26 #  error The macro ORTHANC_SANDBOXED must be defined
27 #endif
28 
29 #if ORTHANC_SANDBOXED == 1
30 #  error The job engine cannot be used in sandboxed environments
31 #endif
32 
#include "JobInfo.h"
#include "IJobUnserializer.h"

#include <list>
#include <map>
#include <queue>
#include <set>
#include <string>
#include <vector>
#include <boost/noncopyable.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>

42 namespace Orthanc
43 {
44   // This class handles the state machine of the jobs engine
45   class ORTHANC_PUBLIC JobsRegistry : public boost::noncopyable
46   {
47   public:
48     class ORTHANC_PUBLIC IObserver : public boost::noncopyable
49     {
50     public:
~IObserver()51       virtual ~IObserver()
52       {
53       }
54 
55       virtual void SignalJobSubmitted(const std::string& jobId) = 0;
56 
57       virtual void SignalJobSuccess(const std::string& jobId) = 0;
58 
59       virtual void SignalJobFailure(const std::string& jobId) = 0;
60     };
61 
62   private:
63     enum CompletedReason
64     {
65       CompletedReason_Success,
66       CompletedReason_Failure,
67       CompletedReason_Canceled
68     };
69 
70     class JobHandler;
71 
72     struct PriorityComparator
73     {
74       bool operator() (JobHandler* const& a,
75                        JobHandler* const& b) const;
76     };
77 
78     typedef std::map<std::string, JobHandler*>              JobsIndex;
79     typedef std::list<JobHandler*>                          CompletedJobs;
80     typedef std::set<JobHandler*>                           RetryJobs;
81     typedef std::priority_queue<JobHandler*,
82                                 std::vector<JobHandler*>,   // Could be a "std::deque"
83                                 PriorityComparator>         PendingJobs;
84 
85     boost::mutex               mutex_;
86     JobsIndex                  jobsIndex_;
87     PendingJobs                pendingJobs_;
88     CompletedJobs              completedJobs_;
89     RetryJobs                  retryJobs_;
90 
91     boost::condition_variable  pendingJobAvailable_;
92     boost::condition_variable  someJobComplete_;
93     size_t                     maxCompletedJobs_;
94 
95     IObserver*                 observer_;
96 
97 
98 #ifndef NDEBUG
99     bool IsPendingJob(const JobHandler& job) const;
100 
101     bool IsCompletedJob(JobHandler& job) const;
102 
103     bool IsRetryJob(JobHandler& job) const;
104 #endif
105 
106     void CheckInvariants() const;
107 
108     void ForgetOldCompletedJobs();
109 
110     void SetCompletedJob(JobHandler& job,
111                          bool success);
112 
113     void MarkRunningAsCompleted(JobHandler& job,
114                                 CompletedReason reason);
115 
116     void MarkRunningAsRetry(JobHandler& job,
117                             unsigned int timeout);
118 
119     void MarkRunningAsPaused(JobHandler& job);
120 
121     bool GetStateInternal(JobState& state,
122                           const std::string& id);
123 
124     void RemovePendingJob(const std::string& id);
125 
126     void RemoveRetryJob(JobHandler* handler);
127 
128     void SubmitInternal(std::string& id,
129                         JobHandler* handler);
130 
131   public:
132     explicit JobsRegistry(size_t maxCompletedJobs);
133 
134     JobsRegistry(IJobUnserializer& unserializer,
135                  const Json::Value& s,
136                  size_t maxCompletedJobs);
137 
138     ~JobsRegistry();
139 
140     void SetMaxCompletedJobs(size_t i);
141 
142     size_t GetMaxCompletedJobs();
143 
144     void ListJobs(std::set<std::string>& target);
145 
146     bool GetJobInfo(JobInfo& target,
147                     const std::string& id);
148 
149     bool GetJobOutput(std::string& output,
150                       MimeType& mime,
151                       const std::string& job,
152                       const std::string& key);
153 
154     void Serialize(Json::Value& target);
155 
156     void Submit(std::string& id,
157                 IJob* job,        // Takes ownership
158                 int priority);
159 
160     void Submit(IJob* job,        // Takes ownership
161                 int priority);
162 
163     void SubmitAndWait(Json::Value& successContent,
164                        IJob* job,        // Takes ownership
165                        int priority);
166 
167     bool SetPriority(const std::string& id,
168                      int priority);
169 
170     bool Pause(const std::string& id);
171 
172     bool Resume(const std::string& id);
173 
174     bool Resubmit(const std::string& id);
175 
176     bool Cancel(const std::string& id);
177 
178     void ScheduleRetries();
179 
180     bool GetState(JobState& state,
181                   const std::string& id);
182 
183     void SetObserver(IObserver& observer);
184 
185     void ResetObserver();
186 
187     void GetStatistics(unsigned int& pending,
188                        unsigned int& running,
189                        unsigned int& success,
190                        unsigned int& errors);
191 
192     class ORTHANC_PUBLIC RunningJob : public boost::noncopyable
193     {
194     private:
195       JobsRegistry&  registry_;
196       JobHandler*    handler_;  // Can only be accessed if the
197                                 // registry mutex is locked!
198       IJob*          job_;  // Will by design be in mutual exclusion,
199                             // because only one RunningJob can be
200                             // executed at a time on a JobHandler
201 
202       std::string    id_;
203       int            priority_;
204       JobState       targetState_;
205       unsigned int   targetRetryTimeout_;
206       bool           canceled_;
207 
208     public:
209       RunningJob(JobsRegistry& registry,
210                  unsigned int timeout);
211 
212       ~RunningJob();
213 
214       bool IsValid() const;
215 
216       const std::string& GetId() const;
217 
218       int GetPriority() const;
219 
220       IJob& GetJob();
221 
222       bool IsPauseScheduled();
223 
224       bool IsCancelScheduled();
225 
226       void MarkSuccess();
227 
228       void MarkFailure();
229 
230       void MarkPause();
231 
232       void MarkCanceled();
233 
234       void MarkRetry(unsigned int timeout);
235 
236       void UpdateStatus(ErrorCode code,
237                         const std::string& details);
238     };
239   };
240 }
241