1 #ifndef ALGO_MERGE_HPP_ 2 #define ALGO_MERGE_HPP_ 3 4 /* $Id: volume_merge.hpp 106619 2007-07-02 20:59:13Z grichenk $ 5 * =========================================================================== 6 * 7 * PUBLIC DOMAIN NOTICE 8 * National Center for Biotechnology Information 9 * 10 * This software/database is a "United States Government Work" under the 11 * terms of the United States Copyright Act. It was written as part of 12 * the author's official duties as a United States Government employee and 13 * thus cannot be copyrighted. This software/database is freely available 14 * to the public for use. The National Library of Medicine and the U.S. 15 * Government have not placed any restriction on its use or reproduction. 16 * 17 * Although all reasonable efforts have been taken to ensure the accuracy 18 * and reliability of the software and data, the NLM and the U.S. 19 * Government do not and cannot warrant the performance or results that 20 * may be obtained by using this software or data. The NLM and the U.S. 21 * Government disclaim all warranties, express or implied, including 22 * warranties of performance, merchantability or fitness for any particular 23 * purpose. 24 * 25 * Please cite the author in any work or product based on this material. 26 * 27 * =========================================================================== 28 * 29 * Author: Mike DiCuccio, Anatoliy Kuznetsov 30 * 31 * File Description: Interfaces and algorithm to merge sorted volumes 32 * 33 */ 34 35 #include <corelib/ncbistd.hpp> 36 #include <corelib/ncbiexpt.hpp> 37 #include <corelib/ncbimtx.hpp> 38 #include <corelib/ncbimisc.hpp> 39 40 #include <util/resource_pool.hpp> 41 #include <util/simple_buffer.hpp> 42 43 #include <vector> 44 45 46 BEGIN_NCBI_SCOPE 47 48 49 class CMerge_Exception; 50 struct IMergeVolumeWalker; 51 struct IMergeStore; 52 class IMergeBlob; 53 54 /// Main volume merge algorithm class 55 /// 56 class CMergeVolumes 57 { 58 public: 59 /// Raw BLOB buffer type used for raw information exchange between 60 /// merge algorithm interfaces 61 /// Raw buffers are allocated/deallocated using one central 62 /// resource pool 63 typedef CSimpleBuffer TRawBuffer; 64 65 /// Memory manager for raw buffers to avoid unnecessary reallocations 66 /// and memory fragmentation 67 typedef CResourcePool<TRawBuffer, CFastMutex> TBufResourcePool; 68 69 /// Buffer pool guard 70 typedef CResourcePoolGuard<TBufResourcePool> TBufPoolGuard; 71 72 public: 73 CMergeVolumes(); 74 ~CMergeVolumes(); 75 76 /// Set merge accumulator component 77 void SetMergeAccumulator(IMergeBlob* merger, 78 EOwnership own=eTakeOwnership); 79 80 /// Set merge volumes 81 void SetVolumes(const vector<IMergeVolumeWalker*>& vol_vector, 82 EOwnership own=eTakeOwnership); 83 84 /// Set merge store (destination) 85 void SetMergeStore(IMergeStore* store, 86 EOwnership own=eTakeOwnership); 87 88 /// Get BLOB buffer pool GetBufResourcePool()89 TBufResourcePool& GetBufResourcePool() { return m_BufResourcePool; } 90 91 /// Execute merge process (main merge loop) 92 void Run(); 93 private: 94 /// check if volume key is new minimum or equals to old minimum 95 void x_EvaluateMinKey(unsigned new_key, size_t volume_idx); 96 /// Reset min. evaluation 97 void x_ResetMinKey(); 98 /// Merge all discovered min key candidates 99 void x_MergeCandidates(); 100 void x_MergeVolume(IMergeVolumeWalker* volume); 101 void x_StoreMerger(); 102 103 private: 104 TBufResourcePool m_BufResourcePool; 105 AutoPtr<IMergeBlob> m_Merger; 106 vector<IMergeVolumeWalker*> m_VolumeVect; 107 EOwnership m_OwnVolumeVect; 108 AutoPtr<IMergeStore> m_Store; 109 110 unsigned m_MergeKey; ///< key in the merger 111 unsigned m_MinKey; ///< min key value 112 vector<size_t> m_MinKeyCandidates; ///< min-key volumes 113 }; 114 115 116 /// Base Merge algorithms exception class 117 /// 118 class CMerge_Exception : EXCEPTION_VIRTUAL_BASE public CException 119 { 120 public: 121 enum EErrCode { 122 eOperationNotReady, 123 eUnsupportedKeyType, 124 eInterfaceNotReady, 125 eStoreFailure, 126 eInputVolumeFailure 127 }; GetErrCodeString(void) const128 virtual const char* GetErrCodeString(void) const 129 { 130 switch (GetErrCode()) 131 { 132 case eOperationNotReady: return "eOperationNotReady"; 133 case eUnsupportedKeyType: return "eUnsupportedKeyType"; 134 case eInterfaceNotReady: return "eInterfaceNotReady"; 135 case eStoreFailure: return "eStoreFailure"; 136 case eInputVolumeFailure: return "eInputVolumeFailure"; 137 default: return CException::GetErrCodeString(); 138 } 139 } 140 141 NCBI_EXCEPTION_DEFAULT(CMerge_Exception, CException); 142 }; 143 144 /** 145 Interface defines async. processing primitives 146 */ 147 struct IAsyncInterface 148 { ~IAsyncInterfaceIAsyncInterface149 virtual ~IAsyncInterface() {} 150 151 /// Async. device status, describes if interface is in Ok 152 /// condition to continue processing 153 /// Some interface implementations can support async. execution, 154 /// and stay "not ready" until background logic finishes 155 /// (successfully or not) 156 enum EStatus 157 { 158 eReady = 0, ///< Volume is ready 159 eFailed, ///< Last operation failed and interface cannot recover 160 eNoMoreData, ///< Interface reached the end (EOF) 161 eNotReady ///< Last request did not finish yet 162 }; 163 164 /// Get current interface async. status 165 virtual EStatus GetStatus() const = 0; 166 167 /// Wait until interface is ready (or operation fails) 168 /// (On failure volume is free to throw an exception) 169 virtual EStatus WaitReady() const = 0; 170 }; 171 172 173 /** 174 Interface to traverse volume for merge. 175 176 Volume provides access to key->BLOB pairs, and guarantees 177 that keys are coming in an ascending sorted order. 178 Volume can support async. execution using threads or remote system 179 to facilitate parallel execution 180 */ 181 struct IMergeVolumeWalker 182 { ~IMergeVolumeWalkerIMergeVolumeWalker183 virtual ~IMergeVolumeWalker() {} 184 185 /// Get pointer to async. suport. 186 /// Returns NULL if this interface is fully syncronous. 187 virtual IAsyncInterface* QueryIAsync() = 0; 188 189 /// Return TRUE when volume traverse reaches the end 190 virtual bool IsEof() const = 0; 191 192 /// Return TRUE if volume is in good condition (not failed) 193 virtual bool IsGood() const = 0; 194 195 /// Request to start fetching data 196 /// This request can be asyncronous 197 /// caller needs to check status using IAsyncInterface 198 /// to make sure Fetch finished 199 virtual void FetchFirst() = 0; 200 201 /// Request to get next record 202 /// This request can be asyncronous 203 /// caller needs to check status using IAsyncInterface 204 /// to make sure Fetch finished 205 virtual void Fetch() = 0; 206 207 /// Get low level access to the current key buffer 208 virtual const unsigned char* GetKeyPtr() const = 0; 209 210 /// Get access to the key as unsigned integer 211 /// (if this type is supported) 212 virtual Uint4 GetUint4Key() const = 0; 213 214 /// Get low level access to the merge BLOB buffer and buffer size 215 /// (next Fetch call invalidates this pointer) 216 virtual const unsigned char* GetBufferPtr(size_t* buf_size) const = 0; 217 218 /// Close volume (when it ends) 219 /// Method is responsible for finalization of merge procedure 220 /// (it could be deletion of records, compaction of data files, etc), 221 /// stopping background threads, closing server connections 222 virtual void Close() = 0; 223 224 /// Signals that current record moved to merged storage 225 /// (volume manager may decide to delete it later) 226 /// Volume manager should NOT fetch next record on this call 227 virtual void SetRecordMoved() = 0; 228 }; 229 230 231 /** 232 BLOB merge interface, merges one or more BLOBs together 233 234 Implementation of this interface should be statefull so 235 consequtive merges accumulate in the internal buffer, 236 then flush called to get the final merged BLOB to storage 237 238 Implementation notice: 239 When receiving first Merge call implementation should not immediately 240 unpack the buffer, because the next call may ask to discard the merge, or 241 get the BLOB back, in this case decompression will be a waste CPU. 242 */ 243 class IMergeBlob 244 { 245 public: IMergeBlob()246 IMergeBlob() 247 : m_BufResourcePool(0) 248 {} 249 ~IMergeBlob()250 virtual ~IMergeBlob() {} 251 252 /// Set resource pool for BLOB buffer memory management SetResourcePool(CMergeVolumes::TBufResourcePool & res_pool)253 void SetResourcePool(CMergeVolumes::TBufResourcePool& res_pool) 254 { 255 m_BufResourcePool = &res_pool; 256 } 257 258 /// Merge request 259 /// Implementation MUST return the buffer to the pool 260 virtual void Merge(CMergeVolumes::TRawBuffer* buffer) = 0; 261 262 /// Returns destination (merged) buffer 263 /// Caller MUST return the buffer to the buffer pool 264 /// 265 /// @return 266 /// NULL if nothing to merge 267 virtual CMergeVolumes::TRawBuffer* GetMergeBuffer() = 0; 268 269 /// Reset merging, forget all the accumulated buffers 270 virtual void Reset() = 0; 271 272 protected: 273 CMergeVolumes::TBufResourcePool* m_BufResourcePool; 274 }; 275 276 /** 277 Interface to store merged BLOBs 278 */ 279 struct IMergeStore 280 { ~IMergeStoreIMergeStore281 virtual ~IMergeStore() {} 282 283 /// Get pointer to async. suport. 284 /// Returns NULL if this interface is fully syncronous. 285 virtual IAsyncInterface* QueryIAsync() = 0; 286 287 /// Set resource pool for BLOB buffer memory management SetResourcePoolIMergeStore288 void SetResourcePool(CMergeVolumes::TBufResourcePool& res_pool) 289 { 290 m_BufResourcePool = &res_pool; 291 } 292 293 /// Return TRUE if storage device is in good shape 294 virtual bool IsGood() const = 0; 295 296 /// Store BLOB request 297 /// This request can be asyncronous 298 /// caller needs to check status using IAsyncInterface 299 /// to make sure Fetch finished 300 /// Method implementation MUST return storage buffer to the resource pool 301 virtual void Store(Uint4 blob_id, CMergeVolumes::TRawBuffer* buffer) = 0; 302 303 /// Read buffer with the specified blob_id 304 /// This method is for store update, when we are merging into an 305 /// existing store. If method returns a non NULL value, the existing 306 /// buffer is merged with the coming buffer 307 /// Method implementation MUST return storage buffer to the resource pool 308 /// 309 virtual CMergeVolumes::TRawBuffer* ReadBlob(Uint4 blob_id) = 0; 310 311 /// Close storage (when it ends) 312 /// Method is responsible for finalization of store procedure, 313 /// stopping background threads, closing server connections 314 virtual void Close() = 0; 315 316 protected: 317 CMergeVolumes::TBufResourcePool* m_BufResourcePool; 318 }; 319 320 END_NCBI_SCOPE 321 322 #endif /* ALGO_MERGE_HPP */ 323