1 #ifndef BDB___SPLIT_BLOB_HPP__
2 #define BDB___SPLIT_BLOB_HPP__
3 
4 /* $Id: bdb_split_blob.hpp 617975 2020-10-08 18:28:17Z grichenk $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author:  Anatoliy Kuznetsov
30  *
31  * File Description: BDB library split BLOB store.
32  *
33  */
34 
35 
36 /// @file bdb_split_blob.hpp
37 /// BDB library split BLOB store.
38 
39 #include <corelib/ncbistd.hpp>
40 #include <corelib/ncbimtx.hpp>
41 #include <corelib/ncbistre.hpp>
42 #include <corelib/ncbistr.hpp>
43 #include <corelib/ncbifile.hpp>
44 
45 #include <util/math/matrix.hpp>
46 
47 #include <db/bdb/bdb_blob.hpp>
48 #include <db/bdb/bdb_bv_store.hpp>
49 #include <db/bdb/bdb_cursor.hpp>
50 #include <db/bdb/bdb_trans.hpp>
51 #include <db/error_codes.hpp>
52 
53 #include <util/id_mux.hpp>
54 
55 
56 BEGIN_NCBI_SCOPE
57 
58 /** @addtogroup BDB_BLOB
59  *
60  * @{
61  */
62 
63 
64 /// Persistent storage for demux information
65 ///
66 template<class TBV>
67 class CBDB_BlobStoreDict : public CBDB_BvStore<TBV>
68 {
69 public:
70     CBDB_FieldUint4        dim;     ///< dimention
71     CBDB_FieldUint4        dim_idx; ///< projection index
72 
73     typedef CBDB_BvStore<TBV>         TParent;
74 
75 public:
CBDB_BlobStoreDict()76     CBDB_BlobStoreDict()
77     {
78         this->BindKey("dim",       &dim);
79         this->BindKey("dim_idx",   &dim_idx);
80     }
81 };
82 
83 /// Base class for page-split demultiplexers
84 ///
85 class CBDB_BlobDeMuxSplit : public IObjDeMux<unsigned>
86 {
87 public:
88     /// LOBs are getting split into slices based on LOB size,
89     /// similar BLOBs go to the compartment with more optimal storage
90     /// paramaters
91     ///
92     static
SelectSplit(size_t blob_size)93     unsigned SelectSplit(size_t blob_size)
94     {
95         static const unsigned size_split[] = {
96             256, 512, 2048, 4096, 8192, 16384, 32768
97         };
98         const unsigned kMaxSize = (unsigned)(sizeof(size_split) / sizeof(*size_split));
99 
100         for (unsigned i = 0; i < kMaxSize;  ++i) {
101             if (blob_size < size_split[i]) {
102                 return i;
103             }
104         }
105         return kMaxSize;
106     }
107 
108     /// Returns total number of splits (horizontal projection)
109     /// If method returns 0 - means there is no hard number: open ended proj
GetSplitSize() const110     unsigned GetSplitSize() const { return 8; }
111 
112     /// Returns total number of volumes (vertical projection)
113     /// If method returns 0 - means there is no hard number: open ended proj
GetVolumeSize() const114     unsigned GetVolumeSize() const { return 0; }
115 };
116 
117 /// Volume split BLOB demultiplexer
118 ///
119 /// This class is doing some simple accounting, counting size and number
120 /// of incoming LOBs, splitting them into [volume, page size]
121 ///
122 class CBDB_BlobDeMux : public CBDB_BlobDeMuxSplit
123 {
124 public:
125     typedef CNcbiMatrix<double>    TVolumeSize;
126     typedef CNcbiMatrix<unsigned>  TVolumeRecs;
127 
128 public:
CBDB_BlobDeMux(double vol_max=1.5* (1024.00* 1024.00* 1024.00),unsigned rec_max=3* 1000000)129     CBDB_BlobDeMux(double    vol_max = 1.5 * (1024.00*1024.00*1024.00),
130                    unsigned  rec_max = 3 * 1000000)
131         : m_VolMax(vol_max), m_RecMax(rec_max)
132     {
133     }
134 
135     /// coordinates:
136     ///
137     ///  0 - active volume number
138     ///  1 - page split number
139     ///
GetCoordinates(unsigned blob_size,unsigned * coord)140     void GetCoordinates(unsigned blob_size, unsigned* coord)
141     {
142         _ASSERT(coord);
143         _ASSERT(m_RecS.GetRows() == m_VolS.GetRows());
144         _ASSERT(m_RecS.GetCols() == m_VolS.GetCols());
145 
146         coord[1] = CBDB_BlobDeMux::SelectSplit(blob_size);
147         size_t max_col = max((size_t)m_RecS.GetCols(),
148                              (size_t)coord[1] + 1);
149         m_RecS.Resize(m_RecS.GetRows(), max_col);
150         m_VolS.Resize(m_VolS.GetRows(), max_col);
151 
152         for (unsigned i = 0;  i < m_RecS.GetRows();  ++i) {
153             if (m_RecS(i, coord[1]) < m_RecMax  &&
154                 m_VolS(i, coord[1]) < m_VolMax) {
155                 coord[0] = i;
156                 ++m_RecS(i, coord[1]);
157                 m_VolS  (i, coord[1]) += blob_size;
158                 return;
159             }
160         }
161 
162         /// not found
163         NewPlane();
164         coord[0] = unsigned(m_RecS.GetRows() - 1);
165     }
166 
167 
168 protected:
NewPlane()169     void NewPlane()
170     {
171         size_t max_col = max((size_t)m_RecS.GetCols(), (size_t)1);
172         m_RecS.Resize(m_RecS.GetRows() + 1, max_col);
173         m_VolS.Resize(m_VolS.GetRows() + 1, max_col);
174     }
175 
176 
177     TVolumeSize  m_VolS;  ///< Volumes BLOB sizes
178     TVolumeRecs  m_RecS;  ///< Volumes record counts
179 
180     double       m_VolMax; ///< Volume max size
181     unsigned     m_RecMax; ///< Maximum number of records
182 };
183 
184 /// Split demux which can save and load state into a file
185 /// Stateful (persistent) class.
186 ///
187 class CBDB_BlobDeMuxPersistent : public CBDB_BlobDeMux
188 {
189 public:
CBDB_BlobDeMuxPersistent(const string & path,double vol_max=1.5* (1024.00* 1024.00* 1024.00),unsigned rec_max=3* 1000000)190     CBDB_BlobDeMuxPersistent(const string& path,
191         double    vol_max = 1.5 * (1024.00*1024.00*1024.00),
192         unsigned  rec_max = 3 * 1000000)
193         : CBDB_BlobDeMux(vol_max, rec_max)
194         , m_Path(path)
195     {
196         if ( !m_Path.empty()  &&  CFile(m_Path).Exists()) {
197             CNcbiIfstream istr(m_Path.c_str());
198             Load(istr);
199         }
200     }
201 
~CBDB_BlobDeMuxPersistent()202     ~CBDB_BlobDeMuxPersistent()
203     {
204         if ( !m_Path.empty() ) {
205             try {
206                 CNcbiOfstream ostr(m_Path.c_str());
207                 Save(ostr);
208             }
209             catch (CException& e) {
210                 ERR_POST_XX(Db_Bdb_Blob, 2, Error << "CBDB_BlobDeMux::~CBDB_BlobDeMux(): "
211                             "error saving demultiplex data: " << e.what());
212             }
213         }
214     }
215 
216 
Save(CNcbiOstream & ostr)217     void Save(CNcbiOstream& ostr)
218     {
219         ostr << m_RecS.GetRows() << " " << m_RecS.GetCols() << endl;
220         for (size_t i = 0;  i < m_RecS.GetRows();  ++i) {
221             for (size_t j = 0;  j < m_RecS.GetCols();  ++j) {
222                 ostr << m_RecS(i, j) << " ";
223             }
224             ostr << endl;
225         }
226         ostr << m_VolS.GetRows() << " " << m_VolS.GetCols() << endl;
227         for (size_t i = 0;  i < m_VolS.GetRows();  ++i) {
228             for (size_t j = 0;  j < m_VolS.GetCols();  ++j) {
229                 ostr << m_VolS(i, j) << " ";
230             }
231             ostr << endl;
232         }
233     }
234 
Load(CNcbiIstream & istr)235     void Load(CNcbiIstream& istr)
236     {
237         size_t i, j;
238 
239         istr >> i >> j;
240         m_RecS.Resize(i, j);
241         for (i = 0; i < m_RecS.GetRows(); ++i) {
242             for (j = 0; j < m_RecS.GetCols(); ++j) {
243                 istr >> m_RecS(i, j);
244             }
245         }
246 
247         istr >> i >> j;
248         m_VolS.Resize(i, j);
249         for (i = 0; i < m_VolS.GetRows(); ++i) {
250             for (j = 0; j < m_VolS.GetCols(); ++j) {
251                 istr >> m_VolS(i, j);
252             }
253         }
254     }
255 
256 private:
257     string m_Path;
258 };
259 
260 
261 /// BLOB demultiplexer implements round-robin volume rotation.
262 ///
263 /// This demultiplexer sends every new BLOB to a next volume, reducing
264 /// locking contention over one BDB database.
265 ///
266 class CBDB_BlobDeMux_RoundRobin : public CBDB_BlobDeMuxSplit
267 {
268 public:
CBDB_BlobDeMux_RoundRobin(unsigned volumes=0)269     CBDB_BlobDeMux_RoundRobin(unsigned volumes = 0)
270         : m_Volumes(volumes), m_CurrVolume(0)
271     {
272     }
273 
274     /// coordinates:
275     ///
276     ///  0 - active volume number
277     ///  1 - page split number
278     ///
GetCoordinates(unsigned blob_size,unsigned * coord)279     void GetCoordinates(unsigned blob_size, unsigned* coord)
280     {
281         _ASSERT(coord);
282 
283         coord[0] = m_CurrVolume;
284         coord[1] = SelectSplit(blob_size);
285 
286         // every next BLOB goes to the next volume (round-robin)
287         ++m_CurrVolume;
288         if (m_CurrVolume >= m_Volumes) {
289             m_CurrVolume = 0;
290         }
291     }
292 
293     /// Returns total number of volumes (vertical projection)
294     /// If method returns 0 - means there is no hard number: open ended proj
GetVolumeSize() const295     unsigned GetVolumeSize() const { return m_Volumes; }
296 
297 private:
298     unsigned  m_Volumes;
299     unsigned  m_CurrVolume;
300 };
301 
302 
303 
304 
305 /// BLOB storage based on single unsigned integer key
306 /// Supports BLOB volumes and different base page size files in the volume
307 /// to guarantee the best fit.
308 ///
309 ///
310 /// Problem.
311 /// Berkeley DB shows measurable difference in behavior and performance
312 /// depending on the combination of record size and database page size.
313 /// Differences include amount of disk traffic, locking granularity,
314 /// number of overflow pages, etc.
315 ///
316 /// The most critical here is overflow pages.
317 /// If DB page cannot accommodate 2(sometimes more) records BDB creates
318 /// overflow pages.  This is found to be expensive. The typical fix is to
319 /// increase the page size.
/// Large page size is inefficient for dealing with small records
/// (you have to load/store 64K (a full page) to load a small object).
/// In a transactional environment a page access also locks a lot of records.
323 /// Page size also influences B-Tree depth and number of internal pages.
324 /// Number of internal pages affects database size and retrieval performance.
325 ///
326 ///
327 /// Object maintains a matrix of BDB databases.
328 /// Every row maintains certain database volume or(and) number of records.
329 /// Every column groups BLOBs of certain size together, so class can choose
330 /// the best page size to store BLOBs without long chains of overflow pages.
331 ///
332 /// <pre>
333 ///                      Page size split:
334 ///  Volume
335 ///  split:        4K     8K     16K    32K
336 ///              +------+------+------+------+
337 ///  row = 0     | DB   | ...................|  = SUM = N Gbytes
338 ///  row = 1     | DB   | .....              |  = SUM = N GBytes
339 ///
340 ///                .........................
341 ///
342 ///              +------+------+------+------+
343 ///
344 /// </pre>
345 ///
346 /// Matrix coordinates picking is implemented using concept called DeMux.
347 /// It maintains BLOB_ID <-> coordinates association.
348 /// Demux implementation(s) use bit-vectors to do the job. BLOB ID must be
349 /// unique across the store. In general DeMux can work with N-dimensional
350 /// coordinates to address  host, partition, volume, slice  (distributed
351 /// store).  But current practical implementation uses 2D matrix (volume,
352 /// slice).
353 ///
354 
template<class TBV, class TObjDeMux=CBDB_BlobDeMux, class TL=CFastMutex>
class CBDB_BlobSplitStore : public CThreadLocalTransactional
{
    // Template parameters:
    //   TBV       - bit-vector type, used by the id demux (CIdDeMux<TBV>)
    //               and by the dictionary file (CBDB_BlobStoreDict<TBV>)
    //   TObjDeMux - object demultiplexer; the implementation calls its
    //               GetCoordinates(size, coord) and SelectSplit(size)
    //   TL        - lock type; must provide a nested TWriteLockGuard
public:
    typedef CIdDeMux<TBV>                TIdDeMux;
    typedef TBV                          TBitVector;
    typedef CBDB_BlobStoreDict<TBV>      TDeMuxStore;
    typedef TL                           TLock;
    typedef typename TL::TWriteLockGuard TLockGuard;
    typedef CBDB_IdBlobFile              TBlobFile;

    #ifndef NCBI_SWIG
    /// BDB Database together with the locker
    /// One database is opened twice, one regular mode,
    /// another - dedicated read-only instance to improve concurrency
    ///
    struct SLockedDb : public CObject
    {
        AutoPtr<TBlobFile>      db;       ///< database file
        AutoPtr<TLock>          lock;     ///< db lock
        AutoPtr<TBlobFile>      db_ro;    ///< database file for reads
        AutoPtr<TLock>          lock_ro;  ///< db lock for reads
    };
    #endif

    /// Volume split on optimal page size
    /// (the implementation addresses one database per page split
    /// as [volume, slice])
    struct SVolume
    {
        vector< CRef<SLockedDb> >  db_vect;
    };

    /// Volumes are held by raw pointer; CloseVolumes() deletes them
    typedef vector<SVolume*>  TVolumeVect;

public:
    /// Construction
    /// The main parameter here is object demultiplexer for splitting
    /// incoming LOBs into volumes and slices
    ///
    /// @note The pointer is stored in a unique_ptr member (m_ObjDeMux),
    ///       so the store takes ownership of de_mux.
    ///
    CBDB_BlobSplitStore(TObjDeMux* de_mux);
    ~CBDB_BlobSplitStore();

    /// Open storage (reads storage dictionary into memory)
    void Open(const string&             storage_name,
              CBDB_RawFile::EOpenMode   open_mode,
              CBDB_RawFile::EDBType     db_type=CBDB_RawFile::eBtree);

    /// Return true if the split store has been opened
    bool IsOpen() const;

    /// Try to open all storage files in all projections
    /// This is only possible when object de-mux has fixed
    /// number of projections, if it is not the call is silently ignored
    ///
    void OpenProjections();

    /// Save storage dictionary (demux disposition).
    /// If you modified storage (like added new BLOBs to the storage)
    /// you MUST call save; otherwise some disposition information is lost.
    ///
    /// @note The destructor calls Save() itself unless the store was
    ///       opened read-only.
    ///
    void Save(typename TDeMuxStore::ECompact compact_vectors
                                                = TDeMuxStore::eCompact);


    /// Set the volume cache size (stored in m_VolumeCacheSize).
    /// NOTE(review): units and the point where the value is applied are
    /// not visible in this part of the file -- presumably when a volume
    /// database is opened; confirm.
    void SetVolumeCacheSize(unsigned int cache_size)
        { m_VolumeCacheSize = cache_size; }

    /// Associate with the environment. Should be called before opening.
    /// Only the address is stored: env must outlive its use by the store.
    void SetEnv(CBDB_Env& env) { m_Env = &env; }

    /// Get pointer on file environment
    /// Return NULL if no environment has been set
    CBDB_Env* GetEnv(void) const { return m_Env; }

    /// Return the base filename of the underlying split store
    const string& GetFileName() const { return m_StorageName; }

    /// Turn off reverse splitting on the underlying stores.  This should be
    /// called before opening.
    void RevSplitOff();

    /// Set the priority for this database's pages in the buffer cache
    /// This is generally a temporary advisement, and works only if an
    /// environment is used.
    void SetCachePriority(CBDB_RawFile::ECachePriority);

    // ---------------------------------------------------------------
    // Transactional interface
    // ---------------------------------------------------------------
    virtual void SetTransaction(ITransaction* trans);

    CBDB_Transaction* GetBDBTransaction();


    // ---------------------------------------------------------------
    // Data manipulation interface
    // ---------------------------------------------------------------

    /// Insert BLOB into the storage.
    ///
    /// If the id already has coordinates registered in the id demux,
    /// nothing is stored and eBDB_KeyDup is returned. Otherwise the object
    /// demux picks new coordinates, they are registered for the id and
    /// the BLOB is inserted into the corresponding database.
    ///
    /// @param id       insertion key
    /// @param data     buffer pointer
    /// @param size     LOB data size in bytes
    /// @param coord    out: volume - page split number
    ///                 (array of two elements, must not be NULL)
    ///
    EBDB_ErrCode Insert(unsigned  id,
                        const void* data, size_t size,
                        unsigned*   coord);

    /// Same as above for callers not interested in the coordinates
    EBDB_ErrCode Insert(unsigned  id,
                        const void* data, size_t size);

    /// Update or insert BLOB
    ///
    /// An unknown id is inserted. A known id is updated in place when the
    /// new size selects the same page split; otherwise the BLOB is deleted
    /// and inserted again.
    ///
    /// @param coord    out: volume - page split number (two elements)
    EBDB_ErrCode UpdateInsert(unsigned id,
                              const void* data, size_t size,
                              unsigned*   coord);

    /// Same as above for callers not interested in the coordinates
    EBDB_ErrCode UpdateInsert(unsigned id,
                              const void* data, size_t size);

    /// Update or insert BLOB using old coordinates
    EBDB_ErrCode UpdateInsert(unsigned id,
                              const unsigned* old_coord,
                              const void* data, size_t size,
                              unsigned*   coord);

    /// Delete BLOB
    EBDB_ErrCode Delete(unsigned id,
                        CBDB_RawFile::EIgnoreError on_error =
                                                CBDB_RawFile::eThrowOnError);

    /// Delete BLOB using provided coordinates
    EBDB_ErrCode Delete(unsigned id,
                        const unsigned* coords,
                        CBDB_RawFile::EIgnoreError on_error =
                                                CBDB_RawFile::eThrowOnError);


    /// Find (demux) coordinates by BLOB id
    ///
    EBDB_ErrCode GetCoordinates(unsigned id, unsigned* coords);

    /// Assign de-mux coordinates
    /// (no-op if the id is already registered with the same coordinates)
    void AssignCoordinates(unsigned id, const unsigned* coords);

    /// Returns true if two sets of coordinates are the same
    /// (compares the first two elements of both arrays)
    bool IsSameCoordinates(const unsigned* coords1,
                           const unsigned* coords2);

    /// Read BLOB into vector.
    /// If BLOB does not fit, method resizes the vector to accommodate.
    ///
    EBDB_ErrCode ReadRealloc(unsigned id,
                             CBDB_RawFile::TBuffer& buffer);

    /// Read BLOB into vector using provided coordinates
    /// If BLOB does not fit, method resizes the vector to accommodate.
    ///
    EBDB_ErrCode ReadRealloc(unsigned id,
                             const unsigned* coords,
                             CBDB_RawFile::TBuffer& buffer);

    /// Fetch LOB record directly into the provided '*buf'.
    /// If size of the LOB is greater than 'buf_size', then
    /// if reallocation is allowed -- '*buf' will be reallocated
    /// to fit the LOB size; otherwise, an exception will be thrown.
    ///
    EBDB_ErrCode Fetch(unsigned     id,
                       void**       buf,
                       size_t       buf_size,
                       CBDB_RawFile::EReallocMode allow_realloc,
                       size_t*      blob_size);

    /// Same as above, using provided coordinates
    EBDB_ErrCode Fetch(unsigned        id,
                       const unsigned* coords,
                       void**          buf,
                       size_t          buf_size,
                       CBDB_RawFile::EReallocMode allow_realloc,
                       size_t*         blob_size);

    /// Sync the underlying stores
    void Sync();

    /// Create stream oriented reader
    /// @returns NULL if BLOB not found
    ///
    /// This method does NOT block the specified ID from concurrent access
    /// for the life of IReader. The nature of BDB IReader is that each Read
    /// maps into BDB get, so somebody can delete the BLOB between IReader calls.
    /// This potential race should be taken into account in MT concurrent
    /// application.
    ///
    /// Caller is responsible for deletion.
    ///
    IReader* CreateReader(unsigned  id);

    /// Same as above, using provided coordinates
    IReader* CreateReader(unsigned  id, const unsigned* coords);

    /// Get size of the BLOB
    ///
    /// @note Price of this operation is almost the same as getting
    /// the actual BLOB. It is often better just to fetch BLOB speculatively,
    /// hoping it fits in the buffer and resizing the buffer on exception.
    ///
    EBDB_ErrCode BlobSize(unsigned   id,
                          size_t*    blob_size);

    /// Same as above, using provided coordinates
    EBDB_ErrCode BlobSize(unsigned        id,
                          const unsigned* coords,
                          size_t*         blob_size);

    /// Get all id of all BLOBs stored
    ///
    /// @param bv
    ///    Vector of IDs stored
    ///
    void GetIdVector(TBitVector* bv) const;

    /// Reclaim unused memory
    void FreeUnusedMem();
protected:
    /// Close volumes without saving or doing anything with id demux
    void CloseVolumes();

    /// Load id demux (projection vectors) from the database file
    void LoadIdDeMux(TIdDeMux& de_mux, TDeMuxStore& dict_file);

    /// Store id demux (projection vectors) into the database file
    void SaveIdDeMux(const TIdDeMux&                    de_mux,
                     TDeMuxStore&                       dict_file,
                     CBDB_Transaction*                  trans,
                      typename TDeMuxStore::ECompact    compact_vectors);

    /// Select preferred page size for the specified slice
    unsigned GetPageSize(unsigned splice) const;

    /// Open split storage dictionary
    void OpenDict();

    /// Make BDB file name based on volume and page size split
    string MakeDbFileName(unsigned vol,
                          unsigned slice);


    /// Read or write operation
    enum EGetDB_Mode {
        eGetRead,
        eGetWrite
    };

    /// Get database pair (method opens and mounts database if necessary)
    SLockedDb& GetDb(unsigned vol, unsigned slice,
                     EGetDB_Mode get_mode);

    /// Init database mutex lock (method is protected against double init)
    void InitDbMutex(SLockedDb* ldb);

protected:
    /// Initialized to CBDB_Transaction::eFullAssociation by the constructor
    int                     m_TransAssociation;

    /// Page sizes by slice number (seven entries filled by the constructor)
    vector<unsigned>        m_PageSizes;
    /// Value set by SetVolumeCacheSize(); 0 after construction
    unsigned                m_VolumeCacheSize;
    /// Environment set by SetEnv(); NULL if none (not owned)
    CBDB_Env*               m_Env;
    unique_ptr<TDeMuxStore> m_DictFile;     ///< Split dictionary(id demux file)
    mutable TLock           m_DictFileLock; ///< id demux file locker

    unique_ptr<TIdDeMux>    m_IdDeMux;      ///< Id to coordinates mapper
    mutable CRWLock         m_IdDeMuxLock;  ///< Id demux locker (read/write)

    unique_ptr<TObjDeMux>   m_ObjDeMux;     ///< Obj to coordinates mapper
    TLock                   m_ObjDeMuxLock; ///< Obj demux locker

    TVolumeVect             m_Volumes;      ///< Volumes
    mutable TLock           m_VolumesLock;  ///< Volumes locker

    string                  m_StorageName;  ///< Returned by GetFileName()
    CBDB_RawFile::EOpenMode m_OpenMode;     ///< eReadOnly after construction
    CBDB_RawFile::EDBType   m_DB_Type;      ///< eBtree after construction
    CBDB_RawFile::ECachePriority m_CachePriority; ///< See SetCachePriority()

    /// True when all proj.dbs are pre-open
    bool                    m_AllProjAvail;

    /// Flag carrying reverse split status
    bool                    m_RevSplitOff;

    /// Lock used to sync. multi-db transactions to avoid deadlocks
    TLock                   m_CrossDBLock;

private:
    /// forbidden (copying is disabled: declared private)
    CBDB_BlobSplitStore(const CBDB_BlobSplitStore<TBV, TObjDeMux, TL>&);
    CBDB_BlobSplitStore<TBV, TObjDeMux, TL>& operator=(const CBDB_BlobSplitStore<TBV, TObjDeMux, TL>&);
};
649 
650 /* @} */
651 
652 
653 /////////////////////////////////////////////////////////////////////////////
654 //  IMPLEMENTATION of INLINE functions
655 /////////////////////////////////////////////////////////////////////////////
656 
657 
658 template<class TBV, class TObjDeMux, class TL>
659 inline
CBDB_BlobSplitStore(TObjDeMux * de_mux)660 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::CBDB_BlobSplitStore(TObjDeMux* de_mux)
661  : m_TransAssociation(CBDB_Transaction::eFullAssociation),
662    m_PageSizes(7),
663    m_VolumeCacheSize(0),
664    m_Env(0),
665    m_IdDeMux(new TIdDeMux(2)),
666    m_ObjDeMux(de_mux),
667    m_OpenMode(CBDB_RawFile::eReadOnly),
668    m_DB_Type(CBDB_RawFile::eBtree),
669    m_CachePriority(CBDB_RawFile::eCache_Default),
670    m_AllProjAvail(false),
671    m_RevSplitOff(false)
672 {
673     m_PageSizes[0] = 0;         // max blob size =   256
674     m_PageSizes[1] = 0;         // max blob size =   512
675     m_PageSizes[2] = 8 * 1024;  // max blob size =  2048
676     m_PageSizes[3] = 16* 1024;  // max blob size =  4096
677     m_PageSizes[4] = 32* 1024;  // max blob size =  8192
678     m_PageSizes[5] = 64* 1024;  // max blob size = 16384
679     m_PageSizes[6] = 64* 1024;  // max blob size = 32768
680 }
681 
682 template<class TBV, class TObjDeMux, class TL>
683 inline
~CBDB_BlobSplitStore()684 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::~CBDB_BlobSplitStore()
685 {
686     try {
687         CloseVolumes();
688     }
689     catch (std::exception& e) {
690         ERR_POST_XX(Db_Bdb_Blob, 3, Error
691                     << "CBDB_BlobSplitStore<>::~CBDB_BlobSplitStore(): "
692                     "error in CloseVolumes(): " << e.what());
693     }
694 
695     try {
696         if (m_OpenMode != CBDB_RawFile::eReadOnly) {
697             Save();
698         }
699     }
700     catch (std::exception& e) {
701         ERR_POST_XX(Db_Bdb_Blob, 4, Error
702                     << "CBDB_BlobSplitStore<>::~CBDB_BlobSplitStore(): "
703                     "error in Save(): " << e.what());
704     }
705 }
706 
707 template<class TBV, class TObjDeMux, class TL>
708 inline
RevSplitOff()709 void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::RevSplitOff()
710 {
711     m_RevSplitOff = true;
712 }
713 
714 
715 template<class TBV, class TObjDeMux, class TL>
716 inline
SetCachePriority(CBDB_RawFile::ECachePriority p)717 void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::SetCachePriority(CBDB_RawFile::ECachePriority p)
718 {
719     m_CachePriority = p;
720 }
721 
722 
723 template<class TBV, class TObjDeMux, class TL>
724 inline
CloseVolumes()725 void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::CloseVolumes()
726 {
727     for (size_t i = 0; i < m_Volumes.size(); ++i) {
728         SVolume* v = m_Volumes[i];
729         delete v;
730     }
731 }
732 
733 template<class TBV, class TObjDeMux, class TL>
734 inline bool
IsSameCoordinates(const unsigned * coords1,const unsigned * coords2)735 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::IsSameCoordinates(
736                                             const unsigned* coords1,
737                                             const unsigned* coords2)
738 {
739     return coords1[0] == coords2[0] &&
740            coords1[1] == coords2[1];
741 }
742 
743 template<class TBV, class TObjDeMux, class TL>
744 inline void
GetIdVector(TBitVector * bv) const745 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetIdVector(TBitVector* bv) const
746 {
747     CReadLockGuard lg(m_IdDeMuxLock);
748     m_IdDeMux->GetIdVector(bv);
749 }
750 
751 
752 template<class TBV, class TObjDeMux, class TL>
753 inline void
AssignCoordinates(unsigned id,const unsigned * coords)754 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::AssignCoordinates(
755                                                     unsigned id,
756                                              const unsigned* coords)
757 {
758     unsigned old_coord[2];
759     bool found;
760     {{
761         CReadLockGuard lg(m_IdDeMuxLock);
762         found = m_IdDeMux->GetCoordinatesFast(id, old_coord);
763     }}
764     if (found && IsSameCoordinates(old_coord, coords)) {
765         return;
766     }
767     // correct coordinate mapping
768     {{
769         CWriteLockGuard lg(m_IdDeMuxLock);
770         m_IdDeMux->SetCoordinatesFast(id, coords);
771     }}
772 
773 }
774 
775 template<class TBV, class TObjDeMux, class TL>
776 inline EBDB_ErrCode
Insert(unsigned id,const void * data,size_t size)777 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Insert(unsigned    id,
778                                                 const void* data,
779                                                 size_t      size)
780 {
781     unsigned coord[2];
782     return this->Insert(id, data, size, coord);
783 
784 }
785 
786 
787 template<class TBV, class TObjDeMux, class TL>
788 inline EBDB_ErrCode
Insert(unsigned int id,const void * data,size_t size,unsigned * coord)789 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Insert(unsigned int     id,
790                                                 const void*      data,
791                                                 size_t           size,
792                                                 unsigned*        coord)
793 {
794     _ASSERT(coord);
795 
796     // check if BLOB exists
797     bool found;
798     {{
799         CReadLockGuard lg(m_IdDeMuxLock);
800         found = m_IdDeMux->GetCoordinatesFast(id, coord);
801     }}
802     if (found) {
803         return eBDB_KeyDup;
804     }
805 
806 
807     {{
808         TLockGuard lg(m_ObjDeMuxLock);
809         m_ObjDeMux->GetCoordinates((unsigned)size, coord);
810     }}
811 
812     {{
813         CWriteLockGuard lg(m_IdDeMuxLock);
814         m_IdDeMux->SetCoordinatesFast(id, coord);
815     }}
816 
817     SLockedDb& dbp = this->GetDb(coord[0], coord[1], eGetWrite);
818     {{
819         TLockGuard lg(*dbp.lock);
820         dbp.db->SetTransaction(GetTransaction());
821         dbp.db->id = id;
822         return dbp.db->Insert(data, size);
823     }}
824 }
825 
826 template<class TBV, class TObjDeMux, class TL>
827 inline EBDB_ErrCode
UpdateInsert(unsigned int id,const void * data,size_t size)828 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::UpdateInsert(unsigned int     id,
829                                                      const void*       data,
830                                                      size_t            size)
831 {
832     unsigned coord[2];
833     return this->UpdateInsert(id, data, size, coord);
834 }
835 
836 
837 template<class TBV, class TObjDeMux, class TL>
838 inline EBDB_ErrCode
UpdateInsert(unsigned int id,const void * data,size_t size,unsigned * coord)839 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::UpdateInsert(unsigned int     id,
840                                                      const void*       data,
841                                                      size_t            size,
842                                                      unsigned*         coord)
843 {
844     bool found;
845     {{
846         CReadLockGuard lg(m_IdDeMuxLock);
847         found = m_IdDeMux->GetCoordinatesFast(id, coord);
848     }}
849     if (!found) {
850         return this->Insert(id, data, size, coord);
851     }
852 
853     unsigned slice = m_ObjDeMux->SelectSplit(size);
854     if (slice != coord[1]) {
855         // lock to prevent deadlock (no guarentee on the order of update)
856         TLockGuard lg(m_CrossDBLock);
857 
858         this->Delete(id, CBDB_RawFile::eThrowOnError);
859         return this->Insert(id, data, size, coord);
860     } else {
861         SLockedDb& dbp = this->GetDb(coord[0], coord[1], eGetWrite);
862         TLockGuard lg(*(dbp.lock));
863         dbp.db->SetTransaction(GetTransaction());
864         dbp.db->id = id;
865         return dbp.db->UpdateInsert(data, size);
866     }
867 }
868 
869 template<class TBV, class TObjDeMux, class TL>
870 inline EBDB_ErrCode
UpdateInsert(unsigned id,const unsigned * old_coord,const void * data,size_t size,unsigned * coord)871 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::UpdateInsert(unsigned id,
872                                                const unsigned* old_coord,
873                                                const void*     data,
874                                                size_t          size,
875                                                unsigned*       coord)
876 {
877     _ASSERT(old_coord);
878     _ASSERT(coord);
879 
880     bool found;
881     {{
882         CReadLockGuard lg(m_IdDeMuxLock);
883         found = m_IdDeMux->GetCoordinatesFast(id, coord);
884     }}
885     if (found) {
886         // consistency check
887         if (old_coord[0] != coord[0] || old_coord[1] != coord[1]) {
888             // lock to prevent deadlock (no guarentee on the order of update)
889             TLockGuard lg(m_CrossDBLock);
890 
891             // external (or internal)coordinate storage skrewed
892             this->Delete(id, coord, CBDB_RawFile::eThrowOnError);
893             this->Delete(id, old_coord, CBDB_RawFile::eThrowOnError);
894 
895             // re-insert
896             return this->Insert(id, data, size, coord);
897         }
898     }
899 
900     coord[0] = old_coord[0];
901     coord[1] = old_coord[1];
902 
903     if (!found) {
904         return this->Insert(id, data, size, coord);
905     }
906 
907     unsigned slice = m_ObjDeMux->SelectSplit(size);
908     if (slice != coord[1]) {
909         // lock to prevent deadlock (no guarentee on the order of update)
910         TLockGuard lg(m_CrossDBLock);
911 
912         this->Delete(id, coord, CBDB_RawFile::eThrowOnError);
913         return this->Insert(id, data, size, coord);
914     } else {
915         SLockedDb& dbp = this->GetDb(coord[0], coord[1], eGetWrite);
916         TLockGuard lg(*dbp.lock);
917         dbp.db->SetTransaction(GetTransaction());
918         dbp.db->id = id;
919         return dbp.db->UpdateInsert(data, size);
920     }
921 }
922 
923 
924 
925 template<class TBV, class TObjDeMux, class TL>
926 inline EBDB_ErrCode
Delete(unsigned id,CBDB_RawFile::EIgnoreError on_error)927 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Delete(unsigned          id,
928                                       CBDB_RawFile::EIgnoreError  on_error)
929 {
930     unsigned coord[2];
931     bool found;
932     {{
933         CReadLockGuard lg(m_IdDeMuxLock);
934         found = m_IdDeMux->GetCoordinatesFast(id, coord);
935     }}
936     if (!found) {
937         return eBDB_NotFound;
938     }
939     return this->Delete(id, coord, on_error);
940 }
941 
942 template<class TBV, class TObjDeMux, class TL>
943 inline EBDB_ErrCode
Delete(unsigned id,const unsigned * coords,CBDB_RawFile::EIgnoreError on_error)944 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Delete(unsigned          id,
945                                                 const unsigned*   coords,
946                                       CBDB_RawFile::EIgnoreError  on_error)
947 {
948     // clear coordinate mapping
949     {{
950         CWriteLockGuard lg(m_IdDeMuxLock);
951         m_IdDeMux->SetCoordinatesFast(id, coords, false);
952     }}
953 
954     SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetWrite);
955     {{
956         TLockGuard lg(*dbp.lock);
957         dbp.db->SetTransaction(GetTransaction());
958         dbp.db->id = id;
959         return dbp.db->Delete(on_error);
960     }}
961 }
962 
963 
964 
965 template<class TBV, class TObjDeMux, class TL>
966 inline IReader*
CreateReader(unsigned id)967 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::CreateReader(unsigned  id)
968 {
969     unsigned coord[2];
970     bool found;
971     {{
972         CReadLockGuard lg(m_IdDeMuxLock);
973         found = m_IdDeMux->GetCoordinatesFast(id, coord);
974     }}
975     if (!found) {
976         return 0;
977     }
978     return this->CreateReader(id, coord);
979 }
980 
981 template<class TBV, class TObjDeMux, class TL>
982 inline IReader*
CreateReader(unsigned id,const unsigned * coords)983 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::CreateReader(
984                                                 unsigned        id,
985                                                 const unsigned* coords)
986 {
987     TBlobFile* db;
988     TLock*     lock;
989     {{
990         SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetRead);
991 
992         if (dbp.db_ro.get()) {
993             db = dbp.db_ro.get();
994             lock = dbp.lock_ro.get();
995         } else {
996             db = dbp.db.get();
997             lock = dbp.lock.get();
998         }
999     }}
1000     {{
1001         TLockGuard lg(*lock);
1002         db->SetTransaction(GetTransaction());
1003         db->id = id;
1004         if (db->Fetch() != eBDB_Ok) {
1005             return 0;
1006         }
1007         return db->CreateReader();
1008     }}
1009 }
1010 
1011 
1012 template<class TBV, class TObjDeMux, class TL>
FreeUnusedMem()1013 inline void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::FreeUnusedMem()
1014 {
1015    CWriteLockGuard lg(m_IdDeMuxLock);
1016    m_IdDeMux->FreeUnusedMem();
1017 }
1018 template<class TBV, class TObjDeMux, class TL>
1019 inline CBDB_Transaction*
GetBDBTransaction()1020 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetBDBTransaction()
1021 {
1022     ITransaction* trans = this->GetTransaction();
1023     return dynamic_cast<CBDB_Transaction*>(trans);
1024 }
1025 
1026 template<class TBV, class TObjDeMux, class TL>
1027 inline void
SetTransaction(ITransaction * trans)1028 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::SetTransaction(ITransaction* trans)
1029 {
1030     CBDB_Transaction* db_trans = CBDB_Transaction::CastTransaction(trans);
1031     CBDB_Transaction* curr_trans;
1032 
1033     if (m_TransAssociation == (int) CBDB_Transaction::eFullAssociation) {
1034         curr_trans = this->GetBDBTransaction();
1035         if (curr_trans) {
1036             curr_trans->Remove(this);
1037         }
1038     }
1039 
1040     curr_trans = db_trans;
1041     if (curr_trans) {
1042         m_TransAssociation = curr_trans->GetAssociationMode();
1043         if (m_TransAssociation == (int) CBDB_Transaction::eFullAssociation) {
1044             curr_trans->Add(this);
1045         }
1046     }
1047     CThreadLocalTransactional::SetTransaction(curr_trans);
1048 }
1049 
1050 
1051 template<class TBV, class TObjDeMux, class TL>
1052 inline void
Sync()1053 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Sync()
1054 {
1055     for (size_t i = 0; i < m_Volumes.size(); ++i) {
1056         SVolume* v = m_Volumes[i];
1057         if ( !v ) {
1058             continue;
1059         }
1060         for (size_t j = 0;  j < v->db_vect.size();  ++j) {
1061             SLockedDb* db = &*(v->db_vect[j]);
1062             if (db  &&  db->db) {
1063                 db->db->Sync();
1064             }
1065         }
1066     }
1067 }
1068 
1069 
1070 template<class TBV, class TObjDeMux, class TL>
1071 inline EBDB_ErrCode
Fetch(unsigned id,const unsigned * coords,void ** buf,size_t buf_size,CBDB_RawFile::EReallocMode allow_realloc,size_t * blob_size)1072 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Fetch(unsigned        id,
1073                                                const unsigned* coords,
1074                                               void**           buf,
1075                                               size_t           buf_size,
1076                                CBDB_RawFile::EReallocMode      allow_realloc,
1077                                               size_t*          blob_size)
1078 {
1079     EBDB_ErrCode ret;
1080     TBlobFile* db;
1081     TLock*     lock;
1082     {{
1083         SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetRead);
1084 
1085         if (dbp.db_ro.get()) {
1086             db = dbp.db_ro.get();
1087             lock = dbp.lock_ro.get();
1088         } else {
1089             db = dbp.db.get();
1090             lock = dbp.lock.get();
1091         }
1092     }}
1093     {{
1094         TLockGuard lg(*lock);
1095         db->SetTransaction(GetTransaction());
1096         db->id = id;
1097 
1098         ret = db->Fetch(buf, buf_size, allow_realloc);
1099         if (ret == eBDB_Ok) {
1100             if (blob_size) {
1101                 *blob_size = db->LobSize();
1102             }
1103         }
1104     }}
1105     return ret;
1106 }
1107 
1108 
1109 
1110 template<class TBV, class TObjDeMux, class TL>
1111 inline EBDB_ErrCode
Fetch(unsigned id,void ** buf,size_t buf_size,CBDB_RawFile::EReallocMode allow_realloc,size_t * blob_size)1112 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Fetch(unsigned     id,
1113                                               void**       buf,
1114                                               size_t       buf_size,
1115                                CBDB_RawFile::EReallocMode  allow_realloc,
1116                                               size_t*      blob_size)
1117 {
1118     unsigned coord[2];
1119     bool found;
1120     {{
1121         CReadLockGuard lg(m_IdDeMuxLock);
1122         found = m_IdDeMux->GetCoordinatesFast(id, coord);
1123     }}
1124     if (!found) {
1125         return eBDB_NotFound;
1126     }
1127     return this->Fetch(id, coord, buf, buf_size, allow_realloc, blob_size);
1128 }
1129 
1130 template<class TBV, class TObjDeMux, class TL>
1131 inline EBDB_ErrCode
GetCoordinates(unsigned id,unsigned * coord)1132 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetCoordinates(unsigned  id,
1133                                                         unsigned* coord)
1134 {
1135     _ASSERT(coord);
1136     bool found;
1137     {{
1138         CReadLockGuard lg(m_IdDeMuxLock);
1139         found = m_IdDeMux->GetCoordinatesFast(id, coord);
1140     }}
1141     if (!found) {
1142         return eBDB_NotFound;
1143     }
1144     return eBDB_Ok;
1145 }
1146 
1147 template<class TBV, class TObjDeMux, class TL>
1148 inline EBDB_ErrCode
ReadRealloc(unsigned id,const unsigned * coords,CBDB_RawFile::TBuffer & buffer)1149 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::ReadRealloc(
1150                                             unsigned               id,
1151                                             const unsigned*        coords,
1152                                             CBDB_RawFile::TBuffer& buffer)
1153 {
1154     _ASSERT(coords);
1155 
1156     TBlobFile* db = NULL;
1157     TLock*     lock = NULL;
1158     {{
1159         SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetRead);
1160 
1161         if (dbp.db_ro.get()) {
1162             db = dbp.db_ro.get();
1163             lock = dbp.lock_ro.get();
1164         } else {
1165             db = dbp.db.get();
1166             lock = dbp.lock.get();
1167         }
1168     }}
1169 
1170     {{
1171         TLockGuard lg(*lock);
1172 
1173         db->SetTransaction(GetBDBTransaction());
1174         db->id = id;
1175         EBDB_ErrCode e = db->ReadRealloc(buffer);
1176         return e;
1177     }}
1178 }
1179 
1180 
1181 template<class TBV, class TObjDeMux, class TL>
1182 inline EBDB_ErrCode
ReadRealloc(unsigned id,CBDB_RawFile::TBuffer & buffer)1183 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::ReadRealloc(
1184                                             unsigned               id,
1185                                             CBDB_RawFile::TBuffer& buffer)
1186 {
1187     unsigned coord[2];
1188     bool found;
1189     {{
1190         CReadLockGuard lg(m_IdDeMuxLock);
1191         found = m_IdDeMux->GetCoordinatesFast(id, coord);
1192     }}
1193     if (!found) {
1194         return eBDB_NotFound;
1195     }
1196     return this->ReadRealloc(id, coord, buffer);
1197 }
1198 
1199 template<class TBV, class TObjDeMux, class TL>
1200 inline EBDB_ErrCode
BlobSize(unsigned id,size_t * blob_size)1201 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::BlobSize(unsigned   id,
1202                                                   size_t*    blob_size)
1203 {
1204     unsigned coord[2];
1205     bool found;
1206     {{
1207         CReadLockGuard lg(m_IdDeMuxLock);
1208         found = m_IdDeMux->GetCoordinatesFast(id, coord);
1209     }}
1210     if (!found) {
1211         return eBDB_NotFound;
1212     }
1213     return this->BlobSize(id, coord, blob_size);
1214 }
1215 
1216 template<class TBV, class TObjDeMux, class TL>
1217 inline EBDB_ErrCode
BlobSize(unsigned id,const unsigned * coords,size_t * blob_size)1218 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::BlobSize(unsigned        id,
1219                                                   const unsigned* coords,
1220                                                   size_t*         blob_size)
1221 {
1222     TBlobFile* db;
1223     TLock*     lock;
1224     {{
1225         SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetRead);
1226 
1227         if (dbp.db_ro.get()) {
1228             db = dbp.db_ro.get();
1229             lock = dbp.lock_ro.get();
1230         } else {
1231             db = dbp.db.get();
1232             lock = dbp.lock.get();
1233         }
1234     }}
1235 
1236     {{
1237         TLockGuard lg(*lock);
1238         db->SetTransaction(GetTransaction());
1239         db->id = id;
1240         EBDB_ErrCode e = db->Fetch();
1241         if (e != eBDB_Ok) {
1242             return e;
1243         }
1244         *blob_size = db->LobSize();
1245         return e;
1246     }}
1247 }
1248 
1249 
1250 
1251 template<class TBV, class TObjDeMux, class TL>
1252 inline void
Open(const string & storage_name,CBDB_RawFile::EOpenMode open_mode,CBDB_RawFile::EDBType db_type)1253 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Open(const string&     storage_name,
1254                                           CBDB_RawFile::EOpenMode  open_mode,
1255                                           CBDB_RawFile::EDBType      db_type)
1256 {
1257     CloseVolumes();
1258     m_StorageName = storage_name;
1259     m_OpenMode    = open_mode;
1260     m_DB_Type     = db_type;
1261 
1262     {{
1263     TLockGuard     lg1(m_DictFileLock);
1264     CReadLockGuard lg2(m_IdDeMuxLock);
1265 
1266     OpenDict();
1267     LoadIdDeMux(*m_IdDeMux, *m_DictFile);
1268     }}
1269 }
1270 
1271 template<class TBV, class TObjDeMux, class TL>
1272 inline bool
IsOpen() const1273 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::IsOpen() const
1274 {
1275     return m_DictFile.get() ? true : false;
1276 }
1277 
1278 template<class TBV, class TObjDeMux, class TL>
1279 inline void
OpenProjections()1280 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::OpenProjections()
1281 {
1282     unsigned max_split = m_ObjDeMux->GetSplitSize();
1283     unsigned max_vol = m_ObjDeMux->GetVolumeSize();
1284     if (!max_split || !max_vol) {
1285         // cannot do anything: open ended projections
1286         return;
1287     }
1288     for (unsigned i = 0; i < max_vol; ++i) {
1289         for (unsigned j = 0; j < max_split; ++j) {
1290             /* SLockedDb& db = */ this->GetDb(i, j, eGetRead);
1291         }
1292     }
1293     m_AllProjAvail = true;
1294     this->Save(TDeMuxStore::eNoCompact); // quick dump no compression
1295 }
1296 
1297 template<class TBV, class TObjDeMux, class TL>
1298 inline void
OpenDict()1299 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::OpenDict()
1300 {
1301     m_DictFile.reset(new TDeMuxStore);
1302     if (m_Env) {
1303         m_DictFile->SetEnv(*m_Env);
1304     }
1305     string dict_fname(m_StorageName);
1306     dict_fname.append(".splitd");
1307 
1308     m_DictFile->Open(dict_fname.c_str(), m_OpenMode);
1309 
1310     m_IdDeMux.reset(new TIdDeMux(2));
1311 }
1312 
1313 template<class TBV, class TObjDeMux, class TL>
1314 inline void
LoadIdDeMux(TIdDeMux & de_mux,TDeMuxStore & dict_file)1315 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::LoadIdDeMux(TIdDeMux&      de_mux,
1316                                                    TDeMuxStore&   dict_file)
1317 {
1318     CBDB_FileCursor cur(dict_file);
1319     cur.SetCondition(CBDB_FileCursor::eGE);
1320     cur.From << 0;
1321 
1322     typename TDeMuxStore::TBuffer& buf = dict_file.GetBuffer();
1323     EBDB_ErrCode err;
1324     while (true) {
1325         err = dict_file.FetchToBuffer(cur);
1326         if (err != eBDB_Ok) {
1327             break;
1328         }
1329         unsigned dim = dict_file.dim;
1330         unsigned dim_idx = dict_file.dim_idx;
1331 
1332         unique_ptr<TBitVector> bv(new TBitVector(bm::BM_GAP));
1333         dict_file.Deserialize(bv.get(), &buf[0]);
1334 
1335         de_mux.SetProjection(dim, dim_idx, bv.release());
1336 
1337     } // while
1338 }
1339 
1340 template<class TBV, class TObjDeMux, class TL>
1341 inline void
Save(typename TDeMuxStore::ECompact compact_vectors)1342 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Save(
1343                            typename TDeMuxStore::ECompact  compact_vectors)
1344 {
1345     if ( m_IdDeMux.get()  &&  m_DictFile.get() ) {
1346         TLockGuard     lg1(m_DictFileLock);
1347         CReadLockGuard lg2(m_IdDeMuxLock);
1348 
1349         // use NULL transaction (autocommit)
1350         this->SaveIdDeMux(*m_IdDeMux, *m_DictFile, 0, compact_vectors);
1351     }
1352 }
1353 
1354 
1355 template<class TBV, class TObjDeMux, class TL>
1356 inline void
SaveIdDeMux(const TIdDeMux & de_mux,TDeMuxStore & dict_file,CBDB_Transaction * trans,typename TDeMuxStore::ECompact compact_vectors)1357 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::SaveIdDeMux(
1358                                     const TIdDeMux&      de_mux,
1359                                     TDeMuxStore&         dict_file,
1360                                     CBDB_Transaction*    trans,
1361                       typename TDeMuxStore::ECompact     compact_vectors)
1362 {
1363     dict_file.SetTransaction(trans);
1364 
1365     size_t N = de_mux.GetN();
1366     for (size_t i = 0; i < N; ++i) {
1367         const typename TIdDeMux::TDimVector& dv = de_mux.GetDimVector(i);
1368 
1369         for (size_t j = 0; j < dv.size(); ++j) {
1370             dict_file.dim = (Uint4) i;
1371             dict_file.dim_idx = (Uint4) j;
1372 
1373             const TBitVector* bv = dv[j].get();
1374             if (!bv) {
1375                 dict_file.Delete(CBDB_RawFile::eIgnoreError);
1376             } else {
1377                 dict_file.WriteVector(*bv, compact_vectors);
1378             }
1379 
1380         } // for j
1381     } // for i
1382 }
1383 
1384 template<class TBV, class TObjDeMux, class TL>
1385 inline unsigned
GetPageSize(unsigned splice) const1386 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetPageSize(unsigned splice) const
1387 {
1388     if (splice < m_PageSizes.size())
1389         return m_PageSizes[splice];
1390     return 64 * 1024;
1391 }
1392 
1393 template<class TBV, class TObjDeMux, class TL>
1394 inline string
MakeDbFileName(unsigned vol,unsigned slice)1395 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::MakeDbFileName(unsigned vol,
1396                                                       unsigned slice)
1397 {
1398     string ext;
1399     switch (m_DB_Type)
1400     {
1401     case CBDB_RawFile::eBtree:
1402         ext = ".db";
1403         break;
1404     case CBDB_RawFile::eHash:
1405         ext = ".hdb";
1406         break;
1407     default:
1408         _ASSERT(0);
1409     } // switch
1410     return m_StorageName + "_" +
1411            NStr::UIntToString(vol) + "_" + NStr::UIntToString(slice) + ext;
1412 }
1413 
1414 template<class TBV, class TObjDeMux, class TL>
InitDbMutex(SLockedDb * ldb)1415 inline void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::InitDbMutex(SLockedDb* ldb)
1416 {
1417     if ((ldb->lock.get() == 0) || (ldb->lock_ro.get() == 0)) {
1418         TLockGuard lg(m_VolumesLock);
1419         if (ldb->lock.get() == 0) {
1420             ldb->lock.reset(new TLock);
1421         }
1422         if (ldb->lock_ro.get() == 0) {
1423             ldb->lock_ro.reset(new TLock);
1424         }
1425     }
1426 }
1427 
1428 
/// Return the database slot for a (volume, slice) pair, creating and
/// opening the database on first use.
///
/// Stages:
///  1. If m_AllProjAvail is set, the slot is returned directly without
///     taking m_VolumesLock.
///  2. Otherwise, under m_VolumesLock, an existing slot that already has a
///     database is returned.
///  3. Otherwise, under m_VolumesLock, m_Volumes and the volume's slot
///     vector are grown so the slot exists.
///  4. Under the slot's own lock the database is created, configured and
///     opened if it is still missing.  When m_Env is set, a second,
///     read-only handle on the same file is opened as well.
///  5. If a database was opened by this call, the store is in a read-write
///     open mode, and @a get_mode is eGetWrite, the id demultiplexer is
///     saved without compaction.
///
/// @param vol       volume number
/// @param slice     slice number
/// @param get_mode  access intent; only consulted to decide whether to
///                  save after opening a new database
/// @return          reference to the slot for (vol, slice)
template<class TBV, class TObjDeMux, class TL>
inline typename CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::SLockedDb&
CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetDb(unsigned     vol,
                                               unsigned     slice,
                                               EGetDB_Mode  get_mode)
{
    // speculative un-locked check if everything is open already
    // (we don't close or shrink the store in parallel, so it is safe)

    SLockedDb* lp = 0;

    // all databases are pre-open - no need to lock top level mutex
    //
    if (m_AllProjAvail) {

        _ASSERT(m_Volumes.size() > vol);
        _ASSERT((m_Volumes[vol])->db_vect.size() > slice);

        SVolume& volume = *(m_Volumes[vol]);
        lp = &*volume.db_vect[slice];

        _ASSERT(lp->db.get());
        _ASSERT(lp->lock.get());

        return *lp;
    }

    // locked lookup: return the slot if it exists and its database is
    // already open
    {{
        TLockGuard lg(m_VolumesLock);
        if ((m_Volumes.size() > vol) &&
            ((m_Volumes[vol])->db_vect.size() > slice)) {
            SVolume& volume = *(m_Volumes[vol]);
            lp = &*volume.db_vect[slice];
            if (lp->db.get()) {
                return *lp;
            }
         }
    }}

    {  // lock protected open
        TLockGuard lg(m_VolumesLock);
        // append volumes until index 'vol' exists; each new volume gets
        // slice+1 freshly allocated slots
        while (m_Volumes.size() < (vol+1)) {
            unique_ptr<SVolume> v(new SVolume);
            v->db_vect.resize(slice+1);
            for (size_t i = 0; i < v->db_vect.size(); ++i) {
                if ( !v->db_vect[i] ) {
                    v->db_vect[i] = new SLockedDb;
                }
            } // for
            m_Volumes.push_back(v.release());
        }

        // grow an existing volume's slot vector when 'slice' is beyond
        // its end, allocating only the slots that are still empty
        SVolume& volume = *(m_Volumes[vol]);
        if (volume.db_vect.size() <= slice) {
            volume.db_vect.resize(slice+1);
            for (size_t i = 0; i < volume.db_vect.size(); ++i) {
                if ( !volume.db_vect[i] ) {
                    volume.db_vect[i] = new SLockedDb;
                }
            } // for

        }
        lp = &*volume.db_vect[slice];
    }

    // set when this call opens the database file
    bool needs_save = false;

    {{
        _ASSERT(lp);

         // make sure the slot's lock objects exist before locking it
         InitDbMutex(lp);
         TLockGuard lg(*(lp->lock));
         // re-check under the slot lock: the database may have been opened
         // since the lookup above
         if (lp->db.get() == 0) {
             string fname = this->MakeDbFileName(vol, slice);
             lp->db.reset(new TBlobFile(CBDB_File::eDuplicatesDisable,
                                        m_DB_Type));
             if (m_Env) {
                 // with an environment: attach it and prepare a second,
                 // read-only handle
                 lp->db->SetEnv(*m_Env);
                 lp->db_ro.reset(new TBlobFile(CBDB_File::eDuplicatesDisable,
                                               m_DB_Type));
                 lp->db_ro->SetEnv(*m_Env);
             } else {
                 // without an environment: apply the per-volume cache size,
                 // if one is configured
                 if (m_VolumeCacheSize) {
                     lp->db->SetCacheSize(m_VolumeCacheSize);
                 }
             }
             // page size (and the settings below) are applied to the
             // primary handle only
             unsigned page_size = GetPageSize(slice);
             if (page_size) {
                 lp->db->SetPageSize(page_size);
             }

             /// also twiddle min keys per page
             switch (slice) {
             case 0:
                 /// page size = default
                 /// blobs <= 256 bytes
                 lp->db->SetBtreeMinKeysPerPage(6);
                 break;

             case 1:
                 /// page size = default
                 /// blobs > 256, <= 512 bytes
                 lp->db->SetBtreeMinKeysPerPage(3);
                 break;

             default:
                 /// use default = 2
                 break;
             }

             /// turn off reverse splitting if requested
             if (m_RevSplitOff) {
                 lp->db->RevSplitOff();
             }

             lp->db->Open(fname.c_str(), m_OpenMode);
             lp->db->SetCachePriority(m_CachePriority);
             // the read-only handle opens the same file
             if (lp->db_ro.get()) {
                lp->db_ro->Open(fname.c_str(), CBDB_RawFile::eReadOnly);
                lp->db_ro->SetCachePriority(m_CachePriority);
             }
             needs_save = true;
         }
     }}

    // Save() is called after the slot lock above has been released
    if (needs_save  &&
            (m_OpenMode == CBDB_RawFile::eReadWriteCreate ||
             m_OpenMode == CBDB_RawFile::eReadWrite) &&
        get_mode == eGetWrite) {
        // new split volume: checkpoint the changes
        this->Save(TDeMuxStore::eNoCompact); // quick dump no compression
    }

    return *lp;
}
1564 
1565 
1566 END_NCBI_SCOPE
1567 
1568 
1569 #endif
1570 
1571