1 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__PSG_CLIENT_TRANSPORT__HPP
2 #define OBJTOOLS__PUBSEQ_GATEWAY__PSG_CLIENT_TRANSPORT__HPP
3 
4 /*  $Id: psg_client_transport.hpp 628921 2021-04-07 18:46:41Z ivanov $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Dmitri Dmitrienko, Rafael Sadyrov
30  *
31  */
32 
33 #include <objtools/pubseq_gateway/client/impl/misc.hpp>
34 #include <objtools/pubseq_gateway/client/psg_client.hpp>
35 
36 #ifdef HAVE_PSG_CLIENT
37 
38 #define __STDC_FORMAT_MACROS
39 
40 #include <array>
41 #include <memory>
42 #include <string>
43 #include <cassert>
44 #include <ostream>
45 #include <iostream>
46 #include <atomic>
47 #include <functional>
48 #include <limits>
49 #include <unordered_map>
50 #include <cstdint>
51 #include <set>
52 #include <thread>
53 #include <utility>
54 #include <condition_variable>
55 #include <forward_list>
56 #include <chrono>
57 #include <sstream>
58 #include <random>
59 #include <type_traits>
60 
61 #include "mpmc_nw.hpp"
62 #include <connect/impl/ncbi_uv_nghttp2.hpp>
63 #include <connect/services/netservice_api.hpp>
64 #include <corelib/ncbi_param.hpp>
65 #include <corelib/ncbi_url.hpp>
66 
67 
68 BEGIN_NCBI_SCOPE
69 
// Each area has its own TRACE macro, so tracing can be turned off for some classes only
#define PSG_THROTTLING_TRACE(msg)          _TRACE(msg)
#define PSG_IO_SESSION_TRACE(msg)          _TRACE(msg)
#define PSG_IO_TRACE(msg)                  _TRACE(msg)
#define PSG_DISCOVERY_TRACE(msg)           _TRACE(msg)
75 
76 struct SPSG_Args : CUrlArgs
77 {
78     using CUrlArgs::CUrlArgs;
79     using CUrlArgs::operator=;
80 
GetValueSPSG_Args81     const string& GetValue(const string& name) const
82     {
83         bool not_used;
84         return CUrlArgs::GetValue(name, &not_used);
85     }
86 };
87 
88 template <typename TValue>
89 struct SPSG_Nullable : protected CNullable<TValue>
90 {
91     template <template<typename> class TCmp>
CmpSPSG_Nullable92     bool Cmp(TValue s) const { return !CNullable<TValue>::IsNull() && TCmp<TValue>()(*this, s); }
93 
94     using CNullable<TValue>::operator TValue;
95     using CNullable<TValue>::operator=;
96 };
97 
// A string that can be moved but not copied.
struct SPSG_Chunk : string
{
    // All string constructors are available
    using string::string;

    SPSG_Chunk() = default;

    // Copying is disabled
    SPSG_Chunk(const SPSG_Chunk&) = delete;
    SPSG_Chunk& operator=(const SPSG_Chunk&) = delete;

    // Moving is allowed
    SPSG_Chunk(SPSG_Chunk&&) = default;
    SPSG_Chunk& operator=(SPSG_Chunk&&) = default;
};
110 
111 struct SPSG_Params
112 {
113     TPSG_DebugPrintout debug_printout;
114     TPSG_RequestsPerIo requests_per_io;
115     TPSG_RequestRetries request_retries;
116     TPSG_PsgClientMode client_mode;
117 
SPSG_ParamsSPSG_Params118     SPSG_Params() :
119         debug_printout(TPSG_DebugPrintout::eGetDefault),
120         requests_per_io(TPSG_RequestsPerIo::eGetDefault),
121         request_retries(TPSG_RequestRetries::eGetDefault),
122         client_mode(TPSG_PsgClientMode::eGetDefault)
123     {}
124 };
125 
126 struct SDebugPrintout
127 {
128     const string id;
129 
SDebugPrintoutSDebugPrintout130     SDebugPrintout(string i, const SPSG_Params& params) :
131         id(move(i)),
132         m_Params(params)
133     {
134         if (IsPerf()) m_Events.reserve(20);
135     }
136 
137     ~SDebugPrintout();
138 
139     template <class TArg, class ...TRest>
140     struct SPack;
141 
142     template <class TArg>
operator <<SDebugPrintout143     SPack<TArg> operator<<(const TArg& arg) { return { this, &arg }; }
144 
145 private:
146     template <class ...TArgs>
ProcessSDebugPrintout147     void Process(TArgs&&... args)
148     {
149         if (IsPerf()) return Event(forward<TArgs>(args)...);
150 
151         if (m_Params.debug_printout == EPSG_DebugPrintout::eNone) return;
152 
153         Print(forward<TArgs>(args)...);
154     }
155 
156     enum EType { eSend = 1000, eReceive, eClose, eRetry, eFail };
157 
EventSDebugPrintout158     void Event(SSocketAddress, const string&)       { Event(eSend);    }
EventSDebugPrintout159     void Event(const SPSG_Args&, const SPSG_Chunk&) { Event(eReceive); }
EventSDebugPrintout160     void Event(uint32_t)                            { Event(eClose);   }
EventSDebugPrintout161     void Event(unsigned, const SUvNgHttp2_Error&)   { Event(eRetry);   }
EventSDebugPrintout162     void Event(const SUvNgHttp2_Error&)             { Event(eFail);    }
163 
EventSDebugPrintout164     void Event(EType type)
165     {
166         auto ms = chrono::duration<double, milli>(chrono::steady_clock::now().time_since_epoch()).count();
167         auto thread_id = this_thread::get_id();
168         m_Events.emplace_back(ms, type, thread_id);
169     }
170 
171     void Print(SSocketAddress address, const string& path);
172     void Print(const SPSG_Args& args, const SPSG_Chunk& chunk);
173     void Print(uint32_t error_code);
174     void Print(unsigned retries, const SUvNgHttp2_Error& error);
175     void Print(const SUvNgHttp2_Error& error);
176 
IsPerfSDebugPrintout177     bool IsPerf() const
178     {
179         return (m_Params.client_mode == EPSG_PsgClientMode::ePerformance) ||
180             (m_Params.client_mode == EPSG_PsgClientMode::eIo);
181     }
182 
183     SPSG_Params m_Params;
184     vector<tuple<double, EType, thread::id>> m_Events;
185 };
186 
187 template <class TArg, class ...TRest>
188 struct SDebugPrintout::SPack : SPack<TRest...>
189 {
SPackSDebugPrintout::SPack190     SPack(SPack<TRest...>&& base, const TArg* arg) :
191         SPack<TRest...>(move(base)),
192         m_Arg(arg)
193     {}
194 
195     template <class TNextArg>
operator <<SDebugPrintout::SPack196     SPack<TNextArg, TArg, TRest...> operator<<(const TNextArg& next_arg)
197     {
198         return { move(*this), &next_arg };
199     }
200 
operator <<SDebugPrintout::SPack201     void operator<<(ostream& (*)(ostream&)) { Process(); }
202 
203 protected:
204     template <class ...TArgs>
ProcessSDebugPrintout::SPack205     void Process(TArgs&&... args)
206     {
207         SPack<TRest...>::Process(*m_Arg, forward<TArgs>(args)...);
208     }
209 
210 private:
211     const TArg* m_Arg;
212 };
213 
214 template <class TArg>
215 struct SDebugPrintout::SPack<TArg>
216 {
SPackSDebugPrintout::SPack217     SPack(SDebugPrintout* debug_printout, const TArg* arg) :
218         m_DebugPrintout(debug_printout),
219         m_Arg(arg)
220     {}
221 
222     template <class TNextArg>
operator <<SDebugPrintout::SPack223     SPack<TNextArg, TArg> operator<<(const TNextArg& next_arg)
224     {
225         return { move(*this), &next_arg };
226     }
227 
operator <<SDebugPrintout::SPack228     void operator<<(ostream& (*)(ostream&)) { Process(); }
229 
230 protected:
231     template <class ...TArgs>
ProcessSDebugPrintout::SPack232     void Process(TArgs&&... args)
233     {
234         m_DebugPrintout->Process(*m_Arg, forward<TArgs>(args)...);
235     }
236 
237 private:
238     SDebugPrintout* m_DebugPrintout;
239     const TArg* m_Arg;
240 };
241 
242 struct SPSG_Reply
243 {
244     struct SState
245     {
246         enum EState {
247             eInProgress,
248             eSuccess,
249             eNotFound,
250             eError,
251         };
252 
253         SPSG_CV<0> change;
254 
SStateSPSG_Reply::SState255         SState() : m_State(eInProgress), m_Returned(false), m_Empty(true) {}
256 
GetStateSPSG_Reply::SState257         const volatile atomic<EState>& GetState() const volatile { return m_State; }
258         string GetError();
259 
InProgressSPSG_Reply::SState260         bool InProgress() const volatile { return m_State == eInProgress; }
ReturnedSPSG_Reply::SState261         bool Returned() const volatile { return m_Returned; }
EmptySPSG_Reply::SState262         bool Empty() const volatile { return m_Empty; }
263 
SetStateSPSG_Reply::SState264         void SetState(EState state) volatile
265         {
266             auto expected = eInProgress;
267 
268             if (m_State.compare_exchange_strong(expected, state)) {
269                 change.NotifyOne();
270             }
271         }
272 
273         void AddError(string message, EState new_state = eError);
SetReturnedSPSG_Reply::SState274         void SetReturned() volatile { bool expected = false; m_Returned.compare_exchange_strong(expected, true); }
SetNotEmptySPSG_Reply::SState275         void SetNotEmpty() volatile { bool expected = true; m_Empty.compare_exchange_strong(expected, false); }
276 
277     private:
278         atomic<EState> m_State;
279         atomic_bool m_Returned;
280         atomic_bool m_Empty;
281         vector<string> m_Messages;
282     };
283 
284     struct SItem
285     {
286         using TTS = SPSG_CV<0, SThreadSafe<SItem>>;
287 
288         vector<SPSG_Chunk> chunks;
289         SPSG_Args args;
290         SPSG_Nullable<size_t> expected;
291         size_t received = 0;
292         SState state;
293 
294         void SetSuccess();
295     };
296 
297     SThreadSafe<list<SItem::TTS>> items;
298     SItem::TTS reply_item;
299     SDebugPrintout debug_printout;
300 
SPSG_ReplySPSG_Reply301     SPSG_Reply(string id, const SPSG_Params& params) : debug_printout(move(id), params) {}
302     void SetSuccess();
303     void AddError(string message);
304 };
305 
306 struct SPSG_Request
307 {
308     const string full_path;
309     shared_ptr<SPSG_Reply> reply;
310     CRef<CRequestContext> context;
311 
312     SPSG_Request(string p, shared_ptr<SPSG_Reply> r, CRef<CRequestContext> c, const SPSG_Params& params);
313 
OnReplyDataSPSG_Request314     bool OnReplyData(const char* data, size_t len)
315     {
316         while (len) {
317             if (!(this->*m_State)(data, len)) {
318                 return false;
319             }
320         }
321 
322         return true;
323     }
324 
GetRetriesSPSG_Request325     unsigned GetRetries()
326     {
327         return reply->reply_item.GetMTSafe().state.InProgress() && (m_Retries > 0) ? m_Retries-- : 0;
328     }
329 
330 private:
331     bool StatePrefix(const char*& data, size_t& len);
332     bool StateArgs(const char*& data, size_t& len);
333     bool StateData(const char*& data, size_t& len);
StateIoSPSG_Request334     bool StateIo(const char*& data, size_t& len) { data += len; len = 0; return true; }
335 
SetStatePrefixSPSG_Request336     void SetStatePrefix()  { Add(); m_State = &SPSG_Request::StatePrefix; }
SetStateArgsSPSG_Request337     void SetStateArgs()           { m_State = &SPSG_Request::StateArgs;   }
SetStateDataSPSG_Request338     void SetStateData(size_t dtr) { m_State = &SPSG_Request::StateData;   m_Buffer.data_to_read = dtr; }
339 
340     void Add();
341     void AddIo();
342 
343     using TState = bool (SPSG_Request::*)(const char*& data, size_t& len);
344     TState m_State;
345 
346     struct SBuffer
347     {
348         size_t prefix_index = 0;
349         string args_buffer;
350         SPSG_Args args;
351         SPSG_Chunk chunk;
352         size_t data_to_read = 0;
353     };
354 
355     SBuffer m_Buffer;
356     unordered_map<string, SPSG_Reply::SItem::TTS*> m_ItemsByID;
357     unsigned m_Retries;
358 };
359 
360 struct SPSG_TimedRequest
361 {
SPSG_TimedRequestSPSG_TimedRequest362     SPSG_TimedRequest(shared_ptr<SPSG_Request> r) : m_Request(move(r)) {}
363 
operator ->SPSG_TimedRequest364     shared_ptr<SPSG_Request> operator->() { m_Seconds = 0; return m_Request; }
operator shared_ptr<SPSG_Request>SPSG_TimedRequest365     operator shared_ptr<SPSG_Request>()   { m_Seconds = 0; return m_Request; }
AddSecondSPSG_TimedRequest366     unsigned AddSecond() { return m_Seconds++; }
367 
368 private:
369     shared_ptr<SPSG_Request> m_Request;
370     unsigned m_Seconds = 0;
371 };
372 
373 struct SPSG_AsyncQueue : SUv_Async
374 {
375     using TRequest = shared_ptr<SPSG_Request>;
376 
PopSPSG_AsyncQueue377     bool Pop(TRequest& request)
378     {
379         return m_Queue.PopMove(request);
380     }
381 
PushSPSG_AsyncQueue382     bool Push(TRequest&& request)
383     {
384         if (m_Queue.PushMove(request)) {
385             Signal();
386             return true;
387         } else {
388             return false;
389         }
390     }
391 
392     using SUv_Async::Signal;
393 
394 private:
395     CMPMCQueue<TRequest> m_Queue;
396 };
397 
398 struct SPSG_ThrottleParams
399 {
400     struct SThreshold
401     {
402         size_t numerator = 0;
403         size_t denominator = 1;
404         constexpr static size_t kMaxDenominator = 128;
405 
406         SThreshold(string error_rate);
407     };
408 
409     const volatile uint64_t period;
410     TPSG_ThrottleMaxFailures max_failures;
411     TPSG_ThrottleUntilDiscovery until_discovery;
412     SThreshold threshold;
413 
414     SPSG_ThrottleParams();
415 };
416 
417 struct SPSG_Throttling
418 {
419     SPSG_Throttling(const SSocketAddress& address, SPSG_ThrottleParams p, uv_loop_t* l);
420 
ActiveSPSG_Throttling421     bool Active() const { return m_Active != eOff; }
AddSuccessSPSG_Throttling422     bool AddSuccess() { return AddResult(true); }
AddFailureSPSG_Throttling423     bool AddFailure() { return AddResult(false); }
424     void StartClose();
425 
DiscoveredSPSG_Throttling426     void Discovered()
427     {
428         if (!Configured()) return;
429 
430         EThrottling expected = eUntilDiscovery;
431 
432         if (m_Active.compare_exchange_strong(expected, eOff)) {
433             ERR_POST(Warning << "Disabling throttling for server " << m_Address.AsString() << " after wait and rediscovery");
434         }
435     }
436 
437 private:
438     struct SStats
439     {
440         SPSG_ThrottleParams params;
441         unsigned failures = 0;
442         pair<bitset<SPSG_ThrottleParams::SThreshold::kMaxDenominator>, size_t> threshold_reg;
443 
SStatsSPSG_Throttling::SStats444         SStats(SPSG_ThrottleParams p) : params(p) {}
445 
446         bool Adjust(const SSocketAddress& address, bool result);
447         void Reset();
448     };
449     enum EThrottling { eOff, eOnTimer, eUntilDiscovery };
450 
ConfiguredSPSG_Throttling451     uint64_t Configured() const { return m_Stats.GetMTSafe().params.period; }
AddResultSPSG_Throttling452     bool AddResult(bool result) { return Configured() && (Active() || Adjust(result)); }
453     bool Adjust(bool result);
454 
s_OnSignalSPSG_Throttling455     static void s_OnSignal(uv_async_t* handle)
456     {
457         auto that = static_cast<SPSG_Throttling*>(handle->data);
458         that->m_Timer.Start();
459     }
460 
s_OnTimerSPSG_Throttling461     static void s_OnTimer(uv_timer_t* handle)
462     {
463         auto that = static_cast<SPSG_Throttling*>(handle->data);
464         auto new_value = that->m_Stats.GetLock()->params.until_discovery ? eUntilDiscovery : eOff;
465         that->m_Active.store(new_value);
466 
467         if (new_value == eOff) {
468             ERR_POST(Warning << "Disabling throttling for server " << that->m_Address.AsString() << " after wait");
469         }
470     }
471 
472     const SSocketAddress& m_Address;
473     SThreadSafe<SStats> m_Stats;
474     atomic<EThrottling> m_Active;
475     SUv_Timer m_Timer;
476     SUv_Async m_Signal;
477 };
478 
479 struct SPSG_Server
480 {
481     const SSocketAddress address;
482     atomic<double> rate;
483     SPSG_Throttling throttling;
484 
SPSG_ServerSPSG_Server485     SPSG_Server(SSocketAddress a, double r, SPSG_ThrottleParams p, uv_loop_t* l) :
486         address(move(a)),
487         rate(r),
488         throttling(address, move(p), l)
489     {}
490 };
491 
492 struct SPSG_IoSession : SUvNgHttp2_SessionBase
493 {
494     SPSG_Server& server;
495 
496     template <class... TNgHttp2Cbs>
497     SPSG_IoSession(SPSG_Server& s, SPSG_AsyncQueue& queue, uv_loop_t* loop, TNgHttp2Cbs&&... callbacks);
498 
499     bool ProcessRequest(shared_ptr<SPSG_Request>& req);
500     void CheckRequestExpiration();
IsFullSPSG_IoSession501     bool IsFull() const { return m_Session.GetMaxStreams() <= m_Requests.size(); }
502 
503 protected:
504     int OnData(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len);
505     int OnStreamClose(nghttp2_session* session, int32_t stream_id, uint32_t error_code);
506     int OnHeader(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen,
507             const uint8_t* value, size_t valuelen, uint8_t flags);
OnErrorSPSG_IoSession508     int OnError(nghttp2_session*, const char* msg, size_t) { Reset(msg); return 0; }
509 
510 private:
511     enum EHeaders { eMethod, eScheme, eAuthority, ePath, eUserAgent, eSessionID, eSubHitID, eClientIP, eSize };
512 
513     using TRequests = unordered_map<int32_t, SPSG_TimedRequest>;
514 
515     bool Retry(shared_ptr<SPSG_Request> req, const SUvNgHttp2_Error& error);
516     void RequestComplete(TRequests::iterator& it);
517 
518     void OnReset(SUvNgHttp2_Error error) override;
519 
520     array<SNgHttp2_Header<NGHTTP2_NV_FLAG_NO_COPY_NAME | NGHTTP2_NV_FLAG_NO_COPY_VALUE>, eSize> m_Headers;
521     const TPSG_RequestTimeout m_RequestTimeout;
522     SPSG_AsyncQueue& m_Queue;
523     TRequests m_Requests;
524 };
525 
526 template <class TImpl>
527 struct SPSG_Thread : public TImpl
528 {
529     template <class... TArgs>
SPSG_ThreadSPSG_Thread530     SPSG_Thread(SUv_Barrier& barrier, uint64_t timeout, uint64_t repeat, TArgs&&... args) :
531         TImpl(forward<TArgs>(args)...),
532         m_Timer(this, s_OnTimer, timeout, repeat),
533         m_Thread(s_Execute, this, ref(barrier))
534     {}
535 
~SPSG_ThreadSPSG_Thread536     ~SPSG_Thread()
537     {
538         if (m_Thread.joinable()) {
539             m_Shutdown.Signal();
540             m_Thread.join();
541         }
542     }
543 
544 private:
s_OnShutdownSPSG_Thread545     static void s_OnShutdown(uv_async_t* handle)
546     {
547         SPSG_Thread* io = static_cast<SPSG_Thread*>(handle->data);
548         io->m_Shutdown.Close();
549         io->m_Timer.Close();
550         io->TImpl::OnShutdown(handle);
551     }
552 
s_OnTimerSPSG_Thread553     static void s_OnTimer(uv_timer_t* handle)
554     {
555         SPSG_Thread* io = static_cast<SPSG_Thread*>(handle->data);
556         io->TImpl::OnTimer(handle);
557     }
558 
s_ExecuteSPSG_Thread559     static void s_Execute(SPSG_Thread* io, SUv_Barrier& barrier)
560     {
561         SUv_Loop loop;
562 
563         io->TImpl::OnExecute(loop);
564         io->m_Shutdown.Init(io, &loop, s_OnShutdown);
565         io->m_Timer.Init(&loop);
566         io->m_Timer.Start();
567 
568         barrier.Wait();
569 
570         loop.Run();
571 
572         io->TImpl::AfterExecute();
573     }
574 
575     SUv_Async m_Shutdown;
576     SUv_Timer m_Timer;
577     thread m_Thread;
578 };
579 
580 struct SPSG_Servers : protected deque<SPSG_Server>
581 {
582     using TBase = deque<SPSG_Server>;
583     using TTS = SThreadSafe<SPSG_Servers>;
584 
585     // Only methods that do not use/change size can be used directly
586     using TBase::begin;
587     using TBase::end;
588     using TBase::operator[];
589 
590     atomic_bool fail_requests;
591 
SPSG_ServersSPSG_Servers592     SPSG_Servers() : fail_requests(false), m_Size(0) {}
593 
594     template <class... TArgs>
emplace_backSPSG_Servers595     void emplace_back(TArgs&&... args)
596     {
597         TBase::emplace_back(forward<TArgs>(args)...);
598         ++m_Size;
599     }
600 
sizeSPSG_Servers601     size_t size() const volatile { return m_Size; }
602 
603 private:
604     atomic_size_t m_Size;
605 };
606 
607 struct SPSG_IoImpl
608 {
609     using TSpaceCV = SPSG_CV<1000>;
610 
611     SPSG_AsyncQueue queue;
612     TSpaceCV* space;
613 
SPSG_IoImplSPSG_IoImpl614     SPSG_IoImpl(TSpaceCV* s, SPSG_Servers::TTS& servers) :
615         space(s),
616         m_Servers(servers),
617         m_Random(piecewise_construct, {}, forward_as_tuple(random_device()()))
618     {}
619 
620 protected:
621     void OnShutdown(uv_async_t* handle);
622     void OnTimer(uv_timer_t* handle);
623     void OnExecute(uv_loop_t& loop);
624     void AfterExecute();
625 
626 private:
CheckForNewServersSPSG_IoImpl627     void CheckForNewServers(uv_async_t* handle)
628     {
629         const auto servers_size = m_Servers.GetMTSafe().size();
630         const auto sessions_size = m_Sessions.size();
631 
632         if (servers_size > sessions_size) {
633             AddNewServers(servers_size, sessions_size, handle);
634         }
635     }
636 
637     void AddNewServers(size_t servers_size, size_t sessions_size, uv_async_t* handle);
638     void OnQueue(uv_async_t* handle);
639 
s_OnQueueSPSG_IoImpl640     static void s_OnQueue(uv_async_t* handle)
641     {
642         SPSG_IoImpl* io = static_cast<SPSG_IoImpl*>(handle->data);
643         io->CheckForNewServers(handle);
644         io->OnQueue(handle);
645     }
646 
647     SPSG_Servers::TTS& m_Servers;
648     deque<pair<SUvNgHttp2_Session<SPSG_IoSession>, double>> m_Sessions;
649     pair<uniform_real_distribution<>, default_random_engine> m_Random;
650 };
651 
652 struct SPSG_DiscoveryImpl
653 {
SPSG_DiscoveryImplSPSG_DiscoveryImpl654     SPSG_DiscoveryImpl(CServiceDiscovery service, SPSG_Servers::TTS& servers) :
655         m_NoServers(servers),
656         m_Service(move(service)),
657         m_Servers(servers)
658     {}
659 
660 protected:
661     void OnShutdown(uv_async_t*);
662     void OnTimer(uv_timer_t* handle);
OnExecuteSPSG_DiscoveryImpl663     void OnExecute(uv_loop_t&) {}
AfterExecuteSPSG_DiscoveryImpl664     void AfterExecute() {}
665 
666 private:
667     struct SNoServers
668     {
669         SNoServers(SPSG_Servers::TTS& servers);
670 
671         bool operator()(bool discovered, SUv_Timer* timer);
672 
673     private:
674         const uint64_t m_RetryDelay;
675         const uint64_t m_Timeout;
676         atomic_bool& m_FailRequests;
677         uint64_t m_Passed = 0;
678     };
679 
680     SNoServers m_NoServers;
681     CServiceDiscovery m_Service;
682     SPSG_Servers::TTS& m_Servers;
683     SPSG_ThrottleParams m_ThrottleParams;
684 };
685 
686 struct SPSG_IoCoordinator
687 {
688     SPSG_Params params;
689 
690     SPSG_IoCoordinator(CServiceDiscovery service);
691     bool AddRequest(shared_ptr<SPSG_Request> req, const atomic_bool& stopped, const CDeadline& deadline);
GetNewRequestIdSPSG_IoCoordinator692     string GetNewRequestId() { return to_string(m_RequestId++); }
GetUrlArgsSPSG_IoCoordinator693     const string& GetUrlArgs() const { return m_UrlArgs; }
RejectsRequestsSPSG_IoCoordinator694     bool RejectsRequests() const { return m_Servers.GetMTSafe().fail_requests; }
695 
696 private:
697     SUv_Barrier m_Barrier;
698     SPSG_IoImpl::TSpaceCV m_Space;
699     SPSG_Servers::TTS m_Servers;
700     SPSG_Thread<SPSG_DiscoveryImpl> m_Discovery;
701     vector<unique_ptr<SPSG_Thread<SPSG_IoImpl>>> m_Io;
702     atomic<size_t> m_RequestCounter;
703     atomic<size_t> m_RequestId;
704     const string m_UrlArgs;
705 };
706 
707 END_NCBI_SCOPE
708 
709 #endif
710 #endif
711