/* xref: /qemu/util/aio-posix.c (revision dcc474c6) */
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#ifdef CONFIG_EPOLL_CREATE1
#include <sys/epoll.h>
#endif

struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    AioPollFn *io_poll;
    IOHandler *io_poll_begin;
    IOHandler *io_poll_end;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
    QLIST_ENTRY(AioHandler) node_ready; /* only used during aio_poll() */
    QLIST_ENTRY(AioHandler) node_deleted;
};

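/*
 * A handler can sit on up to three lists at once:
 *
 *   node         - ctx->aio_handlers, the registration list.  Writers take
 *                  ctx->list_lock; readers walk it with the RCU-friendly
 *                  QLIST_FOREACH_RCU() macros.
 *   node_ready   - a local ready_list built by aio_poll() on the polling
 *                  thread, never shared between threads.
 *   node_deleted - ctx->deleted_aio_handlers, where removed handlers wait
 *                  until nobody is walking the handler list and they can be
 *                  freed by aio_free_deleted_handlers().
 */
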
/* Add a handler to a ready list */
static void add_ready_handler(AioHandlerList *ready_list,
                              AioHandler *node,
                              int revents)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->pfd.revents = revents;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

#ifdef CONFIG_EPOLL_CREATE1

/* The fd number threshold to switch to epoll */
#define EPOLL_ENABLE_THRESHOLD 64

static void aio_epoll_disable(AioContext *ctx)
{
    ctx->epoll_enabled = false;
    if (!ctx->epoll_available) {
        return;
    }
    ctx->epoll_available = false;
    close(ctx->epollfd);
}

static inline int epoll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? EPOLLIN : 0) |
           (pfd_events & G_IO_OUT ? EPOLLOUT : 0) |
           (pfd_events & G_IO_HUP ? EPOLLHUP : 0) |
           (pfd_events & G_IO_ERR ? EPOLLERR : 0);
}

static bool aio_epoll_try_enable(AioContext *ctx)
{
    AioHandler *node;
    struct epoll_event event;

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int r;
        if (QLIST_IS_INSERTED(node, node_deleted) || !node->pfd.events) {
            continue;
        }
        event.events = epoll_events_from_pfd(node->pfd.events);
        event.data.ptr = node;
        r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
        if (r) {
            return false;
        }
    }
    ctx->epoll_enabled = true;
    return true;
}

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
    struct epoll_event event;
    int r;
    int ctl;

    if (!ctx->epoll_enabled) {
        return;
    }
    if (!node->pfd.events) {
        ctl = EPOLL_CTL_DEL;
    } else {
        event.data.ptr = node;
        event.events = epoll_events_from_pfd(node->pfd.events);
        ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    }

    r = epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);
    if (r) {
        aio_epoll_disable(ctx);
    }
}

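/*
 * Wait for events with epoll.
 *
 * epoll_wait() only has millisecond resolution, so a positive nanosecond
 * timeout is first consumed by qemu_poll_ns() on the epollfd itself; once
 * that reports readiness the remaining timeout is zero.  As a result,
 * epoll_wait() below is only ever reached with a timeout of 0 or -1 and no
 * ns-to-ms conversion is needed.  Ready handlers are added to @ready_list
 * via add_ready_handler().
 */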
static int aio_epoll(AioContext *ctx, AioHandlerList *ready_list,
                     int64_t timeout)
{
    GPollFD pfd = {
        .fd = ctx->epollfd,
        .events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR,
    };
    AioHandler *node;
    int i, ret = 0;
    struct epoll_event events[128];

    if (timeout > 0) {
        ret = qemu_poll_ns(&pfd, 1, timeout);
        if (ret > 0) {
            timeout = 0;
        }
    }
    if (timeout <= 0 || ret > 0) {
        ret = epoll_wait(ctx->epollfd, events,
                         ARRAY_SIZE(events),
                         timeout);
        if (ret <= 0) {
            goto out;
        }
        for (i = 0; i < ret; i++) {
            int ev = events[i].events;
            int revents = (ev & EPOLLIN ? G_IO_IN : 0) |
                          (ev & EPOLLOUT ? G_IO_OUT : 0) |
                          (ev & EPOLLHUP ? G_IO_HUP : 0) |
                          (ev & EPOLLERR ? G_IO_ERR : 0);

            node = events[i].data.ptr;
            add_ready_handler(ready_list, node, revents);
        }
    }
out:
    return ret;
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    /* Fall back to ppoll when external clients are disabled. */
    return !aio_external_disabled(ctx) && ctx->epoll_enabled;
}

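/*
 * Decide whether this aio_poll() iteration should use epoll.
 *
 * epoll is enabled lazily: once the number of polled fds reaches
 * EPOLL_ENABLE_THRESHOLD, aio_epoll_try_enable() registers every handler
 * with the epollfd.  If any epoll_ctl() call fails (here or later in
 * aio_epoll_update()), aio_epoll_disable() closes the epollfd and this
 * AioContext falls back to ppoll() for good.
 */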
static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    if (!ctx->epoll_available) {
        return false;
    }
    if (aio_epoll_enabled(ctx)) {
        return true;
    }
    if (npfd >= EPOLL_ENABLE_THRESHOLD) {
        if (aio_epoll_try_enable(ctx)) {
            return true;
        } else {
            aio_epoll_disable(ctx);
        }
    }
    return false;
}

#else

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
}

static int aio_epoll(AioContext *ctx, AioHandlerList *ready_list,
                     int64_t timeout)
{
    /* Unreachable: aio_epoll_check_poll() always returns false here, so
     * aio_poll() never calls this stub.
     */
    assert(false);
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    return false;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    return false;
}

#endif

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}

static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        node->pfd.revents = 0;
        return false;
    }
    /* Otherwise, delete it for real.  We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_REMOVE(node, node);
    return true;
}

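/*
 * Register or update the handlers for @fd, or unregister it when io_read,
 * io_write and io_poll are all NULL.  When updating, the existing node is
 * always replaced by a freshly allocated one so that concurrent RCU readers
 * keep seeing a consistent handler.
 *
 * Illustrative use only (the callback name is made up for this sketch):
 *
 *     static void my_read_cb(void *opaque) { ... }
 *
 *     aio_set_fd_handler(ctx, fd, true, my_read_cb, NULL, NULL, opaque);
 *     ...
 *     aio_set_fd_handler(ctx, fd, true, NULL, NULL, NULL, NULL); (remove)
 */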
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clear events so the fd is unregistered from the ctx epoll set. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Allocate a replacement node; the old one, if any, is removed below */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify() below takes effect is not an issue.
     */
    atomic_set(&ctx->poll_disable_cnt,
               atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    if (new_node) {
        aio_epoll_update(ctx, new_node, is_new);
    } else if (node) {
        /* Unregister deleted fd_handler */
        aio_epoll_update(ctx, node, false);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}

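/*
 * Install optional hooks that bracket busy-poll mode for @fd's handler:
 * io_poll_begin runs when the AioContext starts polling and io_poll_end
 * runs when it stops (see poll_set_started() below).  Callers typically use
 * these to suppress fd notifications that are redundant while the io_poll
 * callback is being invoked in a busy loop.
 */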
void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
                       (IOHandler *)io_read, NULL, io_poll, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}

static void poll_set_started(AioContext *ctx, bool started)
{
    AioHandler *node;

    if (started == ctx->poll_started) {
        return;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);
}


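/*
 * aio_prepare(), aio_pending() and aio_dispatch() implement the
 * prepare/check/dispatch steps used when this AioContext is driven as a
 * glib GSource (see the wrappers in util/async.c) rather than by aio_poll().
 */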
bool aio_prepare(AioContext *ctx)
{
    /* Poll mode cannot be used with glib's event loop, so disable it. */
    poll_set_started(ctx, false);

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

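/*
 * Free handlers that were unregistered while the handler list was in use.
 * qemu_lockcnt_dec_if_lock() succeeds only when this is the last active
 * walker of the list; otherwise (e.g. a nested aio_poll()) the handlers
 * stay queued and are freed by an outer or later call.
 */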
static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}

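/*
 * Run the read and/or write callback for one handler.  The recorded revents
 * are consumed (cleared) up front so the same event is not dispatched twice.
 * node_deleted is checked again before the write callback because the read
 * callback may have unregistered this very fd.
 */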
static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    int revents;

    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}

/*
 * If we have a list of ready handlers then this is more efficient than
 * scanning all handlers with aio_dispatch_handlers().
 */
static bool aio_dispatch_ready_handlers(AioContext *ctx,
                                        AioHandlerList *ready_list)
{
    bool progress = false;
    AioHandler *node;

    while ((node = QLIST_FIRST(ready_list))) {
        QLIST_SAFE_REMOVE(node, node_ready);
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

/* Slower than aio_dispatch_ready_handlers() but only used via glib */
static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}

/* These thread-local variables are used only in a small part of aio_poll
 * around the call to the poll() system call.  In particular they are not
 * used while aio_poll is performing callbacks, which makes it much easier
 * to think about reentrancy!
 *
 * Stack-allocated arrays would be perfect but they have size limitations;
 * heap allocation is expensive enough that we want to reuse arrays across
 * calls to aio_poll().  And because poll() has to be called without holding
 * any lock, the arrays cannot be stored in AioContext.  Thread-local data
 * has none of the disadvantages of these three options.
 */
static __thread GPollFD *pollfds;
static __thread AioHandler **nodes;
static __thread unsigned npfd, nalloc;
static __thread Notifier pollfds_cleanup_notifier;

static void pollfds_cleanup(Notifier *n, void *unused)
{
    g_assert(npfd == 0);
    g_free(pollfds);
    g_free(nodes);
    nalloc = 0;
}

static void add_pollfd(AioHandler *node)
{
    if (npfd == nalloc) {
        if (nalloc == 0) {
            pollfds_cleanup_notifier.notify = pollfds_cleanup;
            qemu_thread_atexit_add(&pollfds_cleanup_notifier);
            nalloc = 8;
        } else {
            g_assert(nalloc <= INT_MAX);
            nalloc *= 2;
        }
        pollfds = g_renew(GPollFD, pollfds, nalloc);
        nodes = g_renew(AioHandler *, nodes, nalloc);
    }
    nodes[npfd] = node;
    pollfds[npfd] = (GPollFD) {
        .fd = node->pfd.fd,
        .events = node->pfd.events,
    };
    npfd++;
}

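/*
 * Invoke every registered io_poll() callback once.  When any of them reports
 * work, *timeout is forced to 0 so that the subsequent ppoll()/epoll_wait(),
 * if any, does not block; polling the internal ctx->notifier is not counted
 * as progress.
 */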
static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives.  Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!QLIST_IS_INSERTED(node, node_deleted) && node->io_poll &&
            aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that ctx->notify_me must be non-zero so this function can detect
 * aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(ctx->notify_me);
    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt));

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *    polling succeeds.
 *
 * ctx->notify_me must be non-zero so this function can detect aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
    int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);

    if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) {
        poll_set_started(ctx, true);

        if (run_poll_handlers(ctx, max_ns, timeout)) {
            return true;
        }
    }

    poll_set_started(ctx, false);

    /* Even if we don't run busy polling, try polling once in case it can make
     * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2).
     */
    return run_poll_handlers_once(ctx, timeout);
}

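/* aio_poll:
 * @ctx: the AioContext; must be called from its home thread
 * @blocking: whether the caller is willing to wait for events
 *
 * Run one iteration of the event loop: try busy-poll mode first, fall back
 * to ppoll(2)/epoll_wait(2) with the computed timeout, adapt ctx->poll_ns
 * based on how long we actually blocked, and finally dispatch bottom halves,
 * ready fd handlers and expired timers.
 *
 * Returns: true if any callback made progress.
 */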
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    AioHandler *node;
    int i;
    int ret = 0;
    bool progress;
    int64_t timeout;
    int64_t start = 0;

    assert(in_aio_context_home_thread(ctx));

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_add(&ctx->notify_me, 2);
    }

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &timeout);
    assert(!(timeout && progress));

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || atomic_read(&ctx->poll_disable_cnt)) {
        assert(npfd == 0);

        /* fill pollfds */

        if (!aio_epoll_enabled(ctx)) {
            QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
                if (!QLIST_IS_INSERTED(node, node_deleted) && node->pfd.events
                    && aio_node_check(ctx, node->is_external)) {
                    add_pollfd(node);
                }
            }
        }

        /* wait until next event */
        if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
            npfd = 0; /* pollfds[] is not being used */
            ret = aio_epoll(ctx, &ready_list, timeout);
        } else {
            ret = qemu_poll_ns(pollfds, npfd, timeout);
        }
    }

    if (blocking) {
        atomic_sub(&ctx->notify_me, 2);
        aio_notify_accept(ctx);
    }

    /* Adjust polling time */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }
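
    /*
     * For illustration, with poll-grow and poll-shrink left at 0 and
     * poll-max-ns=32768 (the usual iothread default), the window above
     * evolves as 0 -> 4000 -> 8000 -> 16000 -> 32000 -> 32768 ns while
     * blocking waits keep landing between poll_ns and poll-max-ns, and
     * drops straight back to 0 (poll_shrink == 0) as soon as a wait
     * exceeds poll-max-ns.
     */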

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        for (i = 0; i < npfd; i++) {
            int revents = pollfds[i].revents;

            if (revents) {
                add_ready_handler(&ready_list, nodes[i], revents);
            }
        }
    }

    npfd = 0;

    progress |= aio_bh_poll(ctx);

    if (ret > 0) {
        progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
    }

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

void aio_context_setup(AioContext *ctx)
{
#ifdef CONFIG_EPOLL_CREATE1
    assert(!ctx->epollfd);
    ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (ctx->epollfd == -1) {
        fprintf(stderr, "Failed to create epoll instance: %s\n",
                strerror(errno));
        ctx->epoll_available = false;
    } else {
        ctx->epoll_available = true;
    }
#endif
}

void aio_context_destroy(AioContext *ctx)
{
#ifdef CONFIG_EPOLL_CREATE1
    aio_epoll_disable(ctx);
#endif
}

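/*
 * Update the adaptive polling parameters.  For an IOThread these map to the
 * poll-max-ns, poll-grow and poll-shrink object properties; max_ns == 0
 * disables busy polling entirely.  The current window (poll_ns) is reset so
 * the new limits take effect from a clean state.
 */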
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}