xref: /qemu/util/aio-win32.c (revision b2a3cbb8)
1 /*
2  * QEMU aio implementation
3  *
4  * Copyright IBM Corp., 2008
5  * Copyright Red Hat Inc., 2012
6  *
7  * Authors:
8  *  Anthony Liguori   <aliguori@us.ibm.com>
9  *  Paolo Bonzini     <pbonzini@redhat.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2.  See
12  * the COPYING file in the top-level directory.
13  *
14  * Contributions after 2012-01-13 are licensed under the terms of the
15  * GNU GPL, version 2 or (at your option) any later version.
16  */
17 
18 #include "qemu/osdep.h"
19 #include "block/block.h"
20 #include "qemu/main-loop.h"
21 #include "qemu/queue.h"
22 #include "qemu/sockets.h"
23 #include "qapi/error.h"
24 #include "qemu/rcu_queue.h"
25 
26 struct AioHandler {
27     EventNotifier *e;
28     IOHandler *io_read;
29     IOHandler *io_write;
30     EventNotifierHandler *io_notify;
31     GPollFD pfd;
32     int deleted;
33     void *opaque;
34     bool is_external;
35     QLIST_ENTRY(AioHandler) node;
36 };
37 
38 static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
39 {
40     /*
41      * If the GSource is in the process of being destroyed then
42      * g_source_remove_poll() causes an assertion failure.  Skip
43      * removal in that case, because glib cleans up its state during
44      * destruction anyway.
45      */
46     if (!g_source_is_destroyed(&ctx->source)) {
47         g_source_remove_poll(&ctx->source, &node->pfd);
48     }
49 
50     /* If aio_poll is in progress, just mark the node as deleted */
51     if (qemu_lockcnt_count(&ctx->list_lock)) {
52         node->deleted = 1;
53         node->pfd.revents = 0;
54     } else {
55         /* Otherwise, delete it for real.  We can't just mark it as
56          * deleted because deleted nodes are only cleaned up after
57          * releasing the list_lock.
58          */
59         QLIST_REMOVE(node, node);
60         g_free(node);
61     }
62 }
63 
64 void aio_set_fd_handler(AioContext *ctx,
65                         int fd,
66                         bool is_external,
67                         IOHandler *io_read,
68                         IOHandler *io_write,
69                         AioPollFn *io_poll,
70                         IOHandler *io_poll_ready,
71                         void *opaque)
72 {
73     /* fd is a SOCKET in our case */
74     AioHandler *old_node;
75     AioHandler *node = NULL;
76 
77     qemu_lockcnt_lock(&ctx->list_lock);
78     QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
79         if (old_node->pfd.fd == fd && !old_node->deleted) {
80             break;
81         }
82     }
83 
84     if (io_read || io_write) {
85         HANDLE event;
86         long bitmask = 0;
87 
88         /* Alloc and insert if it's not already there */
89         node = g_new0(AioHandler, 1);
90         node->pfd.fd = fd;
91 
92         node->pfd.events = 0;
93         if (node->io_read) {
94             node->pfd.events |= G_IO_IN;
95         }
96         if (node->io_write) {
97             node->pfd.events |= G_IO_OUT;
98         }
99 
100         node->e = &ctx->notifier;
101 
102         /* Update handler with latest information */
103         node->opaque = opaque;
104         node->io_read = io_read;
105         node->io_write = io_write;
106         node->is_external = is_external;
107 
108         if (io_read) {
109             bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
110         }
111 
112         if (io_write) {
113             bitmask |= FD_WRITE | FD_CONNECT;
114         }
115 
116         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
117         event = event_notifier_get_handle(&ctx->notifier);
118         WSAEventSelect(node->pfd.fd, event, bitmask);
119     }
120     if (old_node) {
121         aio_remove_fd_handler(ctx, old_node);
122     }
123 
124     qemu_lockcnt_unlock(&ctx->list_lock);
125     aio_notify(ctx);
126 }
127 
128 void aio_set_fd_poll(AioContext *ctx, int fd,
129                      IOHandler *io_poll_begin,
130                      IOHandler *io_poll_end)
131 {
132     /* Not implemented */
133 }
134 
135 void aio_set_event_notifier(AioContext *ctx,
136                             EventNotifier *e,
137                             bool is_external,
138                             EventNotifierHandler *io_notify,
139                             AioPollFn *io_poll,
140                             EventNotifierHandler *io_poll_ready)
141 {
142     AioHandler *node;
143 
144     qemu_lockcnt_lock(&ctx->list_lock);
145     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
146         if (node->e == e && !node->deleted) {
147             break;
148         }
149     }
150 
151     /* Are we deleting the fd handler? */
152     if (!io_notify) {
153         if (node) {
154             aio_remove_fd_handler(ctx, node);
155         }
156     } else {
157         if (node == NULL) {
158             /* Alloc and insert if it's not already there */
159             node = g_new0(AioHandler, 1);
160             node->e = e;
161             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
162             node->pfd.events = G_IO_IN;
163             node->is_external = is_external;
164             QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
165 
166             g_source_add_poll(&ctx->source, &node->pfd);
167         }
168         /* Update handler with latest information */
169         node->io_notify = io_notify;
170     }
171 
172     qemu_lockcnt_unlock(&ctx->list_lock);
173     aio_notify(ctx);
174 }
175 
176 void aio_set_event_notifier_poll(AioContext *ctx,
177                                  EventNotifier *notifier,
178                                  EventNotifierHandler *io_poll_begin,
179                                  EventNotifierHandler *io_poll_end)
180 {
181     /* Not implemented */
182 }
183 
184 bool aio_prepare(AioContext *ctx)
185 {
186     static struct timeval tv0;
187     AioHandler *node;
188     bool have_select_revents = false;
189     fd_set rfds, wfds;
190 
191     /*
192      * We have to walk very carefully in case aio_set_fd_handler is
193      * called while we're walking.
194      */
195     qemu_lockcnt_inc(&ctx->list_lock);
196 
197     /* fill fd sets */
198     FD_ZERO(&rfds);
199     FD_ZERO(&wfds);
200     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
201         if (node->io_read) {
202             FD_SET ((SOCKET)node->pfd.fd, &rfds);
203         }
204         if (node->io_write) {
205             FD_SET ((SOCKET)node->pfd.fd, &wfds);
206         }
207     }
208 
209     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
210         QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
211             node->pfd.revents = 0;
212             if (FD_ISSET(node->pfd.fd, &rfds)) {
213                 node->pfd.revents |= G_IO_IN;
214                 have_select_revents = true;
215             }
216 
217             if (FD_ISSET(node->pfd.fd, &wfds)) {
218                 node->pfd.revents |= G_IO_OUT;
219                 have_select_revents = true;
220             }
221         }
222     }
223 
224     qemu_lockcnt_dec(&ctx->list_lock);
225     return have_select_revents;
226 }
227 
228 bool aio_pending(AioContext *ctx)
229 {
230     AioHandler *node;
231     bool result = false;
232 
233     /*
234      * We have to walk very carefully in case aio_set_fd_handler is
235      * called while we're walking.
236      */
237     qemu_lockcnt_inc(&ctx->list_lock);
238     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
239         if (node->pfd.revents && node->io_notify) {
240             result = true;
241             break;
242         }
243 
244         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
245             result = true;
246             break;
247         }
248         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
249             result = true;
250             break;
251         }
252     }
253 
254     qemu_lockcnt_dec(&ctx->list_lock);
255     return result;
256 }
257 
258 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
259 {
260     AioHandler *node;
261     bool progress = false;
262     AioHandler *tmp;
263 
264     /*
265      * We have to walk very carefully in case aio_set_fd_handler is
266      * called while we're walking.
267      */
268     QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
269         int revents = node->pfd.revents;
270 
271         if (!node->deleted &&
272             (revents || event_notifier_get_handle(node->e) == event) &&
273             node->io_notify) {
274             node->pfd.revents = 0;
275             node->io_notify(node->e);
276 
277             /* aio_notify() does not count as progress */
278             if (node->e != &ctx->notifier) {
279                 progress = true;
280             }
281         }
282 
283         if (!node->deleted &&
284             (node->io_read || node->io_write)) {
285             node->pfd.revents = 0;
286             if ((revents & G_IO_IN) && node->io_read) {
287                 node->io_read(node->opaque);
288                 progress = true;
289             }
290             if ((revents & G_IO_OUT) && node->io_write) {
291                 node->io_write(node->opaque);
292                 progress = true;
293             }
294 
295             /* if the next select() will return an event, we have progressed */
296             if (event == event_notifier_get_handle(&ctx->notifier)) {
297                 WSANETWORKEVENTS ev;
298                 WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
299                 if (ev.lNetworkEvents) {
300                     progress = true;
301                 }
302             }
303         }
304 
305         if (node->deleted) {
306             if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
307                 QLIST_REMOVE(node, node);
308                 g_free(node);
309                 qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
310             }
311         }
312     }
313 
314     return progress;
315 }
316 
317 void aio_dispatch(AioContext *ctx)
318 {
319     qemu_lockcnt_inc(&ctx->list_lock);
320     aio_bh_poll(ctx);
321     aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
322     qemu_lockcnt_dec(&ctx->list_lock);
323     timerlistgroup_run_timers(&ctx->tlg);
324 }
325 
326 bool aio_poll(AioContext *ctx, bool blocking)
327 {
328     AioHandler *node;
329     HANDLE events[MAXIMUM_WAIT_OBJECTS];
330     bool progress, have_select_revents, first;
331     unsigned count;
332     int timeout;
333 
334     /*
335      * There cannot be two concurrent aio_poll calls for the same AioContext (or
336      * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
337      * We rely on this below to avoid slow locked accesses to ctx->notify_me.
338      *
339      * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
340      * is special in that it runs in the main thread, but that thread's context
341      * is qemu_aio_context.
342      */
343     assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
344                                       qemu_get_aio_context() : ctx));
345     progress = false;
346 
347     /* aio_notify can avoid the expensive event_notifier_set if
348      * everything (file descriptors, bottom halves, timers) will
349      * be re-evaluated before the next blocking poll().  This is
350      * already true when aio_poll is called with blocking == false;
351      * if blocking == true, it is only true after poll() returns,
352      * so disable the optimization now.
353      */
354     if (blocking) {
355         qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
356         /*
357          * Write ctx->notify_me before computing the timeout
358          * (reading bottom half flags, etc.).  Pairs with
359          * smp_mb in aio_notify().
360          */
361         smp_mb();
362     }
363 
364     qemu_lockcnt_inc(&ctx->list_lock);
365     have_select_revents = aio_prepare(ctx);
366 
367     /* fill fd sets */
368     count = 0;
369     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
370         if (!node->deleted && node->io_notify
371             && aio_node_check(ctx, node->is_external)) {
372             assert(count < MAXIMUM_WAIT_OBJECTS);
373             events[count++] = event_notifier_get_handle(node->e);
374         }
375     }
376 
377     first = true;
378 
379     /* ctx->notifier is always registered.  */
380     assert(count > 0);
381 
382     /* Multiple iterations, all of them non-blocking except the first,
383      * may be necessary to process all pending events.  After the first
384      * WaitForMultipleObjects call ctx->notify_me will be decremented.
385      */
386     do {
387         HANDLE event;
388         int ret;
389 
390         timeout = blocking && !have_select_revents
391             ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
392         ret = WaitForMultipleObjects(count, events, FALSE, timeout);
393         if (blocking) {
394             assert(first);
395             qatomic_store_release(&ctx->notify_me,
396                                   qatomic_read(&ctx->notify_me) - 2);
397             aio_notify_accept(ctx);
398         }
399 
400         if (first) {
401             progress |= aio_bh_poll(ctx);
402             first = false;
403         }
404 
405         /* if we have any signaled events, dispatch event */
406         event = NULL;
407         if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
408             event = events[ret - WAIT_OBJECT_0];
409             events[ret - WAIT_OBJECT_0] = events[--count];
410         } else if (!have_select_revents) {
411             break;
412         }
413 
414         have_select_revents = false;
415         blocking = false;
416 
417         progress |= aio_dispatch_handlers(ctx, event);
418     } while (count > 0);
419 
420     qemu_lockcnt_dec(&ctx->list_lock);
421 
422     progress |= timerlistgroup_run_timers(&ctx->tlg);
423     return progress;
424 }
425 
426 void aio_context_setup(AioContext *ctx)
427 {
428 }
429 
430 void aio_context_destroy(AioContext *ctx)
431 {
432 }
433 
434 void aio_context_use_g_source(AioContext *ctx)
435 {
436 }
437 
438 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
439                                  int64_t grow, int64_t shrink, Error **errp)
440 {
441     if (max_ns) {
442         error_setg(errp, "AioContext polling is not implemented on Windows");
443     }
444 }
445 
446 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
447                                 Error **errp)
448 {
449 }
450