xref: /qemu/io/channel.c (revision 5ac034b1)
1 /*
2  * QEMU I/O channels
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/module.h"
26 #include "qemu/iov.h"
27 
28 bool qio_channel_has_feature(QIOChannel *ioc,
29                              QIOChannelFeature feature)
30 {
31     return ioc->features & (1 << feature);
32 }
33 
34 
35 void qio_channel_set_feature(QIOChannel *ioc,
36                              QIOChannelFeature feature)
37 {
38     ioc->features |= (1 << feature);
39 }
40 
41 
42 void qio_channel_set_name(QIOChannel *ioc,
43                           const char *name)
44 {
45     g_free(ioc->name);
46     ioc->name = g_strdup(name);
47 }
48 
49 
50 ssize_t qio_channel_readv_full(QIOChannel *ioc,
51                                const struct iovec *iov,
52                                size_t niov,
53                                int **fds,
54                                size_t *nfds,
55                                int flags,
56                                Error **errp)
57 {
58     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
59 
60     if ((fds || nfds) &&
61         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
62         error_setg_errno(errp, EINVAL,
63                          "Channel does not support file descriptor passing");
64         return -1;
65     }
66 
67     if ((flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) &&
68         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
69         error_setg_errno(errp, EINVAL,
70                          "Channel does not support peek read");
71         return -1;
72     }
73 
74     return klass->io_readv(ioc, iov, niov, fds, nfds, flags, errp);
75 }
76 
77 
78 ssize_t qio_channel_writev_full(QIOChannel *ioc,
79                                 const struct iovec *iov,
80                                 size_t niov,
81                                 int *fds,
82                                 size_t nfds,
83                                 int flags,
84                                 Error **errp)
85 {
86     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
87 
88     if (fds || nfds) {
89         if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
90             error_setg_errno(errp, EINVAL,
91                              "Channel does not support file descriptor passing");
92             return -1;
93         }
94         if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
95             error_setg_errno(errp, EINVAL,
96                              "Zero Copy does not support file descriptor passing");
97             return -1;
98         }
99     }
100 
101     if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
102         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
103         error_setg_errno(errp, EINVAL,
104                          "Requested Zero Copy feature is not available");
105         return -1;
106     }
107 
108     return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
109 }
110 
111 
112 int qio_channel_readv_all_eof(QIOChannel *ioc,
113                               const struct iovec *iov,
114                               size_t niov,
115                               Error **errp)
116 {
117     return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
118 }
119 
120 int qio_channel_readv_all(QIOChannel *ioc,
121                           const struct iovec *iov,
122                           size_t niov,
123                           Error **errp)
124 {
125     return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
126 }
127 
128 int qio_channel_readv_full_all_eof(QIOChannel *ioc,
129                                    const struct iovec *iov,
130                                    size_t niov,
131                                    int **fds, size_t *nfds,
132                                    Error **errp)
133 {
134     int ret = -1;
135     struct iovec *local_iov = g_new(struct iovec, niov);
136     struct iovec *local_iov_head = local_iov;
137     unsigned int nlocal_iov = niov;
138     int **local_fds = fds;
139     size_t *local_nfds = nfds;
140     bool partial = false;
141 
142     if (nfds) {
143         *nfds = 0;
144     }
145 
146     if (fds) {
147         *fds = NULL;
148     }
149 
150     nlocal_iov = iov_copy(local_iov, nlocal_iov,
151                           iov, niov,
152                           0, iov_size(iov, niov));
153 
154     while ((nlocal_iov > 0) || local_fds) {
155         ssize_t len;
156         len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
157                                      local_nfds, 0, errp);
158         if (len == QIO_CHANNEL_ERR_BLOCK) {
159             if (qemu_in_coroutine()) {
160                 qio_channel_yield(ioc, G_IO_IN);
161             } else {
162                 qio_channel_wait(ioc, G_IO_IN);
163             }
164             continue;
165         }
166 
167         if (len == 0) {
168             if (local_nfds && *local_nfds) {
169                 /*
170                  * Got some FDs, but no data yet. This isn't an EOF
171                  * scenario (yet), so carry on to try to read data
172                  * on next loop iteration
173                  */
174                 goto next_iter;
175             } else if (!partial) {
176                 /* No fds and no data - EOF before any data read */
177                 ret = 0;
178                 goto cleanup;
179             } else {
180                 len = -1;
181                 error_setg(errp,
182                            "Unexpected end-of-file before all data were read");
183                 /* Fallthrough into len < 0 handling */
184             }
185         }
186 
187         if (len < 0) {
188             /* Close any FDs we previously received */
189             if (nfds && fds) {
190                 size_t i;
191                 for (i = 0; i < (*nfds); i++) {
192                     close((*fds)[i]);
193                 }
194                 g_free(*fds);
195                 *fds = NULL;
196                 *nfds = 0;
197             }
198             goto cleanup;
199         }
200 
201         if (nlocal_iov) {
202             iov_discard_front(&local_iov, &nlocal_iov, len);
203         }
204 
205 next_iter:
206         partial = true;
207         local_fds = NULL;
208         local_nfds = NULL;
209     }
210 
211     ret = 1;
212 
213  cleanup:
214     g_free(local_iov_head);
215     return ret;
216 }
217 
218 int qio_channel_readv_full_all(QIOChannel *ioc,
219                                const struct iovec *iov,
220                                size_t niov,
221                                int **fds, size_t *nfds,
222                                Error **errp)
223 {
224     int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);
225 
226     if (ret == 0) {
227         error_setg(errp, "Unexpected end-of-file before all data were read");
228         return -1;
229     }
230     if (ret == 1) {
231         return 0;
232     }
233 
234     return ret;
235 }
236 
237 int qio_channel_writev_all(QIOChannel *ioc,
238                            const struct iovec *iov,
239                            size_t niov,
240                            Error **errp)
241 {
242     return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
243 }
244 
245 int qio_channel_writev_full_all(QIOChannel *ioc,
246                                 const struct iovec *iov,
247                                 size_t niov,
248                                 int *fds, size_t nfds,
249                                 int flags, Error **errp)
250 {
251     int ret = -1;
252     struct iovec *local_iov = g_new(struct iovec, niov);
253     struct iovec *local_iov_head = local_iov;
254     unsigned int nlocal_iov = niov;
255 
256     nlocal_iov = iov_copy(local_iov, nlocal_iov,
257                           iov, niov,
258                           0, iov_size(iov, niov));
259 
260     while (nlocal_iov > 0) {
261         ssize_t len;
262 
263         len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
264                                             nfds, flags, errp);
265 
266         if (len == QIO_CHANNEL_ERR_BLOCK) {
267             if (qemu_in_coroutine()) {
268                 qio_channel_yield(ioc, G_IO_OUT);
269             } else {
270                 qio_channel_wait(ioc, G_IO_OUT);
271             }
272             continue;
273         }
274         if (len < 0) {
275             goto cleanup;
276         }
277 
278         iov_discard_front(&local_iov, &nlocal_iov, len);
279 
280         fds = NULL;
281         nfds = 0;
282     }
283 
284     ret = 0;
285  cleanup:
286     g_free(local_iov_head);
287     return ret;
288 }
289 
290 ssize_t qio_channel_readv(QIOChannel *ioc,
291                           const struct iovec *iov,
292                           size_t niov,
293                           Error **errp)
294 {
295     return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, 0, errp);
296 }
297 
298 
299 ssize_t qio_channel_writev(QIOChannel *ioc,
300                            const struct iovec *iov,
301                            size_t niov,
302                            Error **errp)
303 {
304     return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
305 }
306 
307 
308 ssize_t qio_channel_read(QIOChannel *ioc,
309                          char *buf,
310                          size_t buflen,
311                          Error **errp)
312 {
313     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
314     return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, 0, errp);
315 }
316 
317 
318 ssize_t qio_channel_write(QIOChannel *ioc,
319                           const char *buf,
320                           size_t buflen,
321                           Error **errp)
322 {
323     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
324     return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
325 }
326 
327 
328 int qio_channel_read_all_eof(QIOChannel *ioc,
329                              char *buf,
330                              size_t buflen,
331                              Error **errp)
332 {
333     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
334     return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
335 }
336 
337 
338 int qio_channel_read_all(QIOChannel *ioc,
339                          char *buf,
340                          size_t buflen,
341                          Error **errp)
342 {
343     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
344     return qio_channel_readv_all(ioc, &iov, 1, errp);
345 }
346 
347 
348 int qio_channel_write_all(QIOChannel *ioc,
349                           const char *buf,
350                           size_t buflen,
351                           Error **errp)
352 {
353     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
354     return qio_channel_writev_all(ioc, &iov, 1, errp);
355 }
356 
357 
358 int qio_channel_set_blocking(QIOChannel *ioc,
359                               bool enabled,
360                               Error **errp)
361 {
362     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
363     return klass->io_set_blocking(ioc, enabled, errp);
364 }
365 
366 
367 int qio_channel_close(QIOChannel *ioc,
368                       Error **errp)
369 {
370     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
371     return klass->io_close(ioc, errp);
372 }
373 
374 
375 GSource *qio_channel_create_watch(QIOChannel *ioc,
376                                   GIOCondition condition)
377 {
378     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
379     GSource *ret = klass->io_create_watch(ioc, condition);
380 
381     if (ioc->name) {
382         g_source_set_name(ret, ioc->name);
383     }
384 
385     return ret;
386 }
387 
388 
389 void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
390                                     AioContext *ctx,
391                                     IOHandler *io_read,
392                                     IOHandler *io_write,
393                                     void *opaque)
394 {
395     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
396 
397     klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
398 }
399 
400 guint qio_channel_add_watch_full(QIOChannel *ioc,
401                                  GIOCondition condition,
402                                  QIOChannelFunc func,
403                                  gpointer user_data,
404                                  GDestroyNotify notify,
405                                  GMainContext *context)
406 {
407     GSource *source;
408     guint id;
409 
410     source = qio_channel_create_watch(ioc, condition);
411 
412     g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
413 
414     id = g_source_attach(source, context);
415     g_source_unref(source);
416 
417     return id;
418 }
419 
420 guint qio_channel_add_watch(QIOChannel *ioc,
421                             GIOCondition condition,
422                             QIOChannelFunc func,
423                             gpointer user_data,
424                             GDestroyNotify notify)
425 {
426     return qio_channel_add_watch_full(ioc, condition, func,
427                                       user_data, notify, NULL);
428 }
429 
430 GSource *qio_channel_add_watch_source(QIOChannel *ioc,
431                                       GIOCondition condition,
432                                       QIOChannelFunc func,
433                                       gpointer user_data,
434                                       GDestroyNotify notify,
435                                       GMainContext *context)
436 {
437     GSource *source;
438     guint id;
439 
440     id = qio_channel_add_watch_full(ioc, condition, func,
441                                     user_data, notify, context);
442     source = g_main_context_find_source_by_id(context, id);
443     g_source_ref(source);
444     return source;
445 }
446 
447 
448 int qio_channel_shutdown(QIOChannel *ioc,
449                          QIOChannelShutdown how,
450                          Error **errp)
451 {
452     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
453 
454     if (!klass->io_shutdown) {
455         error_setg(errp, "Data path shutdown not supported");
456         return -1;
457     }
458 
459     return klass->io_shutdown(ioc, how, errp);
460 }
461 
462 
463 void qio_channel_set_delay(QIOChannel *ioc,
464                            bool enabled)
465 {
466     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
467 
468     if (klass->io_set_delay) {
469         klass->io_set_delay(ioc, enabled);
470     }
471 }
472 
473 
474 void qio_channel_set_cork(QIOChannel *ioc,
475                           bool enabled)
476 {
477     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
478 
479     if (klass->io_set_cork) {
480         klass->io_set_cork(ioc, enabled);
481     }
482 }
483 
484 
485 off_t qio_channel_io_seek(QIOChannel *ioc,
486                           off_t offset,
487                           int whence,
488                           Error **errp)
489 {
490     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
491 
492     if (!klass->io_seek) {
493         error_setg(errp, "Channel does not support random access");
494         return -1;
495     }
496 
497     return klass->io_seek(ioc, offset, whence, errp);
498 }
499 
500 int qio_channel_flush(QIOChannel *ioc,
501                                 Error **errp)
502 {
503     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
504 
505     if (!klass->io_flush ||
506         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
507         return 0;
508     }
509 
510     return klass->io_flush(ioc, errp);
511 }
512 
513 
514 static void qio_channel_restart_read(void *opaque)
515 {
516     QIOChannel *ioc = opaque;
517     Coroutine *co = ioc->read_coroutine;
518 
519     /* Assert that aio_co_wake() reenters the coroutine directly */
520     assert(qemu_get_current_aio_context() ==
521            qemu_coroutine_get_aio_context(co));
522     aio_co_wake(co);
523 }
524 
525 static void qio_channel_restart_write(void *opaque)
526 {
527     QIOChannel *ioc = opaque;
528     Coroutine *co = ioc->write_coroutine;
529 
530     /* Assert that aio_co_wake() reenters the coroutine directly */
531     assert(qemu_get_current_aio_context() ==
532            qemu_coroutine_get_aio_context(co));
533     aio_co_wake(co);
534 }
535 
536 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
537 {
538     IOHandler *rd_handler = NULL, *wr_handler = NULL;
539     AioContext *ctx;
540 
541     if (ioc->read_coroutine) {
542         rd_handler = qio_channel_restart_read;
543     }
544     if (ioc->write_coroutine) {
545         wr_handler = qio_channel_restart_write;
546     }
547 
548     ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
549     qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
550 }
551 
552 void qio_channel_attach_aio_context(QIOChannel *ioc,
553                                     AioContext *ctx)
554 {
555     assert(!ioc->read_coroutine);
556     assert(!ioc->write_coroutine);
557     ioc->ctx = ctx;
558 }
559 
560 void qio_channel_detach_aio_context(QIOChannel *ioc)
561 {
562     ioc->read_coroutine = NULL;
563     ioc->write_coroutine = NULL;
564     qio_channel_set_aio_fd_handlers(ioc);
565     ioc->ctx = NULL;
566 }
567 
568 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
569                                     GIOCondition condition)
570 {
571     assert(qemu_in_coroutine());
572     if (condition == G_IO_IN) {
573         assert(!ioc->read_coroutine);
574         ioc->read_coroutine = qemu_coroutine_self();
575     } else if (condition == G_IO_OUT) {
576         assert(!ioc->write_coroutine);
577         ioc->write_coroutine = qemu_coroutine_self();
578     } else {
579         abort();
580     }
581     qio_channel_set_aio_fd_handlers(ioc);
582     qemu_coroutine_yield();
583 
584     /* Allow interrupting the operation by reentering the coroutine other than
585      * through the aio_fd_handlers. */
586     if (condition == G_IO_IN && ioc->read_coroutine) {
587         ioc->read_coroutine = NULL;
588         qio_channel_set_aio_fd_handlers(ioc);
589     } else if (condition == G_IO_OUT && ioc->write_coroutine) {
590         ioc->write_coroutine = NULL;
591         qio_channel_set_aio_fd_handlers(ioc);
592     }
593 }
594 
595 
596 static gboolean qio_channel_wait_complete(QIOChannel *ioc,
597                                           GIOCondition condition,
598                                           gpointer opaque)
599 {
600     GMainLoop *loop = opaque;
601 
602     g_main_loop_quit(loop);
603     return FALSE;
604 }
605 
606 
607 void qio_channel_wait(QIOChannel *ioc,
608                       GIOCondition condition)
609 {
610     GMainContext *ctxt = g_main_context_new();
611     GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
612     GSource *source;
613 
614     source = qio_channel_create_watch(ioc, condition);
615 
616     g_source_set_callback(source,
617                           (GSourceFunc)qio_channel_wait_complete,
618                           loop,
619                           NULL);
620 
621     g_source_attach(source, ctxt);
622 
623     g_main_loop_run(loop);
624 
625     g_source_unref(source);
626     g_main_loop_unref(loop);
627     g_main_context_unref(ctxt);
628 }
629 
630 
631 static void qio_channel_finalize(Object *obj)
632 {
633     QIOChannel *ioc = QIO_CHANNEL(obj);
634 
635     g_free(ioc->name);
636 
637 #ifdef _WIN32
638     if (ioc->event) {
639         CloseHandle(ioc->event);
640     }
641 #endif
642 }
643 
644 static const TypeInfo qio_channel_info = {
645     .parent = TYPE_OBJECT,
646     .name = TYPE_QIO_CHANNEL,
647     .instance_size = sizeof(QIOChannel),
648     .instance_finalize = qio_channel_finalize,
649     .abstract = true,
650     .class_size = sizeof(QIOChannelClass),
651 };
652 
653 
654 static void qio_channel_register_types(void)
655 {
656     type_register_static(&qio_channel_info);
657 }
658 
659 
660 type_init(qio_channel_register_types);
661