1 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__CLIENT__IMPL__MISC__HPP 2 #define OBJTOOLS__PUBSEQ_GATEWAY__CLIENT__IMPL__MISC__HPP 3 4 /* $Id: misc.hpp 628920 2021-04-07 18:46:32Z ivanov $ 5 * =========================================================================== 6 * 7 * PUBLIC DOMAIN NOTICE 8 * National Center for Biotechnology Information 9 * 10 * This software/database is a "United States Government Work" under the 11 * terms of the United States Copyright Act. It was written as part of 12 * the author's official duties as a United States Government employee and 13 * thus cannot be copyrighted. This software/database is freely available 14 * to the public for use. The National Library of Medicine and the U.S. 15 * Government have not placed any restriction on its use or reproduction. 16 * 17 * Although all reasonable efforts have been taken to ensure the accuracy 18 * and reliability of the software and data, the NLM and the U.S. 19 * Government do not and cannot warrant the performance or results that 20 * may be obtained by using this software or data. The NLM and the U.S. 21 * Government disclaim all warranties, express or implied, including 22 * warranties of performance, merchantability or fitness for any particular 23 * purpose. 24 * 25 * Please cite the author in any work or product based on this material. 26 * 27 * =========================================================================== 28 * 29 * Authors: Rafael Sadyrov 30 * 31 */ 32 33 #include <atomic> 34 #include <chrono> 35 #include <condition_variable> 36 #include <memory> 37 #include <mutex> 38 #include <thread> 39 40 #include <corelib/ncbitime.hpp> 41 #include <corelib/ncbi_param.hpp> 42 43 BEGIN_NCBI_SCOPE 44 45 template <int ITERS, class TBase> 46 struct SPSG_CV_Base : TBase 47 { GetMutexSPSG_CV_Base48 mutex& GetMutex() { return TBase::m_Mutex; } 49 }; 50 51 template <int ITERS> 52 struct SPSG_CV_Base<ITERS, void> 53 { 54 }; 55 56 template <> 57 struct SPSG_CV_Base<0, void> 58 { GetMutexSPSG_CV_Base59 mutex& GetMutex() { return m_Mutex; } 60 61 private: 62 mutex m_Mutex; 63 }; 64 65 template <int ITERS, typename TBase = void> 66 struct SPSG_CV : SPSG_CV_Base<ITERS, TBase> 67 { 68 private: 69 template <int I, typename NA = void> 70 struct SImpl 71 { SImplSPSG_CV::SImpl72 SImpl(SPSG_CV_Base<I, TBase>&) {} 73 NotifyOneSPSG_CV::SImpl74 void NotifyOne() {} NotifyAllSPSG_CV::SImpl75 void NotifyAll() {} 76 WaitUntilSPSG_CV::SImpl77 bool WaitUntil(const CDeadline& deadline) 78 { 79 return x_WaitUntil() || !deadline.IsExpired(); 80 } 81 82 template <typename T = bool> WaitUntilSPSG_CV::SImpl83 bool WaitUntil(const volatile atomic<T>& a, const CDeadline& deadline, T v = false, bool rv = false) 84 { 85 return x_WaitUntil() || (a != v ? rv : !deadline.IsExpired()); 86 } 87 88 private: x_WaitUntilSPSG_CV::SImpl89 bool x_WaitUntil() 90 { 91 constexpr auto kWait = chrono::microseconds(10); 92 this_thread::sleep_for(kWait); 93 94 if (m_I-- > 0) { 95 return true; 96 } 97 98 m_I += I; 99 return false; 100 } 101 102 int m_I = I; 103 }; 104 105 template <typename NA> 106 struct SImpl<0, NA> 107 { 108 using clock = chrono::system_clock; 109 SImplSPSG_CV::SImpl110 SImpl(SPSG_CV_Base<0, TBase>& base) : m_Base(base), m_Signal(0) {} 111 NotifyOneSPSG_CV::SImpl112 void NotifyOne() { x_Signal(); m_CV.notify_one(); } NotifyAllSPSG_CV::SImpl113 void NotifyAll() { x_Signal(); m_CV.notify_all(); } 114 WaitUntilSPSG_CV::SImpl115 bool WaitUntil(const CDeadline& deadline) 116 { 117 return deadline.IsInfinite() ? x_Wait() : x_Wait(x_GetTP(deadline)); 118 } 119 120 template <typename T = bool> WaitUntilSPSG_CV::SImpl121 bool WaitUntil(const volatile atomic<T>& a, const CDeadline& deadline, T v = false, bool rv = false) 122 { 123 constexpr auto kWait = chrono::milliseconds(100); 124 const auto until = deadline.IsInfinite() ? clock::time_point::max() : x_GetTP(deadline); 125 const auto max = clock::now() + kWait; 126 127 do { 128 if (until < max) { 129 return x_Wait(until); 130 } 131 132 if (x_Wait(max)) { 133 return true; 134 } 135 } 136 while (a == v); 137 138 return rv; 139 } 140 141 private: x_GetTPSPSG_CV::SImpl142 static clock::time_point x_GetTP(const CDeadline& d) 143 { 144 time_t seconds; 145 unsigned int nanoseconds; 146 147 d.GetExpirationTime(&seconds, &nanoseconds); 148 const auto ns = chrono::duration_cast<clock::duration>(chrono::nanoseconds(nanoseconds)); 149 return clock::from_time_t(seconds) + ns; 150 } 151 152 template <class... TArgs> x_WaitSPSG_CV::SImpl153 bool x_Wait(TArgs&&... args) 154 { 155 unique_lock<mutex> lock(m_Base.GetMutex()); 156 auto p = [&](){ return m_Signal > 0; }; 157 158 if (!x_CvWait(lock, p, forward<TArgs>(args)...)) return false; 159 160 m_Signal--; 161 return true; 162 } 163 164 template <class TL, class TP, class TT> x_CvWaitSPSG_CV::SImpl165 bool x_CvWait(TL& l, TP p, const TT& t) 166 { 167 return m_CV.wait_until(l, t, p); 168 } 169 170 template <class TL, class TP> x_CvWaitSPSG_CV::SImpl171 bool x_CvWait(TL& l, TP p) 172 { 173 m_CV.wait(l, p); return true; 174 } 175 x_SignalSPSG_CV::SImpl176 void x_Signal() 177 { 178 lock_guard<mutex> lock(m_Base.GetMutex()); 179 m_Signal++; 180 } 181 182 SPSG_CV_Base<ITERS, TBase>& m_Base; 183 condition_variable m_CV; 184 int m_Signal; 185 }; 186 187 public: SPSG_CVSPSG_CV188 SPSG_CV() : m_Impl(*this) {} 189 NotifyOneSPSG_CV190 void NotifyOne() volatile { GetImpl().NotifyOne(); } NotifyAllSPSG_CV191 void NotifyAll() volatile { GetImpl().NotifyAll(); } 192 193 template <class... TArgs> WaitUntilSPSG_CV194 bool WaitUntil(TArgs&&... args) volatile 195 { 196 return GetImpl().WaitUntil(forward<TArgs>(args)...); 197 } 198 199 private: 200 using TImpl = SImpl<ITERS>; 201 GetImplSPSG_CV202 TImpl& GetImpl() volatile { return const_cast<TImpl&>(m_Impl); } 203 204 TImpl m_Impl; 205 }; 206 207 template <class TValue> 208 struct CPSG_Stack 209 { 210 private: 211 struct TElement 212 { 213 shared_ptr<TElement> next; 214 TValue value; 215 216 template <class... TArgs> TElementCPSG_Stack::TElement217 TElement(shared_ptr<TElement> n, TArgs&&... args) : next(n), value(forward<TArgs>(args)...) {} 218 }; 219 220 public: ~CPSG_StackCPSG_Stack221 ~CPSG_Stack() { Clear(); } 222 223 template <class... TArgs> EmplaceCPSG_Stack224 void Emplace(TArgs&&... args) 225 { 226 auto head = make_shared<TElement>(atomic_load(&m_Head), forward<TArgs>(args)...); 227 228 while (!atomic_compare_exchange_weak(&m_Head, &head->next, head)); 229 } 230 PushCPSG_Stack231 void Push(TValue value) 232 { 233 auto head = make_shared<TElement>(atomic_load(&m_Head), move(value)); 234 235 while (!atomic_compare_exchange_weak(&m_Head, &head->next, head)); 236 } 237 PopCPSG_Stack238 bool Pop(TValue& value) 239 { 240 while (auto head = atomic_load(&m_Head)) { 241 if (atomic_compare_exchange_weak(&m_Head, &head, head->next)) { 242 value = move(head->value); 243 return true; 244 } 245 } 246 247 return false; 248 } 249 ClearCPSG_Stack250 void Clear() 251 { 252 while (auto head = atomic_load(&m_Head)) { 253 if (atomic_compare_exchange_weak(&m_Head, &head, {})) { 254 while (auto old_head = head) { 255 head = head->next; 256 } 257 } 258 } 259 } 260 EmptyCPSG_Stack261 bool Empty() const { return !atomic_load(&m_Head); } 262 263 private: 264 shared_ptr<TElement> m_Head; 265 }; 266 267 template <class TValue> 268 struct CPSG_WaitingStack : private CPSG_Stack<TValue> 269 { CPSG_WaitingStackCPSG_WaitingStack270 CPSG_WaitingStack() : m_Stopped(false) {} 271 272 template <class... TArgs> EmplaceCPSG_WaitingStack273 void Emplace(TArgs&&... args) 274 { 275 if (m_Stopped) return; 276 277 CPSG_Stack<TValue>::Emplace(forward<TArgs>(args)...); 278 m_CV.NotifyOne(); 279 } 280 PushCPSG_WaitingStack281 void Push(TValue value) 282 { 283 if (m_Stopped) return; 284 285 CPSG_Stack<TValue>::Push(move(value)); 286 m_CV.NotifyOne(); 287 } 288 PopCPSG_WaitingStack289 bool Pop(TValue& value, const CDeadline& deadline = CDeadline::eInfinite) 290 { 291 do { 292 if (CPSG_Stack<TValue>::Pop(value)) { 293 return true; 294 } 295 } 296 while (m_CV.WaitUntil(m_Stopped, deadline)); 297 298 return false; 299 } 300 301 enum EStop { eDrain, eClear }; StopCPSG_WaitingStack302 void Stop(EStop stop) 303 { 304 m_Stopped.store(true); 305 if (stop == eClear) CPSG_Stack<TValue>::Clear(); 306 m_CV.NotifyAll(); 307 } 308 StoppedCPSG_WaitingStack309 const atomic_bool& Stopped() const { return m_Stopped; } EmptyCPSG_WaitingStack310 bool Empty() const { return m_Stopped && CPSG_Stack<TValue>::Empty(); } 311 312 private: 313 SPSG_CV<0> m_CV; 314 atomic_bool m_Stopped; 315 }; 316 317 template <class TParam> 318 struct SPSG_ParamValue 319 { 320 using TValue = typename TParam::TValueType; 321 322 // Getting default incurs some performance penalty, so this ctor is explicit 323 enum EGetDefault { eGetDefault }; SPSG_ParamValueSPSG_ParamValue324 explicit SPSG_ParamValue(EGetDefault) : m_Value(TParam::GetDefault()) { _DEBUG_ARG(sm_Used = true); } 325 operator TValueSPSG_ParamValue326 operator TValue() const { return m_Value; } 327 GetDefaultSPSG_ParamValue328 static TValue GetDefault() { return TParam::GetDefault(); } 329 330 template <typename T> SetDefaultSPSG_ParamValue331 static void SetDefault(const T& value) 332 { 333 // Forbid setting after it's already used 334 _ASSERT(!sm_Used); 335 336 TParam::SetDefault(static_cast<TValue>(value)); 337 } 338 SetDefaultSPSG_ParamValue339 static void SetDefault(const string& value) 340 { 341 SetDefaultImpl(TParam(), value); 342 } 343 344 // Overriding default but only if it's not configured explicitly 345 template <typename T> SetImplicitDefaultSPSG_ParamValue346 static void SetImplicitDefault(const T& value) 347 { 348 bool sourcing_complete; 349 typename TParam::EParamSource param_source; 350 TParam::GetDefault(); 351 TParam::GetState(&sourcing_complete, ¶m_source); 352 353 if (sourcing_complete && (param_source == TParam::eSource_Default)) { 354 SetDefault(value); 355 } 356 } 357 358 private: 359 // TDescription is not publicly available in CParam, but it's needed for string to enum conversion. 360 // This templated method circumvents that shortcoming. 361 template <class TDescription> SetDefaultImplSPSG_ParamValue362 static void SetDefaultImpl(const CParam<TDescription>&, const string& value) 363 { 364 SetDefault(CParam<TDescription>::TParamParser::StringToValue(value, TDescription::sm_ParamDescription)); 365 } 366 367 TValue m_Value; 368 _DEBUG_ARG(static bool sm_Used); 369 }; 370 371 _DEBUG_ARG(template <class TParam> bool SPSG_ParamValue<TParam>::sm_Used = false); 372 373 #define PSG_PARAM_VALUE_TYPE(section, name) SPSG_ParamValue<NCBI_PARAM_TYPE(section, name)> 374 375 NCBI_PARAM_DECL(unsigned, PSG, rd_buf_size); 376 typedef NCBI_PARAM_TYPE(PSG, rd_buf_size) TPSG_RdBufSize; 377 378 NCBI_PARAM_DECL(size_t, PSG, wr_buf_size); 379 typedef NCBI_PARAM_TYPE(PSG, wr_buf_size) TPSG_WrBufSize; 380 381 NCBI_PARAM_DECL(unsigned, PSG, max_concurrent_streams); 382 typedef NCBI_PARAM_TYPE(PSG, max_concurrent_streams) TPSG_MaxConcurrentStreams; 383 384 NCBI_PARAM_DECL(unsigned, PSG, num_io); 385 typedef NCBI_PARAM_TYPE(PSG, num_io) TPSG_NumIo; 386 387 NCBI_PARAM_DECL(unsigned, PSG, reader_timeout); 388 typedef NCBI_PARAM_TYPE(PSG, reader_timeout) TPSG_ReaderTimeout; 389 390 NCBI_PARAM_DECL(double, PSG, rebalance_time); 391 typedef NCBI_PARAM_TYPE(PSG, rebalance_time) TPSG_RebalanceTime; 392 393 NCBI_PARAM_DECL(unsigned, PSG, request_timeout); 394 using TPSG_RequestTimeout = PSG_PARAM_VALUE_TYPE(PSG, request_timeout); 395 396 NCBI_PARAM_DECL(size_t, PSG, requests_per_io); 397 using TPSG_RequestsPerIo = PSG_PARAM_VALUE_TYPE(PSG, requests_per_io); 398 399 NCBI_PARAM_DECL(unsigned, PSG, request_retries); 400 using TPSG_RequestRetries = PSG_PARAM_VALUE_TYPE(PSG, request_retries); 401 402 NCBI_PARAM_DECL(string, PSG, request_user_args); 403 typedef NCBI_PARAM_TYPE(PSG, request_user_args) TPSG_RequestUserArgs; 404 405 NCBI_PARAM_DECL(unsigned, PSG, localhost_preference); 406 typedef NCBI_PARAM_TYPE(PSG, localhost_preference) TPSG_LocalhostPreference; 407 408 NCBI_PARAM_DECL(bool, PSG, fail_on_unknown_items); 409 typedef NCBI_PARAM_TYPE(PSG, fail_on_unknown_items) TPSG_FailOnUnknownItems; 410 411 NCBI_PARAM_DECL(bool, PSG, https); 412 typedef NCBI_PARAM_TYPE(PSG, https) TPSG_Https; 413 414 NCBI_PARAM_DECL(double, PSG, no_servers_retry_delay); 415 typedef NCBI_PARAM_TYPE(PSG, no_servers_retry_delay) TPSG_NoServersRetryDelay; 416 417 NCBI_PARAM_DECL(double, PSG, throttle_relaxation_period); 418 using TPSG_ThrottlePeriod = NCBI_PARAM_TYPE(PSG, throttle_relaxation_period); 419 420 NCBI_PARAM_DECL(unsigned, PSG, throttle_by_consecutive_connection_failures); 421 using TPSG_ThrottleMaxFailures = PSG_PARAM_VALUE_TYPE(PSG, throttle_by_consecutive_connection_failures); 422 423 NCBI_PARAM_DECL(bool, PSG, throttle_hold_until_active_in_lb); 424 using TPSG_ThrottleUntilDiscovery = PSG_PARAM_VALUE_TYPE(PSG, throttle_hold_until_active_in_lb); 425 426 NCBI_PARAM_DECL(string, PSG, throttle_by_connection_error_rate); 427 using TPSG_ThrottleThreshold = NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate); 428 429 enum class EPSG_DebugPrintout { eNone, eSome, eAll }; 430 NCBI_PARAM_ENUM_DECL(EPSG_DebugPrintout, PSG, debug_printout); 431 using TPSG_DebugPrintout = PSG_PARAM_VALUE_TYPE(PSG, debug_printout); 432 433 enum class EPSG_UseCache { eDefault, eNo, eYes }; 434 NCBI_PARAM_ENUM_DECL(EPSG_UseCache, PSG, use_cache); 435 using TPSG_UseCache = PSG_PARAM_VALUE_TYPE(PSG, use_cache); 436 437 // Performance reporting/request IDs for psg_client app 438 enum class EPSG_PsgClientMode { eOff, eInteractive, ePerformance, eIo }; 439 NCBI_PARAM_ENUM_DECL(EPSG_PsgClientMode, PSG, internal_psg_client_mode); 440 using TPSG_PsgClientMode = PSG_PARAM_VALUE_TYPE(PSG, internal_psg_client_mode); 441 442 END_NCBI_SCOPE 443 444 #endif 445