1 /**
2  * Orthanc - A Lightweight, RESTful DICOM Store
3  * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4  * Department, University Hospital of Liege, Belgium
5  * Copyright (C) 2017-2021 Osimis S.A., Belgium
6  *
7  * This program is free software: you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License as
9  * published by the Free Software Foundation, either version 3 of the
10  * License, or (at your option) any later version.
11  *
12  * In addition, as a special exception, the copyright holders of this
13  * program give permission to link the code of its release with the
14  * OpenSSL project's "OpenSSL" library (or with modified versions of it
15  * that use the same license as the "OpenSSL" library), and distribute
16  * the linked executables. You must obey the GNU General Public License
17  * in all respects for all of the code used other than "OpenSSL". If you
18  * modify file(s) with this exception, you may extend this exception to
19  * your version of the file(s), but you are not obligated to do so. If
20  * you do not wish to do so, delete this exception statement from your
21  * version. If you delete this exception statement from all source files
22  * in the program, then also delete it here.
23  *
24  * This program is distributed in the hope that it will be useful, but
25  * WITHOUT ANY WARRANTY; without even the implied warranty of
26  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27  * General Public License for more details.
28  *
29  * You should have received a copy of the GNU General Public License
30  * along with this program. If not, see <http://www.gnu.org/licenses/>.
31  **/
32 
33 
34 #include "PrecompiledHeadersServer.h"
35 #include "ServerIndex.h"
36 
37 #ifndef NOMINMAX
38 #define NOMINMAX
39 #endif
40 
41 #include "../../OrthancFramework/Sources/Logging.h"
42 #include "../../OrthancFramework/Sources/Toolbox.h"
43 
44 #include "OrthancConfiguration.h"
45 #include "ServerContext.h"
46 #include "ServerIndexChange.h"
47 #include "ServerToolbox.h"
48 
49 
// Number of bytes in one megabyte (2^20), used to report the storage limit in MB
static const uint64_t MEGA_BYTES = static_cast<uint64_t>(1024) * 1024;
51 
52 namespace Orthanc
53 {
54   class ServerIndex::TransactionContext : public StatelessDatabaseOperations::ITransactionContext
55   {
56   private:
57     struct FileToRemove
58     {
59     private:
60       std::string  uuid_;
61       FileContentType  type_;
62 
63     public:
FileToRemoveOrthanc::ServerIndex::TransactionContext::FileToRemove64       explicit FileToRemove(const FileInfo& info) :
65         uuid_(info.GetUuid()),
66         type_(info.GetContentType())
67       {
68       }
69 
GetUuidOrthanc::ServerIndex::TransactionContext::FileToRemove70       const std::string& GetUuid() const
71       {
72         return uuid_;
73       }
74 
GetContentTypeOrthanc::ServerIndex::TransactionContext::FileToRemove75       FileContentType GetContentType() const
76       {
77         return type_;
78       }
79     };
80 
81     ServerContext& context_;
82     bool hasRemainingLevel_;
83     ResourceType remainingType_;
84     std::string remainingPublicId_;
85     std::list<FileToRemove> pendingFilesToRemove_;
86     std::list<ServerIndexChange> pendingChanges_;
87     uint64_t sizeOfFilesToRemove_;
88     uint64_t sizeOfAddedAttachments_;
89 
Reset()90     void Reset()
91     {
92       sizeOfFilesToRemove_ = 0;
93       hasRemainingLevel_ = false;
94       remainingType_ = ResourceType_Instance;  // dummy initialization
95       pendingFilesToRemove_.clear();
96       pendingChanges_.clear();
97       sizeOfAddedAttachments_ = 0;
98     }
99 
CommitFilesToRemove()100     void CommitFilesToRemove()
101     {
102       for (std::list<FileToRemove>::const_iterator
103              it = pendingFilesToRemove_.begin();
104            it != pendingFilesToRemove_.end(); ++it)
105       {
106         try
107         {
108           context_.RemoveFile(it->GetUuid(), it->GetContentType());
109         }
110         catch (OrthancException& e)
111         {
112           LOG(ERROR) << "Unable to remove an attachment from the storage area: "
113                      << it->GetUuid() << " (type: " << EnumerationToString(it->GetContentType()) << ")";
114         }
115       }
116     }
117 
CommitChanges()118     void CommitChanges()
119     {
120       for (std::list<ServerIndexChange>::const_iterator
121              it = pendingChanges_.begin();
122            it != pendingChanges_.end(); ++it)
123       {
124         context_.SignalChange(*it);
125       }
126     }
127 
128   public:
TransactionContext(ServerContext & context)129     explicit TransactionContext(ServerContext& context) :
130       context_(context)
131     {
132       Reset();
133       assert(ResourceType_Patient < ResourceType_Study &&
134              ResourceType_Study < ResourceType_Series &&
135              ResourceType_Series < ResourceType_Instance);
136     }
137 
SignalRemainingAncestor(ResourceType parentType,const std::string & publicId)138     virtual void SignalRemainingAncestor(ResourceType parentType,
139                                          const std::string& publicId) ORTHANC_OVERRIDE
140     {
141       LOG(TRACE) << "Remaining ancestor \"" << publicId << "\" (" << parentType << ")";
142 
143       if (hasRemainingLevel_)
144       {
145         if (parentType < remainingType_)
146         {
147           remainingType_ = parentType;
148           remainingPublicId_ = publicId;
149         }
150       }
151       else
152       {
153         hasRemainingLevel_ = true;
154         remainingType_ = parentType;
155         remainingPublicId_ = publicId;
156       }
157     }
158 
SignalAttachmentDeleted(const FileInfo & info)159     virtual void SignalAttachmentDeleted(const FileInfo& info) ORTHANC_OVERRIDE
160     {
161       assert(Toolbox::IsUuid(info.GetUuid()));
162       pendingFilesToRemove_.push_back(FileToRemove(info));
163       sizeOfFilesToRemove_ += info.GetCompressedSize();
164     }
165 
SignalResourceDeleted(ResourceType type,const std::string & publicId)166     virtual void SignalResourceDeleted(ResourceType type,
167                                        const std::string& publicId) ORTHANC_OVERRIDE
168     {
169       SignalChange(ServerIndexChange(ChangeType_Deleted, type, publicId));
170     }
171 
SignalChange(const ServerIndexChange & change)172     virtual void SignalChange(const ServerIndexChange& change) ORTHANC_OVERRIDE
173     {
174       LOG(TRACE) << "Change related to resource " << change.GetPublicId() << " of type "
175                  << EnumerationToString(change.GetResourceType()) << ": "
176                  << EnumerationToString(change.GetChangeType());
177 
178       pendingChanges_.push_back(change);
179     }
180 
SignalAttachmentsAdded(uint64_t compressedSize)181     virtual void SignalAttachmentsAdded(uint64_t compressedSize) ORTHANC_OVERRIDE
182     {
183       sizeOfAddedAttachments_ += compressedSize;
184     }
185 
LookupRemainingLevel(std::string & remainingPublicId,ResourceType & remainingLevel)186     virtual bool LookupRemainingLevel(std::string& remainingPublicId /* out */,
187                                       ResourceType& remainingLevel   /* out */) ORTHANC_OVERRIDE
188     {
189       if (hasRemainingLevel_)
190       {
191         remainingPublicId = remainingPublicId_;
192         remainingLevel = remainingType_;
193         return true;
194       }
195       else
196       {
197         return false;
198       }
199     };
200 
MarkAsUnstable(int64_t id,Orthanc::ResourceType type,const std::string & publicId)201     virtual void MarkAsUnstable(int64_t id,
202                                 Orthanc::ResourceType type,
203                                 const std::string& publicId) ORTHANC_OVERRIDE
204     {
205       context_.GetIndex().MarkAsUnstable(id, type, publicId);
206     }
207 
IsUnstableResource(int64_t id)208     virtual bool IsUnstableResource(int64_t id) ORTHANC_OVERRIDE
209     {
210       return context_.GetIndex().IsUnstableResource(id);
211     }
212 
Commit()213     virtual void Commit() ORTHANC_OVERRIDE
214     {
215       // We can remove the files once the SQLite transaction has
216       // been successfully committed. Some files might have to be
217       // deleted because of recycling.
218       CommitFilesToRemove();
219 
220       // Send all the pending changes to the Orthanc plugins
221       CommitChanges();
222     }
223 
GetCompressedSizeDelta()224     virtual int64_t GetCompressedSizeDelta() ORTHANC_OVERRIDE
225     {
226       return (static_cast<int64_t>(sizeOfAddedAttachments_) -
227               static_cast<int64_t>(sizeOfFilesToRemove_));
228     }
229   };
230 
231 
232   class ServerIndex::TransactionContextFactory : public ITransactionContextFactory
233   {
234   private:
235     ServerContext& context_;
236 
237   public:
TransactionContextFactory(ServerContext & context)238     explicit TransactionContextFactory(ServerContext& context) :
239       context_(context)
240     {
241     }
242 
Create()243     virtual ITransactionContext* Create()
244     {
245       // There can be concurrent calls to this method, which is not an
246       // issue because we simply create an object
247       return new TransactionContext(context_);
248     }
249   };
250 
251 
252   class ServerIndex::UnstableResourcePayload
253   {
254   private:
255     ResourceType type_;
256     std::string publicId_;
257     boost::posix_time::ptime time_;
258 
259   public:
UnstableResourcePayload()260     UnstableResourcePayload() : type_(ResourceType_Instance)
261     {
262     }
263 
UnstableResourcePayload(Orthanc::ResourceType type,const std::string & publicId)264     UnstableResourcePayload(Orthanc::ResourceType type,
265                             const std::string& publicId) :
266       type_(type),
267       publicId_(publicId),
268       time_(boost::posix_time::second_clock::local_time())
269     {
270     }
271 
GetAge() const272     unsigned int GetAge() const
273     {
274       return (boost::posix_time::second_clock::local_time() - time_).total_seconds();
275     }
276 
GetResourceType() const277     ResourceType GetResourceType() const
278     {
279       return type_;
280     }
281 
GetPublicId() const282     const std::string& GetPublicId() const
283     {
284       return publicId_;
285     }
286   };
287 
288 
FlushThread(ServerIndex * that,unsigned int threadSleepGranularityMilliseconds)289   void ServerIndex::FlushThread(ServerIndex* that,
290                                 unsigned int threadSleepGranularityMilliseconds)
291   {
292     // By default, wait for 10 seconds before flushing
293     static const unsigned int SLEEP_SECONDS = 10;
294 
295     if (threadSleepGranularityMilliseconds > 1000)
296     {
297       throw OrthancException(ErrorCode_ParameterOutOfRange);
298     }
299 
300     LOG(INFO) << "Starting the database flushing thread (sleep = " << SLEEP_SECONDS << " seconds)";
301 
302     unsigned int count = 0;
303     unsigned int countThreshold = (1000 * SLEEP_SECONDS) / threadSleepGranularityMilliseconds;
304 
305     while (!that->done_)
306     {
307       boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds));
308       count++;
309 
310       if (count >= countThreshold)
311       {
312         Logging::Flush();
313         that->FlushToDisk();
314 
315         count = 0;
316       }
317     }
318 
319     LOG(INFO) << "Stopping the database flushing thread";
320   }
321 
322 
IsUnstableResource(int64_t id)323   bool ServerIndex::IsUnstableResource(int64_t id)
324   {
325     boost::mutex::scoped_lock lock(monitoringMutex_);
326     return unstableResources_.Contains(id);
327   }
328 
329 
ServerIndex(ServerContext & context,IDatabaseWrapper & db,unsigned int threadSleepGranularityMilliseconds)330   ServerIndex::ServerIndex(ServerContext& context,
331                            IDatabaseWrapper& db,
332                            unsigned int threadSleepGranularityMilliseconds) :
333     StatelessDatabaseOperations(db),
334     done_(false),
335     maximumStorageSize_(0),
336     maximumPatients_(0)
337   {
338     SetTransactionContextFactory(new TransactionContextFactory(context));
339 
340     // Initial recycling if the parameters have changed since the last
341     // execution of Orthanc
342     StandaloneRecycling(maximumStorageSize_, maximumPatients_);
343 
344     if (HasFlushToDisk())
345     {
346       flushThread_ = boost::thread(FlushThread, this, threadSleepGranularityMilliseconds);
347     }
348 
349     unstableResourcesMonitorThread_ = boost::thread
350       (UnstableResourcesMonitorThread, this, threadSleepGranularityMilliseconds);
351   }
352 
353 
~ServerIndex()354   ServerIndex::~ServerIndex()
355   {
356     if (!done_)
357     {
358       LOG(ERROR) << "INTERNAL ERROR: ServerIndex::Stop() should be invoked manually to avoid mess in the destruction order!";
359       Stop();
360     }
361   }
362 
363 
Stop()364   void ServerIndex::Stop()
365   {
366     if (!done_)
367     {
368       done_ = true;
369 
370       if (flushThread_.joinable())
371       {
372         flushThread_.join();
373       }
374 
375       if (unstableResourcesMonitorThread_.joinable())
376       {
377         unstableResourcesMonitorThread_.join();
378       }
379     }
380   }
381 
382 
SetMaximumPatientCount(unsigned int count)383   void ServerIndex::SetMaximumPatientCount(unsigned int count)
384   {
385     {
386       boost::mutex::scoped_lock lock(monitoringMutex_);
387       maximumPatients_ = count;
388 
389       if (count == 0)
390       {
391         LOG(WARNING) << "No limit on the number of stored patients";
392       }
393       else
394       {
395         LOG(WARNING) << "At most " << count << " patients will be stored";
396       }
397     }
398 
399     StandaloneRecycling(maximumStorageSize_, maximumPatients_);
400   }
401 
402 
SetMaximumStorageSize(uint64_t size)403   void ServerIndex::SetMaximumStorageSize(uint64_t size)
404   {
405     {
406       boost::mutex::scoped_lock lock(monitoringMutex_);
407       maximumStorageSize_ = size;
408 
409       if (size == 0)
410       {
411         LOG(WARNING) << "No limit on the size of the storage area";
412       }
413       else
414       {
415         LOG(WARNING) << "At most " << (size / MEGA_BYTES) << "MB will be used for the storage area";
416       }
417     }
418 
419     StandaloneRecycling(maximumStorageSize_, maximumPatients_);
420   }
421 
422 
UnstableResourcesMonitorThread(ServerIndex * that,unsigned int threadSleepGranularityMilliseconds)423   void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that,
424                                                    unsigned int threadSleepGranularityMilliseconds)
425   {
426     int stableAge;
427 
428     {
429       OrthancConfiguration::ReaderLock lock;
430       stableAge = lock.GetConfiguration().GetUnsignedIntegerParameter("StableAge", 60);
431     }
432 
433     if (stableAge <= 0)
434     {
435       stableAge = 60;
436     }
437 
438     LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")";
439 
440     while (!that->done_)
441     {
442       // Check for stable resources each few seconds
443       boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds));
444 
445       for (;;)
446       {
447         UnstableResourcePayload stableResource;
448         int64_t stableId;
449 
450         {
451           boost::mutex::scoped_lock lock(that->monitoringMutex_);
452 
453           if (!that->unstableResources_.IsEmpty() &&
454               that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge))
455           {
456             // This DICOM resource has not received any new instance for
457             // some time. It can be considered as stable.
458             stableId = that->unstableResources_.RemoveOldest(stableResource);
459             //LOG(TRACE) << "Stable resource: " << EnumerationToString(stableResource.GetResourceType()) << " " << stableId;
460           }
461           else
462           {
463             // No more stable DICOM resource, leave the internal loop
464             break;
465           }
466         }
467 
468         try
469         {
470           /**
471            * WARNING: Don't protect the calls to "LogChange()" using
472            * "monitoringMutex_", as this could lead to deadlocks in
473            * other threads (typically, if "Store()" is being running in
474            * another thread, which leads to calls to "MarkAsUnstable()",
475            * which leads to two lockings of "monitoringMutex_").
476            **/
477           switch (stableResource.GetResourceType())
478           {
479             case ResourceType_Patient:
480               that->LogChange(stableId, ChangeType_StablePatient, stableResource.GetPublicId(), ResourceType_Patient);
481               break;
482 
483             case ResourceType_Study:
484               that->LogChange(stableId, ChangeType_StableStudy, stableResource.GetPublicId(), ResourceType_Study);
485               break;
486 
487             case ResourceType_Series:
488               that->LogChange(stableId, ChangeType_StableSeries, stableResource.GetPublicId(), ResourceType_Series);
489               break;
490 
491             default:
492               throw OrthancException(ErrorCode_InternalError);
493           }
494         }
495         catch (OrthancException& e)
496         {
497           LOG(ERROR) << "Cannot log a change about a stable resource into the database";
498         }
499       }
500     }
501 
502     LOG(INFO) << "Closing the monitor thread for stable resources";
503   }
504 
505 
MarkAsUnstable(int64_t id,Orthanc::ResourceType type,const std::string & publicId)506   void ServerIndex::MarkAsUnstable(int64_t id,
507                                    Orthanc::ResourceType type,
508                                    const std::string& publicId)
509   {
510     assert(type == Orthanc::ResourceType_Patient ||
511            type == Orthanc::ResourceType_Study ||
512            type == Orthanc::ResourceType_Series);
513 
514     {
515       boost::mutex::scoped_lock lock(monitoringMutex_);
516       UnstableResourcePayload payload(type, publicId);
517       unstableResources_.AddOrMakeMostRecent(id, payload);
518       //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id;
519     }
520   }
521 
522 
Store(std::map<MetadataType,std::string> & instanceMetadata,const DicomMap & dicomSummary,const ServerIndex::Attachments & attachments,const ServerIndex::MetadataMap & metadata,const DicomInstanceOrigin & origin,bool overwrite,bool hasTransferSyntax,DicomTransferSyntax transferSyntax,bool hasPixelDataOffset,uint64_t pixelDataOffset)523   StoreStatus ServerIndex::Store(std::map<MetadataType, std::string>& instanceMetadata,
524                                  const DicomMap& dicomSummary,
525                                  const ServerIndex::Attachments& attachments,
526                                  const ServerIndex::MetadataMap& metadata,
527                                  const DicomInstanceOrigin& origin,
528                                  bool overwrite,
529                                  bool hasTransferSyntax,
530                                  DicomTransferSyntax transferSyntax,
531                                  bool hasPixelDataOffset,
532                                  uint64_t pixelDataOffset)
533   {
534     uint64_t maximumStorageSize;
535     unsigned int maximumPatients;
536 
537     {
538       boost::mutex::scoped_lock lock(monitoringMutex_);
539       maximumStorageSize = maximumStorageSize_;
540       maximumPatients = maximumPatients_;
541     }
542 
543     return StatelessDatabaseOperations::Store(
544       instanceMetadata, dicomSummary, attachments, metadata, origin, overwrite, hasTransferSyntax,
545       transferSyntax, hasPixelDataOffset, pixelDataOffset, maximumStorageSize, maximumPatients);
546   }
547 
548 
AddAttachment(int64_t & newRevision,const FileInfo & attachment,const std::string & publicId,bool hasOldRevision,int64_t oldRevision,const std::string & oldMD5)549   StoreStatus ServerIndex::AddAttachment(int64_t& newRevision,
550                                          const FileInfo& attachment,
551                                          const std::string& publicId,
552                                          bool hasOldRevision,
553                                          int64_t oldRevision,
554                                          const std::string& oldMD5)
555   {
556     uint64_t maximumStorageSize;
557     unsigned int maximumPatients;
558 
559     {
560       boost::mutex::scoped_lock lock(monitoringMutex_);
561       maximumStorageSize = maximumStorageSize_;
562       maximumPatients = maximumPatients_;
563     }
564 
565     return StatelessDatabaseOperations::AddAttachment(
566       newRevision, attachment, publicId, maximumStorageSize, maximumPatients,
567       hasOldRevision, oldRevision, oldMD5);
568   }
569 }
570