1 /*  $Id: pubseq_gateway_handlers.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko
27  *
28  * File Description:
29  *
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include <corelib/ncbithr.hpp>
34 #include <corelib/ncbidiag.hpp>
35 #include <corelib/request_ctx.hpp>
36 
37 #include <objtools/pubseq_gateway/impl/cassandra/blob_storage.hpp>
38 
39 #include "pubseq_gateway.hpp"
40 #include "pubseq_gateway_exception.hpp"
41 #include "resolve_processor.hpp"
42 
43 #include "shutdown_data.hpp"
44 extern SShutdownData    g_ShutdownData;
45 
46 
47 USING_NCBI_SCOPE;
48 
49 
50 static string  kFmtParam = "fmt";
51 static string  kBlobIdParam = "blob_id";
52 static string  kLastModifiedParam = "last_modified";
53 static string  kTSELastModifiedParam = "tse_last_modified";
54 static string  kSeqIdParam = "seq_id";
55 static string  kSeqIdTypeParam = "seq_id_type";
56 static string  kTSEParam = "tse";
57 static string  kUseCacheParam = "use_cache";
58 static string  kNamesParam = "names";
59 static string  kExcludeBlobsParam = "exclude_blobs";
60 static string  kClientIdParam = "client_id";
61 static string  kAuthTokenParam = "auth_token";
62 static string  kTimeoutParam = "timeout";
63 static string  kDataSizeParam = "return_data_size";
64 static string  kLogParam = "log";
65 static string  kUsernameParam = "username";
66 static string  kAlertParam = "alert";
67 static string  kResetParam = "reset";
68 static string  kTSEIdParam = "tse_id";
69 static string  kId2ChunkParam = "id2_chunk";
70 static string  kId2InfoParam = "id2_info";
71 static string  kAccSubstitutionParam = "acc_substitution";
72 static string  kAutoBlobSkippingParam = "auto_blob_skipping";
73 static string  kTraceParam = "trace";
74 static string  kMostRecentTimeParam = "most_recent_time";
75 static string  kMostAncientTimeParam = "most_ancient_time";
76 static string  kHistogramNamesParam = "histogram_names";
77 static string  kNA = "n/a";
78 
79 static vector<pair<string, SPSGS_ResolveRequest::EPSGS_BioseqIncludeData>>
80     kResolveFlagParams =
81 {
82     make_pair("all_info", SPSGS_ResolveRequest::fPSGS_AllBioseqFields),   // must be first
83 
84     make_pair("canon_id", SPSGS_ResolveRequest::fPSGS_CanonicalId),
85     make_pair("seq_ids", SPSGS_ResolveRequest::fPSGS_SeqIds),
86     make_pair("mol_type", SPSGS_ResolveRequest::fPSGS_MoleculeType),
87     make_pair("length", SPSGS_ResolveRequest::fPSGS_Length),
88     make_pair("state", SPSGS_ResolveRequest::fPSGS_State),
89     make_pair("blob_id", SPSGS_ResolveRequest::fPSGS_BlobId),
90     make_pair("tax_id", SPSGS_ResolveRequest::fPSGS_TaxId),
91     make_pair("hash", SPSGS_ResolveRequest::fPSGS_Hash),
92     make_pair("date_changed", SPSGS_ResolveRequest::fPSGS_DateChanged),
93     make_pair("gi", SPSGS_ResolveRequest::fPSGS_Gi),
94     make_pair("name", SPSGS_ResolveRequest::fPSGS_Name),
95     make_pair("seq_state", SPSGS_ResolveRequest::fPSGS_SeqState)
96 };
97 static string  kBadUrlMessage = "Unknown request, the provided URL "
98                                 "is not recognized: ";
99 
100 
OnBadURL(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)101 int CPubseqGatewayApp::OnBadURL(CHttpRequest &  req,
102                                 shared_ptr<CPSGS_Reply>  reply)
103 {
104     CRequestContextResetter context_resetter;
105     CRef<CRequestContext>   context = x_CreateRequestContext(req);
106 
107     if (x_IsShuttingDown(reply)) {
108         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
109         return 0;
110     }
111 
112     auto    http_reply = reply->GetHttpReply();
113     if (req.GetPath() == "/") {
114         // Special case: no path at all so provide a help message
115         try {
116             http_reply->SetContentType(ePSGS_JsonMime);
117             http_reply->SetContentLength(m_HelpMessage.length());
118             http_reply->SendOk(m_HelpMessage.c_str(), m_HelpMessage.length(), false);
119             x_PrintRequestStop(context, CRequestStatus::e200_Ok);
120         } catch (const exception &  exc) {
121             string      msg = "Exception when handling no path URL event: " +
122                               string(exc.what());
123             x_SendMessageAndCompletionChunks(reply, msg,
124                                              CRequestStatus::e500_InternalServerError,
125                                              ePSGS_BadURL, eDiag_Error);
126             PSG_ERROR(msg);
127             x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
128         } catch (...) {
129             string      msg = "Unknown exception when handling no path URL event";
130             x_SendMessageAndCompletionChunks(reply, msg,
131                                              CRequestStatus::e500_InternalServerError,
132                                              ePSGS_BadURL, eDiag_Error);
133             PSG_ERROR(msg);
134             x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
135         }
136     } else {
137         try {
138             m_Counters.Increment(CPSGSCounters::ePSGS_BadUrlPath);
139 
140             string      bad_url = req.GetPath();
141             x_SendMessageAndCompletionChunks(reply, kBadUrlMessage + bad_url,
142                                              CRequestStatus::e400_BadRequest,
143                                              ePSGS_BadURL,
144                                              eDiag_Error);
145             auto    bad_url_size = bad_url.size();
146             if (bad_url_size > 128) {
147                 bad_url.resize(128);
148                 bad_url += "... (truncated; original length: " +
149                            to_string(bad_url_size) + ")";
150             }
151 
152             PSG_WARNING(kBadUrlMessage + bad_url);
153             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
154         } catch (const exception &  exc) {
155             string      msg = "Exception when handling a bad URL event: " +
156                               string(exc.what());
157             x_SendMessageAndCompletionChunks(reply, msg,
158                                              CRequestStatus::e500_InternalServerError,
159                                              ePSGS_BadURL, eDiag_Error);
160             PSG_ERROR(msg);
161             x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
162         } catch (...) {
163             string  msg = "Unknown exception when handling a bad URL event";
164             x_SendMessageAndCompletionChunks(reply, msg,
165                                              CRequestStatus::e500_InternalServerError,
166                                              ePSGS_BadURL, eDiag_Error);
167             PSG_ERROR(msg);
168             x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
169         }
170     }
171     return 0;
172 }
173 
174 
OnGet(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)175 int CPubseqGatewayApp::OnGet(CHttpRequest &  req,
176                              shared_ptr<CPSGS_Reply>  reply)
177 {
178     auto                    now = chrono::high_resolution_clock::now();
179     CRequestContextResetter context_resetter;
180     CRef<CRequestContext>   context = x_CreateRequestContext(req);
181 
182     if (x_IsShuttingDown(reply)) {
183         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
184         return 0;
185     }
186 
187     int     hops;
188     if (!x_GetHops(req, reply, hops)) {
189         x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
190         return 0;
191     }
192 
193     try {
194         CTempString                             seq_id;
195         int                                     seq_id_type;
196         SPSGS_RequestBase::EPSGS_CacheAndDbUse  use_cache = SPSGS_RequestBase::ePSGS_CacheAndDb;
197 
198         if (!x_ProcessCommonGetAndResolveParams(req, reply, seq_id,
199                                                 seq_id_type, use_cache)) {
200             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
201             return 0;
202         }
203 
204         string                                  err_msg;
205         SPSGS_BlobRequestBase::EPSGS_TSEOption  tse_option = SPSGS_BlobRequestBase::ePSGS_OrigTSE;
206         SRequestParameter                       tse_param = x_GetParam(req, kTSEParam);
207         if (tse_param.m_Found) {
208             tse_option = x_GetTSEOption(kTSEParam, tse_param.m_Value, err_msg);
209             if (tse_option == SPSGS_BlobRequestBase::ePSGS_UnknownTSE) {
210                 x_MalformedArguments(reply, context, err_msg);
211                 return 0;
212             }
213         }
214 
215         vector<string>      exclude_blobs;
216         SRequestParameter   exclude_blobs_param = x_GetParam(req,
217                                                          kExcludeBlobsParam);
218         if (exclude_blobs_param.m_Found) {
219             exclude_blobs = x_GetExcludeBlobs(kExcludeBlobsParam,
220                                               exclude_blobs_param.m_Value);
221         }
222 
223         SPSGS_RequestBase::EPSGS_AccSubstitutioOption
224                                 subst_option = SPSGS_RequestBase::ePSGS_DefaultAccSubstitution;
225         SRequestParameter       subst_param = x_GetParam(req, kAccSubstitutionParam);
226         if (subst_param.m_Found) {
227             subst_option = x_GetAccessionSubstitutionOption(kAccSubstitutionParam,
228                                                             subst_param.m_Value,
229                                                             err_msg);
230             if (subst_option == SPSGS_RequestBase::ePSGS_UnknownAccSubstitution) {
231                 x_MalformedArguments(reply, context, err_msg);
232                 return 0;
233             }
234         }
235 
236         SRequestParameter   client_id_param = x_GetParam(req, kClientIdParam);
237 
238         SPSGS_RequestBase::EPSGS_Trace  trace = SPSGS_RequestBase::ePSGS_NoTracing;
239         if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
240             x_SendMessageAndCompletionChunks(reply, err_msg,
241                                              CRequestStatus::e400_BadRequest,
242                                              ePSGS_MalformedParameter, eDiag_Error);
243             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
244             return 0;
245         }
246 
247         bool                auto_blob_skipping = true;  // default
248         SRequestParameter   auto_blob_skipping_param = x_GetParam(req, kAutoBlobSkippingParam);
249         if (auto_blob_skipping_param.m_Found) {
250             if (!x_IsBoolParamValid(kAutoBlobSkippingParam,
251                                     auto_blob_skipping_param.m_Value,
252                                     err_msg)) {
253                 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
254                 x_SendMessageAndCompletionChunks(reply, err_msg,
255                                                  CRequestStatus::e400_BadRequest,
256                                                  ePSGS_MalformedParameter,
257                                                  eDiag_Error);
258                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
259                 return 0;
260             }
261             auto_blob_skipping = auto_blob_skipping_param.m_Value == "yes";
262         }
263 
264         vector<string>      enabled_processors;
265         vector<string>      disabled_processors;
266         if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
267                                                disabled_processors))
268             return 0;
269 
270         m_Counters.Increment(CPSGSCounters::ePSGS_GetBlobBySeqIdRequest);
271         unique_ptr<SPSGS_RequestBase>
272             req(new SPSGS_BlobBySeqIdRequest(
273                         string(seq_id.data(), seq_id.size()),
274                         seq_id_type, exclude_blobs,
275                         tse_option, use_cache, subst_option,
276                         auto_blob_skipping,
277                         string(client_id_param.m_Value.data(),
278                                client_id_param.m_Value.size()),
279                         hops, trace, enabled_processors, disabled_processors,
280                         now));
281         shared_ptr<CPSGS_Request>
282             request(new CPSGS_Request(move(req), context));
283 
284         x_DispatchRequest(request, reply);
285     } catch (const exception &  exc) {
286         string      msg = "Exception when handling a get request: " +
287                           string(exc.what());
288         x_SendMessageAndCompletionChunks(reply, msg,
289                                          CRequestStatus::e500_InternalServerError,
290                                          ePSGS_UnknownError, eDiag_Error);
291         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
292     } catch (...) {
293         string      msg = "Unknown exception when handling a get request";
294         x_SendMessageAndCompletionChunks(reply, msg,
295                                          CRequestStatus::e500_InternalServerError,
296                                          ePSGS_UnknownError, eDiag_Error);
297         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
298     }
299     return 0;
300 }
301 
302 
OnGetBlob(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)303 int CPubseqGatewayApp::OnGetBlob(CHttpRequest &  req,
304                                  shared_ptr<CPSGS_Reply>  reply)
305 {
306     auto                    now = chrono::high_resolution_clock::now();
307     CRequestContextResetter context_resetter;
308     CRef<CRequestContext>   context = x_CreateRequestContext(req);
309 
310     if (x_IsShuttingDown(reply)) {
311         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
312         return 0;
313     }
314 
315     int     hops;
316     if (!x_GetHops(req, reply, hops)) {
317         x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
318         return 0;
319     }
320 
321     try {
322         string              err_msg;
323         SPSGS_BlobRequestBase::EPSGS_TSEOption
324                             tse_option = SPSGS_BlobRequestBase::ePSGS_OrigTSE;
325         SRequestParameter   tse_param = x_GetParam(req, kTSEParam);
326         if (tse_param.m_Found) {
327             tse_option = x_GetTSEOption(kTSEParam, tse_param.m_Value, err_msg);
328             if (tse_option == SPSGS_BlobRequestBase::ePSGS_UnknownTSE) {
329                 x_MalformedArguments(reply, context, err_msg);
330                 return 0;
331             }
332         }
333 
334         SRequestParameter   last_modified_param = x_GetParam(req, kLastModifiedParam);
335         int64_t             last_modified_value = INT64_MIN;
336         if (last_modified_param.m_Found) {
337             try {
338                 last_modified_value = NStr::StringToLong(
339                                                 last_modified_param.m_Value);
340             } catch (...) {
341                 x_MalformedArguments(reply, context,
342                                      "Malformed '" + kLastModifiedParam +
343                                      "' parameter. Expected an integer");
344                 return 0;
345             }
346         }
347 
348         SPSGS_RequestBase::EPSGS_CacheAndDbUse  use_cache = x_GetUseCacheParameter(req, err_msg);
349         if (!err_msg.empty()) {
350             x_MalformedArguments(reply, context, err_msg);
351             return 0;
352         }
353 
354         SPSGS_RequestBase::EPSGS_Trace  trace = SPSGS_RequestBase::ePSGS_NoTracing;
355         if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
356             x_SendMessageAndCompletionChunks(reply, err_msg,
357                                              CRequestStatus::e400_BadRequest,
358                                              ePSGS_MalformedParameter, eDiag_Error);
359             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
360             return 0;
361         }
362 
363         SRequestParameter   blob_id_param = x_GetParam(req, kBlobIdParam);
364         if (blob_id_param.m_Found)
365         {
366             SPSGS_BlobId    blob_id(blob_id_param.m_Value);
367 
368             if (blob_id.GetId().empty()) {
369                 x_MalformedArguments(reply, context,
370                                      "The '" + kBlobIdParam +
371                                      "' parameter value has not been supplied");
372                 return 0;
373             }
374 
375             SRequestParameter   client_id_param = x_GetParam(req, kClientIdParam);
376 
377             vector<string>      enabled_processors;
378             vector<string>      disabled_processors;
379             if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
380                                                    disabled_processors))
381                 return 0;
382 
383             m_Counters.Increment(CPSGSCounters::ePSGS_GetBlobBySatSatKeyRequest);
384             unique_ptr<SPSGS_RequestBase>
385                     req(new SPSGS_BlobBySatSatKeyRequest(
386                                 blob_id, last_modified_value,
387                                 tse_option, use_cache,
388                                 string(client_id_param.m_Value.data(),
389                                        client_id_param.m_Value.size()),
390                                 hops, trace,
391                                 enabled_processors, disabled_processors, now));
392             shared_ptr<CPSGS_Request>
393                     request(new CPSGS_Request(move(req), context));
394 
395             x_DispatchRequest(request, reply);
396             return 0;
397         }
398 
399         x_InsufficientArguments(reply, context, "Mandatory parameter "
400                                 "'" + kBlobIdParam + "' is not found.");
401     } catch (const exception &  exc) {
402         string      msg = "Exception when handling a getblob request: " +
403                           string(exc.what());
404         x_SendMessageAndCompletionChunks(reply, msg,
405                                          CRequestStatus::e500_InternalServerError,
406                                          ePSGS_UnknownError, eDiag_Error);
407         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
408     } catch (...) {
409         string      msg = "Unknown exception when handling a getblob request";
410         x_SendMessageAndCompletionChunks(reply, msg,
411                                          CRequestStatus::e500_InternalServerError,
412                                          ePSGS_UnknownError, eDiag_Error);
413         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
414     }
415     return 0;
416 }
417 
418 
OnResolve(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)419 int CPubseqGatewayApp::OnResolve(CHttpRequest &  req,
420                                  shared_ptr<CPSGS_Reply>  reply)
421 {
422     auto                    now = chrono::high_resolution_clock::now();
423     CRequestContextResetter context_resetter;
424     CRef<CRequestContext>   context = x_CreateRequestContext(req);
425 
426     if (x_IsShuttingDown(reply)) {
427         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
428         return 0;
429     }
430 
431     int     hops;
432     if (!x_GetHops(req, reply, hops)) {
433         x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
434         return 0;
435     }
436 
437     try {
438         CTempString                             seq_id;
439         int                                     seq_id_type;
440         SPSGS_RequestBase::EPSGS_CacheAndDbUse  use_cache = SPSGS_RequestBase::ePSGS_CacheAndDb;
441 
442         if (!x_ProcessCommonGetAndResolveParams(req, reply, seq_id,
443                                                 seq_id_type, use_cache)) {
444             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
445             return 0;
446         }
447 
448         string              err_msg;
449         SPSGS_ResolveRequest::EPSGS_OutputFormat
450                             output_format = SPSGS_ResolveRequest::ePSGS_NativeFormat;
451         SRequestParameter   fmt_param = x_GetParam(req, kFmtParam);
452         if (fmt_param.m_Found) {
453             output_format = x_GetOutputFormat(kFmtParam, fmt_param.m_Value, err_msg);
454             if (output_format == SPSGS_ResolveRequest::ePSGS_UnknownFormat) {
455                 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
456                 x_SendMessageAndCompletionChunks(reply, err_msg,
457                                                  CRequestStatus::e400_BadRequest,
458                                                  ePSGS_MalformedParameter,
459                                                  eDiag_Error);
460                 PSG_WARNING(err_msg);
461                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
462                 return 0;
463             }
464         }
465 
466         SPSGS_ResolveRequest::TPSGS_BioseqIncludeData
467                                 include_data_flags = 0;
468         SRequestParameter       request_param;
469         for (const auto &  flag_param: kResolveFlagParams) {
470             request_param = x_GetParam(req, flag_param.first);
471             if (request_param.m_Found) {
472                 if (!x_IsBoolParamValid(flag_param.first,
473                                         request_param.m_Value, err_msg)) {
474                     m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
475                     x_SendMessageAndCompletionChunks(reply, err_msg,
476                                                      CRequestStatus::e400_BadRequest,
477                                                      ePSGS_MalformedParameter,
478                                                      eDiag_Error);
479                     PSG_WARNING(err_msg);
480                     x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
481                     return 0;
482                 }
483                 if (request_param.m_Value == "yes") {
484                     include_data_flags |= flag_param.second;
485                 } else {
486                     include_data_flags &= ~flag_param.second;
487                 }
488             }
489         }
490 
491         SPSGS_RequestBase::EPSGS_AccSubstitutioOption
492                                 subst_option = SPSGS_RequestBase::ePSGS_DefaultAccSubstitution;
493         SRequestParameter       subst_param = x_GetParam(req, kAccSubstitutionParam);
494         if (subst_param.m_Found) {
495             subst_option = x_GetAccessionSubstitutionOption(kAccSubstitutionParam,
496                                                             subst_param.m_Value,
497                                                             err_msg);
498             if (subst_option == SPSGS_RequestBase::ePSGS_UnknownAccSubstitution) {
499                 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
500                 x_SendMessageAndCompletionChunks(reply, err_msg,
501                                                  CRequestStatus::e400_BadRequest,
502                                                  ePSGS_MalformedParameter,
503                                                  eDiag_Error);
504                 PSG_WARNING(err_msg);
505                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
506                 return 0;
507             }
508         }
509 
510         SPSGS_RequestBase::EPSGS_Trace  trace = SPSGS_RequestBase::ePSGS_NoTracing;
511         if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
512             x_SendMessageAndCompletionChunks(reply, err_msg,
513                                              CRequestStatus::e400_BadRequest,
514                                              ePSGS_MalformedParameter,
515                                              eDiag_Error);
516             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
517             return 0;
518         }
519 
520         vector<string>      enabled_processors;
521         vector<string>      disabled_processors;
522         if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
523                                                disabled_processors))
524             return 0;
525 
526         // Parameters processing has finished
527         m_Counters.Increment(CPSGSCounters::ePSGS_ResolveRequest);
528         unique_ptr<SPSGS_RequestBase>
529             req(new SPSGS_ResolveRequest(
530                         string(seq_id.data(), seq_id.size()),
531                         seq_id_type, include_data_flags, output_format,
532                         use_cache, subst_option, hops, trace,
533                         enabled_processors, disabled_processors, now));
534         shared_ptr<CPSGS_Request>
535             request(new CPSGS_Request(move(req), context));
536 
537         x_DispatchRequest(request, reply);
538     } catch (const exception &  exc) {
539         string      msg = "Exception when handling a resolve request: " +
540                           string(exc.what());
541         x_SendMessageAndCompletionChunks(reply, msg,
542                                          CRequestStatus::e500_InternalServerError,
543                                          ePSGS_UnknownError, eDiag_Error);
544         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
545     } catch (...) {
546         string      msg = "Unknown exception when handling a resolve request";
547         x_SendMessageAndCompletionChunks(reply, msg,
548                                          CRequestStatus::e500_InternalServerError,
549                                          ePSGS_UnknownError, eDiag_Error);
550         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
551     }
552     return 0;
553 }
554 
555 
OnGetTSEChunk(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)556 int CPubseqGatewayApp::OnGetTSEChunk(CHttpRequest &  req,
557                                      shared_ptr<CPSGS_Reply>  reply)
558 {
559     auto                    now = chrono::high_resolution_clock::now();
560     CRequestContextResetter context_resetter;
561     CRef<CRequestContext>   context = x_CreateRequestContext(req);
562 
563     if (x_IsShuttingDown(reply)) {
564         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
565         return 0;
566     }
567 
568     int     hops;
569     if (!x_GetHops(req, reply, hops)) {
570         x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
571         return 0;
572     }
573 
574     try {
575         // Mandatory parameter id2_chunk
576         SRequestParameter   id2_chunk_param = x_GetParam(req, kId2ChunkParam);
577         int64_t             id2_chunk_value = INT64_MIN;
578         if (!id2_chunk_param.m_Found) {
579             x_InsufficientArguments(reply, context, "Mandatory parameter "
580                                     "'" + kId2ChunkParam + "' is not found.");
581             return 0;
582         }
583 
584         try {
585             id2_chunk_value = NStr::StringToLong(id2_chunk_param.m_Value);
586             if (id2_chunk_value < 0) {
587                 x_MalformedArguments(reply, context,
588                                      "Invalid '" + kId2ChunkParam +
589                                      "' parameter. Expected >= 0");
590                 return 0;
591             }
592         } catch (...) {
593             x_MalformedArguments(reply, context,
594                                  "Malformed '" + kId2ChunkParam + "' parameter. "
595                                  "Expected an integer");
596             return 0;
597         }
598 
599         SRequestParameter   id2_info_param = x_GetParam(req, kId2InfoParam);
600         if (!id2_info_param.m_Found)
601         {
602             x_InsufficientArguments(reply, context, "Mandatory parameter '" +
603                                     kId2InfoParam + "' is not found.");
604             return 0;
605         }
606 
607         string                                  err_msg;
608         SPSGS_RequestBase::EPSGS_CacheAndDbUse  use_cache = x_GetUseCacheParameter(req, err_msg);
609         if (!err_msg.empty()) {
610             x_MalformedArguments(reply, context, err_msg);
611             return 0;
612         }
613 
614         SPSGS_RequestBase::EPSGS_Trace  trace = SPSGS_RequestBase::ePSGS_NoTracing;
615         if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
616             x_SendMessageAndCompletionChunks(reply, err_msg,
617                                              CRequestStatus::e400_BadRequest,
618                                              ePSGS_MalformedParameter, eDiag_Error);
619             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
620             return 0;
621         }
622 
623         vector<string>      enabled_processors;
624         vector<string>      disabled_processors;
625         if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
626                                                disabled_processors))
627             return 0;
628 
629         // All parameters are good
630         m_Counters.Increment(CPSGSCounters::ePSGS_GetTSEChunk);
631         unique_ptr<SPSGS_RequestBase>
632             req(new SPSGS_TSEChunkRequest(
633                         id2_chunk_value, id2_info_param.m_Value,
634                         use_cache, hops, trace,
635                         enabled_processors, disabled_processors, now));
636         shared_ptr<CPSGS_Request>
637             request(new CPSGS_Request(move(req), context));
638 
639         x_DispatchRequest(request, reply);
640     } catch (const exception &  exc) {
641         string      msg = "Exception when handling a get_tse_chunk request: " +
642                           string(exc.what());
643         x_SendMessageAndCompletionChunks(reply, msg,
644                                          CRequestStatus::e500_InternalServerError,
645                                          ePSGS_UnknownError, eDiag_Error);
646         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
647     } catch (...) {
648         string      msg = "Unknown exception when handling a get_tse_chunk request";
649         x_SendMessageAndCompletionChunks(reply, msg,
650                                          CRequestStatus::e500_InternalServerError,
651                                          ePSGS_UnknownError, eDiag_Error);
652         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
653     }
654     return 0;
655 }
656 
657 
OnGetNA(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)658 int CPubseqGatewayApp::OnGetNA(CHttpRequest &  req,
659                                shared_ptr<CPSGS_Reply>  reply)
660 {
661     auto                    now = chrono::high_resolution_clock::now();
662     CRequestContextResetter context_resetter;
663     CRef<CRequestContext>   context = x_CreateRequestContext(req);
664 
665     if (x_IsShuttingDown(reply)) {
666         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
667         return 0;
668     }
669 
670     int     hops;
671     if (!x_GetHops(req, reply, hops)) {
672         x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
673         return 0;
674     }
675 
676     try {
677         CTempString                             seq_id;
678         int                                     seq_id_type;
679         SPSGS_RequestBase::EPSGS_CacheAndDbUse  use_cache = SPSGS_RequestBase::ePSGS_CacheAndDb;
680 
681         if (!x_ProcessCommonGetAndResolveParams(req, reply, seq_id,
682                                                 seq_id_type, use_cache)) {
683             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
684             return 0;
685         }
686 
687         // At the moment the only json format is supported (native translates
688         // to json). See CXX-10258
689         string              err_msg;
690         SRequestParameter   fmt_param = x_GetParam(req, kFmtParam);
691         if (fmt_param.m_Found) {
692             auto        output_format = x_GetOutputFormat(kFmtParam,
693                                                           fmt_param.m_Value,
694                                                           err_msg);
695             if (output_format == SPSGS_ResolveRequest::ePSGS_ProtobufFormat ||
696                 output_format == SPSGS_ResolveRequest::ePSGS_UnknownFormat) {
697                 x_MalformedArguments(
698                         reply, context,
699                         "Invalid '" + kFmtParam + "' parameter value. The 'get_na' "
700                         "request supports 'json' and 'native' values");
701                 return 0;
702             }
703         }
704 
705 
706         // Get the annotation names
707         SRequestParameter   names_param = x_GetParam(req, kNamesParam);
708         if (!names_param.m_Found) {
709             x_MalformedArguments(reply, context,
710                                  "The mandatory '" + kNamesParam +
711                                  "' parameter is not found");
712             return 0;
713         }
714 
715         vector<string>          names;
716         NStr::Split(names_param.m_Value, ",", names);
717         if (names.empty()) {
718             x_MalformedArguments(reply, context,
719                                  "Named annotation names are not found in the request");
720             return 0;
721         }
722 
723         SPSGS_RequestBase::EPSGS_Trace  trace = SPSGS_RequestBase::ePSGS_NoTracing;
724         if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
725             x_SendMessageAndCompletionChunks(reply, err_msg,
726                                              CRequestStatus::e400_BadRequest,
727                                              ePSGS_MalformedParameter,
728                                              eDiag_Error);
729             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
730             return 0;
731         }
732 
733         vector<string>      enabled_processors;
734         vector<string>      disabled_processors;
735         if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
736                                                disabled_processors))
737             return 0;
738 
739         SPSGS_BlobRequestBase::EPSGS_TSEOption
740                             tse_option = SPSGS_BlobRequestBase::ePSGS_NoneTSE;
741         SRequestParameter   tse_param = x_GetParam(req, kTSEParam);
742         if (tse_param.m_Found) {
743             tse_option = x_GetTSEOption(kTSEParam, tse_param.m_Value, err_msg);
744             if (tse_option == SPSGS_BlobRequestBase::ePSGS_UnknownTSE) {
745                 x_MalformedArguments(reply, context, err_msg);
746                 return 0;
747             }
748         }
749 
750         // Parameters processing has finished
751         m_Counters.Increment(CPSGSCounters::ePSGS_GetNamedAnnotations);
752         unique_ptr<SPSGS_RequestBase>
753             req(new SPSGS_AnnotRequest(
754                         string(seq_id.data(), seq_id.size()),
755                         seq_id_type, names, use_cache, tse_option,
756                         hops, trace,
757                         enabled_processors, disabled_processors,
758                         now));
759         shared_ptr<CPSGS_Request>
760             request(new CPSGS_Request(move(req), context));
761 
762         x_DispatchRequest(request, reply);
763     } catch (const exception &  exc) {
764         string      msg = "Exception when handling a get_na request: " +
765                           string(exc.what());
766         x_SendMessageAndCompletionChunks(reply, msg,
767                                          CRequestStatus::e500_InternalServerError,
768                                          ePSGS_UnknownError, eDiag_Error);
769         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
770     } catch (...) {
771         string      msg = "Unknown exception when handling a get_na request";
772         x_SendMessageAndCompletionChunks(reply, msg,
773                                          CRequestStatus::e500_InternalServerError,
774                                          ePSGS_UnknownError, eDiag_Error);
775         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
776     }
777     return 0;
778 }
779 
780 
OnHealth(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)781 int CPubseqGatewayApp::OnHealth(CHttpRequest &  req,
782                                 shared_ptr<CPSGS_Reply>  reply)
783 {
784     static string   separator = "==============================================";
785     static string   prefix = "PSG_HEALTH_ERROR: ";
786 
787     m_Counters.Increment(CPSGSCounters::ePSGS_HealthRequest);
788 
789     auto                    now = chrono::high_resolution_clock::now();
790     CRequestContextResetter context_resetter;
791     CRef<CRequestContext>   context = x_CreateRequestContext(req);
792 
793     auto    startup_data_state = GetStartupDataState();
794     if (startup_data_state != ePSGS_StartupDataOK) {
795         // Something is wrong with access to Cassandra
796         // Check if there are any active alerts
797         auto        active_alerts = m_Alerts.SerializeActive();
798         string      msg = separator + "\n" +
799                           prefix + "CASSANDRA" "\n" +
800                           GetCassStartupDataStateMessage(startup_data_state) + "\n" +
801                           separator + "\n" +
802                           prefix + "ALERTS" "\n";
803         if (active_alerts.GetSize() == 0) {
804             msg += "There are no active alerts";
805         } else {
806             msg += "Active alerts are:" "\n" +
807                    active_alerts.Repr(CJsonNode::fStandardJson);
808         }
809 
810         reply->GetHttpReply()->SetContentType(ePSGS_PlainTextMime);
811         reply->GetHttpReply()->Send500(msg.c_str());
812         PSG_WARNING("Cassandra is not available or is in non-working state");
813         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
814         return 0;
815     }
816 
817     if (m_TestSeqId.empty()) {
818         // seq_id for a health test is not configured so skip the test
819         auto    http_reply = reply->GetHttpReply();
820         http_reply->SetContentType(ePSGS_PlainTextMime);
821         http_reply->SetContentLength(0);
822         http_reply->SendOk(nullptr, 0, false);
823         PSG_WARNING("Test seq_id resolution skipped (configured as an empty string)");
824         x_PrintRequestStop(context, CRequestStatus::e200_Ok);
825         return 0;
826     }
827 
828     if (m_Si2csiDbFile.empty() || m_BioseqInfoDbFile.empty()) {
829         // Cache is not configured so skip the test
830         auto    http_reply = reply->GetHttpReply();
831         http_reply->SetContentType(ePSGS_PlainTextMime);
832         http_reply->SetContentLength(0);
833         http_reply->SendOk(nullptr, 0, false);
834         PSG_WARNING("Test seq_id resolution skipped (cache is not configured)");
835         x_PrintRequestStop(context, CRequestStatus::e200_Ok);
836         return 0;
837     }
838 
839     // If cache is configured then we can give it a try to resolve the
840     // configured seq_id in cache
841     try {
842         vector<string>      enabled_processors;
843         vector<string>      disabled_processors;
844 
845         unique_ptr<SPSGS_RequestBase>
846             req(new SPSGS_ResolveRequest(m_TestSeqId, -1,
847                                          SPSGS_ResolveRequest::fPSGS_CanonicalId,
848                                          SPSGS_ResolveRequest::ePSGS_JsonFormat,
849                                          SPSGS_RequestBase::ePSGS_CacheOnly,
850                                          SPSGS_RequestBase::ePSGS_NeverAccSubstitute,
851                                          0, SPSGS_RequestBase::ePSGS_NoTracing,
852                                          enabled_processors, disabled_processors,
853                                          now));
854         shared_ptr<CPSGS_Request>
855             request(new CPSGS_Request(move(req), CRef<CRequestContext>()));
856 
857 
858         CPSGS_ResolveProcessor  resolve_processor(request, reply, 0);
859         auto    resolution = resolve_processor.ResolveTestInputSeqId();
860 
861         if (!resolution.IsValid()) {
862             if (!m_TestSeqIdIgnoreError) {
863                 string  msg = separator + "\n" +
864                               prefix + "RESOLUTION" "\n";
865                 if (resolution.m_Error.HasError()) {
866                     msg += resolution.m_Error.m_ErrorMessage;
867                 } else {
868                     msg += "Cannot resolve '" + m_TestSeqId + "' seq_id";
869                 }
870                 reply->GetHttpReply()->SetContentType(ePSGS_PlainTextMime);
871                 reply->GetHttpReply()->Send500(msg.c_str());
872                 PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId + "'");
873                 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
874                 return 0;
875             }
876             PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId +
877                         "', however the configuration is to ignore test errors");
878         }
879     } catch (const exception &  exc) {
880         if (!m_TestSeqIdIgnoreError) {
881             string  msg = separator + "\n" +
882                           prefix + "RESOLUTION" "\n" +
883                           exc.what();
884             reply->GetHttpReply()->SetContentType(ePSGS_PlainTextMime);
885             reply->GetHttpReply()->Send500(msg.c_str());
886             PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId + "'");
887             x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
888             return 0;
889         }
890         PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId +
891                     "', however the configuration is to ignore test errors");
892     } catch (...) {
893         if (!m_TestSeqIdIgnoreError) {
894             string  msg = separator + "\n" +
895                           prefix + "RESOLUTION" "\n"
896                           "Unknown '" + m_TestSeqId + "' resolution error";
897             reply->GetHttpReply()->SetContentType(ePSGS_PlainTextMime);
898             reply->GetHttpReply()->Send500(msg.c_str());
899             PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId + "'");
900             x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
901             return 0;
902         }
903         PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId +
904                     "', however the configuration is to ignore test errors");
905     }
906 
907     // Here: all OK or errors are ignored
908     auto    http_reply = reply->GetHttpReply();
909     http_reply->SetContentType(ePSGS_PlainTextMime);
910     http_reply->SetContentLength(0);
911     http_reply->SendOk(nullptr, 0, false);
912     x_PrintRequestStop(context, CRequestStatus::e200_Ok);
913     return 0;
914 }
915 
916 
OnConfig(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)917 int CPubseqGatewayApp::OnConfig(CHttpRequest &  req,
918                                 shared_ptr<CPSGS_Reply>  reply)
919 {
920     static string   kConfigurationFilePath = "ConfigurationFilePath";
921     static string   kConfiguration = "Configuration";
922 
923     // NOTE: expected to work regardless of the shutdown request
924 
925     CRequestContextResetter context_resetter;
926     CRef<CRequestContext>   context = x_CreateRequestContext(req);
927 
928     auto    http_reply = reply->GetHttpReply();
929     try {
930         m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
931 
932         CNcbiOstrstream             conf;
933         CNcbiOstrstreamToString     converter(conf);
934 
935         GetConfig().Write(conf);
936 
937         CJsonNode   conf_info(CJsonNode::NewObjectNode());
938         conf_info.SetString(kConfigurationFilePath, GetConfigPath());
939         conf_info.SetString(kConfiguration, string(converter));
940         string      content = conf_info.Repr(CJsonNode::fStandardJson);
941 
942         http_reply->SetContentType(ePSGS_JsonMime);
943         http_reply->SetContentLength(content.length());
944         http_reply->SendOk(content.c_str(), content.length(), false);
945 
946         x_PrintRequestStop(context, CRequestStatus::e200_Ok);
947     } catch (const exception &  exc) {
948         string      msg = "Exception when handling a config request: " +
949                           string(exc.what());
950         x_SendMessageAndCompletionChunks(reply, msg,
951                                          CRequestStatus::e500_InternalServerError,
952                                          ePSGS_ConfigError, eDiag_Error);
953         PSG_ERROR(msg);
954         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
955     } catch (...) {
956         string      msg = "Unknown exception when handling a config request";
957         x_SendMessageAndCompletionChunks(reply, msg,
958                                          CRequestStatus::e500_InternalServerError,
959                                          ePSGS_ConfigError, eDiag_Error);
960         PSG_ERROR(msg);
961         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
962     }
963     return 0;
964 }
965 
966 
OnInfo(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)967 int CPubseqGatewayApp::OnInfo(CHttpRequest &  req,
968                               shared_ptr<CPSGS_Reply>  reply)
969 {
970     static string   kPID = "PID";
971     static string   kExecutablePath = "ExecutablePath";
972     static string   kCommandLineArguments = "CommandLineArguments";
973     static string   kStartupDataState = "StartupDataState";
974     static string   kRealTime = "RealTime";
975     static string   kUserTime = "UserTime";
976     static string   kSystemTime = "SystemTime";
977     static string   kPhysicalMemory = "PhysicalMemory";
978     static string   kMemoryUsedTotal = "MemoryUsedTotal";
979     static string   kMemoryUsedTotalPeak = "MemoryUsedTotalPeak";
980     static string   kMemoryUsedResident = "MemoryUsedResident";
981     static string   kMemoryUsedResidentPeak = "MemoryUsedResidentPeak";
982     static string   kMemoryUsedShared = "MemoryUsedShared";
983     static string   kMemoryUsedData = "MemoryUsedData";
984     static string   kMemoryUsedStack = "MemoryUsedStack";
985     static string   kMemoryUsedText = "MemoryUsedText";
986     static string   kMemoryUsedLib = "MemoryUsedLib";
987     static string   kMemoryUsedSwap = "MemoryUsedSwap";
988     static string   kProcFDSoftLimit = "ProcFDSoftLimit";
989     static string   kProcFDHardLimit = "ProcFDHardLimit";
990     static string   kProcFDUsed = "ProcFDUsed";
991     static string   kCPUCount = "CPUCount";
992     static string   kProcThreadCount = "ProcThreadCount";
993     static string   kVersion = "Version";
994     static string   kBuildDate = "BuildDate";
995     static string   kStartedAt = "StartedAt";
996     static string   kExcludeBlobCacheUserCount = "ExcludeBlobCacheUserCount";
997 
998     // NOTE: expected to work regardless of the shutdown request
999 
1000     CRequestContextResetter context_resetter;
1001     CRef<CRequestContext>   context = x_CreateRequestContext(req);
1002 
1003     auto                    app = CPubseqGatewayApp::GetInstance();
1004     auto                    http_reply = reply->GetHttpReply();
1005     try {
1006         m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
1007 
1008         CJsonNode   info(CJsonNode::NewObjectNode());
1009 
1010         info.SetInteger(kPID, CDiagContext::GetPID());
1011         info.SetString(kExecutablePath, GetProgramExecutablePath());
1012         info.SetString(kCommandLineArguments, x_GetCmdLineArguments());
1013         info.SetString(kStartupDataState,
1014                        GetCassStartupDataStateMessage(app->GetStartupDataState()));
1015 
1016 
1017         double      real_time;
1018         double      user_time;
1019         double      system_time;
1020         bool        process_time_result = CCurrentProcess::GetTimes(&real_time,
1021                                                                     &user_time,
1022                                                                     &system_time);
1023         if (process_time_result) {
1024             info.SetDouble(kRealTime, real_time);
1025             info.SetDouble(kUserTime, user_time);
1026             info.SetDouble(kSystemTime, system_time);
1027         } else {
1028             info.SetString(kRealTime, kNA);
1029             info.SetString(kUserTime, kNA);
1030             info.SetString(kSystemTime, kNA);
1031         }
1032 
1033         Uint8       physical_memory = CSystemInfo::GetTotalPhysicalMemorySize();
1034         if (physical_memory > 0)
1035             info.SetInteger(kPhysicalMemory, physical_memory);
1036         else
1037             info.SetString(kPhysicalMemory, kNA);
1038 
1039         CProcessBase::SMemoryUsage      mem_usage;
1040         bool                            mem_used_result = CCurrentProcess::GetMemoryUsage(mem_usage);
1041         if (mem_used_result) {
1042             if (mem_usage.total > 0)
1043                 info.SetInteger(kMemoryUsedTotal, mem_usage.total);
1044             else
1045                 info.SetString(kMemoryUsedTotal, kNA);
1046 
1047             if (mem_usage.total_peak > 0)
1048                 info.SetInteger(kMemoryUsedTotalPeak, mem_usage.total_peak);
1049             else
1050                 info.SetString(kMemoryUsedTotalPeak, kNA);
1051 
1052             if (mem_usage.resident > 0)
1053                 info.SetInteger(kMemoryUsedResident, mem_usage.resident);
1054             else
1055                 info.SetString(kMemoryUsedResident, kNA);
1056 
1057             if (mem_usage.resident_peak > 0)
1058                 info.SetInteger(kMemoryUsedResidentPeak, mem_usage.resident_peak);
1059             else
1060                 info.SetString(kMemoryUsedResidentPeak, kNA);
1061 
1062             if (mem_usage.shared > 0)
1063                 info.SetInteger(kMemoryUsedShared, mem_usage.shared);
1064             else
1065                 info.SetString(kMemoryUsedShared, kNA);
1066 
1067             if (mem_usage.data > 0)
1068                 info.SetInteger(kMemoryUsedData, mem_usage.data);
1069             else
1070                 info.SetString(kMemoryUsedData, kNA);
1071 
1072             if (mem_usage.stack > 0)
1073                 info.SetInteger(kMemoryUsedStack, mem_usage.stack);
1074             else
1075                 info.SetString(kMemoryUsedStack, kNA);
1076 
1077             if (mem_usage.text > 0)
1078                 info.SetInteger(kMemoryUsedText, mem_usage.text);
1079             else
1080                 info.SetString(kMemoryUsedText, kNA);
1081 
1082             if (mem_usage.lib > 0)
1083                 info.SetInteger(kMemoryUsedLib, mem_usage.lib);
1084             else
1085                 info.SetString(kMemoryUsedLib, kNA);
1086 
1087             if (mem_usage.swap > 0)
1088                 info.SetInteger(kMemoryUsedSwap, mem_usage.swap);
1089             else
1090                 info.SetString(kMemoryUsedSwap, kNA);
1091         } else {
1092             info.SetString(kMemoryUsedTotal, kNA);
1093             info.SetString(kMemoryUsedTotalPeak, kNA);
1094             info.SetString(kMemoryUsedResident, kNA);
1095             info.SetString(kMemoryUsedResidentPeak, kNA);
1096             info.SetString(kMemoryUsedShared, kNA);
1097             info.SetString(kMemoryUsedData, kNA);
1098             info.SetString(kMemoryUsedStack, kNA);
1099             info.SetString(kMemoryUsedText, kNA);
1100             info.SetString(kMemoryUsedLib, kNA);
1101             info.SetString(kMemoryUsedSwap, kNA);
1102         }
1103 
1104         int         proc_fd_soft_limit;
1105         int         proc_fd_hard_limit;
1106         int         proc_fd_used =
1107                 CCurrentProcess::GetFileDescriptorsCount(&proc_fd_soft_limit,
1108                                                          &proc_fd_hard_limit);
1109 
1110         if (proc_fd_soft_limit >= 0)
1111             info.SetInteger(kProcFDSoftLimit, proc_fd_soft_limit);
1112         else
1113             info.SetString(kProcFDSoftLimit, kNA);
1114 
1115         if (proc_fd_hard_limit >= 0)
1116             info.SetInteger(kProcFDHardLimit, proc_fd_hard_limit);
1117         else
1118             info.SetString(kProcFDHardLimit, kNA);
1119 
1120         if (proc_fd_used >= 0)
1121             info.SetInteger(kProcFDUsed, proc_fd_used);
1122         else
1123             info.SetString(kProcFDUsed, kNA);
1124 
1125         info.SetInteger(kCPUCount, CSystemInfo::GetCpuCount());
1126 
1127         int         proc_thread_count = CCurrentProcess::GetThreadCount();
1128         if (proc_thread_count >= 1)
1129             info.SetInteger(kProcThreadCount, proc_thread_count);
1130         else
1131             info.SetString(kProcThreadCount, kNA);
1132 
1133 
1134         info.SetString(kVersion, PUBSEQ_GATEWAY_VERSION);
1135         info.SetString(kBuildDate, PUBSEQ_GATEWAY_BUILD_DATE);
1136         info.SetString(kStartedAt, m_StartTime.AsString());
1137 
1138         info.SetInteger(kExcludeBlobCacheUserCount,
1139                         app->GetExcludeBlobCache()->Size());
1140 
1141         string      content = info.Repr(CJsonNode::fStandardJson);
1142 
1143         http_reply->SetContentType(ePSGS_JsonMime);
1144         http_reply->SetContentLength(content.length());
1145         http_reply->SendOk(content.c_str(), content.length(), false);
1146 
1147         x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1148     } catch (const exception &  exc) {
1149         string      msg = "Exception when handling an info request: " +
1150                           string(exc.what());
1151         x_SendMessageAndCompletionChunks(reply, msg,
1152                                          CRequestStatus::e500_InternalServerError,
1153                                          ePSGS_InfoError, eDiag_Error);
1154         PSG_ERROR(msg);
1155         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1156     } catch (...) {
1157         string      msg = "Unknown exception when handling an info request";
1158         x_SendMessageAndCompletionChunks(reply, msg,
1159                                          CRequestStatus::e500_InternalServerError,
1160                                          ePSGS_InfoError, eDiag_Error);
1161         PSG_ERROR(msg);
1162         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1163     }
1164     return 0;
1165 }
1166 
1167 
OnStatus(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1168 int CPubseqGatewayApp::OnStatus(CHttpRequest &  req,
1169                                 shared_ptr<CPSGS_Reply>  reply)
1170 {
1171     // NOTE: expected to work regardless of the shutdown request
1172 
1173     CRequestContextResetter context_resetter;
1174     CRef<CRequestContext>   context = x_CreateRequestContext(req);
1175 
1176     auto        http_reply = reply->GetHttpReply();
1177     try {
1178         m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
1179 
1180         CJsonNode                       status(CJsonNode::NewObjectNode());
1181 
1182         m_Counters.AppendValueNode(
1183             status, CPSGSCounters::ePSGS_CassandraActiveStatements,
1184             static_cast<uint64_t>(m_CassConnection->GetActiveStatements()));
1185         m_Counters.AppendValueNode(
1186             status, CPSGSCounters::ePSGS_NumberOfConnections,
1187             static_cast<uint64_t>(m_TcpDaemon->NumOfConnections()));
1188         m_Counters.AppendValueNode(
1189             status, CPSGSCounters::ePSGS_ActiveRequest,
1190             static_cast<uint64_t>(g_ShutdownData.m_ActiveRequestCount.load()));
1191         m_Counters.AppendValueNode(
1192             status, CPSGSCounters::ePSGS_ShutdownRequested,
1193             g_ShutdownData.m_ShutdownRequested);
1194 
1195         if (g_ShutdownData.m_ShutdownRequested) {
1196             auto        now = chrono::steady_clock::now();
1197             uint64_t    sec = std::chrono::duration_cast<std::chrono::seconds>
1198                                 (g_ShutdownData.m_Expired - now).count();
1199             m_Counters.AppendValueNode(
1200                 status, CPSGSCounters::ePSGS_GracefulShutdownExpiredInSec, sec);
1201         } else {
1202             m_Counters.AppendValueNode(
1203                 status, CPSGSCounters::ePSGS_GracefulShutdownExpiredInSec, kNA);
1204         }
1205 
1206         m_Counters.PopulateDictionary(status);
1207 
1208         string      content = status.Repr(CJsonNode::fStandardJson);
1209 
1210         http_reply->SetContentType(ePSGS_JsonMime);
1211         http_reply->SetContentLength(content.length());
1212         http_reply->SendOk(content.c_str(), content.length(), false);
1213 
1214         x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1215     } catch (const exception &  exc) {
1216         string      msg = "Exception when handling a status request: " +
1217                           string(exc.what());
1218         x_SendMessageAndCompletionChunks(reply, msg,
1219                                          CRequestStatus::e500_InternalServerError,
1220                                          ePSGS_StatusError, eDiag_Error);
1221         PSG_ERROR(msg);
1222         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1223     } catch (...) {
1224         string      msg = "Unknown exception when handling a status request";
1225         x_SendMessageAndCompletionChunks(reply, msg,
1226                                          CRequestStatus::e500_InternalServerError,
1227                                          ePSGS_StatusError, eDiag_Error);
1228         PSG_ERROR(msg);
1229         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1230     }
1231     return 0;
1232 }
1233 
1234 
OnShutdown(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1235 int CPubseqGatewayApp::OnShutdown(CHttpRequest &  req,
1236                                   shared_ptr<CPSGS_Reply>  reply)
1237 {
1238     // NOTE: expected to work regardless of the shutdown request
1239 
1240     static const char *     s_ImmediateShutdown = "Immediate shutdown request accepted";
1241     static const char *     s_GracefulShutdown = "Graceful shutdown request accepted";
1242     static size_t           s_ImmediateShutdownSize = strlen(s_ImmediateShutdown);
1243     static size_t           s_GracefulShutdownSize = strlen(s_GracefulShutdown);
1244 
1245     CRequestContextResetter context_resetter;
1246     CRef<CRequestContext>   context = x_CreateRequestContext(req);
1247 
1248     try {
1249         string              msg;
1250         string              username;
1251         SRequestParameter   username_param = x_GetParam(req, kUsernameParam);
1252         if (username_param.m_Found) {
1253             username = string(username_param.m_Value.data(),
1254                               username_param.m_Value.size());
1255         }
1256 
1257         if (!m_AuthToken.empty()) {
1258             SRequestParameter   auth_token_param = x_GetParam(req, kAuthTokenParam);
1259 
1260             bool    auth_good = false;
1261             if (auth_token_param.m_Found) {
1262                 auth_good = m_AuthToken == string(auth_token_param.m_Value.data(),
1263                                                   auth_token_param.m_Value.size());
1264             }
1265 
1266             if (!auth_good) {
1267                 msg = "Unauthorized shutdown request: invalid authorization token. ";
1268                 if (username.empty())
1269                     msg += "Unknown user";
1270                 else
1271                     msg += "User: " + username;
1272                 PSG_MESSAGE(msg);
1273 
1274                 x_SendMessageAndCompletionChunks(
1275                         reply, "Invalid authorization token",
1276                         CRequestStatus::e401_Unauthorized,
1277                         ePSGS_Unauthorised, eDiag_Error);
1278                 x_PrintRequestStop(context, CRequestStatus::e401_Unauthorized);
1279                 return 0;
1280             }
1281         }
1282 
1283         int                 timeout = 10; // Default: 10 sec
1284         SRequestParameter   timeout_param = x_GetParam(req, kTimeoutParam);
1285         if (timeout_param.m_Found) {
1286             try {
1287                 timeout = stoi(string(timeout_param.m_Value.data(),
1288                                       timeout_param.m_Value.size()));
1289             } catch (...) {
1290                 msg = "Invalid shutdown request: cannot convert timeout to an integer. ";
1291                 if (username.empty())
1292                     msg += "Unknown user";
1293                 else
1294                     msg += "User: " + username;
1295                 PSG_MESSAGE(msg);
1296 
1297                 x_SendMessageAndCompletionChunks(
1298                         reply, "Invalid timeout (must be a positive integer)",
1299                         CRequestStatus::e400_BadRequest,
1300                         ePSGS_MalformedParameter, eDiag_Error);
1301                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1302                 return 0;
1303             }
1304 
1305             if (timeout < 0) {
1306                 msg = "Invalid shutdown request: timeout must be >= 0. ";
1307                 if (username.empty())
1308                     msg += "Unknown user";
1309                 else
1310                     msg += "User: " + username;
1311                 PSG_MESSAGE(msg);
1312 
1313                 x_SendMessageAndCompletionChunks(
1314                         reply, "Invalid timeout (must be a positive integer)",
1315                         CRequestStatus::e400_BadRequest,
1316                         ePSGS_MalformedParameter, eDiag_Error);
1317                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1318                 return 0;
1319             }
1320         }
1321 
1322         auto        http_reply = reply->GetHttpReply();
1323         http_reply->SetContentType(ePSGS_PlainTextMime);
1324         if (timeout == 0) {
1325             // Immediate shutdown is requested
1326             msg = "Immediate shutdown request received from ";
1327             if (username.empty())
1328                 msg += "an unknown user";
1329             else
1330                 msg += "user " + username;
1331             PSG_MESSAGE(msg);
1332 
1333             http_reply->Send202(s_ImmediateShutdown, s_ImmediateShutdownSize);
1334             x_PrintRequestStop(context, CRequestStatus::e202_Accepted);
1335             exit(0);
1336         }
1337 
1338         msg = "Graceful shutdown request received from ";
1339         if (username.empty())
1340             msg += "an unknown user";
1341         else
1342             msg += "user " + username;
1343 
1344         auto        now = chrono::steady_clock::now();
1345         auto        expiration = now + chrono::seconds(timeout);
1346         if (g_ShutdownData.m_ShutdownRequested) {
1347             // Consequest shutdown request
1348             if (expiration >= g_ShutdownData.m_Expired) {
1349                 msg += ". The previous shutdown expiration is shorter "
1350                        "than this one. Ignored.";
1351                 PSG_MESSAGE(msg);
1352                 http_reply->Send409(msg.c_str());
1353                 x_PrintRequestStop(context, CRequestStatus::e409_Conflict);
1354                 return 0;
1355             }
1356         }
1357 
1358         // New shutdown request or a shorter expiration request
1359         PSG_MESSAGE(msg);
1360 
1361         http_reply->Send202(s_GracefulShutdown, s_GracefulShutdownSize);
1362         x_PrintRequestStop(context, CRequestStatus::e202_Accepted);
1363 
1364         g_ShutdownData.m_Expired = expiration;
1365         g_ShutdownData.m_ShutdownRequested = true;
1366     } catch (const exception &  exc) {
1367         string      msg = "Exception when handling a shutdown request: " +
1368                           string(exc.what());
1369         x_SendMessageAndCompletionChunks(reply, msg,
1370                                          CRequestStatus::e500_InternalServerError,
1371                                          ePSGS_ShutdownError, eDiag_Error);
1372         PSG_ERROR(msg);
1373         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1374     } catch (...) {
1375         string      msg = "Unknown exception when handling a shutdown request";
1376         x_SendMessageAndCompletionChunks(reply, msg,
1377                                          CRequestStatus::e500_InternalServerError,
1378                                          ePSGS_ShutdownError, eDiag_Error);
1379         PSG_ERROR(msg);
1380         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1381     }
1382     return 0;
1383 }
1384 
1385 
OnGetAlerts(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1386 int CPubseqGatewayApp::OnGetAlerts(CHttpRequest &  req,
1387                                    shared_ptr<CPSGS_Reply>  reply)
1388 {
1389     // NOTE: expected to work regardless of the shutdown request
1390 
1391     CRequestContextResetter context_resetter;
1392     CRef<CRequestContext>   context = x_CreateRequestContext(req);
1393 
1394     auto        http_reply = reply->GetHttpReply();
1395     try {
1396         m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
1397 
1398         string      content = m_Alerts.Serialize().Repr(CJsonNode::fStandardJson);
1399 
1400         http_reply->SetContentType(ePSGS_JsonMime);
1401         http_reply->SetContentLength(content.length());
1402         http_reply->SendOk(content.c_str(), content.length(), false);
1403 
1404         x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1405     } catch (const exception &  exc) {
1406         string      msg = "Exception when handling a get alerts request: " +
1407                           string(exc.what());
1408         x_SendMessageAndCompletionChunks(reply, msg,
1409                                          CRequestStatus::e500_InternalServerError,
1410                                          ePSGS_GetAlertsError, eDiag_Error);
1411         PSG_ERROR(msg);
1412         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1413     } catch (...) {
1414         string      msg = "Unknown exception when handling a get alerts request";
1415         x_SendMessageAndCompletionChunks(reply, msg,
1416                                          CRequestStatus::e500_InternalServerError,
1417                                          ePSGS_GetAlertsError, eDiag_Error);
1418         PSG_ERROR(msg);
1419         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1420     }
1421     return 0;
1422 }
1423 
1424 
OnAckAlert(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1425 int CPubseqGatewayApp::OnAckAlert(CHttpRequest &  req,
1426                                   shared_ptr<CPSGS_Reply>  reply)
1427 {
1428     CRequestContextResetter context_resetter;
1429     CRef<CRequestContext>   context = x_CreateRequestContext(req);
1430 
1431     if (x_IsShuttingDown(reply)) {
1432         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
1433         return 0;
1434     }
1435 
1436     string                  msg;
1437 
1438     try {
1439         SRequestParameter   alert_param = x_GetParam(req, kAlertParam);
1440         if (!alert_param.m_Found) {
1441             msg = "Missing " + kAlertParam + " parameter";
1442             x_SendMessageAndCompletionChunks(
1443                     reply, msg, CRequestStatus::e400_BadRequest,
1444                     ePSGS_InsufficientArguments, eDiag_Error);
1445             PSG_ERROR(msg);
1446             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1447             return 0;
1448         }
1449 
1450         SRequestParameter   username_param = x_GetParam(req, kUsernameParam);
1451         if (!username_param.m_Found) {
1452             msg = "Missing " + kUsernameParam + " parameter";
1453             x_SendMessageAndCompletionChunks(
1454                     reply, msg, CRequestStatus::e400_BadRequest,
1455                     ePSGS_InsufficientArguments, eDiag_Error);
1456             PSG_ERROR(msg);
1457             x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1458             return 0;
1459         }
1460 
1461         string  alert(alert_param.m_Value.data(), alert_param.m_Value.size());
1462         string  username(username_param.m_Value.data(), username_param.m_Value.size());
1463         auto    http_reply = reply->GetHttpReply();
1464 
1465         switch (m_Alerts.Acknowledge(alert, username)) {
1466             case ePSGS_AlertNotFound:
1467                 msg = "Alert " + alert + " is not found";
1468                 x_SendMessageAndCompletionChunks(
1469                         reply, msg,
1470                         CRequestStatus::e404_NotFound,
1471                         ePSGS_MalformedParameter, eDiag_Error);
1472                 PSG_ERROR(msg);
1473                 x_PrintRequestStop(context, CRequestStatus::e404_NotFound);
1474                 break;
1475             case ePSGS_AlertAlreadyAcknowledged:
1476                 http_reply->SetContentType(ePSGS_PlainTextMime);
1477                 msg = "Alert " + alert + " has already been acknowledged";
1478                 http_reply->SendOk(msg.c_str(), msg.size(), false);
1479                 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1480                 break;
1481             case ePSGS_AlertAcknowledged:
1482                 http_reply->SetContentType(ePSGS_PlainTextMime);
1483                 http_reply->SendOk(nullptr, 0, true);
1484                 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1485                 break;
1486         }
1487     } catch (const exception &  exc) {
1488         string      msg = "Exception when handling an acknowledge alert request: " +
1489                           string(exc.what());
1490         x_SendMessageAndCompletionChunks(reply, msg,
1491                                          CRequestStatus::e500_InternalServerError,
1492                                          ePSGS_AckAlertError,
1493                                          eDiag_Error);
1494         PSG_ERROR(msg);
1495         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1496     } catch (...) {
1497         string      msg = "Unknown exception when handling an acknowledge alert request";
1498         x_SendMessageAndCompletionChunks(reply, msg,
1499                                          CRequestStatus::e500_InternalServerError,
1500                                          ePSGS_AckAlertError,
1501                                          eDiag_Error);
1502         PSG_ERROR(msg);
1503         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1504     }
1505     return 0;
1506 }
1507 
OnStatistics(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1508 int CPubseqGatewayApp::OnStatistics(CHttpRequest &  req,
1509                                     shared_ptr<CPSGS_Reply>  reply)
1510 {
1511     CRequestContextResetter context_resetter;
1512     CRef<CRequestContext>   context = x_CreateRequestContext(req);
1513     auto                    http_reply = reply->GetHttpReply();
1514 
1515     if (x_IsShuttingDown(reply)) {
1516         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
1517         return 0;
1518     }
1519 
1520     try {
1521         m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
1522 
1523         // /ADMIN/statistics[?reset=yes(dflt=no)][&most_recent_time=<time>][&most_ancient_time=<time>][histogram_names=name1,name2,...]
1524         // /ADMIN/statistics[?reset=yes(dflt=no)][&most_recent_time=<time>][&most_ancient_time=<time>]
1525 
1526 
1527         bool                    reset = false;
1528         SRequestParameter       reset_param = x_GetParam(req, kResetParam);
1529         if (reset_param.m_Found) {
1530             string      err_msg;
1531             if (!x_IsBoolParamValid(kResetParam, reset_param.m_Value, err_msg)) {
1532                 x_SendMessageAndCompletionChunks(
1533                         reply, err_msg, CRequestStatus::e400_BadRequest,
1534                         ePSGS_MalformedParameter, eDiag_Error);
1535                 PSG_ERROR(err_msg);
1536                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1537                 return 0;
1538             }
1539             reset = reset_param.m_Value == "yes";
1540         }
1541 
1542         if (reset) {
1543             m_Timing->Reset();
1544             http_reply->SetContentType(ePSGS_PlainTextMime);
1545             http_reply->SendOk(nullptr, 0, true);
1546             x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1547             return 0;
1548         }
1549 
1550         int                 most_recent_time = INT_MIN;
1551         SRequestParameter   most_recent_time_param = x_GetParam(req, kMostRecentTimeParam);
1552         if (most_recent_time_param.m_Found) {
1553             string          err_msg;
1554             try {
1555                 most_recent_time = NStr::StringToInt8(most_recent_time_param.m_Value);
1556                 if (most_recent_time < 0)
1557                     err_msg = "Invalid " + kMostRecentTimeParam +
1558                               " value (" + most_recent_time_param.m_Value +
1559                               "). It must be >= 0.";
1560             } catch (...) {
1561                 err_msg = "Invalid " + kMostRecentTimeParam +
1562                           " value (" + most_recent_time_param.m_Value +
1563                           "). It must be an integer >= 0.";
1564             }
1565 
1566             if (!err_msg.empty()) {
1567                 x_SendMessageAndCompletionChunks(
1568                         reply, err_msg, CRequestStatus::e400_BadRequest,
1569                         ePSGS_MalformedParameter, eDiag_Error);
1570                 PSG_ERROR(err_msg);
1571                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1572                 return 0;
1573             }
1574         }
1575 
1576         int                 most_ancient_time = INT_MIN;
1577         SRequestParameter   most_ancient_time_param = x_GetParam(req, kMostAncientTimeParam);
1578         if (most_ancient_time_param.m_Found) {
1579             string          err_msg;
1580             try {
1581                 most_ancient_time = NStr::StringToInt8(most_ancient_time_param.m_Value);
1582                 if (most_ancient_time < 0)
1583                     err_msg = "Invalid " + kMostAncientTimeParam +
1584                               " value (" + most_ancient_time_param.m_Value +
1585                               "). It must be >= 0.";
1586             } catch (...) {
1587                 err_msg = "Invalid " + kMostAncientTimeParam +
1588                           " value (" + most_ancient_time_param.m_Value +
1589                           "). It must be an integer >= 0.";
1590             }
1591 
1592             if (!err_msg.empty()) {
1593                 x_SendMessageAndCompletionChunks(
1594                         reply, err_msg, CRequestStatus::e400_BadRequest,
1595                         ePSGS_MalformedParameter, eDiag_Error);
1596                 PSG_ERROR(err_msg);
1597                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1598                 return 0;
1599             }
1600         }
1601 
1602         if (most_ancient_time >= 0 && most_recent_time >= 0) {
1603             // auto reorder the time if needed
1604             if (most_recent_time > most_ancient_time) {
1605                 swap(most_recent_time, most_ancient_time);
1606             }
1607         }
1608 
1609         vector<CTempString> histogram_names;
1610         SRequestParameter   histogram_names_param = x_GetParam(req, kHistogramNamesParam);
1611         if (histogram_names_param.m_Found) {
1612             NStr::Split(histogram_names_param.m_Value, ",", histogram_names);
1613         }
1614 
1615 
1616         CJsonNode   timing(m_Timing->Serialize(most_ancient_time,
1617                                                most_recent_time,
1618                                                histogram_names,
1619                                                m_TickSpan));
1620         string      content = timing.Repr(CJsonNode::fStandardJson);
1621 
1622         http_reply->SetContentType(ePSGS_JsonMime);
1623         http_reply->SetContentLength(content.length());
1624         http_reply->SendOk(content.c_str(), content.length(), false);
1625 
1626         x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1627     } catch (const exception &  exc) {
1628         string      msg = "Exception when handling a statistics request: " +
1629                           string(exc.what());
1630         x_SendMessageAndCompletionChunks(reply, msg,
1631                                          CRequestStatus::e500_InternalServerError,
1632                                          ePSGS_StatisticsError,
1633                                          eDiag_Error);
1634         PSG_ERROR(msg);
1635         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1636     } catch (...) {
1637         string  msg = "Unknown exception when handling a statistics request";
1638         x_SendMessageAndCompletionChunks(reply, msg,
1639                                          CRequestStatus::e500_InternalServerError,
1640                                          ePSGS_StatisticsError,
1641                                          eDiag_Error);
1642         PSG_ERROR(msg);
1643         x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1644     }
1645     return 0;
1646 }
1647 
1648 
OnTestIO(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1649 int CPubseqGatewayApp::OnTestIO(CHttpRequest &  req,
1650                                 shared_ptr<CPSGS_Reply>  reply)
1651 {
1652     bool                    need_log = false;   // default
1653     CRequestContextResetter context_resetter;
1654     CRef<CRequestContext>   context;
1655     auto                    http_reply = reply->GetHttpReply();
1656 
1657     if (x_IsShuttingDown(reply)) {
1658         x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
1659         return 0;
1660     }
1661 
1662     try {
1663         string                  err_msg;
1664 
1665         SRequestParameter       log_param = x_GetParam(req, kLogParam);
1666         if (log_param.m_Found) {
1667             if (!x_IsBoolParamValid(kLogParam, log_param.m_Value, err_msg)) {
1668                 x_SendMessageAndCompletionChunks(
1669                         reply, err_msg, CRequestStatus::e400_BadRequest,
1670                         ePSGS_MalformedParameter, eDiag_Error);
1671                 PSG_ERROR(err_msg);
1672                 return 0;
1673             }
1674             need_log = log_param.m_Value == "yes";
1675         }
1676 
1677         if (need_log)
1678             context = x_CreateRequestContext(req);
1679 
1680         // Read the return data size
1681         SRequestParameter   data_size_param = x_GetParam(req, kDataSizeParam);
1682         long                data_size = 0;
1683         if (data_size_param.m_Found) {
1684             data_size = NStr::StringToLong(data_size_param.m_Value);
1685             if (data_size < 0 || data_size > kMaxTestIOSize) {
1686                 err_msg = "Invalid range of the '" + kDataSizeParam +
1687                           "' parameter. Accepted values are 0..." +
1688                           to_string(kMaxTestIOSize);
1689                 x_SendMessageAndCompletionChunks(
1690                         reply, err_msg, CRequestStatus::e400_BadRequest,
1691                         ePSGS_MalformedParameter, eDiag_Error);
1692                 if (need_log) {
1693                     PSG_WARNING(err_msg);
1694                     x_PrintRequestStop(context,
1695                                        CRequestStatus::e400_BadRequest);
1696                 }
1697                 return 0;
1698             }
1699         } else {
1700             err_msg = "The '" + kDataSizeParam + "' must be provided";
1701             x_SendMessageAndCompletionChunks(
1702                     reply, err_msg, CRequestStatus::e400_BadRequest,
1703                     ePSGS_InsufficientArguments, eDiag_Error);
1704             if (need_log) {
1705                 PSG_WARNING(err_msg);
1706                 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1707             }
1708             return 0;
1709         }
1710 
1711         m_Counters.Increment(CPSGSCounters::ePSGS_TestIORequest);
1712 
1713         http_reply->SetContentType(ePSGS_BinaryMime);
1714         http_reply->SetContentLength(data_size);
1715 
1716         // true: persistent
1717         http_reply->SendOk(m_IOTestBuffer.get(), data_size, true);
1718 
1719         if (need_log)
1720             x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1721     } catch (const exception &  exc) {
1722         string      msg = "Exception when handling a test io request: " +
1723                           string(exc.what());
1724         x_SendMessageAndCompletionChunks(reply, msg,
1725                                          CRequestStatus::e500_InternalServerError,
1726                                          ePSGS_TestIOError,
1727                                          eDiag_Error);
1728         PSG_ERROR(msg);
1729         if (need_log)
1730             x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1731     } catch (...) {
1732         string  msg = "Unknown exception when handling a test io request";
1733         x_SendMessageAndCompletionChunks(reply, msg,
1734                                          CRequestStatus::e500_InternalServerError,
1735                                          ePSGS_TestIOError,
1736                                          eDiag_Error);
1737         PSG_ERROR(msg);
1738         if (need_log)
1739             x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1740     }
1741     return 0;
1742 }
1743 
1744 
x_ProcessCommonGetAndResolveParams(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply,CTempString & seq_id,int & seq_id_type,SPSGS_RequestBase::EPSGS_CacheAndDbUse & use_cache)1745 bool CPubseqGatewayApp::x_ProcessCommonGetAndResolveParams(
1746         CHttpRequest &  req,
1747         shared_ptr<CPSGS_Reply>  reply,
1748         CTempString &  seq_id,
1749         int &  seq_id_type,
1750         SPSGS_RequestBase::EPSGS_CacheAndDbUse &  use_cache)
1751 {
1752     SRequestParameter   seq_id_type_param;
1753     string              err_msg;
1754 
1755     // Check the mandatory parameter presence
1756     SRequestParameter   seq_id_param = x_GetParam(req, kSeqIdParam);
1757     if (!seq_id_param.m_Found) {
1758         err_msg = "Missing the '" + kSeqIdParam + "' parameter";
1759         m_Counters.Increment(CPSGSCounters::ePSGS_InsufficientArgs);
1760     }
1761     else if (seq_id_param.m_Value.empty()) {
1762         err_msg = "Missing value of the '" + kSeqIdParam + "' parameter";
1763         m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1764     }
1765 
1766     if (err_msg.empty()) {
1767         use_cache = x_GetUseCacheParameter(req, err_msg);
1768     }
1769 
1770     if (!err_msg.empty()) {
1771         x_SendMessageAndCompletionChunks(reply, err_msg,
1772                                          CRequestStatus::e400_BadRequest,
1773                                          ePSGS_MissingParameter, eDiag_Error);
1774         PSG_WARNING(err_msg);
1775         return false;
1776     }
1777     seq_id = seq_id_param.m_Value;
1778 
1779     seq_id_type_param = x_GetParam(req, kSeqIdTypeParam);
1780     if (seq_id_type_param.m_Found) {
1781         if (!x_ConvertIntParameter(kSeqIdTypeParam, seq_id_type_param.m_Value,
1782                                    seq_id_type, err_msg)) {
1783             m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1784             x_SendMessageAndCompletionChunks(reply, err_msg,
1785                                              CRequestStatus::e400_BadRequest,
1786                                              ePSGS_MalformedParameter,
1787                                              eDiag_Error);
1788             PSG_WARNING(err_msg);
1789             return false;
1790         }
1791 
1792         if (seq_id_type < 0 || seq_id_type >= CSeq_id::e_MaxChoice) {
1793             err_msg = "The '" + kSeqIdTypeParam +
1794                       "' value must be >= 0 and less than " +
1795                       to_string(CSeq_id::e_MaxChoice);
1796             m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1797             x_SendMessageAndCompletionChunks(reply, err_msg,
1798                                              CRequestStatus::e400_BadRequest,
1799                                              ePSGS_MalformedParameter,
1800                                              eDiag_Error);
1801             PSG_WARNING(err_msg);
1802             return false;
1803         }
1804     } else {
1805         seq_id_type = -1;
1806     }
1807 
1808     return true;
1809 }
1810 
1811 
1812 SPSGS_RequestBase::EPSGS_CacheAndDbUse
x_GetUseCacheParameter(CHttpRequest & req,string & err_msg)1813 CPubseqGatewayApp::x_GetUseCacheParameter(CHttpRequest &  req,
1814                                           string &  err_msg)
1815 {
1816     SRequestParameter   use_cache_param = x_GetParam(req, kUseCacheParam);
1817 
1818     if (use_cache_param.m_Found) {
1819         if (!x_IsBoolParamValid(kUseCacheParam, use_cache_param.m_Value,
1820                                 err_msg)) {
1821             m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1822             return SPSGS_RequestBase::ePSGS_UnknownUseCache;
1823         }
1824         if (use_cache_param.m_Value == "yes")
1825             return SPSGS_RequestBase::ePSGS_CacheOnly;
1826         return SPSGS_RequestBase::ePSGS_DbOnly;
1827     }
1828     return SPSGS_RequestBase::ePSGS_CacheAndDb;
1829 }
1830 
1831 
x_IsShuttingDown(shared_ptr<CPSGS_Reply> reply)1832 bool CPubseqGatewayApp::x_IsShuttingDown(shared_ptr<CPSGS_Reply>  reply)
1833 {
1834     if (g_ShutdownData.m_ShutdownRequested) {
1835         string      msg = "The server is in process of shutting down";
1836         x_SendMessageAndCompletionChunks(reply, msg,
1837                                          CRequestStatus::e503_ServiceUnavailable,
1838                                          ePSGS_ShuttingDown,
1839                                          eDiag_Error);
1840         PSG_ERROR(msg);
1841         return true;
1842     }
1843     return false;
1844 }
1845 
1846 
x_DispatchRequest(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply)1847 void CPubseqGatewayApp::x_DispatchRequest(shared_ptr<CPSGS_Request>  request,
1848                                           shared_ptr<CPSGS_Reply>  reply)
1849 {
1850     // The dispatcher works in terms of processors while the infrastructure
1851     // works in terms of pending operation. So, lets create the corresponding
1852     // pending operations.
1853     // Note: the case when no processors are available is handled in the
1854     //       dispatcher (reply is sent to the client).
1855 
1856     list<unique_ptr<CPendingOperation>>     pending_ops;
1857     for (auto processor: m_RequestDispatcher.DispatchRequest(request, reply)) {
1858         pending_ops.push_back(
1859             move(
1860                 unique_ptr<CPendingOperation>(
1861                     new CPendingOperation(request, reply, processor))));
1862     }
1863 
1864     if (!pending_ops.empty()) {
1865         auto    http_conn = reply->GetHttpReply()->GetHttpConnection();
1866         http_conn->Postpone(move(pending_ops), reply);
1867     }
1868 }
1869 
1870