1 /*  $Id: pubseq_gateway.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko
27  *
28  * File Description:
29  *
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include <math.h>
34 #include <thread>
35 
36 #include <corelib/ncbithr.hpp>
37 #include <corelib/ncbidiag.hpp>
38 #include <corelib/request_ctx.hpp>
39 #include <corelib/ncbifile.hpp>
40 #include <corelib/ncbi_config.hpp>
41 #include <corelib/plugin_manager.hpp>
42 #include <connect/services/grid_app_version_info.hpp>
43 #include <util/random_gen.hpp>
44 
45 #include <google/protobuf/stubs/common.h>
46 
47 #include <objtools/pubseq_gateway/impl/cassandra/blob_storage.hpp>
48 
49 #include "pubseq_gateway.hpp"
50 #include "pubseq_gateway_exception.hpp"
51 #include "pubseq_gateway_logging.hpp"
52 #include "shutdown_data.hpp"
53 #include "cass_monitor.hpp"
54 #include "introspection.hpp"
55 #include "resolve_processor.hpp"
56 #include "annot_processor.hpp"
57 #include "get_processor.hpp"
58 #include "getblob_processor.hpp"
59 #include "tse_chunk_processor.hpp"
60 #include "osg_processor.hpp"
61 #include "cdd_processor.hpp"
62 #include "favicon.hpp"
63 
64 
65 USING_NCBI_SCOPE;
66 
67 const unsigned short    kWorkersMin = 1;
68 const unsigned short    kWorkersMax = 100;
69 const unsigned short    kWorkersDefault = 32;
70 const unsigned short    kHttpPortMin = 1;
71 const unsigned short    kHttpPortMax = 65534;
72 const unsigned int      kListenerBacklogMin = 5;
73 const unsigned int      kListenerBacklogMax = 2048;
74 const unsigned int      kListenerBacklogDefault = 256;
75 const unsigned short    kTcpMaxConnMax = 65000;
76 const unsigned short    kTcpMaxConnMin = 5;
77 const unsigned short    kTcpMaxConnDefault = 4096;
78 const unsigned int      kTimeoutMsMin = 0;
79 const unsigned int      kTimeoutMsMax = UINT_MAX;
80 const unsigned int      kTimeoutDefault = 30000;
81 const unsigned int      kMaxRetriesDefault = 1;
82 const unsigned int      kMaxRetriesMin = 0;
83 const unsigned int      kMaxRetriesMax = UINT_MAX;
84 const EDiagSev          kDefaultSeverity = eDiag_Critical;
85 const bool              kDefaultLog = true;
86 const bool              kDefaultTrace = false;
87 const string            kDefaultRootKeyspace = "sat_info";
88 const unsigned int      kDefaultExcludeCacheMaxSize = 1000;
89 const unsigned int      kDefaultExcludeCachePurgePercentage = 20;
90 const unsigned int      kDefaultExcludeCacheInactivityPurge = 60;
91 const string            kDefaultAuthToken = "";
92 const bool              kDefaultAllowIOTest = false;
93 const unsigned long     kDefaultSlimMaxBlobSize = 10 * 1024;
94 const unsigned int      kDefaultMaxHops = 2;
95 const unsigned long     kDefaultSmallBlobSize = 16;
96 const bool              kDefaultCassandraProcessorsEnabled = true;
97 const bool              kDefaultOSGProcessorsEnabled = false;
98 const bool              kDefaultCDDProcessorsEnabled = true;
99 const string            kDefaultTestSeqId = "gi|2";
100 const bool              kDefaultTestSeqIdIgnoreError = true;
101 const bool              kDefaultSSLEnable = false;
102 const string            kDefaultSSLCertFile = "";
103 const string            kDefaultSSLKeyFile = "";
104 const string            kDefaultSSLCiphers = "TLSv1.2:!NULL-SHA256";
105 
106 static const string     kDaemonizeArgName = "daemonize";
107 
108 
109 // Memorize the configured severity level to check before using ERR_POST.
110 // Otherwise some expensive operations are executed without a real need.
111 EDiagSev                g_ConfiguredSeverity = kDefaultSeverity;
112 
113 // Memorize the configured tracing to check before using ERR_POST.
114 // Otherwise some expensive operations are executed without a real need.
115 bool                    g_Trace = kDefaultTrace;
116 
117 // Memorize the configured log on/off flag.
118 // It is used in the context resetter to avoid unnecessary context resets
119 bool                    g_Log = kDefaultLog;
120 
121 // Create the shutdown related data. It is used in a few places:
122 // a URL handler, signal handlers, watchdog handlers
123 SShutdownData           g_ShutdownData;
124 
125 
126 CPubseqGatewayApp *     CPubseqGatewayApp::sm_PubseqApp = nullptr;
127 
128 
CPubseqGatewayApp()129 CPubseqGatewayApp::CPubseqGatewayApp() :
130     m_MappingIndex(0),
131     m_HttpPort(0),
132     m_HttpWorkers(kWorkersDefault),
133     m_ListenerBacklog(kListenerBacklogDefault),
134     m_TcpMaxConn(kTcpMaxConnDefault),
135     m_CassConnection(nullptr),
136     m_CassConnectionFactory(CCassConnectionFactory::s_Create()),
137     m_TimeoutMs(kTimeoutDefault),
138     m_MaxRetries(kMaxRetriesDefault),
139     m_ExcludeCacheMaxSize(kDefaultExcludeCacheMaxSize),
140     m_ExcludeCachePurgePercentage(kDefaultExcludeCachePurgePercentage),
141     m_ExcludeCacheInactivityPurge(kDefaultExcludeCacheInactivityPurge),
142     m_SmallBlobSize(kDefaultSmallBlobSize),
143     m_MinStatValue(kMinStatValue),
144     m_MaxStatValue(kMaxStatValue),
145     m_NStatBins(kNStatBins),
146     m_StatScaleType(kStatScaleType),
147     m_TickSpan(kTickSpan),
148     m_StartTime(GetFastLocalTime()),
149     m_AllowIOTest(kDefaultAllowIOTest),
150     m_SlimMaxBlobSize(kDefaultSlimMaxBlobSize),
151     m_MaxHops(kDefaultMaxHops),
152     m_CassandraProcessorsEnabled(kDefaultCassandraProcessorsEnabled),
153     m_TestSeqId(kDefaultTestSeqId),
154     m_TestSeqIdIgnoreError(kDefaultTestSeqIdIgnoreError),
155     m_ExcludeBlobCache(nullptr),
156     m_StartupDataState(ePSGS_NoCassConnection),
157     m_LogFields("http"),
158     m_OSGProcessorsEnabled(kDefaultOSGProcessorsEnabled),
159     m_CDDProcessorsEnabled(kDefaultCDDProcessorsEnabled),
160     m_SSLEnable(kDefaultSSLEnable),
161     m_SSLCiphers(kDefaultSSLCiphers)
162 {
163     sm_PubseqApp = this;
164     m_HelpMessage = GetIntrospectionNode().Repr(CJsonNode::fStandardJson);
165 
166     x_RegisterProcessors();
167 }
168 
169 
~CPubseqGatewayApp()170 CPubseqGatewayApp::~CPubseqGatewayApp()
171 {}
172 
173 
Init(void)174 void CPubseqGatewayApp::Init(void)
175 {
176     unique_ptr<CArgDescriptions>    argdesc(new CArgDescriptions());
177 
178     argdesc->AddFlag(kDaemonizeArgName,
179                      "Turn on daemonization of Pubseq Gateway at the start.");
180 
181     argdesc->SetUsageContext(
182         GetArguments().GetProgramBasename(),
183         "Daemon to service Accession.Version Cache requests");
184     SetupArgDescriptions(argdesc.release());
185 
186     // Memorize the configured severity
187     g_ConfiguredSeverity = GetDiagPostLevel();
188 
189     // Memorize the configure trace
190     g_Trace = GetDiagTrace();
191 }
192 
193 
ParseArgs(void)194 void CPubseqGatewayApp::ParseArgs(void)
195 {
196     const CArgs &           args = GetArgs();
197     const CNcbiRegistry &   registry = GetConfig();
198 
199     if (!registry.HasEntry("SERVER", "port"))
200         NCBI_THROW(CPubseqGatewayException, eConfigurationError,
201                    "[SERVER]/port value is not found in the configuration "
202                    "file. The port must be provided to run the server. "
203                    "Exiting.");
204 
205     m_Si2csiDbFile = registry.GetString("LMDB_CACHE", "dbfile_si2csi", "");
206     m_BioseqInfoDbFile = registry.GetString("LMDB_CACHE", "dbfile_bioseq_info", "");
207     m_BlobPropDbFile = registry.GetString("LMDB_CACHE", "dbfile_blob_prop", "");
208     m_HttpPort = registry.GetInt("SERVER", "port", 0);
209     m_HttpWorkers = registry.GetInt("SERVER", "workers",
210                                     kWorkersDefault);
211     m_ListenerBacklog = registry.GetInt("SERVER", "backlog",
212                                         kListenerBacklogDefault);
213     m_TcpMaxConn = registry.GetInt("SERVER", "maxconn",
214                                    kTcpMaxConnDefault);
215     m_TimeoutMs = registry.GetInt("SERVER", "optimeout",
216                                   kTimeoutDefault);
217     m_MaxRetries = registry.GetInt("SERVER", "maxretries",
218                                    kMaxRetriesDefault);
219     g_Log = registry.GetBool("SERVER", "log",
220                              kDefaultLog);
221     m_RootKeyspace = registry.GetString("SERVER", "root_keyspace",
222                                         kDefaultRootKeyspace);
223 
224     m_ExcludeCacheMaxSize = registry.GetInt("AUTO_EXCLUDE", "max_cache_size",
225                                             kDefaultExcludeCacheMaxSize);
226     m_ExcludeCachePurgePercentage = registry.GetInt("AUTO_EXCLUDE",
227                                                     "purge_percentage",
228                                                     kDefaultExcludeCachePurgePercentage);
229     m_ExcludeCacheInactivityPurge = registry.GetInt("AUTO_EXCLUDE",
230                                                     "inactivity_purge_timeout",
231                                                     kDefaultExcludeCacheInactivityPurge);
232     m_AllowIOTest = registry.GetBool("DEBUG", "psg_allow_io_test",
233                                      kDefaultAllowIOTest);
234 
235     m_SlimMaxBlobSize = x_GetDataSize(registry, "SERVER", "slim_max_blob_size",
236                                       kDefaultSlimMaxBlobSize);
237     m_MaxHops = registry.GetInt("SERVER", "max_hops", kDefaultMaxHops);
238 
239     try {
240         m_AuthToken = registry.GetEncryptedString("ADMIN", "auth_token",
241                                                   IRegistry::fPlaintextAllowed);
242     } catch (const CRegistryException &  ex) {
243         string  msg = "Decrypting error detected while reading "
244                       "[ADMIN]/auth_token value: " + string(ex.what());
245         ERR_POST(msg);
246         m_Alerts.Register(ePSGS_ConfigAuthDecrypt, msg);
247 
248         // Treat the value as a clear text
249         m_AuthToken = registry.GetString("ADMIN", "auth_token",
250                                          kDefaultAuthToken);
251     } catch (...) {
252         string  msg = "Unknown decrypting error detected while reading "
253                       "[ADMIN]/auth_token value";
254         ERR_POST(msg);
255         m_Alerts.Register(ePSGS_ConfigAuthDecrypt, msg);
256 
257         // Treat the value as a clear text
258         m_AuthToken = registry.GetString("ADMIN", "auth_token",
259                                          kDefaultAuthToken);
260     }
261 
262     m_CassConnectionFactory->AppParseArgs(args);
263     m_CassConnectionFactory->LoadConfig(registry, "");
264     m_CassConnectionFactory->SetLogging(GetDiagPostLevel());
265 
266     m_SmallBlobSize = x_GetDataSize(registry, "STATISTICS", "small_blob_size",
267                                     kDefaultSmallBlobSize);
268     m_MinStatValue = registry.GetInt("STATISTICS", "min", kMinStatValue);
269     m_MaxStatValue = registry.GetInt("STATISTICS", "max", kMaxStatValue);
270     m_NStatBins = registry.GetInt("STATISTICS", "n_bins", kNStatBins);
271     m_StatScaleType = registry.GetString("STATISTICS", "type", kStatScaleType);
272     m_TickSpan = registry.GetInt("STATISTICS", "tick_span", kTickSpan);
273 
274     x_ReadIdToNameAndDescriptionConfiguration(registry, "COUNTERS");
275 
276     m_OSGConnectionPool = new psg::osg::COSGConnectionPool();
277     m_OSGConnectionPool->AppParseArgs(args);
278     m_OSGConnectionPool->LoadConfig(registry);
279     m_OSGConnectionPool->SetLogging(GetDiagPostLevel());
280 
281     m_OSGProcessorsEnabled = registry.GetBool(
282             "OSG_PROCESSOR", "enabled",
283             kDefaultOSGProcessorsEnabled);
284     m_CDDProcessorsEnabled = registry.GetBool(
285             "CDD_PROCESSOR", "enabled",
286             kDefaultCDDProcessorsEnabled);
287     m_CassandraProcessorsEnabled = registry.GetBool(
288             "CASSANDRA_PROCESSOR", "enabled",
289             kDefaultCassandraProcessorsEnabled);
290 
291     m_TestSeqId = registry.GetString("HEALTH", "test_seq_id", kDefaultTestSeqId);
292     m_TestSeqIdIgnoreError = registry.GetBool("HEALTH", "test_seq_id_ignore_error",
293                                               kDefaultTestSeqIdIgnoreError);
294 
295     m_SSLEnable = registry.GetBool("SSL", "ssl_enable", kDefaultSSLEnable);
296     m_SSLCertFile = registry.GetString("SSL", "ssl_cert_file", kDefaultSSLCertFile);
297     m_SSLKeyFile = registry.GetString("SSL", "ssl_key_file", kDefaultSSLKeyFile);
298     m_SSLCiphers = registry.GetString("SSL", "ssl_ciphers", kDefaultSSLCiphers);
299 
300     // It throws an exception in case of inability to start
301     x_ValidateArgs();
302 }
303 
304 
OpenCache(void)305 void CPubseqGatewayApp::OpenCache(void)
306 {
307     // It was decided to work with and without the cache even if the wrapper
308     // has not been created. So the cache initialization is called once and
309     // always succeed.
310     m_StartupDataState = ePSGS_StartupDataOK;
311 
312     try {
313         // NB. It was decided that the configuration may ommit the cache file
314         // paths. In this case the server should not use the corresponding
315         // cache at all. This is covered in the CPubseqGatewayCache class.
316         m_LookupCache.reset(new CPubseqGatewayCache(m_BioseqInfoDbFile,
317                                                     m_Si2csiDbFile,
318                                                     m_BlobPropDbFile));
319 
320         // The format of the sat ids is different
321         set<int>        sat_ids;
322         for (size_t  index = 0; index < m_SatNames.size(); ++index) {
323             if (!m_SatNames[index].empty()) {
324                 sat_ids.insert(index);
325             }
326         }
327 
328         m_LookupCache->Open(sat_ids);
329         const auto        errors = m_LookupCache->GetErrors();
330         if (!errors.empty()) {
331             string  msg = "Error opening the LMDB cache:";
332             for (const auto &  err : errors) {
333                 msg += "\n" + err.message;
334             }
335 
336             PSG_ERROR(msg);
337             m_Alerts.Register(ePSGS_OpenCache, msg);
338             m_LookupCache->ResetErrors();
339         }
340     } catch (const exception &  exc) {
341         string      msg = "Error initializing the LMDB cache: " +
342                           string(exc.what()) +
343                           ". The server continues without cache.";
344         PSG_ERROR(exc);
345         m_Alerts.Register(ePSGS_OpenCache, msg);
346         m_LookupCache.reset(nullptr);
347     } catch (...) {
348         string      msg = "Unknown initializing LMDB cache error. "
349                           "The server continues without cache.";
350         PSG_ERROR(msg);
351         m_Alerts.Register(ePSGS_OpenCache, msg);
352         m_LookupCache.reset(nullptr);
353     }
354 }
355 
356 
OpenCass(void)357 bool CPubseqGatewayApp::OpenCass(void)
358 {
359     static bool need_logging = true;
360 
361     try {
362         if (!m_CassConnection)
363             m_CassConnection = m_CassConnectionFactory->CreateInstance();
364         m_CassConnection->Connect();
365 
366         // Next step in the startup data initialization
367         m_StartupDataState = ePSGS_NoValidCassMapping;
368     } catch (const exception &  exc) {
369         string      msg = "Error connecting to Cassandra: " +
370                           string(exc.what());
371         if (need_logging)
372             PSG_CRITICAL(exc);
373 
374         need_logging = false;
375         m_Alerts.Register(ePSGS_OpenCassandra, msg);
376         return false;
377     } catch (...) {
378         string      msg = "Unknown Cassandra connecting error";
379         if (need_logging)
380             PSG_CRITICAL(msg);
381 
382         need_logging = false;
383         m_Alerts.Register(ePSGS_OpenCassandra, msg);
384         return false;
385     }
386 
387     need_logging = false;
388     return true;
389 }
390 
391 
CloseCass(void)392 void CPubseqGatewayApp::CloseCass(void)
393 {
394     m_CassConnection = nullptr;
395     m_CassConnectionFactory = nullptr;
396 }
397 
398 
SatToKeyspace(int sat,string & sat_name)399 bool CPubseqGatewayApp::SatToKeyspace(int  sat, string &  sat_name)
400 {
401     if (sat >= 0) {
402         if (sat < static_cast<int>(m_SatNames.size())) {
403             sat_name = m_SatNames[sat];
404             return !sat_name.empty();
405         }
406     }
407     return false;
408 }
409 
410 
Run(void)411 int CPubseqGatewayApp::Run(void)
412 {
413     srand(time(NULL));
414 
415     try {
416         ParseArgs();
417     } catch (const exception &  exc) {
418         PSG_CRITICAL(exc.what());
419         return 1;
420     } catch (...) {
421         PSG_CRITICAL("Unknown argument parsing error");
422         return 1;
423     }
424 
425     if (GetArgs()[kDaemonizeArgName]) {
426         // NOTE: if the stdin/stdout are not kept (default daemonize behavior)
427         // then libuv fails to close its events loop. There are some asserts
428         // with fds, one of them fails so a core dump is generated.
429         // With stdin/stdout kept open libuv stays happy and does not crash
430         bool    is_good = CCurrentProcess::Daemonize(kEmptyCStr,
431                                                      CCurrentProcess::fDF_KeepCWD |
432                                                      CCurrentProcess::fDF_KeepStdin |
433                                                      CCurrentProcess::fDF_KeepStdout);
434         if (!is_good)
435             NCBI_THROW(CPubseqGatewayException, eDaemonizationFailed,
436                        "Error during daemonization");
437     }
438 
439     bool    connected = OpenCass();
440     bool    populated = false;
441     if (connected) {
442         PopulatePublicCommentsMapping();
443         populated = PopulateCassandraMapping();
444     }
445 
446     if (populated)
447         OpenCache();
448 
449     // m_IdToNameAndDescription was populated at the time of
450     // dealing with arguments
451     m_Counters.UpdateConfiguredNameDescription(m_IdToNameAndDescription);
452 
453     auto purge_size = round(float(m_ExcludeCacheMaxSize) *
454                             float(m_ExcludeCachePurgePercentage) / 100.0);
455     m_ExcludeBlobCache.reset(
456         new CExcludeBlobCache(m_ExcludeCacheInactivityPurge,
457                               m_ExcludeCacheMaxSize,
458                               m_ExcludeCacheMaxSize - static_cast<size_t>(purge_size)));
459 
460     m_Timing.reset(new COperationTiming(m_MinStatValue,
461                                         m_MaxStatValue,
462                                         m_NStatBins,
463                                         m_StatScaleType,
464                                         m_SmallBlobSize));
465 
466     vector<CHttpHandler<CPendingOperation>>     http_handler;
467     CHttpGetParser                              get_parser;
468 
469     http_handler.emplace_back(
470             "/ID/getblob",
471             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
472             {
473                 return OnGetBlob(req, reply);
474             }, &get_parser, nullptr);
475     http_handler.emplace_back(
476             "/ID/get",
477             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
478             {
479                 return OnGet(req, reply);
480             }, &get_parser, nullptr);
481     http_handler.emplace_back(
482             "/ID/resolve",
483             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
484             {
485                 return OnResolve(req, reply);
486             }, &get_parser, nullptr);
487     http_handler.emplace_back(
488             "/ID/get_tse_chunk",
489             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
490             {
491                 return OnGetTSEChunk(req, reply);
492             }, &get_parser, nullptr);
493     http_handler.emplace_back(
494             "/ID/get_na",
495             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
496             {
497                 return OnGetNA(req, reply);
498             }, &get_parser, nullptr);
499     http_handler.emplace_back(
500             "/health",
501             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
502             {
503                 return OnHealth(req, reply);
504             }, &get_parser, nullptr);
505     http_handler.emplace_back(
506             "/deep-health",
507             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
508             {
509                 // For the time being 'deep-health' matches 'health'
510                 return OnHealth(req, reply);
511             }, &get_parser, nullptr);
512     http_handler.emplace_back(
513             "/ADMIN/config",
514             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
515             {
516                 return OnConfig(req, reply);
517             }, &get_parser, nullptr);
518     http_handler.emplace_back(
519             "/ADMIN/info",
520             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
521             {
522                 return OnInfo(req, reply);
523             }, &get_parser, nullptr);
524     http_handler.emplace_back(
525             "/ADMIN/status",
526             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
527             {
528                 return OnStatus(req, reply);
529             }, &get_parser, nullptr);
530     http_handler.emplace_back(
531             "/ADMIN/shutdown",
532             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
533             {
534                 return OnShutdown(req, reply);
535             }, &get_parser, nullptr);
536     http_handler.emplace_back(
537             "/ADMIN/get_alerts",
538             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
539             {
540                 return OnGetAlerts(req, reply);
541             }, &get_parser, nullptr);
542     http_handler.emplace_back(
543             "/ADMIN/ack_alert",
544             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
545             {
546                 return OnAckAlert(req, reply);
547             }, &get_parser, nullptr);
548     http_handler.emplace_back(
549             "/ADMIN/statistics",
550             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
551             {
552                 return OnStatistics(req, reply);
553             }, &get_parser, nullptr);
554     http_handler.emplace_back(
555             "/favicon.ico",
556             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
557             {
558                 // It's a browser, most probably admin request
559                 reply->GetHttpReply()->SetContentType(ePSGS_ImageMime);
560                 reply->GetHttpReply()->SetContentLength(sizeof(favicon));
561                 reply->GetHttpReply()->SendOk((const char *)(favicon), sizeof(favicon), true);
562                 return 0;
563             }, &get_parser, nullptr);
564 
565     if (m_AllowIOTest) {
566         m_IOTestBuffer.reset(new char[kMaxTestIOSize]);
567         CRandom     random;
568         char *      current = m_IOTestBuffer.get();
569         for (size_t  k = 0; k < kMaxTestIOSize; k += 8) {
570             Uint8   random_val = random.GetRandUint8();
571             memcpy(current, &random_val, 8);
572             current += 8;
573         }
574 
575         http_handler.emplace_back(
576                 "/TEST/io",
577                 [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
578                 {
579                     return OnTestIO(req, reply);
580                 }, &get_parser, nullptr);
581     }
582 
583     http_handler.emplace_back(
584             "",
585             [this](CHttpRequest &  req, shared_ptr<CPSGS_Reply>  reply)->int
586             {
587                 return OnBadURL(req, reply);
588             }, &get_parser, nullptr);
589 
590 
591     x_InitSSL();
592     m_TcpDaemon.reset(
593             new CHttpDaemon<CPendingOperation>(http_handler, "0.0.0.0",
594                                                m_HttpPort,
595                                                m_HttpWorkers,
596                                                m_ListenerBacklog,
597                                                m_TcpMaxConn));
598 
599 
600 
601     // Run the monitoring thread
602     int             ret_code = 0;
603     std::thread     monitoring_thread(CassMonitorThreadedFunction);
604 
605     try {
606         m_TcpDaemon->Run([this](TSL::CTcpDaemon<CHttpProto<CPendingOperation>,
607                            CHttpConnection<CPendingOperation>,
608                            CHttpDaemon<CPendingOperation>> &  tcp_daemon)
609                 {
610                     // This lambda is called once per second.
611                     // Earlier implementations printed counters on stdout.
612 
613                     static unsigned long   tick_no = 0;
614                     if (++tick_no % m_TickSpan == 0) {
615                         tick_no = 0;
616                         this->m_Timing->Rotate();
617                     }
618                 });
619     } catch (const CException &  exc) {
620         ERR_POST(Critical << exc);
621         ret_code = 1;
622         g_ShutdownData.m_ShutdownRequested = true;
623         monitoring_thread.detach();     // to avoid up to 60 sec delay
624     } catch (const exception &  exc) {
625         ERR_POST(Critical << exc);
626         ret_code = 1;
627         g_ShutdownData.m_ShutdownRequested = true;
628         monitoring_thread.detach();     // to avoid up to 60 sec delay
629     } catch (...) {
630         ERR_POST(Critical << "Unknown exception while running TCP daemon");
631         ret_code = 1;
632         g_ShutdownData.m_ShutdownRequested = true;
633         monitoring_thread.detach();     // to avoid up to 60 sec delay
634     }
635 
636     if (monitoring_thread.joinable()) {
637         monitoring_thread.join();
638     }
639 
640     CloseCass();
641     return ret_code;
642 }
643 
644 
645 
GetInstance(void)646 CPubseqGatewayApp *  CPubseqGatewayApp::GetInstance(void)
647 {
648     return sm_PubseqApp;
649 }
650 
651 
x_ValidateArgs(void)652 void CPubseqGatewayApp::x_ValidateArgs(void)
653 {
654     if (m_HttpPort < kHttpPortMin || m_HttpPort > kHttpPortMax) {
655         NCBI_THROW(CPubseqGatewayException, eConfigurationError,
656                    "[SERVER]/port value is out of range. Allowed range: " +
657                    to_string(kHttpPortMin) + "..." +
658                    to_string(kHttpPortMax) + ". Received: " +
659                    to_string(m_HttpPort));
660     }
661 
662     if (m_Si2csiDbFile.empty()) {
663         PSG_WARNING("[LMDB_CACHE]/dbfile_si2csi is not found "
664                     "in the ini file. No si2csi cache will be used.");
665     }
666 
667     if (m_BioseqInfoDbFile.empty()) {
668         PSG_WARNING("[LMDB_CACHE]/dbfile_bioseq_info is not found "
669                     "in the ini file. No bioseq_info cache will be used.");
670     }
671 
672     if (m_BlobPropDbFile.empty()) {
673         PSG_WARNING("[LMDB_CACHE]/dbfile_blob_prop is not found "
674                     "in the ini file. No blob_prop cache will be used.");
675     }
676 
677     if (m_HttpWorkers < kWorkersMin || m_HttpWorkers > kWorkersMax) {
678         string  err_msg =
679             "The number of HTTP workers is out of range. Allowed "
680             "range: " + to_string(kWorkersMin) + "..." +
681             to_string(kWorkersMax) + ". Received: " +
682             to_string(m_HttpWorkers) + ". Reset to "
683             "default: " + to_string(kWorkersDefault);
684         m_Alerts.Register(ePSGS_ConfigHttpWorkers, err_msg);
685         PSG_ERROR(err_msg);
686         m_HttpWorkers = kWorkersDefault;
687     }
688 
689     if (m_ListenerBacklog < kListenerBacklogMin ||
690         m_ListenerBacklog > kListenerBacklogMax) {
691         string  err_msg =
692             "The listener backlog is out of range. Allowed "
693             "range: " + to_string(kListenerBacklogMin) + "..." +
694             to_string(kListenerBacklogMax) + ". Received: " +
695             to_string(m_ListenerBacklog) + ". Reset to "
696             "default: " + to_string(kListenerBacklogDefault);
697         m_Alerts.Register(ePSGS_ConfigListenerBacklog, err_msg);
698         PSG_ERROR(err_msg);
699         m_ListenerBacklog = kListenerBacklogDefault;
700     }
701 
702     if (m_TcpMaxConn < kTcpMaxConnMin || m_TcpMaxConn > kTcpMaxConnMax) {
703         string  err_msg =
704             "The max number of connections is out of range. Allowed "
705             "range: " + to_string(kTcpMaxConnMin) + "..." +
706             to_string(kTcpMaxConnMax) + ". Received: " +
707             to_string(m_TcpMaxConn) + ". Reset to "
708             "default: " + to_string(kTcpMaxConnDefault);
709         m_Alerts.Register(ePSGS_ConfigMaxConnections, err_msg);
710         PSG_ERROR(err_msg);
711         m_TcpMaxConn = kTcpMaxConnDefault;
712     }
713 
714     if (m_TimeoutMs < kTimeoutMsMin || m_TimeoutMs > kTimeoutMsMax) {
715         string  err_msg =
716             "The operation timeout is out of range. Allowed "
717             "range: " + to_string(kTimeoutMsMin) + "..." +
718             to_string(kTimeoutMsMax) + ". Received: " +
719             to_string(m_TimeoutMs) + ". Reset to "
720             "default: " + to_string(kTimeoutDefault);
721         m_Alerts.Register(ePSGS_ConfigTimeout, err_msg);
722         PSG_ERROR(err_msg);
723         m_TimeoutMs = kTimeoutDefault;
724     }
725 
726     if (m_MaxRetries < kMaxRetriesMin || m_MaxRetries > kMaxRetriesMax) {
727         string  err_msg =
728             "The max retries is out of range. Allowed "
729             "range: " + to_string(kMaxRetriesMin) + "..." +
730             to_string(kMaxRetriesMax) + ". Received: " +
731             to_string(m_MaxRetries) + ". Reset to "
732             "default: " + to_string(kMaxRetriesDefault);
733         m_Alerts.Register(ePSGS_ConfigRetries, err_msg);
734         PSG_ERROR(err_msg);
735         m_MaxRetries = kMaxRetriesDefault;
736     }
737 
738     if (m_ExcludeCacheMaxSize < 0) {
739         string  err_msg =
740             "The max exclude cache size must be a positive integer. "
741             "Received: " + to_string(m_ExcludeCacheMaxSize) + ". "
742             "Reset to 0 (exclude blobs cache is disabled)";
743         m_Alerts.Register(ePSGS_ConfigExcludeCacheSize, err_msg);
744         PSG_ERROR(err_msg);
745         m_ExcludeCacheMaxSize = 0;
746     }
747 
748     if (m_ExcludeCachePurgePercentage < 0 || m_ExcludeCachePurgePercentage > 100) {
749         string  err_msg = "The exclude cache purge percentage is out of range. "
750             "Allowed: 0...100. Received: " +
751             to_string(m_ExcludeCachePurgePercentage) + ". ";
752         if (m_ExcludeCacheMaxSize > 0) {
753             err_msg += "Reset to " +
754                 to_string(kDefaultExcludeCachePurgePercentage);
755             PSG_ERROR(err_msg);
756         } else {
757             err_msg += "The provided value has no effect "
758                 "because the exclude cache is disabled.";
759             PSG_WARNING(err_msg);
760         }
761         m_ExcludeCachePurgePercentage = kDefaultExcludeCachePurgePercentage;
762         m_Alerts.Register(ePSGS_ConfigExcludeCachePurgeSize, err_msg);
763     }
764 
765     if (m_ExcludeCacheInactivityPurge <= 0) {
766         string  err_msg = "The exclude cache inactivity purge timeout must be "
767             "a positive integer greater than zero. Received: " +
768             to_string(m_ExcludeCacheInactivityPurge) + ". ";
769         if (m_ExcludeCacheMaxSize > 0) {
770             err_msg += "Reset to " +
771                 to_string(kDefaultExcludeCacheInactivityPurge);
772             PSG_ERROR(err_msg);
773         } else {
774             err_msg += "The provided value has no effect "
775                 "because the exclude cache is disabled.";
776             PSG_WARNING(err_msg);
777         }
778         m_ExcludeCacheInactivityPurge = kDefaultExcludeCacheInactivityPurge;
779         m_Alerts.Register(ePSGS_ConfigExcludeCacheInactivity, err_msg);
780     }
781 
782     bool        stat_settings_good = true;
783     if (NStr::CompareNocase(m_StatScaleType, "log") != 0 &&
784         NStr::CompareNocase(m_StatScaleType, "linear") != 0) {
785         string  err_msg = "Invalid [STATISTICS]/type value '" +
786             m_StatScaleType + "'. Allowed values are: log, linear. "
787             "The statistics parameters are reset to default.";
788         m_Alerts.Register(ePSGS_ConfigStatScaleType, err_msg);
789         PSG_ERROR(err_msg);
790         stat_settings_good = false;
791 
792         m_MinStatValue = kMinStatValue;
793         m_MaxStatValue = kMaxStatValue;
794         m_NStatBins = kNStatBins;
795         m_StatScaleType = kStatScaleType;
796     }
797 
798     if (stat_settings_good) {
799         if (m_MinStatValue > m_MaxStatValue) {
800             string  err_msg = "Invalid [STATISTICS]/min and max values. The "
801                 "max cannot be less than min. "
802                 "The statistics parameters are reset to default.";
803             m_Alerts.Register(ePSGS_ConfigStatMinMaxVal, err_msg);
804             PSG_ERROR(err_msg);
805             stat_settings_good = false;
806 
807             m_MinStatValue = kMinStatValue;
808             m_MaxStatValue = kMaxStatValue;
809             m_NStatBins = kNStatBins;
810             m_StatScaleType = kStatScaleType;
811         }
812     }
813 
814     if (stat_settings_good) {
815         if (m_NStatBins <= 0) {
816             string  err_msg = "Invalid [STATISTICS]/n_bins value. The "
817                 "number of bins must be greater than 0. "
818                 "The statistics parameters are reset to default.";
819             m_Alerts.Register(ePSGS_ConfigStatNBins, err_msg);
820             PSG_ERROR(err_msg);
821 
822             // Uncomment if there will be more [STATISTICS] section parameters
823             // stat_settings_good = false;
824 
825             m_MinStatValue = kMinStatValue;
826             m_MaxStatValue = kMaxStatValue;
827             m_NStatBins = kNStatBins;
828             m_StatScaleType = kStatScaleType;
829         }
830     }
831 
832     if (m_TickSpan <= 0) {
833         PSG_WARNING("Invalid [STATISTICS]/tick_span value (" +
834                     to_string(m_TickSpan) + "). "
835                     "The tick span must be greater than 0. The tick span is "
836                     "reset to the default value (" +
837                     to_string(kTickSpan) + ").");
838         m_TickSpan = kTickSpan;
839     }
840 
841     if (m_MaxHops <= 0) {
842         PSG_WARNING("Invalid [SERVER]/max_hops value (" +
843                     to_string(m_MaxHops) + "). "
844                     "The max hops must be greater than 0. The max hops is "
845                     "reset to the default value (" +
846                     to_string(kDefaultMaxHops) + ").");
847         m_MaxHops = kDefaultMaxHops;
848     }
849 
850     if (m_SSLEnable) {
851         if (m_SSLCertFile.empty()) {
852             NCBI_THROW(CPubseqGatewayException, eConfigurationError,
853                        "[SSL]/ssl_cert_file value must be provided "
854                        "if [SSL]/ssl_enable is set to true");
855         }
856         if (m_SSLKeyFile.empty()) {
857             NCBI_THROW(CPubseqGatewayException, eConfigurationError,
858                        "[SSL]/ssl_key_file value must be provided "
859                        "if [SSL]/ssl_enable is set to true");
860         }
861 
862         if (!CFile(m_SSLCertFile).Exists()) {
863             NCBI_THROW(CPubseqGatewayException, eConfigurationError,
864                        "[SSL]/ssl_cert_file is not found");
865         }
866         if (!CFile(m_SSLKeyFile).Exists()) {
867             NCBI_THROW(CPubseqGatewayException, eConfigurationError,
868                        "[SSL]/ssl_key_file is not found");
869         }
870 
871         if (m_SSLCiphers.empty()) {
872             m_SSLCiphers = kDefaultSSLCiphers;
873         }
874     }
875 }
876 
877 
x_GetCmdLineArguments(void) const878 string CPubseqGatewayApp::x_GetCmdLineArguments(void) const
879 {
880     const CNcbiArguments &  arguments = CNcbiApplication::Instance()->
881                                                             GetArguments();
882     size_t                  args_size = arguments.Size();
883     string                  cmdline_args;
884 
885     for (size_t index = 0; index < args_size; ++index) {
886         if (index != 0)
887             cmdline_args += " ";
888         cmdline_args += arguments[index];
889     }
890     return cmdline_args;
891 }
892 
893 
// Names used at the time of creating a request context:
// - the incoming HTTP request headers which may bring the session id,
//   the hit id, the client IP and the user agent
// - the names of the extra fields printed at the request start
static string   kNcbiSidHeader = "HTTP_NCBI_SID";
static string   kNcbiPhidHeader = "HTTP_NCBI_PHID";
static string   kXForwardedForHeader = "X-Forwarded-For";
static string   kUserAgentHeader = "User-Agent";
static string   kUserAgentApplog = "USER_AGENT";
static string   kRequestPathApplog = "request_path";
x_CreateRequestContext(CHttpRequest & req) const900 CRef<CRequestContext> CPubseqGatewayApp::x_CreateRequestContext(
901                                                 CHttpRequest &  req) const
902 {
903     CRef<CRequestContext>   context;
904     if (g_Log) {
905         context.Reset(new CRequestContext());
906         context->SetRequestID();
907 
908         // NCBI SID may come from the header
909         string      sid = req.GetHeaderValue(kNcbiSidHeader);
910         if (!sid.empty())
911             context->SetSessionID(sid);
912         else
913             context->SetSessionID();
914 
915         // NCBI PHID may come from the header
916         string      phid = req.GetHeaderValue(kNcbiPhidHeader);
917         if (!phid.empty())
918             context->SetHitID(phid);
919         else
920             context->SetHitID();
921 
922         // Client IP may come from the headers
923         string      client_ip = req.GetHeaderValue(kXForwardedForHeader);
924         if (!client_ip.empty()) {
925             vector<string>      ip_addrs;
926             NStr::Split(client_ip, ",", ip_addrs);
927             if (!ip_addrs.empty()) {
928                 // Take the first one, there could be many...
929                 context->SetClientIP(ip_addrs[0]);
930             }
931         } else {
932             client_ip = req.GetPeerIP();
933             if (!client_ip.empty()) {
934                 context->SetClientIP(client_ip);
935             }
936         }
937 
938         // It was decided not to use the standard C++ Toolkit function because
939         // it searches in the CGI environment and does quite a bit of
940         // unnecessary things. The PSG server only needs X-Forwarded-For
941         // TNCBI_IPv6Addr  client_address = req.GetClientIP();
942         // if (!NcbiIsEmptyIPv6(&client_address)) {
943         //     char        buf[256];
944         //     if (NcbiIPv6ToString(buf, sizeof(buf), &client_address) != 0) {
945         //         context->SetClientIP(buf);
946         //     }
947         // }
948 
949         CDiagContext::SetRequestContext(context);
950         CDiagContext_Extra  extra = GetDiagContext().PrintRequestStart();
951 
952         // This is the URL path
953         extra.Print(kRequestPathApplog, req.GetPath());
954 
955         string      user_agent = req.GetHeaderValue(kUserAgentHeader);
956         if (!user_agent.empty())
957             extra.Print(kUserAgentApplog, user_agent);
958 
959         req.PrintParams(extra);
960         req.PrintLogFields(m_LogFields);
961 
962         // If extra is not flushed then it picks read-only even though it is
963         // done after...
964         extra.Flush();
965 
966         // Just in case, avoid to have 0
967         context->SetRequestStatus(CRequestStatus::e200_Ok);
968         context->SetReadOnly(true);
969     }
970     return context;
971 }
972 
973 
x_PrintRequestStop(CRef<CRequestContext> context,int status)974 void CPubseqGatewayApp::x_PrintRequestStop(CRef<CRequestContext>   context,
975                                            int  status)
976 {
977     if (context.NotNull()) {
978         CDiagContext::SetRequestContext(context);
979         context->SetReadOnly(false);
980         context->SetRequestStatus(status);
981         GetDiagContext().PrintRequestStop();
982         context.Reset();
983         CDiagContext::SetRequestContext(NULL);
984     }
985 }
986 
987 
988 CPubseqGatewayApp::SRequestParameter
x_GetParam(CHttpRequest & req,const string & name) const989 CPubseqGatewayApp::x_GetParam(CHttpRequest &  req, const string &  name) const
990 {
991     SRequestParameter       param;
992     const char *            value;
993     size_t                  value_size;
994 
995     param.m_Found = req.GetParam(name.data(), name.size(),
996                                  true, &value, &value_size);
997     if (param.m_Found)
998         param.m_Value.assign(value, value_size);
999     return param;
1000 }
1001 
1002 
x_IsBoolParamValid(const string & param_name,const CTempString & param_value,string & err_msg) const1003 bool CPubseqGatewayApp::x_IsBoolParamValid(const string &  param_name,
1004                                            const CTempString &  param_value,
1005                                            string &  err_msg) const
1006 {
1007     static string   yes = "yes";
1008     static string   no = "no";
1009 
1010     if (param_value != yes && param_value != no) {
1011         err_msg = "Malformed '" + param_name + "' parameter. "
1012                   "Acceptable values are '" + yes + "' and '" + no + "'.";
1013         return false;
1014     }
1015     return true;
1016 }
1017 
1018 
1019 SPSGS_ResolveRequest::EPSGS_OutputFormat
x_GetOutputFormat(const string & param_name,const CTempString & param_value,string & err_msg) const1020 CPubseqGatewayApp::x_GetOutputFormat(const string &  param_name,
1021                                      const CTempString &  param_value,
1022                                      string &  err_msg) const
1023 {
1024     static string   protobuf = "protobuf";
1025     static string   json = "json";
1026     static string   native = "native";
1027 
1028     if (param_value == protobuf)
1029         return SPSGS_ResolveRequest::ePSGS_ProtobufFormat;
1030     if (param_value == json)
1031         return SPSGS_ResolveRequest::ePSGS_JsonFormat;
1032     if (param_value == native)
1033         return SPSGS_ResolveRequest::ePSGS_NativeFormat;
1034 
1035     err_msg = "Malformed '" + param_name + "' parameter. "
1036               "Acceptable values are '" +
1037               protobuf + "' and '" +
1038               json + "' and '" +
1039               native + "'.";
1040     return SPSGS_ResolveRequest::ePSGS_UnknownFormat;
1041 }
1042 
1043 
1044 SPSGS_BlobRequestBase::EPSGS_TSEOption
x_GetTSEOption(const string & param_name,const CTempString & param_value,string & err_msg) const1045 CPubseqGatewayApp::x_GetTSEOption(const string &  param_name,
1046                                   const CTempString &  param_value,
1047                                   string &  err_msg) const
1048 {
1049     static string   none = "none";
1050     static string   whole = "whole";
1051     static string   orig = "orig";
1052     static string   smart = "smart";
1053     static string   slim = "slim";
1054 
1055     if (param_value == none)
1056         return SPSGS_BlobRequestBase::ePSGS_NoneTSE;
1057     if (param_value == whole)
1058         return SPSGS_BlobRequestBase::ePSGS_WholeTSE;
1059     if (param_value == orig)
1060         return SPSGS_BlobRequestBase::ePSGS_OrigTSE;
1061     if (param_value == smart)
1062         return SPSGS_BlobRequestBase::ePSGS_SmartTSE;
1063     if (param_value == slim)
1064         return SPSGS_BlobRequestBase::ePSGS_SlimTSE;
1065 
1066     err_msg = "Malformed '" + param_name + "' parameter. "
1067               "Acceptable values are '" +
1068               none + "', '" +
1069               whole + "', '" +
1070               orig + "', '" +
1071               smart + "' and '" +
1072               slim + "'.";
1073     return SPSGS_BlobRequestBase::ePSGS_UnknownTSE;
1074 }
1075 
1076 
1077 SPSGS_RequestBase::EPSGS_AccSubstitutioOption
x_GetAccessionSubstitutionOption(const string & param_name,const CTempString & param_value,string & err_msg) const1078 CPubseqGatewayApp::x_GetAccessionSubstitutionOption(
1079                                     const string &  param_name,
1080                                     const CTempString &  param_value,
1081                                     string &  err_msg) const
1082 {
1083     static string       default_option = "default";
1084     static string       limited_option = "limited";
1085     static string       never_option = "never";
1086 
1087     if (param_value == default_option)
1088         return SPSGS_RequestBase::ePSGS_DefaultAccSubstitution;
1089     if (param_value == limited_option)
1090         return SPSGS_RequestBase::ePSGS_LimitedAccSubstitution;
1091     if (param_value == never_option)
1092         return SPSGS_RequestBase::ePSGS_NeverAccSubstitute;
1093 
1094     err_msg = "Malformed '" + param_name + "' parameter. "
1095               "Acceptable values are '" +
1096               default_option + "', '" +
1097               limited_option + "', '" +
1098               never_option + "'.";
1099     return SPSGS_RequestBase::ePSGS_UnknownAccSubstitution;
1100 }
1101 
1102 
1103 bool
x_GetTraceParameter(CHttpRequest & req,const string & param_name,SPSGS_RequestBase::EPSGS_Trace & trace,string & err_msg)1104 CPubseqGatewayApp::x_GetTraceParameter(CHttpRequest &  req,
1105                                        const string &  param_name,
1106                                        SPSGS_RequestBase::EPSGS_Trace &  trace,
1107                                        string &  err_msg)
1108 {
1109     trace = SPSGS_RequestBase::ePSGS_NoTracing;
1110     SRequestParameter   trace_protocol_param = x_GetParam(req, param_name);
1111 
1112     if (trace_protocol_param.m_Found) {
1113         if (!x_IsBoolParamValid(param_name,
1114                                 trace_protocol_param.m_Value, err_msg)) {
1115             m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1116             PSG_WARNING(err_msg);
1117             return false;
1118         }
1119         if (trace_protocol_param.m_Value == "yes")
1120             trace = SPSGS_RequestBase::ePSGS_WithTracing;
1121         else
1122             trace = SPSGS_RequestBase::ePSGS_NoTracing;;
1123     }
1124     return true;
1125 }
1126 
1127 
1128 bool
x_GetHops(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply,int & hops)1129 CPubseqGatewayApp::x_GetHops(CHttpRequest &  req,
1130                              shared_ptr<CPSGS_Reply>  reply,
1131                              int &  hops)
1132 {
1133     static string   kHopsParam = "hops";
1134 
1135     hops = 0;   // Default value
1136     SRequestParameter   hops_param = x_GetParam(req, kHopsParam);
1137 
1138     if (hops_param.m_Found) {
1139         string      err_msg;
1140         if (!x_ConvertIntParameter(kHopsParam, hops_param.m_Value,
1141                                    hops, err_msg)) {
1142             m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1143             x_SendMessageAndCompletionChunks(reply, err_msg,
1144                                              CRequestStatus::e400_BadRequest,
1145                                              ePSGS_MalformedParameter,
1146                                              eDiag_Error);
1147             PSG_ERROR(err_msg);
1148             return false;
1149         }
1150 
1151         if (hops < 0) {
1152             err_msg = "Invalid '" + kHopsParam + "' value " + to_string(hops) +
1153                       ". It must be > 0.";
1154             m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1155             x_SendMessageAndCompletionChunks(reply, err_msg,
1156                                              CRequestStatus::e400_BadRequest,
1157                                              ePSGS_MalformedParameter,
1158                                              eDiag_Error);
1159             PSG_ERROR(err_msg);
1160             return false;
1161         }
1162 
1163         if (hops > m_MaxHops) {
1164             err_msg = "The '" + kHopsParam + "' value " + to_string(hops) +
1165                       " exceeds the server configured value " +
1166                       to_string(m_MaxHops) + ".";
1167             m_Counters.Increment(CPSGSCounters::ePSGS_MaxHopsExceededError);
1168             x_SendMessageAndCompletionChunks(reply, err_msg,
1169                                              CRequestStatus::e400_BadRequest,
1170                                              ePSGS_MalformedParameter,
1171                                              eDiag_Error);
1172             PSG_ERROR(err_msg);
1173             return false;
1174         }
1175     }
1176     return true;
1177 }
1178 
1179 
1180 bool
x_GetEnabledAndDisabledProcessors(CHttpRequest & req,shared_ptr<CPSGS_Reply> reply,vector<string> & enabled_processors,vector<string> & disabled_processors)1181 CPubseqGatewayApp::x_GetEnabledAndDisabledProcessors(
1182                                         CHttpRequest &  req,
1183                                         shared_ptr<CPSGS_Reply>  reply,
1184                                         vector<string> &  enabled_processors,
1185                                         vector<string> &  disabled_processors)
1186 {
1187     static string   kEnableProcessor = "enable_processor";
1188     static string   kDisableProcessor = "disable_processor";
1189 
1190     req.GetMultipleValuesParam(kEnableProcessor.data(),
1191                                kEnableProcessor.size(),
1192                                enabled_processors);
1193     req.GetMultipleValuesParam(kDisableProcessor.data(),
1194                                kDisableProcessor.size(),
1195                                disabled_processors);
1196 
1197     enabled_processors.erase(
1198         remove_if(enabled_processors.begin(), enabled_processors.end(),
1199                   [](string const & s) { return s.empty(); }),
1200         enabled_processors.end());
1201     disabled_processors.erase(
1202         remove_if(disabled_processors.begin(), disabled_processors.end(),
1203                   [](string const & s) { return s.empty(); }),
1204         disabled_processors.end());
1205 
1206     for (const auto & en_processor : enabled_processors) {
1207         for (const auto &  dis_processor : disabled_processors) {
1208             if (NStr::CompareNocase(en_processor, dis_processor) == 0) {
1209                 string      err_msg = "The same processor name is found "
1210                     "in both '" + kEnableProcessor + "' (has it as " + en_processor + ") and '" +
1211                     kDisableProcessor + "' (has it as " + dis_processor + ") lists";
1212                 m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1213                 x_SendMessageAndCompletionChunks(reply, err_msg,
1214                                                  CRequestStatus::e400_BadRequest,
1215                                                  ePSGS_MalformedParameter,
1216                                                  eDiag_Error);
1217                 PSG_WARNING(err_msg);
1218                 return false;
1219             }
1220         }
1221     }
1222 
1223     return true;
1224 }
1225 
1226 
1227 vector<string>
x_GetExcludeBlobs(const string & param_name,const CTempString & param_value) const1228 CPubseqGatewayApp::x_GetExcludeBlobs(const string &  param_name,
1229                                      const CTempString &  param_value) const
1230 {
1231     vector<string>              result;
1232     vector<string>              blob_ids;
1233     NStr::Split(param_value, ",", blob_ids);
1234 
1235     size_t                      empty_count = 0;
1236     for (const auto &  item : blob_ids) {
1237         if (item.empty())
1238             ++empty_count;
1239         else
1240             result.push_back(item);
1241     }
1242 
1243     if (empty_count > 0)
1244         PSG_WARNING("Found " << empty_count << " empty blob id(s) in the '" <<
1245                     param_name << "' list (empty blob ids are ignored)");
1246 
1247     return result;
1248 }
1249 
1250 
x_ConvertIntParameter(const string & param_name,const CTempString & param_value,int & converted,string & err_msg) const1251 bool CPubseqGatewayApp::x_ConvertIntParameter(const string &  param_name,
1252                                               const CTempString &  param_value,
1253                                               int &  converted,
1254                                               string &  err_msg) const
1255 {
1256     try {
1257         converted = NStr::StringToInt(param_value);
1258     } catch (...) {
1259         err_msg = "Error converting '" + param_name + "' parameter "
1260                   "to integer (received value: '" + string(param_value) + "')";
1261         return false;
1262     }
1263     return true;
1264 }
1265 
1266 
1267 unsigned long
x_GetDataSize(const IRegistry & reg,const string & section,const string & entry,unsigned long default_val)1268 CPubseqGatewayApp::x_GetDataSize(const IRegistry &  reg,
1269                                  const string &  section,
1270                                  const string &  entry,
1271                                  unsigned long  default_val)
1272 {
1273     CConfig                         conf(reg);
1274     const CConfig::TParamTree *     param_tree = conf.GetTree();
1275     const TPluginManagerParamTree * section_tree =
1276                                         param_tree->FindSubNode(section);
1277 
1278     if (!section_tree)
1279         return default_val;
1280 
1281     CConfig     c((CConfig::TParamTree*)section_tree, eNoOwnership);
1282     return c.GetDataSize("psg", entry, CConfig::eErr_NoThrow,
1283                          default_val);
1284 }
1285 
1286 
PopulateCassandraMapping(bool need_accept_alert)1287 bool CPubseqGatewayApp::PopulateCassandraMapping(bool  need_accept_alert)
1288 {
1289     static bool need_logging = true;
1290 
1291     size_t      vacant_index = m_MappingIndex + 1;
1292     if (vacant_index >= 2)
1293         vacant_index = 0;
1294     m_CassMapping[vacant_index].clear();
1295 
1296     try {
1297         string      err_msg;
1298         if (!FetchSatToKeyspaceMapping(
1299                     m_RootKeyspace, m_CassConnection,
1300                     m_SatNames, eBlobVer2Schema,
1301                     m_CassMapping[vacant_index].m_BioseqKeyspace, eResolverSchema,
1302                     m_CassMapping[vacant_index].m_BioseqNAKeyspaces, eNamedAnnotationsSchema,
1303                     err_msg)) {
1304             err_msg = "Error populating cassandra mapping: " + err_msg;
1305             if (need_logging)
1306                 PSG_CRITICAL(err_msg);
1307             need_logging = false;
1308             m_Alerts.Register(ePSGS_NoValidCassandraMapping, err_msg);
1309             return false;
1310         }
1311     } catch (const exception &  exc) {
1312         string      err_msg = "Cannot populate keyspace mapping from Cassandra.";
1313         if (need_logging) {
1314             PSG_CRITICAL(exc);
1315             PSG_CRITICAL(err_msg);
1316         }
1317         need_logging = false;
1318         m_Alerts.Register(ePSGS_NoValidCassandraMapping, err_msg + " " + exc.what());
1319         return false;
1320     } catch (...) {
1321         string      err_msg = "Unknown error while populating "
1322                               "keyspace mapping from Cassandra.";
1323         if (need_logging)
1324             PSG_CRITICAL(err_msg);
1325         need_logging = false;
1326         m_Alerts.Register(ePSGS_NoValidCassandraMapping, err_msg);
1327         return false;
1328     }
1329 
1330     auto    errors = m_CassMapping[vacant_index].validate(m_RootKeyspace);
1331     if (m_SatNames.empty())
1332         errors.push_back("No sat to keyspace resolutions found in the '" +
1333                          m_RootKeyspace + "' keyspace.");
1334 
1335     if (errors.empty()) {
1336         m_MappingIndex = vacant_index;
1337 
1338         // Next step in the startup data initialization
1339         m_StartupDataState = ePSGS_NoCassCache;
1340 
1341         if (need_accept_alert) {
1342             // We are not the first time here; It means that the alert about
1343             // the bad mapping has been set before. So now we accepted the
1344             // configuration so a change config alert is needed for the UI
1345             // visibility
1346             m_Alerts.Register(ePSGS_NewCassandraMappingAccepted,
1347                               "Keyspace mapping (sat names, bioseq info and named "
1348                               "annotations) has been populated");
1349         }
1350     } else {
1351         string      combined_error("Invalid Cassandra mapping:");
1352         for (const auto &  err : errors) {
1353             combined_error += "\n" + err;
1354         }
1355         if (need_logging)
1356             PSG_CRITICAL(combined_error);
1357 
1358         m_Alerts.Register(ePSGS_NoValidCassandraMapping, combined_error);
1359     }
1360 
1361     need_logging = false;
1362     return errors.empty();
1363 }
1364 
1365 
PopulatePublicCommentsMapping(void)1366 void CPubseqGatewayApp::PopulatePublicCommentsMapping(void)
1367 {
1368     if (m_PublicComments.get() != nullptr)
1369         return;     // We have already been here: the mapping needs to be
1370                     // populated once
1371 
1372     try {
1373         string      err_msg;
1374 
1375         m_PublicComments.reset(new CPSGMessages);
1376 
1377         if (!FetchMessages(m_RootKeyspace, m_CassConnection,
1378                            *m_PublicComments.get(), err_msg)) {
1379             err_msg = "Error populating cassandra public comments mapping: " + err_msg;
1380             PSG_ERROR(err_msg);
1381             m_Alerts.Register(ePSGS_NoCassandraPublicCommentsMapping, err_msg);
1382         }
1383     } catch (const exception &  exc) {
1384         string      err_msg = "Cannot populate public comments mapping from Cassandra";
1385         PSG_ERROR(exc);
1386         PSG_ERROR(err_msg);
1387         m_Alerts.Register(ePSGS_NoCassandraPublicCommentsMapping,
1388                           err_msg + ". " + exc.what());
1389     } catch (...) {
1390         string      err_msg = "Unknown error while populating "
1391                               "public comments mapping from Cassandra";
1392         PSG_ERROR(err_msg);
1393         m_Alerts.Register(ePSGS_NoCassandraPublicCommentsMapping, err_msg);
1394     }
1395 }
1396 
1397 
CheckCassMapping(void)1398 void CPubseqGatewayApp::CheckCassMapping(void)
1399 {
1400     size_t      vacant_index = m_MappingIndex + 1;
1401     if (vacant_index >= 2)
1402         vacant_index = 0;
1403     m_CassMapping[vacant_index].clear();
1404 
1405     vector<string>      sat_names;
1406     try {
1407         string      err_msg;
1408         if (!FetchSatToKeyspaceMapping(
1409                     m_RootKeyspace, m_CassConnection,
1410                     sat_names, eBlobVer2Schema,
1411                     m_CassMapping[vacant_index].m_BioseqKeyspace, eResolverSchema,
1412                     m_CassMapping[vacant_index].m_BioseqNAKeyspaces, eNamedAnnotationsSchema,
1413                     err_msg)) {
1414             m_Alerts.Register(ePSGS_InvalidCassandraMapping,
1415                               "Error checking cassandra mapping: " + err_msg);
1416             return;
1417         }
1418 
1419     } catch (const exception &  exc) {
1420         m_Alerts.Register(ePSGS_InvalidCassandraMapping,
1421                           "Cannot check keyspace mapping from Cassandra. " +
1422                           string(exc.what()));
1423         return;
1424     } catch (...) {
1425         m_Alerts.Register(ePSGS_InvalidCassandraMapping,
1426                           "Unknown error while checking "
1427                           "keyspace mapping from Cassandra.");
1428         return;
1429     }
1430 
1431     auto    errors = m_CassMapping[vacant_index].validate(m_RootKeyspace);
1432     if (sat_names.empty())
1433         errors.push_back("No sat to keyspace resolutions found in the " +
1434                          m_RootKeyspace + " keyspace.");
1435 
1436     if (errors.empty()) {
1437         // No errors detected in the DB; let's compare with what we use now
1438         if (sat_names != m_SatNames)
1439             m_Alerts.Register(ePSGS_NewCassandraSatNamesMapping,
1440                               "Cassandra has new sat names mapping in  the " +
1441                               m_RootKeyspace + " keyspace. The server needs "
1442                               "to restart to use it.");
1443 
1444         if (m_CassMapping[0] != m_CassMapping[1]) {
1445             m_MappingIndex = vacant_index;
1446             m_Alerts.Register(ePSGS_NewCassandraMappingAccepted,
1447                               "Keyspace mapping (bioseq info and named "
1448                               "annotations) has been updated");
1449         }
1450     } else {
1451         string      combined_error("Invalid Cassandra mapping detected "
1452                                    "during the periodic check:");
1453         for (const auto &  err : errors) {
1454             combined_error += "\n" + err;
1455         }
1456         m_Alerts.Register(ePSGS_InvalidCassandraMapping, combined_error);
1457     }
1458 }
1459 
1460 
x_IsResolutionParamValid(const string & param_name,const CTempString & param_value,string & err_msg) const1461 bool CPubseqGatewayApp::x_IsResolutionParamValid(const string &  param_name,
1462                                                  const CTempString &  param_value,
1463                                                  string &  err_msg) const
1464 {
1465     static string   fast = "fast";
1466     static string   full = "full";
1467 
1468     if (param_value != fast && param_value != full) {
1469         err_msg = "Malformed '" + param_name + "' parameter. "
1470                   "Acceptable values are '" + fast + "' and '" + full + "'.";
1471         return false;
1472     }
1473     return true;
1474 }
1475 
1476 
1477 // Prepares the chunks for the case when it is a client error so only two
1478 // chunks are required:
1479 // - a message chunk
1480 // - a reply completion chunk
x_SendMessageAndCompletionChunks(shared_ptr<CPSGS_Reply> reply,const string & message,CRequestStatus::ECode status,int code,EDiagSev severity)1481 void  CPubseqGatewayApp::x_SendMessageAndCompletionChunks(
1482         shared_ptr<CPSGS_Reply>  reply,
1483         const string &  message,
1484         CRequestStatus::ECode  status, int  code, EDiagSev  severity)
1485 {
1486     if (reply->IsFinished()) {
1487         // This is the case when a reply is already formed and sent to
1488         // the client.
1489         return;
1490     }
1491 
1492     reply->SetContentType(ePSGS_PSGMime);
1493     reply->PrepareReplyMessage(message, status, code, severity);
1494     reply->PrepareReplyCompletion();
1495     reply->Flush();
1496 }
1497 
1498 
x_MalformedArguments(shared_ptr<CPSGS_Reply> reply,CRef<CRequestContext> & context,const string & err_msg)1499 void CPubseqGatewayApp::x_MalformedArguments(
1500                                 shared_ptr<CPSGS_Reply>  reply,
1501                                 CRef<CRequestContext> &  context,
1502                                 const string &  err_msg)
1503 {
1504     m_Counters.Increment(CPSGSCounters::ePSGS_MalformedArgs);
1505     x_SendMessageAndCompletionChunks(reply, err_msg,
1506                                      CRequestStatus::e400_BadRequest,
1507                                      ePSGS_MalformedParameter, eDiag_Error);
1508     PSG_WARNING(err_msg);
1509     x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1510 }
1511 
1512 
x_InsufficientArguments(shared_ptr<CPSGS_Reply> reply,CRef<CRequestContext> & context,const string & err_msg)1513 void CPubseqGatewayApp::x_InsufficientArguments(
1514                                 shared_ptr<CPSGS_Reply>  reply,
1515                                 CRef<CRequestContext> &  context,
1516                                 const string &  err_msg)
1517 {
1518     m_Counters.Increment(CPSGSCounters::ePSGS_InsufficientArgs);
1519     x_SendMessageAndCompletionChunks(reply, err_msg,
1520                                      CRequestStatus::e400_BadRequest,
1521                                      ePSGS_InsufficientArguments, eDiag_Error);
1522     PSG_WARNING(err_msg);
1523     x_PrintRequestStop(context, CRequestStatus::e400_BadRequest);
1524 }
1525 
1526 
x_ReadIdToNameAndDescriptionConfiguration(const IRegistry & reg,const string & section)1527 void CPubseqGatewayApp::x_ReadIdToNameAndDescriptionConfiguration(
1528                                                     const IRegistry &  reg,
1529                                                     const string &  section)
1530 {
1531     list<string>            entries;
1532     reg.EnumerateEntries(section, &entries);
1533 
1534     for(const auto &  value_id : entries) {
1535         string      name_and_description = reg.Get(section, value_id);
1536         string      name;
1537         string      description;
1538         if (NStr::SplitInTwo(name_and_description, ":::", name, description,
1539                              NStr::fSplit_ByPattern)) {
1540             m_IdToNameAndDescription[value_id] = {name, description};
1541         } else {
1542             PSG_WARNING("Malformed counter [" << section << "]/" << name <<
1543                         " information. Expected <name>:::<description");
1544         }
1545     }
1546 }
1547 
x_InitSSL(void)1548 void CPubseqGatewayApp::x_InitSSL(void)
1549 {
1550     if (m_SSLEnable) {
1551         SSL_load_error_strings();
1552         SSL_library_init();
1553         OpenSSL_add_all_algorithms();
1554     }
1555 }
1556 
1557 
x_RegisterProcessors(void)1558 void CPubseqGatewayApp::x_RegisterProcessors(void)
1559 {
1560     // Note: the order of adding defines the priority.
1561     //       Earleir added - higher priority
1562     m_RequestDispatcher.AddProcessor(
1563             unique_ptr<IPSGS_Processor>(new CPSGS_ResolveProcessor()));
1564     m_RequestDispatcher.AddProcessor(
1565             unique_ptr<IPSGS_Processor>(new CPSGS_GetProcessor()));
1566     m_RequestDispatcher.AddProcessor(
1567             unique_ptr<IPSGS_Processor>(new CPSGS_GetBlobProcessor()));
1568     m_RequestDispatcher.AddProcessor(
1569             unique_ptr<IPSGS_Processor>(new CPSGS_AnnotProcessor()));
1570     m_RequestDispatcher.AddProcessor(
1571             unique_ptr<IPSGS_Processor>(new CPSGS_TSEChunkProcessor()));
1572     m_RequestDispatcher.AddProcessor(
1573         unique_ptr<IPSGS_Processor>(new psg::osg::CPSGS_OSGProcessor()));
1574     m_RequestDispatcher.AddProcessor(
1575         unique_ptr<IPSGS_Processor>(new psg::cdd::CPSGS_CDDProcessor()));
1576 }
1577 
1578 
main(int argc,const char * argv[])1579 int main(int argc, const char* argv[])
1580 {
1581     srand(time(NULL));
1582     CThread::InitializeMainThreadId();
1583 
1584 
1585     g_Diag_Use_RWLock();
1586     CDiagContext::SetOldPostFormat(false);
1587     CRequestContext::SetAllowedSessionIDFormat(CRequestContext::eSID_Other);
1588     CRequestContext::SetDefaultAutoIncRequestIDOnPost(true);
1589     CDiagContext::GetRequestContext().SetAutoIncRequestIDOnPost(true);
1590 
1591 
1592     int ret = CPubseqGatewayApp().AppMain(argc, argv, NULL, eDS_ToStdlog);
1593     google::protobuf::ShutdownProtobufLibrary();
1594     return ret;
1595 }
1596 
1597 
CollectGarbage(void)1598 void CollectGarbage(void)
1599 {
1600     CPubseqGatewayApp *      app = CPubseqGatewayApp::GetInstance();
1601     app->GetExcludeBlobCache()->Purge();
1602 }
1603 
1604 
1605