1 /* $Id: psg_client.cpp 628921 2021-04-07 18:46:41Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Rafael Sadyrov
27 *
28 */
29
30 #include <ncbi_pch.hpp>
31
32 #include <objtools/pubseq_gateway/client/psg_client.hpp>
33
34 #ifdef HAVE_PSG_CLIENT
35
36 #include <condition_variable>
37 #include <mutex>
38 #include <sstream>
39 #include <string>
40 #include <thread>
41 #include <type_traits>
42
43 #include <corelib/ncbitime.hpp>
44 #include <corelib/ncbi_base64.h>
45 #include <connect/ncbi_socket.hpp>
46 #include <connect/ncbi_service.h>
47 #include <connect/ncbi_connutil.h>
48
49 #include <serial/serial.hpp>
50 #include <serial/objistrasnb.hpp>
51 #undef ThrowError // unfortunately
52
53 #include "psg_client_impl.hpp"
54
55 BEGIN_NCBI_SCOPE
56
57
GetErrCodeString(void) const58 const char* CPSG_Exception::GetErrCodeString(void) const
59 {
60 switch (GetErrCode())
61 {
62 case eTimeout: return "eTimeout";
63 case eServerError: return "eServerError";
64 case eInternalError: return "eInternalError";
65 case eParameterMissing: return "eParameterMissing";
66 default: return CException::GetErrCodeString();
67 }
68 }
69
70
SPSG_BlobReader(SPSG_Reply::SItem::TTS * src)71 SPSG_BlobReader::SPSG_BlobReader(SPSG_Reply::SItem::TTS* src)
72 : m_Src(src)
73 {
74 assert(src);
75 }
76
x_Read(void * buf,size_t count,size_t * bytes_read)77 ERW_Result SPSG_BlobReader::x_Read(void* buf, size_t count, size_t* bytes_read)
78 {
79 assert(bytes_read);
80
81 *bytes_read = 0;
82
83 CheckForNewChunks();
84
85 for (; m_Chunk < m_Data.size(); ++m_Chunk) {
86 auto& data = m_Data[m_Chunk];
87
88 // Chunk has not been received yet
89 if (data.empty()) return eRW_Success;
90
91 auto available = data.size() - m_Index;
92 auto to_copy = min(count, available);
93
94 memcpy(buf, data.data() + m_Index, to_copy);
95 buf = (char*)buf + to_copy;
96 count -= to_copy;
97 *bytes_read += to_copy;
98 m_Index += to_copy;
99
100 if (!count) return eRW_Success;
101
102 m_Index = 0;
103 }
104
105 auto src_locked = m_Src->GetLock();
106 return src_locked->expected.Cmp<equal_to>(src_locked->received) ? eRW_Eof : eRW_Success;
107 }
108
Read(void * buf,size_t count,size_t * bytes_read)109 ERW_Result SPSG_BlobReader::Read(void* buf, size_t count, size_t* bytes_read)
110 {
111 size_t read;
112 const auto kSeconds = TPSG_ReaderTimeout::GetDefault();
113 CDeadline deadline(kSeconds);
114
115 do {
116 auto rv = x_Read(buf, count, &read);
117
118 if ((rv != eRW_Success) || (read != 0)) {
119 if (bytes_read) *bytes_read = read;
120 return rv;
121 }
122 }
123 while (m_Src->WaitUntil(deadline));
124
125 NCBI_THROW_FMT(CPSG_Exception, eTimeout, "Timeout on reading (after " << kSeconds << " seconds)");
126 return eRW_Error;
127 }
128
PendingCount(size_t * count)129 ERW_Result SPSG_BlobReader::PendingCount(size_t* count)
130 {
131 assert(count);
132
133 *count = 0;
134
135 CheckForNewChunks();
136
137 auto k = m_Index;
138
139 for (auto i = m_Chunk; i < m_Data.size(); ++i) {
140 auto& data = m_Data[i];
141
142 // Chunk has not been received yet
143 if (data.empty()) return eRW_Success;
144
145 *count += data.size() - k;
146 k = 0;
147 }
148
149 return eRW_Success;
150 }
151
CheckForNewChunks()152 void SPSG_BlobReader::CheckForNewChunks()
153 {
154 if (m_Src->GetMTSafe().state.Empty()) return;
155
156 auto src_locked = m_Src->GetLock();
157 auto& src = *src_locked;
158 auto& chunks = src.chunks;
159
160 if (m_Data.size() < chunks.size()) m_Data.resize(chunks.size());
161
162 for (size_t i = 0; i < chunks.size(); ++i) {
163 if (!chunks[i].empty()) {
164 m_Data[i].swap(chunks[i]);
165 }
166 }
167 }
168
169
170 static_assert(is_nothrow_move_constructible<CPSG_BioId>::value, "CPSG_BioId move constructor must be noexcept");
171
Repr() const172 string CPSG_BioId::Repr() const
173 {
174 return m_Type == TType::e_not_set ? m_Id : m_Id + '~' + to_string(m_Type);
175 }
176
s_GetBioId(const CJsonNode & data)177 CPSG_BioId s_GetBioId(const CJsonNode& data)
178 {
179 auto type = static_cast<CPSG_BioId::TType>(data.GetInteger("seq_id_type"));
180 auto accession = data.GetString("accession");
181 auto name_node = data.GetByKeyOrNull("name");
182 auto name = name_node && name_node.IsString() ? name_node.AsString() : string();
183 auto version = static_cast<int>(data.GetInteger("version"));
184 return { objects::CSeq_id(type, accession, name, version).AsFastaString(), type };
185 };
186
operator <<(ostream & os,const CPSG_BioId & bio_id)187 ostream& operator<<(ostream& os, const CPSG_BioId& bio_id)
188 {
189 if (bio_id.GetType()) os << "seq_id_type=" << bio_id.GetType() << '&';
190 return os << "seq_id=" << bio_id.GetId();
191 }
192
193
194 static_assert(is_nothrow_move_constructible<CPSG_BlobId>::value, "CPSG_BlobId move constructor must be noexcept");
195
Repr() const196 string CPSG_BlobId::Repr() const
197 {
198 return m_LastModified.IsNull() ? m_Id : m_Id + '~' + to_string(m_LastModified.GetValue());
199 }
200
s_GetDataId(const SPSG_Args & args)201 unique_ptr<CPSG_DataId> s_GetDataId(const SPSG_Args& args)
202 {
203 try {
204 const auto& blob_id = args.GetValue("blob_id");
205
206 if (blob_id.empty()) {
207 auto id2_chunk = NStr::StringToNumeric<Uint8>(args.GetValue("id2_chunk"));
208 return unique_ptr<CPSG_DataId>(new CPSG_ChunkId(id2_chunk, args.GetValue("id2_info")));
209 }
210
211 CPSG_BlobId::TLastModified last_modified;
212 const auto& last_modified_str = args.GetValue("last_modified");
213
214 if (!last_modified_str.empty()) {
215 last_modified = NStr::StringToNumeric<Int8>(last_modified_str);
216 }
217
218 return unique_ptr<CPSG_DataId>(new CPSG_BlobId(blob_id, move(last_modified)));
219 }
220 catch (...) {
221 NCBI_THROW_FMT(CPSG_Exception, eServerError,
222 "Both blob_id[+last_modified] and id2_chunk+id2_info pairs are missing/corrupted in server response: " <<
223 args.GetQueryString(CUrlArgs::eAmp_Char));
224 }
225 }
226
s_GetBlobId(const CJsonNode & data)227 CPSG_BlobId s_GetBlobId(const CJsonNode& data)
228 {
229 CPSG_BlobId::TLastModified last_modified;
230
231 if (data.HasKey("last_modified")) {
232 last_modified = data.GetInteger("last_modified");
233 }
234
235 if (data.HasKey("blob_id")) {
236 return { data.GetString("blob_id"), last_modified };
237 }
238
239 auto sat = static_cast<int>(data.GetInteger("sat"));
240 auto sat_key = static_cast<int>(data.GetInteger("sat_key"));
241 return { sat, sat_key, last_modified };
242 }
243
operator <<(ostream & os,const CPSG_BlobId & blob_id)244 ostream& operator<<(ostream& os, const CPSG_BlobId& blob_id)
245 {
246 if (!blob_id.GetLastModified().IsNull()) os << "last_modified=" << blob_id.GetLastModified().GetValue() << '&';
247 return os << "blob_id=" << blob_id.GetId();
248 }
249
250
Repr() const251 string CPSG_ChunkId::Repr() const
252 {
253 return to_string(m_Id2Chunk) + '~' + m_Id2Info;
254 }
255
operator <<(ostream & os,const CPSG_ChunkId & chunk_id)256 ostream& operator<<(ostream& os, const CPSG_ChunkId& chunk_id)
257 {
258 return os << "id2_chunk=" << chunk_id.GetId2Chunk() << "&id2_info=" << chunk_id.GetId2Info();
259 }
260
261
262 template <class TReplyItem>
CreateImpl(TReplyItem * item,const vector<SPSG_Chunk> & chunks)263 TReplyItem* CPSG_Reply::SImpl::CreateImpl(TReplyItem* item, const vector<SPSG_Chunk>& chunks)
264 {
265 if (chunks.empty()) return item;
266
267 unique_ptr<TReplyItem> rv(item);
268 rv->m_Data = CJsonNode::ParseJSON(chunks.front(), CJsonNode::fStandardJson);
269
270 return rv.release();
271 }
272
273 struct SItemTypeAndReason : pair<CPSG_ReplyItem::EType, CPSG_SkippedBlob::EReason>
274 {
275 static SItemTypeAndReason Get(const SPSG_Args& args);
276
277 private:
278 using TBase = pair<CPSG_ReplyItem::EType, CPSG_SkippedBlob::EReason>;
279 using TBase::TBase;
SItemTypeAndReasonSItemTypeAndReason280 SItemTypeAndReason(CPSG_ReplyItem::EType type) : TBase(type, CPSG_SkippedBlob::eUnknown) {}
281 };
282
Get(const SPSG_Args & args)283 SItemTypeAndReason SItemTypeAndReason::Get(const SPSG_Args& args)
284 {
285 const auto item_type = args.GetValue("item_type");
286
287 if (item_type == "blob") {
288 const auto reason = args.GetValue("reason");
289
290 if (reason.empty()) {
291 return CPSG_ReplyItem::eBlobData;
292
293 } else if (reason == "excluded") {
294 return { CPSG_ReplyItem::eSkippedBlob, CPSG_SkippedBlob::eExcluded };
295
296 } else if (reason == "inprogress") {
297 return { CPSG_ReplyItem::eSkippedBlob, CPSG_SkippedBlob::eInProgress };
298
299 } else if (reason == "sent") {
300 return { CPSG_ReplyItem::eSkippedBlob, CPSG_SkippedBlob::eSent };
301
302 } else {
303 return { CPSG_ReplyItem::eSkippedBlob, CPSG_SkippedBlob::eUnknown };
304 }
305
306 } else if (item_type == "bioseq_info") {
307 return CPSG_ReplyItem::eBioseqInfo;
308
309 } else if (item_type == "blob_prop") {
310 return CPSG_ReplyItem::eBlobInfo;
311
312 } else if (item_type == "bioseq_na") {
313 return CPSG_ReplyItem::eNamedAnnotInfo;
314
315 } else if (item_type == "public_comment") {
316 return CPSG_ReplyItem::ePublicComment;
317
318 } else if (item_type == "processor") {
319 return CPSG_ReplyItem::eProcessor;
320
321 } else {
322 if (TPSG_FailOnUnknownItems::GetDefault()) {
323 NCBI_THROW_FMT(CPSG_Exception, eServerError, "Received unknown item type: " << item_type);
324 }
325
326 static atomic_bool reported(false);
327
328 if (!reported.exchange(true)) {
329 ERR_POST("Received unknown item type: " << item_type);
330 }
331
332 return CPSG_ReplyItem::eEndOfReply;
333 }
334 }
335
Create(SPSG_Reply::SItem::TTS * item_ts)336 shared_ptr<CPSG_ReplyItem> CPSG_Reply::SImpl::Create(SPSG_Reply::SItem::TTS* item_ts)
337 {
338 auto user_reply_locked = user_reply.lock();
339
340 assert(user_reply_locked);
341 assert(item_ts);
342
343 auto item_locked = item_ts->GetLock();
344
345 item_locked->state.SetReturned();
346
347 unique_ptr<CPSG_ReplyItem::SImpl> impl(new CPSG_ReplyItem::SImpl);
348 impl->item = item_ts;
349
350 shared_ptr<CPSG_ReplyItem> rv;
351
352 auto& chunks = item_locked->chunks;
353 auto& args = item_locked->args;
354 auto processor_id = args.GetValue("processor_id");
355
356 const auto& state = item_locked->state.GetState();
357 const auto itar = SItemTypeAndReason::Get(args);
358
359 if ((state == SPSG_Reply::SState::eNotFound) || (state == SPSG_Reply::SState::eError)) {
360 rv.reset(new CPSG_ReplyItem(itar.first));
361
362 } else if (itar.first == CPSG_ReplyItem::eBlobData) {
363 unique_ptr<CPSG_BlobData> blob_data(new CPSG_BlobData(s_GetDataId(args)));
364 blob_data->m_Stream.reset(new SPSG_RStream(item_ts));
365 rv.reset(blob_data.release());
366
367 } else if (itar.first == CPSG_ReplyItem::eSkippedBlob) {
368 auto data_id = s_GetDataId(args);
369 auto blob_id = move(dynamic_cast<CPSG_BlobId&>(*data_id));
370 rv.reset(new CPSG_SkippedBlob(move(blob_id), itar.second));
371
372 } else if (itar.first == CPSG_ReplyItem::eBioseqInfo) {
373 rv.reset(CreateImpl(new CPSG_BioseqInfo, chunks));
374
375 } else if (itar.first == CPSG_ReplyItem::eBlobInfo) {
376 rv.reset(CreateImpl(new CPSG_BlobInfo(s_GetDataId(args)), chunks));
377
378 } else if (itar.first == CPSG_ReplyItem::eNamedAnnotInfo) {
379 auto name = args.GetValue("na");
380 rv.reset(CreateImpl(new CPSG_NamedAnnotInfo(name), chunks));
381
382 } else if (itar.first == CPSG_ReplyItem::ePublicComment) {
383 auto text = chunks.empty() ? string() : chunks.front();
384 rv.reset(new CPSG_PublicComment(s_GetDataId(args), text));
385
386 } else if (itar.first == CPSG_ReplyItem::eProcessor) {
387 rv.reset(new CPSG_ReplyItem(CPSG_ReplyItem::eProcessor));
388
389 } else {
390 return rv;
391 }
392
393 rv->m_Impl.reset(impl.release());
394 rv->m_Reply = user_reply_locked;
395 rv->m_ProcessorId = move(processor_id);
396 return rv;
397 }
398
399
400 pair<mutex, weak_ptr<CPSG_Queue::SImpl::CService::TMap>> CPSG_Queue::SImpl::CService::sm_Instance;
401
GetIoC(const string & service)402 SPSG_IoCoordinator& CPSG_Queue::SImpl::CService::GetIoC(const string& service)
403 {
404 if (service.empty()) {
405 NCBI_THROW(CPSG_Exception, eParameterMissing, "Service name is empty");
406 }
407
408 unique_lock<mutex> lock(sm_Instance.first);
409
410 auto found = m_Map->find(service);
411
412 if (found != m_Map->end()) {
413 return *found->second;
414 }
415
416 auto created = m_Map->emplace(service, unique_ptr<SPSG_IoCoordinator>(new SPSG_IoCoordinator(service)));
417 return *created.first->second;
418 }
419
GetMap()420 shared_ptr<CPSG_Queue::SImpl::CService::TMap> CPSG_Queue::SImpl::CService::GetMap()
421 {
422 unique_lock<mutex> lock(sm_Instance.first);
423
424 auto rv = sm_Instance.second.lock();
425
426 if (!rv) {
427 rv = make_shared<TMap>();
428 sm_Instance.second = rv;
429 }
430
431 return rv;
432 }
433
434
SImpl(const string & service)435 CPSG_Queue::SImpl::SImpl(const string& service) :
436 m_Service(service)
437 {
438 }
439
s_GetTSE(CPSG_Request_Biodata::EIncludeData include_data)440 const char* s_GetTSE(CPSG_Request_Biodata::EIncludeData include_data)
441 {
442 switch (include_data) {
443 case CPSG_Request_Biodata::eDefault: return nullptr;
444 case CPSG_Request_Biodata::eNoTSE: return "none";
445 case CPSG_Request_Biodata::eSlimTSE: return "slim";
446 case CPSG_Request_Biodata::eSmartTSE: return "smart";
447 case CPSG_Request_Biodata::eWholeTSE: return "whole";
448 case CPSG_Request_Biodata::eOrigTSE: return "orig";
449 }
450
451 return nullptr;
452 }
453
x_GetAbsPathRef(shared_ptr<const CPSG_Request> user_request)454 string CPSG_Queue::SImpl::x_GetAbsPathRef(shared_ptr<const CPSG_Request> user_request)
455 {
456 ostringstream os;
457 user_request->x_GetAbsPathRef(os);
458 os << m_Service.ioc.GetUrlArgs();
459
460 if (const auto hops = user_request->m_Hops) os << "&hops=" << hops;
461 return os.str();
462 }
463
s_GetAccSubstitution(EPSG_AccSubstitution acc_substitution)464 const char* s_GetAccSubstitution(EPSG_AccSubstitution acc_substitution)
465 {
466 switch (acc_substitution) {
467 case EPSG_AccSubstitution::Default: break;
468 case EPSG_AccSubstitution::Limited: return "&acc_substitution=limited";
469 case EPSG_AccSubstitution::Never: return "&acc_substitution=never";
470 }
471
472 return "";
473 }
474
475
s_GetAutoBlobSkipping(ESwitch value)476 const char* s_GetAutoBlobSkipping(ESwitch value)
477 {
478 switch (value) {
479 case eDefault: break;
480 case eOn: return "&auto_blob_skipping=yes";
481 case eOff: return "&auto_blob_skipping=no";
482 }
483
484 return "";
485 }
486
487
x_GetAbsPathRef(ostream & os) const488 void CPSG_Request_Biodata::x_GetAbsPathRef(ostream& os) const
489 {
490 os << "/ID/get?" << m_BioId;
491
492 if (const auto tse = s_GetTSE(m_IncludeData)) os << "&tse=" << tse;
493
494 if (!m_ExcludeTSEs.empty()) {
495 os << "&exclude_blobs";
496
497 char delimiter = '=';
498 for (const auto& blob_id : m_ExcludeTSEs) {
499 os << delimiter << blob_id.GetId();
500 delimiter = ',';
501 }
502 }
503
504 os << s_GetAccSubstitution(m_AccSubstitution);
505 os << s_GetAutoBlobSkipping(m_AutoBlobSkipping);
506 }
507
x_GetAbsPathRef(ostream & os) const508 void CPSG_Request_Resolve::x_GetAbsPathRef(ostream& os) const
509 {
510 os << "/ID/resolve?" << m_BioId << "&fmt=json";
511
512 auto value = "yes";
513 auto include_info = m_IncludeInfo;
514 const auto max_bit = (numeric_limits<unsigned>::max() >> 1) + 1;
515
516 if (include_info & CPSG_Request_Resolve::TIncludeInfo(max_bit)) {
517 os << "&all_info=yes";
518 value = "no";
519 include_info = ~include_info;
520 }
521
522 if (include_info & CPSG_Request_Resolve::fCanonicalId) os << "&canon_id=" << value;
523 if (include_info & CPSG_Request_Resolve::fName) os << "&name=" << value;
524 if (include_info & CPSG_Request_Resolve::fOtherIds) os << "&seq_ids=" << value;
525 if (include_info & CPSG_Request_Resolve::fMoleculeType) os << "&mol_type=" << value;
526 if (include_info & CPSG_Request_Resolve::fLength) os << "&length=" << value;
527 if (include_info & CPSG_Request_Resolve::fChainState) os << "&seq_state=" << value;
528 if (include_info & CPSG_Request_Resolve::fState) os << "&state=" << value;
529 if (include_info & CPSG_Request_Resolve::fBlobId) os << "&blob_id=" << value;
530 if (include_info & CPSG_Request_Resolve::fTaxId) os << "&tax_id=" << value;
531 if (include_info & CPSG_Request_Resolve::fHash) os << "&hash=" << value;
532 if (include_info & CPSG_Request_Resolve::fDateChanged) os << "&date_changed=" << value;
533 if (include_info & CPSG_Request_Resolve::fGi) os << "&gi=" << value;
534
535 os << s_GetAccSubstitution(m_AccSubstitution);
536 }
537
x_GetAbsPathRef(ostream & os) const538 void CPSG_Request_Blob::x_GetAbsPathRef(ostream& os) const
539 {
540 os << "/ID/getblob?" << m_BlobId;
541
542 if (const auto tse = s_GetTSE(m_IncludeData)) os << "&tse=" << tse;
543 }
544
x_GetAbsPathRef(ostream & os) const545 void CPSG_Request_NamedAnnotInfo::x_GetAbsPathRef(ostream& os) const
546 {
547 os << "/ID/get_na?" << m_BioId << "&names=";
548
549 for (const auto& name : m_AnnotNames) {
550 os << name << ",";
551 }
552
553 // Remove last comma (there must be some output after seekp to succeed)
554 os.seekp(-1, ios_base::cur);
555
556 if (const auto tse = s_GetTSE(m_IncludeData)) os << "&tse=" << tse;
557
558 os << s_GetAccSubstitution(m_AccSubstitution);
559 }
560
x_GetAbsPathRef(ostream & os) const561 void CPSG_Request_Chunk::x_GetAbsPathRef(ostream& os) const
562 {
563 os << "/ID/get_tse_chunk?" << m_ChunkId;
564 }
565
SendRequest(shared_ptr<const CPSG_Request> user_request,const CDeadline & deadline)566 bool CPSG_Queue::SImpl::SendRequest(shared_ptr<const CPSG_Request> user_request, const CDeadline& deadline)
567 {
568 auto& ioc = m_Service.ioc;
569 auto& params = ioc.params;
570
571 auto user_context = params.client_mode == EPSG_PsgClientMode::eOff ?
572 nullptr : user_request->GetUserContext<string>();
573 const auto request_id = user_context ? *user_context : ioc.GetNewRequestId();
574 auto reply = make_shared<SPSG_Reply>(move(request_id), params);
575 auto abs_path_ref = x_GetAbsPathRef(user_request);
576 auto request = make_shared<SPSG_Request>(move(abs_path_ref), reply, user_request->m_RequestContext, params);
577
578 if (ioc.AddRequest(request, Stopped(), deadline)) {
579 shared_ptr<CPSG_Reply> user_reply(new CPSG_Reply);
580 user_reply->m_Impl->reply = move(reply);
581 user_reply->m_Impl->user_reply = user_reply;
582 user_reply->m_Request = move(user_request);
583 Push(move(user_reply));
584 return true;
585 }
586
587 return false;
588 }
589
s_GetStatus(SPSG_Reply::SItem::TTS * ts,const CDeadline & deadline)590 EPSG_Status s_GetStatus(SPSG_Reply::SItem::TTS* ts, const CDeadline& deadline)
591 {
592 assert(ts);
593
594 auto& state = ts->GetMTSafe().state;
595
596 do {
597 switch (state.GetState()) {
598 case SPSG_Reply::SState::eNotFound: return EPSG_Status::eNotFound;
599 case SPSG_Reply::SState::eError: return EPSG_Status::eError;
600 case SPSG_Reply::SState::eSuccess: return EPSG_Status::eSuccess;
601 case SPSG_Reply::SState::eInProgress: break;
602 }
603 }
604 while (state.change.WaitUntil(deadline));
605
606 return EPSG_Status::eInProgress;
607 }
608
GetStatus(CDeadline deadline) const609 EPSG_Status CPSG_ReplyItem::GetStatus(CDeadline deadline) const
610 {
611 assert(m_Impl);
612
613 return s_GetStatus(m_Impl->item, deadline);
614 }
615
GetNextMessage() const616 string CPSG_ReplyItem::GetNextMessage() const
617 {
618 assert(m_Impl);
619 assert(m_Impl->item);
620
621 return m_Impl->item->GetLock()->state.GetError();
622 }
623
~CPSG_ReplyItem()624 CPSG_ReplyItem::~CPSG_ReplyItem()
625 {
626 }
627
CPSG_ReplyItem(EType type)628 CPSG_ReplyItem::CPSG_ReplyItem(EType type) :
629 m_Type(type)
630 {
631 }
632
633
CPSG_BlobData(unique_ptr<CPSG_DataId> id)634 CPSG_BlobData::CPSG_BlobData(unique_ptr<CPSG_DataId> id) :
635 CPSG_ReplyItem(eBlobData),
636 m_Id(move(id))
637 {
638 }
639
640
CPSG_BlobInfo(unique_ptr<CPSG_DataId> id)641 CPSG_BlobInfo::CPSG_BlobInfo(unique_ptr<CPSG_DataId> id) :
642 CPSG_ReplyItem(eBlobInfo),
643 m_Id(move(id))
644 {
645 }
646
/// Bit masks for the integer "flags" field of blob info data.
enum EPSG_BlobInfo_Flags
{
    fPSGBI_CheckFailed = 0x01,
    fPSGBI_Gzip        = 0x02,
    fPSGBI_Not4Gbu     = 0x04,
    fPSGBI_Withdrawn   = 0x08,
    fPSGBI_Suppress    = 0x10,
    fPSGBI_Dead        = 0x20,
};
656
GetCompression() const657 string CPSG_BlobInfo::GetCompression() const
658 {
659 return m_Data.GetInteger("flags") & fPSGBI_Gzip ? "gzip" : "";
660 }
661
GetFormat() const662 string CPSG_BlobInfo::GetFormat() const
663 {
664 return "asn.1";
665 }
666
GetStorageSize() const667 Uint8 CPSG_BlobInfo::GetStorageSize() const
668 {
669 return static_cast<Uint8>(m_Data.GetInteger("size"));
670 }
671
GetSize() const672 Uint8 CPSG_BlobInfo::GetSize() const
673 {
674 return static_cast<Uint8>(m_Data.GetInteger("size_unpacked"));
675 }
676
IsDead() const677 bool CPSG_BlobInfo::IsDead() const
678 {
679 return m_Data.GetInteger("flags") & fPSGBI_Dead;
680 }
681
IsSuppressed() const682 bool CPSG_BlobInfo::IsSuppressed() const
683 {
684 return m_Data.GetInteger("flags") & fPSGBI_Suppress;
685 }
686
IsWithdrawn() const687 bool CPSG_BlobInfo::IsWithdrawn() const
688 {
689 return m_Data.GetInteger("flags") & fPSGBI_Withdrawn;
690 }
691
s_GetTime(Int8 milliseconds)692 CTime s_GetTime(Int8 milliseconds)
693 {
694 return milliseconds > 0 ? CTime(static_cast<time_t>(milliseconds / kMilliSecondsPerSecond)) : CTime();
695 }
696
GetHupReleaseDate() const697 CTime CPSG_BlobInfo::GetHupReleaseDate() const
698 {
699 return s_GetTime(m_Data.GetInteger("hup_date"));
700 }
701
GetOwner() const702 Uint8 CPSG_BlobInfo::GetOwner() const
703 {
704 return static_cast<Uint8>(m_Data.GetInteger("owner"));
705 }
706
GetOriginalLoadDate() const707 CTime CPSG_BlobInfo::GetOriginalLoadDate() const
708 {
709 return s_GetTime(m_Data.GetInteger("date_asn1"));
710 }
711
GetClass() const712 objects::CBioseq_set::EClass CPSG_BlobInfo::GetClass() const
713 {
714 return static_cast<objects::CBioseq_set::EClass>(m_Data.GetInteger("class"));
715 }
716
GetDivision() const717 string CPSG_BlobInfo::GetDivision() const
718 {
719 return m_Data.GetString("div");
720 }
721
GetUsername() const722 string CPSG_BlobInfo::GetUsername() const
723 {
724 return m_Data.GetString("username");
725 }
726
GetId2Info() const727 string CPSG_BlobInfo::GetId2Info() const
728 {
729 return m_Data.GetString("id2_info");
730 }
731
GetNChunks() const732 Uint8 CPSG_BlobInfo::GetNChunks() const
733 {
734 return static_cast<Uint8>(m_Data.GetInteger("n_chunks"));
735 }
736
737
CPSG_SkippedBlob(CPSG_BlobId id,EReason reason)738 CPSG_SkippedBlob::CPSG_SkippedBlob(CPSG_BlobId id, EReason reason) :
739 CPSG_ReplyItem(eSkippedBlob),
740 m_Id(id),
741 m_Reason(reason)
742 {
743 }
744
745
CPSG_BioseqInfo()746 CPSG_BioseqInfo::CPSG_BioseqInfo()
747 : CPSG_ReplyItem(eBioseqInfo)
748 {
749 }
750
GetCanonicalId() const751 CPSG_BioId CPSG_BioseqInfo::GetCanonicalId() const
752 {
753 return s_GetBioId(m_Data);
754 };
755
GetOtherIds() const756 vector<CPSG_BioId> CPSG_BioseqInfo::GetOtherIds() const
757 {
758 auto seq_ids = m_Data.GetByKey("seq_ids");
759 vector<CPSG_BioId> rv;
760 bool error = !seq_ids.IsArray();
761
762 for (CJsonIterator it = seq_ids.Iterate(); !error && it.IsValid(); it.Next()) {
763 auto seq_id = it.GetNode();
764 error = !seq_id.IsArray() || (seq_id.GetSize() != 2);
765
766 if (!error) {
767 auto type = static_cast<CPSG_BioId::TType>(seq_id.GetAt(0).AsInteger());
768 auto content = seq_id.GetAt(1).AsString();
769 rv.emplace_back(string(objects::CSeq_id::WhichFastaTag(type))
770 + '|' + content, type);
771 }
772 }
773
774 if (error) {
775 auto reply = GetReply();
776 _ASSERT(reply);
777
778 auto request = reply->GetRequest().get();
779 _ASSERT(request);
780
781 NCBI_THROW_FMT(CPSG_Exception, eServerError, "Wrong seq_ids format: '" << seq_ids.Repr() <<
782 "' for " << request->GetType() << " request '" << request->GetId() << '\'');
783 }
784
785 return rv;
786 }
787
GetMoleculeType() const788 objects::CSeq_inst::TMol CPSG_BioseqInfo::GetMoleculeType() const
789 {
790 return static_cast<objects::CSeq_inst::TMol>(m_Data.GetInteger("mol"));
791 }
792
GetLength() const793 Uint8 CPSG_BioseqInfo::GetLength() const
794 {
795 return m_Data.GetInteger("length");
796 }
797
GetChainState() const798 CPSG_BioseqInfo::TState CPSG_BioseqInfo::GetChainState() const
799 {
800 return static_cast<TState>(m_Data.GetInteger("seq_state"));
801 }
802
GetState() const803 CPSG_BioseqInfo::TState CPSG_BioseqInfo::GetState() const
804 {
805 return static_cast<TState>(m_Data.GetInteger("state"));
806 }
807
GetBlobId() const808 CPSG_BlobId CPSG_BioseqInfo::GetBlobId() const
809 {
810 return s_GetBlobId(m_Data);
811 }
812
GetTaxId() const813 TTaxId CPSG_BioseqInfo::GetTaxId() const
814 {
815 return TAX_ID_FROM(Int8, m_Data.GetInteger("tax_id"));
816 }
817
GetHash() const818 int CPSG_BioseqInfo::GetHash() const
819 {
820 return static_cast<int>(m_Data.GetInteger("hash"));
821 }
822
GetDateChanged() const823 CTime CPSG_BioseqInfo::GetDateChanged() const
824 {
825 return s_GetTime(m_Data.GetInteger("date_changed"));
826 }
827
GetGi() const828 TGi CPSG_BioseqInfo::GetGi() const
829 {
830 return static_cast<TGi>(m_Data.GetInteger("gi"));
831 }
832
IncludedInfo() const833 CPSG_Request_Resolve::TIncludeInfo CPSG_BioseqInfo::IncludedInfo() const
834 {
835 CPSG_Request_Resolve::TIncludeInfo rv = {};
836
837 if (m_Data.HasKey("accession") && m_Data.HasKey("seq_id_type")) rv |= CPSG_Request_Resolve::fCanonicalId;
838 if (m_Data.HasKey("name")) rv |= CPSG_Request_Resolve::fName;
839 if (m_Data.HasKey("seq_ids") && m_Data.GetByKey("seq_ids").GetSize()) rv |= CPSG_Request_Resolve::fOtherIds;
840 if (m_Data.HasKey("mol")) rv |= CPSG_Request_Resolve::fMoleculeType;
841 if (m_Data.HasKey("length")) rv |= CPSG_Request_Resolve::fLength;
842 if (m_Data.HasKey("seq_state")) rv |= CPSG_Request_Resolve::fChainState;
843 if (m_Data.HasKey("state")) rv |= CPSG_Request_Resolve::fState;
844 if (m_Data.HasKey("blob_id") ||
845 (m_Data.HasKey("sat") && m_Data.HasKey("sat_key"))) rv |= CPSG_Request_Resolve::fBlobId;
846 if (m_Data.HasKey("tax_id")) rv |= CPSG_Request_Resolve::fTaxId;
847 if (m_Data.HasKey("hash")) rv |= CPSG_Request_Resolve::fHash;
848 if (m_Data.HasKey("date_changed")) rv |= CPSG_Request_Resolve::fDateChanged;
849 if (m_Data.HasKey("gi")) rv |= CPSG_Request_Resolve::fGi;
850
851 return rv;
852 }
853
854
CPSG_NamedAnnotInfo(string name)855 CPSG_NamedAnnotInfo::CPSG_NamedAnnotInfo(string name) :
856 CPSG_ReplyItem(eNamedAnnotInfo),
857 m_Name(move(name))
858 {
859 }
860
861
GetId2AnnotInfo() const862 string CPSG_NamedAnnotInfo::GetId2AnnotInfo() const
863 {
864 auto node = m_Data.GetByKeyOrNull("seq_annot_info");
865 return node && node.IsString() ? node.AsString() : string();
866 }
867
868
GetId2AnnotInfoList() const869 CPSG_NamedAnnotInfo::TId2AnnotInfoList CPSG_NamedAnnotInfo::GetId2AnnotInfoList() const
870 {
871 TId2AnnotInfoList ret;
872 auto info_str = GetId2AnnotInfo();
873 if (!info_str.empty()) {
874 auto in_string = NStr::Base64Decode(info_str);
875 istringstream in_stream(in_string);
876 CObjectIStreamAsnBinary in(in_stream);
877 while ( in.HaveMoreData() ) {
878 CRef<TId2AnnotInfo> info(new TId2AnnotInfo);
879 in >> *info;
880 ret.push_back(info);
881 }
882 }
883 return ret;
884 }
885
886
GetAnnotatedId() const887 CPSG_BioId CPSG_NamedAnnotInfo::GetAnnotatedId() const
888 {
889 return s_GetBioId(m_Data);
890 }
891
GetRange() const892 CRange<TSeqPos> CPSG_NamedAnnotInfo::GetRange() const
893 {
894 if (auto start = static_cast<TSeqPos>(m_Data.GetInteger("start"))) {
895 if (auto stop = static_cast<TSeqPos>(m_Data.GetInteger("stop"))) {
896 return { start, stop };
897 }
898 }
899
900 return CRange<TSeqPos>::GetWhole();
901 }
902
GetBlobId() const903 CPSG_BlobId CPSG_NamedAnnotInfo::GetBlobId() const
904 {
905 return s_GetBlobId(m_Data);
906 }
907
908 template <class TResult>
909 struct SAnnotInfoProcessor
910 {
911 using TAction = function<bool(const CJsonNode&, TResult& result)>;
912 using TActions = map<int, TAction>;
913
SAnnotInfoProcessorSAnnotInfoProcessor914 SAnnotInfoProcessor(TActions actions) : m_Actions(move(actions)) {}
915
916 TResult operator()(const CPSG_ReplyItem* item, const CJsonNode& data) const;
917
918 private:
919 void ThrowError(const CPSG_ReplyItem* item, const CJsonNode& annot_info) const;
920
921 TActions m_Actions;
922 };
923
924 template <class TResult>
operator ()(const CPSG_ReplyItem * item,const CJsonNode & data) const925 TResult SAnnotInfoProcessor<TResult>::operator()(const CPSG_ReplyItem* item, const CJsonNode& data) const
926 {
927 auto annot_info_str(NStr::Unescape(data.GetString("annot_info")));
928 auto annot_info(CJsonNode::ParseJSON(annot_info_str, CJsonNode::fStandardJson));
929
930 if (!annot_info.IsObject()) ThrowError(item, annot_info);
931
932 TResult result;
933
934 for (CJsonIterator it = annot_info.Iterate(); it.IsValid(); it.Next()) {
935 auto key = stoi(it.GetKey());
936 auto found = m_Actions.find(key);
937
938 if (found != m_Actions.end()) {
939 auto node = it.GetNode();
940
941 if (!found->second(node, result)) {
942 ThrowError(item, annot_info);
943 }
944 }
945 }
946
947 return result;
948 }
949
950 template <class TResult>
ThrowError(const CPSG_ReplyItem * item,const CJsonNode & annot_info) const951 void SAnnotInfoProcessor<TResult>::ThrowError(const CPSG_ReplyItem* item, const CJsonNode& annot_info) const
952 {
953 _ASSERT(item);
954
955 auto reply = item->GetReply();
956 _ASSERT(reply);
957
958 auto request = reply->GetRequest().get();
959 _ASSERT(request);
960
961 NCBI_THROW_FMT(CPSG_Exception, eServerError, "Wrong annot_info format: '" << annot_info.Repr() <<
962 "' for " << request->GetType() << " request '" << request->GetId() << '\'');
963 }
964
s_GetZoomLevels(const CJsonNode & annot_data,CPSG_NamedAnnotInfo::TZoomLevels & result)965 bool s_GetZoomLevels(const CJsonNode& annot_data, CPSG_NamedAnnotInfo::TZoomLevels& result)
966 {
967 if (!annot_data.IsArray()) return false;
968
969 for (CJsonIterator it = annot_data.Iterate(); it.IsValid(); it.Next()) {
970 auto zoom_level = it.GetNode();
971
972 if (!zoom_level.IsInteger()) return false;
973
974 result.push_back(static_cast<unsigned>(zoom_level.AsInteger()));
975 }
976
977 return true;
978 }
979
980 template <objects::CSeq_annot::C_Data::E_Choice kAnnot>
s_GetTypeAndSubtype(const CJsonNode & annot_data,CPSG_NamedAnnotInfo::TAnnotInfoList & result)981 bool s_GetTypeAndSubtype(const CJsonNode& annot_data, CPSG_NamedAnnotInfo::TAnnotInfoList& result)
982 {
983 if (!annot_data.IsObject()) return false;
984
985 for (CJsonIterator it = annot_data.Iterate(); it.IsValid(); it.Next()) {
986 auto type = stoi(it.GetKey());
987 auto subtypes = it.GetNode();
988
989 if (!subtypes.IsArray()) return false;
990
991 for (CJsonIterator sub_it = subtypes.Iterate(); sub_it.IsValid(); sub_it.Next()) {
992 auto subtype_node = sub_it.GetNode();
993
994 if (!subtype_node.IsInteger()) return false;
995
996 auto subtype = static_cast<int>(subtype_node.AsInteger());
997 result.push_back({ kAnnot, type, subtype });
998 }
999 }
1000
1001 return true;
1002 }
1003
1004 template <objects::CSeq_annot::C_Data::E_Choice kAnnot>
s_GetType(const CJsonNode & annot_data,CPSG_NamedAnnotInfo::TAnnotInfoList & result)1005 bool s_GetType(const CJsonNode& annot_data, CPSG_NamedAnnotInfo::TAnnotInfoList& result)
1006 {
1007 if (!annot_data.IsArray()) return false;
1008
1009 for (CJsonIterator it = annot_data.Iterate(); it.IsValid(); it.Next()) {
1010 auto type_node = it.GetNode();
1011
1012 if (!type_node.IsInteger()) return false;
1013
1014 auto type = static_cast<int>(type_node.AsInteger());
1015 result.push_back({ kAnnot, type, 0 });
1016 }
1017
1018 return true;
1019 }
1020
GetZoomLevels() const1021 CPSG_NamedAnnotInfo::TZoomLevels CPSG_NamedAnnotInfo::GetZoomLevels() const
1022 {
1023 static SAnnotInfoProcessor<TZoomLevels> processor
1024 ({
1025 { 2048, &s_GetZoomLevels } // Special value for DensityGraph
1026 });
1027
1028 return processor(this, m_Data);
1029 }
1030
GetAnnotInfoList() const1031 CPSG_NamedAnnotInfo::TAnnotInfoList CPSG_NamedAnnotInfo::GetAnnotInfoList() const
1032 {
1033 using TAnnotType = SAnnotInfo::TAnnotType;
1034
1035 static SAnnotInfoProcessor<TAnnotInfoList> processor
1036 ({
1037 { TAnnotType::e_Ftable, &s_GetTypeAndSubtype<TAnnotType::e_Ftable> },
1038 { TAnnotType::e_Align, &s_GetType <TAnnotType::e_Align> },
1039 { TAnnotType::e_Graph, &s_GetType <TAnnotType::e_Graph> },
1040 { TAnnotType::e_Seq_table, &s_GetTypeAndSubtype<TAnnotType::e_Seq_table> },
1041 });
1042
1043 return processor(this, m_Data);
1044 }
1045
1046
CPSG_PublicComment(unique_ptr<CPSG_DataId> id,string text)1047 CPSG_PublicComment::CPSG_PublicComment(unique_ptr<CPSG_DataId> id, string text) :
1048 CPSG_ReplyItem(ePublicComment),
1049 m_Id(move(id)),
1050 m_Text(move(text))
1051 {
1052 }
1053
1054
CPSG_Reply()1055 CPSG_Reply::CPSG_Reply() :
1056 m_Impl(new SImpl)
1057 {
1058 }
1059
GetStatus(CDeadline deadline) const1060 EPSG_Status CPSG_Reply::GetStatus(CDeadline deadline) const
1061 {
1062 assert(m_Impl);
1063
1064 return s_GetStatus(&m_Impl->reply->reply_item, deadline);
1065 }
1066
GetNextMessage() const1067 string CPSG_Reply::GetNextMessage() const
1068 {
1069 assert(m_Impl);
1070 assert(m_Impl->reply);
1071
1072 return m_Impl->reply->reply_item.GetLock()->state.GetError();
1073 }
1074
GetNextItem(CDeadline deadline)1075 shared_ptr<CPSG_ReplyItem> CPSG_Reply::GetNextItem(CDeadline deadline)
1076 {
1077 assert(m_Impl);
1078 assert(m_Impl->reply);
1079
1080 auto& reply_item = m_Impl->reply->reply_item;
1081 auto& reply_state = reply_item.GetMTSafe().state;
1082
1083 do {
1084 bool was_in_progress = reply_state.InProgress();
1085
1086 if (auto items_locked = m_Impl->reply->items.GetLock()) {
1087 auto& items = *items_locked;
1088
1089 for (auto& item_ts : items) {
1090 const auto& item_state = item_ts.GetMTSafe().state;
1091
1092 if (item_state.Returned()) continue;
1093
1094 if (item_state.Empty()) {
1095 auto item_locked = item_ts.GetLock();
1096 auto& item = *item_locked;
1097
1098 // Wait for more chunks on this item
1099 if (!item.expected.Cmp<less_equal>(item.received)) continue;
1100 }
1101
1102 // Do not hold lock on item_ts around this call!
1103 if (auto rv = m_Impl->Create(&item_ts)) {
1104 return rv;
1105 }
1106
1107 continue;
1108 }
1109 }
1110
1111 // No more reply items
1112 if (!was_in_progress) {
1113 return shared_ptr<CPSG_ReplyItem>(new CPSG_ReplyItem(CPSG_ReplyItem::eEndOfReply));
1114 }
1115 }
1116 while (reply_item.WaitUntil(reply_state.GetState(), deadline, SPSG_Reply::SState::eInProgress, true));
1117
1118 return {};
1119 }
1120
~CPSG_Reply()1121 CPSG_Reply::~CPSG_Reply()
1122 {
1123 }
1124
1125
1126 CPSG_Queue::CPSG_Queue() = default;
1127 CPSG_Queue::CPSG_Queue(CPSG_Queue&&) = default;
1128 CPSG_Queue& CPSG_Queue::operator=(CPSG_Queue&&) = default;
1129 CPSG_Queue::~CPSG_Queue() = default;
1130
CPSG_Queue(const string & service)1131 CPSG_Queue::CPSG_Queue(const string& service) :
1132 m_Impl(new SImpl(service))
1133 {
1134 }
1135
SendRequest(shared_ptr<CPSG_Request> request,CDeadline deadline)1136 bool CPSG_Queue::SendRequest(shared_ptr<CPSG_Request> request, CDeadline deadline)
1137 {
1138 _ASSERT(m_Impl);
1139 return m_Impl->SendRequest(const_pointer_cast<const CPSG_Request>(request), deadline);
1140 }
1141
GetNextReply(CDeadline deadline)1142 shared_ptr<CPSG_Reply> CPSG_Queue::GetNextReply(CDeadline deadline)
1143 {
1144 _ASSERT(m_Impl);
1145
1146 shared_ptr<CPSG_Reply> rv;
1147 m_Impl->Pop(rv, deadline);
1148 return rv;
1149 }
1150
Stop()1151 void CPSG_Queue::Stop()
1152 {
1153 _ASSERT(m_Impl);
1154 m_Impl->Stop(CPSG_Queue::SImpl::eDrain);
1155 }
1156
Reset()1157 void CPSG_Queue::Reset()
1158 {
1159 _ASSERT(m_Impl);
1160 m_Impl->Stop(CPSG_Queue::SImpl::eClear);
1161 }
1162
IsEmpty() const1163 bool CPSG_Queue::IsEmpty() const
1164 {
1165 _ASSERT(m_Impl);
1166 return m_Impl->Empty();
1167 }
1168
RejectsRequests() const1169 bool CPSG_Queue::RejectsRequests() const
1170 {
1171 _ASSERT(m_Impl);
1172 return m_Impl->RejectsRequests();
1173 }
1174
1175
GetApiLock()1176 CPSG_Queue::TApiLock CPSG_Queue::GetApiLock()
1177 {
1178 return SImpl::GetApiLock();
1179 }
1180
1181
1182 END_NCBI_SCOPE
1183
1184 #endif
1185