1 /* $Id: osg_annot.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Eugene Vasilchenko
27 *
28 * File Description: processor for data from OSG
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "osg_annot.hpp"
35 #include "osg_getblob_base.hpp"
36 #include "osg_fetch.hpp"
37 #include "osg_connection.hpp"
38 #include "pubseq_gateway.hpp"
39
40 #include <objects/seqloc/Seq_id.hpp>
41 #include <objects/seq/Seq_annot.hpp>
42 #include <objects/id2/id2__.hpp>
43 #include <objects/seqsplit/seqsplit__.hpp>
44 #include <util/range.hpp>
45
46 BEGIN_NCBI_NAMESPACE;
47 BEGIN_NAMESPACE(psg);
48 BEGIN_NAMESPACE(osg);
49
50
CPSGS_OSGAnnot(const CRef<COSGConnectionPool> & pool,const shared_ptr<CPSGS_Request> & request,const shared_ptr<CPSGS_Reply> & reply,TProcessorPriority priority)51 CPSGS_OSGAnnot::CPSGS_OSGAnnot(const CRef<COSGConnectionPool>& pool,
52 const shared_ptr<CPSGS_Request>& request,
53 const shared_ptr<CPSGS_Reply>& reply,
54 TProcessorPriority priority)
55 : CPSGS_OSGProcessorBase(pool, request, reply, priority)
56 {
57 }
58
59
~CPSGS_OSGAnnot()60 CPSGS_OSGAnnot::~CPSGS_OSGAnnot()
61 {
62 }
63
64
GetName() const65 string CPSGS_OSGAnnot::GetName() const
66 {
67 return "OSG-annot";
68 }
69
70
CanProcess(SPSGS_AnnotRequest & request,TProcessorPriority priority)71 bool CPSGS_OSGAnnot::CanProcess(SPSGS_AnnotRequest& request,
72 TProcessorPriority priority)
73 {
74 // check if id is good enough
75 CSeq_id id;
76 try {
77 SetSeqId(id, request.m_SeqIdType, request.m_SeqId);
78 }
79 catch ( exception& /*ignore*/ ) {
80 return false;
81 }
82 if ( !id.IsGi() && !id.GetTextseq_Id() ) {
83 return false;
84 }
85 //if ( !CanResolve(request.m_SeqIdType, request.m_SeqId) ) {
86 // return false;
87 //}
88 return !GetNamesToProcess(request, priority).empty();
89 }
90
91
GetNamesToProcess(SPSGS_AnnotRequest & request,TProcessorPriority priority)92 set<string> CPSGS_OSGAnnot::GetNamesToProcess(SPSGS_AnnotRequest& request,
93 TProcessorPriority priority)
94 {
95 set<string> ret;
96 for ( auto& name : request.GetNotProcessedName(priority) ) {
97 if ( CanProcessAnnotName(name) ) {
98 ret.insert(name);
99 }
100 }
101 return ret;
102 }
103
104
IsCDDName(const string & name)105 static bool IsCDDName(const string& name)
106 {
107 return NStr::EqualNocase(name, "CDD");
108 }
109
110
111 // primary SNP track
IsPrimarySNPName(const string & name)112 static bool IsPrimarySNPName(const string& name)
113 {
114 return NStr::EqualNocase(name, "SNP");
115 }
116
117
118 // explicit name for a SNP track
IsExplicitSNPName(const string & name)119 static bool IsExplicitSNPName(const string& name)
120 {
121 return NStr::StartsWith(name, "NA", NStr::eNocase) && name.find("#") != NPOS;
122 }
123
124
IsSNPName(const string & name)125 static bool IsSNPName(const string& name)
126 {
127 return IsPrimarySNPName(name) || IsExplicitSNPName(name);
128 }
129
130
CanProcessAnnotName(const string & name)131 bool CPSGS_OSGAnnot::CanProcessAnnotName(const string& name)
132 {
133 auto app = CPubseqGatewayApp::GetInstance();
134 auto& config = *app->GetOSGConnectionPool();
135 return
136 (config.GetEnabledCDD() && IsCDDName(name)) ||
137 (config.GetEnabledSNP() && IsSNPName(name));
138 }
139
140
CreateRequests()141 void CPSGS_OSGAnnot::CreateRequests()
142 {
143 auto& psg_req = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
144 CRef<CID2_Request> osg_req(new CID2_Request);
145 auto& req = osg_req->SetRequest().SetGet_blob_id();
146 SetSeqId(req.SetSeq_id().SetSeq_id().SetSeq_id(), psg_req.m_SeqIdType, psg_req.m_SeqId);
147 m_NamesToProcess.clear();
148 m_ApplyCDDFix = false;
149 for ( auto& name : GetNamesToProcess(psg_req, GetPriority()) ) {
150 m_NamesToProcess.insert(name);
151 if ( IsCDDName(name) ) {
152 // CDD are external annotations in OSG
153 req.SetExternal();
154 m_ApplyCDDFix = GetConnectionPool().GetCDDRetryTimeout() > 0;
155 }
156 else {
157 // others have named annot accession (source)
158 req.SetSources().push_back(name);
159 }
160 }
161 AddRequest(osg_req);
162 }
163
164
NotifyOSGCallStart()165 void CPSGS_OSGAnnot::NotifyOSGCallStart()
166 {
167 if ( m_ApplyCDDFix ) {
168 m_CDDReceived = false;
169 m_RequestTime.Restart();
170 }
171 }
172
173
NotifyOSGCallReply(const CID2_Reply & reply)174 void CPSGS_OSGAnnot::NotifyOSGCallReply(const CID2_Reply& reply)
175 {
176 if ( m_ApplyCDDFix ) {
177 if ( IsCDDReply(reply) ) {
178 m_CDDReceived = true;
179 }
180 }
181 }
182
183
NotifyOSGCallEnd()184 void CPSGS_OSGAnnot::NotifyOSGCallEnd()
185 {
186 if ( m_ApplyCDDFix ) {
187 if ( !m_CDDReceived &&
188 m_RequestTime.Elapsed() > GetConnectionPool().GetCDDRetryTimeout() ) {
189 NCBI_THROW(CPubseqGatewayException, eRequestCancelled, "no CDD due to OSG timeout");
190 }
191 }
192 }
193
194
ProcessReplies()195 void CPSGS_OSGAnnot::ProcessReplies()
196 {
197 for ( auto& f : GetFetches() ) {
198 if ( GetDebugLevel() >= eDebug_exchange ) {
199 LOG_POST(GetDiagSeverity() << "OSG: "
200 "Processing fetch: "<<MSerial_AsnText<<f->GetRequest());
201 }
202 for ( auto& r : f->GetReplies() ) {
203 if ( GetDebugLevel() >= eDebug_exchange ) {
204 LOG_POST(GetDiagSeverity() << "OSG: "
205 "Processing reply: "<<MSerial_AsnText<<*r);
206 }
207 switch ( r->GetReply().Which() ) {
208 case CID2_Reply::TReply::e_Init:
209 case CID2_Reply::TReply::e_Empty:
210 // do nothing
211 break;
212 case CID2_Reply::TReply::e_Get_seq_id:
213 // do nothing
214 break;
215 case CID2_Reply::TReply::e_Get_blob_id:
216 AddBlobId(r->GetReply().GetGet_blob_id());
217 break;
218 default:
219 ERR_POST(GetName()<<": "
220 "Unknown reply to "<<MSerial_AsnText<<*f->GetRequest()<<"\n"<<*r);
221 break;
222 }
223 }
224 }
225 SendReplies();
226 FinalizeResult(ePSGS_Found);
227 }
228
229
AddBlobId(const CID2_Reply_Get_Blob_Id & blob_id)230 void CPSGS_OSGAnnot::AddBlobId(const CID2_Reply_Get_Blob_Id& blob_id)
231 {
232 if ( !blob_id.IsSetBlob_id() ) {
233 return;
234 }
235 if ( !blob_id.IsSetAnnot_info() ) {
236 return;
237 }
238 m_BlobIds.push_back(Ref(&blob_id));
239 }
240
241
242 namespace {
243 struct SAnnotInfo {
244 // NA accession
245 string annot_name;
246 // annotated location
247 string accession;
248 int version;
249 int seq_id_type;
250 CRange<TSeqPos> range;
251 // annotation types
252 CJsonNode json;
253
Add__anonc0a29e200111::SAnnotInfo254 void Add(const CID2S_Seq_loc& loc) {
255 switch ( loc.Which() ) {
256 case CID2S_Seq_loc::e_Whole_gi:
257 Add(loc.GetWhole_gi());
258 break;
259 case CID2S_Seq_loc::e_Whole_seq_id:
260 Add(loc.GetWhole_seq_id());
261 break;
262 case CID2S_Seq_loc::e_Whole_gi_range:
263 Add(loc.GetWhole_gi_range());
264 break;
265 case CID2S_Seq_loc::e_Gi_interval:
266 Add(loc.GetGi_interval());
267 break;
268 case CID2S_Seq_loc::e_Seq_id_interval:
269 Add(loc.GetSeq_id_interval());
270 break;
271 case CID2S_Seq_loc::e_Gi_ints:
272 Add(loc.GetGi_ints());
273 break;
274 case CID2S_Seq_loc::e_Seq_id_ints:
275 Add(loc.GetSeq_id_ints());
276 break;
277 case CID2S_Seq_loc::e_Loc_set:
278 for ( auto& l : loc.GetLoc_set() ) {
279 Add(*l);
280 }
281 break;
282 default:
283 break;
284 }
285 }
Add__anonc0a29e200111::SAnnotInfo286 void Add(TGi gi) {
287 Add(gi, CRange<TSeqPos>::GetWhole());
288 }
Add__anonc0a29e200111::SAnnotInfo289 void Add(const CSeq_id& id) {
290 Add(id, CRange<TSeqPos>::GetWhole());
291 }
Add__anonc0a29e200111::SAnnotInfo292 void Add(TGi gi, TSeqPos start, TSeqPos length) {
293 Add(gi, COpenRange<TSeqPos>(start, start+length));
294 }
Add__anonc0a29e200111::SAnnotInfo295 void Add(const CSeq_id& id, TSeqPos start, TSeqPos length) {
296 Add(id, COpenRange<TSeqPos>(start, start+length));
297 }
Add__anonc0a29e200111::SAnnotInfo298 void Add(TGi gi, CRange<TSeqPos> range) {
299 Add(CSeq_id(CSeq_id::e_Gi, gi), range);
300 }
SetSeqId__anonc0a29e200111::SAnnotInfo301 void SetSeqId(const CSeq_id& id) {
302 string new_accession;
303 int new_version = 0;
304 int new_type = id.Which();
305 if ( auto text_id = id.GetTextseq_Id() ) {
306 if ( text_id->IsSetAccession() && text_id->IsSetVersion() ) {
307 new_accession = text_id->GetAccession();
308 new_version = text_id->GetVersion();
309 }
310 }
311 else {
312 id.GetLabel(&new_accession, CSeq_id::eFastaContent);
313 }
314 if ( accession.empty() ) {
315 accession = new_accession;
316 version = new_version;
317 seq_id_type = new_type;
318 }
319 else if ( accession != new_accession ||
320 version != new_version ||
321 seq_id_type != new_type ) {
322 ERR_POST("OSG-annot: multiple annotated Seq-ids");
323 throw runtime_error("");
324 }
325 }
Add__anonc0a29e200111::SAnnotInfo326 void Add(const CSeq_id& id, CRange<TSeqPos> add_range) {
327 SetSeqId(id);
328 range.CombineWith(add_range);
329 }
Add__anonc0a29e200111::SAnnotInfo330 void Add(const CID2S_Gi_Range& gi_range) {
331 for ( TIntId i = 0; i < gi_range.GetCount(); ++i ) {
332 Add(GI_FROM(TIntId, GI_TO(TIntId, gi_range.GetStart())+i));
333 }
334 }
Add__anonc0a29e200111::SAnnotInfo335 void Add(const CID2S_Gi_Interval& interval) {
336 Add(interval.GetGi(), interval.GetStart(), interval.GetLength());
337 }
Add__anonc0a29e200111::SAnnotInfo338 void Add(const CID2S_Seq_id_Interval& interval) {
339 Add(interval.GetSeq_id(), interval.GetStart(), interval.GetLength());
340 }
Add__anonc0a29e200111::SAnnotInfo341 void Add(const CID2S_Gi_Ints& ints) {
342 for ( auto& i : ints.GetInts() ) {
343 Add(ints.GetGi(), i->GetStart(), i->GetLength());
344 }
345 }
Add__anonc0a29e200111::SAnnotInfo346 void Add(const CID2S_Seq_id_Ints& ints) {
347 for ( auto& i : ints.GetInts() ) {
348 Add(ints.GetSeq_id(), i->GetStart(), i->GetLength());
349 }
350 }
351
SetAnnotName__anonc0a29e200111::SAnnotInfo352 void SetAnnotName(const string& name)
353 {
354 if ( annot_name.empty() ) {
355 annot_name = name;
356 }
357 else if ( annot_name != name ) {
358 ERR_POST("OSG-annot: multiple annot accessions: "<<annot_name<<" <> "<<name);
359 throw runtime_error("");
360 }
361 }
362
SAnnotInfo__anonc0a29e200111::SAnnotInfo363 SAnnotInfo(const list<CRef<CID2S_Seq_annot_Info>>& annot_infos)
364 : range(CRange<TSeqPos>::GetEmpty()),
365 json(CJsonNode::NewObjectNode())
366 {
367 vector<int64_t> zooms;
368 for ( auto& ai : annot_infos ) {
369 // collect location
370 if ( ai->IsSetSeq_loc() ) {
371 Add(ai->GetSeq_loc());
372 }
373
374 // collect name
375 auto& full_name = ai->GetName();
376 string acc;
377 SIZE_TYPE zoom_pos = full_name.find("@@");
378 if ( zoom_pos != NPOS ) {
379 SetAnnotName(full_name.substr(0, zoom_pos));
380 zooms.push_back(NStr::StringToInt(full_name.substr(zoom_pos+2)));
381 }
382 else {
383 SetAnnotName(full_name);
384 }
385
386 // collect types
387 if ( ai->IsSetAlign() ) {
388 CJsonNode type_json = CJsonNode::NewArrayNode();
389 type_json.AppendInteger(0);
390 json.SetByKey(to_string(CSeq_annot::C_Data::e_Align), type_json);
391 }
392 if ( ai->IsSetGraph() ) {
393 CJsonNode type_json = CJsonNode::NewArrayNode();
394 type_json.AppendInteger(0);
395 json.SetByKey(to_string(CSeq_annot::C_Data::e_Graph), type_json);
396 }
397 if ( ai->IsSetFeat() ) {
398 auto& types = ai->GetFeat();
399 if ( types.empty() ||
400 (types.size() == 1 &&
401 types.front()->GetType() == 0 &&
402 !types.front()->IsSetSubtypes()) ) {
403 CJsonNode type_json = CJsonNode::NewArrayNode();
404 type_json.AppendInteger(0);
405 json.SetByKey(to_string(CSeq_annot::C_Data::e_Seq_table), type_json);
406 }
407 else {
408 CJsonNode type_json = CJsonNode::NewObjectNode();
409 for ( auto& feat_type : types ) {
410 CJsonNode subtype_json = CJsonNode::NewArrayNode();
411 if ( feat_type->IsSetSubtypes() ) {
412 for ( auto feat_subtype : feat_type->GetSubtypes() ) {
413 subtype_json.AppendInteger(feat_subtype);
414 }
415 }
416 type_json.SetByKey(to_string(feat_type->GetType()), subtype_json);
417 }
418 json.SetByKey(to_string(CSeq_annot::C_Data::e_Ftable), type_json);
419 }
420 }
421 }
422
423 // add collected zoom levels
424 if ( !zooms.empty() ) {
425 CJsonNode zooms_json = CJsonNode::NewArrayNode();
426 for ( auto zoom : zooms ) {
427 zooms_json.AppendInteger(zoom);
428 }
429 json.SetByKey("2048", zooms_json);
430 }
431 }
432 };
433 }
434
435
SendReplies()436 void CPSGS_OSGAnnot::SendReplies()
437 {
438 if ( GetDebugLevel() >= eDebug_exchange ) {
439 for ( auto& name : m_NamesToProcess ) {
440 LOG_POST(GetDiagSeverity() << "OSG: "
441 "Asked for annot "<<name);
442 }
443 for ( auto& r : m_BlobIds ) {
444 LOG_POST(GetDiagSeverity() << "OSG: "
445 "Received annot reply "<<MSerial_AsnText<<*r);
446 }
447 }
448 auto& psg_req = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
449 for ( auto& r : m_BlobIds ) {
450 if ( !CPSGS_OSGGetBlobBase::IsOSGBlob(r->GetBlob_id()) ) {
451 continue;
452 }
453 string psg_blob_id = CPSGS_OSGGetBlobBase::GetPSGBlobId(r->GetBlob_id());
454 CJsonNode json(CJsonNode::NewObjectNode());
455 json.SetString("blob_id", psg_blob_id);
456 if ( r->GetBlob_id().IsSetVersion() ) {
457 json.SetInteger("last_modified", r->GetBlob_id().GetVersion()*60000);
458 }
459 string annot_name;
460 try {
461 SAnnotInfo info(r->GetAnnot_info());
462 annot_name = info.annot_name;
463 json.SetString("accession", info.accession);
464 json.SetInteger("version", info.version);
465 json.SetInteger("seq_id_type", info.seq_id_type);
466 if ( info.range != CRange<TSeqPos>::GetWhole() ) {
467 json.SetInteger("start", info.range.GetFrom());
468 json.SetInteger("stop", info.range.GetTo());
469 }
470 else {
471 // whole sequence
472 json.SetInteger("start", 0);
473 json.SetInteger("stop", 0);
474 }
475 json.SetString("annot_info", info.json.Repr(CJsonNode::fStandardJson));
476 }
477 catch ( exception& ) {
478 ERR_POST(GetName()<<": "
479 "Bad annot-info: "<<MSerial_AsnText<<*r);
480 // find default annot_name
481 for ( auto& ai : r->GetAnnot_info() ) {
482 if ( m_NamesToProcess.count(ai->GetName()) ) {
483 annot_name = ai->GetName();
484 break;
485 }
486 }
487 }
488 if ( r->IsSetAnnot_info() ) {
489 // set ASN.1 annot info
490 ostringstream str;
491 for ( auto& info : r->GetAnnot_info() ) {
492 str << MSerial_AsnBinary << *info;
493 }
494 json.SetString("seq_annot_info", NStr::Base64Encode(str.str(), 0));
495 }
496 if ( m_NamesToProcess.count(annot_name) ) {
497 GetReply()->PrepareNamedAnnotationData(annot_name, GetName(),
498 json.Repr(CJsonNode::fStandardJson));
499 }
500 //GetReply()->PrepareReplyCompletion();
501 }
502 // register processed names
503 for ( auto& name : m_NamesToProcess ) {
504 psg_req.RegisterProcessedName(GetPriority(), name);
505 }
506 }
507
508
IsCDDReply(const CID2_Reply & reply) const509 bool CPSGS_OSGAnnot::IsCDDReply(const CID2_Reply& reply) const
510 {
511 if ( !reply.GetReply().IsGet_blob_id() ) {
512 return false;
513 }
514
515 const CID2_Reply_Get_Blob_Id& blob_id = reply.GetReply().GetGet_blob_id();
516 if ( !blob_id.IsSetBlob_id() || !CPSGS_OSGGetBlobBase::IsOSGBlob(blob_id.GetBlob_id()) ) {
517 return false;
518 }
519
520 if ( !blob_id.IsSetAnnot_info() ) {
521 return false;
522 }
523 try {
524 SAnnotInfo info(blob_id.GetAnnot_info());
525 return IsCDDName(info.annot_name);
526 }
527 catch ( exception& /*ignored*/ ) {
528 return false;
529 }
530 }
531
532
533 END_NAMESPACE(osg);
534 END_NAMESPACE(psg);
535 END_NCBI_NAMESPACE;
536