1 /*  $Id: osg_annot.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Eugene Vasilchenko
27  *
28  * File Description: processor for data from OSG
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "osg_annot.hpp"
35 #include "osg_getblob_base.hpp"
36 #include "osg_fetch.hpp"
37 #include "osg_connection.hpp"
38 #include "pubseq_gateway.hpp"
39 
40 #include <objects/seqloc/Seq_id.hpp>
41 #include <objects/seq/Seq_annot.hpp>
42 #include <objects/id2/id2__.hpp>
43 #include <objects/seqsplit/seqsplit__.hpp>
44 #include <util/range.hpp>
45 
46 BEGIN_NCBI_NAMESPACE;
47 BEGIN_NAMESPACE(psg);
48 BEGIN_NAMESPACE(osg);
49 
50 
CPSGS_OSGAnnot(const CRef<COSGConnectionPool> & pool,const shared_ptr<CPSGS_Request> & request,const shared_ptr<CPSGS_Reply> & reply,TProcessorPriority priority)51 CPSGS_OSGAnnot::CPSGS_OSGAnnot(const CRef<COSGConnectionPool>& pool,
52                                const shared_ptr<CPSGS_Request>& request,
53                                const shared_ptr<CPSGS_Reply>& reply,
54                                TProcessorPriority priority)
55     : CPSGS_OSGProcessorBase(pool, request, reply, priority)
56 {
57 }
58 
59 
~CPSGS_OSGAnnot()60 CPSGS_OSGAnnot::~CPSGS_OSGAnnot()
61 {
62 }
63 
64 
GetName() const65 string CPSGS_OSGAnnot::GetName() const
66 {
67     return "OSG-annot";
68 }
69 
70 
CanProcess(SPSGS_AnnotRequest & request,TProcessorPriority priority)71 bool CPSGS_OSGAnnot::CanProcess(SPSGS_AnnotRequest& request,
72                                 TProcessorPriority priority)
73 {
74     // check if id is good enough
75     CSeq_id id;
76     try {
77         SetSeqId(id, request.m_SeqIdType, request.m_SeqId);
78     }
79     catch ( exception& /*ignore*/ ) {
80         return false;
81     }
82     if ( !id.IsGi() && !id.GetTextseq_Id() ) {
83         return false;
84     }
85     //if ( !CanResolve(request.m_SeqIdType, request.m_SeqId) ) {
86     //    return false;
87     //}
88     return !GetNamesToProcess(request, priority).empty();
89 }
90 
91 
GetNamesToProcess(SPSGS_AnnotRequest & request,TProcessorPriority priority)92 set<string> CPSGS_OSGAnnot::GetNamesToProcess(SPSGS_AnnotRequest& request,
93                                               TProcessorPriority priority)
94 {
95     set<string> ret;
96     for ( auto& name : request.GetNotProcessedName(priority) ) {
97         if ( CanProcessAnnotName(name) ) {
98             ret.insert(name);
99         }
100     }
101     return ret;
102 }
103 
104 
IsCDDName(const string & name)105 static bool IsCDDName(const string& name)
106 {
107     return NStr::EqualNocase(name, "CDD");
108 }
109 
110 
111 // primary SNP track
IsPrimarySNPName(const string & name)112 static bool IsPrimarySNPName(const string& name)
113 {
114     return NStr::EqualNocase(name, "SNP");
115 }
116 
117 
118 // explicit name for a SNP track
IsExplicitSNPName(const string & name)119 static bool IsExplicitSNPName(const string& name)
120 {
121     return NStr::StartsWith(name, "NA", NStr::eNocase) && name.find("#") != NPOS;
122 }
123 
124 
IsSNPName(const string & name)125 static bool IsSNPName(const string& name)
126 {
127     return IsPrimarySNPName(name) || IsExplicitSNPName(name);
128 }
129 
130 
CanProcessAnnotName(const string & name)131 bool CPSGS_OSGAnnot::CanProcessAnnotName(const string& name)
132 {
133     auto app = CPubseqGatewayApp::GetInstance();
134     auto& config = *app->GetOSGConnectionPool();
135     return
136         (config.GetEnabledCDD() && IsCDDName(name)) ||
137         (config.GetEnabledSNP() && IsSNPName(name));
138 }
139 
140 
CreateRequests()141 void CPSGS_OSGAnnot::CreateRequests()
142 {
143     auto& psg_req = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
144     CRef<CID2_Request> osg_req(new CID2_Request);
145     auto& req = osg_req->SetRequest().SetGet_blob_id();
146     SetSeqId(req.SetSeq_id().SetSeq_id().SetSeq_id(), psg_req.m_SeqIdType, psg_req.m_SeqId);
147     m_NamesToProcess.clear();
148     m_ApplyCDDFix = false;
149     for ( auto& name : GetNamesToProcess(psg_req, GetPriority()) ) {
150         m_NamesToProcess.insert(name);
151         if ( IsCDDName(name) ) {
152             // CDD are external annotations in OSG
153             req.SetExternal();
154             m_ApplyCDDFix = GetConnectionPool().GetCDDRetryTimeout() > 0;
155         }
156         else {
157             // others have named annot accession (source)
158             req.SetSources().push_back(name);
159         }
160     }
161     AddRequest(osg_req);
162 }
163 
164 
NotifyOSGCallStart()165 void CPSGS_OSGAnnot::NotifyOSGCallStart()
166 {
167     if ( m_ApplyCDDFix ) {
168         m_CDDReceived = false;
169         m_RequestTime.Restart();
170     }
171 }
172 
173 
NotifyOSGCallReply(const CID2_Reply & reply)174 void CPSGS_OSGAnnot::NotifyOSGCallReply(const CID2_Reply& reply)
175 {
176     if ( m_ApplyCDDFix ) {
177         if ( IsCDDReply(reply) ) {
178             m_CDDReceived = true;
179         }
180     }
181 }
182 
183 
NotifyOSGCallEnd()184 void CPSGS_OSGAnnot::NotifyOSGCallEnd()
185 {
186     if ( m_ApplyCDDFix ) {
187         if ( !m_CDDReceived &&
188              m_RequestTime.Elapsed() > GetConnectionPool().GetCDDRetryTimeout() ) {
189             NCBI_THROW(CPubseqGatewayException, eRequestCancelled, "no CDD due to OSG timeout");
190         }
191     }
192 }
193 
194 
ProcessReplies()195 void CPSGS_OSGAnnot::ProcessReplies()
196 {
197     for ( auto& f : GetFetches() ) {
198         if ( GetDebugLevel() >= eDebug_exchange ) {
199             LOG_POST(GetDiagSeverity() << "OSG: "
200                      "Processing fetch: "<<MSerial_AsnText<<f->GetRequest());
201         }
202         for ( auto& r : f->GetReplies() ) {
203             if ( GetDebugLevel() >= eDebug_exchange ) {
204                 LOG_POST(GetDiagSeverity() << "OSG: "
205                          "Processing reply: "<<MSerial_AsnText<<*r);
206             }
207             switch ( r->GetReply().Which() ) {
208             case CID2_Reply::TReply::e_Init:
209             case CID2_Reply::TReply::e_Empty:
210                 // do nothing
211                 break;
212             case CID2_Reply::TReply::e_Get_seq_id:
213                 // do nothing
214                 break;
215             case CID2_Reply::TReply::e_Get_blob_id:
216                 AddBlobId(r->GetReply().GetGet_blob_id());
217                 break;
218             default:
219                 ERR_POST(GetName()<<": "
220                          "Unknown reply to "<<MSerial_AsnText<<*f->GetRequest()<<"\n"<<*r);
221                 break;
222             }
223         }
224     }
225     SendReplies();
226     FinalizeResult(ePSGS_Found);
227 }
228 
229 
AddBlobId(const CID2_Reply_Get_Blob_Id & blob_id)230 void CPSGS_OSGAnnot::AddBlobId(const CID2_Reply_Get_Blob_Id& blob_id)
231 {
232     if ( !blob_id.IsSetBlob_id() ) {
233         return;
234     }
235     if ( !blob_id.IsSetAnnot_info() ) {
236         return;
237     }
238     m_BlobIds.push_back(Ref(&blob_id));
239 }
240 
241 
242 namespace {
243     struct SAnnotInfo {
244         // NA accession
245         string annot_name;
246         // annotated location
247         string accession;
248         int version;
249         int seq_id_type;
250         CRange<TSeqPos> range;
251         // annotation types
252         CJsonNode json;
253 
Add__anonc0a29e200111::SAnnotInfo254         void Add(const CID2S_Seq_loc& loc) {
255             switch ( loc.Which() ) {
256             case CID2S_Seq_loc::e_Whole_gi:
257                 Add(loc.GetWhole_gi());
258                 break;
259             case CID2S_Seq_loc::e_Whole_seq_id:
260                 Add(loc.GetWhole_seq_id());
261                 break;
262             case CID2S_Seq_loc::e_Whole_gi_range:
263                 Add(loc.GetWhole_gi_range());
264                 break;
265             case CID2S_Seq_loc::e_Gi_interval:
266                 Add(loc.GetGi_interval());
267                 break;
268             case CID2S_Seq_loc::e_Seq_id_interval:
269                 Add(loc.GetSeq_id_interval());
270                 break;
271             case CID2S_Seq_loc::e_Gi_ints:
272                 Add(loc.GetGi_ints());
273                 break;
274             case CID2S_Seq_loc::e_Seq_id_ints:
275                 Add(loc.GetSeq_id_ints());
276                 break;
277             case CID2S_Seq_loc::e_Loc_set:
278                 for ( auto& l : loc.GetLoc_set() ) {
279                     Add(*l);
280                 }
281                 break;
282             default:
283                 break;
284             }
285         }
Add__anonc0a29e200111::SAnnotInfo286         void Add(TGi gi) {
287             Add(gi, CRange<TSeqPos>::GetWhole());
288         }
Add__anonc0a29e200111::SAnnotInfo289         void Add(const CSeq_id& id) {
290             Add(id, CRange<TSeqPos>::GetWhole());
291         }
Add__anonc0a29e200111::SAnnotInfo292         void Add(TGi gi, TSeqPos start, TSeqPos length) {
293             Add(gi, COpenRange<TSeqPos>(start, start+length));
294         }
Add__anonc0a29e200111::SAnnotInfo295         void Add(const CSeq_id& id, TSeqPos start, TSeqPos length) {
296             Add(id, COpenRange<TSeqPos>(start, start+length));
297         }
Add__anonc0a29e200111::SAnnotInfo298         void Add(TGi gi, CRange<TSeqPos> range) {
299             Add(CSeq_id(CSeq_id::e_Gi, gi), range);
300         }
SetSeqId__anonc0a29e200111::SAnnotInfo301         void SetSeqId(const CSeq_id& id) {
302             string new_accession;
303             int new_version = 0;
304             int new_type = id.Which();
305             if ( auto text_id = id.GetTextseq_Id() ) {
306                 if ( text_id->IsSetAccession() && text_id->IsSetVersion() ) {
307                     new_accession = text_id->GetAccession();
308                     new_version = text_id->GetVersion();
309                 }
310             }
311             else {
312                 id.GetLabel(&new_accession, CSeq_id::eFastaContent);
313             }
314             if ( accession.empty() ) {
315                 accession = new_accession;
316                 version = new_version;
317                 seq_id_type = new_type;
318             }
319             else if ( accession != new_accession ||
320                       version != new_version ||
321                       seq_id_type != new_type ) {
322                 ERR_POST("OSG-annot: multiple annotated Seq-ids");
323                 throw runtime_error("");
324             }
325         }
Add__anonc0a29e200111::SAnnotInfo326         void Add(const CSeq_id& id, CRange<TSeqPos> add_range) {
327             SetSeqId(id);
328             range.CombineWith(add_range);
329         }
Add__anonc0a29e200111::SAnnotInfo330         void Add(const CID2S_Gi_Range& gi_range) {
331             for ( TIntId i = 0; i < gi_range.GetCount(); ++i ) {
332                 Add(GI_FROM(TIntId, GI_TO(TIntId, gi_range.GetStart())+i));
333             }
334         }
Add__anonc0a29e200111::SAnnotInfo335         void Add(const CID2S_Gi_Interval& interval) {
336             Add(interval.GetGi(), interval.GetStart(), interval.GetLength());
337         }
Add__anonc0a29e200111::SAnnotInfo338         void Add(const CID2S_Seq_id_Interval& interval) {
339             Add(interval.GetSeq_id(), interval.GetStart(), interval.GetLength());
340         }
Add__anonc0a29e200111::SAnnotInfo341         void Add(const CID2S_Gi_Ints& ints) {
342             for ( auto& i : ints.GetInts() ) {
343                 Add(ints.GetGi(), i->GetStart(), i->GetLength());
344             }
345         }
Add__anonc0a29e200111::SAnnotInfo346         void Add(const CID2S_Seq_id_Ints& ints) {
347             for ( auto& i : ints.GetInts() ) {
348                 Add(ints.GetSeq_id(), i->GetStart(), i->GetLength());
349             }
350         }
351 
SetAnnotName__anonc0a29e200111::SAnnotInfo352         void SetAnnotName(const string& name)
353             {
354                 if ( annot_name.empty() ) {
355                     annot_name = name;
356                 }
357                 else if ( annot_name != name ) {
358                     ERR_POST("OSG-annot: multiple annot accessions: "<<annot_name<<" <> "<<name);
359                     throw runtime_error("");
360                 }
361             }
362 
SAnnotInfo__anonc0a29e200111::SAnnotInfo363         SAnnotInfo(const list<CRef<CID2S_Seq_annot_Info>>& annot_infos)
364             : range(CRange<TSeqPos>::GetEmpty()),
365               json(CJsonNode::NewObjectNode())
366             {
367                 vector<int64_t> zooms;
368                 for ( auto& ai : annot_infos ) {
369                     // collect location
370                     if ( ai->IsSetSeq_loc() ) {
371                         Add(ai->GetSeq_loc());
372                     }
373 
374                     // collect name
375                     auto& full_name = ai->GetName();
376                     string acc;
377                     SIZE_TYPE zoom_pos = full_name.find("@@");
378                     if ( zoom_pos != NPOS ) {
379                         SetAnnotName(full_name.substr(0, zoom_pos));
380                         zooms.push_back(NStr::StringToInt(full_name.substr(zoom_pos+2)));
381                     }
382                     else {
383                         SetAnnotName(full_name);
384                     }
385 
386                     // collect types
387                     if ( ai->IsSetAlign() ) {
388                         CJsonNode type_json = CJsonNode::NewArrayNode();
389                         type_json.AppendInteger(0);
390                         json.SetByKey(to_string(CSeq_annot::C_Data::e_Align), type_json);
391                     }
392                     if ( ai->IsSetGraph() ) {
393                         CJsonNode type_json = CJsonNode::NewArrayNode();
394                         type_json.AppendInteger(0);
395                         json.SetByKey(to_string(CSeq_annot::C_Data::e_Graph), type_json);
396                     }
397                     if ( ai->IsSetFeat() ) {
398                         auto& types = ai->GetFeat();
399                         if ( types.empty() ||
400                              (types.size() == 1 &&
401                               types.front()->GetType() == 0 &&
402                               !types.front()->IsSetSubtypes()) ) {
403                             CJsonNode type_json = CJsonNode::NewArrayNode();
404                             type_json.AppendInteger(0);
405                             json.SetByKey(to_string(CSeq_annot::C_Data::e_Seq_table), type_json);
406                         }
407                         else {
408                             CJsonNode type_json = CJsonNode::NewObjectNode();
409                             for ( auto& feat_type : types ) {
410                                 CJsonNode subtype_json = CJsonNode::NewArrayNode();
411                                 if ( feat_type->IsSetSubtypes() ) {
412                                     for ( auto feat_subtype : feat_type->GetSubtypes() ) {
413                                         subtype_json.AppendInteger(feat_subtype);
414                                     }
415                                 }
416                                 type_json.SetByKey(to_string(feat_type->GetType()), subtype_json);
417                             }
418                             json.SetByKey(to_string(CSeq_annot::C_Data::e_Ftable), type_json);
419                         }
420                     }
421                 }
422 
423                 // add collected zoom levels
424                 if ( !zooms.empty() ) {
425                     CJsonNode zooms_json = CJsonNode::NewArrayNode();
426                     for ( auto zoom : zooms ) {
427                         zooms_json.AppendInteger(zoom);
428                     }
429                     json.SetByKey("2048", zooms_json);
430                 }
431             }
432     };
433 }
434 
435 
SendReplies()436 void CPSGS_OSGAnnot::SendReplies()
437 {
438     if ( GetDebugLevel() >= eDebug_exchange ) {
439         for ( auto& name : m_NamesToProcess ) {
440             LOG_POST(GetDiagSeverity() << "OSG: "
441                      "Asked for annot "<<name);
442         }
443         for ( auto& r : m_BlobIds ) {
444             LOG_POST(GetDiagSeverity() << "OSG: "
445                      "Received annot reply "<<MSerial_AsnText<<*r);
446         }
447     }
448     auto& psg_req = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
449     for ( auto& r : m_BlobIds ) {
450         if ( !CPSGS_OSGGetBlobBase::IsOSGBlob(r->GetBlob_id()) ) {
451             continue;
452         }
453         string psg_blob_id = CPSGS_OSGGetBlobBase::GetPSGBlobId(r->GetBlob_id());
454         CJsonNode       json(CJsonNode::NewObjectNode());
455         json.SetString("blob_id", psg_blob_id);
456         if ( r->GetBlob_id().IsSetVersion() ) {
457             json.SetInteger("last_modified", r->GetBlob_id().GetVersion()*60000);
458         }
459         string annot_name;
460         try {
461             SAnnotInfo info(r->GetAnnot_info());
462             annot_name = info.annot_name;
463             json.SetString("accession", info.accession);
464             json.SetInteger("version", info.version);
465             json.SetInteger("seq_id_type", info.seq_id_type);
466             if ( info.range != CRange<TSeqPos>::GetWhole() ) {
467                 json.SetInteger("start", info.range.GetFrom());
468                 json.SetInteger("stop", info.range.GetTo());
469             }
470             else {
471                 // whole sequence
472                 json.SetInteger("start", 0);
473                 json.SetInteger("stop", 0);
474             }
475             json.SetString("annot_info", info.json.Repr(CJsonNode::fStandardJson));
476         }
477         catch ( exception& ) {
478             ERR_POST(GetName()<<": "
479                      "Bad annot-info: "<<MSerial_AsnText<<*r);
480             // find default annot_name
481             for ( auto& ai : r->GetAnnot_info() ) {
482                 if ( m_NamesToProcess.count(ai->GetName()) ) {
483                     annot_name = ai->GetName();
484                     break;
485                 }
486             }
487         }
488         if ( r->IsSetAnnot_info() ) {
489             // set ASN.1 annot info
490             ostringstream str;
491             for ( auto& info : r->GetAnnot_info() ) {
492                 str << MSerial_AsnBinary << *info;
493             }
494             json.SetString("seq_annot_info", NStr::Base64Encode(str.str(), 0));
495         }
496         if ( m_NamesToProcess.count(annot_name) ) {
497             GetReply()->PrepareNamedAnnotationData(annot_name, GetName(),
498                                                    json.Repr(CJsonNode::fStandardJson));
499         }
500         //GetReply()->PrepareReplyCompletion();
501     }
502     // register processed names
503     for ( auto& name : m_NamesToProcess ) {
504         psg_req.RegisterProcessedName(GetPriority(), name);
505     }
506 }
507 
508 
IsCDDReply(const CID2_Reply & reply) const509 bool CPSGS_OSGAnnot::IsCDDReply(const CID2_Reply& reply) const
510 {
511     if ( !reply.GetReply().IsGet_blob_id() ) {
512         return false;
513     }
514 
515     const CID2_Reply_Get_Blob_Id& blob_id = reply.GetReply().GetGet_blob_id();
516     if ( !blob_id.IsSetBlob_id() || !CPSGS_OSGGetBlobBase::IsOSGBlob(blob_id.GetBlob_id()) ) {
517         return false;
518     }
519 
520     if ( !blob_id.IsSetAnnot_info() ) {
521         return false;
522     }
523     try {
524         SAnnotInfo info(blob_id.GetAnnot_info());
525         return IsCDDName(info.annot_name);
526     }
527     catch ( exception& /*ignored*/ ) {
528         return false;
529     }
530 }
531 
532 
533 END_NAMESPACE(osg);
534 END_NAMESPACE(psg);
535 END_NCBI_NAMESPACE;
536