1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #ifndef IO_ENGINE_H
4 #define IO_ENGINE_H
5 
6 #include "base/exception.hpp"
7 #include "base/lazy-init.hpp"
8 #include "base/logger.hpp"
9 #include "base/shared-object.hpp"
10 #include <atomic>
11 #include <exception>
12 #include <memory>
13 #include <thread>
14 #include <utility>
15 #include <vector>
16 #include <stdexcept>
17 #include <boost/exception/all.hpp>
18 #include <boost/asio/deadline_timer.hpp>
19 #include <boost/asio/io_context.hpp>
20 #include <boost/asio/spawn.hpp>
21 
22 namespace icinga
23 {
24 
/**
 * Scope lock for CPU-bound work done in an I/O thread
 *
 * An instance is constructed from a boost::asio::yield_context and can be
 * neither copied nor moved. Only the interface is declared in this header;
 * the constructor, destructor and Done() are defined elsewhere.
 *
 * IoEngine declares this class a friend, so it has access to IoEngine's
 * private members.
 *
 * @ingroup base
 */
class CpuBoundWork
{
public:
	/* NOTE(review): presumably acquires the lock, using yc to suspend the coroutine while waiting - confirm against the definition. */
	CpuBoundWork(boost::asio::yield_context yc);

	/* Copying and moving are disabled. */
	CpuBoundWork(const CpuBoundWork&) = delete;
	CpuBoundWork(CpuBoundWork&&) = delete;
	CpuBoundWork& operator=(const CpuBoundWork&) = delete;
	CpuBoundWork& operator=(CpuBoundWork&&) = delete;

	/* NOTE(review): presumably releases the lock unless Done() already did so - confirm against the definition. */
	~CpuBoundWork();

	/* NOTE(review): presumably releases the lock before the end of the scope - confirm against the definition. */
	void Done();

private:
	/* NOTE(review): presumably records whether Done() has already taken effect - confirm against the definition. */
	bool m_Done;
};
45 
/**
 * Scope break for CPU-bound work done in an I/O thread
 *
 * An instance is constructed from a boost::asio::yield_context, keeps a
 * yield context as its only data member and can be neither copied nor moved.
 * Only the interface is declared in this header; the constructor and
 * destructor are defined elsewhere.
 *
 * IoEngine declares this class a friend, so it has access to IoEngine's
 * private members.
 *
 * @ingroup base
 */
class IoBoundWorkSlot
{
public:
	/* NOTE(review): presumably gives up the CPU-bound slot for the lifetime of this object - confirm against the definition. */
	IoBoundWorkSlot(boost::asio::yield_context yc);

	/* Copying and moving are disabled. */
	IoBoundWorkSlot(const IoBoundWorkSlot&) = delete;
	IoBoundWorkSlot(IoBoundWorkSlot&&) = delete;
	IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete;
	IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete;

	/* NOTE(review): presumably re-acquires the CPU-bound slot, suspending via the stored yield context if necessary - confirm against the definition. */
	~IoBoundWorkSlot();

private:
	/* Yield context kept by the object; presumably the one passed to the constructor - confirm against the definition. */
	boost::asio::yield_context yc;
};
64 
65 /**
66  * Async I/O engine
67  *
68  * @ingroup base
69  */
70 class IoEngine
71 {
72 	friend CpuBoundWork;
73 	friend IoBoundWorkSlot;
74 
75 public:
76 	IoEngine(const IoEngine&) = delete;
77 	IoEngine(IoEngine&&) = delete;
78 	IoEngine& operator=(const IoEngine&) = delete;
79 	IoEngine& operator=(IoEngine&&) = delete;
80 	~IoEngine();
81 
82 	static IoEngine& Get();
83 
84 	boost::asio::io_context& GetIoContext();
85 
GetCoroutineStackSize()86 	static inline size_t GetCoroutineStackSize() {
87 #ifdef _WIN32
88 		// Increase the stack size for Windows coroutines to prevent exception corruption.
89 		// Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431
90 		return 8 * 1024 * 1024;
91 #else /* _WIN32 */
92 		// Increase the stack size for Linux/Unix coroutines for many JSON objects on the stack.
93 		// This may help mitigate possible stack overflows. https://github.com/Icinga/icinga2/issues/7532
94 		return 256 * 1024;
95 		//return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB
96 #endif /* _WIN32 */
97 	}
98 
99 	template <typename Handler, typename Function>
SpawnCoroutine(Handler & h,Function f)100 	static void SpawnCoroutine(Handler& h, Function f) {
101 
102 		boost::asio::spawn(h,
103 			[f](boost::asio::yield_context yc) {
104 
105 				try {
106 					f(yc);
107 				} catch (const boost::coroutines::detail::forced_unwind &) {
108 					// Required for proper stack unwinding when coroutines are destroyed.
109 					// https://github.com/boostorg/coroutine/issues/39
110 					throw;
111 				} catch (const std::exception& ex) {
112 					Log(LogCritical, "IoEngine", "Exception in coroutine!");
113 					Log(LogDebug, "IoEngine") << "Exception in coroutine: " << DiagnosticInformation(ex);
114 				} catch (...) {
115 					Log(LogCritical, "IoEngine", "Exception in coroutine!");
116 				}
117 			},
118 			boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
119 		);
120 	}
121 
122 private:
123 	IoEngine();
124 
125 	void RunEventLoop();
126 
127 	static LazyInit<std::unique_ptr<IoEngine>> m_Instance;
128 
129 	boost::asio::io_context m_IoContext;
130 	boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
131 	std::vector<std::thread> m_Threads;
132 	boost::asio::deadline_timer m_AlreadyExpiredTimer;
133 	std::atomic_int_fast32_t m_CpuBoundSemaphore;
134 };
135 
/**
 * Exception type derived from std::exception which adds no members of its own.
 *
 * NOTE(review): judging by the name it is presumably thrown to make an
 * I/O thread leave its event loop - confirm against the code which throws/catches it.
 */
class TerminateIoThread : public std::exception
{
};
139 
/**
 * Condition variable which doesn't block I/O threads
 *
 * Wait() takes a boost::asio::yield_context and the only state declared
 * here is a boost::asio::deadline_timer. Only the interface is declared in
 * this header; the member functions are defined elsewhere.
 *
 * @ingroup base
 */
class AsioConditionVariable
{
public:
	/*
	 * @param io I/O context the object is created for.
	 * @param init Initial state, false unless specified.
	 *             NOTE(review): presumably true means "already set" - confirm against the definition.
	 */
	AsioConditionVariable(boost::asio::io_context& io, bool init = false);

	/* NOTE(review): presumably Set() wakes up the waiting coroutines and Clear() makes subsequent Wait() calls suspend again - confirm against the definitions. */
	void Set();
	void Clear();

	/* NOTE(review): presumably suspends the coroutine behind yc until the variable is set - confirm against the definition. */
	void Wait(boost::asio::yield_context yc);

private:
	/* Timer backing the condition variable. */
	boost::asio::deadline_timer m_Timer;
};
157 
158 /**
159  * I/O timeout emulator
160  *
161  * @ingroup base
162  */
163 class Timeout : public SharedObject
164 {
165 public:
166 	DECLARE_PTR_TYPEDEFS(Timeout);
167 
168 	template<class Executor, class TimeoutFromNow, class OnTimeout>
Timeout(boost::asio::io_context & io,Executor & executor,TimeoutFromNow timeoutFromNow,OnTimeout onTimeout)169 	Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout)
170 		: m_Timer(io)
171 	{
172 		Ptr keepAlive (this);
173 
174 		m_Cancelled.store(false);
175 		m_Timer.expires_from_now(std::move(timeoutFromNow));
176 
177 		IoEngine::SpawnCoroutine(executor, [this, keepAlive, onTimeout](boost::asio::yield_context yc) {
178 			if (m_Cancelled.load()) {
179 				return;
180 			}
181 
182 			{
183 				boost::system::error_code ec;
184 
185 				m_Timer.async_wait(yc[ec]);
186 
187 				if (ec) {
188 					return;
189 				}
190 			}
191 
192 			if (m_Cancelled.load()) {
193 				return;
194 			}
195 
196 			auto f (onTimeout);
197 			f(std::move(yc));
198 		});
199 	}
200 
201 	void Cancel();
202 
203 private:
204 	boost::asio::deadline_timer m_Timer;
205 	std::atomic<bool> m_Cancelled;
206 };
207 
208 }
209 
210 #endif /* IO_ENGINE_H */
211