1 /*
2 * Copyright 2003-2020 The Music Player Daemon Project
3 * http://www.musicpd.org
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "Loop.hxx"
21 #include "TimerEvent.hxx"
22 #include "SocketEvent.hxx"
23 #include "IdleEvent.hxx"
24 #include "util/ScopeExit.hxx"
25
26 #ifdef HAVE_THREADED_EVENT_LOOP
27 #include "DeferEvent.hxx"
28 #endif
29
30 #ifdef HAVE_URING
31 #include "UringManager.hxx"
32 #include "util/PrintException.hxx"
33 #include <stdio.h>
34 #endif
35
36 constexpr bool
operator ()(const TimerEvent & a,const TimerEvent & b) const37 EventLoop::TimerCompare::operator()(const TimerEvent &a,
38 const TimerEvent &b) const noexcept
39 {
40 return a.due < b.due;
41 }
42
EventLoop(ThreadId _thread)43 EventLoop::EventLoop(
44 #ifdef HAVE_THREADED_EVENT_LOOP
45 ThreadId _thread
46 #endif
47 )
48 :
49 #ifdef HAVE_THREADED_EVENT_LOOP
50 wake_event(*this, BIND_THIS_METHOD(OnSocketReady)),
51 thread(_thread),
52 /* if this instance is hosted by an EventThread (no ThreadId
53 known yet) then we're not yet alive until the thread is
54 started; for the main EventLoop instance, we assume it's
55 already alive, because nobody but EventThread will call
56 SetAlive() */
57 alive(!_thread.IsNull()),
58 #endif
59 quit(false)
60 {
61 #ifdef HAVE_THREADED_EVENT_LOOP
62 wake_event.Open(SocketDescriptor(wake_fd.Get()));
63 #endif
64 }
65
~EventLoop()66 EventLoop::~EventLoop() noexcept
67 {
68 assert(idle.empty());
69 assert(timers.empty());
70 }
71
72 #ifdef HAVE_URING
73
74 Uring::Queue *
GetUring()75 EventLoop::GetUring() noexcept
76 {
77 if (!uring_initialized) {
78 uring_initialized = true;
79 try {
80 uring = std::make_unique<Uring::Manager>(*this);
81 } catch (...) {
82 fprintf(stderr, "Failed to initialize io_uring: ");
83 PrintException(std::current_exception());
84 }
85 }
86
87 return uring.get();
88 }
89
90 #endif
91
92 void
Break()93 EventLoop::Break() noexcept
94 {
95 if (quit.exchange(true))
96 return;
97
98 #ifdef HAVE_THREADED_EVENT_LOOP
99 wake_fd.Write();
100 #endif
101 }
102
103 bool
AbandonFD(int _fd)104 EventLoop::AbandonFD(int _fd) noexcept
105 {
106 #ifdef HAVE_THREADED_EVENT_LOOP
107 assert(!IsAlive() || IsInside());
108 #endif
109
110 return poll_group.Abandon(_fd);
111 }
112
113 bool
RemoveFD(int _fd)114 EventLoop::RemoveFD(int _fd) noexcept
115 {
116 #ifdef HAVE_THREADED_EVENT_LOOP
117 assert(!IsAlive() || IsInside());
118 #endif
119
120 return poll_group.Remove(_fd);
121 }
122
123 void
AddIdle(IdleEvent & i)124 EventLoop::AddIdle(IdleEvent &i) noexcept
125 {
126 assert(IsInside());
127
128 idle.push_back(i);
129 again = true;
130 }
131
132 void
RemoveIdle(IdleEvent & i)133 EventLoop::RemoveIdle(IdleEvent &i) noexcept
134 {
135 assert(IsInside());
136
137 idle.erase(idle.iterator_to(i));
138 }
139
140 void
AddTimer(TimerEvent & t,Event::Duration d)141 EventLoop::AddTimer(TimerEvent &t, Event::Duration d) noexcept
142 {
143 assert(IsInside());
144
145 t.due = now + d;
146 timers.insert(t);
147 again = true;
148 }
149
150 inline Event::Duration
HandleTimers()151 EventLoop::HandleTimers() noexcept
152 {
153 Event::Duration timeout;
154
155 while (!quit) {
156 auto i = timers.begin();
157 if (i == timers.end())
158 break;
159
160 TimerEvent &t = *i;
161 timeout = t.due - now;
162 if (timeout > timeout.zero())
163 return timeout;
164
165 timers.erase(i);
166
167 t.Run();
168 }
169
170 return Event::Duration(-1);
171 }
172
173 /**
174 * Convert the given timeout specification to a milliseconds integer,
175 * to be used by functions like poll() and epoll_wait(). Any negative
176 * value (= never times out) is translated to the magic value -1.
177 */
178 static constexpr int
ExportTimeoutMS(Event::Duration timeout)179 ExportTimeoutMS(Event::Duration timeout)
180 {
181 return timeout >= timeout.zero()
182 /* round up (+1) to avoid unnecessary wakeups */
183 ? int(std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()) + 1
184 : -1;
185 }
186
187 inline bool
Wait(Event::Duration timeout)188 EventLoop::Wait(Event::Duration timeout) noexcept
189 {
190 const auto poll_result =
191 poll_group.ReadEvents(ExportTimeoutMS(timeout));
192
193 ready_sockets.clear();
194 for (size_t i = 0; i < poll_result.GetSize(); ++i) {
195 auto &s = *(SocketEvent *)poll_result.GetObject(i);
196 s.SetReadyFlags(poll_result.GetEvents(i));
197 ready_sockets.push_back(s);
198 }
199
200 return poll_result.GetSize() > 0;
201 }
202
203 void
Run()204 EventLoop::Run() noexcept
205 {
206 #ifdef HAVE_THREADED_EVENT_LOOP
207 if (thread.IsNull())
208 thread = ThreadId::GetCurrent();
209 #endif
210
211 assert(IsInside());
212 assert(!quit);
213 #ifdef HAVE_THREADED_EVENT_LOOP
214 assert(alive);
215 assert(busy);
216
217 wake_event.Schedule(SocketEvent::READ);
218 #endif
219
220 #ifdef HAVE_URING
221 AtScopeExit(this) {
222 /* make sure that the Uring::Manager gets destructed
223 from within the EventThread, or else its
224 destruction in another thread will cause assertion
225 failures */
226 uring.reset();
227 uring_initialized = false;
228 };
229 #endif
230
231 #ifdef HAVE_THREADED_EVENT_LOOP
232 AtScopeExit(this) {
233 wake_event.Cancel();
234 };
235 #endif
236
237 do {
238 now = std::chrono::steady_clock::now();
239 again = false;
240
241 /* invoke timers */
242
243 const auto timeout = HandleTimers();
244 if (quit)
245 break;
246
247 /* invoke idle */
248
249 while (!idle.empty()) {
250 IdleEvent &m = idle.front();
251 idle.pop_front();
252 m.Run();
253
254 if (quit)
255 return;
256 }
257
258 #ifdef HAVE_THREADED_EVENT_LOOP
259 /* try to handle DeferEvents without WakeFD
260 overhead */
261 {
262 const std::lock_guard<Mutex> lock(mutex);
263 HandleDeferred();
264 busy = false;
265
266 if (again)
267 /* re-evaluate timers because one of
268 the IdleEvents may have added a
269 new timeout */
270 continue;
271 }
272 #endif
273
274 /* wait for new event */
275
276 Wait(timeout);
277
278 now = std::chrono::steady_clock::now();
279
280 #ifdef HAVE_THREADED_EVENT_LOOP
281 {
282 const std::lock_guard<Mutex> lock(mutex);
283 busy = true;
284 }
285 #endif
286
287 /* invoke sockets */
288 while (!ready_sockets.empty() && !quit) {
289 auto &sm = ready_sockets.front();
290 ready_sockets.pop_front();
291
292 sm.Dispatch();
293 }
294 } while (!quit);
295
296 #ifdef HAVE_THREADED_EVENT_LOOP
297 #ifndef NDEBUG
298 assert(thread.IsInside());
299 #endif
300 #endif
301 }
302
303 #ifdef HAVE_THREADED_EVENT_LOOP
304
305 void
AddDeferred(DeferEvent & d)306 EventLoop::AddDeferred(DeferEvent &d) noexcept
307 {
308 bool must_wake;
309
310 {
311 const std::lock_guard<Mutex> lock(mutex);
312 if (d.IsPending())
313 return;
314
315 /* we don't need to wake up the EventLoop if another
316 DeferEvent has already done it */
317 must_wake = !busy && deferred.empty();
318
319 deferred.push_back(d);
320 again = true;
321 }
322
323 if (must_wake)
324 wake_fd.Write();
325 }
326
327 void
RemoveDeferred(DeferEvent & d)328 EventLoop::RemoveDeferred(DeferEvent &d) noexcept
329 {
330 const std::lock_guard<Mutex> protect(mutex);
331
332 if (d.IsPending())
333 deferred.erase(deferred.iterator_to(d));
334 }
335
336 void
HandleDeferred()337 EventLoop::HandleDeferred() noexcept
338 {
339 while (!deferred.empty() && !quit) {
340 auto &m = deferred.front();
341 assert(m.IsPending());
342
343 deferred.pop_front();
344
345 const ScopeUnlock unlock(mutex);
346 m.RunDeferred();
347 }
348 }
349
350 void
OnSocketReady(unsigned flags)351 EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
352 {
353 assert(IsInside());
354
355 wake_fd.Read();
356
357 const std::lock_guard<Mutex> lock(mutex);
358 HandleDeferred();
359 }
360
361 #endif
362