1 /* $Id: netservice_api.cpp 607718 2020-05-06 17:39:37Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description:
29 */
30
31 #include <ncbi_pch.hpp>
32
33 #include "../ncbi_comm.h"
34 #include "../ncbi_lbsmd.h"
35 #include "../ncbi_servicep.h"
36
37 #include "netservice_api_impl.hpp"
38
39 #include <connect/services/error_codes.hpp>
40 #include <connect/services/netcache_api_expt.hpp>
41 #include <connect/services/netschedule_api_expt.hpp>
42 #include <connect/services/srv_connections_expt.hpp>
43
44 #include <connect/ncbi_conn_exception.hpp>
45 #include <connect/ncbi_conn_stream.hpp>
46 #include <connect/ncbi_core_cxx.hpp>
47 #include <connect/ncbi_localip.hpp>
48
49 #include <corelib/ncbi_config.hpp>
50 #include <corelib/ncbi_message.hpp>
51 #include <corelib/ncbi_system.hpp>
52
53 #include <util/random_gen.hpp>
54 #include <util/checksum.hpp>
55
56 #include <deque>
57
58 #define NCBI_USE_ERRCODE_X ConnServ_Connection
59
60 #define LBSMD_PENALIZED_RATE_BOUNDARY -0.01
61
62 BEGIN_NCBI_SCOPE
63
64 // The purpose of these classes is to execute commands suppressing possible errors and avoiding retries
65 struct SNoRetry : SNetServiceImpl::SRetry
66 {
SNoRetrySNoRetry67 SNoRetry(SNetServiceImpl* service) :
68 m_Service(service)
69 {
70 _ASSERT(m_Service);
71
72 Swap(*m_Service, m_MaxRetries);
73 }
74
~SNoRetrySNoRetry75 ~SNoRetry()
76 {
77 Swap(*m_Service, m_MaxRetries);
78 }
79
80 protected:
81 CNetRef<SNetServiceImpl> m_Service;
82
83 private:
84 unsigned m_MaxRetries = 0;
85 };
86
87 struct SNoRetryNoErrors : SNoRetry
88 {
SNoRetryNoErrorsSNoRetryNoErrors89 SNoRetryNoErrors(SNetServiceImpl* service) :
90 SNoRetry(service)
91 {
92 Set([](const string&, CNetServer) { return true; });
93 }
94
~SNoRetryNoErrorsSNoRetryNoErrors95 ~SNoRetryNoErrors()
96 {
97 Set(nullptr);
98 }
99
100 private:
SetSNoRetryNoErrors101 void Set(CNetService::TEventHandler error_handler)
102 {
103 m_Service->m_Listener->SetErrorHandler(error_handler);
104 }
105 };
106
DeleteThis()107 void SDiscoveredServers::DeleteThis()
108 {
109 CNetService service(m_Service);
110
111 if (!service)
112 return;
113
114 // Before resetting the m_Service pointer, verify that no other object
115 // has acquired a reference to this server group object yet (between
116 // the time the reference counter went to zero, and the current moment
117 // when m_Service is about to be reset).
118 CFastMutexGuard discovery_mutex_lock(service->m_DiscoveryMutex);
119
120 service = NULL;
121
122 if (!Referenced() && m_Service) {
123 if (m_Service->m_DiscoveredServers != this) {
124 m_NextGroupInPool = m_Service->m_ServerGroupPool;
125 m_Service->m_ServerGroupPool = this;
126 }
127 m_Service = NULL;
128 }
129 }
130
GetServer()131 CNetServer CNetServiceIterator::GetServer()
132 {
133 return m_Impl->GetServer();
134 }
135
GetServer()136 CNetServer SNetServiceIteratorImpl::GetServer()
137 {
138 auto& service = m_ServerGroup->m_Service;
139 service->m_RebalanceStrategy.OnResourceRequested();
140 return new SNetServerImpl(service, service->m_ServerPool->ReturnServer(m_Position->first));
141 }
142
Next()143 bool CNetServiceIterator::Next()
144 {
145 if (m_Impl->Next())
146 return true;
147
148 m_Impl.Reset(NULL);
149 return false;
150 }
151
Prev()152 bool CNetServiceIterator::Prev()
153 {
154 if (m_Impl->Prev())
155 return true;
156
157 m_Impl.Reset(NULL);
158 return false;
159 }
160
GetRate() const161 double CNetServiceIterator::GetRate() const
162 {
163 return m_Impl->GetRate();
164 }
165
Next()166 bool SNetServiceIteratorImpl::Next()
167 {
168 return ++m_Position != m_ServerGroup->m_Servers.end();
169 }
170
Prev()171 bool SNetServiceIteratorImpl::Prev()
172 {
173 if (m_Position == m_ServerGroup->m_Servers.begin())
174 return false;
175 --m_Position;
176 return true;
177 }
178
Next()179 bool SNetServiceIterator_OmitPenalized::Next()
180 {
181 return ++m_Position != m_ServerGroup->m_SuppressedBegin;
182 }
183
184 DEFINE_STATIC_FAST_MUTEX(s_RndLock);
185 static CRandom s_RandomIteratorGen((CRandom::TValue) time(NULL));
186
187 static CRandom::TValue
s_GetRand(CRandom::TValue max_value)188 s_GetRand(CRandom::TValue max_value)
189 {
190 CFastMutexGuard guard(s_RndLock);
191 return s_RandomIteratorGen.GetRand(0, max_value);
192 }
193
SNetServiceIterator_RandomPivot(SDiscoveredServers * server_group_impl)194 SNetServiceIterator_RandomPivot::SNetServiceIterator_RandomPivot(
195 SDiscoveredServers* server_group_impl) :
196 SNetServiceIteratorImpl(server_group_impl,
197 server_group_impl->m_Servers.begin() + s_GetRand(
198 CRandom::TValue((server_group_impl->m_SuppressedBegin -
199 server_group_impl->m_Servers.begin()) - 1)))
200 {
201 }
202
Next()203 bool SNetServiceIterator_RandomPivot::Next()
204 {
205 if (m_RandomIterators.empty()) {
206 TNetServerList::const_iterator it = m_ServerGroup->m_Servers.begin();
207 size_t number_of_servers = m_ServerGroup->m_SuppressedBegin - it;
208 if (number_of_servers <= 1)
209 return false; // There are no servers to advance to.
210 m_RandomIterators.reserve(number_of_servers);
211 m_RandomIterators.push_back(m_Position);
212 --number_of_servers;
213 do {
214 if (it != m_Position) {
215 m_RandomIterators.push_back(it);
216 --number_of_servers;
217 }
218 ++it;
219 } while (number_of_servers > 0);
220 // Shuffle m_RandomIterators starting from the element with index '1'.
221 if (m_RandomIterators.size() > 2) {
222 TRandomIterators::iterator rnt_it = m_RandomIterators.begin();
223 while (++rnt_it != m_RandomIterators.end())
224 swap(*rnt_it, m_RandomIterators[s_RandomIteratorGen.GetRand(1,
225 CRandom::TValue(m_RandomIterators.size() - 1))]);
226 }
227 m_RandomIterator = m_RandomIterators.begin();
228 ++m_RandomIterator;
229 } else
230 if (++m_RandomIterator == m_RandomIterators.end())
231 return false;
232
233 m_Position = *m_RandomIterator;
234
235 return true;
236 }
237
Prev()238 bool SNetServiceIterator_RandomPivot::Prev()
239 {
240 if (m_RandomIterators.empty() ||
241 m_RandomIterator == m_RandomIterators.begin())
242 return false;
243
244 m_Position = *--m_RandomIterator;
245
246 return true;
247 }
248
Next()249 bool SNetServiceIterator_Circular::Next()
250 {
251 if (++m_Position == m_ServerGroup->m_Servers.end())
252 m_Position = m_ServerGroup->m_Servers.begin();
253 return m_Position != m_Pivot;
254 }
255
Prev()256 bool SNetServiceIterator_Circular::Prev()
257 {
258 if (m_Position == m_Pivot)
259 return false;
260 if (m_Position == m_ServerGroup->m_Servers.begin())
261 m_Position = m_ServerGroup->m_Servers.end();
262 --m_Position;
263 return true;
264 }
265
SNetServiceIterator_Weighted(SDiscoveredServers * server_group_impl,Uint4 key_crc32)266 SNetServiceIterator_Weighted::SNetServiceIterator_Weighted(
267 SDiscoveredServers* server_group_impl, Uint4 key_crc32) :
268 SNetServiceIteratorImpl(server_group_impl),
269 m_KeyCRC32(key_crc32)
270 {
271 TNetServerList::const_iterator server_list_iter(m_Position);
272
273 if ((m_SingleServer =
274 (++server_list_iter == server_group_impl->m_SuppressedBegin)))
275 // Nothing to do if there's only one server.
276 return;
277
278 // Find the server with the highest rank.
279 SServerRank highest_rank(x_GetServerRank(m_Position));
280
281 do {
282 SServerRank server_rank(x_GetServerRank(server_list_iter));
283 if (highest_rank < server_rank)
284 highest_rank = server_rank;
285 // To avoid unnecessary memory allocations, do not save
286 // the calculated server ranks in hope that Next()
287 // will be called very rarely for this type of iterators.
288 } while (++server_list_iter != server_group_impl->m_SuppressedBegin);
289
290 m_Position = highest_rank.m_ServerListIter;
291 }
292
Next()293 bool SNetServiceIterator_Weighted::Next()
294 {
295 if (m_SingleServer)
296 return false;
297
298 if (m_ServerRanks.empty()) {
299 TNetServerList::const_iterator server_list_iter(
300 m_ServerGroup->m_Servers.begin());
301 do
302 m_ServerRanks.push_back(x_GetServerRank(server_list_iter));
303 while (++server_list_iter != m_ServerGroup->m_SuppressedBegin);
304
305 // Sort the ranks in *reverse* order.
306 sort(m_ServerRanks.rbegin(), m_ServerRanks.rend());
307
308 // Skip the server with the highest rank, which was the first
309 // server returned by this iterator object.
310 m_CurrentServerRank = m_ServerRanks.begin() + 1;
311 } else if (++m_CurrentServerRank == m_ServerRanks.end())
312 return false;
313
314 m_Position = m_CurrentServerRank->m_ServerListIter;
315 return true;
316 }
317
Prev()318 bool SNetServiceIterator_Weighted::Prev()
319 {
320 if (m_SingleServer)
321 return false;
322
323 _ASSERT(!m_ServerRanks.empty());
324
325 if (m_CurrentServerRank == m_ServerRanks.begin())
326 return false;
327
328 m_Position = (--m_CurrentServerRank)->m_ServerListIter;
329 return true;
330 }
331
SNetServerPoolImpl(INetServerConnectionListener * listener)332 SNetServerPoolImpl::SNetServerPoolImpl(INetServerConnectionListener* listener) :
333 m_PropCreator(listener->GetPropCreator()),
334 m_EnforcedServer(0, 0),
335 m_MaxTotalTime(CTimeout::eInfinite),
336 m_UseOldStyleAuth(false)
337 {
338 }
339
SNetServiceImpl(const string & api_name,const string & service_name,const string & client_name,INetServerConnectionListener * listener,CSynRegistry & registry,const SRegSynonyms & sections)340 SNetServiceImpl::SNetServiceImpl(const string& api_name, const string& service_name, const string& client_name,
341 INetServerConnectionListener* listener, CSynRegistry& registry, const SRegSynonyms& sections) :
342 m_Listener(listener),
343 m_ServerPool(new SNetServerPoolImpl(listener)),
344 m_ServiceName(service_name),
345 m_RebalanceStrategy(registry, sections),
346 m_RoundRobin(0),
347 m_APIName(api_name),
348 m_ClientName(client_name)
349 {
350 }
351
SNetServiceImpl(SNetServerInPool * server,SNetServiceImpl * prototype)352 SNetServiceImpl::SNetServiceImpl(SNetServerInPool* server, SNetServiceImpl* prototype) :
353 m_Listener(prototype->m_Listener->Clone()),
354 m_ServerPool(prototype->m_ServerPool),
355 m_ServiceName(server->m_Address.AsString()),
356 m_RebalanceStrategy(prototype->m_RebalanceStrategy),
357 m_RoundRobin(prototype->m_RoundRobin.load()),
358 m_APIName(prototype->m_APIName),
359 m_ClientName(prototype->m_ClientName),
360 m_UseSmartRetries(prototype->m_UseSmartRetries),
361 m_ConnectionMaxRetries(prototype->m_ConnectionMaxRetries),
362 m_ConnectionRetryDelay(prototype->m_ConnectionRetryDelay),
363 m_NetInfo(prototype->m_NetInfo)
364 {
365 Construct(server);
366 }
367
SNetServiceImpl(const string & service_name,SNetServiceImpl * prototype)368 SNetServiceImpl::SNetServiceImpl(const string& service_name, SNetServiceImpl* prototype) :
369 m_Listener(prototype->m_Listener->Clone()),
370 m_ServerPool(prototype->m_ServerPool),
371 m_ServiceName(service_name),
372 m_RebalanceStrategy(prototype->m_RebalanceStrategy),
373 m_RoundRobin(prototype->m_RoundRobin.load()),
374 m_APIName(prototype->m_APIName),
375 m_ClientName(prototype->m_ClientName),
376 m_UseSmartRetries(prototype->m_UseSmartRetries),
377 m_ConnectionMaxRetries(prototype->m_ConnectionMaxRetries),
378 m_ConnectionRetryDelay(prototype->m_ConnectionRetryDelay),
379 m_NetInfo(prototype->m_NetInfo)
380 {
381 Construct();
382 }
383
Construct(SNetServerInPool * server)384 void SNetServiceImpl::Construct(SNetServerInPool* server)
385 {
386 m_ServiceType = eSingleServerService;
387 m_DiscoveredServers = AllocServerGroup(0);
388 CFastMutexGuard server_mutex_lock(m_ServerPool->m_ServerMutex);
389 m_DiscoveredServers->m_Servers.push_back(TServerRate(server, 1));
390 m_DiscoveredServers->m_SuppressedBegin =
391 m_DiscoveredServers->m_Servers.end();
392 }
393
Construct()394 void SNetServiceImpl::Construct()
395 {
396 if (!m_ServiceName.empty()) {
397 if (auto address = SSocketAddress::Parse(m_ServiceName)) {
398 Construct(m_ServerPool->FindOrCreateServerImpl(move(address)));
399 } else {
400 m_ServiceType = eLoadBalancedService;
401 }
402 }
403 }
404
Create(const string & api_name,const string & service_name,const string & client_name,INetServerConnectionListener * listener,CSynRegistry & registry,SRegSynonyms & sections,const string & ns_client_name)405 SNetServiceImpl* SNetServiceImpl::Create(
406 const string& api_name, const string& service_name, const string& client_name,
407 INetServerConnectionListener* listener,
408 CSynRegistry& registry, SRegSynonyms& sections, const string& ns_client_name)
409 {
410 CNetRef<SNetServiceImpl> rv(new SNetServiceImpl(api_name, service_name, client_name, listener, registry, sections));
411 rv->Init(registry, sections, ns_client_name);
412 return rv.Release();
413 }
414
Clone(SNetServerInPool * server,SNetServiceImpl * prototype)415 SNetServiceImpl* SNetServiceImpl::Clone(SNetServerInPool* server, SNetServiceImpl* prototype)
416 {
417 return new SNetServiceImpl(server, prototype);
418 }
419
Clone(const string & service_name,SNetServiceImpl * prototype)420 SNetServiceImpl* SNetServiceImpl::Clone(const string& service_name, SNetServiceImpl* prototype)
421 {
422 return new SNetServiceImpl(service_name, prototype);
423 }
424
425 #ifdef NCBI_GRID_XSITE_CONN_SUPPORT
AllowXSiteConnections()426 void CNetService::AllowXSiteConnections()
427 {
428 SNetServiceXSiteAPI::AllowXSiteConnections();
429 }
430
IsUsingXSiteProxy()431 bool CNetService::IsUsingXSiteProxy()
432 {
433 return SNetServiceXSiteAPI::IsUsingXSiteProxy();
434 }
435
436 static const char kXSiteFwd[] = "XSITEFWD";
437
AllowXSiteConnections()438 void SNetServiceXSiteAPI::AllowXSiteConnections()
439 {
440 const auto local_ip = CSocketAPI::GetLocalHostAddress();
441 const auto local_domain = GetDomain(local_ip);
442 m_LocalDomain.store(local_domain);
443 m_AllowXSiteConnections.store(true);
444 }
445
IsUsingXSiteProxy()446 bool SNetServiceXSiteAPI::IsUsingXSiteProxy()
447 {
448 return m_AllowXSiteConnections.load();
449 }
450
InitXSite(CSynRegistry & registry,const SRegSynonyms & sections)451 void SNetServiceXSiteAPI::InitXSite(CSynRegistry& registry, const SRegSynonyms& sections)
452 {
453 if (registry.Get({ "netservice_api", sections }, "allow_xsite_conn", false)) {
454 AllowXSiteConnections();
455 }
456 }
457
ConnectXSite(CSocket & socket,SNetServerImpl::SConnectDeadline & deadline,const SSocketAddress & original,const string & service)458 void SNetServiceXSiteAPI::ConnectXSite(CSocket& socket,
459 SNetServerImpl::SConnectDeadline& deadline,
460 const SSocketAddress& original, const string& service)
461 {
462 SSocketAddress actual(original);
463 _ASSERT(actual.port);
464 ticket_t ticket = 0;
465
466 if (IsForeignAddr(actual.host)) {
467 union {
468 SFWDRequestReply rr;
469 char buf[FWD_RR_MAX_SIZE + 1];
470 };
471 memset(&rr, 0, sizeof(rr));
472
473 rr.host = actual.host;
474 rr.port = SOCK_HostToNetShort(actual.port);
475 rr.flag = SOCK_HostToNetShort(FWD_RR_FIREWALL | FWD_RR_KEEPALIVE);
476
477 auto text_max = sizeof(buf)-1 - offsetof(SFWDRequestReply,text);
478 auto text_len = service.size() ? min(service.size() + 1, text_max) : 0;
479 memcpy(rr.text, service.c_str(), text_len);
480
481 size_t len = 0;
482
483 CConn_ServiceStream svc(kXSiteFwd);
484 svc.rdbuf()->PUBSETBUF(0, 0); // quick way to make stream unbuffered
485 if (svc.write((const char*) &rr.ticket/*0*/, sizeof(rr.ticket)) &&
486 svc.write(buf, offsetof(SFWDRequestReply,text) + text_len)) {
487 svc.read(buf, sizeof(buf)-1);
488 len = (size_t) svc.gcount();
489 _ASSERT(len < sizeof(buf));
490 }
491
492 memset(buf + len, 0, sizeof(buf) - len); // NB: terminates "text" field
493
494 if (len < offsetof(SFWDRequestReply,text)
495 || (rr.flag & FWD_RR_ERRORMASK) || !rr.port) {
496 const char* err;
497 if (len == 0)
498 err = "Connection refused";
499 else if (len < offsetof(SFWDRequestReply,text))
500 err = "Short response received";
501 else if (!(rr.flag & FWD_RR_ERRORMASK))
502 err = rr.flag & FWD_RR_REJECTMASK
503 ? "Client rejected" : "Unknown error";
504 else if (memcmp(buf, "NCBI", 4) != 0)
505 err = rr.text[0] ? rr.text : "Unspecified error";
506 else
507 err = buf;
508 NCBI_THROW_FMT(CNetSrvConnException, eConnectionFailure,
509 "Error while acquiring auth ticket from"
510 " cross-site connection proxy "
511 << kXSiteFwd << ": " << err);
512 }
513
514 if (rr.ticket) {
515 ticket = rr.ticket;
516 actual.host = rr.host;
517 actual.port = SOCK_NetToHostShort(rr.port);
518 } else {
519 SOCK sock;
520 EIO_Status io_st = CONN_GetSOCK(svc.GetCONN(), &sock);
521 if (sock)
522 io_st = SOCK_CreateOnTop(sock, 0, &sock);
523 _ASSERT(!sock == !(io_st == eIO_Success));
524 if (sock) {
525 // excess read data to return into sock
526 text_len = strlen(rr.text) + 1/*'\0'-terminated*/;
527 if (text_len > text_max)
528 text_len = text_max;
529 text_len += offsetof(SFWDRequestReply,text);
530 _ASSERT(text_len <= len);
531 io_st = SOCK_Pushback(sock, buf + text_len, len - text_len);
532 }
533 if (io_st != eIO_Success) {
534 SOCK_Destroy(sock);
535 const char* err = IO_StatusStr(io_st);
536 NCBI_THROW_FMT(CNetSrvConnException, eConnectionFailure,
537 "Error while tunneling through proxy "
538 << kXSiteFwd << ": " << err);
539 }
540 socket.Reset(sock, eTakeOwnership, eCopyTimeoutsToSOCK);
541 actual.port = 0;
542 }
543 }
544
545 if (actual.port) {
546 SNetServerImpl::ConnectImpl(socket, deadline, actual, original);
547 }
548
549 if (ticket && socket.Write(&ticket, sizeof(ticket)) != eIO_Success) {
550 NCBI_THROW(CNetSrvConnException, eConnectionFailure,
551 "Error while sending proxy auth ticket");
552 }
553 }
554
GetDomain(unsigned int ip)555 int SNetServiceXSiteAPI::GetDomain(unsigned int ip)
556 {
557 TNCBI_IPv6Addr addr;
558 NcbiIPv4ToIPv6(&addr, ip, 0);
559
560 SNcbiDomainInfo info;
561 NcbiIsLocalIPEx(&addr, &info);
562
563 if (!info.num) {
564 NCBI_THROW(CNetSrvConnException, eLBNameNotFound,
565 "Cannot determine local domain");
566 }
567
568 return info.num;
569 }
570
IsForeignAddr(unsigned int ip)571 bool SNetServiceXSiteAPI::IsForeignAddr(unsigned int ip)
572 {
573 if (!IsUsingXSiteProxy()) return false;
574
575 return m_LocalDomain != GetDomain(ip);
576 }
577
578 atomic<int> SNetServiceXSiteAPI::m_LocalDomain{0};
579 atomic<bool> SNetServiceXSiteAPI::m_AllowXSiteConnections{false};
580
581 #else
582
InitXSite(CSynRegistry &,const SRegSynonyms &)583 void SNetServiceXSiteAPI::InitXSite(CSynRegistry&, const SRegSynonyms&)
584 {
585 }
586
ConnectXSite(CSocket & socket,SNetServerImpl::SConnectDeadline & deadline,const SSocketAddress & original,const string &)587 void SNetServiceXSiteAPI::ConnectXSite(CSocket& socket,
588 SNetServerImpl::SConnectDeadline& deadline,
589 const SSocketAddress& original, const string&)
590 {
591 SNetServerImpl::ConnectImpl(socket, deadline, original, original);
592 }
593
594 #endif
595
Init(CSynRegistry & registry,SRegSynonyms & sections,const string & ns_client_name)596 void SNetServiceImpl::Init(CSynRegistry& registry, SRegSynonyms& sections, const string& ns_client_name)
597 {
598 // Initialize the connect library and LBSM structures
599 // used in DiscoverServersIfNeeded().
600 {
601 class CInPlaceConnIniter : protected CConnIniter
602 {
603 public:
604 void NoOp() {};
605 } conn_initer;
606 conn_initer.NoOp();
607 }
608
609 NStr::TruncateSpacesInPlace(m_ServiceName);
610
611 // TODO:
612 // Do not check for emptiness and always read values (client, service, etc) from registry
613 // after values provided in ctors get into an underlying memory registry.
614
615 // Do not override explicitly set client name
616 if (m_ClientName.empty()) m_ClientName = registry.Get(sections, { "client_name", "client" }, "");
617
618 // Use client name from NetSchedule API if it's not provided for NetCache API
619 if (m_ClientName.empty()) m_ClientName = ns_client_name;
620
621 if (m_ServiceName.empty()) {
622 m_ServiceName = registry.Get(sections, { "service", "service_name" }, "");
623
624 if (m_ServiceName.empty()) {
625 string host = registry.Get(sections, { "server", "host" }, "");
626 string port = registry.Get(sections, "port", "");
627
628 if (!host.empty() && !port.empty()) m_ServiceName = host + ":" + port;
629 }
630 }
631
632 InitXSite(registry, sections);
633
634 m_UseSmartRetries = registry.Get(sections, "smart_service_retries", true);
635
636 int max_retries = registry.Get({ sections, "netservice_api" }, "connection_max_retries", CONNECTION_MAX_RETRIES);
637 m_ConnectionMaxRetries = max_retries >= 0 ? max_retries : CONNECTION_MAX_RETRIES;
638
639 double retry_delay = registry.Get({ sections, "netservice_api" }, "retry_delay", RETRY_DELAY_DEFAULT);
640 if (retry_delay < 0) retry_delay = RETRY_DELAY_DEFAULT;
641 m_ConnectionRetryDelay = static_cast<unsigned long>(retry_delay * kMilliSecondsPerSecond);
642
643 if (m_ClientName.empty() || m_ClientName == "noname" ||
644 NStr::FindNoCase(m_ClientName, "unknown") != NPOS) {
645 CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
646 if (!app) {
647 NCBI_THROW_FMT(CArgException, eNoValue,
648 m_APIName << ": client name is not set");
649 }
650 m_ClientName = app->GetProgramDisplayName();
651 }
652
653 m_ServerPool->Init(registry, sections);
654
655 Construct();
656 }
657
Init(CSynRegistry & registry,const SRegSynonyms & sections)658 void SNetServerPoolImpl::Init(CSynRegistry& registry, const SRegSynonyms& sections)
659 {
660 const auto kConnTimeoutDefault = 2.0;
661 const auto kCommTimeoutDefault = 12.0;
662 const auto kFirstServerTimeoutDefault = 0.0;
663
664 m_LBSMAffinity.first = registry.Get(sections, "use_lbsm_affinity", "");
665
666 // Get affinity value from the local LBSM configuration file.
667 if (!m_LBSMAffinity.first.empty()) {
668 m_LBSMAffinity.second = LBSMD_GetHostParameter(SERV_LOCALHOST, m_LBSMAffinity.first.c_str());
669 }
670
671 double conn_timeout = registry.Get(sections, "connection_timeout", kConnTimeoutDefault);
672 g_CTimeoutToSTimeout(CTimeout(conn_timeout > 0 ? conn_timeout : kConnTimeoutDefault), m_ConnTimeout);
673
674 double comm_timeout = registry.Get({ sections, "netservice_api" }, "communication_timeout", kCommTimeoutDefault);
675 g_CTimeoutToSTimeout(CTimeout(comm_timeout > 0 ? comm_timeout : kCommTimeoutDefault), m_CommTimeout);
676
677 double first_srv_timeout = registry.Get(sections, "first_server_timeout", kFirstServerTimeoutDefault);
678 g_CTimeoutToSTimeout(CTimeout(first_srv_timeout > 0 ? first_srv_timeout : kFirstServerTimeoutDefault), m_FirstServerTimeout);
679
680 double max_total_time = registry.Get(sections, "max_connection_time", 0.0);
681 if (max_total_time > 0) m_MaxTotalTime = CTimeout(max_total_time);
682
683 m_ThrottleParams.Init(registry, sections);
684 }
685
AllocServerGroup(unsigned discovery_iteration)686 SDiscoveredServers* SNetServiceImpl::AllocServerGroup(
687 unsigned discovery_iteration)
688 {
689 if (m_ServerGroupPool == NULL)
690 return new SDiscoveredServers(discovery_iteration);
691 else {
692 SDiscoveredServers* server_group = m_ServerGroupPool;
693 m_ServerGroupPool = server_group->m_NextGroupInPool;
694
695 server_group->Reset(discovery_iteration);
696
697 return server_group;
698 }
699 }
700
MakeAuthString()701 string SNetServiceImpl::MakeAuthString()
702 {
703 string auth;
704 auth.reserve(256);
705
706 auth += "client=\"";
707 auth += NStr::PrintableString(m_ClientName);
708 auth += '\"';
709
710 if (!m_ServerPool->m_UseOldStyleAuth) {
711 if (m_ServiceType == eLoadBalancedService) {
712 auth += " svc=\"";
713 auth += NStr::PrintableString(m_ServiceName);
714 auth += '\"';
715 }
716
717 CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
718 if (app) {
719 auth += " client_path=\"";
720 auth += NStr::PrintableString(app->GetProgramExecutablePath());
721 auth += '\"';
722 }
723 }
724
725 return auth;
726 }
727
GetServiceName() const728 const string& CNetService::GetServiceName() const
729 {
730 return m_Impl->m_ServiceName;
731 }
732
GetServerPool()733 CNetServerPool CNetService::GetServerPool()
734 {
735 return m_Impl->m_ServerPool;
736 }
737
IsLoadBalanced() const738 bool CNetService::IsLoadBalanced() const
739 {
740 return m_Impl->IsLoadBalanced();
741 }
742
StickToServer(SSocketAddress address)743 void CNetServerPool::StickToServer(SSocketAddress address)
744 {
745 m_Impl->m_EnforcedServer = move(address);
746 }
747
PrintCmdOutput(const string & cmd,CNcbiOstream & output_stream,CNetService::ECmdOutputStyle output_style,CNetService::EIterationMode iteration_mode)748 void CNetService::PrintCmdOutput(const string& cmd,
749 CNcbiOstream& output_stream, CNetService::ECmdOutputStyle output_style,
750 CNetService::EIterationMode iteration_mode)
751 {
752 bool load_balanced = IsLoadBalanced() ?
753 output_style != eMultilineOutput_NoHeaders : false;
754
755 for (CNetServiceIterator it = Iterate(iteration_mode); it; ++it) {
756 if (load_balanced)
757 output_stream << '[' << (*it).GetServerAddress() << ']' << endl;
758
759 switch (output_style) {
760 case eSingleLineOutput:
761 output_stream << (*it).ExecWithRetry(cmd, false).response << endl;
762 break;
763
764 case eUrlEncodedOutput:
765 {
766 CUrlArgs url_parser((*it).ExecWithRetry(cmd, false).response);
767
768 ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
769 output_stream << field->name <<
770 ": " << field->value << endl;
771 }
772 }
773 break;
774
775 default:
776 {
777 CNetServerMultilineCmdOutput output(
778 (*it).ExecWithRetry(cmd, true));
779
780 if (output_style == eMultilineOutput_NetCacheStyle)
781 output->SetNetCacheCompatMode();
782
783 string line;
784
785 while (output.ReadLine(line))
786 output_stream << line << endl;
787 }
788 }
789
790 if (load_balanced)
791 output_stream << endl;
792 }
793 }
794
FindOrCreateServerImpl(SSocketAddress server_address)795 SNetServerInPool* SNetServerPoolImpl::FindOrCreateServerImpl(
796 SSocketAddress server_address)
797 {
798 pair<TNetServerByAddress::iterator, bool> loc(m_Servers.insert(
799 TNetServerByAddress::value_type(server_address,
800 (SNetServerInPool*) NULL)));
801
802 if (!loc.second)
803 return loc.first->second;
804
805 auto* server = new SNetServerInPool(move(server_address), m_PropCreator(), m_ThrottleParams);
806
807 loc.first->second = server;
808
809 return server;
810 }
811
ReturnServer(SNetServerInPool * server_impl)812 CRef<SNetServerInPool> SNetServerPoolImpl::ReturnServer(
813 SNetServerInPool* server_impl)
814 {
815 CFastMutexGuard server_mutex_lock(m_ServerMutex);
816
817 server_impl->m_ServerPool = this;
818 return CRef<SNetServerInPool>(server_impl);
819 }
820
GetServer(SNetServiceImpl * service,SSocketAddress server_address)821 CNetServer SNetServerPoolImpl::GetServer(SNetServiceImpl* service, SSocketAddress server_address)
822 {
823 CFastMutexGuard server_mutex_lock(m_ServerMutex);
824
825 auto* server = FindOrCreateServerImpl(m_EnforcedServer.host == 0 ? move(server_address) : m_EnforcedServer);
826 server->m_ServerPool = this;
827
828 return new SNetServerImpl(service, server);
829 }
830
GetServer(SSocketAddress server_address)831 CNetServer SNetServiceImpl::GetServer(SSocketAddress server_address)
832 {
833 m_RebalanceStrategy.OnResourceRequested();
834 return m_ServerPool->GetServer(this, move(server_address));
835 }
836
GetServer(const string & host,unsigned short port)837 CNetServer CNetService::GetServer(const string& host,
838 unsigned short port)
839 {
840 return m_Impl->GetServer(SSocketAddress(host, port));
841 }
842
GetServer(unsigned host,unsigned short port)843 CNetServer CNetService::GetServer(unsigned host, unsigned short port)
844 {
845 return m_Impl->GetServer(SSocketAddress(host, port));
846 }
847
848 class SRandomServiceTraversal : public IServiceTraversal
849 {
850 public:
SRandomServiceTraversal(CNetService::TInstance service)851 SRandomServiceTraversal(CNetService::TInstance service) :
852 m_Service(service)
853 {
854 }
855
856 virtual CNetServer BeginIteration();
857 virtual CNetServer NextServer();
858
859 private:
860 CNetService m_Service;
861 CNetServiceIterator m_Iterator;
862 };
863
BeginIteration()864 CNetServer SRandomServiceTraversal::BeginIteration()
865 {
866 return *(m_Iterator = m_Service.Iterate(CNetService::eRandomize));
867 }
868
NextServer()869 CNetServer SRandomServiceTraversal::NextServer()
870 {
871 return ++m_Iterator ? *m_Iterator : CNetServer();
872 }
873
FindServerAndExec(const string & cmd,bool multiline_output)874 CNetServer::SExecResult SNetServiceImpl::FindServerAndExec(const string& cmd,
875 bool multiline_output)
876 {
877 switch (m_ServiceType) {
878 default: // eServiceNotDefined
879 NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
880 m_APIName << ": service name is not set");
881
882 case eLoadBalancedService:
883 {
884 CNetServer::SExecResult exec_result;
885
886 SRandomServiceTraversal random_traversal(this);
887
888 IterateUntilExecOK(cmd, multiline_output,
889 exec_result, &random_traversal,
890 SNetServiceImpl::eIgnoreServerErrors);
891
892 return exec_result;
893 }
894
895 case eSingleServerService:
896 {
897 CNetServer server(new SNetServerImpl(this,
898 m_ServerPool->ReturnServer(
899 m_DiscoveredServers->m_Servers.front().first)));
900
901 return server.ExecWithRetry(cmd, multiline_output);
902 }
903 }
904 }
905
FindServerAndExec(const string & cmd,bool multiline_output)906 CNetServer::SExecResult CNetService::FindServerAndExec(const string& cmd,
907 bool multiline_output)
908 {
909 return m_Impl->FindServerAndExec(cmd, multiline_output);
910 }
911
ExecOnAllServers(const string & cmd)912 void CNetService::ExecOnAllServers(const string& cmd)
913 {
914 for (CNetServiceIterator it = Iterate(eIncludePenalized); it; ++it)
915 (*it).ExecWithRetry(cmd, false);
916 }
917
DiscoverServersIfNeeded()918 void SNetServiceImpl::DiscoverServersIfNeeded()
919 {
920 if (m_ServiceType == eServiceNotDefined) {
921 NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
922 m_APIName << ": service name is not set");
923 }
924
925 if (m_ServiceType == eLoadBalancedService) {
926 // The service is load-balanced, check if rebalancing is required.
927 m_RebalanceStrategy.OnResourceRequested();
928 if (m_RebalanceStrategy.NeedRebalance())
929 ++m_LatestDiscoveryIteration;
930
931 if (m_DiscoveredServers == NULL ||
932 m_DiscoveredServers->m_DiscoveryIteration !=
933 m_LatestDiscoveryIteration) {
934 // The requested server group either does not exist or
935 // does not contain up-to-date server list, thus it needs
936 // to be created anew.
937
938 const TSERV_Type types = fSERV_Standalone | fSERV_IncludeStandby |
939 fSERV_IncludeReserved | fSERV_IncludeSuppressed;
940
941 auto discovered = CServiceDiscovery::DiscoverImpl(m_ServiceName, types, m_NetInfo, m_ServerPool->m_LBSMAffinity,
942 TServConn_MaxFineLBNameRetries::GetDefault(), m_ConnectionRetryDelay);
943
944 SDiscoveredServers* server_group = m_DiscoveredServers;
945
946 if (server_group != NULL && !server_group->m_Service)
947 server_group->Reset(m_LatestDiscoveryIteration);
948 else
949 // Either the group does not exist or it has been
950 // "issued" to the outside callers; allocate a new one.
951 server_group = m_DiscoveredServers =
952 AllocServerGroup(m_LatestDiscoveryIteration);
953
954 CFastMutexGuard server_mutex_lock(m_ServerPool->m_ServerMutex);
955
956 TNetServerList& servers = server_group->m_Servers;
957 TNetServerList::size_type number_of_regular_servers = 0;
958 TNetServerList::size_type number_of_standby_servers = 0;
959 double max_standby_rate = LBSMD_PENALIZED_RATE_BOUNDARY;
960
961 // Fill the 'servers' array in accordance with the layout
962 // described above the SDiscoveredServers::m_Servers declaration.
963 for (const auto& d : discovered) {
964 SNetServerInPool* server = m_ServerPool->FindOrCreateServerImpl(d.first);
965 server->m_ThrottleStats.Discover();
966
967 TServerRate server_rate(server, d.second);
968
969 if (d.second > 0)
970 servers.insert(servers.begin() +
971 number_of_regular_servers++, server_rate);
972 else if (d.second < max_standby_rate ||
973 d.second <= LBSMD_PENALIZED_RATE_BOUNDARY)
974 servers.push_back(server_rate);
975 else {
976 servers.insert(servers.begin() +
977 number_of_regular_servers, server_rate);
978 if (d.second == max_standby_rate)
979 ++number_of_standby_servers;
980 else {
981 max_standby_rate = d.second;
982 number_of_standby_servers = 1;
983 }
984 }
985 }
986
987 server_group->m_SuppressedBegin = servers.begin() +
988 (number_of_regular_servers > 0 ?
989 number_of_regular_servers : number_of_standby_servers);
990
991 server_mutex_lock.Release();
992 }
993 }
994 }
995
GetDiscoveredServers(CRef<SDiscoveredServers> & discovered_servers)996 void SNetServiceImpl::GetDiscoveredServers(
997 CRef<SDiscoveredServers>& discovered_servers)
998 {
999 CFastMutexGuard discovery_mutex_lock(m_DiscoveryMutex);
1000 DiscoverServersIfNeeded();
1001 discovered_servers = m_DiscoveredServers;
1002 discovered_servers->m_Service = this;
1003 }
1004
IsInService(CNetServer::TInstance server)1005 bool SNetServiceImpl::IsInService(CNetServer::TInstance server)
1006 {
1007 CRef<SDiscoveredServers> servers;
1008 GetDiscoveredServers(servers);
1009
1010 // Find the requested server among the discovered servers.
1011 ITERATE(TNetServerList, it, servers->m_Servers) {
1012 if (it->first == server->m_ServerInPool)
1013 return true;
1014 }
1015
1016 return false;
1017 }
1018
1019 struct SFailOnlyWarnings : deque<pair<string, CNetServer>>
1020 {
SFailOnlyWarningsSFailOnlyWarnings1021 SFailOnlyWarnings(CRef<INetServerConnectionListener> listener) : m_Listener(listener) {}
~SFailOnlyWarningsSFailOnlyWarnings1022 ~SFailOnlyWarnings() { IssueAndClear(); }
1023
IssueAndClearSFailOnlyWarnings1024 void IssueAndClear()
1025 {
1026 for (auto& w : *this) {
1027 m_Listener->OnWarning(w.first, w.second);
1028 }
1029
1030 clear();
1031 }
1032
1033 private:
1034 CRef<INetServerConnectionListener> m_Listener;
1035 };
1036
IterateUntilExecOK(const string & cmd,bool multiline_output,CNetServer::SExecResult & exec_result,IServiceTraversal * service_traversal,SNetServiceImpl::EServerErrorHandling error_handling)1037 void SNetServiceImpl::IterateUntilExecOK(const string& cmd,
1038 bool multiline_output,
1039 CNetServer::SExecResult& exec_result,
1040 IServiceTraversal* service_traversal,
1041 SNetServiceImpl::EServerErrorHandling error_handling)
1042 {
1043 int retry_count = m_ConnectionMaxRetries;
1044
1045 const unsigned long retry_delay = m_ConnectionRetryDelay;
1046
1047 const CTimeout& max_total_time = m_ServerPool->m_MaxTotalTime;
1048 CDeadline deadline(max_total_time);
1049
1050 enum EIterationMode {
1051 eInitialIteration,
1052 eRetry
1053 } iteration_mode = eInitialIteration;
1054 CNetServer server = service_traversal->BeginIteration();
1055
1056 vector<CNetServer> servers_to_retry;
1057 unsigned current_server = 0;
1058
1059 bool skip_server;
1060
1061 unsigned number_of_servers = 0;
1062 unsigned ns_with_submits_disabled = 0;
1063 unsigned servers_throttled = 0;
1064 bool blob_not_found = false;
1065
1066 const auto& fst = m_ServerPool->m_FirstServerTimeout;
1067 const bool use_fst = (fst.sec || fst.usec) && (retry_count > 0 || m_UseSmartRetries);
1068 const STimeout* timeout = use_fst ? &fst : nullptr;
1069
1070 SFailOnlyWarnings fail_only_warnings(m_Listener);
1071
1072 for (;;) {
1073 skip_server = false;
1074
1075 try {
1076 server->ConnectAndExec(cmd, multiline_output, exec_result,
1077 timeout);
1078 fail_only_warnings.clear();
1079 return;
1080 }
1081 catch (CNetCacheBlobTooOldException& /*ex rethrown*/) {
1082 throw;
1083 }
1084 catch (CNetCacheException& ex) {
1085 if (retry_count <= 0 && !m_UseSmartRetries)
1086 throw;
1087 if (error_handling == eRethrowAllServerErrors)
1088 throw;
1089 if (ex.GetErrCode() == CNetCacheException::eBlobNotFound) {
1090 blob_not_found = true;
1091 skip_server = true;
1092 } else if (error_handling == eRethrowServerErrors)
1093 throw;
1094 else
1095 m_Listener->OnWarning(ex.GetMsg(), server);
1096 }
1097 catch (CNetScheduleException& ex) {
1098 if (retry_count <= 0 && !m_UseSmartRetries)
1099 throw;
1100 if (error_handling == eRethrowAllServerErrors)
1101 throw;
1102 if (ex.GetErrCode() == CNetScheduleException::eSubmitsDisabled) {
1103 ++ns_with_submits_disabled;
1104 skip_server = true;
1105 fail_only_warnings.emplace_back(ex.GetMsg(), server);
1106 } else if (error_handling == eRethrowServerErrors)
1107 throw;
1108 else
1109 m_Listener->OnWarning(ex.GetMsg(), server);
1110 }
1111 catch (CNetSrvConnException& ex) {
1112 if (retry_count <= 0 && !m_UseSmartRetries)
1113 throw;
1114 switch (ex.GetErrCode()) {
1115 case CNetSrvConnException::eReadTimeout:
1116 case CNetSrvConnException::eConnectionFailure:
1117 m_Listener->OnWarning(ex.GetMsg(), server);
1118 break;
1119
1120 case CNetSrvConnException::eServerThrottle:
1121 ++servers_throttled;
1122 fail_only_warnings.emplace_back(ex.GetMsg(), server);
1123 break;
1124
1125 default:
1126 throw;
1127 }
1128 }
1129
1130 ++number_of_servers;
1131
1132 if (iteration_mode == eInitialIteration) {
1133 if (!skip_server)
1134 servers_to_retry.push_back(server);
1135 server = service_traversal->NextServer();
1136 } else {
1137 if (!skip_server)
1138 ++current_server;
1139 else
1140 servers_to_retry.erase(servers_to_retry.begin() +
1141 current_server);
1142
1143 if (current_server < servers_to_retry.size())
1144 server = servers_to_retry[current_server];
1145 else
1146 server = NULL;
1147 }
1148
1149 if (!blob_not_found && !deadline.IsInfinite() &&
1150 deadline.GetRemainingTime().GetAsMilliSeconds() <= (server ? 0 : retry_delay)) {
1151 NCBI_THROW_FMT(CNetSrvConnException, eReadTimeout, "Exceeded max_connection_time=" <<
1152 max_total_time.GetAsMilliSeconds() << "; cmd=[" << cmd << "]");
1153 }
1154
1155 if (!server) {
1156 if (number_of_servers == ns_with_submits_disabled) {
1157 NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1158 "Cannot execute [" << cmd <<
1159 "]: all NetSchedule servers are "
1160 "in REFUSESUBMITS mode for the " + m_ServiceName + " service.");
1161 }
1162
1163 if (number_of_servers == servers_throttled) {
1164 NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1165 "Cannot execute [" << cmd <<
1166 "]: all servers are throttled for the " + m_ServiceName + " service.");
1167 }
1168
1169 if (retry_count <= 0 || servers_to_retry.empty()) {
1170 if (blob_not_found) {
1171 NCBI_THROW_FMT(CNetCacheException, eBlobNotFound,
1172 "Cannot execute [" << cmd << "]: blob not found.");
1173 }
1174 NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1175 "Unable to execute [" << cmd <<
1176 "] on any of the discovered servers for the " + m_ServiceName + " service.");
1177 }
1178
1179 fail_only_warnings.IssueAndClear();
1180 ERR_POST(Warning << "Unable to send [" << cmd << "] to any "
1181 "of the discovered servers; will retry after delay.");
1182
1183 SleepMilliSec(retry_delay);
1184
1185 number_of_servers = 0;
1186 ns_with_submits_disabled = 0;
1187 servers_throttled = 0;
1188
1189 iteration_mode = eRetry;
1190 server = servers_to_retry[current_server = 0];
1191 }
1192
1193 --retry_count;
1194
1195 timeout = NULL;
1196 }
1197 }
1198
CreateRetryGuard(SRetry::EType type)1199 shared_ptr<void> SNetServiceImpl::CreateRetryGuard(SRetry::EType type)
1200 {
1201 switch (type)
1202 {
1203 case SRetry::eNoRetryNoErrors: return make_shared<SNoRetryNoErrors>(this);
1204 case SRetry::eNoRetry: return make_shared<SNoRetry>(this);
1205 default: return nullptr;
1206 }
1207 }
1208
ResetServerConnections()1209 void SNetServerPoolImpl::ResetServerConnections()
1210 {
1211 CFastMutexGuard server_mutex_lock(m_ServerMutex);
1212
1213 NON_CONST_ITERATE(TNetServerByAddress, it, m_Servers) {
1214 it->second->m_CurrentConnectionGeneration.Add(1);
1215 }
1216 }
1217
~SNetServerPoolImpl()1218 SNetServerPoolImpl::~SNetServerPoolImpl()
1219 {
1220 // Clean up m_Servers
1221 NON_CONST_ITERATE(TNetServerByAddress, it, m_Servers) {
1222 delete it->second;
1223 }
1224
1225 if (m_LBSMAffinity.second) free(const_cast<char*>(m_LBSMAffinity.second));
1226 }
1227
~SNetServiceImpl()1228 SNetServiceImpl::~SNetServiceImpl()
1229 {
1230 delete m_DiscoveredServers;
1231
1232 // Clean up m_ServerGroupPool
1233 SDiscoveredServers* server_group = m_ServerGroupPool;
1234 while (server_group != NULL) {
1235 SDiscoveredServers* next_group = server_group->m_NextGroupInPool;
1236 delete server_group;
1237 server_group = next_group;
1238 }
1239 }
1240
SetCommunicationTimeout(const STimeout & to)1241 void CNetServerPool::SetCommunicationTimeout(const STimeout& to)
1242 {
1243 m_Impl->m_CommTimeout = to;
1244 }
GetCommunicationTimeout() const1245 const STimeout& CNetServerPool::GetCommunicationTimeout() const
1246 {
1247 return m_Impl->m_CommTimeout;
1248 }
1249
Iterate(CNetService::EIterationMode mode)1250 CNetServiceIterator CNetService::Iterate(CNetService::EIterationMode mode)
1251 {
1252 CRef<SDiscoveredServers> servers;
1253 m_Impl->GetDiscoveredServers(servers);
1254
1255 if (mode != eIncludePenalized) {
1256 if (servers->m_Servers.begin() < servers->m_SuppressedBegin) {
1257 if (mode == eSortByLoad)
1258 return new SNetServiceIterator_OmitPenalized(servers);
1259 else if (mode == eRoundRobin) {
1260 auto begin = servers->m_Servers.begin() + m_Impl->m_RoundRobin++ % servers->m_Servers.size();
1261 return new SNetServiceIterator_Circular(servers, begin);
1262 } else
1263 return new SNetServiceIterator_RandomPivot(servers);
1264 }
1265 } else
1266 if (!servers->m_Servers.empty())
1267 return new SNetServiceIteratorImpl(servers);
1268
1269 NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1270 "Couldn't find any available servers for the " +
1271 m_Impl->m_ServiceName + " service.");
1272 }
1273
Iterate(CNetServer::TInstance priority_server)1274 CNetServiceIterator CNetService::Iterate(CNetServer::TInstance priority_server)
1275 {
1276 return m_Impl->Iterate(priority_server);
1277 }
1278
Iterate(CNetServer::TInstance priority_server)1279 SNetServiceIteratorImpl* SNetServiceImpl::Iterate(CNetServer::TInstance priority_server)
1280 {
1281 CRef<SDiscoveredServers> servers;
1282 GetDiscoveredServers(servers);
1283
1284 // Find the requested server among the discovered servers.
1285 ITERATE(TNetServerList, it, servers->m_Servers) {
1286 if (it->first == priority_server->m_ServerInPool)
1287 return new SNetServiceIterator_Circular(servers, it);
1288 }
1289
1290 if (!servers->m_Servers.empty())
1291 // The requested server is not found in this service,
1292 // however there are servers, so return them.
1293 return new SNetServiceIteratorImpl(servers);
1294
1295 NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1296 "Couldn't find any available servers for the " +
1297 m_ServiceName + " service.");
1298 }
1299
IterateByWeight(const string & key)1300 CNetServiceIterator CNetService::IterateByWeight(const string& key)
1301 {
1302 CRef<SDiscoveredServers> servers;
1303 m_Impl->GetDiscoveredServers(servers);
1304
1305 if (servers->m_Servers.begin() == servers->m_SuppressedBegin) {
1306 NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1307 "Couldn't find any available servers for the " +
1308 m_Impl->m_ServiceName + " service.");
1309 }
1310
1311 CChecksum key_crc32(CChecksum::eCRC32);
1312
1313 key_crc32.AddChars(key.data(), key.length());
1314
1315 return new SNetServiceIterator_Weighted(servers, key_crc32.GetChecksum());
1316 }
1317
ExcludeServer(CNetServer::TInstance server)1318 CNetServiceIterator CNetService::ExcludeServer(CNetServer::TInstance server)
1319 {
1320 CRef<SDiscoveredServers> servers;
1321 m_Impl->GetDiscoveredServers(servers);
1322
1323 // Find the requested server among the discovered servers.
1324 ITERATE(TNetServerList, it, servers->m_Servers) {
1325 if (it->first == server->m_ServerInPool) {
1326 // The server is found. Make an iterator and
1327 // skip to the next server (the iterator may become NULL).
1328 CNetServiceIterator circular_iter(
1329 new SNetServiceIterator_Circular(servers, it));
1330 return ++circular_iter;
1331 }
1332 }
1333
1334 // The requested server is not found in this service, so
1335 // return the rest of servers or NULL if there are none.
1336 return !servers->m_Servers.empty() ?
1337 new SNetServiceIteratorImpl(servers) : NULL;
1338 }
1339
FindServer(INetServerFinder * finder,CNetService::EIterationMode mode)1340 CNetServiceIterator CNetService::FindServer(INetServerFinder* finder,
1341 CNetService::EIterationMode mode)
1342 {
1343 string error_messages;
1344
1345 CNetServiceIterator it = Iterate(mode);
1346
1347 for (; it; ++it) {
1348 try {
1349 if (finder->Consider(*it))
1350 break;
1351 }
1352 catch (CNetServiceException& ex) {
1353 if (ex.GetErrCode() != CNetServiceException::eCommunicationError)
1354 throw;
1355
1356 if (!error_messages.empty())
1357 error_messages += '\n';
1358
1359 error_messages += (*it)->m_ServerInPool->m_Address.AsString();
1360 error_messages += ": ";
1361 error_messages += ex.what();
1362 }
1363 catch (CIO_Exception& ex) {
1364 if (!error_messages.empty())
1365 error_messages += '\n';
1366
1367 error_messages += (*it)->m_ServerInPool->m_Address.AsString();
1368 error_messages += ": ";
1369 error_messages += ex.what();
1370 }
1371 }
1372
1373 if (!error_messages.empty()) {
1374 LOG_POST(error_messages);
1375 }
1376
1377 return it;
1378 }
1379
Clone(const string & name)1380 CNetService CNetService::Clone(const string& name)
1381 {
1382 _ASSERT(m_Impl);
1383 return name == m_Impl->m_ServiceName ? m_Impl :
1384 SNetServiceImpl::Clone(name, m_Impl);
1385 }
1386
SetErrorHandler(TEventHandler error_handler)1387 void CNetService::SetErrorHandler(TEventHandler error_handler)
1388 {
1389 m_Impl->m_Listener->SetErrorHandler(error_handler);
1390 }
1391
SetWarningHandler(TEventHandler warning_handler)1392 void CNetService::SetWarningHandler(TEventHandler warning_handler)
1393 {
1394 m_Impl->m_Listener->SetWarningHandler(warning_handler);
1395 }
1396
GetServiceByName(const string & service_name,SNetServiceImpl * prototype)1397 CNetService SNetServiceMap::GetServiceByName(const string& service_name,
1398 SNetServiceImpl* prototype)
1399 {
1400 CFastMutexGuard guard(m_ServiceMapMutex);
1401 return GetServiceByNameImpl(service_name, prototype);
1402 }
1403
GetServiceByNameImpl(const string & service_name,SNetServiceImpl * prototype)1404 CNetService SNetServiceMap::GetServiceByNameImpl(const string& service_name,
1405 SNetServiceImpl* prototype)
1406 {
1407 pair<TNetServiceByName::iterator, bool> loc(m_ServiceByName.insert(
1408 TNetServiceByName::value_type(service_name, CNetService())));
1409
1410 return !loc.second ? loc.first->second :
1411 (loc.first->second =
1412 SNetServiceImpl::Clone(service_name, prototype));
1413 }
1414
IsAllowed(const string & service_name) const1415 bool SNetServiceMap::IsAllowed(const string& service_name) const
1416 {
1417 // Not restricted or found
1418 return !m_Restricted || m_Allowed.count(service_name);
1419 }
1420
IsAllowed(CNetServer::TInstance server,SNetServiceImpl * prototype)1421 bool SNetServiceMap::IsAllowed(CNetServer::TInstance server,
1422 SNetServiceImpl* prototype)
1423 {
1424 if (!m_Restricted) return true;
1425
1426 CFastMutexGuard guard(m_ServiceMapMutex);
1427
1428 // Check if server belongs to some allowed service
1429 for (auto& service_name: m_Allowed) {
1430 CNetService service(GetServiceByNameImpl(service_name, prototype));
1431
1432 if (service->IsInService(server)) return true;
1433 }
1434
1435 return false;
1436 }
1437
AddToAllowed(const string & service_name)1438 void SNetServiceMap::AddToAllowed(const string& service_name)
1439 {
1440 m_Allowed.insert(service_name);
1441 }
1442
g_ExecToJson(IExecToJson & exec_to_json,CNetService service,CNetService::EIterationMode iteration_mode)1443 CJsonNode g_ExecToJson(IExecToJson& exec_to_json, CNetService service,
1444 CNetService::EIterationMode iteration_mode)
1445 {
1446 if (!service.IsLoadBalanced())
1447 return exec_to_json.ExecOn(service.Iterate().GetServer());
1448
1449 CJsonNode result(CJsonNode::NewObjectNode());
1450
1451 for (CNetServiceIterator it = service.Iterate(iteration_mode); it; ++it)
1452 result.SetByKey((*it).GetServerAddress(), exec_to_json.ExecOn(*it));
1453
1454 return result;
1455 }
1456
Create(const string & api_name,const string & service_name,const string & client_name)1457 CNetService CNetService::Create(const string& api_name, const string& service_name, const string& client_name)
1458 {
1459 struct SNoOpConnectionListener : public INetServerConnectionListener
1460 {
1461 INetServerConnectionListener* Clone() override { return new SNoOpConnectionListener(*this); }
1462 void OnConnected(CNetServerConnection&) override {}
1463
1464 private:
1465 void OnErrorImpl(const string&, CNetServer&) override {}
1466 void OnWarningImpl(const string&, CNetServer&) override {}
1467 };
1468
1469 CSynRegistryBuilder registry_builder;
1470 SRegSynonyms sections(api_name);
1471
1472 return SNetServiceImpl::Create(api_name, service_name, client_name, new SNoOpConnectionListener,
1473 registry_builder, sections);
1474 }
1475
g_AppendClientIPSessionIDHitID(string & cmd)1476 void g_AppendClientIPSessionIDHitID(string& cmd)
1477 {
1478 CRequestContext& req = CDiagContext::GetRequestContext();
1479 g_AppendClientIPAndSessionID(cmd, req);
1480 g_AppendHitID(cmd, req);
1481 }
1482
1483 END_NCBI_SCOPE
1484