1 /*  $Id: async_resolve_base.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: base class for processors which need to resolve seq_id
29  *                   asynchronously
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include <corelib/request_status.hpp>
36 #include <corelib/ncbidiag.hpp>
37 
38 #include "pubseq_gateway.hpp"
39 #include "pubseq_gateway_utils.hpp"
40 #include "pubseq_gateway_cache_utils.hpp"
41 #include "cass_fetch.hpp"
42 #include "psgs_request.hpp"
43 #include "psgs_reply.hpp"
44 #include "insdc_utils.hpp"
45 #include "async_resolve_base.hpp"
46 #include "insdc_utils.hpp"
47 #include "pubseq_gateway_convert_utils.hpp"
48 
49 #include <objects/seqloc/Seq_id.hpp>
50 #include <objects/general/Dbtag.hpp>
51 #include <objects/general/Object_id.hpp>
52 USING_IDBLOB_SCOPE;
53 USING_SCOPE(objects);
54 
55 using namespace std::placeholders;
56 
57 
CPSGS_AsyncResolveBase()58 CPSGS_AsyncResolveBase::CPSGS_AsyncResolveBase()
59 {}
60 
61 
CPSGS_AsyncResolveBase(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TSeqIdResolutionFinishedCB finished_cb,TSeqIdResolutionErrorCB error_cb,TSeqIdResolutionStartProcessingCB start_processing_cb)62 CPSGS_AsyncResolveBase::CPSGS_AsyncResolveBase(
63                                 shared_ptr<CPSGS_Request> request,
64                                 shared_ptr<CPSGS_Reply> reply,
65                                 TSeqIdResolutionFinishedCB finished_cb,
66                                 TSeqIdResolutionErrorCB error_cb,
67                                 TSeqIdResolutionStartProcessingCB  start_processing_cb) :
68     m_FinishedCB(finished_cb),
69     m_ErrorCB(error_cb),
70     m_StartProcessingCB(start_processing_cb),
71     m_ResolveStage(eInit),
72     m_SecondaryIndex(0),
73     m_CurrentFetch(nullptr),
74     m_NoSeqIdTypeFetch(nullptr),
75     m_StartProcessingCalled(false)
76 {}
77 
78 
~CPSGS_AsyncResolveBase()79 CPSGS_AsyncResolveBase::~CPSGS_AsyncResolveBase()
80 {}
81 
82 
83 void
Process(int16_t effective_version,int16_t effective_seq_id_type,list<string> && secondary_id_list,string && primary_seq_id,bool composed_ok,SBioseqResolution && bioseq_resolution)84 CPSGS_AsyncResolveBase::Process(int16_t               effective_version,
85                                 int16_t               effective_seq_id_type,
86                                 list<string> &&       secondary_id_list,
87                                 string &&             primary_seq_id,
88                                 bool                  composed_ok,
89                                 SBioseqResolution &&  bioseq_resolution)
90 {
91     m_ComposedOk = composed_ok;
92     m_PrimarySeqId = move(primary_seq_id);
93     m_EffectiveVersion = effective_version;
94     m_EffectiveSeqIdType = effective_seq_id_type;
95     m_SecondaryIdList = move(secondary_id_list);
96     m_BioseqResolution = move(bioseq_resolution);
97     m_AsyncCassResolutionStart = chrono::high_resolution_clock::now();
98 
99     x_Process();
100 }
101 
102 
103 int16_t
GetEffectiveVersion(const CTextseq_id * text_seq_id)104 CPSGS_AsyncResolveBase::GetEffectiveVersion(const CTextseq_id *  text_seq_id)
105 {
106     try {
107         if (text_seq_id == nullptr)
108             return -1;
109         if (text_seq_id->CanGetVersion())
110             return text_seq_id->GetVersion();
111     } catch (...) {
112     }
113     return -1;
114 }
115 
116 
117 string
GetRequestSeqId(void)118 CPSGS_AsyncResolveBase::GetRequestSeqId(void)
119 {
120     switch (m_Request->GetRequestType()) {
121         case CPSGS_Request::ePSGS_ResolveRequest:
122             return m_Request->GetRequest<SPSGS_ResolveRequest>().m_SeqId;
123         case CPSGS_Request::ePSGS_BlobBySeqIdRequest:
124             return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_SeqId;
125         case CPSGS_Request::ePSGS_AnnotationRequest:
126             return m_Request->GetRequest<SPSGS_AnnotRequest>().m_SeqId;
127         default:
128             break;
129     }
130     NCBI_THROW(CPubseqGatewayException, eLogic,
131                "Not handled request type " +
132                to_string(static_cast<int>(m_Request->GetRequestType())));
133 }
134 
135 
136 int16_t
GetRequestSeqIdType(void)137 CPSGS_AsyncResolveBase::GetRequestSeqIdType(void)
138 {
139     switch (m_Request->GetRequestType()) {
140         case CPSGS_Request::ePSGS_ResolveRequest:
141             return m_Request->GetRequest<SPSGS_ResolveRequest>().m_SeqIdType;
142         case CPSGS_Request::ePSGS_BlobBySeqIdRequest:
143             return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_SeqIdType;
144         case CPSGS_Request::ePSGS_AnnotationRequest:
145             return m_Request->GetRequest<SPSGS_AnnotRequest>().m_SeqIdType;
146         default:
147             break;
148     }
149     NCBI_THROW(CPubseqGatewayException, eLogic,
150                "Not handled request type " +
151                to_string(static_cast<int>(m_Request->GetRequestType())));
152 }
153 
154 
155 SPSGS_ResolveRequest::TPSGS_BioseqIncludeData
GetBioseqInfoFields(void)156 CPSGS_AsyncResolveBase::GetBioseqInfoFields(void)
157 {
158     if (m_Request->GetRequestType() == CPSGS_Request::ePSGS_ResolveRequest)
159         return m_Request->GetRequest<SPSGS_ResolveRequest>().m_IncludeDataFlags;
160     return SPSGS_ResolveRequest::fPSGS_AllBioseqFields;
161 }
162 
163 
164 bool
NonKeyBioseqInfoFieldsRequested(void)165 CPSGS_AsyncResolveBase::NonKeyBioseqInfoFieldsRequested(void)
166 {
167     return (GetBioseqInfoFields() &
168             ~SPSGS_ResolveRequest::fPSGS_BioseqKeyFields) != 0;
169 }
170 
171 
172 // The method tells if the BIOSEQ_INFO record needs to be retrieved.
173 // It can be skipped under very specific conditions.
174 // It makes sense if the source of data is SI2CSI, i.e. only key fields are
175 // available.
176 bool
CanSkipBioseqInfoRetrieval(const CBioseqInfoRecord & bioseq_info_record)177 CPSGS_AsyncResolveBase::CanSkipBioseqInfoRetrieval(
178                                 const CBioseqInfoRecord &  bioseq_info_record)
179 {
180     if (m_Request->GetRequestType() != CPSGS_Request::ePSGS_ResolveRequest)
181         return false;   // The get request supposes the full bioseq info
182 
183     if (NonKeyBioseqInfoFieldsRequested())
184         return false;   // In the resolve request more bioseq_info fields are requested
185 
186 
187     auto    seq_id_type = bioseq_info_record.GetSeqIdType();
188     if (bioseq_info_record.GetVersion() > 0 && seq_id_type != CSeq_id::e_Gi)
189         return true;    // This combination in data never requires accession adjustments
190 
191     auto    include_flags = m_Request->GetRequest<SPSGS_ResolveRequest>().m_IncludeDataFlags;
192     if ((include_flags & ~SPSGS_ResolveRequest::fPSGS_Gi) == 0)
193         return true;    // Only GI field or no fields are requested so no accession
194                         // adjustments are required
195 
196     auto    acc_subst = m_Request->GetRequest<SPSGS_ResolveRequest>().m_AccSubstOption;
197     if (acc_subst == SPSGS_RequestBase::ePSGS_NeverAccSubstitute)
198         return true;    // No accession adjustments anyway so key fields are enough
199 
200     if (acc_subst == SPSGS_RequestBase::ePSGS_LimitedAccSubstitution &&
201         seq_id_type != CSeq_id::e_Gi)
202         return true;    // No accession adjustments required
203 
204     return false;
205 }
206 
207 
208 SPSGS_RequestBase::EPSGS_AccSubstitutioOption
GetAccessionSubstitutionOption(void)209 CPSGS_AsyncResolveBase::GetAccessionSubstitutionOption(void)
210 {
211     // The substitution makes sense only for resolve/get/annot requests
212     switch (m_Request->GetRequestType()) {
213         case CPSGS_Request::ePSGS_ResolveRequest:
214             return m_Request->GetRequest<SPSGS_ResolveRequest>().m_AccSubstOption;
215         case CPSGS_Request::ePSGS_BlobBySeqIdRequest:
216             return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_AccSubstOption;
217         case CPSGS_Request::ePSGS_AnnotationRequest:
218             return SPSGS_RequestBase::ePSGS_DefaultAccSubstitution;
219         default:
220             break;
221     }
222     NCBI_THROW(CPubseqGatewayException, eLogic,
223                "Not handled request type " +
224                to_string(static_cast<int>(m_Request->GetRequestType())));
225 }
226 
227 
228 EPSGS_AccessionAdjustmentResult
AdjustBioseqAccession(SBioseqResolution & bioseq_resolution)229 CPSGS_AsyncResolveBase::AdjustBioseqAccession(
230                                     SBioseqResolution &  bioseq_resolution)
231 {
232     if (CanSkipBioseqInfoRetrieval(bioseq_resolution.m_BioseqInfo)) {
233         if (m_Request->NeedTrace()) {
234             m_Reply->SendTrace("Accession adjustment is not required "
235                                "(bioseq info is not provided)",
236                                m_Request->GetStartTimestamp());
237         }
238         return ePSGS_NotRequired;
239     }
240 
241     auto    acc_subst_option = GetAccessionSubstitutionOption();
242     if (acc_subst_option == SPSGS_RequestBase::ePSGS_NeverAccSubstitute) {
243         if (m_Request->NeedTrace()) {
244             m_Reply->SendTrace("Accession adjustment is not required "
245                                "(substitute option is 'never')",
246                                m_Request->GetStartTimestamp());
247         }
248         return ePSGS_NotRequired;
249     }
250 
251     if (acc_subst_option == SPSGS_RequestBase::ePSGS_LimitedAccSubstitution &&
252         bioseq_resolution.m_BioseqInfo.GetSeqIdType() != CSeq_id::e_Gi) {
253         if (m_Request->NeedTrace()) {
254             m_Reply->SendTrace("Accession adjustment is not required "
255                                "(substitute option is 'limited' and seq_id_type is not gi)",
256                                m_Request->GetStartTimestamp());
257         }
258         return ePSGS_NotRequired;
259     }
260 
261     auto    adj_result = bioseq_resolution.AdjustAccession(m_Request, m_Reply);
262     if (adj_result == ePSGS_LogicError ||
263         adj_result == ePSGS_SeqIdsEmpty) {
264         if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
265             PSG_WARNING("BIOSEQ_INFO cache error: " +
266                         bioseq_resolution.m_AdjustmentError);
267         else
268             PSG_WARNING("BIOSEQ_INFO Cassandra error: " +
269                         bioseq_resolution.m_AdjustmentError);
270     }
271     return adj_result;
272 }
273 
274 
275 // The method is called when:
276 // - resolution is initialized
277 // - there was no record found at any stage, i.e. a next try should be
278 //   initiated
279 // NB: if a record was found, the method is not called. Instead, the pending
280 //     operation class is called back directly
x_Process(void)281 void CPSGS_AsyncResolveBase::x_Process(void)
282 {
283     switch (m_ResolveStage) {
284         case eInit:
285             if (!m_ComposedOk) {
286                 // The only thing to try is the AsIs resolution
287                 m_ResolveStage = eSecondaryAsIs;
288                 x_Process();
289                 break;
290             }
291 
292             if (m_PrimarySeqId.empty()) {
293                 m_ResolveStage = eSecondarySi2csi;
294                 x_Process();
295                 break;
296             }
297 
298             m_ResolveStage = ePrimaryBioseq;
299             x_Process();
300             break;
301 
302         case ePrimaryBioseq:
303             m_ResolveStage = eSecondarySi2csi;
304 
305             // true => with seq_id_type
306             x_PreparePrimaryBioseqInfoQuery(m_PrimarySeqId, m_EffectiveVersion,
307                                             m_EffectiveSeqIdType, -1, true);
308             break;
309 
310         case eSecondarySi2csi:
311             // loop over all secondary seq_id
312             if (m_SecondaryIndex >= m_SecondaryIdList.size()) {
313                 m_ResolveStage = eSecondaryAsIs;
314                 x_Process();
315                 break;
316             }
317             x_PrepareSecondarySi2csiQuery();
318             ++m_SecondaryIndex;
319             break;
320 
321         case eSecondaryAsIs:
322             m_ResolveStage = eSecondaryAsIsModified;
323             x_PrepareSecondaryAsIsSi2csiQuery();
324             break;
325 
326         case eSecondaryAsIsModified:
327             m_ResolveStage = eFinished;
328             x_PrepareSecondaryAsIsModifiedSi2csiQuery();
329             break;
330 
331         case ePostSi2Csi:
332             // Really, there is no stage after that. This is post processing.
333             // What is done is defined in the found or error callbacks.
334             // true => with seq_id_type
335             x_PreparePrimaryBioseqInfoQuery(
336                 m_BioseqResolution.m_BioseqInfo.GetAccession(),
337                 m_BioseqResolution.m_BioseqInfo.GetVersion(),
338                 m_BioseqResolution.m_BioseqInfo.GetSeqIdType(),
339                 m_BioseqResolution.m_BioseqInfo.GetGI(),
340                 true);
341             break;
342 
343         case eFinished:
344         default:
345             // 'not found' of PendingOperation
346             m_BioseqResolution.m_ResolutionResult = ePSGS_NotResolved;
347             m_BioseqResolution.m_BioseqInfo.Reset();
348 
349             x_OnSeqIdAsyncResolutionFinished(move(m_BioseqResolution));
350     }
351 }
352 
353 
354 void
x_PreparePrimaryBioseqInfoQuery(const CBioseqInfoRecord::TAccession & seq_id,CBioseqInfoRecord::TVersion version,CBioseqInfoRecord::TSeqIdType seq_id_type,CBioseqInfoRecord::TGI gi,bool with_seq_id_type)355 CPSGS_AsyncResolveBase::x_PreparePrimaryBioseqInfoQuery(
356                             const CBioseqInfoRecord::TAccession &  seq_id,
357                             CBioseqInfoRecord::TVersion  version,
358                             CBioseqInfoRecord::TSeqIdType  seq_id_type,
359                             CBioseqInfoRecord::TGI  gi,
360                             bool  with_seq_id_type)
361 {
362     ++m_BioseqResolution.m_CassQueryCount;
363     m_BioseqInfoRequestedAccession = seq_id;
364     m_BioseqInfoRequestedVersion = version;
365     m_BioseqInfoRequestedSeqIdType = seq_id_type;
366     m_BioseqInfoRequestedGI = gi;
367 
368     unique_ptr<CCassBioseqInfoFetch>   details;
369     details.reset(new CCassBioseqInfoFetch());
370 
371     CBioseqInfoFetchRequest     bioseq_info_request;
372     bioseq_info_request.SetAccession(seq_id);
373     if (version != -1)
374         bioseq_info_request.SetVersion(version);
375     if (with_seq_id_type) {
376         if (seq_id_type != -1)
377             bioseq_info_request.SetSeqIdType(seq_id_type);
378     }
379     if (gi != -1)
380         bioseq_info_request.SetGI(gi);
381 
382     auto    app = CPubseqGatewayApp::GetInstance();
383     CCassBioseqInfoTaskFetch *  fetch_task =
384             new CCassBioseqInfoTaskFetch(
385                     app->GetCassandraTimeout(),
386                     app->GetCassandraMaxRetries(),
387                     app->GetCassandraConnection(),
388                     app->GetBioseqKeyspace(),
389                     bioseq_info_request,
390                     nullptr, nullptr);
391     details->SetLoader(fetch_task);
392 
393     if (with_seq_id_type)
394         fetch_task->SetConsumeCallback(
395             std::bind(&CPSGS_AsyncResolveBase::x_OnBioseqInfo, this, _1));
396     else
397         fetch_task->SetConsumeCallback(
398             std::bind(&CPSGS_AsyncResolveBase::x_OnBioseqInfoWithoutSeqIdType, this, _1));
399 
400     fetch_task->SetErrorCB(
401         std::bind(&CPSGS_AsyncResolveBase::x_OnBioseqInfoError, this, _1, _2, _3, _4));
402     fetch_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
403 
404     m_BioseqInfoStart = chrono::high_resolution_clock::now();
405     if (with_seq_id_type) {
406         m_CurrentFetch = details.release();
407         m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_CurrentFetch));
408     } else {
409         m_NoSeqIdTypeFetch = details.release();
410         m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_NoSeqIdTypeFetch));
411     }
412 
413     if (m_Request->NeedTrace()) {
414         if (with_seq_id_type)
415             m_Reply->SendTrace(
416                 "Cassandra request: " +
417                 ToJson(bioseq_info_request).Repr(CJsonNode::fStandardJson),
418                 m_Request->GetStartTimestamp());
419         else
420             m_Reply->SendTrace(
421                 "Cassandra request for INSDC types: " +
422                 ToJson(bioseq_info_request).Repr(CJsonNode::fStandardJson),
423                 m_Request->GetStartTimestamp());
424     }
425 
426     fetch_task->Wait();
427 }
428 
429 
x_PrepareSi2csiQuery(const string & secondary_id,int16_t effective_seq_id_type)430 void CPSGS_AsyncResolveBase::x_PrepareSi2csiQuery(const string &  secondary_id,
431                                                int16_t  effective_seq_id_type)
432 {
433     ++m_BioseqResolution.m_CassQueryCount;
434 
435     unique_ptr<CCassSi2csiFetch>   details;
436     details.reset(new CCassSi2csiFetch());
437 
438     CSi2CsiFetchRequest     si2csi_request;
439     si2csi_request.SetSecSeqId(secondary_id);
440     if (effective_seq_id_type != -1)
441         si2csi_request.SetSecSeqIdType(effective_seq_id_type);
442 
443     auto    app = CPubseqGatewayApp::GetInstance();
444     CCassSI2CSITaskFetch *  fetch_task =
445             new CCassSI2CSITaskFetch(
446                     app->GetCassandraTimeout(),
447                     app->GetCassandraMaxRetries(),
448                     app->GetCassandraConnection(),
449                     app->GetBioseqKeyspace(),
450                     si2csi_request,
451                     nullptr, nullptr);
452 
453     details->SetLoader(fetch_task);
454 
455     fetch_task->SetConsumeCallback(std::bind(&CPSGS_AsyncResolveBase::x_OnSi2csiRecord, this, _1));
456     fetch_task->SetErrorCB(std::bind(&CPSGS_AsyncResolveBase::x_OnSi2csiError, this, _1, _2, _3, _4));
457     fetch_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
458 
459     m_CurrentFetch = details.release();
460 
461     m_Si2csiStart = chrono::high_resolution_clock::now();
462     m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_CurrentFetch));
463 
464     if (m_Request->NeedTrace())
465         m_Reply->SendTrace(
466             "Cassandra request: " +
467             ToJson(si2csi_request).Repr(CJsonNode::fStandardJson),
468             m_Request->GetStartTimestamp());
469 
470     fetch_task->Wait();
471 }
472 
473 
x_PrepareSecondarySi2csiQuery(void)474 void CPSGS_AsyncResolveBase::x_PrepareSecondarySi2csiQuery(void)
475 {
476     // Use m_SecondaryIndex, it was properly formed in the state machine
477     x_PrepareSi2csiQuery(*std::next(m_SecondaryIdList.begin(),
478                                     m_SecondaryIndex),
479                          m_EffectiveSeqIdType);
480 }
481 
482 
x_PrepareSecondaryAsIsSi2csiQuery(void)483 void CPSGS_AsyncResolveBase::x_PrepareSecondaryAsIsSi2csiQuery(void)
484 {
485     // Need to capitalize the seq_id before going to the tables.
486     // Capitalizing in place suites because the other tries are done via copies
487     // provided by OSLT
488     auto    upper_request_seq_id = GetRequestSeqId();
489     NStr::ToUpper(upper_request_seq_id);
490 
491     if (upper_request_seq_id == m_PrimarySeqId &&
492         GetRequestSeqIdType() == m_EffectiveSeqIdType) {
493         // Such a request has already been made; it was because the primary id
494         // matches the one from URL
495         x_Process();
496     } else {
497         x_PrepareSi2csiQuery(upper_request_seq_id, GetRequestSeqIdType());
498     }
499 }
500 
501 
x_PrepareSecondaryAsIsModifiedSi2csiQuery(void)502 void CPSGS_AsyncResolveBase::x_PrepareSecondaryAsIsModifiedSi2csiQuery(void)
503 {
504     auto    upper_request_seq_id = GetRequestSeqId();
505     NStr::ToUpper(upper_request_seq_id);
506 
507     // if there are | at the end => strip all trailing bars
508     // else => add one | at the end
509 
510     if (upper_request_seq_id[upper_request_seq_id.size() - 1] == '|') {
511         string      strip_bar_seq_id(upper_request_seq_id);
512         while (strip_bar_seq_id[strip_bar_seq_id.size() - 1] == '|')
513             strip_bar_seq_id.erase(strip_bar_seq_id.size() - 1, 1);
514 
515         x_PrepareSi2csiQuery(strip_bar_seq_id, GetRequestSeqIdType());
516     } else {
517         string      seq_id_added_bar(upper_request_seq_id);
518         seq_id_added_bar.append(1, '|');
519 
520         x_PrepareSi2csiQuery(seq_id_added_bar, GetRequestSeqIdType());
521     }
522 }
523 
524 
x_OnBioseqInfo(vector<CBioseqInfoRecord> && records)525 void CPSGS_AsyncResolveBase::x_OnBioseqInfo(vector<CBioseqInfoRecord>&&  records)
526 {
527     auto    record_count = records.size();
528     auto    app = CPubseqGatewayApp::GetInstance();
529     m_CurrentFetch->SetReadFinished();
530 
531     if (m_Request->NeedTrace()) {
532         string  msg = to_string(records.size()) + " hit(s)";
533         for (const auto &  item : records) {
534             msg += "\n" + ToJson(item, SPSGS_ResolveRequest::fPSGS_AllBioseqFields).
535                             Repr(CJsonNode::fStandardJson);
536         }
537         m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
538     }
539 
540     size_t  index_to_pick = 0;
541     if (record_count > 1) {
542         // Multiple records:
543         // => choose the highest version;
544         //    if more than one with the highest version then choose the highest
545         //    date changed
546         auto        version = records[0].GetVersion();
547         auto        date_changed = records[0].GetDateChanged();
548         for (size_t  k = 0; k < records.size(); ++k) {
549             if (records[k].GetVersion() > version) {
550                 index_to_pick = k;
551                 version = records[k].GetVersion();
552                 date_changed = records[k].GetDateChanged();
553             } else {
554                 if (records[k].GetVersion() == version) {
555                     if (records[k].GetDateChanged() > date_changed) {
556                         index_to_pick = k;
557                         date_changed = records[k].GetDateChanged();
558                     }
559                 }
560             }
561         }
562         // Pretend there was exactly one record
563         record_count = 1;
564     }
565 
566     if (record_count != 1) {
567         // Did not find anything. Need more tries
568         if (record_count > 1) {
569             app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusFound,
570                                       m_BioseqInfoStart);
571             app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoFoundMany);
572         } else {
573             app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusNotFound,
574                                       m_BioseqInfoStart);
575             app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoNotFound);
576         }
577 
578         if (record_count == 0 && IsINSDCSeqIdType(m_BioseqInfoRequestedSeqIdType)) {
579             x_PreparePrimaryBioseqInfoQuery(
580                 m_BioseqInfoRequestedAccession, m_BioseqInfoRequestedVersion,
581                 m_BioseqInfoRequestedSeqIdType, m_BioseqInfoRequestedGI,
582                 false);
583             return;
584         }
585 
586         if (m_ResolveStage == ePostSi2Csi) {
587             // Special case for post si2csi results; no next stage
588             if (record_count > 1) {
589                 m_ErrorCB(
590                     CRequestStatus::e502_BadGateway,
591                     ePSGS_BioseqInfoNotFoundForGi, eDiag_Error,
592                     "Data inconsistency. More than one BIOSEQ_INFO table record is found for "
593                     "accession " + m_BioseqResolution.m_BioseqInfo.GetAccession());
594             } else {
595                 m_ErrorCB(
596                     CRequestStatus::e502_BadGateway,
597                     ePSGS_BioseqInfoNotFoundForGi, eDiag_Error,
598                     "Data inconsistency. A BIOSEQ_INFO table record is not found for "
599                     "accession " + m_BioseqResolution.m_BioseqInfo.GetAccession());
600             }
601             return;
602         }
603 
604         x_Process();
605         return;
606     }
607 
608     // Looking good data have appeared => inform the upper level
609     x_SignalStartProcessing();
610 
611     if (m_Request->NeedTrace()) {
612         string      prefix;
613         if (records.size() == 1)
614             prefix = "Selected record:\n";
615         else
616             prefix = "Selected (out of multiple records) the record with the highest version "
617                      "(and with highest date changed if more than one with highest version):\n";
618         m_Reply->SendTrace(
619             prefix +
620             ToJson(records[index_to_pick], SPSGS_ResolveRequest::fPSGS_AllBioseqFields).
621                 Repr(CJsonNode::fStandardJson),
622             m_Request->GetStartTimestamp());
623     }
624 
625     m_BioseqResolution.m_ResolutionResult = ePSGS_BioseqDB;
626     m_BioseqResolution.m_BioseqInfo = std::move(records[index_to_pick]);
627 
628     // Adjust accession if needed
629     auto    adj_result = AdjustBioseqAccession(m_BioseqResolution);
630     if (adj_result == ePSGS_LogicError || adj_result == ePSGS_SeqIdsEmpty) {
631         // The problem has already been logged
632         m_ErrorCB(
633             CRequestStatus::e502_BadGateway,
634             ePSGS_BioseqInfoAccessionAdjustmentError, eDiag_Error,
635             "BIOSEQ_INFO Cassandra error: " + m_BioseqResolution.m_AdjustmentError);
636         return;
637     }
638 
639     // Everything is fine
640     app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusFound,
641                               m_BioseqInfoStart);
642     app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoFoundOne);
643 
644     x_OnSeqIdAsyncResolutionFinished(move(m_BioseqResolution));
645 }
646 
647 
x_OnBioseqInfoWithoutSeqIdType(vector<CBioseqInfoRecord> && records)648 void CPSGS_AsyncResolveBase::x_OnBioseqInfoWithoutSeqIdType(
649                                         vector<CBioseqInfoRecord>&&  records)
650 {
651     m_NoSeqIdTypeFetch->SetReadFinished();
652 
653     auto                app = CPubseqGatewayApp::GetInstance();
654     SINSDCDecision      decision = DecideINSDC(records, m_BioseqInfoRequestedVersion);
655 
656     if (m_Request->NeedTrace()) {
657         string  msg = to_string(records.size()) +
658                       " hit(s); decision status: " + to_string(decision.status);
659         for (const auto &  item : records) {
660             msg += "\n" + ToJson(item, SPSGS_ResolveRequest::fPSGS_AllBioseqFields).
661                             Repr(CJsonNode::fStandardJson);
662         }
663         m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
664     }
665 
666     switch (decision.status) {
667         case CRequestStatus::e200_Ok:
668             // Looking good data have appeared => inform the upper level
669             x_SignalStartProcessing();
670 
671             m_BioseqResolution.m_ResolutionResult = ePSGS_BioseqDB;
672 
673             app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusFound,
674                                       m_BioseqInfoStart);
675             app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoFoundOne);
676             m_BioseqResolution.m_BioseqInfo = std::move(records[decision.index]);
677 
678             // Data callback
679             x_OnSeqIdAsyncResolutionFinished(move(m_BioseqResolution));
680             break;
681         case CRequestStatus::e404_NotFound:
682             app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusNotFound,
683                                       m_BioseqInfoStart);
684             app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoNotFound);
685             if (m_ResolveStage == ePostSi2Csi) {
686                 m_ErrorCB(
687                     CRequestStatus::e502_BadGateway,
688                     ePSGS_BioseqInfoNotFoundForGi, eDiag_Error,
689                     "Data inconsistency. A BIOSEQ_INFO table record is not found for "
690                     "accession " + m_BioseqResolution.m_BioseqInfo.GetAccession());
691             } else {
692                 // Move to the next stage
693                 x_Process();
694             }
695             break;
696         case CRequestStatus::e500_InternalServerError:
697             app->GetTiming().Register(eLookupCassBioseqInfo, eOpStatusFound,
698                                       m_BioseqInfoStart);
699             app->GetCounters().Increment(CPSGSCounters::ePSGS_BioseqInfoFoundMany);
700             if (m_ResolveStage == ePostSi2Csi) {
701                 m_ErrorCB(
702                     CRequestStatus::e502_BadGateway,
703                     ePSGS_BioseqInfoNotFoundForGi, eDiag_Error,
704                     "Data inconsistency. More than one BIOSEQ_INFO table record is found for "
705                     "accession " + m_BioseqResolution.m_BioseqInfo.GetAccession());
706 
707             } else {
708                 // Move to the next stage
709                 x_Process();
710             }
711             break;
712         default:
713             // Impossible
714             m_ErrorCB(
715                 CRequestStatus::e500_InternalServerError, ePSGS_ServerLogicError,
716                 eDiag_Error, "Unexpected decision code when a secondary INSCD "
717                 "request results processed while resolving seq id asynchronously");
718     }
719 }
720 
721 
x_OnBioseqInfoError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)722 void CPSGS_AsyncResolveBase::x_OnBioseqInfoError(CRequestStatus::ECode  status, int  code,
723                                               EDiagSev  severity, const string &  message)
724 {
725     if (m_Request->NeedTrace())
726         m_Reply->SendTrace("Cassandra error: " + message,
727                            m_Request->GetStartTimestamp());
728 
729     if (m_CurrentFetch)
730         m_CurrentFetch->SetReadFinished();
731     if (m_NoSeqIdTypeFetch)
732         m_NoSeqIdTypeFetch->SetReadFinished();
733 
734     CPubseqGatewayApp::GetInstance()->GetCounters().Increment(
735                                 CPSGSCounters::ePSGS_BioseqInfoError);
736 
737     m_ErrorCB(status, code, severity, message);
738 }
739 
740 
x_OnSi2csiRecord(vector<CSI2CSIRecord> && records)741 void CPSGS_AsyncResolveBase::x_OnSi2csiRecord(vector<CSI2CSIRecord> &&  records)
742 {
743     auto    record_count = records.size();
744     auto    app = CPubseqGatewayApp::GetInstance();
745     m_CurrentFetch->SetReadFinished();
746 
747     if (m_Request->NeedTrace()) {
748         string  msg = to_string(record_count) + " hit(s)";
749         for (const auto &  item : records) {
750             msg += "\n" + ToJson(item).Repr(CJsonNode::fStandardJson);
751         }
752         if (record_count > 1)
753             msg += "\nMore than one record => may be more tries";
754 
755         m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
756     }
757 
758     if (record_count != 1) {
759         // Multiple records or did not find anything. Need more tries
760         if (record_count > 1) {
761             app->GetTiming().Register(eLookupCassSi2csi, eOpStatusFound,
762                                       m_Si2csiStart);
763             app->GetCounters().Increment(CPSGSCounters::ePSGS_Si2csiFoundMany);
764         } else {
765             app->GetTiming().Register(eLookupCassSi2csi, eOpStatusNotFound,
766                                       m_Si2csiStart);
767             app->GetCounters().Increment(CPSGSCounters::ePSGS_Si2csiNotFound);
768         }
769 
770         x_Process();
771         return;
772     }
773 
774     // Looking good data have appeared
775     x_SignalStartProcessing();
776 
777     app->GetTiming().Register(eLookupCassSi2csi, eOpStatusFound,
778                               m_Si2csiStart);
779     app->GetCounters().Increment(CPSGSCounters::ePSGS_Si2csiFoundOne);
780 
781     m_BioseqResolution.m_ResolutionResult = ePSGS_Si2csiDB;
782     m_BioseqResolution.m_BioseqInfo.SetAccession(records[0].GetAccession());
783     m_BioseqResolution.m_BioseqInfo.SetVersion(records[0].GetVersion());
784     m_BioseqResolution.m_BioseqInfo.SetSeqIdType(records[0].GetSeqIdType());
785     m_BioseqResolution.m_BioseqInfo.SetGI(records[0].GetGI());
786 
787     // Special case for the seq_id like gi|156232
788     if (!CanSkipBioseqInfoRetrieval(m_BioseqResolution.m_BioseqInfo)) {
789         m_ResolveStage = ePostSi2Csi;
790         x_Process();
791         return;
792     }
793 
794     x_OnSeqIdAsyncResolutionFinished(move(m_BioseqResolution));
795 }
796 
797 
x_OnSi2csiError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)798 void CPSGS_AsyncResolveBase::x_OnSi2csiError(CRequestStatus::ECode  status, int  code,
799                                           EDiagSev  severity, const string &  message)
800 {
801     if (m_Request->NeedTrace())
802         m_Reply->SendTrace("Cassandra error: " + message,
803                            m_Request->GetStartTimestamp());
804 
805     auto    app = CPubseqGatewayApp::GetInstance();
806 
807     m_CurrentFetch->SetReadFinished();
808     app->GetCounters().Increment(CPSGSCounters::ePSGS_Si2csiError);
809 
810     m_ErrorCB(status, code, severity, message);
811 }
812 
813 
814 void
x_OnSeqIdAsyncResolutionFinished(SBioseqResolution && async_bioseq_resolution)815 CPSGS_AsyncResolveBase::x_OnSeqIdAsyncResolutionFinished(
816                                 SBioseqResolution &&  async_bioseq_resolution)
817 {
818     auto    app = CPubseqGatewayApp::GetInstance();
819 
820     if (async_bioseq_resolution.IsValid()) {
821         // Just in case; the second call will be prevented anyway
822         x_SignalStartProcessing();
823 
824         m_FinishedCB(move(async_bioseq_resolution));
825     } else {
826         app->GetCounters().Increment(CPSGSCounters::ePSGS_InputSeqIdNotResolved);
827 
828         if (async_bioseq_resolution.m_Error.HasError())
829             m_ErrorCB(
830                     async_bioseq_resolution.m_Error.m_ErrorCode,
831                     ePSGS_UnresolvedSeqId, eDiag_Error,
832                     async_bioseq_resolution.m_Error.m_ErrorMessage);
833         else
834             m_ErrorCB(
835                     CRequestStatus::e404_NotFound, ePSGS_UnresolvedSeqId,
836                     eDiag_Error, "Could not resolve seq_id " + GetRequestSeqId());
837     }
838 }
839 
840 
841 
x_SignalStartProcessing(void)842 void CPSGS_AsyncResolveBase::x_SignalStartProcessing(void)
843 {
844     if (!m_StartProcessingCalled) {
845         m_StartProcessingCalled = true;
846         m_StartProcessingCB();
847     }
848 }
849 
850