xref: /qemu/io/channel.c (revision 92eecfff)
1 /*
2  * QEMU I/O channels
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/module.h"
26 #include "qemu/iov.h"
27 
28 bool qio_channel_has_feature(QIOChannel *ioc,
29                              QIOChannelFeature feature)
30 {
31     return ioc->features & (1 << feature);
32 }
33 
34 
35 void qio_channel_set_feature(QIOChannel *ioc,
36                              QIOChannelFeature feature)
37 {
38     ioc->features |= (1 << feature);
39 }
40 
41 
42 void qio_channel_set_name(QIOChannel *ioc,
43                           const char *name)
44 {
45     g_free(ioc->name);
46     ioc->name = g_strdup(name);
47 }
48 
49 
50 ssize_t qio_channel_readv_full(QIOChannel *ioc,
51                                const struct iovec *iov,
52                                size_t niov,
53                                int **fds,
54                                size_t *nfds,
55                                Error **errp)
56 {
57     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
58 
59     if ((fds || nfds) &&
60         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
61         error_setg_errno(errp, EINVAL,
62                          "Channel does not support file descriptor passing");
63         return -1;
64     }
65 
66     return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
67 }
68 
69 
70 ssize_t qio_channel_writev_full(QIOChannel *ioc,
71                                 const struct iovec *iov,
72                                 size_t niov,
73                                 int *fds,
74                                 size_t nfds,
75                                 Error **errp)
76 {
77     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
78 
79     if ((fds || nfds) &&
80         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
81         error_setg_errno(errp, EINVAL,
82                          "Channel does not support file descriptor passing");
83         return -1;
84     }
85 
86     return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
87 }
88 
89 
90 int qio_channel_readv_all_eof(QIOChannel *ioc,
91                               const struct iovec *iov,
92                               size_t niov,
93                               Error **errp)
94 {
95     int ret = -1;
96     struct iovec *local_iov = g_new(struct iovec, niov);
97     struct iovec *local_iov_head = local_iov;
98     unsigned int nlocal_iov = niov;
99     bool partial = false;
100 
101     nlocal_iov = iov_copy(local_iov, nlocal_iov,
102                           iov, niov,
103                           0, iov_size(iov, niov));
104 
105     while (nlocal_iov > 0) {
106         ssize_t len;
107         len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
108         if (len == QIO_CHANNEL_ERR_BLOCK) {
109             if (qemu_in_coroutine()) {
110                 qio_channel_yield(ioc, G_IO_IN);
111             } else {
112                 qio_channel_wait(ioc, G_IO_IN);
113             }
114             continue;
115         } else if (len < 0) {
116             goto cleanup;
117         } else if (len == 0) {
118             if (partial) {
119                 error_setg(errp,
120                            "Unexpected end-of-file before all bytes were read");
121             } else {
122                 ret = 0;
123             }
124             goto cleanup;
125         }
126 
127         partial = true;
128         iov_discard_front(&local_iov, &nlocal_iov, len);
129     }
130 
131     ret = 1;
132 
133  cleanup:
134     g_free(local_iov_head);
135     return ret;
136 }
137 
138 int qio_channel_readv_all(QIOChannel *ioc,
139                           const struct iovec *iov,
140                           size_t niov,
141                           Error **errp)
142 {
143     int ret = qio_channel_readv_all_eof(ioc, iov, niov, errp);
144 
145     if (ret == 0) {
146         ret = -1;
147         error_setg(errp,
148                    "Unexpected end-of-file before all bytes were read");
149     } else if (ret == 1) {
150         ret = 0;
151     }
152     return ret;
153 }
154 
155 int qio_channel_writev_all(QIOChannel *ioc,
156                            const struct iovec *iov,
157                            size_t niov,
158                            Error **errp)
159 {
160     int ret = -1;
161     struct iovec *local_iov = g_new(struct iovec, niov);
162     struct iovec *local_iov_head = local_iov;
163     unsigned int nlocal_iov = niov;
164 
165     nlocal_iov = iov_copy(local_iov, nlocal_iov,
166                           iov, niov,
167                           0, iov_size(iov, niov));
168 
169     while (nlocal_iov > 0) {
170         ssize_t len;
171         len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
172         if (len == QIO_CHANNEL_ERR_BLOCK) {
173             if (qemu_in_coroutine()) {
174                 qio_channel_yield(ioc, G_IO_OUT);
175             } else {
176                 qio_channel_wait(ioc, G_IO_OUT);
177             }
178             continue;
179         }
180         if (len < 0) {
181             goto cleanup;
182         }
183 
184         iov_discard_front(&local_iov, &nlocal_iov, len);
185     }
186 
187     ret = 0;
188  cleanup:
189     g_free(local_iov_head);
190     return ret;
191 }
192 
193 ssize_t qio_channel_readv(QIOChannel *ioc,
194                           const struct iovec *iov,
195                           size_t niov,
196                           Error **errp)
197 {
198     return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
199 }
200 
201 
202 ssize_t qio_channel_writev(QIOChannel *ioc,
203                            const struct iovec *iov,
204                            size_t niov,
205                            Error **errp)
206 {
207     return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
208 }
209 
210 
211 ssize_t qio_channel_read(QIOChannel *ioc,
212                          char *buf,
213                          size_t buflen,
214                          Error **errp)
215 {
216     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
217     return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
218 }
219 
220 
221 ssize_t qio_channel_write(QIOChannel *ioc,
222                           const char *buf,
223                           size_t buflen,
224                           Error **errp)
225 {
226     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
227     return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
228 }
229 
230 
231 int qio_channel_read_all_eof(QIOChannel *ioc,
232                              char *buf,
233                              size_t buflen,
234                              Error **errp)
235 {
236     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
237     return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
238 }
239 
240 
241 int qio_channel_read_all(QIOChannel *ioc,
242                          char *buf,
243                          size_t buflen,
244                          Error **errp)
245 {
246     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
247     return qio_channel_readv_all(ioc, &iov, 1, errp);
248 }
249 
250 
251 int qio_channel_write_all(QIOChannel *ioc,
252                           const char *buf,
253                           size_t buflen,
254                           Error **errp)
255 {
256     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
257     return qio_channel_writev_all(ioc, &iov, 1, errp);
258 }
259 
260 
261 int qio_channel_set_blocking(QIOChannel *ioc,
262                               bool enabled,
263                               Error **errp)
264 {
265     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
266     return klass->io_set_blocking(ioc, enabled, errp);
267 }
268 
269 
270 int qio_channel_close(QIOChannel *ioc,
271                       Error **errp)
272 {
273     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
274     return klass->io_close(ioc, errp);
275 }
276 
277 
278 GSource *qio_channel_create_watch(QIOChannel *ioc,
279                                   GIOCondition condition)
280 {
281     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
282     GSource *ret = klass->io_create_watch(ioc, condition);
283 
284     if (ioc->name) {
285         g_source_set_name(ret, ioc->name);
286     }
287 
288     return ret;
289 }
290 
291 
292 void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
293                                     AioContext *ctx,
294                                     IOHandler *io_read,
295                                     IOHandler *io_write,
296                                     void *opaque)
297 {
298     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
299 
300     klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
301 }
302 
303 guint qio_channel_add_watch_full(QIOChannel *ioc,
304                                  GIOCondition condition,
305                                  QIOChannelFunc func,
306                                  gpointer user_data,
307                                  GDestroyNotify notify,
308                                  GMainContext *context)
309 {
310     GSource *source;
311     guint id;
312 
313     source = qio_channel_create_watch(ioc, condition);
314 
315     g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
316 
317     id = g_source_attach(source, context);
318     g_source_unref(source);
319 
320     return id;
321 }
322 
323 guint qio_channel_add_watch(QIOChannel *ioc,
324                             GIOCondition condition,
325                             QIOChannelFunc func,
326                             gpointer user_data,
327                             GDestroyNotify notify)
328 {
329     return qio_channel_add_watch_full(ioc, condition, func,
330                                       user_data, notify, NULL);
331 }
332 
333 GSource *qio_channel_add_watch_source(QIOChannel *ioc,
334                                       GIOCondition condition,
335                                       QIOChannelFunc func,
336                                       gpointer user_data,
337                                       GDestroyNotify notify,
338                                       GMainContext *context)
339 {
340     GSource *source;
341     guint id;
342 
343     id = qio_channel_add_watch_full(ioc, condition, func,
344                                     user_data, notify, context);
345     source = g_main_context_find_source_by_id(context, id);
346     g_source_ref(source);
347     return source;
348 }
349 
350 
351 int qio_channel_shutdown(QIOChannel *ioc,
352                          QIOChannelShutdown how,
353                          Error **errp)
354 {
355     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
356 
357     if (!klass->io_shutdown) {
358         error_setg(errp, "Data path shutdown not supported");
359         return -1;
360     }
361 
362     return klass->io_shutdown(ioc, how, errp);
363 }
364 
365 
366 void qio_channel_set_delay(QIOChannel *ioc,
367                            bool enabled)
368 {
369     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
370 
371     if (klass->io_set_delay) {
372         klass->io_set_delay(ioc, enabled);
373     }
374 }
375 
376 
377 void qio_channel_set_cork(QIOChannel *ioc,
378                           bool enabled)
379 {
380     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
381 
382     if (klass->io_set_cork) {
383         klass->io_set_cork(ioc, enabled);
384     }
385 }
386 
387 
388 off_t qio_channel_io_seek(QIOChannel *ioc,
389                           off_t offset,
390                           int whence,
391                           Error **errp)
392 {
393     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
394 
395     if (!klass->io_seek) {
396         error_setg(errp, "Channel does not support random access");
397         return -1;
398     }
399 
400     return klass->io_seek(ioc, offset, whence, errp);
401 }
402 
403 
404 static void qio_channel_restart_read(void *opaque)
405 {
406     QIOChannel *ioc = opaque;
407     Coroutine *co = ioc->read_coroutine;
408 
409     /* Assert that aio_co_wake() reenters the coroutine directly */
410     assert(qemu_get_current_aio_context() ==
411            qemu_coroutine_get_aio_context(co));
412     aio_co_wake(co);
413 }
414 
415 static void qio_channel_restart_write(void *opaque)
416 {
417     QIOChannel *ioc = opaque;
418     Coroutine *co = ioc->write_coroutine;
419 
420     /* Assert that aio_co_wake() reenters the coroutine directly */
421     assert(qemu_get_current_aio_context() ==
422            qemu_coroutine_get_aio_context(co));
423     aio_co_wake(co);
424 }
425 
426 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
427 {
428     IOHandler *rd_handler = NULL, *wr_handler = NULL;
429     AioContext *ctx;
430 
431     if (ioc->read_coroutine) {
432         rd_handler = qio_channel_restart_read;
433     }
434     if (ioc->write_coroutine) {
435         wr_handler = qio_channel_restart_write;
436     }
437 
438     ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
439     qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
440 }
441 
442 void qio_channel_attach_aio_context(QIOChannel *ioc,
443                                     AioContext *ctx)
444 {
445     assert(!ioc->read_coroutine);
446     assert(!ioc->write_coroutine);
447     ioc->ctx = ctx;
448 }
449 
450 void qio_channel_detach_aio_context(QIOChannel *ioc)
451 {
452     ioc->read_coroutine = NULL;
453     ioc->write_coroutine = NULL;
454     qio_channel_set_aio_fd_handlers(ioc);
455     ioc->ctx = NULL;
456 }
457 
458 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
459                                     GIOCondition condition)
460 {
461     assert(qemu_in_coroutine());
462     if (condition == G_IO_IN) {
463         assert(!ioc->read_coroutine);
464         ioc->read_coroutine = qemu_coroutine_self();
465     } else if (condition == G_IO_OUT) {
466         assert(!ioc->write_coroutine);
467         ioc->write_coroutine = qemu_coroutine_self();
468     } else {
469         abort();
470     }
471     qio_channel_set_aio_fd_handlers(ioc);
472     qemu_coroutine_yield();
473 
474     /* Allow interrupting the operation by reentering the coroutine other than
475      * through the aio_fd_handlers. */
476     if (condition == G_IO_IN && ioc->read_coroutine) {
477         ioc->read_coroutine = NULL;
478         qio_channel_set_aio_fd_handlers(ioc);
479     } else if (condition == G_IO_OUT && ioc->write_coroutine) {
480         ioc->write_coroutine = NULL;
481         qio_channel_set_aio_fd_handlers(ioc);
482     }
483 }
484 
485 
486 static gboolean qio_channel_wait_complete(QIOChannel *ioc,
487                                           GIOCondition condition,
488                                           gpointer opaque)
489 {
490     GMainLoop *loop = opaque;
491 
492     g_main_loop_quit(loop);
493     return FALSE;
494 }
495 
496 
497 void qio_channel_wait(QIOChannel *ioc,
498                       GIOCondition condition)
499 {
500     GMainContext *ctxt = g_main_context_new();
501     GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
502     GSource *source;
503 
504     source = qio_channel_create_watch(ioc, condition);
505 
506     g_source_set_callback(source,
507                           (GSourceFunc)qio_channel_wait_complete,
508                           loop,
509                           NULL);
510 
511     g_source_attach(source, ctxt);
512 
513     g_main_loop_run(loop);
514 
515     g_source_unref(source);
516     g_main_loop_unref(loop);
517     g_main_context_unref(ctxt);
518 }
519 
520 
521 static void qio_channel_finalize(Object *obj)
522 {
523     QIOChannel *ioc = QIO_CHANNEL(obj);
524 
525     g_free(ioc->name);
526 
527 #ifdef _WIN32
528     if (ioc->event) {
529         CloseHandle(ioc->event);
530     }
531 #endif
532 }
533 
534 static const TypeInfo qio_channel_info = {
535     .parent = TYPE_OBJECT,
536     .name = TYPE_QIO_CHANNEL,
537     .instance_size = sizeof(QIOChannel),
538     .instance_finalize = qio_channel_finalize,
539     .abstract = true,
540     .class_size = sizeof(QIOChannelClass),
541 };
542 
543 
544 static void qio_channel_register_types(void)
545 {
546     type_register_static(&qio_channel_info);
547 }
548 
549 
550 type_init(qio_channel_register_types);
551