1 /* $Id: osg_connection.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Eugene Vasilchenko
27 *
 * File Description: OSG connection (COSGConnection) and connection pool
 *                   (COSGConnectionPool) used to exchange ID2 packets with os_gateway
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34
35 #include "osg_connection.hpp"
36
37 #include <corelib/ncbithr.hpp>
38 #include <connect/ncbi_conn_stream.hpp>
39 #include <objects/id2/ID2_Request.hpp>
40 #include <objects/id2/ID2_Param.hpp>
41 #include <objects/id2/ID2_Params.hpp>
42 #include <objects/id2/ID2_Request_Packet.hpp>
43 #include <objects/id2/ID2_Reply.hpp>
44 #include <objects/id2/ID2_Reply_Data.hpp>
45 #include <serial/serial.hpp>
46 #include <serial/iterator.hpp>
47 #include <cmath>
48 #include <corelib/ncbi_system.hpp>
49 #include <corelib/impl/ncbi_dbsvcmapper.hpp>
50 #include "osg_processor_base.hpp"
51 #include "osg_mapper.hpp"
52
53
54 BEGIN_NCBI_NAMESPACE;
55 BEGIN_NAMESPACE(psg);
56 BEGIN_NAMESPACE(osg);
57
58
59 // Configuration parameters' names
60 static const char kConfigSection[] = "OSG_PROCESSOR";
61 static const char kParamServiceName[] = "service";
62 static const char kParamMaxConnectionCount[] = "maxconn";
63 static const char kParamDebugLevel[] = "debug";
64 static const char kParamExpirationTimeout[] = "expiration_timeout";
65 static const char kParamReadTimeout[] = "read_timeout";
66 static const char kParamCDDRetryTimeout[] = "cdd_retry_timeout";
67 static const char kParamRetryCount[] = "retry_count";
68 static const char kParamPreferredServer[] = "preferred_server";
69 static const char kParamPreference[] = "preference";
70 static const char kParamEnabledCDD[] = "enabled_cdd";
71 static const char kParamEnabledSNP[] = "enabled_snp";
72 static const char kParamEnabledWGS[] = "enabled_wgs";
73
74 // Default configuration parameters' values
75 static const char kDefaultServiceName[] = "ID2_SNP2";
76 static const int kMinMaxConnectionCount = 1;
77 static const int kDefaultMaxConnectionCount = 64;
78 static const EDebugLevel kDefaultDebugLevel = eDebug_error;
79 static const double kMinExpirationTimeout = 1;
80 static const double kDefaultExpirationTimeout = 60;
81 static const double kMinReadTimeout = .1;
82 static const double kDefaultReadTimeout = 30;
83 static const double kMinCDDRetryTimeout = .1;
84 static const double kDefaultCDDRetryTimeout = 0.9;
85 static const int kMinRetryCount = 1;
86 static const int kDefaultRetryCount = 3;
87 static const char kDefaultPreferredServer[] = "localhost";
88 static const int kDefaultPreference = 90;
89 static const int kDefaultEnabledCDD = false;
90 static const int kDefaultEnabledSNP = true;
91 static const int kDefaultEnabledWGS = true;
92
93 static const int kNonResolutionTimeout = 5;
94
95
COSGConnection(size_t connection_id)96 COSGConnection::COSGConnection(size_t connection_id)
97 : m_ConnectionID(connection_id),
98 m_RequestCount(0),
99 m_InitRequestWasSent(false),
100 m_Timestamp(CStopWatch::eStart)
101 {
102 }
103
104
COSGConnection(size_t connection_id,unique_ptr<CConn_IOStream> && stream)105 COSGConnection::COSGConnection(size_t connection_id,
106 unique_ptr<CConn_IOStream>&& stream)
107 : m_ConnectionID(connection_id),
108 m_Stream(move(stream)),
109 m_RequestCount(0),
110 m_InitRequestWasSent(false),
111 m_Timestamp(CStopWatch::eStart)
112 {
113 }
114
115
~COSGConnection()116 COSGConnection::~COSGConnection()
117 {
118 if ( m_RemoveFrom ) {
119 m_RemoveFrom->RemoveConnection(*this);
120 }
121 _ASSERT(!m_RemoveFrom);
122 }
123
124
UpdateTimestamp()125 double COSGConnection::UpdateTimestamp()
126 {
127 return m_Timestamp.Restart();
128 }
129
130
AcceptFeedback(int feedback)131 void COSGConnection::AcceptFeedback(int feedback)
132 {
133 if ( feedback != 0 && m_RemoveFrom && m_ServerInfo ) {
134 m_RemoveFrom->m_Mapper->AcceptFeedback(m_ServiceName,
135 m_ServerInfo->GetHost(), m_ServerInfo->GetPort(),
136 (feedback < 0?
137 COSGServiceMapper::eNegativeFeedback:
138 COSGServiceMapper::ePositiveFeedback));
139 }
140 }
141
142
/// Stream adapter pairing an ASN.1 object with a flag that decides whether
/// the full object text or just its type name is written (see operator<<).
/// Holds a reference: the object must outlive the logger.
template<class Type>
struct SConditionalASNLogger
{
    SConditionalASNLogger(const Type& obj, bool condition)
        : m_Object(obj),
          m_Condition(condition)
    {
    }

    const Type& m_Object;   ///< object to log (not owned)
    bool m_Condition;       ///< true = dump full ASN.1 text
};
154 template<class Type>
operator <<(CNcbiOstream & out,const SConditionalASNLogger<Type> & logger)155 CNcbiOstream& operator<<(CNcbiOstream& out, const SConditionalASNLogger<Type>& logger)
156 {
157 if ( logger.m_Condition ) {
158 out << MSerial_AsnText << logger.m_Object;
159 }
160 else {
161 out << Type::GetTypeInfo()->GetName();
162 }
163 return out;
164 }
165 template<class Type>
LogASNIf(const Type & obj,bool condition)166 SConditionalASNLogger<Type> LogASNIf(const Type& obj, bool condition)
167 {
168 return SConditionalASNLogger<Type>(obj, condition);
169 }
170
171
172 const bool kSimulateFailures = false;
173 const int kNoFailureCount = 8;
174 const int kFailureRate = 8;
175 static DECLARE_TLS_VAR(int, s_NoFailureCount);
176
s_SimulateFailure(const char * where)177 static void s_SimulateFailure(const char* where)
178 {
179 if ( !kSimulateFailures ) {
180 return;
181 }
182 if ( s_NoFailureCount > 0 ) {
183 --s_NoFailureCount;
184 }
185 else if ( random() % kFailureRate == 0 ) {
186 s_NoFailureCount = kNoFailureCount;
187 string msg = string("simulated OSG ")+where+" failure";
188 if ( random() % 2 ) {
189 throw runtime_error(msg);
190 }
191 else {
192 NCBI_THROW(CIOException, eWrite, msg);
193 }
194 }
195 }
196
197
SendRequestPacket(const CID2_Request_Packet & packet)198 void COSGConnection::SendRequestPacket(const CID2_Request_Packet& packet)
199 {
200 _ASSERT(m_RemoveFrom);
201 if ( GetDebugLevel() >= eDebug_exchange ) {
202 LOG_POST(GetDiagSeverity() << "OSG("<<GetConnectionID()<<"): "
203 "Sending "<<LogASNIf(packet, GetDebugLevel() >= eDebug_asn));
204 }
205 _ASSERT(!packet.Get().empty());
206 _ASSERT(packet.Get().front()->GetSerial_number()+int(packet.Get().size()) == GetNextRequestSerialNumber());
207 _ASSERT(packet.Get().back()->GetSerial_number()+1 == GetNextRequestSerialNumber());
208 s_SimulateFailure("send");
209 _ASSERT(m_InitRequestWasSent || packet.Get().front()->GetRequest().IsInit());
210 *m_Stream << MSerial_AsnBinary << packet;
211 if ( packet.Get().front()->GetRequest().IsInit() ) {
212 m_InitRequestWasSent = true;
213 }
214 _ASSERT(m_InitRequestWasSent);
215 }
216
217
ReceiveReply()218 CRef<CID2_Reply> COSGConnection::ReceiveReply()
219 {
220 s_SimulateFailure("read");
221 CRef<CID2_Reply> reply(new CID2_Reply());
222 *m_Stream >> MSerial_AsnBinary >> *reply;
223 _ASSERT(m_RemoveFrom);
224 if ( GetDebugLevel() >= eDebug_exchange ) {
225 if ( GetDebugLevel() == eDebug_asn ) {
226 CTypeIterator<CID2_Reply_Data> iter = Begin(*reply);
227 if ( iter && iter->IsSetData() ) {
228 CID2_Reply_Data::TData save;
229 save.swap(iter->SetData());
230 size_t size = 0, count = 0;
231 ITERATE ( CID2_Reply_Data::TData, i, save ) {
232 ++count;
233 size_t chunk = (*i)->size();
234 size += chunk;
235 }
236 LOG_POST(GetDiagSeverity() << "OSG("<<GetConnectionID()<<"): "
237 "Received "<<MSerial_AsnText<<*reply<<
238 "Data: " << size << " bytes in "<<count<<" chunks");
239 save.swap(iter->SetData());
240 }
241 else {
242 LOG_POST(GetDiagSeverity() << "OSG("<<GetConnectionID()<<"): "
243 "Received "<<MSerial_AsnText<<*reply);
244 }
245 }
246 else {
247 LOG_POST(GetDiagSeverity() << "OSG("<<GetConnectionID()<<"): "
248 "Received "<<LogASNIf(*reply, GetDebugLevel() >= eDebug_raw));
249 }
250 }
251 return reply;
252 }
253
254
MakeInitRequest()255 CRef<CID2_Request> COSGConnection::MakeInitRequest()
256 {
257 CRef<CID2_Request> req(new CID2_Request());
258 req->SetRequest().SetInit();
259 if ( 1 ) {
260 // set client name
261 CRef<CID2_Param> param(new CID2_Param);
262 param->SetName("log:client_name");
263 param->SetValue().push_back("pubseq_gateway");
264 req->SetParams().Set().push_back(param);
265 }
266 if ( 1 ) {
267 CRef<CID2_Param> param(new CID2_Param);
268 param->SetName("id2:allow");
269
270 // allow new blob-state field in several ID2 replies
271 param->SetValue().push_back("*.blob-state");
272 // enable VDB-based WGS sequences
273 param->SetValue().push_back("vdb-wgs");
274 // enable VDB-based SNP sequences
275 param->SetValue().push_back("vdb-snp");
276 // enable VDB-based CDD sequences
277 param->SetValue().push_back("vdb-cdd");
278 req->SetParams().Set().push_back(param);
279 }
280 return req;
281 }
282
283
MakeInitRequestPacket()284 CRef<CID2_Request_Packet> COSGConnection::MakeInitRequestPacket()
285 {
286 CRef<CID2_Request_Packet> packet(new CID2_Request_Packet);
287 packet->Set().push_back(MakeInitRequest());
288 packet->Set().back()->SetSerial_number(AllocateRequestSerialNumber());
289 return packet;
290 }
291
292
COSGConnectionPool()293 COSGConnectionPool::COSGConnectionPool()
294 : m_ServiceName(kDefaultServiceName),
295 m_MaxConnectionCount(kDefaultMaxConnectionCount),
296 m_ExpirationTimeout(kDefaultExpirationTimeout),
297 m_ReadTimeout(kDefaultReadTimeout),
298 m_CDDRetryTimeout(kDefaultCDDRetryTimeout),
299 m_RetryCount(kDefaultRetryCount),
300 m_EnabledCDD(kDefaultEnabledCDD),
301 m_EnabledSNP(kDefaultEnabledSNP),
302 m_EnabledWGS(kDefaultEnabledWGS),
303 m_WaitConnectionSlot(0, kMax_Int),
304 m_NextConnectionID(1),
305 m_ConnectionCount(0),
306 m_ConnectFailureCount(0)
307 {
308 }
309
310
~COSGConnectionPool()311 COSGConnectionPool::~COSGConnectionPool()
312 {
313 }
314
315
AppParseArgs(const CArgs &)316 void COSGConnectionPool::AppParseArgs(const CArgs& /*args*/)
317 {
318 // TODO
319 }
320
321
g_OSG_GetPreferredAddress(const string & name)322 static Uint4 g_OSG_GetPreferredAddress(const string& name)
323 {
324 if (name.empty()) {
325 return 0;
326 } else if (NStr::EqualNocase(name, "localhost")) {
327 return CSocketAPI::GetLocalHostAddress();
328 } else {
329 return CSocketAPI::gethostbyname(name);
330 }
331 }
332
333
LoadConfig(const CNcbiRegistry & registry,string section)334 void COSGConnectionPool::LoadConfig(const CNcbiRegistry& registry, string section)
335 {
336 if ( section.empty() ) {
337 section = kConfigSection;
338 }
339
340 #define CHECK_PARAM_MIN(value, name, min_value) \
341 do { \
342 if ( value < min_value ) { \
343 NCBI_THROW_FMT(CPubseqGatewayException, eConfigurationError, \
344 name<<"(="<<value<<") < "<<min_value); \
345 } \
346 } while (0)
347
348 m_ServiceName =
349 registry.GetString(section,
350 kParamServiceName,
351 kDefaultServiceName);
352 m_MaxConnectionCount =
353 registry.GetInt(section,
354 kParamMaxConnectionCount,
355 kDefaultMaxConnectionCount);
356 CHECK_PARAM_MIN(m_MaxConnectionCount, kParamMaxConnectionCount, kMinMaxConnectionCount);
357
358 m_ExpirationTimeout =
359 registry.GetDouble(section,
360 kParamExpirationTimeout,
361 kDefaultExpirationTimeout);
362 CHECK_PARAM_MIN(m_ExpirationTimeout, kParamExpirationTimeout, kMinExpirationTimeout);
363
364 m_ReadTimeout =
365 registry.GetDouble(section,
366 kParamReadTimeout,
367 kDefaultReadTimeout);
368 CHECK_PARAM_MIN(m_ReadTimeout, kParamReadTimeout, kMinReadTimeout);
369
370 m_CDDRetryTimeout =
371 registry.GetDouble(section,
372 kParamCDDRetryTimeout,
373 kDefaultCDDRetryTimeout);
374 CHECK_PARAM_MIN(m_CDDRetryTimeout, kParamCDDRetryTimeout, kMinCDDRetryTimeout);
375
376 m_RetryCount =
377 registry.GetInt(section,
378 kParamRetryCount,
379 kDefaultRetryCount);
380 CHECK_PARAM_MIN(m_RetryCount, kParamRetryCount, kMinRetryCount);
381
382 #undef CHECK_PARAM_MIN
383
384 SetDebugLevel(registry.GetInt(section,
385 kParamDebugLevel,
386 eDebugLevel_default));
387 if ( GetDebugLevel() >= eDebug_open ) {
388 LOG_POST(GetDiagSeverity()<<"OSG: pool of "<<m_MaxConnectionCount<<
389 " connections to "<<m_ServiceName);
390 }
391
392 COSGServiceMapper::InitDefaults(const_cast<CNcbiRegistry&>(registry));
393 CRef<COSGServiceMapper> service_mapper(new COSGServiceMapper(®istry));
394 string preferred_server = registry.GetString(section,
395 kParamPreferredServer,
396 kDefaultPreferredServer);
397 int preference = registry.GetInt(section,
398 kParamPreference,
399 kDefaultPreference);
400 auto psg_ip = g_OSG_GetPreferredAddress(preferred_server);
401 TSvrRef pref_info(new CDBServer(m_ServiceName, psg_ip));
402 service_mapper->SetPreference(m_ServiceName, pref_info, preference);
403 if ( GetDebugLevel() >= eDebug_open ) {
404 LOG_POST(GetDiagSeverity()<<"OSG: prefer "<<preferred_server<<
405 " ["<<CSocketAPI::ntoa(psg_ip)<<"] by "<<preference);
406 }
407 m_Mapper = service_mapper;
408
409 m_EnabledCDD = registry.GetBool(section,
410 kParamEnabledCDD,
411 kDefaultEnabledCDD);
412 m_EnabledSNP = registry.GetBool(section,
413 kParamEnabledSNP,
414 kDefaultEnabledSNP);
415 m_EnabledWGS = registry.GetBool(section,
416 kParamEnabledWGS,
417 kDefaultEnabledWGS);
418 }
419
420
SetLogging(EDiagSev severity)421 void COSGConnectionPool::SetLogging(EDiagSev severity)
422 {
423 SetDiagSeverity(severity);
424 }
425
426
AllocateConnection()427 CRef<COSGConnection> COSGConnectionPool::AllocateConnection()
428 {
429 CRef<COSGConnection> conn;
430 while ( !conn ) {
431 {{
432 CMutexGuard guard(m_Mutex);
433 while ( !conn && !m_FreeConnections.empty() ) {
434 conn = move(m_FreeConnections.front());
435 m_FreeConnections.pop_front();
436 _ASSERT(!conn->m_RemoveFrom);
437 if ( conn->UpdateTimestamp() > m_ExpirationTimeout ) {
438 if ( GetDebugLevel() >= eDebug_open ) {
439 LOG_POST(GetDiagSeverity()<<"OSG("<<conn->GetConnectionID()<<"): "
440 "Closing expired connection");
441 }
442 --m_ConnectionCount;
443 conn = nullptr;
444 }
445 }
446 if ( !conn && m_ConnectionCount < m_MaxConnectionCount ) {
447 conn = new COSGConnection(m_NextConnectionID++);
448 ++m_ConnectionCount;
449 }
450 }}
451 if ( !conn ) {
452 m_WaitConnectionSlot.Wait();
453 }
454 }
455 _ASSERT(m_ConnectionCount > 0);
456 _ASSERT(!conn->m_RemoveFrom);
457 conn->m_RemoveFrom = this;
458 if ( !conn->m_Stream ) {
459 try {
460 x_OpenConnection(*conn);
461 m_ConnectFailureCount = 0;
462 }
463 catch ( ... ) {
464 ++m_ConnectFailureCount;
465 throw;
466 }
467 }
468 return conn;
469 }
470
471
x_OpenConnection(COSGConnection & conn)472 void COSGConnectionPool::x_OpenConnection(COSGConnection& conn)
473 {
474 _ASSERT(conn.m_RemoveFrom == this);
475 size_t connection_id = conn.GetConnectionID();
476 if ( GetDebugLevel() >= eDebug_open ) {
477 LOG_POST(GetDiagSeverity() << "OSG("<<connection_id<<"): "
478 "Connecting to "<<m_ServiceName);
479 }
480 int wait_count = m_ConnectFailureCount;
481 if ( wait_count > 0 ) {
482 // delay before opening new connection to a failing server
483 double wait_seconds = .5*pow(2., wait_count-1)+.5*wait_count;
484 wait_seconds = min(wait_seconds, 10.);
485 if ( GetDebugLevel() >= eDebug_open ) {
486 LOG_POST(GetDiagSeverity() << "OSG("<<connection_id<<"): waiting "<<
487 wait_seconds<<"s before new connection");
488 }
489 SleepMicroSec((unsigned long)(wait_seconds*1e6));
490 }
491 unique_ptr<CConn_IOStream> stream;
492 conn.m_ServiceName = m_ServiceName;
493 conn.m_ServerInfo = move(x_GetServer());
494 if ( conn.m_ServerInfo ) {
495 string host = CSocketAPI::ntoa(conn.m_ServerInfo->GetHost());
496 if ( GetDebugLevel() >= eDebug_open ) {
497 LOG_POST(GetDiagSeverity() << "OSG("<<connection_id<<"): "
498 "Connecting to "<<host<<":"<<conn.m_ServerInfo->GetPort());
499 }
500 stream = make_unique<CConn_SocketStream>(host, conn.m_ServerInfo->GetPort());
501 }
502 else {
503 stream = make_unique<CConn_ServiceStream>(m_ServiceName);
504 }
505 if ( !stream || !*stream ) {
506 NCBI_THROW(CIOException, eWrite, "failed to open connection");
507 }
508 if ( GetDebugLevel() >= eDebug_open ) {
509 string descr = m_ServiceName;
510 if ( CONN conn = stream->GetCONN() ) {
511 AutoPtr<char, CDeleter<char> > conn_descr(CONN_Description(conn));
512 if ( conn_descr ) {
513 descr += " -> ";
514 descr += conn_descr.get();
515 }
516 }
517 LOG_POST(GetDiagSeverity() << "OSG("<<connection_id<<"): "
518 "Connected to "<<descr);
519 }
520 conn.m_Stream = move(stream);
521 if ( 1 ) {
522 auto req_packet = conn.MakeInitRequestPacket();
523 conn.SendRequestPacket(*req_packet);
524 _ASSERT(conn.InitRequestWasSent());
525 auto reply = conn.ReceiveReply();
526 if ( !reply->GetReply().IsInit() || !reply->IsSetEnd_of_reply() ) {
527 NCBI_THROW(CIOException, eRead, "bad init reply");
528 }
529 conn.AcceptFeedback(+1);
530 }
531 }
532
533
x_GetServer()534 TSvrRef COSGConnectionPool::x_GetServer()
535 {
536 if ( !m_Mapper ) {
537 return null;
538 }
539 CMutexGuard guard(m_Mutex);
540 if ( m_NonresolutionRetryDeadline && !m_NonresolutionRetryDeadline->IsExpired() ) {
541 return null;
542 }
543 TSvrRef server;
544 do {
545 if ( !m_Balancer ) {
546 IDBServiceMapper::TOptions options;
547 m_Mapper->GetServerOptions(m_ServiceName, &options);
548 m_Balancer.Reset(new CPoolBalancer(m_ServiceName, options, true));
549 }
550 server = m_Balancer->GetServer();
551 if ( !server ) {
552 ERR_POST(Warning <<
553 "Unable to resolve OSG service name "
554 << m_ServiceName
555 << " via supplied mapper; passing it as is.");
556 m_NonresolutionRetryDeadline.reset(new CDeadline(kNonResolutionTimeout));
557 }
558 else if ( server->GetExpireTime() < CCurrentTime().GetTimeT() ) {
559 m_Balancer.Reset();
560 }
561 } while ( !m_Balancer );
562 if ( !server || server->GetHost() == 0 || server->GetPort() == 0 ) {
563 return null;
564 }
565 else {
566 return server;
567 }
568 }
569
570
ReleaseConnection(CRef<COSGConnection> & conn)571 void COSGConnectionPool::ReleaseConnection(CRef<COSGConnection>& conn)
572 {
573 CMutexGuard guard(m_Mutex);
574 _ASSERT(conn);
575 _ASSERT(m_ConnectionCount > 0);
576 _ASSERT(conn->m_RemoveFrom == this);
577 conn->AcceptFeedback(+1);
578 conn->m_RemoveFrom = nullptr;
579 m_FreeConnections.push_back(move(conn));
580 _ASSERT(!conn);
581 m_WaitConnectionSlot.Post();
582 }
583
584
RemoveConnection(COSGConnection & conn)585 void COSGConnectionPool::RemoveConnection(COSGConnection& conn)
586 {
587 if ( GetDebugLevel() >= eDebug_open ) {
588 LOG_POST(GetDiagSeverity()<<"OSG("<<conn.GetConnectionID()<<"): "
589 "Closing failed connection");
590 }
591 CMutexGuard guard(m_Mutex);
592 _ASSERT(m_ConnectionCount > 0);
593 _ASSERT(conn.m_RemoveFrom == this);
594 conn.AcceptFeedback(-1);
595 conn.m_RemoveFrom = nullptr;
596 --m_ConnectionCount;
597 m_WaitConnectionSlot.Post();
598 }
599
600
601 END_NAMESPACE(osg);
602 END_NAMESPACE(psg);
603 END_NCBI_NAMESPACE;
604