1 #ifndef OBJTOOLS_DATA_LOADERS_PSG___PSG_LOADER_IMPL__HPP 2 #define OBJTOOLS_DATA_LOADERS_PSG___PSG_LOADER_IMPL__HPP 3 4 /* $Id: psg_loader_impl.hpp 637407 2021-09-13 11:50:12Z ivanov $ 5 * =========================================================================== 6 * 7 * PUBLIC DOMAIN NOTICE 8 * National Center for Biotechnology Information 9 * 10 * This software/database is a "United States Government Work" under the 11 * terms of the United States Copyright Act. It was written as part of 12 * the author's official duties as a United States Government employee and 13 * thus cannot be copyrighted. This software/database is freely available 14 * to the public for use. The National Library of Medicine and the U.S. 15 * Government have not placed any restriction on its use or reproduction. 16 * 17 * Although all reasonable efforts have been taken to ensure the accuracy 18 * and reliability of the software and data, the NLM and the U.S. 19 * Government do not and cannot warrant the performance or results that 20 * may be obtained by using this software or data. The NLM and the U.S. 21 * Government disclaim all warranties, express or implied, including 22 * warranties of performance, merchantability or fitness for any particular 23 * purpose. 24 * 25 * Please cite the author in any work or product based on this material. 26 * 27 * =========================================================================== 28 * 29 * Author: Eugene Vasilchenko, Aleksey Grichenko 30 * 31 * File Description: PSG data loader 32 * 33 * =========================================================================== 34 */ 35 36 #include <corelib/ncbistd.hpp> 37 #include <objtools/data_loaders/genbank/psg_loader.hpp> 38 #include <objtools/pubseq_gateway/client/psg_client.hpp> 39 #include <objtools/data_loaders/genbank/blob_id.hpp> 40 #include <memory> 41 #include <vector> 42 43 #if defined(HAVE_PSG_LOADER) 44 45 BEGIN_NCBI_SCOPE 46 47 class CThreadPool; 48 49 BEGIN_SCOPE(objects) 50 51 struct SPsgBioseqInfo 52 { 53 SPsgBioseqInfo(const CPSG_BioseqInfo& bioseq_info); 54 55 typedef CPSG_Request_Resolve::TIncludeInfo TIncludedInfo; 56 typedef vector<CSeq_id_Handle> TIds; 57 TIncludedInfo included_info; 58 CSeq_inst::TMol molecule_type; 59 Uint8 length; 60 CPSG_BioseqInfo::TState state; 61 TTaxId tax_id; 62 int hash; 63 TGi gi; 64 CSeq_id_Handle canonical; 65 TIds ids; 66 string blob_id; 67 68 CDeadline deadline; 69 70 TIncludedInfo Update(const CPSG_BioseqInfo& bioseq_info); 71 72 private: 73 SPsgBioseqInfo(const SPsgBioseqInfo&); 74 SPsgBioseqInfo& operator=(const SPsgBioseqInfo&); 75 }; 76 77 78 struct SPsgBlobInfo 79 { 80 SPsgBlobInfo(const CPSG_BlobInfo& blob_info); 81 82 string blob_id_main; 83 string id2_info; 84 int blob_state; 85 Int8 last_modified; 86 GetBlobVersionSPsgBlobInfo87 int GetBlobVersion() const { return int(last_modified/60000); /* ms to minutes */ } 88 IsSplitSPsgBlobInfo89 bool IsSplit() const { return !id2_info.empty(); } 90 91 private: 92 SPsgBlobInfo(const SPsgBlobInfo&); 93 SPsgBlobInfo& operator=(const SPsgBlobInfo&); 94 }; 95 96 97 class CPsgClientThread; 98 class CPSGBioseqCache; 99 class CPSGBlobMap; 100 class CPsgClientContext_Bulk; 101 class CPSG_Blob_Task; 102 103 104 class CPSGDataLoader_Impl : public CObject 105 { 106 public: 107 explicit CPSGDataLoader_Impl(const CGBLoaderParams& params); 108 ~CPSGDataLoader_Impl(void); 109 110 typedef CDataLoader::TIds TIds; 111 typedef CDataLoader::TGis TGis; 112 typedef CDataLoader::TLoaded TLoaded; 113 typedef CDataLoader::TTSE_LockSets TTSE_LockSets; 114 115 void GetIds(const CSeq_id_Handle& idh, TIds& ids); 116 void GetIdsOnce(const CSeq_id_Handle& idh, TIds& ids); 117 CDataLoader::SGiFound GetGi(const CSeq_id_Handle& idh); 118 CDataLoader::SGiFound GetGiOnce(const CSeq_id_Handle& idh); 119 CDataLoader::SAccVerFound GetAccVer(const CSeq_id_Handle& idh); 120 CDataLoader::SAccVerFound GetAccVerOnce(const CSeq_id_Handle& idh); 121 TTaxId GetTaxId(const CSeq_id_Handle& idh); 122 TTaxId GetTaxIdOnce(const CSeq_id_Handle& idh); 123 TSeqPos GetSequenceLength(const CSeq_id_Handle& idh); 124 TSeqPos GetSequenceLengthOnce(const CSeq_id_Handle& idh); 125 CDataLoader::SHashFound GetSequenceHash(const CSeq_id_Handle& idh); 126 CDataLoader::SHashFound GetSequenceHashOnce(const CSeq_id_Handle& idh); 127 CDataLoader::STypeFound GetSequenceType(const CSeq_id_Handle& idh); 128 CDataLoader::STypeFound GetSequenceTypeOnce(const CSeq_id_Handle& idh); 129 int GetSequenceState(const CSeq_id_Handle& idh); 130 int GetSequenceStateOnce(const CSeq_id_Handle& idh); 131 132 CDataLoader::TTSE_LockSet GetRecords(CDataSource* data_source, 133 const CSeq_id_Handle& idh, 134 CDataLoader::EChoice choice); 135 CDataLoader::TTSE_LockSet GetRecordsOnce(CDataSource* data_source, 136 const CSeq_id_Handle& idh, 137 CDataLoader::EChoice choice); 138 CRef<CPsgBlobId> GetBlobId(const CSeq_id_Handle& idh); 139 CRef<CPsgBlobId> GetBlobIdOnce(const CSeq_id_Handle& idh); 140 CTSE_Lock GetBlobById(CDataSource* data_source, 141 const CPsgBlobId& blob_id); 142 CTSE_Lock GetBlobByIdOnce(CDataSource* data_source, 143 const CPsgBlobId& blob_id); 144 void LoadChunk(CDataSource* data_source, 145 CTSE_Chunk_Info& chunk_info); 146 void LoadChunks(CDataSource* data_source, 147 const CDataLoader::TChunkSet& chunks); 148 void LoadChunksOnce(CDataSource* data_source, 149 const CDataLoader::TChunkSet& chunks); 150 151 void GetBlobs(CDataSource* data_source, TTSE_LockSets& tse_sets); 152 void GetBlobsOnce(CDataSource* data_source, TTSE_LockSets& tse_sets); 153 154 CDataLoader::TTSE_LockSet GetAnnotRecordsNA(CDataSource* data_source, 155 const CSeq_id_Handle& idh, 156 const SAnnotSelector* sel, 157 CDataLoader::TProcessedNAs* processed_nas); 158 CDataLoader::TTSE_LockSet GetAnnotRecordsNAOnce(CDataSource* data_source, 159 const CSeq_id_Handle& idh, 160 const SAnnotSelector* sel, 161 CDataLoader::TProcessedNAs* processed_nas); 162 163 void DropTSE(const CPsgBlobId& blob_id); 164 165 void GetAccVers(const TIds& ids, TLoaded& loaded, TIds& ret); 166 void GetAccVersOnce(const TIds& ids, TLoaded& loaded, TIds& ret); 167 void GetGis(const TIds& ids, TLoaded& loaded, TGis& ret); 168 void GetGisOnce(const TIds& ids, TLoaded& loaded, TGis& ret); 169 170 static CObjectIStream* GetBlobDataStream(const CPSG_BlobInfo& blob_info, const CPSG_BlobData& blob_data); 171 172 typedef vector<shared_ptr<SPsgBioseqInfo>> TBioseqInfos; 173 174 struct SReplyResult { 175 CTSE_Lock lock; 176 string blob_id; 177 }; 178 179 static void NCBI_XLOADER_GENBANK_EXPORT SetGetBlobByIdShouldFail(bool value); 180 static bool NCBI_XLOADER_GENBANK_EXPORT GetGetBlobByIdShouldFail(); 181 182 template<class Call> 183 typename std::result_of<Call()>::type 184 CallWithRetry(Call&& call, 185 const char* name, 186 int retry_count = 0); 187 188 private: 189 friend class CPSG_Blob_Task; 190 191 void x_SendRequest(shared_ptr<CPSG_Request> request); 192 CPSG_BioId x_GetBioId(const CSeq_id_Handle& idh); 193 shared_ptr<CPSG_Reply> x_ProcessRequest(shared_ptr<CPSG_Request> request); 194 SReplyResult x_ProcessBlobReply(shared_ptr<CPSG_Reply> reply, CDataSource* data_source, CSeq_id_Handle req_idh, bool retry, bool lock_asap = false, CTSE_LoadLock* load_lock = nullptr); 195 SReplyResult x_RetryBlobRequest(const string& blob_id, CDataSource* data_source, CSeq_id_Handle req_idh); 196 shared_ptr<SPsgBioseqInfo> x_GetBioseqInfo(const CSeq_id_Handle& idh); 197 CTSE_Lock x_LoadBlob(const SPsgBlobInfo& psg_blob_info, CDataSource& data_source); 198 199 enum EMainChunkType { 200 eNoDelayedMainChunk, 201 eDelayedMainChunk 202 }; 203 enum ESplitInfoType { 204 eNoSplitInfo, 205 eIsSplitInfo 206 }; 207 // caller of x_ReadBlobData() should call SetLoaded(); 208 void x_ReadBlobData( 209 const SPsgBlobInfo& psg_blob_info, 210 const CPSG_BlobInfo& blob_info, 211 const CPSG_BlobData& blob_data, 212 CTSE_LoadLock& load_lock, 213 ESplitInfoType split_info_type); 214 void x_SetLoaded(CTSE_LoadLock& load_lock, EMainChunkType main_chunk_type); 215 216 typedef map<void*, size_t> TIdxMap; 217 218 // returns pair(number of loaded infos, number of failed infos) 219 pair<size_t, size_t> x_GetBulkBioseqInfo(CPSG_Request_Resolve::EIncludeInfo info, 220 const TIds& ids, 221 const TLoaded& loaded, 222 TBioseqInfos& ret); 223 224 shared_ptr<CPSG_Request_Blob> 225 x_MakeLoadLocalCDDEntryRequest(CDataSource* data_source, 226 CDataLoader::TChunk chunk, 227 shared_ptr<CPsgClientContext_Bulk> context); 228 bool x_ReadCDDChunk(CDataSource* data_source, 229 CDataLoader::TChunk chunk, 230 const CPSG_BlobInfo& blob_info, 231 const CPSG_BlobData& blob_data); 232 233 // Map seq-id to bioseq info. 234 typedef map<CSeq_id_Handle, shared_ptr<SPsgBioseqInfo> > TBioseqCache; 235 236 CPSG_Request_Biodata::EIncludeData m_TSERequestMode = CPSG_Request_Biodata::eSmartTSE; 237 CPSG_Request_Biodata::EIncludeData m_TSERequestModeBulk = CPSG_Request_Biodata::eWholeTSE; 238 bool m_AddWGSMasterDescr = true; 239 shared_ptr<CPSG_Queue> m_Queue; 240 CRef<CPsgClientThread> m_Thread; 241 unique_ptr<CPSGBlobMap> m_BlobMap; 242 unique_ptr<CPSGBioseqCache> m_BioseqCache; 243 unique_ptr<CThreadPool> m_ThreadPool; 244 }; 245 246 END_SCOPE(objects) 247 END_NCBI_SCOPE 248 249 #endif // HAVE_PSG_LOADER 250 251 #endif // OBJTOOLS_DATA_LOADERS_PSG___PSG_LOADER_IMPL__HPP 252