1 /* $Id: psg_loader_impl.cpp 637408 2021-09-13 11:50:24Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Eugene Vasilchenko, Aleksey Grichenko
27 *
28 * File Description: PSG data loader
29 *
30 * ===========================================================================
31 */
32
#include <ncbi_pch.hpp>
#include <corelib/ncbistd.hpp>
#include <corelib/ncbithr.hpp>
#include <corelib/ncbi_param.hpp>
#include <corelib/plugin_manager_store.hpp>
#include <objects/seqsplit/ID2S_Split_Info.hpp>
#include <objects/seqsplit/ID2S_Chunk.hpp>
#include <objects/seqsplit/ID2S_Feat_type_Info.hpp>
#include <objects/general/Dbtag.hpp>
#include <objmgr/impl/data_source.hpp>
#include <objmgr/impl/tse_loadlock.hpp>
#include <objmgr/impl/tse_chunk_info.hpp>
#include <objmgr/impl/tse_split_info.hpp>
#include <objmgr/impl/split_parser.hpp>
#include <objmgr/data_loader_factory.hpp>
#include <objmgr/annot_selector.hpp>
#include <objtools/data_loaders/genbank/impl/psg_loader_impl.hpp>
#include <objtools/data_loaders/genbank/impl/wgsmaster.hpp>
#include <util/compress/compress.hpp>
#include <util/compress/stream.hpp>
#include <util/compress/zlib.hpp>
#include <serial/objistr.hpp>
#include <serial/serial.hpp>
#include <util/thread_pool.hpp>
#include <atomic>
#include <sstream>
58
59 #if defined(HAVE_PSG_LOADER)
60
61 #define LOCK4GET 1
62
63 BEGIN_NCBI_SCOPE
64
65 //#define NCBI_USE_ERRCODE_X PSGLoader
66 //NCBI_DEFINE_ERR_SUBCODE_X(1);
67
68 BEGIN_SCOPE(objects)
69
// NOTE(review): not referenced in the visible part of this file; presumably
// the chunk id reserved for split-info data -- confirm against its users.
const int kSplitInfoChunkId = 999999999;

// PSG loader debug level: [PSG_LOADER] DEBUG config entry, or the
// PSG_LOADER_DEBUG environment variable; defaults to 1.
// Read (once) through s_GetDebugLevel().
NCBI_PARAM_DECL(unsigned int, PSG_LOADER, DEBUG);
NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, DEBUG, 1,
                  eParam_NoThread, PSG_LOADER_DEBUG);
typedef NCBI_PARAM_TYPE(PSG_LOADER, DEBUG) TPSG_Debug;
76
77
s_GetDebugLevel()78 static unsigned int s_GetDebugLevel()
79 {
80 static auto value = TPSG_Debug::GetDefault();
81 return value;
82 }
83
84 /////////////////////////////////////////////////////////////////////////////
85 // CPsgClientContext
86 /////////////////////////////////////////////////////////////////////////////
87
88 class CPsgClientContext
89 {
90 public:
91 CPsgClientContext(void);
~CPsgClientContext(void)92 virtual ~CPsgClientContext(void) {}
93
94 virtual void SetReply(shared_ptr<CPSG_Reply> reply);
95 virtual shared_ptr<CPSG_Reply> GetReply(void);
96
97 protected:
98 CSemaphore m_Sema;
99 private:
100 shared_ptr<CPSG_Reply> m_Reply;
101 };
102
103
CPsgClientContext(void)104 CPsgClientContext::CPsgClientContext(void)
105 : m_Sema(0, kMax_UInt)
106 {
107 }
108
109
SetReply(shared_ptr<CPSG_Reply> reply)110 void CPsgClientContext::SetReply(shared_ptr<CPSG_Reply> reply)
111 {
112 m_Reply = reply;
113 m_Sema.Post();
114 }
115
116
GetReply(void)117 shared_ptr<CPSG_Reply> CPsgClientContext::GetReply(void)
118 {
119 m_Sema.Wait();
120 return m_Reply;
121 }
122
123
124 class CPsgClientContext_Bulk : public CPsgClientContext
125 {
126 public:
CPsgClientContext_Bulk(void)127 CPsgClientContext_Bulk(void) {}
~CPsgClientContext_Bulk(void)128 virtual ~CPsgClientContext_Bulk(void) {}
129
130 void SetReply(shared_ptr<CPSG_Reply> reply) override;
131 shared_ptr<CPSG_Reply> GetReply(void) override;
132
133 private:
134 deque<shared_ptr<CPSG_Reply>> m_Replies;
135 CFastMutex m_Lock;
136 };
137
138
SetReply(shared_ptr<CPSG_Reply> reply)139 void CPsgClientContext_Bulk::SetReply(shared_ptr<CPSG_Reply> reply)
140 {
141 CFastMutexGuard guard(m_Lock);
142 m_Replies.push_front(reply);
143 m_Sema.Post();
144 }
145
146
GetReply(void)147 shared_ptr<CPSG_Reply> CPsgClientContext_Bulk::GetReply(void)
148 {
149 m_Sema.Wait();
150 shared_ptr<CPSG_Reply> ret;
151 CFastMutexGuard guard(m_Lock);
152 _ASSERT(!m_Replies.empty());
153 ret = m_Replies.back();
154 m_Replies.pop_back();
155 return ret;
156 }
157
158
159 /////////////////////////////////////////////////////////////////////////////
160 // CPsgClientThread
161 /////////////////////////////////////////////////////////////////////////////
162
163 class CPsgClientThread : public CThread
164 {
165 public:
CPsgClientThread(shared_ptr<CPSG_Queue> queue)166 CPsgClientThread(shared_ptr<CPSG_Queue> queue) : m_Queue(queue), m_WakeSema(0, kMax_UInt) {}
167
Stop(void)168 void Stop(void)
169 {
170 m_Stop = true;
171 Wake();
172 }
173
Wake()174 void Wake()
175 {
176 m_WakeSema.Post();
177 }
178
179 protected:
180 void* Main(void) override;
181
182 private:
183 bool m_Stop = false;
184 shared_ptr<CPSG_Queue> m_Queue;
185 CSemaphore m_WakeSema;
186 };
187
188
189 const unsigned int kMaxWaitSeconds = 3;
190 const unsigned int kMaxWaitMillisec = 0;
191
192 #define DEFAULT_DEADLINE CDeadline(kMaxWaitSeconds, kMaxWaitMillisec)
193
Main(void)194 void* CPsgClientThread::Main(void)
195 {
196 for (;;) {
197 m_WakeSema.Wait();
198 if (m_Stop) break;
199 shared_ptr<CPSG_Reply> reply;
200 do {
201 reply = m_Queue->GetNextReply(DEFAULT_DEADLINE);
202 }
203 while (!reply && !m_Stop);
204 if (m_Stop) break;
205 auto context = reply->GetRequest()->GetUserContext<CPsgClientContext>();
206 context->SetReply(reply);
207 }
208 return nullptr;
209 }
210
211
212 /////////////////////////////////////////////////////////////////////////////
213 // CPSGBioseqCache
214 /////////////////////////////////////////////////////////////////////////////
215
216
PsgIdToHandle(const CPSG_BioId & id)217 CSeq_id_Handle PsgIdToHandle(const CPSG_BioId& id)
218 {
219 string sid = id.GetId();
220 if (sid.empty()) return CSeq_id_Handle();
221 return CSeq_id_Handle::GetHandle(sid);
222 }
223
224
// Lifespan of a bioseq-info cache entry: its deadline is set to this many
// seconds when created and again on every cache hit.
const int kMaxCacheLifespanSeconds = 300;
// Entry count above which CPSGBioseqCache evicts its oldest entries.
const size_t kMaxCacheSize = 10000;
227
228
229 class CPSGBioseqCache
230 {
231 public:
CPSGBioseqCache(void)232 CPSGBioseqCache(void) {}
~CPSGBioseqCache(void)233 ~CPSGBioseqCache(void) {}
234
235 shared_ptr<SPsgBioseqInfo> Get(const CSeq_id_Handle& idh);
236 shared_ptr<SPsgBioseqInfo> Add(const CPSG_BioseqInfo& info, CSeq_id_Handle req_idh);
237
238 private:
239 typedef map<CSeq_id_Handle, shared_ptr<SPsgBioseqInfo> > TIdMap;
240 typedef list<shared_ptr<SPsgBioseqInfo> > TInfoQueue;
241
242 mutable CFastMutex m_Mutex;
243 size_t m_MaxSize = kMaxCacheSize;
244 TIdMap m_Ids;
245 TInfoQueue m_Infos;
246 };
247
248
Get(const CSeq_id_Handle & idh)249 shared_ptr<SPsgBioseqInfo> CPSGBioseqCache::Get(const CSeq_id_Handle& idh)
250 {
251 CFastMutexGuard guard(m_Mutex);
252 auto found = m_Ids.find(idh);
253 if (found == m_Ids.end()) return nullptr;
254 shared_ptr<SPsgBioseqInfo> ret = found->second;
255 m_Infos.remove(ret);
256 ret->deadline = CDeadline(kMaxCacheLifespanSeconds);
257 m_Infos.push_back(ret);
258 return ret;
259 }
260
261
Add(const CPSG_BioseqInfo & info,CSeq_id_Handle req_idh)262 shared_ptr<SPsgBioseqInfo> CPSGBioseqCache::Add(const CPSG_BioseqInfo& info, CSeq_id_Handle req_idh)
263 {
264 CSeq_id_Handle idh = PsgIdToHandle(info.GetCanonicalId());
265 if (!idh) return nullptr;
266 // Try to find an existing entry (though this should not be a common case).
267 CFastMutexGuard guard(m_Mutex);
268 auto found = m_Ids.find(idh);
269 if (found != m_Ids.end()) {
270 found->second->Update(info);
271 return found->second;
272 }
273 // Create new entry.
274 shared_ptr<SPsgBioseqInfo> ret = make_shared<SPsgBioseqInfo>(info);
275 while (!m_Infos.empty() && (m_Infos.size() > m_MaxSize || m_Infos.front()->deadline.IsExpired())) {
276 auto rm = m_Infos.front();
277 m_Infos.pop_front();
278 ITERATE(SPsgBioseqInfo::TIds, id, rm->ids) {
279 m_Ids.erase(*id);
280 }
281 }
282 m_Infos.push_back(ret);
283 if (req_idh) {
284 m_Ids[req_idh] = ret;
285 }
286 ITERATE(SPsgBioseqInfo::TIds, it, ret->ids) {
287 m_Ids[*it] = ret;
288 }
289 return ret;
290 }
291
292
293 /////////////////////////////////////////////////////////////////////////////
294 // CPSGBlobMap
295 /////////////////////////////////////////////////////////////////////////////
296
297
298 class CPSGBlobMap
299 {
300 public:
CPSGBlobMap(void)301 CPSGBlobMap(void) {}
~CPSGBlobMap(void)302 ~CPSGBlobMap(void) {}
303
FindBlob(const string & bid)304 shared_ptr<SPsgBlobInfo> FindBlob(const string& bid) {
305 CFastMutexGuard guard(m_Mutex);
306 auto found = m_Blobs.find(bid);
307 return found != m_Blobs.end() ? found->second : nullptr;
308 }
309
AddBlob(const string & bid,shared_ptr<SPsgBlobInfo> blob)310 void AddBlob(const string& bid, shared_ptr<SPsgBlobInfo> blob) {
311 CFastMutexGuard guard(m_Mutex);
312 m_Blobs[bid] = blob;
313 }
314
DropBlob(const CPsgBlobId & blob_id)315 void DropBlob(const CPsgBlobId& blob_id) {
316 CFastMutexGuard guard(m_Mutex);
317 auto blob_it = m_Blobs.find(blob_id.ToPsgId());
318 if (blob_it != m_Blobs.end()) {
319 m_Blobs.erase(blob_it);
320 }
321 }
322
323 private:
324 // Map blob-id to blob info
325 typedef map<string, shared_ptr<SPsgBlobInfo> > TBlobs;
326
327 CFastMutex m_Mutex;
328 TBlobs m_Blobs;
329 };
330
331
332 /////////////////////////////////////////////////////////////////////////////
333 // SPsgBioseqInfo
334 /////////////////////////////////////////////////////////////////////////////
335
336
SPsgBioseqInfo(const CPSG_BioseqInfo & bioseq_info)337 SPsgBioseqInfo::SPsgBioseqInfo(const CPSG_BioseqInfo& bioseq_info)
338 : included_info(0),
339 molecule_type(CSeq_inst::eMol_not_set),
340 length(0),
341 state(0),
342 tax_id(INVALID_TAX_ID),
343 hash(0),
344 deadline(kMaxCacheLifespanSeconds)
345 {
346 Update(bioseq_info);
347 }
348
349
Update(const CPSG_BioseqInfo & bioseq_info)350 SPsgBioseqInfo::TIncludedInfo SPsgBioseqInfo::Update(const CPSG_BioseqInfo& bioseq_info)
351 {
352 TIncludedInfo inc_info = bioseq_info.IncludedInfo() & ~included_info;
353 if (inc_info & CPSG_Request_Resolve::fMoleculeType)
354 molecule_type = bioseq_info.GetMoleculeType();
355
356 if (inc_info & CPSG_Request_Resolve::fLength)
357 length = bioseq_info.GetLength();
358
359 if (inc_info & CPSG_Request_Resolve::fState)
360 state = bioseq_info.GetState();
361
362 if (inc_info & CPSG_Request_Resolve::fTaxId)
363 tax_id = bioseq_info.GetTaxId();
364
365 if (inc_info & CPSG_Request_Resolve::fHash)
366 hash = bioseq_info.GetHash();
367
368 if (inc_info & CPSG_Request_Resolve::fCanonicalId) {
369 canonical = PsgIdToHandle(bioseq_info.GetCanonicalId());
370 ids.push_back(canonical);
371 }
372 if (inc_info & CPSG_Request_Resolve::fGi)
373 gi = bioseq_info.GetGi();
374
375 if (inc_info & CPSG_Request_Resolve::fOtherIds) {
376 vector<CPSG_BioId> other_ids = bioseq_info.GetOtherIds();
377 ITERATE(vector<CPSG_BioId>, other_id, other_ids) {
378 ids.push_back(PsgIdToHandle(*other_id));
379 }
380 }
381 if (inc_info & CPSG_Request_Resolve::fBlobId)
382 blob_id = bioseq_info.GetBlobId().GetId();
383
384 included_info |= inc_info;
385 return inc_info;
386 }
387
388
389 /////////////////////////////////////////////////////////////////////////////
390 // SPsgBlobInfo
391 /////////////////////////////////////////////////////////////////////////////
392
393
SPsgBlobInfo(const CPSG_BlobInfo & blob_info)394 SPsgBlobInfo::SPsgBlobInfo(const CPSG_BlobInfo& blob_info)
395 : blob_state(0)
396 {
397 auto blob_id = blob_info.GetId<CPSG_BlobId>();
398 _ASSERT(blob_id);
399 blob_id_main = blob_id->GetId();
400 id2_info = blob_info.GetId2Info();
401
402 if (blob_info.IsDead()) blob_state |= CBioseq_Handle::fState_dead;
403 if (blob_info.IsSuppressed()) blob_state |= CBioseq_Handle::fState_suppress_perm;
404 if (blob_info.IsWithdrawn()) blob_state |= CBioseq_Handle::fState_withdrawn;
405
406 auto lm = blob_id->GetLastModified(); // last_modified is in milliseconds
407 last_modified = lm.IsNull()? 0: lm.GetValue();
408 }
409
410
411 /////////////////////////////////////////////////////////////////////////////
412 // CPSG_Task
413 /////////////////////////////////////////////////////////////////////////////
414
ReportStatus(TReply reply,EPSG_Status status)415 template<class TReply> void ReportStatus(TReply reply, EPSG_Status status)
416 {
417 if (status == EPSG_Status::eSuccess) return;
418 string sstatus;
419 switch (status) {
420 case EPSG_Status::eCanceled:
421 sstatus = "Canceled";
422 break;
423 case EPSG_Status::eError:
424 sstatus = "Error";
425 break;
426 case EPSG_Status::eInProgress:
427 sstatus = "In progress";
428 break;
429 case EPSG_Status::eNotFound:
430 sstatus = "Not found";
431 break;
432 default:
433 sstatus = to_string((int)status);
434 break;
435 }
436 while (true) {
437 string msg = reply->GetNextMessage();
438 if (msg.empty()) break;
439 _TRACE("Request failed: " << sstatus << " - " << msg << " @ "<<CStackTrace());
440 }
441 }
442
443
444 class CPSG_TaskGroup;
445
446
447 class CPSG_Task : public CThreadPool_Task
448 {
449 public:
450 typedef shared_ptr<CPSG_Reply> TReply;
451
452 CPSG_Task(TReply reply, CPSG_TaskGroup& group);
~CPSG_Task(void)453 ~CPSG_Task(void) override {}
454
455 EStatus Execute(void) override;
456 virtual void Finish(void) = 0;
457
GotNotFound() const458 bool GotNotFound() const {
459 return m_GotNotFound;
460 }
461
462 protected:
463 void OnStatusChange(EStatus old) override;
464
GetReply(void)465 TReply& GetReply(void) { return m_Reply; }
466
DoExecute(void)467 virtual void DoExecute(void) {
468 if (!CheckReplyStatus()) return;
469 ReadReply();
470 if (m_Status == eExecuting) m_Status = eCompleted;
471 }
472
IsCancelled(void)473 bool IsCancelled(void) {
474 if (IsCancelRequested()) {
475 m_Status = eFailed;
476 return true;
477 }
478 return false;
479 }
480
481 bool CheckReplyStatus(void);
482 void ReadReply(void);
483 virtual void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) = 0;
484
485 TReply m_Reply;
486 EStatus m_Status;
487 bool m_GotNotFound;
488 private:
489 CPSG_TaskGroup& m_Group;
490 };
491
492
493 // It may happen that a CThreadPool's thread holds CRef to a task longer than
494 // the loader exists. In this case the task needs to release some data
495 // (e.g. load locks) before the loader is destroyed. The guard calls
496 // Finish() to do the cleanup.
497 class CPSG_Task_Guard
498 {
499 public:
CPSG_Task_Guard(CPSG_Task & task)500 CPSG_Task_Guard(CPSG_Task& task) : m_Task(&task) {}
~CPSG_Task_Guard(void)501 ~CPSG_Task_Guard(void) { if (m_Task) m_Task->Finish(); }
Resease(void)502 void Resease(void) { m_Task.Reset(); }
503 private:
504 CPSG_Task_Guard(const CPSG_Task_Guard&);
505 CPSG_Task_Guard& operator=(const CPSG_Task_Guard&);
506
507 CRef<CPSG_Task> m_Task;
508 };
509
510
511 class CPSG_TaskGroup
512 {
513 public:
CPSG_TaskGroup(CThreadPool & pool)514 CPSG_TaskGroup(CThreadPool& pool)
515 : m_Pool(pool), m_Semaphore(0, kMax_UInt) {}
516
AddTask(CPSG_Task * task)517 void AddTask(CPSG_Task* task) {
518 {
519 CMutexGuard guard(m_Mutex);
520 m_Tasks.insert(Ref(task));
521 m_Pool.AddTask(task);
522 }
523 }
524
PostFinished(CPSG_Task & task)525 void PostFinished(CPSG_Task& task)
526 {
527 {
528 CRef<CPSG_Task> ref(&task);
529 CMutexGuard guard(m_Mutex);
530 TTasks::iterator it = m_Tasks.find(ref);
531 if (it == m_Tasks.end()) return;
532 m_Done.insert(ref);
533 m_Tasks.erase(it);
534 }
535 m_Semaphore.Post();
536 }
537
HasTasks(void) const538 bool HasTasks(void) const
539 {
540 CMutexGuard guard(m_Mutex);
541 return !m_Tasks.empty() || ! m_Done.empty();
542 }
543
WaitAll(void)544 void WaitAll(void) {
545 while (HasTasks()) GetTask<CPSG_Task>();
546 }
547
548 template<class T>
GetTask(void)549 CRef<T> GetTask(void) {
550 m_Semaphore.Wait();
551 CRef<T> ret;
552 CMutexGuard guard(m_Mutex);
553 _ASSERT(!m_Done.empty());
554 TTasks::iterator it = m_Done.begin();
555 ret.Reset(dynamic_cast<T*>(it->GetNCPointerOrNull()));
556 m_Done.erase(it);
557 return ret;
558 }
559
CancelAll(void)560 void CancelAll(void)
561 {
562 {
563 CMutexGuard guard(m_Mutex);
564 for (CRef<CPSG_Task> task : m_Tasks) {
565 task->RequestToCancel();
566 }
567 }
568 WaitAll();
569 }
570
571 private:
572 typedef set<CRef<CPSG_Task>> TTasks;
573
574 CThreadPool& m_Pool;
575 CSemaphore m_Semaphore;
576 TTasks m_Tasks;
577 TTasks m_Done;
578 mutable CMutex m_Mutex;
579 };
580
581
CPSG_Task(TReply reply,CPSG_TaskGroup & group)582 CPSG_Task::CPSG_Task(TReply reply, CPSG_TaskGroup& group)
583 : m_Reply(reply),
584 m_Status(eIdle),
585 m_GotNotFound(false),
586 m_Group(group)
587 {
588 }
589
590
Execute(void)591 CPSG_Task::EStatus CPSG_Task::Execute(void)
592 {
593 m_Status = eExecuting;
594 try {
595 DoExecute();
596 }
597 catch (CException& exc) {
598 LOG_POST("CPSGDataLoader: exception in retrieval thread: "<<exc);
599 return eFailed;
600 }
601 catch (exception& exc) {
602 LOG_POST("CPSGDataLoader: exception in retrieval thread: "<<exc.what());
603 return eFailed;
604 }
605 return m_Status;
606 }
607
608
OnStatusChange(EStatus old)609 void CPSG_Task::OnStatusChange(EStatus old)
610 {
611 EStatus status = GetStatus();
612 if (status == eCompleted || status == eFailed || status == eCanceled) {
613 m_Group.PostFinished(*this);
614 }
615 }
616
CheckReplyStatus(void)617 bool CPSG_Task::CheckReplyStatus(void)
618 {
619 EPSG_Status status = m_Reply->GetStatus(0);
620 if (status != EPSG_Status::eSuccess && status != EPSG_Status::eInProgress) {
621 ReportStatus(m_Reply, status);
622 m_Status = eFailed;
623 return false;
624 }
625 return true;
626 }
627
628
ReadReply(void)629 void CPSG_Task::ReadReply(void)
630 {
631 EPSG_Status status;
632 for (;;) {
633 if (IsCancelled()) return;
634 auto reply_item = m_Reply->GetNextItem(DEFAULT_DEADLINE);
635 if (!reply_item) continue;
636 if (reply_item->GetType() == CPSG_ReplyItem::eEndOfReply) break;
637 if (IsCancelled()) return;
638
639 EPSG_Status status = reply_item->GetStatus(0);
640 if (status != EPSG_Status::eSuccess && status != EPSG_Status::eInProgress) {
641 ReportStatus(reply_item, status);
642 if ( status == EPSG_Status::eNotFound ) {
643 m_GotNotFound = true;
644 continue;
645 }
646 m_Status = eFailed;
647 return;
648 }
649 if (status == EPSG_Status::eInProgress) {
650 status = reply_item->GetStatus(CDeadline::eInfinite);
651 if (IsCancelled()) return;
652 }
653 if (status != EPSG_Status::eSuccess) {
654 ReportStatus(reply_item, status);
655 m_Status = eFailed;
656 return;
657 }
658 ProcessReplyItem(reply_item);
659 }
660 if (IsCancelled()) return;
661 status = m_Reply->GetStatus(CDeadline::eInfinite);
662 if (status != EPSG_Status::eSuccess) {
663 ReportStatus(m_Reply, status);
664 m_Status = eFailed;
665 }
666 }
667
668
669 /////////////////////////////////////////////////////////////////////////////
670 // CPSGDataLoader_Impl
671 /////////////////////////////////////////////////////////////////////////////
672
// Name of the PSG loader config subtree and of the parameters read from it
// (via CPSGDataLoader::GetParamsSubnode()/GetParam()) in the
// CPSGDataLoader_Impl constructor.
#define NCBI_PSGLOADER_NAME "psg_loader"
#define NCBI_PSGLOADER_SERVICE_NAME "service_name"
#define NCBI_PSGLOADER_NOSPLIT "no_split"
#define NCBI_PSGLOADER_WHOLE_TSE "whole_tse"
#define NCBI_PSGLOADER_WHOLE_TSE_BULK "whole_tse_bulk"
#define NCBI_PSGLOADER_ADD_WGS_MASTER "add_wgs_master"

// Fallback PSG service name (default "PSG2", env PSG_LOADER_SERVICE_NAME),
// used when neither the config subtree nor the loader params give one.
NCBI_PARAM_DECL(string, PSG_LOADER, SERVICE_NAME);
NCBI_PARAM_DEF_EX(string, PSG_LOADER, SERVICE_NAME, "PSG2",
                  eParam_NoThread, PSG_LOADER_SERVICE_NAME);
typedef NCBI_PARAM_TYPE(PSG_LOADER, SERVICE_NAME) TPSG_ServiceName;

// NOTE(review): TPSG_NoSplit is not referenced in the visible part of this
// file (the constructor uses params.GetPSGNoSplit()); presumably consumed
// elsewhere -- confirm.
NCBI_PARAM_DECL(bool, PSG_LOADER, NO_SPLIT);
NCBI_PARAM_DEF_EX(bool, PSG_LOADER, NO_SPLIT, false,
                  eParam_NoThread, PSG_LOADER_NO_SPLIT);
typedef NCBI_PARAM_TYPE(PSG_LOADER, NO_SPLIT) TPSG_NoSplit;

// Default for "whole_tse": request whole TSEs instead of smart TSEs.
// NOTE(review): declared as unsigned int with a bool default (false) and read
// into a bool by the constructor; a bool param looks intended. Confirm
// before changing, because the type affects how PSG_LOADER_WHOLE_TSE is parsed.
NCBI_PARAM_DECL(unsigned int, PSG_LOADER, WHOLE_TSE);
NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, WHOLE_TSE, false,
                  eParam_NoThread, PSG_LOADER_WHOLE_TSE);
typedef NCBI_PARAM_TYPE(PSG_LOADER, WHOLE_TSE) TPSG_WholeTSE;

// Default for "whole_tse_bulk": same as WHOLE_TSE but for bulk requests
// (default true). Same unsigned-int/bool NOTE as above.
NCBI_PARAM_DECL(unsigned int, PSG_LOADER, WHOLE_TSE_BULK);
NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, WHOLE_TSE_BULK, true,
                  eParam_NoThread, PSG_LOADER_WHOLE_TSE_BULK);
typedef NCBI_PARAM_TYPE(PSG_LOADER, WHOLE_TSE_BULK) TPSG_WholeTSEBulk;

// Passed as the second CThreadPool constructor argument when the loader
// creates its thread pool (default 10); presumably the thread limit.
NCBI_PARAM_DECL(unsigned int, PSG_LOADER, MAX_POOL_THREADS);
NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, MAX_POOL_THREADS, 10,
                  eParam_NoThread, PSG_LOADER_MAX_POOL_THREADS);
typedef NCBI_PARAM_TYPE(PSG_LOADER, MAX_POOL_THREADS) TPSG_MaxPoolThreads;
704
CPSGDataLoader_Impl(const CGBLoaderParams & params)705 CPSGDataLoader_Impl::CPSGDataLoader_Impl(const CGBLoaderParams& params)
706 : m_BlobMap(new CPSGBlobMap()),
707 m_BioseqCache(new CPSGBioseqCache()),
708 m_ThreadPool(new CThreadPool(kMax_UInt, TPSG_MaxPoolThreads::GetDefault()))
709 {
710 auto_ptr<CPSGDataLoader::TParamTree> app_params;
711 const CPSGDataLoader::TParamTree* psg_params = 0;
712 if (params.GetParamTree()) {
713 psg_params = CPSGDataLoader::GetParamsSubnode(params.GetParamTree(), NCBI_PSGLOADER_NAME);
714 }
715 else {
716 CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
717 if (app) {
718 app_params.reset(CConfig::ConvertRegToTree(app->GetConfig()));
719 psg_params = CPSGDataLoader::GetParamsSubnode(app_params.get(), NCBI_PSGLOADER_NAME);
720 }
721 }
722
723 string service_name;
724 if (service_name.empty() && psg_params) {
725 service_name = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_SERVICE_NAME);
726 }
727 if (service_name.empty()) {
728 service_name = params.GetPSGServiceName();
729 }
730 if (service_name.empty()) {
731 service_name = TPSG_ServiceName::GetDefault();
732 }
733
734 bool no_split = params.GetPSGNoSplit();
735 if (psg_params) {
736 try {
737 string value = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_NOSPLIT);
738 if (!value.empty()) {
739 no_split = NStr::StringToBool(value);
740 }
741 }
742 catch (CException&) {
743 }
744 }
745 if ( no_split ) {
746 m_TSERequestModeBulk = m_TSERequestMode = CPSG_Request_Biodata::eOrigTSE;
747 }
748 else {
749 {{
750 bool whole_tse = TPSG_WholeTSE::GetDefault();
751 if ( psg_params ) {
752 try {
753 string value = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_WHOLE_TSE);
754 if (!value.empty()) {
755 whole_tse = NStr::StringToBool(value);
756 }
757 }
758 catch (CException&) {
759 }
760 }
761 if ( whole_tse ) {
762 m_TSERequestMode = CPSG_Request_Biodata::eWholeTSE;
763 }
764 else {
765 m_TSERequestMode = CPSG_Request_Biodata::eSmartTSE;
766 }
767 }}
768 {{
769 bool whole_tse = TPSG_WholeTSEBulk::GetDefault();
770 if ( psg_params ) {
771 try {
772 string value = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_WHOLE_TSE_BULK);
773 if (!value.empty()) {
774 whole_tse = NStr::StringToBool(value);
775 }
776 }
777 catch (CException&) {
778 }
779 }
780 if ( whole_tse ) {
781 m_TSERequestModeBulk = CPSG_Request_Biodata::eWholeTSE;
782 }
783 else {
784 m_TSERequestModeBulk = CPSG_Request_Biodata::eSmartTSE;
785 }
786 }}
787 }
788
789 m_AddWGSMasterDescr = true;
790 if ( psg_params ) {
791 string param = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_ADD_WGS_MASTER);
792 if ( !param.empty() ) {
793 try {
794 m_AddWGSMasterDescr = NStr::StringToBool(param);
795 }
796 catch ( CException& exc ) {
797 NCBI_RETHROW_FMT(exc, CLoaderException, eBadConfig,
798 "Bad value of parameter "
799 NCBI_PSGLOADER_ADD_WGS_MASTER
800 ": \""<<param<<"\"");
801 }
802 }
803 }
804
805 m_Queue = make_shared<CPSG_Queue>(service_name);
806 m_Thread.Reset(new CPsgClientThread(m_Queue));
807 m_Thread->Run();
808 }
809
810
~CPSGDataLoader_Impl(void)811 CPSGDataLoader_Impl::~CPSGDataLoader_Impl(void)
812 {
813 m_Thread->Stop();
814 m_Thread->Join();
815 }
816
817
CannotProcess(const CSeq_id_Handle & sih)818 static bool CannotProcess(const CSeq_id_Handle& sih)
819 {
820 if ( !sih ) {
821 return true;
822 }
823 if ( sih.Which() == CSeq_id::e_Local ) {
824 return true;
825 }
826 if ( sih.Which() == CSeq_id::e_General ) {
827 if ( NStr::EqualNocase(sih.GetSeqId()->GetGeneral().GetDb(), "SRA") ) {
828 // SRA is good
829 return false;
830 }
831 if ( NStr::StartsWith(sih.GetSeqId()->GetGeneral().GetDb(), "WGS:", NStr::eNocase) ) {
832 // WGS is good
833 return false;
834 }
835 // other general ids are good too(?)
836 return false;
837 }
838 return false;
839 }
840
841
// Invoke call(), retrying after failures with exponential back-off.
//   call        - the operation; its return value is passed through.
//   name        - operation name used in the warning messages.
//   retry_count - total number of attempts; 0 means 4. A negative value
//                 skips the retry loop, so call() is invoked exactly once.
// Before attempt t+1 the function sleeps 2^(t-1) seconds (1, 2, 4, ...).
// A CLoaderException other than eConnectionFailed/eLoaderFailed is rethrown
// immediately. Any other exception is logged and retried. The last attempt
// is made outside the try block, so its exception reaches the caller.
// NOTE(review): 1<<(t-1) overflows int for retry_count > 32; callers here
// pass the default -- confirm no caller uses large values.
template<class Call>
typename std::result_of<Call()>::type
CPSGDataLoader_Impl::CallWithRetry(Call&& call,
                                   const char* name,
                                   int retry_count)
{
    if ( retry_count == 0 ) {
        retry_count = 4;
    }
    for ( int t = 1; t < retry_count; ++ t ) {
        try {
            return call();
        }
        catch ( CLoaderException& exc ) {
            if ( exc.GetErrCode() == exc.eConnectionFailed ||
                 exc.GetErrCode() == exc.eLoaderFailed ) {
                // can retry
                LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc);
            }
            else {
                // no retry
                throw;
            }
        }
        catch ( CException& exc ) {
            LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc);
        }
        catch ( exception& exc ) {
            LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc.what());
        }
        catch ( ... ) {
            LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception");
        }
        // Exponential back-off: 1s, 2s, 4s, ...
        double wait_sec = 1<<(t-1);
        LOG_POST(Warning<<"CPSGDataLoader: waiting "<<wait_sec<<"s before retry");
        SleepMilliSec(Uint4(wait_sec*1000));
    }
    // Final attempt: exceptions propagate to the caller.
    return call();
}
881
882
GetIds(const CSeq_id_Handle & idh,TIds & ids)883 void CPSGDataLoader_Impl::GetIds(const CSeq_id_Handle& idh, TIds& ids)
884 {
885 CallWithRetry(bind(&CPSGDataLoader_Impl::GetIdsOnce, this,
886 cref(idh), ref(ids)),
887 "GetIds");
888 }
889
890
GetIdsOnce(const CSeq_id_Handle & idh,TIds & ids)891 void CPSGDataLoader_Impl::GetIdsOnce(const CSeq_id_Handle& idh, TIds& ids)
892 {
893 if ( CannotProcess(idh) ) {
894 return;
895 }
896 auto seq_info = x_GetBioseqInfo(idh);
897 if (!seq_info) return;
898
899 ITERATE(SPsgBioseqInfo::TIds, it, seq_info->ids) {
900 ids.push_back(*it);
901 }
902 }
903
904
905 CDataLoader::SGiFound
GetGi(const CSeq_id_Handle & idh)906 CPSGDataLoader_Impl::GetGi(const CSeq_id_Handle& idh)
907 {
908 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetGiOnce, this,
909 cref(idh)),
910 "GetGi");
911 }
912
913
914 CDataLoader::SGiFound
GetGiOnce(const CSeq_id_Handle & idh)915 CPSGDataLoader_Impl::GetGiOnce(const CSeq_id_Handle& idh)
916 {
917 if ( CannotProcess(idh) ) {
918 return CDataLoader::SGiFound();
919 }
920 CDataLoader::SGiFound ret;
921 auto seq_info = x_GetBioseqInfo(idh);
922 if (seq_info) {
923 ret.sequence_found = true;
924 if ( seq_info->gi != ZERO_GI ) {
925 ret.gi = seq_info->gi;
926 }
927 }
928 return ret;
929 }
930
931
932 CDataLoader::SAccVerFound
GetAccVer(const CSeq_id_Handle & idh)933 CPSGDataLoader_Impl::GetAccVer(const CSeq_id_Handle& idh)
934 {
935 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetAccVerOnce, this,
936 cref(idh)),
937 "GetAccVer");
938 }
939
940
941 CDataLoader::SAccVerFound
GetAccVerOnce(const CSeq_id_Handle & idh)942 CPSGDataLoader_Impl::GetAccVerOnce(const CSeq_id_Handle& idh)
943 {
944 if ( CannotProcess(idh) ) {
945 return CDataLoader::SAccVerFound();
946 }
947 CDataLoader::SAccVerFound ret;
948 auto seq_info = x_GetBioseqInfo(idh);
949 if (seq_info) {
950 ret.sequence_found = true;
951 if ( seq_info->canonical.IsAccVer() ) {
952 ret.acc_ver = seq_info->canonical;
953 }
954 }
955 return ret;
956 }
957
958
GetTaxId(const CSeq_id_Handle & idh)959 TTaxId CPSGDataLoader_Impl::GetTaxId(const CSeq_id_Handle& idh)
960 {
961 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetTaxIdOnce, this,
962 cref(idh)),
963 "GetTaxId");
964 }
965
966
GetTaxIdOnce(const CSeq_id_Handle & idh)967 TTaxId CPSGDataLoader_Impl::GetTaxIdOnce(const CSeq_id_Handle& idh)
968 {
969 if ( CannotProcess(idh) ) {
970 return INVALID_TAX_ID;
971 }
972 auto seq_info = x_GetBioseqInfo(idh);
973 return seq_info ? seq_info->tax_id : INVALID_TAX_ID;
974 }
975
976
GetSequenceLength(const CSeq_id_Handle & idh)977 TSeqPos CPSGDataLoader_Impl::GetSequenceLength(const CSeq_id_Handle& idh)
978 {
979 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetSequenceLengthOnce, this,
980 cref(idh)),
981 "GetSequenceLength");
982 }
983
984
GetSequenceLengthOnce(const CSeq_id_Handle & idh)985 TSeqPos CPSGDataLoader_Impl::GetSequenceLengthOnce(const CSeq_id_Handle& idh)
986 {
987 if ( CannotProcess(idh) ) {
988 return kInvalidSeqPos;
989 }
990 auto seq_info = x_GetBioseqInfo(idh);
991 return (seq_info && seq_info->length > 0) ? TSeqPos(seq_info->length) : kInvalidSeqPos;
992 }
993
994
995 CDataLoader::SHashFound
GetSequenceHash(const CSeq_id_Handle & idh)996 CPSGDataLoader_Impl::GetSequenceHash(const CSeq_id_Handle& idh)
997 {
998 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetSequenceHashOnce, this,
999 cref(idh)),
1000 "GetSequenceHash");
1001 }
1002
1003
1004 CDataLoader::SHashFound
GetSequenceHashOnce(const CSeq_id_Handle & idh)1005 CPSGDataLoader_Impl::GetSequenceHashOnce(const CSeq_id_Handle& idh)
1006 {
1007 if ( CannotProcess(idh) ) {
1008 return CDataLoader::SHashFound();
1009 }
1010 CDataLoader::SHashFound ret;
1011 auto seq_info = x_GetBioseqInfo(idh);
1012 if (seq_info) {
1013 ret.sequence_found = true;
1014 if ( ret.hash ) {
1015 ret.hash_known = true;
1016 ret.hash = seq_info->hash;
1017 }
1018 }
1019 return ret;
1020 }
1021
1022
1023 CDataLoader::STypeFound
GetSequenceType(const CSeq_id_Handle & idh)1024 CPSGDataLoader_Impl::GetSequenceType(const CSeq_id_Handle& idh)
1025 {
1026 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetSequenceTypeOnce, this,
1027 cref(idh)),
1028 "GetSequenceType");
1029 }
1030
1031
1032 CDataLoader::STypeFound
GetSequenceTypeOnce(const CSeq_id_Handle & idh)1033 CPSGDataLoader_Impl::GetSequenceTypeOnce(const CSeq_id_Handle& idh)
1034 {
1035 if ( CannotProcess(idh) ) {
1036 return CDataLoader::STypeFound();
1037 }
1038 CDataLoader::STypeFound ret;
1039 auto seq_info = x_GetBioseqInfo(idh);
1040 if (seq_info && seq_info->molecule_type != CSeq_inst::eMol_not_set) {
1041 ret.sequence_found = true;
1042 ret.type = seq_info->molecule_type;
1043 }
1044 return ret;
1045 }
1046
1047
GetSequenceState(const CSeq_id_Handle & idh)1048 int CPSGDataLoader_Impl::GetSequenceState(const CSeq_id_Handle& idh)
1049 {
1050 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetSequenceStateOnce, this,
1051 cref(idh)),
1052 "GetSequenceState");
1053 }
1054
1055
GetSequenceStateOnce(const CSeq_id_Handle & idh)1056 int CPSGDataLoader_Impl::GetSequenceStateOnce(const CSeq_id_Handle& idh)
1057 {
1058 const int kNotFound = (CBioseq_Handle::fState_not_found |
1059 CBioseq_Handle::fState_no_data);
1060 if ( CannotProcess(idh) ) {
1061 return kNotFound;
1062 }
1063 auto seq_info = x_GetBioseqInfo(idh);
1064 if (!seq_info) {
1065 return kNotFound;
1066 }
1067 if (seq_info->included_info & CPSG_Request_Resolve::fState) {
1068 switch (seq_info->state) {
1069 case CPSG_BioseqInfo::eDead:
1070 return CBioseq_Handle::fState_dead;
1071 case CPSG_BioseqInfo::eReserved:
1072 return CBioseq_Handle::fState_suppress_perm;
1073 case CPSG_BioseqInfo::eLive:
1074 return CBioseq_Handle::fState_none;
1075 default:
1076 LOG_POST(Warning << "CPSGDataLoader: uknown " << idh << " state: " << seq_info->state);
1077 return CBioseq_Handle::fState_none;
1078 }
1079 }
1080 return CBioseq_Handle::fState_none;
1081 }
1082
1083
1084 CDataLoader::TTSE_LockSet
GetRecords(CDataSource * data_source,const CSeq_id_Handle & idh,CDataLoader::EChoice choice)1085 CPSGDataLoader_Impl::GetRecords(CDataSource* data_source,
1086 const CSeq_id_Handle& idh,
1087 CDataLoader::EChoice choice)
1088 {
1089 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetRecordsOnce, this,
1090 data_source, cref(idh), choice),
1091 "GetRecords");
1092 }
1093
1094
1095 CDataLoader::TTSE_LockSet
GetRecordsOnce(CDataSource * data_source,const CSeq_id_Handle & idh,CDataLoader::EChoice choice)1096 CPSGDataLoader_Impl::GetRecordsOnce(CDataSource* data_source,
1097 const CSeq_id_Handle& idh,
1098 CDataLoader::EChoice choice)
1099 {
1100 CDataLoader::TTSE_LockSet locks;
1101 if (choice == CDataLoader::eOrphanAnnot) {
1102 // PSG loader doesn't provide orphan annotations
1103 return locks;
1104 }
1105 if ( CannotProcess(idh) ) {
1106 return locks;
1107 }
1108
1109 CPSG_BioId bio_id = x_GetBioId(idh);
1110 auto context = make_shared<CPsgClientContext>();
1111 auto request = make_shared<CPSG_Request_Biodata>(move(bio_id), context);
1112
1113 CPSG_Request_Biodata::EIncludeData inc_data = CPSG_Request_Biodata::eNoTSE;
1114 if (data_source) {
1115 inc_data = m_TSERequestMode;
1116 CDataSource::TLoadedBlob_ids loaded_blob_ids;
1117 data_source->GetLoadedBlob_ids(idh, CDataSource::fLoaded_bioseqs, loaded_blob_ids);
1118 ITERATE(CDataSource::TLoadedBlob_ids, loaded_blob_id, loaded_blob_ids) {
1119 const CPsgBlobId* pbid = dynamic_cast<const CPsgBlobId*>(&**loaded_blob_id);
1120 if (!pbid) continue;
1121 request->ExcludeTSE(CPSG_BlobId(pbid->ToPsgId()));
1122 }
1123 }
1124 request->IncludeData(inc_data);
1125 auto reply = x_ProcessRequest(request);
1126 CTSE_Lock tse_lock = x_ProcessBlobReply(reply, data_source, idh, true, true).lock;
1127
1128 if (!tse_lock) {
1129 // TODO: return correct state with CBlobStateException
1130 if ( 0 ) {
1131 NCBI_THROW(CLoaderException, eLoaderFailed,
1132 "error loading blob for " + idh.AsString());
1133 }
1134 }
1135 else {
1136 locks.insert(tse_lock);
1137 }
1138 return locks;
1139 }
1140
1141
GetBlobId(const CSeq_id_Handle & idh)1142 CRef<CPsgBlobId> CPSGDataLoader_Impl::GetBlobId(const CSeq_id_Handle& idh)
1143 {
1144 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetBlobIdOnce, this,
1145 cref(idh)),
1146 "GetBlobId");
1147 }
1148
1149
GetBlobIdOnce(const CSeq_id_Handle & idh)1150 CRef<CPsgBlobId> CPSGDataLoader_Impl::GetBlobIdOnce(const CSeq_id_Handle& idh)
1151 {
1152 if ( CannotProcess(idh) ) {
1153 return null;
1154 }
1155 string blob_id;
1156
1157 // Check cache first.
1158 auto seq_info = x_GetBioseqInfo(idh);
1159 if (seq_info && !seq_info->blob_id.empty()) {
1160 blob_id = seq_info->blob_id;
1161 }
1162 else {
1163 CPSG_BioId bio_id = x_GetBioId(idh);
1164 auto context = make_shared<CPsgClientContext>();
1165 auto request = make_shared<CPSG_Request_Biodata>(move(bio_id), context);
1166 request->IncludeData(CPSG_Request_Biodata::eNoTSE);
1167 auto reply = x_ProcessRequest(request);
1168 blob_id = x_ProcessBlobReply(reply, nullptr, idh, true).blob_id;
1169 }
1170 CRef<CPsgBlobId> ret;
1171 if (!blob_id.empty()) {
1172 ret.Reset(new CPsgBlobId(blob_id));
1173 }
1174 return ret;
1175 }
1176
1177
// Helpers for locally synthesized CDD entries (blob ids starting with the
// local CDD prefix). They are defined further down in this file and declared
// here because GetBlobByIdOnce() uses them.

// True if blob_id has the local CDD entry prefix.
static bool x_IsLocalCDDEntryId(const CPsgBlobId& blob_id);
// Extracts the gi (and optional accession id) encoded in a local CDD blob id.
static bool x_ParseLocalCDDEntryId(const CPsgBlobId& blob_id,
                                   CSeq_id_Handle& gi, CSeq_id_Handle& acc_ver);
// Creates (or returns the already loaded) local CDD TSE for gi/acc_ver.
static CTSE_Lock x_CreateLocalCDDEntry(CDataSource* data_source,
                                       const CSeq_id_Handle& gi,
                                       const CSeq_id_Handle& acc_ver);
1184
1185
1186
1187 static bool s_GetBlobByIdShouldFail = false;
1188
SetGetBlobByIdShouldFail(bool value)1189 void CPSGDataLoader_Impl::SetGetBlobByIdShouldFail(bool value)
1190 {
1191 s_GetBlobByIdShouldFail = value;
1192 }
1193
1194
GetGetBlobByIdShouldFail()1195 bool CPSGDataLoader_Impl::GetGetBlobByIdShouldFail()
1196 {
1197 return s_GetBlobByIdShouldFail;
1198 }
1199
1200
GetBlobById(CDataSource * data_source,const CPsgBlobId & blob_id)1201 CTSE_Lock CPSGDataLoader_Impl::GetBlobById(CDataSource* data_source, const CPsgBlobId& blob_id)
1202 {
1203 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetBlobByIdOnce, this,
1204 data_source, cref(blob_id)),
1205 "GetBlobById",
1206 GetGetBlobByIdShouldFail()? 1: 0);
1207 }
1208
1209
// One attempt to load the TSE identified by blob_id into data_source
// (GetBlobById() wraps this in CallWithRetry()).
// Returns an empty lock when data_source is null and the existing lock when
// the TSE is already loaded. Blob ids with the local CDD prefix are re-created
// locally via x_CreateLocalCDDEntry(); all other blobs are requested from PSG.
// Throws CLoaderException(eLoaderFailed) if no TSE lock could be obtained.
CTSE_Lock CPSGDataLoader_Impl::GetBlobByIdOnce(CDataSource* data_source, const CPsgBlobId& blob_id)
{
    if (!data_source) return CTSE_Lock();

    if ( GetGetBlobByIdShouldFail() ) {
        // Only traced here; the failure itself is forced later
        // (CPSG_Blob_Task::DoExecute, x_CreateLocalCDDEntry).
        _TRACE("GetBlobById("<<blob_id.ToPsgId()<<") should fail");
    }
#ifdef LOCK4GET
    // Take the TSE load lock up front; it is handed to the reply processing below.
    CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(&blob_id);
    CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
    if ( load_lock.IsLoaded() ) {
        _TRACE("GetBlobById() already loaded " << blob_id.ToPsgId());
        return load_lock;
    }
#else
    // Only look for an existing lock here; load_lock may remain empty.
    CTSE_LoadLock load_lock;
    {{
        CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(&blob_id);
        load_lock = data_source->GetTSE_LoadLockIfLoaded(dl_blob_id);
        if ( load_lock && load_lock.IsLoaded() ) {
            _TRACE("GetBlobById() already loaded " << blob_id.ToPsgId());
            return load_lock;
        }
    }}
#endif

    CTSE_Lock ret;
    if ( x_IsLocalCDDEntryId(blob_id) ) {
        // Local CDD entries are not PSG blobs: rebuild them from the gi/acc
        // encoded in the blob id.
        if ( s_GetDebugLevel() >= 5 ) {
            LOG_POST(Info<<"PSG loader: Re-loading CDD blob: " << blob_id.ToString());
        }
        CSeq_id_Handle gi, acc_ver;
        if ( x_ParseLocalCDDEntryId(blob_id, gi, acc_ver) ) {
            ret = x_CreateLocalCDDEntry(data_source, gi, acc_ver);
        }
    }
    else {
        // Regular PSG blob: request it by id; load_lock is passed so the
        // reply processing can use it.
        CPSG_BlobId bid(blob_id.ToPsgId());
        auto context = make_shared<CPsgClientContext>();
        auto request = make_shared<CPSG_Request_Blob>(bid, context);
        request->IncludeData(m_TSERequestMode);
        auto reply = x_ProcessRequest(request);
        ret = x_ProcessBlobReply(reply, data_source, CSeq_id_Handle(), true, false, &load_lock).lock;
    }
    if (!ret) {
        _TRACE("Failed to load blob for " << blob_id.ToPsgId()<<" @ "<<CStackTrace());
        NCBI_THROW(CLoaderException, eLoaderFailed,
                   "CPSGDataLoader::GetBlobById("+blob_id.ToPsgId()+") failed");
    }
    return ret;
}
1261
1262
// Thread-pool task that processes one PSG reply for a bioseq or blob request.
// ProcessReplyItem() records bioseq-info, blob-info, blob-data and
// skipped-blob items as they arrive; DoExecute() then loads the TSE (plus any
// split chunks delivered in the same reply) into the data source.
class CPSG_Blob_Task : public CPSG_Task
{
public:
    // idh:           seq-id the request was made for (used for the bioseq cache).
    // data_source:   where TSEs are loaded; may be null (blob id / info only).
    // loader:        owning loader (blob map, bioseq cache, blob readers).
    // lock_asap:     obtain the TSE load lock as soon as enough blob data has
    //                arrived (see ObtainLoadLock()).
    // load_lock_ptr: optional caller-owned load lock to use.
    CPSG_Blob_Task(
        TReply reply,
        CPSG_TaskGroup& group,
        const CSeq_id_Handle& idh,
        CDataSource* data_source,
        CPSGDataLoader_Impl& loader,
        bool lock_asap = false,
        CTSE_LoadLock* load_lock_ptr = nullptr)
        : CPSG_Task(reply, group),
          m_Id(idh),
          m_DataSource(data_source),
          m_Loader(loader),
          m_LockASAP(lock_asap),
          m_LoadLockPtr(load_lock_ptr)
    {
    }

    ~CPSG_Blob_Task(void) override {}

    // Scope guard for a lock pointer (the task's m_LoadLockPtr in DoExecute()).
    // If an early lock is requested and no lock pointer was supplied, the
    // pointer is redirected to m_LocalLock. On destruction the guarded pointer
    // is reset to null, and m_LocalLock is destroyed with the guard.
    struct SAutoReleaseLock {
        SAutoReleaseLock(bool lock_asap, CTSE_LoadLock*& lock_ptr)
            : m_LockPtr(lock_ptr)
        {
            if ( lock_asap && !m_LockPtr ) {
                m_LockPtr = &m_LocalLock;
            }
        }
        ~SAutoReleaseLock()
        {
            m_LockPtr = 0;
        }

        CTSE_LoadLock m_LocalLock;
        CTSE_LoadLock*& m_LockPtr;  // reference to the guarded pointer
    };

    // Blob-info and blob-data received for one TSE or chunk.
    typedef pair<shared_ptr<CPSG_BlobInfo>, shared_ptr<CPSG_BlobData>> TBlobSlot;
    typedef map<string, TBlobSlot> TTSEBlobMap; // by PSG blob_id
    typedef map<string, map<TChunkId, TBlobSlot>> TChunkBlobMap; // by id2_info, id2_chunk

    CSeq_id_Handle m_Id;
    // Set when the server reported the blob as skipped.
    shared_ptr<CPSG_SkippedBlob> m_Skipped;
    // Blob id and (after DoExecute) TSE lock produced by this task.
    CPSGDataLoader_Impl::SReplyResult m_ReplyResult;
    shared_ptr<SPsgBlobInfo> m_PsgBlobInfo;

    // Slot lookups return null when nothing was received for the id.
    const TBlobSlot* GetTSESlot(const string& psg_id) const;
    const TBlobSlot* GetChunkSlot(const string& id2_info, TChunkId chunk_id) const;
    const TBlobSlot* GetBlobSlot(const CPSG_DataId& id) const;
    // Creates (if needed) the slot for a TSE or chunk id; null for other ids.
    TBlobSlot* SetBlobSlot(const CPSG_DataId& id);
    // Load lock currently in use; valid only while m_LoadLockPtr is set.
    CTSE_LoadLock& GetLoadLock() const
    {
        _ASSERT(m_LoadLockPtr);
        return *m_LoadLockPtr;
    }
    void ObtainLoadLock();
    bool GotBlobData(const string& psg_blob_id) const;
    CPSGDataLoader_Impl::SReplyResult WaitForSkipped(void);

    // Releases everything collected from the reply.
    void Finish(void) override
    {
        m_Skipped.reset();
        m_ReplyResult = CPSGDataLoader_Impl::SReplyResult();
        m_PsgBlobInfo.reset();
        m_TSEBlobMap.clear();
        m_ChunkBlobMap.clear();
        m_BlobIdMap.clear();
    }

    void SetDLBlobId(const string& psg_blob_id, CDataLoader::TBlobId dl_blob_id)
    {
        m_BlobIdMap[psg_blob_id] = dl_blob_id;
    }

    // Data-loader blob id for a PSG blob id: the registered one if any,
    // otherwise a new CPsgBlobId.
    CDataLoader::TBlobId GetDLBlobId(const string& psg_blob_id) const
    {
        auto iter = m_BlobIdMap.find(psg_blob_id);
        if ( iter != m_BlobIdMap.end() ) {
            return iter->second;
        }
        return CDataLoader::TBlobId(new CPsgBlobId(psg_blob_id));
    }

protected:
    void DoExecute(void) override;
    void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override;

private:
    CDataSource* m_DataSource;
    CPSGDataLoader_Impl& m_Loader;
    bool m_LockASAP;
    // Null when no load lock is wanted (blob id only).
    CTSE_LoadLock* m_LoadLockPtr;
    TTSEBlobMap m_TSEBlobMap;
    TChunkBlobMap m_ChunkBlobMap;
    map<string, CDataLoader::TBlobId> m_BlobIdMap;
};
1361
1362
GetTSESlot(const string & blob_id) const1363 const CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::GetTSESlot(const string& blob_id) const
1364 {
1365 auto iter = m_TSEBlobMap.find(blob_id);
1366 if ( iter != m_TSEBlobMap.end() ) {
1367 return &iter->second;
1368 }
1369 return 0;
1370 }
1371
1372
GetChunkSlot(const string & id2_info,TChunkId chunk_id) const1373 const CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::GetChunkSlot(const string& id2_info,
1374 TChunkId chunk_id) const
1375 {
1376 auto iter = m_ChunkBlobMap.find(id2_info);
1377 if ( iter != m_ChunkBlobMap.end() ) {
1378 auto iter2 = iter->second.find(chunk_id);
1379 if ( iter2 != iter->second.end() ) {
1380 return &iter2->second;
1381 }
1382 }
1383 return 0;
1384 }
1385
1386
GetBlobSlot(const CPSG_DataId & id) const1387 const CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::GetBlobSlot(const CPSG_DataId& id) const
1388 {
1389 if ( auto tse_id = dynamic_cast<const CPSG_BlobId*>(&id) ) {
1390 return GetTSESlot(tse_id->GetId());
1391 }
1392 else if ( auto chunk_id = dynamic_cast<const CPSG_ChunkId*>(&id) ) {
1393 return GetChunkSlot(chunk_id->GetId2Info(), chunk_id->GetId2Chunk());
1394 }
1395 return 0;
1396 }
1397
1398
SetBlobSlot(const CPSG_DataId & id)1399 CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::SetBlobSlot(const CPSG_DataId& id)
1400 {
1401 if ( auto tse_id = dynamic_cast<const CPSG_BlobId*>(&id) ) {
1402 _TRACE("Blob slot for tse_id="<<tse_id->GetId());
1403 return &m_TSEBlobMap[tse_id->GetId()];
1404 }
1405 else if ( auto chunk_id = dynamic_cast<const CPSG_ChunkId*>(&id) ) {
1406 _TRACE("Blob slot for id2_info="<<chunk_id->GetId2Info()<<" chunk="<<chunk_id->GetId2Chunk());
1407 return &m_ChunkBlobMap[chunk_id->GetId2Info()][chunk_id->GetId2Chunk()];
1408 }
1409 return 0;
1410 }
1411
1412
GotBlobData(const string & psg_blob_id) const1413 bool CPSG_Blob_Task::GotBlobData(const string& psg_blob_id) const
1414 {
1415 const TBlobSlot* main_blob_slot = GetTSESlot(psg_blob_id);
1416 if ( !main_blob_slot || !main_blob_slot->first ) {
1417 // no TSE blob props yet
1418 if ( s_GetDebugLevel() >= 6 ) {
1419 LOG_POST("GotBlobData("<<psg_blob_id<<"): no TSE blob props");
1420 }
1421 return false;
1422 }
1423 if ( main_blob_slot->second ) {
1424 // got TSE blob data
1425 if ( s_GetDebugLevel() >= 6 ) {
1426 LOG_POST("GotBlobData("<<psg_blob_id<<"): got TSE blob data");
1427 }
1428 return true;
1429 }
1430 auto id2_info = main_blob_slot->first->GetId2Info();
1431 if ( id2_info.empty() ) {
1432 // TSE doesn't have split info
1433 if ( s_GetDebugLevel() >= 6 ) {
1434 LOG_POST("GotBlobData("<<psg_blob_id<<"): not split");
1435 }
1436 return false;
1437 }
1438 const TBlobSlot* split_blob_slot = GetChunkSlot(id2_info, kSplitInfoChunkId);
1439 if ( !split_blob_slot || !split_blob_slot->second ) {
1440 // no split info blob data yet
1441 if ( s_GetDebugLevel() >= 6 ) {
1442 LOG_POST("GotBlobData("<<psg_blob_id<<"): no split blob data");
1443 }
1444 return false;
1445 }
1446 else {
1447 // got split info blob data
1448 if ( s_GetDebugLevel() >= 6 ) {
1449 LOG_POST("GotBlobData("<<psg_blob_id<<"): got split blob data");
1450 }
1451 return true;
1452 }
1453 }
1454
1455
ObtainLoadLock()1456 void CPSG_Blob_Task::ObtainLoadLock()
1457 {
1458 if ( !m_LockASAP ) {
1459 // load lock is not requested
1460 return;
1461 }
1462 if ( GetLoadLock() ) {
1463 // load lock already obtained
1464 return;
1465 }
1466 if ( m_ReplyResult.blob_id.empty() ) {
1467 // blob id is not known yet
1468 return;
1469 }
1470 if ( !GotBlobData(m_ReplyResult.blob_id) ) {
1471 return;
1472 }
1473 if ( s_GetDebugLevel() >= 6 ) {
1474 LOG_POST("ObtainLoadLock("<<m_ReplyResult.blob_id<<"): getting load lock");
1475 }
1476 GetLoadLock() = m_DataSource->GetTSE_LoadLock(GetDLBlobId(m_ReplyResult.blob_id));
1477 if ( s_GetDebugLevel() >= 6 ) {
1478 LOG_POST("ObtainLoadLock("<<m_ReplyResult.blob_id<<"): obtained load lock");
1479 }
1480 }
1481
1482
DoExecute(void)1483 void CPSG_Blob_Task::DoExecute(void)
1484 {
1485 _TRACE("CPSG_Blob_Task::DoExecute()");
1486 if (!CheckReplyStatus()) return;
1487 SAutoReleaseLock lock_guard(m_LockASAP, m_LoadLockPtr);
1488 ReadReply();
1489 if (m_Status == eFailed) return;
1490 if (m_Skipped) {
1491 m_Status = eCompleted;
1492 return;
1493 }
1494
1495 if (m_ReplyResult.blob_id.empty()) {
1496 // If the source request was for blob rather than bioseq, there may be no bioseq info
1497 // and blob_id stays empty.
1498 if (m_Reply->GetRequest()->GetType() == "blob") {
1499 shared_ptr<const CPSG_Request_Blob> blob_request = static_pointer_cast<const CPSG_Request_Blob>(m_Reply->GetRequest());
1500 if (blob_request) {
1501 m_ReplyResult.blob_id = blob_request->GetId();
1502 }
1503 }
1504 }
1505 if (m_ReplyResult.blob_id.empty()) {
1506 _TRACE("no blob_id");
1507 m_Status = eCompleted;
1508 return;
1509 }
1510
1511 _TRACE("tse_id="<<m_ReplyResult.blob_id);
1512 if ( !m_LoadLockPtr ) {
1513 // to TSE requested
1514 m_Status = eCompleted;
1515 return;
1516 }
1517
1518 const TBlobSlot* main_blob_slot = GetTSESlot(m_ReplyResult.blob_id);
1519 if ( !main_blob_slot || !main_blob_slot->first ) {
1520 _TRACE("No blob info for tse_id="<<m_ReplyResult.blob_id);
1521 m_Status = eFailed;
1522 return;
1523 }
1524
1525 const TBlobSlot* split_blob_slot = 0;
1526 auto id2_info = main_blob_slot->first->GetId2Info();
1527 if ( !id2_info.empty() ) {
1528 split_blob_slot = GetChunkSlot(id2_info, kSplitInfoChunkId);
1529 if ( !split_blob_slot || !split_blob_slot->first ) {
1530 _TRACE("No split info tse_id="<<m_ReplyResult.blob_id<<" id2_info="<<id2_info);
1531 }
1532 }
1533
1534 // Find or create main blob-info entry.
1535 m_PsgBlobInfo = m_Loader.m_BlobMap->FindBlob(m_ReplyResult.blob_id);
1536 if (!m_PsgBlobInfo) {
1537 m_PsgBlobInfo = make_shared<SPsgBlobInfo>(*main_blob_slot->first);
1538 m_Loader.m_BlobMap->AddBlob(m_ReplyResult.blob_id, m_PsgBlobInfo);
1539 }
1540
1541 if (!m_DataSource) {
1542 _TRACE("No data source for tse_id="<<m_ReplyResult.blob_id);
1543 // No data to load, just bioseq-info.
1544 m_Status = eCompleted;
1545 return;
1546 }
1547
1548 // Read blob data (if any) and pass to the data source.
1549 if ( CPSGDataLoader_Impl::GetGetBlobByIdShouldFail() ) {
1550 m_Status = eFailed;
1551 return;
1552 }
1553 CDataLoader::TBlobId dl_blob_id = GetDLBlobId(m_ReplyResult.blob_id);
1554 CTSE_LoadLock load_lock;
1555 if ( GetLoadLock() && GetLoadLock()->GetBlobId() == dl_blob_id ) {
1556 load_lock = GetLoadLock();
1557 }
1558 else {
1559 load_lock = m_DataSource->GetTSE_LoadLock(dl_blob_id);
1560 }
1561 if (!load_lock) {
1562 _TRACE("Cannot get TSE load lock for tse_id="<<m_ReplyResult.blob_id);
1563 m_Status = eFailed;
1564 return;
1565 }
1566 CPSGDataLoader_Impl::EMainChunkType main_chunk_type = CPSGDataLoader_Impl::eNoDelayedMainChunk;
1567 if ( load_lock.IsLoaded() ) {
1568 if ( load_lock->x_NeedsDelayedMainChunk() &&
1569 !load_lock->GetSplitInfo().GetChunk(kDelayedMain_ChunkId).IsLoaded() ) {
1570 main_chunk_type = CPSGDataLoader_Impl::eDelayedMainChunk;
1571 }
1572 else {
1573 _TRACE("Already loaded tse_id="<<m_ReplyResult.blob_id);
1574 m_ReplyResult.lock = load_lock;
1575 m_Status = eCompleted;
1576 return;
1577 }
1578 }
1579
1580 if ( split_blob_slot && split_blob_slot->first && split_blob_slot->second ) {
1581 auto& blob_id = *load_lock->GetBlobId();
1582 dynamic_cast<CPsgBlobId&>(const_cast<CBlobId&>(blob_id)).SetId2Info(id2_info);
1583 m_Loader.x_ReadBlobData(*m_PsgBlobInfo,
1584 *split_blob_slot->first,
1585 *split_blob_slot->second,
1586 load_lock,
1587 CPSGDataLoader_Impl::eIsSplitInfo);
1588 CTSE_Split_Info& tse_split_info = load_lock->GetSplitInfo();
1589 for ( auto& chunk_slot : m_ChunkBlobMap[id2_info] ) {
1590 TChunkId chunk_id = chunk_slot.first;
1591 if ( chunk_id == kSplitInfoChunkId ) {
1592 continue;
1593 }
1594 if ( !chunk_slot.second.first || !chunk_slot.second.second ) {
1595 continue;
1596 }
1597 CTSE_Chunk_Info* chunk = 0;
1598 try {
1599 chunk = &tse_split_info.GetChunk(chunk_id);
1600 }
1601 catch ( CException& /*ignored*/ ) {
1602 }
1603 if ( !chunk || chunk->IsLoaded() ) {
1604 continue;
1605 }
1606
1607 auto_ptr<CObjectIStream> in
1608 (CPSGDataLoader_Impl::GetBlobDataStream(*chunk_slot.second.first,
1609 *chunk_slot.second.second));
1610 CRef<CID2S_Chunk> id2_chunk(new CID2S_Chunk);
1611 *in >> *id2_chunk;
1612 if ( s_GetDebugLevel() >= 8 ) {
1613 LOG_POST(Info<<"PSG loader: TSE "<<chunk->GetBlobId().ToString()<<" "<<
1614 " chunk "<<chunk->GetChunkId()<<" "<<MSerial_AsnText<<*id2_chunk);
1615 }
1616
1617 CSplitParser::Load(*chunk, *id2_chunk);
1618 chunk->SetLoaded();
1619 }
1620 }
1621 else if ( main_blob_slot && main_blob_slot->first && main_blob_slot->second ) {
1622 m_Loader.x_ReadBlobData(*m_PsgBlobInfo,
1623 *main_blob_slot->first,
1624 *main_blob_slot->second,
1625 load_lock,
1626 CPSGDataLoader_Impl::eNoSplitInfo);
1627 }
1628 else {
1629 _TRACE("No data for tse_id="<<m_ReplyResult.blob_id);
1630 load_lock.Reset();
1631 }
1632 if ( load_lock ) {
1633 m_Loader.x_SetLoaded(load_lock, main_chunk_type);
1634 m_ReplyResult.lock = load_lock;
1635 m_Status = eCompleted;
1636 }
1637 else {
1638 m_Status = eFailed;
1639 }
1640 }
1641
1642
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)1643 void CPSG_Blob_Task::ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)
1644 {
1645 switch (item->GetType()) {
1646 case CPSG_ReplyItem::eBioseqInfo:
1647 {
1648 // Only one bioseq-info is allowed per reply.
1649 shared_ptr<CPSG_BioseqInfo> bioseq_info = static_pointer_cast<CPSG_BioseqInfo>(item);
1650 m_ReplyResult.blob_id = bioseq_info->GetBlobId().GetId();
1651 ObtainLoadLock();
1652 m_Loader.m_BioseqCache->Add(*bioseq_info, m_Id);
1653 break;
1654 }
1655 case CPSG_ReplyItem::eBlobInfo:
1656 {
1657 auto blob_info = static_pointer_cast<CPSG_BlobInfo>(item);
1658 _TRACE("Blob info: "<<blob_info->GetId()->Repr());
1659 if ( auto slot = SetBlobSlot(*blob_info->GetId()) ) {
1660 slot->first = blob_info;
1661 ObtainLoadLock();
1662 }
1663 break;
1664 }
1665 case CPSG_ReplyItem::eBlobData:
1666 {
1667 shared_ptr<CPSG_BlobData> data = static_pointer_cast<CPSG_BlobData>(item);
1668 _TRACE("Blob data: "<<data->GetId()->Repr());
1669 if ( auto slot = SetBlobSlot(*data->GetId()) ) {
1670 slot->second = data;
1671 ObtainLoadLock();
1672 }
1673 break;
1674 }
1675 case CPSG_ReplyItem::eSkippedBlob:
1676 {
1677 // Only main blob can be skipped.
1678 _ASSERT(!m_Skipped);
1679 m_Skipped = static_pointer_cast<CPSG_SkippedBlob>(item);
1680 break;
1681 }
1682 default:
1683 {
1684 break;
1685 }
1686 }
1687 }
1688
1689
WaitForSkipped(void)1690 CPSGDataLoader_Impl::SReplyResult CPSG_Blob_Task::WaitForSkipped(void)
1691 {
1692 CPSGDataLoader_Impl::SReplyResult ret;
1693 ret.blob_id = m_ReplyResult.blob_id;
1694 if (!m_DataSource) return ret;
1695
1696 CDataLoader::TBlobId dl_blob_id = GetDLBlobId(ret.blob_id);
1697 CTSE_LoadLock load_lock;
1698 _ASSERT(m_Skipped);
1699 CPSG_SkippedBlob::EReason skip_reason = m_Skipped->GetReason();
1700 switch (skip_reason) {
1701 case CPSG_SkippedBlob::eInProgress:
1702 // Try to wait for the blob to be loaded.
1703 load_lock = m_DataSource->GetLoadedTSE_Lock(dl_blob_id, CTimeout(1));
1704 if ( !load_lock && s_GetDebugLevel() >= 6 ) {
1705 LOG_POST("CPSGDataLoader: 'in progress' blob is not loaded: "<<dl_blob_id.ToString());
1706 }
1707 break;
1708 case CPSG_SkippedBlob::eSent:
1709 // Try to wait for the blob to be loaded.
1710 load_lock = m_DataSource->GetLoadedTSE_Lock(dl_blob_id, CTimeout(.2));
1711 if ( !load_lock && s_GetDebugLevel() >= 6 ) {
1712 LOG_POST("CPSGDataLoader: 'sent' blob is not loaded: "<<dl_blob_id.ToString());
1713 }
1714 break;
1715 case CPSG_SkippedBlob::eExcluded:
1716 // Check if the blob is already loaded, force loading if necessary.
1717 load_lock = m_DataSource->GetTSE_LoadLockIfLoaded(dl_blob_id);
1718 if ( !load_lock && s_GetDebugLevel() >= 6 ) {
1719 LOG_POST("CPSGDataLoader: 'excluded' blob is not loaded: "<<dl_blob_id.ToString());
1720 }
1721 break;
1722 default: // unknown
1723 return ret;
1724 }
1725 if (load_lock && load_lock.IsLoaded()) {
1726 m_Skipped.reset();
1727 ret.lock = load_lock;
1728 }
1729 return ret;
1730 }
1731
1732
GetBlobs(CDataSource * data_source,TTSE_LockSets & tse_sets)1733 void CPSGDataLoader_Impl::GetBlobs(CDataSource* data_source, TTSE_LockSets& tse_sets)
1734 {
1735 CallWithRetry(bind(&CPSGDataLoader_Impl::GetBlobsOnce, this,
1736 data_source, ref(tse_sets)),
1737 "GetBlobs");
1738 }
1739
1740
GetBlobsOnce(CDataSource * data_source,TTSE_LockSets & tse_sets)1741 void CPSGDataLoader_Impl::GetBlobsOnce(CDataSource* data_source, TTSE_LockSets& tse_sets)
1742 {
1743 if (!data_source) return;
1744 auto context = make_shared<CPsgClientContext_Bulk>();
1745 CPSG_TaskGroup group(*m_ThreadPool);
1746 ITERATE(TTSE_LockSets, tse_set, tse_sets) {
1747 const CSeq_id_Handle& id = tse_set->first;
1748 CPSG_BioId bio_id = x_GetBioId(id);
1749 auto request = make_shared<CPSG_Request_Biodata>(move(bio_id), context);
1750 request->IncludeData(m_TSERequestModeBulk);
1751 auto reply = x_ProcessRequest(request);
1752 CRef<CPSG_Blob_Task> task(
1753 new CPSG_Blob_Task(reply, group, id, data_source, *this, true));
1754 group.AddTask(task);
1755 }
1756 // Waiting for skipped blobs can block all pool threads. To prevent this postpone
1757 // waiting until all other tasks are completed.
1758 typedef list<CRef<CPSG_Blob_Task>> TTasks;
1759 TTasks skipped_tasks;
1760 list<shared_ptr<CPSG_Task_Guard>> guards;
1761 while (group.HasTasks()) {
1762 CRef<CPSG_Blob_Task> task(group.GetTask<CPSG_Blob_Task>().GetNCPointerOrNull());
1763 _ASSERT(task);
1764 guards.push_back(make_shared<CPSG_Task_Guard>(*task));
1765 if (task->GetStatus() == CThreadPool_Task::eFailed) {
1766 _TRACE("Failed to get blob for " << task->m_Id);
1767 group.CancelAll();
1768 NCBI_THROW(CLoaderException, eLoaderFailed, "failed to load blobs for "+task->m_Id.AsString());
1769 }
1770 if (task->m_Skipped) {
1771 skipped_tasks.push_back(task);
1772 continue;
1773 }
1774 SReplyResult res = task->m_ReplyResult;
1775 if (task->m_ReplyResult.lock) tse_sets[task->m_Id].insert(task->m_ReplyResult.lock);
1776 }
1777 NON_CONST_ITERATE(TTasks, it, skipped_tasks) {
1778 CPSG_Blob_Task& task = **it;
1779 SReplyResult result = task.WaitForSkipped();
1780 if (!result.lock) {
1781 // Force reloading blob
1782 result = x_RetryBlobRequest(task.m_ReplyResult.blob_id, data_source, task.m_Id);
1783 }
1784 if (result.lock) tse_sets[task.m_Id].insert(result.lock);
1785 }
1786 }
1787
1788
LoadChunk(CDataSource * data_source,CTSE_Chunk_Info & chunk_info)1789 void CPSGDataLoader_Impl::LoadChunk(CDataSource* data_source,
1790 CTSE_Chunk_Info& chunk_info)
1791 {
1792 CDataLoader::TChunkSet chunks;
1793 chunks.push_back(Ref(&chunk_info));
1794 LoadChunks(data_source, chunks);
1795 }
1796
1797
1798 class CPSG_LoadChunk_Task : public CPSG_Task
1799 {
1800 public:
CPSG_LoadChunk_Task(TReply reply,CPSG_TaskGroup & group,CDataLoader::TChunk chunk)1801 CPSG_LoadChunk_Task(TReply reply, CPSG_TaskGroup& group, CDataLoader::TChunk chunk)
1802 : CPSG_Task(reply, group), m_Chunk(chunk) {}
1803
~CPSG_LoadChunk_Task(void)1804 ~CPSG_LoadChunk_Task(void) override {}
1805
Finish(void)1806 void Finish(void) override {
1807 m_Chunk.Reset();
1808 m_BlobInfo.reset();
1809 m_BlobData.reset();
1810 }
1811
1812 protected:
1813 void DoExecute(void) override;
1814 void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override;
1815
1816 private:
1817 CDataLoader::TChunk m_Chunk;
1818 shared_ptr<CPSG_BlobInfo> m_BlobInfo;
1819 shared_ptr<CPSG_BlobData> m_BlobData;
1820 };
1821
1822
DoExecute(void)1823 void CPSG_LoadChunk_Task::DoExecute(void)
1824 {
1825 if (!CheckReplyStatus()) return;
1826 ReadReply();
1827 if (m_Status == eFailed) return;
1828
1829 if (!m_BlobInfo || !m_BlobData) {
1830 _TRACE("Failed to get chunk info or data for blob-id " << m_Chunk->GetBlobId());
1831 m_Status = eFailed;
1832 return;
1833 }
1834
1835 if (IsCancelled()) return;
1836 auto_ptr<CObjectIStream> in(CPSGDataLoader_Impl::GetBlobDataStream(*m_BlobInfo, *m_BlobData));
1837 if (!in.get()) {
1838 _TRACE("Failed to open chunk data stream for blob-id " << m_BlobInfo->GetId()->Repr());
1839 m_Status = eFailed;
1840 return;
1841 }
1842
1843 CRef<CID2S_Chunk> id2_chunk(new CID2S_Chunk);
1844 *in >> *id2_chunk;
1845 if ( s_GetDebugLevel() >= 8 ) {
1846 LOG_POST(Info<<"PSG loader: TSE "<<m_Chunk->GetBlobId().ToString()<<" "<<
1847 " chunk "<<m_Chunk->GetChunkId()<<" "<<MSerial_AsnText<<*id2_chunk);
1848 }
1849 CSplitParser::Load(*m_Chunk, *id2_chunk);
1850 m_Chunk->SetLoaded();
1851
1852 m_Status = eCompleted;
1853 }
1854
1855
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)1856 void CPSG_LoadChunk_Task::ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)
1857 {
1858 switch (item->GetType()) {
1859 case CPSG_ReplyItem::eBlobInfo:
1860 m_BlobInfo = static_pointer_cast<CPSG_BlobInfo>(item);
1861 break;
1862 case CPSG_ReplyItem::eBlobData:
1863 m_BlobData = static_pointer_cast<CPSG_BlobData>(item);
1864 break;
1865 default:
1866 break;
1867 }
1868 }
1869
1870
// Annotation name for CDD features; also used as the name of locally
// created CDD TSEs.
constexpr char kCDDAnnotName[] = "CDD";
// Switch for local CDD entries (not referenced in this section of the file).
constexpr bool kCreateLocalCDDEntries = true;
// Local CDD blob ids have the form "CDD:<gi>" or "CDD:<gi>|<acc_ver id>".
constexpr char kLocalCDDEntryIdPrefix[] = "CDD:";
constexpr char kLocalCDDEntryIdSeparator = '|';
1875
x_MakeLocalCDDEntryId(const CSeq_id_Handle & gi,const CSeq_id_Handle & acc_ver)1876 static string x_MakeLocalCDDEntryId(const CSeq_id_Handle& gi, const CSeq_id_Handle& acc_ver)
1877 {
1878 ostringstream str;
1879 _ASSERT(gi && gi.IsGi());
1880 str << kLocalCDDEntryIdPrefix << gi.GetGi();
1881 if ( acc_ver ) {
1882 str << kLocalCDDEntryIdSeparator << acc_ver;
1883 }
1884 return str.str();
1885 }
1886
1887
x_IsLocalCDDEntryId(const CPsgBlobId & blob_id)1888 static bool x_IsLocalCDDEntryId(const CPsgBlobId& blob_id)
1889 {
1890 return NStr::StartsWith(blob_id.ToPsgId(), kLocalCDDEntryIdPrefix);
1891 }
1892
1893
x_ParseLocalCDDEntryId(const CPsgBlobId & blob_id,CSeq_id_Handle & gi,CSeq_id_Handle & acc_ver)1894 static bool x_ParseLocalCDDEntryId(const CPsgBlobId& blob_id,
1895 CSeq_id_Handle& gi, CSeq_id_Handle& acc_ver)
1896 {
1897 if ( !x_IsLocalCDDEntryId(blob_id) ) {
1898 return false;
1899 }
1900 istringstream str(blob_id.ToPsgId().substr(strlen(kLocalCDDEntryIdPrefix)));
1901 TIntId gi_id = 0;
1902 str >> gi_id;
1903 if ( !gi_id ) {
1904 return false;
1905 }
1906 gi = CSeq_id_Handle::GetGiHandle(GI_FROM(TIntId, gi_id));
1907 if ( str.get() == kLocalCDDEntryIdSeparator ) {
1908 string extra;
1909 str >> extra;
1910 acc_ver = CSeq_id_Handle::GetHandle(extra);
1911 }
1912 return true;
1913 }
1914
1915
x_LocalCDDEntryIdToBioId(const CPsgBlobId & blob_id)1916 static CPSG_BioId x_LocalCDDEntryIdToBioId(const CPsgBlobId& blob_id)
1917 {
1918 const string& str = blob_id.ToPsgId();
1919 size_t start = strlen(kLocalCDDEntryIdPrefix);
1920 size_t end = str.find(kLocalCDDEntryIdSeparator, start);
1921 return { str.substr(start, end-start), CSeq_id::e_Gi };
1922 }
1923
1924
x_CreateLocalCDDEntryChunk(const CSeq_id_Handle & id1,const CSeq_id_Handle & id2)1925 static CRef<CTSE_Chunk_Info> x_CreateLocalCDDEntryChunk(const CSeq_id_Handle& id1,
1926 const CSeq_id_Handle& id2)
1927 {
1928 if ( !id1 && !id2 ) {
1929 return null;
1930 }
1931 CRange<TSeqPos> range = CRange<TSeqPos>::GetWhole();
1932 CRef<CTSE_Chunk_Info> chunk(new CTSE_Chunk_Info(kDelayedMain_ChunkId));
1933 // add main annot types
1934 CAnnotName name = kCDDAnnotName;
1935 CSeqFeatData::ESubtype subtypes[] = {
1936 CSeqFeatData::eSubtype_region,
1937 CSeqFeatData::eSubtype_site
1938 };
1939 for ( auto subtype : subtypes ) {
1940 SAnnotTypeSelector type(subtype);
1941 if ( id1 ) {
1942 chunk->x_AddAnnotType(name, type, id1, range);
1943 }
1944 if ( id2 ) {
1945 chunk->x_AddAnnotType(name, type, id2, range);
1946 }
1947 }
1948 return chunk;
1949 }
1950
1951
x_CreateLocalCDDEntry(CDataSource * data_source,const CSeq_id_Handle & gi,const CSeq_id_Handle & acc_ver)1952 static CTSE_Lock x_CreateLocalCDDEntry(CDataSource* data_source,
1953 const CSeq_id_Handle& gi,
1954 const CSeq_id_Handle& acc_ver)
1955 {
1956 CRef<CPsgBlobId> blob_id(new CPsgBlobId(x_MakeLocalCDDEntryId(gi, acc_ver)));
1957 if ( auto chunk = x_CreateLocalCDDEntryChunk(gi, acc_ver) ) {
1958 CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(blob_id);
1959 CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
1960 if ( load_lock ) {
1961 if ( !load_lock.IsLoaded() ) {
1962 if ( CPSGDataLoader_Impl::GetGetBlobByIdShouldFail() ) {
1963 return CTSE_Lock();
1964 }
1965 load_lock->SetName(kCDDAnnotName);
1966 load_lock->GetSplitInfo().AddChunk(*chunk);
1967 _ASSERT(load_lock->x_NeedsDelayedMainChunk());
1968 load_lock.SetLoaded();
1969 }
1970 return load_lock;
1971 }
1972 }
1973 return CTSE_Lock();
1974 }
1975
1976
x_CreateEmptyLocalCDDEntry(CDataSource * data_source,CDataLoader::TChunk chunk)1977 static void x_CreateEmptyLocalCDDEntry(CDataSource* data_source,
1978 CDataLoader::TChunk chunk)
1979 {
1980 CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(chunk->GetBlobId());
1981 _ASSERT(load_lock);
1982 _ASSERT(load_lock.IsLoaded());
1983 _ASSERT(load_lock->HasNoSeq_entry());
1984 CRef<CSeq_entry> entry(new CSeq_entry);
1985 entry->SetSet().SetSeq_set();
1986 if ( s_GetDebugLevel() >= 8 ) {
1987 LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
1988 " created empty CDD entry");
1989 }
1990 load_lock->SetSeq_entry(*entry);
1991 chunk->SetLoaded();
1992 }
1993
1994
s_SameId(const CPSG_BlobId * id1,const CPSG_BlobId & id2)1995 static bool s_SameId(const CPSG_BlobId* id1, const CPSG_BlobId& id2)
1996 {
1997 return id1 && id1->GetId() == id2.GetId();
1998 }
1999
2000
// Fills the delayed main chunk of a locally synthesized CDD TSE with the
// Seq-entry read from blob_info/blob_data.
// Returns false if the TSE is not in the expected state (no lock, not loaded,
// or not waiting for its delayed main chunk), or if the data stream cannot be
// opened. Otherwise it installs the entry, marks the chunk loaded and returns
// true.
// NOTE: blob_id is declared through _DEBUG_ARG, so it exists only in debug
// builds. It is referenced only inside _ASSERT/_TRACE, which presumably expand
// to nothing in release builds as well.
bool CPSGDataLoader_Impl::x_ReadCDDChunk(CDataSource* data_source,
                                         CDataLoader::TChunk chunk,
                                         const CPSG_BlobInfo& blob_info,
                                         const CPSG_BlobData& blob_data)
{
    _ASSERT(chunk->GetChunkId() == kDelayedMain_ChunkId);
    _DEBUG_ARG(const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk->GetBlobId()));
    _ASSERT(x_IsLocalCDDEntryId(blob_id));
    _ASSERT(!chunk->IsLoaded());

    CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(chunk->GetBlobId());
    if ( !load_lock ||
         !load_lock.IsLoaded() ||
         !load_lock->x_NeedsDelayedMainChunk() ) {
        _TRACE("Cannot make CDD entry because of wrong TSE state id="<<blob_id.ToString());
        return false;
    }

    unique_ptr<CObjectIStream> in(GetBlobDataStream(blob_info, blob_data));
    if (!in.get()) {
        _TRACE("Failed to open blob data stream for blob-id " << blob_id.ToString());
        return false;
    }

    // Blob data is deserialized as a complete Seq-entry for the TSE.
    CRef<CSeq_entry> entry(new CSeq_entry);
    *in >> *entry;
    if ( s_GetDebugLevel() >= 8 ) {
        LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
                 MSerial_AsnText<<*entry);
    }
    load_lock->SetSeq_entry(*entry);
    chunk->SetLoaded();
    return true;
}
2035
2036
2037 shared_ptr<CPSG_Request_Blob>
x_MakeLoadLocalCDDEntryRequest(CDataSource * data_source,CDataLoader::TChunk chunk,shared_ptr<CPsgClientContext_Bulk> context)2038 CPSGDataLoader_Impl::x_MakeLoadLocalCDDEntryRequest(CDataSource* data_source,
2039 CDataLoader::TChunk chunk,
2040 shared_ptr<CPsgClientContext_Bulk> context)
2041 {
2042 _ASSERT(chunk->GetChunkId() == kDelayedMain_ChunkId);
2043 const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk->GetBlobId());
2044 _ASSERT(x_IsLocalCDDEntryId(blob_id));
2045 _ASSERT(!chunk->IsLoaded());
2046
2047 bool failed = false;
2048 shared_ptr<CPSG_NamedAnnotInfo> cdd_info;
2049
2050 // load CDD blob id
2051 {{
2052 CPSG_BioId bio_id = x_LocalCDDEntryIdToBioId(blob_id);
2053 CPSG_Request_NamedAnnotInfo::TAnnotNames names = { kCDDAnnotName };
2054 _ASSERT(bio_id.GetId().find('|') == NPOS);
2055 auto request = make_shared<CPSG_Request_NamedAnnotInfo>(bio_id, names, context);
2056 request->IncludeData(m_TSERequestMode);
2057 auto reply = x_ProcessRequest(request);
2058 shared_ptr<CPSG_BioseqInfo> bioseq_info;
2059 shared_ptr<CPSG_BlobInfo> blob_info;
2060 shared_ptr<CPSG_BlobData> blob_data;
2061 for (;;) {
2062 auto reply_item = reply->GetNextItem(DEFAULT_DEADLINE);
2063 if (!reply_item) continue;
2064 if (reply_item->GetType() == CPSG_ReplyItem::eEndOfReply) break;
2065 EPSG_Status status = reply_item->GetStatus(0);
2066 if (status != EPSG_Status::eSuccess && status != EPSG_Status::eInProgress) {
2067 ReportStatus(reply_item, status);
2068 if ( status == EPSG_Status::eNotFound ) {
2069 continue;
2070 }
2071 failed = true;
2072 break;
2073 }
2074 if (status == EPSG_Status::eInProgress) {
2075 status = reply_item->GetStatus(CDeadline::eInfinite);
2076 }
2077 if (status != EPSG_Status::eSuccess) {
2078 ReportStatus(reply_item, status);
2079 failed = true;
2080 break;
2081 }
2082 if (reply_item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2083 bioseq_info = static_pointer_cast<CPSG_BioseqInfo>(reply_item);
2084 }
2085 if (reply_item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2086 auto na_info = static_pointer_cast<CPSG_NamedAnnotInfo>(reply_item);
2087 if ( NStr::EqualNocase(na_info->GetName(), kCDDAnnotName) ) {
2088 cdd_info = na_info;
2089 }
2090 }
2091 if (reply_item->GetType() == CPSG_ReplyItem::eBlobInfo) {
2092 blob_info = static_pointer_cast<CPSG_BlobInfo>(reply_item);
2093 }
2094 if (reply_item->GetType() == CPSG_ReplyItem::eBlobData) {
2095 blob_data = static_pointer_cast<CPSG_BlobData>(reply_item);
2096 }
2097 }
2098 if ( failed ) {
2099 // TODO
2100 x_CreateEmptyLocalCDDEntry(data_source, chunk);
2101 return nullptr;
2102 }
2103 if ( !cdd_info ) {
2104 x_CreateEmptyLocalCDDEntry(data_source, chunk);
2105 return nullptr;
2106 }
2107 // see if we got blob already
2108 if ( blob_info && s_SameId(blob_info->GetId<CPSG_BlobId>(), cdd_info->GetBlobId()) &&
2109 blob_data && s_SameId(blob_data->GetId<CPSG_BlobId>(), cdd_info->GetBlobId()) ) {
2110 _TRACE("Got CDD entry: "<<cdd_info->GetBlobId().Repr());
2111 if ( x_ReadCDDChunk(data_source, chunk, *blob_info, *blob_data) ) {
2112 return nullptr;
2113 }
2114 }
2115 }}
2116
2117 // load CDD blob request
2118 return make_shared<CPSG_Request_Blob>(cdd_info->GetBlobId(), context);
2119 }
2120
2121
LoadChunks(CDataSource * data_source,const CDataLoader::TChunkSet & chunks)2122 void CPSGDataLoader_Impl::LoadChunks(CDataSource* data_source,
2123 const CDataLoader::TChunkSet& chunks)
2124 {
2125 CallWithRetry(bind(&CPSGDataLoader_Impl::LoadChunksOnce, this,
2126 data_source, cref(chunks)),
2127 "LoadChunks");
2128 }
2129
2130
LoadChunksOnce(CDataSource * data_source,const CDataLoader::TChunkSet & chunks)2131 void CPSGDataLoader_Impl::LoadChunksOnce(CDataSource* data_source,
2132 const CDataLoader::TChunkSet& chunks)
2133 {
2134 if (chunks.empty()) return;
2135
2136 typedef map<void*, CDataLoader::TChunk> TChunkMap;
2137 TChunkMap chunk_map;
2138 auto context = make_shared<CPsgClientContext_Bulk>();
2139 ITERATE(CDataLoader::TChunkSet, it, chunks) {
2140 const CTSE_Chunk_Info& chunk = **it;
2141 if ( chunk.IsLoaded() ) {
2142 continue;
2143 }
2144 if ( chunk.GetChunkId() == kMasterWGS_ChunkId ) {
2145 CWGSMasterSupport::LoadWGSMaster(data_source->GetDataLoader(), *it);
2146 continue;
2147 }
2148 if ( chunk.GetChunkId() == kDelayedMain_ChunkId ) {
2149 const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId());
2150 shared_ptr<CPSG_Request_Blob> request;
2151 if ( x_IsLocalCDDEntryId(blob_id) ) {
2152 request = x_MakeLoadLocalCDDEntryRequest(data_source, *it, context);
2153 if ( !request ) {
2154 continue;
2155 }
2156 }
2157 else {
2158 request = make_shared<CPSG_Request_Blob>(blob_id.ToPsgId(), context);
2159 }
2160 request->IncludeData(m_TSERequestMode);
2161 chunk_map[request.get()] = *it;
2162 x_SendRequest(request);
2163 }
2164 else {
2165 const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId());
2166 auto request = make_shared<CPSG_Request_Chunk>(CPSG_ChunkId(chunk.GetChunkId(),
2167 blob_id.GetId2Info()),
2168 context);
2169 chunk_map[request.get()] = *it;
2170 x_SendRequest(request);
2171 }
2172 }
2173
2174 CPSG_TaskGroup group(*m_ThreadPool);
2175 list<shared_ptr<CPSG_Task_Guard>> guards;
2176 while (!chunk_map.empty()) {
2177 auto reply = context->GetReply();
2178 if (!reply) continue;
2179 TChunkMap::iterator chunk_it = chunk_map.find((void*)reply->GetRequest().get());
2180 _ASSERT(chunk_it != chunk_map.end());
2181 CDataLoader::TChunk chunk = chunk_it->second;
2182 chunk_map.erase(chunk_it);
2183 if ( chunk->GetChunkId() == kDelayedMain_ChunkId ) {
2184 CRef<CPSG_Blob_Task> task(new CPSG_Blob_Task(reply, group, CSeq_id_Handle(), data_source, *this, true));
2185 task->SetDLBlobId(dynamic_cast<const CPSG_Request_Blob&>(*reply->GetRequest()).GetBlobId().GetId(),
2186 chunk->GetBlobId());
2187 guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2188 group.AddTask(task);
2189 }
2190 else {
2191 CRef<CPSG_LoadChunk_Task> task(new CPSG_LoadChunk_Task(reply, group, chunk));
2192 guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2193 group.AddTask(task);
2194 }
2195 }
2196 group.WaitAll();
2197 // check if all chunks are loaded
2198 ITERATE(CDataLoader::TChunkSet, it, chunks) {
2199 const CTSE_Chunk_Info & chunk = **it;
2200 if (!chunk.IsLoaded()) {
2201 _TRACE("Failed to load chunk " << chunk.GetChunkId() << " of " << dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId()).ToPsgId());
2202 NCBI_THROW(CLoaderException, eLoaderFailed, "failed to load some chunks");
2203 }
2204 }
2205 }
2206
2207
2208 class CPSG_AnnotRecordsNA_Task : public CPSG_Task
2209 {
2210 public:
CPSG_AnnotRecordsNA_Task(TReply reply,CPSG_TaskGroup & group)2211 CPSG_AnnotRecordsNA_Task( TReply reply, CPSG_TaskGroup& group)
2212 : CPSG_Task(reply, group) {}
2213
~CPSG_AnnotRecordsNA_Task(void)2214 ~CPSG_AnnotRecordsNA_Task(void) override {}
2215
2216 shared_ptr<CPSG_BioseqInfo> m_BioseqInfo;
2217 list<shared_ptr<CPSG_NamedAnnotInfo>> m_AnnotInfo;
2218
Finish(void)2219 void Finish(void) override {
2220 m_BioseqInfo.reset();
2221 m_AnnotInfo.clear();
2222 }
2223
2224 protected:
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)2225 void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
2226 if (item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2227 m_BioseqInfo = static_pointer_cast<CPSG_BioseqInfo>(item);
2228 }
2229 if (item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2230 m_AnnotInfo.push_back(static_pointer_cast<CPSG_NamedAnnotInfo>(item));
2231 }
2232 }
2233 };
2234
2235 class CPSG_AnnotRecordsCDD_Task : public CPSG_Task
2236 {
2237 public:
CPSG_AnnotRecordsCDD_Task(TReply reply,CPSG_TaskGroup & group)2238 CPSG_AnnotRecordsCDD_Task( TReply reply, CPSG_TaskGroup& group)
2239 : CPSG_Task(reply, group) {}
2240
~CPSG_AnnotRecordsCDD_Task(void)2241 ~CPSG_AnnotRecordsCDD_Task(void) override {}
2242
2243 shared_ptr<CPSG_BioseqInfo> m_BioseqInfo;
2244 list<shared_ptr<CPSG_NamedAnnotInfo>> m_AnnotInfo;
2245
Finish(void)2246 void Finish(void) override {
2247 m_BioseqInfo.reset();
2248 m_AnnotInfo.clear();
2249 }
2250
2251 protected:
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)2252 void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
2253 if (item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2254 m_BioseqInfo = static_pointer_cast<CPSG_BioseqInfo>(item);
2255 }
2256 if (item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2257 m_AnnotInfo.push_back(static_pointer_cast<CPSG_NamedAnnotInfo>(item));
2258 }
2259 }
2260 };
2261
2262 static
2263 pair<CRef<CTSE_Chunk_Info>, string>
s_CreateNAChunk(const CPSG_NamedAnnotInfo & psg_annot_info,const CPSG_BioseqInfo * bioseq_info)2264 s_CreateNAChunk(const CPSG_NamedAnnotInfo& psg_annot_info,
2265 const CPSG_BioseqInfo* bioseq_info)
2266 {
2267 pair<CRef<CTSE_Chunk_Info>, string> ret;
2268 CRef<CTSE_Chunk_Info> chunk(new CTSE_Chunk_Info(kDelayedMain_ChunkId));
2269 unsigned main_count = 0;
2270 unsigned zoom_count = 0;
2271 // detailed annot info
2272 set<string> names;
2273 for ( auto& annot_info_ref : psg_annot_info.GetId2AnnotInfoList() ) {
2274 if ( s_GetDebugLevel() >= 8 ) {
2275 LOG_POST(Info<<"PSG loader: NA info "<<MSerial_AsnText<<*annot_info_ref);
2276 }
2277 const CID2S_Seq_annot_Info& annot_info = *annot_info_ref;
2278 // create special external annotations blob
2279 CAnnotName name(annot_info.GetName());
2280 if ( name.IsNamed() && !ExtractZoomLevel(name.GetName(), 0, 0) ) {
2281 //setter.GetTSE_LoadLock()->SetName(name);
2282 names.insert(name.GetName());
2283 ++main_count;
2284 }
2285 else {
2286 ++zoom_count;
2287 }
2288
2289 vector<SAnnotTypeSelector> types;
2290 if ( annot_info.IsSetAlign() ) {
2291 types.push_back(SAnnotTypeSelector(CSeq_annot::C_Data::e_Align));
2292 }
2293 if ( annot_info.IsSetGraph() ) {
2294 types.push_back(SAnnotTypeSelector(CSeq_annot::C_Data::e_Graph));
2295 }
2296 if ( annot_info.IsSetFeat() ) {
2297 for ( auto feat_type_info_iter : annot_info.GetFeat() ) {
2298 const CID2S_Feat_type_Info& finfo = *feat_type_info_iter;
2299 int feat_type = finfo.GetType();
2300 if ( feat_type == 0 ) {
2301 types.push_back(SAnnotTypeSelector
2302 (CSeq_annot::C_Data::e_Seq_table));
2303 }
2304 else if ( !finfo.IsSetSubtypes() ) {
2305 types.push_back(SAnnotTypeSelector
2306 (CSeqFeatData::E_Choice(feat_type)));
2307 }
2308 else {
2309 for ( auto feat_subtype : finfo.GetSubtypes() ) {
2310 types.push_back(SAnnotTypeSelector
2311 (CSeqFeatData::ESubtype(feat_subtype)));
2312 }
2313 }
2314 }
2315 }
2316
2317 CTSE_Chunk_Info::TLocationSet loc;
2318 CSplitParser::x_ParseLocation(loc, annot_info.GetSeq_loc());
2319
2320 ITERATE ( vector<SAnnotTypeSelector>, it, types ) {
2321 chunk->x_AddAnnotType(name, *it, loc);
2322 }
2323 }
2324 if ( names.size() == 1 ) {
2325 ret.second = *names.begin();
2326 }
2327 if ( s_GetDebugLevel() >= 5 ) {
2328 LOG_POST(Info<<"PSG loader: TSE "<<psg_annot_info.GetBlobId().GetId()<<
2329 " annots: "<<ret.second<<" "<<main_count<<"+"<<zoom_count);
2330 }
2331 ret.first = chunk;
2332 return ret;
2333 }
2334
2335
GetAnnotRecordsNA(CDataSource * data_source,const CSeq_id_Handle & idh,const SAnnotSelector * sel,CDataLoader::TProcessedNAs * processed_nas)2336 CDataLoader::TTSE_LockSet CPSGDataLoader_Impl::GetAnnotRecordsNA(
2337 CDataSource* data_source,
2338 const CSeq_id_Handle& idh,
2339 const SAnnotSelector* sel,
2340 CDataLoader::TProcessedNAs* processed_nas)
2341 {
2342 return CallWithRetry(bind(&CPSGDataLoader_Impl::GetAnnotRecordsNAOnce, this,
2343 data_source, cref(idh), sel, processed_nas),
2344 "GetAnnotRecordsNA");
2345 }
2346
2347
GetAnnotRecordsNAOnce(CDataSource * data_source,const CSeq_id_Handle & idh,const SAnnotSelector * sel,CDataLoader::TProcessedNAs * processed_nas)2348 CDataLoader::TTSE_LockSet CPSGDataLoader_Impl::GetAnnotRecordsNAOnce(
2349 CDataSource* data_source,
2350 const CSeq_id_Handle& idh,
2351 const SAnnotSelector* sel,
2352 CDataLoader::TProcessedNAs* processed_nas)
2353 {
2354 CDataLoader::TTSE_LockSet locks;
2355 if ( !data_source ) {
2356 return locks;
2357 }
2358 if ( CannotProcess(idh) ) {
2359 return locks;
2360 }
2361 if ( sel && sel->IsIncludedAnyNamedAnnotAccession() ) {
2362 CPSG_BioId bio_id = x_GetBioId(idh);
2363 CPSG_Request_NamedAnnotInfo::TAnnotNames annot_names;
2364 const SAnnotSelector::TNamedAnnotAccessions& accs = sel->GetNamedAnnotAccessions();
2365 ITERATE(SAnnotSelector::TNamedAnnotAccessions, it, accs) {
2366 if ( kCreateLocalCDDEntries && NStr::EqualNocase(it->first, kCDDAnnotName) ) {
2367 // CDDs are added as external annotations
2368 continue;
2369 }
2370 annot_names.push_back(it->first);
2371 }
2372 auto context = make_shared<CPsgClientContext>();
2373 //_ASSERT(PsgIdToHandle(bio_id));
2374 auto request = make_shared<CPSG_Request_NamedAnnotInfo>(move(bio_id), annot_names, context);
2375 if ( auto reply = x_ProcessRequest(request) ) {
2376 CPSG_TaskGroup group(*m_ThreadPool);
2377 CRef<CPSG_AnnotRecordsNA_Task> task(new CPSG_AnnotRecordsNA_Task(reply, group));
2378 CPSG_Task_Guard guard(*task);
2379 group.AddTask(task);
2380 group.WaitAll();
2381
2382 if (task->GetStatus() == CThreadPool_Task::eCompleted) {
2383 for ( auto& info : task->m_AnnotInfo ) {
2384 CDataLoader::SetProcessedNA(info->GetName(), processed_nas);
2385 CRef<CPsgBlobId> blob_id(new CPsgBlobId(info->GetBlobId().GetId()));
2386 auto chunk_info = s_CreateNAChunk(*info, task->m_BioseqInfo.get());
2387 if ( chunk_info.first ) {
2388 CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(blob_id);
2389 CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
2390 if ( load_lock ) {
2391 if ( !load_lock.IsLoaded() ) {
2392 if ( !chunk_info.second.empty() ) {
2393 load_lock->SetName(chunk_info.second);
2394 }
2395 load_lock->GetSplitInfo().AddChunk(*chunk_info.first);
2396 _ASSERT(load_lock->x_NeedsDelayedMainChunk());
2397 load_lock.SetLoaded();
2398 }
2399 locks.insert(load_lock);
2400 }
2401 }
2402 else {
2403 // no annot info
2404 if ( auto tse_lock = GetBlobById(data_source, *blob_id) ) {
2405 locks.insert(tse_lock);
2406 }
2407 }
2408 }
2409 }
2410 else {
2411 _TRACE("Failed to load annotations for " << idh.AsString());
2412 }
2413 }
2414 }
2415 if ( kCreateLocalCDDEntries ) {
2416 CSeq_id_Handle gi;
2417 CSeq_id_Handle acc_ver;
2418 bool is_protein = true;
2419 TIds ids;
2420 GetIds(idh, ids);
2421 for ( auto id : ids ) {
2422 if ( id.IsGi() ) {
2423 gi = id;
2424 continue;
2425 }
2426 if ( id.Which() == CSeq_id::e_Pdb ) {
2427 if ( !acc_ver ) {
2428 acc_ver = id;
2429 }
2430 continue;
2431 }
2432 auto seq_id = id.GetSeqId();
2433 if ( auto text_id = seq_id->GetTextseq_Id() ) {
2434 auto acc_type = seq_id->IdentifyAccession();
2435 if ( acc_type & CSeq_id::fAcc_nuc ) {
2436 is_protein = false;
2437 break;
2438 }
2439 else if ( text_id->IsSetAccession() && text_id->IsSetVersion() &&
2440 (acc_type & CSeq_id::fAcc_prot) ) {
2441 is_protein = true;
2442 acc_ver = CSeq_id_Handle::GetHandle(text_id->GetAccession()+'.'+
2443 NStr::NumericToString(text_id->GetVersion()));
2444 }
2445 }
2446 }
2447 if ( is_protein && gi ) {
2448 if ( auto tse_lock = x_CreateLocalCDDEntry(data_source, gi, acc_ver) ) {
2449 locks.insert(tse_lock);
2450 }
2451 }
2452 }
2453 return locks;
2454 }
2455
2456
DropTSE(const CPsgBlobId & blob_id)2457 void CPSGDataLoader_Impl::DropTSE(const CPsgBlobId& blob_id)
2458 {
2459 m_BlobMap->DropBlob(blob_id);
2460 }
2461
2462
GetAccVers(const TIds & ids,TLoaded & loaded,TIds & ret)2463 void CPSGDataLoader_Impl::GetAccVers(const TIds& ids, TLoaded& loaded, TIds& ret)
2464 {
2465 CallWithRetry(bind(&CPSGDataLoader_Impl::GetAccVersOnce, this,
2466 cref(ids), ref(loaded), ref(ret)),
2467 "GetAccVers",
2468 6);
2469 }
2470
2471
GetAccVersOnce(const TIds & ids,TLoaded & loaded,TIds & ret)2472 void CPSGDataLoader_Impl::GetAccVersOnce(const TIds& ids, TLoaded& loaded, TIds& ret)
2473 {
2474 vector<shared_ptr<SPsgBioseqInfo>> infos;
2475 infos.resize(ret.size());
2476 auto counts = x_GetBulkBioseqInfo(CPSG_Request_Resolve::fCanonicalId, ids, loaded, infos);
2477 if ( counts.first ) {
2478 // have loaded infos
2479 for (size_t i = 0; i < infos.size(); ++i) {
2480 if (loaded[i] || !infos[i].get()) continue;
2481 CSeq_id_Handle idh = infos[i]->canonical;
2482 if (idh.IsAccVer()) {
2483 ret[i] = idh;
2484 }
2485 loaded[i] = true;
2486 }
2487 }
2488 if ( counts.second ) {
2489 NCBI_THROW(CLoaderException, eLoaderFailed, "failed to load some acc.ver in bulk request");
2490 }
2491 }
2492
2493
GetGis(const TIds & ids,TLoaded & loaded,TGis & ret)2494 void CPSGDataLoader_Impl::GetGis(const TIds& ids, TLoaded& loaded, TGis& ret)
2495 {
2496 CallWithRetry(bind(&CPSGDataLoader_Impl::GetGisOnce, this,
2497 cref(ids), ref(loaded), ref(ret)),
2498 "GetAccVers",
2499 8);
2500 }
2501
2502
GetGisOnce(const TIds & ids,TLoaded & loaded,TGis & ret)2503 void CPSGDataLoader_Impl::GetGisOnce(const TIds& ids, TLoaded& loaded, TGis& ret)
2504 {
2505 vector<shared_ptr<SPsgBioseqInfo>> infos;
2506 infos.resize(ret.size());
2507 auto counts = x_GetBulkBioseqInfo(CPSG_Request_Resolve::fGi, ids, loaded, infos);
2508 if ( counts.first ) {
2509 // have loaded infos
2510 for (size_t i = 0; i < infos.size(); ++i) {
2511 if (loaded[i] || !infos[i].get()) continue;
2512 ret[i] = infos[i]->gi;
2513 loaded[i] = true;
2514 }
2515 }
2516 if ( counts.second ) {
2517 NCBI_THROW(CLoaderException, eLoaderFailed, "failed to load some acc.ver in bulk request");
2518 }
2519 }
2520
2521
x_GetBioId(const CSeq_id_Handle & idh)2522 CPSG_BioId CPSGDataLoader_Impl::x_GetBioId(const CSeq_id_Handle& idh)
2523 {
2524 CConstRef<CSeq_id> id = idh.GetSeqId();
2525 string label = id->AsFastaString();
2526 return CPSG_BioId(label, id->Which());
2527 }
2528
2529
x_SendRequest(shared_ptr<CPSG_Request> request)2530 void CPSGDataLoader_Impl::x_SendRequest(shared_ptr<CPSG_Request> request)
2531 {
2532 m_Queue->SendRequest(request, DEFAULT_DEADLINE);
2533 m_Thread->Wake();
2534 }
2535
2536
x_ProcessRequest(shared_ptr<CPSG_Request> request)2537 shared_ptr<CPSG_Reply> CPSGDataLoader_Impl::x_ProcessRequest(shared_ptr<CPSG_Request> request)
2538 {
2539 x_SendRequest(request);
2540 auto context = request->GetUserContext<CPsgClientContext>();
2541 _ASSERT(context);
2542 return context->GetReply();
2543 }
2544
2545
2546 CPSGDataLoader_Impl::SReplyResult
x_RetryBlobRequest(const string & blob_id,CDataSource * data_source,CSeq_id_Handle req_idh)2547 CPSGDataLoader_Impl::x_RetryBlobRequest(const string& blob_id, CDataSource* data_source, CSeq_id_Handle req_idh)
2548 {
2549 #ifdef LOCK4GET
2550 CDataLoader::TBlobId dl_blob_id(new CPsgBlobId(blob_id));
2551 CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
2552 if ( load_lock.IsLoaded() ) {
2553 _TRACE("x_RetryBlobRequest() already loaded " << blob_id);
2554 SReplyResult ret;
2555 ret.lock = load_lock;
2556 ret.blob_id = blob_id;
2557 return ret;
2558 }
2559 #else
2560 CTSE_LoadLock load_lock;
2561 {{
2562 CDataLoader::TBlobId dl_blob_id(new CPsgBlobId(blob_id));
2563 CTSE_LoadLock load_lock = data_source->GetTSE_LoadLockIfLoaded(dl_blob_id);
2564 if ( load_lock && load_lock.IsLoaded() ) {
2565 _TRACE("x_RetryBlobRequest() already loaded " << blob_id);
2566 SReplyResult ret;
2567 ret.lock = load_lock;
2568 ret.blob_id = blob_id;
2569 return ret;
2570 }
2571 }}
2572 #endif
2573
2574 CPSG_BlobId req_blob_id(blob_id);
2575 auto context = make_shared<CPsgClientContext>();
2576 auto blob_request = make_shared<CPSG_Request_Blob>(req_blob_id, context);
2577 blob_request->IncludeData(m_TSERequestMode);
2578 auto blob_reply = x_ProcessRequest(blob_request);
2579 return x_ProcessBlobReply(blob_reply, data_source, req_idh, false, false, &load_lock);
2580 }
2581
2582
x_ProcessBlobReply(shared_ptr<CPSG_Reply> reply,CDataSource * data_source,CSeq_id_Handle req_idh,bool retry,bool lock_asap,CTSE_LoadLock * load_lock)2583 CPSGDataLoader_Impl::SReplyResult CPSGDataLoader_Impl::x_ProcessBlobReply(
2584 shared_ptr<CPSG_Reply> reply,
2585 CDataSource* data_source,
2586 CSeq_id_Handle req_idh,
2587 bool retry,
2588 bool lock_asap,
2589 CTSE_LoadLock* load_lock)
2590 {
2591 SReplyResult ret;
2592
2593 if (!reply) {
2594 _TRACE("Request failed: null reply");
2595 return ret;
2596 }
2597
2598 CPSG_TaskGroup group(*m_ThreadPool);
2599 CRef<CPSG_Blob_Task> task(
2600 new CPSG_Blob_Task(reply, group, req_idh, data_source, *this, lock_asap, load_lock));
2601 CPSG_Task_Guard guard(*task);
2602 group.AddTask(task);
2603 group.WaitAll();
2604
2605 if (task->GetStatus() == CThreadPool_Task::eCompleted) {
2606 if (task->m_Skipped) {
2607 ret = task->WaitForSkipped();
2608 if (!ret.lock && retry) {
2609 // Force reloading blob
2610 ret = x_RetryBlobRequest(task->m_ReplyResult.blob_id, data_source, req_idh);
2611 }
2612 }
2613 else {
2614 ret = task->m_ReplyResult;
2615 }
2616 }
2617 else if ( !GetGetBlobByIdShouldFail() &&
2618 (lock_asap || load_lock) &&
2619 !task->m_ReplyResult.blob_id.empty() &&
2620 retry &&
2621 !task->GotNotFound() ) {
2622 // blob is required, but not received, yet blob_id is known, so we retry
2623 ret = x_RetryBlobRequest(task->m_ReplyResult.blob_id, data_source, req_idh);
2624 if ( !ret.lock ) {
2625 _TRACE("Failed to load blob for " << req_idh.AsString()<<" @ "<<CStackTrace());
2626 NCBI_THROW(CLoaderException, eLoaderFailed,
2627 "CPSGDataLoader::GetRecords("+req_idh.AsString()+") failed");
2628 }
2629 }
2630 else if ( task->GotNotFound() ) {
2631 NCBI_THROW_FMT(CLoaderException, eNoData,
2632 "CPSGDataLoader: No blob for seq_id="<<req_idh<<" blob_id="<<task->m_ReplyResult.blob_id);
2633 }
2634 else {
2635 _TRACE("Failed to load blob for " << req_idh.AsString()<<" @ "<<CStackTrace());
2636 NCBI_THROW(CLoaderException, eLoaderFailed,
2637 "CPSGDataLoader::GetRecords("+req_idh.AsString()+") failed");
2638 }
2639 return ret;
2640 }
2641
2642
2643 class CPSG_BioseqInfo_Task : public CPSG_Task
2644 {
2645 public:
CPSG_BioseqInfo_Task(TReply reply,CPSG_TaskGroup & group)2646 CPSG_BioseqInfo_Task(TReply reply, CPSG_TaskGroup& group)
2647 : CPSG_Task(reply, group) {}
2648
~CPSG_BioseqInfo_Task(void)2649 ~CPSG_BioseqInfo_Task(void) override {}
2650
2651 shared_ptr<CPSG_BioseqInfo> m_BioseqInfo;
2652
Finish(void)2653 void Finish(void) override {
2654 m_BioseqInfo.reset();
2655 }
2656
2657 protected:
ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)2658 void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
2659 if (item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2660 m_BioseqInfo = static_pointer_cast<CPSG_BioseqInfo>(item);
2661 }
2662 }
2663 };
2664
2665
x_GetBioseqInfo(const CSeq_id_Handle & idh)2666 shared_ptr<SPsgBioseqInfo> CPSGDataLoader_Impl::x_GetBioseqInfo(const CSeq_id_Handle& idh)
2667 {
2668 shared_ptr<SPsgBioseqInfo> ret = m_BioseqCache->Get(idh);
2669 if (ret) {
2670 return ret;
2671 }
2672
2673 CPSG_BioId bio_id = x_GetBioId(idh);
2674 auto context = make_shared<CPsgClientContext>();
2675 shared_ptr<CPSG_Request_Resolve> request = make_shared<CPSG_Request_Resolve>(move(bio_id), context);
2676 request->IncludeInfo(CPSG_Request_Resolve::fAllInfo);
2677 x_SendRequest(request);
2678 auto reply = context->GetReply();
2679 if (!reply) {
2680 _TRACE("Request failed: null reply");
2681 NCBI_THROW(CLoaderException, eLoaderFailed, "null reply for "+idh.AsString());
2682 return nullptr;
2683 }
2684
2685 CPSG_TaskGroup group(*m_ThreadPool);
2686 CRef<CPSG_BioseqInfo_Task> task(new CPSG_BioseqInfo_Task(reply, group));
2687 CPSG_Task_Guard guard(*task);
2688 group.AddTask(task);
2689 group.WaitAll();
2690
2691 if (task->GetStatus() != CThreadPool_Task::eCompleted) {
2692 _TRACE("Failed to get bioseq info for " << idh.AsString() << " @ "<<CStackTrace());
2693 NCBI_THROW(CLoaderException, eLoaderFailed, "failed to get bioseq info for "+idh.AsString());
2694 }
2695 if (!task->m_BioseqInfo) {
2696 _TRACE("No bioseq info for " << idh.AsString());
2697 return nullptr;
2698 }
2699
2700 return m_BioseqCache->Add(*task->m_BioseqInfo, idh);
2701 }
2702
2703
x_ReadBlobData(const SPsgBlobInfo & psg_blob_info,const CPSG_BlobInfo & blob_info,const CPSG_BlobData & blob_data,CTSE_LoadLock & load_lock,ESplitInfoType split_info_type)2704 void CPSGDataLoader_Impl::x_ReadBlobData(
2705 const SPsgBlobInfo& psg_blob_info,
2706 const CPSG_BlobInfo& blob_info,
2707 const CPSG_BlobData& blob_data,
2708 CTSE_LoadLock& load_lock,
2709 ESplitInfoType split_info_type)
2710 {
2711 if ( !load_lock.IsLoaded() ) {
2712 load_lock->SetBlobVersion(psg_blob_info.GetBlobVersion());
2713 load_lock->SetBlobState(psg_blob_info.blob_state);
2714 }
2715
2716 auto_ptr<CObjectIStream> in(GetBlobDataStream(blob_info, blob_data));
2717 if (!in.get()) {
2718 _TRACE("Failed to open blob data stream for blob-id " << blob_info.GetId()->Repr());
2719 return;
2720 }
2721
2722 if ( split_info_type == eIsSplitInfo ) {
2723 CRef<CID2S_Split_Info> split_info(new CID2S_Split_Info);
2724 *in >> *split_info;
2725 if ( s_GetDebugLevel() >= 8 ) {
2726 LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
2727 MSerial_AsnText<<*split_info);
2728 }
2729 CSplitParser::Attach(*load_lock, *split_info);
2730 }
2731 else {
2732 CRef<CSeq_entry> entry(new CSeq_entry);
2733 *in >> *entry;
2734 if ( s_GetDebugLevel() >= 8 ) {
2735 LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
2736 MSerial_AsnText<<*entry);
2737 }
2738 load_lock->SetSeq_entry(*entry);
2739 }
2740 if ( m_AddWGSMasterDescr ) {
2741 CWGSMasterSupport::AddWGSMaster(load_lock);
2742 }
2743 }
2744
2745
x_SetLoaded(CTSE_LoadLock & load_lock,EMainChunkType main_chunk_type)2746 void CPSGDataLoader_Impl::x_SetLoaded(CTSE_LoadLock& load_lock,
2747 EMainChunkType main_chunk_type)
2748 {
2749 if ( main_chunk_type == eDelayedMainChunk ) {
2750 load_lock->GetSplitInfo().GetChunk(kDelayedMain_ChunkId).SetLoaded();
2751 //_ASSERT(!load_lock->x_NeedsDelayedMainChunk());
2752 }
2753 else {
2754 load_lock.SetLoaded();
2755 }
2756 }
2757
2758
GetBlobDataStream(const CPSG_BlobInfo & blob_info,const CPSG_BlobData & blob_data)2759 CObjectIStream* CPSGDataLoader_Impl::GetBlobDataStream(
2760 const CPSG_BlobInfo& blob_info,
2761 const CPSG_BlobData& blob_data)
2762 {
2763 istream& data_stream = blob_data.GetStream();
2764 CNcbiIstream* in = &data_stream;
2765 auto_ptr<CNcbiIstream> z_stream;
2766 CObjectIStream* ret = nullptr;
2767
2768 if (blob_info.GetCompression() == "gzip") {
2769 z_stream.reset(new CCompressionIStream(data_stream,
2770 new CZipStreamDecompressor(CZipCompression::fGZip), 0));
2771 in = z_stream.get();
2772 }
2773 else if (!blob_info.GetCompression().empty()) {
2774 _TRACE("Unsupported data compression: '" << blob_info.GetCompression() << "'");
2775 return nullptr;
2776 }
2777
2778 EOwnership own = z_stream.get() ? eTakeOwnership : eNoOwnership;
2779 if (blob_info.GetFormat() == "asn.1") {
2780 ret = CObjectIStream::Open(eSerial_AsnBinary, *in, own);
2781 }
2782 else if (blob_info.GetFormat() == "asn1-text") {
2783 ret = CObjectIStream::Open(eSerial_AsnText, *in, own);
2784 }
2785 else if (blob_info.GetFormat() == "xml") {
2786 ret = CObjectIStream::Open(eSerial_Xml, *in, own);
2787 }
2788 else if (blob_info.GetFormat() == "json") {
2789 ret = CObjectIStream::Open(eSerial_Json, *in, own);
2790 }
2791 else {
2792 _TRACE("Unsupported data format: '" << blob_info.GetFormat() << "'");
2793 return nullptr;
2794 }
2795 _ASSERT(ret);
2796 z_stream.release();
2797 return ret;
2798 }
2799
2800
x_GetBulkBioseqInfo(CPSG_Request_Resolve::EIncludeInfo info,const TIds & ids,const TLoaded & loaded,TBioseqInfos & ret)2801 pair<size_t, size_t> CPSGDataLoader_Impl::x_GetBulkBioseqInfo(
2802 CPSG_Request_Resolve::EIncludeInfo info,
2803 const TIds& ids,
2804 const TLoaded& loaded,
2805 TBioseqInfos& ret)
2806 {
2807 pair<size_t, size_t> counts(0, 0);
2808 TIdxMap idx_map;
2809 auto context = make_shared<CPsgClientContext_Bulk>();
2810 for (size_t i = 0; i < ids.size(); ++i) {
2811 if (loaded[i]) continue;
2812 if ( CannotProcess(ids[i]) ) {
2813 continue;
2814 }
2815 ret[i] = m_BioseqCache->Get(ids[i]);
2816 if (ret[i]) {
2817 counts.first += 1;
2818 continue;
2819 }
2820 CPSG_BioId bio_id = x_GetBioId(ids[i]);
2821 shared_ptr<CPSG_Request_Resolve> request = make_shared<CPSG_Request_Resolve>(move(bio_id), context);
2822 idx_map[request.get()] = i;
2823 request->IncludeInfo(info);
2824 x_SendRequest(request);
2825 }
2826
2827 CPSG_TaskGroup group(*m_ThreadPool);
2828 typedef map<CRef<CPSG_BioseqInfo_Task>, size_t> TTasks;
2829 TTasks tasks;
2830 list<shared_ptr<CPSG_Task_Guard>> guards;
2831 while (!idx_map.empty()) {
2832 auto reply = context->GetReply();
2833 if (!reply) continue;
2834 TIdxMap::iterator idx_it = idx_map.find((void*)reply->GetRequest().get());
2835 size_t idx = ret.size();
2836 if (idx_it != idx_map.end()) {
2837 idx = idx_it->second;
2838 idx_map.erase(idx_it);
2839 }
2840
2841 CRef<CPSG_BioseqInfo_Task> task(new CPSG_BioseqInfo_Task(reply, group));
2842 guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2843 tasks[task] = idx;
2844 group.AddTask(task);
2845 }
2846 while (group.HasTasks()) {
2847 CRef<CPSG_BioseqInfo_Task> task = group.GetTask<CPSG_BioseqInfo_Task>();
2848 _ASSERT(task);
2849 TTasks::const_iterator it = tasks.find(task);
2850 _ASSERT(it != tasks.end());
2851 if (task->GetStatus() == CThreadPool_Task::eFailed) {
2852 _TRACE("Failed to load bioseq info for " << ids[it->second].AsString());
2853 counts.second += 1;
2854 continue;
2855 }
2856 if (!task->m_BioseqInfo) {
2857 _TRACE("No bioseq info for " << ids[it->second].AsString());
2858 // not loaded and no failure
2859 continue;
2860 }
2861 _ASSERT(task->m_BioseqInfo);
2862 ret[it->second] = make_shared<SPsgBioseqInfo>(*task->m_BioseqInfo);
2863 counts.first += 1;
2864 }
2865 return counts;
2866 }
2867
2868
2869 END_SCOPE(objects)
2870 END_NCBI_SCOPE
2871
2872 #endif // HAVE_PSG_LOADER
2873