1 /* $Id: pubseq_gateway_handlers.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitri Dmitrienko
27 *
28 * File Description:
29 *
30 */
31 #include <ncbi_pch.hpp>
32
33 #include <corelib/ncbithr.hpp>
34 #include <corelib/ncbidiag.hpp>
35 #include <corelib/request_ctx.hpp>
36
37 #include <objtools/pubseq_gateway/impl/cassandra/blob_storage.hpp>
38
39 #include "pubseq_gateway.hpp"
40 #include "pubseq_gateway_exception.hpp"
41 #include "resolve_processor.hpp"
42
43 #include "shutdown_data.hpp"
44 extern SShutdownData g_ShutdownData;
45
46
47 USING_NCBI_SCOPE;
48
49
// Names of the request parameters. The handlers in this file pass them to
// x_GetParam() and embed them into error messages.
// NOTE(review): some of these names are not referenced in the visible part of
// the file; presumably they are used by handlers further down.
static string   kFmtParam("fmt");
static string   kBlobIdParam("blob_id");
static string   kLastModifiedParam("last_modified");
static string   kTSELastModifiedParam("tse_last_modified");
static string   kSeqIdParam("seq_id");
static string   kSeqIdTypeParam("seq_id_type");
static string   kTSEParam("tse");
static string   kUseCacheParam("use_cache");
static string   kNamesParam("names");
static string   kExcludeBlobsParam("exclude_blobs");
static string   kClientIdParam("client_id");
static string   kAuthTokenParam("auth_token");
static string   kTimeoutParam("timeout");
static string   kDataSizeParam("return_data_size");
static string   kLogParam("log");
static string   kUsernameParam("username");
static string   kAlertParam("alert");
static string   kResetParam("reset");
static string   kTSEIdParam("tse_id");
static string   kId2ChunkParam("id2_chunk");
static string   kId2InfoParam("id2_info");
static string   kAccSubstitutionParam("acc_substitution");
static string   kAutoBlobSkippingParam("auto_blob_skipping");
static string   kTraceParam("trace");
static string   kMostRecentTimeParam("most_recent_time");
static string   kMostAncientTimeParam("most_ancient_time");
static string   kHistogramNamesParam("histogram_names");

// The "n/a" literal.
// NOTE(review): its usage is not visible in this part of the file.
static string   kNA("n/a");
78
79 static vector<pair<string, SPSGS_ResolveRequest::EPSGS_BioseqIncludeData>>
80 kResolveFlagParams =
81 {
82 make_pair("all_info", SPSGS_ResolveRequest::fPSGS_AllBioseqFields), // must be first
83
84 make_pair("canon_id", SPSGS_ResolveRequest::fPSGS_CanonicalId),
85 make_pair("seq_ids", SPSGS_ResolveRequest::fPSGS_SeqIds),
86 make_pair("mol_type", SPSGS_ResolveRequest::fPSGS_MoleculeType),
87 make_pair("length", SPSGS_ResolveRequest::fPSGS_Length),
88 make_pair("state", SPSGS_ResolveRequest::fPSGS_State),
89 make_pair("blob_id", SPSGS_ResolveRequest::fPSGS_BlobId),
90 make_pair("tax_id", SPSGS_ResolveRequest::fPSGS_TaxId),
91 make_pair("hash", SPSGS_ResolveRequest::fPSGS_Hash),
92 make_pair("date_changed", SPSGS_ResolveRequest::fPSGS_DateChanged),
93 make_pair("gi", SPSGS_ResolveRequest::fPSGS_Gi),
94 make_pair("name", SPSGS_ResolveRequest::fPSGS_Name),
95 make_pair("seq_state", SPSGS_ResolveRequest::fPSGS_SeqState)
96 };
97 static string kBadUrlMessage = "Unknown request, the provided URL "
98 "is not recognized: ";
99
100
OnBadURL(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)101 int CPubseqGatewayApp::OnBadURL(CHttpRequest & req,
102 shared_ptr<CPSGS_Reply> reply)
103 {
104 CRequestContextResetter context_resetter;
105 CRef<CRequestContext> context = x_CreateRequestContext(req);
106
107 if (x_IsShuttingDown(reply)) {
108 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
109 return 0;
110 }
111
112 auto http_reply = reply->GetHttpReply();
113 if (req.GetPath() == "/") {
114 // Special case: no path at all so provide a help message
115 try {
116 http_reply->SetContentType(ePSGS_JsonMime);
117 http_reply->SetContentLength(m_HelpMessage.length());
118 http_reply->SendOk(m_HelpMessage.c_str(), m_HelpMessage.length(), false);
119 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
120 } catch (const exception & exc) {
121 string msg = "Exception when handling no path URL event: " +
122 string(exc.what());
123 x_SendMessageAndCompletionChunks(reply, msg,
124 CRequestStatus::e500_InternalServerError,
125 ePSGS_BadURL, eDiag_Error);
126 PSG_ERROR(msg);
127 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
128 } catch (...) {
129 string msg = "Unknown exception when handling no path URL event";
130 x_SendMessageAndCompletionChunks(reply, msg,
131 CRequestStatus::e500_InternalServerError,
132 ePSGS_BadURL, eDiag_Error);
133 PSG_ERROR(msg);
134 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
135 }
136 } else {
137 try {
138 m_Counters.Increment(CPSGSCounters::ePSGS_BadUrlPath);
139
140 string bad_url = req.GetPath();
141 x_SendMessageAndCompletionChunks(reply, kBadUrlMessage + bad_url,
142 CRequestStatus::e400_BadRequest,
143 ePSGS_BadURL,
144 eDiag_Error);
145 auto bad_url_size = bad_url.size();
146 if (bad_url_size > 128) {
147 bad_url.resize(128);
148 bad_url += "... (truncated; original length: " +
149 to_string(bad_url_size) + ")";
150 }
151
152 PSG_WARNING(kBadUrlMessage + bad_url);
153 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
154 } catch (const exception & exc) {
155 string msg = "Exception when handling a bad URL event: " +
156 string(exc.what());
157 x_SendMessageAndCompletionChunks(reply, msg,
158 CRequestStatus::e500_InternalServerError,
159 ePSGS_BadURL, eDiag_Error);
160 PSG_ERROR(msg);
161 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
162 } catch (...) {
163 string msg = "Unknown exception when handling a bad URL event";
164 x_SendMessageAndCompletionChunks(reply, msg,
165 CRequestStatus::e500_InternalServerError,
166 ePSGS_BadURL, eDiag_Error);
167 PSG_ERROR(msg);
168 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
169 }
170 }
171 return 0;
172 }
173
174
OnGet(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)175 int CPubseqGatewayApp::OnGet(CHttpRequest & req,
176 shared_ptr<CPSGS_Reply> reply)
177 {
178 auto now = chrono::high_resolution_clock::now();
179 CRequestContextResetter context_resetter;
180 CRef<CRequestContext> context = x_CreateRequestContext(req);
181
182 if (x_IsShuttingDown(reply)) {
183 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
184 return 0;
185 }
186
187 int hops;
188 if (!x_GetHops(req, reply, hops)) {
189 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
190 return 0;
191 }
192
193 try {
194 CTempString seq_id;
195 int seq_id_type;
196 SPSGS_RequestBase::EPSGS_CacheAndDbUse use_cache = SPSGS_RequestBase::ePSGS_CacheAndDb;
197
198 if (!x_ProcessCommonGetAndResolveParams(req, reply, seq_id,
199 seq_id_type, use_cache)) {
200 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
201 return 0;
202 }
203
204 string err_msg;
205 SPSGS_BlobRequestBase::EPSGS_TSEOption tse_option = SPSGS_BlobRequestBase::ePSGS_OrigTSE;
206 SRequestParameter tse_param = x_GetParam(req, kTSEParam);
207 if (tse_param.m_Found) {
208 tse_option = x_GetTSEOption(kTSEParam, tse_param.m_Value, err_msg);
209 if (tse_option == SPSGS_BlobRequestBase::ePSGS_UnknownTSE) {
210 x_MalformedArguments(reply, context, err_msg);
211 return 0;
212 }
213 }
214
215 vector<string> exclude_blobs;
216 SRequestParameter exclude_blobs_param = x_GetParam(req,
217 kExcludeBlobsParam);
218 if (exclude_blobs_param.m_Found) {
219 exclude_blobs = x_GetExcludeBlobs(kExcludeBlobsParam,
220 exclude_blobs_param.m_Value);
221 }
222
223 SPSGS_RequestBase::EPSGS_AccSubstitutioOption
224 subst_option = SPSGS_RequestBase::ePSGS_DefaultAccSubstitution;
225 SRequestParameter subst_param = x_GetParam(req, kAccSubstitutionParam);
226 if (subst_param.m_Found) {
227 subst_option = x_GetAccessionSubstitutionOption(kAccSubstitutionParam,
228 subst_param.m_Value,
229 err_msg);
230 if (subst_option == SPSGS_RequestBase::ePSGS_UnknownAccSubstitution) {
231 x_MalformedArguments(reply, context, err_msg);
232 return 0;
233 }
234 }
235
236 SRequestParameter client_id_param = x_GetParam(req, kClientIdParam);
237
238 SPSGS_RequestBase::EPSGS_Trace trace = SPSGS_RequestBase::ePSGS_NoTracing;
239 if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
240 x_SendMessageAndCompletionChunks(reply, err_msg,
241 CRequestStatus::e400_BadRequest,
242 ePSGS_MalformedParameter, eDiag_Error);
243 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
244 return 0;
245 }
246
247 bool auto_blob_skipping = true; // default
248 SRequestParameter auto_blob_skipping_param = x_GetParam(req, kAutoBlobSkippingParam);
249 if (auto_blob_skipping_param.m_Found) {
250 if (!x_IsBoolParamValid(kAutoBlobSkippingParam,
251 auto_blob_skipping_param.m_Value,
252 err_msg)) {
253 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
254 x_SendMessageAndCompletionChunks(reply, err_msg,
255 CRequestStatus::e400_BadRequest,
256 ePSGS_MalformedParameter,
257 eDiag_Error);
258 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
259 return 0;
260 }
261 auto_blob_skipping = auto_blob_skipping_param.m_Value == "yes";
262 }
263
264 vector<string> enabled_processors;
265 vector<string> disabled_processors;
266 if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
267 disabled_processors))
268 return 0;
269
270 m_Counters.Increment(CPSGSCounters::ePSGS_GetBlobBySeqIdRequest);
271 unique_ptr<SPSGS_RequestBase>
272 req(new SPSGS_BlobBySeqIdRequest(
273 string(seq_id.data(), seq_id.size()),
274 seq_id_type, exclude_blobs,
275 tse_option, use_cache, subst_option,
276 auto_blob_skipping,
277 string(client_id_param.m_Value.data(),
278 client_id_param.m_Value.size()),
279 hops, trace, enabled_processors, disabled_processors,
280 now));
281 shared_ptr<CPSGS_Request>
282 request(new CPSGS_Request(move(req), context));
283
284 x_DispatchRequest(request, reply);
285 } catch (const exception & exc) {
286 string msg = "Exception when handling a get request: " +
287 string(exc.what());
288 x_SendMessageAndCompletionChunks(reply, msg,
289 CRequestStatus::e500_InternalServerError,
290 ePSGS_UnknownError, eDiag_Error);
291 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
292 } catch (...) {
293 string msg = "Unknown exception when handling a get request";
294 x_SendMessageAndCompletionChunks(reply, msg,
295 CRequestStatus::e500_InternalServerError,
296 ePSGS_UnknownError, eDiag_Error);
297 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
298 }
299 return 0;
300 }
301
302
OnGetBlob(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)303 int CPubseqGatewayApp::OnGetBlob(CHttpRequest & req,
304 shared_ptr<CPSGS_Reply> reply)
305 {
306 auto now = chrono::high_resolution_clock::now();
307 CRequestContextResetter context_resetter;
308 CRef<CRequestContext> context = x_CreateRequestContext(req);
309
310 if (x_IsShuttingDown(reply)) {
311 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
312 return 0;
313 }
314
315 int hops;
316 if (!x_GetHops(req, reply, hops)) {
317 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
318 return 0;
319 }
320
321 try {
322 string err_msg;
323 SPSGS_BlobRequestBase::EPSGS_TSEOption
324 tse_option = SPSGS_BlobRequestBase::ePSGS_OrigTSE;
325 SRequestParameter tse_param = x_GetParam(req, kTSEParam);
326 if (tse_param.m_Found) {
327 tse_option = x_GetTSEOption(kTSEParam, tse_param.m_Value, err_msg);
328 if (tse_option == SPSGS_BlobRequestBase::ePSGS_UnknownTSE) {
329 x_MalformedArguments(reply, context, err_msg);
330 return 0;
331 }
332 }
333
334 SRequestParameter last_modified_param = x_GetParam(req, kLastModifiedParam);
335 int64_t last_modified_value = INT64_MIN;
336 if (last_modified_param.m_Found) {
337 try {
338 last_modified_value = NStr::StringToLong(
339 last_modified_param.m_Value);
340 } catch (...) {
341 x_MalformedArguments(reply, context,
342 "Malformed '" + kLastModifiedParam +
343 "' parameter. Expected an integer");
344 return 0;
345 }
346 }
347
348 SPSGS_RequestBase::EPSGS_CacheAndDbUse use_cache = x_GetUseCacheParameter(req, err_msg);
349 if (!err_msg.empty()) {
350 x_MalformedArguments(reply, context, err_msg);
351 return 0;
352 }
353
354 SPSGS_RequestBase::EPSGS_Trace trace = SPSGS_RequestBase::ePSGS_NoTracing;
355 if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
356 x_SendMessageAndCompletionChunks(reply, err_msg,
357 CRequestStatus::e400_BadRequest,
358 ePSGS_MalformedParameter, eDiag_Error);
359 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
360 return 0;
361 }
362
363 SRequestParameter blob_id_param = x_GetParam(req, kBlobIdParam);
364 if (blob_id_param.m_Found)
365 {
366 SPSGS_BlobId blob_id(blob_id_param.m_Value);
367
368 if (blob_id.GetId().empty()) {
369 x_MalformedArguments(reply, context,
370 "The '" + kBlobIdParam +
371 "' parameter value has not been supplied");
372 return 0;
373 }
374
375 SRequestParameter client_id_param = x_GetParam(req, kClientIdParam);
376
377 vector<string> enabled_processors;
378 vector<string> disabled_processors;
379 if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
380 disabled_processors))
381 return 0;
382
383 m_Counters.Increment(CPSGSCounters::ePSGS_GetBlobBySatSatKeyRequest);
384 unique_ptr<SPSGS_RequestBase>
385 req(new SPSGS_BlobBySatSatKeyRequest(
386 blob_id, last_modified_value,
387 tse_option, use_cache,
388 string(client_id_param.m_Value.data(),
389 client_id_param.m_Value.size()),
390 hops, trace,
391 enabled_processors, disabled_processors, now));
392 shared_ptr<CPSGS_Request>
393 request(new CPSGS_Request(move(req), context));
394
395 x_DispatchRequest(request, reply);
396 return 0;
397 }
398
399 x_InsufficientArguments(reply, context, "Mandatory parameter "
400 "'" + kBlobIdParam + "' is not found.");
401 } catch (const exception & exc) {
402 string msg = "Exception when handling a getblob request: " +
403 string(exc.what());
404 x_SendMessageAndCompletionChunks(reply, msg,
405 CRequestStatus::e500_InternalServerError,
406 ePSGS_UnknownError, eDiag_Error);
407 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
408 } catch (...) {
409 string msg = "Unknown exception when handling a getblob request";
410 x_SendMessageAndCompletionChunks(reply, msg,
411 CRequestStatus::e500_InternalServerError,
412 ePSGS_UnknownError, eDiag_Error);
413 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
414 }
415 return 0;
416 }
417
418
OnResolve(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)419 int CPubseqGatewayApp::OnResolve(CHttpRequest & req,
420 shared_ptr<CPSGS_Reply> reply)
421 {
422 auto now = chrono::high_resolution_clock::now();
423 CRequestContextResetter context_resetter;
424 CRef<CRequestContext> context = x_CreateRequestContext(req);
425
426 if (x_IsShuttingDown(reply)) {
427 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
428 return 0;
429 }
430
431 int hops;
432 if (!x_GetHops(req, reply, hops)) {
433 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
434 return 0;
435 }
436
437 try {
438 CTempString seq_id;
439 int seq_id_type;
440 SPSGS_RequestBase::EPSGS_CacheAndDbUse use_cache = SPSGS_RequestBase::ePSGS_CacheAndDb;
441
442 if (!x_ProcessCommonGetAndResolveParams(req, reply, seq_id,
443 seq_id_type, use_cache)) {
444 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
445 return 0;
446 }
447
448 string err_msg;
449 SPSGS_ResolveRequest::EPSGS_OutputFormat
450 output_format = SPSGS_ResolveRequest::ePSGS_NativeFormat;
451 SRequestParameter fmt_param = x_GetParam(req, kFmtParam);
452 if (fmt_param.m_Found) {
453 output_format = x_GetOutputFormat(kFmtParam, fmt_param.m_Value, err_msg);
454 if (output_format == SPSGS_ResolveRequest::ePSGS_UnknownFormat) {
455 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
456 x_SendMessageAndCompletionChunks(reply, err_msg,
457 CRequestStatus::e400_BadRequest,
458 ePSGS_MalformedParameter,
459 eDiag_Error);
460 PSG_WARNING(err_msg);
461 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
462 return 0;
463 }
464 }
465
466 SPSGS_ResolveRequest::TPSGS_BioseqIncludeData
467 include_data_flags = 0;
468 SRequestParameter request_param;
469 for (const auto & flag_param: kResolveFlagParams) {
470 request_param = x_GetParam(req, flag_param.first);
471 if (request_param.m_Found) {
472 if (!x_IsBoolParamValid(flag_param.first,
473 request_param.m_Value, err_msg)) {
474 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
475 x_SendMessageAndCompletionChunks(reply, err_msg,
476 CRequestStatus::e400_BadRequest,
477 ePSGS_MalformedParameter,
478 eDiag_Error);
479 PSG_WARNING(err_msg);
480 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
481 return 0;
482 }
483 if (request_param.m_Value == "yes") {
484 include_data_flags |= flag_param.second;
485 } else {
486 include_data_flags &= ~flag_param.second;
487 }
488 }
489 }
490
491 SPSGS_RequestBase::EPSGS_AccSubstitutioOption
492 subst_option = SPSGS_RequestBase::ePSGS_DefaultAccSubstitution;
493 SRequestParameter subst_param = x_GetParam(req, kAccSubstitutionParam);
494 if (subst_param.m_Found) {
495 subst_option = x_GetAccessionSubstitutionOption(kAccSubstitutionParam,
496 subst_param.m_Value,
497 err_msg);
498 if (subst_option == SPSGS_RequestBase::ePSGS_UnknownAccSubstitution) {
499 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
500 x_SendMessageAndCompletionChunks(reply, err_msg,
501 CRequestStatus::e400_BadRequest,
502 ePSGS_MalformedParameter,
503 eDiag_Error);
504 PSG_WARNING(err_msg);
505 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
506 return 0;
507 }
508 }
509
510 SPSGS_RequestBase::EPSGS_Trace trace = SPSGS_RequestBase::ePSGS_NoTracing;
511 if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
512 x_SendMessageAndCompletionChunks(reply, err_msg,
513 CRequestStatus::e400_BadRequest,
514 ePSGS_MalformedParameter,
515 eDiag_Error);
516 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
517 return 0;
518 }
519
520 vector<string> enabled_processors;
521 vector<string> disabled_processors;
522 if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
523 disabled_processors))
524 return 0;
525
526 // Parameters processing has finished
527 m_Counters.Increment(CPSGSCounters::ePSGS_ResolveRequest);
528 unique_ptr<SPSGS_RequestBase>
529 req(new SPSGS_ResolveRequest(
530 string(seq_id.data(), seq_id.size()),
531 seq_id_type, include_data_flags, output_format,
532 use_cache, subst_option, hops, trace,
533 enabled_processors, disabled_processors, now));
534 shared_ptr<CPSGS_Request>
535 request(new CPSGS_Request(move(req), context));
536
537 x_DispatchRequest(request, reply);
538 } catch (const exception & exc) {
539 string msg = "Exception when handling a resolve request: " +
540 string(exc.what());
541 x_SendMessageAndCompletionChunks(reply, msg,
542 CRequestStatus::e500_InternalServerError,
543 ePSGS_UnknownError, eDiag_Error);
544 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
545 } catch (...) {
546 string msg = "Unknown exception when handling a resolve request";
547 x_SendMessageAndCompletionChunks(reply, msg,
548 CRequestStatus::e500_InternalServerError,
549 ePSGS_UnknownError, eDiag_Error);
550 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
551 }
552 return 0;
553 }
554
555
OnGetTSEChunk(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)556 int CPubseqGatewayApp::OnGetTSEChunk(CHttpRequest & req,
557 shared_ptr<CPSGS_Reply> reply)
558 {
559 auto now = chrono::high_resolution_clock::now();
560 CRequestContextResetter context_resetter;
561 CRef<CRequestContext> context = x_CreateRequestContext(req);
562
563 if (x_IsShuttingDown(reply)) {
564 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
565 return 0;
566 }
567
568 int hops;
569 if (!x_GetHops(req, reply, hops)) {
570 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
571 return 0;
572 }
573
574 try {
575 // Mandatory parameter id2_chunk
576 SRequestParameter id2_chunk_param = x_GetParam(req, kId2ChunkParam);
577 int64_t id2_chunk_value = INT64_MIN;
578 if (!id2_chunk_param.m_Found) {
579 x_InsufficientArguments(reply, context, "Mandatory parameter "
580 "'" + kId2ChunkParam + "' is not found.");
581 return 0;
582 }
583
584 try {
585 id2_chunk_value = NStr::StringToLong(id2_chunk_param.m_Value);
586 if (id2_chunk_value < 0) {
587 x_MalformedArguments(reply, context,
588 "Invalid '" + kId2ChunkParam +
589 "' parameter. Expected >= 0");
590 return 0;
591 }
592 } catch (...) {
593 x_MalformedArguments(reply, context,
594 "Malformed '" + kId2ChunkParam + "' parameter. "
595 "Expected an integer");
596 return 0;
597 }
598
599 SRequestParameter id2_info_param = x_GetParam(req, kId2InfoParam);
600 if (!id2_info_param.m_Found)
601 {
602 x_InsufficientArguments(reply, context, "Mandatory parameter '" +
603 kId2InfoParam + "' is not found.");
604 return 0;
605 }
606
607 string err_msg;
608 SPSGS_RequestBase::EPSGS_CacheAndDbUse use_cache = x_GetUseCacheParameter(req, err_msg);
609 if (!err_msg.empty()) {
610 x_MalformedArguments(reply, context, err_msg);
611 return 0;
612 }
613
614 SPSGS_RequestBase::EPSGS_Trace trace = SPSGS_RequestBase::ePSGS_NoTracing;
615 if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
616 x_SendMessageAndCompletionChunks(reply, err_msg,
617 CRequestStatus::e400_BadRequest,
618 ePSGS_MalformedParameter, eDiag_Error);
619 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
620 return 0;
621 }
622
623 vector<string> enabled_processors;
624 vector<string> disabled_processors;
625 if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
626 disabled_processors))
627 return 0;
628
629 // All parameters are good
630 m_Counters.Increment(CPSGSCounters::ePSGS_GetTSEChunk);
631 unique_ptr<SPSGS_RequestBase>
632 req(new SPSGS_TSEChunkRequest(
633 id2_chunk_value, id2_info_param.m_Value,
634 use_cache, hops, trace,
635 enabled_processors, disabled_processors, now));
636 shared_ptr<CPSGS_Request>
637 request(new CPSGS_Request(move(req), context));
638
639 x_DispatchRequest(request, reply);
640 } catch (const exception & exc) {
641 string msg = "Exception when handling a get_tse_chunk request: " +
642 string(exc.what());
643 x_SendMessageAndCompletionChunks(reply, msg,
644 CRequestStatus::e500_InternalServerError,
645 ePSGS_UnknownError, eDiag_Error);
646 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
647 } catch (...) {
648 string msg = "Unknown exception when handling a get_tse_chunk request";
649 x_SendMessageAndCompletionChunks(reply, msg,
650 CRequestStatus::e500_InternalServerError,
651 ePSGS_UnknownError, eDiag_Error);
652 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
653 }
654 return 0;
655 }
656
657
OnGetNA(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)658 int CPubseqGatewayApp::OnGetNA(CHttpRequest & req,
659 shared_ptr<CPSGS_Reply> reply)
660 {
661 auto now = chrono::high_resolution_clock::now();
662 CRequestContextResetter context_resetter;
663 CRef<CRequestContext> context = x_CreateRequestContext(req);
664
665 if (x_IsShuttingDown(reply)) {
666 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
667 return 0;
668 }
669
670 int hops;
671 if (!x_GetHops(req, reply, hops)) {
672 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
673 return 0;
674 }
675
676 try {
677 CTempString seq_id;
678 int seq_id_type;
679 SPSGS_RequestBase::EPSGS_CacheAndDbUse use_cache = SPSGS_RequestBase::ePSGS_CacheAndDb;
680
681 if (!x_ProcessCommonGetAndResolveParams(req, reply, seq_id,
682 seq_id_type, use_cache)) {
683 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
684 return 0;
685 }
686
687 // At the moment the only json format is supported (native translates
688 // to json). See CXX-10258
689 string err_msg;
690 SRequestParameter fmt_param = x_GetParam(req, kFmtParam);
691 if (fmt_param.m_Found) {
692 auto output_format = x_GetOutputFormat(kFmtParam,
693 fmt_param.m_Value,
694 err_msg);
695 if (output_format == SPSGS_ResolveRequest::ePSGS_ProtobufFormat ||
696 output_format == SPSGS_ResolveRequest::ePSGS_UnknownFormat) {
697 x_MalformedArguments(
698 reply, context,
699 "Invalid '" + kFmtParam + "' parameter value. The 'get_na' "
700 "request supports 'json' and 'native' values");
701 return 0;
702 }
703 }
704
705
706 // Get the annotation names
707 SRequestParameter names_param = x_GetParam(req, kNamesParam);
708 if (!names_param.m_Found) {
709 x_MalformedArguments(reply, context,
710 "The mandatory '" + kNamesParam +
711 "' parameter is not found");
712 return 0;
713 }
714
715 vector<string> names;
716 NStr::Split(names_param.m_Value, ",", names);
717 if (names.empty()) {
718 x_MalformedArguments(reply, context,
719 "Named annotation names are not found in the request");
720 return 0;
721 }
722
723 SPSGS_RequestBase::EPSGS_Trace trace = SPSGS_RequestBase::ePSGS_NoTracing;
724 if (!x_GetTraceParameter(req, kTraceParam, trace, err_msg)) {
725 x_SendMessageAndCompletionChunks(reply, err_msg,
726 CRequestStatus::e400_BadRequest,
727 ePSGS_MalformedParameter,
728 eDiag_Error);
729 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
730 return 0;
731 }
732
733 vector<string> enabled_processors;
734 vector<string> disabled_processors;
735 if (!x_GetEnabledAndDisabledProcessors(req, reply, enabled_processors,
736 disabled_processors))
737 return 0;
738
739 SPSGS_BlobRequestBase::EPSGS_TSEOption
740 tse_option = SPSGS_BlobRequestBase::ePSGS_NoneTSE;
741 SRequestParameter tse_param = x_GetParam(req, kTSEParam);
742 if (tse_param.m_Found) {
743 tse_option = x_GetTSEOption(kTSEParam, tse_param.m_Value, err_msg);
744 if (tse_option == SPSGS_BlobRequestBase::ePSGS_UnknownTSE) {
745 x_MalformedArguments(reply, context, err_msg);
746 return 0;
747 }
748 }
749
750 // Parameters processing has finished
751 m_Counters.Increment(CPSGSCounters::ePSGS_GetNamedAnnotations);
752 unique_ptr<SPSGS_RequestBase>
753 req(new SPSGS_AnnotRequest(
754 string(seq_id.data(), seq_id.size()),
755 seq_id_type, names, use_cache, tse_option,
756 hops, trace,
757 enabled_processors, disabled_processors,
758 now));
759 shared_ptr<CPSGS_Request>
760 request(new CPSGS_Request(move(req), context));
761
762 x_DispatchRequest(request, reply);
763 } catch (const exception & exc) {
764 string msg = "Exception when handling a get_na request: " +
765 string(exc.what());
766 x_SendMessageAndCompletionChunks(reply, msg,
767 CRequestStatus::e500_InternalServerError,
768 ePSGS_UnknownError, eDiag_Error);
769 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
770 } catch (...) {
771 string msg = "Unknown exception when handling a get_na request";
772 x_SendMessageAndCompletionChunks(reply, msg,
773 CRequestStatus::e500_InternalServerError,
774 ePSGS_UnknownError, eDiag_Error);
775 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
776 }
777 return 0;
778 }
779
780
OnHealth(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)781 int CPubseqGatewayApp::OnHealth(CHttpRequest & req,
782 shared_ptr<CPSGS_Reply> reply)
783 {
784 static string separator = "==============================================";
785 static string prefix = "PSG_HEALTH_ERROR: ";
786
787 m_Counters.Increment(CPSGSCounters::ePSGS_HealthRequest);
788
789 auto now = chrono::high_resolution_clock::now();
790 CRequestContextResetter context_resetter;
791 CRef<CRequestContext> context = x_CreateRequestContext(req);
792
793 auto startup_data_state = GetStartupDataState();
794 if (startup_data_state != ePSGS_StartupDataOK) {
795 // Something is wrong with access to Cassandra
796 // Check if there are any active alerts
797 auto active_alerts = m_Alerts.SerializeActive();
798 string msg = separator + "\n" +
799 prefix + "CASSANDRA" "\n" +
800 GetCassStartupDataStateMessage(startup_data_state) + "\n" +
801 separator + "\n" +
802 prefix + "ALERTS" "\n";
803 if (active_alerts.GetSize() == 0) {
804 msg += "There are no active alerts";
805 } else {
806 msg += "Active alerts are:" "\n" +
807 active_alerts.Repr(CJsonNode::fStandardJson);
808 }
809
810 reply->GetHttpReply()->SetContentType(ePSGS_PlainTextMime);
811 reply->GetHttpReply()->Send500(msg.c_str());
812 PSG_WARNING("Cassandra is not available or is in non-working state");
813 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
814 return 0;
815 }
816
817 if (m_TestSeqId.empty()) {
818 // seq_id for a health test is not configured so skip the test
819 auto http_reply = reply->GetHttpReply();
820 http_reply->SetContentType(ePSGS_PlainTextMime);
821 http_reply->SetContentLength(0);
822 http_reply->SendOk(nullptr, 0, false);
823 PSG_WARNING("Test seq_id resolution skipped (configured as an empty string)");
824 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
825 return 0;
826 }
827
828 if (m_Si2csiDbFile.empty() || m_BioseqInfoDbFile.empty()) {
829 // Cache is not configured so skip the test
830 auto http_reply = reply->GetHttpReply();
831 http_reply->SetContentType(ePSGS_PlainTextMime);
832 http_reply->SetContentLength(0);
833 http_reply->SendOk(nullptr, 0, false);
834 PSG_WARNING("Test seq_id resolution skipped (cache is not configured)");
835 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
836 return 0;
837 }
838
839 // If cache is configured then we can give it a try to resolve the
840 // configured seq_id in cache
841 try {
842 vector<string> enabled_processors;
843 vector<string> disabled_processors;
844
845 unique_ptr<SPSGS_RequestBase>
846 req(new SPSGS_ResolveRequest(m_TestSeqId, -1,
847 SPSGS_ResolveRequest::fPSGS_CanonicalId,
848 SPSGS_ResolveRequest::ePSGS_JsonFormat,
849 SPSGS_RequestBase::ePSGS_CacheOnly,
850 SPSGS_RequestBase::ePSGS_NeverAccSubstitute,
851 0, SPSGS_RequestBase::ePSGS_NoTracing,
852 enabled_processors, disabled_processors,
853 now));
854 shared_ptr<CPSGS_Request>
855 request(new CPSGS_Request(move(req), CRef<CRequestContext>()));
856
857
858 CPSGS_ResolveProcessor resolve_processor(request, reply, 0);
859 auto resolution = resolve_processor.ResolveTestInputSeqId();
860
861 if (!resolution.IsValid()) {
862 if (!m_TestSeqIdIgnoreError) {
863 string msg = separator + "\n" +
864 prefix + "RESOLUTION" "\n";
865 if (resolution.m_Error.HasError()) {
866 msg += resolution.m_Error.m_ErrorMessage;
867 } else {
868 msg += "Cannot resolve '" + m_TestSeqId + "' seq_id";
869 }
870 reply->GetHttpReply()->SetContentType(ePSGS_PlainTextMime);
871 reply->GetHttpReply()->Send500(msg.c_str());
872 PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId + "'");
873 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
874 return 0;
875 }
876 PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId +
877 "', however the configuration is to ignore test errors");
878 }
879 } catch (const exception & exc) {
880 if (!m_TestSeqIdIgnoreError) {
881 string msg = separator + "\n" +
882 prefix + "RESOLUTION" "\n" +
883 exc.what();
884 reply->GetHttpReply()->SetContentType(ePSGS_PlainTextMime);
885 reply->GetHttpReply()->Send500(msg.c_str());
886 PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId + "'");
887 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
888 return 0;
889 }
890 PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId +
891 "', however the configuration is to ignore test errors");
892 } catch (...) {
893 if (!m_TestSeqIdIgnoreError) {
894 string msg = separator + "\n" +
895 prefix + "RESOLUTION" "\n"
896 "Unknown '" + m_TestSeqId + "' resolution error";
897 reply->GetHttpReply()->SetContentType(ePSGS_PlainTextMime);
898 reply->GetHttpReply()->Send500(msg.c_str());
899 PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId + "'");
900 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
901 return 0;
902 }
903 PSG_WARNING("Cannot resolve test seq_id '" + m_TestSeqId +
904 "', however the configuration is to ignore test errors");
905 }
906
907 // Here: all OK or errors are ignored
908 auto http_reply = reply->GetHttpReply();
909 http_reply->SetContentType(ePSGS_PlainTextMime);
910 http_reply->SetContentLength(0);
911 http_reply->SendOk(nullptr, 0, false);
912 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
913 return 0;
914 }
915
916
OnConfig(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)917 int CPubseqGatewayApp::OnConfig(CHttpRequest & req,
918 shared_ptr<CPSGS_Reply> reply)
919 {
920 static string kConfigurationFilePath = "ConfigurationFilePath";
921 static string kConfiguration = "Configuration";
922
923 // NOTE: expected to work regardless of the shutdown request
924
925 CRequestContextResetter context_resetter;
926 CRef<CRequestContext> context = x_CreateRequestContext(req);
927
928 auto http_reply = reply->GetHttpReply();
929 try {
930 m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
931
932 CNcbiOstrstream conf;
933 CNcbiOstrstreamToString converter(conf);
934
935 GetConfig().Write(conf);
936
937 CJsonNode conf_info(CJsonNode::NewObjectNode());
938 conf_info.SetString(kConfigurationFilePath, GetConfigPath());
939 conf_info.SetString(kConfiguration, string(converter));
940 string content = conf_info.Repr(CJsonNode::fStandardJson);
941
942 http_reply->SetContentType(ePSGS_JsonMime);
943 http_reply->SetContentLength(content.length());
944 http_reply->SendOk(content.c_str(), content.length(), false);
945
946 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
947 } catch (const exception & exc) {
948 string msg = "Exception when handling a config request: " +
949 string(exc.what());
950 x_SendMessageAndCompletionChunks(reply, msg,
951 CRequestStatus::e500_InternalServerError,
952 ePSGS_ConfigError, eDiag_Error);
953 PSG_ERROR(msg);
954 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
955 } catch (...) {
956 string msg = "Unknown exception when handling a config request";
957 x_SendMessageAndCompletionChunks(reply, msg,
958 CRequestStatus::e500_InternalServerError,
959 ePSGS_ConfigError, eDiag_Error);
960 PSG_ERROR(msg);
961 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
962 }
963 return 0;
964 }
965
966
OnInfo(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)967 int CPubseqGatewayApp::OnInfo(CHttpRequest & req,
968 shared_ptr<CPSGS_Reply> reply)
969 {
970 static string kPID = "PID";
971 static string kExecutablePath = "ExecutablePath";
972 static string kCommandLineArguments = "CommandLineArguments";
973 static string kStartupDataState = "StartupDataState";
974 static string kRealTime = "RealTime";
975 static string kUserTime = "UserTime";
976 static string kSystemTime = "SystemTime";
977 static string kPhysicalMemory = "PhysicalMemory";
978 static string kMemoryUsedTotal = "MemoryUsedTotal";
979 static string kMemoryUsedTotalPeak = "MemoryUsedTotalPeak";
980 static string kMemoryUsedResident = "MemoryUsedResident";
981 static string kMemoryUsedResidentPeak = "MemoryUsedResidentPeak";
982 static string kMemoryUsedShared = "MemoryUsedShared";
983 static string kMemoryUsedData = "MemoryUsedData";
984 static string kMemoryUsedStack = "MemoryUsedStack";
985 static string kMemoryUsedText = "MemoryUsedText";
986 static string kMemoryUsedLib = "MemoryUsedLib";
987 static string kMemoryUsedSwap = "MemoryUsedSwap";
988 static string kProcFDSoftLimit = "ProcFDSoftLimit";
989 static string kProcFDHardLimit = "ProcFDHardLimit";
990 static string kProcFDUsed = "ProcFDUsed";
991 static string kCPUCount = "CPUCount";
992 static string kProcThreadCount = "ProcThreadCount";
993 static string kVersion = "Version";
994 static string kBuildDate = "BuildDate";
995 static string kStartedAt = "StartedAt";
996 static string kExcludeBlobCacheUserCount = "ExcludeBlobCacheUserCount";
997
998 // NOTE: expected to work regardless of the shutdown request
999
1000 CRequestContextResetter context_resetter;
1001 CRef<CRequestContext> context = x_CreateRequestContext(req);
1002
1003 auto app = CPubseqGatewayApp::GetInstance();
1004 auto http_reply = reply->GetHttpReply();
1005 try {
1006 m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
1007
1008 CJsonNode info(CJsonNode::NewObjectNode());
1009
1010 info.SetInteger(kPID, CDiagContext::GetPID());
1011 info.SetString(kExecutablePath, GetProgramExecutablePath());
1012 info.SetString(kCommandLineArguments, x_GetCmdLineArguments());
1013 info.SetString(kStartupDataState,
1014 GetCassStartupDataStateMessage(app->GetStartupDataState()));
1015
1016
1017 double real_time;
1018 double user_time;
1019 double system_time;
1020 bool process_time_result = CCurrentProcess::GetTimes(&real_time,
1021 &user_time,
1022 &system_time);
1023 if (process_time_result) {
1024 info.SetDouble(kRealTime, real_time);
1025 info.SetDouble(kUserTime, user_time);
1026 info.SetDouble(kSystemTime, system_time);
1027 } else {
1028 info.SetString(kRealTime, kNA);
1029 info.SetString(kUserTime, kNA);
1030 info.SetString(kSystemTime, kNA);
1031 }
1032
1033 Uint8 physical_memory = CSystemInfo::GetTotalPhysicalMemorySize();
1034 if (physical_memory > 0)
1035 info.SetInteger(kPhysicalMemory, physical_memory);
1036 else
1037 info.SetString(kPhysicalMemory, kNA);
1038
1039 CProcessBase::SMemoryUsage mem_usage;
1040 bool mem_used_result = CCurrentProcess::GetMemoryUsage(mem_usage);
1041 if (mem_used_result) {
1042 if (mem_usage.total > 0)
1043 info.SetInteger(kMemoryUsedTotal, mem_usage.total);
1044 else
1045 info.SetString(kMemoryUsedTotal, kNA);
1046
1047 if (mem_usage.total_peak > 0)
1048 info.SetInteger(kMemoryUsedTotalPeak, mem_usage.total_peak);
1049 else
1050 info.SetString(kMemoryUsedTotalPeak, kNA);
1051
1052 if (mem_usage.resident > 0)
1053 info.SetInteger(kMemoryUsedResident, mem_usage.resident);
1054 else
1055 info.SetString(kMemoryUsedResident, kNA);
1056
1057 if (mem_usage.resident_peak > 0)
1058 info.SetInteger(kMemoryUsedResidentPeak, mem_usage.resident_peak);
1059 else
1060 info.SetString(kMemoryUsedResidentPeak, kNA);
1061
1062 if (mem_usage.shared > 0)
1063 info.SetInteger(kMemoryUsedShared, mem_usage.shared);
1064 else
1065 info.SetString(kMemoryUsedShared, kNA);
1066
1067 if (mem_usage.data > 0)
1068 info.SetInteger(kMemoryUsedData, mem_usage.data);
1069 else
1070 info.SetString(kMemoryUsedData, kNA);
1071
1072 if (mem_usage.stack > 0)
1073 info.SetInteger(kMemoryUsedStack, mem_usage.stack);
1074 else
1075 info.SetString(kMemoryUsedStack, kNA);
1076
1077 if (mem_usage.text > 0)
1078 info.SetInteger(kMemoryUsedText, mem_usage.text);
1079 else
1080 info.SetString(kMemoryUsedText, kNA);
1081
1082 if (mem_usage.lib > 0)
1083 info.SetInteger(kMemoryUsedLib, mem_usage.lib);
1084 else
1085 info.SetString(kMemoryUsedLib, kNA);
1086
1087 if (mem_usage.swap > 0)
1088 info.SetInteger(kMemoryUsedSwap, mem_usage.swap);
1089 else
1090 info.SetString(kMemoryUsedSwap, kNA);
1091 } else {
1092 info.SetString(kMemoryUsedTotal, kNA);
1093 info.SetString(kMemoryUsedTotalPeak, kNA);
1094 info.SetString(kMemoryUsedResident, kNA);
1095 info.SetString(kMemoryUsedResidentPeak, kNA);
1096 info.SetString(kMemoryUsedShared, kNA);
1097 info.SetString(kMemoryUsedData, kNA);
1098 info.SetString(kMemoryUsedStack, kNA);
1099 info.SetString(kMemoryUsedText, kNA);
1100 info.SetString(kMemoryUsedLib, kNA);
1101 info.SetString(kMemoryUsedSwap, kNA);
1102 }
1103
1104 int proc_fd_soft_limit;
1105 int proc_fd_hard_limit;
1106 int proc_fd_used =
1107 CCurrentProcess::GetFileDescriptorsCount(&proc_fd_soft_limit,
1108 &proc_fd_hard_limit);
1109
1110 if (proc_fd_soft_limit >= 0)
1111 info.SetInteger(kProcFDSoftLimit, proc_fd_soft_limit);
1112 else
1113 info.SetString(kProcFDSoftLimit, kNA);
1114
1115 if (proc_fd_hard_limit >= 0)
1116 info.SetInteger(kProcFDHardLimit, proc_fd_hard_limit);
1117 else
1118 info.SetString(kProcFDHardLimit, kNA);
1119
1120 if (proc_fd_used >= 0)
1121 info.SetInteger(kProcFDUsed, proc_fd_used);
1122 else
1123 info.SetString(kProcFDUsed, kNA);
1124
1125 info.SetInteger(kCPUCount, CSystemInfo::GetCpuCount());
1126
1127 int proc_thread_count = CCurrentProcess::GetThreadCount();
1128 if (proc_thread_count >= 1)
1129 info.SetInteger(kProcThreadCount, proc_thread_count);
1130 else
1131 info.SetString(kProcThreadCount, kNA);
1132
1133
1134 info.SetString(kVersion, PUBSEQ_GATEWAY_VERSION);
1135 info.SetString(kBuildDate, PUBSEQ_GATEWAY_BUILD_DATE);
1136 info.SetString(kStartedAt, m_StartTime.AsString());
1137
1138 info.SetInteger(kExcludeBlobCacheUserCount,
1139 app->GetExcludeBlobCache()->Size());
1140
1141 string content = info.Repr(CJsonNode::fStandardJson);
1142
1143 http_reply->SetContentType(ePSGS_JsonMime);
1144 http_reply->SetContentLength(content.length());
1145 http_reply->SendOk(content.c_str(), content.length(), false);
1146
1147 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1148 } catch (const exception & exc) {
1149 string msg = "Exception when handling an info request: " +
1150 string(exc.what());
1151 x_SendMessageAndCompletionChunks(reply, msg,
1152 CRequestStatus::e500_InternalServerError,
1153 ePSGS_InfoError, eDiag_Error);
1154 PSG_ERROR(msg);
1155 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1156 } catch (...) {
1157 string msg = "Unknown exception when handling an info request";
1158 x_SendMessageAndCompletionChunks(reply, msg,
1159 CRequestStatus::e500_InternalServerError,
1160 ePSGS_InfoError, eDiag_Error);
1161 PSG_ERROR(msg);
1162 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1163 }
1164 return 0;
1165 }
1166
1167
OnStatus(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1168 int CPubseqGatewayApp::OnStatus(CHttpRequest & req,
1169 shared_ptr<CPSGS_Reply> reply)
1170 {
1171 // NOTE: expected to work regardless of the shutdown request
1172
1173 CRequestContextResetter context_resetter;
1174 CRef<CRequestContext> context = x_CreateRequestContext(req);
1175
1176 auto http_reply = reply->GetHttpReply();
1177 try {
1178 m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
1179
1180 CJsonNode status(CJsonNode::NewObjectNode());
1181
1182 m_Counters.AppendValueNode(
1183 status, CPSGSCounters::ePSGS_CassandraActiveStatements,
1184 static_cast<uint64_t>(m_CassConnection->GetActiveStatements()));
1185 m_Counters.AppendValueNode(
1186 status, CPSGSCounters::ePSGS_NumberOfConnections,
1187 static_cast<uint64_t>(m_TcpDaemon->NumOfConnections()));
1188 m_Counters.AppendValueNode(
1189 status, CPSGSCounters::ePSGS_ActiveRequest,
1190 static_cast<uint64_t>(g_ShutdownData.m_ActiveRequestCount.load()));
1191 m_Counters.AppendValueNode(
1192 status, CPSGSCounters::ePSGS_ShutdownRequested,
1193 g_ShutdownData.m_ShutdownRequested);
1194
1195 if (g_ShutdownData.m_ShutdownRequested) {
1196 auto now = chrono::steady_clock::now();
1197 uint64_t sec = std::chrono::duration_cast<std::chrono::seconds>
1198 (g_ShutdownData.m_Expired - now).count();
1199 m_Counters.AppendValueNode(
1200 status, CPSGSCounters::ePSGS_GracefulShutdownExpiredInSec, sec);
1201 } else {
1202 m_Counters.AppendValueNode(
1203 status, CPSGSCounters::ePSGS_GracefulShutdownExpiredInSec, kNA);
1204 }
1205
1206 m_Counters.PopulateDictionary(status);
1207
1208 string content = status.Repr(CJsonNode::fStandardJson);
1209
1210 http_reply->SetContentType(ePSGS_JsonMime);
1211 http_reply->SetContentLength(content.length());
1212 http_reply->SendOk(content.c_str(), content.length(), false);
1213
1214 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1215 } catch (const exception & exc) {
1216 string msg = "Exception when handling a status request: " +
1217 string(exc.what());
1218 x_SendMessageAndCompletionChunks(reply, msg,
1219 CRequestStatus::e500_InternalServerError,
1220 ePSGS_StatusError, eDiag_Error);
1221 PSG_ERROR(msg);
1222 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1223 } catch (...) {
1224 string msg = "Unknown exception when handling a status request";
1225 x_SendMessageAndCompletionChunks(reply, msg,
1226 CRequestStatus::e500_InternalServerError,
1227 ePSGS_StatusError, eDiag_Error);
1228 PSG_ERROR(msg);
1229 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1230 }
1231 return 0;
1232 }
1233
1234
OnShutdown(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1235 int CPubseqGatewayApp::OnShutdown(CHttpRequest & req,
1236 shared_ptr<CPSGS_Reply> reply)
1237 {
1238 // NOTE: expected to work regardless of the shutdown request
1239
1240 static const char * s_ImmediateShutdown = "Immediate shutdown request accepted";
1241 static const char * s_GracefulShutdown = "Graceful shutdown request accepted";
1242 static size_t s_ImmediateShutdownSize = strlen(s_ImmediateShutdown);
1243 static size_t s_GracefulShutdownSize = strlen(s_GracefulShutdown);
1244
1245 CRequestContextResetter context_resetter;
1246 CRef<CRequestContext> context = x_CreateRequestContext(req);
1247
1248 try {
1249 string msg;
1250 string username;
1251 SRequestParameter username_param = x_GetParam(req, kUsernameParam);
1252 if (username_param.m_Found) {
1253 username = string(username_param.m_Value.data(),
1254 username_param.m_Value.size());
1255 }
1256
1257 if (!m_AuthToken.empty()) {
1258 SRequestParameter auth_token_param = x_GetParam(req, kAuthTokenParam);
1259
1260 bool auth_good = false;
1261 if (auth_token_param.m_Found) {
1262 auth_good = m_AuthToken == string(auth_token_param.m_Value.data(),
1263 auth_token_param.m_Value.size());
1264 }
1265
1266 if (!auth_good) {
1267 msg = "Unauthorized shutdown request: invalid authorization token. ";
1268 if (username.empty())
1269 msg += "Unknown user";
1270 else
1271 msg += "User: " + username;
1272 PSG_MESSAGE(msg);
1273
1274 x_SendMessageAndCompletionChunks(
1275 reply, "Invalid authorization token",
1276 CRequestStatus::e401_Unauthorized,
1277 ePSGS_Unauthorised, eDiag_Error);
1278 x_PrintRequestStop(context, CRequestStatus::e401_Unauthorized);
1279 return 0;
1280 }
1281 }
1282
1283 int timeout = 10; // Default: 10 sec
1284 SRequestParameter timeout_param = x_GetParam(req, kTimeoutParam);
1285 if (timeout_param.m_Found) {
1286 try {
1287 timeout = stoi(string(timeout_param.m_Value.data(),
1288 timeout_param.m_Value.size()));
1289 } catch (...) {
1290 msg = "Invalid shutdown request: cannot convert timeout to an integer. ";
1291 if (username.empty())
1292 msg += "Unknown user";
1293 else
1294 msg += "User: " + username;
1295 PSG_MESSAGE(msg);
1296
1297 x_SendMessageAndCompletionChunks(
1298 reply, "Invalid timeout (must be a positive integer)",
1299 CRequestStatus::e400_BadRequest,
1300 ePSGS_MalformedParameter, eDiag_Error);
1301 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1302 return 0;
1303 }
1304
1305 if (timeout < 0) {
1306 msg = "Invalid shutdown request: timeout must be >= 0. ";
1307 if (username.empty())
1308 msg += "Unknown user";
1309 else
1310 msg += "User: " + username;
1311 PSG_MESSAGE(msg);
1312
1313 x_SendMessageAndCompletionChunks(
1314 reply, "Invalid timeout (must be a positive integer)",
1315 CRequestStatus::e400_BadRequest,
1316 ePSGS_MalformedParameter, eDiag_Error);
1317 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1318 return 0;
1319 }
1320 }
1321
1322 auto http_reply = reply->GetHttpReply();
1323 http_reply->SetContentType(ePSGS_PlainTextMime);
1324 if (timeout == 0) {
1325 // Immediate shutdown is requested
1326 msg = "Immediate shutdown request received from ";
1327 if (username.empty())
1328 msg += "an unknown user";
1329 else
1330 msg += "user " + username;
1331 PSG_MESSAGE(msg);
1332
1333 http_reply->Send202(s_ImmediateShutdown, s_ImmediateShutdownSize);
1334 x_PrintRequestStop(context, CRequestStatus::e202_Accepted);
1335 exit(0);
1336 }
1337
1338 msg = "Graceful shutdown request received from ";
1339 if (username.empty())
1340 msg += "an unknown user";
1341 else
1342 msg += "user " + username;
1343
1344 auto now = chrono::steady_clock::now();
1345 auto expiration = now + chrono::seconds(timeout);
1346 if (g_ShutdownData.m_ShutdownRequested) {
1347 // Consequest shutdown request
1348 if (expiration >= g_ShutdownData.m_Expired) {
1349 msg += ". The previous shutdown expiration is shorter "
1350 "than this one. Ignored.";
1351 PSG_MESSAGE(msg);
1352 http_reply->Send409(msg.c_str());
1353 x_PrintRequestStop(context, CRequestStatus::e409_Conflict);
1354 return 0;
1355 }
1356 }
1357
1358 // New shutdown request or a shorter expiration request
1359 PSG_MESSAGE(msg);
1360
1361 http_reply->Send202(s_GracefulShutdown, s_GracefulShutdownSize);
1362 x_PrintRequestStop(context, CRequestStatus::e202_Accepted);
1363
1364 g_ShutdownData.m_Expired = expiration;
1365 g_ShutdownData.m_ShutdownRequested = true;
1366 } catch (const exception & exc) {
1367 string msg = "Exception when handling a shutdown request: " +
1368 string(exc.what());
1369 x_SendMessageAndCompletionChunks(reply, msg,
1370 CRequestStatus::e500_InternalServerError,
1371 ePSGS_ShutdownError, eDiag_Error);
1372 PSG_ERROR(msg);
1373 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1374 } catch (...) {
1375 string msg = "Unknown exception when handling a shutdown request";
1376 x_SendMessageAndCompletionChunks(reply, msg,
1377 CRequestStatus::e500_InternalServerError,
1378 ePSGS_ShutdownError, eDiag_Error);
1379 PSG_ERROR(msg);
1380 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1381 }
1382 return 0;
1383 }
1384
1385
OnGetAlerts(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1386 int CPubseqGatewayApp::OnGetAlerts(CHttpRequest & req,
1387 shared_ptr<CPSGS_Reply> reply)
1388 {
1389 // NOTE: expected to work regardless of the shutdown request
1390
1391 CRequestContextResetter context_resetter;
1392 CRef<CRequestContext> context = x_CreateRequestContext(req);
1393
1394 auto http_reply = reply->GetHttpReply();
1395 try {
1396 m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
1397
1398 string content = m_Alerts.Serialize().Repr(CJsonNode::fStandardJson);
1399
1400 http_reply->SetContentType(ePSGS_JsonMime);
1401 http_reply->SetContentLength(content.length());
1402 http_reply->SendOk(content.c_str(), content.length(), false);
1403
1404 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1405 } catch (const exception & exc) {
1406 string msg = "Exception when handling a get alerts request: " +
1407 string(exc.what());
1408 x_SendMessageAndCompletionChunks(reply, msg,
1409 CRequestStatus::e500_InternalServerError,
1410 ePSGS_GetAlertsError, eDiag_Error);
1411 PSG_ERROR(msg);
1412 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1413 } catch (...) {
1414 string msg = "Unknown exception when handling a get alerts request";
1415 x_SendMessageAndCompletionChunks(reply, msg,
1416 CRequestStatus::e500_InternalServerError,
1417 ePSGS_GetAlertsError, eDiag_Error);
1418 PSG_ERROR(msg);
1419 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1420 }
1421 return 0;
1422 }
1423
1424
OnAckAlert(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1425 int CPubseqGatewayApp::OnAckAlert(CHttpRequest & req,
1426 shared_ptr<CPSGS_Reply> reply)
1427 {
1428 CRequestContextResetter context_resetter;
1429 CRef<CRequestContext> context = x_CreateRequestContext(req);
1430
1431 if (x_IsShuttingDown(reply)) {
1432 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
1433 return 0;
1434 }
1435
1436 string msg;
1437
1438 try {
1439 SRequestParameter alert_param = x_GetParam(req, kAlertParam);
1440 if (!alert_param.m_Found) {
1441 msg = "Missing " + kAlertParam + " parameter";
1442 x_SendMessageAndCompletionChunks(
1443 reply, msg, CRequestStatus::e400_BadRequest,
1444 ePSGS_InsufficientArguments, eDiag_Error);
1445 PSG_ERROR(msg);
1446 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1447 return 0;
1448 }
1449
1450 SRequestParameter username_param = x_GetParam(req, kUsernameParam);
1451 if (!username_param.m_Found) {
1452 msg = "Missing " + kUsernameParam + " parameter";
1453 x_SendMessageAndCompletionChunks(
1454 reply, msg, CRequestStatus::e400_BadRequest,
1455 ePSGS_InsufficientArguments, eDiag_Error);
1456 PSG_ERROR(msg);
1457 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1458 return 0;
1459 }
1460
1461 string alert(alert_param.m_Value.data(), alert_param.m_Value.size());
1462 string username(username_param.m_Value.data(), username_param.m_Value.size());
1463 auto http_reply = reply->GetHttpReply();
1464
1465 switch (m_Alerts.Acknowledge(alert, username)) {
1466 case ePSGS_AlertNotFound:
1467 msg = "Alert " + alert + " is not found";
1468 x_SendMessageAndCompletionChunks(
1469 reply, msg,
1470 CRequestStatus::e404_NotFound,
1471 ePSGS_MalformedParameter, eDiag_Error);
1472 PSG_ERROR(msg);
1473 x_PrintRequestStop(context, CRequestStatus::e404_NotFound);
1474 break;
1475 case ePSGS_AlertAlreadyAcknowledged:
1476 http_reply->SetContentType(ePSGS_PlainTextMime);
1477 msg = "Alert " + alert + " has already been acknowledged";
1478 http_reply->SendOk(msg.c_str(), msg.size(), false);
1479 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1480 break;
1481 case ePSGS_AlertAcknowledged:
1482 http_reply->SetContentType(ePSGS_PlainTextMime);
1483 http_reply->SendOk(nullptr, 0, true);
1484 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1485 break;
1486 }
1487 } catch (const exception & exc) {
1488 string msg = "Exception when handling an acknowledge alert request: " +
1489 string(exc.what());
1490 x_SendMessageAndCompletionChunks(reply, msg,
1491 CRequestStatus::e500_InternalServerError,
1492 ePSGS_AckAlertError,
1493 eDiag_Error);
1494 PSG_ERROR(msg);
1495 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1496 } catch (...) {
1497 string msg = "Unknown exception when handling an acknowledge alert request";
1498 x_SendMessageAndCompletionChunks(reply, msg,
1499 CRequestStatus::e500_InternalServerError,
1500 ePSGS_AckAlertError,
1501 eDiag_Error);
1502 PSG_ERROR(msg);
1503 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1504 }
1505 return 0;
1506 }
1507
OnStatistics(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1508 int CPubseqGatewayApp::OnStatistics(CHttpRequest & req,
1509 shared_ptr<CPSGS_Reply> reply)
1510 {
1511 CRequestContextResetter context_resetter;
1512 CRef<CRequestContext> context = x_CreateRequestContext(req);
1513 auto http_reply = reply->GetHttpReply();
1514
1515 if (x_IsShuttingDown(reply)) {
1516 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
1517 return 0;
1518 }
1519
1520 try {
1521 m_Counters.Increment(CPSGSCounters::ePSGS_AdminRequest);
1522
1523 // /ADMIN/statistics[?reset=yes(dflt=no)][&most_recent_time=<time>][&most_ancient_time=<time>][histogram_names=name1,name2,...]
1524 // /ADMIN/statistics[?reset=yes(dflt=no)][&most_recent_time=<time>][&most_ancient_time=<time>]
1525
1526
1527 bool reset = false;
1528 SRequestParameter reset_param = x_GetParam(req, kResetParam);
1529 if (reset_param.m_Found) {
1530 string err_msg;
1531 if (!x_IsBoolParamValid(kResetParam, reset_param.m_Value, err_msg)) {
1532 x_SendMessageAndCompletionChunks(
1533 reply, err_msg, CRequestStatus::e400_BadRequest,
1534 ePSGS_MalformedParameter, eDiag_Error);
1535 PSG_ERROR(err_msg);
1536 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1537 return 0;
1538 }
1539 reset = reset_param.m_Value == "yes";
1540 }
1541
1542 if (reset) {
1543 m_Timing->Reset();
1544 http_reply->SetContentType(ePSGS_PlainTextMime);
1545 http_reply->SendOk(nullptr, 0, true);
1546 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1547 return 0;
1548 }
1549
1550 int most_recent_time = INT_MIN;
1551 SRequestParameter most_recent_time_param = x_GetParam(req, kMostRecentTimeParam);
1552 if (most_recent_time_param.m_Found) {
1553 string err_msg;
1554 try {
1555 most_recent_time = NStr::StringToInt8(most_recent_time_param.m_Value);
1556 if (most_recent_time < 0)
1557 err_msg = "Invalid " + kMostRecentTimeParam +
1558 " value (" + most_recent_time_param.m_Value +
1559 "). It must be >= 0.";
1560 } catch (...) {
1561 err_msg = "Invalid " + kMostRecentTimeParam +
1562 " value (" + most_recent_time_param.m_Value +
1563 "). It must be an integer >= 0.";
1564 }
1565
1566 if (!err_msg.empty()) {
1567 x_SendMessageAndCompletionChunks(
1568 reply, err_msg, CRequestStatus::e400_BadRequest,
1569 ePSGS_MalformedParameter, eDiag_Error);
1570 PSG_ERROR(err_msg);
1571 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1572 return 0;
1573 }
1574 }
1575
1576 int most_ancient_time = INT_MIN;
1577 SRequestParameter most_ancient_time_param = x_GetParam(req, kMostAncientTimeParam);
1578 if (most_ancient_time_param.m_Found) {
1579 string err_msg;
1580 try {
1581 most_ancient_time = NStr::StringToInt8(most_ancient_time_param.m_Value);
1582 if (most_ancient_time < 0)
1583 err_msg = "Invalid " + kMostAncientTimeParam +
1584 " value (" + most_ancient_time_param.m_Value +
1585 "). It must be >= 0.";
1586 } catch (...) {
1587 err_msg = "Invalid " + kMostAncientTimeParam +
1588 " value (" + most_ancient_time_param.m_Value +
1589 "). It must be an integer >= 0.";
1590 }
1591
1592 if (!err_msg.empty()) {
1593 x_SendMessageAndCompletionChunks(
1594 reply, err_msg, CRequestStatus::e400_BadRequest,
1595 ePSGS_MalformedParameter, eDiag_Error);
1596 PSG_ERROR(err_msg);
1597 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1598 return 0;
1599 }
1600 }
1601
1602 if (most_ancient_time >= 0 && most_recent_time >= 0) {
1603 // auto reorder the time if needed
1604 if (most_recent_time > most_ancient_time) {
1605 swap(most_recent_time, most_ancient_time);
1606 }
1607 }
1608
1609 vector<CTempString> histogram_names;
1610 SRequestParameter histogram_names_param = x_GetParam(req, kHistogramNamesParam);
1611 if (histogram_names_param.m_Found) {
1612 NStr::Split(histogram_names_param.m_Value, ",", histogram_names);
1613 }
1614
1615
1616 CJsonNode timing(m_Timing->Serialize(most_ancient_time,
1617 most_recent_time,
1618 histogram_names,
1619 m_TickSpan));
1620 string content = timing.Repr(CJsonNode::fStandardJson);
1621
1622 http_reply->SetContentType(ePSGS_JsonMime);
1623 http_reply->SetContentLength(content.length());
1624 http_reply->SendOk(content.c_str(), content.length(), false);
1625
1626 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1627 } catch (const exception & exc) {
1628 string msg = "Exception when handling a statistics request: " +
1629 string(exc.what());
1630 x_SendMessageAndCompletionChunks(reply, msg,
1631 CRequestStatus::e500_InternalServerError,
1632 ePSGS_StatisticsError,
1633 eDiag_Error);
1634 PSG_ERROR(msg);
1635 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1636 } catch (...) {
1637 string msg = "Unknown exception when handling a statistics request";
1638 x_SendMessageAndCompletionChunks(reply, msg,
1639 CRequestStatus::e500_InternalServerError,
1640 ePSGS_StatisticsError,
1641 eDiag_Error);
1642 PSG_ERROR(msg);
1643 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1644 }
1645 return 0;
1646 }
1647
1648
OnTestIO(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply)1649 int CPubseqGatewayApp::OnTestIO(CHttpRequest & req,
1650 shared_ptr<CPSGS_Reply> reply)
1651 {
1652 bool need_log = false; // default
1653 CRequestContextResetter context_resetter;
1654 CRef<CRequestContext> context;
1655 auto http_reply = reply->GetHttpReply();
1656
1657 if (x_IsShuttingDown(reply)) {
1658 x_PrintRequestStop(context, CRequestStatus::e503_ServiceUnavailable);
1659 return 0;
1660 }
1661
1662 try {
1663 string err_msg;
1664
1665 SRequestParameter log_param = x_GetParam(req, kLogParam);
1666 if (log_param.m_Found) {
1667 if (!x_IsBoolParamValid(kLogParam, log_param.m_Value, err_msg)) {
1668 x_SendMessageAndCompletionChunks(
1669 reply, err_msg, CRequestStatus::e400_BadRequest,
1670 ePSGS_MalformedParameter, eDiag_Error);
1671 PSG_ERROR(err_msg);
1672 return 0;
1673 }
1674 need_log = log_param.m_Value == "yes";
1675 }
1676
1677 if (need_log)
1678 context = x_CreateRequestContext(req);
1679
1680 // Read the return data size
1681 SRequestParameter data_size_param = x_GetParam(req, kDataSizeParam);
1682 long data_size = 0;
1683 if (data_size_param.m_Found) {
1684 data_size = NStr::StringToLong(data_size_param.m_Value);
1685 if (data_size < 0 || data_size > kMaxTestIOSize) {
1686 err_msg = "Invalid range of the '" + kDataSizeParam +
1687 "' parameter. Accepted values are 0..." +
1688 to_string(kMaxTestIOSize);
1689 x_SendMessageAndCompletionChunks(
1690 reply, err_msg, CRequestStatus::e400_BadRequest,
1691 ePSGS_MalformedParameter, eDiag_Error);
1692 if (need_log) {
1693 PSG_WARNING(err_msg);
1694 x_PrintRequestStop(context,
1695 CRequestStatus::e400_BadRequest);
1696 }
1697 return 0;
1698 }
1699 } else {
1700 err_msg = "The '" + kDataSizeParam + "' must be provided";
1701 x_SendMessageAndCompletionChunks(
1702 reply, err_msg, CRequestStatus::e400_BadRequest,
1703 ePSGS_InsufficientArguments, eDiag_Error);
1704 if (need_log) {
1705 PSG_WARNING(err_msg);
1706 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1707 }
1708 return 0;
1709 }
1710
1711 m_Counters.Increment(CPSGSCounters::ePSGS_TestIORequest);
1712
1713 http_reply->SetContentType(ePSGS_BinaryMime);
1714 http_reply->SetContentLength(data_size);
1715
1716 // true: persistent
1717 http_reply->SendOk(m_IOTestBuffer.get(), data_size, true);
1718
1719 if (need_log)
1720 x_PrintRequestStop(context, CRequestStatus::e200_Ok);
1721 } catch (const exception & exc) {
1722 string msg = "Exception when handling a test io request: " +
1723 string(exc.what());
1724 x_SendMessageAndCompletionChunks(reply, msg,
1725 CRequestStatus::e500_InternalServerError,
1726 ePSGS_TestIOError,
1727 eDiag_Error);
1728 PSG_ERROR(msg);
1729 if (need_log)
1730 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1731 } catch (...) {
1732 string msg = "Unknown exception when handling a test io request";
1733 x_SendMessageAndCompletionChunks(reply, msg,
1734 CRequestStatus::e500_InternalServerError,
1735 ePSGS_TestIOError,
1736 eDiag_Error);
1737 PSG_ERROR(msg);
1738 if (need_log)
1739 x_PrintRequestStop(context, CRequestStatus::e500_InternalServerError);
1740 }
1741 return 0;
1742 }
1743
1744
x_ProcessCommonGetAndResolveParams(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply,CTempString & seq_id,int & seq_id_type,SPSGS_RequestBase::EPSGS_CacheAndDbUse & use_cache)1745 bool CPubseqGatewayApp::x_ProcessCommonGetAndResolveParams(
1746 CHttpRequest & req,
1747 shared_ptr<CPSGS_Reply> reply,
1748 CTempString & seq_id,
1749 int & seq_id_type,
1750 SPSGS_RequestBase::EPSGS_CacheAndDbUse & use_cache)
1751 {
1752 SRequestParameter seq_id_type_param;
1753 string err_msg;
1754
1755 // Check the mandatory parameter presence
1756 SRequestParameter seq_id_param = x_GetParam(req, kSeqIdParam);
1757 if (!seq_id_param.m_Found) {
1758 err_msg = "Missing the '" + kSeqIdParam + "' parameter";
1759 m_Counters.Increment(CPSGSCounters::ePSGS_InsufficientArgs);
1760 }
1761 else if (seq_id_param.m_Value.empty()) {
1762 err_msg = "Missing value of the '" + kSeqIdParam + "' parameter";
1763 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1764 }
1765
1766 if (err_msg.empty()) {
1767 use_cache = x_GetUseCacheParameter(req, err_msg);
1768 }
1769
1770 if (!err_msg.empty()) {
1771 x_SendMessageAndCompletionChunks(reply, err_msg,
1772 CRequestStatus::e400_BadRequest,
1773 ePSGS_MissingParameter, eDiag_Error);
1774 PSG_WARNING(err_msg);
1775 return false;
1776 }
1777 seq_id = seq_id_param.m_Value;
1778
1779 seq_id_type_param = x_GetParam(req, kSeqIdTypeParam);
1780 if (seq_id_type_param.m_Found) {
1781 if (!x_ConvertIntParameter(kSeqIdTypeParam, seq_id_type_param.m_Value,
1782 seq_id_type, err_msg)) {
1783 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1784 x_SendMessageAndCompletionChunks(reply, err_msg,
1785 CRequestStatus::e400_BadRequest,
1786 ePSGS_MalformedParameter,
1787 eDiag_Error);
1788 PSG_WARNING(err_msg);
1789 return false;
1790 }
1791
1792 if (seq_id_type < 0 || seq_id_type >= CSeq_id::e_MaxChoice) {
1793 err_msg = "The '" + kSeqIdTypeParam +
1794 "' value must be >= 0 and less than " +
1795 to_string(CSeq_id::e_MaxChoice);
1796 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1797 x_SendMessageAndCompletionChunks(reply, err_msg,
1798 CRequestStatus::e400_BadRequest,
1799 ePSGS_MalformedParameter,
1800 eDiag_Error);
1801 PSG_WARNING(err_msg);
1802 return false;
1803 }
1804 } else {
1805 seq_id_type = -1;
1806 }
1807
1808 return true;
1809 }
1810
1811
1812 SPSGS_RequestBase::EPSGS_CacheAndDbUse
x_GetUseCacheParameter(CHttpRequest & req,string & err_msg)1813 CPubseqGatewayApp::x_GetUseCacheParameter(CHttpRequest & req,
1814 string & err_msg)
1815 {
1816 SRequestParameter use_cache_param = x_GetParam(req, kUseCacheParam);
1817
1818 if (use_cache_param.m_Found) {
1819 if (!x_IsBoolParamValid(kUseCacheParam, use_cache_param.m_Value,
1820 err_msg)) {
1821 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1822 return SPSGS_RequestBase::ePSGS_UnknownUseCache;
1823 }
1824 if (use_cache_param.m_Value == "yes")
1825 return SPSGS_RequestBase::ePSGS_CacheOnly;
1826 return SPSGS_RequestBase::ePSGS_DbOnly;
1827 }
1828 return SPSGS_RequestBase::ePSGS_CacheAndDb;
1829 }
1830
1831
x_IsShuttingDown(shared_ptr<CPSGS_Reply> reply)1832 bool CPubseqGatewayApp::x_IsShuttingDown(shared_ptr<CPSGS_Reply> reply)
1833 {
1834 if (g_ShutdownData.m_ShutdownRequested) {
1835 string msg = "The server is in process of shutting down";
1836 x_SendMessageAndCompletionChunks(reply, msg,
1837 CRequestStatus::e503_ServiceUnavailable,
1838 ePSGS_ShuttingDown,
1839 eDiag_Error);
1840 PSG_ERROR(msg);
1841 return true;
1842 }
1843 return false;
1844 }
1845
1846
x_DispatchRequest(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply)1847 void CPubseqGatewayApp::x_DispatchRequest(shared_ptr<CPSGS_Request> request,
1848 shared_ptr<CPSGS_Reply> reply)
1849 {
1850 // The dispatcher works in terms of processors while the infrastructure
1851 // works in terms of pending operation. So, lets create the corresponding
1852 // pending operations.
1853 // Note: the case when no processors are available is handled in the
1854 // dispatcher (reply is sent to the client).
1855
1856 list<unique_ptr<CPendingOperation>> pending_ops;
1857 for (auto processor: m_RequestDispatcher.DispatchRequest(request, reply)) {
1858 pending_ops.push_back(
1859 move(
1860 unique_ptr<CPendingOperation>(
1861 new CPendingOperation(request, reply, processor))));
1862 }
1863
1864 if (!pending_ops.empty()) {
1865 auto http_conn = reply->GetHttpReply()->GetHttpConnection();
1866 http_conn->Postpone(move(pending_ops), reply);
1867 }
1868 }
1869
1870