1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ 2 3 #ifndef IO_ENGINE_H 4 #define IO_ENGINE_H 5 6 #include "base/exception.hpp" 7 #include "base/lazy-init.hpp" 8 #include "base/logger.hpp" 9 #include "base/shared-object.hpp" 10 #include <atomic> 11 #include <exception> 12 #include <memory> 13 #include <thread> 14 #include <utility> 15 #include <vector> 16 #include <stdexcept> 17 #include <boost/exception/all.hpp> 18 #include <boost/asio/deadline_timer.hpp> 19 #include <boost/asio/io_context.hpp> 20 #include <boost/asio/spawn.hpp> 21 22 namespace icinga 23 { 24 25 /** 26 * Scope lock for CPU-bound work done in an I/O thread 27 * 28 * @ingroup base 29 */ 30 class CpuBoundWork 31 { 32 public: 33 CpuBoundWork(boost::asio::yield_context yc); 34 CpuBoundWork(const CpuBoundWork&) = delete; 35 CpuBoundWork(CpuBoundWork&&) = delete; 36 CpuBoundWork& operator=(const CpuBoundWork&) = delete; 37 CpuBoundWork& operator=(CpuBoundWork&&) = delete; 38 ~CpuBoundWork(); 39 40 void Done(); 41 42 private: 43 bool m_Done; 44 }; 45 46 /** 47 * Scope break for CPU-bound work done in an I/O thread 48 * 49 * @ingroup base 50 */ 51 class IoBoundWorkSlot 52 { 53 public: 54 IoBoundWorkSlot(boost::asio::yield_context yc); 55 IoBoundWorkSlot(const IoBoundWorkSlot&) = delete; 56 IoBoundWorkSlot(IoBoundWorkSlot&&) = delete; 57 IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete; 58 IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete; 59 ~IoBoundWorkSlot(); 60 61 private: 62 boost::asio::yield_context yc; 63 }; 64 65 /** 66 * Async I/O engine 67 * 68 * @ingroup base 69 */ 70 class IoEngine 71 { 72 friend CpuBoundWork; 73 friend IoBoundWorkSlot; 74 75 public: 76 IoEngine(const IoEngine&) = delete; 77 IoEngine(IoEngine&&) = delete; 78 IoEngine& operator=(const IoEngine&) = delete; 79 IoEngine& operator=(IoEngine&&) = delete; 80 ~IoEngine(); 81 82 static IoEngine& Get(); 83 84 boost::asio::io_context& GetIoContext(); 85 GetCoroutineStackSize()86 static inline size_t GetCoroutineStackSize() { 87 #ifdef _WIN32 88 // Increase the stack size for Windows coroutines to prevent exception corruption. 89 // Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431 90 return 8 * 1024 * 1024; 91 #else /* _WIN32 */ 92 // Increase the stack size for Linux/Unix coroutines for many JSON objects on the stack. 93 // This may help mitigate possible stack overflows. https://github.com/Icinga/icinga2/issues/7532 94 return 256 * 1024; 95 //return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB 96 #endif /* _WIN32 */ 97 } 98 99 template <typename Handler, typename Function> SpawnCoroutine(Handler & h,Function f)100 static void SpawnCoroutine(Handler& h, Function f) { 101 102 boost::asio::spawn(h, 103 [f](boost::asio::yield_context yc) { 104 105 try { 106 f(yc); 107 } catch (const boost::coroutines::detail::forced_unwind &) { 108 // Required for proper stack unwinding when coroutines are destroyed. 109 // https://github.com/boostorg/coroutine/issues/39 110 throw; 111 } catch (const std::exception& ex) { 112 Log(LogCritical, "IoEngine", "Exception in coroutine!"); 113 Log(LogDebug, "IoEngine") << "Exception in coroutine: " << DiagnosticInformation(ex); 114 } catch (...) { 115 Log(LogCritical, "IoEngine", "Exception in coroutine!"); 116 } 117 }, 118 boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size. 119 ); 120 } 121 122 private: 123 IoEngine(); 124 125 void RunEventLoop(); 126 127 static LazyInit<std::unique_ptr<IoEngine>> m_Instance; 128 129 boost::asio::io_context m_IoContext; 130 boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive; 131 std::vector<std::thread> m_Threads; 132 boost::asio::deadline_timer m_AlreadyExpiredTimer; 133 std::atomic_int_fast32_t m_CpuBoundSemaphore; 134 }; 135 136 class TerminateIoThread : public std::exception 137 { 138 }; 139 140 /** 141 * Condition variable which doesn't block I/O threads 142 * 143 * @ingroup base 144 */ 145 class AsioConditionVariable 146 { 147 public: 148 AsioConditionVariable(boost::asio::io_context& io, bool init = false); 149 150 void Set(); 151 void Clear(); 152 void Wait(boost::asio::yield_context yc); 153 154 private: 155 boost::asio::deadline_timer m_Timer; 156 }; 157 158 /** 159 * I/O timeout emulator 160 * 161 * @ingroup base 162 */ 163 class Timeout : public SharedObject 164 { 165 public: 166 DECLARE_PTR_TYPEDEFS(Timeout); 167 168 template<class Executor, class TimeoutFromNow, class OnTimeout> Timeout(boost::asio::io_context & io,Executor & executor,TimeoutFromNow timeoutFromNow,OnTimeout onTimeout)169 Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout) 170 : m_Timer(io) 171 { 172 Ptr keepAlive (this); 173 174 m_Cancelled.store(false); 175 m_Timer.expires_from_now(std::move(timeoutFromNow)); 176 177 IoEngine::SpawnCoroutine(executor, [this, keepAlive, onTimeout](boost::asio::yield_context yc) { 178 if (m_Cancelled.load()) { 179 return; 180 } 181 182 { 183 boost::system::error_code ec; 184 185 m_Timer.async_wait(yc[ec]); 186 187 if (ec) { 188 return; 189 } 190 } 191 192 if (m_Cancelled.load()) { 193 return; 194 } 195 196 auto f (onTimeout); 197 f(std::move(yc)); 198 }); 199 } 200 201 void Cancel(); 202 203 private: 204 boost::asio::deadline_timer m_Timer; 205 std::atomic<bool> m_Cancelled; 206 }; 207 208 } 209 210 #endif /* IO_ENGINE_H */ 211