1 #ifndef ALGO_MERGE_HPP_
2 #define ALGO_MERGE_HPP_
3 
4 /* $Id: volume_merge.hpp 106619 2007-07-02 20:59:13Z grichenk $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author:  Mike DiCuccio, Anatoliy Kuznetsov
30  *
31  * File Description: Interfaces and algorithm to merge sorted volumes
32  *
33  */
34 
35 #include <corelib/ncbistd.hpp>
36 #include <corelib/ncbiexpt.hpp>
37 #include <corelib/ncbimtx.hpp>
38 #include <corelib/ncbimisc.hpp>
39 
40 #include <util/resource_pool.hpp>
41 #include <util/simple_buffer.hpp>
42 
43 #include <vector>
44 
45 
46 BEGIN_NCBI_SCOPE
47 
48 
49 class  CMerge_Exception;
50 struct IMergeVolumeWalker;
51 struct IMergeStore;
52 class  IMergeBlob;
53 
54 /// Main volume merge algorithm class
55 ///
56 class CMergeVolumes
57 {
58 public:
59     /// Raw BLOB buffer type used for raw information exchange between
60     /// merge algorithm interfaces
61     /// Raw buffers are allocated/deallocated using one central
62     /// resource pool
63     typedef CSimpleBuffer TRawBuffer;
64 
65     /// Memory manager for raw buffers to avoid unnecessary reallocations
66     /// and memory fragmentation
67     typedef CResourcePool<TRawBuffer, CFastMutex> TBufResourcePool;
68 
69     /// Buffer pool guard
70     typedef CResourcePoolGuard<TBufResourcePool>   TBufPoolGuard;
71 
72 public:
73     CMergeVolumes();
74     ~CMergeVolumes();
75 
76     /// Set merge accumulator component
77     void SetMergeAccumulator(IMergeBlob*  merger,
78                              EOwnership   own=eTakeOwnership);
79 
80     /// Set merge volumes
81     void SetVolumes(const vector<IMergeVolumeWalker*>& vol_vector,
82                     EOwnership                         own=eTakeOwnership);
83 
84     /// Set merge store (destination)
85     void SetMergeStore(IMergeStore*  store,
86                        EOwnership   own=eTakeOwnership);
87 
88     /// Get BLOB buffer pool
GetBufResourcePool()89     TBufResourcePool& GetBufResourcePool() { return m_BufResourcePool; }
90 
91     /// Execute merge process (main merge loop)
92     void Run();
93 private:
94     /// check if volume key is new minimum or equals to old minimum
95     void x_EvaluateMinKey(unsigned new_key, size_t volume_idx);
96     /// Reset min. evaluation
97     void x_ResetMinKey();
98     /// Merge all discovered min key candidates
99     void x_MergeCandidates();
100     void x_MergeVolume(IMergeVolumeWalker* volume);
101     void x_StoreMerger();
102 
103 private:
104     TBufResourcePool            m_BufResourcePool;
105     AutoPtr<IMergeBlob>         m_Merger;
106     vector<IMergeVolumeWalker*> m_VolumeVect;
107     EOwnership                  m_OwnVolumeVect;
108     AutoPtr<IMergeStore>        m_Store;
109 
110     unsigned                    m_MergeKey;         ///< key in the merger
111     unsigned                    m_MinKey;           ///< min key value
112     vector<size_t>              m_MinKeyCandidates; ///< min-key volumes
113 };
114 
115 
116 /// Base Merge algorithms exception class
117 ///
118 class CMerge_Exception : EXCEPTION_VIRTUAL_BASE public CException
119 {
120 public:
121     enum EErrCode {
122         eOperationNotReady,
123         eUnsupportedKeyType,
124         eInterfaceNotReady,
125         eStoreFailure,
126         eInputVolumeFailure
127     };
GetErrCodeString(void) const128     virtual const char* GetErrCodeString(void) const
129     {
130         switch (GetErrCode())
131         {
132         case eOperationNotReady:    return "eOperationNotReady";
133         case eUnsupportedKeyType:   return "eUnsupportedKeyType";
134         case eInterfaceNotReady:    return "eInterfaceNotReady";
135         case eStoreFailure:         return "eStoreFailure";
136         case eInputVolumeFailure:   return "eInputVolumeFailure";
137         default: return CException::GetErrCodeString();
138         }
139     }
140 
141     NCBI_EXCEPTION_DEFAULT(CMerge_Exception, CException);
142 };
143 
/**
    Asynchronous processing primitives.

    Components that may finish their work in the background (threads,
    remote systems) expose this interface so that callers can poll the
    current status or wait for completion.
*/
struct IAsyncInterface
{
    virtual ~IAsyncInterface() {}

    /// Device status: tells whether the interface is in a condition
    /// that allows processing to continue.
    /// Implementations with async. execution may stay "not ready"
    /// until their background logic finishes (successfully or not).
    enum EStatus
    {
        eReady      = 0, ///< Interface is ready
        eFailed     = 1, ///< Last operation failed and interface cannot recover
        eNoMoreData = 2, ///< Interface reached the end (EOF)
        eNotReady   = 3  ///< Last request did not finish yet
    };

    /// Return the current async. status of the interface
    virtual EStatus GetStatus() const = 0;

    /// Wait until the interface becomes ready (or the operation fails).
    /// On failure the implementation is free to throw an exception.
    virtual EStatus WaitReady() const = 0;
};
171 
172 
173 /**
174     Interface to traverse volume for merge.
175 
176     Volume provides access to key->BLOB pairs, and guarantees
177     that keys are coming in an ascending sorted order.
178     Volume can support async. execution using threads or remote system
179     to facilitate parallel execution
180 */
181 struct IMergeVolumeWalker
182 {
~IMergeVolumeWalkerIMergeVolumeWalker183     virtual ~IMergeVolumeWalker() {}
184 
185     /// Get pointer to async. suport.
186     /// Returns NULL if this interface is fully syncronous.
187     virtual IAsyncInterface* QueryIAsync() = 0;
188 
189     /// Return TRUE when volume traverse reaches the end
190     virtual bool IsEof() const = 0;
191 
192     /// Return TRUE if volume is in good condition (not failed)
193     virtual bool IsGood() const = 0;
194 
195     /// Request to start fetching data
196     /// This request can be asyncronous
197     ///   caller needs to check status using IAsyncInterface
198     ///   to make sure Fetch finished
199     virtual void FetchFirst() = 0;
200 
201     /// Request to get next record
202     /// This request can be asyncronous
203     ///   caller needs to check status using IAsyncInterface
204     ///   to make sure Fetch finished
205     virtual void Fetch() = 0;
206 
207     /// Get low level access to the current key buffer
208     virtual const unsigned char* GetKeyPtr() const = 0;
209 
210     /// Get access to the key as unsigned integer
211     /// (if this type is supported)
212     virtual Uint4 GetUint4Key() const = 0;
213 
214     /// Get low level access to the merge BLOB buffer and buffer size
215     /// (next Fetch call invalidates this pointer)
216     virtual const unsigned char* GetBufferPtr(size_t* buf_size) const = 0;
217 
218     /// Close volume (when it ends)
219     /// Method is responsible for finalization of merge procedure
220     /// (it could be deletion of records, compaction of data files, etc),
221     /// stopping background threads, closing server connections
222     virtual void Close() = 0;
223 
224     /// Signals that current record moved to merged storage
225     /// (volume manager may decide to delete it later)
226     /// Volume manager should NOT fetch next record on this call
227     virtual void SetRecordMoved() = 0;
228 };
229 
230 
231 /**
232     BLOB merge interface, merges one or more BLOBs together
233 
234     Implementation of this interface should be statefull so
235     consequtive merges accumulate in the internal buffer,
236     then flush called to get the final merged BLOB to storage
237 
238     Implementation notice:
239     When receiving first Merge call implementation should not immediately
240     unpack the buffer, because the next call may ask to discard the merge, or
241     get the BLOB back, in this case decompression will be a waste CPU.
242 */
243 class IMergeBlob
244 {
245 public:
IMergeBlob()246     IMergeBlob()
247         : m_BufResourcePool(0)
248     {}
249 
~IMergeBlob()250     virtual ~IMergeBlob() {}
251 
252     /// Set resource pool for BLOB buffer memory management
SetResourcePool(CMergeVolumes::TBufResourcePool & res_pool)253     void SetResourcePool(CMergeVolumes::TBufResourcePool& res_pool)
254     {
255         m_BufResourcePool = &res_pool;
256     }
257 
258     /// Merge request
259     /// Implementation MUST return the buffer to the pool
260     virtual void Merge(CMergeVolumes::TRawBuffer* buffer) = 0;
261 
262     /// Returns destination (merged) buffer
263     /// Caller MUST return the buffer to the buffer pool
264     ///
265     /// @return
266     ///    NULL if nothing to merge
267     virtual CMergeVolumes::TRawBuffer* GetMergeBuffer() = 0;
268 
269     /// Reset merging, forget all the accumulated buffers
270     virtual void Reset() = 0;
271 
272 protected:
273     CMergeVolumes::TBufResourcePool* m_BufResourcePool;
274 };
275 
276 /**
277     Interface to store merged BLOBs
278 */
279 struct IMergeStore
280 {
~IMergeStoreIMergeStore281     virtual ~IMergeStore() {}
282 
283     /// Get pointer to async. suport.
284     /// Returns NULL if this interface is fully syncronous.
285     virtual IAsyncInterface* QueryIAsync() = 0;
286 
287     /// Set resource pool for BLOB buffer memory management
SetResourcePoolIMergeStore288     void SetResourcePool(CMergeVolumes::TBufResourcePool& res_pool)
289     {
290         m_BufResourcePool = &res_pool;
291     }
292 
293     /// Return TRUE if storage device is in good shape
294     virtual bool IsGood() const = 0;
295 
296     /// Store BLOB request
297     /// This request can be asyncronous
298     ///   caller needs to check status using IAsyncInterface
299     ///   to make sure Fetch finished
300     /// Method implementation MUST return storage buffer to the resource pool
301     virtual void Store(Uint4 blob_id, CMergeVolumes::TRawBuffer* buffer) = 0;
302 
303     /// Read buffer with the specified blob_id
304     /// This method is for store update, when we are merging into an
305     /// existing store. If method returns a non NULL value, the existing
306     /// buffer is merged with the coming buffer
307     /// Method implementation MUST return storage buffer to the resource pool
308     ///
309     virtual CMergeVolumes::TRawBuffer* ReadBlob(Uint4 blob_id) = 0;
310 
311     /// Close storage (when it ends)
312     /// Method is responsible for finalization of store procedure,
313     /// stopping background threads, closing server connections
314     virtual void Close() = 0;
315 
316 protected:
317     CMergeVolumes::TBufResourcePool* m_BufResourcePool;
318 };
319 
320 END_NCBI_SCOPE
321 
322 #endif /* ALGO_MERGE_HPP */
323