1 /* $Id: get_processor.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergey Satskiy
27 *
28 * File Description: get blob processor
29 *
30 */
#include <ncbi_pch.hpp>

#include "get_processor.hpp"
#include "pubseq_gateway.hpp"
#include "pubseq_gateway_cache_utils.hpp"
#include "pubseq_gateway_convert_utils.hpp"
#include "get_blob_callback.hpp"

#include <iterator>

USING_NCBI_SCOPE;

using namespace std::placeholders;
42
43
CPSGS_GetProcessor()44 CPSGS_GetProcessor::CPSGS_GetProcessor() :
45 m_BlobRequest(nullptr)
46 {}
47
48
CPSGS_GetProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority)49 CPSGS_GetProcessor::CPSGS_GetProcessor(shared_ptr<CPSGS_Request> request,
50 shared_ptr<CPSGS_Reply> reply,
51 TProcessorPriority priority) :
52 CPSGS_CassProcessorBase(request, reply, priority),
53 CPSGS_ResolveBase(request, reply,
54 bind(&CPSGS_GetProcessor::x_OnSeqIdResolveFinished,
55 this, _1),
56 bind(&CPSGS_GetProcessor::x_OnSeqIdResolveError,
57 this, _1, _2, _3, _4),
58 bind(&CPSGS_GetProcessor::x_OnResolutionGoodData,
59 this)),
60 CPSGS_CassBlobBase(request, reply, GetName())
61 {
62 // Convenience to avoid calling
63 // m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>() everywhere
64 m_BlobRequest = & request->GetRequest<SPSGS_BlobBySeqIdRequest>();
65
66 // Convert generic excluded blobs into cassandra blob ids
67 for (const auto & blob_id_as_str : m_BlobRequest->m_ExcludeBlobs) {
68 SCass_BlobId cass_blob_id(blob_id_as_str);
69 if (cass_blob_id.IsValid())
70 m_ExcludeBlobs.push_back(cass_blob_id);
71 }
72 }
73
74
~CPSGS_GetProcessor()75 CPSGS_GetProcessor::~CPSGS_GetProcessor()
76 {}
77
78
79 IPSGS_Processor*
CreateProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority) const80 CPSGS_GetProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
81 shared_ptr<CPSGS_Reply> reply,
82 TProcessorPriority priority) const
83 {
84 if (!IsCassandraProcessorEnabled(request))
85 return nullptr;
86
87 if (request->GetRequestType() == CPSGS_Request::ePSGS_BlobBySeqIdRequest) {
88 auto * app = CPubseqGatewayApp::GetInstance();
89 auto startup_data_state = app->GetStartupDataState();
90 if (startup_data_state != ePSGS_StartupDataOK) {
91 if (request->NeedTrace()) {
92 reply->SendTrace("Cannot create " + GetName() +
93 " processor because Cassandra DB "
94 "is not available.\n" +
95 GetCassStartupDataStateMessage(startup_data_state),
96 request->GetStartTimestamp());
97 }
98 return nullptr;
99 }
100
101 return new CPSGS_GetProcessor(request, reply, priority);
102 }
103
104 return nullptr;
105 }
106
107
Process(void)108 void CPSGS_GetProcessor::Process(void)
109 {
110 // In both cases: sync or async resolution --> a callback will be called
111 ResolveInputSeqId();
112 }
113
114
115 // This callback is called in all cases when there is no valid resolution, i.e.
116 // 404, or any kind of errors
117 void
x_OnSeqIdResolveError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)118 CPSGS_GetProcessor::x_OnSeqIdResolveError(
119 CRequestStatus::ECode status,
120 int code,
121 EDiagSev severity,
122 const string & message)
123 {
124 if (m_Cancelled) {
125 m_Completed = true;
126 return;
127 }
128
129 CRequestContextResetter context_resetter;
130 IPSGS_Processor::m_Request->SetRequestContext();
131
132 UpdateOverallStatus(status);
133 PSG_WARNING(message);
134
135 size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
136 if (status == CRequestStatus::e404_NotFound) {
137 IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, GetName(),
138 message, status,
139 ePSGS_NoBioseqInfo,
140 eDiag_Error);
141 } else {
142 IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, GetName(),
143 message, status,
144 ePSGS_BioseqInfoError,
145 severity);
146 }
147 IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, GetName(), 2);
148
149 m_Completed = true;
150 SignalFinishProcessing();
151 }
152
153
154 // This callback is called only in case of a valid resolution
155 void
x_OnSeqIdResolveFinished(SBioseqResolution && bioseq_resolution)156 CPSGS_GetProcessor::x_OnSeqIdResolveFinished(
157 SBioseqResolution && bioseq_resolution)
158 {
159 if (m_Cancelled) {
160 m_Completed = true;
161 return;
162 }
163
164 CRequestContextResetter context_resetter;
165 IPSGS_Processor::m_Request->SetRequestContext();
166
167 x_SendBioseqInfo(bioseq_resolution);
168
169 // Translate sat to keyspace
170 auto * app = CPubseqGatewayApp::GetInstance();
171 SCass_BlobId blob_id(bioseq_resolution.m_BioseqInfo.GetSat(),
172 bioseq_resolution.m_BioseqInfo.GetSatKey());
173 if (app->SatToKeyspace(blob_id.m_Sat, blob_id.m_Keyspace)) {
174 m_BlobId = blob_id;
175 x_GetBlob();
176 return;
177 }
178
179 // It is an error of the sat to keyspace translation
180 size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
181 string msg = "Unknown satellite number " + to_string(blob_id.m_Sat) +
182 " for bioseq info with seq_id '" +
183 m_BlobRequest->m_SeqId + "'";
184 app->GetCounters().Increment(CPSGSCounters::ePSGS_ServerSatToSatNameError);
185
186 IPSGS_Processor::m_Reply->PrepareBlobPropMessage(
187 item_id, GetName(), msg, CRequestStatus::e500_InternalServerError,
188 ePSGS_UnknownResolvedSatellite, eDiag_Error);
189 IPSGS_Processor::m_Reply->PrepareBlobPropCompletion(item_id, GetName(), 2);
190
191 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
192 PSG_ERROR(msg);
193
194 m_Completed = true;
195 SignalFinishProcessing();
196 }
197
198
199 void
x_SendBioseqInfo(SBioseqResolution & bioseq_resolution)200 CPSGS_GetProcessor::x_SendBioseqInfo(SBioseqResolution & bioseq_resolution)
201 {
202 if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqDB ||
203 bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
204 AdjustBioseqAccession(bioseq_resolution);
205
206 size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
207 auto data_to_send = ToJson(bioseq_resolution.m_BioseqInfo,
208 SPSGS_ResolveRequest::fPSGS_AllBioseqFields).
209 Repr(CJsonNode::fStandardJson);
210
211 IPSGS_Processor::m_Reply->PrepareBioseqData(
212 item_id, GetName(), data_to_send,
213 SPSGS_ResolveRequest::ePSGS_JsonFormat);
214 IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, GetName(), 2);
215 }
216
217
x_IsExcludedBlob(void) const218 bool CPSGS_GetProcessor::x_IsExcludedBlob(void) const
219 {
220 for (const auto & item : m_ExcludeBlobs) {
221 if (item == m_BlobId)
222 return true;
223 }
224 return false;
225 }
226
227
// Starts retrieval of the blob identified by m_BlobId (set after a
// successful seq_id resolution and sat -> keyspace translation).
// Steps:
//  - a blob the client explicitly excluded is reported as excluded and the
//    processor finishes;
//  - for TSE options other than none/slim and a non-empty client id the blob
//    is registered in the per-client exclude blob cache; if it was already
//    there and auto blob skipping is on, it is reported as sent / in
//    progress and the processor finishes;
//  - the blob properties are looked up in the cache: on a hit the load task
//    is built from the cached record; on a miss (or lookup error) a cache-only
//    request ends with 404 (500 for a lookup error), otherwise the load task
//    is built from the sat key alone;
//  - the loader callbacks are wired, the fetch is stored in m_FetchDetails
//    and the Cassandra request is initiated.
void CPSGS_GetProcessor::x_GetBlob(void)
{
    auto * app = CPubseqGatewayApp::GetInstance();

    // The client listed this blob in its exclude list: nothing to retrieve
    if (x_IsExcludedBlob()) {
        IPSGS_Processor::m_Reply->PrepareBlobExcluded(
            IPSGS_Processor::m_Reply->GetItemId(), GetName(),
            m_BlobId.ToString(), ePSGS_BlobExcluded);
        m_Completed = true;
        SignalFinishProcessing();
        return;
    }


    // The exclude blob cache is consulted only for TSE options other than
    // none and slim
    if (m_BlobRequest->m_TSEOption != SPSGS_BlobRequestBase::ePSGS_NoneTSE &&
        m_BlobRequest->m_TSEOption != SPSGS_BlobRequestBase::ePSGS_SlimTSE) {
        if (!m_BlobRequest->m_ClientId.empty()) {
            // Adding to exclude blob cache is unconditional however skipping
            // is only for the blobs identified by seq_id/seq_id_type
            // 'completed' is filled by AddBlobId(); it selects below between
            // the "sent" and "in progress" exclusion reasons
            bool completed = true;
            auto cache_result =
                app->GetExcludeBlobCache()->AddBlobId(
                    m_BlobRequest->m_ClientId,
                    m_BlobId.m_Sat,
                    m_BlobId.m_SatKey,
                    completed);
            if (cache_result == ePSGS_AlreadyInCache &&
                m_BlobRequest->m_AutoBlobSkipping) {
                if (completed)
                    IPSGS_Processor::m_Reply->PrepareBlobExcluded(
                        m_BlobId.ToString(), GetName(),
                        ePSGS_BlobSent);
                else
                    IPSGS_Processor::m_Reply->PrepareBlobExcluded(
                        m_BlobId.ToString(), GetName(),
                        ePSGS_BlobInProgress);
                m_Completed = true;
                SignalFinishProcessing();
                return;
            }

            // Remember that this request added the cache entry; x_Peek()
            // marks it completed once all reads are finished
            if (cache_result == ePSGS_Added)
                m_BlobRequest->m_ExcludeBlobCacheAdded = true;
        }
    }

    unique_ptr<CCassBlobFetch> fetch_details;
    fetch_details.reset(new CCassBlobFetch(*m_BlobRequest, m_BlobId));

    // Cache lookup of the blob properties; on a hit blob_record is filled
    // NOTE(review): INT64_MIN presumably means "no particular last_modified"
    // -- confirm against CPSGCache::LookupBlobProp
    unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
    CPSGCache psg_cache(IPSGS_Processor::m_Request,
                        IPSGS_Processor::m_Reply);
    int64_t last_modified = INT64_MIN;
    auto blob_prop_cache_lookup_result =
        psg_cache.LookupBlobProp(
            m_BlobId.m_Sat,
            m_BlobId.m_SatKey,
            last_modified,
            *blob_record.get());

    // The raw load task is handed over to fetch_details via SetLoader()
    CCassBlobTaskLoadBlob * load_task = nullptr;
    if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
        // Properties are known: the loader starts from the cached record
        load_task = new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
                                              app->GetCassandraMaxRetries(),
                                              app->GetCassandraConnection(),
                                              m_BlobId.m_Keyspace,
                                              move(blob_record),
                                              false, nullptr);
        fetch_details->SetLoader(load_task);
    } else {
        if (m_BlobRequest->m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
            // No data in cache and not going to the DB
            // 404 for a plain miss, 500 when the lookup itself failed
            size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
            auto ret_status = CRequestStatus::e404_NotFound;
            if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
                IPSGS_Processor::m_Reply->PrepareBlobPropMessage(
                                    item_id, GetName(),
                                    "Blob properties are not found",
                                    ret_status, ePSGS_BlobPropsNotFound,
                                    eDiag_Error);
            } else {
                ret_status = CRequestStatus::e500_InternalServerError;
                IPSGS_Processor::m_Reply->PrepareBlobPropMessage(
                                    item_id, GetName(),
                                    "Blob properties are not found due to a cache lookup error",
                                    ret_status, ePSGS_BlobPropsNotFound,
                                    eDiag_Error);
            }
            IPSGS_Processor::m_Reply->PrepareBlobPropCompletion(item_id,
                                                                GetName(),
                                                                2);

            // Undo the exclude blob cache registration made above: the blob
            // is not going to be delivered
            if (m_BlobRequest->m_ExcludeBlobCacheAdded &&
                !m_BlobRequest->m_ClientId.empty()) {
                app->GetExcludeBlobCache()->Remove(
                                    m_BlobRequest->m_ClientId,
                                    m_BlobId.m_Sat,
                                    m_BlobId.m_SatKey);

                // To prevent SetCompleted() later
                m_BlobRequest->m_ExcludeBlobCacheAdded = false;
            }

            // Finished without reaching cassandra
            UpdateOverallStatus(ret_status);
            m_Completed = true;
            SignalFinishProcessing();
            return;
        }

        // Properties are not known: the loader is built from the sat key only
        load_task = new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
                                              app->GetCassandraMaxRetries(),
                                              app->GetCassandraConnection(),
                                              m_BlobId.m_Keyspace,
                                              m_BlobId.m_SatKey,
                                              false, nullptr);
        fetch_details->SetLoader(load_task);
    }

    // Wire the loader to this processor; the last CBlobPropCallback argument
    // tells whether the properties were not taken from the cache
    load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
    load_task->SetErrorCB(
        CGetBlobErrorCallback(bind(&CPSGS_GetProcessor::OnGetBlobError,
                                   this, _1, _2, _3, _4, _5),
                              fetch_details.get()));
    load_task->SetPropsCallback(
        CBlobPropCallback(bind(&CPSGS_GetProcessor::OnGetBlobProp,
                               this, _1, _2, _3),
                          IPSGS_Processor::m_Request,
                          IPSGS_Processor::m_Reply,
                          fetch_details.get(),
                          blob_prop_cache_lookup_result != ePSGS_CacheHit));

    if (IPSGS_Processor::m_Request->NeedTrace()) {
        IPSGS_Processor::m_Reply->SendTrace(
                            "Cassandra request: " +
                            ToJson(*load_task).Repr(CJsonNode::fStandardJson),
                            IPSGS_Processor::m_Request->GetStartTimestamp());
    }

    // The processor keeps the fetch (and through it the load task) alive
    m_FetchDetails.push_back(move(fetch_details));

    // Initiate cassandra request
    load_task->Wait();
}
372
373
OnGetBlobProp(CCassBlobFetch * fetch_details,CBlobRecord const & blob,bool is_found)374 void CPSGS_GetProcessor::OnGetBlobProp(CCassBlobFetch * fetch_details,
375 CBlobRecord const & blob,
376 bool is_found)
377 {
378 if (m_Cancelled) {
379 m_Completed = true;
380 return;
381 }
382
383 CPSGS_CassBlobBase::OnGetBlobProp(bind(&CPSGS_GetProcessor::OnGetBlobProp,
384 this, _1, _2, _3),
385 bind(&CPSGS_GetProcessor::OnGetBlobChunk,
386 this, _1, _2, _3, _4, _5),
387 bind(&CPSGS_GetProcessor::OnGetBlobError,
388 this, _1, _2, _3, _4, _5),
389 fetch_details, blob, is_found);
390
391 if (IPSGS_Processor::m_Reply->IsOutputReady())
392 x_Peek(false);
393 }
394
395
OnGetBlobError(CCassBlobFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)396 void CPSGS_GetProcessor::OnGetBlobError(CCassBlobFetch * fetch_details,
397 CRequestStatus::ECode status,
398 int code,
399 EDiagSev severity,
400 const string & message)
401 {
402 if (m_Cancelled) {
403 m_Completed = true;
404 return;
405 }
406
407 CPSGS_CassBlobBase::OnGetBlobError(fetch_details, status, code,
408 severity, message);
409
410 if (IPSGS_Processor::m_Reply->IsOutputReady())
411 x_Peek(false);
412 }
413
414
OnGetBlobChunk(CCassBlobFetch * fetch_details,CBlobRecord const & blob,const unsigned char * chunk_data,unsigned int data_size,int chunk_no)415 void CPSGS_GetProcessor::OnGetBlobChunk(CCassBlobFetch * fetch_details,
416 CBlobRecord const & blob,
417 const unsigned char * chunk_data,
418 unsigned int data_size,
419 int chunk_no)
420 {
421 if (m_Cancelled) {
422 m_Completed = true;
423 return;
424 }
425
426 CPSGS_CassBlobBase::OnGetBlobChunk(m_Cancelled, fetch_details,
427 chunk_data, data_size, chunk_no);
428
429 if (IPSGS_Processor::m_Reply->IsOutputReady())
430 x_Peek(false);
431 }
432
433
Cancel(void)434 void CPSGS_GetProcessor::Cancel(void)
435 {
436 m_Cancelled = true;
437 CancelLoaders();
438 }
439
440
GetStatus(void)441 IPSGS_Processor::EPSGS_Status CPSGS_GetProcessor::GetStatus(void)
442 {
443 auto status = CPSGS_CassProcessorBase::GetStatus();
444 if (status == IPSGS_Processor::ePSGS_InProgress)
445 return status;
446
447 if (m_Cancelled)
448 return IPSGS_Processor::ePSGS_Cancelled;
449
450 return status;
451 }
452
453
GetName(void) const454 string CPSGS_GetProcessor::GetName(void) const
455 {
456 return "Cassandra-get";
457 }
458
459
ProcessEvent(void)460 void CPSGS_GetProcessor::ProcessEvent(void)
461 {
462 x_Peek(true);
463 }
464
465
x_Peek(bool need_wait)466 void CPSGS_GetProcessor::x_Peek(bool need_wait)
467 {
468 if (m_Cancelled)
469 return;
470
471 if (m_InPeek)
472 return;
473
474 m_InPeek = true;
475
476 // 1 -> call m_Loader->Wait1 to pick data
477 // 2 -> check if we have ready-to-send buffers
478 // 3 -> call reply->Send() to send what we have if it is ready
479 bool overall_final_state = false;
480
481 while (true) {
482 auto initial_size = m_FetchDetails.size();
483
484 for (auto & details: m_FetchDetails) {
485 if (details) {
486 overall_final_state |= x_Peek(details, need_wait);
487 }
488 }
489
490 if (initial_size == m_FetchDetails.size()) {
491 break;
492 }
493 }
494
495 // Blob specific: ready packets need to be sent right away
496 if (IPSGS_Processor::m_Reply->IsOutputReady())
497 IPSGS_Processor::m_Reply->Flush(false);
498
499 // Blob specific: deal with exclude blob cache
500 if (AreAllFinishedRead()) {
501 // The handler deals with both kind of blob requests:
502 // - by sat/sat_key
503 // - by seq_id/seq_id_type
504 // So get the reference to the blob base request
505 auto & blob_request =
506 IPSGS_Processor::m_Request->GetRequest<SPSGS_BlobRequestBase>();
507
508 if (blob_request.m_ExcludeBlobCacheAdded &&
509 ! blob_request.m_ExcludeBlobCacheCompleted &&
510 ! blob_request.m_ClientId.empty()) {
511 auto * app = CPubseqGatewayApp::GetInstance();
512 app->GetExcludeBlobCache()->SetCompleted(
513 blob_request.m_ClientId,
514 m_BlobId.m_Sat,
515 m_BlobId.m_SatKey, true);
516 blob_request.m_ExcludeBlobCacheCompleted = true;
517 }
518 }
519
520 m_InPeek = false;
521 }
522
523
x_Peek(unique_ptr<CCassFetch> & fetch_details,bool need_wait)524 bool CPSGS_GetProcessor::x_Peek(unique_ptr<CCassFetch> & fetch_details,
525 bool need_wait)
526 {
527 if (!fetch_details->GetLoader())
528 return true;
529
530 bool final_state = false;
531 if (need_wait)
532 if (!fetch_details->ReadFinished()) {
533 final_state = fetch_details->GetLoader()->Wait();
534 if (final_state)
535 fetch_details->SetReadFinished();
536 }
537
538 if (fetch_details->GetLoader()->HasError() &&
539 IPSGS_Processor::m_Reply->IsOutputReady() &&
540 ! IPSGS_Processor::m_Reply->IsFinished()) {
541 // Send an error
542 string error = fetch_details->GetLoader()->LastError();
543 auto * app = CPubseqGatewayApp::GetInstance();
544
545 app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
546 PSG_ERROR(error);
547
548 CCassBlobFetch * blob_fetch = static_cast<CCassBlobFetch *>(fetch_details.get());
549 PrepareServerErrorMessage(blob_fetch, ePSGS_UnknownError, eDiag_Error, error);
550
551 // Mark finished
552 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
553 fetch_details->SetReadFinished();
554 SignalFinishProcessing();
555 }
556
557 return final_state;
558 }
559
560
x_OnResolutionGoodData(void)561 void CPSGS_GetProcessor::x_OnResolutionGoodData(void)
562 {
563 // The resolution process started to receive data which look good so
564 // the dispatcher should be notified that the other processors can be
565 // stopped
566 if (m_Cancelled || m_Completed)
567 return;
568
569 if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
570 m_Completed = true;
571 }
572 }
573
574