1 #ifndef OBJTOOLS_DATA_LOADERS_PSG___PSG_LOADER_IMPL__HPP
2 #define OBJTOOLS_DATA_LOADERS_PSG___PSG_LOADER_IMPL__HPP
3 
4 /*  $Id: psg_loader_impl.hpp 637407 2021-09-13 11:50:12Z ivanov $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author: Eugene Vasilchenko, Aleksey Grichenko
30  *
31  * File Description: PSG data loader
32  *
33  * ===========================================================================
34  */
35 
36 #include <corelib/ncbistd.hpp>
37 #include <objtools/data_loaders/genbank/psg_loader.hpp>
38 #include <objtools/pubseq_gateway/client/psg_client.hpp>
39 #include <objtools/data_loaders/genbank/blob_id.hpp>
40 #include <memory>
41 #include <vector>
42 
43 #if defined(HAVE_PSG_LOADER)
44 
45 BEGIN_NCBI_SCOPE
46 
// Forward declaration only: this header holds CThreadPool solely through a
// unique_ptr member of CPSGDataLoader_Impl.
class CThreadPool;
48 
49 BEGIN_SCOPE(objects)
50 
51 struct SPsgBioseqInfo
52 {
53     SPsgBioseqInfo(const CPSG_BioseqInfo& bioseq_info);
54 
55     typedef CPSG_Request_Resolve::TIncludeInfo TIncludedInfo;
56     typedef vector<CSeq_id_Handle> TIds;
57     TIncludedInfo included_info;
58     CSeq_inst::TMol molecule_type;
59     Uint8 length;
60     CPSG_BioseqInfo::TState state;
61     TTaxId tax_id;
62     int hash;
63     TGi gi;
64     CSeq_id_Handle canonical;
65     TIds ids;
66     string blob_id;
67 
68     CDeadline deadline;
69 
70     TIncludedInfo Update(const CPSG_BioseqInfo& bioseq_info);
71 
72 private:
73     SPsgBioseqInfo(const SPsgBioseqInfo&);
74     SPsgBioseqInfo& operator=(const SPsgBioseqInfo&);
75 };
76 
77 
78 struct SPsgBlobInfo
79 {
80     SPsgBlobInfo(const CPSG_BlobInfo& blob_info);
81 
82     string blob_id_main;
83     string id2_info;
84     int blob_state;
85     Int8 last_modified;
86 
GetBlobVersionSPsgBlobInfo87     int GetBlobVersion() const { return int(last_modified/60000); /* ms to minutes */ }
88 
IsSplitSPsgBlobInfo89     bool IsSplit() const { return !id2_info.empty(); }
90 
91 private:
92     SPsgBlobInfo(const SPsgBlobInfo&);
93     SPsgBlobInfo& operator=(const SPsgBlobInfo&);
94 };
95 
96 
// Forward declarations of helper classes. CPSGDataLoader_Impl uses them only
// as smart-pointer targets, as a parameter type, or as a friend, so complete
// definitions are not required in this header.
class CPsgClientThread;
class CPSGBioseqCache;
class CPSGBlobMap;
class CPsgClientContext_Bulk;
class CPSG_Blob_Task;
102 
103 
104 class CPSGDataLoader_Impl : public CObject
105 {
106 public:
107     explicit CPSGDataLoader_Impl(const CGBLoaderParams& params);
108     ~CPSGDataLoader_Impl(void);
109 
110     typedef CDataLoader::TIds TIds;
111     typedef CDataLoader::TGis TGis;
112     typedef CDataLoader::TLoaded TLoaded;
113     typedef CDataLoader::TTSE_LockSets TTSE_LockSets;
114 
115     void GetIds(const CSeq_id_Handle& idh, TIds& ids);
116     void GetIdsOnce(const CSeq_id_Handle& idh, TIds& ids);
117     CDataLoader::SGiFound GetGi(const CSeq_id_Handle& idh);
118     CDataLoader::SGiFound GetGiOnce(const CSeq_id_Handle& idh);
119     CDataLoader::SAccVerFound GetAccVer(const CSeq_id_Handle& idh);
120     CDataLoader::SAccVerFound GetAccVerOnce(const CSeq_id_Handle& idh);
121     TTaxId GetTaxId(const CSeq_id_Handle& idh);
122     TTaxId GetTaxIdOnce(const CSeq_id_Handle& idh);
123     TSeqPos GetSequenceLength(const CSeq_id_Handle& idh);
124     TSeqPos GetSequenceLengthOnce(const CSeq_id_Handle& idh);
125     CDataLoader::SHashFound GetSequenceHash(const CSeq_id_Handle& idh);
126     CDataLoader::SHashFound GetSequenceHashOnce(const CSeq_id_Handle& idh);
127     CDataLoader::STypeFound GetSequenceType(const CSeq_id_Handle& idh);
128     CDataLoader::STypeFound GetSequenceTypeOnce(const CSeq_id_Handle& idh);
129     int GetSequenceState(const CSeq_id_Handle& idh);
130     int GetSequenceStateOnce(const CSeq_id_Handle& idh);
131 
132     CDataLoader::TTSE_LockSet GetRecords(CDataSource* data_source,
133                                          const CSeq_id_Handle& idh,
134                                          CDataLoader::EChoice choice);
135     CDataLoader::TTSE_LockSet GetRecordsOnce(CDataSource* data_source,
136                                          const CSeq_id_Handle& idh,
137                                          CDataLoader::EChoice choice);
138     CRef<CPsgBlobId> GetBlobId(const CSeq_id_Handle& idh);
139     CRef<CPsgBlobId> GetBlobIdOnce(const CSeq_id_Handle& idh);
140     CTSE_Lock GetBlobById(CDataSource* data_source,
141                               const CPsgBlobId& blob_id);
142     CTSE_Lock GetBlobByIdOnce(CDataSource* data_source,
143                               const CPsgBlobId& blob_id);
144     void LoadChunk(CDataSource* data_source,
145                    CTSE_Chunk_Info& chunk_info);
146     void LoadChunks(CDataSource* data_source,
147                     const CDataLoader::TChunkSet& chunks);
148     void LoadChunksOnce(CDataSource* data_source,
149                     const CDataLoader::TChunkSet& chunks);
150 
151     void GetBlobs(CDataSource* data_source, TTSE_LockSets& tse_sets);
152     void GetBlobsOnce(CDataSource* data_source, TTSE_LockSets& tse_sets);
153 
154     CDataLoader::TTSE_LockSet GetAnnotRecordsNA(CDataSource* data_source,
155                                                 const CSeq_id_Handle& idh,
156                                                 const SAnnotSelector* sel,
157                                                 CDataLoader::TProcessedNAs* processed_nas);
158     CDataLoader::TTSE_LockSet GetAnnotRecordsNAOnce(CDataSource* data_source,
159                                                 const CSeq_id_Handle& idh,
160                                                 const SAnnotSelector* sel,
161                                                 CDataLoader::TProcessedNAs* processed_nas);
162 
163     void DropTSE(const CPsgBlobId& blob_id);
164 
165     void GetAccVers(const TIds& ids, TLoaded& loaded, TIds& ret);
166     void GetAccVersOnce(const TIds& ids, TLoaded& loaded, TIds& ret);
167     void GetGis(const TIds& ids, TLoaded& loaded, TGis& ret);
168     void GetGisOnce(const TIds& ids, TLoaded& loaded, TGis& ret);
169 
170     static CObjectIStream* GetBlobDataStream(const CPSG_BlobInfo& blob_info, const CPSG_BlobData& blob_data);
171 
172     typedef vector<shared_ptr<SPsgBioseqInfo>> TBioseqInfos;
173 
174     struct SReplyResult {
175         CTSE_Lock lock;
176         string blob_id;
177     };
178 
179     static void NCBI_XLOADER_GENBANK_EXPORT SetGetBlobByIdShouldFail(bool value);
180     static bool NCBI_XLOADER_GENBANK_EXPORT GetGetBlobByIdShouldFail();
181 
182     template<class Call>
183     typename std::result_of<Call()>::type
184     CallWithRetry(Call&& call,
185                   const char* name,
186                   int retry_count = 0);
187 
188 private:
189     friend class CPSG_Blob_Task;
190 
191     void x_SendRequest(shared_ptr<CPSG_Request> request);
192     CPSG_BioId x_GetBioId(const CSeq_id_Handle& idh);
193     shared_ptr<CPSG_Reply> x_ProcessRequest(shared_ptr<CPSG_Request> request);
194     SReplyResult x_ProcessBlobReply(shared_ptr<CPSG_Reply> reply, CDataSource* data_source, CSeq_id_Handle req_idh, bool retry, bool lock_asap = false, CTSE_LoadLock* load_lock = nullptr);
195     SReplyResult x_RetryBlobRequest(const string& blob_id, CDataSource* data_source, CSeq_id_Handle req_idh);
196     shared_ptr<SPsgBioseqInfo> x_GetBioseqInfo(const CSeq_id_Handle& idh);
197     CTSE_Lock x_LoadBlob(const SPsgBlobInfo& psg_blob_info, CDataSource& data_source);
198 
199     enum EMainChunkType {
200         eNoDelayedMainChunk,
201         eDelayedMainChunk
202     };
203     enum ESplitInfoType {
204         eNoSplitInfo,
205         eIsSplitInfo
206     };
207     // caller of x_ReadBlobData() should call SetLoaded();
208     void x_ReadBlobData(
209         const SPsgBlobInfo& psg_blob_info,
210         const CPSG_BlobInfo& blob_info,
211         const CPSG_BlobData& blob_data,
212         CTSE_LoadLock& load_lock,
213         ESplitInfoType split_info_type);
214     void x_SetLoaded(CTSE_LoadLock& load_lock, EMainChunkType main_chunk_type);
215 
216     typedef map<void*, size_t> TIdxMap;
217 
218     // returns pair(number of loaded infos, number of failed infos)
219     pair<size_t, size_t> x_GetBulkBioseqInfo(CPSG_Request_Resolve::EIncludeInfo info,
220         const TIds& ids,
221         const TLoaded& loaded,
222         TBioseqInfos& ret);
223 
224     shared_ptr<CPSG_Request_Blob>
225     x_MakeLoadLocalCDDEntryRequest(CDataSource* data_source,
226                                    CDataLoader::TChunk chunk,
227                                    shared_ptr<CPsgClientContext_Bulk> context);
228     bool x_ReadCDDChunk(CDataSource* data_source,
229                         CDataLoader::TChunk chunk,
230                         const CPSG_BlobInfo& blob_info,
231                         const CPSG_BlobData& blob_data);
232 
233     // Map seq-id to bioseq info.
234     typedef map<CSeq_id_Handle, shared_ptr<SPsgBioseqInfo> > TBioseqCache;
235 
236     CPSG_Request_Biodata::EIncludeData m_TSERequestMode = CPSG_Request_Biodata::eSmartTSE;
237     CPSG_Request_Biodata::EIncludeData m_TSERequestModeBulk = CPSG_Request_Biodata::eWholeTSE;
238     bool m_AddWGSMasterDescr = true;
239     shared_ptr<CPSG_Queue> m_Queue;
240     CRef<CPsgClientThread> m_Thread;
241     unique_ptr<CPSGBlobMap> m_BlobMap;
242     unique_ptr<CPSGBioseqCache> m_BioseqCache;
243     unique_ptr<CThreadPool> m_ThreadPool;
244 };
245 
246 END_SCOPE(objects)
247 END_NCBI_SCOPE
248 
249 #endif // HAVE_PSG_LOADER
250 
251 #endif  // OBJTOOLS_DATA_LOADERS_PSG___PSG_LOADER_IMPL__HPP
252