1 /* $Id: osg_getblob_base.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Eugene Vasilchenko
27 *
28 * File Description: processor for data from OSG
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "osg_getblob_base.hpp"
35
36 #include <objects/id2/id2__.hpp>
37 #include <objects/seqsplit/seqsplit__.hpp>
38 #include <objtools/pubseq_gateway/impl/cassandra/bioseq_info/record.hpp>
39 #include "pubseq_gateway_convert_utils.hpp"
40 #include "id2info.hpp"
41
42 BEGIN_NCBI_NAMESPACE;
43 BEGIN_NAMESPACE(psg);
44 BEGIN_NAMESPACE(osg);
45
46
CPSGS_OSGGetBlobBase()47 CPSGS_OSGGetBlobBase::CPSGS_OSGGetBlobBase()
48 {
49 }
50
51
~CPSGS_OSGGetBlobBase()52 CPSGS_OSGGetBlobBase::~CPSGS_OSGGetBlobBase()
53 {
54 }
55
56
x_SetSplitVersion(const CID2_Blob_Id & osg_blob_id,TID2SplitVersion split_version)57 void CPSGS_OSGGetBlobBase::x_SetSplitVersion(const CID2_Blob_Id& osg_blob_id,
58 TID2SplitVersion split_version)
59 {
60 m_PSGBlobId2SplitVersion[GetPSGBlobId(osg_blob_id)] = split_version;
61 }
62
63
64 CPSGS_OSGGetBlobBase::TID2SplitVersion
x_GetSplitVersion(const CID2_Blob_Id & osg_blob_id)65 CPSGS_OSGGetBlobBase::x_GetSplitVersion(const CID2_Blob_Id& osg_blob_id)
66 {
67 return m_PSGBlobId2SplitVersion[GetPSGBlobId(osg_blob_id)];
68 }
69
70
ProcessBlobReply(const CID2_Reply & reply)71 void CPSGS_OSGGetBlobBase::ProcessBlobReply(const CID2_Reply& reply)
72 {
73 switch ( reply.GetReply().Which() ) {
74 case CID2_Reply::TReply::e_Get_blob:
75 if ( IsOSGBlob(reply.GetReply().GetGet_blob().GetBlob_id()) ) {
76 if ( 0 && reply.GetReply().GetGet_blob().GetBlob_id().GetSat() == 8087 ) {
77 ERR_POST("OSG: simulating CDD blob read failure");
78 return;
79 }
80 if ( m_Blob ) {
81 ERR_POST(GetName()<<": "
82 "Duplicate blob reply: "<<MSerial_AsnText<<reply);
83 }
84 m_Blob = &reply.GetReply().GetGet_blob();
85 }
86 break;
87 case CID2_Reply::TReply::e_Get_split_info:
88 if ( IsOSGBlob(reply.GetReply().GetGet_split_info().GetBlob_id()) ) {
89 if ( 0 && reply.GetReply().GetGet_blob().GetBlob_id().GetSat() == 8087 ) {
90 ERR_POST("OSG: simulating CDD blob read failure");
91 return;
92 }
93 if ( m_SplitInfo ) {
94 ERR_POST(GetName()<<": "
95 "Duplicate blob reply: "<<MSerial_AsnText<<reply);
96 }
97 m_SplitInfo = &reply.GetReply().GetGet_split_info();
98 }
99 break;
100 case CID2_Reply::TReply::e_Get_chunk:
101 if ( IsOSGBlob(reply.GetReply().GetGet_chunk().GetBlob_id()) ) {
102 if ( m_Chunk ) {
103 ERR_POST(GetName()<<": "
104 "Duplicate blob reply: "<<MSerial_AsnText<<reply);
105 }
106 m_Chunk = &reply.GetReply().GetGet_chunk();
107 }
108 break;
109 default:
110 ERR_POST(GetName()<<": "
111 "Invalid blob reply: "<<MSerial_AsnText<<reply);
112 break;
113 }
114 }
115
116
x_SetBlobVersion(CBlobRecord & blob_props,const CID2_Blob_Id & blob_id)117 void CPSGS_OSGGetBlobBase::x_SetBlobVersion(CBlobRecord& blob_props,
118 const CID2_Blob_Id& blob_id)
119 {
120 if ( blob_id.IsSetVersion() ) {
121 blob_props.SetModified(int64_t(blob_id.GetVersion())*60000);
122 }
123 }
124
125
x_SetBlobDataProps(CBlobRecord & blob_props,const CID2_Reply_Data & data)126 void CPSGS_OSGGetBlobBase::x_SetBlobDataProps(CBlobRecord& blob_props,
127 const CID2_Reply_Data& data)
128 {
129 if ( data.GetData_compression() == data.eData_compression_gzip ) {
130 blob_props.SetGzip(true);
131 }
132 blob_props.SetNChunks(data.GetData().size());
133 }
134
135
x_SetBlobState(CBlobRecord & blob_props,TID2BlobState blob_state)136 void CPSGS_OSGGetBlobBase::x_SetBlobState(CBlobRecord& blob_props,
137 TID2BlobState blob_state)
138 {
139 if ( blob_state & (1<<eID2_Blob_State_withdrawn) ) {
140 blob_props.SetWithdrawn(true);
141 }
142 if ( blob_state & ((1<<eID2_Blob_State_suppressed)|
143 (1<<eID2_Blob_State_suppressed_temp)) ) {
144 blob_props.SetSuppress(true);
145 }
146 if ( blob_state & (1<<eID2_Blob_State_dead) ) {
147 blob_props.SetDead(true);
148 }
149 }
150
151
SendExcludedBlob(const string & psg_blob_id)152 void CPSGS_OSGGetBlobBase::SendExcludedBlob(const string& psg_blob_id)
153 {
154 size_t item_id = GetReply()->GetItemId();
155 if ( GetDebugLevel() >= eDebug_exchange ) {
156 LOG_POST(GetDiagSeverity() << "OSG: "
157 "Sending blob excluded: "<<psg_blob_id);
158 }
159 GetReply()->PrepareBlobExcluded(item_id, GetName(), psg_blob_id, ePSGS_BlobExcluded);
160 }
161
162
x_SendBlobProps(const string & psg_blob_id,CBlobRecord & blob_props)163 void CPSGS_OSGGetBlobBase::x_SendBlobProps(const string& psg_blob_id,
164 CBlobRecord& blob_props)
165 {
166 size_t item_id = GetReply()->GetItemId();
167 string data_to_send = ToJson(blob_props).Repr(CJsonNode::fStandardJson);
168 if ( GetDebugLevel() >= eDebug_exchange ) {
169 LOG_POST(GetDiagSeverity() << "OSG: "
170 "Sending blob_prop("<<psg_blob_id<<"): "<<data_to_send);
171 }
172 GetReply()->PrepareBlobPropData(item_id, GetName(), psg_blob_id, data_to_send);
173 GetReply()->PrepareBlobPropCompletion(item_id, GetName(), 2);
174 }
175
176
x_SendChunkBlobProps(const string & id2_info,TID2ChunkId chunk_id,CBlobRecord & blob_props)177 void CPSGS_OSGGetBlobBase::x_SendChunkBlobProps(const string& id2_info,
178 TID2ChunkId chunk_id,
179 CBlobRecord& blob_props)
180 {
181 size_t item_id = GetReply()->GetItemId();
182 string data_to_send = ToJson(blob_props).Repr(CJsonNode::fStandardJson);
183 if ( GetDebugLevel() >= eDebug_exchange ) {
184 LOG_POST(GetDiagSeverity() << "OSG: "
185 "Sending chunk blob_prop("<<id2_info<<','<<chunk_id<<"): "<<data_to_send);
186 }
187 GetReply()->PrepareTSEBlobPropData(item_id, GetName(), chunk_id, id2_info, data_to_send);
188 GetReply()->PrepareBlobPropCompletion(item_id, GetName(), 2);
189 }
190
191
x_SendBlobData(const string & psg_blob_id,const CID2_Reply_Data & data)192 void CPSGS_OSGGetBlobBase::x_SendBlobData(const string& psg_blob_id,
193 const CID2_Reply_Data& data)
194 {
195 size_t item_id = GetReply()->GetItemId();
196 int chunk_no = 0;
197 for ( auto& chunk : data.GetData() ) {
198 GetReply()->PrepareBlobData(item_id, GetName(), psg_blob_id,
199 (const unsigned char*)chunk->data(), chunk->size(), chunk_no++);
200 }
201 GetReply()->PrepareBlobCompletion(item_id, GetName(), chunk_no+1);
202 }
203
204
x_SendChunkBlobData(const string & id2_info,TID2ChunkId chunk_id,const CID2_Reply_Data & data)205 void CPSGS_OSGGetBlobBase::x_SendChunkBlobData(const string& id2_info,
206 TID2ChunkId chunk_id,
207 const CID2_Reply_Data& data)
208 {
209 size_t item_id = GetReply()->GetItemId();
210 int chunk_no = 0;
211 for ( auto& chunk : data.GetData() ) {
212 GetReply()->PrepareTSEBlobData(item_id, GetName(),
213 (const unsigned char*)chunk->data(), chunk->size(), chunk_no++,
214 chunk_id, id2_info);
215 }
216 GetReply()->PrepareTSEBlobCompletion(item_id, GetName(), chunk_no+1);
217 }
218
219
// PSG blob id of a split-info entry: the main blob id followed by a '.'.
string CPSGS_OSGGetBlobBase::x_GetSplitInfoPSGBlobId(const string& main_blob_id)
{
    string split_info_id(main_blob_id);
    split_info_id += '.';
    return split_info_id;
}
224
225
// PSG blob id of a split chunk: "<main_blob_id>.<chunk_id>".
string CPSGS_OSGGetBlobBase::x_GetChunkPSGBlobId(const string& main_blob_id,
                                                 TID2ChunkId chunk_id)
{
    string chunk_blob_id(main_blob_id);
    chunk_blob_id += '.';
    chunk_blob_id += to_string(chunk_id);
    return chunk_blob_id;
}
231
232
x_SendMainEntry(const CID2_Blob_Id & osg_blob_id,TID2BlobState blob_state,const CID2_Reply_Data & data)233 void CPSGS_OSGGetBlobBase::x_SendMainEntry(const CID2_Blob_Id& osg_blob_id,
234 TID2BlobState blob_state,
235 const CID2_Reply_Data& data)
236 {
237 string main_blob_id = GetPSGBlobId(osg_blob_id);
238
239 CBlobRecord main_blob_props;
240 x_SetBlobVersion(main_blob_props, osg_blob_id);
241 x_SetBlobState(main_blob_props, blob_state);
242 x_SetBlobDataProps(main_blob_props, data);
243 x_SendBlobProps(main_blob_id, main_blob_props);
244 x_SendBlobData(main_blob_id, data);
245 }
246
247
x_SendSplitInfo(const CID2_Blob_Id & osg_blob_id,TID2BlobState blob_state,TID2SplitVersion split_version,const CID2_Reply_Data & data)248 void CPSGS_OSGGetBlobBase::x_SendSplitInfo(const CID2_Blob_Id& osg_blob_id,
249 TID2BlobState blob_state,
250 TID2SplitVersion split_version,
251 const CID2_Reply_Data& data)
252 {
253 // first send main blob props
254 string main_blob_id = GetPSGBlobId(osg_blob_id);
255 string id2_info = GetPSGId2Info(osg_blob_id, split_version);
256
257 CBlobRecord main_blob_props;
258 x_SetBlobVersion(main_blob_props, osg_blob_id);
259 x_SetBlobState(main_blob_props, blob_state);
260 main_blob_props.SetId2Info(id2_info);
261 x_SendBlobProps(main_blob_id, main_blob_props);
262
263 CBlobRecord split_info_blob_props;
264 x_SetBlobDataProps(split_info_blob_props, data);
265 x_SendChunkBlobProps(id2_info, kSplitInfoChunk, split_info_blob_props);
266 x_SendChunkBlobData(id2_info, kSplitInfoChunk, data);
267 }
268
269
x_SendChunk(const CID2_Blob_Id & osg_blob_id,TID2ChunkId chunk_id,const CID2_Reply_Data & data)270 void CPSGS_OSGGetBlobBase::x_SendChunk(const CID2_Blob_Id& osg_blob_id,
271 TID2ChunkId chunk_id,
272 const CID2_Reply_Data& data)
273 {
274 string id2_info = GetPSGId2Info(osg_blob_id, x_GetSplitVersion(osg_blob_id));
275
276 CBlobRecord chunk_blob_props;
277 x_SetBlobDataProps(chunk_blob_props, data);
278 x_SendChunkBlobProps(id2_info, chunk_id, chunk_blob_props);
279 x_SendChunkBlobData(id2_info, chunk_id, data);
280 }
281
282
HasBlob() const283 bool CPSGS_OSGGetBlobBase::HasBlob() const
284 {
285 if ( m_SplitInfo ) {
286 if ( !m_SplitInfo->IsSetData() ) {
287 return false;
288 }
289 if ( m_Blob ) {
290 return m_Blob->IsSetData();
291 }
292 }
293 else {
294 if ( m_Blob ) {
295 return m_Blob->IsSetData();
296 }
297 }
298 return false;
299 }
300
301
SendBlob()302 void CPSGS_OSGGetBlobBase::SendBlob()
303 {
304 if ( m_SplitInfo && !m_Blob ) {
305 // split_info with blob inside
306 x_SetSplitVersion(m_SplitInfo->GetBlob_id(), m_SplitInfo->GetSplit_version());
307 x_SendSplitInfo(m_SplitInfo->GetBlob_id(),
308 x_GetBlobState(*m_SplitInfo),
309 m_SplitInfo->GetSplit_version(),
310 m_SplitInfo->GetData());
311 SetFinalStatus(ePSGS_Found);
312 }
313 else if ( m_Blob && !m_SplitInfo && m_Blob->IsSetData() &&
314 m_Blob->GetData().GetData_type() == CID2_Reply_Data::eData_type_id2s_split_info ) {
315 // split_info with blob inside in a get-blob reply
316 // TODO: really???
317 x_SendSplitInfo(m_Blob->GetBlob_id(),
318 x_GetBlobState(*m_Blob),
319 0,
320 m_Blob->GetData());
321 SetFinalStatus(ePSGS_Found);
322 }
323 else if ( m_Blob && !m_SplitInfo && m_Blob->IsSetData() ) {
324 // blob only
325 x_SendMainEntry(m_Blob->GetBlob_id(),
326 x_GetBlobState(*m_Blob),
327 m_Blob->GetData());
328 SetFinalStatus(ePSGS_Found);
329 }
330 else if ( m_Blob && !m_SplitInfo && !m_Blob->IsSetData() ) {
331 // no blob reply TODO
332 SetFinalStatus(ePSGS_NotFound);
333 }
334 else if ( m_Blob && m_SplitInfo ) {
335 // separate blob and m_SplitInfo TODO
336 SetFinalStatus(ePSGS_NotFound);
337 }
338 else if ( m_Chunk ) {
339 x_SendChunk(m_Chunk->GetBlob_id(),
340 m_Chunk->GetChunk_id(),
341 m_Chunk->GetData());
342 SetFinalStatus(ePSGS_Found);
343 }
344 else {
345 // nothing TODO
346 SetFinalStatus(ePSGS_NotFound);
347 }
348 }
349
350
351 /////////////////////////////////////////////////////////////////////////////
352 // Blob id parsing methods
353 /////////////////////////////////////////////////////////////////////////////
354
355
// Separator between sat and sub-sat in the textual blob id "<sat>/<subsat>.<satkey>".
static constexpr char kSubSatSeparator = '/';
// Inclusive sat ranges recognised as OSG blobs by s_IsOSGBlob().
static constexpr int kOSG_Sat_WGS_min = 1000;
static constexpr int kOSG_Sat_WGS_max = 1130;
static constexpr int kOSG_Sat_SNP_min = 2001;
static constexpr int kOSG_Sat_SNP_max = 3999;
static constexpr int kOSG_Sat_CDD_min = 8087;
static constexpr int kOSG_Sat_CDD_max = 8088;
// NAGraph range, currently disabled:
//static const int kOSG_Sat_NAGraph_min = 8000;
//static const int kOSG_Sat_NAGraph_max = 8000;
365
366
s_IsOSGBlob(Int4 sat,Int4,Int4)367 static bool s_IsOSGBlob(Int4 sat, Int4 /*subsat*/, Int4 /*satkey*/)
368 {
369 if ( sat >= kOSG_Sat_WGS_min &&
370 sat <= kOSG_Sat_WGS_max ) {
371 return true;
372 }
373 if ( sat >= kOSG_Sat_SNP_min &&
374 sat <= kOSG_Sat_SNP_max ) {
375 return true;
376 }
377 if ( sat >= kOSG_Sat_CDD_min &&
378 sat <= kOSG_Sat_CDD_max ) {
379 return true;
380 }
381 /*
382 if ( sat >= kOSG_Sat_NAGraph_min &&
383 sat <= kOSG_Sat_NAGraph_max ) {
384 return true;
385 }
386 */
387 return false;
388 }
389
390
s_IsCDDBlob(Int4 sat,Int4,Int4)391 static bool s_IsCDDBlob(Int4 sat, Int4 /*subsat*/, Int4 /*satkey*/)
392 {
393 if ( sat >= kOSG_Sat_CDD_min &&
394 sat <= kOSG_Sat_CDD_max ) {
395 return true;
396 }
397 return false;
398 }
399
400
IsOSGBlob(const CID2_Blob_Id & blob_id)401 bool CPSGS_OSGGetBlobBase::IsOSGBlob(const CID2_Blob_Id& blob_id)
402 {
403 return s_IsOSGBlob(blob_id.GetSat(), blob_id.GetSub_sat(), blob_id.GetSat_key());
404 }
405
406
IsCDDBlob(const CID2_Blob_Id & blob_id)407 bool CPSGS_OSGGetBlobBase::IsCDDBlob(const CID2_Blob_Id& blob_id)
408 {
409 return s_IsCDDBlob(blob_id.GetSat(), blob_id.GetSub_sat(), blob_id.GetSat_key());
410 }
411
412
s_Skip(CTempString & str,char c)413 static bool s_Skip(CTempString& str, char c)
414 {
415 if ( str.empty() || str[0] != c ) {
416 return false;
417 }
418 str = str.substr(1);
419 return true;
420 }
421
422
// True for characters that may appear in a textual integer: '-' or an ASCII digit.
static inline bool s_IsValidIntChar(char c)
{
    if ( c == '-' ) {
        return true;
    }
    return '0' <= c && c <= '9';
}
427
428
429 template<class Int>
s_ParseInt(CTempString & str,Int & v)430 static bool s_ParseInt(CTempString& str, Int& v)
431 {
432 size_t int_size = 0;
433 while ( int_size < str.size() && s_IsValidIntChar(str[int_size]) ) {
434 ++int_size;
435 }
436 if ( !NStr::StringToNumeric(str.substr(0, int_size), &v,
437 NStr::fConvErr_NoThrow|NStr::fConvErr_NoErrMessage) ) {
438 return false;
439 }
440 str = str.substr(int_size);
441 return true;
442 }
443
444
s_ParseOSGBlob(CTempString & s,Int4 & sat,Int4 & subsat,Int4 & satkey)445 static bool s_ParseOSGBlob(CTempString& s,
446 Int4& sat, Int4& subsat, Int4& satkey)
447 {
448 if ( s.find(kSubSatSeparator) == NPOS ) {
449 return false;
450 }
451 if ( !s_ParseInt(s, sat) ) {
452 return false;
453 }
454 if ( !s_Skip(s, kSubSatSeparator) ) {
455 return false;
456 }
457 if ( !s_ParseInt(s, subsat) ) {
458 return false;
459 }
460 if ( !s_Skip(s, '.') ) {
461 return false;
462 }
463 if ( !s_ParseInt(s, satkey) ) {
464 return false;
465 }
466 return s_IsOSGBlob(sat, subsat, satkey);
467 }
468
469
s_FormatBlobId(ostream & s,const CID2_Blob_Id & blob_id)470 static void s_FormatBlobId(ostream& s, const CID2_Blob_Id& blob_id)
471 {
472 s << blob_id.GetSat()
473 << kSubSatSeparator << blob_id.GetSub_sat()
474 << '.' << blob_id.GetSat_key();
475 }
476
477
ParsePSGBlobId(const SPSGS_BlobId & blob_id)478 CRef<CID2_Blob_Id> CPSGS_OSGGetBlobBase::ParsePSGBlobId(const SPSGS_BlobId& blob_id)
479 {
480 Int4 sat;
481 Int4 subsat;
482 Int4 satkey;
483 auto id_str = blob_id.GetId();
484 CTempString s = id_str;
485 if ( !s_ParseOSGBlob(s, sat, subsat, satkey) || !s.empty() ) {
486 return null;
487 }
488 CRef<CID2_Blob_Id> id(new CID2_Blob_Id);
489 id->SetSat(sat);
490 id->SetSub_sat(subsat);
491 id->SetSat_key(satkey);
492 return id;
493 }
494
495
GetPSGBlobId(const CID2_Blob_Id & blob_id)496 string CPSGS_OSGGetBlobBase::GetPSGBlobId(const CID2_Blob_Id& blob_id)
497 {
498 ostringstream s;
499 if ( IsOSGBlob(blob_id) ) {
500 s_FormatBlobId(s, blob_id);
501 }
502 return s.str();
503 }
504
505
GetPSGId2Info(const CID2_Blob_Id & tse_id,TID2SplitVersion split_version)506 string CPSGS_OSGGetBlobBase::GetPSGId2Info(const CID2_Blob_Id& tse_id,
507 TID2SplitVersion split_version)
508 {
509 ostringstream s;
510 if ( IsOSGBlob(tse_id) ) {
511 s_FormatBlobId(s, tse_id);
512 TID2BlobVersion blob_version = tse_id.IsSetVersion()? tse_id.GetVersion(): 0;
513 s << '.' << blob_version << '.' << split_version;
514 }
515 return s.str();
516 }
517
518
519 CPSGS_OSGGetBlobBase::SParsedId2Info
ParsePSGId2Info(const string & id2_info)520 CPSGS_OSGGetBlobBase::ParsePSGId2Info(const string& id2_info)
521 {
522 Int4 sat;
523 Int4 subsat;
524 Int4 satkey;
525 TID2BlobVersion tse_version;
526 TID2SplitVersion split_version;
527
528 CTempString s = id2_info;
529 if ( !s_ParseOSGBlob(s, sat, subsat, satkey) ||
530 !s_Skip(s, '.') ||
531 !s_ParseInt(s, tse_version) ||
532 !s_Skip(s, '.') ||
533 !s_ParseInt(s, split_version) ||
534 !s.empty() ) {
535 return SParsedId2Info{};
536 }
537
538 CRef<CID2_Blob_Id> id(new CID2_Blob_Id);
539 id->SetSat(sat);
540 id->SetSub_sat(subsat);
541 id->SetSat_key(satkey);
542 id->SetVersion(tse_version);
543 return SParsedId2Info{id, split_version};
544 }
545
546
547 END_NAMESPACE(osg);
548 END_NAMESPACE(psg);
549 END_NCBI_NAMESPACE;
550