1 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__CLIENT__IMPL__MISC__HPP
2 #define OBJTOOLS__PUBSEQ_GATEWAY__CLIENT__IMPL__MISC__HPP
3 
4 /*  $Id: misc.hpp 628920 2021-04-07 18:46:32Z ivanov $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Rafael Sadyrov
30  *
31  */
32 
33 #include <atomic>
34 #include <chrono>
35 #include <condition_variable>
36 #include <memory>
37 #include <mutex>
38 #include <thread>
39 
40 #include <corelib/ncbitime.hpp>
41 #include <corelib/ncbi_param.hpp>
42 
43 BEGIN_NCBI_SCOPE
44 
45 template <int ITERS, class TBase>
46 struct SPSG_CV_Base : TBase
47 {
GetMutexSPSG_CV_Base48     mutex& GetMutex() { return TBase::m_Mutex; }
49 };
50 
51 template <int ITERS>
52 struct SPSG_CV_Base<ITERS, void>
53 {
54 };
55 
56 template <>
57 struct SPSG_CV_Base<0, void>
58 {
GetMutexSPSG_CV_Base59     mutex& GetMutex() { return m_Mutex; }
60 
61 private:
62     mutex m_Mutex;
63 };
64 
65 template <int ITERS, typename TBase = void>
66 struct SPSG_CV : SPSG_CV_Base<ITERS, TBase>
67 {
68 private:
69     template <int I, typename NA = void>
70     struct SImpl
71     {
SImplSPSG_CV::SImpl72         SImpl(SPSG_CV_Base<I, TBase>&) {}
73 
NotifyOneSPSG_CV::SImpl74         void NotifyOne() {}
NotifyAllSPSG_CV::SImpl75         void NotifyAll() {}
76 
WaitUntilSPSG_CV::SImpl77         bool WaitUntil(const CDeadline& deadline)
78         {
79             return x_WaitUntil() || !deadline.IsExpired();
80         }
81 
82         template <typename T = bool>
WaitUntilSPSG_CV::SImpl83         bool WaitUntil(const volatile atomic<T>& a, const CDeadline& deadline, T v = false, bool rv = false)
84         {
85             return x_WaitUntil() || (a != v ? rv : !deadline.IsExpired());
86         }
87 
88     private:
x_WaitUntilSPSG_CV::SImpl89         bool x_WaitUntil()
90         {
91             constexpr auto kWait = chrono::microseconds(10);
92             this_thread::sleep_for(kWait);
93 
94             if (m_I-- > 0) {
95                 return true;
96             }
97 
98             m_I += I;
99             return false;
100         }
101 
102         int m_I = I;
103     };
104 
105     template <typename NA>
106     struct SImpl<0, NA>
107     {
108         using clock = chrono::system_clock;
109 
SImplSPSG_CV::SImpl110         SImpl(SPSG_CV_Base<0, TBase>& base) : m_Base(base), m_Signal(0) {}
111 
NotifyOneSPSG_CV::SImpl112         void NotifyOne() { x_Signal(); m_CV.notify_one(); }
NotifyAllSPSG_CV::SImpl113         void NotifyAll() { x_Signal(); m_CV.notify_all(); }
114 
WaitUntilSPSG_CV::SImpl115         bool WaitUntil(const CDeadline& deadline)
116         {
117             return deadline.IsInfinite() ? x_Wait() : x_Wait(x_GetTP(deadline));
118         }
119 
120         template <typename T = bool>
WaitUntilSPSG_CV::SImpl121         bool WaitUntil(const volatile atomic<T>& a, const CDeadline& deadline, T v = false, bool rv = false)
122         {
123             constexpr auto kWait = chrono::milliseconds(100);
124             const auto until = deadline.IsInfinite() ? clock::time_point::max() : x_GetTP(deadline);
125             const auto max = clock::now() + kWait;
126 
127             do {
128                 if (until < max) {
129                     return x_Wait(until);
130                 }
131 
132                 if (x_Wait(max)) {
133                     return true;
134                 }
135             }
136             while (a == v);
137 
138             return rv;
139         }
140 
141     private:
x_GetTPSPSG_CV::SImpl142         static clock::time_point x_GetTP(const CDeadline& d)
143         {
144             time_t seconds;
145             unsigned int nanoseconds;
146 
147             d.GetExpirationTime(&seconds, &nanoseconds);
148             const auto ns = chrono::duration_cast<clock::duration>(chrono::nanoseconds(nanoseconds));
149             return clock::from_time_t(seconds) + ns;
150         }
151 
152         template <class... TArgs>
x_WaitSPSG_CV::SImpl153         bool x_Wait(TArgs&&... args)
154         {
155             unique_lock<mutex> lock(m_Base.GetMutex());
156             auto p = [&](){ return m_Signal > 0; };
157 
158             if (!x_CvWait(lock, p, forward<TArgs>(args)...)) return false;
159 
160             m_Signal--;
161             return true;
162         }
163 
164         template <class TL, class TP, class TT>
x_CvWaitSPSG_CV::SImpl165         bool x_CvWait(TL& l, TP p, const TT& t)
166         {
167             return m_CV.wait_until(l, t, p);
168         }
169 
170         template <class TL, class TP>
x_CvWaitSPSG_CV::SImpl171         bool x_CvWait(TL& l, TP p)
172         {
173             m_CV.wait(l, p); return true;
174         }
175 
x_SignalSPSG_CV::SImpl176         void x_Signal()
177         {
178             lock_guard<mutex> lock(m_Base.GetMutex());
179             m_Signal++;
180         }
181 
182         SPSG_CV_Base<ITERS, TBase>& m_Base;
183         condition_variable m_CV;
184         int m_Signal;
185     };
186 
187 public:
SPSG_CVSPSG_CV188     SPSG_CV() : m_Impl(*this) {}
189 
NotifyOneSPSG_CV190     void NotifyOne() volatile { GetImpl().NotifyOne(); }
NotifyAllSPSG_CV191     void NotifyAll() volatile { GetImpl().NotifyAll(); }
192 
193     template <class... TArgs>
WaitUntilSPSG_CV194     bool WaitUntil(TArgs&&... args) volatile
195     {
196         return GetImpl().WaitUntil(forward<TArgs>(args)...);
197     }
198 
199 private:
200     using TImpl = SImpl<ITERS>;
201 
GetImplSPSG_CV202     TImpl& GetImpl() volatile { return const_cast<TImpl&>(m_Impl); }
203 
204     TImpl m_Impl;
205 };
206 
207 template <class TValue>
208 struct CPSG_Stack
209 {
210 private:
211     struct TElement
212     {
213         shared_ptr<TElement> next;
214         TValue value;
215 
216         template <class... TArgs>
TElementCPSG_Stack::TElement217         TElement(shared_ptr<TElement> n, TArgs&&... args) : next(n), value(forward<TArgs>(args)...) {}
218     };
219 
220 public:
~CPSG_StackCPSG_Stack221     ~CPSG_Stack() { Clear(); }
222 
223     template <class... TArgs>
EmplaceCPSG_Stack224     void Emplace(TArgs&&... args)
225     {
226         auto head = make_shared<TElement>(atomic_load(&m_Head), forward<TArgs>(args)...);
227 
228         while (!atomic_compare_exchange_weak(&m_Head, &head->next, head));
229     }
230 
PushCPSG_Stack231     void Push(TValue value)
232     {
233         auto head = make_shared<TElement>(atomic_load(&m_Head), move(value));
234 
235         while (!atomic_compare_exchange_weak(&m_Head, &head->next, head));
236     }
237 
PopCPSG_Stack238     bool Pop(TValue& value)
239     {
240         while (auto head = atomic_load(&m_Head)) {
241             if (atomic_compare_exchange_weak(&m_Head, &head, head->next)) {
242                 value = move(head->value);
243                 return true;
244             }
245         }
246 
247         return false;
248     }
249 
ClearCPSG_Stack250     void Clear()
251     {
252         while (auto head = atomic_load(&m_Head)) {
253             if (atomic_compare_exchange_weak(&m_Head, &head, {})) {
254                 while (auto old_head = head) {
255                     head = head->next;
256                 }
257             }
258         }
259     }
260 
EmptyCPSG_Stack261     bool Empty() const { return !atomic_load(&m_Head); }
262 
263 private:
264     shared_ptr<TElement> m_Head;
265 };
266 
267 template <class TValue>
268 struct CPSG_WaitingStack : private CPSG_Stack<TValue>
269 {
CPSG_WaitingStackCPSG_WaitingStack270     CPSG_WaitingStack() : m_Stopped(false) {}
271 
272     template <class... TArgs>
EmplaceCPSG_WaitingStack273     void Emplace(TArgs&&... args)
274     {
275         if (m_Stopped) return;
276 
277         CPSG_Stack<TValue>::Emplace(forward<TArgs>(args)...);
278         m_CV.NotifyOne();
279     }
280 
PushCPSG_WaitingStack281     void Push(TValue value)
282     {
283         if (m_Stopped) return;
284 
285         CPSG_Stack<TValue>::Push(move(value));
286         m_CV.NotifyOne();
287     }
288 
PopCPSG_WaitingStack289     bool Pop(TValue& value, const CDeadline& deadline = CDeadline::eInfinite)
290     {
291         do {
292             if (CPSG_Stack<TValue>::Pop(value)) {
293                 return true;
294             }
295         }
296         while (m_CV.WaitUntil(m_Stopped, deadline));
297 
298         return false;
299     }
300 
301     enum EStop { eDrain, eClear };
StopCPSG_WaitingStack302     void Stop(EStop stop)
303     {
304         m_Stopped.store(true);
305         if (stop == eClear) CPSG_Stack<TValue>::Clear();
306         m_CV.NotifyAll();
307     }
308 
StoppedCPSG_WaitingStack309     const atomic_bool& Stopped() const { return m_Stopped; }
EmptyCPSG_WaitingStack310     bool Empty() const { return m_Stopped && CPSG_Stack<TValue>::Empty(); }
311 
312 private:
313     SPSG_CV<0> m_CV;
314     atomic_bool m_Stopped;
315 };
316 
317 template <class TParam>
318 struct SPSG_ParamValue
319 {
320     using TValue = typename TParam::TValueType;
321 
322     // Getting default incurs some performance penalty, so this ctor is explicit
323     enum EGetDefault { eGetDefault };
SPSG_ParamValueSPSG_ParamValue324     explicit SPSG_ParamValue(EGetDefault) : m_Value(TParam::GetDefault()) { _DEBUG_ARG(sm_Used = true); }
325 
operator TValueSPSG_ParamValue326     operator TValue() const { return m_Value; }
327 
GetDefaultSPSG_ParamValue328     static TValue GetDefault() { return TParam::GetDefault(); }
329 
330     template <typename T>
SetDefaultSPSG_ParamValue331     static void SetDefault(const T& value)
332     {
333         // Forbid setting after it's already used
334         _ASSERT(!sm_Used);
335 
336         TParam::SetDefault(static_cast<TValue>(value));
337     }
338 
SetDefaultSPSG_ParamValue339     static void SetDefault(const string& value)
340     {
341         SetDefaultImpl(TParam(), value);
342     }
343 
344     // Overriding default but only if it's not configured explicitly
345     template <typename T>
SetImplicitDefaultSPSG_ParamValue346     static void SetImplicitDefault(const T& value)
347     {
348         bool sourcing_complete;
349         typename TParam::EParamSource param_source;
350         TParam::GetDefault();
351         TParam::GetState(&sourcing_complete, &param_source);
352 
353         if (sourcing_complete && (param_source == TParam::eSource_Default)) {
354             SetDefault(value);
355         }
356     }
357 
358 private:
359     // TDescription is not publicly available in CParam, but it's needed for string to enum conversion.
360     // This templated method circumvents that shortcoming.
361     template <class TDescription>
SetDefaultImplSPSG_ParamValue362     static void SetDefaultImpl(const CParam<TDescription>&, const string& value)
363     {
364         SetDefault(CParam<TDescription>::TParamParser::StringToValue(value, TDescription::sm_ParamDescription));
365     }
366 
367     TValue m_Value;
368     _DEBUG_ARG(static bool sm_Used);
369 };
370 
371 _DEBUG_ARG(template <class TParam> bool SPSG_ParamValue<TParam>::sm_Used = false);
372 
373 #define PSG_PARAM_VALUE_TYPE(section, name) SPSG_ParamValue<NCBI_PARAM_TYPE(section, name)>
374 
375 NCBI_PARAM_DECL(unsigned, PSG, rd_buf_size);
376 typedef NCBI_PARAM_TYPE(PSG, rd_buf_size) TPSG_RdBufSize;
377 
378 NCBI_PARAM_DECL(size_t, PSG, wr_buf_size);
379 typedef NCBI_PARAM_TYPE(PSG, wr_buf_size) TPSG_WrBufSize;
380 
381 NCBI_PARAM_DECL(unsigned, PSG, max_concurrent_streams);
382 typedef NCBI_PARAM_TYPE(PSG, max_concurrent_streams) TPSG_MaxConcurrentStreams;
383 
384 NCBI_PARAM_DECL(unsigned, PSG, num_io);
385 typedef NCBI_PARAM_TYPE(PSG, num_io) TPSG_NumIo;
386 
387 NCBI_PARAM_DECL(unsigned, PSG, reader_timeout);
388 typedef NCBI_PARAM_TYPE(PSG, reader_timeout) TPSG_ReaderTimeout;
389 
390 NCBI_PARAM_DECL(double, PSG, rebalance_time);
391 typedef NCBI_PARAM_TYPE(PSG, rebalance_time) TPSG_RebalanceTime;
392 
393 NCBI_PARAM_DECL(unsigned, PSG, request_timeout);
394 using TPSG_RequestTimeout = PSG_PARAM_VALUE_TYPE(PSG, request_timeout);
395 
396 NCBI_PARAM_DECL(size_t, PSG, requests_per_io);
397 using TPSG_RequestsPerIo = PSG_PARAM_VALUE_TYPE(PSG, requests_per_io);
398 
399 NCBI_PARAM_DECL(unsigned, PSG, request_retries);
400 using TPSG_RequestRetries = PSG_PARAM_VALUE_TYPE(PSG, request_retries);
401 
402 NCBI_PARAM_DECL(string, PSG, request_user_args);
403 typedef NCBI_PARAM_TYPE(PSG, request_user_args) TPSG_RequestUserArgs;
404 
405 NCBI_PARAM_DECL(unsigned, PSG, localhost_preference);
406 typedef NCBI_PARAM_TYPE(PSG, localhost_preference) TPSG_LocalhostPreference;
407 
408 NCBI_PARAM_DECL(bool, PSG, fail_on_unknown_items);
409 typedef NCBI_PARAM_TYPE(PSG, fail_on_unknown_items) TPSG_FailOnUnknownItems;
410 
411 NCBI_PARAM_DECL(bool, PSG, https);
412 typedef NCBI_PARAM_TYPE(PSG, https) TPSG_Https;
413 
414 NCBI_PARAM_DECL(double, PSG, no_servers_retry_delay);
415 typedef NCBI_PARAM_TYPE(PSG, no_servers_retry_delay) TPSG_NoServersRetryDelay;
416 
417 NCBI_PARAM_DECL(double, PSG, throttle_relaxation_period);
418 using TPSG_ThrottlePeriod = NCBI_PARAM_TYPE(PSG, throttle_relaxation_period);
419 
420 NCBI_PARAM_DECL(unsigned, PSG, throttle_by_consecutive_connection_failures);
421 using TPSG_ThrottleMaxFailures = PSG_PARAM_VALUE_TYPE(PSG, throttle_by_consecutive_connection_failures);
422 
423 NCBI_PARAM_DECL(bool, PSG, throttle_hold_until_active_in_lb);
424 using TPSG_ThrottleUntilDiscovery = PSG_PARAM_VALUE_TYPE(PSG, throttle_hold_until_active_in_lb);
425 
426 NCBI_PARAM_DECL(string, PSG, throttle_by_connection_error_rate);
427 using TPSG_ThrottleThreshold = NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate);
428 
429 enum class EPSG_DebugPrintout { eNone, eSome, eAll };
430 NCBI_PARAM_ENUM_DECL(EPSG_DebugPrintout, PSG, debug_printout);
431 using TPSG_DebugPrintout = PSG_PARAM_VALUE_TYPE(PSG, debug_printout);
432 
433 enum class EPSG_UseCache { eDefault, eNo, eYes };
434 NCBI_PARAM_ENUM_DECL(EPSG_UseCache, PSG, use_cache);
435 using TPSG_UseCache = PSG_PARAM_VALUE_TYPE(PSG, use_cache);
436 
437 // Performance reporting/request IDs for psg_client app
438 enum class EPSG_PsgClientMode { eOff, eInteractive, ePerformance, eIo };
439 NCBI_PARAM_ENUM_DECL(EPSG_PsgClientMode, PSG, internal_psg_client_mode);
440 using TPSG_PsgClientMode = PSG_PARAM_VALUE_TYPE(PSG, internal_psg_client_mode);
441 
442 END_NCBI_SCOPE
443 
444 #endif
445