1 /* $Id: pubseq_gateway.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitri Dmitrienko
27 *
28 * File Description:
29 *
30 */
31 #include <ncbi_pch.hpp>
32
33 #include <math.h>
34 #include <thread>
35
36 #include <corelib/ncbithr.hpp>
37 #include <corelib/ncbidiag.hpp>
38 #include <corelib/request_ctx.hpp>
39 #include <corelib/ncbifile.hpp>
40 #include <corelib/ncbi_config.hpp>
41 #include <corelib/plugin_manager.hpp>
42 #include <connect/services/grid_app_version_info.hpp>
43 #include <util/random_gen.hpp>
44
45 #include <google/protobuf/stubs/common.h>
46
47 #include <objtools/pubseq_gateway/impl/cassandra/blob_storage.hpp>
48
49 #include "pubseq_gateway.hpp"
50 #include "pubseq_gateway_exception.hpp"
51 #include "pubseq_gateway_logging.hpp"
52 #include "shutdown_data.hpp"
53 #include "cass_monitor.hpp"
54 #include "introspection.hpp"
55 #include "resolve_processor.hpp"
56 #include "annot_processor.hpp"
57 #include "get_processor.hpp"
58 #include "getblob_processor.hpp"
59 #include "tse_chunk_processor.hpp"
60 #include "osg_processor.hpp"
61 #include "cdd_processor.hpp"
62 #include "favicon.hpp"
63
64
65 USING_NCBI_SCOPE;
66
// Limits and compiled-in defaults of the configurable settings.
// ParseArgs() uses the defaults when the configuration file does not provide
// a value and x_ValidateArgs() checks the min/max limits.

// [SERVER]/workers
const unsigned short kWorkersMin = 1;
const unsigned short kWorkersMax = 100;
const unsigned short kWorkersDefault = 32;
// [SERVER]/port (there is no default: the port must be configured)
const unsigned short kHttpPortMin = 1;
const unsigned short kHttpPortMax = 65534;
// [SERVER]/backlog
const unsigned int kListenerBacklogMin = 5;
const unsigned int kListenerBacklogMax = 2048;
const unsigned int kListenerBacklogDefault = 256;
// [SERVER]/maxconn
const unsigned short kTcpMaxConnMax = 65000;
const unsigned short kTcpMaxConnMin = 5;
const unsigned short kTcpMaxConnDefault = 4096;
// [SERVER]/optimeout
const unsigned int kTimeoutMsMin = 0;
const unsigned int kTimeoutMsMax = UINT_MAX;
const unsigned int kTimeoutDefault = 30000;
// [SERVER]/maxretries
const unsigned int kMaxRetriesDefault = 1;
const unsigned int kMaxRetriesMin = 0;
const unsigned int kMaxRetriesMax = UINT_MAX;
// Initial values of the g_ConfiguredSeverity, g_Log and g_Trace globals
const EDiagSev kDefaultSeverity = eDiag_Critical;
const bool kDefaultLog = true;
const bool kDefaultTrace = false;
// [SERVER]/root_keyspace
const string kDefaultRootKeyspace = "sat_info";
// [AUTO_EXCLUDE]/max_cache_size, purge_percentage, inactivity_purge_timeout
const unsigned int kDefaultExcludeCacheMaxSize = 1000;
const unsigned int kDefaultExcludeCachePurgePercentage = 20;
const unsigned int kDefaultExcludeCacheInactivityPurge = 60;
// [ADMIN]/auth_token
const string kDefaultAuthToken = "";
// [DEBUG]/psg_allow_io_test
const bool kDefaultAllowIOTest = false;
// [SERVER]/slim_max_blob_size
const unsigned long kDefaultSlimMaxBlobSize = 10 * 1024;
// [SERVER]/max_hops
const unsigned int kDefaultMaxHops = 2;
// [STATISTICS]/small_blob_size
const unsigned long kDefaultSmallBlobSize = 16;
// [CASSANDRA_PROCESSOR]/enabled, [OSG_PROCESSOR]/enabled,
// [CDD_PROCESSOR]/enabled
const bool kDefaultCassandraProcessorsEnabled = true;
const bool kDefaultOSGProcessorsEnabled = false;
const bool kDefaultCDDProcessorsEnabled = true;
// [HEALTH]/test_seq_id, test_seq_id_ignore_error
const string kDefaultTestSeqId = "gi|2";
const bool kDefaultTestSeqIdIgnoreError = true;
// [SSL]/ssl_enable, ssl_cert_file, ssl_key_file, ssl_ciphers
const bool kDefaultSSLEnable = false;
const string kDefaultSSLCertFile = "";
const string kDefaultSSLKeyFile = "";
const string kDefaultSSLCiphers = "TLSv1.2:!NULL-SHA256";

// Name of the command line flag which switches the daemonization on
static const string kDaemonizeArgName = "daemonize";


// Memorize the configured severity level to check before using ERR_POST.
// Otherwise some expensive operations are executed without a real need.
EDiagSev g_ConfiguredSeverity = kDefaultSeverity;

// Memorize the configured tracing to check before using ERR_POST.
// Otherwise some expensive operations are executed without a real need.
bool g_Trace = kDefaultTrace;

// Memorize the configured log on/off flag.
// It is used in the context resetter to avoid unnecessary context resets
bool g_Log = kDefaultLog;

// Create the shutdown related data. It is used in a few places:
// a URL handler, signal handlers, watchdog handlers
SShutdownData g_ShutdownData;


// The application instance; set in the constructor and handed out by
// GetInstance()
CPubseqGatewayApp * CPubseqGatewayApp::sm_PubseqApp = nullptr;
127
128
// Constructs the application object.
// Every configurable member starts with its compiled-in default; the values
// from the configuration file are applied later by ParseArgs().
// The constructor also publishes the instance via sm_PubseqApp, prepares the
// help message and registers the request processors.
CPubseqGatewayApp::CPubseqGatewayApp() :
    m_MappingIndex(0),
    m_HttpPort(0),
    m_HttpWorkers(kWorkersDefault),
    m_ListenerBacklog(kListenerBacklogDefault),
    m_TcpMaxConn(kTcpMaxConnDefault),
    m_CassConnection(nullptr),
    m_CassConnectionFactory(CCassConnectionFactory::s_Create()),
    m_TimeoutMs(kTimeoutDefault),
    m_MaxRetries(kMaxRetriesDefault),
    m_ExcludeCacheMaxSize(kDefaultExcludeCacheMaxSize),
    m_ExcludeCachePurgePercentage(kDefaultExcludeCachePurgePercentage),
    m_ExcludeCacheInactivityPurge(kDefaultExcludeCacheInactivityPurge),
    m_SmallBlobSize(kDefaultSmallBlobSize),
    m_MinStatValue(kMinStatValue),
    m_MaxStatValue(kMaxStatValue),
    m_NStatBins(kNStatBins),
    m_StatScaleType(kStatScaleType),
    m_TickSpan(kTickSpan),
    m_StartTime(GetFastLocalTime()),
    m_AllowIOTest(kDefaultAllowIOTest),
    m_SlimMaxBlobSize(kDefaultSlimMaxBlobSize),
    m_MaxHops(kDefaultMaxHops),
    m_CassandraProcessorsEnabled(kDefaultCassandraProcessorsEnabled),
    m_TestSeqId(kDefaultTestSeqId),
    m_TestSeqIdIgnoreError(kDefaultTestSeqIdIgnoreError),
    m_ExcludeBlobCache(nullptr),
    // The Cassandra connection has not been attempted yet
    m_StartupDataState(ePSGS_NoCassConnection),
    m_LogFields("http"),
    m_OSGProcessorsEnabled(kDefaultOSGProcessorsEnabled),
    m_CDDProcessorsEnabled(kDefaultCDDProcessorsEnabled),
    m_SSLEnable(kDefaultSSLEnable),
    m_SSLCiphers(kDefaultSSLCiphers)
{
    // Make the instance reachable via GetInstance()
    sm_PubseqApp = this;

    // The help message is the introspection node in the standard JSON form
    m_HelpMessage = GetIntrospectionNode().Repr(CJsonNode::fStandardJson);

    x_RegisterProcessors();
}
168
169
~CPubseqGatewayApp()170 CPubseqGatewayApp::~CPubseqGatewayApp()
171 {}
172
173
Init(void)174 void CPubseqGatewayApp::Init(void)
175 {
176 unique_ptr<CArgDescriptions> argdesc(new CArgDescriptions());
177
178 argdesc->AddFlag(kDaemonizeArgName,
179 "Turn on daemonization of Pubseq Gateway at the start.");
180
181 argdesc->SetUsageContext(
182 GetArguments().GetProgramBasename(),
183 "Daemon to service Accession.Version Cache requests");
184 SetupArgDescriptions(argdesc.release());
185
186 // Memorize the configured severity
187 g_ConfiguredSeverity = GetDiagPostLevel();
188
189 // Memorize the configure trace
190 g_Trace = GetDiagTrace();
191 }
192
193
ParseArgs(void)194 void CPubseqGatewayApp::ParseArgs(void)
195 {
196 const CArgs & args = GetArgs();
197 const CNcbiRegistry & registry = GetConfig();
198
199 if (!registry.HasEntry("SERVER", "port"))
200 NCBI_THROW(CPubseqGatewayException, eConfigurationError,
201 "[SERVER]/port value is not found in the configuration "
202 "file. The port must be provided to run the server. "
203 "Exiting.");
204
205 m_Si2csiDbFile = registry.GetString("LMDB_CACHE", "dbfile_si2csi", "");
206 m_BioseqInfoDbFile = registry.GetString("LMDB_CACHE", "dbfile_bioseq_info", "");
207 m_BlobPropDbFile = registry.GetString("LMDB_CACHE", "dbfile_blob_prop", "");
208 m_HttpPort = registry.GetInt("SERVER", "port", 0);
209 m_HttpWorkers = registry.GetInt("SERVER", "workers",
210 kWorkersDefault);
211 m_ListenerBacklog = registry.GetInt("SERVER", "backlog",
212 kListenerBacklogDefault);
213 m_TcpMaxConn = registry.GetInt("SERVER", "maxconn",
214 kTcpMaxConnDefault);
215 m_TimeoutMs = registry.GetInt("SERVER", "optimeout",
216 kTimeoutDefault);
217 m_MaxRetries = registry.GetInt("SERVER", "maxretries",
218 kMaxRetriesDefault);
219 g_Log = registry.GetBool("SERVER", "log",
220 kDefaultLog);
221 m_RootKeyspace = registry.GetString("SERVER", "root_keyspace",
222 kDefaultRootKeyspace);
223
224 m_ExcludeCacheMaxSize = registry.GetInt("AUTO_EXCLUDE", "max_cache_size",
225 kDefaultExcludeCacheMaxSize);
226 m_ExcludeCachePurgePercentage = registry.GetInt("AUTO_EXCLUDE",
227 "purge_percentage",
228 kDefaultExcludeCachePurgePercentage);
229 m_ExcludeCacheInactivityPurge = registry.GetInt("AUTO_EXCLUDE",
230 "inactivity_purge_timeout",
231 kDefaultExcludeCacheInactivityPurge);
232 m_AllowIOTest = registry.GetBool("DEBUG", "psg_allow_io_test",
233 kDefaultAllowIOTest);
234
235 m_SlimMaxBlobSize = x_GetDataSize(registry, "SERVER", "slim_max_blob_size",
236 kDefaultSlimMaxBlobSize);
237 m_MaxHops = registry.GetInt("SERVER", "max_hops", kDefaultMaxHops);
238
239 try {
240 m_AuthToken = registry.GetEncryptedString("ADMIN", "auth_token",
241 IRegistry::fPlaintextAllowed);
242 } catch (const CRegistryException & ex) {
243 string msg = "Decrypting error detected while reading "
244 "[ADMIN]/auth_token value: " + string(ex.what());
245 ERR_POST(msg);
246 m_Alerts.Register(ePSGS_ConfigAuthDecrypt, msg);
247
248 // Treat the value as a clear text
249 m_AuthToken = registry.GetString("ADMIN", "auth_token",
250 kDefaultAuthToken);
251 } catch (...) {
252 string msg = "Unknown decrypting error detected while reading "
253 "[ADMIN]/auth_token value";
254 ERR_POST(msg);
255 m_Alerts.Register(ePSGS_ConfigAuthDecrypt, msg);
256
257 // Treat the value as a clear text
258 m_AuthToken = registry.GetString("ADMIN", "auth_token",
259 kDefaultAuthToken);
260 }
261
262 m_CassConnectionFactory->AppParseArgs(args);
263 m_CassConnectionFactory->LoadConfig(registry, "");
264 m_CassConnectionFactory->SetLogging(GetDiagPostLevel());
265
266 m_SmallBlobSize = x_GetDataSize(registry, "STATISTICS", "small_blob_size",
267 kDefaultSmallBlobSize);
268 m_MinStatValue = registry.GetInt("STATISTICS", "min", kMinStatValue);
269 m_MaxStatValue = registry.GetInt("STATISTICS", "max", kMaxStatValue);
270 m_NStatBins = registry.GetInt("STATISTICS", "n_bins", kNStatBins);
271 m_StatScaleType = registry.GetString("STATISTICS", "type", kStatScaleType);
272 m_TickSpan = registry.GetInt("STATISTICS", "tick_span", kTickSpan);
273
274 x_ReadIdToNameAndDescriptionConfiguration(registry, "COUNTERS");
275
276 m_OSGConnectionPool = new psg::osg::COSGConnectionPool();
277 m_OSGConnectionPool->AppParseArgs(args);
278 m_OSGConnectionPool->LoadConfig(registry);
279 m_OSGConnectionPool->SetLogging(GetDiagPostLevel());
280
281 m_OSGProcessorsEnabled = registry.GetBool(
282 "OSG_PROCESSOR", "enabled",
283 kDefaultOSGProcessorsEnabled);
284 m_CDDProcessorsEnabled = registry.GetBool(
285 "CDD_PROCESSOR", "enabled",
286 kDefaultCDDProcessorsEnabled);
287 m_CassandraProcessorsEnabled = registry.GetBool(
288 "CASSANDRA_PROCESSOR", "enabled",
289 kDefaultCassandraProcessorsEnabled);
290
291 m_TestSeqId = registry.GetString("HEALTH", "test_seq_id", kDefaultTestSeqId);
292 m_TestSeqIdIgnoreError = registry.GetBool("HEALTH", "test_seq_id_ignore_error",
293 kDefaultTestSeqIdIgnoreError);
294
295 m_SSLEnable = registry.GetBool("SSL", "ssl_enable", kDefaultSSLEnable);
296 m_SSLCertFile = registry.GetString("SSL", "ssl_cert_file", kDefaultSSLCertFile);
297 m_SSLKeyFile = registry.GetString("SSL", "ssl_key_file", kDefaultSSLKeyFile);
298 m_SSLCiphers = registry.GetString("SSL", "ssl_ciphers", kDefaultSSLCiphers);
299
300 // It throws an exception in case of inability to start
301 x_ValidateArgs();
302 }
303
304
OpenCache(void)305 void CPubseqGatewayApp::OpenCache(void)
306 {
307 // It was decided to work with and without the cache even if the wrapper
308 // has not been created. So the cache initialization is called once and
309 // always succeed.
310 m_StartupDataState = ePSGS_StartupDataOK;
311
312 try {
313 // NB. It was decided that the configuration may ommit the cache file
314 // paths. In this case the server should not use the corresponding
315 // cache at all. This is covered in the CPubseqGatewayCache class.
316 m_LookupCache.reset(new CPubseqGatewayCache(m_BioseqInfoDbFile,
317 m_Si2csiDbFile,
318 m_BlobPropDbFile));
319
320 // The format of the sat ids is different
321 set<int> sat_ids;
322 for (size_t index = 0; index < m_SatNames.size(); ++index) {
323 if (!m_SatNames[index].empty()) {
324 sat_ids.insert(index);
325 }
326 }
327
328 m_LookupCache->Open(sat_ids);
329 const auto errors = m_LookupCache->GetErrors();
330 if (!errors.empty()) {
331 string msg = "Error opening the LMDB cache:";
332 for (const auto & err : errors) {
333 msg += "\n" + err.message;
334 }
335
336 PSG_ERROR(msg);
337 m_Alerts.Register(ePSGS_OpenCache, msg);
338 m_LookupCache->ResetErrors();
339 }
340 } catch (const exception & exc) {
341 string msg = "Error initializing the LMDB cache: " +
342 string(exc.what()) +
343 ". The server continues without cache.";
344 PSG_ERROR(exc);
345 m_Alerts.Register(ePSGS_OpenCache, msg);
346 m_LookupCache.reset(nullptr);
347 } catch (...) {
348 string msg = "Unknown initializing LMDB cache error. "
349 "The server continues without cache.";
350 PSG_ERROR(msg);
351 m_Alerts.Register(ePSGS_OpenCache, msg);
352 m_LookupCache.reset(nullptr);
353 }
354 }
355
356
OpenCass(void)357 bool CPubseqGatewayApp::OpenCass(void)
358 {
359 static bool need_logging = true;
360
361 try {
362 if (!m_CassConnection)
363 m_CassConnection = m_CassConnectionFactory->CreateInstance();
364 m_CassConnection->Connect();
365
366 // Next step in the startup data initialization
367 m_StartupDataState = ePSGS_NoValidCassMapping;
368 } catch (const exception & exc) {
369 string msg = "Error connecting to Cassandra: " +
370 string(exc.what());
371 if (need_logging)
372 PSG_CRITICAL(exc);
373
374 need_logging = false;
375 m_Alerts.Register(ePSGS_OpenCassandra, msg);
376 return false;
377 } catch (...) {
378 string msg = "Unknown Cassandra connecting error";
379 if (need_logging)
380 PSG_CRITICAL(msg);
381
382 need_logging = false;
383 m_Alerts.Register(ePSGS_OpenCassandra, msg);
384 return false;
385 }
386
387 need_logging = false;
388 return true;
389 }
390
391
// Drops the references to the Cassandra connection and to the factory which
// created it. Run() calls it as the last step before returning.
void CPubseqGatewayApp::CloseCass(void)
{
    // The connection is dropped first, then the factory
    m_CassConnection = nullptr;
    m_CassConnectionFactory = nullptr;
}
397
398
SatToKeyspace(int sat,string & sat_name)399 bool CPubseqGatewayApp::SatToKeyspace(int sat, string & sat_name)
400 {
401 if (sat >= 0) {
402 if (sat < static_cast<int>(m_SatNames.size())) {
403 sat_name = m_SatNames[sat];
404 return !sat_name.empty();
405 }
406 }
407 return false;
408 }
409
410
Run(void)411 int CPubseqGatewayApp::Run(void)
412 {
413 srand(time(NULL));
414
415 try {
416 ParseArgs();
417 } catch (const exception & exc) {
418 PSG_CRITICAL(exc.what());
419 return 1;
420 } catch (...) {
421 PSG_CRITICAL("Unknown argument parsing error");
422 return 1;
423 }
424
425 if (GetArgs()[kDaemonizeArgName]) {
426 // NOTE: if the stdin/stdout are not kept (default daemonize behavior)
427 // then libuv fails to close its events loop. There are some asserts
428 // with fds, one of them fails so a core dump is generated.
429 // With stdin/stdout kept open libuv stays happy and does not crash
430 bool is_good = CCurrentProcess::Daemonize(kEmptyCStr,
431 CCurrentProcess::fDF_KeepCWD |
432 CCurrentProcess::fDF_KeepStdin |
433 CCurrentProcess::fDF_KeepStdout);
434 if (!is_good)
435 NCBI_THROW(CPubseqGatewayException, eDaemonizationFailed,
436 "Error during daemonization");
437 }
438
439 bool connected = OpenCass();
440 bool populated = false;
441 if (connected) {
442 PopulatePublicCommentsMapping();
443 populated = PopulateCassandraMapping();
444 }
445
446 if (populated)
447 OpenCache();
448
449 // m_IdToNameAndDescription was populated at the time of
450 // dealing with arguments
451 m_Counters.UpdateConfiguredNameDescription(m_IdToNameAndDescription);
452
453 auto purge_size = round(float(m_ExcludeCacheMaxSize) *
454 float(m_ExcludeCachePurgePercentage) / 100.0);
455 m_ExcludeBlobCache.reset(
456 new CExcludeBlobCache(m_ExcludeCacheInactivityPurge,
457 m_ExcludeCacheMaxSize,
458 m_ExcludeCacheMaxSize - static_cast<size_t>(purge_size)));
459
460 m_Timing.reset(new COperationTiming(m_MinStatValue,
461 m_MaxStatValue,
462 m_NStatBins,
463 m_StatScaleType,
464 m_SmallBlobSize));
465
466 vector<CHttpHandler<CPendingOperation>> http_handler;
467 CHttpGetParser get_parser;
468
469 http_handler.emplace_back(
470 "/ID/getblob",
471 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
472 {
473 return OnGetBlob(req, reply);
474 }, &get_parser, nullptr);
475 http_handler.emplace_back(
476 "/ID/get",
477 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
478 {
479 return OnGet(req, reply);
480 }, &get_parser, nullptr);
481 http_handler.emplace_back(
482 "/ID/resolve",
483 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
484 {
485 return OnResolve(req, reply);
486 }, &get_parser, nullptr);
487 http_handler.emplace_back(
488 "/ID/get_tse_chunk",
489 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
490 {
491 return OnGetTSEChunk(req, reply);
492 }, &get_parser, nullptr);
493 http_handler.emplace_back(
494 "/ID/get_na",
495 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
496 {
497 return OnGetNA(req, reply);
498 }, &get_parser, nullptr);
499 http_handler.emplace_back(
500 "/health",
501 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
502 {
503 return OnHealth(req, reply);
504 }, &get_parser, nullptr);
505 http_handler.emplace_back(
506 "/deep-health",
507 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
508 {
509 // For the time being 'deep-health' matches 'health'
510 return OnHealth(req, reply);
511 }, &get_parser, nullptr);
512 http_handler.emplace_back(
513 "/ADMIN/config",
514 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
515 {
516 return OnConfig(req, reply);
517 }, &get_parser, nullptr);
518 http_handler.emplace_back(
519 "/ADMIN/info",
520 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
521 {
522 return OnInfo(req, reply);
523 }, &get_parser, nullptr);
524 http_handler.emplace_back(
525 "/ADMIN/status",
526 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
527 {
528 return OnStatus(req, reply);
529 }, &get_parser, nullptr);
530 http_handler.emplace_back(
531 "/ADMIN/shutdown",
532 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
533 {
534 return OnShutdown(req, reply);
535 }, &get_parser, nullptr);
536 http_handler.emplace_back(
537 "/ADMIN/get_alerts",
538 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
539 {
540 return OnGetAlerts(req, reply);
541 }, &get_parser, nullptr);
542 http_handler.emplace_back(
543 "/ADMIN/ack_alert",
544 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
545 {
546 return OnAckAlert(req, reply);
547 }, &get_parser, nullptr);
548 http_handler.emplace_back(
549 "/ADMIN/statistics",
550 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
551 {
552 return OnStatistics(req, reply);
553 }, &get_parser, nullptr);
554 http_handler.emplace_back(
555 "/favicon.ico",
556 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
557 {
558 // It's a browser, most probably admin request
559 reply->GetHttpReply()->SetContentType(ePSGS_ImageMime);
560 reply->GetHttpReply()->SetContentLength(sizeof(favicon));
561 reply->GetHttpReply()->SendOk((const char *)(favicon), sizeof(favicon), true);
562 return 0;
563 }, &get_parser, nullptr);
564
565 if (m_AllowIOTest) {
566 m_IOTestBuffer.reset(new char[kMaxTestIOSize]);
567 CRandom random;
568 char * current = m_IOTestBuffer.get();
569 for (size_t k = 0; k < kMaxTestIOSize; k += 8) {
570 Uint8 random_val = random.GetRandUint8();
571 memcpy(current, &random_val, 8);
572 current += 8;
573 }
574
575 http_handler.emplace_back(
576 "/TEST/io",
577 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
578 {
579 return OnTestIO(req, reply);
580 }, &get_parser, nullptr);
581 }
582
583 http_handler.emplace_back(
584 "",
585 [this](CHttpRequest & req, shared_ptr<CPSGS_Reply> reply)->int
586 {
587 return OnBadURL(req, reply);
588 }, &get_parser, nullptr);
589
590
591 x_InitSSL();
592 m_TcpDaemon.reset(
593 new CHttpDaemon<CPendingOperation>(http_handler, "0.0.0.0",
594 m_HttpPort,
595 m_HttpWorkers,
596 m_ListenerBacklog,
597 m_TcpMaxConn));
598
599
600
601 // Run the monitoring thread
602 int ret_code = 0;
603 std::thread monitoring_thread(CassMonitorThreadedFunction);
604
605 try {
606 m_TcpDaemon->Run([this](TSL::CTcpDaemon<CHttpProto<CPendingOperation>,
607 CHttpConnection<CPendingOperation>,
608 CHttpDaemon<CPendingOperation>> & tcp_daemon)
609 {
610 // This lambda is called once per second.
611 // Earlier implementations printed counters on stdout.
612
613 static unsigned long tick_no = 0;
614 if (++tick_no % m_TickSpan == 0) {
615 tick_no = 0;
616 this->m_Timing->Rotate();
617 }
618 });
619 } catch (const CException & exc) {
620 ERR_POST(Critical << exc);
621 ret_code = 1;
622 g_ShutdownData.m_ShutdownRequested = true;
623 monitoring_thread.detach(); // to avoid up to 60 sec delay
624 } catch (const exception & exc) {
625 ERR_POST(Critical << exc);
626 ret_code = 1;
627 g_ShutdownData.m_ShutdownRequested = true;
628 monitoring_thread.detach(); // to avoid up to 60 sec delay
629 } catch (...) {
630 ERR_POST(Critical << "Unknown exception while running TCP daemon");
631 ret_code = 1;
632 g_ShutdownData.m_ShutdownRequested = true;
633 monitoring_thread.detach(); // to avoid up to 60 sec delay
634 }
635
636 if (monitoring_thread.joinable()) {
637 monitoring_thread.join();
638 }
639
640 CloseCass();
641 return ret_code;
642 }
643
644
645
// Provides the application instance memorized by the constructor
// (nullptr if no instance has been created yet)
CPubseqGatewayApp * CPubseqGatewayApp::GetInstance(void)
{
    return sm_PubseqApp;
}
650
651
x_ValidateArgs(void)652 void CPubseqGatewayApp::x_ValidateArgs(void)
653 {
654 if (m_HttpPort < kHttpPortMin || m_HttpPort > kHttpPortMax) {
655 NCBI_THROW(CPubseqGatewayException, eConfigurationError,
656 "[SERVER]/port value is out of range. Allowed range: " +
657 to_string(kHttpPortMin) + "..." +
658 to_string(kHttpPortMax) + ". Received: " +
659 to_string(m_HttpPort));
660 }
661
662 if (m_Si2csiDbFile.empty()) {
663 PSG_WARNING("[LMDB_CACHE]/dbfile_si2csi is not found "
664 "in the ini file. No si2csi cache will be used.");
665 }
666
667 if (m_BioseqInfoDbFile.empty()) {
668 PSG_WARNING("[LMDB_CACHE]/dbfile_bioseq_info is not found "
669 "in the ini file. No bioseq_info cache will be used.");
670 }
671
672 if (m_BlobPropDbFile.empty()) {
673 PSG_WARNING("[LMDB_CACHE]/dbfile_blob_prop is not found "
674 "in the ini file. No blob_prop cache will be used.");
675 }
676
677 if (m_HttpWorkers < kWorkersMin || m_HttpWorkers > kWorkersMax) {
678 string err_msg =
679 "The number of HTTP workers is out of range. Allowed "
680 "range: " + to_string(kWorkersMin) + "..." +
681 to_string(kWorkersMax) + ". Received: " +
682 to_string(m_HttpWorkers) + ". Reset to "
683 "default: " + to_string(kWorkersDefault);
684 m_Alerts.Register(ePSGS_ConfigHttpWorkers, err_msg);
685 PSG_ERROR(err_msg);
686 m_HttpWorkers = kWorkersDefault;
687 }
688
689 if (m_ListenerBacklog < kListenerBacklogMin ||
690 m_ListenerBacklog > kListenerBacklogMax) {
691 string err_msg =
692 "The listener backlog is out of range. Allowed "
693 "range: " + to_string(kListenerBacklogMin) + "..." +
694 to_string(kListenerBacklogMax) + ". Received: " +
695 to_string(m_ListenerBacklog) + ". Reset to "
696 "default: " + to_string(kListenerBacklogDefault);
697 m_Alerts.Register(ePSGS_ConfigListenerBacklog, err_msg);
698 PSG_ERROR(err_msg);
699 m_ListenerBacklog = kListenerBacklogDefault;
700 }
701
702 if (m_TcpMaxConn < kTcpMaxConnMin || m_TcpMaxConn > kTcpMaxConnMax) {
703 string err_msg =
704 "The max number of connections is out of range. Allowed "
705 "range: " + to_string(kTcpMaxConnMin) + "..." +
706 to_string(kTcpMaxConnMax) + ". Received: " +
707 to_string(m_TcpMaxConn) + ". Reset to "
708 "default: " + to_string(kTcpMaxConnDefault);
709 m_Alerts.Register(ePSGS_ConfigMaxConnections, err_msg);
710 PSG_ERROR(err_msg);
711 m_TcpMaxConn = kTcpMaxConnDefault;
712 }
713
714 if (m_TimeoutMs < kTimeoutMsMin || m_TimeoutMs > kTimeoutMsMax) {
715 string err_msg =
716 "The operation timeout is out of range. Allowed "
717 "range: " + to_string(kTimeoutMsMin) + "..." +
718 to_string(kTimeoutMsMax) + ". Received: " +
719 to_string(m_TimeoutMs) + ". Reset to "
720 "default: " + to_string(kTimeoutDefault);
721 m_Alerts.Register(ePSGS_ConfigTimeout, err_msg);
722 PSG_ERROR(err_msg);
723 m_TimeoutMs = kTimeoutDefault;
724 }
725
726 if (m_MaxRetries < kMaxRetriesMin || m_MaxRetries > kMaxRetriesMax) {
727 string err_msg =
728 "The max retries is out of range. Allowed "
729 "range: " + to_string(kMaxRetriesMin) + "..." +
730 to_string(kMaxRetriesMax) + ". Received: " +
731 to_string(m_MaxRetries) + ". Reset to "
732 "default: " + to_string(kMaxRetriesDefault);
733 m_Alerts.Register(ePSGS_ConfigRetries, err_msg);
734 PSG_ERROR(err_msg);
735 m_MaxRetries = kMaxRetriesDefault;
736 }
737
738 if (m_ExcludeCacheMaxSize < 0) {
739 string err_msg =
740 "The max exclude cache size must be a positive integer. "
741 "Received: " + to_string(m_ExcludeCacheMaxSize) + ". "
742 "Reset to 0 (exclude blobs cache is disabled)";
743 m_Alerts.Register(ePSGS_ConfigExcludeCacheSize, err_msg);
744 PSG_ERROR(err_msg);
745 m_ExcludeCacheMaxSize = 0;
746 }
747
748 if (m_ExcludeCachePurgePercentage < 0 || m_ExcludeCachePurgePercentage > 100) {
749 string err_msg = "The exclude cache purge percentage is out of range. "
750 "Allowed: 0...100. Received: " +
751 to_string(m_ExcludeCachePurgePercentage) + ". ";
752 if (m_ExcludeCacheMaxSize > 0) {
753 err_msg += "Reset to " +
754 to_string(kDefaultExcludeCachePurgePercentage);
755 PSG_ERROR(err_msg);
756 } else {
757 err_msg += "The provided value has no effect "
758 "because the exclude cache is disabled.";
759 PSG_WARNING(err_msg);
760 }
761 m_ExcludeCachePurgePercentage = kDefaultExcludeCachePurgePercentage;
762 m_Alerts.Register(ePSGS_ConfigExcludeCachePurgeSize, err_msg);
763 }
764
765 if (m_ExcludeCacheInactivityPurge <= 0) {
766 string err_msg = "The exclude cache inactivity purge timeout must be "
767 "a positive integer greater than zero. Received: " +
768 to_string(m_ExcludeCacheInactivityPurge) + ". ";
769 if (m_ExcludeCacheMaxSize > 0) {
770 err_msg += "Reset to " +
771 to_string(kDefaultExcludeCacheInactivityPurge);
772 PSG_ERROR(err_msg);
773 } else {
774 err_msg += "The provided value has no effect "
775 "because the exclude cache is disabled.";
776 PSG_WARNING(err_msg);
777 }
778 m_ExcludeCacheInactivityPurge = kDefaultExcludeCacheInactivityPurge;
779 m_Alerts.Register(ePSGS_ConfigExcludeCacheInactivity, err_msg);
780 }
781
782 bool stat_settings_good = true;
783 if (NStr::CompareNocase(m_StatScaleType, "log") != 0 &&
784 NStr::CompareNocase(m_StatScaleType, "linear") != 0) {
785 string err_msg = "Invalid [STATISTICS]/type value '" +
786 m_StatScaleType + "'. Allowed values are: log, linear. "
787 "The statistics parameters are reset to default.";
788 m_Alerts.Register(ePSGS_ConfigStatScaleType, err_msg);
789 PSG_ERROR(err_msg);
790 stat_settings_good = false;
791
792 m_MinStatValue = kMinStatValue;
793 m_MaxStatValue = kMaxStatValue;
794 m_NStatBins = kNStatBins;
795 m_StatScaleType = kStatScaleType;
796 }
797
798 if (stat_settings_good) {
799 if (m_MinStatValue > m_MaxStatValue) {
800 string err_msg = "Invalid [STATISTICS]/min and max values. The "
801 "max cannot be less than min. "
802 "The statistics parameters are reset to default.";
803 m_Alerts.Register(ePSGS_ConfigStatMinMaxVal, err_msg);
804 PSG_ERROR(err_msg);
805 stat_settings_good = false;
806
807 m_MinStatValue = kMinStatValue;
808 m_MaxStatValue = kMaxStatValue;
809 m_NStatBins = kNStatBins;
810 m_StatScaleType = kStatScaleType;
811 }
812 }
813
814 if (stat_settings_good) {
815 if (m_NStatBins <= 0) {
816 string err_msg = "Invalid [STATISTICS]/n_bins value. The "
817 "number of bins must be greater than 0. "
818 "The statistics parameters are reset to default.";
819 m_Alerts.Register(ePSGS_ConfigStatNBins, err_msg);
820 PSG_ERROR(err_msg);
821
822 // Uncomment if there will be more [STATISTICS] section parameters
823 // stat_settings_good = false;
824
825 m_MinStatValue = kMinStatValue;
826 m_MaxStatValue = kMaxStatValue;
827 m_NStatBins = kNStatBins;
828 m_StatScaleType = kStatScaleType;
829 }
830 }
831
832 if (m_TickSpan <= 0) {
833 PSG_WARNING("Invalid [STATISTICS]/tick_span value (" +
834 to_string(m_TickSpan) + "). "
835 "The tick span must be greater than 0. The tick span is "
836 "reset to the default value (" +
837 to_string(kTickSpan) + ").");
838 m_TickSpan = kTickSpan;
839 }
840
841 if (m_MaxHops <= 0) {
842 PSG_WARNING("Invalid [SERVER]/max_hops value (" +
843 to_string(m_MaxHops) + "). "
844 "The max hops must be greater than 0. The max hops is "
845 "reset to the default value (" +
846 to_string(kDefaultMaxHops) + ").");
847 m_MaxHops = kDefaultMaxHops;
848 }
849
850 if (m_SSLEnable) {
851 if (m_SSLCertFile.empty()) {
852 NCBI_THROW(CPubseqGatewayException, eConfigurationError,
853 "[SSL]/ssl_cert_file value must be provided "
854 "if [SSL]/ssl_enable is set to true");
855 }
856 if (m_SSLKeyFile.empty()) {
857 NCBI_THROW(CPubseqGatewayException, eConfigurationError,
858 "[SSL]/ssl_key_file value must be provided "
859 "if [SSL]/ssl_enable is set to true");
860 }
861
862 if (!CFile(m_SSLCertFile).Exists()) {
863 NCBI_THROW(CPubseqGatewayException, eConfigurationError,
864 "[SSL]/ssl_cert_file is not found");
865 }
866 if (!CFile(m_SSLKeyFile).Exists()) {
867 NCBI_THROW(CPubseqGatewayException, eConfigurationError,
868 "[SSL]/ssl_key_file is not found");
869 }
870
871 if (m_SSLCiphers.empty()) {
872 m_SSLCiphers = kDefaultSSLCiphers;
873 }
874 }
875 }
876
877
x_GetCmdLineArguments(void) const878 string CPubseqGatewayApp::x_GetCmdLineArguments(void) const
879 {
880 const CNcbiArguments & arguments = CNcbiApplication::Instance()->
881 GetArguments();
882 size_t args_size = arguments.Size();
883 string cmdline_args;
884
885 for (size_t index = 0; index < args_size; ++index) {
886 if (index != 0)
887 cmdline_args += " ";
888 cmdline_args += arguments[index];
889 }
890 return cmdline_args;
891 }
892
893
// Names used by x_CreateRequestContext():
// - the HTTP headers which may carry the NCBI session id, the NCBI hit id,
//   the client address and the user agent
// - the applog field names for the user agent and the URL path
static string kNcbiSidHeader = "HTTP_NCBI_SID";
static string kNcbiPhidHeader = "HTTP_NCBI_PHID";
static string kXForwardedForHeader = "X-Forwarded-For";
static string kUserAgentHeader = "User-Agent";
static string kUserAgentApplog = "USER_AGENT";
static string kRequestPathApplog = "request_path";
x_CreateRequestContext(CHttpRequest & req) const900 CRef<CRequestContext> CPubseqGatewayApp::x_CreateRequestContext(
901 CHttpRequest & req) const
902 {
903 CRef<CRequestContext> context;
904 if (g_Log) {
905 context.Reset(new CRequestContext());
906 context->SetRequestID();
907
908 // NCBI SID may come from the header
909 string sid = req.GetHeaderValue(kNcbiSidHeader);
910 if (!sid.empty())
911 context->SetSessionID(sid);
912 else
913 context->SetSessionID();
914
915 // NCBI PHID may come from the header
916 string phid = req.GetHeaderValue(kNcbiPhidHeader);
917 if (!phid.empty())
918 context->SetHitID(phid);
919 else
920 context->SetHitID();
921
922 // Client IP may come from the headers
923 string client_ip = req.GetHeaderValue(kXForwardedForHeader);
924 if (!client_ip.empty()) {
925 vector<string> ip_addrs;
926 NStr::Split(client_ip, ",", ip_addrs);
927 if (!ip_addrs.empty()) {
928 // Take the first one, there could be many...
929 context->SetClientIP(ip_addrs[0]);
930 }
931 } else {
932 client_ip = req.GetPeerIP();
933 if (!client_ip.empty()) {
934 context->SetClientIP(client_ip);
935 }
936 }
937
938 // It was decided not to use the standard C++ Toolkit function because
939 // it searches in the CGI environment and does quite a bit of
940 // unnecessary things. The PSG server only needs X-Forwarded-For
941 // TNCBI_IPv6Addr client_address = req.GetClientIP();
942 // if (!NcbiIsEmptyIPv6(&client_address)) {
943 // char buf[256];
944 // if (NcbiIPv6ToString(buf, sizeof(buf), &client_address) != 0) {
945 // context->SetClientIP(buf);
946 // }
947 // }
948
949 CDiagContext::SetRequestContext(context);
950 CDiagContext_Extra extra = GetDiagContext().PrintRequestStart();
951
952 // This is the URL path
953 extra.Print(kRequestPathApplog, req.GetPath());
954
955 string user_agent = req.GetHeaderValue(kUserAgentHeader);
956 if (!user_agent.empty())
957 extra.Print(kUserAgentApplog, user_agent);
958
959 req.PrintParams(extra);
960 req.PrintLogFields(m_LogFields);
961
962 // If extra is not flushed then it picks read-only even though it is
963 // done after...
964 extra.Flush();
965
966 // Just in case, avoid to have 0
967 context->SetRequestStatus(CRequestStatus::e200_Ok);
968 context->SetReadOnly(true);
969 }
970 return context;
971 }
972
973
x_PrintRequestStop(CRef<CRequestContext> context,int status)974 void CPubseqGatewayApp::x_PrintRequestStop(CRef<CRequestContext> context,
975 int status)
976 {
977 if (context.NotNull()) {
978 CDiagContext::SetRequestContext(context);
979 context->SetReadOnly(false);
980 context->SetRequestStatus(status);
981 GetDiagContext().PrintRequestStop();
982 context.Reset();
983 CDiagContext::SetRequestContext(NULL);
984 }
985 }
986
987
988 CPubseqGatewayApp::SRequestParameter
x_GetParam(CHttpRequest & req,const string & name) const989 CPubseqGatewayApp::x_GetParam(CHttpRequest & req, const string & name) const
990 {
991 SRequestParameter param;
992 const char * value;
993 size_t value_size;
994
995 param.m_Found = req.GetParam(name.data(), name.size(),
996 true, &value, &value_size);
997 if (param.m_Found)
998 param.m_Value.assign(value, value_size);
999 return param;
1000 }
1001
1002
x_IsBoolParamValid(const string & param_name,const CTempString & param_value,string & err_msg) const1003 bool CPubseqGatewayApp::x_IsBoolParamValid(const string & param_name,
1004 const CTempString & param_value,
1005 string & err_msg) const
1006 {
1007 static string yes = "yes";
1008 static string no = "no";
1009
1010 if (param_value != yes && param_value != no) {
1011 err_msg = "Malformed '" + param_name + "' parameter. "
1012 "Acceptable values are '" + yes + "' and '" + no + "'.";
1013 return false;
1014 }
1015 return true;
1016 }
1017
1018
1019 SPSGS_ResolveRequest::EPSGS_OutputFormat
x_GetOutputFormat(const string & param_name,const CTempString & param_value,string & err_msg) const1020 CPubseqGatewayApp::x_GetOutputFormat(const string & param_name,
1021 const CTempString & param_value,
1022 string & err_msg) const
1023 {
1024 static string protobuf = "protobuf";
1025 static string json = "json";
1026 static string native = "native";
1027
1028 if (param_value == protobuf)
1029 return SPSGS_ResolveRequest::ePSGS_ProtobufFormat;
1030 if (param_value == json)
1031 return SPSGS_ResolveRequest::ePSGS_JsonFormat;
1032 if (param_value == native)
1033 return SPSGS_ResolveRequest::ePSGS_NativeFormat;
1034
1035 err_msg = "Malformed '" + param_name + "' parameter. "
1036 "Acceptable values are '" +
1037 protobuf + "' and '" +
1038 json + "' and '" +
1039 native + "'.";
1040 return SPSGS_ResolveRequest::ePSGS_UnknownFormat;
1041 }
1042
1043
1044 SPSGS_BlobRequestBase::EPSGS_TSEOption
x_GetTSEOption(const string & param_name,const CTempString & param_value,string & err_msg) const1045 CPubseqGatewayApp::x_GetTSEOption(const string & param_name,
1046 const CTempString & param_value,
1047 string & err_msg) const
1048 {
1049 static string none = "none";
1050 static string whole = "whole";
1051 static string orig = "orig";
1052 static string smart = "smart";
1053 static string slim = "slim";
1054
1055 if (param_value == none)
1056 return SPSGS_BlobRequestBase::ePSGS_NoneTSE;
1057 if (param_value == whole)
1058 return SPSGS_BlobRequestBase::ePSGS_WholeTSE;
1059 if (param_value == orig)
1060 return SPSGS_BlobRequestBase::ePSGS_OrigTSE;
1061 if (param_value == smart)
1062 return SPSGS_BlobRequestBase::ePSGS_SmartTSE;
1063 if (param_value == slim)
1064 return SPSGS_BlobRequestBase::ePSGS_SlimTSE;
1065
1066 err_msg = "Malformed '" + param_name + "' parameter. "
1067 "Acceptable values are '" +
1068 none + "', '" +
1069 whole + "', '" +
1070 orig + "', '" +
1071 smart + "' and '" +
1072 slim + "'.";
1073 return SPSGS_BlobRequestBase::ePSGS_UnknownTSE;
1074 }
1075
1076
1077 SPSGS_RequestBase::EPSGS_AccSubstitutioOption
x_GetAccessionSubstitutionOption(const string & param_name,const CTempString & param_value,string & err_msg) const1078 CPubseqGatewayApp::x_GetAccessionSubstitutionOption(
1079 const string & param_name,
1080 const CTempString & param_value,
1081 string & err_msg) const
1082 {
1083 static string default_option = "default";
1084 static string limited_option = "limited";
1085 static string never_option = "never";
1086
1087 if (param_value == default_option)
1088 return SPSGS_RequestBase::ePSGS_DefaultAccSubstitution;
1089 if (param_value == limited_option)
1090 return SPSGS_RequestBase::ePSGS_LimitedAccSubstitution;
1091 if (param_value == never_option)
1092 return SPSGS_RequestBase::ePSGS_NeverAccSubstitute;
1093
1094 err_msg = "Malformed '" + param_name + "' parameter. "
1095 "Acceptable values are '" +
1096 default_option + "', '" +
1097 limited_option + "', '" +
1098 never_option + "'.";
1099 return SPSGS_RequestBase::ePSGS_UnknownAccSubstitution;
1100 }
1101
1102
1103 bool
x_GetTraceParameter(CHttpRequest & req,const string & param_name,SPSGS_RequestBase::EPSGS_Trace & trace,string & err_msg)1104 CPubseqGatewayApp::x_GetTraceParameter(CHttpRequest & req,
1105 const string & param_name,
1106 SPSGS_RequestBase::EPSGS_Trace & trace,
1107 string & err_msg)
1108 {
1109 trace = SPSGS_RequestBase::ePSGS_NoTracing;
1110 SRequestParameter trace_protocol_param = x_GetParam(req, param_name);
1111
1112 if (trace_protocol_param.m_Found) {
1113 if (!x_IsBoolParamValid(param_name,
1114 trace_protocol_param.m_Value, err_msg)) {
1115 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1116 PSG_WARNING(err_msg);
1117 return false;
1118 }
1119 if (trace_protocol_param.m_Value == "yes")
1120 trace = SPSGS_RequestBase::ePSGS_WithTracing;
1121 else
1122 trace = SPSGS_RequestBase::ePSGS_NoTracing;;
1123 }
1124 return true;
1125 }
1126
1127
1128 bool
x_GetHops(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply,int & hops)1129 CPubseqGatewayApp::x_GetHops(CHttpRequest & req,
1130 shared_ptr<CPSGS_Reply> reply,
1131 int & hops)
1132 {
1133 static string kHopsParam = "hops";
1134
1135 hops = 0; // Default value
1136 SRequestParameter hops_param = x_GetParam(req, kHopsParam);
1137
1138 if (hops_param.m_Found) {
1139 string err_msg;
1140 if (!x_ConvertIntParameter(kHopsParam, hops_param.m_Value,
1141 hops, err_msg)) {
1142 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1143 x_SendMessageAndCompletionChunks(reply, err_msg,
1144 CRequestStatus::e400_BadRequest,
1145 ePSGS_MalformedParameter,
1146 eDiag_Error);
1147 PSG_ERROR(err_msg);
1148 return false;
1149 }
1150
1151 if (hops < 0) {
1152 err_msg = "Invalid '" + kHopsParam + "' value " + to_string(hops) +
1153 ". It must be > 0.";
1154 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1155 x_SendMessageAndCompletionChunks(reply, err_msg,
1156 CRequestStatus::e400_BadRequest,
1157 ePSGS_MalformedParameter,
1158 eDiag_Error);
1159 PSG_ERROR(err_msg);
1160 return false;
1161 }
1162
1163 if (hops > m_MaxHops) {
1164 err_msg = "The '" + kHopsParam + "' value " + to_string(hops) +
1165 " exceeds the server configured value " +
1166 to_string(m_MaxHops) + ".";
1167 m_Counters.Increment(CPSGSCounters::ePSGS_MaxHopsExceededError);
1168 x_SendMessageAndCompletionChunks(reply, err_msg,
1169 CRequestStatus::e400_BadRequest,
1170 ePSGS_MalformedParameter,
1171 eDiag_Error);
1172 PSG_ERROR(err_msg);
1173 return false;
1174 }
1175 }
1176 return true;
1177 }
1178
1179
1180 bool
x_GetEnabledAndDisabledProcessors(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply,vector<string> & enabled_processors,vector<string> & disabled_processors)1181 CPubseqGatewayApp::x_GetEnabledAndDisabledProcessors(
1182 CHttpRequest & req,
1183 shared_ptr<CPSGS_Reply> reply,
1184 vector<string> & enabled_processors,
1185 vector<string> & disabled_processors)
1186 {
1187 static string kEnableProcessor = "enable_processor";
1188 static string kDisableProcessor = "disable_processor";
1189
1190 req.GetMultipleValuesParam(kEnableProcessor.data(),
1191 kEnableProcessor.size(),
1192 enabled_processors);
1193 req.GetMultipleValuesParam(kDisableProcessor.data(),
1194 kDisableProcessor.size(),
1195 disabled_processors);
1196
1197 enabled_processors.erase(
1198 remove_if(enabled_processors.begin(), enabled_processors.end(),
1199 [](string const & s) { return s.empty(); }),
1200 enabled_processors.end());
1201 disabled_processors.erase(
1202 remove_if(disabled_processors.begin(), disabled_processors.end(),
1203 [](string const & s) { return s.empty(); }),
1204 disabled_processors.end());
1205
1206 for (const auto & en_processor : enabled_processors) {
1207 for (const auto & dis_processor : disabled_processors) {
1208 if (NStr::CompareNocase(en_processor, dis_processor) == 0) {
1209 string err_msg = "The same processor name is found "
1210 "in both '" + kEnableProcessor + "' (has it as " + en_processor + ") and '" +
1211 kDisableProcessor + "' (has it as " + dis_processor + ") lists";
1212 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1213 x_SendMessageAndCompletionChunks(reply, err_msg,
1214 CRequestStatus::e400_BadRequest,
1215 ePSGS_MalformedParameter,
1216 eDiag_Error);
1217 PSG_WARNING(err_msg);
1218 return false;
1219 }
1220 }
1221 }
1222
1223 return true;
1224 }
1225
1226
1227 vector<string>
x_GetExcludeBlobs(const string & param_name,const CTempString & param_value) const1228 CPubseqGatewayApp::x_GetExcludeBlobs(const string & param_name,
1229 const CTempString & param_value) const
1230 {
1231 vector<string> result;
1232 vector<string> blob_ids;
1233 NStr::Split(param_value, ",", blob_ids);
1234
1235 size_t empty_count = 0;
1236 for (const auto & item : blob_ids) {
1237 if (item.empty())
1238 ++empty_count;
1239 else
1240 result.push_back(item);
1241 }
1242
1243 if (empty_count > 0)
1244 PSG_WARNING("Found " << empty_count << " empty blob id(s) in the '" <<
1245 param_name << "' list (empty blob ids are ignored)");
1246
1247 return result;
1248 }
1249
1250
x_ConvertIntParameter(const string & param_name,const CTempString & param_value,int & converted,string & err_msg) const1251 bool CPubseqGatewayApp::x_ConvertIntParameter(const string & param_name,
1252 const CTempString & param_value,
1253 int & converted,
1254 string & err_msg) const
1255 {
1256 try {
1257 converted = NStr::StringToInt(param_value);
1258 } catch (...) {
1259 err_msg = "Error converting '" + param_name + "' parameter "
1260 "to integer (received value: '" + string(param_value) + "')";
1261 return false;
1262 }
1263 return true;
1264 }
1265
1266
1267 unsigned long
x_GetDataSize(const IRegistry & reg,const string & section,const string & entry,unsigned long default_val)1268 CPubseqGatewayApp::x_GetDataSize(const IRegistry & reg,
1269 const string & section,
1270 const string & entry,
1271 unsigned long default_val)
1272 {
1273 CConfig conf(reg);
1274 const CConfig::TParamTree * param_tree = conf.GetTree();
1275 const TPluginManagerParamTree * section_tree =
1276 param_tree->FindSubNode(section);
1277
1278 if (!section_tree)
1279 return default_val;
1280
1281 CConfig c((CConfig::TParamTree*)section_tree, eNoOwnership);
1282 return c.GetDataSize("psg", entry, CConfig::eErr_NoThrow,
1283 default_val);
1284 }
1285
1286
PopulateCassandraMapping(bool need_accept_alert)1287 bool CPubseqGatewayApp::PopulateCassandraMapping(bool need_accept_alert)
1288 {
1289 static bool need_logging = true;
1290
1291 size_t vacant_index = m_MappingIndex + 1;
1292 if (vacant_index >= 2)
1293 vacant_index = 0;
1294 m_CassMapping[vacant_index].clear();
1295
1296 try {
1297 string err_msg;
1298 if (!FetchSatToKeyspaceMapping(
1299 m_RootKeyspace, m_CassConnection,
1300 m_SatNames, eBlobVer2Schema,
1301 m_CassMapping[vacant_index].m_BioseqKeyspace, eResolverSchema,
1302 m_CassMapping[vacant_index].m_BioseqNAKeyspaces, eNamedAnnotationsSchema,
1303 err_msg)) {
1304 err_msg = "Error populating cassandra mapping: " + err_msg;
1305 if (need_logging)
1306 PSG_CRITICAL(err_msg);
1307 need_logging = false;
1308 m_Alerts.Register(ePSGS_NoValidCassandraMapping, err_msg);
1309 return false;
1310 }
1311 } catch (const exception & exc) {
1312 string err_msg = "Cannot populate keyspace mapping from Cassandra.";
1313 if (need_logging) {
1314 PSG_CRITICAL(exc);
1315 PSG_CRITICAL(err_msg);
1316 }
1317 need_logging = false;
1318 m_Alerts.Register(ePSGS_NoValidCassandraMapping, err_msg + " " + exc.what());
1319 return false;
1320 } catch (...) {
1321 string err_msg = "Unknown error while populating "
1322 "keyspace mapping from Cassandra.";
1323 if (need_logging)
1324 PSG_CRITICAL(err_msg);
1325 need_logging = false;
1326 m_Alerts.Register(ePSGS_NoValidCassandraMapping, err_msg);
1327 return false;
1328 }
1329
1330 auto errors = m_CassMapping[vacant_index].validate(m_RootKeyspace);
1331 if (m_SatNames.empty())
1332 errors.push_back("No sat to keyspace resolutions found in the '" +
1333 m_RootKeyspace + "' keyspace.");
1334
1335 if (errors.empty()) {
1336 m_MappingIndex = vacant_index;
1337
1338 // Next step in the startup data initialization
1339 m_StartupDataState = ePSGS_NoCassCache;
1340
1341 if (need_accept_alert) {
1342 // We are not the first time here; It means that the alert about
1343 // the bad mapping has been set before. So now we accepted the
1344 // configuration so a change config alert is needed for the UI
1345 // visibility
1346 m_Alerts.Register(ePSGS_NewCassandraMappingAccepted,
1347 "Keyspace mapping (sat names, bioseq info and named "
1348 "annotations) has been populated");
1349 }
1350 } else {
1351 string combined_error("Invalid Cassandra mapping:");
1352 for (const auto & err : errors) {
1353 combined_error += "\n" + err;
1354 }
1355 if (need_logging)
1356 PSG_CRITICAL(combined_error);
1357
1358 m_Alerts.Register(ePSGS_NoValidCassandraMapping, combined_error);
1359 }
1360
1361 need_logging = false;
1362 return errors.empty();
1363 }
1364
1365
PopulatePublicCommentsMapping(void)1366 void CPubseqGatewayApp::PopulatePublicCommentsMapping(void)
1367 {
1368 if (m_PublicComments.get() != nullptr)
1369 return; // We have already been here: the mapping needs to be
1370 // populated once
1371
1372 try {
1373 string err_msg;
1374
1375 m_PublicComments.reset(new CPSGMessages);
1376
1377 if (!FetchMessages(m_RootKeyspace, m_CassConnection,
1378 *m_PublicComments.get(), err_msg)) {
1379 err_msg = "Error populating cassandra public comments mapping: " + err_msg;
1380 PSG_ERROR(err_msg);
1381 m_Alerts.Register(ePSGS_NoCassandraPublicCommentsMapping, err_msg);
1382 }
1383 } catch (const exception & exc) {
1384 string err_msg = "Cannot populate public comments mapping from Cassandra";
1385 PSG_ERROR(exc);
1386 PSG_ERROR(err_msg);
1387 m_Alerts.Register(ePSGS_NoCassandraPublicCommentsMapping,
1388 err_msg + ". " + exc.what());
1389 } catch (...) {
1390 string err_msg = "Unknown error while populating "
1391 "public comments mapping from Cassandra";
1392 PSG_ERROR(err_msg);
1393 m_Alerts.Register(ePSGS_NoCassandraPublicCommentsMapping, err_msg);
1394 }
1395 }
1396
1397
CheckCassMapping(void)1398 void CPubseqGatewayApp::CheckCassMapping(void)
1399 {
1400 size_t vacant_index = m_MappingIndex + 1;
1401 if (vacant_index >= 2)
1402 vacant_index = 0;
1403 m_CassMapping[vacant_index].clear();
1404
1405 vector<string> sat_names;
1406 try {
1407 string err_msg;
1408 if (!FetchSatToKeyspaceMapping(
1409 m_RootKeyspace, m_CassConnection,
1410 sat_names, eBlobVer2Schema,
1411 m_CassMapping[vacant_index].m_BioseqKeyspace, eResolverSchema,
1412 m_CassMapping[vacant_index].m_BioseqNAKeyspaces, eNamedAnnotationsSchema,
1413 err_msg)) {
1414 m_Alerts.Register(ePSGS_InvalidCassandraMapping,
1415 "Error checking cassandra mapping: " + err_msg);
1416 return;
1417 }
1418
1419 } catch (const exception & exc) {
1420 m_Alerts.Register(ePSGS_InvalidCassandraMapping,
1421 "Cannot check keyspace mapping from Cassandra. " +
1422 string(exc.what()));
1423 return;
1424 } catch (...) {
1425 m_Alerts.Register(ePSGS_InvalidCassandraMapping,
1426 "Unknown error while checking "
1427 "keyspace mapping from Cassandra.");
1428 return;
1429 }
1430
1431 auto errors = m_CassMapping[vacant_index].validate(m_RootKeyspace);
1432 if (sat_names.empty())
1433 errors.push_back("No sat to keyspace resolutions found in the " +
1434 m_RootKeyspace + " keyspace.");
1435
1436 if (errors.empty()) {
1437 // No errors detected in the DB; let's compare with what we use now
1438 if (sat_names != m_SatNames)
1439 m_Alerts.Register(ePSGS_NewCassandraSatNamesMapping,
1440 "Cassandra has new sat names mapping in the " +
1441 m_RootKeyspace + " keyspace. The server needs "
1442 "to restart to use it.");
1443
1444 if (m_CassMapping[0] != m_CassMapping[1]) {
1445 m_MappingIndex = vacant_index;
1446 m_Alerts.Register(ePSGS_NewCassandraMappingAccepted,
1447 "Keyspace mapping (bioseq info and named "
1448 "annotations) has been updated");
1449 }
1450 } else {
1451 string combined_error("Invalid Cassandra mapping detected "
1452 "during the periodic check:");
1453 for (const auto & err : errors) {
1454 combined_error += "\n" + err;
1455 }
1456 m_Alerts.Register(ePSGS_InvalidCassandraMapping, combined_error);
1457 }
1458 }
1459
1460
x_IsResolutionParamValid(const string & param_name,const CTempString & param_value,string & err_msg) const1461 bool CPubseqGatewayApp::x_IsResolutionParamValid(const string & param_name,
1462 const CTempString & param_value,
1463 string & err_msg) const
1464 {
1465 static string fast = "fast";
1466 static string full = "full";
1467
1468 if (param_value != fast && param_value != full) {
1469 err_msg = "Malformed '" + param_name + "' parameter. "
1470 "Acceptable values are '" + fast + "' and '" + full + "'.";
1471 return false;
1472 }
1473 return true;
1474 }
1475
1476
1477 // Prepares the chunks for the case when it is a client error so only two
1478 // chunks are required:
1479 // - a message chunk
1480 // - a reply completion chunk
x_SendMessageAndCompletionChunks(shared_ptr<CPSGS_Reply> reply,const string & message,CRequestStatus::ECode status,int code,EDiagSev severity)1481 void CPubseqGatewayApp::x_SendMessageAndCompletionChunks(
1482 shared_ptr<CPSGS_Reply> reply,
1483 const string & message,
1484 CRequestStatus::ECode status, int code, EDiagSev severity)
1485 {
1486 if (reply->IsFinished()) {
1487 // This is the case when a reply is already formed and sent to
1488 // the client.
1489 return;
1490 }
1491
1492 reply->SetContentType(ePSGS_PSGMime);
1493 reply->PrepareReplyMessage(message, status, code, severity);
1494 reply->PrepareReplyCompletion();
1495 reply->Flush();
1496 }
1497
1498
x_MalformedArguments(shared_ptr<CPSGS_Reply> reply,CRef<CRequestContext> & context,const string & err_msg)1499 void CPubseqGatewayApp::x_MalformedArguments(
1500 shared_ptr<CPSGS_Reply> reply,
1501 CRef<CRequestContext> & context,
1502 const string & err_msg)
1503 {
1504 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1505 x_SendMessageAndCompletionChunks(reply, err_msg,
1506 CRequestStatus::e400_BadRequest,
1507 ePSGS_MalformedParameter, eDiag_Error);
1508 PSG_WARNING(err_msg);
1509 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1510 }
1511
1512
x_InsufficientArguments(shared_ptr<CPSGS_Reply> reply,CRef<CRequestContext> & context,const string & err_msg)1513 void CPubseqGatewayApp::x_InsufficientArguments(
1514 shared_ptr<CPSGS_Reply> reply,
1515 CRef<CRequestContext> & context,
1516 const string & err_msg)
1517 {
1518 m_Counters.Increment(CPSGSCounters::ePSGS_InsufficientArgs);
1519 x_SendMessageAndCompletionChunks(reply, err_msg,
1520 CRequestStatus::e400_BadRequest,
1521 ePSGS_InsufficientArguments, eDiag_Error);
1522 PSG_WARNING(err_msg);
1523 x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1524 }
1525
1526
x_ReadIdToNameAndDescriptionConfiguration(const IRegistry & reg,const string & section)1527 void CPubseqGatewayApp::x_ReadIdToNameAndDescriptionConfiguration(
1528 const IRegistry & reg,
1529 const string & section)
1530 {
1531 list<string> entries;
1532 reg.EnumerateEntries(section, &entries);
1533
1534 for(const auto & value_id : entries) {
1535 string name_and_description = reg.Get(section, value_id);
1536 string name;
1537 string description;
1538 if (NStr::SplitInTwo(name_and_description, ":::", name, description,
1539 NStr::fSplit_ByPattern)) {
1540 m_IdToNameAndDescription[value_id] = {name, description};
1541 } else {
1542 PSG_WARNING("Malformed counter [" << section << "]/" << name <<
1543 " information. Expected <name>:::<description");
1544 }
1545 }
1546 }
1547
x_InitSSL(void)1548 void CPubseqGatewayApp::x_InitSSL(void)
1549 {
1550 if (m_SSLEnable) {
1551 SSL_load_error_strings();
1552 SSL_library_init();
1553 OpenSSL_add_all_algorithms();
1554 }
1555 }
1556
1557
x_RegisterProcessors(void)1558 void CPubseqGatewayApp::x_RegisterProcessors(void)
1559 {
1560 // Note: the order of adding defines the priority.
1561 // Earleir added - higher priority
1562 m_RequestDispatcher.AddProcessor(
1563 unique_ptr<IPSGS_Processor>(new CPSGS_ResolveProcessor()));
1564 m_RequestDispatcher.AddProcessor(
1565 unique_ptr<IPSGS_Processor>(new CPSGS_GetProcessor()));
1566 m_RequestDispatcher.AddProcessor(
1567 unique_ptr<IPSGS_Processor>(new CPSGS_GetBlobProcessor()));
1568 m_RequestDispatcher.AddProcessor(
1569 unique_ptr<IPSGS_Processor>(new CPSGS_AnnotProcessor()));
1570 m_RequestDispatcher.AddProcessor(
1571 unique_ptr<IPSGS_Processor>(new CPSGS_TSEChunkProcessor()));
1572 m_RequestDispatcher.AddProcessor(
1573 unique_ptr<IPSGS_Processor>(new psg::osg::CPSGS_OSGProcessor()));
1574 m_RequestDispatcher.AddProcessor(
1575 unique_ptr<IPSGS_Processor>(new psg::cdd::CPSGS_CDDProcessor()));
1576 }
1577
1578
main(int argc,const char * argv[])1579 int main(int argc, const char* argv[])
1580 {
1581 srand(time(NULL));
1582 CThread::InitializeMainThreadId();
1583
1584
1585 g_Diag_Use_RWLock();
1586 CDiagContext::SetOldPostFormat(false);
1587 CRequestContext::SetAllowedSessionIDFormat(CRequestContext::eSID_Other);
1588 CRequestContext::SetDefaultAutoIncRequestIDOnPost(true);
1589 CDiagContext::GetRequestContext().SetAutoIncRequestIDOnPost(true);
1590
1591
1592 int ret = CPubseqGatewayApp().AppMain(argc, argv, NULL, eDS_ToStdlog);
1593 google::protobuf::ShutdownProtobufLibrary();
1594 return ret;
1595 }
1596
1597
CollectGarbage(void)1598 void CollectGarbage(void)
1599 {
1600 CPubseqGatewayApp * app = CPubseqGatewayApp::GetInstance();
1601 app->GetExcludeBlobCache()->Purge();
1602 }
1603
1604
1605