1 /* 2 * QEMU I/O channels 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/channel.h" 23 #include "qapi/error.h" 24 #include "qemu/main-loop.h" 25 #include "qemu/iov.h" 26 27 bool qio_channel_has_feature(QIOChannel *ioc, 28 QIOChannelFeature feature) 29 { 30 return ioc->features & (1 << feature); 31 } 32 33 34 void qio_channel_set_feature(QIOChannel *ioc, 35 QIOChannelFeature feature) 36 { 37 ioc->features |= (1 << feature); 38 } 39 40 41 void qio_channel_set_name(QIOChannel *ioc, 42 const char *name) 43 { 44 g_free(ioc->name); 45 ioc->name = g_strdup(name); 46 } 47 48 49 ssize_t qio_channel_readv_full(QIOChannel *ioc, 50 const struct iovec *iov, 51 size_t niov, 52 int **fds, 53 size_t *nfds, 54 Error **errp) 55 { 56 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 57 58 if ((fds || nfds) && 59 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 60 error_setg_errno(errp, EINVAL, 61 "Channel does not support file descriptor passing"); 62 return -1; 63 } 64 65 return klass->io_readv(ioc, iov, niov, fds, nfds, errp); 66 } 67 68 69 ssize_t qio_channel_writev_full(QIOChannel *ioc, 70 const struct iovec *iov, 71 size_t niov, 72 int *fds, 73 size_t nfds, 74 Error **errp) 75 { 76 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 77 78 if ((fds || nfds) && 79 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 80 error_setg_errno(errp, EINVAL, 81 "Channel does not support file descriptor passing"); 82 return -1; 83 } 84 85 return klass->io_writev(ioc, iov, niov, fds, nfds, errp); 86 } 87 88 89 90 int qio_channel_readv_all(QIOChannel *ioc, 91 const struct iovec *iov, 92 size_t niov, 93 Error **errp) 94 { 95 int ret = -1; 96 struct iovec *local_iov = g_new(struct iovec, niov); 97 struct iovec *local_iov_head = local_iov; 98 unsigned int nlocal_iov = niov; 99 100 nlocal_iov = iov_copy(local_iov, nlocal_iov, 101 iov, niov, 102 0, iov_size(iov, niov)); 103 104 while (nlocal_iov > 0) { 105 ssize_t len; 106 len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp); 107 if (len == QIO_CHANNEL_ERR_BLOCK) { 108 if (qemu_in_coroutine()) { 109 qio_channel_yield(ioc, G_IO_IN); 110 } else { 111 qio_channel_wait(ioc, G_IO_IN); 112 } 113 continue; 114 } else if (len < 0) { 115 goto cleanup; 116 } else if (len == 0) { 117 error_setg(errp, 118 "Unexpected end-of-file before all bytes were read"); 119 goto cleanup; 120 } 121 122 iov_discard_front(&local_iov, &nlocal_iov, len); 123 } 124 125 ret = 0; 126 127 cleanup: 128 g_free(local_iov_head); 129 return ret; 130 } 131 132 int qio_channel_writev_all(QIOChannel *ioc, 133 const struct iovec *iov, 134 size_t niov, 135 Error **errp) 136 { 137 int ret = -1; 138 struct iovec *local_iov = g_new(struct iovec, niov); 139 struct iovec *local_iov_head = local_iov; 140 unsigned int nlocal_iov = niov; 141 142 nlocal_iov = iov_copy(local_iov, nlocal_iov, 143 iov, niov, 144 0, iov_size(iov, niov)); 145 146 while (nlocal_iov > 0) { 147 ssize_t len; 148 len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp); 149 if (len == QIO_CHANNEL_ERR_BLOCK) { 150 if (qemu_in_coroutine()) { 151 qio_channel_yield(ioc, G_IO_OUT); 152 } else { 153 qio_channel_wait(ioc, G_IO_OUT); 154 } 155 continue; 156 } 157 if (len < 0) { 158 goto cleanup; 159 } 160 161 iov_discard_front(&local_iov, &nlocal_iov, len); 162 } 163 164 ret = 0; 165 cleanup: 166 g_free(local_iov_head); 167 return ret; 168 } 169 170 ssize_t qio_channel_readv(QIOChannel *ioc, 171 const struct iovec *iov, 172 size_t niov, 173 Error **errp) 174 { 175 return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp); 176 } 177 178 179 ssize_t qio_channel_writev(QIOChannel *ioc, 180 const struct iovec *iov, 181 size_t niov, 182 Error **errp) 183 { 184 return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp); 185 } 186 187 188 ssize_t qio_channel_read(QIOChannel *ioc, 189 char *buf, 190 size_t buflen, 191 Error **errp) 192 { 193 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 194 return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp); 195 } 196 197 198 ssize_t qio_channel_write(QIOChannel *ioc, 199 const char *buf, 200 size_t buflen, 201 Error **errp) 202 { 203 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 204 return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp); 205 } 206 207 208 int qio_channel_read_all(QIOChannel *ioc, 209 char *buf, 210 size_t buflen, 211 Error **errp) 212 { 213 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 214 return qio_channel_readv_all(ioc, &iov, 1, errp); 215 } 216 217 218 int qio_channel_write_all(QIOChannel *ioc, 219 const char *buf, 220 size_t buflen, 221 Error **errp) 222 { 223 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 224 return qio_channel_writev_all(ioc, &iov, 1, errp); 225 } 226 227 228 int qio_channel_set_blocking(QIOChannel *ioc, 229 bool enabled, 230 Error **errp) 231 { 232 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 233 return klass->io_set_blocking(ioc, enabled, errp); 234 } 235 236 237 int qio_channel_close(QIOChannel *ioc, 238 Error **errp) 239 { 240 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 241 return klass->io_close(ioc, errp); 242 } 243 244 245 GSource *qio_channel_create_watch(QIOChannel *ioc, 246 GIOCondition condition) 247 { 248 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 249 GSource *ret = klass->io_create_watch(ioc, condition); 250 251 if (ioc->name) { 252 g_source_set_name(ret, ioc->name); 253 } 254 255 return ret; 256 } 257 258 259 void qio_channel_set_aio_fd_handler(QIOChannel *ioc, 260 AioContext *ctx, 261 IOHandler *io_read, 262 IOHandler *io_write, 263 void *opaque) 264 { 265 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 266 267 klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); 268 } 269 270 guint qio_channel_add_watch(QIOChannel *ioc, 271 GIOCondition condition, 272 QIOChannelFunc func, 273 gpointer user_data, 274 GDestroyNotify notify) 275 { 276 GSource *source; 277 guint id; 278 279 source = qio_channel_create_watch(ioc, condition); 280 281 g_source_set_callback(source, (GSourceFunc)func, user_data, notify); 282 283 id = g_source_attach(source, NULL); 284 g_source_unref(source); 285 286 return id; 287 } 288 289 290 int qio_channel_shutdown(QIOChannel *ioc, 291 QIOChannelShutdown how, 292 Error **errp) 293 { 294 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 295 296 if (!klass->io_shutdown) { 297 error_setg(errp, "Data path shutdown not supported"); 298 return -1; 299 } 300 301 return klass->io_shutdown(ioc, how, errp); 302 } 303 304 305 void qio_channel_set_delay(QIOChannel *ioc, 306 bool enabled) 307 { 308 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 309 310 if (klass->io_set_delay) { 311 klass->io_set_delay(ioc, enabled); 312 } 313 } 314 315 316 void qio_channel_set_cork(QIOChannel *ioc, 317 bool enabled) 318 { 319 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 320 321 if (klass->io_set_cork) { 322 klass->io_set_cork(ioc, enabled); 323 } 324 } 325 326 327 off_t qio_channel_io_seek(QIOChannel *ioc, 328 off_t offset, 329 int whence, 330 Error **errp) 331 { 332 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 333 334 if (!klass->io_seek) { 335 error_setg(errp, "Channel does not support random access"); 336 return -1; 337 } 338 339 return klass->io_seek(ioc, offset, whence, errp); 340 } 341 342 343 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc); 344 345 static void qio_channel_restart_read(void *opaque) 346 { 347 QIOChannel *ioc = opaque; 348 Coroutine *co = ioc->read_coroutine; 349 350 ioc->read_coroutine = NULL; 351 qio_channel_set_aio_fd_handlers(ioc); 352 aio_co_wake(co); 353 } 354 355 static void qio_channel_restart_write(void *opaque) 356 { 357 QIOChannel *ioc = opaque; 358 Coroutine *co = ioc->write_coroutine; 359 360 ioc->write_coroutine = NULL; 361 qio_channel_set_aio_fd_handlers(ioc); 362 aio_co_wake(co); 363 } 364 365 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) 366 { 367 IOHandler *rd_handler = NULL, *wr_handler = NULL; 368 AioContext *ctx; 369 370 if (ioc->read_coroutine) { 371 rd_handler = qio_channel_restart_read; 372 } 373 if (ioc->write_coroutine) { 374 wr_handler = qio_channel_restart_write; 375 } 376 377 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); 378 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); 379 } 380 381 void qio_channel_attach_aio_context(QIOChannel *ioc, 382 AioContext *ctx) 383 { 384 assert(!ioc->read_coroutine); 385 assert(!ioc->write_coroutine); 386 ioc->ctx = ctx; 387 } 388 389 void qio_channel_detach_aio_context(QIOChannel *ioc) 390 { 391 ioc->read_coroutine = NULL; 392 ioc->write_coroutine = NULL; 393 qio_channel_set_aio_fd_handlers(ioc); 394 ioc->ctx = NULL; 395 } 396 397 void coroutine_fn qio_channel_yield(QIOChannel *ioc, 398 GIOCondition condition) 399 { 400 assert(qemu_in_coroutine()); 401 if (condition == G_IO_IN) { 402 assert(!ioc->read_coroutine); 403 ioc->read_coroutine = qemu_coroutine_self(); 404 } else if (condition == G_IO_OUT) { 405 assert(!ioc->write_coroutine); 406 ioc->write_coroutine = qemu_coroutine_self(); 407 } else { 408 abort(); 409 } 410 qio_channel_set_aio_fd_handlers(ioc); 411 qemu_coroutine_yield(); 412 } 413 414 415 static gboolean qio_channel_wait_complete(QIOChannel *ioc, 416 GIOCondition condition, 417 gpointer opaque) 418 { 419 GMainLoop *loop = opaque; 420 421 g_main_loop_quit(loop); 422 return FALSE; 423 } 424 425 426 void qio_channel_wait(QIOChannel *ioc, 427 GIOCondition condition) 428 { 429 GMainContext *ctxt = g_main_context_new(); 430 GMainLoop *loop = g_main_loop_new(ctxt, TRUE); 431 GSource *source; 432 433 source = qio_channel_create_watch(ioc, condition); 434 435 g_source_set_callback(source, 436 (GSourceFunc)qio_channel_wait_complete, 437 loop, 438 NULL); 439 440 g_source_attach(source, ctxt); 441 442 g_main_loop_run(loop); 443 444 g_source_unref(source); 445 g_main_loop_unref(loop); 446 g_main_context_unref(ctxt); 447 } 448 449 450 static void qio_channel_finalize(Object *obj) 451 { 452 QIOChannel *ioc = QIO_CHANNEL(obj); 453 454 g_free(ioc->name); 455 456 #ifdef _WIN32 457 if (ioc->event) { 458 CloseHandle(ioc->event); 459 } 460 #endif 461 } 462 463 static const TypeInfo qio_channel_info = { 464 .parent = TYPE_OBJECT, 465 .name = TYPE_QIO_CHANNEL, 466 .instance_size = sizeof(QIOChannel), 467 .instance_finalize = qio_channel_finalize, 468 .abstract = true, 469 .class_size = sizeof(QIOChannelClass), 470 }; 471 472 473 static void qio_channel_register_types(void) 474 { 475 type_register_static(&qio_channel_info); 476 } 477 478 479 type_init(qio_channel_register_types); 480