1 /*  $Id: tse_chunk_processor.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: get TSE chunk processor
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "tse_chunk_processor.hpp"
35 #include "pubseq_gateway.hpp"
36 #include "pubseq_gateway_cache_utils.hpp"
37 #include "pubseq_gateway_convert_utils.hpp"
38 #include "get_blob_callback.hpp"
39 #include "split_history_callback.hpp"
40 
41 USING_NCBI_SCOPE;
42 
43 using namespace std::placeholders;
44 
CPSGS_TSEChunkProcessor()45 CPSGS_TSEChunkProcessor::CPSGS_TSEChunkProcessor() :
46     m_TSEChunkRequest(nullptr)
47 {}
48 
49 
CPSGS_TSEChunkProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority,shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> sat_info_chunk_ver_id2info,shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> id_mod_ver_id2info)50 CPSGS_TSEChunkProcessor::CPSGS_TSEChunkProcessor(
51             shared_ptr<CPSGS_Request> request,
52             shared_ptr<CPSGS_Reply> reply,
53             TProcessorPriority  priority,
54             shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> sat_info_chunk_ver_id2info,
55             shared_ptr<CPSGS_IdModifiedVerFlavorId2Info>    id_mod_ver_id2info) :
56     CPSGS_CassProcessorBase(request, reply, priority),
57     CPSGS_CassBlobBase(request, reply, GetName()),
58     m_SatInfoChunkVerId2Info(sat_info_chunk_ver_id2info),
59     m_IdModVerId2Info(id_mod_ver_id2info)
60 {
61     // Convenience to avoid calling
62     // m_Request->GetRequest<SPSGS_TSEChunkRequest>() everywhere
63     m_TSEChunkRequest = & request->GetRequest<SPSGS_TSEChunkRequest>();
64 }
65 
66 
~CPSGS_TSEChunkProcessor()67 CPSGS_TSEChunkProcessor::~CPSGS_TSEChunkProcessor()
68 {}
69 
70 
71 IPSGS_Processor*
CreateProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority) const72 CPSGS_TSEChunkProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
73                                          shared_ptr<CPSGS_Reply> reply,
74                                          TProcessorPriority  priority) const
75 {
76     if (!IsCassandraProcessorEnabled(request))
77         return nullptr;
78 
79     if (request->GetRequestType() != CPSGS_Request::ePSGS_TSEChunkRequest)
80         return nullptr;
81 
82     auto            tse_chunk_request = & request->GetRequest<SPSGS_TSEChunkRequest>();
83 
84     // CXX-11478: some VDB chunks start with 0 but in Cassandra they always
85     // start with 1. Non negative condition is checked at the time when the
86     // request is received.
87     if (tse_chunk_request->m_Id2Chunk == 0)
88         return nullptr;
89 
90     // Check parseability of the id2_info parameter
91     shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info>  sat_info_chunk_ver_id2info;
92     shared_ptr<CPSGS_IdModifiedVerFlavorId2Info>     id_mod_ver_id2info;
93     if (x_DetectId2InfoFlavor(tse_chunk_request->m_Id2Info,
94                               sat_info_chunk_ver_id2info,
95                               id_mod_ver_id2info) !=
96                                             ePSGS_UnknownId2InfoFlavor) {
97         // Check the DB availability
98         auto *      app = CPubseqGatewayApp::GetInstance();
99         auto        startup_data_state = app->GetStartupDataState();
100         if (startup_data_state != ePSGS_StartupDataOK) {
101             if (request->NeedTrace()) {
102                 reply->SendTrace("Cannot create " + GetName() +
103                                  " processor because Cassandra DB "
104                                  "is not available.\n" +
105                                  GetCassStartupDataStateMessage(startup_data_state),
106                                  request->GetStartTimestamp());
107             }
108             return nullptr;
109         }
110 
111         return new CPSGS_TSEChunkProcessor(request, reply, priority,
112                                            sat_info_chunk_ver_id2info,
113                                            id_mod_ver_id2info);
114     }
115 
116     return nullptr;
117 }
118 
119 
120 EPSGSId2InfoFlavor
x_DetectId2InfoFlavor(const string & id2_info,shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & sat_info_chunk_ver_id2info,shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> & id_mod_ver_id2info) const121 CPSGS_TSEChunkProcessor::x_DetectId2InfoFlavor(
122             const string &                                    id2_info,
123             shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & sat_info_chunk_ver_id2info,
124             shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> &    id_mod_ver_id2info) const
125 {
126     try {
127         // false -> do not count errors
128         sat_info_chunk_ver_id2info.reset(
129             new CPSGS_SatInfoChunksVerFlavorId2Info(id2_info, false));
130         return ePSGS_SatInfoChunksVerId2InfoFlavor;
131     } catch (...) {
132         // Parsing error: may be it is another id2_info format
133     }
134 
135     try {
136         id_mod_ver_id2info.reset(
137             new CPSGS_IdModifiedVerFlavorId2Info(id2_info));
138         return ePSGS_IdModifiedVerId2InfoFlavor;
139     } catch (...) {
140         // Parsing error: may be it is for another processor
141     }
142     return ePSGS_UnknownId2InfoFlavor;
143 }
144 
145 
x_ParseTSEChunkId2Info(const string & info,unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & id2_info,const SCass_BlobId & blob_id,bool need_finish)146 bool CPSGS_TSEChunkProcessor::x_ParseTSEChunkId2Info(
147             const string &                                     info,
148             unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> &  id2_info,
149             const SCass_BlobId &                               blob_id,
150             bool                                               need_finish)
151 {
152     string      err_msg;
153     try {
154         id2_info.reset(new CPSGS_SatInfoChunksVerFlavorId2Info(info));
155         return true;
156     } catch (const exception &  exc) {
157         err_msg = "Error extracting id2 info for blob " +
158                   blob_id.ToString() + ": " + exc.what();
159     } catch (...) {
160         err_msg = "Unknown error while extracting id2 info for blob " +
161                   blob_id.ToString();
162     }
163 
164     auto *  app = CPubseqGatewayApp::GetInstance();
165     app->GetCounters().Increment(CPSGSCounters::ePSGS_InvalidId2InfoError);
166     if (need_finish) {
167         x_SendProcessorError(err_msg, CRequestStatus::e500_InternalServerError,
168                              ePSGS_InvalidId2Info);
169         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
170         m_Completed = true;
171         SignalFinishProcessing();
172     } else {
173         PSG_WARNING(err_msg);
174     }
175     return false;
176 }
177 
178 
179 bool
x_TSEChunkSatToKeyspace(SCass_BlobId & blob_id,bool need_finish)180 CPSGS_TSEChunkProcessor::x_TSEChunkSatToKeyspace(SCass_BlobId &  blob_id,
181                                                  bool  need_finish)
182 {
183     auto *      app = CPubseqGatewayApp::GetInstance();
184     if (app->SatToKeyspace(blob_id.m_Sat, blob_id.m_Keyspace))
185         return true;
186 
187     app->GetCounters().Increment(CPSGSCounters::ePSGS_ServerSatToSatNameError);
188     string  msg = "Unknown TSE chunk satellite number " +
189                   to_string(blob_id.m_Sat) +
190                   " for the blob " + blob_id.ToString();
191     if (need_finish) {
192         x_SendProcessorError(msg, CRequestStatus::e500_InternalServerError,
193                              ePSGS_UnknownResolvedSatellite);
194 
195         // This method is used only in case of the TSE chunk requests.
196         // So in case of errors - synchronous or asynchronous - it is
197         // necessary to finish the reply anyway.
198         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
199         m_Completed = true;
200         SignalFinishProcessing();
201     } else {
202         PSG_WARNING(msg);
203     }
204     return false;
205 }
206 
207 
Process(void)208 void CPSGS_TSEChunkProcessor::Process(void)
209 {
210     if (m_SatInfoChunkVerId2Info.get() != nullptr) {
211         x_ProcessSatInfoChunkVerId2Info();
212         return;
213     }
214     if (m_IdModVerId2Info.get() != nullptr) {
215         x_ProcessIdModVerId2Info();
216         return;
217     }
218 
219     NCBI_THROW(CPubseqGatewayException, eLogic,
220                "Logic error: none of the id2_info options were initialized");
221 }
222 
223 
x_ProcessIdModVerId2Info(void)224 void CPSGS_TSEChunkProcessor::x_ProcessIdModVerId2Info(void)
225 {
226     // This option is when id2info came in a shape of
227     // tse_id~~last_modified~~split_version
228 
229     CRequestContextResetter     context_resetter;
230     IPSGS_Processor::m_Request->SetRequestContext();
231 
232     auto    app = CPubseqGatewayApp::GetInstance();
233     string  err_msg;
234 
235     if (!m_IdModVerId2Info->GetTSEId().MapSatToKeyspace()) {
236         app->GetCounters().Increment(CPSGSCounters::ePSGS_ClientSatToSatNameError);
237 
238         err_msg = GetName() + " failed to map sat " +
239                   to_string(m_IdModVerId2Info->GetTSEId().m_Sat) +
240                   " to a Cassandra keyspace";
241         x_SendProcessorError(err_msg, CRequestStatus::e404_NotFound,
242                              ePSGS_UnknownResolvedSatellite);
243         UpdateOverallStatus(CRequestStatus::e404_NotFound);
244         m_Completed = true;
245         SignalFinishProcessing();
246 
247         if (IPSGS_Processor::m_Reply->IsOutputReady())
248             x_Peek(false);
249         return;
250     }
251 
252     // First, check the blob prop cache, may be the requested version matches
253     // the requested one
254     unique_ptr<CBlobRecord>     blob_record(new CBlobRecord);
255     CPSGCache                   psg_cache(IPSGS_Processor::m_Request,
256                                           IPSGS_Processor::m_Reply);
257 
258     // CXX-11478: last modified is to be ignored for now
259     int64_t     last_modified = INT64_MIN;  // last modified is unknown
260     auto        blob_prop_cache_lookup_result =
261         psg_cache.LookupBlobProp(m_IdModVerId2Info->GetTSEId().m_Sat,
262                                  m_IdModVerId2Info->GetTSEId().m_SatKey,
263                                  last_modified, *blob_record.get());
264     if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
265         do {
266             // Step 1: check the id2info presense
267             if (blob_record->GetId2Info().empty()) {
268                 app->GetCounters().Increment(CPSGSCounters::ePSGS_TSEChunkSplitVersionCacheNotMatched);
269                 PSG_WARNING("Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
270                             " properties id2info is empty in cache");
271                 break;  // Continue with cassandra
272             }
273 
274             // Step 2: check that the id2info is parsable
275             unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info>     cache_id2_info;
276             // false -> do not finish the request
277             if (!x_ParseTSEChunkId2Info(blob_record->GetId2Info(),
278                                         cache_id2_info,
279                                         m_IdModVerId2Info->GetTSEId(),
280                                         false)) {
281                 app->GetCounters().Increment(CPSGSCounters::ePSGS_TSEChunkSplitVersionCacheNotMatched);
282                 break;  // Continue with cassandra
283             }
284 
285             // Step 3: check the split version in cache
286             if (cache_id2_info->GetSplitVersion() != m_IdModVerId2Info->GetSplitVersion()) {
287                 app->GetCounters().Increment(CPSGSCounters::ePSGS_TSEChunkSplitVersionCacheNotMatched);
288                 PSG_WARNING("Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
289                             " split version in cache does not match the requested one");
290                 break;  // Continue with cassandra
291             }
292 
293             app->GetCounters().Increment(CPSGSCounters::ePSGS_TSEChunkSplitVersionCacheMatched);
294 
295             // Step 4: validate the chunk number
296             if (!x_ValidateTSEChunkNumber(m_TSEChunkRequest->m_Id2Chunk,
297                                           cache_id2_info->GetChunks(), false)) {
298                 break; // Continue with cassandra
299             }
300 
301             // Step 5: For the target chunk - convert sat to sat name
302             // Chunk's blob id
303             int64_t         sat_key;
304             if (m_TSEChunkRequest->m_Id2Chunk == kSplitInfoChunk) {
305                 // Special case
306                 sat_key = cache_id2_info->GetInfo();
307             } else {
308                 // For the target chunk - convert sat to sat name chunk's blob id
309                 sat_key = cache_id2_info->GetInfo() -
310                           cache_id2_info->GetChunks() - 1 +
311                           m_TSEChunkRequest->m_Id2Chunk;
312             }
313             SCass_BlobId    chunk_blob_id(cache_id2_info->GetSat(), sat_key);
314             if (!x_TSEChunkSatToKeyspace(chunk_blob_id, false)) {
315                 break;  // Continue with cassandra
316             }
317 
318             // Step 6: search in cache the TSE chunk properties
319             last_modified = INT64_MIN;
320             auto  tse_blob_prop_cache_lookup_result = psg_cache.LookupBlobProp(
321                     chunk_blob_id.m_Sat, chunk_blob_id.m_SatKey,
322                     last_modified, *blob_record.get());
323             if (tse_blob_prop_cache_lookup_result != ePSGS_CacheHit) {
324                 err_msg = "TSE chunk blob " + chunk_blob_id.ToString() +
325                           " properties are not found in cache";
326                 if (tse_blob_prop_cache_lookup_result == ePSGS_CacheFailure)
327                     err_msg += " due to LMDB error";
328                 PSG_WARNING(err_msg);
329                 break;  // Continue with cassandra
330             }
331 
332             // Step 7: initiate the chunk request
333             SPSGS_RequestBase::EPSGS_Trace  trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
334             if (IPSGS_Processor::m_Request->NeedTrace())
335                 trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
336             SPSGS_BlobBySatSatKeyRequest
337                         chunk_request(SPSGS_BlobId(chunk_blob_id.ToString()),
338                                       INT64_MIN,
339                                       SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
340                                       SPSGS_RequestBase::ePSGS_UnknownUseCache,
341                                       "", 0, trace_flag,
342                                       vector<string>(), vector<string>(),
343                                       chrono::high_resolution_clock::now());
344 
345             unique_ptr<CCassBlobFetch>  fetch_details;
346             fetch_details.reset(new CCassBlobFetch(chunk_request, chunk_blob_id));
347             CCassBlobTaskLoadBlob *         load_task =
348                 new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
349                                           app->GetCassandraMaxRetries(),
350                                           app->GetCassandraConnection(),
351                                           chunk_blob_id.m_Keyspace,
352                                           move(blob_record),
353                                           true, nullptr);
354             fetch_details->SetLoader(load_task);
355             load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
356             load_task->SetErrorCB(
357                 CGetBlobErrorCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobError,
358                                            this, _1, _2, _3, _4, _5),
359                                       fetch_details.get()));
360             load_task->SetPropsCallback(
361                 CBlobPropCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobProp,
362                                        this, _1, _2, _3),
363                                   IPSGS_Processor::m_Request,
364                                   IPSGS_Processor::m_Reply,
365                                   fetch_details.get(), false));
366             load_task->SetChunkCallback(
367                 CBlobChunkCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobChunk,
368                                         this, _1, _2, _3, _4, _5),
369                                    fetch_details.get()));
370 
371             if (IPSGS_Processor::m_Request->NeedTrace()) {
372                 IPSGS_Processor::m_Reply->SendTrace(
373                     "Cassandra request: " +
374                     ToJson(*load_task).Repr(CJsonNode::fStandardJson),
375                     IPSGS_Processor::m_Request->GetStartTimestamp());
376             }
377 
378             m_FetchDetails.push_back(move(fetch_details));
379             load_task->Wait();  // Initiate cassandra request
380             return;
381         } while (false);
382     } else {
383         if (psg_cache.IsAllowed()) {
384             err_msg = "Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
385                       " properties are not found in cache";
386             if (blob_prop_cache_lookup_result == ePSGS_CacheFailure) {
387                 err_msg += " due to LMDB error";
388                 PSG_WARNING(err_msg);
389             } else {
390                 // This warning could be confusing for the following cases:
391                 // - requested blob has been recently removed
392                 //   but the split history still has the record for it
393                 // - so the cache has nothing but split history directs to the
394                 //   still hanging blo
395                 // The data will be sent back sucessfully and a log file will
396                 // have a confusing warning. So it was decided to comment the
397                 // warning out.
398                 // PSG_WARNING(err_msg);
399             }
400         }
401     }
402 
403 
404     // Here:
405     // - fallback to cassandra
406     // - cache is not allowed
407     // - not found in cache
408 
409     // Initiate async the history request
410     unique_ptr<CCassSplitHistoryFetch>      fetch_details;
411     fetch_details.reset(new CCassSplitHistoryFetch(*m_TSEChunkRequest,
412                                                    m_IdModVerId2Info->GetTSEId(),
413                                                    m_IdModVerId2Info->GetSplitVersion()));
414     CCassBlobTaskFetchSplitHistory *   load_task =
415         new  CCassBlobTaskFetchSplitHistory(app->GetCassandraTimeout(),
416                                             app->GetCassandraMaxRetries(),
417                                             app->GetCassandraConnection(),
418                                             m_IdModVerId2Info->GetTSEId().m_Keyspace,
419                                             m_IdModVerId2Info->GetTSEId().m_SatKey,
420                                             m_IdModVerId2Info->GetSplitVersion(),
421                                             nullptr, nullptr);
422     fetch_details->SetLoader(load_task);
423     load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
424     load_task->SetErrorCB(
425         CSplitHistoryErrorCallback(
426             bind(&CPSGS_TSEChunkProcessor::OnGetSplitHistoryError,
427                  this, _1, _2, _3, _4, _5),
428             fetch_details.get()));
429     load_task->SetConsumeCallback(
430         CSplitHistoryConsumeCallback(
431             bind(&CPSGS_TSEChunkProcessor::OnGetSplitHistory,
432                  this, _1, _2),
433             fetch_details.get()));
434 
435     if (IPSGS_Processor::m_Request->NeedTrace()) {
436         IPSGS_Processor::m_Reply->SendTrace(
437             "Cassandra request: " +
438             ToJson(*load_task).Repr(CJsonNode::fStandardJson),
439             IPSGS_Processor::m_Request->GetStartTimestamp());
440     }
441 
442     m_FetchDetails.push_back(move(fetch_details));
443     load_task->Wait();  // Initiate cassandra request
444 }
445 
446 
x_ProcessSatInfoChunkVerId2Info(void)447 void CPSGS_TSEChunkProcessor::x_ProcessSatInfoChunkVerId2Info(void)
448 {
449     // This option is when id2info came in a shape of sat.info.chunks[.ver]
450 
451     auto    app = CPubseqGatewayApp::GetInstance();
452     string  err_msg;
453 
454     // Note: the TSE id (blob id) is not used in the chunk retrieval.
455     //       So there is no need to map its sat to keyspace. The TSE id
456     //       will be sent to the client as as.
457     // The user provided id2_info is used to calculate the chunk blob_id
458     // so the sat from id2_info will be mapped to the cassandra keyspace
459 
460     // Validate the chunk number
461     // true -> finish if failed
462     if (!x_ValidateTSEChunkNumber(m_TSEChunkRequest->m_Id2Chunk,
463                                   m_SatInfoChunkVerId2Info->GetChunks(),
464                                   true))
465         return;
466 
467     int64_t         sat_key;
468     if (m_TSEChunkRequest->m_Id2Chunk == kSplitInfoChunk) {
469         // Special case
470         sat_key = m_SatInfoChunkVerId2Info->GetInfo();
471     } else {
472         // For the target chunk - convert sat to sat name chunk's blob id
473         sat_key = m_SatInfoChunkVerId2Info->GetInfo() -
474                   m_SatInfoChunkVerId2Info->GetChunks() - 1 + m_TSEChunkRequest->m_Id2Chunk;
475     }
476     SCass_BlobId    chunk_blob_id(m_SatInfoChunkVerId2Info->GetSat(), sat_key);
477     if (!x_TSEChunkSatToKeyspace(chunk_blob_id))
478         return;
479 
480     // Search in cache the TSE chunk properties
481     CPSGCache                   psg_cache(IPSGS_Processor::m_Request,
482                                           IPSGS_Processor::m_Reply);
483     int64_t                     last_modified = INT64_MIN;
484     unique_ptr<CBlobRecord>     blob_record(new CBlobRecord);
485     auto                        tse_blob_prop_cache_lookup_result =
486         psg_cache.LookupBlobProp(chunk_blob_id.m_Sat, chunk_blob_id.m_SatKey,
487                                  last_modified, *blob_record.get());
488 
489 
490     // Initiate the chunk request
491     SPSGS_RequestBase::EPSGS_Trace  trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
492     if (IPSGS_Processor::m_Request->NeedTrace())
493         trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
494     SPSGS_BlobBySatSatKeyRequest
495                         chunk_request(SPSGS_BlobId(chunk_blob_id.ToString()),
496                                       INT64_MIN,
497                                       SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
498                                       SPSGS_RequestBase::ePSGS_UnknownUseCache,
499                                       "", 0, trace_flag,
500                                       vector<string>(), vector<string>(),
501                                       chrono::high_resolution_clock::now());
502 
503     unique_ptr<CCassBlobFetch>  fetch_details;
504     fetch_details.reset(new CCassBlobFetch(chunk_request, chunk_blob_id));
505 
506     CCassBlobTaskLoadBlob *         load_task = nullptr;
507     if (tse_blob_prop_cache_lookup_result != ePSGS_CacheHit) {
508         // Cassandra should look for blob props as well
509         load_task = new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
510                                               app->GetCassandraMaxRetries(),
511                                               app->GetCassandraConnection(),
512                                               chunk_blob_id.m_Keyspace,
513                                               chunk_blob_id.m_SatKey,
514                                               true, nullptr);
515     } else {
516         // Blob props are already here
517         load_task = new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
518                                               app->GetCassandraMaxRetries(),
519                                               app->GetCassandraConnection(),
520                                               chunk_blob_id.m_Keyspace,
521                                               move(blob_record),
522                                               true, nullptr);
523     }
524 
525     fetch_details->SetLoader(load_task);
526     load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
527     load_task->SetErrorCB(
528         CGetBlobErrorCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobError,
529                                    this, _1, _2, _3, _4, _5),
530                               fetch_details.get()));
531     load_task->SetPropsCallback(
532         CBlobPropCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobProp,
533                                 this, _1, _2, _3),
534                           IPSGS_Processor::m_Request,
535                           IPSGS_Processor::m_Reply,
536                           fetch_details.get(), false));
537     load_task->SetChunkCallback(
538         CBlobChunkCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobChunk,
539                                 this, _1, _2, _3, _4, _5),
540                            fetch_details.get()));
541 
542     if (IPSGS_Processor::m_Request->NeedTrace()) {
543         IPSGS_Processor::m_Reply->SendTrace(
544                     "Cassandra request: " +
545                     ToJson(*load_task).Repr(CJsonNode::fStandardJson),
546                     IPSGS_Processor::m_Request->GetStartTimestamp());
547     }
548 
549     m_FetchDetails.push_back(move(fetch_details));
550     load_task->Wait();  // Initiate cassandra request
551 }
552 
553 
OnGetBlobProp(CCassBlobFetch * fetch_details,CBlobRecord const & blob,bool is_found)554 void CPSGS_TSEChunkProcessor::OnGetBlobProp(CCassBlobFetch *  fetch_details,
555                                             CBlobRecord const &  blob,
556                                             bool is_found)
557 {
558     if (m_Cancelled) {
559         m_Completed = true;
560         return;
561     }
562 
563     if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
564         m_Completed = true;
565         return;
566     }
567 
568     // Note: cannot use CPSGS_CassBlobBase::OnGetBlobChunk() anymore because
569     // the reply has to have a few more fields for ID/get_tse_chunk request
570     CRequestContextResetter     context_resetter;
571     IPSGS_Processor::m_Request->SetRequestContext();
572 
573     if (IPSGS_Processor::m_Request->NeedTrace()) {
574         IPSGS_Processor::m_Reply->SendTrace(
575                 "Blob prop callback; found: " + to_string(is_found),
576                 IPSGS_Processor::m_Request->GetStartTimestamp());
577     }
578 
579     if (is_found) {
580         IPSGS_Processor::m_Reply->PrepareTSEBlobPropData(
581                 fetch_details, GetName(),
582                 m_TSEChunkRequest->m_Id2Chunk,
583                 m_TSEChunkRequest->m_Id2Info,
584                 ToJson(blob).Repr(CJsonNode::fStandardJson));
585         IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
586                 fetch_details, GetName());
587     } else {
588         // Not found; it is the user error, not the data inconsistency
589         auto *  app = CPubseqGatewayApp::GetInstance();
590         app->GetCounters().Increment(CPSGSCounters::ePSGS_BlobPropsNotFoundError);
591 
592         auto    blob_id = fetch_details->GetBlobId();
593         string  message = "Blob " + blob_id.ToString() + " properties are not found";
594         PSG_WARNING(message);
595         UpdateOverallStatus(CRequestStatus::e404_NotFound);
596         IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
597                 fetch_details, GetName(),
598                 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
599                 message, CRequestStatus::e404_NotFound,
600                 ePSGS_BlobPropsNotFound, eDiag_Error);
601         IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
602                 fetch_details, GetName());
603         SetFinished(fetch_details);
604     }
605 
606     if (IPSGS_Processor::m_Reply->IsOutputReady())
607         x_Peek(false);
608 }
609 
610 
OnGetBlobError(CCassBlobFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)611 void CPSGS_TSEChunkProcessor::OnGetBlobError(CCassBlobFetch *  fetch_details,
612                                              CRequestStatus::ECode  status,
613                                              int  code,
614                                              EDiagSev  severity,
615                                              const string &  message)
616 {
617     if (m_Cancelled) {
618         m_Completed = true;
619         return;
620     }
621 
622     // Cannot use CPSGS_CassBlobBase::OnGetBlobError() anymore because
623     // the TSE messages have different parameters
624 
625     CRequestContextResetter     context_resetter;
626     IPSGS_Processor::m_Request->SetRequestContext();
627 
628     // To avoid sending an error in Peek()
629     fetch_details->GetLoader()->ClearError();
630 
631     // It could be a message or an error
632     bool    is_error = CountError(status, code, severity, message);
633 
634     if (is_error) {
635         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
636 
637         if (fetch_details->IsBlobPropStage()) {
638             IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
639                 fetch_details, GetName(),
640                 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
641                 message, CRequestStatus::e500_InternalServerError, code, severity);
642             IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
643                 fetch_details, GetName());
644         } else {
645             IPSGS_Processor::m_Reply->PrepareTSEBlobMessage(
646                 fetch_details, GetName(),
647                 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
648                 message, CRequestStatus::e500_InternalServerError, code, severity);
649             IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
650                 fetch_details, GetName());
651         }
652 
653         // If it is an error then regardless what stage it was, props or
654         // chunks, there will be no more activity
655         fetch_details->SetReadFinished();
656     } else {
657         if (fetch_details->IsBlobPropStage())
658             IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
659                 fetch_details, GetName(),
660                 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
661                 message, status, code, severity);
662         else
663             IPSGS_Processor::m_Reply->PrepareTSEBlobMessage(
664                 fetch_details, GetName(),
665                 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
666                 message, status, code, severity);
667     }
668 
669     SetFinished(fetch_details);
670 
671     if (IPSGS_Processor::m_Reply->IsOutputReady())
672         x_Peek(false);
673 }
674 
675 
OnGetBlobChunk(CCassBlobFetch * fetch_details,CBlobRecord const & blob,const unsigned char * chunk_data,unsigned int data_size,int chunk_no)676 void CPSGS_TSEChunkProcessor::OnGetBlobChunk(CCassBlobFetch *  fetch_details,
677                                              CBlobRecord const &  blob,
678                                              const unsigned char *  chunk_data,
679                                              unsigned int  data_size,
680                                              int  chunk_no)
681 {
682     // Note: cannot use CPSGS_CassBlobBase::OnGetBlobChunk() anymore because
683     // the reply has to have a few more fields for TSE chunks
684     CRequestContextResetter     context_resetter;
685     IPSGS_Processor::m_Request->SetRequestContext();
686 
687     if (m_Cancelled) {
688         fetch_details->GetLoader()->Cancel();
689         SetFinished(fetch_details);
690         if (IPSGS_Processor::m_Reply->IsOutputReady())
691             x_Peek(false);
692         return;
693     }
694     if (IPSGS_Processor::m_Reply->IsFinished()) {
695         CPubseqGatewayApp::GetInstance()->GetCounters().Increment(
696                                             CPSGSCounters::ePSGS_UnknownError);
697         PSG_ERROR("Unexpected data received "
698                   "while the output has finished, ignoring");
699         if (IPSGS_Processor::m_Reply->IsOutputReady())
700             x_Peek(false);
701         return;
702     }
703 
704     if (chunk_no >= 0) {
705         if (IPSGS_Processor::m_Request->NeedTrace()) {
706             IPSGS_Processor::m_Reply->SendTrace(
707                     "Blob chunk " + to_string(chunk_no) + " callback",
708                     IPSGS_Processor::m_Request->GetStartTimestamp());
709         }
710 
711         // A blob chunk; 0-length chunks are allowed too
712         IPSGS_Processor::m_Reply->PrepareTSEBlobData(
713                 fetch_details, GetName(),
714                 chunk_data, data_size, chunk_no,
715                 m_TSEChunkRequest->m_Id2Chunk,
716                 m_TSEChunkRequest->m_Id2Info);
717     } else {
718         if (IPSGS_Processor::m_Request->NeedTrace()) {
719             IPSGS_Processor::m_Reply->SendTrace(
720                     "Blob chunk no-more-data callback",
721                     IPSGS_Processor::m_Request->GetStartTimestamp());
722         }
723 
724         // End of the blob
725         IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
726                 fetch_details, GetName());
727         SetFinished(fetch_details);
728 
729         // Note: no need to set the blob completed in the exclude blob cache.
730         // It will happen in Peek()
731     }
732 
733     if (IPSGS_Processor::m_Reply->IsOutputReady())
734         x_Peek(false);
735 }
736 
737 
738 void
OnGetSplitHistoryError(CCassSplitHistoryFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)739 CPSGS_TSEChunkProcessor::OnGetSplitHistoryError(
740                                     CCassSplitHistoryFetch *  fetch_details,
741                                     CRequestStatus::ECode  status,
742                                     int  code,
743                                     EDiagSev  severity,
744                                     const string &  message)
745 {
746     if (m_Cancelled) {
747         m_Completed = true;
748         return;
749     }
750 
751     CRequestContextResetter     context_resetter;
752     IPSGS_Processor::m_Request->SetRequestContext();
753 
754     // To avoid sending an error in Peek()
755     fetch_details->GetLoader()->ClearError();
756 
757     // It could be a message or an error
758     bool    is_error = (severity == eDiag_Error ||
759                         severity == eDiag_Critical ||
760                         severity == eDiag_Fatal);
761 
762     auto *  app = CPubseqGatewayApp::GetInstance();
763     if (status >= CRequestStatus::e400_BadRequest &&
764         status < CRequestStatus::e500_InternalServerError) {
765         PSG_WARNING(message);
766     } else {
767         PSG_ERROR(message);
768     }
769 
770     if (IPSGS_Processor::m_Request->NeedTrace()) {
771         IPSGS_Processor::m_Reply->SendTrace(
772                 "Split history error callback; status: " + to_string(status),
773                 IPSGS_Processor::m_Request->GetStartTimestamp());
774     }
775 
776     IPSGS_Processor::m_Reply->PrepareProcessorMessage(
777             IPSGS_Processor::m_Reply->GetItemId(),
778             GetName(), message, status, code, severity);
779 
780     if (is_error) {
781         if (code == CCassandraException::eQueryTimeout)
782             app->GetCounters().Increment(CPSGSCounters::ePSGS_CassQueryTimeoutError);
783         else
784             app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
785 
786         // If it is an error then there will be no more activity
787         fetch_details->SetReadFinished();
788     }
789 
790     if (IPSGS_Processor::m_Reply->IsOutputReady())
791         x_Peek(false);
792 }
793 
794 
795 void
OnGetSplitHistory(CCassSplitHistoryFetch * fetch_details,vector<SSplitHistoryRecord> && result)796 CPSGS_TSEChunkProcessor::OnGetSplitHistory(
797                                     CCassSplitHistoryFetch *  fetch_details,
798                                     vector<SSplitHistoryRecord> && result)
799 {
800     CRequestContextResetter     context_resetter;
801     IPSGS_Processor::m_Request->SetRequestContext();
802 
803     fetch_details->SetReadFinished();
804 
805     if (m_Cancelled) {
806         fetch_details->GetLoader()->Cancel();
807         return;
808     }
809 
810     if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
811         m_Completed = true;
812         return;
813     }
814 
815     if (IPSGS_Processor::m_Request->NeedTrace()) {
816         IPSGS_Processor::m_Reply->SendTrace(
817                 "Split history callback; number of records: " + to_string(result.size()),
818                 IPSGS_Processor::m_Request->GetStartTimestamp());
819     }
820 
821     auto *      app = CPubseqGatewayApp::GetInstance();
822     if (result.empty()) {
823         // Split history is not found
824         app->GetCounters().Increment(CPSGSCounters::ePSGS_SplitHistoryNotFoundError);
825 
826         string      message = "Split history version " +
827                               to_string(fetch_details->GetSplitVersion()) +
828                               " is not found for the TSE id " +
829                               fetch_details->GetTSEId().ToString();
830         PSG_WARNING(message);
831         UpdateOverallStatus(CRequestStatus::e404_NotFound);
832         IPSGS_Processor::m_Reply->PrepareProcessorMessage(
833                 IPSGS_Processor::m_Reply->GetItemId(),
834                 GetName(), message, CRequestStatus::e404_NotFound,
835                 ePSGS_SplitHistoryNotFound, eDiag_Error);
836         SignalFinishProcessing();
837     } else {
838         // Split history found.
839         // Note: the request was issued so that there could be exactly one
840         // split history record or none at all. So it is not checked that
841         // there are more than one record.
842         x_RequestTSEChunk(result[0], fetch_details);
843     }
844 
845     if (IPSGS_Processor::m_Reply->IsOutputReady())
846         x_Peek(false);
847 }
848 
849 
850 void
x_RequestTSEChunk(const SSplitHistoryRecord & split_record,CCassSplitHistoryFetch * fetch_details)851 CPSGS_TSEChunkProcessor::x_RequestTSEChunk(
852                                     const SSplitHistoryRecord &  split_record,
853                                     CCassSplitHistoryFetch *  fetch_details)
854 {
855     // Parse id2info
856     unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info>     id2_info;
857     if (!x_ParseTSEChunkId2Info(split_record.id2_info,
858                                 id2_info, fetch_details->GetTSEId(), true))
859         return;
860 
861     // Check the requested chunk
862     // true -> finish the request if failed
863     if (!x_ValidateTSEChunkNumber(fetch_details->GetChunk(),
864                                   id2_info->GetChunks(), true))
865         return;
866 
867     // Resolve sat to satkey
868     int64_t         sat_key;
869     if (fetch_details->GetChunk() == kSplitInfoChunk) {
870         // Special case
871         sat_key = id2_info->GetInfo();
872     } else {
873         sat_key = id2_info->GetInfo() - id2_info->GetChunks() - 1 +
874                   fetch_details->GetChunk();
875     }
876     SCass_BlobId    chunk_blob_id(id2_info->GetSat(), sat_key);
877     if (!x_TSEChunkSatToKeyspace(chunk_blob_id, true))
878         return;
879 
880     // Look for the blob props
881     // Form the chunk request with/without blob props
882     unique_ptr<CBlobRecord>     blob_record(new CBlobRecord);
883     CPSGCache                   psg_cache(
884             fetch_details->GetUseCache() != SPSGS_RequestBase::ePSGS_DbOnly,
885             IPSGS_Processor::m_Request,
886             IPSGS_Processor::m_Reply);
887     int64_t                     last_modified = INT64_MIN;  // last modified is unknown
888     auto                        blob_prop_cache_lookup_result =
889         psg_cache.LookupBlobProp(chunk_blob_id.m_Sat,
890                                  chunk_blob_id.m_SatKey,
891                                  last_modified, *blob_record.get());
892     if (blob_prop_cache_lookup_result != ePSGS_CacheHit &&
893         fetch_details->GetUseCache() == SPSGS_RequestBase::ePSGS_CacheOnly) {
894         // Cassandra is forbidden for the blob prop
895         string  err_msg = "TSE chunk blob " + chunk_blob_id.ToString() +
896                           " properties are not found in cache";
897         if (blob_prop_cache_lookup_result == ePSGS_CacheFailure)
898             err_msg += " due to LMDB error";
899         x_SendProcessorError(err_msg, CRequestStatus::e404_NotFound,
900                              ePSGS_BlobPropsNotFound);
901         UpdateOverallStatus(CRequestStatus::e404_NotFound);
902         SignalFinishProcessing();
903         return;
904     }
905 
906     SPSGS_RequestBase::EPSGS_Trace  trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
907     if (IPSGS_Processor::m_Request->NeedTrace())
908         trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
909     SPSGS_BlobBySatSatKeyRequest
910         chunk_request(SPSGS_BlobId(chunk_blob_id.ToString()), INT64_MIN,
911                       SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
912                       SPSGS_RequestBase::ePSGS_UnknownUseCache,
913                       "", 0, trace_flag,
914                       vector<string>(), vector<string>(),
915                       chrono::high_resolution_clock::now());
916     unique_ptr<CCassBlobFetch>  cass_blob_fetch;
917     cass_blob_fetch.reset(new CCassBlobFetch(chunk_request, chunk_blob_id));
918 
919     auto    app = CPubseqGatewayApp::GetInstance();
920     CCassBlobTaskLoadBlob *     load_task = nullptr;
921 
922     if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
923         load_task = new CCassBlobTaskLoadBlob(
924                             app->GetCassandraTimeout(),
925                             app->GetCassandraMaxRetries(),
926                             app->GetCassandraConnection(),
927                             chunk_blob_id.m_Keyspace,
928                             move(blob_record),
929                             true, nullptr);
930     } else {
931         load_task = new CCassBlobTaskLoadBlob(
932                             app->GetCassandraTimeout(),
933                             app->GetCassandraMaxRetries(),
934                             app->GetCassandraConnection(),
935                             chunk_blob_id.m_Keyspace,
936                             chunk_blob_id.m_SatKey,
937                             true, nullptr);
938     }
939     cass_blob_fetch->SetLoader(load_task);
940 
941     load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
942     load_task->SetErrorCB(
943         CGetBlobErrorCallback(
944             bind(&CPSGS_TSEChunkProcessor::OnGetBlobError,
945                  this, _1, _2, _3, _4, _5),
946             cass_blob_fetch.get()));
947     load_task->SetPropsCallback(
948         CBlobPropCallback(
949             bind(&CPSGS_TSEChunkProcessor::OnGetBlobProp,
950                  this, _1, _2, _3),
951             IPSGS_Processor::m_Request,
952             IPSGS_Processor::m_Reply,
953             cass_blob_fetch.get(),
954             blob_prop_cache_lookup_result != ePSGS_CacheHit));
955     load_task->SetChunkCallback(
956         CBlobChunkCallback(
957             bind(&CPSGS_TSEChunkProcessor::OnGetBlobChunk,
958                  this, _1, _2, _3, _4, _5),
959             cass_blob_fetch.get()));
960 
961     if (IPSGS_Processor::m_Request->NeedTrace()) {
962         IPSGS_Processor::m_Reply->SendTrace(
963                     "Cassandra request: " +
964                     ToJson(*load_task).Repr(CJsonNode::fStandardJson),
965                     IPSGS_Processor::m_Request->GetStartTimestamp());
966     }
967 
968     m_FetchDetails.push_back(move(cass_blob_fetch));
969     load_task->Wait();
970 }
971 
972 
973 void
x_SendProcessorError(const string & msg,CRequestStatus::ECode status,int code)974 CPSGS_TSEChunkProcessor::x_SendProcessorError(const string &  msg,
975                                               CRequestStatus::ECode  status,
976                                               int  code)
977 {
978     if (m_Cancelled)
979         return;
980 
981     CRequestContextResetter     context_resetter;
982     IPSGS_Processor::m_Request->SetRequestContext();
983 
984     IPSGS_Processor::m_Reply->PrepareProcessorMessage(
985                 IPSGS_Processor::m_Reply->GetItemId(),
986                 GetName(), msg, status, code, eDiag_Error);
987     UpdateOverallStatus(status);
988 
989     if (status >= CRequestStatus::e400_BadRequest &&
990         status < CRequestStatus::e500_InternalServerError) {
991         PSG_WARNING(msg);
992     } else {
993         PSG_ERROR(msg);
994     }
995 }
996 
997 
998 bool
x_ValidateTSEChunkNumber(int64_t requested_chunk,CPSGS_SatInfoChunksVerFlavorId2Info::TChunks total_chunks,bool need_finish)999 CPSGS_TSEChunkProcessor::x_ValidateTSEChunkNumber(
1000                                 int64_t  requested_chunk,
1001                                 CPSGS_SatInfoChunksVerFlavorId2Info::TChunks  total_chunks,
1002                                 bool  need_finish)
1003 {
1004     if (requested_chunk == kSplitInfoChunk) {
1005         // Special value: the info chunk must be provided
1006         return true;
1007     }
1008 
1009     if (requested_chunk > total_chunks) {
1010         string      msg = "Invalid chunk requested. "
1011                           "The number of available chunks: " +
1012                           to_string(total_chunks) + ", requested number: " +
1013                           to_string(requested_chunk);
1014         if (need_finish) {
1015             auto *  app = CPubseqGatewayApp::GetInstance();
1016             app->GetCounters().Increment(CPSGSCounters::ePSGS_MalformedArgs);
1017             x_SendProcessorError(msg, CRequestStatus::e400_BadRequest,
1018                                  ePSGS_MalformedParameter);
1019             UpdateOverallStatus(CRequestStatus::e400_BadRequest);
1020             m_Completed = true;
1021             SignalFinishProcessing();
1022         } else {
1023             PSG_WARNING(msg);
1024         }
1025         return false;
1026     }
1027     return true;
1028 }
1029 
1030 
1031 bool
x_TSEChunkSatToKeyspace(SCass_BlobId & blob_id)1032 CPSGS_TSEChunkProcessor::x_TSEChunkSatToKeyspace(SCass_BlobId &  blob_id)
1033 {
1034     auto *      app = CPubseqGatewayApp::GetInstance();
1035     if (app->SatToKeyspace(blob_id.m_Sat, blob_id.m_Keyspace))
1036         return true;
1037 
1038     app->GetCounters().Increment(CPSGSCounters::ePSGS_ClientSatToSatNameError);
1039 
1040     string  msg = "Unknown TSE chunk satellite number " +
1041                   to_string(blob_id.m_Sat) +
1042                   " for the blob " + blob_id.ToString();
1043     x_SendProcessorError(msg, CRequestStatus::e500_InternalServerError,
1044                          ePSGS_UnknownResolvedSatellite);
1045 
1046     // This method is used only in case of the TSE chunk requests.
1047     // So in case of errors - synchronous or asynchronous - it is
1048     // necessary to finish the reply anyway.
1049     UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
1050     m_Completed = true;
1051     SignalFinishProcessing();
1052     return false;
1053 }
1054 
1055 
Cancel(void)1056 void CPSGS_TSEChunkProcessor::Cancel(void)
1057 {
1058     m_Cancelled = true;
1059     CancelLoaders();
1060 }
1061 
1062 
GetStatus(void)1063 IPSGS_Processor::EPSGS_Status CPSGS_TSEChunkProcessor::GetStatus(void)
1064 {
1065     auto    status = CPSGS_CassProcessorBase::GetStatus();
1066     if (status == IPSGS_Processor::ePSGS_InProgress)
1067         return status;
1068 
1069     if (m_Cancelled)
1070         return IPSGS_Processor::ePSGS_Cancelled;
1071 
1072     return status;
1073 }
1074 
1075 
GetName(void) const1076 string CPSGS_TSEChunkProcessor::GetName(void) const
1077 {
1078     return "Cassandra-gettsechunk";
1079 }
1080 
1081 
ProcessEvent(void)1082 void CPSGS_TSEChunkProcessor::ProcessEvent(void)
1083 {
1084     x_Peek(true);
1085 }
1086 
1087 
x_Peek(bool need_wait)1088 void CPSGS_TSEChunkProcessor::x_Peek(bool  need_wait)
1089 {
1090     if (m_Cancelled)
1091         return;
1092 
1093     if (m_InPeek)
1094         return;
1095     m_InPeek = true;
1096 
1097     // 1 -> call m_Loader->Wait1 to pick data
1098     // 2 -> check if we have ready-to-send buffers
1099     // 3 -> call reply->Send()  to send what we have if it is ready
1100     bool        overall_final_state = false;
1101 
1102     while (true) {
1103         auto initial_size = m_FetchDetails.size();
1104 
1105         for (auto &  details: m_FetchDetails) {
1106             if (details)
1107                 overall_final_state |= x_Peek(details, need_wait);
1108         }
1109 
1110         if (initial_size == m_FetchDetails.size())
1111             break;
1112     }
1113 
1114     // TSE chunk: ready packets need to be sent right away
1115     if (IPSGS_Processor::m_Reply->IsOutputReady())
1116         IPSGS_Processor::m_Reply->Flush(false);
1117 
1118     m_InPeek = false;
1119 }
1120 
1121 
x_Peek(unique_ptr<CCassFetch> & fetch_details,bool need_wait)1122 bool CPSGS_TSEChunkProcessor::x_Peek(unique_ptr<CCassFetch> &  fetch_details,
1123                                      bool  need_wait)
1124 {
1125     if (!fetch_details->GetLoader())
1126         return true;
1127 
1128     bool    final_state = false;
1129     if (need_wait) {
1130         if (!fetch_details->ReadFinished()) {
1131             final_state = fetch_details->GetLoader()->Wait();
1132         }
1133     }
1134 
1135     if (fetch_details->GetLoader()->HasError() &&
1136             IPSGS_Processor::m_Reply->IsOutputReady() &&
1137             ! IPSGS_Processor::m_Reply->IsFinished()) {
1138         // Send an error
1139         string      error = fetch_details->GetLoader()->LastError();
1140         auto *      app = CPubseqGatewayApp::GetInstance();
1141 
1142         app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
1143         PSG_ERROR(error);
1144 
1145         CCassBlobFetch *  blob_fetch = static_cast<CCassBlobFetch *>(fetch_details.get());
1146         if (blob_fetch->IsBlobPropStage()) {
1147             IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
1148                 blob_fetch, GetName(),
1149                 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
1150                 error, CRequestStatus::e500_InternalServerError,
1151                 ePSGS_UnknownError, eDiag_Error);
1152             IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
1153                     blob_fetch, GetName());
1154         } else {
1155             IPSGS_Processor::m_Reply->PrepareTSEBlobMessage(
1156                 blob_fetch, GetName(),
1157                 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
1158                 error, CRequestStatus::e500_InternalServerError,
1159                 ePSGS_UnknownError, eDiag_Error);
1160             IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
1161                 blob_fetch, GetName());
1162         }
1163 
1164         // Mark finished
1165         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
1166         fetch_details->SetReadFinished();
1167         SignalFinishProcessing();
1168     }
1169 
1170     return final_state;
1171 }
1172 
1173