1 /*
2  * Copyright 2003-2020 The Music Player Daemon Project
3  * http://www.musicpd.org
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19 
20 #include "Loop.hxx"
21 #include "TimerEvent.hxx"
22 #include "SocketEvent.hxx"
23 #include "IdleEvent.hxx"
24 #include "util/ScopeExit.hxx"
25 
26 #ifdef HAVE_THREADED_EVENT_LOOP
27 #include "DeferEvent.hxx"
28 #endif
29 
30 #ifdef HAVE_URING
31 #include "UringManager.hxx"
32 #include "util/PrintException.hxx"
33 #include <stdio.h>
34 #endif
35 
36 constexpr bool
operator ()(const TimerEvent & a,const TimerEvent & b) const37 EventLoop::TimerCompare::operator()(const TimerEvent &a,
38 				    const TimerEvent &b) const noexcept
39 {
40 	return a.due < b.due;
41 }
42 
EventLoop(ThreadId _thread)43 EventLoop::EventLoop(
44 #ifdef HAVE_THREADED_EVENT_LOOP
45 		     ThreadId _thread
46 #endif
47 		     )
48 	:
49 #ifdef HAVE_THREADED_EVENT_LOOP
50 	wake_event(*this, BIND_THIS_METHOD(OnSocketReady)),
51 	 thread(_thread),
52 	 /* if this instance is hosted by an EventThread (no ThreadId
53 	    known yet) then we're not yet alive until the thread is
54 	    started; for the main EventLoop instance, we assume it's
55 	    already alive, because nobody but EventThread will call
56 	    SetAlive() */
57 	 alive(!_thread.IsNull()),
58 #endif
59 	 quit(false)
60 {
61 #ifdef HAVE_THREADED_EVENT_LOOP
62 	wake_event.Open(SocketDescriptor(wake_fd.Get()));
63 #endif
64 }
65 
~EventLoop()66 EventLoop::~EventLoop() noexcept
67 {
68 	assert(idle.empty());
69 	assert(timers.empty());
70 }
71 
72 #ifdef HAVE_URING
73 
74 Uring::Queue *
GetUring()75 EventLoop::GetUring() noexcept
76 {
77 	if (!uring_initialized) {
78 		uring_initialized = true;
79 		try {
80 			uring = std::make_unique<Uring::Manager>(*this);
81 		} catch (...) {
82 			fprintf(stderr, "Failed to initialize io_uring: ");
83 			PrintException(std::current_exception());
84 		}
85 	}
86 
87 	return uring.get();
88 }
89 
90 #endif
91 
92 void
Break()93 EventLoop::Break() noexcept
94 {
95 	if (quit.exchange(true))
96 		return;
97 
98 #ifdef HAVE_THREADED_EVENT_LOOP
99 	wake_fd.Write();
100 #endif
101 }
102 
103 bool
AbandonFD(int _fd)104 EventLoop::AbandonFD(int _fd)  noexcept
105 {
106 #ifdef HAVE_THREADED_EVENT_LOOP
107 	assert(!IsAlive() || IsInside());
108 #endif
109 
110 	return poll_group.Abandon(_fd);
111 }
112 
113 bool
RemoveFD(int _fd)114 EventLoop::RemoveFD(int _fd) noexcept
115 {
116 #ifdef HAVE_THREADED_EVENT_LOOP
117 	assert(!IsAlive() || IsInside());
118 #endif
119 
120 	return poll_group.Remove(_fd);
121 }
122 
123 void
AddIdle(IdleEvent & i)124 EventLoop::AddIdle(IdleEvent &i) noexcept
125 {
126 	assert(IsInside());
127 
128 	idle.push_back(i);
129 	again = true;
130 }
131 
132 void
RemoveIdle(IdleEvent & i)133 EventLoop::RemoveIdle(IdleEvent &i) noexcept
134 {
135 	assert(IsInside());
136 
137 	idle.erase(idle.iterator_to(i));
138 }
139 
140 void
AddTimer(TimerEvent & t,Event::Duration d)141 EventLoop::AddTimer(TimerEvent &t, Event::Duration d) noexcept
142 {
143 	assert(IsInside());
144 
145 	t.due = now + d;
146 	timers.insert(t);
147 	again = true;
148 }
149 
150 inline Event::Duration
HandleTimers()151 EventLoop::HandleTimers() noexcept
152 {
153 	Event::Duration timeout;
154 
155 	while (!quit) {
156 		auto i = timers.begin();
157 		if (i == timers.end())
158 			break;
159 
160 		TimerEvent &t = *i;
161 		timeout = t.due - now;
162 		if (timeout > timeout.zero())
163 			return timeout;
164 
165 		timers.erase(i);
166 
167 		t.Run();
168 	}
169 
170 	return Event::Duration(-1);
171 }
172 
173 /**
174  * Convert the given timeout specification to a milliseconds integer,
175  * to be used by functions like poll() and epoll_wait().  Any negative
176  * value (= never times out) is translated to the magic value -1.
177  */
178 static constexpr int
ExportTimeoutMS(Event::Duration timeout)179 ExportTimeoutMS(Event::Duration timeout)
180 {
181 	return timeout >= timeout.zero()
182 		/* round up (+1) to avoid unnecessary wakeups */
183 		? int(std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()) + 1
184 		: -1;
185 }
186 
187 inline bool
Wait(Event::Duration timeout)188 EventLoop::Wait(Event::Duration timeout) noexcept
189 {
190 	const auto poll_result =
191 		poll_group.ReadEvents(ExportTimeoutMS(timeout));
192 
193 	ready_sockets.clear();
194 	for (size_t i = 0; i < poll_result.GetSize(); ++i) {
195 		auto &s = *(SocketEvent *)poll_result.GetObject(i);
196 		s.SetReadyFlags(poll_result.GetEvents(i));
197 		ready_sockets.push_back(s);
198 	}
199 
200 	return poll_result.GetSize() > 0;
201 }
202 
203 void
Run()204 EventLoop::Run() noexcept
205 {
206 #ifdef HAVE_THREADED_EVENT_LOOP
207 	if (thread.IsNull())
208 		thread = ThreadId::GetCurrent();
209 #endif
210 
211 	assert(IsInside());
212 	assert(!quit);
213 #ifdef HAVE_THREADED_EVENT_LOOP
214 	assert(alive);
215 	assert(busy);
216 
217 	wake_event.Schedule(SocketEvent::READ);
218 #endif
219 
220 #ifdef HAVE_URING
221 	AtScopeExit(this) {
222 		/* make sure that the Uring::Manager gets destructed
223 		   from within the EventThread, or else its
224 		   destruction in another thread will cause assertion
225 		   failures */
226 		uring.reset();
227 		uring_initialized = false;
228 	};
229 #endif
230 
231 #ifdef HAVE_THREADED_EVENT_LOOP
232 	AtScopeExit(this) {
233 		wake_event.Cancel();
234 	};
235 #endif
236 
237 	do {
238 		now = std::chrono::steady_clock::now();
239 		again = false;
240 
241 		/* invoke timers */
242 
243 		const auto timeout = HandleTimers();
244 		if (quit)
245 			break;
246 
247 		/* invoke idle */
248 
249 		while (!idle.empty()) {
250 			IdleEvent &m = idle.front();
251 			idle.pop_front();
252 			m.Run();
253 
254 			if (quit)
255 				return;
256 		}
257 
258 #ifdef HAVE_THREADED_EVENT_LOOP
259 		/* try to handle DeferEvents without WakeFD
260 		   overhead */
261 		{
262 			const std::lock_guard<Mutex> lock(mutex);
263 			HandleDeferred();
264 			busy = false;
265 
266 			if (again)
267 				/* re-evaluate timers because one of
268 				   the IdleEvents may have added a
269 				   new timeout */
270 				continue;
271 		}
272 #endif
273 
274 		/* wait for new event */
275 
276 		Wait(timeout);
277 
278 		now = std::chrono::steady_clock::now();
279 
280 #ifdef HAVE_THREADED_EVENT_LOOP
281 		{
282 			const std::lock_guard<Mutex> lock(mutex);
283 			busy = true;
284 		}
285 #endif
286 
287 		/* invoke sockets */
288 		while (!ready_sockets.empty() && !quit) {
289 			auto &sm = ready_sockets.front();
290 			ready_sockets.pop_front();
291 
292 			sm.Dispatch();
293 		}
294 	} while (!quit);
295 
296 #ifdef HAVE_THREADED_EVENT_LOOP
297 #ifndef NDEBUG
298 	assert(thread.IsInside());
299 #endif
300 #endif
301 }
302 
303 #ifdef HAVE_THREADED_EVENT_LOOP
304 
305 void
AddDeferred(DeferEvent & d)306 EventLoop::AddDeferred(DeferEvent &d) noexcept
307 {
308 	bool must_wake;
309 
310 	{
311 		const std::lock_guard<Mutex> lock(mutex);
312 		if (d.IsPending())
313 			return;
314 
315 		/* we don't need to wake up the EventLoop if another
316 		   DeferEvent has already done it */
317 		must_wake = !busy && deferred.empty();
318 
319 		deferred.push_back(d);
320 		again = true;
321 	}
322 
323 	if (must_wake)
324 		wake_fd.Write();
325 }
326 
327 void
RemoveDeferred(DeferEvent & d)328 EventLoop::RemoveDeferred(DeferEvent &d) noexcept
329 {
330 	const std::lock_guard<Mutex> protect(mutex);
331 
332 	if (d.IsPending())
333 		deferred.erase(deferred.iterator_to(d));
334 }
335 
336 void
HandleDeferred()337 EventLoop::HandleDeferred() noexcept
338 {
339 	while (!deferred.empty() && !quit) {
340 		auto &m = deferred.front();
341 		assert(m.IsPending());
342 
343 		deferred.pop_front();
344 
345 		const ScopeUnlock unlock(mutex);
346 		m.RunDeferred();
347 	}
348 }
349 
350 void
OnSocketReady(unsigned flags)351 EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
352 {
353 	assert(IsInside());
354 
355 	wake_fd.Read();
356 
357 	const std::lock_guard<Mutex> lock(mutex);
358 	HandleDeferred();
359 }
360 
361 #endif
362