xref: /qemu/io/channel.c (revision f03868bd)
1 /*
2  * QEMU I/O channels
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/iov.h"
26 
27 bool qio_channel_has_feature(QIOChannel *ioc,
28                              QIOChannelFeature feature)
29 {
30     return ioc->features & (1 << feature);
31 }
32 
33 
34 void qio_channel_set_feature(QIOChannel *ioc,
35                              QIOChannelFeature feature)
36 {
37     ioc->features |= (1 << feature);
38 }
39 
40 
41 void qio_channel_set_name(QIOChannel *ioc,
42                           const char *name)
43 {
44     g_free(ioc->name);
45     ioc->name = g_strdup(name);
46 }
47 
48 
49 ssize_t qio_channel_readv_full(QIOChannel *ioc,
50                                const struct iovec *iov,
51                                size_t niov,
52                                int **fds,
53                                size_t *nfds,
54                                Error **errp)
55 {
56     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
57 
58     if ((fds || nfds) &&
59         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
60         error_setg_errno(errp, EINVAL,
61                          "Channel does not support file descriptor passing");
62         return -1;
63     }
64 
65     return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
66 }
67 
68 
69 ssize_t qio_channel_writev_full(QIOChannel *ioc,
70                                 const struct iovec *iov,
71                                 size_t niov,
72                                 int *fds,
73                                 size_t nfds,
74                                 Error **errp)
75 {
76     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
77 
78     if ((fds || nfds) &&
79         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
80         error_setg_errno(errp, EINVAL,
81                          "Channel does not support file descriptor passing");
82         return -1;
83     }
84 
85     return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
86 }
87 
88 
89 int qio_channel_readv_all_eof(QIOChannel *ioc,
90                               const struct iovec *iov,
91                               size_t niov,
92                               Error **errp)
93 {
94     int ret = -1;
95     struct iovec *local_iov = g_new(struct iovec, niov);
96     struct iovec *local_iov_head = local_iov;
97     unsigned int nlocal_iov = niov;
98     bool partial = false;
99 
100     nlocal_iov = iov_copy(local_iov, nlocal_iov,
101                           iov, niov,
102                           0, iov_size(iov, niov));
103 
104     while (nlocal_iov > 0) {
105         ssize_t len;
106         len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
107         if (len == QIO_CHANNEL_ERR_BLOCK) {
108             if (qemu_in_coroutine()) {
109                 qio_channel_yield(ioc, G_IO_IN);
110             } else {
111                 qio_channel_wait(ioc, G_IO_IN);
112             }
113             continue;
114         } else if (len < 0) {
115             goto cleanup;
116         } else if (len == 0) {
117             if (partial) {
118                 error_setg(errp,
119                            "Unexpected end-of-file before all bytes were read");
120             } else {
121                 ret = 0;
122             }
123             goto cleanup;
124         }
125 
126         partial = true;
127         iov_discard_front(&local_iov, &nlocal_iov, len);
128     }
129 
130     ret = 1;
131 
132  cleanup:
133     g_free(local_iov_head);
134     return ret;
135 }
136 
137 int qio_channel_readv_all(QIOChannel *ioc,
138                           const struct iovec *iov,
139                           size_t niov,
140                           Error **errp)
141 {
142     int ret = qio_channel_readv_all_eof(ioc, iov, niov, errp);
143 
144     if (ret == 0) {
145         ret = -1;
146         error_setg(errp,
147                    "Unexpected end-of-file before all bytes were read");
148     } else if (ret == 1) {
149         ret = 0;
150     }
151     return ret;
152 }
153 
154 int qio_channel_writev_all(QIOChannel *ioc,
155                            const struct iovec *iov,
156                            size_t niov,
157                            Error **errp)
158 {
159     int ret = -1;
160     struct iovec *local_iov = g_new(struct iovec, niov);
161     struct iovec *local_iov_head = local_iov;
162     unsigned int nlocal_iov = niov;
163 
164     nlocal_iov = iov_copy(local_iov, nlocal_iov,
165                           iov, niov,
166                           0, iov_size(iov, niov));
167 
168     while (nlocal_iov > 0) {
169         ssize_t len;
170         len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
171         if (len == QIO_CHANNEL_ERR_BLOCK) {
172             if (qemu_in_coroutine()) {
173                 qio_channel_yield(ioc, G_IO_OUT);
174             } else {
175                 qio_channel_wait(ioc, G_IO_OUT);
176             }
177             continue;
178         }
179         if (len < 0) {
180             goto cleanup;
181         }
182 
183         iov_discard_front(&local_iov, &nlocal_iov, len);
184     }
185 
186     ret = 0;
187  cleanup:
188     g_free(local_iov_head);
189     return ret;
190 }
191 
192 ssize_t qio_channel_readv(QIOChannel *ioc,
193                           const struct iovec *iov,
194                           size_t niov,
195                           Error **errp)
196 {
197     return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
198 }
199 
200 
201 ssize_t qio_channel_writev(QIOChannel *ioc,
202                            const struct iovec *iov,
203                            size_t niov,
204                            Error **errp)
205 {
206     return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
207 }
208 
209 
210 ssize_t qio_channel_read(QIOChannel *ioc,
211                          char *buf,
212                          size_t buflen,
213                          Error **errp)
214 {
215     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
216     return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
217 }
218 
219 
220 ssize_t qio_channel_write(QIOChannel *ioc,
221                           const char *buf,
222                           size_t buflen,
223                           Error **errp)
224 {
225     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
226     return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
227 }
228 
229 
230 int qio_channel_read_all_eof(QIOChannel *ioc,
231                              char *buf,
232                              size_t buflen,
233                              Error **errp)
234 {
235     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
236     return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
237 }
238 
239 
240 int qio_channel_read_all(QIOChannel *ioc,
241                          char *buf,
242                          size_t buflen,
243                          Error **errp)
244 {
245     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
246     return qio_channel_readv_all(ioc, &iov, 1, errp);
247 }
248 
249 
250 int qio_channel_write_all(QIOChannel *ioc,
251                           const char *buf,
252                           size_t buflen,
253                           Error **errp)
254 {
255     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
256     return qio_channel_writev_all(ioc, &iov, 1, errp);
257 }
258 
259 
260 int qio_channel_set_blocking(QIOChannel *ioc,
261                               bool enabled,
262                               Error **errp)
263 {
264     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
265     return klass->io_set_blocking(ioc, enabled, errp);
266 }
267 
268 
269 int qio_channel_close(QIOChannel *ioc,
270                       Error **errp)
271 {
272     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
273     return klass->io_close(ioc, errp);
274 }
275 
276 
277 GSource *qio_channel_create_watch(QIOChannel *ioc,
278                                   GIOCondition condition)
279 {
280     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
281     GSource *ret = klass->io_create_watch(ioc, condition);
282 
283     if (ioc->name) {
284         g_source_set_name(ret, ioc->name);
285     }
286 
287     return ret;
288 }
289 
290 
291 void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
292                                     AioContext *ctx,
293                                     IOHandler *io_read,
294                                     IOHandler *io_write,
295                                     void *opaque)
296 {
297     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
298 
299     klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
300 }
301 
302 guint qio_channel_add_watch_full(QIOChannel *ioc,
303                                  GIOCondition condition,
304                                  QIOChannelFunc func,
305                                  gpointer user_data,
306                                  GDestroyNotify notify,
307                                  GMainContext *context)
308 {
309     GSource *source;
310     guint id;
311 
312     source = qio_channel_create_watch(ioc, condition);
313 
314     g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
315 
316     id = g_source_attach(source, context);
317     g_source_unref(source);
318 
319     return id;
320 }
321 
322 guint qio_channel_add_watch(QIOChannel *ioc,
323                             GIOCondition condition,
324                             QIOChannelFunc func,
325                             gpointer user_data,
326                             GDestroyNotify notify)
327 {
328     return qio_channel_add_watch_full(ioc, condition, func,
329                                       user_data, notify, NULL);
330 }
331 
332 GSource *qio_channel_add_watch_source(QIOChannel *ioc,
333                                       GIOCondition condition,
334                                       QIOChannelFunc func,
335                                       gpointer user_data,
336                                       GDestroyNotify notify,
337                                       GMainContext *context)
338 {
339     GSource *source;
340     guint id;
341 
342     id = qio_channel_add_watch_full(ioc, condition, func,
343                                     user_data, notify, context);
344     source = g_main_context_find_source_by_id(context, id);
345     g_source_ref(source);
346     return source;
347 }
348 
349 
350 int qio_channel_shutdown(QIOChannel *ioc,
351                          QIOChannelShutdown how,
352                          Error **errp)
353 {
354     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
355 
356     if (!klass->io_shutdown) {
357         error_setg(errp, "Data path shutdown not supported");
358         return -1;
359     }
360 
361     return klass->io_shutdown(ioc, how, errp);
362 }
363 
364 
365 void qio_channel_set_delay(QIOChannel *ioc,
366                            bool enabled)
367 {
368     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
369 
370     if (klass->io_set_delay) {
371         klass->io_set_delay(ioc, enabled);
372     }
373 }
374 
375 
376 void qio_channel_set_cork(QIOChannel *ioc,
377                           bool enabled)
378 {
379     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
380 
381     if (klass->io_set_cork) {
382         klass->io_set_cork(ioc, enabled);
383     }
384 }
385 
386 
387 off_t qio_channel_io_seek(QIOChannel *ioc,
388                           off_t offset,
389                           int whence,
390                           Error **errp)
391 {
392     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
393 
394     if (!klass->io_seek) {
395         error_setg(errp, "Channel does not support random access");
396         return -1;
397     }
398 
399     return klass->io_seek(ioc, offset, whence, errp);
400 }
401 
402 
403 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc);
404 
405 static void qio_channel_restart_read(void *opaque)
406 {
407     QIOChannel *ioc = opaque;
408     Coroutine *co = ioc->read_coroutine;
409 
410     ioc->read_coroutine = NULL;
411     qio_channel_set_aio_fd_handlers(ioc);
412     aio_co_wake(co);
413 }
414 
415 static void qio_channel_restart_write(void *opaque)
416 {
417     QIOChannel *ioc = opaque;
418     Coroutine *co = ioc->write_coroutine;
419 
420     ioc->write_coroutine = NULL;
421     qio_channel_set_aio_fd_handlers(ioc);
422     aio_co_wake(co);
423 }
424 
425 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
426 {
427     IOHandler *rd_handler = NULL, *wr_handler = NULL;
428     AioContext *ctx;
429 
430     if (ioc->read_coroutine) {
431         rd_handler = qio_channel_restart_read;
432     }
433     if (ioc->write_coroutine) {
434         wr_handler = qio_channel_restart_write;
435     }
436 
437     ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
438     qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
439 }
440 
441 void qio_channel_attach_aio_context(QIOChannel *ioc,
442                                     AioContext *ctx)
443 {
444     assert(!ioc->read_coroutine);
445     assert(!ioc->write_coroutine);
446     ioc->ctx = ctx;
447 }
448 
449 void qio_channel_detach_aio_context(QIOChannel *ioc)
450 {
451     ioc->read_coroutine = NULL;
452     ioc->write_coroutine = NULL;
453     qio_channel_set_aio_fd_handlers(ioc);
454     ioc->ctx = NULL;
455 }
456 
457 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
458                                     GIOCondition condition)
459 {
460     assert(qemu_in_coroutine());
461     if (condition == G_IO_IN) {
462         assert(!ioc->read_coroutine);
463         ioc->read_coroutine = qemu_coroutine_self();
464     } else if (condition == G_IO_OUT) {
465         assert(!ioc->write_coroutine);
466         ioc->write_coroutine = qemu_coroutine_self();
467     } else {
468         abort();
469     }
470     qio_channel_set_aio_fd_handlers(ioc);
471     qemu_coroutine_yield();
472 }
473 
474 
475 static gboolean qio_channel_wait_complete(QIOChannel *ioc,
476                                           GIOCondition condition,
477                                           gpointer opaque)
478 {
479     GMainLoop *loop = opaque;
480 
481     g_main_loop_quit(loop);
482     return FALSE;
483 }
484 
485 
486 void qio_channel_wait(QIOChannel *ioc,
487                       GIOCondition condition)
488 {
489     GMainContext *ctxt = g_main_context_new();
490     GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
491     GSource *source;
492 
493     source = qio_channel_create_watch(ioc, condition);
494 
495     g_source_set_callback(source,
496                           (GSourceFunc)qio_channel_wait_complete,
497                           loop,
498                           NULL);
499 
500     g_source_attach(source, ctxt);
501 
502     g_main_loop_run(loop);
503 
504     g_source_unref(source);
505     g_main_loop_unref(loop);
506     g_main_context_unref(ctxt);
507 }
508 
509 
510 static void qio_channel_finalize(Object *obj)
511 {
512     QIOChannel *ioc = QIO_CHANNEL(obj);
513 
514     g_free(ioc->name);
515 
516 #ifdef _WIN32
517     if (ioc->event) {
518         CloseHandle(ioc->event);
519     }
520 #endif
521 }
522 
523 static const TypeInfo qio_channel_info = {
524     .parent = TYPE_OBJECT,
525     .name = TYPE_QIO_CHANNEL,
526     .instance_size = sizeof(QIOChannel),
527     .instance_finalize = qio_channel_finalize,
528     .abstract = true,
529     .class_size = sizeof(QIOChannelClass),
530 };
531 
532 
533 static void qio_channel_register_types(void)
534 {
535     type_register_static(&qio_channel_info);
536 }
537 
538 
539 type_init(qio_channel_register_types);
540