1 /**
2  * Orthanc - A Lightweight, RESTful DICOM Store
3  * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4  * Department, University Hospital of Liege, Belgium
5  * Copyright (C) 2017-2021 Osimis S.A., Belgium
6  *
7  * This program is free software: you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation, either version 3 of
10  * the License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this program. If not, see
19  * <http://www.gnu.org/licenses/>.
20  **/
21 
22 
23 #include "../PrecompiledHeaders.h"
24 #include "JobsRegistry.h"
25 
26 #include "../Logging.h"
27 #include "../OrthancException.h"
28 #include "../Toolbox.h"
29 #include "../SerializationToolbox.h"
30 
31 namespace Orthanc
32 {
  // Names of the JSON fields used when jobs are written to / read
  // back from their serialized form. "STATE", "PRIORITY", "JOB",
  // "CREATION_TIME", "LAST_CHANGE_TIME", "RUNTIME", "ERROR_CODE" and
  // "ERROR_DETAILS" are the members of one serialized "JobHandler".
  // NOTE(review): "TYPE", "JOBS" and "JOBS_REGISTRY" are not
  // referenced in the part of the file reviewed here - presumably
  // they are used by the registry-level serialization, to be confirmed.
  static const char* STATE = "State";
  static const char* TYPE = "Type";
  static const char* PRIORITY = "Priority";
  static const char* JOB = "Job";
  static const char* JOBS = "Jobs";
  static const char* JOBS_REGISTRY = "JobsRegistry";
  static const char* CREATION_TIME = "CreationTime";
  static const char* LAST_CHANGE_TIME = "LastChangeTime";
  static const char* RUNTIME = "Runtime";
  static const char* ERROR_CODE = "ErrorCode";        // Written since Orthanc 1.9.5
  static const char* ERROR_DETAILS = "ErrorDetails";  // Written since Orthanc 1.9.5
44 
45 
46   class JobsRegistry::JobHandler : public boost::noncopyable
47   {
48   private:
49     std::string                       id_;
50     JobState                          state_;
51     std::string                       jobType_;
52     std::unique_ptr<IJob>             job_;
53     int                               priority_;  // "+inf()" means highest priority
54     boost::posix_time::ptime          creationTime_;
55     boost::posix_time::ptime          lastStateChangeTime_;
56     boost::posix_time::time_duration  runtime_;
57     boost::posix_time::ptime          retryTime_;
58     bool                              pauseScheduled_;
59     bool                              cancelScheduled_;
60     JobStatus                         lastStatus_;
61 
Touch()62     void Touch()
63     {
64       const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
65 
66       if (state_ == JobState_Running)
67       {
68         runtime_ += (now - lastStateChangeTime_);
69       }
70 
71       lastStateChangeTime_ = now;
72     }
73 
SetStateInternal(JobState state)74     void SetStateInternal(JobState state)
75     {
76       state_ = state;
77       pauseScheduled_ = false;
78       cancelScheduled_ = false;
79       Touch();
80     }
81 
82   public:
JobHandler(IJob * job,int priority)83     JobHandler(IJob* job,
84                int priority) :
85       id_(Toolbox::GenerateUuid()),
86       state_(JobState_Pending),
87       job_(job),
88       priority_(priority),
89       creationTime_(boost::posix_time::microsec_clock::universal_time()),
90       lastStateChangeTime_(creationTime_),
91       runtime_(boost::posix_time::milliseconds(0)),
92       retryTime_(creationTime_),
93       pauseScheduled_(false),
94       cancelScheduled_(false)
95     {
96       if (job == NULL)
97       {
98         throw OrthancException(ErrorCode_NullPointer);
99       }
100 
101       job->GetJobType(jobType_);
102       job->Start();
103 
104       lastStatus_ = JobStatus(ErrorCode_Success, "", *job_);
105     }
106 
GetId() const107     const std::string& GetId() const
108     {
109       return id_;
110     }
111 
GetJob() const112     IJob& GetJob() const
113     {
114       assert(job_.get() != NULL);
115       return *job_;
116     }
117 
SetPriority(int priority)118     void SetPriority(int priority)
119     {
120       priority_ = priority;
121     }
122 
GetPriority() const123     int GetPriority() const
124     {
125       return priority_;
126     }
127 
GetState() const128     JobState GetState() const
129     {
130       return state_;
131     }
132 
SetState(JobState state)133     void SetState(JobState state)
134     {
135       if (state == JobState_Retry)
136       {
137         // Use "SetRetryState()"
138         throw OrthancException(ErrorCode_BadSequenceOfCalls);
139       }
140       else
141       {
142         SetStateInternal(state);
143       }
144     }
145 
SetRetryState(unsigned int timeout)146     void SetRetryState(unsigned int timeout)
147     {
148       if (state_ == JobState_Running)
149       {
150         SetStateInternal(JobState_Retry);
151         retryTime_ = (boost::posix_time::microsec_clock::universal_time() +
152                       boost::posix_time::milliseconds(timeout));
153       }
154       else
155       {
156         // Only valid for running jobs
157         throw OrthancException(ErrorCode_BadSequenceOfCalls);
158       }
159     }
160 
SchedulePause()161     void SchedulePause()
162     {
163       if (state_ == JobState_Running)
164       {
165         pauseScheduled_ = true;
166       }
167       else
168       {
169         // Only valid for running jobs
170         throw OrthancException(ErrorCode_BadSequenceOfCalls);
171       }
172     }
173 
ScheduleCancel()174     void ScheduleCancel()
175     {
176       if (state_ == JobState_Running)
177       {
178         cancelScheduled_ = true;
179       }
180       else
181       {
182         // Only valid for running jobs
183         throw OrthancException(ErrorCode_BadSequenceOfCalls);
184       }
185     }
186 
IsPauseScheduled()187     bool IsPauseScheduled()
188     {
189       return pauseScheduled_;
190     }
191 
IsCancelScheduled()192     bool IsCancelScheduled()
193     {
194       return cancelScheduled_;
195     }
196 
IsRetryReady(const boost::posix_time::ptime & now) const197     bool IsRetryReady(const boost::posix_time::ptime& now) const
198     {
199       if (state_ != JobState_Retry)
200       {
201         throw OrthancException(ErrorCode_BadSequenceOfCalls);
202       }
203       else
204       {
205         return retryTime_ <= now;
206       }
207     }
208 
GetCreationTime() const209     const boost::posix_time::ptime& GetCreationTime() const
210     {
211       return creationTime_;
212     }
213 
GetLastStateChangeTime() const214     const boost::posix_time::ptime& GetLastStateChangeTime() const
215     {
216       return lastStateChangeTime_;
217     }
218 
SetLastStateChangeTime(const boost::posix_time::ptime & time)219     void SetLastStateChangeTime(const boost::posix_time::ptime& time)
220     {
221       lastStateChangeTime_ = time;
222     }
223 
GetRuntime() const224     const boost::posix_time::time_duration& GetRuntime() const
225     {
226       return runtime_;
227     }
228 
ResetRuntime()229     void ResetRuntime()
230     {
231       runtime_ = boost::posix_time::milliseconds(0);
232     }
233 
GetLastStatus() const234     const JobStatus& GetLastStatus() const
235     {
236       return lastStatus_;
237     }
238 
SetLastStatus(const JobStatus & status)239     void SetLastStatus(const JobStatus& status)
240     {
241       lastStatus_ = status;
242       Touch();
243     }
244 
SetLastErrorCode(ErrorCode code)245     void SetLastErrorCode(ErrorCode code)
246     {
247       lastStatus_.SetErrorCode(code);
248     }
249 
Serialize(Json::Value & target) const250     bool Serialize(Json::Value& target) const
251     {
252       target = Json::objectValue;
253 
254       bool ok;
255 
256       if (state_ == JobState_Running)
257       {
258         // WARNING: Cannot directly access the "job_" member, as long
259         // as a "RunningJob" instance is running. We do not use a
260         // mutex at the "JobHandler" level, as serialization would be
261         // blocked while a step in the job is running. Instead, we
262         // save a snapshot of the serialized job. (*)
263 
264         if (lastStatus_.HasSerialized())
265         {
266           target[JOB] = lastStatus_.GetSerialized();
267           ok = true;
268         }
269         else
270         {
271           ok = false;
272         }
273       }
274       else
275       {
276         ok = job_->Serialize(target[JOB]);
277       }
278 
279       if (ok)
280       {
281         target[STATE] = EnumerationToString(state_);
282         target[PRIORITY] = priority_;
283         target[CREATION_TIME] = boost::posix_time::to_iso_string(creationTime_);
284         target[LAST_CHANGE_TIME] = boost::posix_time::to_iso_string(lastStateChangeTime_);
285         target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds());
286 
287         // New in Orthanc 1.9.5
288         target[ERROR_CODE] = static_cast<int>(lastStatus_.GetErrorCode());
289         target[ERROR_DETAILS] = lastStatus_.GetDetails();
290 
291         return true;
292       }
293       else
294       {
295         LOG(TRACE) << "Job backup is not supported for job of type: " << jobType_;
296         return false;
297       }
298     }
299 
JobHandler(IJobUnserializer & unserializer,const Json::Value & serialized,const std::string & id)300     JobHandler(IJobUnserializer& unserializer,
301                const Json::Value& serialized,
302                const std::string& id) :
303       id_(id),
304       pauseScheduled_(false),
305       cancelScheduled_(false)
306     {
307       state_ = StringToJobState(SerializationToolbox::ReadString(serialized, STATE));
308       priority_ = SerializationToolbox::ReadInteger(serialized, PRIORITY);
309       creationTime_ = boost::posix_time::from_iso_string
310         (SerializationToolbox::ReadString(serialized, CREATION_TIME));
311       lastStateChangeTime_ = boost::posix_time::from_iso_string
312         (SerializationToolbox::ReadString(serialized, LAST_CHANGE_TIME));
313       runtime_ = boost::posix_time::milliseconds
314         (SerializationToolbox::ReadInteger(serialized, RUNTIME));
315 
316       retryTime_ = creationTime_;
317 
318       job_.reset(unserializer.UnserializeJob(serialized[JOB]));
319       job_->GetJobType(jobType_);
320       job_->Start();
321 
322       ErrorCode errorCode;
323       if (serialized.isMember(ERROR_CODE))
324       {
325         errorCode = static_cast<ErrorCode>(SerializationToolbox::ReadInteger(serialized, ERROR_CODE));
326       }
327       else
328       {
329         errorCode = ErrorCode_Success;  // Backward compatibility with Orthanc <= 1.9.4
330       }
331 
332       std::string details;
333       if (serialized.isMember(ERROR_DETAILS))  // Backward compatibility with Orthanc <= 1.9.4
334       {
335         details = SerializationToolbox::ReadString(serialized, ERROR_DETAILS);
336       }
337 
338       lastStatus_ = JobStatus(errorCode, details, *job_);
339     }
340   };
341 
342 
operator ()(JobHandler * const & a,JobHandler * const & b) const343   bool JobsRegistry::PriorityComparator::operator() (JobHandler* const& a,
344                                                      JobHandler* const& b) const
345   {
346     return a->GetPriority() < b->GetPriority();
347   }
348 
349 
350 #if defined(NDEBUG)
  // Variant compiled if "NDEBUG" is defined: The consistency checks
  // of the registry are disabled, and this call does nothing.
  void JobsRegistry::CheckInvariants() const
  {
  }
354 
355 #else
IsPendingJob(const JobHandler & job) const356   bool JobsRegistry::IsPendingJob(const JobHandler& job) const
357   {
358     PendingJobs copy = pendingJobs_;
359     while (!copy.empty())
360     {
361       if (copy.top() == &job)
362       {
363         return true;
364       }
365 
366       copy.pop();
367     }
368 
369     return false;
370   }
371 
IsCompletedJob(JobHandler & job) const372   bool JobsRegistry::IsCompletedJob(JobHandler& job) const
373   {
374     for (CompletedJobs::const_iterator it = completedJobs_.begin();
375          it != completedJobs_.end(); ++it)
376     {
377       if (*it == &job)
378       {
379         return true;
380       }
381     }
382 
383     return false;
384   }
385 
IsRetryJob(JobHandler & job) const386   bool JobsRegistry::IsRetryJob(JobHandler& job) const
387   {
388     return retryJobs_.find(&job) != retryJobs_.end();
389   }
390 
CheckInvariants() const391   void JobsRegistry::CheckInvariants() const
392   {
393     {
394       PendingJobs copy = pendingJobs_;
395       while (!copy.empty())
396       {
397         assert(copy.top()->GetState() == JobState_Pending);
398         copy.pop();
399       }
400     }
401 
402     assert(completedJobs_.size() <= maxCompletedJobs_);
403 
404     for (CompletedJobs::const_iterator it = completedJobs_.begin();
405          it != completedJobs_.end(); ++it)
406     {
407       assert((*it)->GetState() == JobState_Success ||
408              (*it)->GetState() == JobState_Failure);
409     }
410 
411     for (RetryJobs::const_iterator it = retryJobs_.begin();
412          it != retryJobs_.end(); ++it)
413     {
414       assert((*it)->GetState() == JobState_Retry);
415     }
416 
417     for (JobsIndex::const_iterator it = jobsIndex_.begin();
418          it != jobsIndex_.end(); ++it)
419     {
420       JobHandler& job = *it->second;
421 
422       assert(job.GetId() == it->first);
423 
424       switch (job.GetState())
425       {
426         case JobState_Pending:
427           assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
428           break;
429 
430         case JobState_Success:
431         case JobState_Failure:
432           assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
433           break;
434 
435         case JobState_Retry:
436           assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
437           break;
438 
439         case JobState_Running:
440         case JobState_Paused:
441           assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
442           break;
443 
444         default:
445           throw OrthancException(ErrorCode_InternalError);
446       }
447     }
448   }
449 #endif
450 
451 
ForgetOldCompletedJobs()452   void JobsRegistry::ForgetOldCompletedJobs()
453   {
454     while (completedJobs_.size() > maxCompletedJobs_)
455     {
456       assert(completedJobs_.front() != NULL);
457 
458       std::string id = completedJobs_.front()->GetId();
459       assert(jobsIndex_.find(id) != jobsIndex_.end());
460 
461       jobsIndex_.erase(id);
462       delete(completedJobs_.front());
463       completedJobs_.pop_front();
464     }
465 
466     CheckInvariants();
467   }
468 
469 
SetCompletedJob(JobHandler & job,bool success)470   void JobsRegistry::SetCompletedJob(JobHandler& job,
471                                      bool success)
472   {
473     job.SetState(success ? JobState_Success : JobState_Failure);
474 
475     completedJobs_.push_back(&job);
476     someJobComplete_.notify_all();
477   }
478 
479 
MarkRunningAsCompleted(JobHandler & job,CompletedReason reason)480   void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
481                                             CompletedReason reason)
482   {
483     const char* tmp;
484 
485     switch (reason)
486     {
487       case CompletedReason_Success:
488         tmp = "success";
489         break;
490 
491       case CompletedReason_Failure:
492         tmp = "failure";
493         break;
494 
495       case CompletedReason_Canceled:
496         tmp = "cancel";
497         break;
498 
499       default:
500         throw OrthancException(ErrorCode_InternalError);
501     }
502 
503     LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId();
504 
505     CheckInvariants();
506 
507     assert(job.GetState() == JobState_Running);
508     SetCompletedJob(job, reason == CompletedReason_Success);
509 
510     if (reason == CompletedReason_Canceled)
511     {
512       job.SetLastErrorCode(ErrorCode_CanceledJob);
513     }
514 
515     if (observer_ != NULL)
516     {
517       if (reason == CompletedReason_Success)
518       {
519         observer_->SignalJobSuccess(job.GetId());
520       }
521       else
522       {
523         observer_->SignalJobFailure(job.GetId());
524       }
525     }
526 
527     // WARNING: The following call might make "job" invalid if the job
528     // history size is empty
529     ForgetOldCompletedJobs();
530   }
531 
532 
MarkRunningAsRetry(JobHandler & job,unsigned int timeout)533   void JobsRegistry::MarkRunningAsRetry(JobHandler& job,
534                                         unsigned int timeout)
535   {
536     LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
537 
538     CheckInvariants();
539 
540     assert(job.GetState() == JobState_Running &&
541            retryJobs_.find(&job) == retryJobs_.end());
542 
543     retryJobs_.insert(&job);
544     job.SetRetryState(timeout);
545 
546     CheckInvariants();
547   }
548 
549 
  // Switches a running job to the "Paused" state. The job is not
  // added to any container of the registry: It stays reachable
  // through the index of jobs only.
  void JobsRegistry::MarkRunningAsPaused(JobHandler& job)
  {
    LOG(INFO) << "Job paused: " << job.GetId();

    CheckInvariants();
    // Only a running job can be handled by this method
    assert(job.GetState() == JobState_Running);

    job.SetState(JobState_Paused);

    CheckInvariants();
  }
561 
562 
GetStateInternal(JobState & state,const std::string & id)563   bool JobsRegistry::GetStateInternal(JobState& state,
564                                       const std::string& id)
565   {
566     CheckInvariants();
567 
568     JobsIndex::const_iterator it = jobsIndex_.find(id);
569     if (it == jobsIndex_.end())
570     {
571       return false;
572     }
573     else
574     {
575       state = it->second->GetState();
576       return true;
577     }
578   }
579 
580 
~JobsRegistry()581   JobsRegistry::~JobsRegistry()
582   {
583     for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
584     {
585       assert(it->second != NULL);
586       delete it->second;
587     }
588   }
589 
590 
SetMaxCompletedJobs(size_t n)591   void JobsRegistry::SetMaxCompletedJobs(size_t n)
592   {
593     boost::mutex::scoped_lock lock(mutex_);
594     CheckInvariants();
595 
596     LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)";
597 
598     maxCompletedJobs_ = n;
599     ForgetOldCompletedJobs();
600   }
601 
602 
GetMaxCompletedJobs()603   size_t JobsRegistry::GetMaxCompletedJobs()
604   {
605     boost::mutex::scoped_lock lock(mutex_);
606     CheckInvariants();
607     return maxCompletedJobs_;
608   }
609 
610 
ListJobs(std::set<std::string> & target)611   void JobsRegistry::ListJobs(std::set<std::string>& target)
612   {
613     boost::mutex::scoped_lock lock(mutex_);
614     CheckInvariants();
615 
616     for (JobsIndex::const_iterator it = jobsIndex_.begin();
617          it != jobsIndex_.end(); ++it)
618     {
619       target.insert(it->first);
620     }
621   }
622 
623 
GetJobInfo(JobInfo & target,const std::string & id)624   bool JobsRegistry::GetJobInfo(JobInfo& target,
625                                 const std::string& id)
626   {
627     boost::mutex::scoped_lock lock(mutex_);
628     CheckInvariants();
629 
630     JobsIndex::const_iterator found = jobsIndex_.find(id);
631 
632     if (found == jobsIndex_.end())
633     {
634       return false;
635     }
636     else
637     {
638       const JobHandler& handler = *found->second;
639       target = JobInfo(handler.GetId(),
640                        handler.GetPriority(),
641                        handler.GetState(),
642                        handler.GetLastStatus(),
643                        handler.GetCreationTime(),
644                        handler.GetLastStateChangeTime(),
645                        handler.GetRuntime());
646       return true;
647     }
648   }
649 
650 
GetJobOutput(std::string & output,MimeType & mime,const std::string & job,const std::string & key)651   bool JobsRegistry::GetJobOutput(std::string& output,
652                                   MimeType& mime,
653                                   const std::string& job,
654                                   const std::string& key)
655   {
656     boost::mutex::scoped_lock lock(mutex_);
657     CheckInvariants();
658 
659     JobsIndex::const_iterator found = jobsIndex_.find(job);
660 
661     if (found == jobsIndex_.end())
662     {
663       return false;
664     }
665     else
666     {
667       const JobHandler& handler = *found->second;
668 
669       if (handler.GetState() == JobState_Success)
670       {
671         return handler.GetJob().GetOutput(output, mime, key);
672       }
673       else
674       {
675         return false;
676       }
677     }
678   }
679 
680 
SubmitInternal(std::string & id,JobHandler * handler)681   void JobsRegistry::SubmitInternal(std::string& id,
682                                     JobHandler* handler)
683   {
684     if (handler == NULL)
685     {
686       throw OrthancException(ErrorCode_NullPointer);
687     }
688 
689     std::unique_ptr<JobHandler>  protection(handler);
690 
691     {
692       boost::mutex::scoped_lock lock(mutex_);
693       CheckInvariants();
694 
695       id = handler->GetId();
696       int priority = handler->GetPriority();
697 
698       jobsIndex_.insert(std::make_pair(id, protection.release()));
699 
700       switch (handler->GetState())
701       {
702         case JobState_Pending:
703         case JobState_Retry:
704         case JobState_Running:
705           handler->SetState(JobState_Pending);
706           pendingJobs_.push(handler);
707           pendingJobAvailable_.notify_one();
708           break;
709 
710         case JobState_Success:
711           SetCompletedJob(*handler, true);
712           break;
713 
714         case JobState_Failure:
715           SetCompletedJob(*handler, false);
716           break;
717 
718         case JobState_Paused:
719           break;
720 
721         default:
722         {
723           std::string details = ("A job should not be loaded from state: " +
724                                  std::string(EnumerationToString(handler->GetState())));
725           throw OrthancException(ErrorCode_InternalError, details);
726         }
727       }
728 
729       LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
730 
731       if (observer_ != NULL)
732       {
733         observer_->SignalJobSubmitted(id);
734       }
735 
736       // WARNING: The following call might make "handler" invalid if
737       // the job history size is empty
738       ForgetOldCompletedJobs();
739     }
740   }
741 
  // Creates an empty registry, without observer, whose history keeps
  // at most "maxCompletedJobs" completed jobs.
  JobsRegistry::JobsRegistry(size_t maxCompletedJobs) :
    maxCompletedJobs_(maxCompletedJobs),
    observer_(NULL)
  {
  }
747 
748 
Submit(std::string & id,IJob * job,int priority)749   void JobsRegistry::Submit(std::string& id,
750                             IJob* job,        // Takes ownership
751                             int priority)
752   {
753     SubmitInternal(id, new JobHandler(job, priority));
754   }
755 
756 
Submit(IJob * job,int priority)757   void JobsRegistry::Submit(IJob* job,        // Takes ownership
758                             int priority)
759   {
760     std::string id;
761     SubmitInternal(id, new JobHandler(job, priority));
762   }
763 
764 
SubmitAndWait(Json::Value & successContent,IJob * job,int priority)765   void JobsRegistry::SubmitAndWait(Json::Value& successContent,
766                                    IJob* job,        // Takes ownership
767                                    int priority)
768   {
769     std::string id;
770     Submit(id, job, priority);
771 
772     JobState state = JobState_Pending;  // Dummy initialization
773 
774     {
775       boost::mutex::scoped_lock lock(mutex_);
776 
777       for (;;)
778       {
779         if (!GetStateInternal(state, id))
780         {
781           // Job has finished and has been lost (typically happens if
782           // "JobsHistorySize" is 0)
783           throw OrthancException(ErrorCode_InexistentItem,
784                                  "Cannot retrieve the status of the job, "
785                                  "make sure that \"JobsHistorySize\" is not 0");
786         }
787         else if (state == JobState_Failure)
788         {
789           // Failure
790           JobsIndex::const_iterator it = jobsIndex_.find(id);
791           if (it != jobsIndex_.end())  // Should always be true, already tested in GetStateInternal()
792           {
793             ErrorCode code = it->second->GetLastStatus().GetErrorCode();
794             const std::string& details = it->second->GetLastStatus().GetDetails();
795 
796             if (details.empty())
797             {
798               throw OrthancException(code);
799             }
800             else
801             {
802               throw OrthancException(code, details);
803             }
804           }
805           else
806           {
807             throw OrthancException(ErrorCode_InternalError);
808           }
809         }
810         else if (state == JobState_Success)
811         {
812           // Success, try and retrieve the status of the job
813           JobsIndex::const_iterator it = jobsIndex_.find(id);
814           if (it == jobsIndex_.end())
815           {
816             // Should not happen
817             state = JobState_Failure;
818           }
819           else
820           {
821             const JobStatus& status = it->second->GetLastStatus();
822             successContent = status.GetPublicContent();
823           }
824 
825           return;
826         }
827         else
828         {
829           // This job has not finished yet, wait for new completion
830           someJobComplete_.wait(lock);
831         }
832       }
833     }
834   }
835 
836 
SetPriority(const std::string & id,int priority)837   bool JobsRegistry::SetPriority(const std::string& id,
838                                  int priority)
839   {
840     LOG(INFO) << "Changing priority to " << priority << " for job: " << id;
841 
842     boost::mutex::scoped_lock lock(mutex_);
843     CheckInvariants();
844 
845     JobsIndex::iterator found = jobsIndex_.find(id);
846 
847     if (found == jobsIndex_.end())
848     {
849       LOG(WARNING) << "Unknown job: " << id;
850       return false;
851     }
852     else
853     {
854       found->second->SetPriority(priority);
855 
856       if (found->second->GetState() == JobState_Pending)
857       {
858         // If the job is pending, we need to reconstruct the
859         // priority queue, as the heap condition has changed
860 
861         PendingJobs copy;
862         std::swap(copy, pendingJobs_);
863 
864         assert(pendingJobs_.empty());
865         while (!copy.empty())
866         {
867           pendingJobs_.push(copy.top());
868           copy.pop();
869         }
870       }
871 
872       CheckInvariants();
873       return true;
874     }
875   }
876 
877 
RemovePendingJob(const std::string & id)878   void JobsRegistry::RemovePendingJob(const std::string& id)
879   {
880     // If the job is pending, we need to reconstruct the priority
881     // queue to remove it
882     PendingJobs copy;
883     std::swap(copy, pendingJobs_);
884 
885     assert(pendingJobs_.empty());
886     while (!copy.empty())
887     {
888       if (copy.top()->GetId() != id)
889       {
890         pendingJobs_.push(copy.top());
891       }
892 
893       copy.pop();
894     }
895   }
896 
897 
RemoveRetryJob(JobHandler * handler)898   void JobsRegistry::RemoveRetryJob(JobHandler* handler)
899   {
900     RetryJobs::iterator item = retryJobs_.find(handler);
901     assert(item != retryJobs_.end());
902     retryJobs_.erase(item);
903   }
904 
905 
Pause(const std::string & id)906   bool JobsRegistry::Pause(const std::string& id)
907   {
908     LOG(INFO) << "Pausing job: " << id;
909 
910     boost::mutex::scoped_lock lock(mutex_);
911     CheckInvariants();
912 
913     JobsIndex::iterator found = jobsIndex_.find(id);
914 
915     if (found == jobsIndex_.end())
916     {
917       LOG(WARNING) << "Unknown job: " << id;
918       return false;
919     }
920     else
921     {
922       switch (found->second->GetState())
923       {
924         case JobState_Pending:
925           RemovePendingJob(id);
926           found->second->SetState(JobState_Paused);
927           break;
928 
929         case JobState_Retry:
930           RemoveRetryJob(found->second);
931           found->second->SetState(JobState_Paused);
932           break;
933 
934         case JobState_Paused:
935         case JobState_Success:
936         case JobState_Failure:
937           // Nothing to be done
938           break;
939 
940         case JobState_Running:
941           found->second->SchedulePause();
942           break;
943 
944         default:
945           throw OrthancException(ErrorCode_InternalError);
946       }
947 
948       CheckInvariants();
949       return true;
950     }
951   }
952 
953 
Cancel(const std::string & id)954   bool JobsRegistry::Cancel(const std::string& id)
955   {
956     LOG(INFO) << "Canceling job: " << id;
957 
958     boost::mutex::scoped_lock lock(mutex_);
959     CheckInvariants();
960 
961     JobsIndex::iterator found = jobsIndex_.find(id);
962 
963     if (found == jobsIndex_.end())
964     {
965       LOG(WARNING) << "Unknown job: " << id;
966       return false;
967     }
968     else
969     {
970       switch (found->second->GetState())
971       {
972         case JobState_Pending:
973           RemovePendingJob(id);
974           SetCompletedJob(*found->second, false);
975           found->second->SetLastErrorCode(ErrorCode_CanceledJob);
976           break;
977 
978         case JobState_Retry:
979           RemoveRetryJob(found->second);
980           SetCompletedJob(*found->second, false);
981           found->second->SetLastErrorCode(ErrorCode_CanceledJob);
982           break;
983 
984         case JobState_Paused:
985           SetCompletedJob(*found->second, false);
986           found->second->SetLastErrorCode(ErrorCode_CanceledJob);
987           break;
988 
989         case JobState_Success:
990         case JobState_Failure:
991           // Nothing to be done
992           break;
993 
994         case JobState_Running:
995           found->second->ScheduleCancel();
996           break;
997 
998         default:
999           throw OrthancException(ErrorCode_InternalError);
1000       }
1001 
1002       // WARNING: The following call might make "handler" invalid if
1003       // the job history size is empty
1004       ForgetOldCompletedJobs();
1005 
1006       return true;
1007     }
1008   }
1009 
1010 
Resume(const std::string & id)1011   bool JobsRegistry::Resume(const std::string& id)
1012   {
1013     LOG(INFO) << "Resuming job: " << id;
1014 
1015     boost::mutex::scoped_lock lock(mutex_);
1016     CheckInvariants();
1017 
1018     JobsIndex::iterator found = jobsIndex_.find(id);
1019 
1020     if (found == jobsIndex_.end())
1021     {
1022       LOG(WARNING) << "Unknown job: " << id;
1023       return false;
1024     }
1025     else if (found->second->GetState() != JobState_Paused)
1026     {
1027       LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
1028       return false;
1029     }
1030     else
1031     {
1032       found->second->SetState(JobState_Pending);
1033       pendingJobs_.push(found->second);
1034       pendingJobAvailable_.notify_one();
1035       CheckInvariants();
1036       return true;
1037     }
1038   }
1039 
1040 
Resubmit(const std::string & id)1041   bool JobsRegistry::Resubmit(const std::string& id)
1042   {
1043     LOG(INFO) << "Resubmitting failed job: " << id;
1044 
1045     boost::mutex::scoped_lock lock(mutex_);
1046     CheckInvariants();
1047 
1048     JobsIndex::iterator found = jobsIndex_.find(id);
1049 
1050     if (found == jobsIndex_.end())
1051     {
1052       LOG(WARNING) << "Unknown job: " << id;
1053       return false;
1054     }
1055     else if (found->second->GetState() != JobState_Failure)
1056     {
1057       LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
1058       return false;
1059     }
1060     else
1061     {
1062       found->second->GetJob().Reset();
1063 
1064       bool ok = false;
1065       for (CompletedJobs::iterator it = completedJobs_.begin();
1066            it != completedJobs_.end(); ++it)
1067       {
1068         if (*it == found->second)
1069         {
1070           ok = true;
1071           completedJobs_.erase(it);
1072           break;
1073         }
1074       }
1075 
1076       (void) ok;  // Remove warning about unused variable in release builds
1077       assert(ok);
1078 
1079       found->second->ResetRuntime();
1080       found->second->SetState(JobState_Pending);
1081       pendingJobs_.push(found->second);
1082       pendingJobAvailable_.notify_one();
1083 
1084       CheckInvariants();
1085       return true;
1086     }
1087   }
1088 
1089 
ScheduleRetries()1090   void JobsRegistry::ScheduleRetries()
1091   {
1092     boost::mutex::scoped_lock lock(mutex_);
1093     CheckInvariants();
1094 
1095     RetryJobs copy;
1096     std::swap(copy, retryJobs_);
1097 
1098     const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
1099 
1100     assert(retryJobs_.empty());
1101     for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
1102     {
1103       if ((*it)->IsRetryReady(now))
1104       {
1105         LOG(INFO) << "Retrying job: " << (*it)->GetId();
1106         (*it)->SetState(JobState_Pending);
1107         pendingJobs_.push(*it);
1108         pendingJobAvailable_.notify_one();
1109       }
1110       else
1111       {
1112         retryJobs_.insert(*it);
1113       }
1114     }
1115 
1116     CheckInvariants();
1117   }
1118 
1119 
GetState(JobState & state,const std::string & id)1120   bool JobsRegistry::GetState(JobState& state,
1121                               const std::string& id)
1122   {
1123     boost::mutex::scoped_lock lock(mutex_);
1124     return GetStateInternal(state, id);
1125   }
1126 
1127 
SetObserver(JobsRegistry::IObserver & observer)1128   void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer)
1129   {
1130     boost::mutex::scoped_lock lock(mutex_);
1131     observer_ = &observer;
1132   }
1133 
1134 
ResetObserver()1135   void JobsRegistry::ResetObserver()
1136   {
1137     boost::mutex::scoped_lock lock(mutex_);
1138     observer_ = NULL;
1139   }
1140 
1141 
RunningJob(JobsRegistry & registry,unsigned int timeout)1142   JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
1143                                        unsigned int timeout) :
1144     registry_(registry),
1145     handler_(NULL),
1146     targetState_(JobState_Failure),
1147     targetRetryTimeout_(0),
1148     canceled_(false)
1149   {
1150     {
1151       boost::mutex::scoped_lock lock(registry_.mutex_);
1152 
1153       while (registry_.pendingJobs_.empty())
1154       {
1155         if (timeout == 0)
1156         {
1157           registry_.pendingJobAvailable_.wait(lock);
1158         }
1159         else
1160         {
1161           bool success = registry_.pendingJobAvailable_.timed_wait
1162             (lock, boost::posix_time::milliseconds(timeout));
1163           if (!success)
1164           {
1165             // No pending job
1166             return;
1167           }
1168         }
1169       }
1170 
1171       handler_ = registry_.pendingJobs_.top();
1172       registry_.pendingJobs_.pop();
1173 
1174       assert(handler_->GetState() == JobState_Pending);
1175       handler_->SetState(JobState_Running);
1176       handler_->SetLastErrorCode(ErrorCode_Success);
1177 
1178       job_ = &handler_->GetJob();
1179       id_ = handler_->GetId();
1180       priority_ = handler_->GetPriority();
1181     }
1182   }
1183 
1184 
~RunningJob()1185   JobsRegistry::RunningJob::~RunningJob()
1186   {
1187     if (IsValid())
1188     {
1189       boost::mutex::scoped_lock lock(registry_.mutex_);
1190 
1191       switch (targetState_)
1192       {
1193         case JobState_Failure:
1194           registry_.MarkRunningAsCompleted
1195             (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure);
1196           break;
1197 
1198         case JobState_Success:
1199           registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success);
1200           break;
1201 
1202         case JobState_Paused:
1203           registry_.MarkRunningAsPaused(*handler_);
1204           break;
1205 
1206         case JobState_Retry:
1207           registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
1208           break;
1209 
1210         default:
1211           assert(0);
1212       }
1213     }
1214   }
1215 
1216 
IsValid() const1217   bool JobsRegistry::RunningJob::IsValid() const
1218   {
1219     return (handler_ != NULL &&
1220             job_ != NULL);
1221   }
1222 
1223 
GetId() const1224   const std::string& JobsRegistry::RunningJob::GetId() const
1225   {
1226     if (!IsValid())
1227     {
1228       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1229     }
1230     else
1231     {
1232       return id_;
1233     }
1234   }
1235 
1236 
GetPriority() const1237   int JobsRegistry::RunningJob::GetPriority() const
1238   {
1239     if (!IsValid())
1240     {
1241       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1242     }
1243     else
1244     {
1245       return priority_;
1246     }
1247   }
1248 
1249 
GetJob()1250   IJob& JobsRegistry::RunningJob::GetJob()
1251   {
1252     if (!IsValid())
1253     {
1254       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1255     }
1256     else
1257     {
1258       return *job_;
1259     }
1260   }
1261 
1262 
IsPauseScheduled()1263   bool JobsRegistry::RunningJob::IsPauseScheduled()
1264   {
1265     if (!IsValid())
1266     {
1267       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1268     }
1269     else
1270     {
1271       boost::mutex::scoped_lock lock(registry_.mutex_);
1272       registry_.CheckInvariants();
1273       assert(handler_->GetState() == JobState_Running);
1274 
1275       return handler_->IsPauseScheduled();
1276     }
1277   }
1278 
1279 
IsCancelScheduled()1280   bool JobsRegistry::RunningJob::IsCancelScheduled()
1281   {
1282     if (!IsValid())
1283     {
1284       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1285     }
1286     else
1287     {
1288       boost::mutex::scoped_lock lock(registry_.mutex_);
1289       registry_.CheckInvariants();
1290       assert(handler_->GetState() == JobState_Running);
1291 
1292       return handler_->IsCancelScheduled();
1293     }
1294   }
1295 
1296 
MarkSuccess()1297   void JobsRegistry::RunningJob::MarkSuccess()
1298   {
1299     if (!IsValid())
1300     {
1301       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1302     }
1303     else
1304     {
1305       targetState_ = JobState_Success;
1306     }
1307   }
1308 
1309 
MarkFailure()1310   void JobsRegistry::RunningJob::MarkFailure()
1311   {
1312     if (!IsValid())
1313     {
1314       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1315     }
1316     else
1317     {
1318       targetState_ = JobState_Failure;
1319     }
1320   }
1321 
1322 
MarkCanceled()1323   void JobsRegistry::RunningJob::MarkCanceled()
1324   {
1325     if (!IsValid())
1326     {
1327       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1328     }
1329     else
1330     {
1331       targetState_ = JobState_Failure;
1332       canceled_ = true;
1333     }
1334   }
1335 
1336 
MarkPause()1337   void JobsRegistry::RunningJob::MarkPause()
1338   {
1339     if (!IsValid())
1340     {
1341       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1342     }
1343     else
1344     {
1345       targetState_ = JobState_Paused;
1346     }
1347   }
1348 
1349 
MarkRetry(unsigned int timeout)1350   void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout)
1351   {
1352     if (!IsValid())
1353     {
1354       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1355     }
1356     else
1357     {
1358       targetState_ = JobState_Retry;
1359       targetRetryTimeout_ = timeout;
1360     }
1361   }
1362 
1363 
UpdateStatus(ErrorCode code,const std::string & details)1364   void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code,
1365                                               const std::string& details)
1366   {
1367     if (!IsValid())
1368     {
1369       throw OrthancException(ErrorCode_BadSequenceOfCalls);
1370     }
1371     else
1372     {
1373       JobStatus status(code, details, *job_);
1374 
1375       boost::mutex::scoped_lock lock(registry_.mutex_);
1376       registry_.CheckInvariants();
1377       assert(handler_->GetState() == JobState_Running);
1378 
1379       handler_->SetLastStatus(status);
1380     }
1381   }
1382 
1383 
1384 
Serialize(Json::Value & target)1385   void JobsRegistry::Serialize(Json::Value& target)
1386   {
1387     boost::mutex::scoped_lock lock(mutex_);
1388     CheckInvariants();
1389 
1390     target = Json::objectValue;
1391     target[TYPE] = JOBS_REGISTRY;
1392     target[JOBS] = Json::objectValue;
1393 
1394     for (JobsIndex::const_iterator it = jobsIndex_.begin();
1395          it != jobsIndex_.end(); ++it)
1396     {
1397       Json::Value v;
1398       if (it->second->Serialize(v))
1399       {
1400         target[JOBS][it->first] = v;
1401       }
1402     }
1403   }
1404 
1405 
JobsRegistry(IJobUnserializer & unserializer,const Json::Value & s,size_t maxCompletedJobs)1406   JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
1407                              const Json::Value& s,
1408                              size_t maxCompletedJobs) :
1409     maxCompletedJobs_(maxCompletedJobs),
1410     observer_(NULL)
1411   {
1412     if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
1413         !s.isMember(JOBS) ||
1414         s[JOBS].type() != Json::objectValue)
1415     {
1416       throw OrthancException(ErrorCode_BadFileFormat);
1417     }
1418 
1419     Json::Value::Members members = s[JOBS].getMemberNames();
1420 
1421     for (Json::Value::Members::const_iterator it = members.begin();
1422          it != members.end(); ++it)
1423     {
1424       std::unique_ptr<JobHandler> job;
1425 
1426       try
1427       {
1428         job.reset(new JobHandler(unserializer, s[JOBS][*it], *it));
1429       }
1430       catch (OrthancException& e)
1431       {
1432         LOG(WARNING) << "Cannot unserialize one job from previous execution, "
1433                      << "skipping it: " << e.What();
1434         continue;
1435       }
1436 
1437       const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime();
1438 
1439       std::string id;
1440       SubmitInternal(id, job.release());
1441 
1442       // Check whether the job has not been removed (which could be
1443       // the case if the "maxCompletedJobs_" value gets smaller)
1444       JobsIndex::iterator found = jobsIndex_.find(id);
1445       if (found != jobsIndex_.end())
1446       {
1447         // The job still lies in the history: Update the time of its
1448         // last change to the time that was serialized
1449         assert(found->second != NULL);
1450         found->second->SetLastStateChangeTime(lastChangeTime);
1451       }
1452     }
1453   }
1454 
1455 
GetStatistics(unsigned int & pending,unsigned int & running,unsigned int & success,unsigned int & failed)1456   void JobsRegistry::GetStatistics(unsigned int& pending,
1457                                    unsigned int& running,
1458                                    unsigned int& success,
1459                                    unsigned int& failed)
1460   {
1461     boost::mutex::scoped_lock lock(mutex_);
1462     CheckInvariants();
1463 
1464     pending = 0;
1465     running = 0;
1466     success = 0;
1467     failed = 0;
1468 
1469     for (JobsIndex::const_iterator it = jobsIndex_.begin();
1470          it != jobsIndex_.end(); ++it)
1471     {
1472       JobHandler& job = *it->second;
1473 
1474       switch (job.GetState())
1475       {
1476         case JobState_Retry:
1477         case JobState_Pending:
1478           pending ++;
1479           break;
1480 
1481         case JobState_Paused:
1482         case JobState_Running:
1483           running ++;
1484           break;
1485 
1486         case JobState_Success:
1487           success ++;
1488           break;
1489 
1490         case JobState_Failure:
1491           failed ++;
1492           break;
1493 
1494         default:
1495           throw OrthancException(ErrorCode_InternalError);
1496       }
1497     }
1498   }
1499 }
1500