1 /* $Id: id2snp_impl.cpp 593248 2019-09-16 12:18:31Z grichenk $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Eugene Vasilchenko
27 *
28 * File Description:
29 * Processor of ID2 requests for SNP data
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34 #include <sra/data_loaders/snp/impl/id2snp_impl.hpp>
35 #include <objects/id2/id2processor_interface.hpp>
36 #include <sra/data_loaders/snp/id2snp_params.h>
37 #include <sra/readers/sra/snpread.hpp>
38 #include <sra/error_codes.hpp>
39 #include <corelib/reader_writer.hpp>
40 #include <corelib/rwstream.hpp>
41 #include <util/compress/zlib.hpp>
42 #include <serial/objostrasnb.hpp>
43 #include <serial/serial.hpp>
44 #include <objects/id2/id2__.hpp>
45 #include <objects/seqsplit/seqsplit__.hpp>
46 #include <objects/general/general__.hpp>
47 #include <objects/seqloc/seqloc__.hpp>
48 #include <objects/seqres/seqres__.hpp>
49 #include <objects/seq/Bioseq.hpp>
50 #include <objects/seq/Seq_annot.hpp>
51 #include <objects/seq/Annot_descr.hpp>
52 #include <objects/seq/Annotdesc.hpp>
53 #include <objects/seqset/Seq_entry.hpp>
54 #include <objects/seqsplit/ID2S_Split_Info.hpp>
55 #include <objects/seqsplit/ID2S_Chunk.hpp>
56 #include <objmgr/bioseq_handle.hpp>
57 #include <objmgr/annot_selector.hpp>
58
59 BEGIN_NCBI_NAMESPACE;
60
61 #define NCBI_USE_ERRCODE_X ID2SNPProcessor
62 NCBI_DEFINE_ERR_SUBCODE_X(24);
63
64 BEGIN_NAMESPACE(objects);
65
// behavior options
// When TRACE_PROCESSING is defined the START_TRACE()/TRACE_X() macros in this
// file produce timed log messages; otherwise they expand to empty statements.
#define TRACE_PROCESSING

// Selector for master-record resolution policy.
// NOTE(review): only the constant below is visible in this part of the file;
// confirm how the individual values are interpreted by their users.
enum EResolveMaster {
    eResolveMaster_never,
    eResolveMaster_without_gi,
    eResolveMaster_always
};
// Compile-time choice of the policy: never resolve.
static const EResolveMaster kResolveMaster = eResolveMaster_never;
75
// default configuration parameters
// Fallback size limit of the SNP DB cache when the configuration has no value.
#define DEFAULT_VDB_CACHE_SIZE 10
// NOTE(review): not referenced in the visible part of this file;
// presumably a time interval in seconds -- confirm.
#define DEFAULT_INDEX_UPDATE_TIME 600
// Fallback compression mode when the configuration has no value.
#define DEFAULT_COMPRESS_DATA CID2SNPContext::eCompressData_some
80
// debug levels
// A trace message tagged with one of these levels is emitted when the
// configured ID2SNP DEBUG value is greater than or equal to that level
// (see s_DebugEnabled()).
enum EDebugLevel {
    eDebug_none     = 0,
    eDebug_error    = 1,
    eDebug_open     = 2,
    eDebug_request  = 5,
    eDebug_replies  = 6,
    eDebug_resolve  = 7,
    eDebug_data     = 8,
    eDebug_all      = 9
};
92
93 // SNP accession parameters
94
95
// parameters reading
// ID2SNP/ENABLE: boolean switch, declared with the value true.
NCBI_PARAM_DECL(bool, ID2SNP, ENABLE);
NCBI_PARAM_DEF_EX(bool, ID2SNP, ENABLE, true,
                  eParam_NoThread, ID2SNP_ENABLE);


// ID2SNP/DEBUG: integer debug level, declared with the value eDebug_error.
// It is compared against EDebugLevel values by s_DebugEnabled().
NCBI_PARAM_DECL(int, ID2SNP, DEBUG);
NCBI_PARAM_DEF_EX(int, ID2SNP, DEBUG, eDebug_error,
                  eParam_NoThread, ID2SNP_DEBUG);


// ID2SNP/FILTER_ALL: boolean switch, declared with the value true.
NCBI_PARAM_DECL(bool, ID2SNP, FILTER_ALL);
NCBI_PARAM_DEF_EX(bool, ID2SNP, FILTER_ALL, true,
                  eParam_NoThread, ID2SNP_FILTER_ALL);
110
111
s_Enabled(void)112 static inline bool s_Enabled(void)
113 {
114 static CSafeStatic<NCBI_PARAM_TYPE(ID2SNP, ENABLE)> s_Value;
115 return s_Value->Get();
116 }
117
118
s_DebugLevel(void)119 static inline int s_DebugLevel(void)
120 {
121 static CSafeStatic<NCBI_PARAM_TYPE(ID2SNP, DEBUG)> s_Value;
122 return s_Value->Get();
123 }
124
125
s_DebugEnabled(EDebugLevel level)126 static inline bool s_DebugEnabled(EDebugLevel level)
127 {
128 return s_DebugLevel() >= level;
129 }
130
131
s_FilterAll(void)132 static inline bool s_FilterAll(void)
133 {
134 static CSafeStatic<NCBI_PARAM_TYPE(ID2SNP, FILTER_ALL)> s_Value;
135 return s_Value->Get();
136 }
137
138
139 /////////////////////////////////////////////////////////////////////////////
140 // CID2SNPProcessor_Impl
141 /////////////////////////////////////////////////////////////////////////////
142
// Blob id
// sat = 2001-2099 : SNP NA version 1 - 99
// subsat : NA accession number
// satkey : SequenceIndex + 1000000*FilterIndex;
// satkey bits 24-30:
// NOTE(review): the "satkey bits 24-30" line above is left unfinished in the
// original comment; its intended content is not recoverable from this file.

const int kSNPSatBase = 2000;           // sat = kSNPSatBase + NA version
const int kNAIndexDigits = 9;           // number of digits of the NA index in an accession
const int kNAIndexMin = 1;              // smallest accepted NA index
const int kNAIndexMax = 999999999;      // largest accepted NA index
const int kNAVersionDigitsMin = 1;      // shortest version suffix
const int kNAVersionDigitsMax = 2;      // longest version suffix
const int kNALengthMin = 2 + kNAIndexDigits + 1 + kNAVersionDigitsMin; // NA000000000.0
const int kNALengthMax = 2 + kNAIndexDigits + 1 + kNAVersionDigitsMax; // NA000000000.00
const int kNAVersionMin = 1;            // smallest accepted NA version
const int kNAVersionMax = 99;           // largest accepted NA version
const int kSeqIndexCount = 1000000;     // multiplier separating FilterIndex from SequenceIndex in satkey
const int kFilterIndexCount = 2000;     // filter indexes must be below this value
const int kFilterIndexMaxLength = 4;    // maximal number of characters of a filter index in text form
162
163
// splitter parameters for SNPs and graphs
// Chunk ids are built as  index*kChunkIdMul + kChunkIdFeat  for feature chunks
// and  index*kChunkIdMul + kChunkIdGraph  for graph chunks.
static const int kTSEId = 1;                        // Bioseq-set id of the split skeleton
static const int kChunkIdFeat = 0;                  // chunk type: SNP features
static const int kChunkIdGraph = 1;                 // chunk type: coverage graph
static const int kChunkIdMul = 2;                   // number of chunk types per index
static const TSeqPos kFeatChunkSize = 1000000;      // sequence span of one feature chunk
static const TSeqPos kGraphChunkSize = 10000000;    // sequence span of one graph chunk
171
172
173 BEGIN_LOCAL_NAMESPACE;
174
175
// Creates a new default-constructed object, appends a smart pointer to it
// to the container and returns a reference to the new object.
// Cont::value_type must be a pointer-like type exposing TObjectType.
template<class Cont>
typename Cont::value_type::TObjectType& sx_AddNew(Cont& cont)
{
    typedef typename Cont::value_type TRef;
    typedef typename TRef::TObjectType TObject;

    TRef new_ref(new TObject);
    cont.push_back(new_ref);
    return *new_ref;
}
183
184
sx_SetZoomLevel(CSeq_annot & annot,int zoom_level)185 void sx_SetZoomLevel(CSeq_annot& annot, int zoom_level)
186 {
187 CUser_object& obj = sx_AddNew(annot.SetDesc().Set()).SetUser();
188 obj.SetType().SetStr("AnnotationTrack");
189 obj.AddField("ZoomLevel", zoom_level);
190 }
191
192
IsValidNAIndex(size_t na_index)193 bool IsValidNAIndex(size_t na_index)
194 {
195 return na_index >= kNAIndexMin && na_index <= kNAIndexMax;
196 }
197
198
IsValidNAVersion(size_t na_version)199 bool IsValidNAVersion(size_t na_version)
200 {
201 return na_version >= kNAVersionMin && na_version <= kNAVersionMax;
202 }
203
204
IsValidSeqIndex(size_t seq_index)205 bool IsValidSeqIndex(size_t seq_index)
206 {
207 return seq_index < kSeqIndexCount;
208 }
209
210
IsValidFilterIndex(size_t filter_index)211 bool IsValidFilterIndex(size_t filter_index)
212 {
213 return filter_index < kFilterIndexCount;
214 }
215
216
GetNAAccession(const SSNPDbTrackInfo & track)217 string GetNAAccession(const SSNPDbTrackInfo& track)
218 {
219 CNcbiOstrstream str;
220 str << "NA" << setw(kNAIndexDigits) << setfill('0') << track.m_NAIndex
221 << '.' << track.m_NAVersion;
222 return CNcbiOstrstreamToString(str);
223 }
224
225
FormatTrack(const SSNPDbTrackInfo & track)226 string FormatTrack(const SSNPDbTrackInfo& track)
227 {
228 CNcbiOstrstream str;
229 str << "NA" << setw(kNAIndexDigits) << setfill('0') << track.m_NAIndex
230 << '.' << track.m_NAVersion
231 << '#' << (track.m_FilterIndex+1);
232 return CNcbiOstrstreamToString(str);
233 }
234
235
ParseTrack(CTempString acc_filter)236 SSNPDbTrackInfo ParseTrack(CTempString acc_filter)
237 {
238 SSNPDbTrackInfo ret;
239 // NA123456789.1#1234
240 size_t hash_pos = acc_filter.find('#');
241 if ( hash_pos == NPOS ) {
242 return ret;
243 }
244 CTempString acc = acc_filter.substr(0, hash_pos);
245 CTempString filter = acc_filter.substr(hash_pos+1);
246 if ( acc.size() < kNALengthMin || acc.size() > kNALengthMax ||
247 acc[0] != 'N' || acc[1] != 'A' || acc[2+kNAIndexDigits] != '.' ) {
248 return ret;
249 }
250 if ( filter.empty() || filter[0] == '0' || filter.size() > kFilterIndexMaxLength ) {
251 return ret;
252 }
253 size_t na_index = NStr::StringToNumeric<size_t>(acc.substr(2, kNAIndexDigits),
254 NStr::fConvErr_NoThrow);
255 if ( !IsValidNAIndex(na_index) ) {
256 return ret;
257 }
258 size_t na_version = NStr::StringToNumeric<size_t>(acc.substr(2+kNAIndexDigits+1),
259 NStr::fConvErr_NoThrow);
260 if ( !IsValidNAVersion(na_version) ) {
261 return ret;
262 }
263 size_t filter_index = NStr::StringToNumeric<size_t>(filter,
264 NStr::fConvErr_NoThrow)-1;
265 if ( !IsValidFilterIndex(filter_index) ) {
266 return ret;
267 }
268 ret.m_NAIndex = na_index;
269 ret.m_NAVersion = na_version;
270 ret.m_FilterIndex = filter_index;
271 return ret;
272 }
273
274
#ifdef TRACE_PROCESSING

// Stopwatch whose elapsed time is printed in front of every trace message.
static CStopWatch sw;

// Restarts the stopwatch, but only when the debug level is above zero.
# define START_TRACE() do { if(s_DebugLevel()>0)sw.Restart(); } while(0)

// Prints an entry for trace messages as "<formatted track>/<sequence index>".
CNcbiOstream& operator<<(CNcbiOstream& out,
                         const CID2SNPProcessor_Impl::SSNPEntryInfo& seq)
{
    return out << FormatTrack(seq.m_Track) << '/' << seq.m_SeqIndex;
}
// TRACE_X(t, l, m): when debug level 'l' is enabled, posts message 'm' with
// error subcode 't', prefixed with the elapsed stopwatch time and "ID2SNP: ".
# define TRACE_X(t,l,m)                                                 \
    do {                                                                \
        if ( s_DebugEnabled(l) ) {                                      \
            LOG_POST_X(t, Info<<sw.Elapsed()<<": ID2SNP: "<<m);         \
        }                                                               \
    } while(0)
#else
// Tracing is compiled out: both macros expand to empty statements.
# define START_TRACE() do{}while(0)
# define TRACE_X(t,l,m) do{}while(0)
#endif
296
297
298 class COSSWriter : public IWriter
299 {
300 public:
301 typedef vector<char> TOctetString;
302 typedef list<TOctetString*> TOctetStringSequence;
303
COSSWriter(TOctetStringSequence & out)304 COSSWriter(TOctetStringSequence& out)
305 : m_Output(out)
306 {
307 }
308
Write(const void * buffer,size_t count,size_t * written)309 virtual ERW_Result Write(const void* buffer,
310 size_t count,
311 size_t* written)
312 {
313 const char* data = static_cast<const char*>(buffer);
314 m_Output.push_back(new TOctetString(data, data+count));
315 if ( written ) {
316 *written = count;
317 }
318 return eRW_Success;
319 }
Flush(void)320 virtual ERW_Result Flush(void)
321 {
322 return eRW_Success;
323 }
324
325 private:
326 TOctetStringSequence& m_Output;
327 };
328
329
sx_GetSize(const CID2_Reply_Data & data)330 size_t sx_GetSize(const CID2_Reply_Data& data)
331 {
332 size_t size = 0;
333 ITERATE ( CID2_Reply_Data::TData, it, data.GetData() ) {
334 size += (*it)->size();
335 }
336 return size;
337 }
338
339
340 END_LOCAL_NAMESPACE;
341
342
// Default context: no compression, no explicit blob state,
// VDB SNP access not allowed.
CID2SNPContext::CID2SNPContext(void)
    : m_CompressData(eCompressData_never),
      m_ExplicitBlobState(false),
      m_AllowVDB(false)
{
}
349
350
CID2SNPProcessor_Impl(const CConfig::TParamTree * params,const string & driver_name)351 CID2SNPProcessor_Impl::CID2SNPProcessor_Impl(const CConfig::TParamTree* params,
352 const string& driver_name)
353 {
354 auto_ptr<CConfig::TParamTree> app_params;
355 if ( !params ) {
356 if ( CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard() ) {
357 app_params.reset(CConfig::ConvertRegToTree(app->GetConfig()));
358 params = app_params.get();
359 }
360 }
361 if ( params ) {
362 params = params->FindSubNode(CInterfaceVersion<CID2Processor>::GetName());
363 }
364 if ( params ) {
365 params = params->FindSubNode(driver_name);
366 }
367 CConfig conf(params);
368
369 size_t cache_size =
370 conf.GetInt(driver_name,
371 NCBI_ID2PROC_SNP_PARAM_VDB_CACHE_SIZE,
372 CConfig::eErr_NoThrow,
373 DEFAULT_VDB_CACHE_SIZE);
374 TRACE_X(23, eDebug_open, "ID2SNP: cache_size = "<<cache_size);
375 m_SNPDbCache.set_size_limit(cache_size);
376
377 int compress_data =
378 conf.GetInt(driver_name,
379 NCBI_ID2PROC_SNP_PARAM_COMPRESS_DATA,
380 CConfig::eErr_NoThrow,
381 DEFAULT_COMPRESS_DATA);
382 if ( compress_data >= CID2SNPContext::eCompressData_never &&
383 compress_data <= CID2SNPContext::eCompressData_always ) {
384 m_InitialContext.m_CompressData =
385 CID2SNPContext::ECompressData(compress_data);
386 }
387 TRACE_X(23, eDebug_open, "ID2SNP: compress_data = "<<m_InitialContext.m_CompressData);
388 }
389
390
// Nothing to release explicitly: the body is intentionally empty.
CID2SNPProcessor_Impl::~CID2SNPProcessor_Impl(void)
{
}
394
395
InitContext(CID2SNPContext & context,const CID2_Request & request)396 void CID2SNPProcessor_Impl::InitContext(CID2SNPContext& context,
397 const CID2_Request& request)
398 {
399 context = GetInitialContext();
400 if ( request.IsSetParams() ) {
401 // check if blob-state field is allowed
402 ITERATE ( CID2_Request::TParams::Tdata, it, request.GetParams().Get() ) {
403 const CID2_Param& param = **it;
404 if ( param.GetName() == "id2:allow" && param.IsSetValue() ) {
405 ITERATE ( CID2_Param::TValue, it2, param.GetValue() ) {
406 if ( *it2 == "*.blob-state" ) {
407 context.m_ExplicitBlobState = true;
408 }
409 if ( *it2 == "vdb-snp" ) {
410 context.m_AllowVDB = true;
411 }
412 }
413 }
414 }
415 }
416 }
417
418
GetSNPDb(const string & na)419 CSNPDb CID2SNPProcessor_Impl::GetSNPDb(const string& na)
420 {
421 CMutexGuard guard(m_Mutex);
422 TSNPDbCache::iterator it = m_SNPDbCache.find(na);
423 if ( it != m_SNPDbCache.end() ) {
424 return it->second;
425 }
426 try {
427 CSNPDb snp_db(m_Mgr, na);
428 m_SNPDbCache[na] = snp_db;
429 TRACE_X(1, eDebug_open, "GetSNPDb: "<<na);
430 return snp_db;
431 }
432 catch ( CSraException& exc ) {
433 if ( exc.GetErrCode() == exc.eNotFoundDb ||
434 exc.GetErrCode() == exc.eProtectedDb ) {
435 // no such SNP table
436 }
437 else {
438 TRACE_X(22, eDebug_error, "ID2SNP: "
439 "Exception while opening SNP DB "<<na<<": "<<exc);
440 }
441 }
442 catch ( CException& exc ) {
443 TRACE_X(22, eDebug_error, "ID2SNP: "
444 "Exception while opening SNP DB "<<na<<": "<<exc);
445 }
446 catch ( exception& exc ) {
447 TRACE_X(22, eDebug_error, "ID2SNP: "
448 "Exception while opening SNP DB "<<na<<": "<<exc.what());
449 }
450 return CSNPDb();
451 }
452
453
GetSNPDb(SSNPEntryInfo & seq)454 CSNPDb& CID2SNPProcessor_Impl::GetSNPDb(SSNPEntryInfo& seq)
455 {
456 if ( !seq.m_SNPDb ) {
457 seq.m_SNPDb = GetSNPDb(GetNAAccession(seq.m_Track));
458 if ( seq.m_SNPDb ) {
459 seq.m_Valid = true;
460 }
461 }
462 return seq.m_SNPDb;
463 }
464
465
// Drops the cached sequence iterator and the cached blob id of the entry;
// both are re-created on demand by GetSeqIterator() and x_GetBlobId().
void CID2SNPProcessor_Impl::ResetIteratorCache(SSNPEntryInfo& seq)
{
    seq.m_SeqIter.Reset();
    seq.m_BlobId.Reset();
}
471
472
GetSeqIterator(SSNPEntryInfo & seq)473 CSNPDbSeqIterator& CID2SNPProcessor_Impl::GetSeqIterator(SSNPEntryInfo& seq)
474 {
475 if ( !seq.m_SeqIter ) {
476 CSNPDb& db = GetSNPDb(seq);
477 seq.m_SeqIter = CSNPDbSeqIterator(db, seq.m_SeqIndex);
478 if ( seq.m_Track.m_FilterIndex ) {
479 seq.m_SeqIter.SetTrack(CSNPDbTrackIterator(db, seq.m_Track.m_FilterIndex));
480 }
481 }
482 return seq.m_SeqIter;
483 }
484
485
x_GetBlobId(SSNPEntryInfo & seq)486 CID2_Blob_Id& CID2SNPProcessor_Impl::x_GetBlobId(SSNPEntryInfo& seq)
487 {
488 if ( seq.m_BlobId ) {
489 return *seq.m_BlobId;
490 }
491 CRef<CID2_Blob_Id> id(new CID2_Blob_Id);
492 id->SetSat(kSNPSatBase + seq.m_Track.m_NAVersion);
493 id->SetSub_sat(seq.m_Track.m_NAIndex);
494 id->SetSat_key(seq.m_SeqIndex + seq.m_Track.m_FilterIndex * kSeqIndexCount);
495 seq.m_BlobId = id;
496 return *id;
497 }
498
499
500 CID2SNPProcessor_Impl::SSNPEntryInfo
x_ResolveBlobId(const CID2_Blob_Id & id)501 CID2SNPProcessor_Impl::x_ResolveBlobId(const CID2_Blob_Id& id)
502 {
503 SSNPEntryInfo seq;
504 if ( id.GetSat() < kSNPSatBase + kNAVersionMin ||
505 id.GetSat() > kSNPSatBase + kNAVersionMax ) {
506 return SSNPEntryInfo();
507 }
508 seq.m_Track.m_NAVersion = id.GetSat() - kSNPSatBase;
509 seq.m_Track.m_NAIndex = id.GetSub_sat();
510 if ( !IsValidNAIndex(seq.m_Track.m_NAIndex) ) {
511 return SSNPEntryInfo();
512 }
513 seq.m_SeqIndex = id.GetSat_key() % kSeqIndexCount;
514 if ( !IsValidSeqIndex(seq.m_SeqIndex) ) {
515 return SSNPEntryInfo();
516 }
517 seq.m_Track.m_FilterIndex = id.GetSat_key() / kSeqIndexCount;
518 if ( !IsValidFilterIndex(seq.m_Track.m_FilterIndex) ) {
519 return SSNPEntryInfo();
520 }
521 if ( CSNPDb snp_db = GetSNPDb(seq) ) {
522 seq.m_Valid = true;
523 }
524 return seq;
525 }
526
527
528 CID2SNPProcessor_Impl::SSNPEntryInfo
x_ResolveBlobId(const SSNPDbTrackInfo & track,const string & acc_ver)529 CID2SNPProcessor_Impl::x_ResolveBlobId(const SSNPDbTrackInfo& track,
530 const string& acc_ver)
531 {
532 SSNPEntryInfo seq;
533 if ( CSNPDb snp_db = GetSNPDb(GetNAAccession(track)) ) {
534 CSNPDbSeqIterator seq_iter(snp_db, CSeq_id_Handle::GetHandle(acc_ver));
535 if ( seq_iter ) {
536 seq.m_Track = track;
537 seq.m_SeqIndex = seq_iter.GetVDBSeqIndex();
538 seq.m_SNPDb = snp_db;
539 seq.m_Valid = true;
540 seq.m_SeqIter = seq_iter;
541 }
542 }
543 return seq;
544 }
545
546
// Decides whether the data of the entry should be compressed in
// eCompressData_some mode.  Currently always answers "no", so in that
// mode nothing is compressed.
bool CID2SNPProcessor_Impl::WorthCompressing(const SSNPEntryInfo& /*seq*/)
{
    return false;
}
551
552
WriteData(CID2SNPContext & context,const SSNPEntryInfo & seq,CID2_Reply_Data & data,const CSerialObject & obj)553 void CID2SNPProcessor_Impl::WriteData(CID2SNPContext& context,
554 const SSNPEntryInfo& seq,
555 CID2_Reply_Data& data,
556 const CSerialObject& obj)
557 {
558 data.SetData_format(CID2_Reply_Data::eData_format_asn_binary);
559 COSSWriter writer(data.SetData());
560 CWStream writer_stream(&writer);
561 AutoPtr<CNcbiOstream> str;
562 if ( (context.m_CompressData == CID2SNPContext::eCompressData_always) ||
563 (context.m_CompressData == CID2SNPContext::eCompressData_some && WorthCompressing(seq)) ) {
564 data.SetData_compression(CID2_Reply_Data::eData_compression_gzip);
565 str.reset(new CCompressionOStream(writer_stream,
566 new CZipStreamCompressor,
567 CCompressionIStream::fOwnProcessor));
568 }
569 else {
570 data.SetData_compression(CID2_Reply_Data::eData_compression_none);
571 str.reset(&writer_stream, eNoOwnership);
572 }
573 CObjectOStreamAsnBinary objstr(*str);
574 objstr << obj;
575 }
576
577
x_GetAccVer(string & acc_ver,const CSeq_id & id)578 bool CID2SNPProcessor_Impl::x_GetAccVer(string& acc_ver, const CSeq_id& id)
579 {
580 if ( !acc_ver.empty() ) {
581 return true;
582 }
583 if ( const CTextseq_id* text_id = id.GetTextseq_Id() ) {
584 if ( text_id->IsSetAccession() && !text_id->GetAccession().empty() &&
585 text_id->IsSetVersion() && text_id->GetVersion() > 0 ) {
586 // fully qualified text id, no more information is necessary
587 acc_ver = text_id->GetAccession()+'.'+NStr::NumericToString(text_id->GetVersion());
588 return true;
589 }
590 }
591 return false;
592 }
593
594
x_AddSeqIdRequest(CID2_Request_Get_Seq_id & request,CID2SNPProcessorPacketContext::SRequestInfo & info)595 void CID2SNPProcessor_Impl::x_AddSeqIdRequest(CID2_Request_Get_Seq_id& request,
596 CID2SNPProcessorPacketContext::SRequestInfo& info)
597 {
598 CID2_Request_Get_Seq_id::TSeq_id_type request_type = request.GetSeq_id_type();
599 info.m_OriginalSeqIdType = request_type;
600 if ( request.GetSeq_id().IsSeq_id() &&
601 x_GetAccVer(info.m_SeqAcc, request.GetSeq_id().GetSeq_id()) ) {
602 return;
603 }
604 if ( request_type == CID2_Request_Get_Seq_id::eSeq_id_type_any ) {
605 // ask for all Seq-ids instead of any
606 request.SetSeq_id_type(CID2_Request_Get_Seq_id::eSeq_id_type_all);
607 }
608 else if ( request_type & CID2_Request_Get_Seq_id::eSeq_id_type_text ) {
609 // text seq-id already asked
610 }
611 else {
612 // add text seq-id to the requested type set
613 request.SetSeq_id_type(request_type | CID2_Request_Get_Seq_id::eSeq_id_type_text);
614 }
615 }
616
617
// Pre-processes a get-blob-id request: sources that name an openable SNP
// track are removed from the request and, when the track index is valid,
// recorded in the per-request info of the packet context (keyed by the
// request serial number).
// Returns eNeedReplies when at least one SNP track was recorded (the
// request is then also adjusted by x_AddSeqIdRequest()), otherwise
// eNotProcessed.
CID2SNPProcessor_Impl::EProcessStatus
CID2SNPProcessor_Impl::x_ProcessGetBlobId(CID2SNPContext& context,
                                          CID2SNPProcessorPacketContext& packet_context,
                                          TReplies& replies,
                                          CID2_Request& main_request,
                                          CID2_Request_Get_Blob_Id& request)
{
    START_TRACE();
    TRACE_X(7, eDebug_request, "GetBlobId: "<<MSerial_AsnText<<main_request);
    if ( request.IsSetSources() ) {
        // created lazily, only when the first valid SNP track is found
        CID2SNPProcessorPacketContext::SRequestInfo* info = 0;
        // move SNP NAs from ID2 sources to m_SNPTracks
        ERASE_ITERATE ( CID2_Request_Get_Blob_Id::TSources, it, request.SetSources() ) {
            SSNPDbTrackInfo track = ParseTrack(*it);
            if ( !track.m_NAIndex ) {
                // not a SNP track specification, leave it in the request
                continue;
            }
            CSNPDb db = GetSNPDb(GetNAAccession(track));
            if ( !db ) {
                // SNP database cannot be opened, leave the source in the request
                continue;
            }
            if ( track.m_FilterIndex >= db.GetTrackCount() ) {
                // bad track index
                request.SetSources().erase(it);
                continue;
            }
            if ( !info ) {
                info = &packet_context.m_SNPRequests[main_request.GetSerial_number()];
            }
            info->m_SNPTracks.push_back(track);
            request.SetSources().erase(it);
        }
        if ( request.GetSources().empty() ) {
            // no other ID2 sources left
            request.ResetSources();
        }
        if ( info ) {
            // add accession request if it's not known
            x_AddSeqIdRequest(request.SetSeq_id(), *info);
            return eNeedReplies;
        }
    }
    return eNotProcessed;
}
662
663
664 BEGIN_LOCAL_NAMESPACE;
665
// Tells whether any of the values in positions [index, index+count) is
// non-zero.  The checked interval is clipped to the size of the container.
template<class Values>
bool sx_HasNonZero(const Values& values, TSeqPos index, TSeqPos count)
{
    const TSeqPos stop = min(index+count, TSeqPos(values.size()));
    TSeqPos pos = index;
    while ( pos < stop ) {
        if ( values[pos] ) {
            return true;
        }
        ++pos;
    }
    return false;
}
677
678
679 template<class TValues>
sx_AddBits2(vector<char> & bits,TSeqPos bit_values,TSeqPos pos_index,const TValues & values)680 void sx_AddBits2(vector<char>& bits,
681 TSeqPos bit_values,
682 TSeqPos pos_index,
683 const TValues& values)
684 {
685 TSeqPos dst_ind = pos_index / bit_values;
686 TSeqPos src_ind = 0;
687 if ( TSeqPos first_offset = pos_index % bit_values ) {
688 TSeqPos first_count = bit_values - first_offset;
689 if ( !bits[dst_ind] ) {
690 bits[dst_ind] = sx_HasNonZero(values, 0, first_count);
691 }
692 dst_ind += 1;
693 src_ind += first_count;
694 }
695 while ( src_ind < values.size() ) {
696 if ( !bits[dst_ind] ) {
697 bits[dst_ind] = sx_HasNonZero(values, src_ind, bit_values);
698 }
699 ++dst_ind;
700 src_ind += bit_values;
701 }
702 }
703
704
705 static
sx_AddBits(vector<char> & bits,TSeqPos kChunkSize,const CSeq_graph & graph)706 void sx_AddBits(vector<char>& bits,
707 TSeqPos kChunkSize,
708 const CSeq_graph& graph)
709 {
710 TSeqPos comp = graph.GetComp();
711 _ASSERT(kChunkSize % comp == 0);
712 TSeqPos bit_values = kChunkSize / comp;
713 const CSeq_interval& loc = graph.GetLoc().GetInt();
714 TSeqPos pos = loc.GetFrom();
715 _ASSERT(pos % comp == 0);
716 _ASSERT(graph.GetNumval()*comp == loc.GetLength());
717 TSeqPos pos_index = pos/comp;
718 if ( graph.GetGraph().IsByte() ) {
719 auto& values = graph.GetGraph().GetByte().GetValues();
720 _ASSERT(values.size() == graph.GetNumval());
721 sx_AddBits2(bits, bit_values, pos_index, values);
722 }
723 else {
724 auto& values = graph.GetGraph().GetInt().GetValues();
725 _ASSERT(values.size() == graph.GetNumval());
726 sx_AddBits2(bits, bit_values, pos_index, values);
727 }
728 }
729
730
731 END_LOCAL_NAMESPACE;
732
733
x_LoadBlob(CID2SNPContext & context,SSNPEntryInfo & info)734 CRef<CSerialObject> CID2SNPProcessor_Impl::x_LoadBlob(CID2SNPContext& context,
735 SSNPEntryInfo& info)
736 {
737 CRef<CID2S_Split_Info> split_info(new CID2S_Split_Info);
738 split_info->SetChunks();
739 CBioseq_set& skeleton = split_info->SetSkeleton().SetSet();
740 skeleton.SetId().SetId(kTSEId);
741 skeleton.SetSeq_set();
742
743
744 CSNPDbSeqIterator it = GetSeqIterator(info);
745 CRange<TSeqPos> total_range = it.GetSNPRange();
746 vector<char> feat_chunks(total_range.GetTo()/kFeatChunkSize+1);
747 string na_acc = FormatTrack(info.m_Track);
748 {{
749 // overview graph
750 CRef<CSeq_annot> annot = it.GetOverviewAnnot(total_range, na_acc);
751 sx_SetZoomLevel(*annot, it.GetOverviewZoom());
752 if ( annot ) {
753 for ( auto& g : annot->GetData().GetGraph() ) {
754 sx_AddBits(feat_chunks, kFeatChunkSize, *g);
755 }
756 skeleton.SetAnnot().push_back(annot);
757 }
758 }}
759 {{
760 // coverage graphs
761 string graph_name = CombineWithZoomLevel(na_acc, it.GetCoverageZoom());
762 _ASSERT(kGraphChunkSize % kFeatChunkSize == 0);
763 const TSeqPos feat_per_graph = kGraphChunkSize/kFeatChunkSize;
764 for ( int i = 0; i*kGraphChunkSize < total_range.GetToOpen(); ++i ) {
765 if ( !sx_HasNonZero(feat_chunks, i*feat_per_graph, feat_per_graph) ) {
766 continue;
767 }
768 int chunk_id = i*kChunkIdMul+kChunkIdGraph;
769 CID2S_Chunk_Info& chunk = sx_AddNew(split_info->SetChunks());
770 chunk.SetId().Set(chunk_id);
771 CID2S_Seq_annot_Info& annot_info = sx_AddNew(chunk.SetContent()).SetSeq_annot();
772 annot_info.SetName(graph_name);
773 annot_info.SetGraph();
774 CID2S_Seq_id_Interval& interval = annot_info.SetSeq_loc().SetSeq_id_interval();
775 interval.SetSeq_id(*it.GetSeqId());
776 interval.SetStart(i*kGraphChunkSize);
777 interval.SetLength(kGraphChunkSize);
778 }
779 }}
780 {{
781 // features
782 TSeqPos overflow = it.GetMaxSNPLength()-1;
783 for ( int i = 0; i*kFeatChunkSize < total_range.GetToOpen(); ++i ) {
784 if ( !feat_chunks[i] ) {
785 continue;
786 }
787 int chunk_id = i*kChunkIdMul+kChunkIdFeat;
788 CID2S_Chunk_Info& chunk = sx_AddNew(split_info->SetChunks());
789 chunk.SetId().Set(chunk_id);
790 CID2S_Seq_annot_Info& annot_info = sx_AddNew(chunk.SetContent()).SetSeq_annot();
791 annot_info.SetName(na_acc);
792 CID2S_Feat_type_Info& feat_type = sx_AddNew(annot_info.SetFeat());
793 feat_type.SetType(CSeqFeatData::e_Imp);
794 feat_type.SetSubtypes().push_back(CSeqFeatData::eSubtype_variation);
795 CID2S_Seq_id_Interval& interval = annot_info.SetSeq_loc().SetSeq_id_interval();
796 interval.SetSeq_id(*it.GetSeqId());
797 interval.SetStart(i*kFeatChunkSize);
798 interval.SetLength(kFeatChunkSize+overflow);
799 }
800 }}
801 return Ref<CSerialObject>(split_info);
802 }
803
804
x_LoadChunk(CID2SNPContext & context,SSNPEntryInfo & info,int chunk_id)805 CRef<CSerialObject> CID2SNPProcessor_Impl::x_LoadChunk(CID2SNPContext& context,
806 SSNPEntryInfo& info,
807 int chunk_id)
808 {
809 CRef<CID2S_Chunk> chunk(new CID2S_Chunk);
810 CID2S_Chunk_Data& data = sx_AddNew(chunk->SetData());
811 int chunk_type = chunk_id%kChunkIdMul;
812 int i = chunk_id/kChunkIdMul;
813 data.SetId().SetBioseq_set(kTSEId);
814
815 string na_acc = FormatTrack(info.m_Track);
816 CSNPDbSeqIterator& it = GetSeqIterator(info);
817 if ( chunk_type == kChunkIdFeat ) {
818 CRange<TSeqPos> range;
819 range.SetFrom(i*kFeatChunkSize);
820 range.SetToOpen((i+1)*kFeatChunkSize);
821 for ( auto annot : it.GetTableFeatAnnots(range, na_acc) ) {
822 data.SetAnnots().push_back(annot);
823 }
824 }
825 else if ( chunk_type == kChunkIdGraph ) {
826 CRange<TSeqPos> range;
827 range.SetFrom(i*kGraphChunkSize);
828 range.SetToOpen((i+1)*kGraphChunkSize);
829 if ( auto annot = it.GetCoverageAnnot(range, na_acc) ) {
830 sx_SetZoomLevel(*annot, it.GetCoverageZoom());
831 data.SetAnnots().push_back(annot);
832 }
833 }
834 return Ref<CSerialObject>(chunk);
835 }
836
837
838 CID2SNPProcessor_Impl::EProcessStatus
x_ProcessGetBlobInfo(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,TReplies & replies,CID2_Request & main_request,CID2_Request_Get_Blob_Info & request)839 CID2SNPProcessor_Impl::x_ProcessGetBlobInfo(CID2SNPContext& context,
840 CID2SNPProcessorPacketContext& packet_context,
841 TReplies& replies,
842 CID2_Request& main_request,
843 CID2_Request_Get_Blob_Info& request)
844 {
845 if ( !request.GetBlob_id().IsBlob_id() ) {
846 return eNotProcessed;
847 }
848 if ( SSNPEntryInfo info = x_ResolveBlobId(request.GetBlob_id().GetBlob_id()) ) {
849 CID2_Reply& main_reply = sx_AddNew(replies);
850 if ( main_request.IsSetSerial_number() ) {
851 main_reply.SetSerial_number(main_request.GetSerial_number());
852 }
853 CID2_Reply_Get_Blob& reply = main_reply.SetReply().SetGet_blob();
854 reply.SetBlob_id(x_GetBlobId(info));
855 CID2_Reply_Data& data = reply.SetData();
856
857 CRef<CSerialObject> obj = x_LoadBlob(context, info);
858 if ( obj->GetThisTypeInfo() == CID2S_Split_Info::GetTypeInfo() ) {
859 // split info
860 TRACE_X(11, eDebug_resolve, "GetSplitInfo: "<<info);
861 data.SetData_type(CID2_Reply_Data::eData_type_id2s_split_info);
862 }
863 else {
864 TRACE_X(11, eDebug_resolve, "GetSeq_entry: "<<info);
865 data.SetData_type(CID2_Reply_Data::eData_type_seq_entry);
866 }
867 WriteData(context, info, data, *obj);
868 TRACE_X(12, eDebug_resolve, "Seq("<<info<<"): "<<
869 " data size: "<<sx_GetSize(data));
870 main_reply.SetEnd_of_reply();
871 return eProcessed;
872 }
873 return eNotProcessed;
874 }
875
876
877 CID2SNPProcessor_Impl::EProcessStatus
x_ProcessGetChunks(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,TReplies & replies,CID2_Request & main_request,CID2S_Request_Get_Chunks & request)878 CID2SNPProcessor_Impl::x_ProcessGetChunks(CID2SNPContext& context,
879 CID2SNPProcessorPacketContext& packet_context,
880 TReplies& replies,
881 CID2_Request& main_request,
882 CID2S_Request_Get_Chunks& request)
883 {
884 if ( SSNPEntryInfo info = x_ResolveBlobId(request.GetBlob_id()) ) {
885 ITERATE ( CID2S_Request_Get_Chunks::TChunks, it, request.GetChunks() ) {
886 CID2_Reply& main_reply = sx_AddNew(replies);
887 if ( main_request.IsSetSerial_number() ) {
888 main_reply.SetSerial_number(main_request.GetSerial_number());
889 }
890 CID2S_Reply_Get_Chunk& reply = main_reply.SetReply().SetGet_chunk();
891 reply.SetBlob_id(request.SetBlob_id());
892 reply.SetChunk_id(*it);
893 CRef<CSerialObject> obj = x_LoadChunk(context, info, *it);
894 if ( obj && obj->GetThisTypeInfo() == CID2S_Chunk::GetTypeInfo() ) {
895 // chunk
896 TRACE_X(11, eDebug_resolve, "GetChunk: "<<info<<"."<<*it);
897 CID2_Reply_Data& data = reply.SetData();
898 data.SetData_type(CID2_Reply_Data::eData_type_id2s_chunk);
899 WriteData(context, info, data, *obj);
900 TRACE_X(12, eDebug_resolve, "Seq("<<info<<"): "<<
901 " data size: "<<sx_GetSize(data));
902 }
903 else {
904 TRACE_X(11, eDebug_resolve, "GetChunk: "<<info<<'.'<<*it<<": bad chunk");
905 CID2_Error& error = sx_AddNew(main_reply.SetError());
906 error.SetSeverity(CID2_Error::eSeverity_no_data);
907 error.SetMessage("Invalid chunk id");
908 }
909 }
910 replies.back()->SetEnd_of_reply();
911 return eProcessed;
912 }
913 return eNotProcessed;
914 }
915
916
x_ProcessReplyGetSeqId(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,CID2_Reply & main_reply,TReplies & replies,CID2SNPProcessorPacketContext::SRequestInfo & info,CID2_Reply_Get_Seq_id & reply)917 void CID2SNPProcessor_Impl::x_ProcessReplyGetSeqId(CID2SNPContext& context,
918 CID2SNPProcessorPacketContext& packet_context,
919 CID2_Reply& main_reply,
920 TReplies& replies,
921 CID2SNPProcessorPacketContext::SRequestInfo& info,
922 CID2_Reply_Get_Seq_id& reply)
923 {
924 replies.push_back(Ref(&main_reply));
925 if ( reply.IsSetSeq_id() ) {
926 for ( auto& r : reply.GetSeq_id() ) {
927 x_GetAccVer(info.m_SeqAcc, *r);
928 }
929 }
930 }
931
932
x_ProcessReplyGetBlobId(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,CID2_Reply & main_reply,TReplies & replies,CID2SNPProcessorPacketContext::SRequestInfo & req_info,CID2_Reply_Get_Blob_Id & reply)933 void CID2SNPProcessor_Impl::x_ProcessReplyGetBlobId(CID2SNPContext& context,
934 CID2SNPProcessorPacketContext& packet_context,
935 CID2_Reply& main_reply,
936 TReplies& replies,
937 CID2SNPProcessorPacketContext::SRequestInfo& req_info,
938 CID2_Reply_Get_Blob_Id& reply)
939 {
940 replies.push_back(Ref(&main_reply));
941 if ( !req_info.m_SentBlobIds && reply.IsSetBlob_id() && !reply.IsSetAnnot_info() ) {
942 CRef<CSeq_id> seq_id(new CSeq_id(req_info.m_SeqAcc));
943 for ( auto& track : req_info.m_SNPTracks ) {
944 if ( SSNPEntryInfo snp_info = x_ResolveBlobId(track, req_info.m_SeqAcc) ) {
945 string na_acc = FormatTrack(track);
946 CID2_Reply& snp_main_reply = sx_AddNew(replies);
947 snp_main_reply.SetSerial_number(main_reply.GetSerial_number());
948 CID2_Reply_Get_Blob_Id& snp_reply = snp_main_reply.SetReply().SetGet_blob_id();
949 snp_reply.SetSeq_id(reply.SetSeq_id());
950 snp_reply.SetBlob_id(x_GetBlobId(snp_info));
951 {{
952 // add SNP feat type info
953 CID2S_Seq_annot_Info& annot_info = sx_AddNew(snp_reply.SetAnnot_info());
954 annot_info.SetSeq_loc().SetWhole_seq_id(*seq_id);
955 annot_info.SetName(na_acc);
956 CID2S_Feat_type_Info& type_info = sx_AddNew(annot_info.SetFeat());
957 type_info.SetType(CSeqFeatData::e_Imp);
958 type_info.SetSubtypes().push_back(CSeqFeatData::eSubtype_variation);
959 }}
960 {{
961 // add SNP graph type info
962 CID2S_Seq_annot_Info& annot_info = sx_AddNew(snp_reply.SetAnnot_info());
963 annot_info.SetSeq_loc().SetWhole_seq_id(*seq_id);
964 annot_info.SetName(CombineWithZoomLevel(na_acc, GetSNPDb(snp_info).GetCoverageZoom()));
965 annot_info.SetGraph();
966 }}
967 {{
968 // add SNP overvew graph type info
969 CID2S_Seq_annot_Info& annot_info = sx_AddNew(snp_reply.SetAnnot_info());
970 annot_info.SetSeq_loc().SetWhole_seq_id(*seq_id);
971 annot_info.SetName(CombineWithZoomLevel(na_acc, GetSNPDb(snp_info).GetOverviewZoom()));
972 annot_info.SetGraph();
973 }}
974 }
975 }
976 if ( reply.IsSetEnd_of_reply() ) {
977 reply.ResetEnd_of_reply();
978 replies.back()->SetReply().SetGet_blob_id().SetEnd_of_reply();
979 }
980 if ( main_reply.IsSetEnd_of_reply() ) {
981 main_reply.ResetEnd_of_reply();
982 replies.back()->SetEnd_of_reply();
983 }
984 req_info.m_SentBlobIds = true;
985 }
986 }
987
988
989 /////////////////////////////////////////////////////////////////////////////
990 // new interface
991
992
993 CRef<CID2SNPProcessorContext>
CreateContext(void)994 CID2SNPProcessor_Impl::CreateContext(void)
995 {
996 CRef<CID2SNPProcessorContext> context(new CID2SNPProcessorContext);
997 context->m_Context = m_InitialContext;
998 return context;
999 }
1000
1001
1002 CRef<CID2SNPProcessorPacketContext>
ProcessPacket(CID2SNPProcessorContext * context,CID2_Request_Packet & packet,TReplies & replies)1003 CID2SNPProcessor_Impl::ProcessPacket(CID2SNPProcessorContext* context,
1004 CID2_Request_Packet& packet,
1005 TReplies& replies)
1006 {
1007 CRef<CID2SNPProcessorPacketContext> ret(new CID2SNPProcessorPacketContext);
1008 ERASE_ITERATE ( CID2_Request_Packet::Tdata, it, packet.Set() ) {
1009 // init request can come without serial number
1010 if ( (*it)->GetRequest().IsInit() ) {
1011 InitContext(context->m_Context, **it);
1012 continue;
1013 }
1014 if ( !context->m_Context.m_AllowVDB ) {
1015 continue;
1016 }
1017 if ( !(*it)->IsSetSerial_number() ) {
1018 // cannot process requests with no serial number
1019 continue;
1020 }
1021 EProcessStatus status = eNotProcessed;
1022 switch ( (*it)->GetRequest().Which() ) {
1023 case CID2_Request::TRequest::e_Get_blob_id:
1024 status = x_ProcessGetBlobId(context->m_Context, *ret, replies, **it,
1025 (*it)->SetRequest().SetGet_blob_id());
1026 break;
1027 case CID2_Request::TRequest::e_Get_blob_info:
1028 status = x_ProcessGetBlobInfo(context->m_Context, *ret, replies, **it,
1029 (*it)->SetRequest().SetGet_blob_info());
1030 break;
1031 case CID2_Request::TRequest::e_Get_chunks:
1032 status = x_ProcessGetChunks(context->m_Context, *ret, replies, **it,
1033 (*it)->SetRequest().SetGet_chunks());
1034 break;
1035 default:
1036 break;
1037 }
1038 if ( status == eProcessed ) {
1039 packet.Set().erase(it);
1040 }
1041 }
1042 if ( ret->m_SNPRequests.empty() ) {
1043 ret = null;
1044 }
1045 return ret;
1046 }
1047
1048
ProcessReply(CID2SNPProcessorContext * context,CID2SNPProcessorPacketContext * packet_context,CID2_Reply & reply,TReplies & replies)1049 void CID2SNPProcessor_Impl::ProcessReply(CID2SNPProcessorContext* context,
1050 CID2SNPProcessorPacketContext* packet_context,
1051 CID2_Reply& reply,
1052 TReplies& replies)
1053 {
1054 if ( packet_context && reply.IsSetSerial_number() ) {
1055 auto it = packet_context->m_SNPRequests.find(reply.GetSerial_number());
1056 if ( it != packet_context->m_SNPRequests.end() ) {
1057 if ( reply.GetReply().IsGet_seq_id() ) {
1058 x_ProcessReplyGetSeqId(context->m_Context, *packet_context, reply, replies,
1059 it->second, reply.SetReply().SetGet_seq_id());
1060 }
1061 else if ( reply.GetReply().IsGet_blob_id() ) {
1062 x_ProcessReplyGetBlobId(context->m_Context, *packet_context, reply, replies,
1063 it->second, reply.SetReply().SetGet_blob_id());
1064 }
1065 else {
1066 replies.push_back(Ref(&reply));
1067 }
1068 return;
1069 }
1070 }
1071 // cannot process requests with no serial number
1072 replies.push_back(Ref(&reply));
1073 }
1074
1075
1076 // end of new interface
1077 /////////////////////////////////////////////////////////////////////////////
1078
1079
1080 END_NAMESPACE(objects);
1081 END_NCBI_NAMESPACE;
1082