xref: /qemu/io/channel-socket.c (revision 60f782b6)
1 /*
2  * QEMU I/O channels sockets driver
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-visit-sockets.h"
23 #include "qemu/module.h"
24 #include "io/channel-socket.h"
25 #include "io/channel-watch.h"
26 #include "trace.h"
27 #include "qapi/clone-visitor.h"
28 #ifdef CONFIG_LINUX
29 #include <linux/errqueue.h>
30 #include <sys/socket.h>
31 
32 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
33 #define QEMU_MSG_ZEROCOPY
34 #endif
35 #endif
36 
/*
 * Upper bound on the number of file descriptors carried in a single
 * SCM_RIGHTS control message; it sizes the on-stack control buffers
 * used by the POSIX readv/writev implementations below.
 */
#define SOCKET_MAX_FDS 16
38 
39 SocketAddress *
40 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
41                                      Error **errp)
42 {
43     return socket_sockaddr_to_address(&ioc->localAddr,
44                                       ioc->localAddrLen,
45                                       errp);
46 }
47 
48 SocketAddress *
49 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
50                                       Error **errp)
51 {
52     return socket_sockaddr_to_address(&ioc->remoteAddr,
53                                       ioc->remoteAddrLen,
54                                       errp);
55 }
56 
57 QIOChannelSocket *
58 qio_channel_socket_new(void)
59 {
60     QIOChannelSocket *sioc;
61     QIOChannel *ioc;
62 
63     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
64     sioc->fd = -1;
65     sioc->zero_copy_queued = 0;
66     sioc->zero_copy_sent = 0;
67 
68     ioc = QIO_CHANNEL(sioc);
69     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
70 
71 #ifdef WIN32
72     ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
73 #endif
74 
75     trace_qio_channel_socket_new(sioc);
76 
77     return sioc;
78 }
79 
80 
81 static int
82 qio_channel_socket_set_fd(QIOChannelSocket *sioc,
83                           int fd,
84                           Error **errp)
85 {
86     if (sioc->fd != -1) {
87         error_setg(errp, "Socket is already open");
88         return -1;
89     }
90 
91     sioc->fd = fd;
92     sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
93     sioc->localAddrLen = sizeof(sioc->localAddr);
94 
95 
96     if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
97                     &sioc->remoteAddrLen) < 0) {
98         if (errno == ENOTCONN) {
99             memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
100             sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
101         } else {
102             error_setg_errno(errp, errno,
103                              "Unable to query remote socket address");
104             goto error;
105         }
106     }
107 
108     if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
109                     &sioc->localAddrLen) < 0) {
110         error_setg_errno(errp, errno,
111                          "Unable to query local socket address");
112         goto error;
113     }
114 
115 #ifndef WIN32
116     if (sioc->localAddr.ss_family == AF_UNIX) {
117         QIOChannel *ioc = QIO_CHANNEL(sioc);
118         qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
119     }
120 #endif /* WIN32 */
121 
122     return 0;
123 
124  error:
125     sioc->fd = -1; /* Let the caller close FD on failure */
126     return -1;
127 }
128 
129 QIOChannelSocket *
130 qio_channel_socket_new_fd(int fd,
131                           Error **errp)
132 {
133     QIOChannelSocket *ioc;
134 
135     ioc = qio_channel_socket_new();
136     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
137         object_unref(OBJECT(ioc));
138         return NULL;
139     }
140 
141     trace_qio_channel_socket_new_fd(ioc, fd);
142 
143     return ioc;
144 }
145 
146 
147 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
148                                     SocketAddress *addr,
149                                     Error **errp)
150 {
151     int fd;
152 
153     trace_qio_channel_socket_connect_sync(ioc, addr);
154     fd = socket_connect(addr, errp);
155     if (fd < 0) {
156         trace_qio_channel_socket_connect_fail(ioc);
157         return -1;
158     }
159 
160     trace_qio_channel_socket_connect_complete(ioc, fd);
161     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
162         close(fd);
163         return -1;
164     }
165 
166 #ifdef QEMU_MSG_ZEROCOPY
167     int ret, v = 1;
168     ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
169     if (ret == 0) {
170         /* Zero copy available on host */
171         qio_channel_set_feature(QIO_CHANNEL(ioc),
172                                 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
173     }
174 #endif
175 
176     qio_channel_set_feature(QIO_CHANNEL(ioc),
177                             QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
178 
179     return 0;
180 }
181 
182 
183 static void qio_channel_socket_connect_worker(QIOTask *task,
184                                               gpointer opaque)
185 {
186     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
187     SocketAddress *addr = opaque;
188     Error *err = NULL;
189 
190     qio_channel_socket_connect_sync(ioc, addr, &err);
191 
192     qio_task_set_error(task, err);
193 }
194 
195 
196 void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
197                                       SocketAddress *addr,
198                                       QIOTaskFunc callback,
199                                       gpointer opaque,
200                                       GDestroyNotify destroy,
201                                       GMainContext *context)
202 {
203     QIOTask *task = qio_task_new(
204         OBJECT(ioc), callback, opaque, destroy);
205     SocketAddress *addrCopy;
206 
207     addrCopy = QAPI_CLONE(SocketAddress, addr);
208 
209     /* socket_connect() does a non-blocking connect(), but it
210      * still blocks in DNS lookups, so we must use a thread */
211     trace_qio_channel_socket_connect_async(ioc, addr);
212     qio_task_run_in_thread(task,
213                            qio_channel_socket_connect_worker,
214                            addrCopy,
215                            (GDestroyNotify)qapi_free_SocketAddress,
216                            context);
217 }
218 
219 
220 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
221                                    SocketAddress *addr,
222                                    int num,
223                                    Error **errp)
224 {
225     int fd;
226 
227     trace_qio_channel_socket_listen_sync(ioc, addr, num);
228     fd = socket_listen(addr, num, errp);
229     if (fd < 0) {
230         trace_qio_channel_socket_listen_fail(ioc);
231         return -1;
232     }
233 
234     trace_qio_channel_socket_listen_complete(ioc, fd);
235     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
236         close(fd);
237         return -1;
238     }
239     qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
240 
241     return 0;
242 }
243 
244 
/*
 * Arguments handed to the listen worker thread.  Instances are built by
 * qio_channel_socket_listen_async() and released by
 * qio_channel_listen_worker_free().
 */
struct QIOChannelListenWorkerData {
    SocketAddress *addr; /* cloned address to listen on; freed with the data */
    int num; /* amount of expected connections */
};
249 
250 static void qio_channel_listen_worker_free(gpointer opaque)
251 {
252     struct QIOChannelListenWorkerData *data = opaque;
253 
254     qapi_free_SocketAddress(data->addr);
255     g_free(data);
256 }
257 
258 static void qio_channel_socket_listen_worker(QIOTask *task,
259                                              gpointer opaque)
260 {
261     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
262     struct QIOChannelListenWorkerData *data = opaque;
263     Error *err = NULL;
264 
265     qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
266 
267     qio_task_set_error(task, err);
268 }
269 
270 
271 void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
272                                      SocketAddress *addr,
273                                      int num,
274                                      QIOTaskFunc callback,
275                                      gpointer opaque,
276                                      GDestroyNotify destroy,
277                                      GMainContext *context)
278 {
279     QIOTask *task = qio_task_new(
280         OBJECT(ioc), callback, opaque, destroy);
281     struct QIOChannelListenWorkerData *data;
282 
283     data = g_new0(struct QIOChannelListenWorkerData, 1);
284     data->addr = QAPI_CLONE(SocketAddress, addr);
285     data->num = num;
286 
287     /* socket_listen() blocks in DNS lookups, so we must use a thread */
288     trace_qio_channel_socket_listen_async(ioc, addr, num);
289     qio_task_run_in_thread(task,
290                            qio_channel_socket_listen_worker,
291                            data,
292                            qio_channel_listen_worker_free,
293                            context);
294 }
295 
296 
297 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
298                                   SocketAddress *localAddr,
299                                   SocketAddress *remoteAddr,
300                                   Error **errp)
301 {
302     int fd;
303 
304     trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
305     fd = socket_dgram(remoteAddr, localAddr, errp);
306     if (fd < 0) {
307         trace_qio_channel_socket_dgram_fail(ioc);
308         return -1;
309     }
310 
311     trace_qio_channel_socket_dgram_complete(ioc, fd);
312     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
313         close(fd);
314         return -1;
315     }
316 
317     return 0;
318 }
319 
320 
/*
 * Arguments handed to the datagram worker thread.  Instances are built
 * by qio_channel_socket_dgram_async() and released by
 * qio_channel_socket_dgram_worker_free(), which frees both addresses.
 */
struct QIOChannelSocketDGramWorkerData {
    SocketAddress *localAddr;  /* cloned local address */
    SocketAddress *remoteAddr; /* cloned remote address */
};
325 
326 
327 static void qio_channel_socket_dgram_worker_free(gpointer opaque)
328 {
329     struct QIOChannelSocketDGramWorkerData *data = opaque;
330     qapi_free_SocketAddress(data->localAddr);
331     qapi_free_SocketAddress(data->remoteAddr);
332     g_free(data);
333 }
334 
335 static void qio_channel_socket_dgram_worker(QIOTask *task,
336                                             gpointer opaque)
337 {
338     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
339     struct QIOChannelSocketDGramWorkerData *data = opaque;
340     Error *err = NULL;
341 
342     /* socket_dgram() blocks in DNS lookups, so we must use a thread */
343     qio_channel_socket_dgram_sync(ioc, data->localAddr,
344                                   data->remoteAddr, &err);
345 
346     qio_task_set_error(task, err);
347 }
348 
349 
350 void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
351                                     SocketAddress *localAddr,
352                                     SocketAddress *remoteAddr,
353                                     QIOTaskFunc callback,
354                                     gpointer opaque,
355                                     GDestroyNotify destroy,
356                                     GMainContext *context)
357 {
358     QIOTask *task = qio_task_new(
359         OBJECT(ioc), callback, opaque, destroy);
360     struct QIOChannelSocketDGramWorkerData *data = g_new0(
361         struct QIOChannelSocketDGramWorkerData, 1);
362 
363     data->localAddr = QAPI_CLONE(SocketAddress, localAddr);
364     data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);
365 
366     trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
367     qio_task_run_in_thread(task,
368                            qio_channel_socket_dgram_worker,
369                            data,
370                            qio_channel_socket_dgram_worker_free,
371                            context);
372 }
373 
374 
375 QIOChannelSocket *
376 qio_channel_socket_accept(QIOChannelSocket *ioc,
377                           Error **errp)
378 {
379     QIOChannelSocket *cioc;
380 
381     cioc = qio_channel_socket_new();
382     cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
383     cioc->localAddrLen = sizeof(ioc->localAddr);
384 
385  retry:
386     trace_qio_channel_socket_accept(ioc);
387     cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
388                            &cioc->remoteAddrLen);
389     if (cioc->fd < 0) {
390         if (errno == EINTR) {
391             goto retry;
392         }
393         error_setg_errno(errp, errno, "Unable to accept connection");
394         trace_qio_channel_socket_accept_fail(ioc);
395         goto error;
396     }
397 
398     if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
399                     &cioc->localAddrLen) < 0) {
400         error_setg_errno(errp, errno,
401                          "Unable to query local socket address");
402         goto error;
403     }
404 
405 #ifndef WIN32
406     if (cioc->localAddr.ss_family == AF_UNIX) {
407         QIOChannel *ioc_local = QIO_CHANNEL(cioc);
408         qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
409     }
410 #endif /* WIN32 */
411 
412     qio_channel_set_feature(QIO_CHANNEL(cioc),
413                             QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
414 
415     trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
416     return cioc;
417 
418  error:
419     object_unref(OBJECT(cioc));
420     return NULL;
421 }
422 
423 static void qio_channel_socket_init(Object *obj)
424 {
425     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
426     ioc->fd = -1;
427 }
428 
429 static void qio_channel_socket_finalize(Object *obj)
430 {
431     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
432 
433     if (ioc->fd != -1) {
434         QIOChannel *ioc_local = QIO_CHANNEL(ioc);
435         if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) {
436             Error *err = NULL;
437 
438             socket_listen_cleanup(ioc->fd, &err);
439             if (err) {
440                 error_report_err(err);
441                 err = NULL;
442             }
443         }
444 #ifdef WIN32
445         qemu_socket_unselect(ioc->fd, NULL);
446 #endif
447         close(ioc->fd);
448         ioc->fd = -1;
449     }
450 }
451 
452 
453 #ifndef WIN32
454 static void qio_channel_socket_copy_fds(struct msghdr *msg,
455                                         int **fds, size_t *nfds)
456 {
457     struct cmsghdr *cmsg;
458 
459     *nfds = 0;
460     *fds = NULL;
461 
462     for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
463         int fd_size, i;
464         int gotfds;
465 
466         if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
467             cmsg->cmsg_level != SOL_SOCKET ||
468             cmsg->cmsg_type != SCM_RIGHTS) {
469             continue;
470         }
471 
472         fd_size = cmsg->cmsg_len - CMSG_LEN(0);
473 
474         if (!fd_size) {
475             continue;
476         }
477 
478         gotfds = fd_size / sizeof(int);
479         *fds = g_renew(int, *fds, *nfds + gotfds);
480         memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
481 
482         for (i = 0; i < gotfds; i++) {
483             int fd = (*fds)[*nfds + i];
484             if (fd < 0) {
485                 continue;
486             }
487 
488             /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
489             qemu_socket_set_block(fd);
490 
491 #ifndef MSG_CMSG_CLOEXEC
492             qemu_set_cloexec(fd);
493 #endif
494         }
495         *nfds += gotfds;
496     }
497 }
498 
499 
500 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
501                                         const struct iovec *iov,
502                                         size_t niov,
503                                         int **fds,
504                                         size_t *nfds,
505                                         int flags,
506                                         Error **errp)
507 {
508     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
509     ssize_t ret;
510     struct msghdr msg = { NULL, };
511     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
512     int sflags = 0;
513 
514     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
515 
516     msg.msg_iov = (struct iovec *)iov;
517     msg.msg_iovlen = niov;
518     if (fds && nfds) {
519         msg.msg_control = control;
520         msg.msg_controllen = sizeof(control);
521 #ifdef MSG_CMSG_CLOEXEC
522         sflags |= MSG_CMSG_CLOEXEC;
523 #endif
524 
525     }
526 
527     if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
528         sflags |= MSG_PEEK;
529     }
530 
531  retry:
532     ret = recvmsg(sioc->fd, &msg, sflags);
533     if (ret < 0) {
534         if (errno == EAGAIN) {
535             return QIO_CHANNEL_ERR_BLOCK;
536         }
537         if (errno == EINTR) {
538             goto retry;
539         }
540 
541         error_setg_errno(errp, errno,
542                          "Unable to read from socket");
543         return -1;
544     }
545 
546     if (fds && nfds) {
547         qio_channel_socket_copy_fds(&msg, fds, nfds);
548     }
549 
550     return ret;
551 }
552 
553 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
554                                          const struct iovec *iov,
555                                          size_t niov,
556                                          int *fds,
557                                          size_t nfds,
558                                          int flags,
559                                          Error **errp)
560 {
561     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
562     ssize_t ret;
563     struct msghdr msg = { NULL, };
564     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
565     size_t fdsize = sizeof(int) * nfds;
566     struct cmsghdr *cmsg;
567     int sflags = 0;
568 
569     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
570 
571     msg.msg_iov = (struct iovec *)iov;
572     msg.msg_iovlen = niov;
573 
574     if (nfds) {
575         if (nfds > SOCKET_MAX_FDS) {
576             error_setg_errno(errp, EINVAL,
577                              "Only %d FDs can be sent, got %zu",
578                              SOCKET_MAX_FDS, nfds);
579             return -1;
580         }
581 
582         msg.msg_control = control;
583         msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);
584 
585         cmsg = CMSG_FIRSTHDR(&msg);
586         cmsg->cmsg_len = CMSG_LEN(fdsize);
587         cmsg->cmsg_level = SOL_SOCKET;
588         cmsg->cmsg_type = SCM_RIGHTS;
589         memcpy(CMSG_DATA(cmsg), fds, fdsize);
590     }
591 
592     if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
593 #ifdef QEMU_MSG_ZEROCOPY
594         sflags = MSG_ZEROCOPY;
595 #else
596         /*
597          * We expect QIOChannel class entry point to have
598          * blocked this code path already
599          */
600         g_assert_not_reached();
601 #endif
602     }
603 
604  retry:
605     ret = sendmsg(sioc->fd, &msg, sflags);
606     if (ret <= 0) {
607         switch (errno) {
608         case EAGAIN:
609             return QIO_CHANNEL_ERR_BLOCK;
610         case EINTR:
611             goto retry;
612         case ENOBUFS:
613             if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
614                 error_setg_errno(errp, errno,
615                                  "Process can't lock enough memory for using MSG_ZEROCOPY");
616                 return -1;
617             }
618             break;
619         }
620 
621         error_setg_errno(errp, errno,
622                          "Unable to write to socket");
623         return -1;
624     }
625 
626     if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
627         sioc->zero_copy_queued++;
628     }
629 
630     return ret;
631 }
632 #else /* WIN32 */
633 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
634                                         const struct iovec *iov,
635                                         size_t niov,
636                                         int **fds,
637                                         size_t *nfds,
638                                         int flags,
639                                         Error **errp)
640 {
641     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
642     ssize_t done = 0;
643     ssize_t i;
644     int sflags = 0;
645 
646     if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
647         sflags |= MSG_PEEK;
648     }
649 
650     for (i = 0; i < niov; i++) {
651         ssize_t ret;
652     retry:
653         ret = recv(sioc->fd,
654                    iov[i].iov_base,
655                    iov[i].iov_len,
656                    sflags);
657         if (ret < 0) {
658             if (errno == EAGAIN) {
659                 if (done) {
660                     return done;
661                 } else {
662                     return QIO_CHANNEL_ERR_BLOCK;
663                 }
664             } else if (errno == EINTR) {
665                 goto retry;
666             } else {
667                 error_setg_errno(errp, errno,
668                                  "Unable to read from socket");
669                 return -1;
670             }
671         }
672         done += ret;
673         if (ret < iov[i].iov_len) {
674             return done;
675         }
676     }
677 
678     return done;
679 }
680 
681 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
682                                          const struct iovec *iov,
683                                          size_t niov,
684                                          int *fds,
685                                          size_t nfds,
686                                          int flags,
687                                          Error **errp)
688 {
689     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
690     ssize_t done = 0;
691     ssize_t i;
692 
693     for (i = 0; i < niov; i++) {
694         ssize_t ret;
695     retry:
696         ret = send(sioc->fd,
697                    iov[i].iov_base,
698                    iov[i].iov_len,
699                    0);
700         if (ret < 0) {
701             if (errno == EAGAIN) {
702                 if (done) {
703                     return done;
704                 } else {
705                     return QIO_CHANNEL_ERR_BLOCK;
706                 }
707             } else if (errno == EINTR) {
708                 goto retry;
709             } else {
710                 error_setg_errno(errp, errno,
711                                  "Unable to write to socket");
712                 return -1;
713             }
714         }
715         done += ret;
716         if (ret < iov[i].iov_len) {
717             return done;
718         }
719     }
720 
721     return done;
722 }
723 #endif /* WIN32 */
724 
725 
726 #ifdef QEMU_MSG_ZEROCOPY
727 static int qio_channel_socket_flush(QIOChannel *ioc,
728                                     Error **errp)
729 {
730     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
731     struct msghdr msg = {};
732     struct sock_extended_err *serr;
733     struct cmsghdr *cm;
734     char control[CMSG_SPACE(sizeof(*serr))];
735     int received;
736     int ret;
737 
738     if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
739         return 0;
740     }
741 
742     msg.msg_control = control;
743     msg.msg_controllen = sizeof(control);
744     memset(control, 0, sizeof(control));
745 
746     ret = 1;
747 
748     while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
749         received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
750         if (received < 0) {
751             switch (errno) {
752             case EAGAIN:
753                 /* Nothing on errqueue, wait until something is available */
754                 qio_channel_wait(ioc, G_IO_ERR);
755                 continue;
756             case EINTR:
757                 continue;
758             default:
759                 error_setg_errno(errp, errno,
760                                  "Unable to read errqueue");
761                 return -1;
762             }
763         }
764 
765         cm = CMSG_FIRSTHDR(&msg);
766         if (cm->cmsg_level != SOL_IP   && cm->cmsg_type != IP_RECVERR &&
767             cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) {
768             error_setg_errno(errp, EPROTOTYPE,
769                              "Wrong cmsg in errqueue");
770             return -1;
771         }
772 
773         serr = (void *) CMSG_DATA(cm);
774         if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
775             error_setg_errno(errp, serr->ee_errno,
776                              "Error on socket");
777             return -1;
778         }
779         if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
780             error_setg_errno(errp, serr->ee_origin,
781                              "Error not from zero copy");
782             return -1;
783         }
784 
785         /* No errors, count successfully finished sendmsg()*/
786         sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
787 
788         /* If any sendmsg() succeeded using zero copy, return 0 at the end */
789         if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
790             ret = 0;
791         }
792     }
793 
794     return ret;
795 }
796 
797 #endif /* QEMU_MSG_ZEROCOPY */
798 
799 static int
800 qio_channel_socket_set_blocking(QIOChannel *ioc,
801                                 bool enabled,
802                                 Error **errp)
803 {
804     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
805 
806     if (enabled) {
807         qemu_socket_set_block(sioc->fd);
808     } else {
809         qemu_socket_set_nonblock(sioc->fd);
810     }
811     return 0;
812 }
813 
814 
815 static void
816 qio_channel_socket_set_delay(QIOChannel *ioc,
817                              bool enabled)
818 {
819     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
820     int v = enabled ? 0 : 1;
821 
822     setsockopt(sioc->fd,
823                IPPROTO_TCP, TCP_NODELAY,
824                &v, sizeof(v));
825 }
826 
827 
828 static void
829 qio_channel_socket_set_cork(QIOChannel *ioc,
830                             bool enabled)
831 {
832     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
833     int v = enabled ? 1 : 0;
834 
835     socket_set_cork(sioc->fd, v);
836 }
837 
838 
839 static int
840 qio_channel_socket_close(QIOChannel *ioc,
841                          Error **errp)
842 {
843     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
844     int rc = 0;
845     Error *err = NULL;
846 
847     if (sioc->fd != -1) {
848 #ifdef WIN32
849         qemu_socket_unselect(sioc->fd, NULL);
850 #endif
851         if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
852             socket_listen_cleanup(sioc->fd, errp);
853         }
854 
855         if (close(sioc->fd) < 0) {
856             sioc->fd = -1;
857             error_setg_errno(&err, errno, "Unable to close socket");
858             error_propagate(errp, err);
859             return -1;
860         }
861         sioc->fd = -1;
862     }
863     return rc;
864 }
865 
866 static int
867 qio_channel_socket_shutdown(QIOChannel *ioc,
868                             QIOChannelShutdown how,
869                             Error **errp)
870 {
871     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
872     int sockhow;
873 
874     switch (how) {
875     case QIO_CHANNEL_SHUTDOWN_READ:
876         sockhow = SHUT_RD;
877         break;
878     case QIO_CHANNEL_SHUTDOWN_WRITE:
879         sockhow = SHUT_WR;
880         break;
881     case QIO_CHANNEL_SHUTDOWN_BOTH:
882     default:
883         sockhow = SHUT_RDWR;
884         break;
885     }
886 
887     if (shutdown(sioc->fd, sockhow) < 0) {
888         error_setg_errno(errp, errno,
889                          "Unable to shutdown socket");
890         return -1;
891     }
892     return 0;
893 }
894 
895 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
896                                                   AioContext *ctx,
897                                                   IOHandler *io_read,
898                                                   IOHandler *io_write,
899                                                   void *opaque)
900 {
901     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
902     aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
903 }
904 
905 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
906                                                 GIOCondition condition)
907 {
908     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
909     return qio_channel_create_socket_watch(ioc,
910                                            sioc->fd,
911                                            condition);
912 }
913 
914 static void qio_channel_socket_class_init(ObjectClass *klass,
915                                           void *class_data G_GNUC_UNUSED)
916 {
917     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
918 
919     ioc_klass->io_writev = qio_channel_socket_writev;
920     ioc_klass->io_readv = qio_channel_socket_readv;
921     ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
922     ioc_klass->io_close = qio_channel_socket_close;
923     ioc_klass->io_shutdown = qio_channel_socket_shutdown;
924     ioc_klass->io_set_cork = qio_channel_socket_set_cork;
925     ioc_klass->io_set_delay = qio_channel_socket_set_delay;
926     ioc_klass->io_create_watch = qio_channel_socket_create_watch;
927     ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
928 #ifdef QEMU_MSG_ZEROCOPY
929     ioc_klass->io_flush = qio_channel_socket_flush;
930 #endif
931 }
932 
933 static const TypeInfo qio_channel_socket_info = {
934     .parent = TYPE_QIO_CHANNEL,
935     .name = TYPE_QIO_CHANNEL_SOCKET,
936     .instance_size = sizeof(QIOChannelSocket),
937     .instance_init = qio_channel_socket_init,
938     .instance_finalize = qio_channel_socket_finalize,
939     .class_init = qio_channel_socket_class_init,
940 };
941 
/* Register the socket channel type described by qio_channel_socket_info. */
static void qio_channel_socket_register_types(void)
{
    type_register_static(&qio_channel_socket_info);
}

/* Hand the registration function to type_init(). */
type_init(qio_channel_socket_register_types);
948