1 /*  $Id: netservice_api.cpp 607718 2020-05-06 17:39:37Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author:  Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  */
30 
31 #include <ncbi_pch.hpp>
32 
33 #include "../ncbi_comm.h"
34 #include "../ncbi_lbsmd.h"
35 #include "../ncbi_servicep.h"
36 
37 #include "netservice_api_impl.hpp"
38 
39 #include <connect/services/error_codes.hpp>
40 #include <connect/services/netcache_api_expt.hpp>
41 #include <connect/services/netschedule_api_expt.hpp>
42 #include <connect/services/srv_connections_expt.hpp>
43 
44 #include <connect/ncbi_conn_exception.hpp>
45 #include <connect/ncbi_conn_stream.hpp>
46 #include <connect/ncbi_core_cxx.hpp>
47 #include <connect/ncbi_localip.hpp>
48 
49 #include <corelib/ncbi_config.hpp>
50 #include <corelib/ncbi_message.hpp>
51 #include <corelib/ncbi_system.hpp>
52 
53 #include <util/random_gen.hpp>
54 #include <util/checksum.hpp>
55 
56 #include <deque>
57 
58 #define NCBI_USE_ERRCODE_X   ConnServ_Connection
59 
60 #define LBSMD_PENALIZED_RATE_BOUNDARY -0.01
61 
62 BEGIN_NCBI_SCOPE
63 
64 // The purpose of these classes is to execute commands suppressing possible errors and avoiding retries
65 struct SNoRetry : SNetServiceImpl::SRetry
66 {
SNoRetrySNoRetry67     SNoRetry(SNetServiceImpl* service) :
68         m_Service(service)
69     {
70         _ASSERT(m_Service);
71 
72         Swap(*m_Service, m_MaxRetries);
73     }
74 
~SNoRetrySNoRetry75     ~SNoRetry()
76     {
77         Swap(*m_Service, m_MaxRetries);
78     }
79 
80 protected:
81     CNetRef<SNetServiceImpl> m_Service;
82 
83 private:
84     unsigned m_MaxRetries = 0;
85 };
86 
87 struct SNoRetryNoErrors : SNoRetry
88 {
SNoRetryNoErrorsSNoRetryNoErrors89     SNoRetryNoErrors(SNetServiceImpl* service) :
90         SNoRetry(service)
91     {
92         Set([](const string&, CNetServer) { return true; });
93     }
94 
~SNoRetryNoErrorsSNoRetryNoErrors95     ~SNoRetryNoErrors()
96     {
97         Set(nullptr);
98     }
99 
100 private:
SetSNoRetryNoErrors101     void Set(CNetService::TEventHandler error_handler)
102     {
103         m_Service->m_Listener->SetErrorHandler(error_handler);
104     }
105 };
106 
DeleteThis()107 void SDiscoveredServers::DeleteThis()
108 {
109     CNetService service(m_Service);
110 
111     if (!service)
112         return;
113 
114     // Before resetting the m_Service pointer, verify that no other object
115     // has acquired a reference to this server group object yet (between
116     // the time the reference counter went to zero, and the current moment
117     // when m_Service is about to be reset).
118     CFastMutexGuard discovery_mutex_lock(service->m_DiscoveryMutex);
119 
120     service = NULL;
121 
122     if (!Referenced() && m_Service) {
123         if (m_Service->m_DiscoveredServers != this) {
124             m_NextGroupInPool = m_Service->m_ServerGroupPool;
125             m_Service->m_ServerGroupPool = this;
126         }
127         m_Service = NULL;
128     }
129 }
130 
GetServer()131 CNetServer CNetServiceIterator::GetServer()
132 {
133     return m_Impl->GetServer();
134 }
135 
GetServer()136 CNetServer SNetServiceIteratorImpl::GetServer()
137 {
138     auto& service = m_ServerGroup->m_Service;
139     service->m_RebalanceStrategy.OnResourceRequested();
140     return new SNetServerImpl(service, service->m_ServerPool->ReturnServer(m_Position->first));
141 }
142 
Next()143 bool CNetServiceIterator::Next()
144 {
145     if (m_Impl->Next())
146         return true;
147 
148     m_Impl.Reset(NULL);
149     return false;
150 }
151 
Prev()152 bool CNetServiceIterator::Prev()
153 {
154     if (m_Impl->Prev())
155         return true;
156 
157     m_Impl.Reset(NULL);
158     return false;
159 }
160 
GetRate() const161 double CNetServiceIterator::GetRate() const
162 {
163     return m_Impl->GetRate();
164 }
165 
Next()166 bool SNetServiceIteratorImpl::Next()
167 {
168     return ++m_Position != m_ServerGroup->m_Servers.end();
169 }
170 
Prev()171 bool SNetServiceIteratorImpl::Prev()
172 {
173     if (m_Position == m_ServerGroup->m_Servers.begin())
174         return false;
175     --m_Position;
176     return true;
177 }
178 
Next()179 bool SNetServiceIterator_OmitPenalized::Next()
180 {
181     return ++m_Position != m_ServerGroup->m_SuppressedBegin;
182 }
183 
184 DEFINE_STATIC_FAST_MUTEX(s_RndLock);
185 static CRandom s_RandomIteratorGen((CRandom::TValue) time(NULL));
186 
187 static CRandom::TValue
s_GetRand(CRandom::TValue max_value)188 s_GetRand(CRandom::TValue max_value)
189 {
190     CFastMutexGuard guard(s_RndLock);
191     return s_RandomIteratorGen.GetRand(0, max_value);
192 }
193 
SNetServiceIterator_RandomPivot(SDiscoveredServers * server_group_impl)194 SNetServiceIterator_RandomPivot::SNetServiceIterator_RandomPivot(
195         SDiscoveredServers* server_group_impl) :
196     SNetServiceIteratorImpl(server_group_impl,
197         server_group_impl->m_Servers.begin() + s_GetRand(
198             CRandom::TValue((server_group_impl->m_SuppressedBegin -
199                 server_group_impl->m_Servers.begin()) - 1)))
200 {
201 }
202 
Next()203 bool SNetServiceIterator_RandomPivot::Next()
204 {
205     if (m_RandomIterators.empty()) {
206         TNetServerList::const_iterator it = m_ServerGroup->m_Servers.begin();
207         size_t number_of_servers = m_ServerGroup->m_SuppressedBegin - it;
208         if (number_of_servers <= 1)
209             return false; // There are no servers to advance to.
210         m_RandomIterators.reserve(number_of_servers);
211         m_RandomIterators.push_back(m_Position);
212         --number_of_servers;
213         do {
214             if (it != m_Position) {
215                 m_RandomIterators.push_back(it);
216                 --number_of_servers;
217             }
218             ++it;
219         } while (number_of_servers > 0);
220         // Shuffle m_RandomIterators starting from the element with index '1'.
221         if (m_RandomIterators.size() > 2) {
222             TRandomIterators::iterator rnt_it = m_RandomIterators.begin();
223             while (++rnt_it != m_RandomIterators.end())
224                 swap(*rnt_it, m_RandomIterators[s_RandomIteratorGen.GetRand(1,
225                     CRandom::TValue(m_RandomIterators.size() - 1))]);
226         }
227         m_RandomIterator = m_RandomIterators.begin();
228         ++m_RandomIterator;
229     } else
230         if (++m_RandomIterator == m_RandomIterators.end())
231             return false;
232 
233     m_Position = *m_RandomIterator;
234 
235     return true;
236 }
237 
Prev()238 bool SNetServiceIterator_RandomPivot::Prev()
239 {
240     if (m_RandomIterators.empty() ||
241             m_RandomIterator == m_RandomIterators.begin())
242         return false;
243 
244     m_Position = *--m_RandomIterator;
245 
246     return true;
247 }
248 
Next()249 bool SNetServiceIterator_Circular::Next()
250 {
251     if (++m_Position == m_ServerGroup->m_Servers.end())
252         m_Position = m_ServerGroup->m_Servers.begin();
253     return m_Position != m_Pivot;
254 }
255 
Prev()256 bool SNetServiceIterator_Circular::Prev()
257 {
258     if (m_Position == m_Pivot)
259         return false;
260     if (m_Position == m_ServerGroup->m_Servers.begin())
261         m_Position = m_ServerGroup->m_Servers.end();
262     --m_Position;
263     return true;
264 }
265 
SNetServiceIterator_Weighted(SDiscoveredServers * server_group_impl,Uint4 key_crc32)266 SNetServiceIterator_Weighted::SNetServiceIterator_Weighted(
267         SDiscoveredServers* server_group_impl, Uint4 key_crc32) :
268     SNetServiceIteratorImpl(server_group_impl),
269     m_KeyCRC32(key_crc32)
270 {
271     TNetServerList::const_iterator server_list_iter(m_Position);
272 
273     if ((m_SingleServer =
274             (++server_list_iter == server_group_impl->m_SuppressedBegin)))
275         // Nothing to do if there's only one server.
276         return;
277 
278     // Find the server with the highest rank.
279     SServerRank highest_rank(x_GetServerRank(m_Position));
280 
281     do {
282         SServerRank server_rank(x_GetServerRank(server_list_iter));
283         if (highest_rank < server_rank)
284             highest_rank = server_rank;
285         // To avoid unnecessary memory allocations, do not save
286         // the calculated server ranks in hope that Next()
287         // will be called very rarely for this type of iterators.
288     } while (++server_list_iter != server_group_impl->m_SuppressedBegin);
289 
290     m_Position = highest_rank.m_ServerListIter;
291 }
292 
Next()293 bool SNetServiceIterator_Weighted::Next()
294 {
295     if (m_SingleServer)
296         return false;
297 
298     if (m_ServerRanks.empty()) {
299         TNetServerList::const_iterator server_list_iter(
300                 m_ServerGroup->m_Servers.begin());
301         do
302             m_ServerRanks.push_back(x_GetServerRank(server_list_iter));
303         while (++server_list_iter != m_ServerGroup->m_SuppressedBegin);
304 
305         // Sort the ranks in *reverse* order.
306         sort(m_ServerRanks.rbegin(), m_ServerRanks.rend());
307 
308         // Skip the server with the highest rank, which was the first
309         // server returned by this iterator object.
310         m_CurrentServerRank = m_ServerRanks.begin() + 1;
311     } else if (++m_CurrentServerRank == m_ServerRanks.end())
312         return false;
313 
314     m_Position = m_CurrentServerRank->m_ServerListIter;
315     return true;
316 }
317 
Prev()318 bool SNetServiceIterator_Weighted::Prev()
319 {
320     if (m_SingleServer)
321         return false;
322 
323     _ASSERT(!m_ServerRanks.empty());
324 
325     if (m_CurrentServerRank == m_ServerRanks.begin())
326         return false;
327 
328     m_Position = (--m_CurrentServerRank)->m_ServerListIter;
329     return true;
330 }
331 
SNetServerPoolImpl(INetServerConnectionListener * listener)332 SNetServerPoolImpl::SNetServerPoolImpl(INetServerConnectionListener* listener) :
333     m_PropCreator(listener->GetPropCreator()),
334     m_EnforcedServer(0, 0),
335     m_MaxTotalTime(CTimeout::eInfinite),
336     m_UseOldStyleAuth(false)
337 {
338 }
339 
SNetServiceImpl(const string & api_name,const string & service_name,const string & client_name,INetServerConnectionListener * listener,CSynRegistry & registry,const SRegSynonyms & sections)340 SNetServiceImpl::SNetServiceImpl(const string& api_name, const string& service_name, const string& client_name,
341         INetServerConnectionListener* listener, CSynRegistry& registry, const SRegSynonyms& sections) :
342     m_Listener(listener),
343     m_ServerPool(new SNetServerPoolImpl(listener)),
344     m_ServiceName(service_name),
345     m_RebalanceStrategy(registry, sections),
346     m_RoundRobin(0),
347     m_APIName(api_name),
348     m_ClientName(client_name)
349 {
350 }
351 
SNetServiceImpl(SNetServerInPool * server,SNetServiceImpl * prototype)352 SNetServiceImpl::SNetServiceImpl(SNetServerInPool* server, SNetServiceImpl* prototype) :
353     m_Listener(prototype->m_Listener->Clone()),
354     m_ServerPool(prototype->m_ServerPool),
355     m_ServiceName(server->m_Address.AsString()),
356     m_RebalanceStrategy(prototype->m_RebalanceStrategy),
357     m_RoundRobin(prototype->m_RoundRobin.load()),
358     m_APIName(prototype->m_APIName),
359     m_ClientName(prototype->m_ClientName),
360     m_UseSmartRetries(prototype->m_UseSmartRetries),
361     m_ConnectionMaxRetries(prototype->m_ConnectionMaxRetries),
362     m_ConnectionRetryDelay(prototype->m_ConnectionRetryDelay),
363     m_NetInfo(prototype->m_NetInfo)
364 {
365     Construct(server);
366 }
367 
SNetServiceImpl(const string & service_name,SNetServiceImpl * prototype)368 SNetServiceImpl::SNetServiceImpl(const string& service_name, SNetServiceImpl* prototype) :
369     m_Listener(prototype->m_Listener->Clone()),
370     m_ServerPool(prototype->m_ServerPool),
371     m_ServiceName(service_name),
372     m_RebalanceStrategy(prototype->m_RebalanceStrategy),
373     m_RoundRobin(prototype->m_RoundRobin.load()),
374     m_APIName(prototype->m_APIName),
375     m_ClientName(prototype->m_ClientName),
376     m_UseSmartRetries(prototype->m_UseSmartRetries),
377     m_ConnectionMaxRetries(prototype->m_ConnectionMaxRetries),
378     m_ConnectionRetryDelay(prototype->m_ConnectionRetryDelay),
379     m_NetInfo(prototype->m_NetInfo)
380 {
381     Construct();
382 }
383 
Construct(SNetServerInPool * server)384 void SNetServiceImpl::Construct(SNetServerInPool* server)
385 {
386     m_ServiceType = eSingleServerService;
387     m_DiscoveredServers = AllocServerGroup(0);
388     CFastMutexGuard server_mutex_lock(m_ServerPool->m_ServerMutex);
389     m_DiscoveredServers->m_Servers.push_back(TServerRate(server, 1));
390     m_DiscoveredServers->m_SuppressedBegin =
391         m_DiscoveredServers->m_Servers.end();
392 }
393 
Construct()394 void SNetServiceImpl::Construct()
395 {
396     if (!m_ServiceName.empty()) {
397         if (auto address = SSocketAddress::Parse(m_ServiceName)) {
398             Construct(m_ServerPool->FindOrCreateServerImpl(move(address)));
399         } else {
400             m_ServiceType = eLoadBalancedService;
401         }
402     }
403 }
404 
Create(const string & api_name,const string & service_name,const string & client_name,INetServerConnectionListener * listener,CSynRegistry & registry,SRegSynonyms & sections,const string & ns_client_name)405 SNetServiceImpl* SNetServiceImpl::Create(
406         const string& api_name, const string& service_name, const string& client_name,
407         INetServerConnectionListener* listener,
408         CSynRegistry& registry, SRegSynonyms& sections, const string& ns_client_name)
409 {
410     CNetRef<SNetServiceImpl> rv(new SNetServiceImpl(api_name, service_name, client_name, listener, registry, sections));
411     rv->Init(registry, sections, ns_client_name);
412     return rv.Release();
413 }
414 
Clone(SNetServerInPool * server,SNetServiceImpl * prototype)415 SNetServiceImpl* SNetServiceImpl::Clone(SNetServerInPool* server, SNetServiceImpl* prototype)
416 {
417     return new SNetServiceImpl(server, prototype);
418 }
419 
Clone(const string & service_name,SNetServiceImpl * prototype)420 SNetServiceImpl* SNetServiceImpl::Clone(const string& service_name, SNetServiceImpl* prototype)
421 {
422     return new SNetServiceImpl(service_name, prototype);
423 }
424 
425 #ifdef NCBI_GRID_XSITE_CONN_SUPPORT
AllowXSiteConnections()426 void CNetService::AllowXSiteConnections()
427 {
428     SNetServiceXSiteAPI::AllowXSiteConnections();
429 }
430 
IsUsingXSiteProxy()431 bool CNetService::IsUsingXSiteProxy()
432 {
433     return SNetServiceXSiteAPI::IsUsingXSiteProxy();
434 }
435 
436 static const char kXSiteFwd[] = "XSITEFWD";
437 
AllowXSiteConnections()438 void SNetServiceXSiteAPI::AllowXSiteConnections()
439 {
440     const auto local_ip = CSocketAPI::GetLocalHostAddress();
441     const auto local_domain = GetDomain(local_ip);
442     m_LocalDomain.store(local_domain);
443     m_AllowXSiteConnections.store(true);
444 }
445 
IsUsingXSiteProxy()446 bool SNetServiceXSiteAPI::IsUsingXSiteProxy()
447 {
448     return m_AllowXSiteConnections.load();
449 }
450 
InitXSite(CSynRegistry & registry,const SRegSynonyms & sections)451 void SNetServiceXSiteAPI::InitXSite(CSynRegistry& registry, const SRegSynonyms& sections)
452 {
453     if (registry.Get({ "netservice_api", sections }, "allow_xsite_conn", false)) {
454         AllowXSiteConnections();
455     }
456 }
457 
ConnectXSite(CSocket & socket,SNetServerImpl::SConnectDeadline & deadline,const SSocketAddress & original,const string & service)458 void SNetServiceXSiteAPI::ConnectXSite(CSocket& socket,
459         SNetServerImpl::SConnectDeadline& deadline,
460         const SSocketAddress& original, const string& service)
461 {
462     SSocketAddress actual(original);
463     _ASSERT(actual.port);
464     ticket_t ticket = 0;
465 
466     if (IsForeignAddr(actual.host)) {
467         union {
468             SFWDRequestReply rr;
469             char buf[FWD_RR_MAX_SIZE + 1];
470         };
471         memset(&rr, 0, sizeof(rr));
472 
473         rr.host =                     actual.host;
474         rr.port = SOCK_HostToNetShort(actual.port);
475         rr.flag = SOCK_HostToNetShort(FWD_RR_FIREWALL | FWD_RR_KEEPALIVE);
476 
477         auto text_max = sizeof(buf)-1 - offsetof(SFWDRequestReply,text);
478         auto text_len = service.size() ? min(service.size() + 1, text_max) : 0;
479         memcpy(rr.text, service.c_str(), text_len);
480 
481         size_t len = 0;
482 
483         CConn_ServiceStream svc(kXSiteFwd);
484         svc.rdbuf()->PUBSETBUF(0, 0);  // quick way to make stream unbuffered
485         if (svc.write((const char*) &rr.ticket/*0*/, sizeof(rr.ticket))  &&
486             svc.write(buf, offsetof(SFWDRequestReply,text) + text_len)) {
487             svc.read(buf, sizeof(buf)-1);
488             len = (size_t) svc.gcount();
489             _ASSERT(len < sizeof(buf));
490         }
491 
492         memset(buf + len, 0, sizeof(buf) - len); // NB: terminates "text" field
493 
494         if (len < offsetof(SFWDRequestReply,text)
495             ||  (rr.flag & FWD_RR_ERRORMASK)  ||  !rr.port) {
496             const char* err;
497             if (len == 0)
498                 err = "Connection refused";
499             else if (len < offsetof(SFWDRequestReply,text))
500                 err = "Short response received";
501             else if (!(rr.flag & FWD_RR_ERRORMASK))
502                 err = rr.flag & FWD_RR_REJECTMASK
503                     ? "Client rejected" : "Unknown error";
504             else if (memcmp(buf, "NCBI", 4) != 0)
505                 err = rr.text[0] ? rr.text : "Unspecified error";
506             else
507                 err = buf;
508             NCBI_THROW_FMT(CNetSrvConnException, eConnectionFailure,
509                            "Error while acquiring auth ticket from"
510                            " cross-site connection proxy "
511                            << kXSiteFwd << ": " << err);
512         }
513 
514         if (rr.ticket) {
515             ticket      = rr.ticket;
516             actual.host =                     rr.host;
517             actual.port = SOCK_NetToHostShort(rr.port);
518         } else {
519             SOCK sock;
520             EIO_Status io_st = CONN_GetSOCK(svc.GetCONN(), &sock);
521             if (sock)
522                 io_st = SOCK_CreateOnTop(sock, 0, &sock);
523             _ASSERT(!sock == !(io_st == eIO_Success));
524             if (sock) {
525                 // excess read data to return into sock
526                 text_len  = strlen(rr.text) + 1/*'\0'-terminated*/;
527                 if (text_len > text_max)
528                     text_len = text_max;
529                 text_len += offsetof(SFWDRequestReply,text);
530                 _ASSERT(text_len <= len);
531                 io_st = SOCK_Pushback(sock, buf + text_len, len - text_len);
532             }
533             if (io_st != eIO_Success) {
534                 SOCK_Destroy(sock);
535                 const char* err = IO_StatusStr(io_st);
536                 NCBI_THROW_FMT(CNetSrvConnException, eConnectionFailure,
537                                "Error while tunneling through proxy "
538                                << kXSiteFwd << ": " << err);
539             }
540             socket.Reset(sock, eTakeOwnership, eCopyTimeoutsToSOCK);
541             actual.port = 0;
542         }
543     }
544 
545     if (actual.port) {
546         SNetServerImpl::ConnectImpl(socket, deadline, actual, original);
547     }
548 
549     if (ticket  &&  socket.Write(&ticket, sizeof(ticket)) != eIO_Success) {
550         NCBI_THROW(CNetSrvConnException, eConnectionFailure,
551                    "Error while sending proxy auth ticket");
552     }
553 }
554 
GetDomain(unsigned int ip)555 int SNetServiceXSiteAPI::GetDomain(unsigned int ip)
556 {
557     TNCBI_IPv6Addr addr;
558     NcbiIPv4ToIPv6(&addr, ip, 0);
559 
560     SNcbiDomainInfo info;
561     NcbiIsLocalIPEx(&addr, &info);
562 
563     if (!info.num) {
564         NCBI_THROW(CNetSrvConnException, eLBNameNotFound,
565                    "Cannot determine local domain");
566     }
567 
568     return info.num;
569 }
570 
IsForeignAddr(unsigned int ip)571 bool SNetServiceXSiteAPI::IsForeignAddr(unsigned int ip)
572 {
573     if (!IsUsingXSiteProxy()) return false;
574 
575     return m_LocalDomain != GetDomain(ip);
576 }
577 
578 atomic<int> SNetServiceXSiteAPI::m_LocalDomain{0};
579 atomic<bool> SNetServiceXSiteAPI::m_AllowXSiteConnections{false};
580 
581 #else
582 
InitXSite(CSynRegistry &,const SRegSynonyms &)583 void SNetServiceXSiteAPI::InitXSite(CSynRegistry&, const SRegSynonyms&)
584 {
585 }
586 
ConnectXSite(CSocket & socket,SNetServerImpl::SConnectDeadline & deadline,const SSocketAddress & original,const string &)587 void SNetServiceXSiteAPI::ConnectXSite(CSocket& socket,
588         SNetServerImpl::SConnectDeadline& deadline,
589         const SSocketAddress& original, const string&)
590 {
591     SNetServerImpl::ConnectImpl(socket, deadline, original, original);
592 }
593 
594 #endif
595 
Init(CSynRegistry & registry,SRegSynonyms & sections,const string & ns_client_name)596 void SNetServiceImpl::Init(CSynRegistry& registry, SRegSynonyms& sections, const string& ns_client_name)
597 {
598     // Initialize the connect library and LBSM structures
599     // used in DiscoverServersIfNeeded().
600     {
601         class CInPlaceConnIniter : protected CConnIniter
602         {
603         public:
604             void NoOp() {};
605         } conn_initer;
606         conn_initer.NoOp();
607     }
608 
609     NStr::TruncateSpacesInPlace(m_ServiceName);
610 
611     // TODO:
612     // Do not check for emptiness and always read values (client, service, etc) from registry
613     // after values provided in ctors get into an underlying memory registry.
614 
615     // Do not override explicitly set client name
616     if (m_ClientName.empty()) m_ClientName = registry.Get(sections, { "client_name", "client" }, "");
617 
618     // Use client name from NetSchedule API if it's not provided for NetCache API
619     if (m_ClientName.empty()) m_ClientName = ns_client_name;
620 
621     if (m_ServiceName.empty()) {
622         m_ServiceName = registry.Get(sections, { "service", "service_name" }, "");
623 
624         if (m_ServiceName.empty()) {
625             string host = registry.Get(sections, { "server", "host" }, "");
626             string port = registry.Get(sections, "port", "");
627 
628             if (!host.empty() && !port.empty()) m_ServiceName = host + ":" + port;
629         }
630     }
631 
632     InitXSite(registry, sections);
633 
634     m_UseSmartRetries = registry.Get(sections, "smart_service_retries", true);
635 
636     int max_retries = registry.Get({ sections, "netservice_api" }, "connection_max_retries", CONNECTION_MAX_RETRIES);
637     m_ConnectionMaxRetries = max_retries >= 0 ? max_retries : CONNECTION_MAX_RETRIES;
638 
639     double retry_delay = registry.Get({ sections, "netservice_api" }, "retry_delay", RETRY_DELAY_DEFAULT);
640     if (retry_delay < 0) retry_delay = RETRY_DELAY_DEFAULT;
641     m_ConnectionRetryDelay = static_cast<unsigned long>(retry_delay * kMilliSecondsPerSecond);
642 
643     if (m_ClientName.empty() || m_ClientName == "noname" ||
644             NStr::FindNoCase(m_ClientName, "unknown") != NPOS) {
645         CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
646         if (!app) {
647             NCBI_THROW_FMT(CArgException, eNoValue,
648                 m_APIName << ": client name is not set");
649         }
650         m_ClientName = app->GetProgramDisplayName();
651     }
652 
653     m_ServerPool->Init(registry, sections);
654 
655     Construct();
656 }
657 
Init(CSynRegistry & registry,const SRegSynonyms & sections)658 void SNetServerPoolImpl::Init(CSynRegistry& registry, const SRegSynonyms& sections)
659 {
660     const auto kConnTimeoutDefault = 2.0;
661     const auto kCommTimeoutDefault = 12.0;
662     const auto kFirstServerTimeoutDefault = 0.0;
663 
664     m_LBSMAffinity.first = registry.Get(sections, "use_lbsm_affinity", "");
665 
666     // Get affinity value from the local LBSM configuration file.
667     if (!m_LBSMAffinity.first.empty()) {
668         m_LBSMAffinity.second = LBSMD_GetHostParameter(SERV_LOCALHOST, m_LBSMAffinity.first.c_str());
669     }
670 
671     double conn_timeout = registry.Get(sections, "connection_timeout", kConnTimeoutDefault);
672     g_CTimeoutToSTimeout(CTimeout(conn_timeout > 0 ? conn_timeout : kConnTimeoutDefault), m_ConnTimeout);
673 
674     double comm_timeout = registry.Get({ sections, "netservice_api" }, "communication_timeout", kCommTimeoutDefault);
675     g_CTimeoutToSTimeout(CTimeout(comm_timeout > 0 ? comm_timeout : kCommTimeoutDefault), m_CommTimeout);
676 
677     double first_srv_timeout = registry.Get(sections, "first_server_timeout", kFirstServerTimeoutDefault);
678     g_CTimeoutToSTimeout(CTimeout(first_srv_timeout > 0 ? first_srv_timeout : kFirstServerTimeoutDefault), m_FirstServerTimeout);
679 
680     double max_total_time = registry.Get(sections, "max_connection_time", 0.0);
681     if (max_total_time > 0) m_MaxTotalTime = CTimeout(max_total_time);
682 
683     m_ThrottleParams.Init(registry, sections);
684 }
685 
AllocServerGroup(unsigned discovery_iteration)686 SDiscoveredServers* SNetServiceImpl::AllocServerGroup(
687     unsigned discovery_iteration)
688 {
689     if (m_ServerGroupPool == NULL)
690         return new SDiscoveredServers(discovery_iteration);
691     else {
692         SDiscoveredServers* server_group = m_ServerGroupPool;
693         m_ServerGroupPool = server_group->m_NextGroupInPool;
694 
695         server_group->Reset(discovery_iteration);
696 
697         return server_group;
698     }
699 }
700 
MakeAuthString()701 string SNetServiceImpl::MakeAuthString()
702 {
703     string auth;
704     auth.reserve(256);
705 
706     auth += "client=\"";
707     auth += NStr::PrintableString(m_ClientName);
708     auth += '\"';
709 
710     if (!m_ServerPool->m_UseOldStyleAuth) {
711         if (m_ServiceType == eLoadBalancedService) {
712             auth += " svc=\"";
713             auth += NStr::PrintableString(m_ServiceName);
714             auth += '\"';
715         }
716 
717         CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
718         if (app) {
719             auth += " client_path=\"";
720             auth += NStr::PrintableString(app->GetProgramExecutablePath());
721             auth += '\"';
722         }
723     }
724 
725     return auth;
726 }
727 
GetServiceName() const728 const string& CNetService::GetServiceName() const
729 {
730     return m_Impl->m_ServiceName;
731 }
732 
GetServerPool()733 CNetServerPool CNetService::GetServerPool()
734 {
735     return m_Impl->m_ServerPool;
736 }
737 
IsLoadBalanced() const738 bool CNetService::IsLoadBalanced() const
739 {
740     return m_Impl->IsLoadBalanced();
741 }
742 
StickToServer(SSocketAddress address)743 void CNetServerPool::StickToServer(SSocketAddress address)
744 {
745     m_Impl->m_EnforcedServer = move(address);
746 }
747 
PrintCmdOutput(const string & cmd,CNcbiOstream & output_stream,CNetService::ECmdOutputStyle output_style,CNetService::EIterationMode iteration_mode)748 void CNetService::PrintCmdOutput(const string& cmd,
749         CNcbiOstream& output_stream, CNetService::ECmdOutputStyle output_style,
750         CNetService::EIterationMode iteration_mode)
751 {
752     bool load_balanced = IsLoadBalanced() ?
753         output_style != eMultilineOutput_NoHeaders : false;
754 
755     for (CNetServiceIterator it = Iterate(iteration_mode); it; ++it) {
756         if (load_balanced)
757             output_stream << '[' << (*it).GetServerAddress() << ']' << endl;
758 
759         switch (output_style) {
760         case eSingleLineOutput:
761             output_stream << (*it).ExecWithRetry(cmd, false).response << endl;
762             break;
763 
764         case eUrlEncodedOutput:
765             {
766                 CUrlArgs url_parser((*it).ExecWithRetry(cmd, false).response);
767 
768                 ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
769                     output_stream << field->name <<
770                             ": " << field->value << endl;
771                 }
772             }
773             break;
774 
775         default:
776             {
777                 CNetServerMultilineCmdOutput output(
778                         (*it).ExecWithRetry(cmd, true));
779 
780                 if (output_style == eMultilineOutput_NetCacheStyle)
781                     output->SetNetCacheCompatMode();
782 
783                 string line;
784 
785                 while (output.ReadLine(line))
786                     output_stream << line << endl;
787             }
788         }
789 
790         if (load_balanced)
791             output_stream << endl;
792     }
793 }
794 
FindOrCreateServerImpl(SSocketAddress server_address)795 SNetServerInPool* SNetServerPoolImpl::FindOrCreateServerImpl(
796         SSocketAddress server_address)
797 {
798     pair<TNetServerByAddress::iterator, bool> loc(m_Servers.insert(
799             TNetServerByAddress::value_type(server_address,
800                     (SNetServerInPool*) NULL)));
801 
802     if (!loc.second)
803         return loc.first->second;
804 
805     auto* server = new SNetServerInPool(move(server_address), m_PropCreator(), m_ThrottleParams);
806 
807     loc.first->second = server;
808 
809     return server;
810 }
811 
ReturnServer(SNetServerInPool * server_impl)812 CRef<SNetServerInPool> SNetServerPoolImpl::ReturnServer(
813         SNetServerInPool* server_impl)
814 {
815     CFastMutexGuard server_mutex_lock(m_ServerMutex);
816 
817     server_impl->m_ServerPool = this;
818     return CRef<SNetServerInPool>(server_impl);
819 }
820 
GetServer(SNetServiceImpl * service,SSocketAddress server_address)821 CNetServer SNetServerPoolImpl::GetServer(SNetServiceImpl* service, SSocketAddress server_address)
822 {
823     CFastMutexGuard server_mutex_lock(m_ServerMutex);
824 
825     auto* server = FindOrCreateServerImpl(m_EnforcedServer.host == 0 ? move(server_address) : m_EnforcedServer);
826     server->m_ServerPool = this;
827 
828     return new SNetServerImpl(service, server);
829 }
830 
GetServer(SSocketAddress server_address)831 CNetServer SNetServiceImpl::GetServer(SSocketAddress server_address)
832 {
833     m_RebalanceStrategy.OnResourceRequested();
834     return m_ServerPool->GetServer(this, move(server_address));
835 }
836 
GetServer(const string & host,unsigned short port)837 CNetServer CNetService::GetServer(const string& host,
838         unsigned short port)
839 {
840     return m_Impl->GetServer(SSocketAddress(host, port));
841 }
842 
GetServer(unsigned host,unsigned short port)843 CNetServer CNetService::GetServer(unsigned host, unsigned short port)
844 {
845     return m_Impl->GetServer(SSocketAddress(host, port));
846 }
847 
848 class SRandomServiceTraversal : public IServiceTraversal
849 {
850 public:
SRandomServiceTraversal(CNetService::TInstance service)851     SRandomServiceTraversal(CNetService::TInstance service) :
852         m_Service(service)
853     {
854     }
855 
856     virtual CNetServer BeginIteration();
857     virtual CNetServer NextServer();
858 
859 private:
860     CNetService m_Service;
861     CNetServiceIterator m_Iterator;
862 };
863 
BeginIteration()864 CNetServer SRandomServiceTraversal::BeginIteration()
865 {
866     return *(m_Iterator = m_Service.Iterate(CNetService::eRandomize));
867 }
868 
NextServer()869 CNetServer SRandomServiceTraversal::NextServer()
870 {
871     return ++m_Iterator ? *m_Iterator : CNetServer();
872 }
873 
FindServerAndExec(const string & cmd,bool multiline_output)874 CNetServer::SExecResult SNetServiceImpl::FindServerAndExec(const string& cmd,
875         bool multiline_output)
876 {
877     switch (m_ServiceType) {
878     default: // eServiceNotDefined
879         NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
880                 m_APIName << ": service name is not set");
881 
882     case eLoadBalancedService:
883         {
884             CNetServer::SExecResult exec_result;
885 
886             SRandomServiceTraversal random_traversal(this);
887 
888             IterateUntilExecOK(cmd, multiline_output,
889                     exec_result, &random_traversal,
890                     SNetServiceImpl::eIgnoreServerErrors);
891 
892             return exec_result;
893         }
894 
895     case eSingleServerService:
896         {
897             CNetServer server(new SNetServerImpl(this,
898                     m_ServerPool->ReturnServer(
899                     m_DiscoveredServers->m_Servers.front().first)));
900 
901             return server.ExecWithRetry(cmd, multiline_output);
902         }
903     }
904 }
905 
FindServerAndExec(const string & cmd,bool multiline_output)906 CNetServer::SExecResult CNetService::FindServerAndExec(const string& cmd,
907         bool multiline_output)
908 {
909     return m_Impl->FindServerAndExec(cmd, multiline_output);
910 }
911 
ExecOnAllServers(const string & cmd)912 void CNetService::ExecOnAllServers(const string& cmd)
913 {
914     for (CNetServiceIterator it = Iterate(eIncludePenalized); it; ++it)
915         (*it).ExecWithRetry(cmd, false);
916 }
917 
DiscoverServersIfNeeded()918 void SNetServiceImpl::DiscoverServersIfNeeded()
919 {
920     if (m_ServiceType == eServiceNotDefined) {
921         NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
922             m_APIName << ": service name is not set");
923     }
924 
925     if (m_ServiceType == eLoadBalancedService) {
926         // The service is load-balanced, check if rebalancing is required.
927         m_RebalanceStrategy.OnResourceRequested();
928         if (m_RebalanceStrategy.NeedRebalance())
929             ++m_LatestDiscoveryIteration;
930 
931         if (m_DiscoveredServers == NULL ||
932             m_DiscoveredServers->m_DiscoveryIteration !=
933                 m_LatestDiscoveryIteration) {
934             // The requested server group either does not exist or
935             // does not contain up-to-date server list, thus it needs
936             // to be created anew.
937 
938             const TSERV_Type types = fSERV_Standalone | fSERV_IncludeStandby |
939                 fSERV_IncludeReserved | fSERV_IncludeSuppressed;
940 
941             auto discovered = CServiceDiscovery::DiscoverImpl(m_ServiceName, types, m_NetInfo, m_ServerPool->m_LBSMAffinity,
942                     TServConn_MaxFineLBNameRetries::GetDefault(), m_ConnectionRetryDelay);
943 
944             SDiscoveredServers* server_group = m_DiscoveredServers;
945 
946             if (server_group != NULL && !server_group->m_Service)
947                 server_group->Reset(m_LatestDiscoveryIteration);
948             else
949                 // Either the group does not exist or it has been
950                 // "issued" to the outside callers; allocate a new one.
951                 server_group = m_DiscoveredServers =
952                     AllocServerGroup(m_LatestDiscoveryIteration);
953 
954             CFastMutexGuard server_mutex_lock(m_ServerPool->m_ServerMutex);
955 
956             TNetServerList& servers = server_group->m_Servers;
957             TNetServerList::size_type number_of_regular_servers = 0;
958             TNetServerList::size_type number_of_standby_servers = 0;
959             double max_standby_rate = LBSMD_PENALIZED_RATE_BOUNDARY;
960 
961             // Fill the 'servers' array in accordance with the layout
962             // described above the SDiscoveredServers::m_Servers declaration.
963             for (const auto& d : discovered) {
964                 SNetServerInPool* server = m_ServerPool->FindOrCreateServerImpl(d.first);
965                 server->m_ThrottleStats.Discover();
966 
967                 TServerRate server_rate(server, d.second);
968 
969                 if (d.second > 0)
970                     servers.insert(servers.begin() +
971                         number_of_regular_servers++, server_rate);
972                 else if (d.second < max_standby_rate ||
973                         d.second <= LBSMD_PENALIZED_RATE_BOUNDARY)
974                     servers.push_back(server_rate);
975                 else {
976                     servers.insert(servers.begin() +
977                         number_of_regular_servers, server_rate);
978                     if (d.second == max_standby_rate)
979                         ++number_of_standby_servers;
980                     else {
981                         max_standby_rate = d.second;
982                         number_of_standby_servers = 1;
983                     }
984                 }
985             }
986 
987             server_group->m_SuppressedBegin = servers.begin() +
988                 (number_of_regular_servers > 0 ?
989                     number_of_regular_servers : number_of_standby_servers);
990 
991             server_mutex_lock.Release();
992         }
993     }
994 }
995 
GetDiscoveredServers(CRef<SDiscoveredServers> & discovered_servers)996 void SNetServiceImpl::GetDiscoveredServers(
997     CRef<SDiscoveredServers>& discovered_servers)
998 {
999     CFastMutexGuard discovery_mutex_lock(m_DiscoveryMutex);
1000     DiscoverServersIfNeeded();
1001     discovered_servers = m_DiscoveredServers;
1002     discovered_servers->m_Service = this;
1003 }
1004 
IsInService(CNetServer::TInstance server)1005 bool SNetServiceImpl::IsInService(CNetServer::TInstance server)
1006 {
1007     CRef<SDiscoveredServers> servers;
1008     GetDiscoveredServers(servers);
1009 
1010     // Find the requested server among the discovered servers.
1011     ITERATE(TNetServerList, it, servers->m_Servers) {
1012         if (it->first == server->m_ServerInPool)
1013             return true;
1014     }
1015 
1016     return false;
1017 }
1018 
1019 struct SFailOnlyWarnings : deque<pair<string, CNetServer>>
1020 {
SFailOnlyWarningsSFailOnlyWarnings1021     SFailOnlyWarnings(CRef<INetServerConnectionListener> listener) : m_Listener(listener) {}
~SFailOnlyWarningsSFailOnlyWarnings1022     ~SFailOnlyWarnings() { IssueAndClear(); }
1023 
IssueAndClearSFailOnlyWarnings1024     void IssueAndClear()
1025     {
1026         for (auto& w : *this) {
1027             m_Listener->OnWarning(w.first, w.second);
1028         }
1029 
1030         clear();
1031     }
1032 
1033 private:
1034     CRef<INetServerConnectionListener> m_Listener;
1035 };
1036 
IterateUntilExecOK(const string & cmd,bool multiline_output,CNetServer::SExecResult & exec_result,IServiceTraversal * service_traversal,SNetServiceImpl::EServerErrorHandling error_handling)1037 void SNetServiceImpl::IterateUntilExecOK(const string& cmd,
1038     bool multiline_output,
1039     CNetServer::SExecResult& exec_result,
1040     IServiceTraversal* service_traversal,
1041     SNetServiceImpl::EServerErrorHandling error_handling)
1042 {
1043     int retry_count = m_ConnectionMaxRetries;
1044 
1045     const unsigned long retry_delay = m_ConnectionRetryDelay;
1046 
1047     const CTimeout& max_total_time = m_ServerPool->m_MaxTotalTime;
1048     CDeadline deadline(max_total_time);
1049 
1050     enum EIterationMode {
1051         eInitialIteration,
1052         eRetry
1053     } iteration_mode = eInitialIteration;
1054     CNetServer server = service_traversal->BeginIteration();
1055 
1056     vector<CNetServer> servers_to_retry;
1057     unsigned current_server = 0;
1058 
1059     bool skip_server;
1060 
1061     unsigned number_of_servers = 0;
1062     unsigned ns_with_submits_disabled = 0;
1063     unsigned servers_throttled = 0;
1064     bool blob_not_found = false;
1065 
1066     const auto& fst = m_ServerPool->m_FirstServerTimeout;
1067     const bool use_fst = (fst.sec || fst.usec) && (retry_count > 0 || m_UseSmartRetries);
1068     const STimeout* timeout = use_fst ? &fst : nullptr;
1069 
1070     SFailOnlyWarnings fail_only_warnings(m_Listener);
1071 
1072     for (;;) {
1073         skip_server = false;
1074 
1075         try {
1076             server->ConnectAndExec(cmd, multiline_output, exec_result,
1077                     timeout);
1078             fail_only_warnings.clear();
1079             return;
1080         }
1081         catch (CNetCacheBlobTooOldException& /*ex rethrown*/) {
1082             throw;
1083         }
1084         catch (CNetCacheException& ex) {
1085             if (retry_count <= 0 && !m_UseSmartRetries)
1086                 throw;
1087             if (error_handling == eRethrowAllServerErrors)
1088                 throw;
1089             if (ex.GetErrCode() == CNetCacheException::eBlobNotFound) {
1090                 blob_not_found = true;
1091                 skip_server = true;
1092             } else if (error_handling == eRethrowServerErrors)
1093                 throw;
1094             else
1095                 m_Listener->OnWarning(ex.GetMsg(), server);
1096         }
1097         catch (CNetScheduleException& ex) {
1098             if (retry_count <= 0 && !m_UseSmartRetries)
1099                 throw;
1100             if (error_handling == eRethrowAllServerErrors)
1101                 throw;
1102             if (ex.GetErrCode() == CNetScheduleException::eSubmitsDisabled) {
1103                 ++ns_with_submits_disabled;
1104                 skip_server = true;
1105                 fail_only_warnings.emplace_back(ex.GetMsg(), server);
1106             } else if (error_handling == eRethrowServerErrors)
1107                 throw;
1108             else
1109                 m_Listener->OnWarning(ex.GetMsg(), server);
1110         }
1111         catch (CNetSrvConnException& ex) {
1112             if (retry_count <= 0 && !m_UseSmartRetries)
1113                 throw;
1114             switch (ex.GetErrCode()) {
1115             case CNetSrvConnException::eReadTimeout:
1116             case CNetSrvConnException::eConnectionFailure:
1117                 m_Listener->OnWarning(ex.GetMsg(), server);
1118                 break;
1119 
1120             case CNetSrvConnException::eServerThrottle:
1121                 ++servers_throttled;
1122                 fail_only_warnings.emplace_back(ex.GetMsg(), server);
1123                 break;
1124 
1125             default:
1126                 throw;
1127             }
1128         }
1129 
1130         ++number_of_servers;
1131 
1132         if (iteration_mode == eInitialIteration) {
1133             if (!skip_server)
1134                 servers_to_retry.push_back(server);
1135             server = service_traversal->NextServer();
1136         } else {
1137             if (!skip_server)
1138                 ++current_server;
1139             else
1140                 servers_to_retry.erase(servers_to_retry.begin() +
1141                         current_server);
1142 
1143             if (current_server < servers_to_retry.size())
1144                 server = servers_to_retry[current_server];
1145             else
1146                 server = NULL;
1147         }
1148 
1149         if (!blob_not_found && !deadline.IsInfinite() &&
1150                 deadline.GetRemainingTime().GetAsMilliSeconds() <= (server ? 0 : retry_delay)) {
1151             NCBI_THROW_FMT(CNetSrvConnException, eReadTimeout, "Exceeded max_connection_time=" <<
1152                     max_total_time.GetAsMilliSeconds() << "; cmd=[" << cmd << "]");
1153         }
1154 
1155         if (!server) {
1156             if (number_of_servers == ns_with_submits_disabled) {
1157                 NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1158                         "Cannot execute ["  << cmd <<
1159                         "]: all NetSchedule servers are "
1160                         "in REFUSESUBMITS mode for the " + m_ServiceName + " service.");
1161             }
1162 
1163             if (number_of_servers == servers_throttled) {
1164                 NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1165                         "Cannot execute ["  << cmd <<
1166                         "]: all servers are throttled for the " + m_ServiceName + " service.");
1167             }
1168 
1169             if (retry_count <= 0 || servers_to_retry.empty()) {
1170                 if (blob_not_found) {
1171                     NCBI_THROW_FMT(CNetCacheException, eBlobNotFound,
1172                             "Cannot execute ["  << cmd << "]: blob not found.");
1173                 }
1174                 NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1175                         "Unable to execute [" << cmd <<
1176                         "] on any of the discovered servers for the " + m_ServiceName + " service.");
1177             }
1178 
1179             fail_only_warnings.IssueAndClear();
1180             ERR_POST(Warning << "Unable to send [" << cmd << "] to any "
1181                     "of the discovered servers; will retry after delay.");
1182 
1183             SleepMilliSec(retry_delay);
1184 
1185             number_of_servers = 0;
1186             ns_with_submits_disabled = 0;
1187             servers_throttled = 0;
1188 
1189             iteration_mode = eRetry;
1190             server = servers_to_retry[current_server = 0];
1191         }
1192 
1193         --retry_count;
1194 
1195         timeout = NULL;
1196     }
1197 }
1198 
CreateRetryGuard(SRetry::EType type)1199 shared_ptr<void> SNetServiceImpl::CreateRetryGuard(SRetry::EType type)
1200 {
1201     switch (type)
1202     {
1203         case SRetry::eNoRetryNoErrors: return make_shared<SNoRetryNoErrors>(this);
1204         case SRetry::eNoRetry:         return make_shared<SNoRetry>(this);
1205         default:                       return nullptr;
1206     }
1207 }
1208 
ResetServerConnections()1209 void SNetServerPoolImpl::ResetServerConnections()
1210 {
1211     CFastMutexGuard server_mutex_lock(m_ServerMutex);
1212 
1213     NON_CONST_ITERATE(TNetServerByAddress, it, m_Servers) {
1214         it->second->m_CurrentConnectionGeneration.Add(1);
1215     }
1216 }
1217 
~SNetServerPoolImpl()1218 SNetServerPoolImpl::~SNetServerPoolImpl()
1219 {
1220     // Clean up m_Servers
1221     NON_CONST_ITERATE(TNetServerByAddress, it, m_Servers) {
1222         delete it->second;
1223     }
1224 
1225     if (m_LBSMAffinity.second) free(const_cast<char*>(m_LBSMAffinity.second));
1226 }
1227 
~SNetServiceImpl()1228 SNetServiceImpl::~SNetServiceImpl()
1229 {
1230     delete m_DiscoveredServers;
1231 
1232     // Clean up m_ServerGroupPool
1233     SDiscoveredServers* server_group = m_ServerGroupPool;
1234     while (server_group != NULL) {
1235         SDiscoveredServers* next_group = server_group->m_NextGroupInPool;
1236         delete server_group;
1237         server_group = next_group;
1238     }
1239 }
1240 
SetCommunicationTimeout(const STimeout & to)1241 void CNetServerPool::SetCommunicationTimeout(const STimeout& to)
1242 {
1243     m_Impl->m_CommTimeout = to;
1244 }
GetCommunicationTimeout() const1245 const STimeout& CNetServerPool::GetCommunicationTimeout() const
1246 {
1247     return m_Impl->m_CommTimeout;
1248 }
1249 
Iterate(CNetService::EIterationMode mode)1250 CNetServiceIterator CNetService::Iterate(CNetService::EIterationMode mode)
1251 {
1252     CRef<SDiscoveredServers> servers;
1253     m_Impl->GetDiscoveredServers(servers);
1254 
1255     if (mode != eIncludePenalized) {
1256         if (servers->m_Servers.begin() < servers->m_SuppressedBegin) {
1257             if (mode == eSortByLoad)
1258                 return new SNetServiceIterator_OmitPenalized(servers);
1259             else if (mode == eRoundRobin) {
1260                 auto begin = servers->m_Servers.begin() + m_Impl->m_RoundRobin++ % servers->m_Servers.size();
1261                 return new SNetServiceIterator_Circular(servers, begin);
1262             } else
1263                 return new SNetServiceIterator_RandomPivot(servers);
1264         }
1265     } else
1266         if (!servers->m_Servers.empty())
1267             return new SNetServiceIteratorImpl(servers);
1268 
1269     NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1270         "Couldn't find any available servers for the " +
1271         m_Impl->m_ServiceName + " service.");
1272 }
1273 
Iterate(CNetServer::TInstance priority_server)1274 CNetServiceIterator CNetService::Iterate(CNetServer::TInstance priority_server)
1275 {
1276     return m_Impl->Iterate(priority_server);
1277 }
1278 
Iterate(CNetServer::TInstance priority_server)1279 SNetServiceIteratorImpl* SNetServiceImpl::Iterate(CNetServer::TInstance priority_server)
1280 {
1281     CRef<SDiscoveredServers> servers;
1282     GetDiscoveredServers(servers);
1283 
1284     // Find the requested server among the discovered servers.
1285     ITERATE(TNetServerList, it, servers->m_Servers) {
1286         if (it->first == priority_server->m_ServerInPool)
1287             return new SNetServiceIterator_Circular(servers, it);
1288     }
1289 
1290     if (!servers->m_Servers.empty())
1291         // The requested server is not found in this service,
1292         // however there are servers, so return them.
1293         return new SNetServiceIteratorImpl(servers);
1294 
1295     NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1296         "Couldn't find any available servers for the " +
1297         m_ServiceName + " service.");
1298 }
1299 
IterateByWeight(const string & key)1300 CNetServiceIterator CNetService::IterateByWeight(const string& key)
1301 {
1302     CRef<SDiscoveredServers> servers;
1303     m_Impl->GetDiscoveredServers(servers);
1304 
1305     if (servers->m_Servers.begin() == servers->m_SuppressedBegin) {
1306         NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1307             "Couldn't find any available servers for the " +
1308             m_Impl->m_ServiceName + " service.");
1309     }
1310 
1311     CChecksum key_crc32(CChecksum::eCRC32);
1312 
1313     key_crc32.AddChars(key.data(), key.length());
1314 
1315     return new SNetServiceIterator_Weighted(servers, key_crc32.GetChecksum());
1316 }
1317 
ExcludeServer(CNetServer::TInstance server)1318 CNetServiceIterator CNetService::ExcludeServer(CNetServer::TInstance server)
1319 {
1320     CRef<SDiscoveredServers> servers;
1321     m_Impl->GetDiscoveredServers(servers);
1322 
1323     // Find the requested server among the discovered servers.
1324     ITERATE(TNetServerList, it, servers->m_Servers) {
1325         if (it->first == server->m_ServerInPool) {
1326             // The server is found. Make an iterator and
1327             // skip to the next server (the iterator may become NULL).
1328             CNetServiceIterator circular_iter(
1329                     new SNetServiceIterator_Circular(servers, it));
1330             return ++circular_iter;
1331         }
1332     }
1333 
1334     // The requested server is not found in this service, so
1335     // return the rest of servers or NULL if there are none.
1336     return !servers->m_Servers.empty() ?
1337             new SNetServiceIteratorImpl(servers) : NULL;
1338 }
1339 
FindServer(INetServerFinder * finder,CNetService::EIterationMode mode)1340 CNetServiceIterator CNetService::FindServer(INetServerFinder* finder,
1341     CNetService::EIterationMode mode)
1342 {
1343     string error_messages;
1344 
1345     CNetServiceIterator it = Iterate(mode);
1346 
1347     for (; it; ++it) {
1348         try {
1349             if (finder->Consider(*it))
1350                 break;
1351         }
1352         catch (CNetServiceException& ex) {
1353             if (ex.GetErrCode() != CNetServiceException::eCommunicationError)
1354                 throw;
1355 
1356             if (!error_messages.empty())
1357                 error_messages += '\n';
1358 
1359             error_messages += (*it)->m_ServerInPool->m_Address.AsString();
1360             error_messages += ": ";
1361             error_messages += ex.what();
1362         }
1363         catch (CIO_Exception& ex) {
1364             if (!error_messages.empty())
1365                 error_messages += '\n';
1366 
1367             error_messages += (*it)->m_ServerInPool->m_Address.AsString();
1368             error_messages += ": ";
1369             error_messages += ex.what();
1370         }
1371     }
1372 
1373     if (!error_messages.empty()) {
1374         LOG_POST(error_messages);
1375     }
1376 
1377     return it;
1378 }
1379 
Clone(const string & name)1380 CNetService CNetService::Clone(const string& name)
1381 {
1382     _ASSERT(m_Impl);
1383     return name == m_Impl->m_ServiceName ? m_Impl :
1384         SNetServiceImpl::Clone(name, m_Impl);
1385 }
1386 
SetErrorHandler(TEventHandler error_handler)1387 void CNetService::SetErrorHandler(TEventHandler error_handler)
1388 {
1389     m_Impl->m_Listener->SetErrorHandler(error_handler);
1390 }
1391 
SetWarningHandler(TEventHandler warning_handler)1392 void CNetService::SetWarningHandler(TEventHandler warning_handler)
1393 {
1394     m_Impl->m_Listener->SetWarningHandler(warning_handler);
1395 }
1396 
GetServiceByName(const string & service_name,SNetServiceImpl * prototype)1397 CNetService SNetServiceMap::GetServiceByName(const string& service_name,
1398         SNetServiceImpl* prototype)
1399 {
1400     CFastMutexGuard guard(m_ServiceMapMutex);
1401     return GetServiceByNameImpl(service_name, prototype);
1402 }
1403 
GetServiceByNameImpl(const string & service_name,SNetServiceImpl * prototype)1404 CNetService SNetServiceMap::GetServiceByNameImpl(const string& service_name,
1405         SNetServiceImpl* prototype)
1406 {
1407     pair<TNetServiceByName::iterator, bool> loc(m_ServiceByName.insert(
1408             TNetServiceByName::value_type(service_name, CNetService())));
1409 
1410     return !loc.second ? loc.first->second :
1411             (loc.first->second =
1412                     SNetServiceImpl::Clone(service_name, prototype));
1413 }
1414 
IsAllowed(const string & service_name) const1415 bool SNetServiceMap::IsAllowed(const string& service_name) const
1416 {
1417     // Not restricted or found
1418     return !m_Restricted || m_Allowed.count(service_name);
1419 }
1420 
IsAllowed(CNetServer::TInstance server,SNetServiceImpl * prototype)1421 bool SNetServiceMap::IsAllowed(CNetServer::TInstance server,
1422         SNetServiceImpl* prototype)
1423 {
1424     if (!m_Restricted) return true;
1425 
1426     CFastMutexGuard guard(m_ServiceMapMutex);
1427 
1428     // Check if server belongs to some allowed service
1429     for (auto& service_name: m_Allowed) {
1430         CNetService service(GetServiceByNameImpl(service_name, prototype));
1431 
1432         if (service->IsInService(server)) return true;
1433     }
1434 
1435     return false;
1436 }
1437 
AddToAllowed(const string & service_name)1438 void SNetServiceMap::AddToAllowed(const string& service_name)
1439 {
1440     m_Allowed.insert(service_name);
1441 }
1442 
g_ExecToJson(IExecToJson & exec_to_json,CNetService service,CNetService::EIterationMode iteration_mode)1443 CJsonNode g_ExecToJson(IExecToJson& exec_to_json, CNetService service,
1444         CNetService::EIterationMode iteration_mode)
1445 {
1446     if (!service.IsLoadBalanced())
1447         return exec_to_json.ExecOn(service.Iterate().GetServer());
1448 
1449     CJsonNode result(CJsonNode::NewObjectNode());
1450 
1451     for (CNetServiceIterator it = service.Iterate(iteration_mode); it; ++it)
1452         result.SetByKey((*it).GetServerAddress(), exec_to_json.ExecOn(*it));
1453 
1454     return result;
1455 }
1456 
Create(const string & api_name,const string & service_name,const string & client_name)1457 CNetService CNetService::Create(const string& api_name, const string& service_name, const string& client_name)
1458 {
1459     struct SNoOpConnectionListener : public INetServerConnectionListener
1460     {
1461         INetServerConnectionListener* Clone() override { return new SNoOpConnectionListener(*this); }
1462         void OnConnected(CNetServerConnection&) override {}
1463 
1464     private:
1465         void OnErrorImpl(const string&, CNetServer&) override {}
1466         void OnWarningImpl(const string&, CNetServer&) override {}
1467     };
1468 
1469     CSynRegistryBuilder registry_builder;
1470     SRegSynonyms sections(api_name);
1471 
1472     return SNetServiceImpl::Create(api_name, service_name, client_name, new SNoOpConnectionListener,
1473             registry_builder, sections);
1474 }
1475 
g_AppendClientIPSessionIDHitID(string & cmd)1476 void g_AppendClientIPSessionIDHitID(string& cmd)
1477 {
1478     CRequestContext& req = CDiagContext::GetRequestContext();
1479     g_AppendClientIPAndSessionID(cmd, req);
1480     g_AppendHitID(cmd, req);
1481 }
1482 
1483 END_NCBI_SCOPE
1484