1 /* $Id: async_resolve_base.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergey Satskiy
27 *
28 * File Description: base class for processors which need to resolve seq_id
29 * asynchronously
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34
35 #include <corelib/request_status.hpp>
36 #include <corelib/ncbidiag.hpp>
37
38 #include "pubseq_gateway.hpp"
39 #include "pubseq_gateway_utils.hpp"
40 #include "pubseq_gateway_cache_utils.hpp"
41 #include "cass_fetch.hpp"
42 #include "psgs_request.hpp"
43 #include "psgs_reply.hpp"
44 #include "insdc_utils.hpp"
45 #include "async_resolve_base.hpp"
46 #include "insdc_utils.hpp"
47 #include "pubseq_gateway_convert_utils.hpp"
48
49 #include <objects/seqloc/Seq_id.hpp>
50 #include <objects/general/Dbtag.hpp>
51 #include <objects/general/Object_id.hpp>
52 USING_IDBLOB_SCOPE;
53 USING_SCOPE(objects);
54
55 using namespace std::placeholders;
56
57
CPSGS_AsyncResolveBase()58 CPSGS_AsyncResolveBase::CPSGS_AsyncResolveBase()
59 {}
60
61
CPSGS_AsyncResolveBase(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TSeqIdResolutionFinishedCB finished_cb,TSeqIdResolutionErrorCB error_cb,TSeqIdResolutionStartProcessingCB start_processing_cb)62 CPSGS_AsyncResolveBase::CPSGS_AsyncResolveBase(
63 shared_ptr<CPSGS_Request> request,
64 shared_ptr<CPSGS_Reply> reply,
65 TSeqIdResolutionFinishedCB finished_cb,
66 TSeqIdResolutionErrorCB error_cb,
67 TSeqIdResolutionStartProcessingCB start_processing_cb) :
68 m_FinishedCB(finished_cb),
69 m_ErrorCB(error_cb),
70 m_StartProcessingCB(start_processing_cb),
71 m_ResolveStage(eInit),
72 m_SecondaryIndex(0),
73 m_CurrentFetch(nullptr),
74 m_NoSeqIdTypeFetch(nullptr),
75 m_StartProcessingCalled(false)
76 {}
77
78
~CPSGS_AsyncResolveBase()79 CPSGS_AsyncResolveBase::~CPSGS_AsyncResolveBase()
80 {}
81
82
83 void
Process(int16_t effective_version,int16_t effective_seq_id_type,list<string> && secondary_id_list,string && primary_seq_id,bool composed_ok,SBioseqResolution && bioseq_resolution)84 CPSGS_AsyncResolveBase::Process(int16_t effective_version,
85 int16_t effective_seq_id_type,
86 list<string> && secondary_id_list,
87 string && primary_seq_id,
88 bool composed_ok,
89 SBioseqResolution && bioseq_resolution)
90 {
91 m_ComposedOk = composed_ok;
92 m_PrimarySeqId = move(primary_seq_id);
93 m_EffectiveVersion = effective_version;
94 m_EffectiveSeqIdType = effective_seq_id_type;
95 m_SecondaryIdList = move(secondary_id_list);
96 m_BioseqResolution = move(bioseq_resolution);
97 m_AsyncCassResolutionStart = chrono::high_resolution_clock::now();
98
99 x_Process();
100 }
101
102
103 int16_t
GetEffectiveVersion(const CTextseq_id * text_seq_id)104 CPSGS_AsyncResolveBase::GetEffectiveVersion(const CTextseq_id * text_seq_id)
105 {
106 try {
107 if (text_seq_id == nullptr)
108 return -1;
109 if (text_seq_id->CanGetVersion())
110 return text_seq_id->GetVersion();
111 } catch (...) {
112 }
113 return -1;
114 }
115
116
117 string
GetRequestSeqId(void)118 CPSGS_AsyncResolveBase::GetRequestSeqId(void)
119 {
120 switch (m_Request->GetRequestType()) {
121 case CPSGS_Request::ePSGS_ResolveRequest:
122 return m_Request->GetRequest<SPSGS_ResolveRequest>().m_SeqId;
123 case CPSGS_Request::ePSGS_BlobBySeqIdRequest:
124 return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_SeqId;
125 case CPSGS_Request::ePSGS_AnnotationRequest:
126 return m_Request->GetRequest<SPSGS_AnnotRequest>().m_SeqId;
127 default:
128 break;
129 }
130 NCBI_THROW(CPubseqGatewayException, eLogic,
131 "Not handled request type " +
132 to_string(static_cast<int>(m_Request->GetRequestType())));
133 }
134
135
136 int16_t
GetRequestSeqIdType(void)137 CPSGS_AsyncResolveBase::GetRequestSeqIdType(void)
138 {
139 switch (m_Request->GetRequestType()) {
140 case CPSGS_Request::ePSGS_ResolveRequest:
141 return m_Request->GetRequest<SPSGS_ResolveRequest>().m_SeqIdType;
142 case CPSGS_Request::ePSGS_BlobBySeqIdRequest:
143 return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_SeqIdType;
144 case CPSGS_Request::ePSGS_AnnotationRequest:
145 return m_Request->GetRequest<SPSGS_AnnotRequest>().m_SeqIdType;
146 default:
147 break;
148 }
149 NCBI_THROW(CPubseqGatewayException, eLogic,
150 "Not handled request type " +
151 to_string(static_cast<int>(m_Request->GetRequestType())));
152 }
153
154
155 SPSGS_ResolveRequest::TPSGS_BioseqIncludeData
GetBioseqInfoFields(void)156 CPSGS_AsyncResolveBase::GetBioseqInfoFields(void)
157 {
158 if (m_Request->GetRequestType() == CPSGS_Request::ePSGS_ResolveRequest)
159 return m_Request->GetRequest<SPSGS_ResolveRequest>().m_IncludeDataFlags;
160 return SPSGS_ResolveRequest::fPSGS_AllBioseqFields;
161 }
162
163
164 bool
NonKeyBioseqInfoFieldsRequested(void)165 CPSGS_AsyncResolveBase::NonKeyBioseqInfoFieldsRequested(void)
166 {
167 return (GetBioseqInfoFields() &
168 ~SPSGS_ResolveRequest::fPSGS_BioseqKeyFields) != 0;
169 }
170
171
172 // The method tells if the BIOSEQ_INFO record needs to be retrieved.
173 // It can be skipped under very specific conditions.
174 // It makes sense if the source of data is SI2CSI, i.e. only key fields are
175 // available.
176 bool
CanSkipBioseqInfoRetrieval(const CBioseqInfoRecord & bioseq_info_record)177 CPSGS_AsyncResolveBase::CanSkipBioseqInfoRetrieval(
178 const CBioseqInfoRecord & bioseq_info_record)
179 {
180 if (m_Request->GetRequestType() != CPSGS_Request::ePSGS_ResolveRequest)
181 return false; // The get request supposes the full bioseq info
182
183 if (NonKeyBioseqInfoFieldsRequested())
184 return false; // In the resolve request more bioseq_info fields are requested
185
186
187 auto seq_id_type = bioseq_info_record.GetSeqIdType();
188 if (bioseq_info_record.GetVersion() > 0 && seq_id_type != CSeq_id::e_Gi)
189 return true; // This combination in data never requires accession adjustments
190
191 auto include_flags = m_Request->GetRequest<SPSGS_ResolveRequest>().m_IncludeDataFlags;
192 if ((include_flags & ~SPSGS_ResolveRequest::fPSGS_Gi) == 0)
193 return true; // Only GI field or no fields are requested so no accession
194 // adjustments are required
195
196 auto acc_subst = m_Request->GetRequest<SPSGS_ResolveRequest>().m_AccSubstOption;
197 if (acc_subst == SPSGS_RequestBase::ePSGS_NeverAccSubstitute)
198 return true; // No accession adjustments anyway so key fields are enough
199
200 if (acc_subst == SPSGS_RequestBase::ePSGS_LimitedAccSubstitution &&
201 seq_id_type != CSeq_id::e_Gi)
202 return true; // No accession adjustments required
203
204 return false;
205 }
206
207
208 SPSGS_RequestBase::EPSGS_AccSubstitutioOption
GetAccessionSubstitutionOption(void)209 CPSGS_AsyncResolveBase::GetAccessionSubstitutionOption(void)
210 {
211 // The substitution makes sense only for resolve/get/annot requests
212 switch (m_Request->GetRequestType()) {
213 case CPSGS_Request::ePSGS_ResolveRequest:
214 return m_Request->GetRequest<SPSGS_ResolveRequest>().m_AccSubstOption;
215 case CPSGS_Request::ePSGS_BlobBySeqIdRequest:
216 return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_AccSubstOption;
217 case CPSGS_Request::ePSGS_AnnotationRequest:
218 return SPSGS_RequestBase::ePSGS_DefaultAccSubstitution;
219 default:
220 break;
221 }
222 NCBI_THROW(CPubseqGatewayException, eLogic,
223 "Not handled request type " +
224 to_string(static_cast<int>(m_Request->GetRequestType())));
225 }
226
227
228 EPSGS_AccessionAdjustmentResult
AdjustBioseqAccession(SBioseqResolution & bioseq_resolution)229 CPSGS_AsyncResolveBase::AdjustBioseqAccession(
230 SBioseqResolution & bioseq_resolution)
231 {
232 if (CanSkipBioseqInfoRetrieval(bioseq_resolution.m_BioseqInfo)) {
233 if (m_Request->NeedTrace()) {
234 m_Reply->SendTrace("Accession adjustment is not required "
235 "(bioseq info is not provided)",
236 m_Request->GetStartTimestamp());
237 }
238 return ePSGS_NotRequired;
239 }
240
241 auto acc_subst_option = GetAccessionSubstitutionOption();
242 if (acc_subst_option == SPSGS_RequestBase::ePSGS_NeverAccSubstitute) {
243 if (m_Request->NeedTrace()) {
244 m_Reply->SendTrace("Accession adjustment is not required "
245 "(substitute option is 'never')",
246 m_Request->GetStartTimestamp());
247 }
248 return ePSGS_NotRequired;
249 }
250
251 if (acc_subst_option == SPSGS_RequestBase::ePSGS_LimitedAccSubstitution &&
252 bioseq_resolution.m_BioseqInfo.GetSeqIdType() != CSeq_id::e_Gi) {
253 if (m_Request->NeedTrace()) {
254 m_Reply->SendTrace("Accession adjustment is not required "
255 "(substitute option is 'limited' and seq_id_type is not gi)",
256 m_Request->GetStartTimestamp());
257 }
258 return ePSGS_NotRequired;
259 }
260
261 auto adj_result = bioseq_resolution.AdjustAccession(m_Request, m_Reply);
262 if (adj_result == ePSGS_LogicError ||
263 adj_result == ePSGS_SeqIdsEmpty) {
264 if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
265 PSG_WARNING("BIOSEQ_INFO cache error: " +
266 bioseq_resolution.m_AdjustmentError);
267 else
268 PSG_WARNING("BIOSEQ_INFO Cassandra error: " +
269 bioseq_resolution.m_AdjustmentError);
270 }
271 return adj_result;
272 }
273
274
275 // The method is called when:
276 // - resolution is initialized
277 // - there was no record found at any stage, i.e. a next try should be
278 // initiated
279 // NB: if a record was found, the method is not called. Instead, the pending
280 // operation class is called back directly
x_Process(void)281 void CPSGS_AsyncResolveBase::x_Process(void)
282 {
283 switch (m_ResolveStage) {
284 case eInit:
285 if (!m_ComposedOk) {
286 // The only thing to try is the AsIs resolution
287 m_ResolveStage = eSecondaryAsIs;
288 x_Process();
289 break;
290 }
291
292 if (m_PrimarySeqId.empty()) {
293 m_ResolveStage = eSecondarySi2csi;
294 x_Process();
295 break;
296 }
297
298 m_ResolveStage = ePrimaryBioseq;
299 x_Process();
300 break;
301
302 case ePrimaryBioseq:
303 m_ResolveStage = eSecondarySi2csi;
304
305 // true => with seq_id_type
306 x_PreparePrimaryBioseqInfoQuery(m_PrimarySeqId, m_EffectiveVersion,
307 m_EffectiveSeqIdType, -1, true);
308 break;
309
310 case eSecondarySi2csi:
311 // loop over all secondary seq_id
312 if (m_SecondaryIndex >= m_SecondaryIdList.size()) {
313 m_ResolveStage = eSecondaryAsIs;
314 x_Process();
315 break;
316 }
317 x_PrepareSecondarySi2csiQuery();
318 ++m_SecondaryIndex;
319 break;
320
321 case eSecondaryAsIs:
322 m_ResolveStage = eSecondaryAsIsModified;
323 x_PrepareSecondaryAsIsSi2csiQuery();
324 break;
325
326 case eSecondaryAsIsModified:
327 m_ResolveStage = eFinished;
328 x_PrepareSecondaryAsIsModifiedSi2csiQuery();
329 break;
330
331 case ePostSi2Csi:
332 // Really, there is no stage after that. This is post processing.
333 // What is done is defined in the found or error callbacks.
334 // true => with seq_id_type
335 x_PreparePrimaryBioseqInfoQuery(
336 m_BioseqResolution.m_BioseqInfo.GetAccession(),
337 m_BioseqResolution.m_BioseqInfo.GetVersion(),
338 m_BioseqResolution.m_BioseqInfo.GetSeqIdType(),
339 m_BioseqResolution.m_BioseqInfo.GetGI(),
340 true);
341 break;
342
343 case eFinished:
344 default:
345 // 'not found' of PendingOperation
346 m_BioseqResolution.m_ResolutionResult = ePSGS_NotResolved;
347 m_BioseqResolution.m_BioseqInfo.Reset();
348
349 x_OnSeqIdAsyncResolutionFinished(move(m_BioseqResolution));
350 }
351 }
352
353
354 void
x_PreparePrimaryBioseqInfoQuery(const CBioseqInfoRecord::TAccession & seq_id,CBioseqInfoRecord::TVersion version,CBioseqInfoRecord::TSeqIdType seq_id_type,CBioseqInfoRecord::TGI gi,bool with_seq_id_type)355 CPSGS_AsyncResolveBase::x_PreparePrimaryBioseqInfoQuery(
356 const CBioseqInfoRecord::TAccession & seq_id,
357 CBioseqInfoRecord::TVersion version,
358 CBioseqInfoRecord::TSeqIdType seq_id_type,
359 CBioseqInfoRecord::TGI gi,
360 bool with_seq_id_type)
361 {
362 ++m_BioseqResolution.m_CassQueryCount;
363 m_BioseqInfoRequestedAccession = seq_id;
364 m_BioseqInfoRequestedVersion = version;
365 m_BioseqInfoRequestedSeqIdType = seq_id_type;
366 m_BioseqInfoRequestedGI = gi;
367
368 unique_ptr<CCassBioseqInfoFetch> details;
369 details.reset(new CCassBioseqInfoFetch());
370
371 CBioseqInfoFetchRequest bioseq_info_request;
372 bioseq_info_request.SetAccession(seq_id);
373 if (version != -1)
374 bioseq_info_request.SetVersion(version);
375 if (with_seq_id_type) {
376 if (seq_id_type != -1)
377 bioseq_info_request.SetSeqIdType(seq_id_type);
378 }
379 if (gi != -1)
380 bioseq_info_request.SetGI(gi);
381
382 auto app = CPubseqGatewayApp::GetInstance();
383 CCassBioseqInfoTaskFetch * fetch_task =
384 new CCassBioseqInfoTaskFetch(
385 app->GetCassandraTimeout(),
386 app->GetCassandraMaxRetries(),
387 app->GetCassandraConnection(),
388 app->GetBioseqKeyspace(),
389 bioseq_info_request,
390 nullptr, nullptr);
391 details->SetLoader(fetch_task);
392
393 if (with_seq_id_type)
394 fetch_task->SetConsumeCallback(
395 std::bind(&CPSGS_AsyncResolveBase::x_OnBioseqInfo, this, _1));
396 else
397 fetch_task->SetConsumeCallback(
398 std::bind(&CPSGS_AsyncResolveBase::x_OnBioseqInfoWithoutSeqIdType, this, _1));
399
400 fetch_task->SetErrorCB(
401 std::bind(&CPSGS_AsyncResolveBase::x_OnBioseqInfoError, this, _1, _2, _3, _4));
402 fetch_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
403
404 m_BioseqInfoStart = chrono::high_resolution_clock::now();
405 if (with_seq_id_type) {
406 m_CurrentFetch = details.release();
407 m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_CurrentFetch));
408 } else {
409 m_NoSeqIdTypeFetch = details.release();
410 m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_NoSeqIdTypeFetch));
411 }
412
413 if (m_Request->NeedTrace()) {
414 if (with_seq_id_type)
415 m_Reply->SendTrace(
416 "Cassandra request: " +
417 ToJson(bioseq_info_request).Repr(CJsonNode::fStandardJson),
418 m_Request->GetStartTimestamp());
419 else
420 m_Reply->SendTrace(
421 "Cassandra request for INSDC types: " +
422 ToJson(bioseq_info_request).Repr(CJsonNode::fStandardJson),
423 m_Request->GetStartTimestamp());
424 }
425
426 fetch_task->Wait();
427 }
428
429
x_PrepareSi2csiQuery(const string & secondary_id,int16_t effective_seq_id_type)430 void CPSGS_AsyncResolveBase::x_PrepareSi2csiQuery(const string & secondary_id,
431 int16_t effective_seq_id_type)
432 {
433 ++m_BioseqResolution.m_CassQueryCount;
434
435 unique_ptr<CCassSi2csiFetch> details;
436 details.reset(new CCassSi2csiFetch());
437
438 CSi2CsiFetchRequest si2csi_request;
439 si2csi_request.SetSecSeqId(secondary_id);
440 if (effective_seq_id_type != -1)
441 si2csi_request.SetSecSeqIdType(effective_seq_id_type);
442
443 auto app = CPubseqGatewayApp::GetInstance();
444 CCassSI2CSITaskFetch * fetch_task =
445 new CCassSI2CSITaskFetch(
446 app->GetCassandraTimeout(),
447 app->GetCassandraMaxRetries(),
448 app->GetCassandraConnection(),
449 app->GetBioseqKeyspace(),
450 si2csi_request,
451 nullptr, nullptr);
452
453 details->SetLoader(fetch_task);
454
455 fetch_task->SetConsumeCallback(std::bind(&CPSGS_AsyncResolveBase::x_OnSi2csiRecord, this, _1));
456 fetch_task->SetErrorCB(std::bind(&CPSGS_AsyncResolveBase::x_OnSi2csiError, this, _1, _2, _3, _4));
457 fetch_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
458
459 m_CurrentFetch = details.release();
460
461 m_Si2csiStart = chrono::high_resolution_clock::now();
462 m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_CurrentFetch));
463
464 if (m_Request->NeedTrace())
465 m_Reply->SendTrace(
466 "Cassandra request: " +
467 ToJson(si2csi_request).Repr(CJsonNode::fStandardJson),
468 m_Request->GetStartTimestamp());
469
470 fetch_task->Wait();
471 }
472
473
x_PrepareSecondarySi2csiQuery(void)474 void CPSGS_AsyncResolveBase::x_PrepareSecondarySi2csiQuery(void)
475 {
476 // Use m_SecondaryIndex, it was properly formed in the state machine
477 x_PrepareSi2csiQuery(*std::next(m_SecondaryIdList.begin(),
478 m_SecondaryIndex),
479 m_EffectiveSeqIdType);
480 }
481
482
x_PrepareSecondaryAsIsSi2csiQuery(void)483 void CPSGS_AsyncResolveBase::x_PrepareSecondaryAsIsSi2csiQuery(void)
484 {
485 // Need to capitalize the seq_id before going to the tables.
486 // Capitalizing in place suites because the other tries are done via copies
487 // provided by OSLT
488 auto upper_request_seq_id = GetRequestSeqId();
489 NStr::ToUpper(upper_request_seq_id);
490
491 if (upper_request_seq_id == m_PrimarySeqId &&
492 GetRequestSeqIdType() == m_EffectiveSeqIdType) {
493 // Such a request has already been made; it was because the primary id
494 // matches the one from URL
495 x_Process();
496 } else {
497 x_PrepareSi2csiQuery(upper_request_seq_id, GetRequestSeqIdType());
498 }
499 }
500
501
x_PrepareSecondaryAsIsModifiedSi2csiQuery(void)502 void CPSGS_AsyncResolveBase::x_PrepareSecondaryAsIsModifiedSi2csiQuery(void)
503 {
504 auto upper_request_seq_id = GetRequestSeqId();
505 NStr::ToUpper(upper_request_seq_id);
506
507 // if there are | at the end => strip all trailing bars
508 // else => add one | at the end
509
510 if (upper_request_seq_id[upper_request_seq_id.size() - 1] == '|') {
511 string strip_bar_seq_id(upper_request_seq_id);
512 while (strip_bar_seq_id[strip_bar_seq_id.size() - 1] == '|')
513 strip_bar_seq_id.erase(strip_bar_seq_id.size() - 1, 1);
514
515 x_PrepareSi2csiQuery(strip_bar_seq_id, GetRequestSeqIdType());
516 } else {
517 string seq_id_added_bar(upper_request_seq_id);
518 seq_id_added_bar.append(1, '|');
519
520 x_PrepareSi2csiQuery(seq_id_added_bar, GetRequestSeqIdType());
521 }
522 }
523
524
x_OnBioseqInfo(vector<CBioseqInfoRecord> && records)525 void CPSGS_AsyncResolveBase::x_OnBioseqInfo(vector<CBioseqInfoRecord>&& records)
526 {
527 auto record_count = records.size();
528 auto app = CPubseqGatewayApp::GetInstance();
529 m_CurrentFetch->SetReadFinished();
530
531 if (m_Request->NeedTrace()) {
532 string msg = to_string(records.size()) + " hit(s)";
533 for (const auto & item : records) {
534 msg += "\n" + ToJson(item, SPSGS_ResolveRequest::fPSGS_AllBioseqFields).
535 Repr(CJsonNode::fStandardJson);
536 }
537 m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
538 }
539
540 size_t index_to_pick = 0;
541 if (record_count > 1) {
542 // Multiple records:
543 // => choose the highest version;
544 // if more than one with the highest version then choose the highest
545 // date changed
546 auto version = records[0].GetVersion();
547 auto date_changed = records[0].GetDateChanged();
548 for (size_t k = 0; k < records.size(); ++k) {
549 if (records[k].GetVersion() > version) {
550 index_to_pick = k;
551 version = records[k].GetVersion();
552 date_changed = records[k].GetDateChanged();
553 } else {
554 if (records[k].GetVersion() == version) {
555 if (records[k].GetDateChanged() > date_changed) {
556 index_to_pick = k;
557 date_changed = records[k].GetDateChanged();
558 }
559 }
560 }
561 }
562 // Pretend there was exactly one record
563 record_count = 1;
564 }
565
566 if (record_count != 1) {
567 // Did not find anything. Need more tries
568 if (record_count > 1) {
569 app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusFound,
570 m_BioseqInfoStart);
571 app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoFoundMany);
572 } else {
573 app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusNotFound,
574 m_BioseqInfoStart);
575 app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoNotFound);
576 }
577
578 if (record_count == 0 && IsINSDCSeqIdType(m_BioseqInfoRequestedSeqIdType)) {
579 x_PreparePrimaryBioseqInfoQuery(
580 m_BioseqInfoRequestedAccession, m_BioseqInfoRequestedVersion,
581 m_BioseqInfoRequestedSeqIdType, m_BioseqInfoRequestedGI,
582 false);
583 return;
584 }
585
586 if (m_ResolveStage == ePostSi2Csi) {
587 // Special case for post si2csi results; no next stage
588 if (record_count > 1) {
589 m_ErrorCB(
590 CRequestStatus::e502_BadGateway,
591 ePSGS_BioseqInfoNotFoundForGi, eDiag_Error,
592 "Data inconsistency. More than one BIOSEQ_INFO table record is found for "
593 "accession " + m_BioseqResolution.m_BioseqInfo.GetAccession());
594 } else {
595 m_ErrorCB(
596 CRequestStatus::e502_BadGateway,
597 ePSGS_BioseqInfoNotFoundForGi, eDiag_Error,
598 "Data inconsistency. A BIOSEQ_INFO table record is not found for "
599 "accession " + m_BioseqResolution.m_BioseqInfo.GetAccession());
600 }
601 return;
602 }
603
604 x_Process();
605 return;
606 }
607
608 // Looking good data have appeared => inform the upper level
609 x_SignalStartProcessing();
610
611 if (m_Request->NeedTrace()) {
612 string prefix;
613 if (records.size() == 1)
614 prefix = "Selected record:\n";
615 else
616 prefix = "Selected (out of multiple records) the record with the highest version "
617 "(and with highest date changed if more than one with highest version):\n";
618 m_Reply->SendTrace(
619 prefix +
620 ToJson(records[index_to_pick], SPSGS_ResolveRequest::fPSGS_AllBioseqFields).
621 Repr(CJsonNode::fStandardJson),
622 m_Request->GetStartTimestamp());
623 }
624
625 m_BioseqResolution.m_ResolutionResult = ePSGS_BioseqDB;
626 m_BioseqResolution.m_BioseqInfo = std::move(records[index_to_pick]);
627
628 // Adjust accession if needed
629 auto adj_result = AdjustBioseqAccession(m_BioseqResolution);
630 if (adj_result == ePSGS_LogicError || adj_result == ePSGS_SeqIdsEmpty) {
631 // The problem has already been logged
632 m_ErrorCB(
633 CRequestStatus::e502_BadGateway,
634 ePSGS_BioseqInfoAccessionAdjustmentError, eDiag_Error,
635 "BIOSEQ_INFO Cassandra error: " + m_BioseqResolution.m_AdjustmentError);
636 return;
637 }
638
639 // Everything is fine
640 app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusFound,
641 m_BioseqInfoStart);
642 app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoFoundOne);
643
644 x_OnSeqIdAsyncResolutionFinished(move(m_BioseqResolution));
645 }
646
647
x_OnBioseqInfoWithoutSeqIdType(vector<CBioseqInfoRecord> && records)648 void CPSGS_AsyncResolveBase::x_OnBioseqInfoWithoutSeqIdType(
649 vector<CBioseqInfoRecord>&& records)
650 {
651 m_NoSeqIdTypeFetch->SetReadFinished();
652
653 auto app = CPubseqGatewayApp::GetInstance();
654 SINSDCDecision decision = DecideINSDC(records, m_BioseqInfoRequestedVersion);
655
656 if (m_Request->NeedTrace()) {
657 string msg = to_string(records.size()) +
658 " hit(s); decision status: " + to_string(decision.status);
659 for (const auto & item : records) {
660 msg += "\n" + ToJson(item, SPSGS_ResolveRequest::fPSGS_AllBioseqFields).
661 Repr(CJsonNode::fStandardJson);
662 }
663 m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
664 }
665
666 switch (decision.status) {
667 case CRequestStatus::e200_Ok:
668 // Looking good data have appeared => inform the upper level
669 x_SignalStartProcessing();
670
671 m_BioseqResolution.m_ResolutionResult = ePSGS_BioseqDB;
672
673 app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusFound,
674 m_BioseqInfoStart);
675 app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoFoundOne);
676 m_BioseqResolution.m_BioseqInfo = std::move(records[decision.index]);
677
678 // Data callback
679 x_OnSeqIdAsyncResolutionFinished(move(m_BioseqResolution));
680 break;
681 case CRequestStatus::e404_NotFound:
682 app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusNotFound,
683 m_BioseqInfoStart);
684 app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoNotFound);
685 if (m_ResolveStage == ePostSi2Csi) {
686 m_ErrorCB(
687 CRequestStatus::e502_BadGateway,
688 ePSGS_BioseqInfoNotFoundForGi, eDiag_Error,
689 "Data inconsistency. A BIOSEQ_INFO table record is not found for "
690 "accession " + m_BioseqResolution.m_BioseqInfo.GetAccession());
691 } else {
692 // Move to the next stage
693 x_Process();
694 }
695 break;
696 case CRequestStatus::e500_InternalServerError:
697 app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusFound,
698 m_BioseqInfoStart);
699 app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoFoundMany);
700 if (m_ResolveStage == ePostSi2Csi) {
701 m_ErrorCB(
702 CRequestStatus::e502_BadGateway,
703 ePSGS_BioseqInfoNotFoundForGi, eDiag_Error,
704 "Data inconsistency. More than one BIOSEQ_INFO table record is found for "
705 "accession " + m_BioseqResolution.m_BioseqInfo.GetAccession());
706
707 } else {
708 // Move to the next stage
709 x_Process();
710 }
711 break;
712 default:
713 // Impossible
714 m_ErrorCB(
715 CRequestStatus::e500_InternalServerError, ePSGS_ServerLogicError,
716 eDiag_Error, "Unexpected decision code when a secondary INSCD "
717 "request results processed while resolving seq id asynchronously");
718 }
719 }
720
721
x_OnBioseqInfoError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)722 void CPSGS_AsyncResolveBase::x_OnBioseqInfoError(CRequestStatus::ECode status, int code,
723 EDiagSev severity, const string & message)
724 {
725 if (m_Request->NeedTrace())
726 m_Reply->SendTrace("Cassandra error: " + message,
727 m_Request->GetStartTimestamp());
728
729 if (m_CurrentFetch)
730 m_CurrentFetch->SetReadFinished();
731 if (m_NoSeqIdTypeFetch)
732 m_NoSeqIdTypeFetch->SetReadFinished();
733
734 CPubseqGatewayApp::GetInstance()->GetCounters().Increment(
735 CPSGSCounters::ePSGS_BioseqInfoError);
736
737 m_ErrorCB(status, code, severity, message);
738 }
739
740
x_OnSi2csiRecord(vector<CSI2CSIRecord> && records)741 void CPSGS_AsyncResolveBase::x_OnSi2csiRecord(vector<CSI2CSIRecord> && records)
742 {
743 auto record_count = records.size();
744 auto app = CPubseqGatewayApp::GetInstance();
745 m_CurrentFetch->SetReadFinished();
746
747 if (m_Request->NeedTrace()) {
748 string msg = to_string(record_count) + " hit(s)";
749 for (const auto & item : records) {
750 msg += "\n" + ToJson(item).Repr(CJsonNode::fStandardJson);
751 }
752 if (record_count > 1)
753 msg += "\nMore than one record => may be more tries";
754
755 m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
756 }
757
758 if (record_count != 1) {
759 // Multiple records or did not find anything. Need more tries
760 if (record_count > 1) {
761 app->GetTiming().Register(eLookupCassSi2csi, eOpStatusFound,
762 m_Si2csiStart);
763 app->GetCounters().Increment(CPSGSCounters::ePSGS_Si2csiFoundMany);
764 } else {
765 app->GetTiming().Register(eLookupCassSi2csi, eOpStatusNotFound,
766 m_Si2csiStart);
767 app->GetCounters().Increment(CPSGSCounters::ePSGS_Si2csiNotFound);
768 }
769
770 x_Process();
771 return;
772 }
773
774 // Looking good data have appeared
775 x_SignalStartProcessing();
776
777 app->GetTiming().Register(eLookupCassSi2csi, eOpStatusFound,
778 m_Si2csiStart);
779 app->GetCounters().Increment(CPSGSCounters::ePSGS_Si2csiFoundOne);
780
781 m_BioseqResolution.m_ResolutionResult = ePSGS_Si2csiDB;
782 m_BioseqResolution.m_BioseqInfo.SetAccession(records[0].GetAccession());
783 m_BioseqResolution.m_BioseqInfo.SetVersion(records[0].GetVersion());
784 m_BioseqResolution.m_BioseqInfo.SetSeqIdType(records[0].GetSeqIdType());
785 m_BioseqResolution.m_BioseqInfo.SetGI(records[0].GetGI());
786
787 // Special case for the seq_id like gi|156232
788 if (!CanSkipBioseqInfoRetrieval(m_BioseqResolution.m_BioseqInfo)) {
789 m_ResolveStage = ePostSi2Csi;
790 x_Process();
791 return;
792 }
793
794 x_OnSeqIdAsyncResolutionFinished(move(m_BioseqResolution));
795 }
796
797
x_OnSi2csiError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)798 void CPSGS_AsyncResolveBase::x_OnSi2csiError(CRequestStatus::ECode status, int code,
799 EDiagSev severity, const string & message)
800 {
801 if (m_Request->NeedTrace())
802 m_Reply->SendTrace("Cassandra error: " + message,
803 m_Request->GetStartTimestamp());
804
805 auto app = CPubseqGatewayApp::GetInstance();
806
807 m_CurrentFetch->SetReadFinished();
808 app->GetCounters().Increment(CPSGSCounters::ePSGS_Si2csiError);
809
810 m_ErrorCB(status, code, severity, message);
811 }
812
813
814 void
x_OnSeqIdAsyncResolutionFinished(SBioseqResolution && async_bioseq_resolution)815 CPSGS_AsyncResolveBase::x_OnSeqIdAsyncResolutionFinished(
816 SBioseqResolution && async_bioseq_resolution)
817 {
818 auto app = CPubseqGatewayApp::GetInstance();
819
820 if (async_bioseq_resolution.IsValid()) {
821 // Just in case; the second call will be prevented anyway
822 x_SignalStartProcessing();
823
824 m_FinishedCB(move(async_bioseq_resolution));
825 } else {
826 app->GetCounters().Increment(CPSGSCounters::ePSGS_InputSeqIdNotResolved);
827
828 if (async_bioseq_resolution.m_Error.HasError())
829 m_ErrorCB(
830 async_bioseq_resolution.m_Error.m_ErrorCode,
831 ePSGS_UnresolvedSeqId, eDiag_Error,
832 async_bioseq_resolution.m_Error.m_ErrorMessage);
833 else
834 m_ErrorCB(
835 CRequestStatus::e404_NotFound, ePSGS_UnresolvedSeqId,
836 eDiag_Error, "Could not resolve seq_id " + GetRequestSeqId());
837 }
838 }
839
840
841
x_SignalStartProcessing(void)842 void CPSGS_AsyncResolveBase::x_SignalStartProcessing(void)
843 {
844 if (!m_StartProcessingCalled) {
845 m_StartProcessingCalled = true;
846 m_StartProcessingCB();
847 }
848 }
849
850