1 #ifndef BDB___SPLIT_BLOB_HPP__
2 #define BDB___SPLIT_BLOB_HPP__
3
4 /* $Id: bdb_split_blob.hpp 617975 2020-10-08 18:28:17Z grichenk $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Author: Anatoliy Kuznetsov
30 *
31 * File Description: BDB library split BLOB store.
32 *
33 */
34
35
36 /// @file bdb_split_blob.hpp
37 /// BDB library split BLOB store.
38
39 #include <corelib/ncbistd.hpp>
40 #include <corelib/ncbimtx.hpp>
41 #include <corelib/ncbistre.hpp>
42 #include <corelib/ncbistr.hpp>
43 #include <corelib/ncbifile.hpp>
44
45 #include <util/math/matrix.hpp>
46
47 #include <db/bdb/bdb_blob.hpp>
48 #include <db/bdb/bdb_bv_store.hpp>
49 #include <db/bdb/bdb_cursor.hpp>
50 #include <db/bdb/bdb_trans.hpp>
51 #include <db/error_codes.hpp>
52
53 #include <util/id_mux.hpp>
54
55
56 BEGIN_NCBI_SCOPE
57
58 /** @addtogroup BDB_BLOB
59 *
60 * @{
61 */
62
63
64 /// Persistent storage for demux information
65 ///
66 template<class TBV>
67 class CBDB_BlobStoreDict : public CBDB_BvStore<TBV>
68 {
69 public:
70 CBDB_FieldUint4 dim; ///< dimention
71 CBDB_FieldUint4 dim_idx; ///< projection index
72
73 typedef CBDB_BvStore<TBV> TParent;
74
75 public:
CBDB_BlobStoreDict()76 CBDB_BlobStoreDict()
77 {
78 this->BindKey("dim", &dim);
79 this->BindKey("dim_idx", &dim_idx);
80 }
81 };
82
83 /// Base class for page-split demultiplexers
84 ///
85 class CBDB_BlobDeMuxSplit : public IObjDeMux<unsigned>
86 {
87 public:
88 /// LOBs are getting split into slices based on LOB size,
89 /// similar BLOBs go to the compartment with more optimal storage
90 /// paramaters
91 ///
92 static
SelectSplit(size_t blob_size)93 unsigned SelectSplit(size_t blob_size)
94 {
95 static const unsigned size_split[] = {
96 256, 512, 2048, 4096, 8192, 16384, 32768
97 };
98 const unsigned kMaxSize = (unsigned)(sizeof(size_split) / sizeof(*size_split));
99
100 for (unsigned i = 0; i < kMaxSize; ++i) {
101 if (blob_size < size_split[i]) {
102 return i;
103 }
104 }
105 return kMaxSize;
106 }
107
108 /// Returns total number of splits (horizontal projection)
109 /// If method returns 0 - means there is no hard number: open ended proj
GetSplitSize() const110 unsigned GetSplitSize() const { return 8; }
111
112 /// Returns total number of volumes (vertical projection)
113 /// If method returns 0 - means there is no hard number: open ended proj
GetVolumeSize() const114 unsigned GetVolumeSize() const { return 0; }
115 };
116
117 /// Volume split BLOB demultiplexer
118 ///
119 /// This class is doing some simple accounting, counting size and number
120 /// of incoming LOBs, splitting them into [volume, page size]
121 ///
122 class CBDB_BlobDeMux : public CBDB_BlobDeMuxSplit
123 {
124 public:
125 typedef CNcbiMatrix<double> TVolumeSize;
126 typedef CNcbiMatrix<unsigned> TVolumeRecs;
127
128 public:
CBDB_BlobDeMux(double vol_max=1.5* (1024.00* 1024.00* 1024.00),unsigned rec_max=3* 1000000)129 CBDB_BlobDeMux(double vol_max = 1.5 * (1024.00*1024.00*1024.00),
130 unsigned rec_max = 3 * 1000000)
131 : m_VolMax(vol_max), m_RecMax(rec_max)
132 {
133 }
134
135 /// coordinates:
136 ///
137 /// 0 - active volume number
138 /// 1 - page split number
139 ///
GetCoordinates(unsigned blob_size,unsigned * coord)140 void GetCoordinates(unsigned blob_size, unsigned* coord)
141 {
142 _ASSERT(coord);
143 _ASSERT(m_RecS.GetRows() == m_VolS.GetRows());
144 _ASSERT(m_RecS.GetCols() == m_VolS.GetCols());
145
146 coord[1] = CBDB_BlobDeMux::SelectSplit(blob_size);
147 size_t max_col = max((size_t)m_RecS.GetCols(),
148 (size_t)coord[1] + 1);
149 m_RecS.Resize(m_RecS.GetRows(), max_col);
150 m_VolS.Resize(m_VolS.GetRows(), max_col);
151
152 for (unsigned i = 0; i < m_RecS.GetRows(); ++i) {
153 if (m_RecS(i, coord[1]) < m_RecMax &&
154 m_VolS(i, coord[1]) < m_VolMax) {
155 coord[0] = i;
156 ++m_RecS(i, coord[1]);
157 m_VolS (i, coord[1]) += blob_size;
158 return;
159 }
160 }
161
162 /// not found
163 NewPlane();
164 coord[0] = unsigned(m_RecS.GetRows() - 1);
165 }
166
167
168 protected:
NewPlane()169 void NewPlane()
170 {
171 size_t max_col = max((size_t)m_RecS.GetCols(), (size_t)1);
172 m_RecS.Resize(m_RecS.GetRows() + 1, max_col);
173 m_VolS.Resize(m_VolS.GetRows() + 1, max_col);
174 }
175
176
177 TVolumeSize m_VolS; ///< Volumes BLOB sizes
178 TVolumeRecs m_RecS; ///< Volumes record counts
179
180 double m_VolMax; ///< Volume max size
181 unsigned m_RecMax; ///< Maximum number of records
182 };
183
184 /// Split demux which can save and load state into a file
185 /// Stateful (persistent) class.
186 ///
187 class CBDB_BlobDeMuxPersistent : public CBDB_BlobDeMux
188 {
189 public:
CBDB_BlobDeMuxPersistent(const string & path,double vol_max=1.5* (1024.00* 1024.00* 1024.00),unsigned rec_max=3* 1000000)190 CBDB_BlobDeMuxPersistent(const string& path,
191 double vol_max = 1.5 * (1024.00*1024.00*1024.00),
192 unsigned rec_max = 3 * 1000000)
193 : CBDB_BlobDeMux(vol_max, rec_max)
194 , m_Path(path)
195 {
196 if ( !m_Path.empty() && CFile(m_Path).Exists()) {
197 CNcbiIfstream istr(m_Path.c_str());
198 Load(istr);
199 }
200 }
201
~CBDB_BlobDeMuxPersistent()202 ~CBDB_BlobDeMuxPersistent()
203 {
204 if ( !m_Path.empty() ) {
205 try {
206 CNcbiOfstream ostr(m_Path.c_str());
207 Save(ostr);
208 }
209 catch (CException& e) {
210 ERR_POST_XX(Db_Bdb_Blob, 2, Error << "CBDB_BlobDeMux::~CBDB_BlobDeMux(): "
211 "error saving demultiplex data: " << e.what());
212 }
213 }
214 }
215
216
Save(CNcbiOstream & ostr)217 void Save(CNcbiOstream& ostr)
218 {
219 ostr << m_RecS.GetRows() << " " << m_RecS.GetCols() << endl;
220 for (size_t i = 0; i < m_RecS.GetRows(); ++i) {
221 for (size_t j = 0; j < m_RecS.GetCols(); ++j) {
222 ostr << m_RecS(i, j) << " ";
223 }
224 ostr << endl;
225 }
226 ostr << m_VolS.GetRows() << " " << m_VolS.GetCols() << endl;
227 for (size_t i = 0; i < m_VolS.GetRows(); ++i) {
228 for (size_t j = 0; j < m_VolS.GetCols(); ++j) {
229 ostr << m_VolS(i, j) << " ";
230 }
231 ostr << endl;
232 }
233 }
234
Load(CNcbiIstream & istr)235 void Load(CNcbiIstream& istr)
236 {
237 size_t i, j;
238
239 istr >> i >> j;
240 m_RecS.Resize(i, j);
241 for (i = 0; i < m_RecS.GetRows(); ++i) {
242 for (j = 0; j < m_RecS.GetCols(); ++j) {
243 istr >> m_RecS(i, j);
244 }
245 }
246
247 istr >> i >> j;
248 m_VolS.Resize(i, j);
249 for (i = 0; i < m_VolS.GetRows(); ++i) {
250 for (j = 0; j < m_VolS.GetCols(); ++j) {
251 istr >> m_VolS(i, j);
252 }
253 }
254 }
255
256 private:
257 string m_Path;
258 };
259
260
261 /// BLOB demultiplexer implements round-robin volume rotation.
262 ///
263 /// This demultiplexer sends every new BLOB to a next volume, reducing
264 /// locking contention over one BDB database.
265 ///
266 class CBDB_BlobDeMux_RoundRobin : public CBDB_BlobDeMuxSplit
267 {
268 public:
CBDB_BlobDeMux_RoundRobin(unsigned volumes=0)269 CBDB_BlobDeMux_RoundRobin(unsigned volumes = 0)
270 : m_Volumes(volumes), m_CurrVolume(0)
271 {
272 }
273
274 /// coordinates:
275 ///
276 /// 0 - active volume number
277 /// 1 - page split number
278 ///
GetCoordinates(unsigned blob_size,unsigned * coord)279 void GetCoordinates(unsigned blob_size, unsigned* coord)
280 {
281 _ASSERT(coord);
282
283 coord[0] = m_CurrVolume;
284 coord[1] = SelectSplit(blob_size);
285
286 // every next BLOB goes to the next volume (round-robin)
287 ++m_CurrVolume;
288 if (m_CurrVolume >= m_Volumes) {
289 m_CurrVolume = 0;
290 }
291 }
292
293 /// Returns total number of volumes (vertical projection)
294 /// If method returns 0 - means there is no hard number: open ended proj
GetVolumeSize() const295 unsigned GetVolumeSize() const { return m_Volumes; }
296
297 private:
298 unsigned m_Volumes;
299 unsigned m_CurrVolume;
300 };
301
302
303
304
305 /// BLOB storage based on single unsigned integer key
306 /// Supports BLOB volumes and different base page size files in the volume
307 /// to guarantee the best fit.
308 ///
309 ///
310 /// Problem.
311 /// Berkeley DB shows measurable difference in behavior and performance
312 /// depending on the combination of record size and database page size.
313 /// Differences include amount of disk traffic, locking granularity,
314 /// number of overflow pages, etc.
315 ///
316 /// The most critical here is overflow pages.
317 /// If DB page cannot accommodate 2(sometimes more) records BDB creates
318 /// overflow pages. This is found to be expensive. The typical fix is to
319 /// increase the page size.
320 /// Large page size is inefficient for dealing with small record
321 /// (you have to load/store 64K (full page) to load small object.
322 /// In transaction environment page access are also locks a lot of records.
323 /// Page size also influences B-Tree depth and number of internal pages.
324 /// Number of internal pages affects database size and retrieval performance.
325 ///
326 ///
327 /// Object maintains a matrix of BDB databases.
328 /// Every row maintains certain database volume or(and) number of records.
329 /// Every column groups BLOBs of certain size together, so class can choose
330 /// the best page size to store BLOBs without long chains of overflow pages.
331 ///
332 /// <pre>
333 /// Page size split:
334 /// Volume
335 /// split: 4K 8K 16K 32K
336 /// +------+------+------+------+
337 /// row = 0 | DB | ...................| = SUM = N Gbytes
338 /// row = 1 | DB | ..... | = SUM = N GBytes
339 ///
340 /// .........................
341 ///
342 /// +------+------+------+------+
343 ///
344 /// </pre>
345 ///
346 /// Matrix coordinates picking is implemented using concept called DeMux.
347 /// It maintains BLOB_ID <-> coordinates association.
348 /// Demux implementation(s) use bit-vectors to do the job. BLOB ID must be
349 /// unique across the store. In general DeMux can work with N-dimensional
350 /// coordinates to address host, partition, volume, slice (distributed
351 /// store). But current practical implementation uses 2D matrix (volume,
352 /// slice).
353 ///
354
355 template<class TBV, class TObjDeMux=CBDB_BlobDeMux, class TL=CFastMutex>
356 class CBDB_BlobSplitStore : public CThreadLocalTransactional
357 {
358 public:
359 typedef CIdDeMux<TBV> TIdDeMux;
360 typedef TBV TBitVector;
361 typedef CBDB_BlobStoreDict<TBV> TDeMuxStore;
362 typedef TL TLock;
363 typedef typename TL::TWriteLockGuard TLockGuard;
364 typedef CBDB_IdBlobFile TBlobFile;
365
366 #ifndef NCBI_SWIG
367 /// BDB Database together with the locker
368 /// One database is opened twice, one regular mode,
369 /// another - dedicated read-only instance to improve concurrency
370 ///
371 struct SLockedDb : public CObject
372 {
373 AutoPtr<TBlobFile> db; ///< database file
374 AutoPtr<TLock> lock; ///< db lock
375 AutoPtr<TBlobFile> db_ro; ///< database file for reads
376 AutoPtr<TLock> lock_ro; ///< db lock for reads
377 };
378 #endif
379
380 /// Volume split on optimal page size
381 struct SVolume
382 {
383 vector< CRef<SLockedDb> > db_vect;
384 };
385
386 typedef vector<SVolume*> TVolumeVect;
387
388 public:
389 /// Construction
390 /// The main parameter here is object demultiplexer for splitting
391 /// incoming LOBs into volumes and slices
392 ///
393 CBDB_BlobSplitStore(TObjDeMux* de_mux);
394 ~CBDB_BlobSplitStore();
395
396 /// Open storage (reads storage dictionary into memory)
397 void Open(const string& storage_name,
398 CBDB_RawFile::EOpenMode open_mode,
399 CBDB_RawFile::EDBType db_type=CBDB_RawFile::eBtree);
400
401 /// Return true if the split store has been opened
402 bool IsOpen() const;
403
404 /// Try to open all storage files in all projections
405 /// This is only possible when object de-mux has fixed
406 /// number of projections, if it is not the call is silently ignored
407 ///
408 void OpenProjections();
409
410 /// Save storage dictionary (demux disposition).
411 /// If you modified storage (like added new BLOBs to the storage)
412 /// you MUST call save; otherwise some disposition information is lost.
413 ///
414 void Save(typename TDeMuxStore::ECompact compact_vectors
415 = TDeMuxStore::eCompact);
416
417
SetVolumeCacheSize(unsigned int cache_size)418 void SetVolumeCacheSize(unsigned int cache_size)
419 { m_VolumeCacheSize = cache_size; }
420
421 /// Associate with the environment. Should be called before opening.
SetEnv(CBDB_Env & env)422 void SetEnv(CBDB_Env& env) { m_Env = &env; }
423
424 /// Get pointer on file environment
425 /// Return NULL if no environment has been set
GetEnv(void) const426 CBDB_Env* GetEnv(void) const { return m_Env; }
427
428 /// Return the base filename of the underlying split store
GetFileName() const429 const string& GetFileName() const { return m_StorageName; }
430
431 /// Turn off reverse splitting on the underlying stores. This should be
432 /// called before opening.
433 void RevSplitOff();
434
435 /// Set the priority for this database's pages in the buffer cache
436 /// This is generally a temporary advisement, and works only if an
437 /// environment is used.
438 void SetCachePriority(CBDB_RawFile::ECachePriority);
439
440 // ---------------------------------------------------------------
441 // Transactional interface
442 // ---------------------------------------------------------------
443 virtual void SetTransaction(ITransaction* trans);
444
445 CBDB_Transaction* GetBDBTransaction();
446
447
448 // ---------------------------------------------------------------
449 // Data manipulation interface
450 // ---------------------------------------------------------------
451
452 /// Insert BLOB into the storage.
453 ///
454 /// This method does NOT check if this object is already storead
455 /// somewhere. Method can create duplicates.
456 ///
457 /// @param id insertion key
458 /// @param data buffer pointer
459 /// @param size LOB data size in bytes
460 /// @param coord out: volume - page split number
461 ///
462 EBDB_ErrCode Insert(unsigned id,
463 const void* data, size_t size,
464 unsigned* coord);
465
466 EBDB_ErrCode Insert(unsigned id,
467 const void* data, size_t size);
468
469 /// Update or insert BLOB
470 EBDB_ErrCode UpdateInsert(unsigned id,
471 const void* data, size_t size,
472 unsigned* coord);
473
474 EBDB_ErrCode UpdateInsert(unsigned id,
475 const void* data, size_t size);
476
477 /// Update or insert BLOB using old coordinates
478 EBDB_ErrCode UpdateInsert(unsigned id,
479 const unsigned* old_coord,
480 const void* data, size_t size,
481 unsigned* coord);
482
483 /// Delete BLOB
484 EBDB_ErrCode Delete(unsigned id,
485 CBDB_RawFile::EIgnoreError on_error =
486 CBDB_RawFile::eThrowOnError);
487
488 EBDB_ErrCode Delete(unsigned id,
489 const unsigned* coords,
490 CBDB_RawFile::EIgnoreError on_error =
491 CBDB_RawFile::eThrowOnError);
492
493
494 /// Find (demux) coordinates by BLOB id
495 ///
496 EBDB_ErrCode GetCoordinates(unsigned id, unsigned* coords);
497
498 /// Assing de-mux coordinates
499 void AssignCoordinates(unsigned id, const unsigned* coords);
500
501 /// Returns true if two sets of coordinates are the same
502 bool IsSameCoordinates(const unsigned* coords1,
503 const unsigned* coords2);
504
505 /// Read BLOB into vector.
506 /// If BLOB does not fit, method resizes the vector to accomodate.
507 ///
508 EBDB_ErrCode ReadRealloc(unsigned id,
509 CBDB_RawFile::TBuffer& buffer);
510
511 /// Read BLOB into vector using provided coordinates
512 /// If BLOB does not fit, method resizes the vector to accomodate.
513 ///
514 EBDB_ErrCode ReadRealloc(unsigned id,
515 const unsigned* coords,
516 CBDB_RawFile::TBuffer& buffer);
517
518 /// Fetch LOB record directly into the provided '*buf'.
519 /// If size of the LOB is greater than 'buf_size', then
520 /// if reallocation is allowed -- '*buf' will be reallocated
521 /// to fit the LOB size; otherwise, a exception will be thrown.
522 ///
523 EBDB_ErrCode Fetch(unsigned id,
524 void** buf,
525 size_t buf_size,
526 CBDB_RawFile::EReallocMode allow_realloc,
527 size_t* blob_size);
528
529 EBDB_ErrCode Fetch(unsigned id,
530 const unsigned* coords,
531 void** buf,
532 size_t buf_size,
533 CBDB_RawFile::EReallocMode allow_realloc,
534 size_t* blob_size);
535
536 /// Sync the underlying stores
537 void Sync();
538
539 /// Create stream oriented reader
540 /// @returns NULL if BLOB not found
541 ///
542 /// This method does NOT block the specified ID from concurrent access
543 /// for the life of IReader. The nature of BDB IReader is that each Read
544 /// maps into BDB get, so somebody can delete the BLOB between IReader calls.
545 /// This potential race should be taken into account in MT concurrent
546 /// application.
547 ///
548 /// Caller is responsible for deletion.
549 ///
550 IReader* CreateReader(unsigned id);
551
552 IReader* CreateReader(unsigned id, const unsigned* coords);
553
554 /// Get size of the BLOB
555 ///
556 /// @note Price of this operation is almost the same as getting
557 /// the actual BLOB. It is often better just to fetch BLOB speculatively,
558 /// hoping it fits in the buffer and resizing the buffer on exception.
559 ///
560 EBDB_ErrCode BlobSize(unsigned id,
561 size_t* blob_size);
562
563 EBDB_ErrCode BlobSize(unsigned id,
564 const unsigned* coords,
565 size_t* blob_size);
566
567 /// Get all id of all BLOBs stored
568 ///
569 /// @param bv
570 /// Vector of IDs stored
571 ///
572 void GetIdVector(TBitVector* bv) const;
573
574 /// Reclaim unused memory
575 void FreeUnusedMem();
576 protected:
577 /// Close volumes without saving or doing anything with id demux
578 void CloseVolumes();
579
580 void LoadIdDeMux(TIdDeMux& de_mux, TDeMuxStore& dict_file);
581
582 /// Store id demux (projection vectors) into the database file
583 void SaveIdDeMux(const TIdDeMux& de_mux,
584 TDeMuxStore& dict_file,
585 CBDB_Transaction* trans,
586 typename TDeMuxStore::ECompact compact_vectors);
587
588 /// Select preferred page size for the specified slice
589 unsigned GetPageSize(unsigned splice) const;
590
591 /// Open split storage dictionary
592 void OpenDict();
593
594 /// Make BDB file name based on volume and page size split
595 string MakeDbFileName(unsigned vol,
596 unsigned slice);
597
598
599 /// Read or write operation
600 enum EGetDB_Mode {
601 eGetRead,
602 eGetWrite
603 };
604
605 /// Get database pair (method opens and mounts database if necessary)
606 SLockedDb& GetDb(unsigned vol, unsigned slice,
607 EGetDB_Mode get_mode);
608
609 /// Init database mutex lock (mathod is protected against double init)
610 void InitDbMutex(SLockedDb* ldb);
611
612 protected:
613 int m_TransAssociation;
614
615 vector<unsigned> m_PageSizes;
616 unsigned m_VolumeCacheSize;
617 CBDB_Env* m_Env;
618 unique_ptr<TDeMuxStore> m_DictFile; ///< Split dictionary(id demux file)
619 mutable TLock m_DictFileLock; ///< id demux file locker
620
621 unique_ptr<TIdDeMux> m_IdDeMux; ///< Id to coordinates mapper
622 mutable CRWLock m_IdDeMuxLock;
623
624 unique_ptr<TObjDeMux> m_ObjDeMux; ///< Obj to coordinates mapper
625 TLock m_ObjDeMuxLock;
626
627 TVolumeVect m_Volumes; ///< Volumes
628 mutable TLock m_VolumesLock; ///< Volumes locker
629
630 string m_StorageName;
631 CBDB_RawFile::EOpenMode m_OpenMode;
632 CBDB_RawFile::EDBType m_DB_Type;
633 CBDB_RawFile::ECachePriority m_CachePriority;
634
635 /// True when all proj.dbs are pre-open
636 bool m_AllProjAvail;
637
638 /// Flag carrying reverse split status
639 bool m_RevSplitOff;
640
641 /// Lock used to sync. muli-db transactions to avoid deadlocks
642 TLock m_CrossDBLock;
643
644 private:
645 /// forbidden
646 CBDB_BlobSplitStore(const CBDB_BlobSplitStore<TBV, TObjDeMux, TL>&);
647 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>& operator=(const CBDB_BlobSplitStore<TBV, TObjDeMux, TL>&);
648 };
649
650 /* @} */
651
652
653 /////////////////////////////////////////////////////////////////////////////
654 // IMPLEMENTATION of INLINE functions
655 /////////////////////////////////////////////////////////////////////////////
656
657
658 template<class TBV, class TObjDeMux, class TL>
659 inline
CBDB_BlobSplitStore(TObjDeMux * de_mux)660 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::CBDB_BlobSplitStore(TObjDeMux* de_mux)
661 : m_TransAssociation(CBDB_Transaction::eFullAssociation),
662 m_PageSizes(7),
663 m_VolumeCacheSize(0),
664 m_Env(0),
665 m_IdDeMux(new TIdDeMux(2)),
666 m_ObjDeMux(de_mux),
667 m_OpenMode(CBDB_RawFile::eReadOnly),
668 m_DB_Type(CBDB_RawFile::eBtree),
669 m_CachePriority(CBDB_RawFile::eCache_Default),
670 m_AllProjAvail(false),
671 m_RevSplitOff(false)
672 {
673 m_PageSizes[0] = 0; // max blob size = 256
674 m_PageSizes[1] = 0; // max blob size = 512
675 m_PageSizes[2] = 8 * 1024; // max blob size = 2048
676 m_PageSizes[3] = 16* 1024; // max blob size = 4096
677 m_PageSizes[4] = 32* 1024; // max blob size = 8192
678 m_PageSizes[5] = 64* 1024; // max blob size = 16384
679 m_PageSizes[6] = 64* 1024; // max blob size = 32768
680 }
681
682 template<class TBV, class TObjDeMux, class TL>
683 inline
~CBDB_BlobSplitStore()684 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::~CBDB_BlobSplitStore()
685 {
686 try {
687 CloseVolumes();
688 }
689 catch (std::exception& e) {
690 ERR_POST_XX(Db_Bdb_Blob, 3, Error
691 << "CBDB_BlobSplitStore<>::~CBDB_BlobSplitStore(): "
692 "error in CloseVolumes(): " << e.what());
693 }
694
695 try {
696 if (m_OpenMode != CBDB_RawFile::eReadOnly) {
697 Save();
698 }
699 }
700 catch (std::exception& e) {
701 ERR_POST_XX(Db_Bdb_Blob, 4, Error
702 << "CBDB_BlobSplitStore<>::~CBDB_BlobSplitStore(): "
703 "error in Save(): " << e.what());
704 }
705 }
706
707 template<class TBV, class TObjDeMux, class TL>
708 inline
RevSplitOff()709 void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::RevSplitOff()
710 {
711 m_RevSplitOff = true;
712 }
713
714
715 template<class TBV, class TObjDeMux, class TL>
716 inline
SetCachePriority(CBDB_RawFile::ECachePriority p)717 void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::SetCachePriority(CBDB_RawFile::ECachePriority p)
718 {
719 m_CachePriority = p;
720 }
721
722
723 template<class TBV, class TObjDeMux, class TL>
724 inline
CloseVolumes()725 void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::CloseVolumes()
726 {
727 for (size_t i = 0; i < m_Volumes.size(); ++i) {
728 SVolume* v = m_Volumes[i];
729 delete v;
730 }
731 }
732
733 template<class TBV, class TObjDeMux, class TL>
734 inline bool
IsSameCoordinates(const unsigned * coords1,const unsigned * coords2)735 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::IsSameCoordinates(
736 const unsigned* coords1,
737 const unsigned* coords2)
738 {
739 return coords1[0] == coords2[0] &&
740 coords1[1] == coords2[1];
741 }
742
743 template<class TBV, class TObjDeMux, class TL>
744 inline void
GetIdVector(TBitVector * bv) const745 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetIdVector(TBitVector* bv) const
746 {
747 CReadLockGuard lg(m_IdDeMuxLock);
748 m_IdDeMux->GetIdVector(bv);
749 }
750
751
752 template<class TBV, class TObjDeMux, class TL>
753 inline void
AssignCoordinates(unsigned id,const unsigned * coords)754 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::AssignCoordinates(
755 unsigned id,
756 const unsigned* coords)
757 {
758 unsigned old_coord[2];
759 bool found;
760 {{
761 CReadLockGuard lg(m_IdDeMuxLock);
762 found = m_IdDeMux->GetCoordinatesFast(id, old_coord);
763 }}
764 if (found && IsSameCoordinates(old_coord, coords)) {
765 return;
766 }
767 // correct coordinate mapping
768 {{
769 CWriteLockGuard lg(m_IdDeMuxLock);
770 m_IdDeMux->SetCoordinatesFast(id, coords);
771 }}
772
773 }
774
775 template<class TBV, class TObjDeMux, class TL>
776 inline EBDB_ErrCode
Insert(unsigned id,const void * data,size_t size)777 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Insert(unsigned id,
778 const void* data,
779 size_t size)
780 {
781 unsigned coord[2];
782 return this->Insert(id, data, size, coord);
783
784 }
785
786
787 template<class TBV, class TObjDeMux, class TL>
788 inline EBDB_ErrCode
Insert(unsigned int id,const void * data,size_t size,unsigned * coord)789 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Insert(unsigned int id,
790 const void* data,
791 size_t size,
792 unsigned* coord)
793 {
794 _ASSERT(coord);
795
796 // check if BLOB exists
797 bool found;
798 {{
799 CReadLockGuard lg(m_IdDeMuxLock);
800 found = m_IdDeMux->GetCoordinatesFast(id, coord);
801 }}
802 if (found) {
803 return eBDB_KeyDup;
804 }
805
806
807 {{
808 TLockGuard lg(m_ObjDeMuxLock);
809 m_ObjDeMux->GetCoordinates((unsigned)size, coord);
810 }}
811
812 {{
813 CWriteLockGuard lg(m_IdDeMuxLock);
814 m_IdDeMux->SetCoordinatesFast(id, coord);
815 }}
816
817 SLockedDb& dbp = this->GetDb(coord[0], coord[1], eGetWrite);
818 {{
819 TLockGuard lg(*dbp.lock);
820 dbp.db->SetTransaction(GetTransaction());
821 dbp.db->id = id;
822 return dbp.db->Insert(data, size);
823 }}
824 }
825
826 template<class TBV, class TObjDeMux, class TL>
827 inline EBDB_ErrCode
UpdateInsert(unsigned int id,const void * data,size_t size)828 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::UpdateInsert(unsigned int id,
829 const void* data,
830 size_t size)
831 {
832 unsigned coord[2];
833 return this->UpdateInsert(id, data, size, coord);
834 }
835
836
837 template<class TBV, class TObjDeMux, class TL>
838 inline EBDB_ErrCode
UpdateInsert(unsigned int id,const void * data,size_t size,unsigned * coord)839 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::UpdateInsert(unsigned int id,
840 const void* data,
841 size_t size,
842 unsigned* coord)
843 {
844 bool found;
845 {{
846 CReadLockGuard lg(m_IdDeMuxLock);
847 found = m_IdDeMux->GetCoordinatesFast(id, coord);
848 }}
849 if (!found) {
850 return this->Insert(id, data, size, coord);
851 }
852
853 unsigned slice = m_ObjDeMux->SelectSplit(size);
854 if (slice != coord[1]) {
855 // lock to prevent deadlock (no guarentee on the order of update)
856 TLockGuard lg(m_CrossDBLock);
857
858 this->Delete(id, CBDB_RawFile::eThrowOnError);
859 return this->Insert(id, data, size, coord);
860 } else {
861 SLockedDb& dbp = this->GetDb(coord[0], coord[1], eGetWrite);
862 TLockGuard lg(*(dbp.lock));
863 dbp.db->SetTransaction(GetTransaction());
864 dbp.db->id = id;
865 return dbp.db->UpdateInsert(data, size);
866 }
867 }
868
869 template<class TBV, class TObjDeMux, class TL>
870 inline EBDB_ErrCode
UpdateInsert(unsigned id,const unsigned * old_coord,const void * data,size_t size,unsigned * coord)871 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::UpdateInsert(unsigned id,
872 const unsigned* old_coord,
873 const void* data,
874 size_t size,
875 unsigned* coord)
876 {
877 _ASSERT(old_coord);
878 _ASSERT(coord);
879
880 bool found;
881 {{
882 CReadLockGuard lg(m_IdDeMuxLock);
883 found = m_IdDeMux->GetCoordinatesFast(id, coord);
884 }}
885 if (found) {
886 // consistency check
887 if (old_coord[0] != coord[0] || old_coord[1] != coord[1]) {
888 // lock to prevent deadlock (no guarentee on the order of update)
889 TLockGuard lg(m_CrossDBLock);
890
891 // external (or internal)coordinate storage skrewed
892 this->Delete(id, coord, CBDB_RawFile::eThrowOnError);
893 this->Delete(id, old_coord, CBDB_RawFile::eThrowOnError);
894
895 // re-insert
896 return this->Insert(id, data, size, coord);
897 }
898 }
899
900 coord[0] = old_coord[0];
901 coord[1] = old_coord[1];
902
903 if (!found) {
904 return this->Insert(id, data, size, coord);
905 }
906
907 unsigned slice = m_ObjDeMux->SelectSplit(size);
908 if (slice != coord[1]) {
909 // lock to prevent deadlock (no guarentee on the order of update)
910 TLockGuard lg(m_CrossDBLock);
911
912 this->Delete(id, coord, CBDB_RawFile::eThrowOnError);
913 return this->Insert(id, data, size, coord);
914 } else {
915 SLockedDb& dbp = this->GetDb(coord[0], coord[1], eGetWrite);
916 TLockGuard lg(*dbp.lock);
917 dbp.db->SetTransaction(GetTransaction());
918 dbp.db->id = id;
919 return dbp.db->UpdateInsert(data, size);
920 }
921 }
922
923
924
925 template<class TBV, class TObjDeMux, class TL>
926 inline EBDB_ErrCode
Delete(unsigned id,CBDB_RawFile::EIgnoreError on_error)927 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Delete(unsigned id,
928 CBDB_RawFile::EIgnoreError on_error)
929 {
930 unsigned coord[2];
931 bool found;
932 {{
933 CReadLockGuard lg(m_IdDeMuxLock);
934 found = m_IdDeMux->GetCoordinatesFast(id, coord);
935 }}
936 if (!found) {
937 return eBDB_NotFound;
938 }
939 return this->Delete(id, coord, on_error);
940 }
941
942 template<class TBV, class TObjDeMux, class TL>
943 inline EBDB_ErrCode
Delete(unsigned id,const unsigned * coords,CBDB_RawFile::EIgnoreError on_error)944 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Delete(unsigned id,
945 const unsigned* coords,
946 CBDB_RawFile::EIgnoreError on_error)
947 {
948 // clear coordinate mapping
949 {{
950 CWriteLockGuard lg(m_IdDeMuxLock);
951 m_IdDeMux->SetCoordinatesFast(id, coords, false);
952 }}
953
954 SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetWrite);
955 {{
956 TLockGuard lg(*dbp.lock);
957 dbp.db->SetTransaction(GetTransaction());
958 dbp.db->id = id;
959 return dbp.db->Delete(on_error);
960 }}
961 }
962
963
964
965 template<class TBV, class TObjDeMux, class TL>
966 inline IReader*
CreateReader(unsigned id)967 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::CreateReader(unsigned id)
968 {
969 unsigned coord[2];
970 bool found;
971 {{
972 CReadLockGuard lg(m_IdDeMuxLock);
973 found = m_IdDeMux->GetCoordinatesFast(id, coord);
974 }}
975 if (!found) {
976 return 0;
977 }
978 return this->CreateReader(id, coord);
979 }
980
981 template<class TBV, class TObjDeMux, class TL>
982 inline IReader*
CreateReader(unsigned id,const unsigned * coords)983 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::CreateReader(
984 unsigned id,
985 const unsigned* coords)
986 {
987 TBlobFile* db;
988 TLock* lock;
989 {{
990 SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetRead);
991
992 if (dbp.db_ro.get()) {
993 db = dbp.db_ro.get();
994 lock = dbp.lock_ro.get();
995 } else {
996 db = dbp.db.get();
997 lock = dbp.lock.get();
998 }
999 }}
1000 {{
1001 TLockGuard lg(*lock);
1002 db->SetTransaction(GetTransaction());
1003 db->id = id;
1004 if (db->Fetch() != eBDB_Ok) {
1005 return 0;
1006 }
1007 return db->CreateReader();
1008 }}
1009 }
1010
1011
1012 template<class TBV, class TObjDeMux, class TL>
FreeUnusedMem()1013 inline void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::FreeUnusedMem()
1014 {
1015 CWriteLockGuard lg(m_IdDeMuxLock);
1016 m_IdDeMux->FreeUnusedMem();
1017 }
1018 template<class TBV, class TObjDeMux, class TL>
1019 inline CBDB_Transaction*
GetBDBTransaction()1020 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetBDBTransaction()
1021 {
1022 ITransaction* trans = this->GetTransaction();
1023 return dynamic_cast<CBDB_Transaction*>(trans);
1024 }
1025
1026 template<class TBV, class TObjDeMux, class TL>
1027 inline void
SetTransaction(ITransaction * trans)1028 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::SetTransaction(ITransaction* trans)
1029 {
1030 CBDB_Transaction* db_trans = CBDB_Transaction::CastTransaction(trans);
1031 CBDB_Transaction* curr_trans;
1032
1033 if (m_TransAssociation == (int) CBDB_Transaction::eFullAssociation) {
1034 curr_trans = this->GetBDBTransaction();
1035 if (curr_trans) {
1036 curr_trans->Remove(this);
1037 }
1038 }
1039
1040 curr_trans = db_trans;
1041 if (curr_trans) {
1042 m_TransAssociation = curr_trans->GetAssociationMode();
1043 if (m_TransAssociation == (int) CBDB_Transaction::eFullAssociation) {
1044 curr_trans->Add(this);
1045 }
1046 }
1047 CThreadLocalTransactional::SetTransaction(curr_trans);
1048 }
1049
1050
1051 template<class TBV, class TObjDeMux, class TL>
1052 inline void
Sync()1053 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Sync()
1054 {
1055 for (size_t i = 0; i < m_Volumes.size(); ++i) {
1056 SVolume* v = m_Volumes[i];
1057 if ( !v ) {
1058 continue;
1059 }
1060 for (size_t j = 0; j < v->db_vect.size(); ++j) {
1061 SLockedDb* db = &*(v->db_vect[j]);
1062 if (db && db->db) {
1063 db->db->Sync();
1064 }
1065 }
1066 }
1067 }
1068
1069
1070 template<class TBV, class TObjDeMux, class TL>
1071 inline EBDB_ErrCode
Fetch(unsigned id,const unsigned * coords,void ** buf,size_t buf_size,CBDB_RawFile::EReallocMode allow_realloc,size_t * blob_size)1072 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Fetch(unsigned id,
1073 const unsigned* coords,
1074 void** buf,
1075 size_t buf_size,
1076 CBDB_RawFile::EReallocMode allow_realloc,
1077 size_t* blob_size)
1078 {
1079 EBDB_ErrCode ret;
1080 TBlobFile* db;
1081 TLock* lock;
1082 {{
1083 SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetRead);
1084
1085 if (dbp.db_ro.get()) {
1086 db = dbp.db_ro.get();
1087 lock = dbp.lock_ro.get();
1088 } else {
1089 db = dbp.db.get();
1090 lock = dbp.lock.get();
1091 }
1092 }}
1093 {{
1094 TLockGuard lg(*lock);
1095 db->SetTransaction(GetTransaction());
1096 db->id = id;
1097
1098 ret = db->Fetch(buf, buf_size, allow_realloc);
1099 if (ret == eBDB_Ok) {
1100 if (blob_size) {
1101 *blob_size = db->LobSize();
1102 }
1103 }
1104 }}
1105 return ret;
1106 }
1107
1108
1109
1110 template<class TBV, class TObjDeMux, class TL>
1111 inline EBDB_ErrCode
Fetch(unsigned id,void ** buf,size_t buf_size,CBDB_RawFile::EReallocMode allow_realloc,size_t * blob_size)1112 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Fetch(unsigned id,
1113 void** buf,
1114 size_t buf_size,
1115 CBDB_RawFile::EReallocMode allow_realloc,
1116 size_t* blob_size)
1117 {
1118 unsigned coord[2];
1119 bool found;
1120 {{
1121 CReadLockGuard lg(m_IdDeMuxLock);
1122 found = m_IdDeMux->GetCoordinatesFast(id, coord);
1123 }}
1124 if (!found) {
1125 return eBDB_NotFound;
1126 }
1127 return this->Fetch(id, coord, buf, buf_size, allow_realloc, blob_size);
1128 }
1129
1130 template<class TBV, class TObjDeMux, class TL>
1131 inline EBDB_ErrCode
GetCoordinates(unsigned id,unsigned * coord)1132 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetCoordinates(unsigned id,
1133 unsigned* coord)
1134 {
1135 _ASSERT(coord);
1136 bool found;
1137 {{
1138 CReadLockGuard lg(m_IdDeMuxLock);
1139 found = m_IdDeMux->GetCoordinatesFast(id, coord);
1140 }}
1141 if (!found) {
1142 return eBDB_NotFound;
1143 }
1144 return eBDB_Ok;
1145 }
1146
1147 template<class TBV, class TObjDeMux, class TL>
1148 inline EBDB_ErrCode
ReadRealloc(unsigned id,const unsigned * coords,CBDB_RawFile::TBuffer & buffer)1149 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::ReadRealloc(
1150 unsigned id,
1151 const unsigned* coords,
1152 CBDB_RawFile::TBuffer& buffer)
1153 {
1154 _ASSERT(coords);
1155
1156 TBlobFile* db = NULL;
1157 TLock* lock = NULL;
1158 {{
1159 SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetRead);
1160
1161 if (dbp.db_ro.get()) {
1162 db = dbp.db_ro.get();
1163 lock = dbp.lock_ro.get();
1164 } else {
1165 db = dbp.db.get();
1166 lock = dbp.lock.get();
1167 }
1168 }}
1169
1170 {{
1171 TLockGuard lg(*lock);
1172
1173 db->SetTransaction(GetBDBTransaction());
1174 db->id = id;
1175 EBDB_ErrCode e = db->ReadRealloc(buffer);
1176 return e;
1177 }}
1178 }
1179
1180
1181 template<class TBV, class TObjDeMux, class TL>
1182 inline EBDB_ErrCode
ReadRealloc(unsigned id,CBDB_RawFile::TBuffer & buffer)1183 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::ReadRealloc(
1184 unsigned id,
1185 CBDB_RawFile::TBuffer& buffer)
1186 {
1187 unsigned coord[2];
1188 bool found;
1189 {{
1190 CReadLockGuard lg(m_IdDeMuxLock);
1191 found = m_IdDeMux->GetCoordinatesFast(id, coord);
1192 }}
1193 if (!found) {
1194 return eBDB_NotFound;
1195 }
1196 return this->ReadRealloc(id, coord, buffer);
1197 }
1198
1199 template<class TBV, class TObjDeMux, class TL>
1200 inline EBDB_ErrCode
BlobSize(unsigned id,size_t * blob_size)1201 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::BlobSize(unsigned id,
1202 size_t* blob_size)
1203 {
1204 unsigned coord[2];
1205 bool found;
1206 {{
1207 CReadLockGuard lg(m_IdDeMuxLock);
1208 found = m_IdDeMux->GetCoordinatesFast(id, coord);
1209 }}
1210 if (!found) {
1211 return eBDB_NotFound;
1212 }
1213 return this->BlobSize(id, coord, blob_size);
1214 }
1215
1216 template<class TBV, class TObjDeMux, class TL>
1217 inline EBDB_ErrCode
BlobSize(unsigned id,const unsigned * coords,size_t * blob_size)1218 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::BlobSize(unsigned id,
1219 const unsigned* coords,
1220 size_t* blob_size)
1221 {
1222 TBlobFile* db;
1223 TLock* lock;
1224 {{
1225 SLockedDb& dbp = this->GetDb(coords[0], coords[1], eGetRead);
1226
1227 if (dbp.db_ro.get()) {
1228 db = dbp.db_ro.get();
1229 lock = dbp.lock_ro.get();
1230 } else {
1231 db = dbp.db.get();
1232 lock = dbp.lock.get();
1233 }
1234 }}
1235
1236 {{
1237 TLockGuard lg(*lock);
1238 db->SetTransaction(GetTransaction());
1239 db->id = id;
1240 EBDB_ErrCode e = db->Fetch();
1241 if (e != eBDB_Ok) {
1242 return e;
1243 }
1244 *blob_size = db->LobSize();
1245 return e;
1246 }}
1247 }
1248
1249
1250
1251 template<class TBV, class TObjDeMux, class TL>
1252 inline void
Open(const string & storage_name,CBDB_RawFile::EOpenMode open_mode,CBDB_RawFile::EDBType db_type)1253 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Open(const string& storage_name,
1254 CBDB_RawFile::EOpenMode open_mode,
1255 CBDB_RawFile::EDBType db_type)
1256 {
1257 CloseVolumes();
1258 m_StorageName = storage_name;
1259 m_OpenMode = open_mode;
1260 m_DB_Type = db_type;
1261
1262 {{
1263 TLockGuard lg1(m_DictFileLock);
1264 CReadLockGuard lg2(m_IdDeMuxLock);
1265
1266 OpenDict();
1267 LoadIdDeMux(*m_IdDeMux, *m_DictFile);
1268 }}
1269 }
1270
1271 template<class TBV, class TObjDeMux, class TL>
1272 inline bool
IsOpen() const1273 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::IsOpen() const
1274 {
1275 return m_DictFile.get() ? true : false;
1276 }
1277
1278 template<class TBV, class TObjDeMux, class TL>
1279 inline void
OpenProjections()1280 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::OpenProjections()
1281 {
1282 unsigned max_split = m_ObjDeMux->GetSplitSize();
1283 unsigned max_vol = m_ObjDeMux->GetVolumeSize();
1284 if (!max_split || !max_vol) {
1285 // cannot do anything: open ended projections
1286 return;
1287 }
1288 for (unsigned i = 0; i < max_vol; ++i) {
1289 for (unsigned j = 0; j < max_split; ++j) {
1290 /* SLockedDb& db = */ this->GetDb(i, j, eGetRead);
1291 }
1292 }
1293 m_AllProjAvail = true;
1294 this->Save(TDeMuxStore::eNoCompact); // quick dump no compression
1295 }
1296
1297 template<class TBV, class TObjDeMux, class TL>
1298 inline void
OpenDict()1299 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::OpenDict()
1300 {
1301 m_DictFile.reset(new TDeMuxStore);
1302 if (m_Env) {
1303 m_DictFile->SetEnv(*m_Env);
1304 }
1305 string dict_fname(m_StorageName);
1306 dict_fname.append(".splitd");
1307
1308 m_DictFile->Open(dict_fname.c_str(), m_OpenMode);
1309
1310 m_IdDeMux.reset(new TIdDeMux(2));
1311 }
1312
1313 template<class TBV, class TObjDeMux, class TL>
1314 inline void
LoadIdDeMux(TIdDeMux & de_mux,TDeMuxStore & dict_file)1315 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::LoadIdDeMux(TIdDeMux& de_mux,
1316 TDeMuxStore& dict_file)
1317 {
1318 CBDB_FileCursor cur(dict_file);
1319 cur.SetCondition(CBDB_FileCursor::eGE);
1320 cur.From << 0;
1321
1322 typename TDeMuxStore::TBuffer& buf = dict_file.GetBuffer();
1323 EBDB_ErrCode err;
1324 while (true) {
1325 err = dict_file.FetchToBuffer(cur);
1326 if (err != eBDB_Ok) {
1327 break;
1328 }
1329 unsigned dim = dict_file.dim;
1330 unsigned dim_idx = dict_file.dim_idx;
1331
1332 unique_ptr<TBitVector> bv(new TBitVector(bm::BM_GAP));
1333 dict_file.Deserialize(bv.get(), &buf[0]);
1334
1335 de_mux.SetProjection(dim, dim_idx, bv.release());
1336
1337 } // while
1338 }
1339
1340 template<class TBV, class TObjDeMux, class TL>
1341 inline void
Save(typename TDeMuxStore::ECompact compact_vectors)1342 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::Save(
1343 typename TDeMuxStore::ECompact compact_vectors)
1344 {
1345 if ( m_IdDeMux.get() && m_DictFile.get() ) {
1346 TLockGuard lg1(m_DictFileLock);
1347 CReadLockGuard lg2(m_IdDeMuxLock);
1348
1349 // use NULL transaction (autocommit)
1350 this->SaveIdDeMux(*m_IdDeMux, *m_DictFile, 0, compact_vectors);
1351 }
1352 }
1353
1354
1355 template<class TBV, class TObjDeMux, class TL>
1356 inline void
SaveIdDeMux(const TIdDeMux & de_mux,TDeMuxStore & dict_file,CBDB_Transaction * trans,typename TDeMuxStore::ECompact compact_vectors)1357 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::SaveIdDeMux(
1358 const TIdDeMux& de_mux,
1359 TDeMuxStore& dict_file,
1360 CBDB_Transaction* trans,
1361 typename TDeMuxStore::ECompact compact_vectors)
1362 {
1363 dict_file.SetTransaction(trans);
1364
1365 size_t N = de_mux.GetN();
1366 for (size_t i = 0; i < N; ++i) {
1367 const typename TIdDeMux::TDimVector& dv = de_mux.GetDimVector(i);
1368
1369 for (size_t j = 0; j < dv.size(); ++j) {
1370 dict_file.dim = (Uint4) i;
1371 dict_file.dim_idx = (Uint4) j;
1372
1373 const TBitVector* bv = dv[j].get();
1374 if (!bv) {
1375 dict_file.Delete(CBDB_RawFile::eIgnoreError);
1376 } else {
1377 dict_file.WriteVector(*bv, compact_vectors);
1378 }
1379
1380 } // for j
1381 } // for i
1382 }
1383
1384 template<class TBV, class TObjDeMux, class TL>
1385 inline unsigned
GetPageSize(unsigned splice) const1386 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetPageSize(unsigned splice) const
1387 {
1388 if (splice < m_PageSizes.size())
1389 return m_PageSizes[splice];
1390 return 64 * 1024;
1391 }
1392
1393 template<class TBV, class TObjDeMux, class TL>
1394 inline string
MakeDbFileName(unsigned vol,unsigned slice)1395 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::MakeDbFileName(unsigned vol,
1396 unsigned slice)
1397 {
1398 string ext;
1399 switch (m_DB_Type)
1400 {
1401 case CBDB_RawFile::eBtree:
1402 ext = ".db";
1403 break;
1404 case CBDB_RawFile::eHash:
1405 ext = ".hdb";
1406 break;
1407 default:
1408 _ASSERT(0);
1409 } // switch
1410 return m_StorageName + "_" +
1411 NStr::UIntToString(vol) + "_" + NStr::UIntToString(slice) + ext;
1412 }
1413
1414 template<class TBV, class TObjDeMux, class TL>
InitDbMutex(SLockedDb * ldb)1415 inline void CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::InitDbMutex(SLockedDb* ldb)
1416 {
1417 if ((ldb->lock.get() == 0) || (ldb->lock_ro.get() == 0)) {
1418 TLockGuard lg(m_VolumesLock);
1419 if (ldb->lock.get() == 0) {
1420 ldb->lock.reset(new TLock);
1421 }
1422 if (ldb->lock_ro.get() == 0) {
1423 ldb->lock_ro.reset(new TLock);
1424 }
1425 }
1426 }
1427
1428
1429 template<class TBV, class TObjDeMux, class TL>
1430 inline typename CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::SLockedDb&
GetDb(unsigned vol,unsigned slice,EGetDB_Mode get_mode)1431 CBDB_BlobSplitStore<TBV, TObjDeMux, TL>::GetDb(unsigned vol,
1432 unsigned slice,
1433 EGetDB_Mode get_mode)
1434 {
1435 // speculative un-locked check if everything is open already
1436 // (we don't close or shrink the store in parallel, so it is safe)
1437
1438 SLockedDb* lp = 0;
1439
1440 // all databases are pre-open - no need to lock top level mutex
1441 //
1442 if (m_AllProjAvail) {
1443
1444 _ASSERT(m_Volumes.size() > vol);
1445 _ASSERT((m_Volumes[vol])->db_vect.size() > slice);
1446
1447 SVolume& volume = *(m_Volumes[vol]);
1448 lp = &*volume.db_vect[slice];
1449
1450 _ASSERT(lp->db.get());
1451 _ASSERT(lp->lock.get());
1452
1453 return *lp;
1454 }
1455
1456 {{
1457 TLockGuard lg(m_VolumesLock);
1458 if ((m_Volumes.size() > vol) &&
1459 ((m_Volumes[vol])->db_vect.size() > slice)) {
1460 SVolume& volume = *(m_Volumes[vol]);
1461 lp = &*volume.db_vect[slice];
1462 if (lp->db.get()) {
1463 return *lp;
1464 }
1465 }
1466 }}
1467
1468 { // lock protected open
1469 TLockGuard lg(m_VolumesLock);
1470 while (m_Volumes.size() < (vol+1)) {
1471 unique_ptr<SVolume> v(new SVolume);
1472 v->db_vect.resize(slice+1);
1473 for (size_t i = 0; i < v->db_vect.size(); ++i) {
1474 if ( !v->db_vect[i] ) {
1475 v->db_vect[i] = new SLockedDb;
1476 }
1477 } // for
1478 m_Volumes.push_back(v.release());
1479 }
1480
1481 SVolume& volume = *(m_Volumes[vol]);
1482 if (volume.db_vect.size() <= slice) {
1483 volume.db_vect.resize(slice+1);
1484 for (size_t i = 0; i < volume.db_vect.size(); ++i) {
1485 if ( !volume.db_vect[i] ) {
1486 volume.db_vect[i] = new SLockedDb;
1487 }
1488 } // for
1489
1490 }
1491 lp = &*volume.db_vect[slice];
1492 }
1493
1494 bool needs_save = false;
1495
1496 {{
1497 _ASSERT(lp);
1498
1499 InitDbMutex(lp);
1500 TLockGuard lg(*(lp->lock));
1501 if (lp->db.get() == 0) {
1502 string fname = this->MakeDbFileName(vol, slice);
1503 lp->db.reset(new TBlobFile(CBDB_File::eDuplicatesDisable,
1504 m_DB_Type));
1505 if (m_Env) {
1506 lp->db->SetEnv(*m_Env);
1507 lp->db_ro.reset(new TBlobFile(CBDB_File::eDuplicatesDisable,
1508 m_DB_Type));
1509 lp->db_ro->SetEnv(*m_Env);
1510 } else {
1511 if (m_VolumeCacheSize) {
1512 lp->db->SetCacheSize(m_VolumeCacheSize);
1513 }
1514 }
1515 unsigned page_size = GetPageSize(slice);
1516 if (page_size) {
1517 lp->db->SetPageSize(page_size);
1518 }
1519
1520 /// also twiddle min keys per page
1521 switch (slice) {
1522 case 0:
1523 /// page size = default
1524 /// blobs <= 256 bytes
1525 lp->db->SetBtreeMinKeysPerPage(6);
1526 break;
1527
1528 case 1:
1529 /// page size = default
1530 /// blobs > 256, <= 512 bytes
1531 lp->db->SetBtreeMinKeysPerPage(3);
1532 break;
1533
1534 default:
1535 /// use default = 2
1536 break;
1537 }
1538
1539 /// turn off reverse splitting if requested
1540 if (m_RevSplitOff) {
1541 lp->db->RevSplitOff();
1542 }
1543
1544 lp->db->Open(fname.c_str(), m_OpenMode);
1545 lp->db->SetCachePriority(m_CachePriority);
1546 if (lp->db_ro.get()) {
1547 lp->db_ro->Open(fname.c_str(), CBDB_RawFile::eReadOnly);
1548 lp->db_ro->SetCachePriority(m_CachePriority);
1549 }
1550 needs_save = true;
1551 }
1552 }}
1553
1554 if (needs_save &&
1555 (m_OpenMode == CBDB_RawFile::eReadWriteCreate ||
1556 m_OpenMode == CBDB_RawFile::eReadWrite) &&
1557 get_mode == eGetWrite) {
1558 // new split volume: checkpoint the changes
1559 this->Save(TDeMuxStore::eNoCompact); // quick dump no compression
1560 }
1561
1562 return *lp;
1563 }
1564
1565
1566 END_NCBI_SCOPE
1567
1568
1569 #endif
1570
1571