xref: /qemu/net/stream.c (revision 75ac231c)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2022 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25 
26 #include "qemu/osdep.h"
27 
28 #include "net/net.h"
29 #include "clients.h"
30 #include "monitor/monitor.h"
31 #include "qapi/error.h"
32 #include "qemu/error-report.h"
33 #include "qemu/option.h"
34 #include "qemu/sockets.h"
35 #include "qemu/iov.h"
36 #include "qemu/main-loop.h"
37 #include "qemu/cutils.h"
38 #include "io/channel.h"
39 #include "io/channel-socket.h"
40 #include "io/net-listener.h"
41 #include "qapi/qapi-events-net.h"
42 
43 typedef struct NetStreamState {
44     NetClientState nc;
45     QIOChannel *listen_ioc;
46     QIONetListener *listener;
47     QIOChannel *ioc;
48     guint ioc_read_tag;
49     guint ioc_write_tag;
50     SocketReadState rs;
51     unsigned int send_index;      /* number of bytes sent*/
52 } NetStreamState;
53 
54 static void net_stream_listen(QIONetListener *listener,
55                               QIOChannelSocket *cioc,
56                               void *opaque);
57 
58 static gboolean net_stream_writable(QIOChannel *ioc,
59                                     GIOCondition condition,
60                                     gpointer data)
61 {
62     NetStreamState *s = data;
63 
64     s->ioc_write_tag = 0;
65 
66     qemu_flush_queued_packets(&s->nc);
67 
68     return G_SOURCE_REMOVE;
69 }
70 
71 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
72                                   size_t size)
73 {
74     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
75     uint32_t len = htonl(size);
76     struct iovec iov[] = {
77         {
78             .iov_base = &len,
79             .iov_len  = sizeof(len),
80         }, {
81             .iov_base = (void *)buf,
82             .iov_len  = size,
83         },
84     };
85     struct iovec local_iov[2];
86     unsigned int nlocal_iov;
87     size_t remaining;
88     ssize_t ret;
89 
90     remaining = iov_size(iov, 2) - s->send_index;
91     nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
92     ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
93     if (ret == QIO_CHANNEL_ERR_BLOCK) {
94         ret = 0; /* handled further down */
95     }
96     if (ret == -1) {
97         s->send_index = 0;
98         return -errno;
99     }
100     if (ret < (ssize_t)remaining) {
101         s->send_index += ret;
102         s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
103                                                  net_stream_writable, s, NULL);
104         return 0;
105     }
106     s->send_index = 0;
107     return size;
108 }
109 
110 static gboolean net_stream_send(QIOChannel *ioc,
111                                 GIOCondition condition,
112                                 gpointer data);
113 
114 static void net_stream_send_completed(NetClientState *nc, ssize_t len)
115 {
116     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
117 
118     if (!s->ioc_read_tag) {
119         s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
120                                                 net_stream_send, s, NULL);
121     }
122 }
123 
124 static void net_stream_rs_finalize(SocketReadState *rs)
125 {
126     NetStreamState *s = container_of(rs, NetStreamState, rs);
127 
128     if (qemu_send_packet_async(&s->nc, rs->buf,
129                                rs->packet_len,
130                                net_stream_send_completed) == 0) {
131         if (s->ioc_read_tag) {
132             g_source_remove(s->ioc_read_tag);
133             s->ioc_read_tag = 0;
134         }
135     }
136 }
137 
138 static gboolean net_stream_send(QIOChannel *ioc,
139                                 GIOCondition condition,
140                                 gpointer data)
141 {
142     NetStreamState *s = data;
143     int size;
144     int ret;
145     char buf1[NET_BUFSIZE];
146     const char *buf;
147 
148     size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
149     if (size < 0) {
150         if (errno != EWOULDBLOCK) {
151             goto eoc;
152         }
153     } else if (size == 0) {
154         /* end of connection */
155     eoc:
156         s->ioc_read_tag = 0;
157         if (s->ioc_write_tag) {
158             g_source_remove(s->ioc_write_tag);
159             s->ioc_write_tag = 0;
160         }
161         if (s->listener) {
162             qio_net_listener_set_client_func(s->listener, net_stream_listen,
163                                              s, NULL);
164         }
165         object_unref(OBJECT(s->ioc));
166         s->ioc = NULL;
167 
168         net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
169         s->nc.link_down = true;
170         qemu_set_info_str(&s->nc, "");
171 
172         qapi_event_send_netdev_stream_disconnected(s->nc.name);
173 
174         return G_SOURCE_REMOVE;
175     }
176     buf = buf1;
177 
178     ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
179 
180     if (ret == -1) {
181         goto eoc;
182     }
183 
184     return G_SOURCE_CONTINUE;
185 }
186 
187 static void net_stream_cleanup(NetClientState *nc)
188 {
189     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
190     if (s->ioc) {
191         if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
192             if (s->ioc_read_tag) {
193                 g_source_remove(s->ioc_read_tag);
194                 s->ioc_read_tag = 0;
195             }
196             if (s->ioc_write_tag) {
197                 g_source_remove(s->ioc_write_tag);
198                 s->ioc_write_tag = 0;
199             }
200         }
201         object_unref(OBJECT(s->ioc));
202         s->ioc = NULL;
203     }
204     if (s->listen_ioc) {
205         if (s->listener) {
206             qio_net_listener_disconnect(s->listener);
207             object_unref(OBJECT(s->listener));
208             s->listener = NULL;
209         }
210         object_unref(OBJECT(s->listen_ioc));
211         s->listen_ioc = NULL;
212     }
213 }
214 
215 static NetClientInfo net_stream_info = {
216     .type = NET_CLIENT_DRIVER_STREAM,
217     .size = sizeof(NetStreamState),
218     .receive = net_stream_receive,
219     .cleanup = net_stream_cleanup,
220 };
221 
222 static void net_stream_listen(QIONetListener *listener,
223                               QIOChannelSocket *cioc,
224                               void *opaque)
225 {
226     NetStreamState *s = opaque;
227     SocketAddress *addr;
228     char *uri;
229 
230     object_ref(OBJECT(cioc));
231 
232     qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
233 
234     s->ioc = QIO_CHANNEL(cioc);
235     qio_channel_set_name(s->ioc, "stream-server");
236     s->nc.link_down = false;
237 
238     s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
239                                             s, NULL);
240 
241     if (cioc->localAddr.ss_family == AF_UNIX) {
242         addr = qio_channel_socket_get_local_address(cioc, NULL);
243     } else {
244         addr = qio_channel_socket_get_remote_address(cioc, NULL);
245     }
246     g_assert(addr != NULL);
247     uri = socket_uri(addr);
248     qemu_set_info_str(&s->nc, uri);
249     g_free(uri);
250     qapi_event_send_netdev_stream_connected(s->nc.name, addr);
251     qapi_free_SocketAddress(addr);
252 }
253 
254 static void net_stream_server_listening(QIOTask *task, gpointer opaque)
255 {
256     NetStreamState *s = opaque;
257     QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
258     SocketAddress *addr;
259     int ret;
260 
261     if (listen_sioc->fd < 0) {
262         qemu_set_info_str(&s->nc, "connection error");
263         return;
264     }
265 
266     addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
267     g_assert(addr != NULL);
268     ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
269     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
270         qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
271                           addr->u.fd.str, -ret);
272         return;
273     }
274     g_assert(ret == 0);
275     qapi_free_SocketAddress(addr);
276 
277     s->nc.link_down = true;
278     s->listener = qio_net_listener_new();
279 
280     net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
281     qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
282     qio_net_listener_add(s->listener, listen_sioc);
283 }
284 
285 static int net_stream_server_init(NetClientState *peer,
286                                   const char *model,
287                                   const char *name,
288                                   SocketAddress *addr,
289                                   Error **errp)
290 {
291     NetClientState *nc;
292     NetStreamState *s;
293     QIOChannelSocket *listen_sioc = qio_channel_socket_new();
294 
295     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
296     s = DO_UPCAST(NetStreamState, nc, nc);
297 
298     s->listen_ioc = QIO_CHANNEL(listen_sioc);
299     qio_channel_socket_listen_async(listen_sioc, addr, 0,
300                                     net_stream_server_listening, s,
301                                     NULL, NULL);
302 
303     return 0;
304 }
305 
306 static void net_stream_client_connected(QIOTask *task, gpointer opaque)
307 {
308     NetStreamState *s = opaque;
309     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
310     SocketAddress *addr;
311     gchar *uri;
312     int ret;
313 
314     if (sioc->fd < 0) {
315         qemu_set_info_str(&s->nc, "connection error");
316         goto error;
317     }
318 
319     addr = qio_channel_socket_get_remote_address(sioc, NULL);
320     g_assert(addr != NULL);
321     uri = socket_uri(addr);
322     qemu_set_info_str(&s->nc, uri);
323     g_free(uri);
324 
325     ret = qemu_socket_try_set_nonblock(sioc->fd);
326     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
327         qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
328                           addr->u.fd.str, -ret);
329         qapi_free_SocketAddress(addr);
330         goto error;
331     }
332     g_assert(ret == 0);
333 
334     net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
335 
336     /* Disable Nagle algorithm on TCP sockets to reduce latency */
337     qio_channel_set_delay(s->ioc, false);
338 
339     s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
340                                             s, NULL);
341     s->nc.link_down = false;
342     qapi_event_send_netdev_stream_connected(s->nc.name, addr);
343     qapi_free_SocketAddress(addr);
344 
345     return;
346 error:
347     object_unref(OBJECT(s->ioc));
348     s->ioc = NULL;
349 }
350 
351 static int net_stream_client_init(NetClientState *peer,
352                                   const char *model,
353                                   const char *name,
354                                   SocketAddress *addr,
355                                   Error **errp)
356 {
357     NetStreamState *s;
358     NetClientState *nc;
359     QIOChannelSocket *sioc = qio_channel_socket_new();
360 
361     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
362     s = DO_UPCAST(NetStreamState, nc, nc);
363 
364     s->ioc = QIO_CHANNEL(sioc);
365     s->nc.link_down = true;
366 
367     qio_channel_socket_connect_async(sioc, addr,
368                                      net_stream_client_connected, s,
369                                      NULL, NULL);
370 
371     return 0;
372 }
373 
374 int net_init_stream(const Netdev *netdev, const char *name,
375                     NetClientState *peer, Error **errp)
376 {
377     const NetdevStreamOptions *sock;
378 
379     assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
380     sock = &netdev->u.stream;
381 
382     if (!sock->has_server || !sock->server) {
383         return net_stream_client_init(peer, "stream", name, sock->addr, errp);
384     }
385     return net_stream_server_init(peer, "stream", name, sock->addr, errp);
386 }
387