1 /*  $Id: get_processor.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: get blob processor
29  *
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include "get_processor.hpp"
34 #include "pubseq_gateway.hpp"
35 #include "pubseq_gateway_cache_utils.hpp"
36 #include "pubseq_gateway_convert_utils.hpp"
37 #include "get_blob_callback.hpp"
38 
39 USING_NCBI_SCOPE;
40 
41 using namespace std::placeholders;
42 
43 
CPSGS_GetProcessor()44 CPSGS_GetProcessor::CPSGS_GetProcessor() :
45     m_BlobRequest(nullptr)
46 {}
47 
48 
CPSGS_GetProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority)49 CPSGS_GetProcessor::CPSGS_GetProcessor(shared_ptr<CPSGS_Request> request,
50                                        shared_ptr<CPSGS_Reply> reply,
51                                        TProcessorPriority  priority) :
52     CPSGS_CassProcessorBase(request, reply, priority),
53     CPSGS_ResolveBase(request, reply,
54                       bind(&CPSGS_GetProcessor::x_OnSeqIdResolveFinished,
55                            this, _1),
56                       bind(&CPSGS_GetProcessor::x_OnSeqIdResolveError,
57                            this, _1, _2, _3, _4),
58                       bind(&CPSGS_GetProcessor::x_OnResolutionGoodData,
59                            this)),
60     CPSGS_CassBlobBase(request, reply, GetName())
61 {
62     // Convenience to avoid calling
63     // m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>() everywhere
64     m_BlobRequest = & request->GetRequest<SPSGS_BlobBySeqIdRequest>();
65 
66     // Convert generic excluded blobs into cassandra blob ids
67     for (const auto &  blob_id_as_str : m_BlobRequest->m_ExcludeBlobs) {
68         SCass_BlobId    cass_blob_id(blob_id_as_str);
69         if (cass_blob_id.IsValid())
70             m_ExcludeBlobs.push_back(cass_blob_id);
71     }
72 }
73 
74 
~CPSGS_GetProcessor()75 CPSGS_GetProcessor::~CPSGS_GetProcessor()
76 {}
77 
78 
79 IPSGS_Processor*
CreateProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority) const80 CPSGS_GetProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
81                                     shared_ptr<CPSGS_Reply> reply,
82                                     TProcessorPriority  priority) const
83 {
84     if (!IsCassandraProcessorEnabled(request))
85         return nullptr;
86 
87     if (request->GetRequestType() == CPSGS_Request::ePSGS_BlobBySeqIdRequest) {
88         auto *      app = CPubseqGatewayApp::GetInstance();
89         auto        startup_data_state = app->GetStartupDataState();
90         if (startup_data_state != ePSGS_StartupDataOK) {
91             if (request->NeedTrace()) {
92                 reply->SendTrace("Cannot create " + GetName() +
93                                  " processor because Cassandra DB "
94                                  "is not available.\n" +
95                                  GetCassStartupDataStateMessage(startup_data_state),
96                                  request->GetStartTimestamp());
97             }
98             return nullptr;
99         }
100 
101         return new CPSGS_GetProcessor(request, reply, priority);
102     }
103 
104     return nullptr;
105 }
106 
107 
Process(void)108 void CPSGS_GetProcessor::Process(void)
109 {
110     // In both cases: sync or async resolution --> a callback will be called
111     ResolveInputSeqId();
112 }
113 
114 
115 // This callback is called in all cases when there is no valid resolution, i.e.
116 // 404, or any kind of errors
117 void
x_OnSeqIdResolveError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)118 CPSGS_GetProcessor::x_OnSeqIdResolveError(
119                         CRequestStatus::ECode  status,
120                         int  code,
121                         EDiagSev  severity,
122                         const string &  message)
123 {
124     if (m_Cancelled) {
125         m_Completed = true;
126         return;
127     }
128 
129     CRequestContextResetter     context_resetter;
130     IPSGS_Processor::m_Request->SetRequestContext();
131 
132     UpdateOverallStatus(status);
133     PSG_WARNING(message);
134 
135     size_t      item_id = IPSGS_Processor::m_Reply->GetItemId();
136     if (status == CRequestStatus::e404_NotFound) {
137         IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, GetName(),
138                                                        message, status,
139                                                        ePSGS_NoBioseqInfo,
140                                                        eDiag_Error);
141     } else {
142         IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, GetName(),
143                                                        message, status,
144                                                        ePSGS_BioseqInfoError,
145                                                        severity);
146     }
147     IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, GetName(), 2);
148 
149     m_Completed = true;
150     SignalFinishProcessing();
151 }
152 
153 
154 // This callback is called only in case of a valid resolution
155 void
x_OnSeqIdResolveFinished(SBioseqResolution && bioseq_resolution)156 CPSGS_GetProcessor::x_OnSeqIdResolveFinished(
157                             SBioseqResolution &&  bioseq_resolution)
158 {
159     if (m_Cancelled) {
160         m_Completed = true;
161         return;
162     }
163 
164     CRequestContextResetter     context_resetter;
165     IPSGS_Processor::m_Request->SetRequestContext();
166 
167     x_SendBioseqInfo(bioseq_resolution);
168 
169     // Translate sat to keyspace
170     auto *          app = CPubseqGatewayApp::GetInstance();
171     SCass_BlobId    blob_id(bioseq_resolution.m_BioseqInfo.GetSat(),
172                             bioseq_resolution.m_BioseqInfo.GetSatKey());
173     if (app->SatToKeyspace(blob_id.m_Sat, blob_id.m_Keyspace)) {
174         m_BlobId = blob_id;
175         x_GetBlob();
176         return;
177     }
178 
179     // It is an error of the sat to keyspace translation
180     size_t      item_id = IPSGS_Processor::m_Reply->GetItemId();
181     string      msg = "Unknown satellite number " + to_string(blob_id.m_Sat) +
182                       " for bioseq info with seq_id '" +
183                       m_BlobRequest->m_SeqId + "'";
184     app->GetCounters().Increment(CPSGSCounters::ePSGS_ServerSatToSatNameError);
185 
186     IPSGS_Processor::m_Reply->PrepareBlobPropMessage(
187         item_id, GetName(), msg, CRequestStatus::e500_InternalServerError,
188         ePSGS_UnknownResolvedSatellite, eDiag_Error);
189     IPSGS_Processor::m_Reply->PrepareBlobPropCompletion(item_id, GetName(), 2);
190 
191     UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
192     PSG_ERROR(msg);
193 
194     m_Completed = true;
195     SignalFinishProcessing();
196 }
197 
198 
199 void
x_SendBioseqInfo(SBioseqResolution & bioseq_resolution)200 CPSGS_GetProcessor::x_SendBioseqInfo(SBioseqResolution &  bioseq_resolution)
201 {
202     if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqDB ||
203         bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
204         AdjustBioseqAccession(bioseq_resolution);
205 
206     size_t  item_id = IPSGS_Processor::m_Reply->GetItemId();
207     auto    data_to_send = ToJson(bioseq_resolution.m_BioseqInfo,
208                                   SPSGS_ResolveRequest::fPSGS_AllBioseqFields).
209                                         Repr(CJsonNode::fStandardJson);
210 
211     IPSGS_Processor::m_Reply->PrepareBioseqData(
212             item_id, GetName(), data_to_send,
213             SPSGS_ResolveRequest::ePSGS_JsonFormat);
214     IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, GetName(), 2);
215 }
216 
217 
x_IsExcludedBlob(void) const218 bool CPSGS_GetProcessor::x_IsExcludedBlob(void) const
219 {
220     for (const auto & item : m_ExcludeBlobs) {
221         if (item == m_BlobId)
222             return true;
223     }
224     return false;
225 }
226 
227 
x_GetBlob(void)228 void CPSGS_GetProcessor::x_GetBlob(void)
229 {
230     auto * app = CPubseqGatewayApp::GetInstance();
231 
232     if (x_IsExcludedBlob()) {
233         IPSGS_Processor::m_Reply->PrepareBlobExcluded(
234                 IPSGS_Processor::m_Reply->GetItemId(), GetName(),
235                 m_BlobId.ToString(), ePSGS_BlobExcluded);
236         m_Completed = true;
237         SignalFinishProcessing();
238         return;
239     }
240 
241 
242     if (m_BlobRequest->m_TSEOption != SPSGS_BlobRequestBase::ePSGS_NoneTSE &&
243         m_BlobRequest->m_TSEOption != SPSGS_BlobRequestBase::ePSGS_SlimTSE) {
244         if (!m_BlobRequest->m_ClientId.empty()) {
245             // Adding to exclude blob cache is unconditional however skipping
246             // is only for the blobs identified by seq_id/seq_id_type
247             bool        completed = true;
248             auto        cache_result =
249                 app->GetExcludeBlobCache()->AddBlobId(
250                         m_BlobRequest->m_ClientId,
251                         m_BlobId.m_Sat,
252                         m_BlobId.m_SatKey,
253                         completed);
254             if (cache_result == ePSGS_AlreadyInCache &&
255                 m_BlobRequest->m_AutoBlobSkipping) {
256                 if (completed)
257                     IPSGS_Processor::m_Reply->PrepareBlobExcluded(
258                                     m_BlobId.ToString(), GetName(),
259                                     ePSGS_BlobSent);
260                 else
261                     IPSGS_Processor::m_Reply->PrepareBlobExcluded(
262                                     m_BlobId.ToString(), GetName(),
263                                     ePSGS_BlobInProgress);
264                 m_Completed = true;
265                 SignalFinishProcessing();
266                 return;
267             }
268 
269             if (cache_result == ePSGS_Added)
270                 m_BlobRequest->m_ExcludeBlobCacheAdded = true;
271         }
272     }
273 
274     unique_ptr<CCassBlobFetch>  fetch_details;
275     fetch_details.reset(new CCassBlobFetch(*m_BlobRequest, m_BlobId));
276 
277     unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
278     CPSGCache               psg_cache(IPSGS_Processor::m_Request,
279                                       IPSGS_Processor::m_Reply);
280     int64_t                 last_modified = INT64_MIN;
281     auto                    blob_prop_cache_lookup_result =
282                                     psg_cache.LookupBlobProp(
283                                         m_BlobId.m_Sat,
284                                         m_BlobId.m_SatKey,
285                                         last_modified,
286                                         *blob_record.get());
287 
288     CCassBlobTaskLoadBlob *     load_task = nullptr;
289     if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
290         load_task = new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
291                                               app->GetCassandraMaxRetries(),
292                                               app->GetCassandraConnection(),
293                                               m_BlobId.m_Keyspace,
294                                               move(blob_record),
295                                               false, nullptr);
296         fetch_details->SetLoader(load_task);
297     } else {
298         if (m_BlobRequest->m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
299             // No data in cache and not going to the DB
300             size_t      item_id = IPSGS_Processor::m_Reply->GetItemId();
301             auto        ret_status = CRequestStatus::e404_NotFound;
302             if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
303                 IPSGS_Processor::m_Reply->PrepareBlobPropMessage(
304                     item_id, GetName(),
305                     "Blob properties are not found",
306                     ret_status, ePSGS_BlobPropsNotFound,
307                     eDiag_Error);
308             } else {
309                 ret_status = CRequestStatus::e500_InternalServerError;
310                 IPSGS_Processor::m_Reply->PrepareBlobPropMessage(
311                     item_id, GetName(),
312                     "Blob properties are not found due to a cache lookup error",
313                     ret_status, ePSGS_BlobPropsNotFound,
314                     eDiag_Error);
315             }
316             IPSGS_Processor::m_Reply->PrepareBlobPropCompletion(item_id,
317                                                                 GetName(),
318                                                                 2);
319 
320             if (m_BlobRequest->m_ExcludeBlobCacheAdded &&
321                 !m_BlobRequest->m_ClientId.empty()) {
322                 app->GetExcludeBlobCache()->Remove(
323                         m_BlobRequest->m_ClientId,
324                         m_BlobId.m_Sat,
325                         m_BlobId.m_SatKey);
326 
327                 // To prevent SetCompleted() later
328                 m_BlobRequest->m_ExcludeBlobCacheAdded = false;
329             }
330 
331             // Finished without reaching cassandra
332             UpdateOverallStatus(ret_status);
333             m_Completed = true;
334             SignalFinishProcessing();
335             return;
336         }
337 
338         load_task = new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
339                                               app->GetCassandraMaxRetries(),
340                                               app->GetCassandraConnection(),
341                                               m_BlobId.m_Keyspace,
342                                               m_BlobId.m_SatKey,
343                                               false, nullptr);
344         fetch_details->SetLoader(load_task);
345     }
346 
347     load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
348     load_task->SetErrorCB(
349         CGetBlobErrorCallback(bind(&CPSGS_GetProcessor::OnGetBlobError,
350                                    this, _1, _2, _3, _4, _5),
351                               fetch_details.get()));
352     load_task->SetPropsCallback(
353         CBlobPropCallback(bind(&CPSGS_GetProcessor::OnGetBlobProp,
354                                this, _1, _2, _3),
355                           IPSGS_Processor::m_Request,
356                           IPSGS_Processor::m_Reply,
357                           fetch_details.get(),
358                           blob_prop_cache_lookup_result != ePSGS_CacheHit));
359 
360     if (IPSGS_Processor::m_Request->NeedTrace()) {
361         IPSGS_Processor::m_Reply->SendTrace(
362                             "Cassandra request: " +
363                             ToJson(*load_task).Repr(CJsonNode::fStandardJson),
364                             IPSGS_Processor::m_Request->GetStartTimestamp());
365     }
366 
367     m_FetchDetails.push_back(move(fetch_details));
368 
369     // Initiate cassandra request
370     load_task->Wait();
371 }
372 
373 
OnGetBlobProp(CCassBlobFetch * fetch_details,CBlobRecord const & blob,bool is_found)374 void CPSGS_GetProcessor::OnGetBlobProp(CCassBlobFetch *  fetch_details,
375                                        CBlobRecord const &  blob,
376                                        bool is_found)
377 {
378     if (m_Cancelled) {
379         m_Completed = true;
380         return;
381     }
382 
383     CPSGS_CassBlobBase::OnGetBlobProp(bind(&CPSGS_GetProcessor::OnGetBlobProp,
384                                            this, _1, _2, _3),
385                                       bind(&CPSGS_GetProcessor::OnGetBlobChunk,
386                                            this, _1, _2, _3, _4, _5),
387                                       bind(&CPSGS_GetProcessor::OnGetBlobError,
388                                            this, _1, _2, _3, _4, _5),
389                                       fetch_details, blob, is_found);
390 
391     if (IPSGS_Processor::m_Reply->IsOutputReady())
392         x_Peek(false);
393 }
394 
395 
OnGetBlobError(CCassBlobFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)396 void CPSGS_GetProcessor::OnGetBlobError(CCassBlobFetch *  fetch_details,
397                                         CRequestStatus::ECode  status,
398                                         int  code,
399                                         EDiagSev  severity,
400                                         const string &  message)
401 {
402     if (m_Cancelled) {
403         m_Completed = true;
404         return;
405     }
406 
407     CPSGS_CassBlobBase::OnGetBlobError(fetch_details, status, code,
408                                        severity, message);
409 
410     if (IPSGS_Processor::m_Reply->IsOutputReady())
411         x_Peek(false);
412 }
413 
414 
OnGetBlobChunk(CCassBlobFetch * fetch_details,CBlobRecord const & blob,const unsigned char * chunk_data,unsigned int data_size,int chunk_no)415 void CPSGS_GetProcessor::OnGetBlobChunk(CCassBlobFetch *  fetch_details,
416                                         CBlobRecord const &  blob,
417                                         const unsigned char *  chunk_data,
418                                         unsigned int  data_size,
419                                         int  chunk_no)
420 {
421     if (m_Cancelled) {
422         m_Completed = true;
423         return;
424     }
425 
426     CPSGS_CassBlobBase::OnGetBlobChunk(m_Cancelled, fetch_details,
427                                        chunk_data, data_size, chunk_no);
428 
429     if (IPSGS_Processor::m_Reply->IsOutputReady())
430         x_Peek(false);
431 }
432 
433 
Cancel(void)434 void CPSGS_GetProcessor::Cancel(void)
435 {
436     m_Cancelled = true;
437     CancelLoaders();
438 }
439 
440 
GetStatus(void)441 IPSGS_Processor::EPSGS_Status CPSGS_GetProcessor::GetStatus(void)
442 {
443     auto    status = CPSGS_CassProcessorBase::GetStatus();
444     if (status == IPSGS_Processor::ePSGS_InProgress)
445         return status;
446 
447     if (m_Cancelled)
448         return IPSGS_Processor::ePSGS_Cancelled;
449 
450     return status;
451 }
452 
453 
GetName(void) const454 string CPSGS_GetProcessor::GetName(void) const
455 {
456     return "Cassandra-get";
457 }
458 
459 
ProcessEvent(void)460 void CPSGS_GetProcessor::ProcessEvent(void)
461 {
462     x_Peek(true);
463 }
464 
465 
x_Peek(bool need_wait)466 void CPSGS_GetProcessor::x_Peek(bool  need_wait)
467 {
468     if (m_Cancelled)
469         return;
470 
471     if (m_InPeek)
472         return;
473 
474     m_InPeek = true;
475 
476     // 1 -> call m_Loader->Wait1 to pick data
477     // 2 -> check if we have ready-to-send buffers
478     // 3 -> call reply->Send()  to send what we have if it is ready
479     bool        overall_final_state = false;
480 
481     while (true) {
482         auto initial_size = m_FetchDetails.size();
483 
484         for (auto &  details: m_FetchDetails) {
485             if (details) {
486                 overall_final_state |= x_Peek(details, need_wait);
487             }
488         }
489 
490         if (initial_size == m_FetchDetails.size()) {
491             break;
492         }
493     }
494 
495     // Blob specific: ready packets need to be sent right away
496     if (IPSGS_Processor::m_Reply->IsOutputReady())
497         IPSGS_Processor::m_Reply->Flush(false);
498 
499     // Blob specific: deal with exclude blob cache
500     if (AreAllFinishedRead()) {
501         // The handler deals with both kind of blob requests:
502         // - by sat/sat_key
503         // - by seq_id/seq_id_type
504         // So get the reference to the blob base request
505         auto &      blob_request =
506                 IPSGS_Processor::m_Request->GetRequest<SPSGS_BlobRequestBase>();
507 
508         if (blob_request.m_ExcludeBlobCacheAdded &&
509             ! blob_request.m_ExcludeBlobCacheCompleted &&
510             ! blob_request.m_ClientId.empty()) {
511             auto *  app = CPubseqGatewayApp::GetInstance();
512             app->GetExcludeBlobCache()->SetCompleted(
513                                             blob_request.m_ClientId,
514                                             m_BlobId.m_Sat,
515                                             m_BlobId.m_SatKey, true);
516             blob_request.m_ExcludeBlobCacheCompleted = true;
517         }
518     }
519 
520     m_InPeek = false;
521 }
522 
523 
x_Peek(unique_ptr<CCassFetch> & fetch_details,bool need_wait)524 bool CPSGS_GetProcessor::x_Peek(unique_ptr<CCassFetch> &  fetch_details,
525                                 bool  need_wait)
526 {
527     if (!fetch_details->GetLoader())
528         return true;
529 
530     bool    final_state = false;
531     if (need_wait)
532         if (!fetch_details->ReadFinished()) {
533             final_state = fetch_details->GetLoader()->Wait();
534             if (final_state)
535                 fetch_details->SetReadFinished();
536         }
537 
538     if (fetch_details->GetLoader()->HasError() &&
539             IPSGS_Processor::m_Reply->IsOutputReady() &&
540             ! IPSGS_Processor::m_Reply->IsFinished()) {
541         // Send an error
542         string      error = fetch_details->GetLoader()->LastError();
543         auto *      app = CPubseqGatewayApp::GetInstance();
544 
545         app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
546         PSG_ERROR(error);
547 
548         CCassBlobFetch *  blob_fetch = static_cast<CCassBlobFetch *>(fetch_details.get());
549         PrepareServerErrorMessage(blob_fetch, ePSGS_UnknownError, eDiag_Error, error);
550 
551         // Mark finished
552         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
553         fetch_details->SetReadFinished();
554         SignalFinishProcessing();
555     }
556 
557     return final_state;
558 }
559 
560 
x_OnResolutionGoodData(void)561 void CPSGS_GetProcessor::x_OnResolutionGoodData(void)
562 {
563     // The resolution process started to receive data which look good so
564     // the dispatcher should be notified that the other processors can be
565     // stopped
566     if (m_Cancelled || m_Completed)
567         return;
568 
569     if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
570         m_Completed = true;
571     }
572 }
573 
574