1 /*  $Id: psg_loader_impl.cpp 637408 2021-09-13 11:50:24Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Eugene Vasilchenko, Aleksey Grichenko
27  *
28  * File Description: PSG data loader
29  *
30  * ===========================================================================
31  */
32 
#include <ncbi_pch.hpp>
#include <corelib/ncbistd.hpp>
#include <corelib/ncbithr.hpp>
#include <corelib/ncbi_param.hpp>
#include <corelib/plugin_manager_store.hpp>
#include <objects/seqsplit/ID2S_Split_Info.hpp>
#include <objects/seqsplit/ID2S_Chunk.hpp>
#include <objects/seqsplit/ID2S_Feat_type_Info.hpp>
#include <objects/general/Dbtag.hpp>
#include <objmgr/impl/data_source.hpp>
#include <objmgr/impl/tse_loadlock.hpp>
#include <objmgr/impl/tse_chunk_info.hpp>
#include <objmgr/impl/tse_split_info.hpp>
#include <objmgr/impl/split_parser.hpp>
#include <objmgr/data_loader_factory.hpp>
#include <objmgr/annot_selector.hpp>
#include <objtools/data_loaders/genbank/impl/psg_loader_impl.hpp>
#include <objtools/data_loaders/genbank/impl/wgsmaster.hpp>
#include <util/compress/compress.hpp>
#include <util/compress/stream.hpp>
#include <util/compress/zlib.hpp>
#include <serial/objistr.hpp>
#include <serial/serial.hpp>
#include <util/thread_pool.hpp>
#include <atomic>
#include <sstream>
58 
59 #if defined(HAVE_PSG_LOADER)
60 
61 #define LOCK4GET 1
62 
63 BEGIN_NCBI_SCOPE
64 
65 //#define NCBI_USE_ERRCODE_X   PSGLoader
66 //NCBI_DEFINE_ERR_SUBCODE_X(1);
67 
68 BEGIN_SCOPE(objects)
69 
// Special chunk id, presumably reserved for the split-info pseudo-chunk.
// NOTE(review): not referenced in this part of the file -- confirm its role.
const int kSplitInfoChunkId = 999999999;

// PSG_LOADER/DEBUG: debug level of the PSG loader (default 1), also settable
// through the PSG_LOADER_DEBUG environment variable. Read via s_GetDebugLevel().
NCBI_PARAM_DECL(unsigned int, PSG_LOADER, DEBUG);
NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, DEBUG, 1,
    eParam_NoThread, PSG_LOADER_DEBUG);
typedef NCBI_PARAM_TYPE(PSG_LOADER, DEBUG) TPSG_Debug;
76 
77 
s_GetDebugLevel()78 static unsigned int s_GetDebugLevel()
79 {
80     static auto value = TPSG_Debug::GetDefault();
81     return value;
82 }
83 
84 /////////////////////////////////////////////////////////////////////////////
85 // CPsgClientContext
86 /////////////////////////////////////////////////////////////////////////////
87 
88 class CPsgClientContext
89 {
90 public:
91     CPsgClientContext(void);
~CPsgClientContext(void)92     virtual ~CPsgClientContext(void) {}
93 
94     virtual void SetReply(shared_ptr<CPSG_Reply> reply);
95     virtual shared_ptr<CPSG_Reply> GetReply(void);
96 
97 protected:
98     CSemaphore m_Sema;
99 private:
100     shared_ptr<CPSG_Reply> m_Reply;
101 };
102 
103 
CPsgClientContext(void)104 CPsgClientContext::CPsgClientContext(void)
105     : m_Sema(0, kMax_UInt)
106 {
107 }
108 
109 
SetReply(shared_ptr<CPSG_Reply> reply)110 void CPsgClientContext::SetReply(shared_ptr<CPSG_Reply> reply)
111 {
112     m_Reply = reply;
113     m_Sema.Post();
114 }
115 
116 
GetReply(void)117 shared_ptr<CPSG_Reply> CPsgClientContext::GetReply(void)
118 {
119     m_Sema.Wait();
120     return m_Reply;
121 }
122 
123 
124 class CPsgClientContext_Bulk : public CPsgClientContext
125 {
126 public:
CPsgClientContext_Bulk(void)127     CPsgClientContext_Bulk(void) {}
~CPsgClientContext_Bulk(void)128     virtual ~CPsgClientContext_Bulk(void) {}
129 
130     void SetReply(shared_ptr<CPSG_Reply> reply) override;
131     shared_ptr<CPSG_Reply> GetReply(void) override;
132 
133 private:
134     deque<shared_ptr<CPSG_Reply>> m_Replies;
135     CFastMutex m_Lock;
136 };
137 
138 
SetReply(shared_ptr<CPSG_Reply> reply)139 void CPsgClientContext_Bulk::SetReply(shared_ptr<CPSG_Reply> reply)
140 {
141     CFastMutexGuard guard(m_Lock);
142     m_Replies.push_front(reply);
143     m_Sema.Post();
144 }
145 
146 
GetReply(void)147 shared_ptr<CPSG_Reply> CPsgClientContext_Bulk::GetReply(void)
148 {
149     m_Sema.Wait();
150     shared_ptr<CPSG_Reply> ret;
151     CFastMutexGuard guard(m_Lock);
152     _ASSERT(!m_Replies.empty());
153     ret = m_Replies.back();
154     m_Replies.pop_back();
155     return ret;
156 }
157 
158 
159 /////////////////////////////////////////////////////////////////////////////
160 // CPsgClientThread
161 /////////////////////////////////////////////////////////////////////////////
162 
163 class CPsgClientThread : public CThread
164 {
165 public:
CPsgClientThread(shared_ptr<CPSG_Queue> queue)166     CPsgClientThread(shared_ptr<CPSG_Queue> queue) : m_Queue(queue), m_WakeSema(0, kMax_UInt) {}
167 
Stop(void)168     void Stop(void)
169     {
170         m_Stop = true;
171         Wake();
172     }
173 
Wake()174     void Wake()
175     {
176         m_WakeSema.Post();
177     }
178 
179 protected:
180     void* Main(void) override;
181 
182 private:
183     bool m_Stop = false;
184     shared_ptr<CPSG_Queue> m_Queue;
185     CSemaphore m_WakeSema;
186 };
187 
188 
// Timeout for a single blocking poll of the PSG queue or of a reply
// (used via DEFAULT_DEADLINE by CPsgClientThread::Main and CPSG_Task::ReadReply).
const unsigned int kMaxWaitSeconds = 3;
const unsigned int kMaxWaitMillisec = 0;

// Expands to a new 3-second CDeadline each time it is used, so every poll
// gets its own full timeout.
#define DEFAULT_DEADLINE CDeadline(kMaxWaitSeconds, kMaxWaitMillisec)
193 
Main(void)194 void* CPsgClientThread::Main(void)
195 {
196     for (;;) {
197         m_WakeSema.Wait();
198         if (m_Stop) break;
199         shared_ptr<CPSG_Reply> reply;
200         do {
201             reply = m_Queue->GetNextReply(DEFAULT_DEADLINE);
202         }
203         while (!reply && !m_Stop);
204         if (m_Stop) break;
205         auto context = reply->GetRequest()->GetUserContext<CPsgClientContext>();
206         context->SetReply(reply);
207     }
208     return nullptr;
209 }
210 
211 
212 /////////////////////////////////////////////////////////////////////////////
213 // CPSGBioseqCache
214 /////////////////////////////////////////////////////////////////////////////
215 
216 
PsgIdToHandle(const CPSG_BioId & id)217 CSeq_id_Handle PsgIdToHandle(const CPSG_BioId& id)
218 {
219     string sid = id.GetId();
220     if (sid.empty()) return CSeq_id_Handle();
221     return CSeq_id_Handle::GetHandle(sid);
222 }
223 
224 
// Lifetime of a bioseq-info cache entry; CPSGBioseqCache::Get() restarts it
// on every hit, and CPSGBioseqCache::Add() evicts expired entries.
const int kMaxCacheLifespanSeconds = 300;
// Entry-count limit at which CPSGBioseqCache::Add() starts evicting the
// least recently used entries.
const size_t kMaxCacheSize = 10000;
227 
228 
229 class CPSGBioseqCache
230 {
231 public:
CPSGBioseqCache(void)232     CPSGBioseqCache(void) {}
~CPSGBioseqCache(void)233     ~CPSGBioseqCache(void) {}
234 
235     shared_ptr<SPsgBioseqInfo> Get(const CSeq_id_Handle& idh);
236     shared_ptr<SPsgBioseqInfo> Add(const CPSG_BioseqInfo& info, CSeq_id_Handle req_idh);
237 
238 private:
239     typedef map<CSeq_id_Handle, shared_ptr<SPsgBioseqInfo> > TIdMap;
240     typedef list<shared_ptr<SPsgBioseqInfo> > TInfoQueue;
241 
242     mutable CFastMutex m_Mutex;
243     size_t m_MaxSize = kMaxCacheSize;
244     TIdMap m_Ids;
245     TInfoQueue m_Infos;
246 };
247 
248 
Get(const CSeq_id_Handle & idh)249 shared_ptr<SPsgBioseqInfo> CPSGBioseqCache::Get(const CSeq_id_Handle& idh)
250 {
251     CFastMutexGuard guard(m_Mutex);
252     auto found = m_Ids.find(idh);
253     if (found == m_Ids.end()) return nullptr;
254     shared_ptr<SPsgBioseqInfo> ret = found->second;
255     m_Infos.remove(ret);
256     ret->deadline = CDeadline(kMaxCacheLifespanSeconds);
257     m_Infos.push_back(ret);
258     return ret;
259 }
260 
261 
Add(const CPSG_BioseqInfo & info,CSeq_id_Handle req_idh)262 shared_ptr<SPsgBioseqInfo> CPSGBioseqCache::Add(const CPSG_BioseqInfo& info, CSeq_id_Handle req_idh)
263 {
264     CSeq_id_Handle idh = PsgIdToHandle(info.GetCanonicalId());
265     if (!idh) return nullptr;
266     // Try to find an existing entry (though this should not be a common case).
267     CFastMutexGuard guard(m_Mutex);
268     auto found = m_Ids.find(idh);
269     if (found != m_Ids.end()) {
270         found->second->Update(info);
271         return found->second;
272     }
273     // Create new entry.
274     shared_ptr<SPsgBioseqInfo> ret = make_shared<SPsgBioseqInfo>(info);
275     while (!m_Infos.empty() && (m_Infos.size() > m_MaxSize || m_Infos.front()->deadline.IsExpired())) {
276         auto rm = m_Infos.front();
277         m_Infos.pop_front();
278         ITERATE(SPsgBioseqInfo::TIds, id, rm->ids) {
279             m_Ids.erase(*id);
280         }
281     }
282     m_Infos.push_back(ret);
283     if (req_idh) {
284         m_Ids[req_idh] = ret;
285     }
286     ITERATE(SPsgBioseqInfo::TIds, it, ret->ids) {
287         m_Ids[*it] = ret;
288     }
289     return ret;
290 }
291 
292 
293 /////////////////////////////////////////////////////////////////////////////
294 // CPSGBlobMap
295 /////////////////////////////////////////////////////////////////////////////
296 
297 
298 class CPSGBlobMap
299 {
300 public:
CPSGBlobMap(void)301     CPSGBlobMap(void) {}
~CPSGBlobMap(void)302     ~CPSGBlobMap(void) {}
303 
FindBlob(const string & bid)304     shared_ptr<SPsgBlobInfo> FindBlob(const string& bid) {
305         CFastMutexGuard guard(m_Mutex);
306         auto found = m_Blobs.find(bid);
307         return found != m_Blobs.end() ? found->second : nullptr;
308     }
309 
AddBlob(const string & bid,shared_ptr<SPsgBlobInfo> blob)310     void AddBlob(const string& bid, shared_ptr<SPsgBlobInfo> blob) {
311         CFastMutexGuard guard(m_Mutex);
312         m_Blobs[bid] = blob;
313     }
314 
DropBlob(const CPsgBlobId & blob_id)315     void DropBlob(const CPsgBlobId& blob_id) {
316         CFastMutexGuard guard(m_Mutex);
317         auto blob_it = m_Blobs.find(blob_id.ToPsgId());
318         if (blob_it != m_Blobs.end()) {
319             m_Blobs.erase(blob_it);
320         }
321     }
322 
323 private:
324     // Map blob-id to blob info
325     typedef map<string, shared_ptr<SPsgBlobInfo> > TBlobs;
326 
327     CFastMutex m_Mutex;
328     TBlobs m_Blobs;
329 };
330 
331 
332 /////////////////////////////////////////////////////////////////////////////
333 // SPsgBioseqInfo
334 /////////////////////////////////////////////////////////////////////////////
335 
336 
SPsgBioseqInfo(const CPSG_BioseqInfo & bioseq_info)337 SPsgBioseqInfo::SPsgBioseqInfo(const CPSG_BioseqInfo& bioseq_info)
338     : included_info(0),
339       molecule_type(CSeq_inst::eMol_not_set),
340       length(0),
341       state(0),
342       tax_id(INVALID_TAX_ID),
343       hash(0),
344       deadline(kMaxCacheLifespanSeconds)
345 {
346     Update(bioseq_info);
347 }
348 
349 
Update(const CPSG_BioseqInfo & bioseq_info)350 SPsgBioseqInfo::TIncludedInfo SPsgBioseqInfo::Update(const CPSG_BioseqInfo& bioseq_info)
351 {
352     TIncludedInfo inc_info = bioseq_info.IncludedInfo() & ~included_info;
353     if (inc_info & CPSG_Request_Resolve::fMoleculeType)
354         molecule_type = bioseq_info.GetMoleculeType();
355 
356     if (inc_info & CPSG_Request_Resolve::fLength)
357         length = bioseq_info.GetLength();
358 
359     if (inc_info & CPSG_Request_Resolve::fState)
360         state = bioseq_info.GetState();
361 
362     if (inc_info & CPSG_Request_Resolve::fTaxId)
363         tax_id = bioseq_info.GetTaxId();
364 
365     if (inc_info & CPSG_Request_Resolve::fHash)
366         hash = bioseq_info.GetHash();
367 
368     if (inc_info & CPSG_Request_Resolve::fCanonicalId) {
369         canonical = PsgIdToHandle(bioseq_info.GetCanonicalId());
370         ids.push_back(canonical);
371     }
372     if (inc_info & CPSG_Request_Resolve::fGi)
373         gi = bioseq_info.GetGi();
374 
375     if (inc_info & CPSG_Request_Resolve::fOtherIds) {
376         vector<CPSG_BioId> other_ids = bioseq_info.GetOtherIds();
377         ITERATE(vector<CPSG_BioId>, other_id, other_ids) {
378             ids.push_back(PsgIdToHandle(*other_id));
379         }
380     }
381     if (inc_info & CPSG_Request_Resolve::fBlobId)
382         blob_id = bioseq_info.GetBlobId().GetId();
383 
384     included_info |= inc_info;
385     return inc_info;
386 }
387 
388 
389 /////////////////////////////////////////////////////////////////////////////
390 // SPsgBlobInfo
391 /////////////////////////////////////////////////////////////////////////////
392 
393 
SPsgBlobInfo(const CPSG_BlobInfo & blob_info)394 SPsgBlobInfo::SPsgBlobInfo(const CPSG_BlobInfo& blob_info)
395     : blob_state(0)
396 {
397     auto blob_id = blob_info.GetId<CPSG_BlobId>();
398     _ASSERT(blob_id);
399     blob_id_main = blob_id->GetId();
400     id2_info = blob_info.GetId2Info();
401 
402     if (blob_info.IsDead()) blob_state |= CBioseq_Handle::fState_dead;
403     if (blob_info.IsSuppressed()) blob_state |= CBioseq_Handle::fState_suppress_perm;
404     if (blob_info.IsWithdrawn()) blob_state |= CBioseq_Handle::fState_withdrawn;
405 
406     auto lm = blob_id->GetLastModified(); // last_modified is in milliseconds
407     last_modified = lm.IsNull()? 0: lm.GetValue();
408 }
409 
410 
411 /////////////////////////////////////////////////////////////////////////////
412 // CPSG_Task
413 /////////////////////////////////////////////////////////////////////////////
414 
ReportStatus(TReply reply,EPSG_Status status)415 template<class TReply> void ReportStatus(TReply reply, EPSG_Status status)
416 {
417     if (status == EPSG_Status::eSuccess) return;
418     string sstatus;
419     switch (status) {
420     case EPSG_Status::eCanceled:
421         sstatus = "Canceled";
422         break;
423     case EPSG_Status::eError:
424         sstatus = "Error";
425         break;
426     case EPSG_Status::eInProgress:
427         sstatus = "In progress";
428         break;
429     case EPSG_Status::eNotFound:
430         sstatus = "Not found";
431         break;
432     default:
433         sstatus = to_string((int)status);
434         break;
435     }
436     while (true) {
437         string msg = reply->GetNextMessage();
438         if (msg.empty()) break;
439         _TRACE("Request failed: " << sstatus << " - " << msg << " @ "<<CStackTrace());
440     }
441 }
442 
443 
// Forward declaration: CPSG_Task holds a reference to its owning group.
class CPSG_TaskGroup;
445 
446 
447 class CPSG_Task : public CThreadPool_Task
448 {
449 public:
450     typedef shared_ptr<CPSG_Reply> TReply;
451 
452     CPSG_Task(TReply reply, CPSG_TaskGroup& group);
~CPSG_Task(void)453     ~CPSG_Task(void) override {}
454 
455     EStatus Execute(void) override;
456     virtual void Finish(void) = 0;
457 
GotNotFound() const458     bool GotNotFound() const {
459         return m_GotNotFound;
460     }
461 
462 protected:
463     void OnStatusChange(EStatus old) override;
464 
GetReply(void)465     TReply& GetReply(void) { return m_Reply; }
466 
DoExecute(void)467     virtual void DoExecute(void) {
468         if (!CheckReplyStatus()) return;
469         ReadReply();
470         if (m_Status == eExecuting) m_Status = eCompleted;
471     }
472 
IsCancelled(void)473     bool IsCancelled(void) {
474         if (IsCancelRequested()) {
475             m_Status = eFailed;
476             return true;
477         }
478         return false;
479     }
480 
481     bool CheckReplyStatus(void);
482     void ReadReply(void);
483     virtual void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) = 0;
484 
485     TReply m_Reply;
486     EStatus m_Status;
487     bool m_GotNotFound;
488 private:
489     CPSG_TaskGroup& m_Group;
490 };
491 
492 
493 // It may happen that a CThreadPool's thread holds CRef to a task longer than
494 // the loader exists. In this case the task needs to release some data
495 // (e.g. load locks) before the loader is destroyed. The guard calls
496 // Finish() to do the cleanup.
497 class CPSG_Task_Guard
498 {
499 public:
CPSG_Task_Guard(CPSG_Task & task)500     CPSG_Task_Guard(CPSG_Task& task) : m_Task(&task) {}
~CPSG_Task_Guard(void)501     ~CPSG_Task_Guard(void) { if (m_Task) m_Task->Finish(); }
Resease(void)502     void Resease(void) { m_Task.Reset(); }
503 private:
504     CPSG_Task_Guard(const CPSG_Task_Guard&);
505     CPSG_Task_Guard& operator=(const CPSG_Task_Guard&);
506 
507     CRef<CPSG_Task> m_Task;
508 };
509 
510 
511 class CPSG_TaskGroup
512 {
513 public:
CPSG_TaskGroup(CThreadPool & pool)514     CPSG_TaskGroup(CThreadPool& pool)
515         : m_Pool(pool), m_Semaphore(0, kMax_UInt) {}
516 
AddTask(CPSG_Task * task)517     void AddTask(CPSG_Task* task) {
518         {
519             CMutexGuard guard(m_Mutex);
520             m_Tasks.insert(Ref(task));
521             m_Pool.AddTask(task);
522         }
523     }
524 
PostFinished(CPSG_Task & task)525     void PostFinished(CPSG_Task& task)
526     {
527         {
528             CRef<CPSG_Task> ref(&task);
529             CMutexGuard guard(m_Mutex);
530             TTasks::iterator it = m_Tasks.find(ref);
531             if (it == m_Tasks.end()) return;
532             m_Done.insert(ref);
533             m_Tasks.erase(it);
534         }
535         m_Semaphore.Post();
536     }
537 
HasTasks(void) const538     bool HasTasks(void) const
539     {
540         CMutexGuard guard(m_Mutex);
541         return !m_Tasks.empty() || ! m_Done.empty();
542     }
543 
WaitAll(void)544     void WaitAll(void) {
545         while (HasTasks()) GetTask<CPSG_Task>();
546     }
547 
548     template<class T>
GetTask(void)549     CRef<T> GetTask(void) {
550         m_Semaphore.Wait();
551         CRef<T> ret;
552         CMutexGuard guard(m_Mutex);
553         _ASSERT(!m_Done.empty());
554         TTasks::iterator it = m_Done.begin();
555         ret.Reset(dynamic_cast<T*>(it->GetNCPointerOrNull()));
556         m_Done.erase(it);
557         return ret;
558     }
559 
CancelAll(void)560     void CancelAll(void)
561     {
562         {
563             CMutexGuard guard(m_Mutex);
564             for (CRef<CPSG_Task> task : m_Tasks) {
565                 task->RequestToCancel();
566             }
567         }
568         WaitAll();
569     }
570 
571 private:
572     typedef set<CRef<CPSG_Task>> TTasks;
573 
574     CThreadPool& m_Pool;
575     CSemaphore m_Semaphore;
576     TTasks m_Tasks;
577     TTasks m_Done;
578     mutable CMutex m_Mutex;
579 };
580 
581 
CPSG_Task(TReply reply,CPSG_TaskGroup & group)582 CPSG_Task::CPSG_Task(TReply reply, CPSG_TaskGroup& group)
583     : m_Reply(reply),
584       m_Status(eIdle),
585       m_GotNotFound(false),
586       m_Group(group)
587 {
588 }
589 
590 
Execute(void)591 CPSG_Task::EStatus CPSG_Task::Execute(void)
592 {
593     m_Status = eExecuting;
594     try {
595         DoExecute();
596     }
597     catch (CException& exc) {
598         LOG_POST("CPSGDataLoader: exception in retrieval thread: "<<exc);
599         return eFailed;
600     }
601     catch (exception& exc) {
602         LOG_POST("CPSGDataLoader: exception in retrieval thread: "<<exc.what());
603         return eFailed;
604     }
605     return m_Status;
606 }
607 
608 
OnStatusChange(EStatus old)609 void CPSG_Task::OnStatusChange(EStatus old)
610 {
611     EStatus status = GetStatus();
612     if (status == eCompleted || status == eFailed || status == eCanceled) {
613         m_Group.PostFinished(*this);
614     }
615 }
616 
CheckReplyStatus(void)617 bool CPSG_Task::CheckReplyStatus(void)
618 {
619     EPSG_Status status = m_Reply->GetStatus(0);
620     if (status != EPSG_Status::eSuccess && status != EPSG_Status::eInProgress) {
621         ReportStatus(m_Reply, status);
622         m_Status = eFailed;
623         return false;
624     }
625     return true;
626 }
627 
628 
ReadReply(void)629 void CPSG_Task::ReadReply(void)
630 {
631     EPSG_Status status;
632     for (;;) {
633         if (IsCancelled()) return;
634         auto reply_item = m_Reply->GetNextItem(DEFAULT_DEADLINE);
635         if (!reply_item) continue;
636         if (reply_item->GetType() == CPSG_ReplyItem::eEndOfReply) break;
637         if (IsCancelled()) return;
638 
639         EPSG_Status status = reply_item->GetStatus(0);
640         if (status != EPSG_Status::eSuccess && status != EPSG_Status::eInProgress) {
641             ReportStatus(reply_item, status);
642             if ( status == EPSG_Status::eNotFound ) {
643                 m_GotNotFound = true;
644                 continue;
645             }
646             m_Status = eFailed;
647             return;
648         }
649         if (status == EPSG_Status::eInProgress) {
650             status = reply_item->GetStatus(CDeadline::eInfinite);
651             if (IsCancelled()) return;
652         }
653         if (status != EPSG_Status::eSuccess) {
654             ReportStatus(reply_item, status);
655             m_Status = eFailed;
656             return;
657         }
658         ProcessReplyItem(reply_item);
659     }
660     if (IsCancelled()) return;
661     status = m_Reply->GetStatus(CDeadline::eInfinite);
662     if (status != EPSG_Status::eSuccess) {
663         ReportStatus(m_Reply, status);
664         m_Status = eFailed;
665     }
666 }
667 
668 
669 /////////////////////////////////////////////////////////////////////////////
670 // CPSGDataLoader_Impl
671 /////////////////////////////////////////////////////////////////////////////
672 
// Name of the loader's configuration subtree and the option keys that the
// CPSGDataLoader_Impl constructor reads from it.
#define NCBI_PSGLOADER_NAME "psg_loader"
#define NCBI_PSGLOADER_SERVICE_NAME "service_name"
#define NCBI_PSGLOADER_NOSPLIT "no_split"
#define NCBI_PSGLOADER_WHOLE_TSE "whole_tse"
#define NCBI_PSGLOADER_WHOLE_TSE_BULK "whole_tse_bulk"
#define NCBI_PSGLOADER_ADD_WGS_MASTER "add_wgs_master"

// Last-resort PSG service name ("PSG2"), used when neither the config subtree
// nor the loader params name a service. Env: PSG_LOADER_SERVICE_NAME.
NCBI_PARAM_DECL(string, PSG_LOADER, SERVICE_NAME);
NCBI_PARAM_DEF_EX(string, PSG_LOADER, SERVICE_NAME, "PSG2",
    eParam_NoThread, PSG_LOADER_SERVICE_NAME);
typedef NCBI_PARAM_TYPE(PSG_LOADER, SERVICE_NAME) TPSG_ServiceName;

// No-split switch (default false). Env: PSG_LOADER_NO_SPLIT.
// NOTE(review): the constructor below takes its initial no_split value from
// CGBLoaderParams, not from TPSG_NoSplit -- confirm whether this param is used.
NCBI_PARAM_DECL(bool, PSG_LOADER, NO_SPLIT);
NCBI_PARAM_DEF_EX(bool, PSG_LOADER, NO_SPLIT, false,
    eParam_NoThread, PSG_LOADER_NO_SPLIT);
typedef NCBI_PARAM_TYPE(PSG_LOADER, NO_SPLIT) TPSG_NoSplit;

// Default for "whole_tse" (nonzero = request eWholeTSE for single requests,
// otherwise eSmartTSE). Declared unsigned int with a boolean default.
NCBI_PARAM_DECL(unsigned int, PSG_LOADER, WHOLE_TSE);
NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, WHOLE_TSE, false,
    eParam_NoThread, PSG_LOADER_WHOLE_TSE);
typedef NCBI_PARAM_TYPE(PSG_LOADER, WHOLE_TSE) TPSG_WholeTSE;

// Default for "whole_tse_bulk": same choice as above for bulk requests
// (default: eWholeTSE).
NCBI_PARAM_DECL(unsigned int, PSG_LOADER, WHOLE_TSE_BULK);
NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, WHOLE_TSE_BULK, true,
    eParam_NoThread, PSG_LOADER_WHOLE_TSE_BULK);
typedef NCBI_PARAM_TYPE(PSG_LOADER, WHOLE_TSE_BULK) TPSG_WholeTSEBulk;

// Maximum number of threads in the loader's CThreadPool (default 10).
NCBI_PARAM_DECL(unsigned int, PSG_LOADER, MAX_POOL_THREADS);
NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, MAX_POOL_THREADS, 10,
    eParam_NoThread, PSG_LOADER_MAX_POOL_THREADS);
typedef NCBI_PARAM_TYPE(PSG_LOADER, MAX_POOL_THREADS) TPSG_MaxPoolThreads;
704 
CPSGDataLoader_Impl(const CGBLoaderParams & params)705 CPSGDataLoader_Impl::CPSGDataLoader_Impl(const CGBLoaderParams& params)
706     : m_BlobMap(new CPSGBlobMap()),
707       m_BioseqCache(new CPSGBioseqCache()),
708       m_ThreadPool(new CThreadPool(kMax_UInt, TPSG_MaxPoolThreads::GetDefault()))
709 {
710     auto_ptr<CPSGDataLoader::TParamTree> app_params;
711     const CPSGDataLoader::TParamTree* psg_params = 0;
712     if (params.GetParamTree()) {
713         psg_params = CPSGDataLoader::GetParamsSubnode(params.GetParamTree(), NCBI_PSGLOADER_NAME);
714     }
715     else {
716         CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
717         if (app) {
718             app_params.reset(CConfig::ConvertRegToTree(app->GetConfig()));
719             psg_params = CPSGDataLoader::GetParamsSubnode(app_params.get(), NCBI_PSGLOADER_NAME);
720         }
721     }
722 
723     string service_name;
724     if (service_name.empty() && psg_params) {
725         service_name = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_SERVICE_NAME);
726     }
727     if (service_name.empty()) {
728         service_name = params.GetPSGServiceName();
729     }
730     if (service_name.empty()) {
731         service_name = TPSG_ServiceName::GetDefault();
732     }
733 
734     bool no_split = params.GetPSGNoSplit();
735     if (psg_params) {
736         try {
737             string value = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_NOSPLIT);
738             if (!value.empty()) {
739                 no_split = NStr::StringToBool(value);
740             }
741         }
742         catch (CException&) {
743         }
744     }
745     if ( no_split ) {
746         m_TSERequestModeBulk = m_TSERequestMode = CPSG_Request_Biodata::eOrigTSE;
747     }
748     else {
749         {{
750             bool whole_tse = TPSG_WholeTSE::GetDefault();
751             if ( psg_params ) {
752                 try {
753                     string value = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_WHOLE_TSE);
754                     if (!value.empty()) {
755                         whole_tse = NStr::StringToBool(value);
756                     }
757                 }
758                 catch (CException&) {
759                 }
760             }
761             if ( whole_tse ) {
762                 m_TSERequestMode = CPSG_Request_Biodata::eWholeTSE;
763             }
764             else {
765                 m_TSERequestMode = CPSG_Request_Biodata::eSmartTSE;
766             }
767         }}
768         {{
769             bool whole_tse = TPSG_WholeTSEBulk::GetDefault();
770             if ( psg_params ) {
771                 try {
772                     string value = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_WHOLE_TSE_BULK);
773                     if (!value.empty()) {
774                         whole_tse = NStr::StringToBool(value);
775                     }
776                 }
777                 catch (CException&) {
778                 }
779             }
780             if ( whole_tse ) {
781                 m_TSERequestModeBulk = CPSG_Request_Biodata::eWholeTSE;
782             }
783             else {
784                 m_TSERequestModeBulk = CPSG_Request_Biodata::eSmartTSE;
785             }
786         }}
787     }
788 
789     m_AddWGSMasterDescr = true;
790     if ( psg_params ) {
791         string param = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_ADD_WGS_MASTER);
792         if ( !param.empty() ) {
793             try {
794                 m_AddWGSMasterDescr = NStr::StringToBool(param);
795             }
796             catch ( CException& exc ) {
797                 NCBI_RETHROW_FMT(exc, CLoaderException, eBadConfig,
798                                  "Bad value of parameter "
799                                  NCBI_PSGLOADER_ADD_WGS_MASTER
800                                  ": \""<<param<<"\"");
801             }
802         }
803     }
804 
805     m_Queue = make_shared<CPSG_Queue>(service_name);
806     m_Thread.Reset(new CPsgClientThread(m_Queue));
807     m_Thread->Run();
808 }
809 
810 
~CPSGDataLoader_Impl(void)811 CPSGDataLoader_Impl::~CPSGDataLoader_Impl(void)
812 {
813     m_Thread->Stop();
814     m_Thread->Join();
815 }
816 
817 
CannotProcess(const CSeq_id_Handle & sih)818 static bool CannotProcess(const CSeq_id_Handle& sih)
819 {
820     if ( !sih ) {
821         return true;
822     }
823     if ( sih.Which() == CSeq_id::e_Local ) {
824         return true;
825     }
826     if ( sih.Which() == CSeq_id::e_General ) {
827         if ( NStr::EqualNocase(sih.GetSeqId()->GetGeneral().GetDb(), "SRA") ) {
828             // SRA is good
829             return false;
830         }
831         if ( NStr::StartsWith(sih.GetSeqId()->GetGeneral().GetDb(), "WGS:", NStr::eNocase) ) {
832             // WGS is good
833             return false;
834         }
835         // other general ids are good too(?)
836         return false;
837     }
838     return false;
839 }
840 
841 
// Invoke call() with retries and return its result.
//   retry_count - total number of attempts; 0 means the default of 4, and a
//                 value below 1 results in a single unguarded attempt.
//   name        - operation name used in the warning log messages.
// Attempts 1..retry_count-1 are guarded: failures are logged, then the code
// sleeps 2^(t-1) seconds (1s, 2s, 4s, ...) before the next try. A
// CLoaderException other than eConnectionFailed/eLoaderFailed is rethrown
// immediately. The final attempt is unguarded, so its exception propagates.
// NOTE(review): `1<<(t-1)` overflows int once t exceeds 31 -- only relevant
// for very large retry_count values.
template<class Call>
typename std::result_of<Call()>::type
CPSGDataLoader_Impl::CallWithRetry(Call&& call,
                                   const char* name,
                                   int retry_count)
{
    if ( retry_count == 0 ) {
        retry_count = 4;
    }
    for ( int t = 1; t < retry_count; ++ t ) {
        try {
            return call();
        }
        catch ( CLoaderException& exc ) {
            if ( exc.GetErrCode() == exc.eConnectionFailed ||
                 exc.GetErrCode() == exc.eLoaderFailed ) {
                // can retry
                LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc);
            }
            else {
                // no retry
                throw;
            }
        }
        catch ( CException& exc ) {
            LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc);
        }
        catch ( exception& exc ) {
            LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc.what());
        }
        catch ( ... ) {
            LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception");
        }
        // Exponential backoff between attempts.
        double wait_sec = 1<<(t-1);
        LOG_POST(Warning<<"CPSGDataLoader: waiting "<<wait_sec<<"s before retry");
        SleepMilliSec(Uint4(wait_sec*1000));
    }
    // Last attempt: any exception escapes to the caller.
    return call();
}
881 
882 
GetIds(const CSeq_id_Handle & idh,TIds & ids)883 void CPSGDataLoader_Impl::GetIds(const CSeq_id_Handle& idh, TIds& ids)
884 {
885     CallWithRetry(bind(&CPSGDataLoader_Impl::GetIdsOnce, this,
886                        cref(idh), ref(ids)),
887                   "GetIds");
888 }
889 
890 
GetIdsOnce(const CSeq_id_Handle & idh,TIds & ids)891 void CPSGDataLoader_Impl::GetIdsOnce(const CSeq_id_Handle& idh, TIds& ids)
892 {
893     if ( CannotProcess(idh) ) {
894         return;
895     }
896     auto seq_info = x_GetBioseqInfo(idh);
897     if (!seq_info) return;
898 
899     ITERATE(SPsgBioseqInfo::TIds, it, seq_info->ids) {
900         ids.push_back(*it);
901     }
902 }
903 
904 
905 CDataLoader::SGiFound
GetGi(const CSeq_id_Handle & idh)906 CPSGDataLoader_Impl::GetGi(const CSeq_id_Handle& idh)
907 {
908     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetGiOnce, this,
909                               cref(idh)),
910                          "GetGi");
911 }
912 
913 
914 CDataLoader::SGiFound
GetGiOnce(const CSeq_id_Handle & idh)915 CPSGDataLoader_Impl::GetGiOnce(const CSeq_id_Handle& idh)
916 {
917     if ( CannotProcess(idh) ) {
918         return CDataLoader::SGiFound();
919     }
920     CDataLoader::SGiFound ret;
921     auto seq_info = x_GetBioseqInfo(idh);
922     if (seq_info) {
923         ret.sequence_found = true;
924         if ( seq_info->gi != ZERO_GI ) {
925             ret.gi = seq_info->gi;
926         }
927     }
928     return ret;
929 }
930 
931 
932 CDataLoader::SAccVerFound
GetAccVer(const CSeq_id_Handle & idh)933 CPSGDataLoader_Impl::GetAccVer(const CSeq_id_Handle& idh)
934 {
935     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetAccVerOnce, this,
936                               cref(idh)),
937                          "GetAccVer");
938 }
939 
940 
941 CDataLoader::SAccVerFound
GetAccVerOnce(const CSeq_id_Handle & idh)942 CPSGDataLoader_Impl::GetAccVerOnce(const CSeq_id_Handle& idh)
943 {
944     if ( CannotProcess(idh) ) {
945         return CDataLoader::SAccVerFound();
946     }
947     CDataLoader::SAccVerFound ret;
948     auto seq_info = x_GetBioseqInfo(idh);
949     if (seq_info) {
950         ret.sequence_found = true;
951         if ( seq_info->canonical.IsAccVer() ) {
952             ret.acc_ver = seq_info->canonical;
953         }
954     }
955     return ret;
956 }
957 
958 
GetTaxId(const CSeq_id_Handle & idh)959 TTaxId CPSGDataLoader_Impl::GetTaxId(const CSeq_id_Handle& idh)
960 {
961     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetTaxIdOnce, this,
962                               cref(idh)),
963                          "GetTaxId");
964 }
965 
966 
GetTaxIdOnce(const CSeq_id_Handle & idh)967 TTaxId CPSGDataLoader_Impl::GetTaxIdOnce(const CSeq_id_Handle& idh)
968 {
969     if ( CannotProcess(idh) ) {
970         return INVALID_TAX_ID;
971     }
972     auto seq_info = x_GetBioseqInfo(idh);
973     return seq_info ? seq_info->tax_id : INVALID_TAX_ID;
974 }
975 
976 
GetSequenceLength(const CSeq_id_Handle & idh)977 TSeqPos CPSGDataLoader_Impl::GetSequenceLength(const CSeq_id_Handle& idh)
978 {
979     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetSequenceLengthOnce, this,
980                               cref(idh)),
981                          "GetSequenceLength");
982 }
983 
984 
GetSequenceLengthOnce(const CSeq_id_Handle & idh)985 TSeqPos CPSGDataLoader_Impl::GetSequenceLengthOnce(const CSeq_id_Handle& idh)
986 {
987     if ( CannotProcess(idh) ) {
988         return kInvalidSeqPos;
989     }
990     auto seq_info = x_GetBioseqInfo(idh);
991     return (seq_info && seq_info->length > 0) ? TSeqPos(seq_info->length) : kInvalidSeqPos;
992 }
993 
994 
995 CDataLoader::SHashFound
GetSequenceHash(const CSeq_id_Handle & idh)996 CPSGDataLoader_Impl::GetSequenceHash(const CSeq_id_Handle& idh)
997 {
998     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetSequenceHashOnce, this,
999                               cref(idh)),
1000                          "GetSequenceHash");
1001 }
1002 
1003 
1004 CDataLoader::SHashFound
GetSequenceHashOnce(const CSeq_id_Handle & idh)1005 CPSGDataLoader_Impl::GetSequenceHashOnce(const CSeq_id_Handle& idh)
1006 {
1007     if ( CannotProcess(idh) ) {
1008         return CDataLoader::SHashFound();
1009     }
1010     CDataLoader::SHashFound ret;
1011     auto seq_info = x_GetBioseqInfo(idh);
1012     if (seq_info) {
1013         ret.sequence_found = true;
1014         if ( ret.hash ) {
1015             ret.hash_known = true;
1016             ret.hash = seq_info->hash;
1017         }
1018     }
1019     return ret;
1020 }
1021 
1022 
1023 CDataLoader::STypeFound
GetSequenceType(const CSeq_id_Handle & idh)1024 CPSGDataLoader_Impl::GetSequenceType(const CSeq_id_Handle& idh)
1025 {
1026     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetSequenceTypeOnce, this,
1027                               cref(idh)),
1028                          "GetSequenceType");
1029 }
1030 
1031 
1032 CDataLoader::STypeFound
GetSequenceTypeOnce(const CSeq_id_Handle & idh)1033 CPSGDataLoader_Impl::GetSequenceTypeOnce(const CSeq_id_Handle& idh)
1034 {
1035     if ( CannotProcess(idh) ) {
1036         return CDataLoader::STypeFound();
1037     }
1038     CDataLoader::STypeFound ret;
1039     auto seq_info = x_GetBioseqInfo(idh);
1040     if (seq_info && seq_info->molecule_type != CSeq_inst::eMol_not_set) {
1041         ret.sequence_found = true;
1042         ret.type = seq_info->molecule_type;
1043     }
1044     return ret;
1045 }
1046 
1047 
GetSequenceState(const CSeq_id_Handle & idh)1048 int CPSGDataLoader_Impl::GetSequenceState(const CSeq_id_Handle& idh)
1049 {
1050     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetSequenceStateOnce, this,
1051                               cref(idh)),
1052                          "GetSequenceState");
1053 }
1054 
1055 
GetSequenceStateOnce(const CSeq_id_Handle & idh)1056 int CPSGDataLoader_Impl::GetSequenceStateOnce(const CSeq_id_Handle& idh)
1057 {
1058     const int kNotFound = (CBioseq_Handle::fState_not_found |
1059                            CBioseq_Handle::fState_no_data);
1060     if ( CannotProcess(idh) ) {
1061         return kNotFound;
1062     }
1063     auto seq_info = x_GetBioseqInfo(idh);
1064     if (!seq_info) {
1065         return kNotFound;
1066     }
1067     if (seq_info->included_info & CPSG_Request_Resolve::fState) {
1068         switch (seq_info->state) {
1069         case CPSG_BioseqInfo::eDead:
1070             return CBioseq_Handle::fState_dead;
1071         case CPSG_BioseqInfo::eReserved:
1072             return CBioseq_Handle::fState_suppress_perm;
1073         case CPSG_BioseqInfo::eLive:
1074             return CBioseq_Handle::fState_none;
1075         default:
1076             LOG_POST(Warning << "CPSGDataLoader: uknown " << idh << " state: " << seq_info->state);
1077             return CBioseq_Handle::fState_none;
1078         }
1079     }
1080     return CBioseq_Handle::fState_none;
1081 }
1082 
1083 
1084 CDataLoader::TTSE_LockSet
GetRecords(CDataSource * data_source,const CSeq_id_Handle & idh,CDataLoader::EChoice choice)1085 CPSGDataLoader_Impl::GetRecords(CDataSource* data_source,
1086     const CSeq_id_Handle& idh,
1087     CDataLoader::EChoice choice)
1088 {
1089     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetRecordsOnce, this,
1090                               data_source, cref(idh), choice),
1091                          "GetRecords");
1092 }
1093 
1094 
1095 CDataLoader::TTSE_LockSet
GetRecordsOnce(CDataSource * data_source,const CSeq_id_Handle & idh,CDataLoader::EChoice choice)1096 CPSGDataLoader_Impl::GetRecordsOnce(CDataSource* data_source,
1097     const CSeq_id_Handle& idh,
1098     CDataLoader::EChoice choice)
1099 {
1100     CDataLoader::TTSE_LockSet locks;
1101     if (choice == CDataLoader::eOrphanAnnot) {
1102         // PSG loader doesn't provide orphan annotations
1103         return locks;
1104     }
1105     if ( CannotProcess(idh) ) {
1106         return locks;
1107     }
1108 
1109     CPSG_BioId bio_id = x_GetBioId(idh);
1110     auto context = make_shared<CPsgClientContext>();
1111     auto request = make_shared<CPSG_Request_Biodata>(move(bio_id), context);
1112 
1113     CPSG_Request_Biodata::EIncludeData inc_data = CPSG_Request_Biodata::eNoTSE;
1114     if (data_source) {
1115         inc_data = m_TSERequestMode;
1116         CDataSource::TLoadedBlob_ids loaded_blob_ids;
1117         data_source->GetLoadedBlob_ids(idh, CDataSource::fLoaded_bioseqs, loaded_blob_ids);
1118         ITERATE(CDataSource::TLoadedBlob_ids, loaded_blob_id, loaded_blob_ids) {
1119             const CPsgBlobId* pbid = dynamic_cast<const CPsgBlobId*>(&**loaded_blob_id);
1120             if (!pbid) continue;
1121             request->ExcludeTSE(CPSG_BlobId(pbid->ToPsgId()));
1122         }
1123     }
1124     request->IncludeData(inc_data);
1125     auto reply = x_ProcessRequest(request);
1126     CTSE_Lock tse_lock = x_ProcessBlobReply(reply, data_source, idh, true, true).lock;
1127 
1128     if (!tse_lock) {
1129         // TODO: return correct state with CBlobStateException
1130         if ( 0 ) {
1131             NCBI_THROW(CLoaderException, eLoaderFailed,
1132                        "error loading blob for " + idh.AsString());
1133         }
1134     }
1135     else {
1136         locks.insert(tse_lock);
1137     }
1138     return locks;
1139 }
1140 
1141 
GetBlobId(const CSeq_id_Handle & idh)1142 CRef<CPsgBlobId> CPSGDataLoader_Impl::GetBlobId(const CSeq_id_Handle& idh)
1143 {
1144     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetBlobIdOnce, this,
1145                               cref(idh)),
1146                          "GetBlobId");
1147 }
1148 
1149 
GetBlobIdOnce(const CSeq_id_Handle & idh)1150 CRef<CPsgBlobId> CPSGDataLoader_Impl::GetBlobIdOnce(const CSeq_id_Handle& idh)
1151 {
1152     if ( CannotProcess(idh) ) {
1153         return null;
1154     }
1155     string blob_id;
1156 
1157     // Check cache first.
1158     auto seq_info = x_GetBioseqInfo(idh);
1159     if (seq_info && !seq_info->blob_id.empty()) {
1160         blob_id = seq_info->blob_id;
1161     }
1162     else {
1163         CPSG_BioId bio_id = x_GetBioId(idh);
1164         auto context = make_shared<CPsgClientContext>();
1165         auto request = make_shared<CPSG_Request_Biodata>(move(bio_id), context);
1166         request->IncludeData(CPSG_Request_Biodata::eNoTSE);
1167         auto reply = x_ProcessRequest(request);
1168         blob_id = x_ProcessBlobReply(reply, nullptr, idh, true).blob_id;
1169     }
1170     CRef<CPsgBlobId> ret;
1171     if (!blob_id.empty()) {
1172         ret.Reset(new CPsgBlobId(blob_id));
1173     }
1174     return ret;
1175 }
1176 
1177 
// Helpers for blob ids that name locally created CDD entries (defined
// elsewhere in this translation unit). GetBlobByIdOnce uses them to detect
// such an id, parse it into GI and accession.version handles, and rebuild
// the entry without a PSG blob request.
static bool x_IsLocalCDDEntryId(const CPsgBlobId& blob_id);
static bool x_ParseLocalCDDEntryId(const CPsgBlobId& blob_id,
                                   CSeq_id_Handle& gi, CSeq_id_Handle& acc_ver);
static CTSE_Lock x_CreateLocalCDDEntry(CDataSource* data_source,
                                       const CSeq_id_Handle& gi,
                                       const CSeq_id_Handle& acc_ver);
1184 
1185 
1186 
// Failure-injection switch for GetBlobById.
// When set, GetBlobById passes a retry count of 1 to CallWithRetry, and
// CPSG_Blob_Task::DoExecute marks blob loading as failed.
// NOTE(review): plain bool with no synchronization, also read from
// CPSG_Blob_Task::DoExecute (presumably on thread-pool threads). Confirm it
// is only toggled while no loads are in flight.
static bool s_GetBlobByIdShouldFail = false;

// Sets the GetBlobById failure-injection switch.
void CPSGDataLoader_Impl::SetGetBlobByIdShouldFail(bool value)
{
    s_GetBlobByIdShouldFail = value;
}


// Returns the current GetBlobById failure-injection switch.
bool CPSGDataLoader_Impl::GetGetBlobByIdShouldFail()
{
    return s_GetBlobByIdShouldFail;
}
1199 
1200 
GetBlobById(CDataSource * data_source,const CPsgBlobId & blob_id)1201 CTSE_Lock CPSGDataLoader_Impl::GetBlobById(CDataSource* data_source, const CPsgBlobId& blob_id)
1202 {
1203     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetBlobByIdOnce, this,
1204                               data_source, cref(blob_id)),
1205                          "GetBlobById",
1206                          GetGetBlobByIdShouldFail()? 1: 0);
1207 }
1208 
1209 
// Single attempt to load the blob named by blob_id into data_source.
// Returns an empty lock when data_source is null, and the existing lock when
// the TSE is already loaded.
// Local CDD entry ids are rebuilt locally via x_CreateLocalCDDEntry; any
// other id is fetched with a PSG blob request.
// Throws CLoaderException(eLoaderFailed) if no TSE lock was obtained.
CTSE_Lock CPSGDataLoader_Impl::GetBlobByIdOnce(CDataSource* data_source, const CPsgBlobId& blob_id)
{
    if (!data_source) return CTSE_Lock();

    if ( GetGetBlobByIdShouldFail() ) {
        _TRACE("GetBlobById("<<blob_id.ToPsgId()<<") should fail");
    }
#ifdef LOCK4GET
    // LOCK4GET: acquire the TSE load lock before requesting any data.
    CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(&blob_id);
    CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
    if ( load_lock.IsLoaded() ) {
        _TRACE("GetBlobById() already loaded " << blob_id.ToPsgId());
        return load_lock;
    }
#else
    // Default: only pick up an existing lock here; the (possibly empty)
    // load_lock is handed to x_ProcessBlobReply below.
    CTSE_LoadLock load_lock;
    {{
        CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(&blob_id);
        load_lock = data_source->GetTSE_LoadLockIfLoaded(dl_blob_id);
        if ( load_lock && load_lock.IsLoaded() ) {
            _TRACE("GetBlobById() already loaded " << blob_id.ToPsgId());
            return load_lock;
        }
    }}
#endif

    CTSE_Lock ret;
    if ( x_IsLocalCDDEntryId(blob_id) ) {
        // Locally created CDD entry: rebuild it from the ids encoded in blob_id.
        if ( s_GetDebugLevel() >= 5 ) {
            LOG_POST(Info<<"PSG loader: Re-loading CDD blob: " << blob_id.ToString());
        }
        CSeq_id_Handle gi, acc_ver;
        if ( x_ParseLocalCDDEntryId(blob_id, gi, acc_ver) ) {
            ret = x_CreateLocalCDDEntry(data_source, gi, acc_ver);
        }
    }
    else {
        // Regular PSG blob: request it directly by blob id.
        CPSG_BlobId bid(blob_id.ToPsgId());
        auto context = make_shared<CPsgClientContext>();
        auto request = make_shared<CPSG_Request_Blob>(bid, context);
        request->IncludeData(m_TSERequestMode);
        auto reply = x_ProcessRequest(request);
        ret = x_ProcessBlobReply(reply, data_source, CSeq_id_Handle(), true, false, &load_lock).lock;
    }
    if (!ret) {
        _TRACE("Failed to load blob for " << blob_id.ToPsgId()<<" @ "<<CStackTrace());
        NCBI_THROW(CLoaderException, eLoaderFailed,
                   "CPSGDataLoader::GetBlobById("+blob_id.ToPsgId()+") failed");
    }
    return ret;
}
1261 
1262 
// Task that consumes one PSG reply (see ProcessReplyItem).
// It collects bioseq-info, blob-info, blob-data and skipped-blob items.
// In DoExecute it then loads the main TSE, or its split info plus any
// delivered chunks, into the data source.
class CPSG_Blob_Task : public CPSG_Task
{
public:
    // reply         - PSG reply to consume
    // group         - task group this task is added to
    // idh           - Seq-id the reply is for (used for the bioseq cache)
    // data_source   - target data source; may be null (DoExecute then only
    //                 records blob info)
    // loader        - owning loader (bioseq cache, blob map, blob parsing)
    // lock_asap     - take the TSE load lock as soon as enough data arrived
    // load_lock_ptr - optional caller-owned load lock to fill/reuse
    CPSG_Blob_Task(
        TReply reply,
        CPSG_TaskGroup& group,
        const CSeq_id_Handle& idh,
        CDataSource* data_source,
        CPSGDataLoader_Impl& loader,
        bool lock_asap = false,
        CTSE_LoadLock* load_lock_ptr = nullptr)
        : CPSG_Task(reply, group),
        m_Id(idh),
        m_DataSource(data_source),
        m_Loader(loader),
        m_LockASAP(lock_asap),
        m_LoadLockPtr(load_lock_ptr)
    {
    }

    ~CPSG_Blob_Task(void) override {}

    // Scope guard bound by reference to the task's m_LoadLockPtr.
    // On construction: if lock_asap is set and no caller lock pointer was
    // given, the pointer is redirected to m_LocalLock.
    // On destruction: the pointer is reset to null. m_LocalLock is destroyed
    // with the guard, so a lock taken into it is released then.
    struct SAutoReleaseLock {
        SAutoReleaseLock(bool lock_asap, CTSE_LoadLock*& lock_ptr)
            : m_LockPtr(lock_ptr)
            {
                if ( lock_asap && !m_LockPtr ) {
                    m_LockPtr = &m_LocalLock;
                }
            }
        ~SAutoReleaseLock()
            {
                m_LockPtr = 0;
            }

        CTSE_LoadLock m_LocalLock;
        CTSE_LoadLock*& m_LockPtr;
    };

    // (blob-info, blob-data) received for one blob or chunk; either may be empty.
    typedef pair<shared_ptr<CPSG_BlobInfo>, shared_ptr<CPSG_BlobData>> TBlobSlot;
    typedef map<string, TBlobSlot> TTSEBlobMap; // by PSG blob_id
    typedef map<string, map<TChunkId, TBlobSlot>> TChunkBlobMap; // by id2_info, id2_chunk

    CSeq_id_Handle m_Id;
    // Set when the reply reports the main blob as skipped.
    shared_ptr<CPSG_SkippedBlob> m_Skipped;
    // Main blob id and, after a successful load, the TSE lock.
    CPSGDataLoader_Impl::SReplyResult m_ReplyResult;
    // Entry for the main blob in the loader's blob map.
    shared_ptr<SPsgBlobInfo> m_PsgBlobInfo;

    // Slot lookups; return null when nothing was received for the id.
    const TBlobSlot* GetTSESlot(const string& psg_id) const;
    const TBlobSlot* GetChunkSlot(const string& id2_info, TChunkId chunk_id) const;
    const TBlobSlot* GetBlobSlot(const CPSG_DataId& id) const;
    // Returns (creating if needed) the slot for a blob or chunk id; null for other id kinds.
    TBlobSlot* SetBlobSlot(const CPSG_DataId& id);
    // Lock the pointer currently designates; only valid while m_LoadLockPtr is set.
    CTSE_LoadLock& GetLoadLock() const
        {
            _ASSERT(m_LoadLockPtr);
            return *m_LoadLockPtr;
        }
    void ObtainLoadLock();
    bool GotBlobData(const string& psg_blob_id) const;
    CPSGDataLoader_Impl::SReplyResult WaitForSkipped(void);

    // Drops all per-reply state collected by this task.
    void Finish(void) override
    {
        m_Skipped.reset();
        m_ReplyResult = CPSGDataLoader_Impl::SReplyResult();
        m_PsgBlobInfo.reset();
        m_TSEBlobMap.clear();
        m_ChunkBlobMap.clear();
        m_BlobIdMap.clear();
    }

    // Registers the data-loader blob id to use for psg_blob_id.
    void SetDLBlobId(const string& psg_blob_id, CDataLoader::TBlobId dl_blob_id)
    {
        m_BlobIdMap[psg_blob_id] = dl_blob_id;
    }

    // Data-loader blob id registered for psg_blob_id, or a new CPsgBlobId.
    CDataLoader::TBlobId GetDLBlobId(const string& psg_blob_id) const
    {
        auto iter = m_BlobIdMap.find(psg_blob_id);
        if ( iter != m_BlobIdMap.end() ) {
            return iter->second;
        }
        return CDataLoader::TBlobId(new CPsgBlobId(psg_blob_id));
    }

protected:
    void DoExecute(void) override;
    void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override;

private:
    CDataSource* m_DataSource;
    CPSGDataLoader_Impl& m_Loader;
    bool m_LockASAP;
    // Caller-supplied lock, or the guard's local lock during DoExecute.
    CTSE_LoadLock* m_LoadLockPtr;
    TTSEBlobMap m_TSEBlobMap;
    TChunkBlobMap m_ChunkBlobMap;
    map<string, CDataLoader::TBlobId> m_BlobIdMap;
};
1361 
1362 
GetTSESlot(const string & blob_id) const1363 const CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::GetTSESlot(const string& blob_id) const
1364 {
1365     auto iter = m_TSEBlobMap.find(blob_id);
1366     if ( iter != m_TSEBlobMap.end() ) {
1367         return &iter->second;
1368     }
1369     return 0;
1370 }
1371 
1372 
GetChunkSlot(const string & id2_info,TChunkId chunk_id) const1373 const CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::GetChunkSlot(const string& id2_info,
1374                                                               TChunkId chunk_id) const
1375 {
1376     auto iter = m_ChunkBlobMap.find(id2_info);
1377     if ( iter != m_ChunkBlobMap.end() ) {
1378         auto iter2 = iter->second.find(chunk_id);
1379         if ( iter2 != iter->second.end() ) {
1380             return &iter2->second;
1381         }
1382     }
1383     return 0;
1384 }
1385 
1386 
GetBlobSlot(const CPSG_DataId & id) const1387 const CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::GetBlobSlot(const CPSG_DataId& id) const
1388 {
1389     if ( auto tse_id = dynamic_cast<const CPSG_BlobId*>(&id) ) {
1390         return GetTSESlot(tse_id->GetId());
1391     }
1392     else if ( auto chunk_id = dynamic_cast<const CPSG_ChunkId*>(&id) ) {
1393         return GetChunkSlot(chunk_id->GetId2Info(), chunk_id->GetId2Chunk());
1394     }
1395     return 0;
1396 }
1397 
1398 
SetBlobSlot(const CPSG_DataId & id)1399 CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::SetBlobSlot(const CPSG_DataId& id)
1400 {
1401     if ( auto tse_id = dynamic_cast<const CPSG_BlobId*>(&id) ) {
1402         _TRACE("Blob slot for tse_id="<<tse_id->GetId());
1403         return &m_TSEBlobMap[tse_id->GetId()];
1404     }
1405     else if ( auto chunk_id = dynamic_cast<const CPSG_ChunkId*>(&id) ) {
1406         _TRACE("Blob slot for id2_info="<<chunk_id->GetId2Info()<<" chunk="<<chunk_id->GetId2Chunk());
1407         return &m_ChunkBlobMap[chunk_id->GetId2Info()][chunk_id->GetId2Chunk()];
1408     }
1409     return 0;
1410 }
1411 
1412 
GotBlobData(const string & psg_blob_id) const1413 bool CPSG_Blob_Task::GotBlobData(const string& psg_blob_id) const
1414 {
1415     const TBlobSlot* main_blob_slot = GetTSESlot(psg_blob_id);
1416     if ( !main_blob_slot || !main_blob_slot->first ) {
1417         // no TSE blob props yet
1418         if ( s_GetDebugLevel() >= 6 ) {
1419             LOG_POST("GotBlobData("<<psg_blob_id<<"): no TSE blob props");
1420         }
1421         return false;
1422     }
1423     if ( main_blob_slot->second ) {
1424         // got TSE blob data
1425         if ( s_GetDebugLevel() >= 6 ) {
1426             LOG_POST("GotBlobData("<<psg_blob_id<<"): got TSE blob data");
1427         }
1428         return true;
1429     }
1430     auto id2_info = main_blob_slot->first->GetId2Info();
1431     if ( id2_info.empty() ) {
1432         // TSE doesn't have split info
1433         if ( s_GetDebugLevel() >= 6 ) {
1434             LOG_POST("GotBlobData("<<psg_blob_id<<"): not split");
1435         }
1436         return false;
1437     }
1438     const TBlobSlot* split_blob_slot = GetChunkSlot(id2_info, kSplitInfoChunkId);
1439     if ( !split_blob_slot || !split_blob_slot->second ) {
1440         // no split info blob data yet
1441         if ( s_GetDebugLevel() >= 6 ) {
1442             LOG_POST("GotBlobData("<<psg_blob_id<<"): no split blob data");
1443         }
1444         return false;
1445     }
1446     else {
1447         // got split info blob data
1448         if ( s_GetDebugLevel() >= 6 ) {
1449             LOG_POST("GotBlobData("<<psg_blob_id<<"): got split blob data");
1450         }
1451         return true;
1452     }
1453 }
1454 
1455 
ObtainLoadLock()1456 void CPSG_Blob_Task::ObtainLoadLock()
1457 {
1458     if ( !m_LockASAP ) {
1459         // load lock is not requested
1460         return;
1461     }
1462     if ( GetLoadLock() ) {
1463         // load lock already obtained
1464         return;
1465     }
1466     if ( m_ReplyResult.blob_id.empty() ) {
1467         // blob id is not known yet
1468         return;
1469     }
1470     if ( !GotBlobData(m_ReplyResult.blob_id) ) {
1471         return;
1472     }
1473     if ( s_GetDebugLevel() >= 6 ) {
1474         LOG_POST("ObtainLoadLock("<<m_ReplyResult.blob_id<<"): getting load lock");
1475     }
1476     GetLoadLock() = m_DataSource->GetTSE_LoadLock(GetDLBlobId(m_ReplyResult.blob_id));
1477     if ( s_GetDebugLevel() >= 6 ) {
1478         LOG_POST("ObtainLoadLock("<<m_ReplyResult.blob_id<<"): obtained load lock");
1479     }
1480 }
1481 
1482 
// Executes the task.
// First reads the reply; ReadReply() presumably dispatches items to
// ProcessReplyItem. Then, if a TSE lock pointer is available, records blob
// info in the loader's blob map and loads the TSE into the data source:
// either split info plus any delivered chunks, or the plain blob data.
// Ends with m_Status == eCompleted or eFailed. If a TSE was loaded (or was
// already loaded), m_ReplyResult.lock holds it.
void CPSG_Blob_Task::DoExecute(void)
{
    _TRACE("CPSG_Blob_Task::DoExecute()");
    if (!CheckReplyStatus()) return;
    // While this guard lives, m_LoadLockPtr is non-null if lock_asap was set
    // or a caller lock was given; the guard resets it to null on exit.
    SAutoReleaseLock lock_guard(m_LockASAP, m_LoadLockPtr);
    ReadReply();
    if (m_Status == eFailed) return;
    if (m_Skipped) {
        // Skipped main blob is handled later by the caller (WaitForSkipped).
        m_Status = eCompleted;
        return;
    }

    if (m_ReplyResult.blob_id.empty()) {
        // If the source request was for blob rather than bioseq, there may be no bioseq info
        // and blob_id stays empty.
        if (m_Reply->GetRequest()->GetType() == "blob") {
            shared_ptr<const CPSG_Request_Blob> blob_request = static_pointer_cast<const CPSG_Request_Blob>(m_Reply->GetRequest());
            if (blob_request) {
                m_ReplyResult.blob_id = blob_request->GetId();
            }
        }
    }
    if (m_ReplyResult.blob_id.empty()) {
        _TRACE("no blob_id");
        m_Status = eCompleted;
        return;
    }

    _TRACE("tse_id="<<m_ReplyResult.blob_id);
    if ( !m_LoadLockPtr ) {
        // no TSE requested (neither lock_asap nor a caller load lock)
        m_Status = eCompleted;
        return;
    }

    const TBlobSlot* main_blob_slot = GetTSESlot(m_ReplyResult.blob_id);
    if ( !main_blob_slot || !main_blob_slot->first ) {
        _TRACE("No blob info for tse_id="<<m_ReplyResult.blob_id);
        m_Status = eFailed;
        return;
    }

    // For a split blob, locate the split-info chunk slot (may be missing).
    const TBlobSlot* split_blob_slot = 0;
    auto id2_info = main_blob_slot->first->GetId2Info();
    if ( !id2_info.empty() ) {
        split_blob_slot = GetChunkSlot(id2_info, kSplitInfoChunkId);
        if ( !split_blob_slot || !split_blob_slot->first ) {
            _TRACE("No split info tse_id="<<m_ReplyResult.blob_id<<" id2_info="<<id2_info);
        }
    }

    // Find or create main blob-info entry.
    m_PsgBlobInfo = m_Loader.m_BlobMap->FindBlob(m_ReplyResult.blob_id);
    if (!m_PsgBlobInfo) {
        m_PsgBlobInfo = make_shared<SPsgBlobInfo>(*main_blob_slot->first);
        m_Loader.m_BlobMap->AddBlob(m_ReplyResult.blob_id, m_PsgBlobInfo);
    }

    if (!m_DataSource) {
        _TRACE("No data source for tse_id="<<m_ReplyResult.blob_id);
        // No data to load, just bioseq-info.
        m_Status = eCompleted;
        return;
    }

    // Read blob data (if any) and pass to the data source.
    if ( CPSGDataLoader_Impl::GetGetBlobByIdShouldFail() ) {
        // Failure injection (see SetGetBlobByIdShouldFail).
        m_Status = eFailed;
        return;
    }
    // Reuse the early lock (ObtainLoadLock / caller) if it is for this blob.
    CDataLoader::TBlobId dl_blob_id = GetDLBlobId(m_ReplyResult.blob_id);
    CTSE_LoadLock load_lock;
    if ( GetLoadLock() && GetLoadLock()->GetBlobId() == dl_blob_id ) {
        load_lock = GetLoadLock();
    }
    else {
        load_lock = m_DataSource->GetTSE_LoadLock(dl_blob_id);
    }
    if (!load_lock) {
        _TRACE("Cannot get TSE load lock for tse_id="<<m_ReplyResult.blob_id);
        m_Status = eFailed;
        return;
    }
    // An already loaded TSE is reused, unless it still needs its delayed main chunk.
    CPSGDataLoader_Impl::EMainChunkType main_chunk_type = CPSGDataLoader_Impl::eNoDelayedMainChunk;
    if ( load_lock.IsLoaded() ) {
        if ( load_lock->x_NeedsDelayedMainChunk() &&
             !load_lock->GetSplitInfo().GetChunk(kDelayedMain_ChunkId).IsLoaded() ) {
            main_chunk_type = CPSGDataLoader_Impl::eDelayedMainChunk;
        }
        else {
            _TRACE("Already loaded tse_id="<<m_ReplyResult.blob_id);
            m_ReplyResult.lock = load_lock;
            m_Status = eCompleted;
            return;
        }
    }

    if ( split_blob_slot && split_blob_slot->first && split_blob_slot->second ) {
        // Split blob: record id2_info on the TSE blob id, load the split info,
        // then parse every other chunk that arrived in this reply.
        auto& blob_id = *load_lock->GetBlobId();
        dynamic_cast<CPsgBlobId&>(const_cast<CBlobId&>(blob_id)).SetId2Info(id2_info);
        m_Loader.x_ReadBlobData(*m_PsgBlobInfo,
                                *split_blob_slot->first,
                                *split_blob_slot->second,
                                load_lock,
                                CPSGDataLoader_Impl::eIsSplitInfo);
        CTSE_Split_Info& tse_split_info = load_lock->GetSplitInfo();
        for ( auto& chunk_slot : m_ChunkBlobMap[id2_info] ) {
            TChunkId chunk_id = chunk_slot.first;
            if ( chunk_id == kSplitInfoChunkId ) {
                continue;
            }
            if ( !chunk_slot.second.first || !chunk_slot.second.second ) {
                continue;
            }
            CTSE_Chunk_Info* chunk = 0;
            try {
                chunk = &tse_split_info.GetChunk(chunk_id);
            }
            catch ( CException& /*ignored*/ ) {
                // chunk id not known to the split info: skip this chunk
            }
            if ( !chunk || chunk->IsLoaded() ) {
                continue;
            }

            auto_ptr<CObjectIStream> in
                (CPSGDataLoader_Impl::GetBlobDataStream(*chunk_slot.second.first,
                                                        *chunk_slot.second.second));
            CRef<CID2S_Chunk> id2_chunk(new CID2S_Chunk);
            *in >> *id2_chunk;
            if ( s_GetDebugLevel() >= 8 ) {
                LOG_POST(Info<<"PSG loader: TSE "<<chunk->GetBlobId().ToString()<<" "<<
                         " chunk "<<chunk->GetChunkId()<<" "<<MSerial_AsnText<<*id2_chunk);
            }

            CSplitParser::Load(*chunk, *id2_chunk);
            chunk->SetLoaded();
        }
    }
    else if ( main_blob_slot && main_blob_slot->first && main_blob_slot->second ) {
        // Non-split (or split info missing): load the plain TSE blob data.
        m_Loader.x_ReadBlobData(*m_PsgBlobInfo,
                                *main_blob_slot->first,
                                *main_blob_slot->second,
                                load_lock,
                                CPSGDataLoader_Impl::eNoSplitInfo);
    }
    else {
        _TRACE("No data for tse_id="<<m_ReplyResult.blob_id);
        load_lock.Reset();
    }
    if ( load_lock ) {
        m_Loader.x_SetLoaded(load_lock, main_chunk_type);
        m_ReplyResult.lock = load_lock;
        m_Status = eCompleted;
    }
    else {
        m_Status = eFailed;
    }
}
1641 
1642 
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)1643 void CPSG_Blob_Task::ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)
1644 {
1645     switch (item->GetType()) {
1646     case CPSG_ReplyItem::eBioseqInfo:
1647     {
1648         // Only one bioseq-info is allowed per reply.
1649         shared_ptr<CPSG_BioseqInfo> bioseq_info = static_pointer_cast<CPSG_BioseqInfo>(item);
1650         m_ReplyResult.blob_id = bioseq_info->GetBlobId().GetId();
1651         ObtainLoadLock();
1652         m_Loader.m_BioseqCache->Add(*bioseq_info, m_Id);
1653         break;
1654     }
1655     case CPSG_ReplyItem::eBlobInfo:
1656     {
1657         auto blob_info = static_pointer_cast<CPSG_BlobInfo>(item);
1658         _TRACE("Blob info: "<<blob_info->GetId()->Repr());
1659         if ( auto slot = SetBlobSlot(*blob_info->GetId()) ) {
1660             slot->first = blob_info;
1661             ObtainLoadLock();
1662         }
1663         break;
1664     }
1665     case CPSG_ReplyItem::eBlobData:
1666     {
1667         shared_ptr<CPSG_BlobData> data = static_pointer_cast<CPSG_BlobData>(item);
1668         _TRACE("Blob data: "<<data->GetId()->Repr());
1669         if ( auto slot = SetBlobSlot(*data->GetId()) ) {
1670             slot->second = data;
1671             ObtainLoadLock();
1672         }
1673         break;
1674     }
1675     case CPSG_ReplyItem::eSkippedBlob:
1676     {
1677         // Only main blob can be skipped.
1678         _ASSERT(!m_Skipped);
1679         m_Skipped = static_pointer_cast<CPSG_SkippedBlob>(item);
1680         break;
1681     }
1682     default:
1683     {
1684         break;
1685     }
1686     }
1687 }
1688 
1689 
WaitForSkipped(void)1690 CPSGDataLoader_Impl::SReplyResult CPSG_Blob_Task::WaitForSkipped(void)
1691 {
1692     CPSGDataLoader_Impl::SReplyResult ret;
1693     ret.blob_id = m_ReplyResult.blob_id;
1694     if (!m_DataSource) return ret;
1695 
1696     CDataLoader::TBlobId dl_blob_id = GetDLBlobId(ret.blob_id);
1697     CTSE_LoadLock load_lock;
1698     _ASSERT(m_Skipped);
1699     CPSG_SkippedBlob::EReason skip_reason = m_Skipped->GetReason();
1700     switch (skip_reason) {
1701     case CPSG_SkippedBlob::eInProgress:
1702         // Try to wait for the blob to be loaded.
1703         load_lock = m_DataSource->GetLoadedTSE_Lock(dl_blob_id, CTimeout(1));
1704         if ( !load_lock && s_GetDebugLevel() >= 6 ) {
1705             LOG_POST("CPSGDataLoader: 'in progress' blob is not loaded: "<<dl_blob_id.ToString());
1706         }
1707         break;
1708     case CPSG_SkippedBlob::eSent:
1709         // Try to wait for the blob to be loaded.
1710         load_lock = m_DataSource->GetLoadedTSE_Lock(dl_blob_id, CTimeout(.2));
1711         if ( !load_lock && s_GetDebugLevel() >= 6 ) {
1712             LOG_POST("CPSGDataLoader: 'sent' blob is not loaded: "<<dl_blob_id.ToString());
1713         }
1714         break;
1715     case CPSG_SkippedBlob::eExcluded:
1716         // Check if the blob is already loaded, force loading if necessary.
1717         load_lock = m_DataSource->GetTSE_LoadLockIfLoaded(dl_blob_id);
1718         if ( !load_lock && s_GetDebugLevel() >= 6 ) {
1719             LOG_POST("CPSGDataLoader: 'excluded' blob is not loaded: "<<dl_blob_id.ToString());
1720         }
1721         break;
1722     default: // unknown
1723         return ret;
1724     }
1725     if (load_lock && load_lock.IsLoaded()) {
1726         m_Skipped.reset();
1727         ret.lock = load_lock;
1728     }
1729     return ret;
1730 }
1731 
1732 
GetBlobs(CDataSource * data_source,TTSE_LockSets & tse_sets)1733 void CPSGDataLoader_Impl::GetBlobs(CDataSource* data_source, TTSE_LockSets& tse_sets)
1734 {
1735     CallWithRetry(bind(&CPSGDataLoader_Impl::GetBlobsOnce, this,
1736                        data_source, ref(tse_sets)),
1737                   "GetBlobs");
1738 }
1739 
1740 
GetBlobsOnce(CDataSource * data_source,TTSE_LockSets & tse_sets)1741 void CPSGDataLoader_Impl::GetBlobsOnce(CDataSource* data_source, TTSE_LockSets& tse_sets)
1742 {
1743     if (!data_source) return;
1744     auto context = make_shared<CPsgClientContext_Bulk>();
1745     CPSG_TaskGroup group(*m_ThreadPool);
1746     ITERATE(TTSE_LockSets, tse_set, tse_sets) {
1747         const CSeq_id_Handle& id = tse_set->first;
1748         CPSG_BioId bio_id = x_GetBioId(id);
1749         auto request = make_shared<CPSG_Request_Biodata>(move(bio_id), context);
1750         request->IncludeData(m_TSERequestModeBulk);
1751         auto reply = x_ProcessRequest(request);
1752         CRef<CPSG_Blob_Task> task(
1753             new CPSG_Blob_Task(reply, group, id, data_source, *this, true));
1754         group.AddTask(task);
1755     }
1756     // Waiting for skipped blobs can block all pool threads. To prevent this postpone
1757     // waiting until all other tasks are completed.
1758     typedef list<CRef<CPSG_Blob_Task>> TTasks;
1759     TTasks skipped_tasks;
1760     list<shared_ptr<CPSG_Task_Guard>> guards;
1761     while (group.HasTasks()) {
1762         CRef<CPSG_Blob_Task> task(group.GetTask<CPSG_Blob_Task>().GetNCPointerOrNull());
1763         _ASSERT(task);
1764         guards.push_back(make_shared<CPSG_Task_Guard>(*task));
1765         if (task->GetStatus() == CThreadPool_Task::eFailed) {
1766             _TRACE("Failed to get blob for " << task->m_Id);
1767             group.CancelAll();
1768             NCBI_THROW(CLoaderException, eLoaderFailed, "failed to load blobs for "+task->m_Id.AsString());
1769         }
1770         if (task->m_Skipped) {
1771             skipped_tasks.push_back(task);
1772             continue;
1773         }
1774         SReplyResult res = task->m_ReplyResult;
1775         if (task->m_ReplyResult.lock) tse_sets[task->m_Id].insert(task->m_ReplyResult.lock);
1776     }
1777     NON_CONST_ITERATE(TTasks, it, skipped_tasks) {
1778         CPSG_Blob_Task& task = **it;
1779         SReplyResult result = task.WaitForSkipped();
1780         if (!result.lock) {
1781             // Force reloading blob
1782             result = x_RetryBlobRequest(task.m_ReplyResult.blob_id, data_source, task.m_Id);
1783         }
1784         if (result.lock) tse_sets[task.m_Id].insert(result.lock);
1785     }
1786 }
1787 
1788 
LoadChunk(CDataSource * data_source,CTSE_Chunk_Info & chunk_info)1789 void CPSGDataLoader_Impl::LoadChunk(CDataSource* data_source,
1790                                     CTSE_Chunk_Info& chunk_info)
1791 {
1792     CDataLoader::TChunkSet chunks;
1793     chunks.push_back(Ref(&chunk_info));
1794     LoadChunks(data_source, chunks);
1795 }
1796 
1797 
1798 class CPSG_LoadChunk_Task : public CPSG_Task
1799 {
1800 public:
CPSG_LoadChunk_Task(TReply reply,CPSG_TaskGroup & group,CDataLoader::TChunk chunk)1801     CPSG_LoadChunk_Task(TReply reply, CPSG_TaskGroup& group, CDataLoader::TChunk chunk)
1802         : CPSG_Task(reply, group), m_Chunk(chunk) {}
1803 
~CPSG_LoadChunk_Task(void)1804     ~CPSG_LoadChunk_Task(void) override {}
1805 
Finish(void)1806     void Finish(void) override {
1807         m_Chunk.Reset();
1808         m_BlobInfo.reset();
1809         m_BlobData.reset();
1810     }
1811 
1812 protected:
1813     void DoExecute(void) override;
1814     void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override;
1815 
1816 private:
1817     CDataLoader::TChunk m_Chunk;
1818     shared_ptr<CPSG_BlobInfo> m_BlobInfo;
1819     shared_ptr<CPSG_BlobData> m_BlobData;
1820 };
1821 
1822 
DoExecute(void)1823 void CPSG_LoadChunk_Task::DoExecute(void)
1824 {
1825     if (!CheckReplyStatus()) return;
1826     ReadReply();
1827     if (m_Status == eFailed) return;
1828 
1829     if (!m_BlobInfo || !m_BlobData) {
1830         _TRACE("Failed to get chunk info or data for blob-id " << m_Chunk->GetBlobId());
1831         m_Status = eFailed;
1832         return;
1833     }
1834 
1835     if (IsCancelled()) return;
1836     auto_ptr<CObjectIStream> in(CPSGDataLoader_Impl::GetBlobDataStream(*m_BlobInfo, *m_BlobData));
1837     if (!in.get()) {
1838         _TRACE("Failed to open chunk data stream for blob-id " << m_BlobInfo->GetId()->Repr());
1839         m_Status = eFailed;
1840         return;
1841     }
1842 
1843     CRef<CID2S_Chunk> id2_chunk(new CID2S_Chunk);
1844     *in >> *id2_chunk;
1845     if ( s_GetDebugLevel() >= 8 ) {
1846         LOG_POST(Info<<"PSG loader: TSE "<<m_Chunk->GetBlobId().ToString()<<" "<<
1847                  " chunk "<<m_Chunk->GetChunkId()<<" "<<MSerial_AsnText<<*id2_chunk);
1848     }
1849     CSplitParser::Load(*m_Chunk, *id2_chunk);
1850     m_Chunk->SetLoaded();
1851 
1852     m_Status = eCompleted;
1853 }
1854 
1855 
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)1856 void CPSG_LoadChunk_Task::ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)
1857 {
1858     switch (item->GetType()) {
1859     case CPSG_ReplyItem::eBlobInfo:
1860         m_BlobInfo = static_pointer_cast<CPSG_BlobInfo>(item);
1861         break;
1862     case CPSG_ReplyItem::eBlobData:
1863         m_BlobData = static_pointer_cast<CPSG_BlobData>(item);
1864         break;
1865     default:
1866         break;
1867     }
1868 }
1869 
1870 
// Name of the CDD named-annotation track.
constexpr char kCDDAnnotName[] = "CDD";
// When true, CDD annotations are provided through locally created TSEs
// (see x_CreateLocalCDDEntry) instead of the named-annotation request.
constexpr bool kCreateLocalCDDEntries = true;
// Local CDD blob ids have the form "CDD:<gi>" optionally followed by "|<acc.ver>".
constexpr char kLocalCDDEntryIdPrefix[] = "CDD:";
constexpr char kLocalCDDEntryIdSeparator = '|';
1875 
x_MakeLocalCDDEntryId(const CSeq_id_Handle & gi,const CSeq_id_Handle & acc_ver)1876 static string x_MakeLocalCDDEntryId(const CSeq_id_Handle& gi, const CSeq_id_Handle& acc_ver)
1877 {
1878     ostringstream str;
1879     _ASSERT(gi && gi.IsGi());
1880     str << kLocalCDDEntryIdPrefix << gi.GetGi();
1881     if ( acc_ver ) {
1882         str << kLocalCDDEntryIdSeparator << acc_ver;
1883     }
1884     return str.str();
1885 }
1886 
1887 
x_IsLocalCDDEntryId(const CPsgBlobId & blob_id)1888 static bool x_IsLocalCDDEntryId(const CPsgBlobId& blob_id)
1889 {
1890     return NStr::StartsWith(blob_id.ToPsgId(), kLocalCDDEntryIdPrefix);
1891 }
1892 
1893 
x_ParseLocalCDDEntryId(const CPsgBlobId & blob_id,CSeq_id_Handle & gi,CSeq_id_Handle & acc_ver)1894 static bool x_ParseLocalCDDEntryId(const CPsgBlobId& blob_id,
1895                                    CSeq_id_Handle& gi, CSeq_id_Handle& acc_ver)
1896 {
1897     if ( !x_IsLocalCDDEntryId(blob_id) ) {
1898         return false;
1899     }
1900     istringstream str(blob_id.ToPsgId().substr(strlen(kLocalCDDEntryIdPrefix)));
1901     TIntId gi_id = 0;
1902     str >> gi_id;
1903     if ( !gi_id ) {
1904         return false;
1905     }
1906     gi = CSeq_id_Handle::GetGiHandle(GI_FROM(TIntId, gi_id));
1907     if ( str.get() == kLocalCDDEntryIdSeparator ) {
1908         string extra;
1909         str >> extra;
1910         acc_ver = CSeq_id_Handle::GetHandle(extra);
1911     }
1912     return true;
1913 }
1914 
1915 
x_LocalCDDEntryIdToBioId(const CPsgBlobId & blob_id)1916 static CPSG_BioId x_LocalCDDEntryIdToBioId(const CPsgBlobId& blob_id)
1917 {
1918     const string& str = blob_id.ToPsgId();
1919     size_t start = strlen(kLocalCDDEntryIdPrefix);
1920     size_t end = str.find(kLocalCDDEntryIdSeparator, start);
1921     return { str.substr(start, end-start), CSeq_id::e_Gi };
1922 }
1923 
1924 
x_CreateLocalCDDEntryChunk(const CSeq_id_Handle & id1,const CSeq_id_Handle & id2)1925 static CRef<CTSE_Chunk_Info> x_CreateLocalCDDEntryChunk(const CSeq_id_Handle& id1,
1926                                                         const CSeq_id_Handle& id2)
1927 {
1928     if ( !id1 && !id2 ) {
1929         return null;
1930     }
1931     CRange<TSeqPos> range = CRange<TSeqPos>::GetWhole();
1932     CRef<CTSE_Chunk_Info> chunk(new CTSE_Chunk_Info(kDelayedMain_ChunkId));
1933     // add main annot types
1934     CAnnotName name = kCDDAnnotName;
1935     CSeqFeatData::ESubtype subtypes[] = {
1936         CSeqFeatData::eSubtype_region,
1937         CSeqFeatData::eSubtype_site
1938     };
1939     for ( auto subtype : subtypes ) {
1940         SAnnotTypeSelector type(subtype);
1941         if ( id1 ) {
1942             chunk->x_AddAnnotType(name, type, id1, range);
1943         }
1944         if ( id2 ) {
1945             chunk->x_AddAnnotType(name, type, id2, range);
1946         }
1947     }
1948     return chunk;
1949 }
1950 
1951 
x_CreateLocalCDDEntry(CDataSource * data_source,const CSeq_id_Handle & gi,const CSeq_id_Handle & acc_ver)1952 static CTSE_Lock x_CreateLocalCDDEntry(CDataSource* data_source,
1953                                        const CSeq_id_Handle& gi,
1954                                        const CSeq_id_Handle& acc_ver)
1955 {
1956     CRef<CPsgBlobId> blob_id(new CPsgBlobId(x_MakeLocalCDDEntryId(gi, acc_ver)));
1957     if ( auto chunk = x_CreateLocalCDDEntryChunk(gi, acc_ver) ) {
1958         CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(blob_id);
1959         CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
1960         if ( load_lock ) {
1961             if ( !load_lock.IsLoaded() ) {
1962                 if ( CPSGDataLoader_Impl::GetGetBlobByIdShouldFail() ) {
1963                     return CTSE_Lock();
1964                 }
1965                 load_lock->SetName(kCDDAnnotName);
1966                 load_lock->GetSplitInfo().AddChunk(*chunk);
1967                 _ASSERT(load_lock->x_NeedsDelayedMainChunk());
1968                 load_lock.SetLoaded();
1969             }
1970             return load_lock;
1971         }
1972     }
1973     return CTSE_Lock();
1974 }
1975 
1976 
x_CreateEmptyLocalCDDEntry(CDataSource * data_source,CDataLoader::TChunk chunk)1977 static void x_CreateEmptyLocalCDDEntry(CDataSource* data_source,
1978                                        CDataLoader::TChunk chunk)
1979 {
1980     CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(chunk->GetBlobId());
1981     _ASSERT(load_lock);
1982     _ASSERT(load_lock.IsLoaded());
1983     _ASSERT(load_lock->HasNoSeq_entry());
1984     CRef<CSeq_entry> entry(new CSeq_entry);
1985     entry->SetSet().SetSeq_set();
1986     if ( s_GetDebugLevel() >= 8 ) {
1987         LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
1988                  " created empty CDD entry");
1989     }
1990     load_lock->SetSeq_entry(*entry);
1991     chunk->SetLoaded();
1992 }
1993 
1994 
s_SameId(const CPSG_BlobId * id1,const CPSG_BlobId & id2)1995 static bool s_SameId(const CPSG_BlobId* id1, const CPSG_BlobId& id2)
1996 {
1997     return id1 && id1->GetId() == id2.GetId();
1998 }
1999 
2000 
x_ReadCDDChunk(CDataSource * data_source,CDataLoader::TChunk chunk,const CPSG_BlobInfo & blob_info,const CPSG_BlobData & blob_data)2001 bool CPSGDataLoader_Impl::x_ReadCDDChunk(CDataSource* data_source,
2002                                          CDataLoader::TChunk chunk,
2003                                          const CPSG_BlobInfo& blob_info,
2004                                          const CPSG_BlobData& blob_data)
2005 {
2006     _ASSERT(chunk->GetChunkId() == kDelayedMain_ChunkId);
2007     _DEBUG_ARG(const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk->GetBlobId()));
2008     _ASSERT(x_IsLocalCDDEntryId(blob_id));
2009     _ASSERT(!chunk->IsLoaded());
2010 
2011     CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(chunk->GetBlobId());
2012     if ( !load_lock ||
2013          !load_lock.IsLoaded() ||
2014          !load_lock->x_NeedsDelayedMainChunk() ) {
2015         _TRACE("Cannot make CDD entry because of wrong TSE state id="<<blob_id.ToString());
2016         return false;
2017     }
2018 
2019     unique_ptr<CObjectIStream> in(GetBlobDataStream(blob_info, blob_data));
2020     if (!in.get()) {
2021         _TRACE("Failed to open blob data stream for blob-id " << blob_id.ToString());
2022         return false;
2023     }
2024 
2025     CRef<CSeq_entry> entry(new CSeq_entry);
2026     *in >> *entry;
2027     if ( s_GetDebugLevel() >= 8 ) {
2028         LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
2029                  MSerial_AsnText<<*entry);
2030     }
2031     load_lock->SetSeq_entry(*entry);
2032     chunk->SetLoaded();
2033     return true;
2034 }
2035 
2036 
2037 shared_ptr<CPSG_Request_Blob>
x_MakeLoadLocalCDDEntryRequest(CDataSource * data_source,CDataLoader::TChunk chunk,shared_ptr<CPsgClientContext_Bulk> context)2038 CPSGDataLoader_Impl::x_MakeLoadLocalCDDEntryRequest(CDataSource* data_source,
2039                                                     CDataLoader::TChunk chunk,
2040                                                     shared_ptr<CPsgClientContext_Bulk> context)
2041 {
2042     _ASSERT(chunk->GetChunkId() == kDelayedMain_ChunkId);
2043     const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk->GetBlobId());
2044     _ASSERT(x_IsLocalCDDEntryId(blob_id));
2045     _ASSERT(!chunk->IsLoaded());
2046 
2047     bool failed = false;
2048     shared_ptr<CPSG_NamedAnnotInfo> cdd_info;
2049 
2050     // load CDD blob id
2051     {{
2052         CPSG_BioId bio_id = x_LocalCDDEntryIdToBioId(blob_id);
2053         CPSG_Request_NamedAnnotInfo::TAnnotNames names = { kCDDAnnotName };
2054         _ASSERT(bio_id.GetId().find('|') == NPOS);
2055         auto request = make_shared<CPSG_Request_NamedAnnotInfo>(bio_id, names, context);
2056         request->IncludeData(m_TSERequestMode);
2057         auto reply = x_ProcessRequest(request);
2058         shared_ptr<CPSG_BioseqInfo> bioseq_info;
2059         shared_ptr<CPSG_BlobInfo> blob_info;
2060         shared_ptr<CPSG_BlobData> blob_data;
2061         for (;;) {
2062             auto reply_item = reply->GetNextItem(DEFAULT_DEADLINE);
2063             if (!reply_item) continue;
2064             if (reply_item->GetType() == CPSG_ReplyItem::eEndOfReply) break;
2065             EPSG_Status status = reply_item->GetStatus(0);
2066             if (status != EPSG_Status::eSuccess && status != EPSG_Status::eInProgress) {
2067                 ReportStatus(reply_item, status);
2068                 if ( status == EPSG_Status::eNotFound ) {
2069                     continue;
2070                 }
2071                 failed = true;
2072                 break;
2073             }
2074             if (status == EPSG_Status::eInProgress) {
2075                 status = reply_item->GetStatus(CDeadline::eInfinite);
2076             }
2077             if (status != EPSG_Status::eSuccess) {
2078                 ReportStatus(reply_item, status);
2079                 failed = true;
2080                 break;
2081             }
2082             if (reply_item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2083                 bioseq_info = static_pointer_cast<CPSG_BioseqInfo>(reply_item);
2084             }
2085             if (reply_item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2086                 auto na_info = static_pointer_cast<CPSG_NamedAnnotInfo>(reply_item);
2087                 if ( NStr::EqualNocase(na_info->GetName(), kCDDAnnotName) ) {
2088                     cdd_info = na_info;
2089                 }
2090             }
2091             if (reply_item->GetType() == CPSG_ReplyItem::eBlobInfo) {
2092                 blob_info = static_pointer_cast<CPSG_BlobInfo>(reply_item);
2093             }
2094             if (reply_item->GetType() == CPSG_ReplyItem::eBlobData) {
2095                 blob_data = static_pointer_cast<CPSG_BlobData>(reply_item);
2096             }
2097         }
2098         if ( failed ) {
2099             // TODO
2100             x_CreateEmptyLocalCDDEntry(data_source, chunk);
2101             return nullptr;
2102         }
2103         if ( !cdd_info ) {
2104             x_CreateEmptyLocalCDDEntry(data_source, chunk);
2105             return nullptr;
2106         }
2107         // see if we got blob already
2108         if ( blob_info && s_SameId(blob_info->GetId<CPSG_BlobId>(), cdd_info->GetBlobId()) &&
2109              blob_data && s_SameId(blob_data->GetId<CPSG_BlobId>(), cdd_info->GetBlobId()) ) {
2110             _TRACE("Got CDD entry: "<<cdd_info->GetBlobId().Repr());
2111             if ( x_ReadCDDChunk(data_source, chunk, *blob_info, *blob_data) ) {
2112                 return nullptr;
2113             }
2114         }
2115     }}
2116 
2117     // load CDD blob request
2118     return make_shared<CPSG_Request_Blob>(cdd_info->GetBlobId(), context);
2119 }
2120 
2121 
LoadChunks(CDataSource * data_source,const CDataLoader::TChunkSet & chunks)2122 void CPSGDataLoader_Impl::LoadChunks(CDataSource* data_source,
2123                                      const CDataLoader::TChunkSet& chunks)
2124 {
2125     CallWithRetry(bind(&CPSGDataLoader_Impl::LoadChunksOnce, this,
2126                        data_source, cref(chunks)),
2127                   "LoadChunks");
2128 }
2129 
2130 
LoadChunksOnce(CDataSource * data_source,const CDataLoader::TChunkSet & chunks)2131 void CPSGDataLoader_Impl::LoadChunksOnce(CDataSource* data_source,
2132                                      const CDataLoader::TChunkSet& chunks)
2133 {
2134     if (chunks.empty()) return;
2135 
2136     typedef map<void*, CDataLoader::TChunk> TChunkMap;
2137     TChunkMap chunk_map;
2138     auto context = make_shared<CPsgClientContext_Bulk>();
2139     ITERATE(CDataLoader::TChunkSet, it, chunks) {
2140         const CTSE_Chunk_Info& chunk = **it;
2141         if ( chunk.IsLoaded() ) {
2142             continue;
2143         }
2144         if ( chunk.GetChunkId() == kMasterWGS_ChunkId ) {
2145             CWGSMasterSupport::LoadWGSMaster(data_source->GetDataLoader(), *it);
2146             continue;
2147         }
2148         if ( chunk.GetChunkId() == kDelayedMain_ChunkId ) {
2149             const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId());
2150             shared_ptr<CPSG_Request_Blob> request;
2151             if ( x_IsLocalCDDEntryId(blob_id) ) {
2152                 request = x_MakeLoadLocalCDDEntryRequest(data_source, *it, context);
2153                 if ( !request ) {
2154                     continue;
2155                 }
2156             }
2157             else {
2158                 request = make_shared<CPSG_Request_Blob>(blob_id.ToPsgId(), context);
2159             }
2160             request->IncludeData(m_TSERequestMode);
2161             chunk_map[request.get()] = *it;
2162             x_SendRequest(request);
2163         }
2164         else {
2165             const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId());
2166             auto request = make_shared<CPSG_Request_Chunk>(CPSG_ChunkId(chunk.GetChunkId(),
2167                                                                         blob_id.GetId2Info()),
2168                                                            context);
2169             chunk_map[request.get()] = *it;
2170             x_SendRequest(request);
2171         }
2172     }
2173 
2174     CPSG_TaskGroup group(*m_ThreadPool);
2175     list<shared_ptr<CPSG_Task_Guard>> guards;
2176     while (!chunk_map.empty()) {
2177         auto reply = context->GetReply();
2178         if (!reply) continue;
2179         TChunkMap::iterator chunk_it = chunk_map.find((void*)reply->GetRequest().get());
2180         _ASSERT(chunk_it != chunk_map.end());
2181         CDataLoader::TChunk chunk = chunk_it->second;
2182         chunk_map.erase(chunk_it);
2183         if ( chunk->GetChunkId() == kDelayedMain_ChunkId ) {
2184             CRef<CPSG_Blob_Task> task(new CPSG_Blob_Task(reply, group, CSeq_id_Handle(), data_source, *this, true));
2185             task->SetDLBlobId(dynamic_cast<const CPSG_Request_Blob&>(*reply->GetRequest()).GetBlobId().GetId(),
2186                               chunk->GetBlobId());
2187             guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2188             group.AddTask(task);
2189         }
2190         else {
2191             CRef<CPSG_LoadChunk_Task> task(new CPSG_LoadChunk_Task(reply, group, chunk));
2192             guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2193             group.AddTask(task);
2194         }
2195     }
2196     group.WaitAll();
2197     // check if all chunks are loaded
2198     ITERATE(CDataLoader::TChunkSet, it, chunks) {
2199         const CTSE_Chunk_Info & chunk = **it;
2200         if (!chunk.IsLoaded()) {
2201             _TRACE("Failed to load chunk " << chunk.GetChunkId() << " of " << dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId()).ToPsgId());
2202             NCBI_THROW(CLoaderException, eLoaderFailed, "failed to load some chunks");
2203         }
2204     }
2205 }
2206 
2207 
2208 class CPSG_AnnotRecordsNA_Task : public CPSG_Task
2209 {
2210 public:
CPSG_AnnotRecordsNA_Task(TReply reply,CPSG_TaskGroup & group)2211     CPSG_AnnotRecordsNA_Task( TReply reply, CPSG_TaskGroup& group)
2212         : CPSG_Task(reply, group) {}
2213 
~CPSG_AnnotRecordsNA_Task(void)2214     ~CPSG_AnnotRecordsNA_Task(void) override {}
2215 
2216     shared_ptr<CPSG_BioseqInfo> m_BioseqInfo;
2217     list<shared_ptr<CPSG_NamedAnnotInfo>> m_AnnotInfo;
2218 
Finish(void)2219     void Finish(void) override {
2220         m_BioseqInfo.reset();
2221         m_AnnotInfo.clear();
2222     }
2223 
2224 protected:
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)2225     void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
2226         if (item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2227             m_BioseqInfo = static_pointer_cast<CPSG_BioseqInfo>(item);
2228         }
2229         if (item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2230             m_AnnotInfo.push_back(static_pointer_cast<CPSG_NamedAnnotInfo>(item));
2231         }
2232     }
2233 };
2234 
2235 class CPSG_AnnotRecordsCDD_Task : public CPSG_Task
2236 {
2237 public:
CPSG_AnnotRecordsCDD_Task(TReply reply,CPSG_TaskGroup & group)2238     CPSG_AnnotRecordsCDD_Task( TReply reply, CPSG_TaskGroup& group)
2239         : CPSG_Task(reply, group) {}
2240 
~CPSG_AnnotRecordsCDD_Task(void)2241     ~CPSG_AnnotRecordsCDD_Task(void) override {}
2242 
2243     shared_ptr<CPSG_BioseqInfo> m_BioseqInfo;
2244     list<shared_ptr<CPSG_NamedAnnotInfo>> m_AnnotInfo;
2245 
Finish(void)2246     void Finish(void) override {
2247         m_BioseqInfo.reset();
2248         m_AnnotInfo.clear();
2249     }
2250 
2251 protected:
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)2252     void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
2253         if (item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2254             m_BioseqInfo = static_pointer_cast<CPSG_BioseqInfo>(item);
2255         }
2256         if (item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2257             m_AnnotInfo.push_back(static_pointer_cast<CPSG_NamedAnnotInfo>(item));
2258         }
2259     }
2260 };
2261 
2262 static
2263 pair<CRef<CTSE_Chunk_Info>, string>
s_CreateNAChunk(const CPSG_NamedAnnotInfo & psg_annot_info,const CPSG_BioseqInfo * bioseq_info)2264 s_CreateNAChunk(const CPSG_NamedAnnotInfo& psg_annot_info,
2265                 const CPSG_BioseqInfo* bioseq_info)
2266 {
2267     pair<CRef<CTSE_Chunk_Info>, string> ret;
2268     CRef<CTSE_Chunk_Info> chunk(new CTSE_Chunk_Info(kDelayedMain_ChunkId));
2269     unsigned main_count = 0;
2270     unsigned zoom_count = 0;
2271     // detailed annot info
2272     set<string> names;
2273     for ( auto& annot_info_ref : psg_annot_info.GetId2AnnotInfoList() ) {
2274         if ( s_GetDebugLevel() >= 8 ) {
2275             LOG_POST(Info<<"PSG loader: NA info "<<MSerial_AsnText<<*annot_info_ref);
2276         }
2277         const CID2S_Seq_annot_Info& annot_info = *annot_info_ref;
2278         // create special external annotations blob
2279         CAnnotName name(annot_info.GetName());
2280         if ( name.IsNamed() && !ExtractZoomLevel(name.GetName(), 0, 0) ) {
2281             //setter.GetTSE_LoadLock()->SetName(name);
2282             names.insert(name.GetName());
2283             ++main_count;
2284         }
2285         else {
2286             ++zoom_count;
2287         }
2288 
2289         vector<SAnnotTypeSelector> types;
2290         if ( annot_info.IsSetAlign() ) {
2291             types.push_back(SAnnotTypeSelector(CSeq_annot::C_Data::e_Align));
2292         }
2293         if ( annot_info.IsSetGraph() ) {
2294             types.push_back(SAnnotTypeSelector(CSeq_annot::C_Data::e_Graph));
2295         }
2296         if ( annot_info.IsSetFeat() ) {
2297             for ( auto feat_type_info_iter : annot_info.GetFeat() ) {
2298                 const CID2S_Feat_type_Info& finfo = *feat_type_info_iter;
2299                 int feat_type = finfo.GetType();
2300                 if ( feat_type == 0 ) {
2301                     types.push_back(SAnnotTypeSelector
2302                                     (CSeq_annot::C_Data::e_Seq_table));
2303                 }
2304                 else if ( !finfo.IsSetSubtypes() ) {
2305                     types.push_back(SAnnotTypeSelector
2306                                     (CSeqFeatData::E_Choice(feat_type)));
2307                 }
2308                 else {
2309                     for ( auto feat_subtype : finfo.GetSubtypes() ) {
2310                         types.push_back(SAnnotTypeSelector
2311                                         (CSeqFeatData::ESubtype(feat_subtype)));
2312                     }
2313                 }
2314             }
2315         }
2316 
2317         CTSE_Chunk_Info::TLocationSet loc;
2318         CSplitParser::x_ParseLocation(loc, annot_info.GetSeq_loc());
2319 
2320         ITERATE ( vector<SAnnotTypeSelector>, it, types ) {
2321             chunk->x_AddAnnotType(name, *it, loc);
2322         }
2323     }
2324     if ( names.size() == 1 ) {
2325         ret.second = *names.begin();
2326     }
2327     if ( s_GetDebugLevel() >= 5 ) {
2328         LOG_POST(Info<<"PSG loader: TSE "<<psg_annot_info.GetBlobId().GetId()<<
2329                  " annots: "<<ret.second<<" "<<main_count<<"+"<<zoom_count);
2330     }
2331     ret.first = chunk;
2332     return ret;
2333 }
2334 
2335 
GetAnnotRecordsNA(CDataSource * data_source,const CSeq_id_Handle & idh,const SAnnotSelector * sel,CDataLoader::TProcessedNAs * processed_nas)2336 CDataLoader::TTSE_LockSet CPSGDataLoader_Impl::GetAnnotRecordsNA(
2337     CDataSource* data_source,
2338     const CSeq_id_Handle& idh,
2339     const SAnnotSelector* sel,
2340     CDataLoader::TProcessedNAs* processed_nas)
2341 {
2342     return CallWithRetry(bind(&CPSGDataLoader_Impl::GetAnnotRecordsNAOnce, this,
2343                               data_source, cref(idh), sel, processed_nas),
2344                          "GetAnnotRecordsNA");
2345 }
2346 
2347 
GetAnnotRecordsNAOnce(CDataSource * data_source,const CSeq_id_Handle & idh,const SAnnotSelector * sel,CDataLoader::TProcessedNAs * processed_nas)2348 CDataLoader::TTSE_LockSet CPSGDataLoader_Impl::GetAnnotRecordsNAOnce(
2349     CDataSource* data_source,
2350     const CSeq_id_Handle& idh,
2351     const SAnnotSelector* sel,
2352     CDataLoader::TProcessedNAs* processed_nas)
2353 {
2354     CDataLoader::TTSE_LockSet locks;
2355     if ( !data_source ) {
2356         return locks;
2357     }
2358     if ( CannotProcess(idh) ) {
2359         return locks;
2360     }
2361     if ( sel && sel->IsIncludedAnyNamedAnnotAccession() ) {
2362         CPSG_BioId bio_id = x_GetBioId(idh);
2363         CPSG_Request_NamedAnnotInfo::TAnnotNames annot_names;
2364         const SAnnotSelector::TNamedAnnotAccessions& accs = sel->GetNamedAnnotAccessions();
2365         ITERATE(SAnnotSelector::TNamedAnnotAccessions, it, accs) {
2366             if ( kCreateLocalCDDEntries && NStr::EqualNocase(it->first, kCDDAnnotName) ) {
2367                 // CDDs are added as external annotations
2368                 continue;
2369             }
2370             annot_names.push_back(it->first);
2371         }
2372         auto context = make_shared<CPsgClientContext>();
2373         //_ASSERT(PsgIdToHandle(bio_id));
2374         auto request = make_shared<CPSG_Request_NamedAnnotInfo>(move(bio_id), annot_names, context);
2375         if ( auto reply = x_ProcessRequest(request) ) {
2376             CPSG_TaskGroup group(*m_ThreadPool);
2377             CRef<CPSG_AnnotRecordsNA_Task> task(new CPSG_AnnotRecordsNA_Task(reply, group));
2378             CPSG_Task_Guard guard(*task);
2379             group.AddTask(task);
2380             group.WaitAll();
2381 
2382             if (task->GetStatus() == CThreadPool_Task::eCompleted) {
2383                 for ( auto& info : task->m_AnnotInfo ) {
2384                     CDataLoader::SetProcessedNA(info->GetName(), processed_nas);
2385                     CRef<CPsgBlobId> blob_id(new CPsgBlobId(info->GetBlobId().GetId()));
2386                     auto chunk_info = s_CreateNAChunk(*info, task->m_BioseqInfo.get());
2387                     if ( chunk_info.first ) {
2388                         CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(blob_id);
2389                         CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
2390                         if ( load_lock ) {
2391                             if ( !load_lock.IsLoaded() ) {
2392                                 if ( !chunk_info.second.empty() ) {
2393                                     load_lock->SetName(chunk_info.second);
2394                                 }
2395                                 load_lock->GetSplitInfo().AddChunk(*chunk_info.first);
2396                                 _ASSERT(load_lock->x_NeedsDelayedMainChunk());
2397                                 load_lock.SetLoaded();
2398                             }
2399                             locks.insert(load_lock);
2400                         }
2401                     }
2402                     else {
2403                         // no annot info
2404                         if ( auto tse_lock = GetBlobById(data_source, *blob_id) ) {
2405                             locks.insert(tse_lock);
2406                         }
2407                     }
2408                 }
2409             }
2410             else {
2411                 _TRACE("Failed to load annotations for " << idh.AsString());
2412             }
2413         }
2414     }
2415     if ( kCreateLocalCDDEntries ) {
2416         CSeq_id_Handle gi;
2417         CSeq_id_Handle acc_ver;
2418         bool is_protein = true;
2419         TIds ids;
2420         GetIds(idh, ids);
2421         for ( auto id : ids ) {
2422             if ( id.IsGi() ) {
2423                 gi = id;
2424                 continue;
2425             }
2426             if ( id.Which() == CSeq_id::e_Pdb ) {
2427                 if ( !acc_ver ) {
2428                     acc_ver = id;
2429                 }
2430                 continue;
2431             }
2432             auto seq_id = id.GetSeqId();
2433             if ( auto text_id = seq_id->GetTextseq_Id() ) {
2434                 auto acc_type = seq_id->IdentifyAccession();
2435                 if ( acc_type & CSeq_id::fAcc_nuc ) {
2436                     is_protein = false;
2437                     break;
2438                 }
2439                 else if ( text_id->IsSetAccession() && text_id->IsSetVersion() &&
2440                           (acc_type & CSeq_id::fAcc_prot) ) {
2441                     is_protein = true;
2442                     acc_ver = CSeq_id_Handle::GetHandle(text_id->GetAccession()+'.'+
2443                                                         NStr::NumericToString(text_id->GetVersion()));
2444                 }
2445             }
2446         }
2447         if ( is_protein && gi ) {
2448             if ( auto tse_lock = x_CreateLocalCDDEntry(data_source, gi, acc_ver) ) {
2449                 locks.insert(tse_lock);
2450             }
2451         }
2452     }
2453     return locks;
2454 }
2455 
2456 
DropTSE(const CPsgBlobId & blob_id)2457 void CPSGDataLoader_Impl::DropTSE(const CPsgBlobId& blob_id)
2458 {
2459     m_BlobMap->DropBlob(blob_id);
2460 }
2461 
2462 
GetAccVers(const TIds & ids,TLoaded & loaded,TIds & ret)2463 void CPSGDataLoader_Impl::GetAccVers(const TIds& ids, TLoaded& loaded, TIds& ret)
2464 {
2465     CallWithRetry(bind(&CPSGDataLoader_Impl::GetAccVersOnce, this,
2466                        cref(ids), ref(loaded), ref(ret)),
2467                   "GetAccVers",
2468                   6);
2469 }
2470 
2471 
GetAccVersOnce(const TIds & ids,TLoaded & loaded,TIds & ret)2472 void CPSGDataLoader_Impl::GetAccVersOnce(const TIds& ids, TLoaded& loaded, TIds& ret)
2473 {
2474     vector<shared_ptr<SPsgBioseqInfo>> infos;
2475     infos.resize(ret.size());
2476     auto counts = x_GetBulkBioseqInfo(CPSG_Request_Resolve::fCanonicalId, ids, loaded, infos);
2477     if ( counts.first ) {
2478         // have loaded infos
2479         for (size_t i = 0; i < infos.size(); ++i) {
2480             if (loaded[i] || !infos[i].get()) continue;
2481             CSeq_id_Handle idh = infos[i]->canonical;
2482             if (idh.IsAccVer()) {
2483                 ret[i] = idh;
2484             }
2485             loaded[i] = true;
2486         }
2487     }
2488     if ( counts.second ) {
2489         NCBI_THROW(CLoaderException, eLoaderFailed, "failed to load some acc.ver in bulk request");
2490     }
2491 }
2492 
2493 
GetGis(const TIds & ids,TLoaded & loaded,TGis & ret)2494 void CPSGDataLoader_Impl::GetGis(const TIds& ids, TLoaded& loaded, TGis& ret)
2495 {
2496     CallWithRetry(bind(&CPSGDataLoader_Impl::GetGisOnce, this,
2497                        cref(ids), ref(loaded), ref(ret)),
2498                   "GetAccVers",
2499                   8);
2500 }
2501 
2502 
GetGisOnce(const TIds & ids,TLoaded & loaded,TGis & ret)2503 void CPSGDataLoader_Impl::GetGisOnce(const TIds& ids, TLoaded& loaded, TGis& ret)
2504 {
2505     vector<shared_ptr<SPsgBioseqInfo>> infos;
2506     infos.resize(ret.size());
2507     auto counts = x_GetBulkBioseqInfo(CPSG_Request_Resolve::fGi, ids, loaded, infos);
2508     if ( counts.first ) {
2509         // have loaded infos
2510         for (size_t i = 0; i < infos.size(); ++i) {
2511             if (loaded[i] || !infos[i].get()) continue;
2512             ret[i] = infos[i]->gi;
2513             loaded[i] = true;
2514         }
2515     }
2516     if ( counts.second ) {
2517         NCBI_THROW(CLoaderException, eLoaderFailed, "failed to load some acc.ver in bulk request");
2518     }
2519 }
2520 
2521 
x_GetBioId(const CSeq_id_Handle & idh)2522 CPSG_BioId CPSGDataLoader_Impl::x_GetBioId(const CSeq_id_Handle& idh)
2523 {
2524     CConstRef<CSeq_id> id = idh.GetSeqId();
2525     string label = id->AsFastaString();
2526     return CPSG_BioId(label, id->Which());
2527 }
2528 
2529 
x_SendRequest(shared_ptr<CPSG_Request> request)2530 void CPSGDataLoader_Impl::x_SendRequest(shared_ptr<CPSG_Request> request)
2531 {
2532     m_Queue->SendRequest(request, DEFAULT_DEADLINE);
2533     m_Thread->Wake();
2534 }
2535 
2536 
x_ProcessRequest(shared_ptr<CPSG_Request> request)2537 shared_ptr<CPSG_Reply> CPSGDataLoader_Impl::x_ProcessRequest(shared_ptr<CPSG_Request> request)
2538 {
2539     x_SendRequest(request);
2540     auto context = request->GetUserContext<CPsgClientContext>();
2541     _ASSERT(context);
2542     return context->GetReply();
2543 }
2544 
2545 
// Re-request a single blob by its PSG blob_id.
// Called from x_ProcessBlobReply() when an earlier reply did not deliver a
// usable lock.
// If data_source already has the TSE for blob_id loaded, a result referencing
// it is returned without contacting PSG. Otherwise a blob request (data
// inclusion per m_TSERequestMode) is sent and its reply is processed with
// retry=false, so this function is never re-entered for the same attempt.
//  blob_id     - PSG blob id string.
//  data_source - data source owning the TSE.
//  req_idh     - Seq-id the blob was originally requested for.
CPSGDataLoader_Impl::SReplyResult
CPSGDataLoader_Impl::x_RetryBlobRequest(const string& blob_id, CDataSource* data_source, CSeq_id_Handle req_idh)
{
#ifdef LOCK4GET
    // Acquire the load lock up front. If the TSE is already loaded, return it.
    CDataLoader::TBlobId dl_blob_id(new CPsgBlobId(blob_id));
    CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
    if ( load_lock.IsLoaded() ) {
        _TRACE("x_RetryBlobRequest() already loaded " << blob_id);
        SReplyResult ret;
        ret.lock = load_lock;
        ret.blob_id = blob_id;
        return ret;
    }
#else
    // NOTE(review): the inner block declares its own `load_lock`, which shadows
    // this one. The lock obtained from GetTSE_LoadLockIfLoaded() is released
    // at the end of the {{ }} scope. The lock whose address is passed to
    // x_ProcessBlobReply() below is therefore always default-constructed in
    // this branch. Its non-null address still marks the blob as required
    // there. Presumably intentional; confirm before renaming.
    CTSE_LoadLock load_lock;
    {{
        CDataLoader::TBlobId dl_blob_id(new CPsgBlobId(blob_id));
        CTSE_LoadLock load_lock = data_source->GetTSE_LoadLockIfLoaded(dl_blob_id);
        if ( load_lock && load_lock.IsLoaded() ) {
            _TRACE("x_RetryBlobRequest() already loaded " << blob_id);
            SReplyResult ret;
            ret.lock = load_lock;
            ret.blob_id = blob_id;
            return ret;
        }
    }}
#endif

    // Not loaded yet: fetch the blob from PSG and process the reply without
    // further retries (retry=false, lock_asap=false).
    CPSG_BlobId req_blob_id(blob_id);
    auto context = make_shared<CPsgClientContext>();
    auto blob_request = make_shared<CPSG_Request_Blob>(req_blob_id, context);
    blob_request->IncludeData(m_TSERequestMode);
    auto blob_reply = x_ProcessRequest(blob_request);
    return x_ProcessBlobReply(blob_reply, data_source, req_idh, false, false, &load_lock);
}
2581 
2582 
// Process a blob reply by running a CPSG_Blob_Task for it on m_ThreadPool,
// waiting for the task group, and converting the outcome to an SReplyResult.
//
//  reply       - reply to process. A null reply yields an empty SReplyResult.
//  data_source - data source passed to the task and to any retry.
//  req_idh     - Seq-id the blob was requested for. Used by the task and in
//                error messages.
//  retry       - allow a follow-up request via x_RetryBlobRequest() when the
//                blob was not obtained but its blob_id is known.
//  lock_asap   - forwarded to CPSG_Blob_Task. Also marks the blob as required
//                for the retry decision below.
//  load_lock   - optional lock forwarded to CPSG_Blob_Task. A non-null pointer
//                also marks the blob as required for the retry decision.
//
// If the task completed but was skipped and the retry yields no lock, the
// returned result may have an empty lock (no exception).
// Throws CLoaderException:
//  - eNoData when the task did not complete and reports not-found;
//  - eLoaderFailed when the task did not complete and either no retry applied
//    or the retry produced no lock.
CPSGDataLoader_Impl::SReplyResult CPSGDataLoader_Impl::x_ProcessBlobReply(
    shared_ptr<CPSG_Reply> reply,
    CDataSource* data_source,
    CSeq_id_Handle req_idh,
    bool retry,
    bool lock_asap,
    CTSE_LoadLock* load_lock)
{
    SReplyResult ret;

    if (!reply) {
        _TRACE("Request failed: null reply");
        return ret;
    }

    // Run reply processing as a single task and wait for it to finish.
    CPSG_TaskGroup group(*m_ThreadPool);
    CRef<CPSG_Blob_Task> task(
        new CPSG_Blob_Task(reply, group, req_idh, data_source, *this, lock_asap, load_lock));
    // NOTE(review): the guard presumably finalizes/cancels the task on scope
    // exit; confirm in CPSG_Task_Guard.
    CPSG_Task_Guard guard(*task);
    group.AddTask(task);
    group.WaitAll();

    if (task->GetStatus() == CThreadPool_Task::eCompleted) {
        if (task->m_Skipped) {
            // The task did not load the blob itself (presumably because it is
            // being loaded elsewhere). Obtain the result via WaitForSkipped().
            ret = task->WaitForSkipped();
            if (!ret.lock && retry) {
                // Force reloading blob
                ret = x_RetryBlobRequest(task->m_ReplyResult.blob_id, data_source, req_idh);
            }
        }
        else {
            ret = task->m_ReplyResult;
        }
    }
    else if ( !GetGetBlobByIdShouldFail() &&
              (lock_asap || load_lock) &&
              !task->m_ReplyResult.blob_id.empty() &&
              retry &&
              !task->GotNotFound() ) {
        // blob is required, but not received, yet blob_id is known, so we retry
        ret = x_RetryBlobRequest(task->m_ReplyResult.blob_id, data_source, req_idh);
        if ( !ret.lock ) {
            _TRACE("Failed to load blob for " << req_idh.AsString()<<" @ "<<CStackTrace());
            NCBI_THROW(CLoaderException, eLoaderFailed,
                       "CPSGDataLoader::GetRecords("+req_idh.AsString()+") failed");
        }
    }
    else if ( task->GotNotFound() ) {
        // The server reported the blob as not found: report missing data.
        NCBI_THROW_FMT(CLoaderException, eNoData,
                       "CPSGDataLoader: No blob for seq_id="<<req_idh<<" blob_id="<<task->m_ReplyResult.blob_id);
    }
    else {
        _TRACE("Failed to load blob for " << req_idh.AsString()<<" @ "<<CStackTrace());
        NCBI_THROW(CLoaderException, eLoaderFailed,
                   "CPSGDataLoader::GetRecords("+req_idh.AsString()+") failed");
    }
    return ret;
}
2641 
2642 
2643 class CPSG_BioseqInfo_Task : public CPSG_Task
2644 {
2645 public:
CPSG_BioseqInfo_Task(TReply reply,CPSG_TaskGroup & group)2646     CPSG_BioseqInfo_Task(TReply reply, CPSG_TaskGroup& group)
2647         : CPSG_Task(reply, group) {}
2648 
~CPSG_BioseqInfo_Task(void)2649     ~CPSG_BioseqInfo_Task(void) override {}
2650 
2651     shared_ptr<CPSG_BioseqInfo> m_BioseqInfo;
2652 
Finish(void)2653     void Finish(void) override {
2654         m_BioseqInfo.reset();
2655     }
2656 
2657 protected:
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)2658     void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
2659         if (item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2660             m_BioseqInfo = static_pointer_cast<CPSG_BioseqInfo>(item);
2661         }
2662     }
2663 };
2664 
2665 
x_GetBioseqInfo(const CSeq_id_Handle & idh)2666 shared_ptr<SPsgBioseqInfo> CPSGDataLoader_Impl::x_GetBioseqInfo(const CSeq_id_Handle& idh)
2667 {
2668     shared_ptr<SPsgBioseqInfo> ret = m_BioseqCache->Get(idh);
2669     if (ret) {
2670         return ret;
2671     }
2672 
2673     CPSG_BioId bio_id = x_GetBioId(idh);
2674     auto context = make_shared<CPsgClientContext>();
2675     shared_ptr<CPSG_Request_Resolve> request = make_shared<CPSG_Request_Resolve>(move(bio_id), context);
2676     request->IncludeInfo(CPSG_Request_Resolve::fAllInfo);
2677     x_SendRequest(request);
2678     auto reply = context->GetReply();
2679     if (!reply) {
2680         _TRACE("Request failed: null reply");
2681         NCBI_THROW(CLoaderException, eLoaderFailed, "null reply for "+idh.AsString());
2682         return nullptr;
2683     }
2684 
2685     CPSG_TaskGroup group(*m_ThreadPool);
2686     CRef<CPSG_BioseqInfo_Task> task(new CPSG_BioseqInfo_Task(reply, group));
2687     CPSG_Task_Guard guard(*task);
2688     group.AddTask(task);
2689     group.WaitAll();
2690 
2691     if (task->GetStatus() != CThreadPool_Task::eCompleted) {
2692         _TRACE("Failed to get bioseq info for " << idh.AsString() << " @ "<<CStackTrace());
2693         NCBI_THROW(CLoaderException, eLoaderFailed, "failed to get bioseq info for "+idh.AsString());
2694     }
2695     if (!task->m_BioseqInfo) {
2696         _TRACE("No bioseq info for " << idh.AsString());
2697         return nullptr;
2698     }
2699 
2700     return m_BioseqCache->Add(*task->m_BioseqInfo, idh);
2701 }
2702 
2703 
x_ReadBlobData(const SPsgBlobInfo & psg_blob_info,const CPSG_BlobInfo & blob_info,const CPSG_BlobData & blob_data,CTSE_LoadLock & load_lock,ESplitInfoType split_info_type)2704 void CPSGDataLoader_Impl::x_ReadBlobData(
2705     const SPsgBlobInfo& psg_blob_info,
2706     const CPSG_BlobInfo& blob_info,
2707     const CPSG_BlobData& blob_data,
2708     CTSE_LoadLock& load_lock,
2709     ESplitInfoType split_info_type)
2710 {
2711     if ( !load_lock.IsLoaded() ) {
2712         load_lock->SetBlobVersion(psg_blob_info.GetBlobVersion());
2713         load_lock->SetBlobState(psg_blob_info.blob_state);
2714     }
2715 
2716     auto_ptr<CObjectIStream> in(GetBlobDataStream(blob_info, blob_data));
2717     if (!in.get()) {
2718         _TRACE("Failed to open blob data stream for blob-id " << blob_info.GetId()->Repr());
2719         return;
2720     }
2721 
2722     if ( split_info_type == eIsSplitInfo ) {
2723         CRef<CID2S_Split_Info> split_info(new CID2S_Split_Info);
2724         *in >> *split_info;
2725         if ( s_GetDebugLevel() >= 8 ) {
2726             LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
2727                      MSerial_AsnText<<*split_info);
2728         }
2729         CSplitParser::Attach(*load_lock, *split_info);
2730     }
2731     else {
2732         CRef<CSeq_entry> entry(new CSeq_entry);
2733         *in >> *entry;
2734         if ( s_GetDebugLevel() >= 8 ) {
2735             LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
2736                      MSerial_AsnText<<*entry);
2737         }
2738         load_lock->SetSeq_entry(*entry);
2739     }
2740     if ( m_AddWGSMasterDescr ) {
2741         CWGSMasterSupport::AddWGSMaster(load_lock);
2742     }
2743 }
2744 
2745 
x_SetLoaded(CTSE_LoadLock & load_lock,EMainChunkType main_chunk_type)2746 void CPSGDataLoader_Impl::x_SetLoaded(CTSE_LoadLock& load_lock,
2747                                       EMainChunkType main_chunk_type)
2748 {
2749     if ( main_chunk_type == eDelayedMainChunk ) {
2750         load_lock->GetSplitInfo().GetChunk(kDelayedMain_ChunkId).SetLoaded();
2751         //_ASSERT(!load_lock->x_NeedsDelayedMainChunk());
2752     }
2753     else {
2754         load_lock.SetLoaded();
2755     }
2756 }
2757 
2758 
GetBlobDataStream(const CPSG_BlobInfo & blob_info,const CPSG_BlobData & blob_data)2759 CObjectIStream* CPSGDataLoader_Impl::GetBlobDataStream(
2760     const CPSG_BlobInfo& blob_info,
2761     const CPSG_BlobData& blob_data)
2762 {
2763     istream& data_stream = blob_data.GetStream();
2764     CNcbiIstream* in = &data_stream;
2765     auto_ptr<CNcbiIstream> z_stream;
2766     CObjectIStream* ret = nullptr;
2767 
2768     if (blob_info.GetCompression() == "gzip") {
2769         z_stream.reset(new CCompressionIStream(data_stream,
2770             new CZipStreamDecompressor(CZipCompression::fGZip), 0));
2771         in = z_stream.get();
2772     }
2773     else if (!blob_info.GetCompression().empty()) {
2774         _TRACE("Unsupported data compression: '" << blob_info.GetCompression() << "'");
2775         return nullptr;
2776     }
2777 
2778     EOwnership own = z_stream.get() ? eTakeOwnership : eNoOwnership;
2779     if (blob_info.GetFormat() == "asn.1") {
2780         ret = CObjectIStream::Open(eSerial_AsnBinary, *in, own);
2781     }
2782     else if (blob_info.GetFormat() == "asn1-text") {
2783         ret = CObjectIStream::Open(eSerial_AsnText, *in, own);
2784     }
2785     else if (blob_info.GetFormat() == "xml") {
2786         ret = CObjectIStream::Open(eSerial_Xml, *in, own);
2787     }
2788     else if (blob_info.GetFormat() == "json") {
2789         ret = CObjectIStream::Open(eSerial_Json, *in, own);
2790     }
2791     else {
2792         _TRACE("Unsupported data format: '" << blob_info.GetFormat() << "'");
2793         return nullptr;
2794     }
2795     _ASSERT(ret);
2796     z_stream.release();
2797     return ret;
2798 }
2799 
2800 
x_GetBulkBioseqInfo(CPSG_Request_Resolve::EIncludeInfo info,const TIds & ids,const TLoaded & loaded,TBioseqInfos & ret)2801 pair<size_t, size_t> CPSGDataLoader_Impl::x_GetBulkBioseqInfo(
2802     CPSG_Request_Resolve::EIncludeInfo info,
2803     const TIds& ids,
2804     const TLoaded& loaded,
2805     TBioseqInfos& ret)
2806 {
2807     pair<size_t, size_t> counts(0, 0);
2808     TIdxMap idx_map;
2809     auto context = make_shared<CPsgClientContext_Bulk>();
2810     for (size_t i = 0; i < ids.size(); ++i) {
2811         if (loaded[i]) continue;
2812         if ( CannotProcess(ids[i]) ) {
2813             continue;
2814         }
2815         ret[i] = m_BioseqCache->Get(ids[i]);
2816         if (ret[i]) {
2817             counts.first += 1;
2818             continue;
2819         }
2820         CPSG_BioId bio_id = x_GetBioId(ids[i]);
2821         shared_ptr<CPSG_Request_Resolve> request = make_shared<CPSG_Request_Resolve>(move(bio_id), context);
2822         idx_map[request.get()] = i;
2823         request->IncludeInfo(info);
2824         x_SendRequest(request);
2825     }
2826 
2827     CPSG_TaskGroup group(*m_ThreadPool);
2828     typedef  map<CRef<CPSG_BioseqInfo_Task>, size_t> TTasks;
2829     TTasks tasks;
2830     list<shared_ptr<CPSG_Task_Guard>> guards;
2831     while (!idx_map.empty()) {
2832         auto reply = context->GetReply();
2833         if (!reply) continue;
2834         TIdxMap::iterator idx_it = idx_map.find((void*)reply->GetRequest().get());
2835         size_t idx = ret.size();
2836         if (idx_it != idx_map.end()) {
2837             idx = idx_it->second;
2838             idx_map.erase(idx_it);
2839         }
2840 
2841         CRef<CPSG_BioseqInfo_Task> task(new CPSG_BioseqInfo_Task(reply, group));
2842         guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2843         tasks[task] = idx;
2844         group.AddTask(task);
2845     }
2846     while (group.HasTasks()) {
2847         CRef<CPSG_BioseqInfo_Task> task = group.GetTask<CPSG_BioseqInfo_Task>();
2848         _ASSERT(task);
2849         TTasks::const_iterator it = tasks.find(task);
2850         _ASSERT(it != tasks.end());
2851         if (task->GetStatus() == CThreadPool_Task::eFailed) {
2852             _TRACE("Failed to load bioseq info for " << ids[it->second].AsString());
2853             counts.second += 1;
2854             continue;
2855         }
2856         if (!task->m_BioseqInfo) {
2857             _TRACE("No bioseq info for " << ids[it->second].AsString());
2858             // not loaded and no failure
2859             continue;
2860         }
2861         _ASSERT(task->m_BioseqInfo);
2862         ret[it->second] = make_shared<SPsgBioseqInfo>(*task->m_BioseqInfo);
2863         counts.first += 1;
2864     }
2865     return counts;
2866 }
2867 
2868 
2869 END_SCOPE(objects)
2870 END_NCBI_SCOPE
2871 
2872 #endif // HAVE_PSG_LOADER
2873