1 /*  $Id: psg_client.cpp 628921 2021-04-07 18:46:41Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Rafael Sadyrov
27  *
28  */
29 
30 #include <ncbi_pch.hpp>
31 
32 #include <objtools/pubseq_gateway/client/psg_client.hpp>
33 
34 #ifdef HAVE_PSG_CLIENT
35 
36 #include <condition_variable>
37 #include <mutex>
38 #include <sstream>
39 #include <string>
40 #include <thread>
41 #include <type_traits>
42 
43 #include <corelib/ncbitime.hpp>
44 #include <corelib/ncbi_base64.h>
45 #include <connect/ncbi_socket.hpp>
46 #include <connect/ncbi_service.h>
47 #include <connect/ncbi_connutil.h>
48 
49 #include <serial/serial.hpp>
50 #include <serial/objistrasnb.hpp>
51 #undef ThrowError // unfortunately
52 
53 #include "psg_client_impl.hpp"
54 
55 BEGIN_NCBI_SCOPE
56 
57 
GetErrCodeString(void) const58 const char* CPSG_Exception::GetErrCodeString(void) const
59 {
60     switch (GetErrCode())
61     {
62         case eTimeout:          return "eTimeout";
63         case eServerError:      return "eServerError";
64         case eInternalError:    return "eInternalError";
65         case eParameterMissing: return "eParameterMissing";
66         default:                return CException::GetErrCodeString();
67     }
68 }
69 
70 
SPSG_BlobReader(SPSG_Reply::SItem::TTS * src)71 SPSG_BlobReader::SPSG_BlobReader(SPSG_Reply::SItem::TTS* src)
72     : m_Src(src)
73 {
74     assert(src);
75 }
76 
x_Read(void * buf,size_t count,size_t * bytes_read)77 ERW_Result SPSG_BlobReader::x_Read(void* buf, size_t count, size_t* bytes_read)
78 {
79     assert(bytes_read);
80 
81     *bytes_read = 0;
82 
83     CheckForNewChunks();
84 
85     for (; m_Chunk < m_Data.size(); ++m_Chunk) {
86         auto& data = m_Data[m_Chunk];
87 
88         // Chunk has not been received yet
89         if (data.empty()) return eRW_Success;
90 
91         auto available = data.size() - m_Index;
92         auto to_copy = min(count, available);
93 
94         memcpy(buf, data.data() + m_Index, to_copy);
95         buf = (char*)buf + to_copy;
96         count -= to_copy;
97         *bytes_read += to_copy;
98         m_Index += to_copy;
99 
100         if (!count) return eRW_Success;
101 
102         m_Index = 0;
103     }
104 
105     auto src_locked = m_Src->GetLock();
106     return src_locked->expected.Cmp<equal_to>(src_locked->received) ? eRW_Eof : eRW_Success;
107 }
108 
Read(void * buf,size_t count,size_t * bytes_read)109 ERW_Result SPSG_BlobReader::Read(void* buf, size_t count, size_t* bytes_read)
110 {
111     size_t read;
112     const auto kSeconds = TPSG_ReaderTimeout::GetDefault();
113     CDeadline deadline(kSeconds);
114 
115     do {
116         auto rv = x_Read(buf, count, &read);
117 
118         if ((rv != eRW_Success) || (read != 0)) {
119             if (bytes_read) *bytes_read = read;
120             return rv;
121         }
122     }
123     while (m_Src->WaitUntil(deadline));
124 
125     NCBI_THROW_FMT(CPSG_Exception, eTimeout, "Timeout on reading (after " << kSeconds << " seconds)");
126     return eRW_Error;
127 }
128 
PendingCount(size_t * count)129 ERW_Result SPSG_BlobReader::PendingCount(size_t* count)
130 {
131     assert(count);
132 
133     *count = 0;
134 
135     CheckForNewChunks();
136 
137     auto k = m_Index;
138 
139     for (auto i = m_Chunk; i < m_Data.size(); ++i) {
140         auto& data = m_Data[i];
141 
142         // Chunk has not been received yet
143         if (data.empty()) return eRW_Success;
144 
145         *count += data.size() - k;
146         k = 0;
147     }
148 
149     return eRW_Success;
150 }
151 
CheckForNewChunks()152 void SPSG_BlobReader::CheckForNewChunks()
153 {
154     if (m_Src->GetMTSafe().state.Empty()) return;
155 
156     auto src_locked = m_Src->GetLock();
157     auto& src = *src_locked;
158     auto& chunks = src.chunks;
159 
160     if (m_Data.size() < chunks.size()) m_Data.resize(chunks.size());
161 
162     for (size_t i = 0; i < chunks.size(); ++i) {
163         if (!chunks[i].empty()) {
164             m_Data[i].swap(chunks[i]);
165         }
166     }
167 }
168 
169 
170 static_assert(is_nothrow_move_constructible<CPSG_BioId>::value, "CPSG_BioId move constructor must be noexcept");
171 
Repr() const172 string CPSG_BioId::Repr() const
173 {
174     return m_Type == TType::e_not_set ? m_Id : m_Id + '~' + to_string(m_Type);
175 }
176 
s_GetBioId(const CJsonNode & data)177 CPSG_BioId s_GetBioId(const CJsonNode& data)
178 {
179     auto type = static_cast<CPSG_BioId::TType>(data.GetInteger("seq_id_type"));
180     auto accession = data.GetString("accession");
181     auto name_node = data.GetByKeyOrNull("name");
182     auto name = name_node && name_node.IsString() ? name_node.AsString() : string();
183     auto version = static_cast<int>(data.GetInteger("version"));
184     return { objects::CSeq_id(type, accession, name, version).AsFastaString(), type };
185 };
186 
operator <<(ostream & os,const CPSG_BioId & bio_id)187 ostream& operator<<(ostream& os, const CPSG_BioId& bio_id)
188 {
189     if (bio_id.GetType()) os << "seq_id_type=" << bio_id.GetType() << '&';
190     return os << "seq_id=" << bio_id.GetId();
191 }
192 
193 
194 static_assert(is_nothrow_move_constructible<CPSG_BlobId>::value, "CPSG_BlobId move constructor must be noexcept");
195 
Repr() const196 string CPSG_BlobId::Repr() const
197 {
198     return m_LastModified.IsNull() ? m_Id : m_Id + '~' + to_string(m_LastModified.GetValue());
199 }
200 
s_GetDataId(const SPSG_Args & args)201 unique_ptr<CPSG_DataId> s_GetDataId(const SPSG_Args& args)
202 {
203     try {
204         const auto& blob_id = args.GetValue("blob_id");
205 
206         if (blob_id.empty()) {
207             auto id2_chunk = NStr::StringToNumeric<Uint8>(args.GetValue("id2_chunk"));
208             return unique_ptr<CPSG_DataId>(new CPSG_ChunkId(id2_chunk, args.GetValue("id2_info")));
209         }
210 
211         CPSG_BlobId::TLastModified last_modified;
212         const auto& last_modified_str = args.GetValue("last_modified");
213 
214         if (!last_modified_str.empty()) {
215             last_modified = NStr::StringToNumeric<Int8>(last_modified_str);
216         }
217 
218         return unique_ptr<CPSG_DataId>(new CPSG_BlobId(blob_id, move(last_modified)));
219     }
220     catch (...) {
221         NCBI_THROW_FMT(CPSG_Exception, eServerError,
222                 "Both blob_id[+last_modified] and id2_chunk+id2_info pairs are missing/corrupted in server response: " <<
223                 args.GetQueryString(CUrlArgs::eAmp_Char));
224     }
225 }
226 
s_GetBlobId(const CJsonNode & data)227 CPSG_BlobId s_GetBlobId(const CJsonNode& data)
228 {
229     CPSG_BlobId::TLastModified last_modified;
230 
231     if (data.HasKey("last_modified")) {
232         last_modified = data.GetInteger("last_modified");
233     }
234 
235     if (data.HasKey("blob_id")) {
236         return { data.GetString("blob_id"), last_modified };
237     }
238 
239     auto sat = static_cast<int>(data.GetInteger("sat"));
240     auto sat_key = static_cast<int>(data.GetInteger("sat_key"));
241     return { sat, sat_key, last_modified };
242 }
243 
operator <<(ostream & os,const CPSG_BlobId & blob_id)244 ostream& operator<<(ostream& os, const CPSG_BlobId& blob_id)
245 {
246     if (!blob_id.GetLastModified().IsNull()) os << "last_modified=" << blob_id.GetLastModified().GetValue() << '&';
247     return os << "blob_id=" << blob_id.GetId();
248 }
249 
250 
Repr() const251 string CPSG_ChunkId::Repr() const
252 {
253     return to_string(m_Id2Chunk) + '~' + m_Id2Info;
254 }
255 
operator <<(ostream & os,const CPSG_ChunkId & chunk_id)256 ostream& operator<<(ostream& os, const CPSG_ChunkId& chunk_id)
257 {
258     return os << "id2_chunk=" << chunk_id.GetId2Chunk() << "&id2_info=" << chunk_id.GetId2Info();
259 }
260 
261 
262 template <class TReplyItem>
CreateImpl(TReplyItem * item,const vector<SPSG_Chunk> & chunks)263 TReplyItem* CPSG_Reply::SImpl::CreateImpl(TReplyItem* item, const vector<SPSG_Chunk>& chunks)
264 {
265     if (chunks.empty()) return item;
266 
267     unique_ptr<TReplyItem> rv(item);
268     rv->m_Data = CJsonNode::ParseJSON(chunks.front(), CJsonNode::fStandardJson);
269 
270     return rv.release();
271 }
272 
/// Reply item type paired with a skip reason. Get() sets a specific reason
/// only for eSkippedBlob items; every other type carries eUnknown.
struct SItemTypeAndReason : pair<CPSG_ReplyItem::EType, CPSG_SkippedBlob::EReason>
{
    /// Classify a reply item from its "item_type" / "reason" arguments.
    static SItemTypeAndReason Get(const SPSG_Args& args);

private:
    using TBase = pair<CPSG_ReplyItem::EType, CPSG_SkippedBlob::EReason>;
    // Inherit pair's constructors, which Get() uses for its {type, reason} returns
    using TBase::TBase;
    // Implicit conversion from a bare type; the reason becomes eUnknown
    SItemTypeAndReason(CPSG_ReplyItem::EType type) : TBase(type, CPSG_SkippedBlob::eUnknown) {}
};
282 
Get(const SPSG_Args & args)283 SItemTypeAndReason SItemTypeAndReason::Get(const SPSG_Args& args)
284 {
285     const auto item_type = args.GetValue("item_type");
286 
287     if (item_type == "blob") {
288         const auto reason = args.GetValue("reason");
289 
290         if (reason.empty()) {
291             return CPSG_ReplyItem::eBlobData;
292 
293         } else if (reason == "excluded") {
294             return { CPSG_ReplyItem::eSkippedBlob, CPSG_SkippedBlob::eExcluded };
295 
296         } else if (reason == "inprogress") {
297             return { CPSG_ReplyItem::eSkippedBlob, CPSG_SkippedBlob::eInProgress };
298 
299         } else if (reason == "sent") {
300             return { CPSG_ReplyItem::eSkippedBlob, CPSG_SkippedBlob::eSent };
301 
302         } else {
303             return { CPSG_ReplyItem::eSkippedBlob, CPSG_SkippedBlob::eUnknown };
304         }
305 
306     } else if (item_type == "bioseq_info") {
307         return CPSG_ReplyItem::eBioseqInfo;
308 
309     } else if (item_type == "blob_prop") {
310         return CPSG_ReplyItem::eBlobInfo;
311 
312     } else if (item_type == "bioseq_na") {
313         return CPSG_ReplyItem::eNamedAnnotInfo;
314 
315     } else if (item_type == "public_comment") {
316         return CPSG_ReplyItem::ePublicComment;
317 
318     } else if (item_type == "processor") {
319         return CPSG_ReplyItem::eProcessor;
320 
321     } else {
322         if (TPSG_FailOnUnknownItems::GetDefault()) {
323             NCBI_THROW_FMT(CPSG_Exception, eServerError, "Received unknown item type: " << item_type);
324         }
325 
326         static atomic_bool reported(false);
327 
328         if (!reported.exchange(true)) {
329             ERR_POST("Received unknown item type: " << item_type);
330         }
331 
332         return CPSG_ReplyItem::eEndOfReply;
333     }
334 }
335 
/// Build the user-facing reply item for the internal item @a item_ts.
///
/// Marks the item as returned, then chooses the CPSG_ReplyItem subclass from
/// the item's arguments (see SItemTypeAndReason::Get()). Items in the
/// eNotFound or eError state get a plain CPSG_ReplyItem of the classified
/// type. If the type matches none of the handled kinds, an empty pointer is
/// returned. Otherwise the new item is linked to its impl, to the owning user
/// reply, and to the "processor_id" argument.
shared_ptr<CPSG_ReplyItem> CPSG_Reply::SImpl::Create(SPSG_Reply::SItem::TTS* item_ts)
{
    auto user_reply_locked = user_reply.lock();

    assert(user_reply_locked);
    assert(item_ts);

    // Holds the item's lock (from GetLock()) until this function returns
    auto item_locked = item_ts->GetLock();

    item_locked->state.SetReturned();

    unique_ptr<CPSG_ReplyItem::SImpl> impl(new CPSG_ReplyItem::SImpl);
    impl->item = item_ts;

    shared_ptr<CPSG_ReplyItem> rv;

    auto& chunks = item_locked->chunks;
    auto& args = item_locked->args;
    auto processor_id = args.GetValue("processor_id");

    const auto& state = item_locked->state.GetState();
    const auto itar = SItemTypeAndReason::Get(args);

    if ((state == SPSG_Reply::SState::eNotFound) || (state == SPSG_Reply::SState::eError)) {
        rv.reset(new CPSG_ReplyItem(itar.first));

    } else if (itar.first == CPSG_ReplyItem::eBlobData) {
        // Blob data is exposed as a stream reading from the same internal item
        unique_ptr<CPSG_BlobData> blob_data(new CPSG_BlobData(s_GetDataId(args)));
        blob_data->m_Stream.reset(new SPSG_RStream(item_ts));
        rv.reset(blob_data.release());

    } else if (itar.first == CPSG_ReplyItem::eSkippedBlob) {
        // NOTE(review): dynamic_cast to a reference throws std::bad_cast if the
        // arguments describe a chunk ID rather than a blob ID
        auto data_id = s_GetDataId(args);
        auto blob_id = move(dynamic_cast<CPSG_BlobId&>(*data_id));
        rv.reset(new CPSG_SkippedBlob(move(blob_id), itar.second));

    } else if (itar.first == CPSG_ReplyItem::eBioseqInfo) {
        rv.reset(CreateImpl(new CPSG_BioseqInfo, chunks));

    } else if (itar.first == CPSG_ReplyItem::eBlobInfo) {
        rv.reset(CreateImpl(new CPSG_BlobInfo(s_GetDataId(args)), chunks));

    } else if (itar.first == CPSG_ReplyItem::eNamedAnnotInfo) {
        auto name = args.GetValue("na");
        rv.reset(CreateImpl(new CPSG_NamedAnnotInfo(name), chunks));

    } else if (itar.first == CPSG_ReplyItem::ePublicComment) {
        // The comment text is the raw first chunk (empty if none arrived)
        auto text = chunks.empty() ? string() : chunks.front();
        rv.reset(new CPSG_PublicComment(s_GetDataId(args), text));

    } else if (itar.first == CPSG_ReplyItem::eProcessor) {
        rv.reset(new CPSG_ReplyItem(CPSG_ReplyItem::eProcessor));

    } else {
        // Unhandled type (e.g. eEndOfReply for unknown items): empty pointer
        return rv;
    }

    rv->m_Impl.reset(impl.release());
    rv->m_Reply = user_reply_locked;
    rv->m_ProcessorId = move(processor_id);
    return rv;
}
398 
399 
// Process-wide service registry. The mutex guards both the weak_ptr and the
// map it refers to (see GetIoC()/GetMap()). GetMap() creates a new map
// whenever the weak_ptr has expired; presumably the live map is kept alive by
// CService::m_Map shared_ptrs — TODO confirm in the header.
pair<mutex, weak_ptr<CPSG_Queue::SImpl::CService::TMap>> CPSG_Queue::SImpl::CService::sm_Instance;
401 
GetIoC(const string & service)402 SPSG_IoCoordinator& CPSG_Queue::SImpl::CService::GetIoC(const string& service)
403 {
404     if (service.empty()) {
405         NCBI_THROW(CPSG_Exception, eParameterMissing, "Service name is empty");
406     }
407 
408     unique_lock<mutex> lock(sm_Instance.first);
409 
410     auto found = m_Map->find(service);
411 
412     if (found != m_Map->end()) {
413         return *found->second;
414     }
415 
416     auto created = m_Map->emplace(service, unique_ptr<SPSG_IoCoordinator>(new SPSG_IoCoordinator(service)));
417     return *created.first->second;
418 }
419 
GetMap()420 shared_ptr<CPSG_Queue::SImpl::CService::TMap> CPSG_Queue::SImpl::CService::GetMap()
421 {
422     unique_lock<mutex> lock(sm_Instance.first);
423 
424     auto rv = sm_Instance.second.lock();
425 
426     if (!rv) {
427         rv = make_shared<TMap>();
428         sm_Instance.second = rv;
429     }
430 
431     return rv;
432 }
433 
434 
/// Initialize the queue implementation's m_Service from the service name.
CPSG_Queue::SImpl::SImpl(const string& service) :
    m_Service(service)
{
}
439 
s_GetTSE(CPSG_Request_Biodata::EIncludeData include_data)440 const char* s_GetTSE(CPSG_Request_Biodata::EIncludeData include_data)
441 {
442     switch (include_data) {
443         case CPSG_Request_Biodata::eDefault:  return nullptr;
444         case CPSG_Request_Biodata::eNoTSE:    return "none";
445         case CPSG_Request_Biodata::eSlimTSE:  return "slim";
446         case CPSG_Request_Biodata::eSmartTSE: return "smart";
447         case CPSG_Request_Biodata::eWholeTSE: return "whole";
448         case CPSG_Request_Biodata::eOrigTSE:  return "orig";
449     }
450 
451     return nullptr;
452 }
453 
x_GetAbsPathRef(shared_ptr<const CPSG_Request> user_request)454 string CPSG_Queue::SImpl::x_GetAbsPathRef(shared_ptr<const CPSG_Request> user_request)
455 {
456     ostringstream os;
457     user_request->x_GetAbsPathRef(os);
458     os << m_Service.ioc.GetUrlArgs();
459 
460     if (const auto hops = user_request->m_Hops) os << "&hops=" << hops;
461     return os.str();
462 }
463 
s_GetAccSubstitution(EPSG_AccSubstitution acc_substitution)464 const char* s_GetAccSubstitution(EPSG_AccSubstitution acc_substitution)
465 {
466     switch (acc_substitution) {
467         case EPSG_AccSubstitution::Default: break;
468         case EPSG_AccSubstitution::Limited: return "&acc_substitution=limited";
469         case EPSG_AccSubstitution::Never:   return "&acc_substitution=never";
470     }
471 
472     return "";
473 }
474 
475 
s_GetAutoBlobSkipping(ESwitch value)476 const char* s_GetAutoBlobSkipping(ESwitch value)
477 {
478     switch (value) {
479         case eDefault: break;
480         case eOn:      return "&auto_blob_skipping=yes";
481         case eOff:     return "&auto_blob_skipping=no";
482     }
483 
484     return "";
485 }
486 
487 
x_GetAbsPathRef(ostream & os) const488 void CPSG_Request_Biodata::x_GetAbsPathRef(ostream& os) const
489 {
490     os << "/ID/get?" << m_BioId;
491 
492     if (const auto tse = s_GetTSE(m_IncludeData)) os << "&tse=" << tse;
493 
494     if (!m_ExcludeTSEs.empty()) {
495         os << "&exclude_blobs";
496 
497         char delimiter = '=';
498         for (const auto& blob_id : m_ExcludeTSEs) {
499             os << delimiter << blob_id.GetId();
500             delimiter = ',';
501         }
502     }
503 
504     os << s_GetAccSubstitution(m_AccSubstitution);
505     os << s_GetAutoBlobSkipping(m_AutoBlobSkipping);
506 }
507 
/// Write the "/ID/resolve" path and query (JSON format) for this request.
///
/// Normally each flag set in m_IncludeInfo adds "&<name>=yes". If the
/// highest bit of unsigned is set, the request asks for "&all_info=yes"
/// instead, and every listed flag that is NOT set in m_IncludeInfo is
/// switched off with "&<name>=no".
void CPSG_Request_Resolve::x_GetAbsPathRef(ostream& os) const
{
    os << "/ID/resolve?" << m_BioId << "&fmt=json";

    auto value = "yes";
    auto include_info = m_IncludeInfo;
    // Highest bit of unsigned, e.g. 0x80000000 for 32-bit unsigned
    const auto max_bit = (numeric_limits<unsigned>::max() >> 1) + 1;

    if (include_info & CPSG_Request_Resolve::TIncludeInfo(max_bit)) {
        os << "&all_info=yes";
        value = "no";
        // After inversion, the flags below test the ones that were NOT requested
        include_info = ~include_info;
    }

    if (include_info & CPSG_Request_Resolve::fCanonicalId)  os << "&canon_id=" << value;
    if (include_info & CPSG_Request_Resolve::fName)         os << "&name=" << value;
    if (include_info & CPSG_Request_Resolve::fOtherIds)     os << "&seq_ids=" << value;
    if (include_info & CPSG_Request_Resolve::fMoleculeType) os << "&mol_type=" << value;
    if (include_info & CPSG_Request_Resolve::fLength)       os << "&length=" << value;
    if (include_info & CPSG_Request_Resolve::fChainState)   os << "&seq_state=" << value;
    if (include_info & CPSG_Request_Resolve::fState)        os << "&state=" << value;
    if (include_info & CPSG_Request_Resolve::fBlobId)       os << "&blob_id=" << value;
    if (include_info & CPSG_Request_Resolve::fTaxId)        os << "&tax_id=" << value;
    if (include_info & CPSG_Request_Resolve::fHash)         os << "&hash=" << value;
    if (include_info & CPSG_Request_Resolve::fDateChanged)  os << "&date_changed=" << value;
    if (include_info & CPSG_Request_Resolve::fGi)           os << "&gi=" << value;

    os << s_GetAccSubstitution(m_AccSubstitution);
}
537 
x_GetAbsPathRef(ostream & os) const538 void CPSG_Request_Blob::x_GetAbsPathRef(ostream& os) const
539 {
540     os << "/ID/getblob?" << m_BlobId;
541 
542     if (const auto tse = s_GetTSE(m_IncludeData)) os << "&tse=" << tse;
543 }
544 
x_GetAbsPathRef(ostream & os) const545 void CPSG_Request_NamedAnnotInfo::x_GetAbsPathRef(ostream& os) const
546 {
547     os << "/ID/get_na?" << m_BioId << "&names=";
548 
549     for (const auto& name : m_AnnotNames) {
550         os << name << ",";
551     }
552 
553     // Remove last comma (there must be some output after seekp to succeed)
554     os.seekp(-1, ios_base::cur);
555 
556     if (const auto tse = s_GetTSE(m_IncludeData)) os << "&tse=" << tse;
557 
558     os << s_GetAccSubstitution(m_AccSubstitution);
559 }
560 
x_GetAbsPathRef(ostream & os) const561 void CPSG_Request_Chunk::x_GetAbsPathRef(ostream& os) const
562 {
563     os << "/ID/get_tse_chunk?" << m_ChunkId;
564 }
565 
SendRequest(shared_ptr<const CPSG_Request> user_request,const CDeadline & deadline)566 bool CPSG_Queue::SImpl::SendRequest(shared_ptr<const CPSG_Request> user_request, const CDeadline& deadline)
567 {
568     auto& ioc = m_Service.ioc;
569     auto& params = ioc.params;
570 
571     auto user_context = params.client_mode == EPSG_PsgClientMode::eOff ?
572         nullptr : user_request->GetUserContext<string>();
573     const auto request_id = user_context ? *user_context : ioc.GetNewRequestId();
574     auto reply = make_shared<SPSG_Reply>(move(request_id), params);
575     auto abs_path_ref = x_GetAbsPathRef(user_request);
576     auto request = make_shared<SPSG_Request>(move(abs_path_ref), reply, user_request->m_RequestContext, params);
577 
578     if (ioc.AddRequest(request, Stopped(), deadline)) {
579         shared_ptr<CPSG_Reply> user_reply(new CPSG_Reply);
580         user_reply->m_Impl->reply = move(reply);
581         user_reply->m_Impl->user_reply = user_reply;
582         user_reply->m_Request = move(user_request);
583         Push(move(user_reply));
584         return true;
585     }
586 
587     return false;
588 }
589 
s_GetStatus(SPSG_Reply::SItem::TTS * ts,const CDeadline & deadline)590 EPSG_Status s_GetStatus(SPSG_Reply::SItem::TTS* ts, const CDeadline& deadline)
591 {
592     assert(ts);
593 
594     auto& state = ts->GetMTSafe().state;
595 
596     do {
597         switch (state.GetState()) {
598             case SPSG_Reply::SState::eNotFound:   return EPSG_Status::eNotFound;
599             case SPSG_Reply::SState::eError:      return EPSG_Status::eError;
600             case SPSG_Reply::SState::eSuccess:    return EPSG_Status::eSuccess;
601             case SPSG_Reply::SState::eInProgress: break;
602         }
603     }
604     while (state.change.WaitUntil(deadline));
605 
606     return EPSG_Status::eInProgress;
607 }
608 
GetStatus(CDeadline deadline) const609 EPSG_Status CPSG_ReplyItem::GetStatus(CDeadline deadline) const
610 {
611     assert(m_Impl);
612 
613     return s_GetStatus(m_Impl->item, deadline);
614 }
615 
GetNextMessage() const616 string CPSG_ReplyItem::GetNextMessage() const
617 {
618     assert(m_Impl);
619     assert(m_Impl->item);
620 
621     return m_Impl->item->GetLock()->state.GetError();
622 }
623 
~CPSG_ReplyItem()624 CPSG_ReplyItem::~CPSG_ReplyItem()
625 {
626 }
627 
CPSG_ReplyItem(EType type)628 CPSG_ReplyItem::CPSG_ReplyItem(EType type) :
629     m_Type(type)
630 {
631 }
632 
633 
CPSG_BlobData(unique_ptr<CPSG_DataId> id)634 CPSG_BlobData::CPSG_BlobData(unique_ptr<CPSG_DataId> id) :
635     CPSG_ReplyItem(eBlobData),
636     m_Id(move(id))
637 {
638 }
639 
640 
CPSG_BlobInfo(unique_ptr<CPSG_DataId> id)641 CPSG_BlobInfo::CPSG_BlobInfo(unique_ptr<CPSG_DataId> id) :
642     CPSG_ReplyItem(eBlobInfo),
643     m_Id(move(id))
644 {
645 }
646 
// Bit values of the integer "flags" member of blob-properties JSON, as tested
// by the CPSG_BlobInfo accessors in this file.
enum EPSG_BlobInfo_Flags
{
    fPSGBI_CheckFailed = 1 << 0,   // not tested by the accessors shown here
    fPSGBI_Gzip        = 1 << 1,   // GetCompression() reports "gzip"
    fPSGBI_Not4Gbu     = 1 << 2,   // not tested by the accessors shown here
    fPSGBI_Withdrawn   = 1 << 3,   // IsWithdrawn()
    fPSGBI_Suppress    = 1 << 4,   // IsSuppressed()
    fPSGBI_Dead        = 1 << 5,   // IsDead()
};
656 
GetCompression() const657 string CPSG_BlobInfo::GetCompression() const
658 {
659     return m_Data.GetInteger("flags") & fPSGBI_Gzip ? "gzip" : "";
660 }
661 
GetFormat() const662 string CPSG_BlobInfo::GetFormat() const
663 {
664     return "asn.1";
665 }
666 
GetStorageSize() const667 Uint8 CPSG_BlobInfo::GetStorageSize() const
668 {
669     return static_cast<Uint8>(m_Data.GetInteger("size"));
670 }
671 
GetSize() const672 Uint8 CPSG_BlobInfo::GetSize() const
673 {
674     return static_cast<Uint8>(m_Data.GetInteger("size_unpacked"));
675 }
676 
IsDead() const677 bool CPSG_BlobInfo::IsDead() const
678 {
679     return m_Data.GetInteger("flags") & fPSGBI_Dead;
680 }
681 
IsSuppressed() const682 bool CPSG_BlobInfo::IsSuppressed() const
683 {
684     return m_Data.GetInteger("flags") & fPSGBI_Suppress;
685 }
686 
IsWithdrawn() const687 bool CPSG_BlobInfo::IsWithdrawn() const
688 {
689     return m_Data.GetInteger("flags") & fPSGBI_Withdrawn;
690 }
691 
s_GetTime(Int8 milliseconds)692 CTime s_GetTime(Int8 milliseconds)
693 {
694     return milliseconds > 0 ? CTime(static_cast<time_t>(milliseconds / kMilliSecondsPerSecond)) : CTime();
695 }
696 
GetHupReleaseDate() const697 CTime CPSG_BlobInfo::GetHupReleaseDate() const
698 {
699     return s_GetTime(m_Data.GetInteger("hup_date"));
700 }
701 
GetOwner() const702 Uint8 CPSG_BlobInfo::GetOwner() const
703 {
704     return static_cast<Uint8>(m_Data.GetInteger("owner"));
705 }
706 
GetOriginalLoadDate() const707 CTime CPSG_BlobInfo::GetOriginalLoadDate() const
708 {
709     return s_GetTime(m_Data.GetInteger("date_asn1"));
710 }
711 
GetClass() const712 objects::CBioseq_set::EClass CPSG_BlobInfo::GetClass() const
713 {
714     return static_cast<objects::CBioseq_set::EClass>(m_Data.GetInteger("class"));
715 }
716 
GetDivision() const717 string CPSG_BlobInfo::GetDivision() const
718 {
719     return m_Data.GetString("div");
720 }
721 
GetUsername() const722 string CPSG_BlobInfo::GetUsername() const
723 {
724     return m_Data.GetString("username");
725 }
726 
GetId2Info() const727 string CPSG_BlobInfo::GetId2Info() const
728 {
729     return m_Data.GetString("id2_info");
730 }
731 
GetNChunks() const732 Uint8 CPSG_BlobInfo::GetNChunks() const
733 {
734     return static_cast<Uint8>(m_Data.GetInteger("n_chunks"));
735 }
736 
737 
CPSG_SkippedBlob(CPSG_BlobId id,EReason reason)738 CPSG_SkippedBlob::CPSG_SkippedBlob(CPSG_BlobId id, EReason reason) :
739     CPSG_ReplyItem(eSkippedBlob),
740     m_Id(id),
741     m_Reason(reason)
742 {
743 }
744 
745 
CPSG_BioseqInfo()746 CPSG_BioseqInfo::CPSG_BioseqInfo()
747     : CPSG_ReplyItem(eBioseqInfo)
748 {
749 }
750 
GetCanonicalId() const751 CPSG_BioId CPSG_BioseqInfo::GetCanonicalId() const
752 {
753     return s_GetBioId(m_Data);
754 };
755 
GetOtherIds() const756 vector<CPSG_BioId> CPSG_BioseqInfo::GetOtherIds() const
757 {
758     auto seq_ids = m_Data.GetByKey("seq_ids");
759     vector<CPSG_BioId> rv;
760     bool error = !seq_ids.IsArray();
761 
762     for (CJsonIterator it = seq_ids.Iterate(); !error && it.IsValid(); it.Next()) {
763         auto seq_id = it.GetNode();
764         error = !seq_id.IsArray() || (seq_id.GetSize() != 2);
765 
766         if (!error) {
767             auto type = static_cast<CPSG_BioId::TType>(seq_id.GetAt(0).AsInteger());
768             auto content = seq_id.GetAt(1).AsString();
769             rv.emplace_back(string(objects::CSeq_id::WhichFastaTag(type))
770                             + '|' + content, type);
771         }
772     }
773 
774     if (error) {
775         auto reply = GetReply();
776         _ASSERT(reply);
777 
778         auto request = reply->GetRequest().get();
779         _ASSERT(request);
780 
781         NCBI_THROW_FMT(CPSG_Exception, eServerError, "Wrong seq_ids format: '" << seq_ids.Repr() <<
782                 "' for " << request->GetType() << " request '" << request->GetId() << '\'');
783     }
784 
785     return rv;
786 }
787 
GetMoleculeType() const788 objects::CSeq_inst::TMol CPSG_BioseqInfo::GetMoleculeType() const
789 {
790     return static_cast<objects::CSeq_inst::TMol>(m_Data.GetInteger("mol"));
791 }
792 
GetLength() const793 Uint8 CPSG_BioseqInfo::GetLength() const
794 {
795     return m_Data.GetInteger("length");
796 }
797 
GetChainState() const798 CPSG_BioseqInfo::TState CPSG_BioseqInfo::GetChainState() const
799 {
800     return static_cast<TState>(m_Data.GetInteger("seq_state"));
801 }
802 
GetState() const803 CPSG_BioseqInfo::TState CPSG_BioseqInfo::GetState() const
804 {
805     return static_cast<TState>(m_Data.GetInteger("state"));
806 }
807 
GetBlobId() const808 CPSG_BlobId CPSG_BioseqInfo::GetBlobId() const
809 {
810     return s_GetBlobId(m_Data);
811 }
812 
GetTaxId() const813 TTaxId CPSG_BioseqInfo::GetTaxId() const
814 {
815     return TAX_ID_FROM(Int8, m_Data.GetInteger("tax_id"));
816 }
817 
GetHash() const818 int CPSG_BioseqInfo::GetHash() const
819 {
820     return static_cast<int>(m_Data.GetInteger("hash"));
821 }
822 
GetDateChanged() const823 CTime CPSG_BioseqInfo::GetDateChanged() const
824 {
825     return s_GetTime(m_Data.GetInteger("date_changed"));
826 }
827 
GetGi() const828 TGi CPSG_BioseqInfo::GetGi() const
829 {
830     return static_cast<TGi>(m_Data.GetInteger("gi"));
831 }
832 
/// Which pieces of information this item actually carries. Each flag is set
/// when the corresponding JSON member(s) are present.
CPSG_Request_Resolve::TIncludeInfo CPSG_BioseqInfo::IncludedInfo() const
{
    CPSG_Request_Resolve::TIncludeInfo rv = {};

    if (m_Data.HasKey("accession") && m_Data.HasKey("seq_id_type"))       rv |= CPSG_Request_Resolve::fCanonicalId;
    if (m_Data.HasKey("name"))                                            rv |= CPSG_Request_Resolve::fName;
    // An empty "seq_ids" container does not count as "other IDs included"
    if (m_Data.HasKey("seq_ids") && m_Data.GetByKey("seq_ids").GetSize()) rv |= CPSG_Request_Resolve::fOtherIds;
    if (m_Data.HasKey("mol"))                                             rv |= CPSG_Request_Resolve::fMoleculeType;
    if (m_Data.HasKey("length"))                                          rv |= CPSG_Request_Resolve::fLength;
    if (m_Data.HasKey("seq_state"))                                       rv |= CPSG_Request_Resolve::fChainState;
    if (m_Data.HasKey("state"))                                           rv |= CPSG_Request_Resolve::fState;
    // Blob ID may come either as "blob_id" or as the "sat"/"sat_key" pair (cf. s_GetBlobId())
    if (m_Data.HasKey("blob_id") ||
        (m_Data.HasKey("sat") && m_Data.HasKey("sat_key")))               rv |= CPSG_Request_Resolve::fBlobId;
    if (m_Data.HasKey("tax_id"))                                          rv |= CPSG_Request_Resolve::fTaxId;
    if (m_Data.HasKey("hash"))                                            rv |= CPSG_Request_Resolve::fHash;
    if (m_Data.HasKey("date_changed"))                                    rv |= CPSG_Request_Resolve::fDateChanged;
    if (m_Data.HasKey("gi"))                                              rv |= CPSG_Request_Resolve::fGi;

    return rv;
}
853 
854 
CPSG_NamedAnnotInfo(string name)855 CPSG_NamedAnnotInfo::CPSG_NamedAnnotInfo(string name) :
856     CPSG_ReplyItem(eNamedAnnotInfo),
857     m_Name(move(name))
858 {
859 }
860 
861 
GetId2AnnotInfo() const862 string CPSG_NamedAnnotInfo::GetId2AnnotInfo() const
863 {
864     auto node = m_Data.GetByKeyOrNull("seq_annot_info");
865     return node && node.IsString() ? node.AsString() : string();
866 }
867 
868 
GetId2AnnotInfoList() const869 CPSG_NamedAnnotInfo::TId2AnnotInfoList CPSG_NamedAnnotInfo::GetId2AnnotInfoList() const
870 {
871     TId2AnnotInfoList ret;
872     auto info_str = GetId2AnnotInfo();
873     if (!info_str.empty()) {
874         auto in_string = NStr::Base64Decode(info_str);
875         istringstream in_stream(in_string);
876         CObjectIStreamAsnBinary in(in_stream);
877         while ( in.HaveMoreData() ) {
878             CRef<TId2AnnotInfo> info(new TId2AnnotInfo);
879             in >> *info;
880             ret.push_back(info);
881         }
882     }
883     return ret;
884 }
885 
886 
GetAnnotatedId() const887 CPSG_BioId CPSG_NamedAnnotInfo::GetAnnotatedId() const
888 {
889     return s_GetBioId(m_Data);
890 }
891 
GetRange() const892 CRange<TSeqPos> CPSG_NamedAnnotInfo::GetRange() const
893 {
894     if (auto start = static_cast<TSeqPos>(m_Data.GetInteger("start"))) {
895         if (auto stop = static_cast<TSeqPos>(m_Data.GetInteger("stop"))) {
896             return { start, stop };
897         }
898     }
899 
900     return CRange<TSeqPos>::GetWhole();
901 }
902 
GetBlobId() const903 CPSG_BlobId CPSG_NamedAnnotInfo::GetBlobId() const
904 {
905     return s_GetBlobId(m_Data);
906 }
907 
908 template <class TResult>
909 struct SAnnotInfoProcessor
910 {
911     using TAction = function<bool(const CJsonNode&, TResult& result)>;
912     using TActions = map<int, TAction>;
913 
SAnnotInfoProcessorSAnnotInfoProcessor914     SAnnotInfoProcessor(TActions actions) : m_Actions(move(actions)) {}
915 
916     TResult operator()(const CPSG_ReplyItem* item, const CJsonNode& data) const;
917 
918 private:
919     void ThrowError(const CPSG_ReplyItem* item, const CJsonNode& annot_info) const;
920 
921     TActions m_Actions;
922 };
923 
924 template <class TResult>
operator ()(const CPSG_ReplyItem * item,const CJsonNode & data) const925 TResult SAnnotInfoProcessor<TResult>::operator()(const CPSG_ReplyItem* item, const CJsonNode& data) const
926 {
927     auto annot_info_str(NStr::Unescape(data.GetString("annot_info")));
928     auto annot_info(CJsonNode::ParseJSON(annot_info_str, CJsonNode::fStandardJson));
929 
930     if (!annot_info.IsObject()) ThrowError(item, annot_info);
931 
932     TResult result;
933 
934     for (CJsonIterator it = annot_info.Iterate(); it.IsValid(); it.Next()) {
935         auto key = stoi(it.GetKey());
936         auto found = m_Actions.find(key);
937 
938         if (found != m_Actions.end()) {
939             auto node = it.GetNode();
940 
941             if (!found->second(node, result)) {
942                 ThrowError(item, annot_info);
943             }
944         }
945     }
946 
947     return result;
948 }
949 
950 template <class TResult>
ThrowError(const CPSG_ReplyItem * item,const CJsonNode & annot_info) const951 void SAnnotInfoProcessor<TResult>::ThrowError(const CPSG_ReplyItem* item, const CJsonNode& annot_info) const
952 {
953     _ASSERT(item);
954 
955     auto reply = item->GetReply();
956     _ASSERT(reply);
957 
958     auto request = reply->GetRequest().get();
959     _ASSERT(request);
960 
961     NCBI_THROW_FMT(CPSG_Exception, eServerError, "Wrong annot_info format: '" << annot_info.Repr() <<
962             "' for " << request->GetType() << " request '" << request->GetId() << '\'');
963 }
964 
s_GetZoomLevels(const CJsonNode & annot_data,CPSG_NamedAnnotInfo::TZoomLevels & result)965 bool s_GetZoomLevels(const CJsonNode& annot_data, CPSG_NamedAnnotInfo::TZoomLevels& result)
966 {
967     if (!annot_data.IsArray()) return false;
968 
969     for (CJsonIterator it = annot_data.Iterate(); it.IsValid(); it.Next()) {
970         auto zoom_level = it.GetNode();
971 
972         if (!zoom_level.IsInteger()) return false;
973 
974         result.push_back(static_cast<unsigned>(zoom_level.AsInteger()));
975     }
976 
977     return true;
978 }
979 
980 template <objects::CSeq_annot::C_Data::E_Choice kAnnot>
s_GetTypeAndSubtype(const CJsonNode & annot_data,CPSG_NamedAnnotInfo::TAnnotInfoList & result)981 bool s_GetTypeAndSubtype(const CJsonNode& annot_data, CPSG_NamedAnnotInfo::TAnnotInfoList& result)
982 {
983     if (!annot_data.IsObject()) return false;
984 
985     for (CJsonIterator it = annot_data.Iterate(); it.IsValid(); it.Next()) {
986         auto type = stoi(it.GetKey());
987         auto subtypes = it.GetNode();
988 
989         if (!subtypes.IsArray()) return false;
990 
991         for (CJsonIterator sub_it = subtypes.Iterate(); sub_it.IsValid(); sub_it.Next()) {
992             auto subtype_node = sub_it.GetNode();
993 
994             if (!subtype_node.IsInteger()) return false;
995 
996             auto subtype = static_cast<int>(subtype_node.AsInteger());
997             result.push_back({ kAnnot, type, subtype });
998         }
999     }
1000 
1001     return true;
1002 }
1003 
1004 template <objects::CSeq_annot::C_Data::E_Choice kAnnot>
s_GetType(const CJsonNode & annot_data,CPSG_NamedAnnotInfo::TAnnotInfoList & result)1005 bool s_GetType(const CJsonNode& annot_data, CPSG_NamedAnnotInfo::TAnnotInfoList& result)
1006 {
1007     if (!annot_data.IsArray()) return false;
1008 
1009     for (CJsonIterator it = annot_data.Iterate(); it.IsValid(); it.Next()) {
1010         auto type_node = it.GetNode();
1011 
1012         if (!type_node.IsInteger()) return false;
1013 
1014         auto type = static_cast<int>(type_node.AsInteger());
1015         result.push_back({ kAnnot, type, 0 });
1016     }
1017 
1018     return true;
1019 }
1020 
GetZoomLevels() const1021 CPSG_NamedAnnotInfo::TZoomLevels CPSG_NamedAnnotInfo::GetZoomLevels() const
1022 {
1023     static SAnnotInfoProcessor<TZoomLevels> processor
1024     ({
1025         { 2048, &s_GetZoomLevels } // Special value for DensityGraph
1026     });
1027 
1028     return processor(this, m_Data);
1029 }
1030 
GetAnnotInfoList() const1031 CPSG_NamedAnnotInfo::TAnnotInfoList CPSG_NamedAnnotInfo::GetAnnotInfoList() const
1032 {
1033     using TAnnotType = SAnnotInfo::TAnnotType;
1034 
1035     static SAnnotInfoProcessor<TAnnotInfoList> processor
1036     ({
1037         { TAnnotType::e_Ftable,    &s_GetTypeAndSubtype<TAnnotType::e_Ftable>    },
1038         { TAnnotType::e_Align,     &s_GetType          <TAnnotType::e_Align>     },
1039         { TAnnotType::e_Graph,     &s_GetType          <TAnnotType::e_Graph>     },
1040         { TAnnotType::e_Seq_table, &s_GetTypeAndSubtype<TAnnotType::e_Seq_table> },
1041     });
1042 
1043     return processor(this, m_Data);
1044 }
1045 
1046 
CPSG_PublicComment(unique_ptr<CPSG_DataId> id,string text)1047 CPSG_PublicComment::CPSG_PublicComment(unique_ptr<CPSG_DataId> id, string text) :
1048     CPSG_ReplyItem(ePublicComment),
1049     m_Id(move(id)),
1050     m_Text(move(text))
1051 {
1052 }
1053 
1054 
CPSG_Reply()1055 CPSG_Reply::CPSG_Reply() :
1056     m_Impl(new SImpl)
1057 {
1058 }
1059 
GetStatus(CDeadline deadline) const1060 EPSG_Status CPSG_Reply::GetStatus(CDeadline deadline) const
1061 {
1062     assert(m_Impl);
1063 
1064     return s_GetStatus(&m_Impl->reply->reply_item, deadline);
1065 }
1066 
GetNextMessage() const1067 string CPSG_Reply::GetNextMessage() const
1068 {
1069     assert(m_Impl);
1070     assert(m_Impl->reply);
1071 
1072     return m_Impl->reply->reply_item.GetLock()->state.GetError();
1073 }
1074 
// Returns the next reply item that can be handed out.
// - If an item is ready, the pointer produced by m_Impl->Create() is returned.
// - If the reply was no longer in progress before a scan that found nothing,
//   a new CPSG_ReplyItem of type eEndOfReply is returned.
// - Otherwise the function waits on the reply-level item and rescans. If the
//   wait returns false (presumably the deadline expired; TODO confirm
//   WaitUntil's contract), an empty pointer is returned.
shared_ptr<CPSG_ReplyItem> CPSG_Reply::GetNextItem(CDeadline deadline)
{
    assert(m_Impl);
    assert(m_Impl->reply);

    auto& reply_item = m_Impl->reply->reply_item;
    // State is read through GetMTSafe() without taking the reply-item lock.
    auto& reply_state = reply_item.GetMTSafe().state;

    do {
        // Sampled BEFORE scanning. End-of-reply is reported only if the reply
        // was already finished when this scan started. Items that arrive during
        // the scan are then picked up on the next iteration instead of being
        // missed. (Assumes items are only added while in progress; confirm.)
        bool was_in_progress = reply_state.InProgress();

        // The item list stays locked for the whole scan.
        if (auto items_locked = m_Impl->reply->items.GetLock()) {
            auto& items = *items_locked;

            for (auto& item_ts : items) {
                const auto& item_state = item_ts.GetMTSafe().state;

                // Items already given to the caller are skipped.
                if (item_state.Returned()) continue;

                if (item_state.Empty()) {
                    auto item_locked = item_ts.GetLock();
                    auto& item = *item_locked;

                    // Wait for more chunks on this item
                    // (skip it until received >= expected).
                    if (!item.expected.Cmp<less_equal>(item.received)) continue;
                }

                // Do not hold lock on item_ts around this call!
                // A null result means no item was produced for this entry,
                // so move on to the next one.
                if (auto rv = m_Impl->Create(&item_ts)) {
                    return rv;
                }

                // Redundant: this is the last statement of the loop body.
                continue;
            }
        }

        // No more reply items
        if (!was_in_progress) {
            return shared_ptr<CPSG_ReplyItem>(new CPSG_ReplyItem(CPSG_ReplyItem::eEndOfReply));
        }
    }
    // Wait on the reply-level item, passing the current state, the deadline and
    // eInProgress. Rescan while the wait returns true. (Exact argument semantics
    // of WaitUntil, e.g. "wait while state == eInProgress", are assumed; confirm.)
    while (reply_item.WaitUntil(reply_state.GetState(), deadline, SPSG_Reply::SState::eInProgress, true));

    return {};
}
1120 
~CPSG_Reply()1121 CPSG_Reply::~CPSG_Reply()
1122 {
1123 }
1124 
1125 
1126 CPSG_Queue::CPSG_Queue() = default;
1127 CPSG_Queue::CPSG_Queue(CPSG_Queue&&) = default;
1128 CPSG_Queue& CPSG_Queue::operator=(CPSG_Queue&&) = default;
1129 CPSG_Queue::~CPSG_Queue() = default;
1130 
CPSG_Queue(const string & service)1131 CPSG_Queue::CPSG_Queue(const string& service) :
1132     m_Impl(new SImpl(service))
1133 {
1134 }
1135 
SendRequest(shared_ptr<CPSG_Request> request,CDeadline deadline)1136 bool CPSG_Queue::SendRequest(shared_ptr<CPSG_Request> request, CDeadline deadline)
1137 {
1138     _ASSERT(m_Impl);
1139     return m_Impl->SendRequest(const_pointer_cast<const CPSG_Request>(request), deadline);
1140 }
1141 
GetNextReply(CDeadline deadline)1142 shared_ptr<CPSG_Reply> CPSG_Queue::GetNextReply(CDeadline deadline)
1143 {
1144     _ASSERT(m_Impl);
1145 
1146     shared_ptr<CPSG_Reply> rv;
1147     m_Impl->Pop(rv, deadline);
1148     return rv;
1149 }
1150 
Stop()1151 void CPSG_Queue::Stop()
1152 {
1153     _ASSERT(m_Impl);
1154     m_Impl->Stop(CPSG_Queue::SImpl::eDrain);
1155 }
1156 
Reset()1157 void CPSG_Queue::Reset()
1158 {
1159     _ASSERT(m_Impl);
1160     m_Impl->Stop(CPSG_Queue::SImpl::eClear);
1161 }
1162 
IsEmpty() const1163 bool CPSG_Queue::IsEmpty() const
1164 {
1165     _ASSERT(m_Impl);
1166     return m_Impl->Empty();
1167 }
1168 
RejectsRequests() const1169 bool CPSG_Queue::RejectsRequests() const
1170 {
1171     _ASSERT(m_Impl);
1172     return m_Impl->RejectsRequests();
1173 }
1174 
1175 
GetApiLock()1176 CPSG_Queue::TApiLock CPSG_Queue::GetApiLock()
1177 {
1178     return SImpl::GetApiLock();
1179 }
1180 
1181 
1182 END_NCBI_SCOPE
1183 
1184 #endif
1185