1 /** 2 * Orthanc - A Lightweight, RESTful DICOM Store 3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics 4 * Department, University Hospital of Liege, Belgium 5 * Copyright (C) 2017-2021 Osimis S.A., Belgium 6 * 7 * This program is free software: you can redistribute it and/or 8 * modify it under the terms of the GNU Lesser General Public License 9 * as published by the Free Software Foundation, either version 3 of 10 * the License, or (at your option) any later version. 11 * 12 * This program is distributed in the hope that it will be useful, but 13 * WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 * Lesser General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public 18 * License along with this program. If not, see 19 * <http://www.gnu.org/licenses/>. 20 **/ 21 22 23 #include "../PrecompiledHeaders.h" 24 #include "JobsRegistry.h" 25 26 #include "../Logging.h" 27 #include "../OrthancException.h" 28 #include "../Toolbox.h" 29 #include "../SerializationToolbox.h" 30 31 namespace Orthanc 32 { 33 static const char* STATE = "State"; 34 static const char* TYPE = "Type"; 35 static const char* PRIORITY = "Priority"; 36 static const char* JOB = "Job"; 37 static const char* JOBS = "Jobs"; 38 static const char* JOBS_REGISTRY = "JobsRegistry"; 39 static const char* CREATION_TIME = "CreationTime"; 40 static const char* LAST_CHANGE_TIME = "LastChangeTime"; 41 static const char* RUNTIME = "Runtime"; 42 static const char* ERROR_CODE = "ErrorCode"; 43 static const char* ERROR_DETAILS = "ErrorDetails"; 44 45 46 class JobsRegistry::JobHandler : public boost::noncopyable 47 { 48 private: 49 std::string id_; 50 JobState state_; 51 std::string jobType_; 52 std::unique_ptr<IJob> job_; 53 int priority_; // "+inf()" means highest priority 54 boost::posix_time::ptime creationTime_; 55 boost::posix_time::ptime lastStateChangeTime_; 56 boost::posix_time::time_duration runtime_; 57 boost::posix_time::ptime retryTime_; 58 bool pauseScheduled_; 59 bool cancelScheduled_; 60 JobStatus lastStatus_; 61 Touch()62 void Touch() 63 { 64 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); 65 66 if (state_ == JobState_Running) 67 { 68 runtime_ += (now - lastStateChangeTime_); 69 } 70 71 lastStateChangeTime_ = now; 72 } 73 SetStateInternal(JobState state)74 void SetStateInternal(JobState state) 75 { 76 state_ = state; 77 pauseScheduled_ = false; 78 cancelScheduled_ = false; 79 Touch(); 80 } 81 82 public: JobHandler(IJob * job,int priority)83 JobHandler(IJob* job, 84 int priority) : 85 id_(Toolbox::GenerateUuid()), 86 state_(JobState_Pending), 87 job_(job), 88 priority_(priority), 89 creationTime_(boost::posix_time::microsec_clock::universal_time()), 90 lastStateChangeTime_(creationTime_), 91 runtime_(boost::posix_time::milliseconds(0)), 92 retryTime_(creationTime_), 93 pauseScheduled_(false), 94 cancelScheduled_(false) 95 { 96 if (job == NULL) 97 { 98 throw OrthancException(ErrorCode_NullPointer); 99 } 100 101 job->GetJobType(jobType_); 102 job->Start(); 103 104 lastStatus_ = JobStatus(ErrorCode_Success, "", *job_); 105 } 106 GetId() const107 const std::string& GetId() const 108 { 109 return id_; 110 } 111 GetJob() const112 IJob& GetJob() const 113 { 114 assert(job_.get() != NULL); 115 return *job_; 116 } 117 SetPriority(int priority)118 void SetPriority(int priority) 119 { 120 priority_ = priority; 121 } 122 GetPriority() const123 int GetPriority() const 124 { 125 return priority_; 126 } 127 GetState() const128 JobState GetState() const 129 { 130 return state_; 131 } 132 SetState(JobState state)133 void SetState(JobState state) 134 { 135 if (state == JobState_Retry) 136 { 137 // Use "SetRetryState()" 138 throw OrthancException(ErrorCode_BadSequenceOfCalls); 139 } 140 else 141 { 142 SetStateInternal(state); 143 } 144 } 145 SetRetryState(unsigned int timeout)146 void SetRetryState(unsigned int timeout) 147 { 148 if (state_ == JobState_Running) 149 { 150 SetStateInternal(JobState_Retry); 151 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + 152 boost::posix_time::milliseconds(timeout)); 153 } 154 else 155 { 156 // Only valid for running jobs 157 throw OrthancException(ErrorCode_BadSequenceOfCalls); 158 } 159 } 160 SchedulePause()161 void SchedulePause() 162 { 163 if (state_ == JobState_Running) 164 { 165 pauseScheduled_ = true; 166 } 167 else 168 { 169 // Only valid for running jobs 170 throw OrthancException(ErrorCode_BadSequenceOfCalls); 171 } 172 } 173 ScheduleCancel()174 void ScheduleCancel() 175 { 176 if (state_ == JobState_Running) 177 { 178 cancelScheduled_ = true; 179 } 180 else 181 { 182 // Only valid for running jobs 183 throw OrthancException(ErrorCode_BadSequenceOfCalls); 184 } 185 } 186 IsPauseScheduled()187 bool IsPauseScheduled() 188 { 189 return pauseScheduled_; 190 } 191 IsCancelScheduled()192 bool IsCancelScheduled() 193 { 194 return cancelScheduled_; 195 } 196 IsRetryReady(const boost::posix_time::ptime & now) const197 bool IsRetryReady(const boost::posix_time::ptime& now) const 198 { 199 if (state_ != JobState_Retry) 200 { 201 throw OrthancException(ErrorCode_BadSequenceOfCalls); 202 } 203 else 204 { 205 return retryTime_ <= now; 206 } 207 } 208 GetCreationTime() const209 const boost::posix_time::ptime& GetCreationTime() const 210 { 211 return creationTime_; 212 } 213 GetLastStateChangeTime() const214 const boost::posix_time::ptime& GetLastStateChangeTime() const 215 { 216 return lastStateChangeTime_; 217 } 218 SetLastStateChangeTime(const boost::posix_time::ptime & time)219 void SetLastStateChangeTime(const boost::posix_time::ptime& time) 220 { 221 lastStateChangeTime_ = time; 222 } 223 GetRuntime() const224 const boost::posix_time::time_duration& GetRuntime() const 225 { 226 return runtime_; 227 } 228 ResetRuntime()229 void ResetRuntime() 230 { 231 runtime_ = boost::posix_time::milliseconds(0); 232 } 233 GetLastStatus() const234 const JobStatus& GetLastStatus() const 235 { 236 return lastStatus_; 237 } 238 SetLastStatus(const JobStatus & status)239 void SetLastStatus(const JobStatus& status) 240 { 241 lastStatus_ = status; 242 Touch(); 243 } 244 SetLastErrorCode(ErrorCode code)245 void SetLastErrorCode(ErrorCode code) 246 { 247 lastStatus_.SetErrorCode(code); 248 } 249 Serialize(Json::Value & target) const250 bool Serialize(Json::Value& target) const 251 { 252 target = Json::objectValue; 253 254 bool ok; 255 256 if (state_ == JobState_Running) 257 { 258 // WARNING: Cannot directly access the "job_" member, as long 259 // as a "RunningJob" instance is running. We do not use a 260 // mutex at the "JobHandler" level, as serialization would be 261 // blocked while a step in the job is running. Instead, we 262 // save a snapshot of the serialized job. (*) 263 264 if (lastStatus_.HasSerialized()) 265 { 266 target[JOB] = lastStatus_.GetSerialized(); 267 ok = true; 268 } 269 else 270 { 271 ok = false; 272 } 273 } 274 else 275 { 276 ok = job_->Serialize(target[JOB]); 277 } 278 279 if (ok) 280 { 281 target[STATE] = EnumerationToString(state_); 282 target[PRIORITY] = priority_; 283 target[CREATION_TIME] = boost::posix_time::to_iso_string(creationTime_); 284 target[LAST_CHANGE_TIME] = boost::posix_time::to_iso_string(lastStateChangeTime_); 285 target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds()); 286 287 // New in Orthanc 1.9.5 288 target[ERROR_CODE] = static_cast<int>(lastStatus_.GetErrorCode()); 289 target[ERROR_DETAILS] = lastStatus_.GetDetails(); 290 291 return true; 292 } 293 else 294 { 295 LOG(TRACE) << "Job backup is not supported for job of type: " << jobType_; 296 return false; 297 } 298 } 299 JobHandler(IJobUnserializer & unserializer,const Json::Value & serialized,const std::string & id)300 JobHandler(IJobUnserializer& unserializer, 301 const Json::Value& serialized, 302 const std::string& id) : 303 id_(id), 304 pauseScheduled_(false), 305 cancelScheduled_(false) 306 { 307 state_ = StringToJobState(SerializationToolbox::ReadString(serialized, STATE)); 308 priority_ = SerializationToolbox::ReadInteger(serialized, PRIORITY); 309 creationTime_ = boost::posix_time::from_iso_string 310 (SerializationToolbox::ReadString(serialized, CREATION_TIME)); 311 lastStateChangeTime_ = boost::posix_time::from_iso_string 312 (SerializationToolbox::ReadString(serialized, LAST_CHANGE_TIME)); 313 runtime_ = boost::posix_time::milliseconds 314 (SerializationToolbox::ReadInteger(serialized, RUNTIME)); 315 316 retryTime_ = creationTime_; 317 318 job_.reset(unserializer.UnserializeJob(serialized[JOB])); 319 job_->GetJobType(jobType_); 320 job_->Start(); 321 322 ErrorCode errorCode; 323 if (serialized.isMember(ERROR_CODE)) 324 { 325 errorCode = static_cast<ErrorCode>(SerializationToolbox::ReadInteger(serialized, ERROR_CODE)); 326 } 327 else 328 { 329 errorCode = ErrorCode_Success; // Backward compatibility with Orthanc <= 1.9.4 330 } 331 332 std::string details; 333 if (serialized.isMember(ERROR_DETAILS)) // Backward compatibility with Orthanc <= 1.9.4 334 { 335 details = SerializationToolbox::ReadString(serialized, ERROR_DETAILS); 336 } 337 338 lastStatus_ = JobStatus(errorCode, details, *job_); 339 } 340 }; 341 342 operator ()(JobHandler * const & a,JobHandler * const & b) const343 bool JobsRegistry::PriorityComparator::operator() (JobHandler* const& a, 344 JobHandler* const& b) const 345 { 346 return a->GetPriority() < b->GetPriority(); 347 } 348 349 350 #if defined(NDEBUG) CheckInvariants() const351 void JobsRegistry::CheckInvariants() const 352 { 353 } 354 355 #else IsPendingJob(const JobHandler & job) const356 bool JobsRegistry::IsPendingJob(const JobHandler& job) const 357 { 358 PendingJobs copy = pendingJobs_; 359 while (!copy.empty()) 360 { 361 if (copy.top() == &job) 362 { 363 return true; 364 } 365 366 copy.pop(); 367 } 368 369 return false; 370 } 371 IsCompletedJob(JobHandler & job) const372 bool JobsRegistry::IsCompletedJob(JobHandler& job) const 373 { 374 for (CompletedJobs::const_iterator it = completedJobs_.begin(); 375 it != completedJobs_.end(); ++it) 376 { 377 if (*it == &job) 378 { 379 return true; 380 } 381 } 382 383 return false; 384 } 385 IsRetryJob(JobHandler & job) const386 bool JobsRegistry::IsRetryJob(JobHandler& job) const 387 { 388 return retryJobs_.find(&job) != retryJobs_.end(); 389 } 390 CheckInvariants() const391 void JobsRegistry::CheckInvariants() const 392 { 393 { 394 PendingJobs copy = pendingJobs_; 395 while (!copy.empty()) 396 { 397 assert(copy.top()->GetState() == JobState_Pending); 398 copy.pop(); 399 } 400 } 401 402 assert(completedJobs_.size() <= maxCompletedJobs_); 403 404 for (CompletedJobs::const_iterator it = completedJobs_.begin(); 405 it != completedJobs_.end(); ++it) 406 { 407 assert((*it)->GetState() == JobState_Success || 408 (*it)->GetState() == JobState_Failure); 409 } 410 411 for (RetryJobs::const_iterator it = retryJobs_.begin(); 412 it != retryJobs_.end(); ++it) 413 { 414 assert((*it)->GetState() == JobState_Retry); 415 } 416 417 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 418 it != jobsIndex_.end(); ++it) 419 { 420 JobHandler& job = *it->second; 421 422 assert(job.GetId() == it->first); 423 424 switch (job.GetState()) 425 { 426 case JobState_Pending: 427 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); 428 break; 429 430 case JobState_Success: 431 case JobState_Failure: 432 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); 433 break; 434 435 case JobState_Retry: 436 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); 437 break; 438 439 case JobState_Running: 440 case JobState_Paused: 441 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); 442 break; 443 444 default: 445 throw OrthancException(ErrorCode_InternalError); 446 } 447 } 448 } 449 #endif 450 451 ForgetOldCompletedJobs()452 void JobsRegistry::ForgetOldCompletedJobs() 453 { 454 while (completedJobs_.size() > maxCompletedJobs_) 455 { 456 assert(completedJobs_.front() != NULL); 457 458 std::string id = completedJobs_.front()->GetId(); 459 assert(jobsIndex_.find(id) != jobsIndex_.end()); 460 461 jobsIndex_.erase(id); 462 delete(completedJobs_.front()); 463 completedJobs_.pop_front(); 464 } 465 466 CheckInvariants(); 467 } 468 469 SetCompletedJob(JobHandler & job,bool success)470 void JobsRegistry::SetCompletedJob(JobHandler& job, 471 bool success) 472 { 473 job.SetState(success ? JobState_Success : JobState_Failure); 474 475 completedJobs_.push_back(&job); 476 someJobComplete_.notify_all(); 477 } 478 479 MarkRunningAsCompleted(JobHandler & job,CompletedReason reason)480 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, 481 CompletedReason reason) 482 { 483 const char* tmp; 484 485 switch (reason) 486 { 487 case CompletedReason_Success: 488 tmp = "success"; 489 break; 490 491 case CompletedReason_Failure: 492 tmp = "failure"; 493 break; 494 495 case CompletedReason_Canceled: 496 tmp = "cancel"; 497 break; 498 499 default: 500 throw OrthancException(ErrorCode_InternalError); 501 } 502 503 LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId(); 504 505 CheckInvariants(); 506 507 assert(job.GetState() == JobState_Running); 508 SetCompletedJob(job, reason == CompletedReason_Success); 509 510 if (reason == CompletedReason_Canceled) 511 { 512 job.SetLastErrorCode(ErrorCode_CanceledJob); 513 } 514 515 if (observer_ != NULL) 516 { 517 if (reason == CompletedReason_Success) 518 { 519 observer_->SignalJobSuccess(job.GetId()); 520 } 521 else 522 { 523 observer_->SignalJobFailure(job.GetId()); 524 } 525 } 526 527 // WARNING: The following call might make "job" invalid if the job 528 // history size is empty 529 ForgetOldCompletedJobs(); 530 } 531 532 MarkRunningAsRetry(JobHandler & job,unsigned int timeout)533 void JobsRegistry::MarkRunningAsRetry(JobHandler& job, 534 unsigned int timeout) 535 { 536 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); 537 538 CheckInvariants(); 539 540 assert(job.GetState() == JobState_Running && 541 retryJobs_.find(&job) == retryJobs_.end()); 542 543 retryJobs_.insert(&job); 544 job.SetRetryState(timeout); 545 546 CheckInvariants(); 547 } 548 549 MarkRunningAsPaused(JobHandler & job)550 void JobsRegistry::MarkRunningAsPaused(JobHandler& job) 551 { 552 LOG(INFO) << "Job paused: " << job.GetId(); 553 554 CheckInvariants(); 555 assert(job.GetState() == JobState_Running); 556 557 job.SetState(JobState_Paused); 558 559 CheckInvariants(); 560 } 561 562 GetStateInternal(JobState & state,const std::string & id)563 bool JobsRegistry::GetStateInternal(JobState& state, 564 const std::string& id) 565 { 566 CheckInvariants(); 567 568 JobsIndex::const_iterator it = jobsIndex_.find(id); 569 if (it == jobsIndex_.end()) 570 { 571 return false; 572 } 573 else 574 { 575 state = it->second->GetState(); 576 return true; 577 } 578 } 579 580 ~JobsRegistry()581 JobsRegistry::~JobsRegistry() 582 { 583 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) 584 { 585 assert(it->second != NULL); 586 delete it->second; 587 } 588 } 589 590 SetMaxCompletedJobs(size_t n)591 void JobsRegistry::SetMaxCompletedJobs(size_t n) 592 { 593 boost::mutex::scoped_lock lock(mutex_); 594 CheckInvariants(); 595 596 LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)"; 597 598 maxCompletedJobs_ = n; 599 ForgetOldCompletedJobs(); 600 } 601 602 GetMaxCompletedJobs()603 size_t JobsRegistry::GetMaxCompletedJobs() 604 { 605 boost::mutex::scoped_lock lock(mutex_); 606 CheckInvariants(); 607 return maxCompletedJobs_; 608 } 609 610 ListJobs(std::set<std::string> & target)611 void JobsRegistry::ListJobs(std::set<std::string>& target) 612 { 613 boost::mutex::scoped_lock lock(mutex_); 614 CheckInvariants(); 615 616 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 617 it != jobsIndex_.end(); ++it) 618 { 619 target.insert(it->first); 620 } 621 } 622 623 GetJobInfo(JobInfo & target,const std::string & id)624 bool JobsRegistry::GetJobInfo(JobInfo& target, 625 const std::string& id) 626 { 627 boost::mutex::scoped_lock lock(mutex_); 628 CheckInvariants(); 629 630 JobsIndex::const_iterator found = jobsIndex_.find(id); 631 632 if (found == jobsIndex_.end()) 633 { 634 return false; 635 } 636 else 637 { 638 const JobHandler& handler = *found->second; 639 target = JobInfo(handler.GetId(), 640 handler.GetPriority(), 641 handler.GetState(), 642 handler.GetLastStatus(), 643 handler.GetCreationTime(), 644 handler.GetLastStateChangeTime(), 645 handler.GetRuntime()); 646 return true; 647 } 648 } 649 650 GetJobOutput(std::string & output,MimeType & mime,const std::string & job,const std::string & key)651 bool JobsRegistry::GetJobOutput(std::string& output, 652 MimeType& mime, 653 const std::string& job, 654 const std::string& key) 655 { 656 boost::mutex::scoped_lock lock(mutex_); 657 CheckInvariants(); 658 659 JobsIndex::const_iterator found = jobsIndex_.find(job); 660 661 if (found == jobsIndex_.end()) 662 { 663 return false; 664 } 665 else 666 { 667 const JobHandler& handler = *found->second; 668 669 if (handler.GetState() == JobState_Success) 670 { 671 return handler.GetJob().GetOutput(output, mime, key); 672 } 673 else 674 { 675 return false; 676 } 677 } 678 } 679 680 SubmitInternal(std::string & id,JobHandler * handler)681 void JobsRegistry::SubmitInternal(std::string& id, 682 JobHandler* handler) 683 { 684 if (handler == NULL) 685 { 686 throw OrthancException(ErrorCode_NullPointer); 687 } 688 689 std::unique_ptr<JobHandler> protection(handler); 690 691 { 692 boost::mutex::scoped_lock lock(mutex_); 693 CheckInvariants(); 694 695 id = handler->GetId(); 696 int priority = handler->GetPriority(); 697 698 jobsIndex_.insert(std::make_pair(id, protection.release())); 699 700 switch (handler->GetState()) 701 { 702 case JobState_Pending: 703 case JobState_Retry: 704 case JobState_Running: 705 handler->SetState(JobState_Pending); 706 pendingJobs_.push(handler); 707 pendingJobAvailable_.notify_one(); 708 break; 709 710 case JobState_Success: 711 SetCompletedJob(*handler, true); 712 break; 713 714 case JobState_Failure: 715 SetCompletedJob(*handler, false); 716 break; 717 718 case JobState_Paused: 719 break; 720 721 default: 722 { 723 std::string details = ("A job should not be loaded from state: " + 724 std::string(EnumerationToString(handler->GetState()))); 725 throw OrthancException(ErrorCode_InternalError, details); 726 } 727 } 728 729 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; 730 731 if (observer_ != NULL) 732 { 733 observer_->SignalJobSubmitted(id); 734 } 735 736 // WARNING: The following call might make "handler" invalid if 737 // the job history size is empty 738 ForgetOldCompletedJobs(); 739 } 740 } 741 JobsRegistry(size_t maxCompletedJobs)742 JobsRegistry::JobsRegistry(size_t maxCompletedJobs) : 743 maxCompletedJobs_(maxCompletedJobs), 744 observer_(NULL) 745 { 746 } 747 748 Submit(std::string & id,IJob * job,int priority)749 void JobsRegistry::Submit(std::string& id, 750 IJob* job, // Takes ownership 751 int priority) 752 { 753 SubmitInternal(id, new JobHandler(job, priority)); 754 } 755 756 Submit(IJob * job,int priority)757 void JobsRegistry::Submit(IJob* job, // Takes ownership 758 int priority) 759 { 760 std::string id; 761 SubmitInternal(id, new JobHandler(job, priority)); 762 } 763 764 SubmitAndWait(Json::Value & successContent,IJob * job,int priority)765 void JobsRegistry::SubmitAndWait(Json::Value& successContent, 766 IJob* job, // Takes ownership 767 int priority) 768 { 769 std::string id; 770 Submit(id, job, priority); 771 772 JobState state = JobState_Pending; // Dummy initialization 773 774 { 775 boost::mutex::scoped_lock lock(mutex_); 776 777 for (;;) 778 { 779 if (!GetStateInternal(state, id)) 780 { 781 // Job has finished and has been lost (typically happens if 782 // "JobsHistorySize" is 0) 783 throw OrthancException(ErrorCode_InexistentItem, 784 "Cannot retrieve the status of the job, " 785 "make sure that \"JobsHistorySize\" is not 0"); 786 } 787 else if (state == JobState_Failure) 788 { 789 // Failure 790 JobsIndex::const_iterator it = jobsIndex_.find(id); 791 if (it != jobsIndex_.end()) // Should always be true, already tested in GetStateInternal() 792 { 793 ErrorCode code = it->second->GetLastStatus().GetErrorCode(); 794 const std::string& details = it->second->GetLastStatus().GetDetails(); 795 796 if (details.empty()) 797 { 798 throw OrthancException(code); 799 } 800 else 801 { 802 throw OrthancException(code, details); 803 } 804 } 805 else 806 { 807 throw OrthancException(ErrorCode_InternalError); 808 } 809 } 810 else if (state == JobState_Success) 811 { 812 // Success, try and retrieve the status of the job 813 JobsIndex::const_iterator it = jobsIndex_.find(id); 814 if (it == jobsIndex_.end()) 815 { 816 // Should not happen 817 state = JobState_Failure; 818 } 819 else 820 { 821 const JobStatus& status = it->second->GetLastStatus(); 822 successContent = status.GetPublicContent(); 823 } 824 825 return; 826 } 827 else 828 { 829 // This job has not finished yet, wait for new completion 830 someJobComplete_.wait(lock); 831 } 832 } 833 } 834 } 835 836 SetPriority(const std::string & id,int priority)837 bool JobsRegistry::SetPriority(const std::string& id, 838 int priority) 839 { 840 LOG(INFO) << "Changing priority to " << priority << " for job: " << id; 841 842 boost::mutex::scoped_lock lock(mutex_); 843 CheckInvariants(); 844 845 JobsIndex::iterator found = jobsIndex_.find(id); 846 847 if (found == jobsIndex_.end()) 848 { 849 LOG(WARNING) << "Unknown job: " << id; 850 return false; 851 } 852 else 853 { 854 found->second->SetPriority(priority); 855 856 if (found->second->GetState() == JobState_Pending) 857 { 858 // If the job is pending, we need to reconstruct the 859 // priority queue, as the heap condition has changed 860 861 PendingJobs copy; 862 std::swap(copy, pendingJobs_); 863 864 assert(pendingJobs_.empty()); 865 while (!copy.empty()) 866 { 867 pendingJobs_.push(copy.top()); 868 copy.pop(); 869 } 870 } 871 872 CheckInvariants(); 873 return true; 874 } 875 } 876 877 RemovePendingJob(const std::string & id)878 void JobsRegistry::RemovePendingJob(const std::string& id) 879 { 880 // If the job is pending, we need to reconstruct the priority 881 // queue to remove it 882 PendingJobs copy; 883 std::swap(copy, pendingJobs_); 884 885 assert(pendingJobs_.empty()); 886 while (!copy.empty()) 887 { 888 if (copy.top()->GetId() != id) 889 { 890 pendingJobs_.push(copy.top()); 891 } 892 893 copy.pop(); 894 } 895 } 896 897 RemoveRetryJob(JobHandler * handler)898 void JobsRegistry::RemoveRetryJob(JobHandler* handler) 899 { 900 RetryJobs::iterator item = retryJobs_.find(handler); 901 assert(item != retryJobs_.end()); 902 retryJobs_.erase(item); 903 } 904 905 Pause(const std::string & id)906 bool JobsRegistry::Pause(const std::string& id) 907 { 908 LOG(INFO) << "Pausing job: " << id; 909 910 boost::mutex::scoped_lock lock(mutex_); 911 CheckInvariants(); 912 913 JobsIndex::iterator found = jobsIndex_.find(id); 914 915 if (found == jobsIndex_.end()) 916 { 917 LOG(WARNING) << "Unknown job: " << id; 918 return false; 919 } 920 else 921 { 922 switch (found->second->GetState()) 923 { 924 case JobState_Pending: 925 RemovePendingJob(id); 926 found->second->SetState(JobState_Paused); 927 break; 928 929 case JobState_Retry: 930 RemoveRetryJob(found->second); 931 found->second->SetState(JobState_Paused); 932 break; 933 934 case JobState_Paused: 935 case JobState_Success: 936 case JobState_Failure: 937 // Nothing to be done 938 break; 939 940 case JobState_Running: 941 found->second->SchedulePause(); 942 break; 943 944 default: 945 throw OrthancException(ErrorCode_InternalError); 946 } 947 948 CheckInvariants(); 949 return true; 950 } 951 } 952 953 Cancel(const std::string & id)954 bool JobsRegistry::Cancel(const std::string& id) 955 { 956 LOG(INFO) << "Canceling job: " << id; 957 958 boost::mutex::scoped_lock lock(mutex_); 959 CheckInvariants(); 960 961 JobsIndex::iterator found = jobsIndex_.find(id); 962 963 if (found == jobsIndex_.end()) 964 { 965 LOG(WARNING) << "Unknown job: " << id; 966 return false; 967 } 968 else 969 { 970 switch (found->second->GetState()) 971 { 972 case JobState_Pending: 973 RemovePendingJob(id); 974 SetCompletedJob(*found->second, false); 975 found->second->SetLastErrorCode(ErrorCode_CanceledJob); 976 break; 977 978 case JobState_Retry: 979 RemoveRetryJob(found->second); 980 SetCompletedJob(*found->second, false); 981 found->second->SetLastErrorCode(ErrorCode_CanceledJob); 982 break; 983 984 case JobState_Paused: 985 SetCompletedJob(*found->second, false); 986 found->second->SetLastErrorCode(ErrorCode_CanceledJob); 987 break; 988 989 case JobState_Success: 990 case JobState_Failure: 991 // Nothing to be done 992 break; 993 994 case JobState_Running: 995 found->second->ScheduleCancel(); 996 break; 997 998 default: 999 throw OrthancException(ErrorCode_InternalError); 1000 } 1001 1002 // WARNING: The following call might make "handler" invalid if 1003 // the job history size is empty 1004 ForgetOldCompletedJobs(); 1005 1006 return true; 1007 } 1008 } 1009 1010 Resume(const std::string & id)1011 bool JobsRegistry::Resume(const std::string& id) 1012 { 1013 LOG(INFO) << "Resuming job: " << id; 1014 1015 boost::mutex::scoped_lock lock(mutex_); 1016 CheckInvariants(); 1017 1018 JobsIndex::iterator found = jobsIndex_.find(id); 1019 1020 if (found == jobsIndex_.end()) 1021 { 1022 LOG(WARNING) << "Unknown job: " << id; 1023 return false; 1024 } 1025 else if (found->second->GetState() != JobState_Paused) 1026 { 1027 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; 1028 return false; 1029 } 1030 else 1031 { 1032 found->second->SetState(JobState_Pending); 1033 pendingJobs_.push(found->second); 1034 pendingJobAvailable_.notify_one(); 1035 CheckInvariants(); 1036 return true; 1037 } 1038 } 1039 1040 Resubmit(const std::string & id)1041 bool JobsRegistry::Resubmit(const std::string& id) 1042 { 1043 LOG(INFO) << "Resubmitting failed job: " << id; 1044 1045 boost::mutex::scoped_lock lock(mutex_); 1046 CheckInvariants(); 1047 1048 JobsIndex::iterator found = jobsIndex_.find(id); 1049 1050 if (found == jobsIndex_.end()) 1051 { 1052 LOG(WARNING) << "Unknown job: " << id; 1053 return false; 1054 } 1055 else if (found->second->GetState() != JobState_Failure) 1056 { 1057 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; 1058 return false; 1059 } 1060 else 1061 { 1062 found->second->GetJob().Reset(); 1063 1064 bool ok = false; 1065 for (CompletedJobs::iterator it = completedJobs_.begin(); 1066 it != completedJobs_.end(); ++it) 1067 { 1068 if (*it == found->second) 1069 { 1070 ok = true; 1071 completedJobs_.erase(it); 1072 break; 1073 } 1074 } 1075 1076 (void) ok; // Remove warning about unused variable in release builds 1077 assert(ok); 1078 1079 found->second->ResetRuntime(); 1080 found->second->SetState(JobState_Pending); 1081 pendingJobs_.push(found->second); 1082 pendingJobAvailable_.notify_one(); 1083 1084 CheckInvariants(); 1085 return true; 1086 } 1087 } 1088 1089 ScheduleRetries()1090 void JobsRegistry::ScheduleRetries() 1091 { 1092 boost::mutex::scoped_lock lock(mutex_); 1093 CheckInvariants(); 1094 1095 RetryJobs copy; 1096 std::swap(copy, retryJobs_); 1097 1098 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); 1099 1100 assert(retryJobs_.empty()); 1101 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) 1102 { 1103 if ((*it)->IsRetryReady(now)) 1104 { 1105 LOG(INFO) << "Retrying job: " << (*it)->GetId(); 1106 (*it)->SetState(JobState_Pending); 1107 pendingJobs_.push(*it); 1108 pendingJobAvailable_.notify_one(); 1109 } 1110 else 1111 { 1112 retryJobs_.insert(*it); 1113 } 1114 } 1115 1116 CheckInvariants(); 1117 } 1118 1119 GetState(JobState & state,const std::string & id)1120 bool JobsRegistry::GetState(JobState& state, 1121 const std::string& id) 1122 { 1123 boost::mutex::scoped_lock lock(mutex_); 1124 return GetStateInternal(state, id); 1125 } 1126 1127 SetObserver(JobsRegistry::IObserver & observer)1128 void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer) 1129 { 1130 boost::mutex::scoped_lock lock(mutex_); 1131 observer_ = &observer; 1132 } 1133 1134 ResetObserver()1135 void JobsRegistry::ResetObserver() 1136 { 1137 boost::mutex::scoped_lock lock(mutex_); 1138 observer_ = NULL; 1139 } 1140 1141 RunningJob(JobsRegistry & registry,unsigned int timeout)1142 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, 1143 unsigned int timeout) : 1144 registry_(registry), 1145 handler_(NULL), 1146 targetState_(JobState_Failure), 1147 targetRetryTimeout_(0), 1148 canceled_(false) 1149 { 1150 { 1151 boost::mutex::scoped_lock lock(registry_.mutex_); 1152 1153 while (registry_.pendingJobs_.empty()) 1154 { 1155 if (timeout == 0) 1156 { 1157 registry_.pendingJobAvailable_.wait(lock); 1158 } 1159 else 1160 { 1161 bool success = registry_.pendingJobAvailable_.timed_wait 1162 (lock, boost::posix_time::milliseconds(timeout)); 1163 if (!success) 1164 { 1165 // No pending job 1166 return; 1167 } 1168 } 1169 } 1170 1171 handler_ = registry_.pendingJobs_.top(); 1172 registry_.pendingJobs_.pop(); 1173 1174 assert(handler_->GetState() == JobState_Pending); 1175 handler_->SetState(JobState_Running); 1176 handler_->SetLastErrorCode(ErrorCode_Success); 1177 1178 job_ = &handler_->GetJob(); 1179 id_ = handler_->GetId(); 1180 priority_ = handler_->GetPriority(); 1181 } 1182 } 1183 1184 ~RunningJob()1185 JobsRegistry::RunningJob::~RunningJob() 1186 { 1187 if (IsValid()) 1188 { 1189 boost::mutex::scoped_lock lock(registry_.mutex_); 1190 1191 switch (targetState_) 1192 { 1193 case JobState_Failure: 1194 registry_.MarkRunningAsCompleted 1195 (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure); 1196 break; 1197 1198 case JobState_Success: 1199 registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success); 1200 break; 1201 1202 case JobState_Paused: 1203 registry_.MarkRunningAsPaused(*handler_); 1204 break; 1205 1206 case JobState_Retry: 1207 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); 1208 break; 1209 1210 default: 1211 assert(0); 1212 } 1213 } 1214 } 1215 1216 IsValid() const1217 bool JobsRegistry::RunningJob::IsValid() const 1218 { 1219 return (handler_ != NULL && 1220 job_ != NULL); 1221 } 1222 1223 GetId() const1224 const std::string& JobsRegistry::RunningJob::GetId() const 1225 { 1226 if (!IsValid()) 1227 { 1228 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1229 } 1230 else 1231 { 1232 return id_; 1233 } 1234 } 1235 1236 GetPriority() const1237 int JobsRegistry::RunningJob::GetPriority() const 1238 { 1239 if (!IsValid()) 1240 { 1241 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1242 } 1243 else 1244 { 1245 return priority_; 1246 } 1247 } 1248 1249 GetJob()1250 IJob& JobsRegistry::RunningJob::GetJob() 1251 { 1252 if (!IsValid()) 1253 { 1254 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1255 } 1256 else 1257 { 1258 return *job_; 1259 } 1260 } 1261 1262 IsPauseScheduled()1263 bool JobsRegistry::RunningJob::IsPauseScheduled() 1264 { 1265 if (!IsValid()) 1266 { 1267 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1268 } 1269 else 1270 { 1271 boost::mutex::scoped_lock lock(registry_.mutex_); 1272 registry_.CheckInvariants(); 1273 assert(handler_->GetState() == JobState_Running); 1274 1275 return handler_->IsPauseScheduled(); 1276 } 1277 } 1278 1279 IsCancelScheduled()1280 bool JobsRegistry::RunningJob::IsCancelScheduled() 1281 { 1282 if (!IsValid()) 1283 { 1284 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1285 } 1286 else 1287 { 1288 boost::mutex::scoped_lock lock(registry_.mutex_); 1289 registry_.CheckInvariants(); 1290 assert(handler_->GetState() == JobState_Running); 1291 1292 return handler_->IsCancelScheduled(); 1293 } 1294 } 1295 1296 MarkSuccess()1297 void JobsRegistry::RunningJob::MarkSuccess() 1298 { 1299 if (!IsValid()) 1300 { 1301 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1302 } 1303 else 1304 { 1305 targetState_ = JobState_Success; 1306 } 1307 } 1308 1309 MarkFailure()1310 void JobsRegistry::RunningJob::MarkFailure() 1311 { 1312 if (!IsValid()) 1313 { 1314 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1315 } 1316 else 1317 { 1318 targetState_ = JobState_Failure; 1319 } 1320 } 1321 1322 MarkCanceled()1323 void JobsRegistry::RunningJob::MarkCanceled() 1324 { 1325 if (!IsValid()) 1326 { 1327 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1328 } 1329 else 1330 { 1331 targetState_ = JobState_Failure; 1332 canceled_ = true; 1333 } 1334 } 1335 1336 MarkPause()1337 void JobsRegistry::RunningJob::MarkPause() 1338 { 1339 if (!IsValid()) 1340 { 1341 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1342 } 1343 else 1344 { 1345 targetState_ = JobState_Paused; 1346 } 1347 } 1348 1349 MarkRetry(unsigned int timeout)1350 void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout) 1351 { 1352 if (!IsValid()) 1353 { 1354 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1355 } 1356 else 1357 { 1358 targetState_ = JobState_Retry; 1359 targetRetryTimeout_ = timeout; 1360 } 1361 } 1362 1363 UpdateStatus(ErrorCode code,const std::string & details)1364 void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code, 1365 const std::string& details) 1366 { 1367 if (!IsValid()) 1368 { 1369 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1370 } 1371 else 1372 { 1373 JobStatus status(code, details, *job_); 1374 1375 boost::mutex::scoped_lock lock(registry_.mutex_); 1376 registry_.CheckInvariants(); 1377 assert(handler_->GetState() == JobState_Running); 1378 1379 handler_->SetLastStatus(status); 1380 } 1381 } 1382 1383 1384 Serialize(Json::Value & target)1385 void JobsRegistry::Serialize(Json::Value& target) 1386 { 1387 boost::mutex::scoped_lock lock(mutex_); 1388 CheckInvariants(); 1389 1390 target = Json::objectValue; 1391 target[TYPE] = JOBS_REGISTRY; 1392 target[JOBS] = Json::objectValue; 1393 1394 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 1395 it != jobsIndex_.end(); ++it) 1396 { 1397 Json::Value v; 1398 if (it->second->Serialize(v)) 1399 { 1400 target[JOBS][it->first] = v; 1401 } 1402 } 1403 } 1404 1405 JobsRegistry(IJobUnserializer & unserializer,const Json::Value & s,size_t maxCompletedJobs)1406 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, 1407 const Json::Value& s, 1408 size_t maxCompletedJobs) : 1409 maxCompletedJobs_(maxCompletedJobs), 1410 observer_(NULL) 1411 { 1412 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || 1413 !s.isMember(JOBS) || 1414 s[JOBS].type() != Json::objectValue) 1415 { 1416 throw OrthancException(ErrorCode_BadFileFormat); 1417 } 1418 1419 Json::Value::Members members = s[JOBS].getMemberNames(); 1420 1421 for (Json::Value::Members::const_iterator it = members.begin(); 1422 it != members.end(); ++it) 1423 { 1424 std::unique_ptr<JobHandler> job; 1425 1426 try 1427 { 1428 job.reset(new JobHandler(unserializer, s[JOBS][*it], *it)); 1429 } 1430 catch (OrthancException& e) 1431 { 1432 LOG(WARNING) << "Cannot unserialize one job from previous execution, " 1433 << "skipping it: " << e.What(); 1434 continue; 1435 } 1436 1437 const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime(); 1438 1439 std::string id; 1440 SubmitInternal(id, job.release()); 1441 1442 // Check whether the job has not been removed (which could be 1443 // the case if the "maxCompletedJobs_" value gets smaller) 1444 JobsIndex::iterator found = jobsIndex_.find(id); 1445 if (found != jobsIndex_.end()) 1446 { 1447 // The job still lies in the history: Update the time of its 1448 // last change to the time that was serialized 1449 assert(found->second != NULL); 1450 found->second->SetLastStateChangeTime(lastChangeTime); 1451 } 1452 } 1453 } 1454 1455 GetStatistics(unsigned int & pending,unsigned int & running,unsigned int & success,unsigned int & failed)1456 void JobsRegistry::GetStatistics(unsigned int& pending, 1457 unsigned int& running, 1458 unsigned int& success, 1459 unsigned int& failed) 1460 { 1461 boost::mutex::scoped_lock lock(mutex_); 1462 CheckInvariants(); 1463 1464 pending = 0; 1465 running = 0; 1466 success = 0; 1467 failed = 0; 1468 1469 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 1470 it != jobsIndex_.end(); ++it) 1471 { 1472 JobHandler& job = *it->second; 1473 1474 switch (job.GetState()) 1475 { 1476 case JobState_Retry: 1477 case JobState_Pending: 1478 pending ++; 1479 break; 1480 1481 case JobState_Paused: 1482 case JobState_Running: 1483 running ++; 1484 break; 1485 1486 case JobState_Success: 1487 success ++; 1488 break; 1489 1490 case JobState_Failure: 1491 failed ++; 1492 break; 1493 1494 default: 1495 throw OrthancException(ErrorCode_InternalError); 1496 } 1497 } 1498 } 1499 } 1500