xref: /qemu/util/aio-win32.c (revision abff1abf)
1 /*
2  * QEMU aio implementation
3  *
4  * Copyright IBM Corp., 2008
5  * Copyright Red Hat Inc., 2012
6  *
7  * Authors:
8  *  Anthony Liguori   <aliguori@us.ibm.com>
9  *  Paolo Bonzini     <pbonzini@redhat.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2.  See
12  * the COPYING file in the top-level directory.
13  *
14  * Contributions after 2012-01-13 are licensed under the terms of the
15  * GNU GPL, version 2 or (at your option) any later version.
16  */
17 
18 #include "qemu/osdep.h"
19 #include "qemu-common.h"
20 #include "block/block.h"
21 #include "qemu/queue.h"
22 #include "qemu/sockets.h"
23 #include "qapi/error.h"
24 #include "qemu/rcu_queue.h"
25 
26 struct AioHandler {
27     EventNotifier *e;
28     IOHandler *io_read;
29     IOHandler *io_write;
30     EventNotifierHandler *io_notify;
31     GPollFD pfd;
32     int deleted;
33     void *opaque;
34     bool is_external;
35     QLIST_ENTRY(AioHandler) node;
36 };
37 
38 static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
39 {
40     /* If aio_poll is in progress, just mark the node as deleted */
41     if (qemu_lockcnt_count(&ctx->list_lock)) {
42         node->deleted = 1;
43         node->pfd.revents = 0;
44     } else {
45         /* Otherwise, delete it for real.  We can't just mark it as
46          * deleted because deleted nodes are only cleaned up after
47          * releasing the list_lock.
48          */
49         QLIST_REMOVE(node, node);
50         g_free(node);
51     }
52 }
53 
54 void aio_set_fd_handler(AioContext *ctx,
55                         int fd,
56                         bool is_external,
57                         IOHandler *io_read,
58                         IOHandler *io_write,
59                         AioPollFn *io_poll,
60                         void *opaque)
61 {
62     /* fd is a SOCKET in our case */
63     AioHandler *old_node;
64     AioHandler *node = NULL;
65 
66     qemu_lockcnt_lock(&ctx->list_lock);
67     QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
68         if (old_node->pfd.fd == fd && !old_node->deleted) {
69             break;
70         }
71     }
72 
73     if (io_read || io_write) {
74         HANDLE event;
75         long bitmask = 0;
76 
77         /* Alloc and insert if it's not already there */
78         node = g_new0(AioHandler, 1);
79         node->pfd.fd = fd;
80 
81         node->pfd.events = 0;
82         if (node->io_read) {
83             node->pfd.events |= G_IO_IN;
84         }
85         if (node->io_write) {
86             node->pfd.events |= G_IO_OUT;
87         }
88 
89         node->e = &ctx->notifier;
90 
91         /* Update handler with latest information */
92         node->opaque = opaque;
93         node->io_read = io_read;
94         node->io_write = io_write;
95         node->is_external = is_external;
96 
97         if (io_read) {
98             bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
99         }
100 
101         if (io_write) {
102             bitmask |= FD_WRITE | FD_CONNECT;
103         }
104 
105         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
106         event = event_notifier_get_handle(&ctx->notifier);
107         WSAEventSelect(node->pfd.fd, event, bitmask);
108     }
109     if (old_node) {
110         aio_remove_fd_handler(ctx, old_node);
111     }
112 
113     qemu_lockcnt_unlock(&ctx->list_lock);
114     aio_notify(ctx);
115 }
116 
117 void aio_set_fd_poll(AioContext *ctx, int fd,
118                      IOHandler *io_poll_begin,
119                      IOHandler *io_poll_end)
120 {
121     /* Not implemented */
122 }
123 
124 void aio_set_event_notifier(AioContext *ctx,
125                             EventNotifier *e,
126                             bool is_external,
127                             EventNotifierHandler *io_notify,
128                             AioPollFn *io_poll)
129 {
130     AioHandler *node;
131 
132     qemu_lockcnt_lock(&ctx->list_lock);
133     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
134         if (node->e == e && !node->deleted) {
135             break;
136         }
137     }
138 
139     /* Are we deleting the fd handler? */
140     if (!io_notify) {
141         if (node) {
142             g_source_remove_poll(&ctx->source, &node->pfd);
143 
144             aio_remove_fd_handler(ctx, node);
145         }
146     } else {
147         if (node == NULL) {
148             /* Alloc and insert if it's not already there */
149             node = g_new0(AioHandler, 1);
150             node->e = e;
151             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
152             node->pfd.events = G_IO_IN;
153             node->is_external = is_external;
154             QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
155 
156             g_source_add_poll(&ctx->source, &node->pfd);
157         }
158         /* Update handler with latest information */
159         node->io_notify = io_notify;
160     }
161 
162     qemu_lockcnt_unlock(&ctx->list_lock);
163     aio_notify(ctx);
164 }
165 
166 void aio_set_event_notifier_poll(AioContext *ctx,
167                                  EventNotifier *notifier,
168                                  EventNotifierHandler *io_poll_begin,
169                                  EventNotifierHandler *io_poll_end)
170 {
171     /* Not implemented */
172 }
173 
174 bool aio_prepare(AioContext *ctx)
175 {
176     static struct timeval tv0;
177     AioHandler *node;
178     bool have_select_revents = false;
179     fd_set rfds, wfds;
180 
181     /*
182      * We have to walk very carefully in case aio_set_fd_handler is
183      * called while we're walking.
184      */
185     qemu_lockcnt_inc(&ctx->list_lock);
186 
187     /* fill fd sets */
188     FD_ZERO(&rfds);
189     FD_ZERO(&wfds);
190     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
191         if (node->io_read) {
192             FD_SET ((SOCKET)node->pfd.fd, &rfds);
193         }
194         if (node->io_write) {
195             FD_SET ((SOCKET)node->pfd.fd, &wfds);
196         }
197     }
198 
199     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
200         QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
201             node->pfd.revents = 0;
202             if (FD_ISSET(node->pfd.fd, &rfds)) {
203                 node->pfd.revents |= G_IO_IN;
204                 have_select_revents = true;
205             }
206 
207             if (FD_ISSET(node->pfd.fd, &wfds)) {
208                 node->pfd.revents |= G_IO_OUT;
209                 have_select_revents = true;
210             }
211         }
212     }
213 
214     qemu_lockcnt_dec(&ctx->list_lock);
215     return have_select_revents;
216 }
217 
218 bool aio_pending(AioContext *ctx)
219 {
220     AioHandler *node;
221     bool result = false;
222 
223     /*
224      * We have to walk very carefully in case aio_set_fd_handler is
225      * called while we're walking.
226      */
227     qemu_lockcnt_inc(&ctx->list_lock);
228     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
229         if (node->pfd.revents && node->io_notify) {
230             result = true;
231             break;
232         }
233 
234         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
235             result = true;
236             break;
237         }
238         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
239             result = true;
240             break;
241         }
242     }
243 
244     qemu_lockcnt_dec(&ctx->list_lock);
245     return result;
246 }
247 
248 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
249 {
250     AioHandler *node;
251     bool progress = false;
252     AioHandler *tmp;
253 
254     /*
255      * We have to walk very carefully in case aio_set_fd_handler is
256      * called while we're walking.
257      */
258     QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
259         int revents = node->pfd.revents;
260 
261         if (!node->deleted &&
262             (revents || event_notifier_get_handle(node->e) == event) &&
263             node->io_notify) {
264             node->pfd.revents = 0;
265             node->io_notify(node->e);
266 
267             /* aio_notify() does not count as progress */
268             if (node->e != &ctx->notifier) {
269                 progress = true;
270             }
271         }
272 
273         if (!node->deleted &&
274             (node->io_read || node->io_write)) {
275             node->pfd.revents = 0;
276             if ((revents & G_IO_IN) && node->io_read) {
277                 node->io_read(node->opaque);
278                 progress = true;
279             }
280             if ((revents & G_IO_OUT) && node->io_write) {
281                 node->io_write(node->opaque);
282                 progress = true;
283             }
284 
285             /* if the next select() will return an event, we have progressed */
286             if (event == event_notifier_get_handle(&ctx->notifier)) {
287                 WSANETWORKEVENTS ev;
288                 WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
289                 if (ev.lNetworkEvents) {
290                     progress = true;
291                 }
292             }
293         }
294 
295         if (node->deleted) {
296             if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
297                 QLIST_REMOVE(node, node);
298                 g_free(node);
299                 qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
300             }
301         }
302     }
303 
304     return progress;
305 }
306 
307 void aio_dispatch(AioContext *ctx)
308 {
309     qemu_lockcnt_inc(&ctx->list_lock);
310     aio_bh_poll(ctx);
311     aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
312     qemu_lockcnt_dec(&ctx->list_lock);
313     timerlistgroup_run_timers(&ctx->tlg);
314 }
315 
316 bool aio_poll(AioContext *ctx, bool blocking)
317 {
318     AioHandler *node;
319     HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
320     bool progress, have_select_revents, first;
321     int count;
322     int timeout;
323 
324     /*
325      * There cannot be two concurrent aio_poll calls for the same AioContext (or
326      * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
327      * We rely on this below to avoid slow locked accesses to ctx->notify_me.
328      */
329     assert(in_aio_context_home_thread(ctx));
330     progress = false;
331 
332     /* aio_notify can avoid the expensive event_notifier_set if
333      * everything (file descriptors, bottom halves, timers) will
334      * be re-evaluated before the next blocking poll().  This is
335      * already true when aio_poll is called with blocking == false;
336      * if blocking == true, it is only true after poll() returns,
337      * so disable the optimization now.
338      */
339     if (blocking) {
340         atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
341         /*
342          * Write ctx->notify_me before computing the timeout
343          * (reading bottom half flags, etc.).  Pairs with
344          * smp_mb in aio_notify().
345          */
346         smp_mb();
347     }
348 
349     qemu_lockcnt_inc(&ctx->list_lock);
350     have_select_revents = aio_prepare(ctx);
351 
352     /* fill fd sets */
353     count = 0;
354     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
355         if (!node->deleted && node->io_notify
356             && aio_node_check(ctx, node->is_external)) {
357             events[count++] = event_notifier_get_handle(node->e);
358         }
359     }
360 
361     first = true;
362 
363     /* ctx->notifier is always registered.  */
364     assert(count > 0);
365 
366     /* Multiple iterations, all of them non-blocking except the first,
367      * may be necessary to process all pending events.  After the first
368      * WaitForMultipleObjects call ctx->notify_me will be decremented.
369      */
370     do {
371         HANDLE event;
372         int ret;
373 
374         timeout = blocking && !have_select_revents
375             ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
376         ret = WaitForMultipleObjects(count, events, FALSE, timeout);
377         if (blocking) {
378             assert(first);
379             atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2);
380             aio_notify_accept(ctx);
381         }
382 
383         if (first) {
384             progress |= aio_bh_poll(ctx);
385             first = false;
386         }
387 
388         /* if we have any signaled events, dispatch event */
389         event = NULL;
390         if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
391             event = events[ret - WAIT_OBJECT_0];
392             events[ret - WAIT_OBJECT_0] = events[--count];
393         } else if (!have_select_revents) {
394             break;
395         }
396 
397         have_select_revents = false;
398         blocking = false;
399 
400         progress |= aio_dispatch_handlers(ctx, event);
401     } while (count > 0);
402 
403     qemu_lockcnt_dec(&ctx->list_lock);
404 
405     progress |= timerlistgroup_run_timers(&ctx->tlg);
406     return progress;
407 }
408 
409 void aio_context_setup(AioContext *ctx)
410 {
411 }
412 
413 void aio_context_destroy(AioContext *ctx)
414 {
415 }
416 
417 void aio_context_use_g_source(AioContext *ctx)
418 {
419 }
420 
421 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
422                                  int64_t grow, int64_t shrink, Error **errp)
423 {
424     if (max_ns) {
425         error_setg(errp, "AioContext polling is not implemented on Windows");
426     }
427 }
428