1 /** 2 * Orthanc - A Lightweight, RESTful DICOM Store 3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics 4 * Department, University Hospital of Liege, Belgium 5 * Copyright (C) 2017-2021 Osimis S.A., Belgium 6 * 7 * This program is free software: you can redistribute it and/or 8 * modify it under the terms of the GNU General Public License as 9 * published by the Free Software Foundation, either version 3 of the 10 * License, or (at your option) any later version. 11 * 12 * In addition, as a special exception, the copyright holders of this 13 * program give permission to link the code of its release with the 14 * OpenSSL project's "OpenSSL" library (or with modified versions of it 15 * that use the same license as the "OpenSSL" library), and distribute 16 * the linked executables. You must obey the GNU General Public License 17 * in all respects for all of the code used other than "OpenSSL". If you 18 * modify file(s) with this exception, you may extend this exception to 19 * your version of the file(s), but you are not obligated to do so. If 20 * you do not wish to do so, delete this exception statement from your 21 * version. If you delete this exception statement from all source files 22 * in the program, then also delete it here. 23 * 24 * This program is distributed in the hope that it will be useful, but 25 * WITHOUT ANY WARRANTY; without even the implied warranty of 26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 27 * General Public License for more details. 28 * 29 * You should have received a copy of the GNU General Public License 30 * along with this program. If not, see <http://www.gnu.org/licenses/>. 31 **/ 32 33 34 #include "PrecompiledHeadersServer.h" 35 #include "ServerIndex.h" 36 37 #ifndef NOMINMAX 38 #define NOMINMAX 39 #endif 40 41 #include "../../OrthancFramework/Sources/Logging.h" 42 #include "../../OrthancFramework/Sources/Toolbox.h" 43 44 #include "OrthancConfiguration.h" 45 #include "ServerContext.h" 46 #include "ServerIndexChange.h" 47 #include "ServerToolbox.h" 48 49 50 static const uint64_t MEGA_BYTES = 1024 * 1024; 51 52 namespace Orthanc 53 { 54 class ServerIndex::TransactionContext : public StatelessDatabaseOperations::ITransactionContext 55 { 56 private: 57 struct FileToRemove 58 { 59 private: 60 std::string uuid_; 61 FileContentType type_; 62 63 public: FileToRemoveOrthanc::ServerIndex::TransactionContext::FileToRemove64 explicit FileToRemove(const FileInfo& info) : 65 uuid_(info.GetUuid()), 66 type_(info.GetContentType()) 67 { 68 } 69 GetUuidOrthanc::ServerIndex::TransactionContext::FileToRemove70 const std::string& GetUuid() const 71 { 72 return uuid_; 73 } 74 GetContentTypeOrthanc::ServerIndex::TransactionContext::FileToRemove75 FileContentType GetContentType() const 76 { 77 return type_; 78 } 79 }; 80 81 ServerContext& context_; 82 bool hasRemainingLevel_; 83 ResourceType remainingType_; 84 std::string remainingPublicId_; 85 std::list<FileToRemove> pendingFilesToRemove_; 86 std::list<ServerIndexChange> pendingChanges_; 87 uint64_t sizeOfFilesToRemove_; 88 uint64_t sizeOfAddedAttachments_; 89 Reset()90 void Reset() 91 { 92 sizeOfFilesToRemove_ = 0; 93 hasRemainingLevel_ = false; 94 remainingType_ = ResourceType_Instance; // dummy initialization 95 pendingFilesToRemove_.clear(); 96 pendingChanges_.clear(); 97 sizeOfAddedAttachments_ = 0; 98 } 99 CommitFilesToRemove()100 void CommitFilesToRemove() 101 { 102 for (std::list<FileToRemove>::const_iterator 103 it = pendingFilesToRemove_.begin(); 104 it != pendingFilesToRemove_.end(); ++it) 105 { 106 try 107 { 108 context_.RemoveFile(it->GetUuid(), it->GetContentType()); 109 } 110 catch (OrthancException& e) 111 { 112 LOG(ERROR) << "Unable to remove an attachment from the storage area: " 113 << it->GetUuid() << " (type: " << EnumerationToString(it->GetContentType()) << ")"; 114 } 115 } 116 } 117 CommitChanges()118 void CommitChanges() 119 { 120 for (std::list<ServerIndexChange>::const_iterator 121 it = pendingChanges_.begin(); 122 it != pendingChanges_.end(); ++it) 123 { 124 context_.SignalChange(*it); 125 } 126 } 127 128 public: TransactionContext(ServerContext & context)129 explicit TransactionContext(ServerContext& context) : 130 context_(context) 131 { 132 Reset(); 133 assert(ResourceType_Patient < ResourceType_Study && 134 ResourceType_Study < ResourceType_Series && 135 ResourceType_Series < ResourceType_Instance); 136 } 137 SignalRemainingAncestor(ResourceType parentType,const std::string & publicId)138 virtual void SignalRemainingAncestor(ResourceType parentType, 139 const std::string& publicId) ORTHANC_OVERRIDE 140 { 141 LOG(TRACE) << "Remaining ancestor \"" << publicId << "\" (" << parentType << ")"; 142 143 if (hasRemainingLevel_) 144 { 145 if (parentType < remainingType_) 146 { 147 remainingType_ = parentType; 148 remainingPublicId_ = publicId; 149 } 150 } 151 else 152 { 153 hasRemainingLevel_ = true; 154 remainingType_ = parentType; 155 remainingPublicId_ = publicId; 156 } 157 } 158 SignalAttachmentDeleted(const FileInfo & info)159 virtual void SignalAttachmentDeleted(const FileInfo& info) ORTHANC_OVERRIDE 160 { 161 assert(Toolbox::IsUuid(info.GetUuid())); 162 pendingFilesToRemove_.push_back(FileToRemove(info)); 163 sizeOfFilesToRemove_ += info.GetCompressedSize(); 164 } 165 SignalResourceDeleted(ResourceType type,const std::string & publicId)166 virtual void SignalResourceDeleted(ResourceType type, 167 const std::string& publicId) ORTHANC_OVERRIDE 168 { 169 SignalChange(ServerIndexChange(ChangeType_Deleted, type, publicId)); 170 } 171 SignalChange(const ServerIndexChange & change)172 virtual void SignalChange(const ServerIndexChange& change) ORTHANC_OVERRIDE 173 { 174 LOG(TRACE) << "Change related to resource " << change.GetPublicId() << " of type " 175 << EnumerationToString(change.GetResourceType()) << ": " 176 << EnumerationToString(change.GetChangeType()); 177 178 pendingChanges_.push_back(change); 179 } 180 SignalAttachmentsAdded(uint64_t compressedSize)181 virtual void SignalAttachmentsAdded(uint64_t compressedSize) ORTHANC_OVERRIDE 182 { 183 sizeOfAddedAttachments_ += compressedSize; 184 } 185 LookupRemainingLevel(std::string & remainingPublicId,ResourceType & remainingLevel)186 virtual bool LookupRemainingLevel(std::string& remainingPublicId /* out */, 187 ResourceType& remainingLevel /* out */) ORTHANC_OVERRIDE 188 { 189 if (hasRemainingLevel_) 190 { 191 remainingPublicId = remainingPublicId_; 192 remainingLevel = remainingType_; 193 return true; 194 } 195 else 196 { 197 return false; 198 } 199 }; 200 MarkAsUnstable(int64_t id,Orthanc::ResourceType type,const std::string & publicId)201 virtual void MarkAsUnstable(int64_t id, 202 Orthanc::ResourceType type, 203 const std::string& publicId) ORTHANC_OVERRIDE 204 { 205 context_.GetIndex().MarkAsUnstable(id, type, publicId); 206 } 207 IsUnstableResource(int64_t id)208 virtual bool IsUnstableResource(int64_t id) ORTHANC_OVERRIDE 209 { 210 return context_.GetIndex().IsUnstableResource(id); 211 } 212 Commit()213 virtual void Commit() ORTHANC_OVERRIDE 214 { 215 // We can remove the files once the SQLite transaction has 216 // been successfully committed. Some files might have to be 217 // deleted because of recycling. 218 CommitFilesToRemove(); 219 220 // Send all the pending changes to the Orthanc plugins 221 CommitChanges(); 222 } 223 GetCompressedSizeDelta()224 virtual int64_t GetCompressedSizeDelta() ORTHANC_OVERRIDE 225 { 226 return (static_cast<int64_t>(sizeOfAddedAttachments_) - 227 static_cast<int64_t>(sizeOfFilesToRemove_)); 228 } 229 }; 230 231 232 class ServerIndex::TransactionContextFactory : public ITransactionContextFactory 233 { 234 private: 235 ServerContext& context_; 236 237 public: TransactionContextFactory(ServerContext & context)238 explicit TransactionContextFactory(ServerContext& context) : 239 context_(context) 240 { 241 } 242 Create()243 virtual ITransactionContext* Create() 244 { 245 // There can be concurrent calls to this method, which is not an 246 // issue because we simply create an object 247 return new TransactionContext(context_); 248 } 249 }; 250 251 252 class ServerIndex::UnstableResourcePayload 253 { 254 private: 255 ResourceType type_; 256 std::string publicId_; 257 boost::posix_time::ptime time_; 258 259 public: UnstableResourcePayload()260 UnstableResourcePayload() : type_(ResourceType_Instance) 261 { 262 } 263 UnstableResourcePayload(Orthanc::ResourceType type,const std::string & publicId)264 UnstableResourcePayload(Orthanc::ResourceType type, 265 const std::string& publicId) : 266 type_(type), 267 publicId_(publicId), 268 time_(boost::posix_time::second_clock::local_time()) 269 { 270 } 271 GetAge() const272 unsigned int GetAge() const 273 { 274 return (boost::posix_time::second_clock::local_time() - time_).total_seconds(); 275 } 276 GetResourceType() const277 ResourceType GetResourceType() const 278 { 279 return type_; 280 } 281 GetPublicId() const282 const std::string& GetPublicId() const 283 { 284 return publicId_; 285 } 286 }; 287 288 FlushThread(ServerIndex * that,unsigned int threadSleepGranularityMilliseconds)289 void ServerIndex::FlushThread(ServerIndex* that, 290 unsigned int threadSleepGranularityMilliseconds) 291 { 292 // By default, wait for 10 seconds before flushing 293 static const unsigned int SLEEP_SECONDS = 10; 294 295 if (threadSleepGranularityMilliseconds > 1000) 296 { 297 throw OrthancException(ErrorCode_ParameterOutOfRange); 298 } 299 300 LOG(INFO) << "Starting the database flushing thread (sleep = " << SLEEP_SECONDS << " seconds)"; 301 302 unsigned int count = 0; 303 unsigned int countThreshold = (1000 * SLEEP_SECONDS) / threadSleepGranularityMilliseconds; 304 305 while (!that->done_) 306 { 307 boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds)); 308 count++; 309 310 if (count >= countThreshold) 311 { 312 Logging::Flush(); 313 that->FlushToDisk(); 314 315 count = 0; 316 } 317 } 318 319 LOG(INFO) << "Stopping the database flushing thread"; 320 } 321 322 IsUnstableResource(int64_t id)323 bool ServerIndex::IsUnstableResource(int64_t id) 324 { 325 boost::mutex::scoped_lock lock(monitoringMutex_); 326 return unstableResources_.Contains(id); 327 } 328 329 ServerIndex(ServerContext & context,IDatabaseWrapper & db,unsigned int threadSleepGranularityMilliseconds)330 ServerIndex::ServerIndex(ServerContext& context, 331 IDatabaseWrapper& db, 332 unsigned int threadSleepGranularityMilliseconds) : 333 StatelessDatabaseOperations(db), 334 done_(false), 335 maximumStorageSize_(0), 336 maximumPatients_(0) 337 { 338 SetTransactionContextFactory(new TransactionContextFactory(context)); 339 340 // Initial recycling if the parameters have changed since the last 341 // execution of Orthanc 342 StandaloneRecycling(maximumStorageSize_, maximumPatients_); 343 344 if (HasFlushToDisk()) 345 { 346 flushThread_ = boost::thread(FlushThread, this, threadSleepGranularityMilliseconds); 347 } 348 349 unstableResourcesMonitorThread_ = boost::thread 350 (UnstableResourcesMonitorThread, this, threadSleepGranularityMilliseconds); 351 } 352 353 ~ServerIndex()354 ServerIndex::~ServerIndex() 355 { 356 if (!done_) 357 { 358 LOG(ERROR) << "INTERNAL ERROR: ServerIndex::Stop() should be invoked manually to avoid mess in the destruction order!"; 359 Stop(); 360 } 361 } 362 363 Stop()364 void ServerIndex::Stop() 365 { 366 if (!done_) 367 { 368 done_ = true; 369 370 if (flushThread_.joinable()) 371 { 372 flushThread_.join(); 373 } 374 375 if (unstableResourcesMonitorThread_.joinable()) 376 { 377 unstableResourcesMonitorThread_.join(); 378 } 379 } 380 } 381 382 SetMaximumPatientCount(unsigned int count)383 void ServerIndex::SetMaximumPatientCount(unsigned int count) 384 { 385 { 386 boost::mutex::scoped_lock lock(monitoringMutex_); 387 maximumPatients_ = count; 388 389 if (count == 0) 390 { 391 LOG(WARNING) << "No limit on the number of stored patients"; 392 } 393 else 394 { 395 LOG(WARNING) << "At most " << count << " patients will be stored"; 396 } 397 } 398 399 StandaloneRecycling(maximumStorageSize_, maximumPatients_); 400 } 401 402 SetMaximumStorageSize(uint64_t size)403 void ServerIndex::SetMaximumStorageSize(uint64_t size) 404 { 405 { 406 boost::mutex::scoped_lock lock(monitoringMutex_); 407 maximumStorageSize_ = size; 408 409 if (size == 0) 410 { 411 LOG(WARNING) << "No limit on the size of the storage area"; 412 } 413 else 414 { 415 LOG(WARNING) << "At most " << (size / MEGA_BYTES) << "MB will be used for the storage area"; 416 } 417 } 418 419 StandaloneRecycling(maximumStorageSize_, maximumPatients_); 420 } 421 422 UnstableResourcesMonitorThread(ServerIndex * that,unsigned int threadSleepGranularityMilliseconds)423 void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that, 424 unsigned int threadSleepGranularityMilliseconds) 425 { 426 int stableAge; 427 428 { 429 OrthancConfiguration::ReaderLock lock; 430 stableAge = lock.GetConfiguration().GetUnsignedIntegerParameter("StableAge", 60); 431 } 432 433 if (stableAge <= 0) 434 { 435 stableAge = 60; 436 } 437 438 LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")"; 439 440 while (!that->done_) 441 { 442 // Check for stable resources each few seconds 443 boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds)); 444 445 for (;;) 446 { 447 UnstableResourcePayload stableResource; 448 int64_t stableId; 449 450 { 451 boost::mutex::scoped_lock lock(that->monitoringMutex_); 452 453 if (!that->unstableResources_.IsEmpty() && 454 that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge)) 455 { 456 // This DICOM resource has not received any new instance for 457 // some time. It can be considered as stable. 458 stableId = that->unstableResources_.RemoveOldest(stableResource); 459 //LOG(TRACE) << "Stable resource: " << EnumerationToString(stableResource.GetResourceType()) << " " << stableId; 460 } 461 else 462 { 463 // No more stable DICOM resource, leave the internal loop 464 break; 465 } 466 } 467 468 try 469 { 470 /** 471 * WARNING: Don't protect the calls to "LogChange()" using 472 * "monitoringMutex_", as this could lead to deadlocks in 473 * other threads (typically, if "Store()" is being running in 474 * another thread, which leads to calls to "MarkAsUnstable()", 475 * which leads to two lockings of "monitoringMutex_"). 476 **/ 477 switch (stableResource.GetResourceType()) 478 { 479 case ResourceType_Patient: 480 that->LogChange(stableId, ChangeType_StablePatient, stableResource.GetPublicId(), ResourceType_Patient); 481 break; 482 483 case ResourceType_Study: 484 that->LogChange(stableId, ChangeType_StableStudy, stableResource.GetPublicId(), ResourceType_Study); 485 break; 486 487 case ResourceType_Series: 488 that->LogChange(stableId, ChangeType_StableSeries, stableResource.GetPublicId(), ResourceType_Series); 489 break; 490 491 default: 492 throw OrthancException(ErrorCode_InternalError); 493 } 494 } 495 catch (OrthancException& e) 496 { 497 LOG(ERROR) << "Cannot log a change about a stable resource into the database"; 498 } 499 } 500 } 501 502 LOG(INFO) << "Closing the monitor thread for stable resources"; 503 } 504 505 MarkAsUnstable(int64_t id,Orthanc::ResourceType type,const std::string & publicId)506 void ServerIndex::MarkAsUnstable(int64_t id, 507 Orthanc::ResourceType type, 508 const std::string& publicId) 509 { 510 assert(type == Orthanc::ResourceType_Patient || 511 type == Orthanc::ResourceType_Study || 512 type == Orthanc::ResourceType_Series); 513 514 { 515 boost::mutex::scoped_lock lock(monitoringMutex_); 516 UnstableResourcePayload payload(type, publicId); 517 unstableResources_.AddOrMakeMostRecent(id, payload); 518 //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id; 519 } 520 } 521 522 Store(std::map<MetadataType,std::string> & instanceMetadata,const DicomMap & dicomSummary,const ServerIndex::Attachments & attachments,const ServerIndex::MetadataMap & metadata,const DicomInstanceOrigin & origin,bool overwrite,bool hasTransferSyntax,DicomTransferSyntax transferSyntax,bool hasPixelDataOffset,uint64_t pixelDataOffset)523 StoreStatus ServerIndex::Store(std::map<MetadataType, std::string>& instanceMetadata, 524 const DicomMap& dicomSummary, 525 const ServerIndex::Attachments& attachments, 526 const ServerIndex::MetadataMap& metadata, 527 const DicomInstanceOrigin& origin, 528 bool overwrite, 529 bool hasTransferSyntax, 530 DicomTransferSyntax transferSyntax, 531 bool hasPixelDataOffset, 532 uint64_t pixelDataOffset) 533 { 534 uint64_t maximumStorageSize; 535 unsigned int maximumPatients; 536 537 { 538 boost::mutex::scoped_lock lock(monitoringMutex_); 539 maximumStorageSize = maximumStorageSize_; 540 maximumPatients = maximumPatients_; 541 } 542 543 return StatelessDatabaseOperations::Store( 544 instanceMetadata, dicomSummary, attachments, metadata, origin, overwrite, hasTransferSyntax, 545 transferSyntax, hasPixelDataOffset, pixelDataOffset, maximumStorageSize, maximumPatients); 546 } 547 548 AddAttachment(int64_t & newRevision,const FileInfo & attachment,const std::string & publicId,bool hasOldRevision,int64_t oldRevision,const std::string & oldMD5)549 StoreStatus ServerIndex::AddAttachment(int64_t& newRevision, 550 const FileInfo& attachment, 551 const std::string& publicId, 552 bool hasOldRevision, 553 int64_t oldRevision, 554 const std::string& oldMD5) 555 { 556 uint64_t maximumStorageSize; 557 unsigned int maximumPatients; 558 559 { 560 boost::mutex::scoped_lock lock(monitoringMutex_); 561 maximumStorageSize = maximumStorageSize_; 562 maximumPatients = maximumPatients_; 563 } 564 565 return StatelessDatabaseOperations::AddAttachment( 566 newRevision, attachment, publicId, maximumStorageSize, maximumPatients, 567 hasOldRevision, oldRevision, oldMD5); 568 } 569 } 570