1 /* $Id: psgs_reply.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergey Satskiy
27 *
28 * File Description:
29 *
30 */
31
32
33 #include <ncbi_pch.hpp>
34
35 #include "pending_operation.hpp"
36 #include "http_server_transport.hpp"
37 #include "psgs_reply.hpp"
38 #include "pubseq_gateway_utils.hpp"
39 #include "cass_fetch.hpp"
40
41
~CPSGS_Reply()42 CPSGS_Reply::~CPSGS_Reply()
43 {
44 if (m_ReplyOwned) {
45 delete m_Reply;
46 }
47 }
48
49
Flush(void)50 void CPSGS_Reply::Flush(void)
51 {
52 while (m_ChunksLock.exchange(true)) {}
53 m_Reply->Send(m_Chunks, true);
54 m_Chunks.clear();
55 m_ChunksLock = false;
56 }
57
58
Flush(bool is_last)59 void CPSGS_Reply::Flush(bool is_last)
60 {
61 while (m_ChunksLock.exchange(true)) {}
62 m_Reply->Send(m_Chunks, is_last);
63 m_Chunks.clear();
64 m_ChunksLock = false;
65
66 if (is_last)
67 m_Reply->CancelPending();
68 }
69
70
Clear(void)71 void CPSGS_Reply::Clear(void)
72 {
73 while (m_ChunksLock.exchange(true)) {}
74 m_Chunks.clear();
75 m_Reply = nullptr;
76 m_TotalSentReplyChunks = 0;
77 m_ChunksLock = false;
78 }
79
80
SetContentType(EPSGS_ReplyMimeType mime_type)81 void CPSGS_Reply::SetContentType(EPSGS_ReplyMimeType mime_type)
82 {
83 m_Reply->SetContentType(mime_type);
84 }
85
86
GetDataReadyCB(void)87 shared_ptr<CCassDataCallbackReceiver> CPSGS_Reply::GetDataReadyCB(void)
88 {
89 return m_Reply->GetDataReadyCB();
90 }
91
92
IsFinished(void) const93 bool CPSGS_Reply::IsFinished(void) const
94 {
95 return m_Reply->IsFinished();
96 }
97
98
IsOutputReady(void) const99 bool CPSGS_Reply::IsOutputReady(void) const
100 {
101 return m_Reply->IsOutputReady();
102 }
103
104
PrepareBioseqMessage(size_t item_id,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)105 void CPSGS_Reply::PrepareBioseqMessage(size_t item_id,
106 const string & processor_id,
107 const string & msg,
108 CRequestStatus::ECode status,
109 int err_code,
110 EDiagSev severity)
111 {
112 string header = GetBioseqMessageHeader(item_id, processor_id,
113 msg.size(), status,
114 err_code, severity);
115 while (m_ChunksLock.exchange(true)) {}
116 m_Chunks.push_back(m_Reply->PrepareChunk(
117 (const unsigned char *)(header.data()), header.size()));
118 m_Chunks.push_back(m_Reply->PrepareChunk(
119 (const unsigned char *)(msg.data()), msg.size()));
120 ++m_TotalSentReplyChunks;
121 m_ChunksLock = false;
122 }
123
124
125
PrepareBioseqData(size_t item_id,const string & processor_id,const string & content,SPSGS_ResolveRequest::EPSGS_OutputFormat output_format)126 void CPSGS_Reply::PrepareBioseqData(
127 size_t item_id,
128 const string & processor_id,
129 const string & content,
130 SPSGS_ResolveRequest::EPSGS_OutputFormat output_format)
131 {
132 string header = GetBioseqInfoHeader(item_id, processor_id,
133 content.size(), output_format);
134 while (m_ChunksLock.exchange(true)) {}
135 m_Chunks.push_back(m_Reply->PrepareChunk(
136 (const unsigned char *)(header.data()), header.size()));
137 m_Chunks.push_back(m_Reply->PrepareChunk(
138 (const unsigned char *)(content.data()), content.size()));
139 ++m_TotalSentReplyChunks;
140 m_ChunksLock = false;
141 }
142
143
PrepareBioseqCompletion(size_t item_id,const string & processor_id,size_t chunk_count)144 void CPSGS_Reply::PrepareBioseqCompletion(size_t item_id,
145 const string & processor_id,
146 size_t chunk_count)
147 {
148 string bioseq_meta = GetBioseqCompletionHeader(item_id,
149 processor_id,
150 chunk_count);
151 while (m_ChunksLock.exchange(true)) {}
152 m_Chunks.push_back(m_Reply->PrepareChunk(
153 (const unsigned char *)(bioseq_meta.data()),
154 bioseq_meta.size()));
155 ++m_TotalSentReplyChunks;
156 m_ChunksLock = false;
157 }
158
159
PrepareBlobPropMessage(size_t item_id,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)160 void CPSGS_Reply::PrepareBlobPropMessage(size_t item_id,
161 const string & processor_id,
162 const string & msg,
163 CRequestStatus::ECode status,
164 int err_code,
165 EDiagSev severity)
166 {
167 string header = GetBlobPropMessageHeader(item_id, processor_id,
168 msg.size(), status, err_code,
169 severity);
170 while (m_ChunksLock.exchange(true)) {}
171 m_Chunks.push_back(m_Reply->PrepareChunk(
172 (const unsigned char *)(header.data()), header.size()));
173 m_Chunks.push_back(m_Reply->PrepareChunk(
174 (const unsigned char *)(msg.data()), msg.size()));
175 ++m_TotalSentReplyChunks;
176 m_ChunksLock = false;
177 }
178
179
x_PrepareTSEBlobPropMessage(size_t item_id,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)180 void CPSGS_Reply::x_PrepareTSEBlobPropMessage(size_t item_id,
181 const string & processor_id,
182 int64_t id2_chunk,
183 const string & id2_info,
184 const string & msg,
185 CRequestStatus::ECode status,
186 int err_code,
187 EDiagSev severity)
188 {
189 string header = GetTSEBlobPropMessageHeader(
190 item_id, processor_id, id2_chunk, id2_info,
191 msg.size(), status, err_code, severity);
192 while (m_ChunksLock.exchange(true)) {}
193 m_Chunks.push_back(m_Reply->PrepareChunk(
194 (const unsigned char *)(header.data()), header.size()));
195 m_Chunks.push_back(m_Reply->PrepareChunk(
196 (const unsigned char *)(msg.data()), msg.size()));
197 ++m_TotalSentReplyChunks;
198 m_ChunksLock = false;
199 }
200
201
PrepareBlobPropMessage(CCassBlobFetch * fetch_details,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)202 void CPSGS_Reply::PrepareBlobPropMessage(CCassBlobFetch * fetch_details,
203 const string & processor_id,
204 const string & msg,
205 CRequestStatus::ECode status,
206 int err_code,
207 EDiagSev severity)
208 {
209 PrepareBlobPropMessage(fetch_details->GetBlobPropItemId(this),
210 processor_id, msg, status, err_code, severity);
211 fetch_details->IncrementTotalSentBlobChunks();
212 }
213
214
PrepareTSEBlobPropMessage(CCassBlobFetch * fetch_details,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)215 void CPSGS_Reply::PrepareTSEBlobPropMessage(CCassBlobFetch * fetch_details,
216 const string & processor_id,
217 int64_t id2_chunk,
218 const string & id2_info,
219 const string & msg,
220 CRequestStatus::ECode status,
221 int err_code,
222 EDiagSev severity)
223 {
224 x_PrepareTSEBlobPropMessage(fetch_details->GetBlobPropItemId(this),
225 processor_id, id2_chunk, id2_info, msg,
226 status, err_code, severity);
227 fetch_details->IncrementTotalSentBlobChunks();
228 }
229
230
PrepareBlobPropData(size_t item_id,const string & processor_id,const string & blob_id,const string & content,CBlobRecord::TTimestamp last_modified)231 void CPSGS_Reply::PrepareBlobPropData(size_t item_id,
232 const string & processor_id,
233 const string & blob_id,
234 const string & content,
235 CBlobRecord::TTimestamp last_modified)
236 {
237 string header = GetBlobPropHeader(item_id,
238 processor_id,
239 blob_id,
240 content.size(),
241 last_modified);
242 while (m_ChunksLock.exchange(true)) {}
243 m_Chunks.push_back(m_Reply->PrepareChunk(
244 (const unsigned char *)(header.data()), header.size()));
245 m_Chunks.push_back(m_Reply->PrepareChunk(
246 (const unsigned char *)(content.data()),
247 content.size()));
248 ++m_TotalSentReplyChunks;
249 m_ChunksLock = false;
250 }
251
252
PrepareBlobPropData(CCassBlobFetch * fetch_details,const string & processor_id,const string & content,CBlobRecord::TTimestamp last_modified)253 void CPSGS_Reply::PrepareBlobPropData(CCassBlobFetch * fetch_details,
254 const string & processor_id,
255 const string & content,
256 CBlobRecord::TTimestamp last_modified)
257 {
258 PrepareBlobPropData(fetch_details->GetBlobPropItemId(this),
259 processor_id,
260 fetch_details->GetBlobId().ToString(),
261 content,
262 last_modified);
263 fetch_details->IncrementTotalSentBlobChunks();
264 }
265
266
PrepareTSEBlobPropData(CCassBlobFetch * fetch_details,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & content)267 void CPSGS_Reply::PrepareTSEBlobPropData(CCassBlobFetch * fetch_details,
268 const string & processor_id,
269 int64_t id2_chunk,
270 const string & id2_info,
271 const string & content)
272 {
273 PrepareTSEBlobPropData(fetch_details->GetBlobPropItemId(this),
274 processor_id, id2_chunk, id2_info, content);
275 fetch_details->IncrementTotalSentBlobChunks();
276 }
277
278
PrepareTSEBlobPropData(size_t item_id,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & content)279 void CPSGS_Reply::PrepareTSEBlobPropData(size_t item_id,
280 const string & processor_id,
281 int64_t id2_chunk,
282 const string & id2_info,
283 const string & content)
284 {
285 string header = GetTSEBlobPropHeader(item_id,
286 processor_id,
287 id2_chunk, id2_info,
288 content.size());
289 while (m_ChunksLock.exchange(true)) {}
290 m_Chunks.push_back(m_Reply->PrepareChunk(
291 (const unsigned char *)(header.data()), header.size()));
292 m_Chunks.push_back(m_Reply->PrepareChunk(
293 (const unsigned char *)(content.data()),
294 content.size()));
295 ++m_TotalSentReplyChunks;
296 m_ChunksLock = false;
297 }
298
299
PrepareBlobData(size_t item_id,const string & processor_id,const string & blob_id,const unsigned char * chunk_data,unsigned int data_size,int chunk_no,CBlobRecord::TTimestamp last_modified)300 void CPSGS_Reply::PrepareBlobData(size_t item_id,
301 const string & processor_id,
302 const string & blob_id,
303 const unsigned char * chunk_data,
304 unsigned int data_size,
305 int chunk_no,
306 CBlobRecord::TTimestamp last_modified)
307 {
308 ++m_TotalSentReplyChunks;
309
310 string header = GetBlobChunkHeader(
311 item_id,
312 processor_id,
313 blob_id,
314 data_size, chunk_no,
315 last_modified);
316 while (m_ChunksLock.exchange(true)) {}
317 m_Chunks.push_back(m_Reply->PrepareChunk(
318 (const unsigned char *)(header.data()),
319 header.size()));
320
321 if (data_size > 0 && chunk_data != nullptr)
322 m_Chunks.push_back(m_Reply->PrepareChunk(chunk_data, data_size));
323 m_ChunksLock = false;
324 }
325
326
PrepareBlobData(CCassBlobFetch * fetch_details,const string & processor_id,const unsigned char * chunk_data,unsigned int data_size,int chunk_no,CBlobRecord::TTimestamp last_modified)327 void CPSGS_Reply::PrepareBlobData(CCassBlobFetch * fetch_details,
328 const string & processor_id,
329 const unsigned char * chunk_data,
330 unsigned int data_size,
331 int chunk_no,
332 CBlobRecord::TTimestamp last_modified)
333 {
334 fetch_details->IncrementTotalSentBlobChunks();
335 PrepareBlobData(fetch_details->GetBlobChunkItemId(this),
336 processor_id,
337 fetch_details->GetBlobId().ToString(),
338 chunk_data, data_size, chunk_no,
339 last_modified);
340 }
341
342
PrepareTSEBlobData(size_t item_id,const string & processor_id,const unsigned char * chunk_data,unsigned int data_size,int chunk_no,int64_t id2_chunk,const string & id2_info)343 void CPSGS_Reply::PrepareTSEBlobData(size_t item_id,
344 const string & processor_id,
345 const unsigned char * chunk_data,
346 unsigned int data_size,
347 int chunk_no,
348 int64_t id2_chunk,
349 const string & id2_info)
350 {
351 ++m_TotalSentReplyChunks;
352
353 string header = GetTSEBlobChunkHeader(
354 item_id,
355 processor_id,
356 data_size, chunk_no,
357 id2_chunk, id2_info);
358 while (m_ChunksLock.exchange(true)) {}
359 m_Chunks.push_back(m_Reply->PrepareChunk(
360 (const unsigned char *)(header.data()),
361 header.size()));
362
363 if (data_size > 0 && chunk_data != nullptr)
364 m_Chunks.push_back(m_Reply->PrepareChunk(chunk_data, data_size));
365 m_ChunksLock = false;
366 }
367
368
PrepareTSEBlobData(CCassBlobFetch * fetch_details,const string & processor_id,const unsigned char * chunk_data,unsigned int data_size,int chunk_no,int64_t id2_chunk,const string & id2_info)369 void CPSGS_Reply::PrepareTSEBlobData(CCassBlobFetch * fetch_details,
370 const string & processor_id,
371 const unsigned char * chunk_data,
372 unsigned int data_size,
373 int chunk_no,
374 int64_t id2_chunk,
375 const string & id2_info)
376 {
377 fetch_details->IncrementTotalSentBlobChunks();
378 PrepareTSEBlobData(fetch_details->GetBlobChunkItemId(this),
379 processor_id,
380 chunk_data, data_size, chunk_no,
381 id2_chunk, id2_info);
382 }
383
384
PrepareBlobPropCompletion(size_t item_id,const string & processor_id,size_t chunk_count)385 void CPSGS_Reply::PrepareBlobPropCompletion(size_t item_id,
386 const string & processor_id,
387 size_t chunk_count)
388 {
389 string blob_prop_meta = GetBlobPropCompletionHeader(item_id,
390 processor_id,
391 chunk_count);
392 while (m_ChunksLock.exchange(true)) {}
393 m_Chunks.push_back(m_Reply->PrepareChunk(
394 (const unsigned char *)(blob_prop_meta.data()),
395 blob_prop_meta.size()));
396 ++m_TotalSentReplyChunks;
397 m_ChunksLock = false;
398 }
399
400
x_PrepareTSEBlobPropCompletion(size_t item_id,const string & processor_id,size_t chunk_count)401 void CPSGS_Reply::x_PrepareTSEBlobPropCompletion(size_t item_id,
402 const string & processor_id,
403 size_t chunk_count)
404 {
405 string blob_prop_meta = GetTSEBlobPropCompletionHeader(item_id,
406 processor_id,
407 chunk_count);
408 while (m_ChunksLock.exchange(true)) {}
409 m_Chunks.push_back(m_Reply->PrepareChunk(
410 (const unsigned char *)(blob_prop_meta.data()),
411 blob_prop_meta.size()));
412 ++m_TotalSentReplyChunks;
413 m_ChunksLock = false;
414 }
415
416
PrepareBlobPropCompletion(CCassBlobFetch * fetch_details,const string & processor_id)417 void CPSGS_Reply::PrepareBlobPropCompletion(CCassBlobFetch * fetch_details,
418 const string & processor_id)
419 {
420 // +1 is for the completion itself
421 PrepareBlobPropCompletion(fetch_details->GetBlobPropItemId(this),
422 processor_id,
423 fetch_details->GetTotalSentBlobChunks() + 1);
424
425 // From now the counter will count chunks for the blob data
426 fetch_details->ResetTotalSentBlobChunks();
427 fetch_details->SetBlobPropSent();
428 }
429
430
PrepareTSEBlobPropCompletion(CCassBlobFetch * fetch_details,const string & processor_id)431 void CPSGS_Reply::PrepareTSEBlobPropCompletion(CCassBlobFetch * fetch_details,
432 const string & processor_id)
433 {
434 // +1 is for the completion itself
435 x_PrepareTSEBlobPropCompletion(fetch_details->GetBlobPropItemId(this),
436 processor_id,
437 fetch_details->GetTotalSentBlobChunks() + 1);
438
439 // From now the counter will count chunks for the blob data
440 fetch_details->ResetTotalSentBlobChunks();
441 fetch_details->SetBlobPropSent();
442 }
443
444
PrepareBlobMessage(size_t item_id,const string & processor_id,const string & blob_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity,CBlobRecord::TTimestamp last_modified)445 void CPSGS_Reply::PrepareBlobMessage(size_t item_id,
446 const string & processor_id,
447 const string & blob_id,
448 const string & msg,
449 CRequestStatus::ECode status,
450 int err_code,
451 EDiagSev severity,
452 CBlobRecord::TTimestamp last_modified)
453 {
454 string header = GetBlobMessageHeader(item_id, processor_id,
455 blob_id, msg.size(),
456 status, err_code, severity,
457 last_modified);
458 while (m_ChunksLock.exchange(true)) {}
459 m_Chunks.push_back(m_Reply->PrepareChunk(
460 (const unsigned char *)(header.data()), header.size()));
461 m_Chunks.push_back(m_Reply->PrepareChunk(
462 (const unsigned char *)(msg.data()), msg.size()));
463 ++m_TotalSentReplyChunks;
464 m_ChunksLock = false;
465 }
466
467
PrepareBlobMessage(CCassBlobFetch * fetch_details,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity,CBlobRecord::TTimestamp last_modified)468 void CPSGS_Reply::PrepareBlobMessage(CCassBlobFetch * fetch_details,
469 const string & processor_id,
470 const string & msg,
471 CRequestStatus::ECode status,
472 int err_code,
473 EDiagSev severity,
474 CBlobRecord::TTimestamp last_modified)
475 {
476 PrepareBlobMessage(fetch_details->GetBlobChunkItemId(this),
477 processor_id,
478 fetch_details->GetBlobId().ToString(),
479 msg, status, err_code, severity, last_modified);
480 fetch_details->IncrementTotalSentBlobChunks();
481 }
482
483
x_PrepareTSEBlobMessage(size_t item_id,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)484 void CPSGS_Reply::x_PrepareTSEBlobMessage(size_t item_id,
485 const string & processor_id,
486 int64_t id2_chunk,
487 const string & id2_info,
488 const string & msg,
489 CRequestStatus::ECode status,
490 int err_code,
491 EDiagSev severity)
492 {
493 string header = GetTSEBlobMessageHeader(item_id, processor_id,
494 id2_chunk, id2_info, msg.size(),
495 status, err_code, severity);
496 while (m_ChunksLock.exchange(true)) {}
497 m_Chunks.push_back(m_Reply->PrepareChunk(
498 (const unsigned char *)(header.data()), header.size()));
499 m_Chunks.push_back(m_Reply->PrepareChunk(
500 (const unsigned char *)(msg.data()), msg.size()));
501 ++m_TotalSentReplyChunks;
502 m_ChunksLock = false;
503 }
504
505
PrepareTSEBlobMessage(CCassBlobFetch * fetch_details,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)506 void CPSGS_Reply::PrepareTSEBlobMessage(CCassBlobFetch * fetch_details,
507 const string & processor_id,
508 int64_t id2_chunk,
509 const string & id2_info,
510 const string & msg,
511 CRequestStatus::ECode status, int err_code,
512 EDiagSev severity)
513 {
514 x_PrepareTSEBlobMessage(fetch_details->GetBlobChunkItemId(this),
515 processor_id, id2_chunk, id2_info,
516 msg, status, err_code, severity);
517 fetch_details->IncrementTotalSentBlobChunks();
518 }
519
520
PrepareBlobCompletion(size_t item_id,const string & processor_id,size_t chunk_count)521 void CPSGS_Reply::PrepareBlobCompletion(size_t item_id,
522 const string & processor_id,
523 size_t chunk_count)
524 {
525 string completion = GetBlobCompletionHeader(item_id, processor_id,
526 chunk_count);
527 while (m_ChunksLock.exchange(true)) {}
528 m_Chunks.push_back(m_Reply->PrepareChunk(
529 (const unsigned char *)(completion.data()),
530 completion.size()));
531 ++m_TotalSentReplyChunks;
532 m_ChunksLock = false;
533 }
534
535
PrepareTSEBlobCompletion(CCassBlobFetch * fetch_details,const string & processor_id)536 void CPSGS_Reply::PrepareTSEBlobCompletion(CCassBlobFetch * fetch_details,
537 const string & processor_id)
538 {
539 // +1 is for the completion itself
540 PrepareTSEBlobCompletion(fetch_details->GetBlobChunkItemId(this),
541 processor_id,
542 fetch_details->GetTotalSentBlobChunks() + 1);
543 fetch_details->IncrementTotalSentBlobChunks();
544 }
545
546
PrepareTSEBlobCompletion(size_t item_id,const string & processor_id,size_t chunk_count)547 void CPSGS_Reply::PrepareTSEBlobCompletion(size_t item_id,
548 const string & processor_id,
549 size_t chunk_count)
550 {
551 string completion = GetTSEBlobCompletionHeader(item_id, processor_id,
552 chunk_count);
553 while (m_ChunksLock.exchange(true)) {}
554 m_Chunks.push_back(m_Reply->PrepareChunk(
555 (const unsigned char *)(completion.data()),
556 completion.size()));
557 ++m_TotalSentReplyChunks;
558 m_ChunksLock = false;
559 }
560
561
PrepareBlobExcluded(const string & blob_id,const string & processor_id,EPSGS_BlobSkipReason skip_reason,CBlobRecord::TTimestamp last_modified)562 void CPSGS_Reply::PrepareBlobExcluded(const string & blob_id,
563 const string & processor_id,
564 EPSGS_BlobSkipReason skip_reason,
565 CBlobRecord::TTimestamp last_modified)
566 {
567 string exclude = GetBlobExcludeHeader(GetItemId(), processor_id,
568 blob_id, skip_reason, last_modified);
569 while (m_ChunksLock.exchange(true)) {}
570 m_Chunks.push_back(m_Reply->PrepareChunk(
571 (const unsigned char *)(exclude.data()),
572 exclude.size()));
573 ++m_TotalSentReplyChunks;
574 m_ChunksLock = false;
575 }
576
577
PrepareTSEBlobExcluded(const string & processor_id,int64_t id2_chunk,const string & id2_info,EPSGS_BlobSkipReason skip_reason)578 void CPSGS_Reply::PrepareTSEBlobExcluded(const string & processor_id,
579 int64_t id2_chunk,
580 const string & id2_info,
581 EPSGS_BlobSkipReason skip_reason)
582 {
583 string exclude = GetTSEBlobExcludeHeader(GetItemId(), processor_id,
584 id2_chunk, id2_info, skip_reason);
585 while (m_ChunksLock.exchange(true)) {}
586 m_Chunks.push_back(m_Reply->PrepareChunk(
587 (const unsigned char *)(exclude.data()),
588 exclude.size()));
589 ++m_TotalSentReplyChunks;
590 m_ChunksLock = false;
591 }
592
593
PrepareBlobExcluded(size_t item_id,const string & processor_id,const string & blob_id,EPSGS_BlobSkipReason skip_reason)594 void CPSGS_Reply::PrepareBlobExcluded(size_t item_id,
595 const string & processor_id,
596 const string & blob_id,
597 EPSGS_BlobSkipReason skip_reason)
598 {
599 string exclude = GetBlobExcludeHeader(item_id, processor_id,
600 blob_id, skip_reason);
601 while (m_ChunksLock.exchange(true)) {}
602 m_Chunks.push_back(m_Reply->PrepareChunk(
603 (const unsigned char *)(exclude.data()),
604 exclude.size()));
605 ++m_TotalSentReplyChunks;
606 m_ChunksLock = false;
607 }
608
609
PrepareBlobCompletion(CCassBlobFetch * fetch_details,const string & processor_id)610 void CPSGS_Reply::PrepareBlobCompletion(CCassBlobFetch * fetch_details,
611 const string & processor_id)
612 {
613 // +1 is for the completion itself
614 PrepareBlobCompletion(fetch_details->GetBlobChunkItemId(this),
615 processor_id,
616 fetch_details->GetTotalSentBlobChunks() + 1);
617 fetch_details->IncrementTotalSentBlobChunks();
618 }
619
620
PrepareReplyMessage(const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)621 void CPSGS_Reply::PrepareReplyMessage(const string & msg,
622 CRequestStatus::ECode status,
623 int err_code,
624 EDiagSev severity)
625 {
626 string header = GetReplyMessageHeader(msg.size(),
627 status, err_code, severity);
628 while (m_ChunksLock.exchange(true)) {}
629 m_Chunks.push_back(m_Reply->PrepareChunk(
630 (const unsigned char *)(header.data()), header.size()));
631 m_Chunks.push_back(m_Reply->PrepareChunk(
632 (const unsigned char *)(msg.data()), msg.size()));
633 ++m_TotalSentReplyChunks;
634 m_ChunksLock = false;
635 }
636
637
PrepareProcessorMessage(size_t item_id,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)638 void CPSGS_Reply::PrepareProcessorMessage(size_t item_id,
639 const string & processor_id,
640 const string & msg,
641 CRequestStatus::ECode status,
642 int err_code,
643 EDiagSev severity)
644 {
645 string header = GetProcessorMessageHeader(item_id, processor_id,
646 msg.size(),
647 status, err_code, severity);
648 string completion = GetProcessorMessageCompletionHeader(item_id,
649 processor_id,
650 2);
651 while (m_ChunksLock.exchange(true)) {}
652 m_Chunks.push_back(m_Reply->PrepareChunk(
653 (const unsigned char *)(header.data()), header.size()));
654 m_Chunks.push_back(m_Reply->PrepareChunk(
655 (const unsigned char *)(msg.data()), msg.size()));
656 ++m_TotalSentReplyChunks;
657
658 m_Chunks.push_back(m_Reply->PrepareChunk(
659 (const unsigned char *)(completion.data()), completion.size()));
660 ++m_TotalSentReplyChunks;
661 m_ChunksLock = false;
662 }
663
664
PreparePublicComment(const string & processor_id,const string & public_comment,const string & blob_id,CBlobRecord::TTimestamp last_modified)665 void CPSGS_Reply::PreparePublicComment(const string & processor_id,
666 const string & public_comment,
667 const string & blob_id,
668 CBlobRecord::TTimestamp last_modified)
669 {
670 auto item_id = GetItemId();
671 string header = GetPublicCommentHeader(item_id, processor_id, blob_id,
672 last_modified, public_comment.size());
673 string completion = GetPublicCommentCompletionHeader(item_id,
674 processor_id,
675 2);
676 while (m_ChunksLock.exchange(true)) {}
677 m_Chunks.push_back(m_Reply->PrepareChunk(
678 (const unsigned char *)(header.data()), header.size()));
679 m_Chunks.push_back(m_Reply->PrepareChunk(
680 (const unsigned char *)(public_comment.data()), public_comment.size()));
681 ++m_TotalSentReplyChunks;
682
683 m_Chunks.push_back(m_Reply->PrepareChunk(
684 (const unsigned char *)(completion.data()), completion.size()));
685 ++m_TotalSentReplyChunks;
686 m_ChunksLock = false;
687 }
688
689
PreparePublicComment(const string & processor_id,const string & public_comment,int64_t id2_chunk,const string & id2_info)690 void CPSGS_Reply::PreparePublicComment(const string & processor_id,
691 const string & public_comment,
692 int64_t id2_chunk,
693 const string & id2_info)
694 {
695 auto item_id = GetItemId();
696 string header = GetPublicCommentHeader(item_id, processor_id, id2_chunk,
697 id2_info, public_comment.size());
698 string completion = GetPublicCommentCompletionHeader(item_id,
699 processor_id,
700 2);
701 while (m_ChunksLock.exchange(true)) {}
702 m_Chunks.push_back(m_Reply->PrepareChunk(
703 (const unsigned char *)(header.data()), header.size()));
704 m_Chunks.push_back(m_Reply->PrepareChunk(
705 (const unsigned char *)(public_comment.data()), public_comment.size()));
706 ++m_TotalSentReplyChunks;
707
708 m_Chunks.push_back(m_Reply->PrepareChunk(
709 (const unsigned char *)(completion.data()), completion.size()));
710 ++m_TotalSentReplyChunks;
711 m_ChunksLock = false;
712 }
713
714
PrepareNamedAnnotationData(const string & annot_name,const string & processor_id,const string & content)715 void CPSGS_Reply::PrepareNamedAnnotationData(const string & annot_name,
716 const string & processor_id,
717 const string & content)
718 {
719 size_t item_id = GetItemId();
720 string header = GetNamedAnnotationHeader(item_id, processor_id,
721 annot_name, content.size());
722 // There are always 2 chunks
723 string bioseq_na_meta = GetNamedAnnotationCompletionHeader(item_id,
724 processor_id,
725 2);
726 while (m_ChunksLock.exchange(true)) {}
727 m_Chunks.push_back(m_Reply->PrepareChunk(
728 (const unsigned char *)(header.data()), header.size()));
729 m_Chunks.push_back(m_Reply->PrepareChunk(
730 (const unsigned char *)(content.data()), content.size()));
731 ++m_TotalSentReplyChunks;
732
733 m_Chunks.push_back(m_Reply->PrepareChunk(
734 (const unsigned char *)(bioseq_na_meta.data()),
735 bioseq_na_meta.size()));
736 ++m_TotalSentReplyChunks;
737 m_ChunksLock = false;
738 }
739
740
PrepareReplyCompletion(void)741 void CPSGS_Reply::PrepareReplyCompletion(void)
742 {
743 while (m_ChunksLock.exchange(true)) {}
744 ++m_TotalSentReplyChunks;
745
746 string reply_completion = GetReplyCompletionHeader(m_TotalSentReplyChunks);
747 m_Chunks.push_back(m_Reply->PrepareChunk(
748 (const unsigned char *)(reply_completion.data()),
749 reply_completion.size()));
750 m_ChunksLock = false;
751 }
752
753
SendTrace(const string & msg,const TPSGS_HighResolutionTimePoint & create_timestamp)754 void CPSGS_Reply::SendTrace(const string & msg,
755 const TPSGS_HighResolutionTimePoint & create_timestamp)
756 {
757 auto now = chrono::high_resolution_clock::now();
758 uint64_t mks = chrono::duration_cast<chrono::microseconds>
759 (now - create_timestamp).count();
760 string timestamp = "Timestamp (mks): " + to_string(mks) + "\n";
761
762 PrepareReplyMessage(timestamp + msg,
763 CRequestStatus::e200_Ok, 0, eDiag_Trace);
764 }
765
766
SendData(const string & data_to_send,EPSGS_ReplyMimeType mime_type)767 void CPSGS_Reply::SendData(const string & data_to_send,
768 EPSGS_ReplyMimeType mime_type)
769 {
770 m_Reply->SetContentType(mime_type);
771 m_Reply->SetContentLength(data_to_send.length());
772 m_Reply->SendOk(data_to_send.data(), data_to_send.length(), false);
773 }
774
775