xref: /qemu/io/channel.c (revision 9ffb8270)
1 /*
2  * QEMU I/O channels
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/iov.h"
26 
27 bool qio_channel_has_feature(QIOChannel *ioc,
28                              QIOChannelFeature feature)
29 {
30     return ioc->features & (1 << feature);
31 }
32 
33 
34 void qio_channel_set_feature(QIOChannel *ioc,
35                              QIOChannelFeature feature)
36 {
37     ioc->features |= (1 << feature);
38 }
39 
40 
41 void qio_channel_set_name(QIOChannel *ioc,
42                           const char *name)
43 {
44     g_free(ioc->name);
45     ioc->name = g_strdup(name);
46 }
47 
48 
49 ssize_t qio_channel_readv_full(QIOChannel *ioc,
50                                const struct iovec *iov,
51                                size_t niov,
52                                int **fds,
53                                size_t *nfds,
54                                Error **errp)
55 {
56     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
57 
58     if ((fds || nfds) &&
59         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
60         error_setg_errno(errp, EINVAL,
61                          "Channel does not support file descriptor passing");
62         return -1;
63     }
64 
65     return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
66 }
67 
68 
69 ssize_t qio_channel_writev_full(QIOChannel *ioc,
70                                 const struct iovec *iov,
71                                 size_t niov,
72                                 int *fds,
73                                 size_t nfds,
74                                 Error **errp)
75 {
76     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
77 
78     if ((fds || nfds) &&
79         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
80         error_setg_errno(errp, EINVAL,
81                          "Channel does not support file descriptor passing");
82         return -1;
83     }
84 
85     return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
86 }
87 
88 
89 
90 int qio_channel_readv_all(QIOChannel *ioc,
91                           const struct iovec *iov,
92                           size_t niov,
93                           Error **errp)
94 {
95     int ret = -1;
96     struct iovec *local_iov = g_new(struct iovec, niov);
97     struct iovec *local_iov_head = local_iov;
98     unsigned int nlocal_iov = niov;
99 
100     nlocal_iov = iov_copy(local_iov, nlocal_iov,
101                           iov, niov,
102                           0, iov_size(iov, niov));
103 
104     while (nlocal_iov > 0) {
105         ssize_t len;
106         len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
107         if (len == QIO_CHANNEL_ERR_BLOCK) {
108             if (qemu_in_coroutine()) {
109                 qio_channel_yield(ioc, G_IO_IN);
110             } else {
111                 qio_channel_wait(ioc, G_IO_IN);
112             }
113             continue;
114         } else if (len < 0) {
115             goto cleanup;
116         } else if (len == 0) {
117             error_setg(errp,
118                        "Unexpected end-of-file before all bytes were read");
119             goto cleanup;
120         }
121 
122         iov_discard_front(&local_iov, &nlocal_iov, len);
123     }
124 
125     ret = 0;
126 
127  cleanup:
128     g_free(local_iov_head);
129     return ret;
130 }
131 
132 int qio_channel_writev_all(QIOChannel *ioc,
133                            const struct iovec *iov,
134                            size_t niov,
135                            Error **errp)
136 {
137     int ret = -1;
138     struct iovec *local_iov = g_new(struct iovec, niov);
139     struct iovec *local_iov_head = local_iov;
140     unsigned int nlocal_iov = niov;
141 
142     nlocal_iov = iov_copy(local_iov, nlocal_iov,
143                           iov, niov,
144                           0, iov_size(iov, niov));
145 
146     while (nlocal_iov > 0) {
147         ssize_t len;
148         len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
149         if (len == QIO_CHANNEL_ERR_BLOCK) {
150             if (qemu_in_coroutine()) {
151                 qio_channel_yield(ioc, G_IO_OUT);
152             } else {
153                 qio_channel_wait(ioc, G_IO_OUT);
154             }
155             continue;
156         }
157         if (len < 0) {
158             goto cleanup;
159         }
160 
161         iov_discard_front(&local_iov, &nlocal_iov, len);
162     }
163 
164     ret = 0;
165  cleanup:
166     g_free(local_iov_head);
167     return ret;
168 }
169 
170 ssize_t qio_channel_readv(QIOChannel *ioc,
171                           const struct iovec *iov,
172                           size_t niov,
173                           Error **errp)
174 {
175     return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
176 }
177 
178 
179 ssize_t qio_channel_writev(QIOChannel *ioc,
180                            const struct iovec *iov,
181                            size_t niov,
182                            Error **errp)
183 {
184     return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
185 }
186 
187 
188 ssize_t qio_channel_read(QIOChannel *ioc,
189                          char *buf,
190                          size_t buflen,
191                          Error **errp)
192 {
193     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
194     return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
195 }
196 
197 
198 ssize_t qio_channel_write(QIOChannel *ioc,
199                           const char *buf,
200                           size_t buflen,
201                           Error **errp)
202 {
203     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
204     return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
205 }
206 
207 
208 int qio_channel_read_all(QIOChannel *ioc,
209                          char *buf,
210                          size_t buflen,
211                          Error **errp)
212 {
213     struct iovec iov = { .iov_base = buf, .iov_len = buflen };
214     return qio_channel_readv_all(ioc, &iov, 1, errp);
215 }
216 
217 
218 int qio_channel_write_all(QIOChannel *ioc,
219                           const char *buf,
220                           size_t buflen,
221                           Error **errp)
222 {
223     struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
224     return qio_channel_writev_all(ioc, &iov, 1, errp);
225 }
226 
227 
228 int qio_channel_set_blocking(QIOChannel *ioc,
229                               bool enabled,
230                               Error **errp)
231 {
232     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
233     return klass->io_set_blocking(ioc, enabled, errp);
234 }
235 
236 
237 int qio_channel_close(QIOChannel *ioc,
238                       Error **errp)
239 {
240     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
241     return klass->io_close(ioc, errp);
242 }
243 
244 
245 GSource *qio_channel_create_watch(QIOChannel *ioc,
246                                   GIOCondition condition)
247 {
248     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
249     GSource *ret = klass->io_create_watch(ioc, condition);
250 
251     if (ioc->name) {
252         g_source_set_name(ret, ioc->name);
253     }
254 
255     return ret;
256 }
257 
258 
259 void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
260                                     AioContext *ctx,
261                                     IOHandler *io_read,
262                                     IOHandler *io_write,
263                                     void *opaque)
264 {
265     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
266 
267     klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
268 }
269 
270 guint qio_channel_add_watch(QIOChannel *ioc,
271                             GIOCondition condition,
272                             QIOChannelFunc func,
273                             gpointer user_data,
274                             GDestroyNotify notify)
275 {
276     GSource *source;
277     guint id;
278 
279     source = qio_channel_create_watch(ioc, condition);
280 
281     g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
282 
283     id = g_source_attach(source, NULL);
284     g_source_unref(source);
285 
286     return id;
287 }
288 
289 
290 int qio_channel_shutdown(QIOChannel *ioc,
291                          QIOChannelShutdown how,
292                          Error **errp)
293 {
294     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
295 
296     if (!klass->io_shutdown) {
297         error_setg(errp, "Data path shutdown not supported");
298         return -1;
299     }
300 
301     return klass->io_shutdown(ioc, how, errp);
302 }
303 
304 
305 void qio_channel_set_delay(QIOChannel *ioc,
306                            bool enabled)
307 {
308     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
309 
310     if (klass->io_set_delay) {
311         klass->io_set_delay(ioc, enabled);
312     }
313 }
314 
315 
316 void qio_channel_set_cork(QIOChannel *ioc,
317                           bool enabled)
318 {
319     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
320 
321     if (klass->io_set_cork) {
322         klass->io_set_cork(ioc, enabled);
323     }
324 }
325 
326 
327 off_t qio_channel_io_seek(QIOChannel *ioc,
328                           off_t offset,
329                           int whence,
330                           Error **errp)
331 {
332     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
333 
334     if (!klass->io_seek) {
335         error_setg(errp, "Channel does not support random access");
336         return -1;
337     }
338 
339     return klass->io_seek(ioc, offset, whence, errp);
340 }
341 
342 
343 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc);
344 
345 static void qio_channel_restart_read(void *opaque)
346 {
347     QIOChannel *ioc = opaque;
348     Coroutine *co = ioc->read_coroutine;
349 
350     ioc->read_coroutine = NULL;
351     qio_channel_set_aio_fd_handlers(ioc);
352     aio_co_wake(co);
353 }
354 
355 static void qio_channel_restart_write(void *opaque)
356 {
357     QIOChannel *ioc = opaque;
358     Coroutine *co = ioc->write_coroutine;
359 
360     ioc->write_coroutine = NULL;
361     qio_channel_set_aio_fd_handlers(ioc);
362     aio_co_wake(co);
363 }
364 
365 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
366 {
367     IOHandler *rd_handler = NULL, *wr_handler = NULL;
368     AioContext *ctx;
369 
370     if (ioc->read_coroutine) {
371         rd_handler = qio_channel_restart_read;
372     }
373     if (ioc->write_coroutine) {
374         wr_handler = qio_channel_restart_write;
375     }
376 
377     ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
378     qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
379 }
380 
381 void qio_channel_attach_aio_context(QIOChannel *ioc,
382                                     AioContext *ctx)
383 {
384     assert(!ioc->read_coroutine);
385     assert(!ioc->write_coroutine);
386     ioc->ctx = ctx;
387 }
388 
389 void qio_channel_detach_aio_context(QIOChannel *ioc)
390 {
391     ioc->read_coroutine = NULL;
392     ioc->write_coroutine = NULL;
393     qio_channel_set_aio_fd_handlers(ioc);
394     ioc->ctx = NULL;
395 }
396 
397 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
398                                     GIOCondition condition)
399 {
400     assert(qemu_in_coroutine());
401     if (condition == G_IO_IN) {
402         assert(!ioc->read_coroutine);
403         ioc->read_coroutine = qemu_coroutine_self();
404     } else if (condition == G_IO_OUT) {
405         assert(!ioc->write_coroutine);
406         ioc->write_coroutine = qemu_coroutine_self();
407     } else {
408         abort();
409     }
410     qio_channel_set_aio_fd_handlers(ioc);
411     qemu_coroutine_yield();
412 }
413 
414 
415 static gboolean qio_channel_wait_complete(QIOChannel *ioc,
416                                           GIOCondition condition,
417                                           gpointer opaque)
418 {
419     GMainLoop *loop = opaque;
420 
421     g_main_loop_quit(loop);
422     return FALSE;
423 }
424 
425 
426 void qio_channel_wait(QIOChannel *ioc,
427                       GIOCondition condition)
428 {
429     GMainContext *ctxt = g_main_context_new();
430     GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
431     GSource *source;
432 
433     source = qio_channel_create_watch(ioc, condition);
434 
435     g_source_set_callback(source,
436                           (GSourceFunc)qio_channel_wait_complete,
437                           loop,
438                           NULL);
439 
440     g_source_attach(source, ctxt);
441 
442     g_main_loop_run(loop);
443 
444     g_source_unref(source);
445     g_main_loop_unref(loop);
446     g_main_context_unref(ctxt);
447 }
448 
449 
450 static void qio_channel_finalize(Object *obj)
451 {
452     QIOChannel *ioc = QIO_CHANNEL(obj);
453 
454     g_free(ioc->name);
455 
456 #ifdef _WIN32
457     if (ioc->event) {
458         CloseHandle(ioc->event);
459     }
460 #endif
461 }
462 
463 static const TypeInfo qio_channel_info = {
464     .parent = TYPE_OBJECT,
465     .name = TYPE_QIO_CHANNEL,
466     .instance_size = sizeof(QIOChannel),
467     .instance_finalize = qio_channel_finalize,
468     .abstract = true,
469     .class_size = sizeof(QIOChannelClass),
470 };
471 
472 
473 static void qio_channel_register_types(void)
474 {
475     type_register_static(&qio_channel_info);
476 }
477 
478 
479 type_init(qio_channel_register_types);
480