1 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__PSG_CLIENT_TRANSPORT__HPP 2 #define OBJTOOLS__PUBSEQ_GATEWAY__PSG_CLIENT_TRANSPORT__HPP 3 4 /* $Id: psg_client_transport.hpp 628921 2021-04-07 18:46:41Z ivanov $ 5 * =========================================================================== 6 * 7 * PUBLIC DOMAIN NOTICE 8 * National Center for Biotechnology Information 9 * 10 * This software/database is a "United States Government Work" under the 11 * terms of the United States Copyright Act. It was written as part of 12 * the author's official duties as a United States Government employee and 13 * thus cannot be copyrighted. This software/database is freely available 14 * to the public for use. The National Library of Medicine and the U.S. 15 * Government have not placed any restriction on its use or reproduction. 16 * 17 * Although all reasonable efforts have been taken to ensure the accuracy 18 * and reliability of the software and data, the NLM and the U.S. 19 * Government do not and cannot warrant the performance or results that 20 * may be obtained by using this software or data. The NLM and the U.S. 21 * Government disclaim all warranties, express or implied, including 22 * warranties of performance, merchantability or fitness for any particular 23 * purpose. 24 * 25 * Please cite the author in any work or product based on this material. 26 * 27 * =========================================================================== 28 * 29 * Authors: Dmitri Dmitrienko, Rafael Sadyrov 30 * 31 */ 32 33 #include <objtools/pubseq_gateway/client/impl/misc.hpp> 34 #include <objtools/pubseq_gateway/client/psg_client.hpp> 35 36 #ifdef HAVE_PSG_CLIENT 37 38 #define __STDC_FORMAT_MACROS 39 40 #include <array> 41 #include <memory> 42 #include <string> 43 #include <cassert> 44 #include <ostream> 45 #include <iostream> 46 #include <atomic> 47 #include <functional> 48 #include <limits> 49 #include <unordered_map> 50 #include <cstdint> 51 #include <set> 52 #include <thread> 53 #include <utility> 54 #include <condition_variable> 55 #include <forward_list> 56 #include <chrono> 57 #include <sstream> 58 #include <random> 59 #include <type_traits> 60 61 #include "mpmc_nw.hpp" 62 #include <connect/impl/ncbi_uv_nghttp2.hpp> 63 #include <connect/services/netservice_api.hpp> 64 #include <corelib/ncbi_param.hpp> 65 #include <corelib/ncbi_url.hpp> 66 67 68 BEGIN_NCBI_SCOPE 69 70 // Different TRACE macros allow turning off tracing in some classes 71 #define PSG_THROTTLING_TRACE(message) _TRACE(message) 72 #define PSG_IO_SESSION_TRACE(message) _TRACE(message) 73 #define PSG_IO_TRACE(message) _TRACE(message) 74 #define PSG_DISCOVERY_TRACE(message) _TRACE(message) 75 76 struct SPSG_Args : CUrlArgs 77 { 78 using CUrlArgs::CUrlArgs; 79 using CUrlArgs::operator=; 80 GetValueSPSG_Args81 const string& GetValue(const string& name) const 82 { 83 bool not_used; 84 return CUrlArgs::GetValue(name, ¬_used); 85 } 86 }; 87 88 template <typename TValue> 89 struct SPSG_Nullable : protected CNullable<TValue> 90 { 91 template <template<typename> class TCmp> CmpSPSG_Nullable92 bool Cmp(TValue s) const { return !CNullable<TValue>::IsNull() && TCmp<TValue>()(*this, s); } 93 94 using CNullable<TValue>::operator TValue; 95 using CNullable<TValue>::operator=; 96 }; 97 98 struct SPSG_Chunk : string 99 { 100 using string::string; 101 102 SPSG_Chunk() = default; 103 104 SPSG_Chunk(SPSG_Chunk&&) = default; 105 SPSG_Chunk& operator=(SPSG_Chunk&&) = default; 106 107 SPSG_Chunk(const SPSG_Chunk&) = delete; 108 SPSG_Chunk& operator=(const SPSG_Chunk&) = delete; 109 }; 110 111 struct SPSG_Params 112 { 113 TPSG_DebugPrintout debug_printout; 114 TPSG_RequestsPerIo requests_per_io; 115 TPSG_RequestRetries request_retries; 116 TPSG_PsgClientMode client_mode; 117 SPSG_ParamsSPSG_Params118 SPSG_Params() : 119 debug_printout(TPSG_DebugPrintout::eGetDefault), 120 requests_per_io(TPSG_RequestsPerIo::eGetDefault), 121 request_retries(TPSG_RequestRetries::eGetDefault), 122 client_mode(TPSG_PsgClientMode::eGetDefault) 123 {} 124 }; 125 126 struct SDebugPrintout 127 { 128 const string id; 129 SDebugPrintoutSDebugPrintout130 SDebugPrintout(string i, const SPSG_Params& params) : 131 id(move(i)), 132 m_Params(params) 133 { 134 if (IsPerf()) m_Events.reserve(20); 135 } 136 137 ~SDebugPrintout(); 138 139 template <class TArg, class ...TRest> 140 struct SPack; 141 142 template <class TArg> operator <<SDebugPrintout143 SPack<TArg> operator<<(const TArg& arg) { return { this, &arg }; } 144 145 private: 146 template <class ...TArgs> ProcessSDebugPrintout147 void Process(TArgs&&... args) 148 { 149 if (IsPerf()) return Event(forward<TArgs>(args)...); 150 151 if (m_Params.debug_printout == EPSG_DebugPrintout::eNone) return; 152 153 Print(forward<TArgs>(args)...); 154 } 155 156 enum EType { eSend = 1000, eReceive, eClose, eRetry, eFail }; 157 EventSDebugPrintout158 void Event(SSocketAddress, const string&) { Event(eSend); } EventSDebugPrintout159 void Event(const SPSG_Args&, const SPSG_Chunk&) { Event(eReceive); } EventSDebugPrintout160 void Event(uint32_t) { Event(eClose); } EventSDebugPrintout161 void Event(unsigned, const SUvNgHttp2_Error&) { Event(eRetry); } EventSDebugPrintout162 void Event(const SUvNgHttp2_Error&) { Event(eFail); } 163 EventSDebugPrintout164 void Event(EType type) 165 { 166 auto ms = chrono::duration<double, milli>(chrono::steady_clock::now().time_since_epoch()).count(); 167 auto thread_id = this_thread::get_id(); 168 m_Events.emplace_back(ms, type, thread_id); 169 } 170 171 void Print(SSocketAddress address, const string& path); 172 void Print(const SPSG_Args& args, const SPSG_Chunk& chunk); 173 void Print(uint32_t error_code); 174 void Print(unsigned retries, const SUvNgHttp2_Error& error); 175 void Print(const SUvNgHttp2_Error& error); 176 IsPerfSDebugPrintout177 bool IsPerf() const 178 { 179 return (m_Params.client_mode == EPSG_PsgClientMode::ePerformance) || 180 (m_Params.client_mode == EPSG_PsgClientMode::eIo); 181 } 182 183 SPSG_Params m_Params; 184 vector<tuple<double, EType, thread::id>> m_Events; 185 }; 186 187 template <class TArg, class ...TRest> 188 struct SDebugPrintout::SPack : SPack<TRest...> 189 { SPackSDebugPrintout::SPack190 SPack(SPack<TRest...>&& base, const TArg* arg) : 191 SPack<TRest...>(move(base)), 192 m_Arg(arg) 193 {} 194 195 template <class TNextArg> operator <<SDebugPrintout::SPack196 SPack<TNextArg, TArg, TRest...> operator<<(const TNextArg& next_arg) 197 { 198 return { move(*this), &next_arg }; 199 } 200 operator <<SDebugPrintout::SPack201 void operator<<(ostream& (*)(ostream&)) { Process(); } 202 203 protected: 204 template <class ...TArgs> ProcessSDebugPrintout::SPack205 void Process(TArgs&&... args) 206 { 207 SPack<TRest...>::Process(*m_Arg, forward<TArgs>(args)...); 208 } 209 210 private: 211 const TArg* m_Arg; 212 }; 213 214 template <class TArg> 215 struct SDebugPrintout::SPack<TArg> 216 { SPackSDebugPrintout::SPack217 SPack(SDebugPrintout* debug_printout, const TArg* arg) : 218 m_DebugPrintout(debug_printout), 219 m_Arg(arg) 220 {} 221 222 template <class TNextArg> operator <<SDebugPrintout::SPack223 SPack<TNextArg, TArg> operator<<(const TNextArg& next_arg) 224 { 225 return { move(*this), &next_arg }; 226 } 227 operator <<SDebugPrintout::SPack228 void operator<<(ostream& (*)(ostream&)) { Process(); } 229 230 protected: 231 template <class ...TArgs> ProcessSDebugPrintout::SPack232 void Process(TArgs&&... args) 233 { 234 m_DebugPrintout->Process(*m_Arg, forward<TArgs>(args)...); 235 } 236 237 private: 238 SDebugPrintout* m_DebugPrintout; 239 const TArg* m_Arg; 240 }; 241 242 struct SPSG_Reply 243 { 244 struct SState 245 { 246 enum EState { 247 eInProgress, 248 eSuccess, 249 eNotFound, 250 eError, 251 }; 252 253 SPSG_CV<0> change; 254 SStateSPSG_Reply::SState255 SState() : m_State(eInProgress), m_Returned(false), m_Empty(true) {} 256 GetStateSPSG_Reply::SState257 const volatile atomic<EState>& GetState() const volatile { return m_State; } 258 string GetError(); 259 InProgressSPSG_Reply::SState260 bool InProgress() const volatile { return m_State == eInProgress; } ReturnedSPSG_Reply::SState261 bool Returned() const volatile { return m_Returned; } EmptySPSG_Reply::SState262 bool Empty() const volatile { return m_Empty; } 263 SetStateSPSG_Reply::SState264 void SetState(EState state) volatile 265 { 266 auto expected = eInProgress; 267 268 if (m_State.compare_exchange_strong(expected, state)) { 269 change.NotifyOne(); 270 } 271 } 272 273 void AddError(string message, EState new_state = eError); SetReturnedSPSG_Reply::SState274 void SetReturned() volatile { bool expected = false; m_Returned.compare_exchange_strong(expected, true); } SetNotEmptySPSG_Reply::SState275 void SetNotEmpty() volatile { bool expected = true; m_Empty.compare_exchange_strong(expected, false); } 276 277 private: 278 atomic<EState> m_State; 279 atomic_bool m_Returned; 280 atomic_bool m_Empty; 281 vector<string> m_Messages; 282 }; 283 284 struct SItem 285 { 286 using TTS = SPSG_CV<0, SThreadSafe<SItem>>; 287 288 vector<SPSG_Chunk> chunks; 289 SPSG_Args args; 290 SPSG_Nullable<size_t> expected; 291 size_t received = 0; 292 SState state; 293 294 void SetSuccess(); 295 }; 296 297 SThreadSafe<list<SItem::TTS>> items; 298 SItem::TTS reply_item; 299 SDebugPrintout debug_printout; 300 SPSG_ReplySPSG_Reply301 SPSG_Reply(string id, const SPSG_Params& params) : debug_printout(move(id), params) {} 302 void SetSuccess(); 303 void AddError(string message); 304 }; 305 306 struct SPSG_Request 307 { 308 const string full_path; 309 shared_ptr<SPSG_Reply> reply; 310 CRef<CRequestContext> context; 311 312 SPSG_Request(string p, shared_ptr<SPSG_Reply> r, CRef<CRequestContext> c, const SPSG_Params& params); 313 OnReplyDataSPSG_Request314 bool OnReplyData(const char* data, size_t len) 315 { 316 while (len) { 317 if (!(this->*m_State)(data, len)) { 318 return false; 319 } 320 } 321 322 return true; 323 } 324 GetRetriesSPSG_Request325 unsigned GetRetries() 326 { 327 return reply->reply_item.GetMTSafe().state.InProgress() && (m_Retries > 0) ? m_Retries-- : 0; 328 } 329 330 private: 331 bool StatePrefix(const char*& data, size_t& len); 332 bool StateArgs(const char*& data, size_t& len); 333 bool StateData(const char*& data, size_t& len); StateIoSPSG_Request334 bool StateIo(const char*& data, size_t& len) { data += len; len = 0; return true; } 335 SetStatePrefixSPSG_Request336 void SetStatePrefix() { Add(); m_State = &SPSG_Request::StatePrefix; } SetStateArgsSPSG_Request337 void SetStateArgs() { m_State = &SPSG_Request::StateArgs; } SetStateDataSPSG_Request338 void SetStateData(size_t dtr) { m_State = &SPSG_Request::StateData; m_Buffer.data_to_read = dtr; } 339 340 void Add(); 341 void AddIo(); 342 343 using TState = bool (SPSG_Request::*)(const char*& data, size_t& len); 344 TState m_State; 345 346 struct SBuffer 347 { 348 size_t prefix_index = 0; 349 string args_buffer; 350 SPSG_Args args; 351 SPSG_Chunk chunk; 352 size_t data_to_read = 0; 353 }; 354 355 SBuffer m_Buffer; 356 unordered_map<string, SPSG_Reply::SItem::TTS*> m_ItemsByID; 357 unsigned m_Retries; 358 }; 359 360 struct SPSG_TimedRequest 361 { SPSG_TimedRequestSPSG_TimedRequest362 SPSG_TimedRequest(shared_ptr<SPSG_Request> r) : m_Request(move(r)) {} 363 operator ->SPSG_TimedRequest364 shared_ptr<SPSG_Request> operator->() { m_Seconds = 0; return m_Request; } operator shared_ptr<SPSG_Request>SPSG_TimedRequest365 operator shared_ptr<SPSG_Request>() { m_Seconds = 0; return m_Request; } AddSecondSPSG_TimedRequest366 unsigned AddSecond() { return m_Seconds++; } 367 368 private: 369 shared_ptr<SPSG_Request> m_Request; 370 unsigned m_Seconds = 0; 371 }; 372 373 struct SPSG_AsyncQueue : SUv_Async 374 { 375 using TRequest = shared_ptr<SPSG_Request>; 376 PopSPSG_AsyncQueue377 bool Pop(TRequest& request) 378 { 379 return m_Queue.PopMove(request); 380 } 381 PushSPSG_AsyncQueue382 bool Push(TRequest&& request) 383 { 384 if (m_Queue.PushMove(request)) { 385 Signal(); 386 return true; 387 } else { 388 return false; 389 } 390 } 391 392 using SUv_Async::Signal; 393 394 private: 395 CMPMCQueue<TRequest> m_Queue; 396 }; 397 398 struct SPSG_ThrottleParams 399 { 400 struct SThreshold 401 { 402 size_t numerator = 0; 403 size_t denominator = 1; 404 constexpr static size_t kMaxDenominator = 128; 405 406 SThreshold(string error_rate); 407 }; 408 409 const volatile uint64_t period; 410 TPSG_ThrottleMaxFailures max_failures; 411 TPSG_ThrottleUntilDiscovery until_discovery; 412 SThreshold threshold; 413 414 SPSG_ThrottleParams(); 415 }; 416 417 struct SPSG_Throttling 418 { 419 SPSG_Throttling(const SSocketAddress& address, SPSG_ThrottleParams p, uv_loop_t* l); 420 ActiveSPSG_Throttling421 bool Active() const { return m_Active != eOff; } AddSuccessSPSG_Throttling422 bool AddSuccess() { return AddResult(true); } AddFailureSPSG_Throttling423 bool AddFailure() { return AddResult(false); } 424 void StartClose(); 425 DiscoveredSPSG_Throttling426 void Discovered() 427 { 428 if (!Configured()) return; 429 430 EThrottling expected = eUntilDiscovery; 431 432 if (m_Active.compare_exchange_strong(expected, eOff)) { 433 ERR_POST(Warning << "Disabling throttling for server " << m_Address.AsString() << " after wait and rediscovery"); 434 } 435 } 436 437 private: 438 struct SStats 439 { 440 SPSG_ThrottleParams params; 441 unsigned failures = 0; 442 pair<bitset<SPSG_ThrottleParams::SThreshold::kMaxDenominator>, size_t> threshold_reg; 443 SStatsSPSG_Throttling::SStats444 SStats(SPSG_ThrottleParams p) : params(p) {} 445 446 bool Adjust(const SSocketAddress& address, bool result); 447 void Reset(); 448 }; 449 enum EThrottling { eOff, eOnTimer, eUntilDiscovery }; 450 ConfiguredSPSG_Throttling451 uint64_t Configured() const { return m_Stats.GetMTSafe().params.period; } AddResultSPSG_Throttling452 bool AddResult(bool result) { return Configured() && (Active() || Adjust(result)); } 453 bool Adjust(bool result); 454 s_OnSignalSPSG_Throttling455 static void s_OnSignal(uv_async_t* handle) 456 { 457 auto that = static_cast<SPSG_Throttling*>(handle->data); 458 that->m_Timer.Start(); 459 } 460 s_OnTimerSPSG_Throttling461 static void s_OnTimer(uv_timer_t* handle) 462 { 463 auto that = static_cast<SPSG_Throttling*>(handle->data); 464 auto new_value = that->m_Stats.GetLock()->params.until_discovery ? eUntilDiscovery : eOff; 465 that->m_Active.store(new_value); 466 467 if (new_value == eOff) { 468 ERR_POST(Warning << "Disabling throttling for server " << that->m_Address.AsString() << " after wait"); 469 } 470 } 471 472 const SSocketAddress& m_Address; 473 SThreadSafe<SStats> m_Stats; 474 atomic<EThrottling> m_Active; 475 SUv_Timer m_Timer; 476 SUv_Async m_Signal; 477 }; 478 479 struct SPSG_Server 480 { 481 const SSocketAddress address; 482 atomic<double> rate; 483 SPSG_Throttling throttling; 484 SPSG_ServerSPSG_Server485 SPSG_Server(SSocketAddress a, double r, SPSG_ThrottleParams p, uv_loop_t* l) : 486 address(move(a)), 487 rate(r), 488 throttling(address, move(p), l) 489 {} 490 }; 491 492 struct SPSG_IoSession : SUvNgHttp2_SessionBase 493 { 494 SPSG_Server& server; 495 496 template <class... TNgHttp2Cbs> 497 SPSG_IoSession(SPSG_Server& s, SPSG_AsyncQueue& queue, uv_loop_t* loop, TNgHttp2Cbs&&... callbacks); 498 499 bool ProcessRequest(shared_ptr<SPSG_Request>& req); 500 void CheckRequestExpiration(); IsFullSPSG_IoSession501 bool IsFull() const { return m_Session.GetMaxStreams() <= m_Requests.size(); } 502 503 protected: 504 int OnData(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len); 505 int OnStreamClose(nghttp2_session* session, int32_t stream_id, uint32_t error_code); 506 int OnHeader(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, 507 const uint8_t* value, size_t valuelen, uint8_t flags); OnErrorSPSG_IoSession508 int OnError(nghttp2_session*, const char* msg, size_t) { Reset(msg); return 0; } 509 510 private: 511 enum EHeaders { eMethod, eScheme, eAuthority, ePath, eUserAgent, eSessionID, eSubHitID, eClientIP, eSize }; 512 513 using TRequests = unordered_map<int32_t, SPSG_TimedRequest>; 514 515 bool Retry(shared_ptr<SPSG_Request> req, const SUvNgHttp2_Error& error); 516 void RequestComplete(TRequests::iterator& it); 517 518 void OnReset(SUvNgHttp2_Error error) override; 519 520 array<SNgHttp2_Header<NGHTTP2_NV_FLAG_NO_COPY_NAME | NGHTTP2_NV_FLAG_NO_COPY_VALUE>, eSize> m_Headers; 521 const TPSG_RequestTimeout m_RequestTimeout; 522 SPSG_AsyncQueue& m_Queue; 523 TRequests m_Requests; 524 }; 525 526 template <class TImpl> 527 struct SPSG_Thread : public TImpl 528 { 529 template <class... TArgs> SPSG_ThreadSPSG_Thread530 SPSG_Thread(SUv_Barrier& barrier, uint64_t timeout, uint64_t repeat, TArgs&&... args) : 531 TImpl(forward<TArgs>(args)...), 532 m_Timer(this, s_OnTimer, timeout, repeat), 533 m_Thread(s_Execute, this, ref(barrier)) 534 {} 535 ~SPSG_ThreadSPSG_Thread536 ~SPSG_Thread() 537 { 538 if (m_Thread.joinable()) { 539 m_Shutdown.Signal(); 540 m_Thread.join(); 541 } 542 } 543 544 private: s_OnShutdownSPSG_Thread545 static void s_OnShutdown(uv_async_t* handle) 546 { 547 SPSG_Thread* io = static_cast<SPSG_Thread*>(handle->data); 548 io->m_Shutdown.Close(); 549 io->m_Timer.Close(); 550 io->TImpl::OnShutdown(handle); 551 } 552 s_OnTimerSPSG_Thread553 static void s_OnTimer(uv_timer_t* handle) 554 { 555 SPSG_Thread* io = static_cast<SPSG_Thread*>(handle->data); 556 io->TImpl::OnTimer(handle); 557 } 558 s_ExecuteSPSG_Thread559 static void s_Execute(SPSG_Thread* io, SUv_Barrier& barrier) 560 { 561 SUv_Loop loop; 562 563 io->TImpl::OnExecute(loop); 564 io->m_Shutdown.Init(io, &loop, s_OnShutdown); 565 io->m_Timer.Init(&loop); 566 io->m_Timer.Start(); 567 568 barrier.Wait(); 569 570 loop.Run(); 571 572 io->TImpl::AfterExecute(); 573 } 574 575 SUv_Async m_Shutdown; 576 SUv_Timer m_Timer; 577 thread m_Thread; 578 }; 579 580 struct SPSG_Servers : protected deque<SPSG_Server> 581 { 582 using TBase = deque<SPSG_Server>; 583 using TTS = SThreadSafe<SPSG_Servers>; 584 585 // Only methods that do not use/change size can be used directly 586 using TBase::begin; 587 using TBase::end; 588 using TBase::operator[]; 589 590 atomic_bool fail_requests; 591 SPSG_ServersSPSG_Servers592 SPSG_Servers() : fail_requests(false), m_Size(0) {} 593 594 template <class... TArgs> emplace_backSPSG_Servers595 void emplace_back(TArgs&&... args) 596 { 597 TBase::emplace_back(forward<TArgs>(args)...); 598 ++m_Size; 599 } 600 sizeSPSG_Servers601 size_t size() const volatile { return m_Size; } 602 603 private: 604 atomic_size_t m_Size; 605 }; 606 607 struct SPSG_IoImpl 608 { 609 using TSpaceCV = SPSG_CV<1000>; 610 611 SPSG_AsyncQueue queue; 612 TSpaceCV* space; 613 SPSG_IoImplSPSG_IoImpl614 SPSG_IoImpl(TSpaceCV* s, SPSG_Servers::TTS& servers) : 615 space(s), 616 m_Servers(servers), 617 m_Random(piecewise_construct, {}, forward_as_tuple(random_device()())) 618 {} 619 620 protected: 621 void OnShutdown(uv_async_t* handle); 622 void OnTimer(uv_timer_t* handle); 623 void OnExecute(uv_loop_t& loop); 624 void AfterExecute(); 625 626 private: CheckForNewServersSPSG_IoImpl627 void CheckForNewServers(uv_async_t* handle) 628 { 629 const auto servers_size = m_Servers.GetMTSafe().size(); 630 const auto sessions_size = m_Sessions.size(); 631 632 if (servers_size > sessions_size) { 633 AddNewServers(servers_size, sessions_size, handle); 634 } 635 } 636 637 void AddNewServers(size_t servers_size, size_t sessions_size, uv_async_t* handle); 638 void OnQueue(uv_async_t* handle); 639 s_OnQueueSPSG_IoImpl640 static void s_OnQueue(uv_async_t* handle) 641 { 642 SPSG_IoImpl* io = static_cast<SPSG_IoImpl*>(handle->data); 643 io->CheckForNewServers(handle); 644 io->OnQueue(handle); 645 } 646 647 SPSG_Servers::TTS& m_Servers; 648 deque<pair<SUvNgHttp2_Session<SPSG_IoSession>, double>> m_Sessions; 649 pair<uniform_real_distribution<>, default_random_engine> m_Random; 650 }; 651 652 struct SPSG_DiscoveryImpl 653 { SPSG_DiscoveryImplSPSG_DiscoveryImpl654 SPSG_DiscoveryImpl(CServiceDiscovery service, SPSG_Servers::TTS& servers) : 655 m_NoServers(servers), 656 m_Service(move(service)), 657 m_Servers(servers) 658 {} 659 660 protected: 661 void OnShutdown(uv_async_t*); 662 void OnTimer(uv_timer_t* handle); OnExecuteSPSG_DiscoveryImpl663 void OnExecute(uv_loop_t&) {} AfterExecuteSPSG_DiscoveryImpl664 void AfterExecute() {} 665 666 private: 667 struct SNoServers 668 { 669 SNoServers(SPSG_Servers::TTS& servers); 670 671 bool operator()(bool discovered, SUv_Timer* timer); 672 673 private: 674 const uint64_t m_RetryDelay; 675 const uint64_t m_Timeout; 676 atomic_bool& m_FailRequests; 677 uint64_t m_Passed = 0; 678 }; 679 680 SNoServers m_NoServers; 681 CServiceDiscovery m_Service; 682 SPSG_Servers::TTS& m_Servers; 683 SPSG_ThrottleParams m_ThrottleParams; 684 }; 685 686 struct SPSG_IoCoordinator 687 { 688 SPSG_Params params; 689 690 SPSG_IoCoordinator(CServiceDiscovery service); 691 bool AddRequest(shared_ptr<SPSG_Request> req, const atomic_bool& stopped, const CDeadline& deadline); GetNewRequestIdSPSG_IoCoordinator692 string GetNewRequestId() { return to_string(m_RequestId++); } GetUrlArgsSPSG_IoCoordinator693 const string& GetUrlArgs() const { return m_UrlArgs; } RejectsRequestsSPSG_IoCoordinator694 bool RejectsRequests() const { return m_Servers.GetMTSafe().fail_requests; } 695 696 private: 697 SUv_Barrier m_Barrier; 698 SPSG_IoImpl::TSpaceCV m_Space; 699 SPSG_Servers::TTS m_Servers; 700 SPSG_Thread<SPSG_DiscoveryImpl> m_Discovery; 701 vector<unique_ptr<SPSG_Thread<SPSG_IoImpl>>> m_Io; 702 atomic<size_t> m_RequestCounter; 703 atomic<size_t> m_RequestId; 704 const string m_UrlArgs; 705 }; 706 707 END_NCBI_SCOPE 708 709 #endif 710 #endif 711