1 /*  $Id: osg_getblob_base.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Eugene Vasilchenko
27  *
28  * File Description: processor for data from OSG
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "osg_getblob_base.hpp"
35 
36 #include <objects/id2/id2__.hpp>
37 #include <objects/seqsplit/seqsplit__.hpp>
38 #include <objtools/pubseq_gateway/impl/cassandra/bioseq_info/record.hpp>
39 #include "pubseq_gateway_convert_utils.hpp"
40 #include "id2info.hpp"
41 
42 BEGIN_NCBI_NAMESPACE;
43 BEGIN_NAMESPACE(psg);
44 BEGIN_NAMESPACE(osg);
45 
46 
CPSGS_OSGGetBlobBase()47 CPSGS_OSGGetBlobBase::CPSGS_OSGGetBlobBase()
48 {
49 }
50 
51 
~CPSGS_OSGGetBlobBase()52 CPSGS_OSGGetBlobBase::~CPSGS_OSGGetBlobBase()
53 {
54 }
55 
56 
x_SetSplitVersion(const CID2_Blob_Id & osg_blob_id,TID2SplitVersion split_version)57 void CPSGS_OSGGetBlobBase::x_SetSplitVersion(const CID2_Blob_Id& osg_blob_id,
58                                              TID2SplitVersion split_version)
59 {
60     m_PSGBlobId2SplitVersion[GetPSGBlobId(osg_blob_id)] = split_version;
61 }
62 
63 
64 CPSGS_OSGGetBlobBase::TID2SplitVersion
x_GetSplitVersion(const CID2_Blob_Id & osg_blob_id)65 CPSGS_OSGGetBlobBase::x_GetSplitVersion(const CID2_Blob_Id& osg_blob_id)
66 {
67     return m_PSGBlobId2SplitVersion[GetPSGBlobId(osg_blob_id)];
68 }
69 
70 
ProcessBlobReply(const CID2_Reply & reply)71 void CPSGS_OSGGetBlobBase::ProcessBlobReply(const CID2_Reply& reply)
72 {
73     switch ( reply.GetReply().Which() ) {
74     case CID2_Reply::TReply::e_Get_blob:
75         if ( IsOSGBlob(reply.GetReply().GetGet_blob().GetBlob_id()) ) {
76             if ( 0 && reply.GetReply().GetGet_blob().GetBlob_id().GetSat() == 8087 ) {
77                 ERR_POST("OSG: simulating CDD blob read failure");
78                 return;
79             }
80             if ( m_Blob ) {
81                 ERR_POST(GetName()<<": "
82                          "Duplicate blob reply: "<<MSerial_AsnText<<reply);
83             }
84             m_Blob = &reply.GetReply().GetGet_blob();
85         }
86         break;
87     case CID2_Reply::TReply::e_Get_split_info:
88         if ( IsOSGBlob(reply.GetReply().GetGet_split_info().GetBlob_id()) ) {
89             if ( 0 && reply.GetReply().GetGet_blob().GetBlob_id().GetSat() == 8087 ) {
90                 ERR_POST("OSG: simulating CDD blob read failure");
91                 return;
92             }
93             if ( m_SplitInfo ) {
94                 ERR_POST(GetName()<<": "
95                          "Duplicate blob reply: "<<MSerial_AsnText<<reply);
96             }
97             m_SplitInfo = &reply.GetReply().GetGet_split_info();
98         }
99         break;
100     case CID2_Reply::TReply::e_Get_chunk:
101         if ( IsOSGBlob(reply.GetReply().GetGet_chunk().GetBlob_id()) ) {
102             if ( m_Chunk ) {
103                 ERR_POST(GetName()<<": "
104                          "Duplicate blob reply: "<<MSerial_AsnText<<reply);
105             }
106             m_Chunk = &reply.GetReply().GetGet_chunk();
107         }
108         break;
109     default:
110         ERR_POST(GetName()<<": "
111                  "Invalid blob reply: "<<MSerial_AsnText<<reply);
112         break;
113     }
114 }
115 
116 
x_SetBlobVersion(CBlobRecord & blob_props,const CID2_Blob_Id & blob_id)117 void CPSGS_OSGGetBlobBase::x_SetBlobVersion(CBlobRecord& blob_props,
118                                             const CID2_Blob_Id& blob_id)
119 {
120     if ( blob_id.IsSetVersion() ) {
121         blob_props.SetModified(int64_t(blob_id.GetVersion())*60000);
122     }
123 }
124 
125 
x_SetBlobDataProps(CBlobRecord & blob_props,const CID2_Reply_Data & data)126 void CPSGS_OSGGetBlobBase::x_SetBlobDataProps(CBlobRecord& blob_props,
127                                               const CID2_Reply_Data& data)
128 {
129     if ( data.GetData_compression() == data.eData_compression_gzip ) {
130         blob_props.SetGzip(true);
131     }
132     blob_props.SetNChunks(data.GetData().size());
133 }
134 
135 
x_SetBlobState(CBlobRecord & blob_props,TID2BlobState blob_state)136 void CPSGS_OSGGetBlobBase::x_SetBlobState(CBlobRecord& blob_props,
137                                           TID2BlobState blob_state)
138 {
139     if ( blob_state & (1<<eID2_Blob_State_withdrawn) ) {
140         blob_props.SetWithdrawn(true);
141     }
142     if ( blob_state & ((1<<eID2_Blob_State_suppressed)|
143                        (1<<eID2_Blob_State_suppressed_temp)) ) {
144         blob_props.SetSuppress(true);
145     }
146     if ( blob_state & (1<<eID2_Blob_State_dead) ) {
147         blob_props.SetDead(true);
148     }
149 }
150 
151 
SendExcludedBlob(const string & psg_blob_id)152 void CPSGS_OSGGetBlobBase::SendExcludedBlob(const string& psg_blob_id)
153 {
154     size_t item_id = GetReply()->GetItemId();
155     if ( GetDebugLevel() >= eDebug_exchange ) {
156         LOG_POST(GetDiagSeverity() << "OSG: "
157                  "Sending blob excluded: "<<psg_blob_id);
158     }
159     GetReply()->PrepareBlobExcluded(item_id, GetName(), psg_blob_id, ePSGS_BlobExcluded);
160 }
161 
162 
x_SendBlobProps(const string & psg_blob_id,CBlobRecord & blob_props)163 void CPSGS_OSGGetBlobBase::x_SendBlobProps(const string& psg_blob_id,
164                                            CBlobRecord& blob_props)
165 {
166     size_t item_id = GetReply()->GetItemId();
167     string data_to_send = ToJson(blob_props).Repr(CJsonNode::fStandardJson);
168     if ( GetDebugLevel() >= eDebug_exchange ) {
169         LOG_POST(GetDiagSeverity() << "OSG: "
170                  "Sending blob_prop("<<psg_blob_id<<"): "<<data_to_send);
171     }
172     GetReply()->PrepareBlobPropData(item_id, GetName(), psg_blob_id, data_to_send);
173     GetReply()->PrepareBlobPropCompletion(item_id, GetName(), 2);
174 }
175 
176 
x_SendChunkBlobProps(const string & id2_info,TID2ChunkId chunk_id,CBlobRecord & blob_props)177 void CPSGS_OSGGetBlobBase::x_SendChunkBlobProps(const string& id2_info,
178                                                 TID2ChunkId chunk_id,
179                                                 CBlobRecord& blob_props)
180 {
181     size_t item_id = GetReply()->GetItemId();
182     string data_to_send = ToJson(blob_props).Repr(CJsonNode::fStandardJson);
183     if ( GetDebugLevel() >= eDebug_exchange ) {
184         LOG_POST(GetDiagSeverity() << "OSG: "
185                  "Sending chunk blob_prop("<<id2_info<<','<<chunk_id<<"): "<<data_to_send);
186     }
187     GetReply()->PrepareTSEBlobPropData(item_id, GetName(), chunk_id, id2_info, data_to_send);
188     GetReply()->PrepareBlobPropCompletion(item_id, GetName(), 2);
189 }
190 
191 
x_SendBlobData(const string & psg_blob_id,const CID2_Reply_Data & data)192 void CPSGS_OSGGetBlobBase::x_SendBlobData(const string& psg_blob_id,
193                                           const CID2_Reply_Data& data)
194 {
195     size_t item_id = GetReply()->GetItemId();
196     int chunk_no = 0;
197     for ( auto& chunk : data.GetData() ) {
198         GetReply()->PrepareBlobData(item_id, GetName(), psg_blob_id,
199                                     (const unsigned char*)chunk->data(), chunk->size(), chunk_no++);
200     }
201     GetReply()->PrepareBlobCompletion(item_id, GetName(), chunk_no+1);
202 }
203 
204 
x_SendChunkBlobData(const string & id2_info,TID2ChunkId chunk_id,const CID2_Reply_Data & data)205 void CPSGS_OSGGetBlobBase::x_SendChunkBlobData(const string& id2_info,
206                                                TID2ChunkId chunk_id,
207                                                const CID2_Reply_Data& data)
208 {
209     size_t item_id = GetReply()->GetItemId();
210     int chunk_no = 0;
211     for ( auto& chunk : data.GetData() ) {
212         GetReply()->PrepareTSEBlobData(item_id, GetName(),
213                                        (const unsigned char*)chunk->data(), chunk->size(), chunk_no++,
214                                        chunk_id, id2_info);
215     }
216     GetReply()->PrepareTSEBlobCompletion(item_id, GetName(), chunk_no+1);
217 }
218 
219 
x_GetSplitInfoPSGBlobId(const string & main_blob_id)220 string CPSGS_OSGGetBlobBase::x_GetSplitInfoPSGBlobId(const string& main_blob_id)
221 {
222     return main_blob_id + '.';
223 }
224 
225 
x_GetChunkPSGBlobId(const string & main_blob_id,TID2ChunkId chunk_id)226 string CPSGS_OSGGetBlobBase::x_GetChunkPSGBlobId(const string& main_blob_id,
227                                                  TID2ChunkId chunk_id)
228 {
229     return main_blob_id + '.' + to_string(chunk_id);
230 }
231 
232 
x_SendMainEntry(const CID2_Blob_Id & osg_blob_id,TID2BlobState blob_state,const CID2_Reply_Data & data)233 void CPSGS_OSGGetBlobBase::x_SendMainEntry(const CID2_Blob_Id& osg_blob_id,
234                                            TID2BlobState blob_state,
235                                            const CID2_Reply_Data& data)
236 {
237     string main_blob_id = GetPSGBlobId(osg_blob_id);
238 
239     CBlobRecord main_blob_props;
240     x_SetBlobVersion(main_blob_props, osg_blob_id);
241     x_SetBlobState(main_blob_props, blob_state);
242     x_SetBlobDataProps(main_blob_props, data);
243     x_SendBlobProps(main_blob_id, main_blob_props);
244     x_SendBlobData(main_blob_id, data);
245 }
246 
247 
x_SendSplitInfo(const CID2_Blob_Id & osg_blob_id,TID2BlobState blob_state,TID2SplitVersion split_version,const CID2_Reply_Data & data)248 void CPSGS_OSGGetBlobBase::x_SendSplitInfo(const CID2_Blob_Id& osg_blob_id,
249                                            TID2BlobState blob_state,
250                                            TID2SplitVersion split_version,
251                                            const CID2_Reply_Data& data)
252 {
253     // first send main blob props
254     string main_blob_id = GetPSGBlobId(osg_blob_id);
255     string id2_info = GetPSGId2Info(osg_blob_id, split_version);
256 
257     CBlobRecord main_blob_props;
258     x_SetBlobVersion(main_blob_props, osg_blob_id);
259     x_SetBlobState(main_blob_props, blob_state);
260     main_blob_props.SetId2Info(id2_info);
261     x_SendBlobProps(main_blob_id, main_blob_props);
262 
263     CBlobRecord split_info_blob_props;
264     x_SetBlobDataProps(split_info_blob_props, data);
265     x_SendChunkBlobProps(id2_info, kSplitInfoChunk, split_info_blob_props);
266     x_SendChunkBlobData(id2_info, kSplitInfoChunk, data);
267 }
268 
269 
x_SendChunk(const CID2_Blob_Id & osg_blob_id,TID2ChunkId chunk_id,const CID2_Reply_Data & data)270 void CPSGS_OSGGetBlobBase::x_SendChunk(const CID2_Blob_Id& osg_blob_id,
271                                        TID2ChunkId chunk_id,
272                                        const CID2_Reply_Data& data)
273 {
274     string id2_info = GetPSGId2Info(osg_blob_id, x_GetSplitVersion(osg_blob_id));
275 
276     CBlobRecord chunk_blob_props;
277     x_SetBlobDataProps(chunk_blob_props, data);
278     x_SendChunkBlobProps(id2_info, chunk_id, chunk_blob_props);
279     x_SendChunkBlobData(id2_info, chunk_id, data);
280 }
281 
282 
HasBlob() const283 bool CPSGS_OSGGetBlobBase::HasBlob() const
284 {
285     if ( m_SplitInfo ) {
286         if ( !m_SplitInfo->IsSetData() ) {
287             return false;
288         }
289         if ( m_Blob ) {
290             return m_Blob->IsSetData();
291         }
292     }
293     else {
294         if ( m_Blob ) {
295             return m_Blob->IsSetData();
296         }
297     }
298     return false;
299 }
300 
301 
SendBlob()302 void CPSGS_OSGGetBlobBase::SendBlob()
303 {
304     if ( m_SplitInfo && !m_Blob ) {
305         // split_info with blob inside
306         x_SetSplitVersion(m_SplitInfo->GetBlob_id(), m_SplitInfo->GetSplit_version());
307         x_SendSplitInfo(m_SplitInfo->GetBlob_id(),
308                         x_GetBlobState(*m_SplitInfo),
309                         m_SplitInfo->GetSplit_version(),
310                         m_SplitInfo->GetData());
311         SetFinalStatus(ePSGS_Found);
312     }
313     else if ( m_Blob && !m_SplitInfo && m_Blob->IsSetData() &&
314               m_Blob->GetData().GetData_type() == CID2_Reply_Data::eData_type_id2s_split_info ) {
315         // split_info with blob inside in a get-blob reply
316         // TODO: really???
317         x_SendSplitInfo(m_Blob->GetBlob_id(),
318                         x_GetBlobState(*m_Blob),
319                         0,
320                         m_Blob->GetData());
321         SetFinalStatus(ePSGS_Found);
322     }
323     else if ( m_Blob && !m_SplitInfo && m_Blob->IsSetData() ) {
324         // blob only
325         x_SendMainEntry(m_Blob->GetBlob_id(),
326                         x_GetBlobState(*m_Blob),
327                         m_Blob->GetData());
328         SetFinalStatus(ePSGS_Found);
329     }
330     else if ( m_Blob && !m_SplitInfo && !m_Blob->IsSetData() ) {
331         // no blob reply TODO
332         SetFinalStatus(ePSGS_NotFound);
333     }
334     else if ( m_Blob && m_SplitInfo ) {
335         // separate blob and m_SplitInfo TODO
336         SetFinalStatus(ePSGS_NotFound);
337     }
338     else if ( m_Chunk ) {
339         x_SendChunk(m_Chunk->GetBlob_id(),
340                     m_Chunk->GetChunk_id(),
341                     m_Chunk->GetData());
342         SetFinalStatus(ePSGS_Found);
343     }
344     else {
345         // nothing TODO
346         SetFinalStatus(ePSGS_NotFound);
347     }
348 }
349 
350 
351 /////////////////////////////////////////////////////////////////////////////
352 // Blob id parsing methods
353 /////////////////////////////////////////////////////////////////////////////
354 
355 
356 static const char kSubSatSeparator = '/';
357 static const int kOSG_Sat_WGS_min = 1000;
358 static const int kOSG_Sat_WGS_max = 1130;
359 static const int kOSG_Sat_SNP_min = 2001;
360 static const int kOSG_Sat_SNP_max = 3999;
361 static const int kOSG_Sat_CDD_min = 8087;
362 static const int kOSG_Sat_CDD_max = 8088;
363 //static const int kOSG_Sat_NAGraph_min = 8000;
364 //static const int kOSG_Sat_NAGraph_max = 8000;
365 
366 
s_IsOSGBlob(Int4 sat,Int4,Int4)367 static bool s_IsOSGBlob(Int4 sat, Int4 /*subsat*/, Int4 /*satkey*/)
368 {
369     if ( sat >= kOSG_Sat_WGS_min &&
370          sat <= kOSG_Sat_WGS_max ) {
371         return true;
372     }
373     if ( sat >= kOSG_Sat_SNP_min &&
374          sat <= kOSG_Sat_SNP_max ) {
375         return true;
376     }
377     if ( sat >= kOSG_Sat_CDD_min &&
378          sat <= kOSG_Sat_CDD_max ) {
379         return true;
380     }
381     /*
382     if ( sat >= kOSG_Sat_NAGraph_min &&
383          sat <= kOSG_Sat_NAGraph_max ) {
384         return true;
385     }
386     */
387     return false;
388 }
389 
390 
s_IsCDDBlob(Int4 sat,Int4,Int4)391 static bool s_IsCDDBlob(Int4 sat, Int4 /*subsat*/, Int4 /*satkey*/)
392 {
393     if ( sat >= kOSG_Sat_CDD_min &&
394          sat <= kOSG_Sat_CDD_max ) {
395         return true;
396     }
397     return false;
398 }
399 
400 
IsOSGBlob(const CID2_Blob_Id & blob_id)401 bool CPSGS_OSGGetBlobBase::IsOSGBlob(const CID2_Blob_Id& blob_id)
402 {
403     return s_IsOSGBlob(blob_id.GetSat(), blob_id.GetSub_sat(), blob_id.GetSat_key());
404 }
405 
406 
IsCDDBlob(const CID2_Blob_Id & blob_id)407 bool CPSGS_OSGGetBlobBase::IsCDDBlob(const CID2_Blob_Id& blob_id)
408 {
409     return s_IsCDDBlob(blob_id.GetSat(), blob_id.GetSub_sat(), blob_id.GetSat_key());
410 }
411 
412 
s_Skip(CTempString & str,char c)413 static bool s_Skip(CTempString& str, char c)
414 {
415     if ( str.empty() || str[0] != c ) {
416         return false;
417     }
418     str = str.substr(1);
419     return true;
420 }
421 
422 
s_IsValidIntChar(char c)423 static inline bool s_IsValidIntChar(char c)
424 {
425     return c == '-' || (c >= '0' && c <= '9');
426 }
427 
428 
429 template<class Int>
s_ParseInt(CTempString & str,Int & v)430 static bool s_ParseInt(CTempString& str, Int& v)
431 {
432     size_t int_size = 0;
433     while ( int_size < str.size() && s_IsValidIntChar(str[int_size]) ) {
434         ++int_size;
435     }
436     if ( !NStr::StringToNumeric(str.substr(0, int_size), &v,
437                                 NStr::fConvErr_NoThrow|NStr::fConvErr_NoErrMessage) ) {
438         return false;
439     }
440     str = str.substr(int_size);
441     return true;
442 }
443 
444 
s_ParseOSGBlob(CTempString & s,Int4 & sat,Int4 & subsat,Int4 & satkey)445 static bool s_ParseOSGBlob(CTempString& s,
446                            Int4& sat, Int4& subsat, Int4& satkey)
447 {
448     if ( s.find(kSubSatSeparator) == NPOS ) {
449         return false;
450     }
451     if ( !s_ParseInt(s, sat) ) {
452         return false;
453     }
454     if ( !s_Skip(s, kSubSatSeparator) ) {
455         return false;
456     }
457     if ( !s_ParseInt(s, subsat) ) {
458         return false;
459     }
460     if ( !s_Skip(s, '.') ) {
461         return false;
462     }
463     if ( !s_ParseInt(s, satkey) ) {
464         return false;
465     }
466     return s_IsOSGBlob(sat, subsat, satkey);
467 }
468 
469 
s_FormatBlobId(ostream & s,const CID2_Blob_Id & blob_id)470 static void s_FormatBlobId(ostream& s, const CID2_Blob_Id& blob_id)
471 {
472     s << blob_id.GetSat()
473       << kSubSatSeparator << blob_id.GetSub_sat()
474       << '.' << blob_id.GetSat_key();
475 }
476 
477 
ParsePSGBlobId(const SPSGS_BlobId & blob_id)478 CRef<CID2_Blob_Id> CPSGS_OSGGetBlobBase::ParsePSGBlobId(const SPSGS_BlobId& blob_id)
479 {
480     Int4 sat;
481     Int4 subsat;
482     Int4 satkey;
483     auto id_str = blob_id.GetId();
484     CTempString s = id_str;
485     if ( !s_ParseOSGBlob(s, sat, subsat, satkey) || !s.empty() ) {
486         return null;
487     }
488     CRef<CID2_Blob_Id> id(new CID2_Blob_Id);
489     id->SetSat(sat);
490     id->SetSub_sat(subsat);
491     id->SetSat_key(satkey);
492     return id;
493 }
494 
495 
GetPSGBlobId(const CID2_Blob_Id & blob_id)496 string CPSGS_OSGGetBlobBase::GetPSGBlobId(const CID2_Blob_Id& blob_id)
497 {
498     ostringstream s;
499     if ( IsOSGBlob(blob_id) ) {
500         s_FormatBlobId(s, blob_id);
501     }
502     return s.str();
503 }
504 
505 
GetPSGId2Info(const CID2_Blob_Id & tse_id,TID2SplitVersion split_version)506 string CPSGS_OSGGetBlobBase::GetPSGId2Info(const CID2_Blob_Id& tse_id,
507                                            TID2SplitVersion split_version)
508 {
509     ostringstream s;
510     if ( IsOSGBlob(tse_id) ) {
511         s_FormatBlobId(s, tse_id);
512         TID2BlobVersion blob_version = tse_id.IsSetVersion()? tse_id.GetVersion(): 0;
513         s << '.' << blob_version << '.' << split_version;
514     }
515     return s.str();
516 }
517 
518 
519 CPSGS_OSGGetBlobBase::SParsedId2Info
ParsePSGId2Info(const string & id2_info)520 CPSGS_OSGGetBlobBase::ParsePSGId2Info(const string& id2_info)
521 {
522     Int4 sat;
523     Int4 subsat;
524     Int4 satkey;
525     TID2BlobVersion tse_version;
526     TID2SplitVersion split_version;
527 
528     CTempString s = id2_info;
529     if ( !s_ParseOSGBlob(s, sat, subsat, satkey) ||
530          !s_Skip(s, '.') ||
531          !s_ParseInt(s, tse_version) ||
532          !s_Skip(s, '.') ||
533          !s_ParseInt(s, split_version) ||
534          !s.empty() ) {
535         return SParsedId2Info{};
536     }
537 
538     CRef<CID2_Blob_Id> id(new CID2_Blob_Id);
539     id->SetSat(sat);
540     id->SetSub_sat(subsat);
541     id->SetSat_key(satkey);
542     id->SetVersion(tse_version);
543     return SParsedId2Info{id, split_version};
544 }
545 
546 
547 END_NAMESPACE(osg);
548 END_NAMESPACE(psg);
549 END_NCBI_NAMESPACE;
550