xref: /qemu/io/channel-socket.c (revision 76eb88b1)
1 /*
2  * QEMU I/O channels sockets driver
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-visit-sockets.h"
23 #include "qemu/module.h"
24 #include "io/channel-socket.h"
25 #include "io/channel-watch.h"
26 #include "trace.h"
27 #include "qapi/clone-visitor.h"
28 #ifdef CONFIG_LINUX
29 #include <linux/errqueue.h>
30 #include <sys/socket.h>
31 
32 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
33 #define QEMU_MSG_ZEROCOPY
34 #endif
35 #endif
36 
/*
 * Upper bound on the number of file descriptors carried by one message:
 * it sizes the ancillary-data buffers used by the POSIX readv/writev
 * implementations below and is the limit enforced on @nfds when sending.
 */
#define SOCKET_MAX_FDS 16
38 
39 SocketAddress *
40 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
41                                      Error **errp)
42 {
43     return socket_sockaddr_to_address(&ioc->localAddr,
44                                       ioc->localAddrLen,
45                                       errp);
46 }
47 
48 SocketAddress *
49 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
50                                       Error **errp)
51 {
52     return socket_sockaddr_to_address(&ioc->remoteAddr,
53                                       ioc->remoteAddrLen,
54                                       errp);
55 }
56 
57 QIOChannelSocket *
58 qio_channel_socket_new(void)
59 {
60     QIOChannelSocket *sioc;
61     QIOChannel *ioc;
62 
63     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
64     sioc->fd = -1;
65     sioc->zero_copy_queued = 0;
66     sioc->zero_copy_sent = 0;
67 
68     ioc = QIO_CHANNEL(sioc);
69     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
70 
71 #ifdef WIN32
72     ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
73 #endif
74 
75     trace_qio_channel_socket_new(sioc);
76 
77     return sioc;
78 }
79 
80 
81 static int
82 qio_channel_socket_set_fd(QIOChannelSocket *sioc,
83                           int fd,
84                           Error **errp)
85 {
86     if (sioc->fd != -1) {
87         error_setg(errp, "Socket is already open");
88         return -1;
89     }
90 
91     sioc->fd = fd;
92     sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
93     sioc->localAddrLen = sizeof(sioc->localAddr);
94 
95 
96     if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
97                     &sioc->remoteAddrLen) < 0) {
98         if (errno == ENOTCONN) {
99             memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
100             sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
101         } else {
102             error_setg_errno(errp, errno,
103                              "Unable to query remote socket address");
104             goto error;
105         }
106     }
107 
108     if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
109                     &sioc->localAddrLen) < 0) {
110         error_setg_errno(errp, errno,
111                          "Unable to query local socket address");
112         goto error;
113     }
114 
115 #ifndef WIN32
116     if (sioc->localAddr.ss_family == AF_UNIX) {
117         QIOChannel *ioc = QIO_CHANNEL(sioc);
118         qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
119     }
120 #endif /* WIN32 */
121 
122     return 0;
123 
124  error:
125     sioc->fd = -1; /* Let the caller close FD on failure */
126     return -1;
127 }
128 
129 QIOChannelSocket *
130 qio_channel_socket_new_fd(int fd,
131                           Error **errp)
132 {
133     QIOChannelSocket *ioc;
134 
135     ioc = qio_channel_socket_new();
136     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
137         object_unref(OBJECT(ioc));
138         return NULL;
139     }
140 
141     trace_qio_channel_socket_new_fd(ioc, fd);
142 
143     return ioc;
144 }
145 
146 
147 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
148                                     SocketAddress *addr,
149                                     Error **errp)
150 {
151     int fd;
152 
153     trace_qio_channel_socket_connect_sync(ioc, addr);
154     fd = socket_connect(addr, errp);
155     if (fd < 0) {
156         trace_qio_channel_socket_connect_fail(ioc);
157         return -1;
158     }
159 
160     trace_qio_channel_socket_connect_complete(ioc, fd);
161     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
162         close(fd);
163         return -1;
164     }
165 
166 #ifdef QEMU_MSG_ZEROCOPY
167     int ret, v = 1;
168     ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
169     if (ret == 0) {
170         /* Zero copy available on host */
171         qio_channel_set_feature(QIO_CHANNEL(ioc),
172                                 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
173     }
174 #endif
175 
176     return 0;
177 }
178 
179 
180 static void qio_channel_socket_connect_worker(QIOTask *task,
181                                               gpointer opaque)
182 {
183     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
184     SocketAddress *addr = opaque;
185     Error *err = NULL;
186 
187     qio_channel_socket_connect_sync(ioc, addr, &err);
188 
189     qio_task_set_error(task, err);
190 }
191 
192 
193 void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
194                                       SocketAddress *addr,
195                                       QIOTaskFunc callback,
196                                       gpointer opaque,
197                                       GDestroyNotify destroy,
198                                       GMainContext *context)
199 {
200     QIOTask *task = qio_task_new(
201         OBJECT(ioc), callback, opaque, destroy);
202     SocketAddress *addrCopy;
203 
204     addrCopy = QAPI_CLONE(SocketAddress, addr);
205 
206     /* socket_connect() does a non-blocking connect(), but it
207      * still blocks in DNS lookups, so we must use a thread */
208     trace_qio_channel_socket_connect_async(ioc, addr);
209     qio_task_run_in_thread(task,
210                            qio_channel_socket_connect_worker,
211                            addrCopy,
212                            (GDestroyNotify)qapi_free_SocketAddress,
213                            context);
214 }
215 
216 
217 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
218                                    SocketAddress *addr,
219                                    int num,
220                                    Error **errp)
221 {
222     int fd;
223 
224     trace_qio_channel_socket_listen_sync(ioc, addr, num);
225     fd = socket_listen(addr, num, errp);
226     if (fd < 0) {
227         trace_qio_channel_socket_listen_fail(ioc);
228         return -1;
229     }
230 
231     trace_qio_channel_socket_listen_complete(ioc, fd);
232     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
233         close(fd);
234         return -1;
235     }
236     qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
237 
238     return 0;
239 }
240 
241 
/*
 * Arguments handed to qio_channel_socket_listen_worker() when a listen
 * request runs in a thread.
 */
struct QIOChannelListenWorkerData {
    SocketAddress *addr; /* clone owned by this struct; freed with it */
    int num; /* amount of expected connections */
};
246 
247 static void qio_channel_listen_worker_free(gpointer opaque)
248 {
249     struct QIOChannelListenWorkerData *data = opaque;
250 
251     qapi_free_SocketAddress(data->addr);
252     g_free(data);
253 }
254 
255 static void qio_channel_socket_listen_worker(QIOTask *task,
256                                              gpointer opaque)
257 {
258     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
259     struct QIOChannelListenWorkerData *data = opaque;
260     Error *err = NULL;
261 
262     qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
263 
264     qio_task_set_error(task, err);
265 }
266 
267 
268 void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
269                                      SocketAddress *addr,
270                                      int num,
271                                      QIOTaskFunc callback,
272                                      gpointer opaque,
273                                      GDestroyNotify destroy,
274                                      GMainContext *context)
275 {
276     QIOTask *task = qio_task_new(
277         OBJECT(ioc), callback, opaque, destroy);
278     struct QIOChannelListenWorkerData *data;
279 
280     data = g_new0(struct QIOChannelListenWorkerData, 1);
281     data->addr = QAPI_CLONE(SocketAddress, addr);
282     data->num = num;
283 
284     /* socket_listen() blocks in DNS lookups, so we must use a thread */
285     trace_qio_channel_socket_listen_async(ioc, addr, num);
286     qio_task_run_in_thread(task,
287                            qio_channel_socket_listen_worker,
288                            data,
289                            qio_channel_listen_worker_free,
290                            context);
291 }
292 
293 
294 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
295                                   SocketAddress *localAddr,
296                                   SocketAddress *remoteAddr,
297                                   Error **errp)
298 {
299     int fd;
300 
301     trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
302     fd = socket_dgram(remoteAddr, localAddr, errp);
303     if (fd < 0) {
304         trace_qio_channel_socket_dgram_fail(ioc);
305         return -1;
306     }
307 
308     trace_qio_channel_socket_dgram_complete(ioc, fd);
309     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
310         close(fd);
311         return -1;
312     }
313 
314     return 0;
315 }
316 
317 
/*
 * Arguments handed to qio_channel_socket_dgram_worker() when a datagram
 * setup request runs in a thread. Both addresses are clones owned by this
 * struct and are freed together with it.
 */
struct QIOChannelSocketDGramWorkerData {
    SocketAddress *localAddr;
    SocketAddress *remoteAddr;
};
322 
323 
324 static void qio_channel_socket_dgram_worker_free(gpointer opaque)
325 {
326     struct QIOChannelSocketDGramWorkerData *data = opaque;
327     qapi_free_SocketAddress(data->localAddr);
328     qapi_free_SocketAddress(data->remoteAddr);
329     g_free(data);
330 }
331 
332 static void qio_channel_socket_dgram_worker(QIOTask *task,
333                                             gpointer opaque)
334 {
335     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
336     struct QIOChannelSocketDGramWorkerData *data = opaque;
337     Error *err = NULL;
338 
339     /* socket_dgram() blocks in DNS lookups, so we must use a thread */
340     qio_channel_socket_dgram_sync(ioc, data->localAddr,
341                                   data->remoteAddr, &err);
342 
343     qio_task_set_error(task, err);
344 }
345 
346 
347 void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
348                                     SocketAddress *localAddr,
349                                     SocketAddress *remoteAddr,
350                                     QIOTaskFunc callback,
351                                     gpointer opaque,
352                                     GDestroyNotify destroy,
353                                     GMainContext *context)
354 {
355     QIOTask *task = qio_task_new(
356         OBJECT(ioc), callback, opaque, destroy);
357     struct QIOChannelSocketDGramWorkerData *data = g_new0(
358         struct QIOChannelSocketDGramWorkerData, 1);
359 
360     data->localAddr = QAPI_CLONE(SocketAddress, localAddr);
361     data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);
362 
363     trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
364     qio_task_run_in_thread(task,
365                            qio_channel_socket_dgram_worker,
366                            data,
367                            qio_channel_socket_dgram_worker_free,
368                            context);
369 }
370 
371 
372 QIOChannelSocket *
373 qio_channel_socket_accept(QIOChannelSocket *ioc,
374                           Error **errp)
375 {
376     QIOChannelSocket *cioc;
377 
378     cioc = qio_channel_socket_new();
379     cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
380     cioc->localAddrLen = sizeof(ioc->localAddr);
381 
382  retry:
383     trace_qio_channel_socket_accept(ioc);
384     cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
385                            &cioc->remoteAddrLen);
386     if (cioc->fd < 0) {
387         if (errno == EINTR) {
388             goto retry;
389         }
390         error_setg_errno(errp, errno, "Unable to accept connection");
391         trace_qio_channel_socket_accept_fail(ioc);
392         goto error;
393     }
394 
395     if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
396                     &cioc->localAddrLen) < 0) {
397         error_setg_errno(errp, errno,
398                          "Unable to query local socket address");
399         goto error;
400     }
401 
402 #ifndef WIN32
403     if (cioc->localAddr.ss_family == AF_UNIX) {
404         QIOChannel *ioc_local = QIO_CHANNEL(cioc);
405         qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
406     }
407 #endif /* WIN32 */
408 
409     trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
410     return cioc;
411 
412  error:
413     object_unref(OBJECT(cioc));
414     return NULL;
415 }
416 
417 static void qio_channel_socket_init(Object *obj)
418 {
419     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
420     ioc->fd = -1;
421 }
422 
423 static void qio_channel_socket_finalize(Object *obj)
424 {
425     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
426 
427     if (ioc->fd != -1) {
428         QIOChannel *ioc_local = QIO_CHANNEL(ioc);
429         if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) {
430             Error *err = NULL;
431 
432             socket_listen_cleanup(ioc->fd, &err);
433             if (err) {
434                 error_report_err(err);
435                 err = NULL;
436             }
437         }
438 #ifdef WIN32
439         WSAEventSelect(ioc->fd, NULL, 0);
440 #endif
441         closesocket(ioc->fd);
442         ioc->fd = -1;
443     }
444 }
445 
446 
447 #ifndef WIN32
448 static void qio_channel_socket_copy_fds(struct msghdr *msg,
449                                         int **fds, size_t *nfds)
450 {
451     struct cmsghdr *cmsg;
452 
453     *nfds = 0;
454     *fds = NULL;
455 
456     for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
457         int fd_size, i;
458         int gotfds;
459 
460         if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
461             cmsg->cmsg_level != SOL_SOCKET ||
462             cmsg->cmsg_type != SCM_RIGHTS) {
463             continue;
464         }
465 
466         fd_size = cmsg->cmsg_len - CMSG_LEN(0);
467 
468         if (!fd_size) {
469             continue;
470         }
471 
472         gotfds = fd_size / sizeof(int);
473         *fds = g_renew(int, *fds, *nfds + gotfds);
474         memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
475 
476         for (i = 0; i < gotfds; i++) {
477             int fd = (*fds)[*nfds + i];
478             if (fd < 0) {
479                 continue;
480             }
481 
482             /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
483             qemu_socket_set_block(fd);
484 
485 #ifndef MSG_CMSG_CLOEXEC
486             qemu_set_cloexec(fd);
487 #endif
488         }
489         *nfds += gotfds;
490     }
491 }
492 
493 
494 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
495                                         const struct iovec *iov,
496                                         size_t niov,
497                                         int **fds,
498                                         size_t *nfds,
499                                         Error **errp)
500 {
501     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
502     ssize_t ret;
503     struct msghdr msg = { NULL, };
504     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
505     int sflags = 0;
506 
507     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
508 
509     msg.msg_iov = (struct iovec *)iov;
510     msg.msg_iovlen = niov;
511     if (fds && nfds) {
512         msg.msg_control = control;
513         msg.msg_controllen = sizeof(control);
514 #ifdef MSG_CMSG_CLOEXEC
515         sflags |= MSG_CMSG_CLOEXEC;
516 #endif
517 
518     }
519 
520  retry:
521     ret = recvmsg(sioc->fd, &msg, sflags);
522     if (ret < 0) {
523         if (errno == EAGAIN) {
524             return QIO_CHANNEL_ERR_BLOCK;
525         }
526         if (errno == EINTR) {
527             goto retry;
528         }
529 
530         error_setg_errno(errp, errno,
531                          "Unable to read from socket");
532         return -1;
533     }
534 
535     if (fds && nfds) {
536         qio_channel_socket_copy_fds(&msg, fds, nfds);
537     }
538 
539     return ret;
540 }
541 
542 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
543                                          const struct iovec *iov,
544                                          size_t niov,
545                                          int *fds,
546                                          size_t nfds,
547                                          int flags,
548                                          Error **errp)
549 {
550     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
551     ssize_t ret;
552     struct msghdr msg = { NULL, };
553     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
554     size_t fdsize = sizeof(int) * nfds;
555     struct cmsghdr *cmsg;
556     int sflags = 0;
557 
558     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
559 
560     msg.msg_iov = (struct iovec *)iov;
561     msg.msg_iovlen = niov;
562 
563     if (nfds) {
564         if (nfds > SOCKET_MAX_FDS) {
565             error_setg_errno(errp, EINVAL,
566                              "Only %d FDs can be sent, got %zu",
567                              SOCKET_MAX_FDS, nfds);
568             return -1;
569         }
570 
571         msg.msg_control = control;
572         msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);
573 
574         cmsg = CMSG_FIRSTHDR(&msg);
575         cmsg->cmsg_len = CMSG_LEN(fdsize);
576         cmsg->cmsg_level = SOL_SOCKET;
577         cmsg->cmsg_type = SCM_RIGHTS;
578         memcpy(CMSG_DATA(cmsg), fds, fdsize);
579     }
580 
581     if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
582 #ifdef QEMU_MSG_ZEROCOPY
583         sflags = MSG_ZEROCOPY;
584 #else
585         /*
586          * We expect QIOChannel class entry point to have
587          * blocked this code path already
588          */
589         g_assert_not_reached();
590 #endif
591     }
592 
593  retry:
594     ret = sendmsg(sioc->fd, &msg, sflags);
595     if (ret <= 0) {
596         switch (errno) {
597         case EAGAIN:
598             return QIO_CHANNEL_ERR_BLOCK;
599         case EINTR:
600             goto retry;
601         case ENOBUFS:
602             if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
603                 error_setg_errno(errp, errno,
604                                  "Process can't lock enough memory for using MSG_ZEROCOPY");
605                 return -1;
606             }
607             break;
608         }
609 
610         error_setg_errno(errp, errno,
611                          "Unable to write to socket");
612         return -1;
613     }
614 
615     if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
616         sioc->zero_copy_queued++;
617     }
618 
619     return ret;
620 }
621 #else /* WIN32 */
622 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
623                                         const struct iovec *iov,
624                                         size_t niov,
625                                         int **fds,
626                                         size_t *nfds,
627                                         Error **errp)
628 {
629     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
630     ssize_t done = 0;
631     ssize_t i;
632 
633     for (i = 0; i < niov; i++) {
634         ssize_t ret;
635     retry:
636         ret = recv(sioc->fd,
637                    iov[i].iov_base,
638                    iov[i].iov_len,
639                    0);
640         if (ret < 0) {
641             if (errno == EAGAIN) {
642                 if (done) {
643                     return done;
644                 } else {
645                     return QIO_CHANNEL_ERR_BLOCK;
646                 }
647             } else if (errno == EINTR) {
648                 goto retry;
649             } else {
650                 error_setg_errno(errp, errno,
651                                  "Unable to read from socket");
652                 return -1;
653             }
654         }
655         done += ret;
656         if (ret < iov[i].iov_len) {
657             return done;
658         }
659     }
660 
661     return done;
662 }
663 
664 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
665                                          const struct iovec *iov,
666                                          size_t niov,
667                                          int *fds,
668                                          size_t nfds,
669                                          int flags,
670                                          Error **errp)
671 {
672     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
673     ssize_t done = 0;
674     ssize_t i;
675 
676     for (i = 0; i < niov; i++) {
677         ssize_t ret;
678     retry:
679         ret = send(sioc->fd,
680                    iov[i].iov_base,
681                    iov[i].iov_len,
682                    0);
683         if (ret < 0) {
684             if (errno == EAGAIN) {
685                 if (done) {
686                     return done;
687                 } else {
688                     return QIO_CHANNEL_ERR_BLOCK;
689                 }
690             } else if (errno == EINTR) {
691                 goto retry;
692             } else {
693                 error_setg_errno(errp, errno,
694                                  "Unable to write to socket");
695                 return -1;
696             }
697         }
698         done += ret;
699         if (ret < iov[i].iov_len) {
700             return done;
701         }
702     }
703 
704     return done;
705 }
706 #endif /* WIN32 */
707 
708 
709 #ifdef QEMU_MSG_ZEROCOPY
710 static int qio_channel_socket_flush(QIOChannel *ioc,
711                                     Error **errp)
712 {
713     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
714     struct msghdr msg = {};
715     struct sock_extended_err *serr;
716     struct cmsghdr *cm;
717     char control[CMSG_SPACE(sizeof(*serr))];
718     int received;
719     int ret;
720 
721     if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
722         return 0;
723     }
724 
725     msg.msg_control = control;
726     msg.msg_controllen = sizeof(control);
727     memset(control, 0, sizeof(control));
728 
729     ret = 1;
730 
731     while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
732         received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
733         if (received < 0) {
734             switch (errno) {
735             case EAGAIN:
736                 /* Nothing on errqueue, wait until something is available */
737                 qio_channel_wait(ioc, G_IO_ERR);
738                 continue;
739             case EINTR:
740                 continue;
741             default:
742                 error_setg_errno(errp, errno,
743                                  "Unable to read errqueue");
744                 return -1;
745             }
746         }
747 
748         cm = CMSG_FIRSTHDR(&msg);
749         if (cm->cmsg_level != SOL_IP   && cm->cmsg_type != IP_RECVERR &&
750             cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) {
751             error_setg_errno(errp, EPROTOTYPE,
752                              "Wrong cmsg in errqueue");
753             return -1;
754         }
755 
756         serr = (void *) CMSG_DATA(cm);
757         if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
758             error_setg_errno(errp, serr->ee_errno,
759                              "Error on socket");
760             return -1;
761         }
762         if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
763             error_setg_errno(errp, serr->ee_origin,
764                              "Error not from zero copy");
765             return -1;
766         }
767 
768         /* No errors, count successfully finished sendmsg()*/
769         sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
770 
771         /* If any sendmsg() succeeded using zero copy, return 0 at the end */
772         if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
773             ret = 0;
774         }
775     }
776 
777     return ret;
778 }
779 
780 #endif /* QEMU_MSG_ZEROCOPY */
781 
782 static int
783 qio_channel_socket_set_blocking(QIOChannel *ioc,
784                                 bool enabled,
785                                 Error **errp)
786 {
787     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
788 
789     if (enabled) {
790         qemu_socket_set_block(sioc->fd);
791     } else {
792         qemu_socket_set_nonblock(sioc->fd);
793     }
794     return 0;
795 }
796 
797 
798 static void
799 qio_channel_socket_set_delay(QIOChannel *ioc,
800                              bool enabled)
801 {
802     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
803     int v = enabled ? 0 : 1;
804 
805     setsockopt(sioc->fd,
806                IPPROTO_TCP, TCP_NODELAY,
807                &v, sizeof(v));
808 }
809 
810 
811 static void
812 qio_channel_socket_set_cork(QIOChannel *ioc,
813                             bool enabled)
814 {
815     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
816     int v = enabled ? 1 : 0;
817 
818     socket_set_cork(sioc->fd, v);
819 }
820 
821 
822 static int
823 qio_channel_socket_close(QIOChannel *ioc,
824                          Error **errp)
825 {
826     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
827     int rc = 0;
828     Error *err = NULL;
829 
830     if (sioc->fd != -1) {
831 #ifdef WIN32
832         WSAEventSelect(sioc->fd, NULL, 0);
833 #endif
834         if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
835             socket_listen_cleanup(sioc->fd, errp);
836         }
837 
838         if (closesocket(sioc->fd) < 0) {
839             sioc->fd = -1;
840             error_setg_errno(&err, errno, "Unable to close socket");
841             error_propagate(errp, err);
842             return -1;
843         }
844         sioc->fd = -1;
845     }
846     return rc;
847 }
848 
849 static int
850 qio_channel_socket_shutdown(QIOChannel *ioc,
851                             QIOChannelShutdown how,
852                             Error **errp)
853 {
854     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
855     int sockhow;
856 
857     switch (how) {
858     case QIO_CHANNEL_SHUTDOWN_READ:
859         sockhow = SHUT_RD;
860         break;
861     case QIO_CHANNEL_SHUTDOWN_WRITE:
862         sockhow = SHUT_WR;
863         break;
864     case QIO_CHANNEL_SHUTDOWN_BOTH:
865     default:
866         sockhow = SHUT_RDWR;
867         break;
868     }
869 
870     if (shutdown(sioc->fd, sockhow) < 0) {
871         error_setg_errno(errp, errno,
872                          "Unable to shutdown socket");
873         return -1;
874     }
875     return 0;
876 }
877 
878 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
879                                                   AioContext *ctx,
880                                                   IOHandler *io_read,
881                                                   IOHandler *io_write,
882                                                   void *opaque)
883 {
884     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
885     aio_set_fd_handler(ctx, sioc->fd, false,
886                        io_read, io_write, NULL, NULL, opaque);
887 }
888 
889 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
890                                                 GIOCondition condition)
891 {
892     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
893     return qio_channel_create_socket_watch(ioc,
894                                            sioc->fd,
895                                            condition);
896 }
897 
898 static void qio_channel_socket_class_init(ObjectClass *klass,
899                                           void *class_data G_GNUC_UNUSED)
900 {
901     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
902 
903     ioc_klass->io_writev = qio_channel_socket_writev;
904     ioc_klass->io_readv = qio_channel_socket_readv;
905     ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
906     ioc_klass->io_close = qio_channel_socket_close;
907     ioc_klass->io_shutdown = qio_channel_socket_shutdown;
908     ioc_klass->io_set_cork = qio_channel_socket_set_cork;
909     ioc_klass->io_set_delay = qio_channel_socket_set_delay;
910     ioc_klass->io_create_watch = qio_channel_socket_create_watch;
911     ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
912 #ifdef QEMU_MSG_ZEROCOPY
913     ioc_klass->io_flush = qio_channel_socket_flush;
914 #endif
915 }
916 
917 static const TypeInfo qio_channel_socket_info = {
918     .parent = TYPE_QIO_CHANNEL,
919     .name = TYPE_QIO_CHANNEL_SOCKET,
920     .instance_size = sizeof(QIOChannelSocket),
921     .instance_init = qio_channel_socket_init,
922     .instance_finalize = qio_channel_socket_finalize,
923     .class_init = qio_channel_socket_class_init,
924 };
925 
/* Register the socket channel type described by qio_channel_socket_info. */
static void qio_channel_socket_register_types(void)
{
    type_register_static(&qio_channel_socket_info);
}

/*
 * Hook the registration function into type_init(); presumably this makes
 * it run during start-up type initialisation - TODO confirm against the
 * type_init() definition.
 */
type_init(qio_channel_socket_register_types);
932