xref: /qemu/io/channel.c (revision b2a3cbb8)
1 /*
2  * QEMU I/O channels
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/module.h"
26 #include "qemu/iov.h"
27 
28 bool qio_channel_has_feature(QIOChannel *ioc,
29                              QIOChannelFeature feature)
30 {
31     return ioc->features & (1 << feature);
32 }
33 
34 
35 void qio_channel_set_feature(QIOChannel *ioc,
36                              QIOChannelFeature feature)
37 {
38     ioc->features |= (1 << feature);
39 }
40 
41 
42 void qio_channel_set_name(QIOChannel *ioc,
43                           const char *name)
44 {
45     g_free(ioc->name);
46     ioc->name = g_strdup(name);
47 }
48 
49 
50 ssize_t qio_channel_readv_full(QIOChannel *ioc,
51                                const struct iovec *iov,
52                                size_t niov,
53                                int **fds,
54                                size_t *nfds,
55                                Error **errp)
56 {
57     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
58 
59     if ((fds || nfds) &&
60         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
61         error_setg_errno(errp, EINVAL,
62                          "Channel does not support file descriptor passing");
63         return -1;
64     }
65 
66     return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
67 }
68 
69 
70 ssize_t qio_channel_writev_full(QIOChannel *ioc,
71                                 const struct iovec *iov,
72                                 size_t niov,
73                                 int *fds,
74                                 size_t nfds,
75                                 int flags,
76                                 Error **errp)
77 {
78     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
79 
80     if (fds || nfds) {
81         if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
82             error_setg_errno(errp, EINVAL,
83                              "Channel does not support file descriptor passing");
84             return -1;
85         }
86         if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
87             error_setg_errno(errp, EINVAL,
88                              "Zero Copy does not support file descriptor passing");
89             return -1;
90         }
91     }
92 
93     if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
94         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
95         error_setg_errno(errp, EINVAL,
96                          "Requested Zero Copy feature is not available");
97         return -1;
98     }
99 
100     return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
101 }
102 
103 
104 int qio_channel_readv_all_eof(QIOChannel *ioc,
105                               const struct iovec *iov,
106                               size_t niov,
107                               Error **errp)
108 {
109     return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
110 }
111 
112 int qio_channel_readv_all(QIOChannel *ioc,
113                           const struct iovec *iov,
114                           size_t niov,
115                           Error **errp)
116 {
117     return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
118 }
119 
120 int qio_channel_readv_full_all_eof(QIOChannel *ioc,
121                                    const struct iovec *iov,
122                                    size_t niov,
123                                    int **fds, size_t *nfds,
124                                    Error **errp)
125 {
126     int ret = -1;
127     struct iovec *local_iov = g_new(struct iovec, niov);
128     struct iovec *local_iov_head = local_iov;
129     unsigned int nlocal_iov = niov;
130     int **local_fds = fds;
131     size_t *local_nfds = nfds;
132     bool partial = false;
133 
134     if (nfds) {
135         *nfds = 0;
136     }
137 
138     if (fds) {
139         *fds = NULL;
140     }
141 
142     nlocal_iov = iov_copy(local_iov, nlocal_iov,
143                           iov, niov,
144                           0, iov_size(iov, niov));
145 
146     while ((nlocal_iov > 0) || local_fds) {
147         ssize_t len;
148         len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
149                                      local_nfds, errp);
150         if (len == QIO_CHANNEL_ERR_BLOCK) {
151             if (qemu_in_coroutine()) {
152                 qio_channel_yield(ioc, G_IO_IN);
153             } else {
154                 qio_channel_wait(ioc, G_IO_IN);
155             }
156             continue;
157         }
158 
159         if (len == 0) {
160             if (local_nfds && *local_nfds) {
161                 /*
162                  * Got some FDs, but no data yet. This isn't an EOF
163                  * scenario (yet), so carry on to try to read data
164                  * on next loop iteration
165                  */
166                 goto next_iter;
167             } else if (!partial) {
168                 /* No fds and no data - EOF before any data read */
169                 ret = 0;
170                 goto cleanup;
171             } else {
172                 len = -1;
173                 error_setg(errp,
174                            "Unexpected end-of-file before all data were read");
175                 /* Fallthrough into len < 0 handling */
176             }
177         }
178 
179         if (len < 0) {
180             /* Close any FDs we previously received */
181             if (nfds && fds) {
182                 size_t i;
183                 for (i = 0; i < (*nfds); i++) {
184                     close((*fds)[i]);
185                 }
186                 g_free(*fds);
187                 *fds = NULL;
188                 *nfds = 0;
189             }
190             goto cleanup;
191         }
192 
193         if (nlocal_iov) {
194             iov_discard_front(&local_iov, &nlocal_iov, len);
195         }
196 
197 next_iter:
198         partial = true;
199         local_fds = NULL;
200         local_nfds = NULL;
201     }
202 
203     ret = 1;
204 
205  cleanup:
206     g_free(local_iov_head);
207     return ret;
208 }
209 
210 int qio_channel_readv_full_all(QIOChannel *ioc,
211                                const struct iovec *iov,
212                                size_t niov,
213                                int **fds, size_t *nfds,
214                                Error **errp)
215 {
216     int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);
217 
218     if (ret == 0) {
219         error_setg(errp, "Unexpected end-of-file before all data were read");
220         return -1;
221     }
222     if (ret == 1) {
223         return 0;
224     }
225 
226     return ret;
227 }
228 
229 int qio_channel_writev_all(QIOChannel *ioc,
230                            const struct iovec *iov,
231                            size_t niov,
232                            Error **errp)
233 {
234     return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
235 }
236 
237 int qio_channel_writev_full_all(QIOChannel *ioc,
238                                 const struct iovec *iov,
239                                 size_t niov,
240                                 int *fds, size_t nfds,
241                                 int flags, Error **errp)
242 {
243     int ret = -1;
244     struct iovec *local_iov = g_new(struct iovec, niov);
245     struct iovec *local_iov_head = local_iov;
246     unsigned int nlocal_iov = niov;
247 
248     nlocal_iov = iov_copy(local_iov, nlocal_iov,
249                           iov, niov,
250                           0, iov_size(iov, niov));
251 
252     while (nlocal_iov > 0) {
253         ssize_t len;
254 
255         len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
256                                             nfds, flags, errp);
257 
258         if (len == QIO_CHANNEL_ERR_BLOCK) {
259             if (qemu_in_coroutine()) {
260                 qio_channel_yield(ioc, G_IO_OUT);
261             } else {
262                 qio_channel_wait(ioc, G_IO_OUT);
263             }
264             continue;
265         }
266         if (len < 0) {
267             goto cleanup;
268         }
269 
270         iov_discard_front(&local_iov, &nlocal_iov, len);
271 
272         fds = NULL;
273         nfds = 0;
274     }
275 
276     ret = 0;
277  cleanup:
278     g_free(local_iov_head);
279     return ret;
280 }
281 
282 ssize_t qio_channel_readv(QIOChannel *ioc,
283                           const struct iovec *iov,
284                           size_t niov,
285                           Error **errp)
286 {
287     return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
288 }
289 
290 
291 ssize_t qio_channel_writev(QIOChannel *ioc,
292                            const struct iovec *iov,
293                            size_t niov,
294                            Error **errp)
295 {
296     return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
297 }
298 
299 
300 ssize_t qio_channel_read(QIOChannel *ioc,
301                          char *buf,
302                          size_t buflen,
303                          Error **errp)
304 {
305     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
306     return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
307 }
308 
309 
310 ssize_t qio_channel_write(QIOChannel *ioc,
311                           const char *buf,
312                           size_t buflen,
313                           Error **errp)
314 {
315     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
316     return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
317 }
318 
319 
320 int qio_channel_read_all_eof(QIOChannel *ioc,
321                              char *buf,
322                              size_t buflen,
323                              Error **errp)
324 {
325     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
326     return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
327 }
328 
329 
330 int qio_channel_read_all(QIOChannel *ioc,
331                          char *buf,
332                          size_t buflen,
333                          Error **errp)
334 {
335     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
336     return qio_channel_readv_all(ioc, &iov, 1, errp);
337 }
338 
339 
340 int qio_channel_write_all(QIOChannel *ioc,
341                           const char *buf,
342                           size_t buflen,
343                           Error **errp)
344 {
345     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
346     return qio_channel_writev_all(ioc, &iov, 1, errp);
347 }
348 
349 
350 int qio_channel_set_blocking(QIOChannel *ioc,
351                               bool enabled,
352                               Error **errp)
353 {
354     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
355     return klass->io_set_blocking(ioc, enabled, errp);
356 }
357 
358 
359 int qio_channel_close(QIOChannel *ioc,
360                       Error **errp)
361 {
362     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
363     return klass->io_close(ioc, errp);
364 }
365 
366 
367 GSource *qio_channel_create_watch(QIOChannel *ioc,
368                                   GIOCondition condition)
369 {
370     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
371     GSource *ret = klass->io_create_watch(ioc, condition);
372 
373     if (ioc->name) {
374         g_source_set_name(ret, ioc->name);
375     }
376 
377     return ret;
378 }
379 
380 
381 void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
382                                     AioContext *ctx,
383                                     IOHandler *io_read,
384                                     IOHandler *io_write,
385                                     void *opaque)
386 {
387     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
388 
389     klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
390 }
391 
392 guint qio_channel_add_watch_full(QIOChannel *ioc,
393                                  GIOCondition condition,
394                                  QIOChannelFunc func,
395                                  gpointer user_data,
396                                  GDestroyNotify notify,
397                                  GMainContext *context)
398 {
399     GSource *source;
400     guint id;
401 
402     source = qio_channel_create_watch(ioc, condition);
403 
404     g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
405 
406     id = g_source_attach(source, context);
407     g_source_unref(source);
408 
409     return id;
410 }
411 
412 guint qio_channel_add_watch(QIOChannel *ioc,
413                             GIOCondition condition,
414                             QIOChannelFunc func,
415                             gpointer user_data,
416                             GDestroyNotify notify)
417 {
418     return qio_channel_add_watch_full(ioc, condition, func,
419                                       user_data, notify, NULL);
420 }
421 
422 GSource *qio_channel_add_watch_source(QIOChannel *ioc,
423                                       GIOCondition condition,
424                                       QIOChannelFunc func,
425                                       gpointer user_data,
426                                       GDestroyNotify notify,
427                                       GMainContext *context)
428 {
429     GSource *source;
430     guint id;
431 
432     id = qio_channel_add_watch_full(ioc, condition, func,
433                                     user_data, notify, context);
434     source = g_main_context_find_source_by_id(context, id);
435     g_source_ref(source);
436     return source;
437 }
438 
439 
440 int qio_channel_shutdown(QIOChannel *ioc,
441                          QIOChannelShutdown how,
442                          Error **errp)
443 {
444     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
445 
446     if (!klass->io_shutdown) {
447         error_setg(errp, "Data path shutdown not supported");
448         return -1;
449     }
450 
451     return klass->io_shutdown(ioc, how, errp);
452 }
453 
454 
455 void qio_channel_set_delay(QIOChannel *ioc,
456                            bool enabled)
457 {
458     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
459 
460     if (klass->io_set_delay) {
461         klass->io_set_delay(ioc, enabled);
462     }
463 }
464 
465 
466 void qio_channel_set_cork(QIOChannel *ioc,
467                           bool enabled)
468 {
469     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
470 
471     if (klass->io_set_cork) {
472         klass->io_set_cork(ioc, enabled);
473     }
474 }
475 
476 
477 off_t qio_channel_io_seek(QIOChannel *ioc,
478                           off_t offset,
479                           int whence,
480                           Error **errp)
481 {
482     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
483 
484     if (!klass->io_seek) {
485         error_setg(errp, "Channel does not support random access");
486         return -1;
487     }
488 
489     return klass->io_seek(ioc, offset, whence, errp);
490 }
491 
492 int qio_channel_flush(QIOChannel *ioc,
493                                 Error **errp)
494 {
495     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
496 
497     if (!klass->io_flush ||
498         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
499         return 0;
500     }
501 
502     return klass->io_flush(ioc, errp);
503 }
504 
505 
506 static void qio_channel_restart_read(void *opaque)
507 {
508     QIOChannel *ioc = opaque;
509     Coroutine *co = ioc->read_coroutine;
510 
511     /* Assert that aio_co_wake() reenters the coroutine directly */
512     assert(qemu_get_current_aio_context() ==
513            qemu_coroutine_get_aio_context(co));
514     aio_co_wake(co);
515 }
516 
517 static void qio_channel_restart_write(void *opaque)
518 {
519     QIOChannel *ioc = opaque;
520     Coroutine *co = ioc->write_coroutine;
521 
522     /* Assert that aio_co_wake() reenters the coroutine directly */
523     assert(qemu_get_current_aio_context() ==
524            qemu_coroutine_get_aio_context(co));
525     aio_co_wake(co);
526 }
527 
528 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
529 {
530     IOHandler *rd_handler = NULL, *wr_handler = NULL;
531     AioContext *ctx;
532 
533     if (ioc->read_coroutine) {
534         rd_handler = qio_channel_restart_read;
535     }
536     if (ioc->write_coroutine) {
537         wr_handler = qio_channel_restart_write;
538     }
539 
540     ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
541     qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
542 }
543 
544 void qio_channel_attach_aio_context(QIOChannel *ioc,
545                                     AioContext *ctx)
546 {
547     assert(!ioc->read_coroutine);
548     assert(!ioc->write_coroutine);
549     ioc->ctx = ctx;
550 }
551 
552 void qio_channel_detach_aio_context(QIOChannel *ioc)
553 {
554     ioc->read_coroutine = NULL;
555     ioc->write_coroutine = NULL;
556     qio_channel_set_aio_fd_handlers(ioc);
557     ioc->ctx = NULL;
558 }
559 
560 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
561                                     GIOCondition condition)
562 {
563     assert(qemu_in_coroutine());
564     if (condition == G_IO_IN) {
565         assert(!ioc->read_coroutine);
566         ioc->read_coroutine = qemu_coroutine_self();
567     } else if (condition == G_IO_OUT) {
568         assert(!ioc->write_coroutine);
569         ioc->write_coroutine = qemu_coroutine_self();
570     } else {
571         abort();
572     }
573     qio_channel_set_aio_fd_handlers(ioc);
574     qemu_coroutine_yield();
575 
576     /* Allow interrupting the operation by reentering the coroutine other than
577      * through the aio_fd_handlers. */
578     if (condition == G_IO_IN && ioc->read_coroutine) {
579         ioc->read_coroutine = NULL;
580         qio_channel_set_aio_fd_handlers(ioc);
581     } else if (condition == G_IO_OUT && ioc->write_coroutine) {
582         ioc->write_coroutine = NULL;
583         qio_channel_set_aio_fd_handlers(ioc);
584     }
585 }
586 
587 
588 static gboolean qio_channel_wait_complete(QIOChannel *ioc,
589                                           GIOCondition condition,
590                                           gpointer opaque)
591 {
592     GMainLoop *loop = opaque;
593 
594     g_main_loop_quit(loop);
595     return FALSE;
596 }
597 
598 
599 void qio_channel_wait(QIOChannel *ioc,
600                       GIOCondition condition)
601 {
602     GMainContext *ctxt = g_main_context_new();
603     GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
604     GSource *source;
605 
606     source = qio_channel_create_watch(ioc, condition);
607 
608     g_source_set_callback(source,
609                           (GSourceFunc)qio_channel_wait_complete,
610                           loop,
611                           NULL);
612 
613     g_source_attach(source, ctxt);
614 
615     g_main_loop_run(loop);
616 
617     g_source_unref(source);
618     g_main_loop_unref(loop);
619     g_main_context_unref(ctxt);
620 }
621 
622 
623 static void qio_channel_finalize(Object *obj)
624 {
625     QIOChannel *ioc = QIO_CHANNEL(obj);
626 
627     g_free(ioc->name);
628 
629 #ifdef _WIN32
630     if (ioc->event) {
631         CloseHandle(ioc->event);
632     }
633 #endif
634 }
635 
636 static const TypeInfo qio_channel_info = {
637     .parent = TYPE_OBJECT,
638     .name = TYPE_QIO_CHANNEL,
639     .instance_size = sizeof(QIOChannel),
640     .instance_finalize = qio_channel_finalize,
641     .abstract = true,
642     .class_size = sizeof(QIOChannelClass),
643 };
644 
645 
646 static void qio_channel_register_types(void)
647 {
648     type_register_static(&qio_channel_info);
649 }
650 
651 
652 type_init(qio_channel_register_types);
653