1 /*  $Id: psgs_reply.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  *
30  */
31 
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "pending_operation.hpp"
36 #include "http_server_transport.hpp"
37 #include "psgs_reply.hpp"
38 #include "pubseq_gateway_utils.hpp"
39 #include "cass_fetch.hpp"
40 
41 
~CPSGS_Reply()42 CPSGS_Reply::~CPSGS_Reply()
43 {
44     if (m_ReplyOwned) {
45         delete m_Reply;
46     }
47 }
48 
49 
Flush(void)50 void CPSGS_Reply::Flush(void)
51 {
52     while (m_ChunksLock.exchange(true)) {}
53     m_Reply->Send(m_Chunks, true);
54     m_Chunks.clear();
55     m_ChunksLock = false;
56 }
57 
58 
Flush(bool is_last)59 void CPSGS_Reply::Flush(bool  is_last)
60 {
61     while (m_ChunksLock.exchange(true)) {}
62     m_Reply->Send(m_Chunks, is_last);
63     m_Chunks.clear();
64     m_ChunksLock = false;
65 
66     if (is_last)
67         m_Reply->CancelPending();
68 }
69 
70 
Clear(void)71 void CPSGS_Reply::Clear(void)
72 {
73     while (m_ChunksLock.exchange(true)) {}
74     m_Chunks.clear();
75     m_Reply = nullptr;
76     m_TotalSentReplyChunks = 0;
77     m_ChunksLock = false;
78 }
79 
80 
SetContentType(EPSGS_ReplyMimeType mime_type)81 void CPSGS_Reply::SetContentType(EPSGS_ReplyMimeType  mime_type)
82 {
83     m_Reply->SetContentType(mime_type);
84 }
85 
86 
GetDataReadyCB(void)87 shared_ptr<CCassDataCallbackReceiver> CPSGS_Reply::GetDataReadyCB(void)
88 {
89     return m_Reply->GetDataReadyCB();
90 }
91 
92 
IsFinished(void) const93 bool CPSGS_Reply::IsFinished(void) const
94 {
95     return m_Reply->IsFinished();
96 }
97 
98 
IsOutputReady(void) const99 bool CPSGS_Reply::IsOutputReady(void) const
100 {
101     return m_Reply->IsOutputReady();
102 }
103 
104 
PrepareBioseqMessage(size_t item_id,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)105 void CPSGS_Reply::PrepareBioseqMessage(size_t  item_id,
106                                        const string &  processor_id,
107                                        const string &  msg,
108                                        CRequestStatus::ECode  status,
109                                        int  err_code,
110                                        EDiagSev  severity)
111 {
112     string  header = GetBioseqMessageHeader(item_id, processor_id,
113                                             msg.size(), status,
114                                             err_code, severity);
115     while (m_ChunksLock.exchange(true)) {}
116     m_Chunks.push_back(m_Reply->PrepareChunk(
117                 (const unsigned char *)(header.data()), header.size()));
118     m_Chunks.push_back(m_Reply->PrepareChunk(
119                 (const unsigned char *)(msg.data()), msg.size()));
120     ++m_TotalSentReplyChunks;
121     m_ChunksLock = false;
122 }
123 
124 
125 
PrepareBioseqData(size_t item_id,const string & processor_id,const string & content,SPSGS_ResolveRequest::EPSGS_OutputFormat output_format)126 void CPSGS_Reply::PrepareBioseqData(
127                     size_t  item_id,
128                     const string &  processor_id,
129                     const string &  content,
130                     SPSGS_ResolveRequest::EPSGS_OutputFormat  output_format)
131 {
132     string      header = GetBioseqInfoHeader(item_id, processor_id,
133                                              content.size(), output_format);
134     while (m_ChunksLock.exchange(true)) {}
135     m_Chunks.push_back(m_Reply->PrepareChunk(
136                 (const unsigned char *)(header.data()), header.size()));
137     m_Chunks.push_back(m_Reply->PrepareChunk(
138                 (const unsigned char *)(content.data()), content.size()));
139     ++m_TotalSentReplyChunks;
140     m_ChunksLock = false;
141 }
142 
143 
PrepareBioseqCompletion(size_t item_id,const string & processor_id,size_t chunk_count)144 void CPSGS_Reply::PrepareBioseqCompletion(size_t  item_id,
145                                           const string &  processor_id,
146                                           size_t  chunk_count)
147 {
148     string      bioseq_meta = GetBioseqCompletionHeader(item_id,
149                                                         processor_id,
150                                                         chunk_count);
151     while (m_ChunksLock.exchange(true)) {}
152     m_Chunks.push_back(m_Reply->PrepareChunk(
153                 (const unsigned char *)(bioseq_meta.data()),
154                 bioseq_meta.size()));
155     ++m_TotalSentReplyChunks;
156     m_ChunksLock = false;
157 }
158 
159 
PrepareBlobPropMessage(size_t item_id,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)160 void CPSGS_Reply::PrepareBlobPropMessage(size_t                 item_id,
161                                          const string &         processor_id,
162                                          const string &         msg,
163                                          CRequestStatus::ECode  status,
164                                          int                    err_code,
165                                          EDiagSev               severity)
166 {
167     string      header = GetBlobPropMessageHeader(item_id, processor_id,
168                                                   msg.size(), status, err_code,
169                                                   severity);
170     while (m_ChunksLock.exchange(true)) {}
171     m_Chunks.push_back(m_Reply->PrepareChunk(
172                 (const unsigned char *)(header.data()), header.size()));
173     m_Chunks.push_back(m_Reply->PrepareChunk(
174                 (const unsigned char *)(msg.data()), msg.size()));
175     ++m_TotalSentReplyChunks;
176     m_ChunksLock = false;
177 }
178 
179 
x_PrepareTSEBlobPropMessage(size_t item_id,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)180 void CPSGS_Reply::x_PrepareTSEBlobPropMessage(size_t                 item_id,
181                                               const string &         processor_id,
182                                               int64_t                id2_chunk,
183                                               const string &         id2_info,
184                                               const string &         msg,
185                                               CRequestStatus::ECode  status,
186                                               int                    err_code,
187                                               EDiagSev               severity)
188 {
189     string      header = GetTSEBlobPropMessageHeader(
190                                 item_id, processor_id, id2_chunk, id2_info,
191                                 msg.size(), status, err_code, severity);
192     while (m_ChunksLock.exchange(true)) {}
193     m_Chunks.push_back(m_Reply->PrepareChunk(
194                 (const unsigned char *)(header.data()), header.size()));
195     m_Chunks.push_back(m_Reply->PrepareChunk(
196                 (const unsigned char *)(msg.data()), msg.size()));
197     ++m_TotalSentReplyChunks;
198     m_ChunksLock = false;
199 }
200 
201 
PrepareBlobPropMessage(CCassBlobFetch * fetch_details,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)202 void CPSGS_Reply::PrepareBlobPropMessage(CCassBlobFetch *       fetch_details,
203                                          const string &         processor_id,
204                                          const string &         msg,
205                                          CRequestStatus::ECode  status,
206                                          int                    err_code,
207                                          EDiagSev               severity)
208 {
209     PrepareBlobPropMessage(fetch_details->GetBlobPropItemId(this),
210                            processor_id, msg, status, err_code, severity);
211     fetch_details->IncrementTotalSentBlobChunks();
212 }
213 
214 
PrepareTSEBlobPropMessage(CCassBlobFetch * fetch_details,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)215 void CPSGS_Reply::PrepareTSEBlobPropMessage(CCassBlobFetch *       fetch_details,
216                                             const string &         processor_id,
217                                             int64_t                id2_chunk,
218                                             const string &         id2_info,
219                                             const string &         msg,
220                                             CRequestStatus::ECode  status,
221                                             int                    err_code,
222                                             EDiagSev               severity)
223 {
224     x_PrepareTSEBlobPropMessage(fetch_details->GetBlobPropItemId(this),
225                                 processor_id, id2_chunk, id2_info, msg,
226                                 status, err_code, severity);
227     fetch_details->IncrementTotalSentBlobChunks();
228 }
229 
230 
PrepareBlobPropData(size_t item_id,const string & processor_id,const string & blob_id,const string & content,CBlobRecord::TTimestamp last_modified)231 void CPSGS_Reply::PrepareBlobPropData(size_t                   item_id,
232                                       const string &           processor_id,
233                                       const string &           blob_id,
234                                       const string &           content,
235                                       CBlobRecord::TTimestamp  last_modified)
236 {
237     string  header = GetBlobPropHeader(item_id,
238                                        processor_id,
239                                        blob_id,
240                                        content.size(),
241                                        last_modified);
242     while (m_ChunksLock.exchange(true)) {}
243     m_Chunks.push_back(m_Reply->PrepareChunk(
244                     (const unsigned char *)(header.data()), header.size()));
245     m_Chunks.push_back(m_Reply->PrepareChunk(
246                     (const unsigned char *)(content.data()),
247                     content.size()));
248     ++m_TotalSentReplyChunks;
249     m_ChunksLock = false;
250 }
251 
252 
PrepareBlobPropData(CCassBlobFetch * fetch_details,const string & processor_id,const string & content,CBlobRecord::TTimestamp last_modified)253 void CPSGS_Reply::PrepareBlobPropData(CCassBlobFetch *         fetch_details,
254                                       const string &           processor_id,
255                                       const string &           content,
256                                       CBlobRecord::TTimestamp  last_modified)
257 {
258     PrepareBlobPropData(fetch_details->GetBlobPropItemId(this),
259                         processor_id,
260                         fetch_details->GetBlobId().ToString(),
261                         content,
262                         last_modified);
263     fetch_details->IncrementTotalSentBlobChunks();
264 }
265 
266 
PrepareTSEBlobPropData(CCassBlobFetch * fetch_details,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & content)267 void CPSGS_Reply::PrepareTSEBlobPropData(CCassBlobFetch *  fetch_details,
268                                          const string &    processor_id,
269                                          int64_t           id2_chunk,
270                                          const string &    id2_info,
271                                          const string &    content)
272 {
273     PrepareTSEBlobPropData(fetch_details->GetBlobPropItemId(this),
274                            processor_id, id2_chunk, id2_info, content);
275     fetch_details->IncrementTotalSentBlobChunks();
276 }
277 
278 
PrepareTSEBlobPropData(size_t item_id,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & content)279 void CPSGS_Reply::PrepareTSEBlobPropData(size_t  item_id,
280                                          const string &    processor_id,
281                                          int64_t           id2_chunk,
282                                          const string &    id2_info,
283                                          const string &    content)
284 {
285     string  header = GetTSEBlobPropHeader(item_id,
286                                           processor_id,
287                                           id2_chunk, id2_info,
288                                           content.size());
289     while (m_ChunksLock.exchange(true)) {}
290     m_Chunks.push_back(m_Reply->PrepareChunk(
291                     (const unsigned char *)(header.data()), header.size()));
292     m_Chunks.push_back(m_Reply->PrepareChunk(
293                     (const unsigned char *)(content.data()),
294                     content.size()));
295     ++m_TotalSentReplyChunks;
296     m_ChunksLock = false;
297 }
298 
299 
PrepareBlobData(size_t item_id,const string & processor_id,const string & blob_id,const unsigned char * chunk_data,unsigned int data_size,int chunk_no,CBlobRecord::TTimestamp last_modified)300 void CPSGS_Reply::PrepareBlobData(size_t                   item_id,
301                                   const string &           processor_id,
302                                   const string &           blob_id,
303                                   const unsigned char *    chunk_data,
304                                   unsigned int             data_size,
305                                   int                      chunk_no,
306                                   CBlobRecord::TTimestamp  last_modified)
307 {
308     ++m_TotalSentReplyChunks;
309 
310     string  header = GetBlobChunkHeader(
311                             item_id,
312                             processor_id,
313                             blob_id,
314                             data_size, chunk_no,
315                             last_modified);
316     while (m_ChunksLock.exchange(true)) {}
317     m_Chunks.push_back(m_Reply->PrepareChunk(
318                     (const unsigned char *)(header.data()),
319                     header.size()));
320 
321     if (data_size > 0 && chunk_data != nullptr)
322         m_Chunks.push_back(m_Reply->PrepareChunk(chunk_data, data_size));
323     m_ChunksLock = false;
324 }
325 
326 
PrepareBlobData(CCassBlobFetch * fetch_details,const string & processor_id,const unsigned char * chunk_data,unsigned int data_size,int chunk_no,CBlobRecord::TTimestamp last_modified)327 void CPSGS_Reply::PrepareBlobData(CCassBlobFetch *         fetch_details,
328                                   const string &           processor_id,
329                                   const unsigned char *    chunk_data,
330                                   unsigned int             data_size,
331                                   int                      chunk_no,
332                                   CBlobRecord::TTimestamp  last_modified)
333 {
334     fetch_details->IncrementTotalSentBlobChunks();
335     PrepareBlobData(fetch_details->GetBlobChunkItemId(this),
336                     processor_id,
337                     fetch_details->GetBlobId().ToString(),
338                     chunk_data, data_size, chunk_no,
339                     last_modified);
340 }
341 
342 
PrepareTSEBlobData(size_t item_id,const string & processor_id,const unsigned char * chunk_data,unsigned int data_size,int chunk_no,int64_t id2_chunk,const string & id2_info)343 void CPSGS_Reply::PrepareTSEBlobData(size_t                 item_id,
344                                      const string &         processor_id,
345                                      const unsigned char *  chunk_data,
346                                      unsigned int           data_size,
347                                      int                    chunk_no,
348                                      int64_t                id2_chunk,
349                                      const string &         id2_info)
350 {
351     ++m_TotalSentReplyChunks;
352 
353     string  header = GetTSEBlobChunkHeader(
354                             item_id,
355                             processor_id,
356                             data_size, chunk_no,
357                             id2_chunk, id2_info);
358     while (m_ChunksLock.exchange(true)) {}
359     m_Chunks.push_back(m_Reply->PrepareChunk(
360                     (const unsigned char *)(header.data()),
361                     header.size()));
362 
363     if (data_size > 0 && chunk_data != nullptr)
364         m_Chunks.push_back(m_Reply->PrepareChunk(chunk_data, data_size));
365     m_ChunksLock = false;
366 }
367 
368 
PrepareTSEBlobData(CCassBlobFetch * fetch_details,const string & processor_id,const unsigned char * chunk_data,unsigned int data_size,int chunk_no,int64_t id2_chunk,const string & id2_info)369 void CPSGS_Reply::PrepareTSEBlobData(CCassBlobFetch *  fetch_details,
370                                      const string &  processor_id,
371                                      const unsigned char *  chunk_data,
372                                      unsigned int  data_size,
373                                      int  chunk_no,
374                                      int64_t  id2_chunk,
375                                      const string &  id2_info)
376 {
377     fetch_details->IncrementTotalSentBlobChunks();
378     PrepareTSEBlobData(fetch_details->GetBlobChunkItemId(this),
379                        processor_id,
380                        chunk_data, data_size, chunk_no,
381                        id2_chunk, id2_info);
382 }
383 
384 
PrepareBlobPropCompletion(size_t item_id,const string & processor_id,size_t chunk_count)385 void CPSGS_Reply::PrepareBlobPropCompletion(size_t  item_id,
386                                             const string &  processor_id,
387                                             size_t  chunk_count)
388 {
389     string      blob_prop_meta = GetBlobPropCompletionHeader(item_id,
390                                                              processor_id,
391                                                              chunk_count);
392     while (m_ChunksLock.exchange(true)) {}
393     m_Chunks.push_back(m_Reply->PrepareChunk(
394                     (const unsigned char *)(blob_prop_meta.data()),
395                     blob_prop_meta.size()));
396     ++m_TotalSentReplyChunks;
397     m_ChunksLock = false;
398 }
399 
400 
x_PrepareTSEBlobPropCompletion(size_t item_id,const string & processor_id,size_t chunk_count)401 void CPSGS_Reply::x_PrepareTSEBlobPropCompletion(size_t          item_id,
402                                                  const string &  processor_id,
403                                                  size_t          chunk_count)
404 {
405     string      blob_prop_meta = GetTSEBlobPropCompletionHeader(item_id,
406                                                                 processor_id,
407                                                                 chunk_count);
408     while (m_ChunksLock.exchange(true)) {}
409     m_Chunks.push_back(m_Reply->PrepareChunk(
410                     (const unsigned char *)(blob_prop_meta.data()),
411                     blob_prop_meta.size()));
412     ++m_TotalSentReplyChunks;
413     m_ChunksLock = false;
414 }
415 
416 
PrepareBlobPropCompletion(CCassBlobFetch * fetch_details,const string & processor_id)417 void CPSGS_Reply::PrepareBlobPropCompletion(CCassBlobFetch *  fetch_details,
418                                             const string &  processor_id)
419 {
420     // +1 is for the completion itself
421     PrepareBlobPropCompletion(fetch_details->GetBlobPropItemId(this),
422                               processor_id,
423                               fetch_details->GetTotalSentBlobChunks() + 1);
424 
425     // From now the counter will count chunks for the blob data
426     fetch_details->ResetTotalSentBlobChunks();
427     fetch_details->SetBlobPropSent();
428 }
429 
430 
PrepareTSEBlobPropCompletion(CCassBlobFetch * fetch_details,const string & processor_id)431 void CPSGS_Reply::PrepareTSEBlobPropCompletion(CCassBlobFetch *  fetch_details,
432                                                const string &  processor_id)
433 {
434     // +1 is for the completion itself
435     x_PrepareTSEBlobPropCompletion(fetch_details->GetBlobPropItemId(this),
436                                    processor_id,
437                                    fetch_details->GetTotalSentBlobChunks() + 1);
438 
439     // From now the counter will count chunks for the blob data
440     fetch_details->ResetTotalSentBlobChunks();
441     fetch_details->SetBlobPropSent();
442 }
443 
444 
PrepareBlobMessage(size_t item_id,const string & processor_id,const string & blob_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity,CBlobRecord::TTimestamp last_modified)445 void CPSGS_Reply::PrepareBlobMessage(size_t                   item_id,
446                                      const string &           processor_id,
447                                      const string &           blob_id,
448                                      const string &           msg,
449                                      CRequestStatus::ECode    status,
450                                      int                      err_code,
451                                      EDiagSev                 severity,
452                                      CBlobRecord::TTimestamp  last_modified)
453 {
454     string      header = GetBlobMessageHeader(item_id, processor_id,
455                                               blob_id, msg.size(),
456                                               status, err_code, severity,
457                                               last_modified);
458     while (m_ChunksLock.exchange(true)) {}
459     m_Chunks.push_back(m_Reply->PrepareChunk(
460                 (const unsigned char *)(header.data()), header.size()));
461     m_Chunks.push_back(m_Reply->PrepareChunk(
462                 (const unsigned char *)(msg.data()), msg.size()));
463     ++m_TotalSentReplyChunks;
464     m_ChunksLock = false;
465 }
466 
467 
PrepareBlobMessage(CCassBlobFetch * fetch_details,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity,CBlobRecord::TTimestamp last_modified)468 void CPSGS_Reply::PrepareBlobMessage(CCassBlobFetch *         fetch_details,
469                                      const string &           processor_id,
470                                      const string &           msg,
471                                      CRequestStatus::ECode    status,
472                                      int                      err_code,
473                                      EDiagSev                 severity,
474                                      CBlobRecord::TTimestamp  last_modified)
475 {
476     PrepareBlobMessage(fetch_details->GetBlobChunkItemId(this),
477                        processor_id,
478                        fetch_details->GetBlobId().ToString(),
479                        msg, status, err_code, severity, last_modified);
480     fetch_details->IncrementTotalSentBlobChunks();
481 }
482 
483 
x_PrepareTSEBlobMessage(size_t item_id,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)484 void CPSGS_Reply::x_PrepareTSEBlobMessage(size_t  item_id,
485                                           const string &  processor_id,
486                                           int64_t  id2_chunk,
487                                           const string &  id2_info,
488                                           const string &  msg,
489                                           CRequestStatus::ECode  status,
490                                           int  err_code,
491                                           EDiagSev  severity)
492 {
493     string      header = GetTSEBlobMessageHeader(item_id, processor_id,
494                                                  id2_chunk, id2_info, msg.size(),
495                                                  status, err_code, severity);
496     while (m_ChunksLock.exchange(true)) {}
497     m_Chunks.push_back(m_Reply->PrepareChunk(
498                 (const unsigned char *)(header.data()), header.size()));
499     m_Chunks.push_back(m_Reply->PrepareChunk(
500                 (const unsigned char *)(msg.data()), msg.size()));
501     ++m_TotalSentReplyChunks;
502     m_ChunksLock = false;
503 }
504 
505 
PrepareTSEBlobMessage(CCassBlobFetch * fetch_details,const string & processor_id,int64_t id2_chunk,const string & id2_info,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)506 void CPSGS_Reply::PrepareTSEBlobMessage(CCassBlobFetch *  fetch_details,
507                                         const string &  processor_id,
508                                         int64_t  id2_chunk,
509                                         const string &  id2_info,
510                                         const string &  msg,
511                                         CRequestStatus::ECode  status, int  err_code,
512                                         EDiagSev  severity)
513 {
514     x_PrepareTSEBlobMessage(fetch_details->GetBlobChunkItemId(this),
515                             processor_id, id2_chunk, id2_info,
516                             msg, status, err_code, severity);
517     fetch_details->IncrementTotalSentBlobChunks();
518 }
519 
520 
PrepareBlobCompletion(size_t item_id,const string & processor_id,size_t chunk_count)521 void CPSGS_Reply::PrepareBlobCompletion(size_t                   item_id,
522                                         const string &           processor_id,
523                                         size_t                   chunk_count)
524 {
525     string completion = GetBlobCompletionHeader(item_id, processor_id,
526                                                 chunk_count);
527     while (m_ChunksLock.exchange(true)) {}
528     m_Chunks.push_back(m_Reply->PrepareChunk(
529                     (const unsigned char *)(completion.data()),
530                     completion.size()));
531     ++m_TotalSentReplyChunks;
532     m_ChunksLock = false;
533 }
534 
535 
PrepareTSEBlobCompletion(CCassBlobFetch * fetch_details,const string & processor_id)536 void CPSGS_Reply::PrepareTSEBlobCompletion(CCassBlobFetch *  fetch_details,
537                                            const string &  processor_id)
538 {
539     // +1 is for the completion itself
540     PrepareTSEBlobCompletion(fetch_details->GetBlobChunkItemId(this),
541                              processor_id,
542                              fetch_details->GetTotalSentBlobChunks() + 1);
543     fetch_details->IncrementTotalSentBlobChunks();
544 }
545 
546 
PrepareTSEBlobCompletion(size_t item_id,const string & processor_id,size_t chunk_count)547 void CPSGS_Reply::PrepareTSEBlobCompletion(size_t  item_id,
548                                            const string &  processor_id,
549                                            size_t  chunk_count)
550 {
551     string completion = GetTSEBlobCompletionHeader(item_id, processor_id,
552                                                    chunk_count);
553     while (m_ChunksLock.exchange(true)) {}
554     m_Chunks.push_back(m_Reply->PrepareChunk(
555                     (const unsigned char *)(completion.data()),
556                     completion.size()));
557     ++m_TotalSentReplyChunks;
558     m_ChunksLock = false;
559 }
560 
561 
PrepareBlobExcluded(const string & blob_id,const string & processor_id,EPSGS_BlobSkipReason skip_reason,CBlobRecord::TTimestamp last_modified)562 void CPSGS_Reply::PrepareBlobExcluded(const string &           blob_id,
563                                       const string &           processor_id,
564                                       EPSGS_BlobSkipReason     skip_reason,
565                                       CBlobRecord::TTimestamp  last_modified)
566 {
567     string  exclude = GetBlobExcludeHeader(GetItemId(), processor_id,
568                                            blob_id, skip_reason, last_modified);
569     while (m_ChunksLock.exchange(true)) {}
570     m_Chunks.push_back(m_Reply->PrepareChunk(
571                     (const unsigned char *)(exclude.data()),
572                     exclude.size()));
573     ++m_TotalSentReplyChunks;
574     m_ChunksLock = false;
575 }
576 
577 
PrepareTSEBlobExcluded(const string & processor_id,int64_t id2_chunk,const string & id2_info,EPSGS_BlobSkipReason skip_reason)578 void CPSGS_Reply::PrepareTSEBlobExcluded(const string &  processor_id,
579                                          int64_t  id2_chunk,
580                                          const string &  id2_info,
581                                          EPSGS_BlobSkipReason  skip_reason)
582 {
583     string  exclude = GetTSEBlobExcludeHeader(GetItemId(), processor_id,
584                                               id2_chunk, id2_info, skip_reason);
585     while (m_ChunksLock.exchange(true)) {}
586     m_Chunks.push_back(m_Reply->PrepareChunk(
587                     (const unsigned char *)(exclude.data()),
588                     exclude.size()));
589     ++m_TotalSentReplyChunks;
590     m_ChunksLock = false;
591 }
592 
593 
PrepareBlobExcluded(size_t item_id,const string & processor_id,const string & blob_id,EPSGS_BlobSkipReason skip_reason)594 void CPSGS_Reply::PrepareBlobExcluded(size_t                item_id,
595                                       const string &        processor_id,
596                                       const string &        blob_id,
597                                       EPSGS_BlobSkipReason  skip_reason)
598 {
599     string  exclude = GetBlobExcludeHeader(item_id, processor_id,
600                                            blob_id, skip_reason);
601     while (m_ChunksLock.exchange(true)) {}
602     m_Chunks.push_back(m_Reply->PrepareChunk(
603                     (const unsigned char *)(exclude.data()),
604                     exclude.size()));
605     ++m_TotalSentReplyChunks;
606     m_ChunksLock = false;
607 }
608 
609 
PrepareBlobCompletion(CCassBlobFetch * fetch_details,const string & processor_id)610 void CPSGS_Reply::PrepareBlobCompletion(CCassBlobFetch *  fetch_details,
611                                         const string &    processor_id)
612 {
613     // +1 is for the completion itself
614     PrepareBlobCompletion(fetch_details->GetBlobChunkItemId(this),
615                           processor_id,
616                           fetch_details->GetTotalSentBlobChunks() + 1);
617     fetch_details->IncrementTotalSentBlobChunks();
618 }
619 
620 
PrepareReplyMessage(const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)621 void CPSGS_Reply::PrepareReplyMessage(const string &         msg,
622                                       CRequestStatus::ECode  status,
623                                       int                    err_code,
624                                       EDiagSev               severity)
625 {
626     string      header = GetReplyMessageHeader(msg.size(),
627                                                status, err_code, severity);
628     while (m_ChunksLock.exchange(true)) {}
629     m_Chunks.push_back(m_Reply->PrepareChunk(
630                 (const unsigned char *)(header.data()), header.size()));
631     m_Chunks.push_back(m_Reply->PrepareChunk(
632                 (const unsigned char *)(msg.data()), msg.size()));
633     ++m_TotalSentReplyChunks;
634     m_ChunksLock = false;
635 }
636 
637 
PrepareProcessorMessage(size_t item_id,const string & processor_id,const string & msg,CRequestStatus::ECode status,int err_code,EDiagSev severity)638 void CPSGS_Reply::PrepareProcessorMessage(size_t                 item_id,
639                                           const string &         processor_id,
640                                           const string &         msg,
641                                           CRequestStatus::ECode  status,
642                                           int                    err_code,
643                                           EDiagSev               severity)
644 {
645     string      header = GetProcessorMessageHeader(item_id, processor_id,
646                                                    msg.size(),
647                                                    status, err_code, severity);
648     string      completion = GetProcessorMessageCompletionHeader(item_id,
649                                                                  processor_id,
650                                                                  2);
651     while (m_ChunksLock.exchange(true)) {}
652     m_Chunks.push_back(m_Reply->PrepareChunk(
653                 (const unsigned char *)(header.data()), header.size()));
654     m_Chunks.push_back(m_Reply->PrepareChunk(
655                 (const unsigned char *)(msg.data()), msg.size()));
656     ++m_TotalSentReplyChunks;
657 
658     m_Chunks.push_back(m_Reply->PrepareChunk(
659                 (const unsigned char *)(completion.data()), completion.size()));
660     ++m_TotalSentReplyChunks;
661     m_ChunksLock = false;
662 }
663 
664 
PreparePublicComment(const string & processor_id,const string & public_comment,const string & blob_id,CBlobRecord::TTimestamp last_modified)665 void CPSGS_Reply::PreparePublicComment(const string &  processor_id,
666                                        const string &  public_comment,
667                                        const string &  blob_id,
668                                        CBlobRecord::TTimestamp  last_modified)
669 {
670     auto        item_id = GetItemId();
671     string      header = GetPublicCommentHeader(item_id, processor_id, blob_id,
672                                                 last_modified, public_comment.size());
673     string      completion = GetPublicCommentCompletionHeader(item_id,
674                                                               processor_id,
675                                                               2);
676     while (m_ChunksLock.exchange(true)) {}
677     m_Chunks.push_back(m_Reply->PrepareChunk(
678                 (const unsigned char *)(header.data()), header.size()));
679     m_Chunks.push_back(m_Reply->PrepareChunk(
680                 (const unsigned char *)(public_comment.data()), public_comment.size()));
681     ++m_TotalSentReplyChunks;
682 
683     m_Chunks.push_back(m_Reply->PrepareChunk(
684                 (const unsigned char *)(completion.data()), completion.size()));
685     ++m_TotalSentReplyChunks;
686     m_ChunksLock = false;
687 }
688 
689 
PreparePublicComment(const string & processor_id,const string & public_comment,int64_t id2_chunk,const string & id2_info)690 void CPSGS_Reply::PreparePublicComment(const string &  processor_id,
691                                        const string &  public_comment,
692                                        int64_t  id2_chunk,
693                                        const string &  id2_info)
694 {
695     auto        item_id = GetItemId();
696     string      header = GetPublicCommentHeader(item_id, processor_id, id2_chunk,
697                                                 id2_info, public_comment.size());
698     string      completion = GetPublicCommentCompletionHeader(item_id,
699                                                               processor_id,
700                                                               2);
701     while (m_ChunksLock.exchange(true)) {}
702     m_Chunks.push_back(m_Reply->PrepareChunk(
703                 (const unsigned char *)(header.data()), header.size()));
704     m_Chunks.push_back(m_Reply->PrepareChunk(
705                 (const unsigned char *)(public_comment.data()), public_comment.size()));
706     ++m_TotalSentReplyChunks;
707 
708     m_Chunks.push_back(m_Reply->PrepareChunk(
709                 (const unsigned char *)(completion.data()), completion.size()));
710     ++m_TotalSentReplyChunks;
711     m_ChunksLock = false;
712 }
713 
714 
PrepareNamedAnnotationData(const string & annot_name,const string & processor_id,const string & content)715 void CPSGS_Reply::PrepareNamedAnnotationData(const string &  annot_name,
716                                              const string &  processor_id,
717                                              const string &  content)
718 {
719     size_t      item_id = GetItemId();
720     string      header = GetNamedAnnotationHeader(item_id, processor_id,
721                                                   annot_name, content.size());
722     // There are always 2 chunks
723     string      bioseq_na_meta = GetNamedAnnotationCompletionHeader(item_id,
724                                                                     processor_id,
725                                                                     2);
726     while (m_ChunksLock.exchange(true)) {}
727     m_Chunks.push_back(m_Reply->PrepareChunk(
728                 (const unsigned char *)(header.data()), header.size()));
729     m_Chunks.push_back(m_Reply->PrepareChunk(
730                 (const unsigned char *)(content.data()), content.size()));
731     ++m_TotalSentReplyChunks;
732 
733     m_Chunks.push_back(m_Reply->PrepareChunk(
734                 (const unsigned char *)(bioseq_na_meta.data()),
735                 bioseq_na_meta.size()));
736     ++m_TotalSentReplyChunks;
737     m_ChunksLock = false;
738 }
739 
740 
PrepareReplyCompletion(void)741 void CPSGS_Reply::PrepareReplyCompletion(void)
742 {
743     while (m_ChunksLock.exchange(true)) {}
744     ++m_TotalSentReplyChunks;
745 
746     string  reply_completion = GetReplyCompletionHeader(m_TotalSentReplyChunks);
747     m_Chunks.push_back(m_Reply->PrepareChunk(
748                 (const unsigned char *)(reply_completion.data()),
749                 reply_completion.size()));
750     m_ChunksLock = false;
751 }
752 
753 
SendTrace(const string & msg,const TPSGS_HighResolutionTimePoint & create_timestamp)754 void CPSGS_Reply::SendTrace(const string &  msg,
755                             const TPSGS_HighResolutionTimePoint &  create_timestamp)
756 {
757     auto            now = chrono::high_resolution_clock::now();
758     uint64_t        mks = chrono::duration_cast<chrono::microseconds>
759                                             (now - create_timestamp).count();
760     string          timestamp = "Timestamp (mks): " + to_string(mks) + "\n";
761 
762     PrepareReplyMessage(timestamp + msg,
763                         CRequestStatus::e200_Ok, 0, eDiag_Trace);
764 }
765 
766 
SendData(const string & data_to_send,EPSGS_ReplyMimeType mime_type)767 void CPSGS_Reply::SendData(const string &  data_to_send,
768                            EPSGS_ReplyMimeType  mime_type)
769 {
770     m_Reply->SetContentType(mime_type);
771     m_Reply->SetContentLength(data_to_send.length());
772     m_Reply->SendOk(data_to_send.data(), data_to_send.length(), false);
773 }
774 
775