xref: /qemu/util/aio-win32.c (revision d73415a3)
1 /*
2  * QEMU aio implementation
3  *
4  * Copyright IBM Corp., 2008
5  * Copyright Red Hat Inc., 2012
6  *
7  * Authors:
8  *  Anthony Liguori   <aliguori@us.ibm.com>
9  *  Paolo Bonzini     <pbonzini@redhat.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2.  See
12  * the COPYING file in the top-level directory.
13  *
14  * Contributions after 2012-01-13 are licensed under the terms of the
15  * GNU GPL, version 2 or (at your option) any later version.
16  */
17 
18 #include "qemu/osdep.h"
19 #include "qemu-common.h"
20 #include "block/block.h"
21 #include "qemu/queue.h"
22 #include "qemu/sockets.h"
23 #include "qapi/error.h"
24 #include "qemu/rcu_queue.h"
25 
/*
 * One entry on an AioContext's aio_handlers list.
 *
 * A node is created either by aio_set_fd_handler() (socket handler:
 * io_read and/or io_write are set) or by aio_set_event_notifier()
 * (event notifier handler: io_notify is set).
 */
struct AioHandler {
    /* Notifier of this node; socket handlers point at &ctx->notifier */
    EventNotifier *e;
    /* Socket callbacks, invoked with @opaque on G_IO_IN / G_IO_OUT */
    IOHandler *io_read;
    IOHandler *io_write;
    /* Event notifier callback, invoked with @e */
    EventNotifierHandler *io_notify;
    /* fd is the socket, or the notifier's HANDLE cast to an integer */
    GPollFD pfd;
    /* Set when the node was removed while the list was being walked */
    int deleted;
    /* Argument passed to io_read/io_write */
    void *opaque;
    /* Passed to aio_node_check() when aio_poll() selects handles to wait on */
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
};
37 
38 static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
39 {
40     /*
41      * If the GSource is in the process of being destroyed then
42      * g_source_remove_poll() causes an assertion failure.  Skip
43      * removal in that case, because glib cleans up its state during
44      * destruction anyway.
45      */
46     if (!g_source_is_destroyed(&ctx->source)) {
47         g_source_remove_poll(&ctx->source, &node->pfd);
48     }
49 
50     /* If aio_poll is in progress, just mark the node as deleted */
51     if (qemu_lockcnt_count(&ctx->list_lock)) {
52         node->deleted = 1;
53         node->pfd.revents = 0;
54     } else {
55         /* Otherwise, delete it for real.  We can't just mark it as
56          * deleted because deleted nodes are only cleaned up after
57          * releasing the list_lock.
58          */
59         QLIST_REMOVE(node, node);
60         g_free(node);
61     }
62 }
63 
64 void aio_set_fd_handler(AioContext *ctx,
65                         int fd,
66                         bool is_external,
67                         IOHandler *io_read,
68                         IOHandler *io_write,
69                         AioPollFn *io_poll,
70                         void *opaque)
71 {
72     /* fd is a SOCKET in our case */
73     AioHandler *old_node;
74     AioHandler *node = NULL;
75 
76     qemu_lockcnt_lock(&ctx->list_lock);
77     QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
78         if (old_node->pfd.fd == fd && !old_node->deleted) {
79             break;
80         }
81     }
82 
83     if (io_read || io_write) {
84         HANDLE event;
85         long bitmask = 0;
86 
87         /* Alloc and insert if it's not already there */
88         node = g_new0(AioHandler, 1);
89         node->pfd.fd = fd;
90 
91         node->pfd.events = 0;
92         if (node->io_read) {
93             node->pfd.events |= G_IO_IN;
94         }
95         if (node->io_write) {
96             node->pfd.events |= G_IO_OUT;
97         }
98 
99         node->e = &ctx->notifier;
100 
101         /* Update handler with latest information */
102         node->opaque = opaque;
103         node->io_read = io_read;
104         node->io_write = io_write;
105         node->is_external = is_external;
106 
107         if (io_read) {
108             bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
109         }
110 
111         if (io_write) {
112             bitmask |= FD_WRITE | FD_CONNECT;
113         }
114 
115         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
116         event = event_notifier_get_handle(&ctx->notifier);
117         WSAEventSelect(node->pfd.fd, event, bitmask);
118     }
119     if (old_node) {
120         aio_remove_fd_handler(ctx, old_node);
121     }
122 
123     qemu_lockcnt_unlock(&ctx->list_lock);
124     aio_notify(ctx);
125 }
126 
/*
 * Stub: this implementation ignores all arguments and does nothing.
 */
void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    /* Not implemented */
}
133 
134 void aio_set_event_notifier(AioContext *ctx,
135                             EventNotifier *e,
136                             bool is_external,
137                             EventNotifierHandler *io_notify,
138                             AioPollFn *io_poll)
139 {
140     AioHandler *node;
141 
142     qemu_lockcnt_lock(&ctx->list_lock);
143     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
144         if (node->e == e && !node->deleted) {
145             break;
146         }
147     }
148 
149     /* Are we deleting the fd handler? */
150     if (!io_notify) {
151         if (node) {
152             aio_remove_fd_handler(ctx, node);
153         }
154     } else {
155         if (node == NULL) {
156             /* Alloc and insert if it's not already there */
157             node = g_new0(AioHandler, 1);
158             node->e = e;
159             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
160             node->pfd.events = G_IO_IN;
161             node->is_external = is_external;
162             QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
163 
164             g_source_add_poll(&ctx->source, &node->pfd);
165         }
166         /* Update handler with latest information */
167         node->io_notify = io_notify;
168     }
169 
170     qemu_lockcnt_unlock(&ctx->list_lock);
171     aio_notify(ctx);
172 }
173 
/*
 * Stub: this implementation ignores all arguments and does nothing.
 */
void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    /* Not implemented */
}
181 
182 bool aio_prepare(AioContext *ctx)
183 {
184     static struct timeval tv0;
185     AioHandler *node;
186     bool have_select_revents = false;
187     fd_set rfds, wfds;
188 
189     /*
190      * We have to walk very carefully in case aio_set_fd_handler is
191      * called while we're walking.
192      */
193     qemu_lockcnt_inc(&ctx->list_lock);
194 
195     /* fill fd sets */
196     FD_ZERO(&rfds);
197     FD_ZERO(&wfds);
198     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
199         if (node->io_read) {
200             FD_SET ((SOCKET)node->pfd.fd, &rfds);
201         }
202         if (node->io_write) {
203             FD_SET ((SOCKET)node->pfd.fd, &wfds);
204         }
205     }
206 
207     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
208         QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
209             node->pfd.revents = 0;
210             if (FD_ISSET(node->pfd.fd, &rfds)) {
211                 node->pfd.revents |= G_IO_IN;
212                 have_select_revents = true;
213             }
214 
215             if (FD_ISSET(node->pfd.fd, &wfds)) {
216                 node->pfd.revents |= G_IO_OUT;
217                 have_select_revents = true;
218             }
219         }
220     }
221 
222     qemu_lockcnt_dec(&ctx->list_lock);
223     return have_select_revents;
224 }
225 
226 bool aio_pending(AioContext *ctx)
227 {
228     AioHandler *node;
229     bool result = false;
230 
231     /*
232      * We have to walk very carefully in case aio_set_fd_handler is
233      * called while we're walking.
234      */
235     qemu_lockcnt_inc(&ctx->list_lock);
236     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
237         if (node->pfd.revents && node->io_notify) {
238             result = true;
239             break;
240         }
241 
242         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
243             result = true;
244             break;
245         }
246         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
247             result = true;
248             break;
249         }
250     }
251 
252     qemu_lockcnt_dec(&ctx->list_lock);
253     return result;
254 }
255 
/*
 * Walk the handler list of @ctx and run the callbacks of every live node
 * that has recorded revents or whose notifier handle equals @event.
 *
 * Both callers in this file (aio_dispatch and aio_poll) hold a list_lock
 * count around this call.
 *
 * Returns true if a callback other than ctx->notifier's own was invoked,
 * or if a socket still has network events pending; false otherwise.
 */
static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
    AioHandler *node;
    bool progress = false;
    AioHandler *tmp;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        /* Snapshot revents: it is cleared below before callbacks run */
        int revents = node->pfd.revents;

        /* Event notifier handler: fire on revents or on a matching handle */
        if (!node->deleted &&
            (revents || event_notifier_get_handle(node->e) == event) &&
            node->io_notify) {
            node->pfd.revents = 0;
            node->io_notify(node->e);

            /* aio_notify() does not count as progress */
            if (node->e != &ctx->notifier) {
                progress = true;
            }
        }

        /*
         * Socket handler.  node->deleted is re-read here because the
         * io_notify callback above may have removed the node.
         */
        if (!node->deleted &&
            (node->io_read || node->io_write)) {
            node->pfd.revents = 0;
            if ((revents & G_IO_IN) && node->io_read) {
                node->io_read(node->opaque);
                progress = true;
            }
            if ((revents & G_IO_OUT) && node->io_write) {
                node->io_write(node->opaque);
                progress = true;
            }

            /* if the next select() will return an event, we have progressed */
            if (event == event_notifier_get_handle(&ctx->notifier)) {
                WSANETWORKEVENTS ev;
                WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
                if (ev.lNetworkEvents) {
                    progress = true;
                }
            }
        }

        /*
         * Reap nodes that were only marked deleted.  The node is freed
         * only if qemu_lockcnt_dec_if_lock() succeeds; presumably that
         * requires being the last holder of the count (confirm against
         * the lockcnt API).  The count is restored afterwards.
         */
        if (node->deleted) {
            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                QLIST_REMOVE(node, node);
                g_free(node);
                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
            }
        }
    }

    return progress;
}
314 
/*
 * Run bottom halves, then handlers with recorded revents, then timers
 * for @ctx.  The return values of all three steps are discarded.
 */
void aio_dispatch(AioContext *ctx)
{
    /* Hold a list_lock count while bottom halves and handlers run */
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    /* No wait result is available here, so no specific handle is passed */
    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
    qemu_lockcnt_dec(&ctx->list_lock);
    /* Timers run after the list_lock count has been dropped */
    timerlistgroup_run_timers(&ctx->tlg);
}
323 
324 bool aio_poll(AioContext *ctx, bool blocking)
325 {
326     AioHandler *node;
327     HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
328     bool progress, have_select_revents, first;
329     int count;
330     int timeout;
331 
332     /*
333      * There cannot be two concurrent aio_poll calls for the same AioContext (or
334      * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
335      * We rely on this below to avoid slow locked accesses to ctx->notify_me.
336      */
337     assert(in_aio_context_home_thread(ctx));
338     progress = false;
339 
340     /* aio_notify can avoid the expensive event_notifier_set if
341      * everything (file descriptors, bottom halves, timers) will
342      * be re-evaluated before the next blocking poll().  This is
343      * already true when aio_poll is called with blocking == false;
344      * if blocking == true, it is only true after poll() returns,
345      * so disable the optimization now.
346      */
347     if (blocking) {
348         qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
349         /*
350          * Write ctx->notify_me before computing the timeout
351          * (reading bottom half flags, etc.).  Pairs with
352          * smp_mb in aio_notify().
353          */
354         smp_mb();
355     }
356 
357     qemu_lockcnt_inc(&ctx->list_lock);
358     have_select_revents = aio_prepare(ctx);
359 
360     /* fill fd sets */
361     count = 0;
362     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
363         if (!node->deleted && node->io_notify
364             && aio_node_check(ctx, node->is_external)) {
365             events[count++] = event_notifier_get_handle(node->e);
366         }
367     }
368 
369     first = true;
370 
371     /* ctx->notifier is always registered.  */
372     assert(count > 0);
373 
374     /* Multiple iterations, all of them non-blocking except the first,
375      * may be necessary to process all pending events.  After the first
376      * WaitForMultipleObjects call ctx->notify_me will be decremented.
377      */
378     do {
379         HANDLE event;
380         int ret;
381 
382         timeout = blocking && !have_select_revents
383             ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
384         ret = WaitForMultipleObjects(count, events, FALSE, timeout);
385         if (blocking) {
386             assert(first);
387             qatomic_store_release(&ctx->notify_me,
388                                   qatomic_read(&ctx->notify_me) - 2);
389             aio_notify_accept(ctx);
390         }
391 
392         if (first) {
393             progress |= aio_bh_poll(ctx);
394             first = false;
395         }
396 
397         /* if we have any signaled events, dispatch event */
398         event = NULL;
399         if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
400             event = events[ret - WAIT_OBJECT_0];
401             events[ret - WAIT_OBJECT_0] = events[--count];
402         } else if (!have_select_revents) {
403             break;
404         }
405 
406         have_select_revents = false;
407         blocking = false;
408 
409         progress |= aio_dispatch_handlers(ctx, event);
410     } while (count > 0);
411 
412     qemu_lockcnt_dec(&ctx->list_lock);
413 
414     progress |= timerlistgroup_run_timers(&ctx->tlg);
415     return progress;
416 }
417 
/* Nothing to set up for @ctx in this implementation; the body is empty. */
void aio_context_setup(AioContext *ctx)
{
}
421 
/* Nothing to tear down for @ctx in this implementation; the body is empty. */
void aio_context_destroy(AioContext *ctx)
{
}
425 
/* No action is taken for @ctx in this implementation; the body is empty. */
void aio_context_use_g_source(AioContext *ctx)
{
}
429 
430 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
431                                  int64_t grow, int64_t shrink, Error **errp)
432 {
433     if (max_ns) {
434         error_setg(errp, "AioContext polling is not implemented on Windows");
435     }
436 }
437