xref: /qemu/util/fdmon-io_uring.c (revision abff1abf)
1 /* SPDX-License-Identifier: GPL-2.0-or-later */
2 /*
3  * Linux io_uring file descriptor monitoring
4  *
5  * The Linux io_uring API supports file descriptor monitoring with a few
6  * advantages over existing APIs like poll(2) and epoll(7):
7  *
8  * 1. Userspace polling of events is possible because the completion queue (cq
9  *    ring) is shared between the kernel and userspace.  This allows
10  *    applications that rely on userspace polling to also monitor file
11  *    descriptors in the same userspace polling loop.
12  *
13  * 2. Submission and completion is batched and done together in a single system
14  *    call.  This minimizes the number of system calls.
15  *
16  * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than
17  *    poll(2).
18  *
19  * 4. Nanosecond timeouts are supported so it requires fewer syscalls than
20  *    epoll(7).
21  *
22  * This code only monitors file descriptors and does not do asynchronous disk
23  * I/O.  Implementing disk I/O efficiently has other requirements and should
24  * use a separate io_uring so it does not make sense to unify the code.
25  *
26  * File descriptor monitoring is implemented using the following operations:
27  *
28  * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
29  * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored.  When
30  *    the poll mask changes for a file descriptor it is first removed and then
31  *    re-added with the new poll mask, so this operation is also used as part
32  *    of modifying an existing monitored file descriptor.
33  * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
34  *    for events.  This operation self-cancels if another event completes
35  *    before the timeout.
36  *
37  * io_uring calls the submission queue the "sq ring" and the completion queue
38  * the "cq ring".  Ring entries are called "sqe" and "cqe", respectively.
39  *
40  * The code is structured so that sq/cq rings are only modified within
41  * fdmon_io_uring_wait().  Changes to AioHandlers are made by enqueuing them on
42  * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD
43  * and/or IORING_OP_POLL_REMOVE sqes for them.
44  */
45 
46 #include "qemu/osdep.h"
47 #include <poll.h>
48 #include "qemu/rcu_queue.h"
49 #include "aio-posix.h"
50 
/* Ring sizing and AioHandler::flags bits used by the io_uring fd monitor */
enum {
    /* Number of entries requested for the sq/cq rings */
    FDMON_IO_URING_ENTRIES  = 128,

    /*
     * AioHandler::flags bits:
     *   PENDING - set by enqueue() while the handler waits for submission
     *   ADD     - an IORING_OP_POLL_ADD sqe should be submitted
     *   REMOVE  - an IORING_OP_POLL_REMOVE sqe should be submitted
     */
    FDMON_IO_URING_PENDING  = 0x1,
    FDMON_IO_URING_ADD      = 0x2,
    FDMON_IO_URING_REMOVE   = 0x4,
};
59 
60 static inline int poll_events_from_pfd(int pfd_events)
61 {
62     return (pfd_events & G_IO_IN ? POLLIN : 0) |
63            (pfd_events & G_IO_OUT ? POLLOUT : 0) |
64            (pfd_events & G_IO_HUP ? POLLHUP : 0) |
65            (pfd_events & G_IO_ERR ? POLLERR : 0);
66 }
67 
68 static inline int pfd_events_from_poll(int poll_events)
69 {
70     return (poll_events & POLLIN ? G_IO_IN : 0) |
71            (poll_events & POLLOUT ? G_IO_OUT : 0) |
72            (poll_events & POLLHUP ? G_IO_HUP : 0) |
73            (poll_events & POLLERR ? G_IO_ERR : 0);
74 }
75 
76 /*
77  * Returns an sqe for submitting a request.  Only be called within
78  * fdmon_io_uring_wait().
79  */
80 static struct io_uring_sqe *get_sqe(AioContext *ctx)
81 {
82     struct io_uring *ring = &ctx->fdmon_io_uring;
83     struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
84     int ret;
85 
86     if (likely(sqe)) {
87         return sqe;
88     }
89 
90     /* No free sqes left, submit pending sqes first */
91     do {
92         ret = io_uring_submit(ring);
93     } while (ret == -EINTR);
94 
95     assert(ret > 1);
96     sqe = io_uring_get_sqe(ring);
97     assert(sqe);
98     return sqe;
99 }
100 
101 /* Atomically enqueue an AioHandler for sq ring submission */
102 static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
103 {
104     unsigned old_flags;
105 
106     old_flags = atomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
107     if (!(old_flags & FDMON_IO_URING_PENDING)) {
108         QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
109     }
110 }
111 
112 /* Dequeue an AioHandler for sq ring submission.  Called by fill_sq_ring(). */
113 static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
114 {
115     AioHandler *node = QSLIST_FIRST(head);
116 
117     if (!node) {
118         return NULL;
119     }
120 
121     /* Doesn't need to be atomic since fill_sq_ring() moves the list */
122     QSLIST_REMOVE_HEAD(head, node_submitted);
123 
124     /*
125      * Don't clear FDMON_IO_URING_REMOVE.  It's sticky so it can serve two
126      * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
127      * telling process_cqe() to delete the AioHandler when its
128      * IORING_OP_POLL_ADD completes.
129      */
130     *flags = atomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
131                                               FDMON_IO_URING_ADD));
132     return node;
133 }
134 
135 static void fdmon_io_uring_update(AioContext *ctx,
136                                   AioHandler *old_node,
137                                   AioHandler *new_node)
138 {
139     if (new_node) {
140         enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
141     }
142 
143     if (old_node) {
144         /*
145          * Deletion is tricky because IORING_OP_POLL_ADD and
146          * IORING_OP_POLL_REMOVE are async.  We need to wait for the original
147          * IORING_OP_POLL_ADD to complete before this handler can be freed
148          * safely.
149          *
150          * It's possible that the file descriptor becomes ready and the
151          * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
152          * submitted, too.
153          *
154          * Mark this handler deleted right now but don't place it on
155          * ctx->deleted_aio_handlers yet.  Instead, manually fudge the list
156          * entry to make QLIST_IS_INSERTED() think this handler has been
157          * inserted and other code recognizes this AioHandler as deleted.
158          *
159          * Once the original IORING_OP_POLL_ADD completes we enqueue the
160          * handler on the real ctx->deleted_aio_handlers list to be freed.
161          */
162         assert(!QLIST_IS_INSERTED(old_node, node_deleted));
163         old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;
164 
165         enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
166     }
167 }
168 
169 static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
170 {
171     struct io_uring_sqe *sqe = get_sqe(ctx);
172     int events = poll_events_from_pfd(node->pfd.events);
173 
174     io_uring_prep_poll_add(sqe, node->pfd.fd, events);
175     io_uring_sqe_set_data(sqe, node);
176 }
177 
178 static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
179 {
180     struct io_uring_sqe *sqe = get_sqe(ctx);
181 
182     io_uring_prep_poll_remove(sqe, node);
183 }
184 
185 /* Add a timeout that self-cancels when another cqe becomes ready */
186 static void add_timeout_sqe(AioContext *ctx, int64_t ns)
187 {
188     struct io_uring_sqe *sqe;
189     struct __kernel_timespec ts = {
190         .tv_sec = ns / NANOSECONDS_PER_SECOND,
191         .tv_nsec = ns % NANOSECONDS_PER_SECOND,
192     };
193 
194     sqe = get_sqe(ctx);
195     io_uring_prep_timeout(sqe, &ts, 1, 0);
196 }
197 
198 /* Add sqes from ctx->submit_list for submission */
199 static void fill_sq_ring(AioContext *ctx)
200 {
201     AioHandlerSList submit_list;
202     AioHandler *node;
203     unsigned flags;
204 
205     QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);
206 
207     while ((node = dequeue(&submit_list, &flags))) {
208         /* Order matters, just in case both flags were set */
209         if (flags & FDMON_IO_URING_ADD) {
210             add_poll_add_sqe(ctx, node);
211         }
212         if (flags & FDMON_IO_URING_REMOVE) {
213             add_poll_remove_sqe(ctx, node);
214         }
215     }
216 }
217 
218 /* Returns true if a handler became ready */
219 static bool process_cqe(AioContext *ctx,
220                         AioHandlerList *ready_list,
221                         struct io_uring_cqe *cqe)
222 {
223     AioHandler *node = io_uring_cqe_get_data(cqe);
224     unsigned flags;
225 
226     /* poll_timeout and poll_remove have a zero user_data field */
227     if (!node) {
228         return false;
229     }
230 
231     /*
232      * Deletion can only happen when IORING_OP_POLL_ADD completes.  If we race
233      * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
234      * bit before IORING_OP_POLL_REMOVE is submitted.
235      */
236     flags = atomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
237     if (flags & FDMON_IO_URING_REMOVE) {
238         QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
239         return false;
240     }
241 
242     aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));
243 
244     /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
245     add_poll_add_sqe(ctx, node);
246     return true;
247 }
248 
249 static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
250 {
251     struct io_uring *ring = &ctx->fdmon_io_uring;
252     struct io_uring_cqe *cqe;
253     unsigned num_cqes = 0;
254     unsigned num_ready = 0;
255     unsigned head;
256 
257     io_uring_for_each_cqe(ring, head, cqe) {
258         if (process_cqe(ctx, ready_list, cqe)) {
259             num_ready++;
260         }
261 
262         num_cqes++;
263     }
264 
265     io_uring_cq_advance(ring, num_cqes);
266     return num_ready;
267 }
268 
269 static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
270                                int64_t timeout)
271 {
272     unsigned wait_nr = 1; /* block until at least one cqe is ready */
273     int ret;
274 
275     /* Fall back while external clients are disabled */
276     if (atomic_read(&ctx->external_disable_cnt)) {
277         return fdmon_poll_ops.wait(ctx, ready_list, timeout);
278     }
279 
280     if (timeout == 0) {
281         wait_nr = 0; /* non-blocking */
282     } else if (timeout > 0) {
283         add_timeout_sqe(ctx, timeout);
284     }
285 
286     fill_sq_ring(ctx);
287 
288     do {
289         ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
290     } while (ret == -EINTR);
291 
292     assert(ret >= 0);
293 
294     return process_cq_ring(ctx, ready_list);
295 }
296 
297 static bool fdmon_io_uring_need_wait(AioContext *ctx)
298 {
299     /* Have io_uring events completed? */
300     if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
301         return true;
302     }
303 
304     /* Are there pending sqes to submit? */
305     if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
306         return true;
307     }
308 
309     /* Do we need to process AioHandlers for io_uring changes? */
310     if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
311         return true;
312     }
313 
314     /* Are we falling back to fdmon-poll? */
315     return atomic_read(&ctx->external_disable_cnt);
316 }
317 
318 static const FDMonOps fdmon_io_uring_ops = {
319     .update = fdmon_io_uring_update,
320     .wait = fdmon_io_uring_wait,
321     .need_wait = fdmon_io_uring_need_wait,
322 };
323 
324 bool fdmon_io_uring_setup(AioContext *ctx)
325 {
326     int ret;
327 
328     ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
329     if (ret != 0) {
330         return false;
331     }
332 
333     QSLIST_INIT(&ctx->submit_list);
334     ctx->fdmon_ops = &fdmon_io_uring_ops;
335     return true;
336 }
337 
338 void fdmon_io_uring_destroy(AioContext *ctx)
339 {
340     if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
341         AioHandler *node;
342 
343         io_uring_queue_exit(&ctx->fdmon_io_uring);
344 
345         /* Move handlers due to be removed onto the deleted list */
346         while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
347             unsigned flags = atomic_fetch_and(&node->flags,
348                     ~(FDMON_IO_URING_PENDING |
349                       FDMON_IO_URING_ADD |
350                       FDMON_IO_URING_REMOVE));
351 
352             if (flags & FDMON_IO_URING_REMOVE) {
353                 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
354             }
355 
356             QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
357         }
358 
359         ctx->fdmon_ops = &fdmon_poll_ops;
360     }
361 }
362