1 /* $Id: cass_blob_base.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergey Satskiy
27 *
28 * File Description: base class for processors which retrieve blobs
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include <corelib/request_status.hpp>
35 #include <corelib/ncbidiag.hpp>
36
37 #include "cass_fetch.hpp"
38 #include "psgs_request.hpp"
39 #include "psgs_reply.hpp"
40 #include "pubseq_gateway_utils.hpp"
41 #include "pubseq_gateway_convert_utils.hpp"
42 #include "pubseq_gateway.hpp"
43 #include "cass_blob_base.hpp"
44 #include "pubseq_gateway_cache_utils.hpp"
45 #include "pubseq_gateway_convert_utils.hpp"
46 #include "public_comment_callback.hpp"
47
48 using namespace std::placeholders;
49
50
CPSGS_CassBlobBase()51 CPSGS_CassBlobBase::CPSGS_CassBlobBase() :
52 m_LastModified(-1)
53 {}
54
55
CPSGS_CassBlobBase(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,const string & processor_id)56 CPSGS_CassBlobBase::CPSGS_CassBlobBase(shared_ptr<CPSGS_Request> request,
57 shared_ptr<CPSGS_Reply> reply,
58 const string & processor_id) :
59 m_NeedToParseId2Info(true),
60 m_ProcessorId(processor_id),
61 m_LastModified(-1)
62 {}
63
64
~CPSGS_CassBlobBase()65 CPSGS_CassBlobBase::~CPSGS_CassBlobBase()
66 {}
67
68
69 void
OnGetBlobProp(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob,bool is_found)70 CPSGS_CassBlobBase::OnGetBlobProp(TBlobPropsCB blob_props_cb,
71 TBlobChunkCB blob_chunk_cb,
72 TBlobErrorCB blob_error_cb,
73 CCassBlobFetch * fetch_details,
74 CBlobRecord const & blob,
75 bool is_found)
76 {
77 CRequestContextResetter context_resetter;
78 m_Request->SetRequestContext();
79
80 if (m_Request->NeedTrace()) {
81 m_Reply->SendTrace("Blob prop callback; found: " + to_string(is_found),
82 m_Request->GetStartTimestamp());
83 }
84
85 if (is_found) {
86 // The method could be called multiple times. First time it is called
87 // for the blob in the request (ID/getblob and ID/get). At this moment
88 // the blob id2info field should be parsed and memorized.
89 // Later the original blob id2info is used to decide if id2_chunk
90 // and id2_info should be present in the reply.
91
92 if (m_LastModified == -1)
93 m_LastModified = blob.GetModified();
94
95 x_PrepareBlobPropData(fetch_details, blob);
96
97 if (m_NeedToParseId2Info) {
98 if (!blob.GetId2Info().empty()) {
99 if (!x_ParseId2Info(fetch_details, blob)) {
100 x_PrepareBlobPropCompletion(fetch_details);
101 SetFinished(fetch_details);
102 return;
103 }
104 }
105 m_NeedToParseId2Info = false;
106 }
107
108 // Note: initially only blob_props are requested and at that moment the
109 // TSE option is 'known'. So the initial request should be
110 // finished, see m_FinishedRead = true
111 // In the further requests of the blob data regardless with blob
112 // props or not, the TSE option is set to unknown so the request
113 // will be finished at the moment when blob chunks are handled.
114 switch (fetch_details->GetTSEOption()) {
115 case SPSGS_BlobRequestBase::ePSGS_NoneTSE:
116 x_OnBlobPropNoneTSE(fetch_details);
117 break;
118 case SPSGS_BlobRequestBase::ePSGS_SlimTSE:
119 x_OnBlobPropSlimTSE(blob_props_cb, blob_chunk_cb, blob_error_cb,
120 fetch_details, blob);
121 break;
122 case SPSGS_BlobRequestBase::ePSGS_SmartTSE:
123 x_OnBlobPropSmartTSE(blob_props_cb, blob_chunk_cb, blob_error_cb,
124 fetch_details, blob);
125 break;
126 case SPSGS_BlobRequestBase::ePSGS_WholeTSE:
127 x_OnBlobPropWholeTSE(blob_props_cb, blob_chunk_cb, blob_error_cb,
128 fetch_details, blob);
129 break;
130 case SPSGS_BlobRequestBase::ePSGS_OrigTSE:
131 x_OnBlobPropOrigTSE(blob_chunk_cb, blob_error_cb,
132 fetch_details, blob);
133 break;
134 case SPSGS_BlobRequestBase::ePSGS_UnknownTSE:
135 // Used when INFO blobs are asked; i.e. chunks have been
136 // requested as well, so only the prop completion message needs
137 // to be sent.
138 x_PrepareBlobPropCompletion(fetch_details);
139 break;
140 }
141 } else {
142 x_OnBlobPropNotFound(fetch_details);
143 }
144 }
145
146
147 void
x_OnBlobPropNoneTSE(CCassBlobFetch * fetch_details)148 CPSGS_CassBlobBase::x_OnBlobPropNoneTSE(CCassBlobFetch * fetch_details)
149 {
150 // Nothing else to be sent
151 x_PrepareBlobPropCompletion(fetch_details);
152 SetFinished(fetch_details);
153 }
154
155
156 void
x_OnBlobPropSlimTSE(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)157 CPSGS_CassBlobBase::x_OnBlobPropSlimTSE(TBlobPropsCB blob_props_cb,
158 TBlobChunkCB blob_chunk_cb,
159 TBlobErrorCB blob_error_cb,
160 CCassBlobFetch * fetch_details,
161 CBlobRecord const & blob)
162 {
163 auto fetch_blob = fetch_details->GetBlobId();
164
165 // The handler deals with both kind of blob requests:
166 // - by sat/sat_key
167 // - by seq_id/seq_id_type
168 // So get the reference to the blob base request
169 auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
170
171 fetch_details->SetReadFinished();
172 if (blob.GetId2Info().empty()) {
173 x_PrepareBlobPropCompletion(fetch_details);
174
175 // An original blob may be required if its size is small
176 auto * app = CPubseqGatewayApp::GetInstance();
177 unsigned int slim_max_blob_size = app->GetSlimMaxBlobSize();
178
179 if (blob.GetSize() <= slim_max_blob_size) {
180 // The blob is small, get it, but first check in the
181 // exclude blob cache
182 if (x_CheckExcludeBlobCache(fetch_details,
183 blob_request) == ePSGS_InCache) {
184 return;
185 }
186
187 x_RequestOriginalBlobChunks(blob_chunk_cb, blob_error_cb,
188 fetch_details, blob);
189 } else {
190 // Nothing else to be sent, the original blob is big
191 }
192 return;
193 }
194
195 // Check the cache first - only if it is about the main
196 // blob request
197 if (x_CheckExcludeBlobCache(fetch_details,
198 blob_request) == ePSGS_InCache) {
199 return;
200 }
201
202 // Not in the cache, request the split INFO blob only
203 x_RequestID2BlobChunks(blob_props_cb, blob_chunk_cb, blob_error_cb,
204 fetch_details, blob, true);
205
206 // It is important to send completion after: there could be
207 // an error of converting/translating ID2 info
208 x_PrepareBlobPropCompletion(fetch_details);
209 }
210
211
212 void
x_OnBlobPropSmartTSE(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)213 CPSGS_CassBlobBase::x_OnBlobPropSmartTSE(TBlobPropsCB blob_props_cb,
214 TBlobChunkCB blob_chunk_cb,
215 TBlobErrorCB blob_error_cb,
216 CCassBlobFetch * fetch_details,
217 CBlobRecord const & blob)
218 {
219 fetch_details->SetReadFinished();
220 if (blob.GetId2Info().empty()) {
221 // Request original blob chunks
222 x_PrepareBlobPropCompletion(fetch_details);
223 x_RequestOriginalBlobChunks(blob_chunk_cb, blob_error_cb,
224 fetch_details, blob);
225 } else {
226 // Request the split INFO blob only
227 x_RequestID2BlobChunks(blob_props_cb, blob_chunk_cb, blob_error_cb,
228 fetch_details, blob, true);
229
230 // It is important to send completion after: there could be
231 // an error of converting/translating ID2 info
232 x_PrepareBlobPropCompletion(fetch_details);
233 }
234 }
235
236
237 void
x_OnBlobPropWholeTSE(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)238 CPSGS_CassBlobBase::x_OnBlobPropWholeTSE(TBlobPropsCB blob_props_cb,
239 TBlobChunkCB blob_chunk_cb,
240 TBlobErrorCB blob_error_cb,
241 CCassBlobFetch * fetch_details,
242 CBlobRecord const & blob)
243 {
244 fetch_details->SetReadFinished();
245 if (blob.GetId2Info().empty()) {
246 // Request original blob chunks
247 x_PrepareBlobPropCompletion(fetch_details);
248 x_RequestOriginalBlobChunks(blob_chunk_cb, blob_error_cb,
249 fetch_details, blob);
250 } else {
251 // Request the split INFO blob and all split chunks
252 x_RequestID2BlobChunks(blob_props_cb, blob_chunk_cb, blob_error_cb,
253 fetch_details, blob, false);
254
255 // It is important to send completion after: there could be
256 // an error of converting/translating ID2 info
257 x_PrepareBlobPropCompletion(fetch_details);
258 }
259 }
260
261
262 void
x_OnBlobPropOrigTSE(TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)263 CPSGS_CassBlobBase::x_OnBlobPropOrigTSE(TBlobChunkCB blob_chunk_cb,
264 TBlobErrorCB blob_error_cb,
265 CCassBlobFetch * fetch_details,
266 CBlobRecord const & blob)
267 {
268 fetch_details->SetReadFinished();
269 // Request original blob chunks
270 x_PrepareBlobPropCompletion(fetch_details);
271 x_RequestOriginalBlobChunks(blob_chunk_cb,
272 blob_error_cb,
273 fetch_details, blob);
274 }
275
276
277 void
x_RequestOriginalBlobChunks(TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob)278 CPSGS_CassBlobBase::x_RequestOriginalBlobChunks(TBlobChunkCB blob_chunk_cb,
279 TBlobErrorCB blob_error_cb,
280 CCassBlobFetch * fetch_details,
281 CBlobRecord const & blob)
282 {
283 auto app = CPubseqGatewayApp::GetInstance();
284
285 auto trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
286 if (m_Request->NeedTrace())
287 trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
288
289 // eUnknownTSE is safe here; no blob prop call will happen anyway
290 // eUnknownUseCache is safe here; no further resolution required
291 auto cass_blob_id = fetch_details->GetBlobId();
292 SPSGS_BlobBySatSatKeyRequest
293 orig_blob_request(SPSGS_BlobId(cass_blob_id.ToString()),
294 blob.GetModified(),
295 SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
296 SPSGS_RequestBase::ePSGS_UnknownUseCache,
297 fetch_details->GetClientId(),
298 0, trace_flag,
299 vector<string>(), vector<string>(),
300 chrono::high_resolution_clock::now());
301
302 // Create the cass async loader
303 unique_ptr<CBlobRecord> blob_record(new CBlobRecord(blob));
304 CCassBlobTaskLoadBlob * load_task =
305 new CCassBlobTaskLoadBlob(app->GetCassandraTimeout(),
306 app->GetCassandraMaxRetries(),
307 app->GetCassandraConnection(),
308 cass_blob_id.m_Keyspace,
309 move(blob_record),
310 true, nullptr);
311
312 unique_ptr<CCassBlobFetch> cass_blob_fetch;
313 cass_blob_fetch.reset(new CCassBlobFetch(orig_blob_request, cass_blob_id));
314 cass_blob_fetch->SetLoader(load_task);
315
316 // Blob props have already been rceived
317 cass_blob_fetch->SetBlobPropSent();
318
319 load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
320 load_task->SetErrorCB(
321 CGetBlobErrorCallback(blob_error_cb, cass_blob_fetch.get()));
322 load_task->SetPropsCallback(nullptr);
323 load_task->SetChunkCallback(
324 CBlobChunkCallback(blob_chunk_cb, cass_blob_fetch.get()));
325
326 if (m_Request->NeedTrace()) {
327 m_Reply->SendTrace(
328 "Cassandra request: " + ToJson(*load_task).Repr(CJsonNode::fStandardJson),
329 m_Request->GetStartTimestamp());
330 }
331
332 m_FetchDetails.push_back(std::move(cass_blob_fetch));
333
334 load_task->Wait();
335 }
336
337
338 void
x_RequestID2BlobChunks(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,CBlobRecord const & blob,bool info_blob_only)339 CPSGS_CassBlobBase::x_RequestID2BlobChunks(TBlobPropsCB blob_props_cb,
340 TBlobChunkCB blob_chunk_cb,
341 TBlobErrorCB blob_error_cb,
342 CCassBlobFetch * fetch_details,
343 CBlobRecord const & blob,
344 bool info_blob_only)
345 {
346 auto * app = CPubseqGatewayApp::GetInstance();
347
348 // Translate sat to keyspace
349 SCass_BlobId info_blob_id(m_Id2Info->GetSat(), m_Id2Info->GetInfo()); // mandatory
350
351 if (!app->SatToKeyspace(m_Id2Info->GetSat(), info_blob_id.m_Keyspace)) {
352 // Error: send it in the context of the blob props
353 string message = "Error mapping id2 info sat (" +
354 to_string(m_Id2Info->GetSat()) +
355 ") to a cassandra keyspace for the blob " +
356 fetch_details->GetBlobId().ToString();
357 x_PrepareBlobPropMessage(fetch_details, message,
358 CRequestStatus::e500_InternalServerError,
359 ePSGS_BadID2Info, eDiag_Error);
360 app->GetCounters().Increment(CPSGSCounters::ePSGS_ServerSatToSatNameError);
361 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
362 PSG_ERROR(message);
363 return;
364 }
365
366 auto trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
367 if (m_Request->NeedTrace())
368 trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
369
370 // Create the Id2Info requests.
371 // eUnknownTSE is treated in the blob prop handler as to do nothing (no
372 // sending completion message, no requesting other blobs)
373 // eUnknownUseCache is safe here; no further resolution
374 // empty client_id means that later on there will be no exclude blobs cache ops
375 SPSGS_BlobBySatSatKeyRequest
376 info_blob_request(SPSGS_BlobId(info_blob_id.ToString()),
377 INT64_MIN,
378 SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
379 SPSGS_RequestBase::ePSGS_UnknownUseCache,
380 "", 0, trace_flag,
381 vector<string>(), vector<string>(),
382 chrono::high_resolution_clock::now());
383
384 // Prepare Id2Info retrieval
385 unique_ptr<CCassBlobFetch> cass_blob_fetch;
386 cass_blob_fetch.reset(new CCassBlobFetch(info_blob_request, info_blob_id));
387
388 unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
389 CPSGCache psg_cache(m_Request, m_Reply);
390 auto blob_prop_cache_lookup_result =
391 psg_cache.LookupBlobProp(
392 info_blob_id.m_Sat,
393 info_blob_id.m_SatKey,
394 info_blob_request.m_LastModified,
395 *blob_record.get());
396 CCassBlobTaskLoadBlob * load_task = nullptr;
397
398
399 if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
400 load_task = new CCassBlobTaskLoadBlob(
401 app->GetCassandraTimeout(),
402 app->GetCassandraMaxRetries(),
403 app->GetCassandraConnection(),
404 info_blob_id.m_Keyspace,
405 move(blob_record),
406 true, nullptr);
407 } else {
408 // The handler deals with both kind of blob requests:
409 // - by sat/sat_key
410 // - by seq_id/seq_id_type
411 // So get the reference to the blob base request
412 auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
413
414 if (blob_request.m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
415 // No need to continue; it is forbidded to look for blob props in
416 // the Cassandra DB
417 string message;
418
419 if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
420 message = "Blob properties are not found";
421 UpdateOverallStatus(CRequestStatus::e404_NotFound);
422 x_PrepareBlobPropMessage(fetch_details, message,
423 CRequestStatus::e404_NotFound,
424 ePSGS_BlobPropsNotFound, eDiag_Error);
425 } else {
426 message = "Blob properties are not found due to LMDB cache error";
427 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
428 x_PrepareBlobPropMessage(fetch_details, message,
429 CRequestStatus::e500_InternalServerError,
430 ePSGS_BlobPropsNotFound, eDiag_Error);
431 }
432
433 PSG_WARNING(message);
434 return;
435 }
436
437 load_task = new CCassBlobTaskLoadBlob(
438 app->GetCassandraTimeout(),
439 app->GetCassandraMaxRetries(),
440 app->GetCassandraConnection(),
441 info_blob_id.m_Keyspace,
442 info_blob_id.m_SatKey,
443 true, nullptr);
444 }
445 cass_blob_fetch->SetLoader(load_task);
446
447 load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
448 load_task->SetErrorCB(
449 CGetBlobErrorCallback(blob_error_cb, cass_blob_fetch.get()));
450 load_task->SetPropsCallback(
451 CBlobPropCallback(blob_props_cb,
452 m_Request, m_Reply, cass_blob_fetch.get(),
453 blob_prop_cache_lookup_result != ePSGS_CacheHit));
454 load_task->SetChunkCallback(
455 CBlobChunkCallback(blob_chunk_cb, cass_blob_fetch.get()));
456
457 if (m_Request->NeedTrace()) {
458 m_Reply->SendTrace("Cassandra request: " +
459 ToJson(*load_task).Repr(CJsonNode::fStandardJson),
460 m_Request->GetStartTimestamp());
461 }
462
463 m_FetchDetails.push_back(move(cass_blob_fetch));
464 auto to_init_iter = m_FetchDetails.end();
465 --to_init_iter;
466
467 // We may need to request ID2 chunks
468 if (!info_blob_only) {
469 // Sat name is the same
470 x_RequestId2SplitBlobs(blob_props_cb, blob_chunk_cb, blob_error_cb,
471 fetch_details, info_blob_id.m_Keyspace);
472 }
473
474 // initiate retrieval: only those which were just created
475 while (to_init_iter != m_FetchDetails.end()) {
476 if (*to_init_iter)
477 (*to_init_iter)->GetLoader()->Wait();
478 ++to_init_iter;
479 }
480 }
481
482
483 void
x_RequestId2SplitBlobs(TBlobPropsCB blob_props_cb,TBlobChunkCB blob_chunk_cb,TBlobErrorCB blob_error_cb,CCassBlobFetch * fetch_details,const string & keyspace)484 CPSGS_CassBlobBase::x_RequestId2SplitBlobs(TBlobPropsCB blob_props_cb,
485 TBlobChunkCB blob_chunk_cb,
486 TBlobErrorCB blob_error_cb,
487 CCassBlobFetch * fetch_details,
488 const string & keyspace)
489 {
490 auto app = CPubseqGatewayApp::GetInstance();
491
492 auto trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
493 if (m_Request->NeedTrace())
494 trace_flag = SPSGS_RequestBase::ePSGS_WithTracing;
495
496 for (int chunk_no = 1; chunk_no <= m_Id2Info->GetChunks(); ++chunk_no) {
497 SCass_BlobId chunks_blob_id(m_Id2Info->GetSat(),
498 m_Id2Info->GetInfo() - m_Id2Info->GetChunks() - 1 + chunk_no);
499 chunks_blob_id.m_Keyspace = keyspace;
500
501 // eUnknownTSE is treated in the blob prop handler as to do nothing (no
502 // sending completion message, no requesting other blobs)
503 // eUnknownUseCache is safe here; no further resolution required
504 // client_id is "" (empty string) so the split blobs do not participate
505 // in the exclude blob cache
506 SPSGS_BlobBySatSatKeyRequest
507 chunk_request(SPSGS_BlobId(chunks_blob_id.ToString()),
508 INT64_MIN,
509 SPSGS_BlobRequestBase::ePSGS_UnknownTSE,
510 SPSGS_RequestBase::ePSGS_UnknownUseCache,
511 "", 0, trace_flag,
512 vector<string>(), vector<string>(),
513 chrono::high_resolution_clock::now());
514
515 unique_ptr<CCassBlobFetch> details;
516 details.reset(new CCassBlobFetch(chunk_request, chunks_blob_id));
517
518 unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
519 CPSGCache psg_cache(m_Request, m_Reply);
520 auto blob_prop_cache_lookup_result =
521 psg_cache.LookupBlobProp(
522 chunks_blob_id.m_Sat,
523 chunks_blob_id.m_SatKey,
524 chunk_request.m_LastModified,
525 *blob_record.get());
526 CCassBlobTaskLoadBlob * load_task = nullptr;
527
528 if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
529 load_task = new CCassBlobTaskLoadBlob(
530 app->GetCassandraTimeout(),
531 app->GetCassandraMaxRetries(),
532 app->GetCassandraConnection(),
533 chunks_blob_id.m_Keyspace,
534 move(blob_record),
535 true, nullptr);
536 details->SetLoader(load_task);
537 } else {
538 // The handler deals with both kind of blob requests:
539 // - by sat/sat_key
540 // - by seq_id/seq_id_type
541 // So get the reference to the blob base request
542 auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
543
544 if (blob_request.m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
545 // No need to create a request because the Cassandra DB access
546 // is forbidden
547 string message;
548 if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
549 message = "Blob properties are not found";
550 UpdateOverallStatus(CRequestStatus::e404_NotFound);
551 x_PrepareBlobPropMessage(details.get(), message,
552 CRequestStatus::e404_NotFound,
553 ePSGS_BlobPropsNotFound, eDiag_Error);
554 } else {
555 message = "Blob properties are not found "
556 "due to a blob proc cache lookup error";
557 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
558 x_PrepareBlobPropMessage(details.get(), message,
559 CRequestStatus::e500_InternalServerError,
560 ePSGS_BlobPropsNotFound, eDiag_Error);
561 }
562 x_PrepareBlobPropCompletion(details.get());
563 PSG_WARNING(message);
564 continue;
565 }
566
567 load_task = new CCassBlobTaskLoadBlob(
568 app->GetCassandraTimeout(),
569 app->GetCassandraMaxRetries(),
570 app->GetCassandraConnection(),
571 chunks_blob_id.m_Keyspace,
572 chunks_blob_id.m_SatKey,
573 true, nullptr);
574 details->SetLoader(load_task);
575 }
576
577 load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
578 load_task->SetErrorCB(
579 CGetBlobErrorCallback(blob_error_cb, details.get()));
580 load_task->SetPropsCallback(
581 CBlobPropCallback(blob_props_cb,
582 m_Request, m_Reply, details.get(),
583 blob_prop_cache_lookup_result != ePSGS_CacheHit));
584 load_task->SetChunkCallback(
585 CBlobChunkCallback(blob_chunk_cb, details.get()));
586
587 m_FetchDetails.push_back(move(details));
588 }
589 }
590
591
592
593 CPSGS_CassBlobBase::EPSGS_BlobCacheCheckResult
x_CheckExcludeBlobCache(CCassBlobFetch * fetch_details,SPSGS_BlobRequestBase & blob_request)594 CPSGS_CassBlobBase::x_CheckExcludeBlobCache(CCassBlobFetch * fetch_details,
595 SPSGS_BlobRequestBase & blob_request)
596 {
597 if (blob_request.m_ClientId.empty())
598 return ePSGS_NotInCache;
599
600 auto fetch_blob = fetch_details->GetBlobId();
601 if (fetch_blob != m_BlobId)
602 return ePSGS_NotInCache;
603
604 auto * app = CPubseqGatewayApp::GetInstance();
605 bool completed = true;
606 auto cache_result = app->GetExcludeBlobCache()->AddBlobId(
607 blob_request.m_ClientId,
608 m_BlobId.m_Sat,
609 m_BlobId.m_SatKey,
610 completed);
611 if (m_Request->GetRequestType() == CPSGS_Request::ePSGS_BlobBySeqIdRequest &&
612 cache_result == ePSGS_AlreadyInCache) {
613 x_PrepareBlobPropCompletion(fetch_details);
614 if (completed)
615 x_PrepareBlobExcluded(fetch_details, m_BlobId.ToString(),
616 ePSGS_BlobSent);
617 else
618 x_PrepareBlobExcluded(fetch_details, m_BlobId.ToString(),
619 ePSGS_BlobInProgress);
620 return ePSGS_InCache;
621 }
622
623 if (cache_result == ePSGS_Added)
624 blob_request.m_ExcludeBlobCacheAdded = true;
625 return ePSGS_NotInCache;
626 }
627
628
629 void
PrepareServerErrorMessage(CCassBlobFetch * fetch_details,int code,EDiagSev severity,const string & message)630 CPSGS_CassBlobBase::PrepareServerErrorMessage(CCassBlobFetch * fetch_details,
631 int code,
632 EDiagSev severity,
633 const string & message)
634 {
635 if (fetch_details->IsBlobPropStage()) {
636 x_PrepareBlobPropMessage(fetch_details, message,
637 CRequestStatus::e500_InternalServerError,
638 code, severity);
639 x_PrepareBlobPropCompletion(fetch_details);
640 } else {
641 x_PrepareBlobMessage(fetch_details, message,
642 CRequestStatus::e500_InternalServerError,
643 code, severity);
644 x_PrepareBlobCompletion(fetch_details);
645 }
646 }
647
648
649 void
OnGetBlobError(CCassBlobFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)650 CPSGS_CassBlobBase::OnGetBlobError(CCassBlobFetch * fetch_details,
651 CRequestStatus::ECode status,
652 int code,
653 EDiagSev severity,
654 const string & message)
655 {
656 CRequestContextResetter context_resetter;
657 m_Request->SetRequestContext();
658
659 // To avoid sending an error in Peek()
660 fetch_details->GetLoader()->ClearError();
661
662 // It could be a message or an error
663 bool is_error = CountError(status, code, severity, message);
664
665 if (is_error) {
666 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
667 PrepareServerErrorMessage(fetch_details, code, severity, message);
668
669 // This code is reused by 2 requests:
670 // - get blob by sat/sat_key
671 // - get blob by seq_id/seq_id_type
672 auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
673
674 if (fetch_details->GetBlobId() == m_BlobId) {
675 if (blob_request.m_ExcludeBlobCacheAdded &&
676 ! blob_request.m_ClientId.empty()) {
677 auto * app = CPubseqGatewayApp::GetInstance();
678 app->GetExcludeBlobCache()->Remove(blob_request.m_ClientId,
679 m_BlobId.m_Sat,
680 m_BlobId.m_SatKey);
681
682 // To prevent any updates
683 blob_request.m_ExcludeBlobCacheAdded = false;
684 }
685 }
686
687 // If it is an error then regardless what stage it was, props or
688 // chunks, there will be no more activity
689 fetch_details->SetReadFinished();
690 } else {
691 if (fetch_details->IsBlobPropStage())
692 x_PrepareBlobPropMessage(fetch_details, message, status,
693 code, severity);
694 else
695 x_PrepareBlobMessage(fetch_details, message, status,
696 code, severity);
697 }
698
699 SetFinished(fetch_details);
700 }
701
702
703 bool
CountError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)704 CPSGS_CassBlobBase::CountError(CRequestStatus::ECode status,
705 int code,
706 EDiagSev severity,
707 const string & message)
708 {
709 // It could be a message or an error
710 bool is_error = (severity == eDiag_Error ||
711 severity == eDiag_Critical ||
712 severity == eDiag_Fatal);
713
714 auto * app = CPubseqGatewayApp::GetInstance();
715 if (status >= CRequestStatus::e400_BadRequest &&
716 status < CRequestStatus::e500_InternalServerError) {
717 PSG_WARNING(message);
718 } else {
719 PSG_ERROR(message);
720 }
721
722 if (m_Request->NeedTrace()) {
723 m_Reply->SendTrace("Blob error callback; status " + to_string(status),
724 m_Request->GetStartTimestamp());
725 }
726
727 if (status == CRequestStatus::e404_NotFound) {
728 app->GetCounters().Increment(CPSGSCounters::ePSGS_GetBlobNotFound);
729 } else {
730 if (is_error) {
731 if (code == CCassandraException::eQueryTimeout)
732 app->GetCounters().Increment(CPSGSCounters::ePSGS_CassQueryTimeoutError);
733 else
734 app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
735 }
736 }
737
738 return is_error;
739 }
740
741
742 void
OnGetBlobChunk(bool cancelled,CCassBlobFetch * fetch_details,const unsigned char * chunk_data,unsigned int data_size,int chunk_no)743 CPSGS_CassBlobBase::OnGetBlobChunk(bool cancelled,
744 CCassBlobFetch * fetch_details,
745 const unsigned char * chunk_data,
746 unsigned int data_size,
747 int chunk_no)
748 {
749 CRequestContextResetter context_resetter;
750 m_Request->SetRequestContext();
751
752 if (cancelled) {
753 fetch_details->GetLoader()->Cancel();
754 SetFinished(fetch_details);
755 return;
756 }
757 if (m_Reply->IsFinished()) {
758 CPubseqGatewayApp::GetInstance()->GetCounters().Increment(
759 CPSGSCounters::ePSGS_UnknownError);
760 PSG_ERROR("Unexpected data received "
761 "while the output has finished, ignoring");
762 return;
763 }
764
765 if (chunk_no >= 0) {
766 if (m_Request->NeedTrace()) {
767 m_Reply->SendTrace("Blob chunk " + to_string(chunk_no) + " callback",
768 m_Request->GetStartTimestamp());
769 }
770
771 // A blob chunk; 0-length chunks are allowed too
772 x_PrepareBlobData(fetch_details, chunk_data, data_size, chunk_no);
773 } else {
774 if (m_Request->NeedTrace()) {
775 m_Reply->SendTrace("Blob chunk no-more-data callback",
776 m_Request->GetStartTimestamp());
777 }
778
779 // End of the blob
780 x_PrepareBlobCompletion(fetch_details);
781 SetFinished(fetch_details);
782
783 // Note: no need to set the blob completed in the exclude blob cache.
784 // It will happen in Peek()
785 }
786 }
787
788
789 void
x_OnBlobPropNotFound(CCassBlobFetch * fetch_details)790 CPSGS_CassBlobBase::x_OnBlobPropNotFound(CCassBlobFetch * fetch_details)
791 {
792 // Not found, report 500 because it is data inconsistency
793 // or 404 if it was requested via sat.sat_key
794 auto * app = CPubseqGatewayApp::GetInstance();
795 app->GetCounters().Increment(CPSGSCounters::ePSGS_BlobPropsNotFoundError);
796
797 auto blob_id = fetch_details->GetBlobId();
798 string message = "Blob " + blob_id.ToString() +
799 " properties are not found (last modified: " +
800 to_string(fetch_details->GetLoader()->GetModified()) + ")";
801 if (fetch_details->GetFetchType() == ePSGS_BlobBySatSatKeyFetch) {
802 // User requested wrong sat_key, so it is a client error
803 UpdateOverallStatus(CRequestStatus::e404_NotFound);
804 x_PrepareBlobPropMessage(fetch_details, message,
805 CRequestStatus::e404_NotFound,
806 ePSGS_BlobPropsNotFound, eDiag_Error);
807 } else {
808 // Server error, data inconsistency
809 PSG_ERROR(message);
810 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
811 x_PrepareBlobPropMessage(fetch_details, message,
812 CRequestStatus::e500_InternalServerError,
813 ePSGS_BlobPropsNotFound, eDiag_Error);
814 }
815
816 // The handler deals with 2 kind of requests:
817 // - get blob by sat/sat_key
818 // - get blob by seq_id/seq_id_type
819 auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
820
821 if (blob_id == m_BlobId) {
822 if (blob_request.m_ExcludeBlobCacheAdded && !blob_request.m_ClientId.empty()) {
823 app->GetExcludeBlobCache()->Remove(blob_request.m_ClientId,
824 m_BlobId.m_Sat,
825 m_BlobId.m_SatKey);
826 blob_request.m_ExcludeBlobCacheAdded = false;
827 }
828 }
829
830 x_PrepareBlobPropCompletion(fetch_details);
831 SetFinished(fetch_details);
832 }
833
834
835 bool
x_ParseId2Info(CCassBlobFetch * fetch_details,CBlobRecord const & blob)836 CPSGS_CassBlobBase::x_ParseId2Info(CCassBlobFetch * fetch_details,
837 CBlobRecord const & blob)
838 {
839 string err_msg;
840 try {
841 m_Id2Info.reset(new CPSGS_SatInfoChunksVerFlavorId2Info(blob.GetId2Info()));
842 return true;
843 } catch (const exception & exc) {
844 err_msg = "Error extracting id2 info for the blob " +
845 fetch_details->GetBlobId().ToString() + ": " + exc.what();
846 } catch (...) {
847 err_msg = "Unknown error extracting id2 info for the blob " +
848 fetch_details->GetBlobId().ToString() + ".";
849 }
850
851 CPubseqGatewayApp::GetInstance()->GetCounters().Increment(
852 CPSGSCounters::ePSGS_InvalidId2InfoError);
853 x_PrepareBlobPropMessage(fetch_details, err_msg,
854 CRequestStatus::e500_InternalServerError,
855 ePSGS_BadID2Info, eDiag_Error);
856 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
857 PSG_ERROR(err_msg);
858 return false;
859 }
860
861
862 void
SetFinished(CCassBlobFetch * fetch_details)863 CPSGS_CassBlobBase::SetFinished(CCassBlobFetch * fetch_details)
864 {
865 fetch_details->SetReadFinished();
866 }
867
868
869 bool
NeedToAddId2CunkId2Info(void) const870 CPSGS_CassBlobBase::NeedToAddId2CunkId2Info(void) const
871 {
872 auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
873 if (blob_request.m_TSEOption == SPSGS_BlobRequestBase::ePSGS_OrigTSE)
874 return false;
875
876 return m_Id2Info.get() != nullptr && m_NeedToParseId2Info == false;
877 }
878
879
880 int64_t
x_GetId2ChunkNumber(CCassBlobFetch * fetch_details)881 CPSGS_CassBlobBase::x_GetId2ChunkNumber(CCassBlobFetch * fetch_details)
882 {
883 // Note: this member is called only when m_Id2Info is parsed successfully
884
885 auto blob_key = fetch_details->GetBlobId().m_SatKey;
886 auto orig_blob_info = m_Id2Info->GetInfo();
887 if (orig_blob_info == blob_key) {
888 // It is a split info chunk so use a special value
889 return kSplitInfoChunk;
890 }
891
892 // Calculate the id2_chunk
893 return blob_key - orig_blob_info + m_Id2Info->GetChunks() + 1;
894 }
895
896
897 void
x_PrepareBlobPropData(CCassBlobFetch * blob_fetch_details,CBlobRecord const & blob)898 CPSGS_CassBlobBase::x_PrepareBlobPropData(CCassBlobFetch * blob_fetch_details,
899 CBlobRecord const & blob)
900 {
901 bool need_id2_identification = NeedToAddId2CunkId2Info();
902
903 // CXX-11547: may be public comments request is needed as well
904 if (blob.GetFlag(EBlobFlags::eSuppress) ||
905 blob.GetFlag(EBlobFlags::eWithdrawn)) {
906 // Request public comment
907 auto app = CPubseqGatewayApp::GetInstance();
908 unique_ptr<CCassPublicCommentFetch> comment_fetch_details;
909 comment_fetch_details.reset(new CCassPublicCommentFetch());
910 // Memorize the identification which will be used at the moment of
911 // sending the comment to the client
912 if (need_id2_identification) {
913 comment_fetch_details->SetId2Identification(
914 x_GetId2ChunkNumber(blob_fetch_details),
915 m_Id2Info->Serialize());
916 } else {
917 comment_fetch_details->SetCassBlobIdentification(
918 blob_fetch_details->GetBlobId(),
919 m_LastModified);
920 }
921
922 CCassStatusHistoryTaskGetPublicComment * load_task =
923 new CCassStatusHistoryTaskGetPublicComment(app->GetCassandraTimeout(),
924 app->GetCassandraMaxRetries(),
925 app->GetCassandraConnection(),
926 blob_fetch_details->GetBlobId().m_Keyspace,
927 blob, nullptr);
928 comment_fetch_details->SetLoader(load_task);
929 load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
930 load_task->SetErrorCB(
931 CPublicCommentErrorCallback(
932 bind(&CPSGS_CassBlobBase::OnPublicCommentError,
933 this, _1, _2, _3, _4, _5),
934 comment_fetch_details.get()));
935 load_task->SetCommentCallback(
936 CPublicCommentConsumeCallback(
937 bind(&CPSGS_CassBlobBase::OnPublicComment,
938 this, _1, _2, _3),
939 comment_fetch_details.get()));
940 load_task->SetMessages(app->GetPublicCommentsMapping());
941
942 if (m_Request->NeedTrace()) {
943 m_Reply->SendTrace(
944 "Cassandra request: " +
945 ToJson(*load_task).Repr(CJsonNode::fStandardJson),
946 m_Request->GetStartTimestamp());
947 }
948
949 m_FetchDetails.push_back(move(comment_fetch_details));
950 load_task->Wait(); // Initiate cassandra request
951 }
952
953
954 if (need_id2_identification) {
955 m_Reply->PrepareTSEBlobPropData(
956 blob_fetch_details, m_ProcessorId,
957 x_GetId2ChunkNumber(blob_fetch_details), m_Id2Info->Serialize(),
958 ToJson(blob).Repr(CJsonNode::fStandardJson));
959 } else {
960 // There is no id2info in the originally requested blob
961 // so just send blob props without id2_chunk/id2_info
962 m_Reply->PrepareBlobPropData(
963 blob_fetch_details, m_ProcessorId,
964 ToJson(blob).Repr(CJsonNode::fStandardJson), m_LastModified);
965 }
966 }
967
968
969 void
x_PrepareBlobPropCompletion(CCassBlobFetch * fetch_details)970 CPSGS_CassBlobBase::x_PrepareBlobPropCompletion(CCassBlobFetch * fetch_details)
971 {
972 if (NeedToAddId2CunkId2Info()) {
973 m_Reply->PrepareTSEBlobPropCompletion(fetch_details, m_ProcessorId);
974 } else {
975 // There is no id2info in the originally requested blob
976 // so just send blob prop completion without id2_chunk/id2_info
977 m_Reply->PrepareBlobPropCompletion(fetch_details, m_ProcessorId);
978 }
979 }
980
981
982 void
x_PrepareBlobData(CCassBlobFetch * fetch_details,const unsigned char * chunk_data,unsigned int data_size,int chunk_no)983 CPSGS_CassBlobBase::x_PrepareBlobData(CCassBlobFetch * fetch_details,
984 const unsigned char * chunk_data,
985 unsigned int data_size,
986 int chunk_no)
987 {
988 if (NeedToAddId2CunkId2Info()) {
989 m_Reply->PrepareTSEBlobData(
990 fetch_details, m_ProcessorId,
991 chunk_data, data_size, chunk_no,
992 x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize());
993 } else {
994 // There is no id2info in the originally requested blob
995 // so just send blob prop completion without id2_chunk/id2_info
996 m_Reply->PrepareBlobData(fetch_details, m_ProcessorId,
997 chunk_data, data_size, chunk_no,
998 m_LastModified);
999 }
1000 }
1001
1002
1003 void
x_PrepareBlobCompletion(CCassBlobFetch * fetch_details)1004 CPSGS_CassBlobBase::x_PrepareBlobCompletion(CCassBlobFetch * fetch_details)
1005 {
1006 if (NeedToAddId2CunkId2Info()) {
1007 m_Reply->PrepareTSEBlobCompletion(fetch_details, m_ProcessorId);
1008 } else {
1009 // There is no id2info in the originally requested blob
1010 // so just send blob prop completion without id2_chunk/id2_info
1011 m_Reply->PrepareBlobCompletion(fetch_details, m_ProcessorId);
1012 }
1013 }
1014
1015
1016 void
x_PrepareBlobPropMessage(CCassBlobFetch * fetch_details,const string & message,CRequestStatus::ECode status,int err_code,EDiagSev severity)1017 CPSGS_CassBlobBase::x_PrepareBlobPropMessage(CCassBlobFetch * fetch_details,
1018 const string & message,
1019 CRequestStatus::ECode status,
1020 int err_code,
1021 EDiagSev severity)
1022 {
1023 if (NeedToAddId2CunkId2Info()) {
1024 m_Reply->PrepareTSEBlobPropMessage(
1025 fetch_details, m_ProcessorId,
1026 x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize(),
1027 message, status, err_code, severity);
1028 } else {
1029 // There is no id2info in the originally requested blob
1030 // so just send blob prop completion without id2_chunk/id2_info
1031 m_Reply->PrepareBlobPropMessage(
1032 fetch_details, m_ProcessorId,
1033 message, status, err_code, severity);
1034 }
1035 }
1036
1037
1038 void
x_PrepareBlobMessage(CCassBlobFetch * fetch_details,const string & message,CRequestStatus::ECode status,int err_code,EDiagSev severity)1039 CPSGS_CassBlobBase::x_PrepareBlobMessage(CCassBlobFetch * fetch_details,
1040 const string & message,
1041 CRequestStatus::ECode status,
1042 int err_code,
1043 EDiagSev severity)
1044 {
1045 if (NeedToAddId2CunkId2Info()) {
1046 m_Reply->PrepareTSEBlobMessage(
1047 fetch_details, m_ProcessorId,
1048 x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize(),
1049 message, status, err_code, severity);
1050 } else {
1051 // There is no id2info in the originally requested blob
1052 // so just send blob prop completion without id2_chunk/id2_info
1053 m_Reply->PrepareBlobMessage(
1054 fetch_details, m_ProcessorId,
1055 message, status, err_code, severity, m_LastModified);
1056 }
1057 }
1058
1059
1060 void
x_PrepareBlobExcluded(CCassBlobFetch * fetch_details,const string & blob_id,EPSGS_BlobSkipReason skip_reason)1061 CPSGS_CassBlobBase::x_PrepareBlobExcluded(CCassBlobFetch * fetch_details,
1062 const string & blob_id,
1063 EPSGS_BlobSkipReason skip_reason)
1064 {
1065 if (NeedToAddId2CunkId2Info()) {
1066 m_Reply->PrepareTSEBlobExcluded(
1067 m_ProcessorId, x_GetId2ChunkNumber(fetch_details),
1068 m_Id2Info->Serialize(), skip_reason);
1069 } else {
1070 // There is no id2info in the originally requested blob
1071 // so just send blob prop completion without id2_chunk/id2_info
1072 m_Reply->PrepareBlobExcluded(blob_id, m_ProcessorId, skip_reason,
1073 m_LastModified);
1074 }
1075 }
1076
1077
1078 void
OnPublicCommentError(CCassPublicCommentFetch * fetch_details,CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)1079 CPSGS_CassBlobBase::OnPublicCommentError(
1080 CCassPublicCommentFetch * fetch_details,
1081 CRequestStatus::ECode status,
1082 int code,
1083 EDiagSev severity,
1084 const string & message)
1085 {
1086 CRequestContextResetter context_resetter;
1087 m_Request->SetRequestContext();
1088
1089 if (m_Cancelled) {
1090 fetch_details->SetReadFinished();
1091 fetch_details->GetLoader()->Cancel();
1092 return;
1093 }
1094
1095 // To avoid sending an error in Peek()
1096 fetch_details->GetLoader()->ClearError();
1097
1098 // It could be a message or an error
1099 bool is_error = (severity == eDiag_Error ||
1100 severity == eDiag_Critical ||
1101 severity == eDiag_Fatal);
1102
1103 auto * app = CPubseqGatewayApp::GetInstance();
1104 if (status >= CRequestStatus::e400_BadRequest &&
1105 status < CRequestStatus::e500_InternalServerError) {
1106 PSG_WARNING(message);
1107 } else {
1108 PSG_ERROR(message);
1109 }
1110
1111 if (m_Request->NeedTrace()) {
1112 m_Reply->SendTrace(
1113 "Public comment error callback; status: " + to_string(status),
1114 m_Request->GetStartTimestamp());
1115 }
1116
1117 m_Reply->PrepareProcessorMessage(
1118 m_Reply->GetItemId(),
1119 m_ProcessorId, message, status, code, severity);
1120
1121 if (is_error) {
1122 if (code == CCassandraException::eQueryTimeout)
1123 app->GetCounters().Increment(CPSGSCounters::ePSGS_CassQueryTimeoutError);
1124 else
1125 app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
1126
1127 // If it is an error then there will be no more activity
1128 fetch_details->SetReadFinished();
1129 }
1130
1131 // Note: is it necessary to call something like x_Peek() of the actual
1132 // processor class to send this immediately? It should work without
1133 // this call and at the moment x_Peek() is not available here
1134 // if (m_Reply->IsOutputReady())
1135 // x_Peek(false);
1136 }
1137
1138
1139 void
OnPublicComment(CCassPublicCommentFetch * fetch_details,string comment,bool is_found)1140 CPSGS_CassBlobBase::OnPublicComment(
1141 CCassPublicCommentFetch * fetch_details,
1142 string comment,
1143 bool is_found)
1144 {
1145 CRequestContextResetter context_resetter;
1146 m_Request->SetRequestContext();
1147
1148 fetch_details->SetReadFinished();
1149
1150 if (m_Cancelled) {
1151 fetch_details->GetLoader()->Cancel();
1152 return;
1153 }
1154
1155 if (m_Request->NeedTrace()) {
1156 m_Reply->SendTrace(
1157 "Public comment callback; found: " + to_string(is_found),
1158 m_Request->GetStartTimestamp());
1159 }
1160
1161 if (is_found) {
1162 if (fetch_details->GetIdentification() ==
1163 CCassPublicCommentFetch::ePSGS_ById2) {
1164 m_Reply->PreparePublicComment(
1165 m_ProcessorId, comment,
1166 fetch_details->GetId2Chunk(),
1167 fetch_details->GetId2Info());
1168 } else {
1169 // There is no id2info in the originally requested blob
1170 // so just send blob prop completion without id2_chunk/id2_info
1171 m_Reply->PreparePublicComment(
1172 m_ProcessorId, comment,
1173 fetch_details->GetBlobId().ToString(),
1174 fetch_details->GetLastModified());
1175 }
1176 }
1177
1178 // Note: is it necessary to call something like x_Peek() of the actual
1179 // processor class to send this immediately? It should work without
1180 // this call and at the moment x_Peek() is not available here
1181 // if (m_Reply->IsOutputReady())
1182 // x_Peek(false);
1183 }
1184
1185