1 /*  $Id: cass_blob_base.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: base class for processors which retrieve blobs
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <corelib/request_status.hpp>
35 #include <corelib/ncbidiag.hpp>
36 
37 #include "cass_fetch.hpp"
38 #include "psgs_request.hpp"
39 #include "psgs_reply.hpp"
40 #include "pubseq_gateway_utils.hpp"
41 #include "pubseq_gateway_convert_utils.hpp"
42 #include "pubseq_gateway.hpp"
43 #include "cass_blob_base.hpp"
44 #include "pubseq_gateway_cache_utils.hpp"
45 #include "pubseq_gateway_convert_utils.hpp"
46 #include "public_comment_callback.hpp"
47 
48 using namespace std::placeholders;
49 
50 
CPSGS_CassBlobBase()51 CPSGS_CassBlobBase::CPSGS_CassBlobBase() :
52     m_LastModified(-1)
53 {}
54 
55 
CPSGS_CassBlobBase(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,const string & processor_id)56 CPSGS_CassBlobBase::CPSGS_CassBlobBase(shared_ptr<CPSGS_Request>  request,
57                                        shared_ptr<CPSGS_Reply>  reply,
58                                        const string &  processor_id) :
59     m_NeedToParseId2Info(true),
60     m_ProcessorId(processor_id),
61     m_LastModified(-1)
62 {}
63 
64 
~CPSGS_CassBlobBase()65 CPSGS_CassBlobBase::~CPSGS_CassBlobBase()
66 {}
67 
68 
69 void
OnGetBlobProp(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob,bool is_found)70 CPSGS_CassBlobBase::OnGetBlobProp(TBlobPropsCB  blob_props_cb,
71                                   TBlobChunkCB  blob_chunk_cb,
72                                   TBlobErrorCB  blob_error_cb,
73                                   CCassBlobFetch *  fetch_details,
74                                   CBlobRecord const &  blob,
75                                   bool is_found)
76 {
77     CRequestContextResetter     context_resetter;
78     m_Request->SetRequestContext();
79 
80     if (m_Request->NeedTrace()) {
81         m_Reply->SendTrace("Blob prop callback; found: " + to_string(is_found),
82                            m_Request->GetStartTimestamp());
83     }
84 
85     if (is_found) {
86         // The method could be called multiple times. First time it is called
87         // for the blob in the request (ID/getblob and ID/get). At this moment
88         // the blob id2info field should be parsed and memorized.
89         // Later the original blob id2info is used to decide if id2_chunk
90         // and id2_info should be present in the reply.
91 
92         if (m_LastModified == -1)
93             m_LastModified = blob.GetModified();
94 
95         x_PrepareBlobPropData(fetch_details, blob);
96 
97         if (m_NeedToParseId2Info) {
98             if (!blob.GetId2Info().empty()) {
99                 if (!x_ParseId2Info(fetch_details, blob)) {
100                     x_PrepareBlobPropCompletion(fetch_details);
101                     SetFinished(fetch_details);
102                     return;
103                 }
104             }
105             m_NeedToParseId2Info = false;
106         }
107 
108         // Note: initially only blob_props are requested and at that moment the
109         //       TSE option is 'known'. So the initial request should be
110         //       finished, see m_FinishedRead = true
111         //       In the further requests of the blob data regardless with blob
112         //       props or not, the TSE option is set to unknown so the request
113         //       will be finished at the moment when blob chunks are handled.
114         switch (fetch_details->GetTSEOption()) {
115             case SPSGS_BlobRequestBase::ePSGS_NoneTSE:
116                 x_OnBlobPropNoneTSE(fetch_details);
117                 break;
118             case SPSGS_BlobRequestBase::ePSGS_SlimTSE:
119                 x_OnBlobPropSlimTSE(blob_props_cb, blob_chunk_cb, blob_error_cb,
120                                     fetch_details, blob);
121                 break;
122             case SPSGS_BlobRequestBase::ePSGS_SmartTSE:
123                 x_OnBlobPropSmartTSE(blob_props_cb, blob_chunk_cb, blob_error_cb,
124                                      fetch_details, blob);
125                 break;
126             case SPSGS_BlobRequestBase::ePSGS_WholeTSE:
127                 x_OnBlobPropWholeTSE(blob_props_cb, blob_chunk_cb, blob_error_cb,
128                                      fetch_details, blob);
129                 break;
130             case SPSGS_BlobRequestBase::ePSGS_OrigTSE:
131                 x_OnBlobPropOrigTSE(blob_chunk_cb, blob_error_cb,
132                                     fetch_details, blob);
133                 break;
134             case SPSGS_BlobRequestBase::ePSGS_UnknownTSE:
135                 // Used when INFO blobs are asked; i.e. chunks have been
136                 // requested as well, so only the prop completion message needs
137                 // to be sent.
138                 x_PrepareBlobPropCompletion(fetch_details);
139                 break;
140         }
141     } else {
142         x_OnBlobPropNotFound(fetch_details);
143     }
144 }
145 
146 
147 void
x_OnBlobPropNoneTSE(CCassBlobFetch * fetch_details)148 CPSGS_CassBlobBase::x_OnBlobPropNoneTSE(CCassBlobFetch *  fetch_details)
149 {
150     // Nothing else to be sent
151     x_PrepareBlobPropCompletion(fetch_details);
152     SetFinished(fetch_details);
153 }
154 
155 
156 void
x_OnBlobPropSlimTSE(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)157 CPSGS_CassBlobBase::x_OnBlobPropSlimTSE(TBlobPropsCB  blob_props_cb,
158                                         TBlobChunkCB  blob_chunk_cb,
159                                         TBlobErrorCB  blob_error_cb,
160                                         CCassBlobFetch *  fetch_details,
161                                         CBlobRecord const &  blob)
162 {
163     auto        fetch_blob = fetch_details->GetBlobId();
164 
165     // The handler deals with both kind of blob requests:
166     // - by sat/sat_key
167     // - by seq_id/seq_id_type
168     // So get the reference to the blob base request
169     auto &      blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
170 
171     fetch_details->SetReadFinished();
172     if (blob.GetId2Info().empty()) {
173         x_PrepareBlobPropCompletion(fetch_details);
174 
175         // An original blob may be required if its size is small
176         auto *          app = CPubseqGatewayApp::GetInstance();
177         unsigned int    slim_max_blob_size = app->GetSlimMaxBlobSize();
178 
179         if (blob.GetSize() <= slim_max_blob_size) {
180             // The blob is small, get it, but first check in the
181             // exclude blob cache
182             if (x_CheckExcludeBlobCache(fetch_details,
183                                         blob_request) == ePSGS_InCache) {
184                 return;
185             }
186 
187             x_RequestOriginalBlobChunks(blob_chunk_cb, blob_error_cb,
188                                         fetch_details, blob);
189         } else {
190             // Nothing else to be sent, the original blob is big
191         }
192         return;
193     }
194 
195     // Check the cache first - only if it is about the main
196     // blob request
197     if (x_CheckExcludeBlobCache(fetch_details,
198                                 blob_request) == ePSGS_InCache) {
199         return;
200     }
201 
202     // Not in the cache, request the split INFO blob only
203     x_RequestID2BlobChunks(blob_props_cb, blob_chunk_cb, blob_error_cb,
204                            fetch_details, blob, true);
205 
206     // It is important to send completion after: there could be
207     // an error of converting/translating ID2 info
208     x_PrepareBlobPropCompletion(fetch_details);
209 }
210 
211 
212 void
x_OnBlobPropSmartTSE(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)213 CPSGS_CassBlobBase::x_OnBlobPropSmartTSE(TBlobPropsCB  blob_props_cb,
214                                          TBlobChunkCB  blob_chunk_cb,
215                                          TBlobErrorCB  blob_error_cb,
216                                          CCassBlobFetch *  fetch_details,
217                                          CBlobRecord const &  blob)
218 {
219     fetch_details->SetReadFinished();
220     if (blob.GetId2Info().empty()) {
221         // Request original blob chunks
222         x_PrepareBlobPropCompletion(fetch_details);
223         x_RequestOriginalBlobChunks(blob_chunk_cb, blob_error_cb,
224                                     fetch_details, blob);
225     } else {
226         // Request the split INFO blob only
227         x_RequestID2BlobChunks(blob_props_cb, blob_chunk_cb, blob_error_cb,
228                                fetch_details, blob, true);
229 
230         // It is important to send completion after: there could be
231         // an error of converting/translating ID2 info
232         x_PrepareBlobPropCompletion(fetch_details);
233     }
234 }
235 
236 
237 void
x_OnBlobPropWholeTSE(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)238 CPSGS_CassBlobBase::x_OnBlobPropWholeTSE(TBlobPropsCB  blob_props_cb,
239                                          TBlobChunkCB  blob_chunk_cb,
240                                          TBlobErrorCB  blob_error_cb,
241                                          CCassBlobFetch *  fetch_details,
242                                          CBlobRecord const &  blob)
243 {
244     fetch_details->SetReadFinished();
245     if (blob.GetId2Info().empty()) {
246         // Request original blob chunks
247         x_PrepareBlobPropCompletion(fetch_details);
248         x_RequestOriginalBlobChunks(blob_chunk_cb, blob_error_cb,
249                                     fetch_details, blob);
250     } else {
251         // Request the split INFO blob and all split chunks
252         x_RequestID2BlobChunks(blob_props_cb, blob_chunk_cb, blob_error_cb,
253                                fetch_details, blob, false);
254 
255         // It is important to send completion after: there could be
256         // an error of converting/translating ID2 info
257         x_PrepareBlobPropCompletion(fetch_details);
258     }
259 }
260 
261 
262 void
x_OnBlobPropOrigTSE(TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)263 CPSGS_CassBlobBase::x_OnBlobPropOrigTSE(TBlobChunkCB  blob_chunk_cb,
264                                         TBlobErrorCB  blob_error_cb,
265                                         CCassBlobFetch *  fetch_details,
266                                         CBlobRecord const &  blob)
267 {
268     fetch_details->SetReadFinished();
269     // Request original blob chunks
270     x_PrepareBlobPropCompletion(fetch_details);
271     x_RequestOriginalBlobChunks(blob_chunk_cb,
272                                 blob_error_cb,
273                                 fetch_details, blob);
274 }
275 
276 
277 void
x_RequestOriginalBlobChunks(TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)278 CPSGS_CassBlobBase::x_RequestOriginalBlobChunks(TBlobChunkCB  blob_chunk_cb,
279                                                 TBlobErrorCB  blob_error_cb,
280                                                 CCassBlobFetch *  fetch_details,
281                                                 CBlobRecord const &  blob)
282 {
283     auto    app = CPubseqGatewayApp::GetInstance();
284 
285     auto    trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
286     if (m_Request->NeedTrace())
287         trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
288 
289     // eUnknownTSE is safe here; no blob prop call will happen anyway
290     // eUnknownUseCache is safe here; no further resolution required
291     auto    cass_blob_id = fetch_details->GetBlobId();
292     SPSGS_BlobBySatSatKeyRequest
293             orig_blob_request(SPSGS_BlobId(cass_blob_id.ToString()),
294                               blob.GetModified(),
295                               SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
296                               SPSGS_RequestBase::ePSGS_UnknownUseCache,
297                               fetch_details->GetClientId(),
298                               0, trace_flag,
299                               vector<string>(), vector<string>(),
300                               chrono::high_resolution_clock::now());
301 
302     // Create the cass async loader
303     unique_ptr<CBlobRecord>             blob_record(new CBlobRecord(blob));
304     CCassBlobTaskLoadBlob *             load_task =
305         new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
306                                   app->GetCassandraMaxRetries(),
307                                   app->GetCassandraConnection(),
308                                   cass_blob_id.m_Keyspace,
309                                   move(blob_record),
310                                   true, nullptr);
311 
312     unique_ptr<CCassBlobFetch>  cass_blob_fetch;
313     cass_blob_fetch.reset(new CCassBlobFetch(orig_blob_request, cass_blob_id));
314     cass_blob_fetch->SetLoader(load_task);
315 
316     // Blob props have already been rceived
317     cass_blob_fetch->SetBlobPropSent();
318 
319     load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
320     load_task->SetErrorCB(
321         CGetBlobErrorCallback(blob_error_cb, cass_blob_fetch.get()));
322     load_task->SetPropsCallback(nullptr);
323     load_task->SetChunkCallback(
324         CBlobChunkCallback(blob_chunk_cb, cass_blob_fetch.get()));
325 
326     if (m_Request->NeedTrace()) {
327         m_Reply->SendTrace(
328             "Cassandra request: " + ToJson(*load_task).Repr(CJsonNode::fStandardJson),
329             m_Request->GetStartTimestamp());
330     }
331 
332     m_FetchDetails.push_back(std::move(cass_blob_fetch));
333 
334     load_task->Wait();
335 }
336 
337 
338 void
x_RequestID2BlobChunks(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob,bool info_blob_only)339 CPSGS_CassBlobBase::x_RequestID2BlobChunks(TBlobPropsCB  blob_props_cb,
340                                            TBlobChunkCB  blob_chunk_cb,
341                                            TBlobErrorCB  blob_error_cb,
342                                            CCassBlobFetch *  fetch_details,
343                                            CBlobRecord const &  blob,
344                                            bool  info_blob_only)
345 {
346     auto *      app = CPubseqGatewayApp::GetInstance();
347 
348     // Translate sat to keyspace
349     SCass_BlobId    info_blob_id(m_Id2Info->GetSat(), m_Id2Info->GetInfo());    // mandatory
350 
351     if (!app->SatToKeyspace(m_Id2Info->GetSat(), info_blob_id.m_Keyspace)) {
352         // Error: send it in the context of the blob props
353         string      message = "Error mapping id2 info sat (" +
354                               to_string(m_Id2Info->GetSat()) +
355                               ") to a cassandra keyspace for the blob " +
356                               fetch_details->GetBlobId().ToString();
357         x_PrepareBlobPropMessage(fetch_details, message,
358                                  CRequestStatus::e500_InternalServerError,
359                                  ePSGS_BadID2Info, eDiag_Error);
360         app->GetCounters().Increment(CPSGSCounters::ePSGS_ServerSatToSatNameError);
361         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
362         PSG_ERROR(message);
363         return;
364     }
365 
366     auto    trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
367     if (m_Request->NeedTrace())
368         trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
369 
370     // Create the Id2Info requests.
371     // eUnknownTSE is treated in the blob prop handler as to do nothing (no
372     // sending completion message, no requesting other blobs)
373     // eUnknownUseCache is safe here; no further resolution
374     // empty client_id means that later on there will be no exclude blobs cache ops
375     SPSGS_BlobBySatSatKeyRequest
376         info_blob_request(SPSGS_BlobId(info_blob_id.ToString()),
377                           INT64_MIN,
378                           SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
379                           SPSGS_RequestBase::ePSGS_UnknownUseCache,
380                           "", 0, trace_flag,
381                           vector<string>(), vector<string>(),
382                           chrono::high_resolution_clock::now());
383 
384     // Prepare Id2Info retrieval
385     unique_ptr<CCassBlobFetch>  cass_blob_fetch;
386     cass_blob_fetch.reset(new CCassBlobFetch(info_blob_request, info_blob_id));
387 
388     unique_ptr<CBlobRecord>     blob_record(new CBlobRecord);
389     CPSGCache                   psg_cache(m_Request, m_Reply);
390     auto                        blob_prop_cache_lookup_result =
391                                     psg_cache.LookupBlobProp(
392                                         info_blob_id.m_Sat,
393                                         info_blob_id.m_SatKey,
394                                         info_blob_request.m_LastModified,
395                                         *blob_record.get());
396     CCassBlobTaskLoadBlob *     load_task = nullptr;
397 
398 
399     if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
400         load_task = new CCassBlobTaskLoadBlob(
401                         app->GetCassandraTimeout(),
402                         app->GetCassandraMaxRetries(),
403                         app->GetCassandraConnection(),
404                         info_blob_id.m_Keyspace,
405                         move(blob_record),
406                         true, nullptr);
407     } else {
408         // The handler deals with both kind of blob requests:
409         // - by sat/sat_key
410         // - by seq_id/seq_id_type
411         // So get the reference to the blob base request
412         auto &      blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
413 
414         if (blob_request.m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
415             // No need to continue; it is forbidded to look for blob props in
416             // the Cassandra DB
417             string      message;
418 
419             if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
420                 message = "Blob properties are not found";
421                 UpdateOverallStatus(CRequestStatus::e404_NotFound);
422                 x_PrepareBlobPropMessage(fetch_details, message,
423                                          CRequestStatus::e404_NotFound,
424                                          ePSGS_BlobPropsNotFound, eDiag_Error);
425             } else {
426                 message = "Blob properties are not found due to LMDB cache error";
427                 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
428                 x_PrepareBlobPropMessage(fetch_details, message,
429                                          CRequestStatus::e500_InternalServerError,
430                                          ePSGS_BlobPropsNotFound, eDiag_Error);
431             }
432 
433             PSG_WARNING(message);
434             return;
435         }
436 
437         load_task = new CCassBlobTaskLoadBlob(
438                         app->GetCassandraTimeout(),
439                         app->GetCassandraMaxRetries(),
440                         app->GetCassandraConnection(),
441                         info_blob_id.m_Keyspace,
442                         info_blob_id.m_SatKey,
443                         true, nullptr);
444     }
445     cass_blob_fetch->SetLoader(load_task);
446 
447     load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
448     load_task->SetErrorCB(
449         CGetBlobErrorCallback(blob_error_cb, cass_blob_fetch.get()));
450     load_task->SetPropsCallback(
451         CBlobPropCallback(blob_props_cb,
452                           m_Request, m_Reply, cass_blob_fetch.get(),
453                           blob_prop_cache_lookup_result != ePSGS_CacheHit));
454     load_task->SetChunkCallback(
455         CBlobChunkCallback(blob_chunk_cb, cass_blob_fetch.get()));
456 
457     if (m_Request->NeedTrace()) {
458         m_Reply->SendTrace("Cassandra request: " +
459                            ToJson(*load_task).Repr(CJsonNode::fStandardJson),
460                            m_Request->GetStartTimestamp());
461     }
462 
463     m_FetchDetails.push_back(move(cass_blob_fetch));
464     auto    to_init_iter = m_FetchDetails.end();
465     --to_init_iter;
466 
467     // We may need to request ID2 chunks
468     if (!info_blob_only) {
469         // Sat name is the same
470         x_RequestId2SplitBlobs(blob_props_cb, blob_chunk_cb, blob_error_cb,
471                                fetch_details, info_blob_id.m_Keyspace);
472     }
473 
474     // initiate retrieval: only those which were just created
475     while (to_init_iter != m_FetchDetails.end()) {
476         if (*to_init_iter)
477             (*to_init_iter)->GetLoader()->Wait();
478         ++to_init_iter;
479     }
480 }
481 
482 
483 void
x_RequestId2SplitBlobs(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,const string & keyspace)484 CPSGS_CassBlobBase::x_RequestId2SplitBlobs(TBlobPropsCB  blob_props_cb,
485                                            TBlobChunkCB  blob_chunk_cb,
486                                            TBlobErrorCB  blob_error_cb,
487                                            CCassBlobFetch *  fetch_details,
488                                            const string &  keyspace)
489 {
490     auto    app = CPubseqGatewayApp::GetInstance();
491 
492     auto    trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
493     if (m_Request->NeedTrace())
494         trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
495 
496     for (int  chunk_no = 1; chunk_no <= m_Id2Info->GetChunks(); ++chunk_no) {
497         SCass_BlobId    chunks_blob_id(m_Id2Info->GetSat(),
498                                        m_Id2Info->GetInfo() - m_Id2Info->GetChunks() - 1 + chunk_no);
499         chunks_blob_id.m_Keyspace = keyspace;
500 
501         // eUnknownTSE is treated in the blob prop handler as to do nothing (no
502         // sending completion message, no requesting other blobs)
503         // eUnknownUseCache is safe here; no further resolution required
504         // client_id is "" (empty string) so the split blobs do not participate
505         // in the exclude blob cache
506         SPSGS_BlobBySatSatKeyRequest
507             chunk_request(SPSGS_BlobId(chunks_blob_id.ToString()),
508                           INT64_MIN,
509                           SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
510                           SPSGS_RequestBase::ePSGS_UnknownUseCache,
511                           "", 0, trace_flag,
512                           vector<string>(), vector<string>(),
513                           chrono::high_resolution_clock::now());
514 
515         unique_ptr<CCassBlobFetch>   details;
516         details.reset(new CCassBlobFetch(chunk_request, chunks_blob_id));
517 
518         unique_ptr<CBlobRecord>     blob_record(new CBlobRecord);
519         CPSGCache                   psg_cache(m_Request, m_Reply);
520         auto                        blob_prop_cache_lookup_result =
521                                         psg_cache.LookupBlobProp(
522                                             chunks_blob_id.m_Sat,
523                                             chunks_blob_id.m_SatKey,
524                                             chunk_request.m_LastModified,
525                                             *blob_record.get());
526         CCassBlobTaskLoadBlob *     load_task = nullptr;
527 
528         if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
529             load_task = new CCassBlobTaskLoadBlob(
530                             app->GetCassandraTimeout(),
531                             app->GetCassandraMaxRetries(),
532                             app->GetCassandraConnection(),
533                             chunks_blob_id.m_Keyspace,
534                             move(blob_record),
535                             true, nullptr);
536             details->SetLoader(load_task);
537         } else {
538             // The handler deals with both kind of blob requests:
539             // - by sat/sat_key
540             // - by seq_id/seq_id_type
541             // So get the reference to the blob base request
542             auto &      blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
543 
544             if (blob_request.m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
545                 // No need to create a request because the Cassandra DB access
546                 // is forbidden
547                 string      message;
548                 if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
549                     message = "Blob properties are not found";
550                     UpdateOverallStatus(CRequestStatus::e404_NotFound);
551                     x_PrepareBlobPropMessage(details.get(), message,
552                                              CRequestStatus::e404_NotFound,
553                                              ePSGS_BlobPropsNotFound, eDiag_Error);
554                 } else {
555                     message = "Blob properties are not found "
556                               "due to a blob proc cache lookup error";
557                     UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
558                     x_PrepareBlobPropMessage(details.get(), message,
559                                              CRequestStatus::e500_InternalServerError,
560                                              ePSGS_BlobPropsNotFound, eDiag_Error);
561                 }
562                 x_PrepareBlobPropCompletion(details.get());
563                 PSG_WARNING(message);
564                 continue;
565             }
566 
567             load_task = new CCassBlobTaskLoadBlob(
568                             app->GetCassandraTimeout(),
569                             app->GetCassandraMaxRetries(),
570                             app->GetCassandraConnection(),
571                             chunks_blob_id.m_Keyspace,
572                             chunks_blob_id.m_SatKey,
573                             true, nullptr);
574             details->SetLoader(load_task);
575         }
576 
577         load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
578         load_task->SetErrorCB(
579             CGetBlobErrorCallback(blob_error_cb, details.get()));
580         load_task->SetPropsCallback(
581             CBlobPropCallback(blob_props_cb,
582                               m_Request, m_Reply, details.get(),
583                               blob_prop_cache_lookup_result != ePSGS_CacheHit));
584         load_task->SetChunkCallback(
585             CBlobChunkCallback(blob_chunk_cb, details.get()));
586 
587         m_FetchDetails.push_back(move(details));
588     }
589 }
590 
591 
592 
593 CPSGS_CassBlobBase::EPSGS_BlobCacheCheckResult
x_CheckExcludeBlobCache(CCassBlobFetch * fetch_details,SPSGS_BlobRequestBase & blob_request)594 CPSGS_CassBlobBase::x_CheckExcludeBlobCache(CCassBlobFetch *  fetch_details,
595                                             SPSGS_BlobRequestBase &  blob_request)
596 {
597     if (blob_request.m_ClientId.empty())
598         return ePSGS_NotInCache;
599 
600     auto        fetch_blob = fetch_details->GetBlobId();
601     if (fetch_blob != m_BlobId)
602         return ePSGS_NotInCache;
603 
604     auto *      app = CPubseqGatewayApp::GetInstance();
605     bool        completed = true;
606     auto        cache_result = app->GetExcludeBlobCache()->AddBlobId(
607                                             blob_request.m_ClientId,
608                                             m_BlobId.m_Sat,
609                                             m_BlobId.m_SatKey,
610                                             completed);
611     if (m_Request->GetRequestType() == CPSGS_Request::ePSGS_BlobBySeqIdRequest &&
612         cache_result == ePSGS_AlreadyInCache) {
613         x_PrepareBlobPropCompletion(fetch_details);
614         if (completed)
615             x_PrepareBlobExcluded(fetch_details, m_BlobId.ToString(),
616                                   ePSGS_BlobSent);
617         else
618             x_PrepareBlobExcluded(fetch_details, m_BlobId.ToString(),
619                                   ePSGS_BlobInProgress);
620         return ePSGS_InCache;
621     }
622 
623     if (cache_result == ePSGS_Added)
624         blob_request.m_ExcludeBlobCacheAdded = true;
625     return ePSGS_NotInCache;
626 }
627 
628 
629 void
PrepareServerErrorMessage(CCassBlobFetch * fetch_details,int code,EDiagSev severity,const string & message)630 CPSGS_CassBlobBase::PrepareServerErrorMessage(CCassBlobFetch *  fetch_details,
631                                               int  code,
632                                               EDiagSev  severity,
633                                               const string &  message)
634 {
635     if (fetch_details->IsBlobPropStage()) {
636         x_PrepareBlobPropMessage(fetch_details, message,
637                                  CRequestStatus::e500_InternalServerError,
638                                  code, severity);
639         x_PrepareBlobPropCompletion(fetch_details);
640     } else {
641         x_PrepareBlobMessage(fetch_details, message,
642                              CRequestStatus::e500_InternalServerError,
643                              code, severity);
644         x_PrepareBlobCompletion(fetch_details);
645     }
646 }
647 
648 
649 void
OnGetBlobError(CCassBlobFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)650 CPSGS_CassBlobBase::OnGetBlobError(CCassBlobFetch *  fetch_details,
651                                    CRequestStatus::ECode  status,
652                                    int  code,
653                                    EDiagSev  severity,
654                                    const string &  message)
655 {
656     CRequestContextResetter     context_resetter;
657     m_Request->SetRequestContext();
658 
659     // To avoid sending an error in Peek()
660     fetch_details->GetLoader()->ClearError();
661 
662     // It could be a message or an error
663     bool    is_error = CountError(status, code, severity, message);
664 
665     if (is_error) {
666         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
667         PrepareServerErrorMessage(fetch_details, code, severity, message);
668 
669         // This code is reused by 2 requests:
670         // - get blob by sat/sat_key
671         // - get blob by seq_id/seq_id_type
672         auto &      blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
673 
674         if (fetch_details->GetBlobId() == m_BlobId) {
675             if (blob_request.m_ExcludeBlobCacheAdded &&
676                 ! blob_request.m_ClientId.empty()) {
677                 auto *  app = CPubseqGatewayApp::GetInstance();
678                 app->GetExcludeBlobCache()->Remove(blob_request.m_ClientId,
679                                                    m_BlobId.m_Sat,
680                                                    m_BlobId.m_SatKey);
681 
682                 // To prevent any updates
683                 blob_request.m_ExcludeBlobCacheAdded = false;
684             }
685         }
686 
687         // If it is an error then regardless what stage it was, props or
688         // chunks, there will be no more activity
689         fetch_details->SetReadFinished();
690     } else {
691         if (fetch_details->IsBlobPropStage())
692             x_PrepareBlobPropMessage(fetch_details, message, status,
693                                      code, severity);
694         else
695             x_PrepareBlobMessage(fetch_details, message, status,
696                                  code, severity);
697     }
698 
699     SetFinished(fetch_details);
700 }
701 
702 
703 bool
CountError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)704 CPSGS_CassBlobBase::CountError(CRequestStatus::ECode  status,
705                                int  code,
706                                EDiagSev  severity,
707                                const string &  message)
708 {
709     // It could be a message or an error
710     bool    is_error = (severity == eDiag_Error ||
711                         severity == eDiag_Critical ||
712                         severity == eDiag_Fatal);
713 
714     auto *  app = CPubseqGatewayApp::GetInstance();
715     if (status >= CRequestStatus::e400_BadRequest &&
716         status < CRequestStatus::e500_InternalServerError) {
717         PSG_WARNING(message);
718     } else {
719         PSG_ERROR(message);
720     }
721 
722     if (m_Request->NeedTrace()) {
723         m_Reply->SendTrace("Blob error callback; status " + to_string(status),
724                            m_Request->GetStartTimestamp());
725     }
726 
727     if (status == CRequestStatus::e404_NotFound) {
728         app->GetCounters().Increment(CPSGSCounters::ePSGS_GetBlobNotFound);
729     } else {
730         if (is_error) {
731             if (code == CCassandraException::eQueryTimeout)
732                 app->GetCounters().Increment(CPSGSCounters::ePSGS_CassQueryTimeoutError);
733             else
734                 app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
735         }
736     }
737 
738     return is_error;
739 }
740 
741 
742 void
OnGetBlobChunk(bool cancelled,CCassBlobFetch * fetch_details,const unsigned char * chunk_data,unsigned int data_size,int chunk_no)743 CPSGS_CassBlobBase::OnGetBlobChunk(bool  cancelled,
744                                    CCassBlobFetch *  fetch_details,
745                                    const unsigned char *  chunk_data,
746                                    unsigned int  data_size,
747                                    int  chunk_no)
748 {
749     CRequestContextResetter     context_resetter;
750     m_Request->SetRequestContext();
751 
752     if (cancelled) {
753         fetch_details->GetLoader()->Cancel();
754         SetFinished(fetch_details);
755         return;
756     }
757     if (m_Reply->IsFinished()) {
758         CPubseqGatewayApp::GetInstance()->GetCounters().Increment(
759                                             CPSGSCounters::ePSGS_UnknownError);
760         PSG_ERROR("Unexpected data received "
761                   "while the output has finished, ignoring");
762         return;
763     }
764 
765     if (chunk_no >= 0) {
766         if (m_Request->NeedTrace()) {
767             m_Reply->SendTrace("Blob chunk " + to_string(chunk_no) + " callback",
768                                m_Request->GetStartTimestamp());
769         }
770 
771         // A blob chunk; 0-length chunks are allowed too
772         x_PrepareBlobData(fetch_details, chunk_data, data_size, chunk_no);
773     } else {
774         if (m_Request->NeedTrace()) {
775             m_Reply->SendTrace("Blob chunk no-more-data callback",
776                                m_Request->GetStartTimestamp());
777         }
778 
779         // End of the blob
780         x_PrepareBlobCompletion(fetch_details);
781         SetFinished(fetch_details);
782 
783         // Note: no need to set the blob completed in the exclude blob cache.
784         // It will happen in Peek()
785     }
786 }
787 
788 
789 void
x_OnBlobPropNotFound(CCassBlobFetch * fetch_details)790 CPSGS_CassBlobBase::x_OnBlobPropNotFound(CCassBlobFetch *  fetch_details)
791 {
792     // Not found, report 500 because it is data inconsistency
793     // or 404 if it was requested via sat.sat_key
794     auto *  app = CPubseqGatewayApp::GetInstance();
795     app->GetCounters().Increment(CPSGSCounters::ePSGS_BlobPropsNotFoundError);
796 
797     auto    blob_id = fetch_details->GetBlobId();
798     string  message = "Blob " + blob_id.ToString() +
799                       " properties are not found (last modified: " +
800                       to_string(fetch_details->GetLoader()->GetModified()) + ")";
801     if (fetch_details->GetFetchType() == ePSGS_BlobBySatSatKeyFetch) {
802         // User requested wrong sat_key, so it is a client error
803         UpdateOverallStatus(CRequestStatus::e404_NotFound);
804         x_PrepareBlobPropMessage(fetch_details, message,
805                                  CRequestStatus::e404_NotFound,
806                                  ePSGS_BlobPropsNotFound, eDiag_Error);
807     } else {
808         // Server error, data inconsistency
809         PSG_ERROR(message);
810         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
811         x_PrepareBlobPropMessage(fetch_details, message,
812                                  CRequestStatus::e500_InternalServerError,
813                                  ePSGS_BlobPropsNotFound, eDiag_Error);
814     }
815 
816     // The handler deals with 2 kind of requests:
817     // - get blob by sat/sat_key
818     // - get blob by seq_id/seq_id_type
819     auto &      blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
820 
821     if (blob_id == m_BlobId) {
822         if (blob_request.m_ExcludeBlobCacheAdded && !blob_request.m_ClientId.empty()) {
823             app->GetExcludeBlobCache()->Remove(blob_request.m_ClientId,
824                                                m_BlobId.m_Sat,
825                                                m_BlobId.m_SatKey);
826             blob_request.m_ExcludeBlobCacheAdded = false;
827         }
828     }
829 
830     x_PrepareBlobPropCompletion(fetch_details);
831     SetFinished(fetch_details);
832 }
833 
834 
835 bool
x_ParseId2Info(CCassBlobFetch * fetch_details,CBlobRecord const & blob)836 CPSGS_CassBlobBase::x_ParseId2Info(CCassBlobFetch *  fetch_details,
837                                    CBlobRecord const &  blob)
838 {
839     string      err_msg;
840     try {
841         m_Id2Info.reset(new CPSGS_SatInfoChunksVerFlavorId2Info(blob.GetId2Info()));
842         return true;
843     } catch (const exception &  exc) {
844         err_msg = "Error extracting id2 info for the blob " +
845             fetch_details->GetBlobId().ToString() + ": " + exc.what();
846     } catch (...) {
847         err_msg = "Unknown error extracting id2 info for the blob " +
848             fetch_details->GetBlobId().ToString() + ".";
849     }
850 
851     CPubseqGatewayApp::GetInstance()->GetCounters().Increment(
852                                     CPSGSCounters::ePSGS_InvalidId2InfoError);
853     x_PrepareBlobPropMessage(fetch_details, err_msg,
854                              CRequestStatus::e500_InternalServerError,
855                              ePSGS_BadID2Info, eDiag_Error);
856     UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
857     PSG_ERROR(err_msg);
858     return false;
859 }
860 
861 
862 void
SetFinished(CCassBlobFetch * fetch_details)863 CPSGS_CassBlobBase::SetFinished(CCassBlobFetch *  fetch_details)
864 {
865     fetch_details->SetReadFinished();
866 }
867 
868 
869 bool
NeedToAddId2CunkId2Info(void) const870 CPSGS_CassBlobBase::NeedToAddId2CunkId2Info(void) const
871 {
872     auto &      blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
873     if (blob_request.m_TSEOption == SPSGS_BlobRequestBase::ePSGS_OrigTSE)
874         return false;
875 
876     return m_Id2Info.get() != nullptr && m_NeedToParseId2Info == false;
877 }
878 
879 
880 int64_t
x_GetId2ChunkNumber(CCassBlobFetch * fetch_details)881 CPSGS_CassBlobBase::x_GetId2ChunkNumber(CCassBlobFetch *  fetch_details)
882 {
883     // Note: this member is called only when m_Id2Info is parsed successfully
884 
885     auto        blob_key = fetch_details->GetBlobId().m_SatKey;
886     auto        orig_blob_info = m_Id2Info->GetInfo();
887     if (orig_blob_info == blob_key) {
888         // It is a split info chunk so use a special value
889         return kSplitInfoChunk;
890     }
891 
892     // Calculate the id2_chunk
893     return blob_key - orig_blob_info + m_Id2Info->GetChunks() + 1;
894 }
895 
896 
897 void
x_PrepareBlobPropData(CCassBlobFetch * blob_fetch_details,CBlobRecord const & blob)898 CPSGS_CassBlobBase::x_PrepareBlobPropData(CCassBlobFetch *  blob_fetch_details,
899                                           CBlobRecord const &  blob)
900 {
901     bool    need_id2_identification = NeedToAddId2CunkId2Info();
902 
903     // CXX-11547: may be public comments request is needed as well
904     if (blob.GetFlag(EBlobFlags::eSuppress) ||
905         blob.GetFlag(EBlobFlags::eWithdrawn)) {
906         // Request public comment
907         auto                                    app = CPubseqGatewayApp::GetInstance();
908         unique_ptr<CCassPublicCommentFetch>     comment_fetch_details;
909         comment_fetch_details.reset(new CCassPublicCommentFetch());
910         // Memorize the identification which will be used at the moment of
911         // sending the comment to the client
912         if (need_id2_identification) {
913             comment_fetch_details->SetId2Identification(
914                 x_GetId2ChunkNumber(blob_fetch_details),
915                 m_Id2Info->Serialize());
916         } else {
917             comment_fetch_details->SetCassBlobIdentification(
918                 blob_fetch_details->GetBlobId(),
919                 m_LastModified);
920         }
921 
922         CCassStatusHistoryTaskGetPublicComment *    load_task =
923             new CCassStatusHistoryTaskGetPublicComment(app->GetCassandraTimeout(),
924                                                        app->GetCassandraMaxRetries(),
925                                                        app->GetCassandraConnection(),
926                                                        blob_fetch_details->GetBlobId().m_Keyspace,
927                                                        blob, nullptr);
928         comment_fetch_details->SetLoader(load_task);
929         load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
930         load_task->SetErrorCB(
931             CPublicCommentErrorCallback(
932                 bind(&CPSGS_CassBlobBase::OnPublicCommentError,
933                      this, _1, _2, _3, _4, _5),
934                 comment_fetch_details.get()));
935         load_task->SetCommentCallback(
936             CPublicCommentConsumeCallback(
937                 bind(&CPSGS_CassBlobBase::OnPublicComment,
938                      this, _1, _2, _3),
939                 comment_fetch_details.get()));
940         load_task->SetMessages(app->GetPublicCommentsMapping());
941 
942         if (m_Request->NeedTrace()) {
943             m_Reply->SendTrace(
944                 "Cassandra request: " +
945                 ToJson(*load_task).Repr(CJsonNode::fStandardJson),
946                 m_Request->GetStartTimestamp());
947         }
948 
949         m_FetchDetails.push_back(move(comment_fetch_details));
950         load_task->Wait();  // Initiate cassandra request
951     }
952 
953 
954     if (need_id2_identification) {
955         m_Reply->PrepareTSEBlobPropData(
956             blob_fetch_details, m_ProcessorId,
957             x_GetId2ChunkNumber(blob_fetch_details), m_Id2Info->Serialize(),
958             ToJson(blob).Repr(CJsonNode::fStandardJson));
959     } else {
960         // There is no id2info in the originally requested blob
961         // so just send blob props without id2_chunk/id2_info
962         m_Reply->PrepareBlobPropData(
963             blob_fetch_details, m_ProcessorId,
964             ToJson(blob).Repr(CJsonNode::fStandardJson), m_LastModified);
965     }
966 }
967 
968 
969 void
x_PrepareBlobPropCompletion(CCassBlobFetch * fetch_details)970 CPSGS_CassBlobBase::x_PrepareBlobPropCompletion(CCassBlobFetch *  fetch_details)
971 {
972     if (NeedToAddId2CunkId2Info()) {
973         m_Reply->PrepareTSEBlobPropCompletion(fetch_details, m_ProcessorId);
974     } else {
975         // There is no id2info in the originally requested blob
976         // so just send blob prop completion without id2_chunk/id2_info
977         m_Reply->PrepareBlobPropCompletion(fetch_details, m_ProcessorId);
978     }
979 }
980 
981 
982 void
x_PrepareBlobData(CCassBlobFetch * fetch_details,const unsigned char * chunk_data,unsigned int data_size,int chunk_no)983 CPSGS_CassBlobBase::x_PrepareBlobData(CCassBlobFetch *  fetch_details,
984                                       const unsigned char *  chunk_data,
985                                       unsigned int  data_size,
986                                       int  chunk_no)
987 {
988     if (NeedToAddId2CunkId2Info()) {
989         m_Reply->PrepareTSEBlobData(
990             fetch_details, m_ProcessorId,
991             chunk_data, data_size, chunk_no,
992             x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize());
993     } else {
994         // There is no id2info in the originally requested blob
995         // so just send blob prop completion without id2_chunk/id2_info
996         m_Reply->PrepareBlobData(fetch_details, m_ProcessorId,
997                                  chunk_data, data_size, chunk_no,
998                                  m_LastModified);
999     }
1000 }
1001 
1002 
1003 void
x_PrepareBlobCompletion(CCassBlobFetch * fetch_details)1004 CPSGS_CassBlobBase::x_PrepareBlobCompletion(CCassBlobFetch *  fetch_details)
1005 {
1006     if (NeedToAddId2CunkId2Info()) {
1007         m_Reply->PrepareTSEBlobCompletion(fetch_details, m_ProcessorId);
1008     } else {
1009         // There is no id2info in the originally requested blob
1010         // so just send blob prop completion without id2_chunk/id2_info
1011         m_Reply->PrepareBlobCompletion(fetch_details, m_ProcessorId);
1012     }
1013 }
1014 
1015 
1016 void
x_PrepareBlobPropMessage(CCassBlobFetch * fetch_details,const string & message,CRequestStatus::ECode status,int err_code,EDiagSev severity)1017 CPSGS_CassBlobBase::x_PrepareBlobPropMessage(CCassBlobFetch *  fetch_details,
1018                                              const string &  message,
1019                                              CRequestStatus::ECode  status,
1020                                              int  err_code,
1021                                              EDiagSev  severity)
1022 {
1023     if (NeedToAddId2CunkId2Info()) {
1024         m_Reply->PrepareTSEBlobPropMessage(
1025             fetch_details, m_ProcessorId,
1026             x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize(),
1027             message, status, err_code, severity);
1028     } else {
1029         // There is no id2info in the originally requested blob
1030         // so just send blob prop completion without id2_chunk/id2_info
1031         m_Reply->PrepareBlobPropMessage(
1032             fetch_details, m_ProcessorId,
1033             message, status, err_code, severity);
1034     }
1035 }
1036 
1037 
1038 void
x_PrepareBlobMessage(CCassBlobFetch * fetch_details,const string & message,CRequestStatus::ECode status,int err_code,EDiagSev severity)1039 CPSGS_CassBlobBase::x_PrepareBlobMessage(CCassBlobFetch *  fetch_details,
1040                                          const string &  message,
1041                                          CRequestStatus::ECode  status,
1042                                          int  err_code,
1043                                          EDiagSev  severity)
1044 {
1045     if (NeedToAddId2CunkId2Info()) {
1046         m_Reply->PrepareTSEBlobMessage(
1047             fetch_details, m_ProcessorId,
1048             x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize(),
1049             message, status, err_code, severity);
1050     } else {
1051         // There is no id2info in the originally requested blob
1052         // so just send blob prop completion without id2_chunk/id2_info
1053         m_Reply->PrepareBlobMessage(
1054             fetch_details, m_ProcessorId,
1055             message, status, err_code, severity, m_LastModified);
1056     }
1057 }
1058 
1059 
1060 void
x_PrepareBlobExcluded(CCassBlobFetch * fetch_details,const string & blob_id,EPSGS_BlobSkipReason skip_reason)1061 CPSGS_CassBlobBase::x_PrepareBlobExcluded(CCassBlobFetch *  fetch_details,
1062                                           const string &  blob_id,
1063                                           EPSGS_BlobSkipReason  skip_reason)
1064 {
1065     if (NeedToAddId2CunkId2Info()) {
1066         m_Reply->PrepareTSEBlobExcluded(
1067             m_ProcessorId, x_GetId2ChunkNumber(fetch_details),
1068             m_Id2Info->Serialize(), skip_reason);
1069     } else {
1070         // There is no id2info in the originally requested blob
1071         // so just send blob prop completion without id2_chunk/id2_info
1072         m_Reply->PrepareBlobExcluded(blob_id, m_ProcessorId, skip_reason,
1073                                      m_LastModified);
1074     }
1075 }
1076 
1077 
1078 void
OnPublicCommentError(CCassPublicCommentFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)1079 CPSGS_CassBlobBase::OnPublicCommentError(
1080                             CCassPublicCommentFetch *  fetch_details,
1081                             CRequestStatus::ECode  status,
1082                             int  code,
1083                             EDiagSev  severity,
1084                             const string &  message)
1085 {
1086     CRequestContextResetter     context_resetter;
1087     m_Request->SetRequestContext();
1088 
1089     if (m_Cancelled) {
1090         fetch_details->SetReadFinished();
1091         fetch_details->GetLoader()->Cancel();
1092         return;
1093     }
1094 
1095     // To avoid sending an error in Peek()
1096     fetch_details->GetLoader()->ClearError();
1097 
1098     // It could be a message or an error
1099     bool    is_error = (severity == eDiag_Error ||
1100                         severity == eDiag_Critical ||
1101                         severity == eDiag_Fatal);
1102 
1103     auto *  app = CPubseqGatewayApp::GetInstance();
1104     if (status >= CRequestStatus::e400_BadRequest &&
1105         status < CRequestStatus::e500_InternalServerError) {
1106         PSG_WARNING(message);
1107     } else {
1108         PSG_ERROR(message);
1109     }
1110 
1111     if (m_Request->NeedTrace()) {
1112         m_Reply->SendTrace(
1113             "Public comment error callback; status: " + to_string(status),
1114             m_Request->GetStartTimestamp());
1115     }
1116 
1117     m_Reply->PrepareProcessorMessage(
1118         m_Reply->GetItemId(),
1119         m_ProcessorId, message, status, code, severity);
1120 
1121     if (is_error) {
1122         if (code == CCassandraException::eQueryTimeout)
1123             app->GetCounters().Increment(CPSGSCounters::ePSGS_CassQueryTimeoutError);
1124         else
1125             app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
1126 
1127         // If it is an error then there will be no more activity
1128         fetch_details->SetReadFinished();
1129     }
1130 
1131     // Note: is it necessary to call something like x_Peek() of the actual
1132     //       processor class to send this immediately? It should work without
1133     //       this call and at the moment x_Peek() is not available here
1134     // if (m_Reply->IsOutputReady())
1135     //     x_Peek(false);
1136 }
1137 
1138 
1139 void
OnPublicComment(CCassPublicCommentFetch * fetch_details,string comment,bool is_found)1140 CPSGS_CassBlobBase::OnPublicComment(
1141                             CCassPublicCommentFetch *  fetch_details,
1142                             string  comment,
1143                             bool  is_found)
1144 {
1145     CRequestContextResetter     context_resetter;
1146     m_Request->SetRequestContext();
1147 
1148     fetch_details->SetReadFinished();
1149 
1150     if (m_Cancelled) {
1151         fetch_details->GetLoader()->Cancel();
1152         return;
1153     }
1154 
1155     if (m_Request->NeedTrace()) {
1156         m_Reply->SendTrace(
1157             "Public comment callback; found: " + to_string(is_found),
1158             m_Request->GetStartTimestamp());
1159     }
1160 
1161     if (is_found) {
1162         if (fetch_details->GetIdentification() ==
1163                         CCassPublicCommentFetch::ePSGS_ById2) {
1164             m_Reply->PreparePublicComment(
1165                         m_ProcessorId, comment,
1166                         fetch_details->GetId2Chunk(),
1167                         fetch_details->GetId2Info());
1168         } else {
1169             // There is no id2info in the originally requested blob
1170             // so just send blob prop completion without id2_chunk/id2_info
1171             m_Reply->PreparePublicComment(
1172                         m_ProcessorId, comment,
1173                         fetch_details->GetBlobId().ToString(),
1174                         fetch_details->GetLastModified());
1175         }
1176     }
1177 
1178     // Note: is it necessary to call something like x_Peek() of the actual
1179     //       processor class to send this immediately? It should work without
1180     //       this call and at the moment x_Peek() is not available here
1181     // if (m_Reply->IsOutputReady())
1182     //     x_Peek(false);
1183 }
1184 
1185