1 /*  $Id: id2snp_impl.cpp 593248 2019-09-16 12:18:31Z grichenk $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors:  Eugene Vasilchenko
27  *
28  * File Description:
29  *   Processor of ID2 requests for SNP data
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 #include <sra/data_loaders/snp/impl/id2snp_impl.hpp>
35 #include <objects/id2/id2processor_interface.hpp>
36 #include <sra/data_loaders/snp/id2snp_params.h>
37 #include <sra/readers/sra/snpread.hpp>
38 #include <sra/error_codes.hpp>
39 #include <corelib/reader_writer.hpp>
40 #include <corelib/rwstream.hpp>
41 #include <util/compress/zlib.hpp>
42 #include <serial/objostrasnb.hpp>
43 #include <serial/serial.hpp>
44 #include <objects/id2/id2__.hpp>
45 #include <objects/seqsplit/seqsplit__.hpp>
46 #include <objects/general/general__.hpp>
47 #include <objects/seqloc/seqloc__.hpp>
48 #include <objects/seqres/seqres__.hpp>
49 #include <objects/seq/Bioseq.hpp>
50 #include <objects/seq/Seq_annot.hpp>
51 #include <objects/seq/Annot_descr.hpp>
52 #include <objects/seq/Annotdesc.hpp>
53 #include <objects/seqset/Seq_entry.hpp>
54 #include <objects/seqsplit/ID2S_Split_Info.hpp>
55 #include <objects/seqsplit/ID2S_Chunk.hpp>
56 #include <objmgr/bioseq_handle.hpp>
57 #include <objmgr/annot_selector.hpp>
58 
59 BEGIN_NCBI_NAMESPACE;
60 
61 #define NCBI_USE_ERRCODE_X   ID2SNPProcessor
62 NCBI_DEFINE_ERR_SUBCODE_X(24);
63 
64 BEGIN_NAMESPACE(objects);
65 
// ---------------------------------------------------------------------------
// Behavior options
// ---------------------------------------------------------------------------

// When defined, START_TRACE() and TRACE_X() further down in this file expand
// to real timing/logging code; otherwise they expand to empty statements.
#define TRACE_PROCESSING

// Choices for the kResolveMaster option below.
enum EResolveMaster {
    eResolveMaster_never,
    eResolveMaster_without_gi,
    eResolveMaster_always
};

// Compile-time selected option value.
static const EResolveMaster kResolveMaster = eResolveMaster_never;
75 
// ---------------------------------------------------------------------------
// Default configuration parameters
// ---------------------------------------------------------------------------

// Fallback for the VDB cache size read in the CID2SNPProcessor_Impl
// constructor.
#define DEFAULT_VDB_CACHE_SIZE 10

#define DEFAULT_INDEX_UPDATE_TIME 600

// Fallback for the compress-data setting read in the CID2SNPProcessor_Impl
// constructor.
#define DEFAULT_COMPRESS_DATA CID2SNPContext::eCompressData_some
80 
// Debug levels.
// A trace message tagged with level L is emitted when the configured
// ID2SNP/DEBUG value is >= L (see s_DebugEnabled()).
enum EDebugLevel {
    eDebug_none     = 0,
    eDebug_error    = 1,
    eDebug_open     = 2,
    eDebug_request  = 5,
    eDebug_replies  = 6,
    eDebug_resolve  = 7,
    eDebug_data     = 8,
    eDebug_all      = 9
};
92 
93 // SNP accession parameters
94 
95 
96 // parameters reading
97 NCBI_PARAM_DECL(bool, ID2SNP, ENABLE);
98 NCBI_PARAM_DEF_EX(bool, ID2SNP, ENABLE, true,
99                   eParam_NoThread, ID2SNP_ENABLE);
100 
101 
102 NCBI_PARAM_DECL(int, ID2SNP, DEBUG);
103 NCBI_PARAM_DEF_EX(int, ID2SNP, DEBUG, eDebug_error,
104                   eParam_NoThread, ID2SNP_DEBUG);
105 
106 
107 NCBI_PARAM_DECL(bool, ID2SNP, FILTER_ALL);
108 NCBI_PARAM_DEF_EX(bool, ID2SNP, FILTER_ALL, true,
109                   eParam_NoThread, ID2SNP_FILTER_ALL);
110 
111 
s_Enabled(void)112 static inline bool s_Enabled(void)
113 {
114     static CSafeStatic<NCBI_PARAM_TYPE(ID2SNP, ENABLE)> s_Value;
115     return s_Value->Get();
116 }
117 
118 
s_DebugLevel(void)119 static inline int s_DebugLevel(void)
120 {
121     static CSafeStatic<NCBI_PARAM_TYPE(ID2SNP, DEBUG)> s_Value;
122     return s_Value->Get();
123 }
124 
125 
s_DebugEnabled(EDebugLevel level)126 static inline bool s_DebugEnabled(EDebugLevel level)
127 {
128     return s_DebugLevel() >= level;
129 }
130 
131 
s_FilterAll(void)132 static inline bool s_FilterAll(void)
133 {
134     static CSafeStatic<NCBI_PARAM_TYPE(ID2SNP, FILTER_ALL)> s_Value;
135     return s_Value->Get();
136 }
137 
138 
139 /////////////////////////////////////////////////////////////////////////////
140 // CID2SNPProcessor_Impl
141 /////////////////////////////////////////////////////////////////////////////
142 
143 // Blob id
144 // sat = 2001-2099 : SNP NA version 1 - 99
145 // subsat : NA accession number
146 // satkey : SequenceIndex + 1000000*FilterIndex;
147 // satkey bits 24-30:
148 
// Blob sat is kSNPSatBase + NA version (see x_GetBlobId()).
const int kSNPSatBase = 2000;

// NA accession index: fixed-width, zero-padded decimal field.
const int kNAIndexDigits = 9;
const int kNAIndexMin = 1;
const int kNAIndexMax = 999999999;

// NA accession version: one or two decimal digits.
const int kNAVersionDigitsMin = 1;
const int kNAVersionDigitsMax = 2;
const int kNAVersionMin = 1;
const int kNAVersionMax = 99;

// Full accession text length: "NA" + index digits + '.' + version digits.
const int kNALengthMin = 2 + kNAIndexDigits + 1 + kNAVersionDigitsMin; // NA000000000.0
const int kNALengthMax = 2 + kNAIndexDigits + 1 + kNAVersionDigitsMax; // NA000000000.00

// Blob sat-key is SeqIndex + FilterIndex * kSeqIndexCount.
const int kSeqIndexCount = 1000000;
const int kFilterIndexCount = 2000;
// Maximum number of characters accepted for the filter part in ParseTrack().
const int kFilterIndexMaxLength = 4;
162 
163 
164 // splitter parameters for SNPs and graphs
165 static const int kTSEId = 1;
166 static const int kChunkIdFeat = 0;
167 static const int kChunkIdGraph = 1;
168 static const int kChunkIdMul = 2;
169 static const TSeqPos kFeatChunkSize = 1000000;
170 static const TSeqPos kGraphChunkSize = 10000000;
171 
172 
173 BEGIN_LOCAL_NAMESPACE;
174 
175 
// Allocates a new object of the container's pointee type, appends a
// reference to it at the end of 'cont', and returns the new object.
// Cont::value_type must be a smart-reference type that exposes TObjectType,
// is constructible from a raw pointer and is dereferenceable.
template<class Cont>
typename Cont::value_type::TObjectType& sx_AddNew(Cont& cont)
{
    typedef typename Cont::value_type TRef;
    typedef typename TRef::TObjectType TObject;

    TRef created(new TObject);
    cont.push_back(created);
    return *created;
}
183 
184 
sx_SetZoomLevel(CSeq_annot & annot,int zoom_level)185 void sx_SetZoomLevel(CSeq_annot& annot, int zoom_level)
186 {
187     CUser_object& obj = sx_AddNew(annot.SetDesc().Set()).SetUser();
188     obj.SetType().SetStr("AnnotationTrack");
189     obj.AddField("ZoomLevel", zoom_level);
190 }
191 
192 
IsValidNAIndex(size_t na_index)193 bool IsValidNAIndex(size_t na_index)
194 {
195     return na_index >= kNAIndexMin && na_index <= kNAIndexMax;
196 }
197 
198 
IsValidNAVersion(size_t na_version)199 bool IsValidNAVersion(size_t na_version)
200 {
201     return na_version >= kNAVersionMin && na_version <= kNAVersionMax;
202 }
203 
204 
IsValidSeqIndex(size_t seq_index)205 bool IsValidSeqIndex(size_t seq_index)
206 {
207     return seq_index < kSeqIndexCount;
208 }
209 
210 
IsValidFilterIndex(size_t filter_index)211 bool IsValidFilterIndex(size_t filter_index)
212 {
213     return filter_index < kFilterIndexCount;
214 }
215 
216 
GetNAAccession(const SSNPDbTrackInfo & track)217 string GetNAAccession(const SSNPDbTrackInfo& track)
218 {
219     CNcbiOstrstream str;
220     str << "NA" << setw(kNAIndexDigits) << setfill('0') << track.m_NAIndex
221         << '.' << track.m_NAVersion;
222     return CNcbiOstrstreamToString(str);
223 }
224 
225 
FormatTrack(const SSNPDbTrackInfo & track)226 string FormatTrack(const SSNPDbTrackInfo& track)
227 {
228     CNcbiOstrstream str;
229     str << "NA" << setw(kNAIndexDigits) << setfill('0') << track.m_NAIndex
230         << '.' << track.m_NAVersion
231         << '#' << (track.m_FilterIndex+1);
232     return CNcbiOstrstreamToString(str);
233 }
234 
235 
ParseTrack(CTempString acc_filter)236 SSNPDbTrackInfo ParseTrack(CTempString acc_filter)
237 {
238     SSNPDbTrackInfo ret;
239     // NA123456789.1#1234
240     size_t hash_pos = acc_filter.find('#');
241     if ( hash_pos == NPOS ) {
242         return ret;
243     }
244     CTempString acc = acc_filter.substr(0, hash_pos);
245     CTempString filter = acc_filter.substr(hash_pos+1);
246     if ( acc.size() < kNALengthMin || acc.size() > kNALengthMax ||
247          acc[0] != 'N' || acc[1] != 'A' || acc[2+kNAIndexDigits] != '.' ) {
248         return ret;
249     }
250     if ( filter.empty() || filter[0] == '0' || filter.size() > kFilterIndexMaxLength ) {
251         return ret;
252     }
253     size_t na_index = NStr::StringToNumeric<size_t>(acc.substr(2, kNAIndexDigits),
254                                                     NStr::fConvErr_NoThrow);
255     if ( !IsValidNAIndex(na_index) ) {
256         return ret;
257     }
258     size_t na_version = NStr::StringToNumeric<size_t>(acc.substr(2+kNAIndexDigits+1),
259                                                       NStr::fConvErr_NoThrow);
260     if ( !IsValidNAVersion(na_version) ) {
261         return ret;
262     }
263     size_t filter_index = NStr::StringToNumeric<size_t>(filter,
264                                                         NStr::fConvErr_NoThrow)-1;
265     if ( !IsValidFilterIndex(filter_index) ) {
266         return ret;
267     }
268     ret.m_NAIndex = na_index;
269     ret.m_NAVersion = na_version;
270     ret.m_FilterIndex = filter_index;
271     return ret;
272 }
273 
274 
#ifdef TRACE_PROCESSING

// Stopwatch whose elapsed time prefixes every trace message.
static CStopWatch sw;

// Restarts the trace stopwatch, but only if any debugging is enabled.
# define START_TRACE()                                                  \
    do {                                                                \
        if ( s_DebugLevel() > 0 ) {                                     \
            sw.Restart();                                               \
        }                                                               \
    } while(0)

// Prints an SNP entry as "<track>/<sequence index>" for trace messages.
CNcbiOstream& operator<<(CNcbiOstream& out,
                         const CID2SNPProcessor_Impl::SSNPEntryInfo& seq)
{
    out << FormatTrack(seq.m_Track);
    out << '/' << seq.m_SeqIndex;
    return out;
}

// Posts an Info message with error subcode 'subcode' if debug level 'level'
// is enabled. The message is prefixed with elapsed time and "ID2SNP: ".
# define TRACE_X(subcode, level, msg)                                   \
    do {                                                                \
        if ( s_DebugEnabled(level) ) {                                  \
            LOG_POST_X(subcode,                                         \
                       Info<<sw.Elapsed()<<": ID2SNP: "<<msg);          \
        }                                                               \
    } while(0)

#else

// Tracing compiled out: both macros expand to empty statements.
# define START_TRACE() do{}while(0)
# define TRACE_X(subcode, level, msg) do{}while(0)

#endif
296 
297 
298 class COSSWriter : public IWriter
299 {
300 public:
301     typedef vector<char> TOctetString;
302     typedef list<TOctetString*> TOctetStringSequence;
303 
COSSWriter(TOctetStringSequence & out)304     COSSWriter(TOctetStringSequence& out)
305         : m_Output(out)
306         {
307         }
308 
Write(const void * buffer,size_t count,size_t * written)309     virtual ERW_Result Write(const void* buffer,
310                              size_t  count,
311                              size_t* written)
312         {
313             const char* data = static_cast<const char*>(buffer);
314             m_Output.push_back(new TOctetString(data, data+count));
315             if ( written ) {
316                 *written = count;
317             }
318             return eRW_Success;
319         }
Flush(void)320     virtual ERW_Result Flush(void)
321         {
322             return eRW_Success;
323         }
324 
325 private:
326     TOctetStringSequence& m_Output;
327 };
328 
329 
sx_GetSize(const CID2_Reply_Data & data)330 size_t sx_GetSize(const CID2_Reply_Data& data)
331 {
332     size_t size = 0;
333     ITERATE ( CID2_Reply_Data::TData, it, data.GetData() ) {
334         size += (*it)->size();
335     }
336     return size;
337 }
338 
339 
340 END_LOCAL_NAMESPACE;
341 
342 
CID2SNPContext(void)343 CID2SNPContext::CID2SNPContext(void)
344     : m_CompressData(eCompressData_never),
345       m_ExplicitBlobState(false),
346       m_AllowVDB(false)
347 {
348 }
349 
350 
CID2SNPProcessor_Impl(const CConfig::TParamTree * params,const string & driver_name)351 CID2SNPProcessor_Impl::CID2SNPProcessor_Impl(const CConfig::TParamTree* params,
352                                              const string& driver_name)
353 {
354     auto_ptr<CConfig::TParamTree> app_params;
355     if ( !params ) {
356         if ( CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard() ) {
357             app_params.reset(CConfig::ConvertRegToTree(app->GetConfig()));
358             params = app_params.get();
359         }
360     }
361     if ( params ) {
362         params = params->FindSubNode(CInterfaceVersion<CID2Processor>::GetName());
363     }
364     if ( params ) {
365         params = params->FindSubNode(driver_name);
366     }
367     CConfig conf(params);
368 
369     size_t cache_size =
370         conf.GetInt(driver_name,
371                     NCBI_ID2PROC_SNP_PARAM_VDB_CACHE_SIZE,
372                     CConfig::eErr_NoThrow,
373                     DEFAULT_VDB_CACHE_SIZE);
374     TRACE_X(23, eDebug_open, "ID2SNP: cache_size = "<<cache_size);
375     m_SNPDbCache.set_size_limit(cache_size);
376 
377     int compress_data =
378         conf.GetInt(driver_name,
379                     NCBI_ID2PROC_SNP_PARAM_COMPRESS_DATA,
380                     CConfig::eErr_NoThrow,
381                     DEFAULT_COMPRESS_DATA);
382     if ( compress_data >= CID2SNPContext::eCompressData_never &&
383          compress_data <= CID2SNPContext::eCompressData_always ) {
384         m_InitialContext.m_CompressData =
385             CID2SNPContext::ECompressData(compress_data);
386     }
387     TRACE_X(23, eDebug_open, "ID2SNP: compress_data = "<<m_InitialContext.m_CompressData);
388 }
389 
390 
~CID2SNPProcessor_Impl(void)391 CID2SNPProcessor_Impl::~CID2SNPProcessor_Impl(void)
392 {
393 }
394 
395 
InitContext(CID2SNPContext & context,const CID2_Request & request)396 void CID2SNPProcessor_Impl::InitContext(CID2SNPContext& context,
397                                         const CID2_Request& request)
398 {
399     context = GetInitialContext();
400     if ( request.IsSetParams() ) {
401         // check if blob-state field is allowed
402         ITERATE ( CID2_Request::TParams::Tdata, it, request.GetParams().Get() ) {
403             const CID2_Param& param = **it;
404             if ( param.GetName() == "id2:allow" && param.IsSetValue() ) {
405                 ITERATE ( CID2_Param::TValue, it2, param.GetValue() ) {
406                     if ( *it2 == "*.blob-state" ) {
407                         context.m_ExplicitBlobState = true;
408                     }
409                     if ( *it2 == "vdb-snp" ) {
410                         context.m_AllowVDB = true;
411                     }
412                 }
413             }
414         }
415     }
416 }
417 
418 
GetSNPDb(const string & na)419 CSNPDb CID2SNPProcessor_Impl::GetSNPDb(const string& na)
420 {
421     CMutexGuard guard(m_Mutex);
422     TSNPDbCache::iterator it = m_SNPDbCache.find(na);
423     if ( it != m_SNPDbCache.end() ) {
424         return it->second;
425     }
426     try {
427         CSNPDb snp_db(m_Mgr, na);
428         m_SNPDbCache[na] = snp_db;
429         TRACE_X(1, eDebug_open, "GetSNPDb: "<<na);
430         return snp_db;
431     }
432     catch ( CSraException& exc ) {
433         if ( exc.GetErrCode() == exc.eNotFoundDb ||
434              exc.GetErrCode() == exc.eProtectedDb ) {
435             // no such SNP table
436         }
437         else {
438             TRACE_X(22, eDebug_error, "ID2SNP: "
439                     "Exception while opening SNP DB "<<na<<": "<<exc);
440         }
441     }
442     catch ( CException& exc ) {
443         TRACE_X(22, eDebug_error, "ID2SNP: "
444                 "Exception while opening SNP DB "<<na<<": "<<exc);
445     }
446     catch ( exception& exc ) {
447         TRACE_X(22, eDebug_error, "ID2SNP: "
448                 "Exception while opening SNP DB "<<na<<": "<<exc.what());
449     }
450     return CSNPDb();
451 }
452 
453 
GetSNPDb(SSNPEntryInfo & seq)454 CSNPDb& CID2SNPProcessor_Impl::GetSNPDb(SSNPEntryInfo& seq)
455 {
456     if ( !seq.m_SNPDb ) {
457         seq.m_SNPDb = GetSNPDb(GetNAAccession(seq.m_Track));
458         if ( seq.m_SNPDb ) {
459             seq.m_Valid = true;
460         }
461     }
462     return seq.m_SNPDb;
463 }
464 
465 
ResetIteratorCache(SSNPEntryInfo & seq)466 void CID2SNPProcessor_Impl::ResetIteratorCache(SSNPEntryInfo& seq)
467 {
468     seq.m_SeqIter.Reset();
469     seq.m_BlobId.Reset();
470 }
471 
472 
GetSeqIterator(SSNPEntryInfo & seq)473 CSNPDbSeqIterator& CID2SNPProcessor_Impl::GetSeqIterator(SSNPEntryInfo& seq)
474 {
475     if ( !seq.m_SeqIter ) {
476         CSNPDb& db = GetSNPDb(seq);
477         seq.m_SeqIter = CSNPDbSeqIterator(db, seq.m_SeqIndex);
478         if ( seq.m_Track.m_FilterIndex ) {
479             seq.m_SeqIter.SetTrack(CSNPDbTrackIterator(db, seq.m_Track.m_FilterIndex));
480         }
481     }
482     return seq.m_SeqIter;
483 }
484 
485 
x_GetBlobId(SSNPEntryInfo & seq)486 CID2_Blob_Id& CID2SNPProcessor_Impl::x_GetBlobId(SSNPEntryInfo& seq)
487 {
488     if ( seq.m_BlobId ) {
489         return *seq.m_BlobId;
490     }
491     CRef<CID2_Blob_Id> id(new CID2_Blob_Id);
492     id->SetSat(kSNPSatBase + seq.m_Track.m_NAVersion);
493     id->SetSub_sat(seq.m_Track.m_NAIndex);
494     id->SetSat_key(seq.m_SeqIndex + seq.m_Track.m_FilterIndex * kSeqIndexCount);
495     seq.m_BlobId = id;
496     return *id;
497 }
498 
499 
500 CID2SNPProcessor_Impl::SSNPEntryInfo
x_ResolveBlobId(const CID2_Blob_Id & id)501 CID2SNPProcessor_Impl::x_ResolveBlobId(const CID2_Blob_Id& id)
502 {
503     SSNPEntryInfo seq;
504     if ( id.GetSat() < kSNPSatBase + kNAVersionMin ||
505          id.GetSat() > kSNPSatBase + kNAVersionMax ) {
506         return SSNPEntryInfo();
507     }
508     seq.m_Track.m_NAVersion = id.GetSat() - kSNPSatBase;
509     seq.m_Track.m_NAIndex = id.GetSub_sat();
510     if ( !IsValidNAIndex(seq.m_Track.m_NAIndex) ) {
511         return SSNPEntryInfo();
512     }
513     seq.m_SeqIndex = id.GetSat_key() % kSeqIndexCount;
514     if ( !IsValidSeqIndex(seq.m_SeqIndex) ) {
515         return SSNPEntryInfo();
516     }
517     seq.m_Track.m_FilterIndex = id.GetSat_key() / kSeqIndexCount;
518     if ( !IsValidFilterIndex(seq.m_Track.m_FilterIndex) ) {
519         return SSNPEntryInfo();
520     }
521     if ( CSNPDb snp_db = GetSNPDb(seq) ) {
522         seq.m_Valid = true;
523     }
524     return seq;
525 }
526 
527 
528 CID2SNPProcessor_Impl::SSNPEntryInfo
x_ResolveBlobId(const SSNPDbTrackInfo & track,const string & acc_ver)529 CID2SNPProcessor_Impl::x_ResolveBlobId(const SSNPDbTrackInfo& track,
530                                        const string& acc_ver)
531 {
532     SSNPEntryInfo seq;
533     if ( CSNPDb snp_db = GetSNPDb(GetNAAccession(track)) ) {
534         CSNPDbSeqIterator seq_iter(snp_db, CSeq_id_Handle::GetHandle(acc_ver));
535         if ( seq_iter ) {
536             seq.m_Track = track;
537             seq.m_SeqIndex = seq_iter.GetVDBSeqIndex();
538             seq.m_SNPDb = snp_db;
539             seq.m_Valid = true;
540             seq.m_SeqIter = seq_iter;
541         }
542     }
543     return seq;
544 }
545 
546 
WorthCompressing(const SSNPEntryInfo &)547 bool CID2SNPProcessor_Impl::WorthCompressing(const SSNPEntryInfo& /*seq*/)
548 {
549     return false;
550 }
551 
552 
WriteData(CID2SNPContext & context,const SSNPEntryInfo & seq,CID2_Reply_Data & data,const CSerialObject & obj)553 void CID2SNPProcessor_Impl::WriteData(CID2SNPContext& context,
554                                       const SSNPEntryInfo& seq,
555                                       CID2_Reply_Data& data,
556                                       const CSerialObject& obj)
557 {
558     data.SetData_format(CID2_Reply_Data::eData_format_asn_binary);
559     COSSWriter writer(data.SetData());
560     CWStream writer_stream(&writer);
561     AutoPtr<CNcbiOstream> str;
562     if ( (context.m_CompressData == CID2SNPContext::eCompressData_always) ||
563          (context.m_CompressData == CID2SNPContext::eCompressData_some && WorthCompressing(seq)) ) {
564         data.SetData_compression(CID2_Reply_Data::eData_compression_gzip);
565         str.reset(new CCompressionOStream(writer_stream,
566                                           new CZipStreamCompressor,
567                                           CCompressionIStream::fOwnProcessor));
568     }
569     else {
570         data.SetData_compression(CID2_Reply_Data::eData_compression_none);
571         str.reset(&writer_stream, eNoOwnership);
572     }
573     CObjectOStreamAsnBinary objstr(*str);
574     objstr << obj;
575 }
576 
577 
x_GetAccVer(string & acc_ver,const CSeq_id & id)578 bool CID2SNPProcessor_Impl::x_GetAccVer(string& acc_ver, const CSeq_id& id)
579 {
580     if ( !acc_ver.empty() ) {
581         return true;
582     }
583     if ( const CTextseq_id* text_id = id.GetTextseq_Id() ) {
584         if ( text_id->IsSetAccession() && !text_id->GetAccession().empty() &&
585              text_id->IsSetVersion() && text_id->GetVersion() > 0 ) {
586             // fully qualified text id, no more information is necessary
587             acc_ver = text_id->GetAccession()+'.'+NStr::NumericToString(text_id->GetVersion());
588             return true;
589         }
590     }
591     return false;
592 }
593 
594 
x_AddSeqIdRequest(CID2_Request_Get_Seq_id & request,CID2SNPProcessorPacketContext::SRequestInfo & info)595 void CID2SNPProcessor_Impl::x_AddSeqIdRequest(CID2_Request_Get_Seq_id& request,
596                                               CID2SNPProcessorPacketContext::SRequestInfo& info)
597 {
598     CID2_Request_Get_Seq_id::TSeq_id_type request_type = request.GetSeq_id_type();
599     info.m_OriginalSeqIdType = request_type;
600     if ( request.GetSeq_id().IsSeq_id() &&
601          x_GetAccVer(info.m_SeqAcc, request.GetSeq_id().GetSeq_id()) ) {
602         return;
603     }
604     if ( request_type == CID2_Request_Get_Seq_id::eSeq_id_type_any ) {
605         // ask for all Seq-ids instead of any
606         request.SetSeq_id_type(CID2_Request_Get_Seq_id::eSeq_id_type_all);
607     }
608     else if ( request_type & CID2_Request_Get_Seq_id::eSeq_id_type_text ) {
609         // text seq-id already asked
610     }
611     else {
612         // add text seq-id to the requested type set
613         request.SetSeq_id_type(request_type | CID2_Request_Get_Seq_id::eSeq_id_type_text);
614     }
615 }
616 
617 
618 CID2SNPProcessor_Impl::EProcessStatus
x_ProcessGetBlobId(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,TReplies & replies,CID2_Request & main_request,CID2_Request_Get_Blob_Id & request)619 CID2SNPProcessor_Impl::x_ProcessGetBlobId(CID2SNPContext& context,
620                                           CID2SNPProcessorPacketContext& packet_context,
621                                           TReplies& replies,
622                                           CID2_Request& main_request,
623                                           CID2_Request_Get_Blob_Id& request)
624 {
625     START_TRACE();
626     TRACE_X(7, eDebug_request, "GetBlobId: "<<MSerial_AsnText<<main_request);
627     if ( request.IsSetSources() ) {
628         CID2SNPProcessorPacketContext::SRequestInfo* info = 0;
629         // move SNP NAs from ID2 sources to m_SNPAccs
630         ERASE_ITERATE ( CID2_Request_Get_Blob_Id::TSources, it, request.SetSources() ) {
631             SSNPDbTrackInfo track = ParseTrack(*it);
632             if ( !track.m_NAIndex ) {
633                 continue;
634             }
635             CSNPDb db = GetSNPDb(GetNAAccession(track));
636             if ( !db ) {
637                 continue;
638             }
639             if ( track.m_FilterIndex >= db.GetTrackCount() ) {
640                 // bad track index
641                 request.SetSources().erase(it);
642                 continue;
643             }
644             if ( !info ) {
645                 info = &packet_context.m_SNPRequests[main_request.GetSerial_number()];
646             }
647             info->m_SNPTracks.push_back(track);
648             request.SetSources().erase(it);
649         }
650         if ( request.GetSources().empty() ) {
651             // no other ID2 sources left
652             request.ResetSources();
653         }
654         if ( info ) {
655             // add accession request if it's not known
656             x_AddSeqIdRequest(request.SetSeq_id(), *info);
657             return eNeedReplies;
658         }
659     }
660     return eNotProcessed;
661 }
662 
663 
664 BEGIN_LOCAL_NAMESPACE;
665 
666 template<class Values>
sx_HasNonZero(const Values & values,TSeqPos index,TSeqPos count)667 bool sx_HasNonZero(const Values& values, TSeqPos index, TSeqPos count)
668 {
669     TSeqPos end = min(index+count, TSeqPos(values.size()));
670     for ( TSeqPos i = index; i < end; ++i ) {
671         if ( values[i] ) {
672             return true;
673         }
674     }
675     return false;
676 }
677 
678 
679 template<class TValues>
sx_AddBits2(vector<char> & bits,TSeqPos bit_values,TSeqPos pos_index,const TValues & values)680 void sx_AddBits2(vector<char>& bits,
681                  TSeqPos bit_values,
682                  TSeqPos pos_index,
683                  const TValues& values)
684 {
685     TSeqPos dst_ind = pos_index / bit_values;
686     TSeqPos src_ind = 0;
687     if ( TSeqPos first_offset = pos_index % bit_values ) {
688         TSeqPos first_count = bit_values - first_offset;
689         if ( !bits[dst_ind] ) {
690             bits[dst_ind] = sx_HasNonZero(values, 0, first_count);
691         }
692         dst_ind += 1;
693         src_ind += first_count;
694     }
695     while ( src_ind < values.size() ) {
696         if ( !bits[dst_ind] ) {
697             bits[dst_ind] = sx_HasNonZero(values, src_ind, bit_values);
698         }
699         ++dst_ind;
700         src_ind += bit_values;
701     }
702 }
703 
704 
705 static
sx_AddBits(vector<char> & bits,TSeqPos kChunkSize,const CSeq_graph & graph)706 void sx_AddBits(vector<char>& bits,
707                 TSeqPos kChunkSize,
708                 const CSeq_graph& graph)
709 {
710     TSeqPos comp = graph.GetComp();
711     _ASSERT(kChunkSize % comp == 0);
712     TSeqPos bit_values = kChunkSize / comp;
713     const CSeq_interval& loc = graph.GetLoc().GetInt();
714     TSeqPos pos = loc.GetFrom();
715     _ASSERT(pos % comp == 0);
716     _ASSERT(graph.GetNumval()*comp == loc.GetLength());
717     TSeqPos pos_index = pos/comp;
718     if ( graph.GetGraph().IsByte() ) {
719         auto& values = graph.GetGraph().GetByte().GetValues();
720         _ASSERT(values.size() == graph.GetNumval());
721         sx_AddBits2(bits, bit_values, pos_index, values);
722     }
723     else {
724         auto& values = graph.GetGraph().GetInt().GetValues();
725         _ASSERT(values.size() == graph.GetNumval());
726         sx_AddBits2(bits, bit_values, pos_index, values);
727     }
728 }
729 
730 
731 END_LOCAL_NAMESPACE;
732 
733 
x_LoadBlob(CID2SNPContext & context,SSNPEntryInfo & info)734 CRef<CSerialObject> CID2SNPProcessor_Impl::x_LoadBlob(CID2SNPContext& context,
735                                                       SSNPEntryInfo& info)
736 {
737     CRef<CID2S_Split_Info> split_info(new CID2S_Split_Info);
738     split_info->SetChunks();
739     CBioseq_set& skeleton = split_info->SetSkeleton().SetSet();
740     skeleton.SetId().SetId(kTSEId);
741     skeleton.SetSeq_set();
742 
743 
744     CSNPDbSeqIterator it = GetSeqIterator(info);
745     CRange<TSeqPos> total_range = it.GetSNPRange();
746     vector<char> feat_chunks(total_range.GetTo()/kFeatChunkSize+1);
747     string na_acc = FormatTrack(info.m_Track);
748     {{
749         // overview graph
750         CRef<CSeq_annot> annot = it.GetOverviewAnnot(total_range, na_acc);
751         sx_SetZoomLevel(*annot, it.GetOverviewZoom());
752         if ( annot ) {
753             for ( auto& g : annot->GetData().GetGraph() ) {
754                 sx_AddBits(feat_chunks, kFeatChunkSize, *g);
755             }
756             skeleton.SetAnnot().push_back(annot);
757         }
758     }}
759     {{
760         // coverage graphs
761         string graph_name = CombineWithZoomLevel(na_acc, it.GetCoverageZoom());
762         _ASSERT(kGraphChunkSize % kFeatChunkSize == 0);
763         const TSeqPos feat_per_graph = kGraphChunkSize/kFeatChunkSize;
764         for ( int i = 0; i*kGraphChunkSize < total_range.GetToOpen(); ++i ) {
765             if ( !sx_HasNonZero(feat_chunks, i*feat_per_graph, feat_per_graph) ) {
766                 continue;
767             }
768             int chunk_id = i*kChunkIdMul+kChunkIdGraph;
769             CID2S_Chunk_Info& chunk = sx_AddNew(split_info->SetChunks());
770             chunk.SetId().Set(chunk_id);
771             CID2S_Seq_annot_Info& annot_info = sx_AddNew(chunk.SetContent()).SetSeq_annot();
772             annot_info.SetName(graph_name);
773             annot_info.SetGraph();
774             CID2S_Seq_id_Interval& interval = annot_info.SetSeq_loc().SetSeq_id_interval();
775             interval.SetSeq_id(*it.GetSeqId());
776             interval.SetStart(i*kGraphChunkSize);
777             interval.SetLength(kGraphChunkSize);
778         }
779     }}
780     {{
781         // features
782         TSeqPos overflow = it.GetMaxSNPLength()-1;
783         for ( int i = 0; i*kFeatChunkSize < total_range.GetToOpen(); ++i ) {
784             if ( !feat_chunks[i] ) {
785                 continue;
786             }
787             int chunk_id = i*kChunkIdMul+kChunkIdFeat;
788             CID2S_Chunk_Info& chunk = sx_AddNew(split_info->SetChunks());
789             chunk.SetId().Set(chunk_id);
790             CID2S_Seq_annot_Info& annot_info = sx_AddNew(chunk.SetContent()).SetSeq_annot();
791             annot_info.SetName(na_acc);
792             CID2S_Feat_type_Info& feat_type = sx_AddNew(annot_info.SetFeat());
793             feat_type.SetType(CSeqFeatData::e_Imp);
794             feat_type.SetSubtypes().push_back(CSeqFeatData::eSubtype_variation);
795             CID2S_Seq_id_Interval& interval = annot_info.SetSeq_loc().SetSeq_id_interval();
796             interval.SetSeq_id(*it.GetSeqId());
797             interval.SetStart(i*kFeatChunkSize);
798             interval.SetLength(kFeatChunkSize+overflow);
799         }
800     }}
801     return Ref<CSerialObject>(split_info);
802 }
803 
804 
x_LoadChunk(CID2SNPContext & context,SSNPEntryInfo & info,int chunk_id)805 CRef<CSerialObject> CID2SNPProcessor_Impl::x_LoadChunk(CID2SNPContext& context,
806                                                        SSNPEntryInfo& info,
807                                                        int chunk_id)
808 {
809     CRef<CID2S_Chunk> chunk(new CID2S_Chunk);
810     CID2S_Chunk_Data& data = sx_AddNew(chunk->SetData());
811     int chunk_type = chunk_id%kChunkIdMul;
812     int i = chunk_id/kChunkIdMul;
813     data.SetId().SetBioseq_set(kTSEId);
814 
815     string na_acc = FormatTrack(info.m_Track);
816     CSNPDbSeqIterator& it = GetSeqIterator(info);
817     if ( chunk_type == kChunkIdFeat ) {
818         CRange<TSeqPos> range;
819         range.SetFrom(i*kFeatChunkSize);
820         range.SetToOpen((i+1)*kFeatChunkSize);
821         for ( auto annot : it.GetTableFeatAnnots(range, na_acc) ) {
822             data.SetAnnots().push_back(annot);
823         }
824     }
825     else if ( chunk_type == kChunkIdGraph ) {
826         CRange<TSeqPos> range;
827         range.SetFrom(i*kGraphChunkSize);
828         range.SetToOpen((i+1)*kGraphChunkSize);
829         if ( auto annot = it.GetCoverageAnnot(range, na_acc) ) {
830             sx_SetZoomLevel(*annot, it.GetCoverageZoom());
831             data.SetAnnots().push_back(annot);
832         }
833     }
834     return Ref<CSerialObject>(chunk);
835 }
836 
837 
838 CID2SNPProcessor_Impl::EProcessStatus
x_ProcessGetBlobInfo(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,TReplies & replies,CID2_Request & main_request,CID2_Request_Get_Blob_Info & request)839 CID2SNPProcessor_Impl::x_ProcessGetBlobInfo(CID2SNPContext& context,
840                                             CID2SNPProcessorPacketContext& packet_context,
841                                             TReplies& replies,
842                                             CID2_Request& main_request,
843                                             CID2_Request_Get_Blob_Info& request)
844 {
845     if ( !request.GetBlob_id().IsBlob_id() ) {
846         return eNotProcessed;
847     }
848     if ( SSNPEntryInfo info = x_ResolveBlobId(request.GetBlob_id().GetBlob_id()) ) {
849         CID2_Reply& main_reply = sx_AddNew(replies);
850         if ( main_request.IsSetSerial_number() ) {
851             main_reply.SetSerial_number(main_request.GetSerial_number());
852         }
853         CID2_Reply_Get_Blob& reply = main_reply.SetReply().SetGet_blob();
854         reply.SetBlob_id(x_GetBlobId(info));
855         CID2_Reply_Data& data = reply.SetData();
856 
857         CRef<CSerialObject> obj = x_LoadBlob(context, info);
858         if ( obj->GetThisTypeInfo() == CID2S_Split_Info::GetTypeInfo() ) {
859             // split info
860             TRACE_X(11, eDebug_resolve, "GetSplitInfo: "<<info);
861             data.SetData_type(CID2_Reply_Data::eData_type_id2s_split_info);
862         }
863         else {
864             TRACE_X(11, eDebug_resolve, "GetSeq_entry: "<<info);
865             data.SetData_type(CID2_Reply_Data::eData_type_seq_entry);
866         }
867         WriteData(context, info, data, *obj);
868         TRACE_X(12, eDebug_resolve, "Seq("<<info<<"): "<<
869                 " data size: "<<sx_GetSize(data));
870         main_reply.SetEnd_of_reply();
871         return eProcessed;
872     }
873     return eNotProcessed;
874 }
875 
876 
877 CID2SNPProcessor_Impl::EProcessStatus
x_ProcessGetChunks(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,TReplies & replies,CID2_Request & main_request,CID2S_Request_Get_Chunks & request)878 CID2SNPProcessor_Impl::x_ProcessGetChunks(CID2SNPContext& context,
879                                           CID2SNPProcessorPacketContext& packet_context,
880                                           TReplies& replies,
881                                           CID2_Request& main_request,
882                                           CID2S_Request_Get_Chunks& request)
883 {
884     if ( SSNPEntryInfo info = x_ResolveBlobId(request.GetBlob_id()) ) {
885         ITERATE ( CID2S_Request_Get_Chunks::TChunks, it, request.GetChunks() ) {
886             CID2_Reply& main_reply = sx_AddNew(replies);
887             if ( main_request.IsSetSerial_number() ) {
888                 main_reply.SetSerial_number(main_request.GetSerial_number());
889             }
890             CID2S_Reply_Get_Chunk& reply = main_reply.SetReply().SetGet_chunk();
891             reply.SetBlob_id(request.SetBlob_id());
892             reply.SetChunk_id(*it);
893             CRef<CSerialObject> obj = x_LoadChunk(context, info, *it);
894             if ( obj && obj->GetThisTypeInfo() == CID2S_Chunk::GetTypeInfo() ) {
895                 // chunk
896                 TRACE_X(11, eDebug_resolve, "GetChunk: "<<info<<"."<<*it);
897                 CID2_Reply_Data& data = reply.SetData();
898                 data.SetData_type(CID2_Reply_Data::eData_type_id2s_chunk);
899                 WriteData(context, info, data, *obj);
900                 TRACE_X(12, eDebug_resolve, "Seq("<<info<<"): "<<
901                         " data size: "<<sx_GetSize(data));
902             }
903             else {
904                 TRACE_X(11, eDebug_resolve, "GetChunk: "<<info<<'.'<<*it<<": bad chunk");
905                 CID2_Error& error = sx_AddNew(main_reply.SetError());
906                 error.SetSeverity(CID2_Error::eSeverity_no_data);
907                 error.SetMessage("Invalid chunk id");
908             }
909         }
910         replies.back()->SetEnd_of_reply();
911         return eProcessed;
912     }
913     return eNotProcessed;
914 }
915 
916 
x_ProcessReplyGetSeqId(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,CID2_Reply & main_reply,TReplies & replies,CID2SNPProcessorPacketContext::SRequestInfo & info,CID2_Reply_Get_Seq_id & reply)917 void CID2SNPProcessor_Impl::x_ProcessReplyGetSeqId(CID2SNPContext& context,
918                                                    CID2SNPProcessorPacketContext& packet_context,
919                                                    CID2_Reply& main_reply,
920                                                    TReplies& replies,
921                                                    CID2SNPProcessorPacketContext::SRequestInfo& info,
922                                                    CID2_Reply_Get_Seq_id& reply)
923 {
924     replies.push_back(Ref(&main_reply));
925     if ( reply.IsSetSeq_id() ) {
926         for ( auto& r : reply.GetSeq_id() ) {
927             x_GetAccVer(info.m_SeqAcc, *r);
928         }
929     }
930 }
931 
932 
x_ProcessReplyGetBlobId(CID2SNPContext & context,CID2SNPProcessorPacketContext & packet_context,CID2_Reply & main_reply,TReplies & replies,CID2SNPProcessorPacketContext::SRequestInfo & req_info,CID2_Reply_Get_Blob_Id & reply)933 void CID2SNPProcessor_Impl::x_ProcessReplyGetBlobId(CID2SNPContext& context,
934                                                     CID2SNPProcessorPacketContext& packet_context,
935                                                     CID2_Reply& main_reply,
936                                                     TReplies& replies,
937                                                     CID2SNPProcessorPacketContext::SRequestInfo& req_info,
938                                                     CID2_Reply_Get_Blob_Id& reply)
939 {
940     replies.push_back(Ref(&main_reply));
941     if ( !req_info.m_SentBlobIds && reply.IsSetBlob_id() && !reply.IsSetAnnot_info() ) {
942         CRef<CSeq_id> seq_id(new CSeq_id(req_info.m_SeqAcc));
943         for ( auto& track : req_info.m_SNPTracks ) {
944             if ( SSNPEntryInfo snp_info = x_ResolveBlobId(track, req_info.m_SeqAcc) ) {
945                 string na_acc = FormatTrack(track);
946                 CID2_Reply& snp_main_reply = sx_AddNew(replies);
947                 snp_main_reply.SetSerial_number(main_reply.GetSerial_number());
948                 CID2_Reply_Get_Blob_Id& snp_reply = snp_main_reply.SetReply().SetGet_blob_id();
949                 snp_reply.SetSeq_id(reply.SetSeq_id());
950                 snp_reply.SetBlob_id(x_GetBlobId(snp_info));
951                 {{
952                     // add SNP feat type info
953                     CID2S_Seq_annot_Info& annot_info = sx_AddNew(snp_reply.SetAnnot_info());
954                     annot_info.SetSeq_loc().SetWhole_seq_id(*seq_id);
955                     annot_info.SetName(na_acc);
956                     CID2S_Feat_type_Info& type_info = sx_AddNew(annot_info.SetFeat());
957                     type_info.SetType(CSeqFeatData::e_Imp);
958                     type_info.SetSubtypes().push_back(CSeqFeatData::eSubtype_variation);
959                 }}
960                 {{
961                     // add SNP graph type info
962                     CID2S_Seq_annot_Info& annot_info = sx_AddNew(snp_reply.SetAnnot_info());
963                     annot_info.SetSeq_loc().SetWhole_seq_id(*seq_id);
964                     annot_info.SetName(CombineWithZoomLevel(na_acc, GetSNPDb(snp_info).GetCoverageZoom()));
965                     annot_info.SetGraph();
966                 }}
967                 {{
968                     // add SNP overvew graph type info
969                     CID2S_Seq_annot_Info& annot_info = sx_AddNew(snp_reply.SetAnnot_info());
970                     annot_info.SetSeq_loc().SetWhole_seq_id(*seq_id);
971                     annot_info.SetName(CombineWithZoomLevel(na_acc, GetSNPDb(snp_info).GetOverviewZoom()));
972                     annot_info.SetGraph();
973                 }}
974             }
975         }
976         if ( reply.IsSetEnd_of_reply() ) {
977             reply.ResetEnd_of_reply();
978             replies.back()->SetReply().SetGet_blob_id().SetEnd_of_reply();
979         }
980         if ( main_reply.IsSetEnd_of_reply() ) {
981             main_reply.ResetEnd_of_reply();
982             replies.back()->SetEnd_of_reply();
983         }
984         req_info.m_SentBlobIds = true;
985     }
986 }
987 
988 
989 /////////////////////////////////////////////////////////////////////////////
990 // new interface
991 
992 
993 CRef<CID2SNPProcessorContext>
CreateContext(void)994 CID2SNPProcessor_Impl::CreateContext(void)
995 {
996     CRef<CID2SNPProcessorContext> context(new CID2SNPProcessorContext);
997     context->m_Context = m_InitialContext;
998     return context;
999 }
1000 
1001 
1002 CRef<CID2SNPProcessorPacketContext>
ProcessPacket(CID2SNPProcessorContext * context,CID2_Request_Packet & packet,TReplies & replies)1003 CID2SNPProcessor_Impl::ProcessPacket(CID2SNPProcessorContext* context,
1004                                      CID2_Request_Packet& packet,
1005                                      TReplies& replies)
1006 {
1007     CRef<CID2SNPProcessorPacketContext> ret(new CID2SNPProcessorPacketContext);
1008     ERASE_ITERATE ( CID2_Request_Packet::Tdata, it, packet.Set() ) {
1009         // init request can come without serial number
1010         if ( (*it)->GetRequest().IsInit() ) {
1011             InitContext(context->m_Context, **it);
1012             continue;
1013         }
1014         if ( !context->m_Context.m_AllowVDB ) {
1015             continue;
1016         }
1017         if ( !(*it)->IsSetSerial_number() ) {
1018             // cannot process requests with no serial number
1019             continue;
1020         }
1021         EProcessStatus status = eNotProcessed;
1022         switch ( (*it)->GetRequest().Which() ) {
1023         case CID2_Request::TRequest::e_Get_blob_id:
1024             status = x_ProcessGetBlobId(context->m_Context, *ret, replies, **it,
1025                                         (*it)->SetRequest().SetGet_blob_id());
1026             break;
1027         case CID2_Request::TRequest::e_Get_blob_info:
1028             status = x_ProcessGetBlobInfo(context->m_Context, *ret, replies, **it,
1029                                           (*it)->SetRequest().SetGet_blob_info());
1030             break;
1031         case CID2_Request::TRequest::e_Get_chunks:
1032             status = x_ProcessGetChunks(context->m_Context, *ret, replies, **it,
1033                                         (*it)->SetRequest().SetGet_chunks());
1034             break;
1035         default:
1036             break;
1037         }
1038         if ( status == eProcessed ) {
1039             packet.Set().erase(it);
1040         }
1041     }
1042     if ( ret->m_SNPRequests.empty() ) {
1043         ret = null;
1044     }
1045     return ret;
1046 }
1047 
1048 
ProcessReply(CID2SNPProcessorContext * context,CID2SNPProcessorPacketContext * packet_context,CID2_Reply & reply,TReplies & replies)1049 void CID2SNPProcessor_Impl::ProcessReply(CID2SNPProcessorContext* context,
1050                                          CID2SNPProcessorPacketContext* packet_context,
1051                                          CID2_Reply& reply,
1052                                          TReplies& replies)
1053 {
1054     if ( packet_context && reply.IsSetSerial_number() ) {
1055         auto it = packet_context->m_SNPRequests.find(reply.GetSerial_number());
1056         if ( it != packet_context->m_SNPRequests.end() ) {
1057             if ( reply.GetReply().IsGet_seq_id() ) {
1058                 x_ProcessReplyGetSeqId(context->m_Context, *packet_context, reply, replies,
1059                                        it->second, reply.SetReply().SetGet_seq_id());
1060             }
1061             else if ( reply.GetReply().IsGet_blob_id() ) {
1062                 x_ProcessReplyGetBlobId(context->m_Context, *packet_context, reply, replies,
1063                                         it->second, reply.SetReply().SetGet_blob_id());
1064             }
1065             else {
1066                 replies.push_back(Ref(&reply));
1067             }
1068             return;
1069         }
1070     }
1071     // cannot process requests with no serial number
1072     replies.push_back(Ref(&reply));
1073 }
1074 
1075 
1076 // end of new interface
1077 /////////////////////////////////////////////////////////////////////////////
1078 
1079 
1080 END_NAMESPACE(objects);
1081 END_NCBI_NAMESPACE;
1082