xref: /qemu/util/aio-win32.c (revision 78f314cf)
1 /*
2  * QEMU aio implementation
3  *
4  * Copyright IBM Corp., 2008
5  * Copyright Red Hat Inc., 2012
6  *
7  * Authors:
8  *  Anthony Liguori   <aliguori@us.ibm.com>
9  *  Paolo Bonzini     <pbonzini@redhat.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2.  See
12  * the COPYING file in the top-level directory.
13  *
14  * Contributions after 2012-01-13 are licensed under the terms of the
15  * GNU GPL, version 2 or (at your option) any later version.
16  */
17 
18 #include "qemu/osdep.h"
19 #include "block/block.h"
20 #include "qemu/main-loop.h"
21 #include "qemu/queue.h"
22 #include "qemu/sockets.h"
23 #include "qapi/error.h"
24 #include "qemu/rcu_queue.h"
25 #include "qemu/error-report.h"
26 
/*
 * One registered handler on ctx->aio_handlers.  A node is either a socket
 * handler (io_read/io_write set by aio_set_fd_handler) or an event-notifier
 * handler (io_notify set by aio_set_event_notifier).
 */
struct AioHandler {
    /*
     * Notifier associated with this node: the registered notifier for
     * event-notifier handlers, &ctx->notifier for socket handlers.
     */
    EventNotifier *e;
    /* Socket read callback, run when pfd.revents has G_IO_IN. */
    IOHandler *io_read;
    /* Socket write callback, run when pfd.revents has G_IO_OUT. */
    IOHandler *io_write;
    /* Event-notifier callback; receives @e. */
    EventNotifierHandler *io_notify;
    /*
     * pfd.fd holds the SOCKET for socket handlers, or the notifier's event
     * HANDLE for event-notifier handlers (only the latter is added to the
     * GSource poll set).  pfd.revents carries readiness to dispatch.
     */
    GPollFD pfd;
    /*
     * Non-zero once removed while the list was being walked; the node is
     * freed later by aio_dispatch_handlers().
     */
    int deleted;
    /* Opaque argument passed to io_read/io_write. */
    void *opaque;
    /* Link in ctx->aio_handlers. */
    QLIST_ENTRY(AioHandler) node;
};
37 
38 static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
39 {
40     /*
41      * If the GSource is in the process of being destroyed then
42      * g_source_remove_poll() causes an assertion failure.  Skip
43      * removal in that case, because glib cleans up its state during
44      * destruction anyway.
45      */
46     if (!g_source_is_destroyed(&ctx->source)) {
47         g_source_remove_poll(&ctx->source, &node->pfd);
48     }
49 
50     /* If aio_poll is in progress, just mark the node as deleted */
51     if (qemu_lockcnt_count(&ctx->list_lock)) {
52         node->deleted = 1;
53         node->pfd.revents = 0;
54     } else {
55         /* Otherwise, delete it for real.  We can't just mark it as
56          * deleted because deleted nodes are only cleaned up after
57          * releasing the list_lock.
58          */
59         QLIST_REMOVE(node, node);
60         g_free(node);
61     }
62 }
63 
64 void aio_set_fd_handler(AioContext *ctx,
65                         int fd,
66                         IOHandler *io_read,
67                         IOHandler *io_write,
68                         AioPollFn *io_poll,
69                         IOHandler *io_poll_ready,
70                         void *opaque)
71 {
72     AioHandler *old_node;
73     AioHandler *node = NULL;
74     SOCKET s;
75 
76     if (!fd_is_socket(fd)) {
77         error_report("fd=%d is not a socket, AIO implementation is missing", fd);
78         return;
79     }
80 
81     s = _get_osfhandle(fd);
82 
83     qemu_lockcnt_lock(&ctx->list_lock);
84     QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
85         if (old_node->pfd.fd == s && !old_node->deleted) {
86             break;
87         }
88     }
89 
90     if (io_read || io_write) {
91         HANDLE event;
92         long bitmask = 0;
93 
94         /* Alloc and insert if it's not already there */
95         node = g_new0(AioHandler, 1);
96         node->pfd.fd = s;
97 
98         node->pfd.events = 0;
99         if (node->io_read) {
100             node->pfd.events |= G_IO_IN;
101         }
102         if (node->io_write) {
103             node->pfd.events |= G_IO_OUT;
104         }
105 
106         node->e = &ctx->notifier;
107 
108         /* Update handler with latest information */
109         node->opaque = opaque;
110         node->io_read = io_read;
111         node->io_write = io_write;
112 
113         if (io_read) {
114             bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
115         }
116 
117         if (io_write) {
118             bitmask |= FD_WRITE | FD_CONNECT;
119         }
120 
121         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
122         event = event_notifier_get_handle(&ctx->notifier);
123         qemu_socket_select(fd, event, bitmask, NULL);
124     }
125     if (old_node) {
126         aio_remove_fd_handler(ctx, old_node);
127     }
128 
129     qemu_lockcnt_unlock(&ctx->list_lock);
130     aio_notify(ctx);
131 }
132 
133 void aio_set_event_notifier(AioContext *ctx,
134                             EventNotifier *e,
135                             EventNotifierHandler *io_notify,
136                             AioPollFn *io_poll,
137                             EventNotifierHandler *io_poll_ready)
138 {
139     AioHandler *node;
140 
141     qemu_lockcnt_lock(&ctx->list_lock);
142     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
143         if (node->e == e && !node->deleted) {
144             break;
145         }
146     }
147 
148     /* Are we deleting the fd handler? */
149     if (!io_notify) {
150         if (node) {
151             aio_remove_fd_handler(ctx, node);
152         }
153     } else {
154         if (node == NULL) {
155             /* Alloc and insert if it's not already there */
156             node = g_new0(AioHandler, 1);
157             node->e = e;
158             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
159             node->pfd.events = G_IO_IN;
160             QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
161 
162             g_source_add_poll(&ctx->source, &node->pfd);
163         }
164         /* Update handler with latest information */
165         node->io_notify = io_notify;
166     }
167 
168     qemu_lockcnt_unlock(&ctx->list_lock);
169     aio_notify(ctx);
170 }
171 
172 void aio_set_event_notifier_poll(AioContext *ctx,
173                                  EventNotifier *notifier,
174                                  EventNotifierHandler *io_poll_begin,
175                                  EventNotifierHandler *io_poll_end)
176 {
177     /* Not implemented */
178 }
179 
180 bool aio_prepare(AioContext *ctx)
181 {
182     static struct timeval tv0;
183     AioHandler *node;
184     bool have_select_revents = false;
185     fd_set rfds, wfds;
186 
187     /*
188      * We have to walk very carefully in case aio_set_fd_handler is
189      * called while we're walking.
190      */
191     qemu_lockcnt_inc(&ctx->list_lock);
192 
193     /* fill fd sets */
194     FD_ZERO(&rfds);
195     FD_ZERO(&wfds);
196     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
197         if (node->io_read) {
198             FD_SET ((SOCKET)node->pfd.fd, &rfds);
199         }
200         if (node->io_write) {
201             FD_SET ((SOCKET)node->pfd.fd, &wfds);
202         }
203     }
204 
205     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
206         QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
207             node->pfd.revents = 0;
208             if (FD_ISSET(node->pfd.fd, &rfds)) {
209                 node->pfd.revents |= G_IO_IN;
210                 have_select_revents = true;
211             }
212 
213             if (FD_ISSET(node->pfd.fd, &wfds)) {
214                 node->pfd.revents |= G_IO_OUT;
215                 have_select_revents = true;
216             }
217         }
218     }
219 
220     qemu_lockcnt_dec(&ctx->list_lock);
221     return have_select_revents;
222 }
223 
224 bool aio_pending(AioContext *ctx)
225 {
226     AioHandler *node;
227     bool result = false;
228 
229     /*
230      * We have to walk very carefully in case aio_set_fd_handler is
231      * called while we're walking.
232      */
233     qemu_lockcnt_inc(&ctx->list_lock);
234     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
235         if (node->pfd.revents && node->io_notify) {
236             result = true;
237             break;
238         }
239 
240         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
241             result = true;
242             break;
243         }
244         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
245             result = true;
246             break;
247         }
248     }
249 
250     qemu_lockcnt_dec(&ctx->list_lock);
251     return result;
252 }
253 
254 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
255 {
256     AioHandler *node;
257     bool progress = false;
258     AioHandler *tmp;
259 
260     /*
261      * We have to walk very carefully in case aio_set_fd_handler is
262      * called while we're walking.
263      */
264     QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
265         int revents = node->pfd.revents;
266 
267         if (!node->deleted &&
268             (revents || event_notifier_get_handle(node->e) == event) &&
269             node->io_notify) {
270             node->pfd.revents = 0;
271             node->io_notify(node->e);
272 
273             /* aio_notify() does not count as progress */
274             if (node->e != &ctx->notifier) {
275                 progress = true;
276             }
277         }
278 
279         if (!node->deleted &&
280             (node->io_read || node->io_write)) {
281             node->pfd.revents = 0;
282             if ((revents & G_IO_IN) && node->io_read) {
283                 node->io_read(node->opaque);
284                 progress = true;
285             }
286             if ((revents & G_IO_OUT) && node->io_write) {
287                 node->io_write(node->opaque);
288                 progress = true;
289             }
290 
291             /* if the next select() will return an event, we have progressed */
292             if (event == event_notifier_get_handle(&ctx->notifier)) {
293                 WSANETWORKEVENTS ev;
294                 WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
295                 if (ev.lNetworkEvents) {
296                     progress = true;
297                 }
298             }
299         }
300 
301         if (node->deleted) {
302             if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
303                 QLIST_REMOVE(node, node);
304                 g_free(node);
305                 qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
306             }
307         }
308     }
309 
310     return progress;
311 }
312 
313 void aio_dispatch(AioContext *ctx)
314 {
315     qemu_lockcnt_inc(&ctx->list_lock);
316     aio_bh_poll(ctx);
317     aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
318     qemu_lockcnt_dec(&ctx->list_lock);
319     timerlistgroup_run_timers(&ctx->tlg);
320 }
321 
/*
 * Run one iteration of the event loop for @ctx.
 *
 * Sockets are probed with aio_prepare().  The event handles of all live
 * io_notify handlers are collected and waited on with
 * WaitForMultipleObjects().  Only the first wait may block, and only if
 * @blocking is true and no socket was already ready.  Bottom halves run
 * once, handlers are dispatched for each signaled handle, and finally
 * expired timers run.
 *
 * Returns true if any bottom half, handler dispatch, or timer made progress.
 */
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    HANDLE events[MAXIMUM_WAIT_OBJECTS];
    bool progress, have_select_revents, first;
    unsigned count;
    int timeout;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
     * is special in that it runs in the main thread, but that thread's context
     * is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before computing the timeout
         * (reading bottom half flags, etc.).  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();
    }

    /*
     * Hold a count on list_lock for the whole iteration so that handlers
     * removed meanwhile are only flagged deleted, not freed.
     */
    qemu_lockcnt_inc(&ctx->list_lock);
    /* Zero-timeout select() on sockets; true if any socket is ready now. */
    have_select_revents = aio_prepare(ctx);

    /* Collect the wait handles of all live event-notifier handlers. */
    count = 0;
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->io_notify) {
            assert(count < MAXIMUM_WAIT_OBJECTS);
            events[count++] = event_notifier_get_handle(node->e);
        }
    }

    first = true;

    /* ctx->notifier is always registered.  */
    assert(count > 0);

    /* Multiple iterations, all of them non-blocking except the first,
     * may be necessary to process all pending events.  After the first
     * WaitForMultipleObjects call ctx->notify_me will be decremented.
     */
    do {
        HANDLE event;
        int ret;

        /* Wait only on the first pass, and not at all if a socket is ready. */
        timeout = blocking && !have_select_revents
            ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
        if (blocking) {
            assert(first);
            /* Undo the notify_me increment made before waiting. */
            qatomic_store_release(&ctx->notify_me,
                                  qatomic_read(&ctx->notify_me) - 2);
            aio_notify_accept(ctx);
        }

        /* Bottom halves run exactly once per aio_poll() call. */
        if (first) {
            progress |= aio_bh_poll(ctx);
            first = false;
        }

        /* if we have any signaled events, dispatch event */
        event = NULL;
        if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
            /*
             * Take the signaled handle and drop it from the wait set by
             * moving the last entry into its slot, so later passes wait
             * only on the remaining handles.
             */
            event = events[ret - WAIT_OBJECT_0];
            events[ret - WAIT_OBJECT_0] = events[--count];
        } else if (!have_select_revents) {
            /* No handle index returned and no ready sockets: stop. */
            break;
        }

        /* select() results are consumed by this dispatch; never block again. */
        have_select_revents = false;
        blocking = false;

        progress |= aio_dispatch_handlers(ctx, event);
    } while (count > 0);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);
    return progress;
}
420 
421 void aio_context_setup(AioContext *ctx)
422 {
423 }
424 
425 void aio_context_destroy(AioContext *ctx)
426 {
427 }
428 
429 void aio_context_use_g_source(AioContext *ctx)
430 {
431 }
432 
433 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
434                                  int64_t grow, int64_t shrink, Error **errp)
435 {
436     if (max_ns) {
437         error_setg(errp, "AioContext polling is not implemented on Windows");
438     }
439 }
440 
441 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
442                                 Error **errp)
443 {
444 }
445