xref: /qemu/util/aio-posix.c (revision b355f08a)
1 /*
2  * QEMU aio implementation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <aliguori@us.ibm.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  * Contributions after 2012-01-13 are licensed under the terms of the
13  * GNU GPL, version 2 or (at your option) any later version.
14  */
15 
16 #include "qemu/osdep.h"
17 #include "block/block.h"
18 #include "qemu/main-loop.h"
19 #include "qemu/rcu.h"
20 #include "qemu/rcu_queue.h"
21 #include "qemu/sockets.h"
22 #include "qemu/cutils.h"
23 #include "trace.h"
24 #include "aio-posix.h"
25 
26 /* Stop userspace polling on a handler if it isn't active for some time */
27 #define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
28 
29 bool aio_poll_disabled(AioContext *ctx)
30 {
31     return qatomic_read(&ctx->poll_disable_cnt);
32 }
33 
34 void aio_add_ready_handler(AioHandlerList *ready_list,
35                            AioHandler *node,
36                            int revents)
37 {
38     QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
39     node->pfd.revents = revents;
40     QLIST_INSERT_HEAD(ready_list, node, node_ready);
41 }
42 
43 static AioHandler *find_aio_handler(AioContext *ctx, int fd)
44 {
45     AioHandler *node;
46 
47     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
48         if (node->pfd.fd == fd) {
49             if (!QLIST_IS_INSERTED(node, node_deleted)) {
50                 return node;
51             }
52         }
53     }
54 
55     return NULL;
56 }
57 
58 static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
59 {
60     /* If the GSource is in the process of being destroyed then
61      * g_source_remove_poll() causes an assertion failure.  Skip
62      * removal in that case, because glib cleans up its state during
63      * destruction anyway.
64      */
65     if (!g_source_is_destroyed(&ctx->source)) {
66         g_source_remove_poll(&ctx->source, &node->pfd);
67     }
68 
69     node->pfd.revents = 0;
70 
71     /* If the fd monitor has already marked it deleted, leave it alone */
72     if (QLIST_IS_INSERTED(node, node_deleted)) {
73         return false;
74     }
75 
76     /* If a read is in progress, just mark the node as deleted */
77     if (qemu_lockcnt_count(&ctx->list_lock)) {
78         QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
79         return false;
80     }
81     /* Otherwise, delete it for real.  We can't just mark it as
82      * deleted because deleted nodes are only cleaned up while
83      * no one is walking the handlers list.
84      */
85     QLIST_SAFE_REMOVE(node, node_poll);
86     QLIST_REMOVE(node, node);
87     return true;
88 }
89 
90 void aio_set_fd_handler(AioContext *ctx,
91                         int fd,
92                         bool is_external,
93                         IOHandler *io_read,
94                         IOHandler *io_write,
95                         AioPollFn *io_poll,
96                         void *opaque)
97 {
98     AioHandler *node;
99     AioHandler *new_node = NULL;
100     bool is_new = false;
101     bool deleted = false;
102     int poll_disable_change;
103 
104     qemu_lockcnt_lock(&ctx->list_lock);
105 
106     node = find_aio_handler(ctx, fd);
107 
108     /* Are we deleting the fd handler? */
109     if (!io_read && !io_write && !io_poll) {
110         if (node == NULL) {
111             qemu_lockcnt_unlock(&ctx->list_lock);
112             return;
113         }
114         /* Clean events in order to unregister fd from the ctx epoll. */
115         node->pfd.events = 0;
116 
117         poll_disable_change = -!node->io_poll;
118     } else {
119         poll_disable_change = !io_poll - (node && !node->io_poll);
120         if (node == NULL) {
121             is_new = true;
122         }
123         /* Alloc and insert if it's not already there */
124         new_node = g_new0(AioHandler, 1);
125 
126         /* Update handler with latest information */
127         new_node->io_read = io_read;
128         new_node->io_write = io_write;
129         new_node->io_poll = io_poll;
130         new_node->opaque = opaque;
131         new_node->is_external = is_external;
132 
133         if (is_new) {
134             new_node->pfd.fd = fd;
135         } else {
136             new_node->pfd = node->pfd;
137         }
138         g_source_add_poll(&ctx->source, &new_node->pfd);
139 
140         new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
141         new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
142 
143         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
144     }
145 
146     /* No need to order poll_disable_cnt writes against other updates;
147      * the counter is only used to avoid wasting time and latency on
148      * iterated polling when the system call will be ultimately necessary.
149      * Changing handlers is a rare event, and a little wasted polling until
150      * the aio_notify below is not an issue.
151      */
152     qatomic_set(&ctx->poll_disable_cnt,
153                qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
154 
155     ctx->fdmon_ops->update(ctx, node, new_node);
156     if (node) {
157         deleted = aio_remove_fd_handler(ctx, node);
158     }
159     qemu_lockcnt_unlock(&ctx->list_lock);
160     aio_notify(ctx);
161 
162     if (deleted) {
163         g_free(node);
164     }
165 }
166 
167 void aio_set_fd_poll(AioContext *ctx, int fd,
168                      IOHandler *io_poll_begin,
169                      IOHandler *io_poll_end)
170 {
171     AioHandler *node = find_aio_handler(ctx, fd);
172 
173     if (!node) {
174         return;
175     }
176 
177     node->io_poll_begin = io_poll_begin;
178     node->io_poll_end = io_poll_end;
179 }
180 
181 void aio_set_event_notifier(AioContext *ctx,
182                             EventNotifier *notifier,
183                             bool is_external,
184                             EventNotifierHandler *io_read,
185                             AioPollFn *io_poll)
186 {
187     aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
188                        (IOHandler *)io_read, NULL, io_poll, notifier);
189 }
190 
191 void aio_set_event_notifier_poll(AioContext *ctx,
192                                  EventNotifier *notifier,
193                                  EventNotifierHandler *io_poll_begin,
194                                  EventNotifierHandler *io_poll_end)
195 {
196     aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
197                     (IOHandler *)io_poll_begin,
198                     (IOHandler *)io_poll_end);
199 }
200 
201 static bool poll_set_started(AioContext *ctx, bool started)
202 {
203     AioHandler *node;
204     bool progress = false;
205 
206     if (started == ctx->poll_started) {
207         return false;
208     }
209 
210     ctx->poll_started = started;
211 
212     qemu_lockcnt_inc(&ctx->list_lock);
213     QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
214         IOHandler *fn;
215 
216         if (QLIST_IS_INSERTED(node, node_deleted)) {
217             continue;
218         }
219 
220         if (started) {
221             fn = node->io_poll_begin;
222         } else {
223             fn = node->io_poll_end;
224         }
225 
226         if (fn) {
227             fn(node->opaque);
228         }
229 
230         /* Poll one last time in case ->io_poll_end() raced with the event */
231         if (!started) {
232             progress = node->io_poll(node->opaque) || progress;
233         }
234     }
235     qemu_lockcnt_dec(&ctx->list_lock);
236 
237     return progress;
238 }
239 
240 
241 bool aio_prepare(AioContext *ctx)
242 {
243     /* Poll mode cannot be used with glib's event loop, disable it. */
244     poll_set_started(ctx, false);
245 
246     return false;
247 }
248 
249 bool aio_pending(AioContext *ctx)
250 {
251     AioHandler *node;
252     bool result = false;
253 
254     /*
255      * We have to walk very carefully in case aio_set_fd_handler is
256      * called while we're walking.
257      */
258     qemu_lockcnt_inc(&ctx->list_lock);
259 
260     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
261         int revents;
262 
263         revents = node->pfd.revents & node->pfd.events;
264         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
265             aio_node_check(ctx, node->is_external)) {
266             result = true;
267             break;
268         }
269         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
270             aio_node_check(ctx, node->is_external)) {
271             result = true;
272             break;
273         }
274     }
275     qemu_lockcnt_dec(&ctx->list_lock);
276 
277     return result;
278 }
279 
280 static void aio_free_deleted_handlers(AioContext *ctx)
281 {
282     AioHandler *node;
283 
284     if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
285         return;
286     }
287     if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
288         return; /* we are nested, let the parent do the freeing */
289     }
290 
291     while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
292         QLIST_REMOVE(node, node);
293         QLIST_REMOVE(node, node_deleted);
294         QLIST_SAFE_REMOVE(node, node_poll);
295         g_free(node);
296     }
297 
298     qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
299 }
300 
301 static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
302 {
303     bool progress = false;
304     int revents;
305 
306     revents = node->pfd.revents & node->pfd.events;
307     node->pfd.revents = 0;
308 
309     /*
310      * Start polling AioHandlers when they become ready because activity is
311      * likely to continue.  Note that starvation is theoretically possible when
312      * fdmon_supports_polling(), but only until the fd fires for the first
313      * time.
314      */
315     if (!QLIST_IS_INSERTED(node, node_deleted) &&
316         !QLIST_IS_INSERTED(node, node_poll) &&
317         node->io_poll) {
318         trace_poll_add(ctx, node, node->pfd.fd, revents);
319         if (ctx->poll_started && node->io_poll_begin) {
320             node->io_poll_begin(node->opaque);
321         }
322         QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
323     }
324 
325     if (!QLIST_IS_INSERTED(node, node_deleted) &&
326         (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
327         aio_node_check(ctx, node->is_external) &&
328         node->io_read) {
329         node->io_read(node->opaque);
330 
331         /* aio_notify() does not count as progress */
332         if (node->opaque != &ctx->notifier) {
333             progress = true;
334         }
335     }
336     if (!QLIST_IS_INSERTED(node, node_deleted) &&
337         (revents & (G_IO_OUT | G_IO_ERR)) &&
338         aio_node_check(ctx, node->is_external) &&
339         node->io_write) {
340         node->io_write(node->opaque);
341         progress = true;
342     }
343 
344     return progress;
345 }
346 
347 /*
348  * If we have a list of ready handlers then this is more efficient than
349  * scanning all handlers with aio_dispatch_handlers().
350  */
351 static bool aio_dispatch_ready_handlers(AioContext *ctx,
352                                         AioHandlerList *ready_list)
353 {
354     bool progress = false;
355     AioHandler *node;
356 
357     while ((node = QLIST_FIRST(ready_list))) {
358         QLIST_REMOVE(node, node_ready);
359         progress = aio_dispatch_handler(ctx, node) || progress;
360     }
361 
362     return progress;
363 }
364 
365 /* Slower than aio_dispatch_ready_handlers() but only used via glib */
366 static bool aio_dispatch_handlers(AioContext *ctx)
367 {
368     AioHandler *node, *tmp;
369     bool progress = false;
370 
371     QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
372         progress = aio_dispatch_handler(ctx, node) || progress;
373     }
374 
375     return progress;
376 }
377 
378 void aio_dispatch(AioContext *ctx)
379 {
380     qemu_lockcnt_inc(&ctx->list_lock);
381     aio_bh_poll(ctx);
382     aio_dispatch_handlers(ctx);
383     aio_free_deleted_handlers(ctx);
384     qemu_lockcnt_dec(&ctx->list_lock);
385 
386     timerlistgroup_run_timers(&ctx->tlg);
387 }
388 
389 static bool run_poll_handlers_once(AioContext *ctx,
390                                    int64_t now,
391                                    int64_t *timeout)
392 {
393     bool progress = false;
394     AioHandler *node;
395     AioHandler *tmp;
396 
397     QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
398         if (aio_node_check(ctx, node->is_external) &&
399             node->io_poll(node->opaque)) {
400             node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
401 
402             /*
403              * Polling was successful, exit try_poll_mode immediately
404              * to adjust the next polling time.
405              */
406             *timeout = 0;
407             if (node->opaque != &ctx->notifier) {
408                 progress = true;
409             }
410         }
411 
412         /* Caller handles freeing deleted nodes.  Don't do it here. */
413     }
414 
415     return progress;
416 }
417 
418 static bool fdmon_supports_polling(AioContext *ctx)
419 {
420     return ctx->fdmon_ops->need_wait != aio_poll_disabled;
421 }
422 
423 static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
424 {
425     AioHandler *node;
426     AioHandler *tmp;
427     bool progress = false;
428 
429     /*
430      * File descriptor monitoring implementations without userspace polling
431      * support suffer from starvation when a subset of handlers is polled
432      * because fds will not be processed in a timely fashion.  Don't remove
433      * idle poll handlers.
434      */
435     if (!fdmon_supports_polling(ctx)) {
436         return false;
437     }
438 
439     QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
440         if (node->poll_idle_timeout == 0LL) {
441             node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
442         } else if (now >= node->poll_idle_timeout) {
443             trace_poll_remove(ctx, node, node->pfd.fd);
444             node->poll_idle_timeout = 0LL;
445             QLIST_SAFE_REMOVE(node, node_poll);
446             if (ctx->poll_started && node->io_poll_end) {
447                 node->io_poll_end(node->opaque);
448 
449                 /*
450                  * Final poll in case ->io_poll_end() races with an event.
451                  * Nevermind about re-adding the handler in the rare case where
452                  * this causes progress.
453                  */
454                 progress = node->io_poll(node->opaque) || progress;
455             }
456         }
457     }
458 
459     return progress;
460 }
461 
462 /* run_poll_handlers:
463  * @ctx: the AioContext
464  * @max_ns: maximum time to poll for, in nanoseconds
465  *
466  * Polls for a given time.
467  *
468  * Note that the caller must have incremented ctx->list_lock.
469  *
470  * Returns: true if progress was made, false otherwise
471  */
472 static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
473 {
474     bool progress;
475     int64_t start_time, elapsed_time;
476 
477     assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
478 
479     trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
480 
481     /*
482      * Optimization: ->io_poll() handlers often contain RCU read critical
483      * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
484      * -> rcu_read_lock() -> ... sequences with expensive memory
485      * synchronization primitives.  Make the entire polling loop an RCU
486      * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
487      * are cheap.
488      */
489     RCU_READ_LOCK_GUARD();
490 
491     start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
492     do {
493         progress = run_poll_handlers_once(ctx, start_time, timeout);
494         elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
495         max_ns = qemu_soonest_timeout(*timeout, max_ns);
496         assert(!(max_ns && progress));
497     } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
498 
499     if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
500         *timeout = 0;
501         progress = true;
502     }
503 
504     /* If time has passed with no successful polling, adjust *timeout to
505      * keep the same ending time.
506      */
507     if (*timeout != -1) {
508         *timeout -= MIN(*timeout, elapsed_time);
509     }
510 
511     trace_run_poll_handlers_end(ctx, progress, *timeout);
512     return progress;
513 }
514 
515 /* try_poll_mode:
516  * @ctx: the AioContext
517  * @timeout: timeout for blocking wait, computed by the caller and updated if
518  *    polling succeeds.
519  *
520  * Note that the caller must have incremented ctx->list_lock.
521  *
522  * Returns: true if progress was made, false otherwise
523  */
524 static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
525 {
526     int64_t max_ns;
527 
528     if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
529         return false;
530     }
531 
532     max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
533     if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
534         poll_set_started(ctx, true);
535 
536         if (run_poll_handlers(ctx, max_ns, timeout)) {
537             return true;
538         }
539     }
540 
541     if (poll_set_started(ctx, false)) {
542         *timeout = 0;
543         return true;
544     }
545 
546     return false;
547 }
548 
549 bool aio_poll(AioContext *ctx, bool blocking)
550 {
551     AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
552     int ret = 0;
553     bool progress;
554     bool use_notify_me;
555     int64_t timeout;
556     int64_t start = 0;
557 
558     /*
559      * There cannot be two concurrent aio_poll calls for the same AioContext (or
560      * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
561      * We rely on this below to avoid slow locked accesses to ctx->notify_me.
562      *
563      * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
564      * is special in that it runs in the main thread, but that thread's context
565      * is qemu_aio_context.
566      */
567     assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
568                                       qemu_get_aio_context() : ctx));
569 
570     qemu_lockcnt_inc(&ctx->list_lock);
571 
572     if (ctx->poll_max_ns) {
573         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
574     }
575 
576     timeout = blocking ? aio_compute_timeout(ctx) : 0;
577     progress = try_poll_mode(ctx, &timeout);
578     assert(!(timeout && progress));
579 
580     /*
581      * aio_notify can avoid the expensive event_notifier_set if
582      * everything (file descriptors, bottom halves, timers) will
583      * be re-evaluated before the next blocking poll().  This is
584      * already true when aio_poll is called with blocking == false;
585      * if blocking == true, it is only true after poll() returns,
586      * so disable the optimization now.
587      */
588     use_notify_me = timeout != 0;
589     if (use_notify_me) {
590         qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
591         /*
592          * Write ctx->notify_me before reading ctx->notified.  Pairs with
593          * smp_mb in aio_notify().
594          */
595         smp_mb();
596 
597         /* Don't block if aio_notify() was called */
598         if (qatomic_read(&ctx->notified)) {
599             timeout = 0;
600         }
601     }
602 
603     /* If polling is allowed, non-blocking aio_poll does not need the
604      * system call---a single round of run_poll_handlers_once suffices.
605      */
606     if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
607         ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
608     }
609 
610     if (use_notify_me) {
611         /* Finish the poll before clearing the flag.  */
612         qatomic_store_release(&ctx->notify_me,
613                              qatomic_read(&ctx->notify_me) - 2);
614     }
615 
616     aio_notify_accept(ctx);
617 
618     /* Adjust polling time */
619     if (ctx->poll_max_ns) {
620         int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
621 
622         if (block_ns <= ctx->poll_ns) {
623             /* This is the sweet spot, no adjustment needed */
624         } else if (block_ns > ctx->poll_max_ns) {
625             /* We'd have to poll for too long, poll less */
626             int64_t old = ctx->poll_ns;
627 
628             if (ctx->poll_shrink) {
629                 ctx->poll_ns /= ctx->poll_shrink;
630             } else {
631                 ctx->poll_ns = 0;
632             }
633 
634             trace_poll_shrink(ctx, old, ctx->poll_ns);
635         } else if (ctx->poll_ns < ctx->poll_max_ns &&
636                    block_ns < ctx->poll_max_ns) {
637             /* There is room to grow, poll longer */
638             int64_t old = ctx->poll_ns;
639             int64_t grow = ctx->poll_grow;
640 
641             if (grow == 0) {
642                 grow = 2;
643             }
644 
645             if (ctx->poll_ns) {
646                 ctx->poll_ns *= grow;
647             } else {
648                 ctx->poll_ns = 4000; /* start polling at 4 microseconds */
649             }
650 
651             if (ctx->poll_ns > ctx->poll_max_ns) {
652                 ctx->poll_ns = ctx->poll_max_ns;
653             }
654 
655             trace_poll_grow(ctx, old, ctx->poll_ns);
656         }
657     }
658 
659     progress |= aio_bh_poll(ctx);
660 
661     if (ret > 0) {
662         progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
663     }
664 
665     aio_free_deleted_handlers(ctx);
666 
667     qemu_lockcnt_dec(&ctx->list_lock);
668 
669     progress |= timerlistgroup_run_timers(&ctx->tlg);
670 
671     return progress;
672 }
673 
674 void aio_context_setup(AioContext *ctx)
675 {
676     ctx->fdmon_ops = &fdmon_poll_ops;
677     ctx->epollfd = -1;
678 
679     /* Use the fastest fd monitoring implementation if available */
680     if (fdmon_io_uring_setup(ctx)) {
681         return;
682     }
683 
684     fdmon_epoll_setup(ctx);
685 }
686 
687 void aio_context_destroy(AioContext *ctx)
688 {
689     fdmon_io_uring_destroy(ctx);
690     fdmon_epoll_disable(ctx);
691     aio_free_deleted_handlers(ctx);
692 }
693 
694 void aio_context_use_g_source(AioContext *ctx)
695 {
696     /*
697      * Disable io_uring when the glib main loop is used because it doesn't
698      * support mixed glib/aio_poll() usage. It relies on aio_poll() being
699      * called regularly so that changes to the monitored file descriptors are
700      * submitted, otherwise a list of pending fd handlers builds up.
701      */
702     fdmon_io_uring_destroy(ctx);
703     aio_free_deleted_handlers(ctx);
704 }
705 
706 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
707                                  int64_t grow, int64_t shrink, Error **errp)
708 {
709     /* No thread synchronization here, it doesn't matter if an incorrect value
710      * is used once.
711      */
712     ctx->poll_max_ns = max_ns;
713     ctx->poll_ns = 0;
714     ctx->poll_grow = grow;
715     ctx->poll_shrink = shrink;
716 
717     aio_notify(ctx);
718 }
719 
720 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
721                                 Error **errp)
722 {
723     /*
724      * No thread synchronization here, it doesn't matter if an incorrect value
725      * is used once.
726      */
727     ctx->aio_max_batch = max_batch;
728 
729     aio_notify(ctx);
730 }
731