xref: /qemu/io/channel.c (revision 727385c4)
1 /*
2  * QEMU I/O channels
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/module.h"
26 #include "qemu/iov.h"
27 
28 bool qio_channel_has_feature(QIOChannel *ioc,
29                              QIOChannelFeature feature)
30 {
31     return ioc->features & (1 << feature);
32 }
33 
34 
35 void qio_channel_set_feature(QIOChannel *ioc,
36                              QIOChannelFeature feature)
37 {
38     ioc->features |= (1 << feature);
39 }
40 
41 
42 void qio_channel_set_name(QIOChannel *ioc,
43                           const char *name)
44 {
45     g_free(ioc->name);
46     ioc->name = g_strdup(name);
47 }
48 
49 
50 ssize_t qio_channel_readv_full(QIOChannel *ioc,
51                                const struct iovec *iov,
52                                size_t niov,
53                                int **fds,
54                                size_t *nfds,
55                                Error **errp)
56 {
57     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
58 
59     if ((fds || nfds) &&
60         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
61         error_setg_errno(errp, EINVAL,
62                          "Channel does not support file descriptor passing");
63         return -1;
64     }
65 
66     return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
67 }
68 
69 
70 ssize_t qio_channel_writev_full(QIOChannel *ioc,
71                                 const struct iovec *iov,
72                                 size_t niov,
73                                 int *fds,
74                                 size_t nfds,
75                                 Error **errp)
76 {
77     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
78 
79     if ((fds || nfds) &&
80         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
81         error_setg_errno(errp, EINVAL,
82                          "Channel does not support file descriptor passing");
83         return -1;
84     }
85 
86     return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
87 }
88 
89 
90 int qio_channel_readv_all_eof(QIOChannel *ioc,
91                               const struct iovec *iov,
92                               size_t niov,
93                               Error **errp)
94 {
95     return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
96 }
97 
98 int qio_channel_readv_all(QIOChannel *ioc,
99                           const struct iovec *iov,
100                           size_t niov,
101                           Error **errp)
102 {
103     return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
104 }
105 
106 int qio_channel_readv_full_all_eof(QIOChannel *ioc,
107                                    const struct iovec *iov,
108                                    size_t niov,
109                                    int **fds, size_t *nfds,
110                                    Error **errp)
111 {
112     int ret = -1;
113     struct iovec *local_iov = g_new(struct iovec, niov);
114     struct iovec *local_iov_head = local_iov;
115     unsigned int nlocal_iov = niov;
116     int **local_fds = fds;
117     size_t *local_nfds = nfds;
118     bool partial = false;
119 
120     if (nfds) {
121         *nfds = 0;
122     }
123 
124     if (fds) {
125         *fds = NULL;
126     }
127 
128     nlocal_iov = iov_copy(local_iov, nlocal_iov,
129                           iov, niov,
130                           0, iov_size(iov, niov));
131 
132     while ((nlocal_iov > 0) || local_fds) {
133         ssize_t len;
134         len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
135                                      local_nfds, errp);
136         if (len == QIO_CHANNEL_ERR_BLOCK) {
137             if (qemu_in_coroutine()) {
138                 qio_channel_yield(ioc, G_IO_IN);
139             } else {
140                 qio_channel_wait(ioc, G_IO_IN);
141             }
142             continue;
143         }
144 
145         if (len == 0) {
146             if (local_nfds && *local_nfds) {
147                 /*
148                  * Got some FDs, but no data yet. This isn't an EOF
149                  * scenario (yet), so carry on to try to read data
150                  * on next loop iteration
151                  */
152                 goto next_iter;
153             } else if (!partial) {
154                 /* No fds and no data - EOF before any data read */
155                 ret = 0;
156                 goto cleanup;
157             } else {
158                 len = -1;
159                 error_setg(errp,
160                            "Unexpected end-of-file before all data were read");
161                 /* Fallthrough into len < 0 handling */
162             }
163         }
164 
165         if (len < 0) {
166             /* Close any FDs we previously received */
167             if (nfds && fds) {
168                 size_t i;
169                 for (i = 0; i < (*nfds); i++) {
170                     close((*fds)[i]);
171                 }
172                 g_free(*fds);
173                 *fds = NULL;
174                 *nfds = 0;
175             }
176             goto cleanup;
177         }
178 
179         if (nlocal_iov) {
180             iov_discard_front(&local_iov, &nlocal_iov, len);
181         }
182 
183 next_iter:
184         partial = true;
185         local_fds = NULL;
186         local_nfds = NULL;
187     }
188 
189     ret = 1;
190 
191  cleanup:
192     g_free(local_iov_head);
193     return ret;
194 }
195 
196 int qio_channel_readv_full_all(QIOChannel *ioc,
197                                const struct iovec *iov,
198                                size_t niov,
199                                int **fds, size_t *nfds,
200                                Error **errp)
201 {
202     int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);
203 
204     if (ret == 0) {
205         error_setg(errp, "Unexpected end-of-file before all data were read");
206         return -1;
207     }
208     if (ret == 1) {
209         return 0;
210     }
211 
212     return ret;
213 }
214 
215 int qio_channel_writev_all(QIOChannel *ioc,
216                            const struct iovec *iov,
217                            size_t niov,
218                            Error **errp)
219 {
220     return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
221 }
222 
223 int qio_channel_writev_full_all(QIOChannel *ioc,
224                                 const struct iovec *iov,
225                                 size_t niov,
226                                 int *fds, size_t nfds,
227                                 Error **errp)
228 {
229     int ret = -1;
230     struct iovec *local_iov = g_new(struct iovec, niov);
231     struct iovec *local_iov_head = local_iov;
232     unsigned int nlocal_iov = niov;
233 
234     nlocal_iov = iov_copy(local_iov, nlocal_iov,
235                           iov, niov,
236                           0, iov_size(iov, niov));
237 
238     while (nlocal_iov > 0) {
239         ssize_t len;
240         len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
241                                       errp);
242         if (len == QIO_CHANNEL_ERR_BLOCK) {
243             if (qemu_in_coroutine()) {
244                 qio_channel_yield(ioc, G_IO_OUT);
245             } else {
246                 qio_channel_wait(ioc, G_IO_OUT);
247             }
248             continue;
249         }
250         if (len < 0) {
251             goto cleanup;
252         }
253 
254         iov_discard_front(&local_iov, &nlocal_iov, len);
255 
256         fds = NULL;
257         nfds = 0;
258     }
259 
260     ret = 0;
261  cleanup:
262     g_free(local_iov_head);
263     return ret;
264 }
265 
266 ssize_t qio_channel_readv(QIOChannel *ioc,
267                           const struct iovec *iov,
268                           size_t niov,
269                           Error **errp)
270 {
271     return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
272 }
273 
274 
275 ssize_t qio_channel_writev(QIOChannel *ioc,
276                            const struct iovec *iov,
277                            size_t niov,
278                            Error **errp)
279 {
280     return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
281 }
282 
283 
284 ssize_t qio_channel_read(QIOChannel *ioc,
285                          char *buf,
286                          size_t buflen,
287                          Error **errp)
288 {
289     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
290     return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
291 }
292 
293 
294 ssize_t qio_channel_write(QIOChannel *ioc,
295                           const char *buf,
296                           size_t buflen,
297                           Error **errp)
298 {
299     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
300     return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
301 }
302 
303 
304 int qio_channel_read_all_eof(QIOChannel *ioc,
305                              char *buf,
306                              size_t buflen,
307                              Error **errp)
308 {
309     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
310     return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
311 }
312 
313 
314 int qio_channel_read_all(QIOChannel *ioc,
315                          char *buf,
316                          size_t buflen,
317                          Error **errp)
318 {
319     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
320     return qio_channel_readv_all(ioc, &iov, 1, errp);
321 }
322 
323 
324 int qio_channel_write_all(QIOChannel *ioc,
325                           const char *buf,
326                           size_t buflen,
327                           Error **errp)
328 {
329     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
330     return qio_channel_writev_all(ioc, &iov, 1, errp);
331 }
332 
333 
334 int qio_channel_set_blocking(QIOChannel *ioc,
335                               bool enabled,
336                               Error **errp)
337 {
338     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
339     return klass->io_set_blocking(ioc, enabled, errp);
340 }
341 
342 
343 int qio_channel_close(QIOChannel *ioc,
344                       Error **errp)
345 {
346     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
347     return klass->io_close(ioc, errp);
348 }
349 
350 
351 GSource *qio_channel_create_watch(QIOChannel *ioc,
352                                   GIOCondition condition)
353 {
354     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
355     GSource *ret = klass->io_create_watch(ioc, condition);
356 
357     if (ioc->name) {
358         g_source_set_name(ret, ioc->name);
359     }
360 
361     return ret;
362 }
363 
364 
365 void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
366                                     AioContext *ctx,
367                                     IOHandler *io_read,
368                                     IOHandler *io_write,
369                                     void *opaque)
370 {
371     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
372 
373     klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
374 }
375 
376 guint qio_channel_add_watch_full(QIOChannel *ioc,
377                                  GIOCondition condition,
378                                  QIOChannelFunc func,
379                                  gpointer user_data,
380                                  GDestroyNotify notify,
381                                  GMainContext *context)
382 {
383     GSource *source;
384     guint id;
385 
386     source = qio_channel_create_watch(ioc, condition);
387 
388     g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
389 
390     id = g_source_attach(source, context);
391     g_source_unref(source);
392 
393     return id;
394 }
395 
396 guint qio_channel_add_watch(QIOChannel *ioc,
397                             GIOCondition condition,
398                             QIOChannelFunc func,
399                             gpointer user_data,
400                             GDestroyNotify notify)
401 {
402     return qio_channel_add_watch_full(ioc, condition, func,
403                                       user_data, notify, NULL);
404 }
405 
406 GSource *qio_channel_add_watch_source(QIOChannel *ioc,
407                                       GIOCondition condition,
408                                       QIOChannelFunc func,
409                                       gpointer user_data,
410                                       GDestroyNotify notify,
411                                       GMainContext *context)
412 {
413     GSource *source;
414     guint id;
415 
416     id = qio_channel_add_watch_full(ioc, condition, func,
417                                     user_data, notify, context);
418     source = g_main_context_find_source_by_id(context, id);
419     g_source_ref(source);
420     return source;
421 }
422 
423 
424 int qio_channel_shutdown(QIOChannel *ioc,
425                          QIOChannelShutdown how,
426                          Error **errp)
427 {
428     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
429 
430     if (!klass->io_shutdown) {
431         error_setg(errp, "Data path shutdown not supported");
432         return -1;
433     }
434 
435     return klass->io_shutdown(ioc, how, errp);
436 }
437 
438 
439 void qio_channel_set_delay(QIOChannel *ioc,
440                            bool enabled)
441 {
442     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
443 
444     if (klass->io_set_delay) {
445         klass->io_set_delay(ioc, enabled);
446     }
447 }
448 
449 
450 void qio_channel_set_cork(QIOChannel *ioc,
451                           bool enabled)
452 {
453     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
454 
455     if (klass->io_set_cork) {
456         klass->io_set_cork(ioc, enabled);
457     }
458 }
459 
460 
461 off_t qio_channel_io_seek(QIOChannel *ioc,
462                           off_t offset,
463                           int whence,
464                           Error **errp)
465 {
466     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
467 
468     if (!klass->io_seek) {
469         error_setg(errp, "Channel does not support random access");
470         return -1;
471     }
472 
473     return klass->io_seek(ioc, offset, whence, errp);
474 }
475 
476 
477 static void qio_channel_restart_read(void *opaque)
478 {
479     QIOChannel *ioc = opaque;
480     Coroutine *co = ioc->read_coroutine;
481 
482     /* Assert that aio_co_wake() reenters the coroutine directly */
483     assert(qemu_get_current_aio_context() ==
484            qemu_coroutine_get_aio_context(co));
485     aio_co_wake(co);
486 }
487 
488 static void qio_channel_restart_write(void *opaque)
489 {
490     QIOChannel *ioc = opaque;
491     Coroutine *co = ioc->write_coroutine;
492 
493     /* Assert that aio_co_wake() reenters the coroutine directly */
494     assert(qemu_get_current_aio_context() ==
495            qemu_coroutine_get_aio_context(co));
496     aio_co_wake(co);
497 }
498 
499 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
500 {
501     IOHandler *rd_handler = NULL, *wr_handler = NULL;
502     AioContext *ctx;
503 
504     if (ioc->read_coroutine) {
505         rd_handler = qio_channel_restart_read;
506     }
507     if (ioc->write_coroutine) {
508         wr_handler = qio_channel_restart_write;
509     }
510 
511     ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
512     qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
513 }
514 
515 void qio_channel_attach_aio_context(QIOChannel *ioc,
516                                     AioContext *ctx)
517 {
518     assert(!ioc->read_coroutine);
519     assert(!ioc->write_coroutine);
520     ioc->ctx = ctx;
521 }
522 
523 void qio_channel_detach_aio_context(QIOChannel *ioc)
524 {
525     ioc->read_coroutine = NULL;
526     ioc->write_coroutine = NULL;
527     qio_channel_set_aio_fd_handlers(ioc);
528     ioc->ctx = NULL;
529 }
530 
531 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
532                                     GIOCondition condition)
533 {
534     assert(qemu_in_coroutine());
535     if (condition == G_IO_IN) {
536         assert(!ioc->read_coroutine);
537         ioc->read_coroutine = qemu_coroutine_self();
538     } else if (condition == G_IO_OUT) {
539         assert(!ioc->write_coroutine);
540         ioc->write_coroutine = qemu_coroutine_self();
541     } else {
542         abort();
543     }
544     qio_channel_set_aio_fd_handlers(ioc);
545     qemu_coroutine_yield();
546 
547     /* Allow interrupting the operation by reentering the coroutine other than
548      * through the aio_fd_handlers. */
549     if (condition == G_IO_IN && ioc->read_coroutine) {
550         ioc->read_coroutine = NULL;
551         qio_channel_set_aio_fd_handlers(ioc);
552     } else if (condition == G_IO_OUT && ioc->write_coroutine) {
553         ioc->write_coroutine = NULL;
554         qio_channel_set_aio_fd_handlers(ioc);
555     }
556 }
557 
558 
559 static gboolean qio_channel_wait_complete(QIOChannel *ioc,
560                                           GIOCondition condition,
561                                           gpointer opaque)
562 {
563     GMainLoop *loop = opaque;
564 
565     g_main_loop_quit(loop);
566     return FALSE;
567 }
568 
569 
570 void qio_channel_wait(QIOChannel *ioc,
571                       GIOCondition condition)
572 {
573     GMainContext *ctxt = g_main_context_new();
574     GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
575     GSource *source;
576 
577     source = qio_channel_create_watch(ioc, condition);
578 
579     g_source_set_callback(source,
580                           (GSourceFunc)qio_channel_wait_complete,
581                           loop,
582                           NULL);
583 
584     g_source_attach(source, ctxt);
585 
586     g_main_loop_run(loop);
587 
588     g_source_unref(source);
589     g_main_loop_unref(loop);
590     g_main_context_unref(ctxt);
591 }
592 
593 
594 static void qio_channel_finalize(Object *obj)
595 {
596     QIOChannel *ioc = QIO_CHANNEL(obj);
597 
598     g_free(ioc->name);
599 
600 #ifdef _WIN32
601     if (ioc->event) {
602         CloseHandle(ioc->event);
603     }
604 #endif
605 }
606 
607 static const TypeInfo qio_channel_info = {
608     .parent = TYPE_OBJECT,
609     .name = TYPE_QIO_CHANNEL,
610     .instance_size = sizeof(QIOChannel),
611     .instance_finalize = qio_channel_finalize,
612     .abstract = true,
613     .class_size = sizeof(QIOChannelClass),
614 };
615 
616 
617 static void qio_channel_register_types(void)
618 {
619     type_register_static(&qio_channel_info);
620 }
621 
622 
623 type_init(qio_channel_register_types);
624