1 /* $Id: tse_chunk_processor.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergey Satskiy
27 *
28 * File Description: get TSE chunk processor
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "tse_chunk_processor.hpp"
35 #include "pubseq_gateway.hpp"
36 #include "pubseq_gateway_cache_utils.hpp"
37 #include "pubseq_gateway_convert_utils.hpp"
38 #include "get_blob_callback.hpp"
39 #include "split_history_callback.hpp"
40
41 USING_NCBI_SCOPE;
42
43 using namespace std::placeholders;
44
CPSGS_TSEChunkProcessor()45 CPSGS_TSEChunkProcessor::CPSGS_TSEChunkProcessor() :
46 m_TSEChunkRequest(nullptr)
47 {}
48
49
CPSGS_TSEChunkProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority,shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> sat_info_chunk_ver_id2info,shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> id_mod_ver_id2info)50 CPSGS_TSEChunkProcessor::CPSGS_TSEChunkProcessor(
51 shared_ptr<CPSGS_Request> request,
52 shared_ptr<CPSGS_Reply> reply,
53 TProcessorPriority priority,
54 shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> sat_info_chunk_ver_id2info,
55 shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> id_mod_ver_id2info) :
56 CPSGS_CassProcessorBase(request, reply, priority),
57 CPSGS_CassBlobBase(request, reply, GetName()),
58 m_SatInfoChunkVerId2Info(sat_info_chunk_ver_id2info),
59 m_IdModVerId2Info(id_mod_ver_id2info)
60 {
61 // Convenience to avoid calling
62 // m_Request->GetRequest<SPSGS_TSEChunkRequest>() everywhere
63 m_TSEChunkRequest = & request->GetRequest<SPSGS_TSEChunkRequest>();
64 }
65
66
~CPSGS_TSEChunkProcessor()67 CPSGS_TSEChunkProcessor::~CPSGS_TSEChunkProcessor()
68 {}
69
70
71 IPSGS_Processor*
CreateProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority) const72 CPSGS_TSEChunkProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
73 shared_ptr<CPSGS_Reply> reply,
74 TProcessorPriority priority) const
75 {
76 if (!IsCassandraProcessorEnabled(request))
77 return nullptr;
78
79 if (request->GetRequestType() != CPSGS_Request::ePSGS_TSEChunkRequest)
80 return nullptr;
81
82 auto tse_chunk_request = & request->GetRequest<SPSGS_TSEChunkRequest>();
83
84 // CXX-11478: some VDB chunks start with 0 but in Cassandra they always
85 // start with 1. Non negative condition is checked at the time when the
86 // request is received.
87 if (tse_chunk_request->m_Id2Chunk == 0)
88 return nullptr;
89
90 // Check parseability of the id2_info parameter
91 shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> sat_info_chunk_ver_id2info;
92 shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> id_mod_ver_id2info;
93 if (x_DetectId2InfoFlavor(tse_chunk_request->m_Id2Info,
94 sat_info_chunk_ver_id2info,
95 id_mod_ver_id2info) !=
96 ePSGS_UnknownId2InfoFlavor) {
97 // Check the DB availability
98 auto * app = CPubseqGatewayApp::GetInstance();
99 auto startup_data_state = app->GetStartupDataState();
100 if (startup_data_state != ePSGS_StartupDataOK) {
101 if (request->NeedTrace()) {
102 reply->SendTrace("Cannot create " + GetName() +
103 " processor because Cassandra DB "
104 "is not available.\n" +
105 GetCassStartupDataStateMessage(startup_data_state),
106 request->GetStartTimestamp());
107 }
108 return nullptr;
109 }
110
111 return new CPSGS_TSEChunkProcessor(request, reply, priority,
112 sat_info_chunk_ver_id2info,
113 id_mod_ver_id2info);
114 }
115
116 return nullptr;
117 }
118
119
120 EPSGSId2InfoFlavor
x_DetectId2InfoFlavor(const string & id2_info,shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & sat_info_chunk_ver_id2info,shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> & id_mod_ver_id2info) const121 CPSGS_TSEChunkProcessor::x_DetectId2InfoFlavor(
122 const string & id2_info,
123 shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & sat_info_chunk_ver_id2info,
124 shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> & id_mod_ver_id2info) const
125 {
126 try {
127 // false -> do not count errors
128 sat_info_chunk_ver_id2info.reset(
129 new CPSGS_SatInfoChunksVerFlavorId2Info(id2_info, false));
130 return ePSGS_SatInfoChunksVerId2InfoFlavor;
131 } catch (...) {
132 // Parsing error: may be it is another id2_info format
133 }
134
135 try {
136 id_mod_ver_id2info.reset(
137 new CPSGS_IdModifiedVerFlavorId2Info(id2_info));
138 return ePSGS_IdModifiedVerId2InfoFlavor;
139 } catch (...) {
140 // Parsing error: may be it is for another processor
141 }
142 return ePSGS_UnknownId2InfoFlavor;
143 }
144
145
x_ParseTSEChunkId2Info(const string & info,unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & id2_info,const SCass_BlobId & blob_id,bool need_finish)146 bool CPSGS_TSEChunkProcessor::x_ParseTSEChunkId2Info(
147 const string & info,
148 unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & id2_info,
149 const SCass_BlobId & blob_id,
150 bool need_finish)
151 {
152 string err_msg;
153 try {
154 id2_info.reset(new CPSGS_SatInfoChunksVerFlavorId2Info(info));
155 return true;
156 } catch (const exception & exc) {
157 err_msg = "Error extracting id2 info for blob " +
158 blob_id.ToString() + ": " + exc.what();
159 } catch (...) {
160 err_msg = "Unknown error while extracting id2 info for blob " +
161 blob_id.ToString();
162 }
163
164 auto * app = CPubseqGatewayApp::GetInstance();
165 app->GetCounters().Increment(CPSGSCounters::ePSGS_InvalidId2InfoError);
166 if (need_finish) {
167 x_SendProcessorError(err_msg, CRequestStatus::e500_InternalServerError,
168 ePSGS_InvalidId2Info);
169 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
170 m_Completed = true;
171 SignalFinishProcessing();
172 } else {
173 PSG_WARNING(err_msg);
174 }
175 return false;
176 }
177
178
179 bool
x_TSEChunkSatToKeyspace(SCass_BlobId & blob_id,bool need_finish)180 CPSGS_TSEChunkProcessor::x_TSEChunkSatToKeyspace(SCass_BlobId & blob_id,
181 bool need_finish)
182 {
183 auto * app = CPubseqGatewayApp::GetInstance();
184 if (app->SatToKeyspace(blob_id.m_Sat, blob_id.m_Keyspace))
185 return true;
186
187 app->GetCounters().Increment(CPSGSCounters::ePSGS_ServerSatToSatNameError);
188 string msg = "Unknown TSE chunk satellite number " +
189 to_string(blob_id.m_Sat) +
190 " for the blob " + blob_id.ToString();
191 if (need_finish) {
192 x_SendProcessorError(msg, CRequestStatus::e500_InternalServerError,
193 ePSGS_UnknownResolvedSatellite);
194
195 // This method is used only in case of the TSE chunk requests.
196 // So in case of errors - synchronous or asynchronous - it is
197 // necessary to finish the reply anyway.
198 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
199 m_Completed = true;
200 SignalFinishProcessing();
201 } else {
202 PSG_WARNING(msg);
203 }
204 return false;
205 }
206
207
Process(void)208 void CPSGS_TSEChunkProcessor::Process(void)
209 {
210 if (m_SatInfoChunkVerId2Info.get() != nullptr) {
211 x_ProcessSatInfoChunkVerId2Info();
212 return;
213 }
214 if (m_IdModVerId2Info.get() != nullptr) {
215 x_ProcessIdModVerId2Info();
216 return;
217 }
218
219 NCBI_THROW(CPubseqGatewayException, eLogic,
220 "Logic error: none of the id2_info options were initialized");
221 }
222
223
x_ProcessIdModVerId2Info(void)224 void CPSGS_TSEChunkProcessor::x_ProcessIdModVerId2Info(void)
225 {
226 // This option is when id2info came in a shape of
227 // tse_id~~last_modified~~split_version
228
229 CRequestContextResetter context_resetter;
230 IPSGS_Processor::m_Request->SetRequestContext();
231
232 auto app = CPubseqGatewayApp::GetInstance();
233 string err_msg;
234
235 if (!m_IdModVerId2Info->GetTSEId().MapSatToKeyspace()) {
236 app->GetCounters().Increment(CPSGSCounters::ePSGS_ClientSatToSatNameError);
237
238 err_msg = GetName() + " failed to map sat " +
239 to_string(m_IdModVerId2Info->GetTSEId().m_Sat) +
240 " to a Cassandra keyspace";
241 x_SendProcessorError(err_msg, CRequestStatus::e404_NotFound,
242 ePSGS_UnknownResolvedSatellite);
243 UpdateOverallStatus(CRequestStatus::e404_NotFound);
244 m_Completed = true;
245 SignalFinishProcessing();
246
247 if (IPSGS_Processor::m_Reply->IsOutputReady())
248 x_Peek(false);
249 return;
250 }
251
252 // First, check the blob prop cache, may be the requested version matches
253 // the requested one
254 unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
255 CPSGCache psg_cache(IPSGS_Processor::m_Request,
256 IPSGS_Processor::m_Reply);
257
258 // CXX-11478: last modified is to be ignored for now
259 int64_t last_modified = INT64_MIN; // last modified is unknown
260 auto blob_prop_cache_lookup_result =
261 psg_cache.LookupBlobProp(m_IdModVerId2Info->GetTSEId().m_Sat,
262 m_IdModVerId2Info->GetTSEId().m_SatKey,
263 last_modified, *blob_record.get());
264 if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
265 do {
266 // Step 1: check the id2info presense
267 if (blob_record->GetId2Info().empty()) {
268 app->GetCounters().Increment(CPSGSCounters::ePSGS_TSEChunkSplitVersionCacheNotMatched);
269 PSG_WARNING("Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
270 " properties id2info is empty in cache");
271 break; // Continue with cassandra
272 }
273
274 // Step 2: check that the id2info is parsable
275 unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> cache_id2_info;
276 // false -> do not finish the request
277 if (!x_ParseTSEChunkId2Info(blob_record->GetId2Info(),
278 cache_id2_info,
279 m_IdModVerId2Info->GetTSEId(),
280 false)) {
281 app->GetCounters().Increment(CPSGSCounters::ePSGS_TSEChunkSplitVersionCacheNotMatched);
282 break; // Continue with cassandra
283 }
284
285 // Step 3: check the split version in cache
286 if (cache_id2_info->GetSplitVersion() != m_IdModVerId2Info->GetSplitVersion()) {
287 app->GetCounters().Increment(CPSGSCounters::ePSGS_TSEChunkSplitVersionCacheNotMatched);
288 PSG_WARNING("Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
289 " split version in cache does not match the requested one");
290 break; // Continue with cassandra
291 }
292
293 app->GetCounters().Increment(CPSGSCounters::ePSGS_TSEChunkSplitVersionCacheMatched);
294
295 // Step 4: validate the chunk number
296 if (!x_ValidateTSEChunkNumber(m_TSEChunkRequest->m_Id2Chunk,
297 cache_id2_info->GetChunks(), false)) {
298 break; // Continue with cassandra
299 }
300
301 // Step 5: For the target chunk - convert sat to sat name
302 // Chunk's blob id
303 int64_t sat_key;
304 if (m_TSEChunkRequest->m_Id2Chunk == kSplitInfoChunk) {
305 // Special case
306 sat_key = cache_id2_info->GetInfo();
307 } else {
308 // For the target chunk - convert sat to sat name chunk's blob id
309 sat_key = cache_id2_info->GetInfo() -
310 cache_id2_info->GetChunks() - 1 +
311 m_TSEChunkRequest->m_Id2Chunk;
312 }
313 SCass_BlobId chunk_blob_id(cache_id2_info->GetSat(), sat_key);
314 if (!x_TSEChunkSatToKeyspace(chunk_blob_id, false)) {
315 break; // Continue with cassandra
316 }
317
318 // Step 6: search in cache the TSE chunk properties
319 last_modified = INT64_MIN;
320 auto tse_blob_prop_cache_lookup_result = psg_cache.LookupBlobProp(
321 chunk_blob_id.m_Sat, chunk_blob_id.m_SatKey,
322 last_modified, *blob_record.get());
323 if (tse_blob_prop_cache_lookup_result != ePSGS_CacheHit) {
324 err_msg = "TSE chunk blob " + chunk_blob_id.ToString() +
325 " properties are not found in cache";
326 if (tse_blob_prop_cache_lookup_result == ePSGS_CacheFailure)
327 err_msg += " due to LMDB error";
328 PSG_WARNING(err_msg);
329 break; // Continue with cassandra
330 }
331
332 // Step 7: initiate the chunk request
333 SPSGS_RequestBase::EPSGS_Trace trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
334 if (IPSGS_Processor::m_Request->NeedTrace())
335 trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
336 SPSGS_BlobBySatSatKeyRequest
337 chunk_request(SPSGS_BlobId(chunk_blob_id.ToString()),
338 INT64_MIN,
339 SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
340 SPSGS_RequestBase::ePSGS_UnknownUseCache,
341 "", 0, trace_flag,
342 vector<string>(), vector<string>(),
343 chrono::high_resolution_clock::now());
344
345 unique_ptr<CCassBlobFetch> fetch_details;
346 fetch_details.reset(new CCassBlobFetch(chunk_request, chunk_blob_id));
347 CCassBlobTaskLoadBlob * load_task =
348 new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
349 app->GetCassandraMaxRetries(),
350 app->GetCassandraConnection(),
351 chunk_blob_id.m_Keyspace,
352 move(blob_record),
353 true, nullptr);
354 fetch_details->SetLoader(load_task);
355 load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
356 load_task->SetErrorCB(
357 CGetBlobErrorCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobError,
358 this, _1, _2, _3, _4, _5),
359 fetch_details.get()));
360 load_task->SetPropsCallback(
361 CBlobPropCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobProp,
362 this, _1, _2, _3),
363 IPSGS_Processor::m_Request,
364 IPSGS_Processor::m_Reply,
365 fetch_details.get(), false));
366 load_task->SetChunkCallback(
367 CBlobChunkCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobChunk,
368 this, _1, _2, _3, _4, _5),
369 fetch_details.get()));
370
371 if (IPSGS_Processor::m_Request->NeedTrace()) {
372 IPSGS_Processor::m_Reply->SendTrace(
373 "Cassandra request: " +
374 ToJson(*load_task).Repr(CJsonNode::fStandardJson),
375 IPSGS_Processor::m_Request->GetStartTimestamp());
376 }
377
378 m_FetchDetails.push_back(move(fetch_details));
379 load_task->Wait(); // Initiate cassandra request
380 return;
381 } while (false);
382 } else {
383 if (psg_cache.IsAllowed()) {
384 err_msg = "Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
385 " properties are not found in cache";
386 if (blob_prop_cache_lookup_result == ePSGS_CacheFailure) {
387 err_msg += " due to LMDB error";
388 PSG_WARNING(err_msg);
389 } else {
390 // This warning could be confusing for the following cases:
391 // - requested blob has been recently removed
392 // but the split history still has the record for it
393 // - so the cache has nothing but split history directs to the
394 // still hanging blo
395 // The data will be sent back sucessfully and a log file will
396 // have a confusing warning. So it was decided to comment the
397 // warning out.
398 // PSG_WARNING(err_msg);
399 }
400 }
401 }
402
403
404 // Here:
405 // - fallback to cassandra
406 // - cache is not allowed
407 // - not found in cache
408
409 // Initiate async the history request
410 unique_ptr<CCassSplitHistoryFetch> fetch_details;
411 fetch_details.reset(new CCassSplitHistoryFetch(*m_TSEChunkRequest,
412 m_IdModVerId2Info->GetTSEId(),
413 m_IdModVerId2Info->GetSplitVersion()));
414 CCassBlobTaskFetchSplitHistory * load_task =
415 new CCassBlobTaskFetchSplitHistory(app->GetCassandraTimeout(),
416 app->GetCassandraMaxRetries(),
417 app->GetCassandraConnection(),
418 m_IdModVerId2Info->GetTSEId().m_Keyspace,
419 m_IdModVerId2Info->GetTSEId().m_SatKey,
420 m_IdModVerId2Info->GetSplitVersion(),
421 nullptr, nullptr);
422 fetch_details->SetLoader(load_task);
423 load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
424 load_task->SetErrorCB(
425 CSplitHistoryErrorCallback(
426 bind(&CPSGS_TSEChunkProcessor::OnGetSplitHistoryError,
427 this, _1, _2, _3, _4, _5),
428 fetch_details.get()));
429 load_task->SetConsumeCallback(
430 CSplitHistoryConsumeCallback(
431 bind(&CPSGS_TSEChunkProcessor::OnGetSplitHistory,
432 this, _1, _2),
433 fetch_details.get()));
434
435 if (IPSGS_Processor::m_Request->NeedTrace()) {
436 IPSGS_Processor::m_Reply->SendTrace(
437 "Cassandra request: " +
438 ToJson(*load_task).Repr(CJsonNode::fStandardJson),
439 IPSGS_Processor::m_Request->GetStartTimestamp());
440 }
441
442 m_FetchDetails.push_back(move(fetch_details));
443 load_task->Wait(); // Initiate cassandra request
444 }
445
446
x_ProcessSatInfoChunkVerId2Info(void)447 void CPSGS_TSEChunkProcessor::x_ProcessSatInfoChunkVerId2Info(void)
448 {
449 // This option is when id2info came in a shape of sat.info.chunks[.ver]
450
451 auto app = CPubseqGatewayApp::GetInstance();
452 string err_msg;
453
454 // Note: the TSE id (blob id) is not used in the chunk retrieval.
455 // So there is no need to map its sat to keyspace. The TSE id
456 // will be sent to the client as as.
457 // The user provided id2_info is used to calculate the chunk blob_id
458 // so the sat from id2_info will be mapped to the cassandra keyspace
459
460 // Validate the chunk number
461 // true -> finish if failed
462 if (!x_ValidateTSEChunkNumber(m_TSEChunkRequest->m_Id2Chunk,
463 m_SatInfoChunkVerId2Info->GetChunks(),
464 true))
465 return;
466
467 int64_t sat_key;
468 if (m_TSEChunkRequest->m_Id2Chunk == kSplitInfoChunk) {
469 // Special case
470 sat_key = m_SatInfoChunkVerId2Info->GetInfo();
471 } else {
472 // For the target chunk - convert sat to sat name chunk's blob id
473 sat_key = m_SatInfoChunkVerId2Info->GetInfo() -
474 m_SatInfoChunkVerId2Info->GetChunks() - 1 + m_TSEChunkRequest->m_Id2Chunk;
475 }
476 SCass_BlobId chunk_blob_id(m_SatInfoChunkVerId2Info->GetSat(), sat_key);
477 if (!x_TSEChunkSatToKeyspace(chunk_blob_id))
478 return;
479
480 // Search in cache the TSE chunk properties
481 CPSGCache psg_cache(IPSGS_Processor::m_Request,
482 IPSGS_Processor::m_Reply);
483 int64_t last_modified = INT64_MIN;
484 unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
485 auto tse_blob_prop_cache_lookup_result =
486 psg_cache.LookupBlobProp(chunk_blob_id.m_Sat, chunk_blob_id.m_SatKey,
487 last_modified, *blob_record.get());
488
489
490 // Initiate the chunk request
491 SPSGS_RequestBase::EPSGS_Trace trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
492 if (IPSGS_Processor::m_Request->NeedTrace())
493 trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
494 SPSGS_BlobBySatSatKeyRequest
495 chunk_request(SPSGS_BlobId(chunk_blob_id.ToString()),
496 INT64_MIN,
497 SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
498 SPSGS_RequestBase::ePSGS_UnknownUseCache,
499 "", 0, trace_flag,
500 vector<string>(), vector<string>(),
501 chrono::high_resolution_clock::now());
502
503 unique_ptr<CCassBlobFetch> fetch_details;
504 fetch_details.reset(new CCassBlobFetch(chunk_request, chunk_blob_id));
505
506 CCassBlobTaskLoadBlob * load_task = nullptr;
507 if (tse_blob_prop_cache_lookup_result != ePSGS_CacheHit) {
508 // Cassandra should look for blob props as well
509 load_task = new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
510 app->GetCassandraMaxRetries(),
511 app->GetCassandraConnection(),
512 chunk_blob_id.m_Keyspace,
513 chunk_blob_id.m_SatKey,
514 true, nullptr);
515 } else {
516 // Blob props are already here
517 load_task = new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
518 app->GetCassandraMaxRetries(),
519 app->GetCassandraConnection(),
520 chunk_blob_id.m_Keyspace,
521 move(blob_record),
522 true, nullptr);
523 }
524
525 fetch_details->SetLoader(load_task);
526 load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
527 load_task->SetErrorCB(
528 CGetBlobErrorCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobError,
529 this, _1, _2, _3, _4, _5),
530 fetch_details.get()));
531 load_task->SetPropsCallback(
532 CBlobPropCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobProp,
533 this, _1, _2, _3),
534 IPSGS_Processor::m_Request,
535 IPSGS_Processor::m_Reply,
536 fetch_details.get(), false));
537 load_task->SetChunkCallback(
538 CBlobChunkCallback(bind(&CPSGS_TSEChunkProcessor::OnGetBlobChunk,
539 this, _1, _2, _3, _4, _5),
540 fetch_details.get()));
541
542 if (IPSGS_Processor::m_Request->NeedTrace()) {
543 IPSGS_Processor::m_Reply->SendTrace(
544 "Cassandra request: " +
545 ToJson(*load_task).Repr(CJsonNode::fStandardJson),
546 IPSGS_Processor::m_Request->GetStartTimestamp());
547 }
548
549 m_FetchDetails.push_back(move(fetch_details));
550 load_task->Wait(); // Initiate cassandra request
551 }
552
553
OnGetBlobProp(CCassBlobFetch * fetch_details,CBlobRecord const & blob,bool is_found)554 void CPSGS_TSEChunkProcessor::OnGetBlobProp(CCassBlobFetch * fetch_details,
555 CBlobRecord const & blob,
556 bool is_found)
557 {
558 if (m_Cancelled) {
559 m_Completed = true;
560 return;
561 }
562
563 if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
564 m_Completed = true;
565 return;
566 }
567
568 // Note: cannot use CPSGS_CassBlobBase::OnGetBlobChunk() anymore because
569 // the reply has to have a few more fields for ID/get_tse_chunk request
570 CRequestContextResetter context_resetter;
571 IPSGS_Processor::m_Request->SetRequestContext();
572
573 if (IPSGS_Processor::m_Request->NeedTrace()) {
574 IPSGS_Processor::m_Reply->SendTrace(
575 "Blob prop callback; found: " + to_string(is_found),
576 IPSGS_Processor::m_Request->GetStartTimestamp());
577 }
578
579 if (is_found) {
580 IPSGS_Processor::m_Reply->PrepareTSEBlobPropData(
581 fetch_details, GetName(),
582 m_TSEChunkRequest->m_Id2Chunk,
583 m_TSEChunkRequest->m_Id2Info,
584 ToJson(blob).Repr(CJsonNode::fStandardJson));
585 IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
586 fetch_details, GetName());
587 } else {
588 // Not found; it is the user error, not the data inconsistency
589 auto * app = CPubseqGatewayApp::GetInstance();
590 app->GetCounters().Increment(CPSGSCounters::ePSGS_BlobPropsNotFoundError);
591
592 auto blob_id = fetch_details->GetBlobId();
593 string message = "Blob " + blob_id.ToString() + " properties are not found";
594 PSG_WARNING(message);
595 UpdateOverallStatus(CRequestStatus::e404_NotFound);
596 IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
597 fetch_details, GetName(),
598 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
599 message, CRequestStatus::e404_NotFound,
600 ePSGS_BlobPropsNotFound, eDiag_Error);
601 IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
602 fetch_details, GetName());
603 SetFinished(fetch_details);
604 }
605
606 if (IPSGS_Processor::m_Reply->IsOutputReady())
607 x_Peek(false);
608 }
609
610
OnGetBlobError(CCassBlobFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)611 void CPSGS_TSEChunkProcessor::OnGetBlobError(CCassBlobFetch * fetch_details,
612 CRequestStatus::ECode status,
613 int code,
614 EDiagSev severity,
615 const string & message)
616 {
617 if (m_Cancelled) {
618 m_Completed = true;
619 return;
620 }
621
622 // Cannot use CPSGS_CassBlobBase::OnGetBlobError() anymore because
623 // the TSE messages have different parameters
624
625 CRequestContextResetter context_resetter;
626 IPSGS_Processor::m_Request->SetRequestContext();
627
628 // To avoid sending an error in Peek()
629 fetch_details->GetLoader()->ClearError();
630
631 // It could be a message or an error
632 bool is_error = CountError(status, code, severity, message);
633
634 if (is_error) {
635 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
636
637 if (fetch_details->IsBlobPropStage()) {
638 IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
639 fetch_details, GetName(),
640 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
641 message, CRequestStatus::e500_InternalServerError, code, severity);
642 IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
643 fetch_details, GetName());
644 } else {
645 IPSGS_Processor::m_Reply->PrepareTSEBlobMessage(
646 fetch_details, GetName(),
647 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
648 message, CRequestStatus::e500_InternalServerError, code, severity);
649 IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
650 fetch_details, GetName());
651 }
652
653 // If it is an error then regardless what stage it was, props or
654 // chunks, there will be no more activity
655 fetch_details->SetReadFinished();
656 } else {
657 if (fetch_details->IsBlobPropStage())
658 IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
659 fetch_details, GetName(),
660 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
661 message, status, code, severity);
662 else
663 IPSGS_Processor::m_Reply->PrepareTSEBlobMessage(
664 fetch_details, GetName(),
665 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
666 message, status, code, severity);
667 }
668
669 SetFinished(fetch_details);
670
671 if (IPSGS_Processor::m_Reply->IsOutputReady())
672 x_Peek(false);
673 }
674
675
OnGetBlobChunk(CCassBlobFetch * fetch_details,CBlobRecord const & blob,const unsigned char * chunk_data,unsigned int data_size,int chunk_no)676 void CPSGS_TSEChunkProcessor::OnGetBlobChunk(CCassBlobFetch * fetch_details,
677 CBlobRecord const & blob,
678 const unsigned char * chunk_data,
679 unsigned int data_size,
680 int chunk_no)
681 {
682 // Note: cannot use CPSGS_CassBlobBase::OnGetBlobChunk() anymore because
683 // the reply has to have a few more fields for TSE chunks
684 CRequestContextResetter context_resetter;
685 IPSGS_Processor::m_Request->SetRequestContext();
686
687 if (m_Cancelled) {
688 fetch_details->GetLoader()->Cancel();
689 SetFinished(fetch_details);
690 if (IPSGS_Processor::m_Reply->IsOutputReady())
691 x_Peek(false);
692 return;
693 }
694 if (IPSGS_Processor::m_Reply->IsFinished()) {
695 CPubseqGatewayApp::GetInstance()->GetCounters().Increment(
696 CPSGSCounters::ePSGS_UnknownError);
697 PSG_ERROR("Unexpected data received "
698 "while the output has finished, ignoring");
699 if (IPSGS_Processor::m_Reply->IsOutputReady())
700 x_Peek(false);
701 return;
702 }
703
704 if (chunk_no >= 0) {
705 if (IPSGS_Processor::m_Request->NeedTrace()) {
706 IPSGS_Processor::m_Reply->SendTrace(
707 "Blob chunk " + to_string(chunk_no) + " callback",
708 IPSGS_Processor::m_Request->GetStartTimestamp());
709 }
710
711 // A blob chunk; 0-length chunks are allowed too
712 IPSGS_Processor::m_Reply->PrepareTSEBlobData(
713 fetch_details, GetName(),
714 chunk_data, data_size, chunk_no,
715 m_TSEChunkRequest->m_Id2Chunk,
716 m_TSEChunkRequest->m_Id2Info);
717 } else {
718 if (IPSGS_Processor::m_Request->NeedTrace()) {
719 IPSGS_Processor::m_Reply->SendTrace(
720 "Blob chunk no-more-data callback",
721 IPSGS_Processor::m_Request->GetStartTimestamp());
722 }
723
724 // End of the blob
725 IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
726 fetch_details, GetName());
727 SetFinished(fetch_details);
728
729 // Note: no need to set the blob completed in the exclude blob cache.
730 // It will happen in Peek()
731 }
732
733 if (IPSGS_Processor::m_Reply->IsOutputReady())
734 x_Peek(false);
735 }
736
737
738 void
OnGetSplitHistoryError(CCassSplitHistoryFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)739 CPSGS_TSEChunkProcessor::OnGetSplitHistoryError(
740 CCassSplitHistoryFetch * fetch_details,
741 CRequestStatus::ECode status,
742 int code,
743 EDiagSev severity,
744 const string & message)
745 {
746 if (m_Cancelled) {
747 m_Completed = true;
748 return;
749 }
750
751 CRequestContextResetter context_resetter;
752 IPSGS_Processor::m_Request->SetRequestContext();
753
754 // To avoid sending an error in Peek()
755 fetch_details->GetLoader()->ClearError();
756
757 // It could be a message or an error
758 bool is_error = (severity == eDiag_Error ||
759 severity == eDiag_Critical ||
760 severity == eDiag_Fatal);
761
762 auto * app = CPubseqGatewayApp::GetInstance();
763 if (status >= CRequestStatus::e400_BadRequest &&
764 status < CRequestStatus::e500_InternalServerError) {
765 PSG_WARNING(message);
766 } else {
767 PSG_ERROR(message);
768 }
769
770 if (IPSGS_Processor::m_Request->NeedTrace()) {
771 IPSGS_Processor::m_Reply->SendTrace(
772 "Split history error callback; status: " + to_string(status),
773 IPSGS_Processor::m_Request->GetStartTimestamp());
774 }
775
776 IPSGS_Processor::m_Reply->PrepareProcessorMessage(
777 IPSGS_Processor::m_Reply->GetItemId(),
778 GetName(), message, status, code, severity);
779
780 if (is_error) {
781 if (code == CCassandraException::eQueryTimeout)
782 app->GetCounters().Increment(CPSGSCounters::ePSGS_CassQueryTimeoutError);
783 else
784 app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
785
786 // If it is an error then there will be no more activity
787 fetch_details->SetReadFinished();
788 }
789
790 if (IPSGS_Processor::m_Reply->IsOutputReady())
791 x_Peek(false);
792 }
793
794
795 void
OnGetSplitHistory(CCassSplitHistoryFetch * fetch_details,vector<SSplitHistoryRecord> && result)796 CPSGS_TSEChunkProcessor::OnGetSplitHistory(
797 CCassSplitHistoryFetch * fetch_details,
798 vector<SSplitHistoryRecord> && result)
799 {
800 CRequestContextResetter context_resetter;
801 IPSGS_Processor::m_Request->SetRequestContext();
802
803 fetch_details->SetReadFinished();
804
805 if (m_Cancelled) {
806 fetch_details->GetLoader()->Cancel();
807 return;
808 }
809
810 if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
811 m_Completed = true;
812 return;
813 }
814
815 if (IPSGS_Processor::m_Request->NeedTrace()) {
816 IPSGS_Processor::m_Reply->SendTrace(
817 "Split history callback; number of records: " + to_string(result.size()),
818 IPSGS_Processor::m_Request->GetStartTimestamp());
819 }
820
821 auto * app = CPubseqGatewayApp::GetInstance();
822 if (result.empty()) {
823 // Split history is not found
824 app->GetCounters().Increment(CPSGSCounters::ePSGS_SplitHistoryNotFoundError);
825
826 string message = "Split history version " +
827 to_string(fetch_details->GetSplitVersion()) +
828 " is not found for the TSE id " +
829 fetch_details->GetTSEId().ToString();
830 PSG_WARNING(message);
831 UpdateOverallStatus(CRequestStatus::e404_NotFound);
832 IPSGS_Processor::m_Reply->PrepareProcessorMessage(
833 IPSGS_Processor::m_Reply->GetItemId(),
834 GetName(), message, CRequestStatus::e404_NotFound,
835 ePSGS_SplitHistoryNotFound, eDiag_Error);
836 SignalFinishProcessing();
837 } else {
838 // Split history found.
839 // Note: the request was issued so that there could be exactly one
840 // split history record or none at all. So it is not checked that
841 // there are more than one record.
842 x_RequestTSEChunk(result[0], fetch_details);
843 }
844
845 if (IPSGS_Processor::m_Reply->IsOutputReady())
846 x_Peek(false);
847 }
848
849
850 void
x_RequestTSEChunk(const SSplitHistoryRecord & split_record,CCassSplitHistoryFetch * fetch_details)851 CPSGS_TSEChunkProcessor::x_RequestTSEChunk(
852 const SSplitHistoryRecord & split_record,
853 CCassSplitHistoryFetch * fetch_details)
854 {
855 // Parse id2info
856 unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> id2_info;
857 if (!x_ParseTSEChunkId2Info(split_record.id2_info,
858 id2_info, fetch_details->GetTSEId(), true))
859 return;
860
861 // Check the requested chunk
862 // true -> finish the request if failed
863 if (!x_ValidateTSEChunkNumber(fetch_details->GetChunk(),
864 id2_info->GetChunks(), true))
865 return;
866
867 // Resolve sat to satkey
868 int64_t sat_key;
869 if (fetch_details->GetChunk() == kSplitInfoChunk) {
870 // Special case
871 sat_key = id2_info->GetInfo();
872 } else {
873 sat_key = id2_info->GetInfo() - id2_info->GetChunks() - 1 +
874 fetch_details->GetChunk();
875 }
876 SCass_BlobId chunk_blob_id(id2_info->GetSat(), sat_key);
877 if (!x_TSEChunkSatToKeyspace(chunk_blob_id, true))
878 return;
879
880 // Look for the blob props
881 // Form the chunk request with/without blob props
882 unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
883 CPSGCache psg_cache(
884 fetch_details->GetUseCache() != SPSGS_RequestBase::ePSGS_DbOnly,
885 IPSGS_Processor::m_Request,
886 IPSGS_Processor::m_Reply);
887 int64_t last_modified = INT64_MIN; // last modified is unknown
888 auto blob_prop_cache_lookup_result =
889 psg_cache.LookupBlobProp(chunk_blob_id.m_Sat,
890 chunk_blob_id.m_SatKey,
891 last_modified, *blob_record.get());
892 if (blob_prop_cache_lookup_result != ePSGS_CacheHit &&
893 fetch_details->GetUseCache() == SPSGS_RequestBase::ePSGS_CacheOnly) {
894 // Cassandra is forbidden for the blob prop
895 string err_msg = "TSE chunk blob " + chunk_blob_id.ToString() +
896 " properties are not found in cache";
897 if (blob_prop_cache_lookup_result == ePSGS_CacheFailure)
898 err_msg += " due to LMDB error";
899 x_SendProcessorError(err_msg, CRequestStatus::e404_NotFound,
900 ePSGS_BlobPropsNotFound);
901 UpdateOverallStatus(CRequestStatus::e404_NotFound);
902 SignalFinishProcessing();
903 return;
904 }
905
906 SPSGS_RequestBase::EPSGS_Trace trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
907 if (IPSGS_Processor::m_Request->NeedTrace())
908 trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
909 SPSGS_BlobBySatSatKeyRequest
910 chunk_request(SPSGS_BlobId(chunk_blob_id.ToString()), INT64_MIN,
911 SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
912 SPSGS_RequestBase::ePSGS_UnknownUseCache,
913 "", 0, trace_flag,
914 vector<string>(), vector<string>(),
915 chrono::high_resolution_clock::now());
916 unique_ptr<CCassBlobFetch> cass_blob_fetch;
917 cass_blob_fetch.reset(new CCassBlobFetch(chunk_request, chunk_blob_id));
918
919 auto app = CPubseqGatewayApp::GetInstance();
920 CCassBlobTaskLoadBlob * load_task = nullptr;
921
922 if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
923 load_task = new CCassBlobTaskLoadBlob(
924 app->GetCassandraTimeout(),
925 app->GetCassandraMaxRetries(),
926 app->GetCassandraConnection(),
927 chunk_blob_id.m_Keyspace,
928 move(blob_record),
929 true, nullptr);
930 } else {
931 load_task = new CCassBlobTaskLoadBlob(
932 app->GetCassandraTimeout(),
933 app->GetCassandraMaxRetries(),
934 app->GetCassandraConnection(),
935 chunk_blob_id.m_Keyspace,
936 chunk_blob_id.m_SatKey,
937 true, nullptr);
938 }
939 cass_blob_fetch->SetLoader(load_task);
940
941 load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
942 load_task->SetErrorCB(
943 CGetBlobErrorCallback(
944 bind(&CPSGS_TSEChunkProcessor::OnGetBlobError,
945 this, _1, _2, _3, _4, _5),
946 cass_blob_fetch.get()));
947 load_task->SetPropsCallback(
948 CBlobPropCallback(
949 bind(&CPSGS_TSEChunkProcessor::OnGetBlobProp,
950 this, _1, _2, _3),
951 IPSGS_Processor::m_Request,
952 IPSGS_Processor::m_Reply,
953 cass_blob_fetch.get(),
954 blob_prop_cache_lookup_result != ePSGS_CacheHit));
955 load_task->SetChunkCallback(
956 CBlobChunkCallback(
957 bind(&CPSGS_TSEChunkProcessor::OnGetBlobChunk,
958 this, _1, _2, _3, _4, _5),
959 cass_blob_fetch.get()));
960
961 if (IPSGS_Processor::m_Request->NeedTrace()) {
962 IPSGS_Processor::m_Reply->SendTrace(
963 "Cassandra request: " +
964 ToJson(*load_task).Repr(CJsonNode::fStandardJson),
965 IPSGS_Processor::m_Request->GetStartTimestamp());
966 }
967
968 m_FetchDetails.push_back(move(cass_blob_fetch));
969 load_task->Wait();
970 }
971
972
973 void
x_SendProcessorError(const string & msg,CRequestStatus::ECode status,int code)974 CPSGS_TSEChunkProcessor::x_SendProcessorError(const string & msg,
975 CRequestStatus::ECode status,
976 int code)
977 {
978 if (m_Cancelled)
979 return;
980
981 CRequestContextResetter context_resetter;
982 IPSGS_Processor::m_Request->SetRequestContext();
983
984 IPSGS_Processor::m_Reply->PrepareProcessorMessage(
985 IPSGS_Processor::m_Reply->GetItemId(),
986 GetName(), msg, status, code, eDiag_Error);
987 UpdateOverallStatus(status);
988
989 if (status >= CRequestStatus::e400_BadRequest &&
990 status < CRequestStatus::e500_InternalServerError) {
991 PSG_WARNING(msg);
992 } else {
993 PSG_ERROR(msg);
994 }
995 }
996
997
998 bool
x_ValidateTSEChunkNumber(int64_t requested_chunk,CPSGS_SatInfoChunksVerFlavorId2Info::TChunks total_chunks,bool need_finish)999 CPSGS_TSEChunkProcessor::x_ValidateTSEChunkNumber(
1000 int64_t requested_chunk,
1001 CPSGS_SatInfoChunksVerFlavorId2Info::TChunks total_chunks,
1002 bool need_finish)
1003 {
1004 if (requested_chunk == kSplitInfoChunk) {
1005 // Special value: the info chunk must be provided
1006 return true;
1007 }
1008
1009 if (requested_chunk > total_chunks) {
1010 string msg = "Invalid chunk requested. "
1011 "The number of available chunks: " +
1012 to_string(total_chunks) + ", requested number: " +
1013 to_string(requested_chunk);
1014 if (need_finish) {
1015 auto * app = CPubseqGatewayApp::GetInstance();
1016 app->GetCounters().Increment(CPSGSCounters::ePSGS_MalformedArgs);
1017 x_SendProcessorError(msg, CRequestStatus::e400_BadRequest,
1018 ePSGS_MalformedParameter);
1019 UpdateOverallStatus(CRequestStatus::e400_BadRequest);
1020 m_Completed = true;
1021 SignalFinishProcessing();
1022 } else {
1023 PSG_WARNING(msg);
1024 }
1025 return false;
1026 }
1027 return true;
1028 }
1029
1030
1031 bool
x_TSEChunkSatToKeyspace(SCass_BlobId & blob_id)1032 CPSGS_TSEChunkProcessor::x_TSEChunkSatToKeyspace(SCass_BlobId & blob_id)
1033 {
1034 auto * app = CPubseqGatewayApp::GetInstance();
1035 if (app->SatToKeyspace(blob_id.m_Sat, blob_id.m_Keyspace))
1036 return true;
1037
1038 app->GetCounters().Increment(CPSGSCounters::ePSGS_ClientSatToSatNameError);
1039
1040 string msg = "Unknown TSE chunk satellite number " +
1041 to_string(blob_id.m_Sat) +
1042 " for the blob " + blob_id.ToString();
1043 x_SendProcessorError(msg, CRequestStatus::e500_InternalServerError,
1044 ePSGS_UnknownResolvedSatellite);
1045
1046 // This method is used only in case of the TSE chunk requests.
1047 // So in case of errors - synchronous or asynchronous - it is
1048 // necessary to finish the reply anyway.
1049 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
1050 m_Completed = true;
1051 SignalFinishProcessing();
1052 return false;
1053 }
1054
1055
Cancel(void)1056 void CPSGS_TSEChunkProcessor::Cancel(void)
1057 {
1058 m_Cancelled = true;
1059 CancelLoaders();
1060 }
1061
1062
GetStatus(void)1063 IPSGS_Processor::EPSGS_Status CPSGS_TSEChunkProcessor::GetStatus(void)
1064 {
1065 auto status = CPSGS_CassProcessorBase::GetStatus();
1066 if (status == IPSGS_Processor::ePSGS_InProgress)
1067 return status;
1068
1069 if (m_Cancelled)
1070 return IPSGS_Processor::ePSGS_Cancelled;
1071
1072 return status;
1073 }
1074
1075
GetName(void) const1076 string CPSGS_TSEChunkProcessor::GetName(void) const
1077 {
1078 return "Cassandra-gettsechunk";
1079 }
1080
1081
ProcessEvent(void)1082 void CPSGS_TSEChunkProcessor::ProcessEvent(void)
1083 {
1084 x_Peek(true);
1085 }
1086
1087
x_Peek(bool need_wait)1088 void CPSGS_TSEChunkProcessor::x_Peek(bool need_wait)
1089 {
1090 if (m_Cancelled)
1091 return;
1092
1093 if (m_InPeek)
1094 return;
1095 m_InPeek = true;
1096
1097 // 1 -> call m_Loader->Wait1 to pick data
1098 // 2 -> check if we have ready-to-send buffers
1099 // 3 -> call reply->Send() to send what we have if it is ready
1100 bool overall_final_state = false;
1101
1102 while (true) {
1103 auto initial_size = m_FetchDetails.size();
1104
1105 for (auto & details: m_FetchDetails) {
1106 if (details)
1107 overall_final_state |= x_Peek(details, need_wait);
1108 }
1109
1110 if (initial_size == m_FetchDetails.size())
1111 break;
1112 }
1113
1114 // TSE chunk: ready packets need to be sent right away
1115 if (IPSGS_Processor::m_Reply->IsOutputReady())
1116 IPSGS_Processor::m_Reply->Flush(false);
1117
1118 m_InPeek = false;
1119 }
1120
1121
x_Peek(unique_ptr<CCassFetch> & fetch_details,bool need_wait)1122 bool CPSGS_TSEChunkProcessor::x_Peek(unique_ptr<CCassFetch> & fetch_details,
1123 bool need_wait)
1124 {
1125 if (!fetch_details->GetLoader())
1126 return true;
1127
1128 bool final_state = false;
1129 if (need_wait) {
1130 if (!fetch_details->ReadFinished()) {
1131 final_state = fetch_details->GetLoader()->Wait();
1132 }
1133 }
1134
1135 if (fetch_details->GetLoader()->HasError() &&
1136 IPSGS_Processor::m_Reply->IsOutputReady() &&
1137 ! IPSGS_Processor::m_Reply->IsFinished()) {
1138 // Send an error
1139 string error = fetch_details->GetLoader()->LastError();
1140 auto * app = CPubseqGatewayApp::GetInstance();
1141
1142 app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
1143 PSG_ERROR(error);
1144
1145 CCassBlobFetch * blob_fetch = static_cast<CCassBlobFetch *>(fetch_details.get());
1146 if (blob_fetch->IsBlobPropStage()) {
1147 IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
1148 blob_fetch, GetName(),
1149 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
1150 error, CRequestStatus::e500_InternalServerError,
1151 ePSGS_UnknownError, eDiag_Error);
1152 IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
1153 blob_fetch, GetName());
1154 } else {
1155 IPSGS_Processor::m_Reply->PrepareTSEBlobMessage(
1156 blob_fetch, GetName(),
1157 m_TSEChunkRequest->m_Id2Chunk, m_TSEChunkRequest->m_Id2Info,
1158 error, CRequestStatus::e500_InternalServerError,
1159 ePSGS_UnknownError, eDiag_Error);
1160 IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
1161 blob_fetch, GetName());
1162 }
1163
1164 // Mark finished
1165 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
1166 fetch_details->SetReadFinished();
1167 SignalFinishProcessing();
1168 }
1169
1170 return final_state;
1171 }
1172
1173