xref: /qemu/net/stream.c (revision 73b49878)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2022 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25 
26 #include "qemu/osdep.h"
27 
28 #include "net/net.h"
29 #include "clients.h"
30 #include "monitor/monitor.h"
31 #include "qapi/error.h"
32 #include "qemu/error-report.h"
33 #include "qemu/option.h"
34 #include "qemu/sockets.h"
35 #include "qemu/iov.h"
36 #include "qemu/main-loop.h"
37 #include "qemu/cutils.h"
38 #include "io/channel.h"
39 #include "io/channel-socket.h"
40 #include "io/net-listener.h"
41 #include "qapi/qapi-events-net.h"
42 #include "qapi/qapi-visit-sockets.h"
43 #include "qapi/clone-visitor.h"
44 
45 typedef struct NetStreamState {
46     NetClientState nc;
47     QIOChannel *listen_ioc;
48     QIONetListener *listener;
49     QIOChannel *ioc;
50     guint ioc_read_tag;
51     guint ioc_write_tag;
52     SocketReadState rs;
53     unsigned int send_index;      /* number of bytes sent*/
54     uint32_t reconnect;
55     guint timer_tag;
56     SocketAddress *addr;
57 } NetStreamState;
58 
59 static void net_stream_listen(QIONetListener *listener,
60                               QIOChannelSocket *cioc,
61                               void *opaque);
62 static void net_stream_arm_reconnect(NetStreamState *s);
63 
64 static gboolean net_stream_writable(QIOChannel *ioc,
65                                     GIOCondition condition,
66                                     gpointer data)
67 {
68     NetStreamState *s = data;
69 
70     s->ioc_write_tag = 0;
71 
72     qemu_flush_queued_packets(&s->nc);
73 
74     return G_SOURCE_REMOVE;
75 }
76 
77 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
78                                   size_t size)
79 {
80     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
81     uint32_t len = htonl(size);
82     struct iovec iov[] = {
83         {
84             .iov_base = &len,
85             .iov_len  = sizeof(len),
86         }, {
87             .iov_base = (void *)buf,
88             .iov_len  = size,
89         },
90     };
91     struct iovec local_iov[2];
92     unsigned int nlocal_iov;
93     size_t remaining;
94     ssize_t ret;
95 
96     remaining = iov_size(iov, 2) - s->send_index;
97     nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
98     ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
99     if (ret == QIO_CHANNEL_ERR_BLOCK) {
100         ret = 0; /* handled further down */
101     }
102     if (ret == -1) {
103         s->send_index = 0;
104         return -errno;
105     }
106     if (ret < (ssize_t)remaining) {
107         s->send_index += ret;
108         s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
109                                                  net_stream_writable, s, NULL);
110         return 0;
111     }
112     s->send_index = 0;
113     return size;
114 }
115 
116 static gboolean net_stream_send(QIOChannel *ioc,
117                                 GIOCondition condition,
118                                 gpointer data);
119 
120 static void net_stream_send_completed(NetClientState *nc, ssize_t len)
121 {
122     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
123 
124     if (!s->ioc_read_tag) {
125         s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
126                                                 net_stream_send, s, NULL);
127     }
128 }
129 
130 static void net_stream_rs_finalize(SocketReadState *rs)
131 {
132     NetStreamState *s = container_of(rs, NetStreamState, rs);
133 
134     if (qemu_send_packet_async(&s->nc, rs->buf,
135                                rs->packet_len,
136                                net_stream_send_completed) == 0) {
137         if (s->ioc_read_tag) {
138             g_source_remove(s->ioc_read_tag);
139             s->ioc_read_tag = 0;
140         }
141     }
142 }
143 
144 static gboolean net_stream_send(QIOChannel *ioc,
145                                 GIOCondition condition,
146                                 gpointer data)
147 {
148     NetStreamState *s = data;
149     int size;
150     int ret;
151     char buf1[NET_BUFSIZE];
152     const char *buf;
153 
154     size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
155     if (size < 0) {
156         if (errno != EWOULDBLOCK) {
157             goto eoc;
158         }
159     } else if (size == 0) {
160         /* end of connection */
161     eoc:
162         s->ioc_read_tag = 0;
163         if (s->ioc_write_tag) {
164             g_source_remove(s->ioc_write_tag);
165             s->ioc_write_tag = 0;
166         }
167         if (s->listener) {
168             qemu_set_info_str(&s->nc, "listening");
169             qio_net_listener_set_client_func(s->listener, net_stream_listen,
170                                              s, NULL);
171         }
172         object_unref(OBJECT(s->ioc));
173         s->ioc = NULL;
174 
175         net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
176         s->nc.link_down = true;
177 
178         qapi_event_send_netdev_stream_disconnected(s->nc.name);
179         net_stream_arm_reconnect(s);
180 
181         return G_SOURCE_REMOVE;
182     }
183     buf = buf1;
184 
185     ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
186 
187     if (ret == -1) {
188         goto eoc;
189     }
190 
191     return G_SOURCE_CONTINUE;
192 }
193 
194 static void net_stream_cleanup(NetClientState *nc)
195 {
196     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
197     if (s->timer_tag) {
198         g_source_remove(s->timer_tag);
199         s->timer_tag = 0;
200     }
201     if (s->addr) {
202         qapi_free_SocketAddress(s->addr);
203         s->addr = NULL;
204     }
205     if (s->ioc) {
206         if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
207             if (s->ioc_read_tag) {
208                 g_source_remove(s->ioc_read_tag);
209                 s->ioc_read_tag = 0;
210             }
211             if (s->ioc_write_tag) {
212                 g_source_remove(s->ioc_write_tag);
213                 s->ioc_write_tag = 0;
214             }
215         }
216         object_unref(OBJECT(s->ioc));
217         s->ioc = NULL;
218     }
219     if (s->listen_ioc) {
220         if (s->listener) {
221             qio_net_listener_disconnect(s->listener);
222             object_unref(OBJECT(s->listener));
223             s->listener = NULL;
224         }
225         object_unref(OBJECT(s->listen_ioc));
226         s->listen_ioc = NULL;
227     }
228 }
229 
230 static NetClientInfo net_stream_info = {
231     .type = NET_CLIENT_DRIVER_STREAM,
232     .size = sizeof(NetStreamState),
233     .receive = net_stream_receive,
234     .cleanup = net_stream_cleanup,
235 };
236 
237 static void net_stream_listen(QIONetListener *listener,
238                               QIOChannelSocket *cioc,
239                               void *opaque)
240 {
241     NetStreamState *s = opaque;
242     SocketAddress *addr;
243     char *uri;
244 
245     object_ref(OBJECT(cioc));
246 
247     qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
248 
249     s->ioc = QIO_CHANNEL(cioc);
250     qio_channel_set_name(s->ioc, "stream-server");
251     s->nc.link_down = false;
252 
253     s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
254                                             s, NULL);
255 
256     if (cioc->localAddr.ss_family == AF_UNIX) {
257         addr = qio_channel_socket_get_local_address(cioc, NULL);
258     } else {
259         addr = qio_channel_socket_get_remote_address(cioc, NULL);
260     }
261     g_assert(addr != NULL);
262     uri = socket_uri(addr);
263     qemu_set_info_str(&s->nc, "%s", uri);
264     g_free(uri);
265     qapi_event_send_netdev_stream_connected(s->nc.name, addr);
266     qapi_free_SocketAddress(addr);
267 }
268 
269 static void net_stream_server_listening(QIOTask *task, gpointer opaque)
270 {
271     NetStreamState *s = opaque;
272     QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
273     SocketAddress *addr;
274     int ret;
275     Error *err = NULL;
276 
277     if (qio_task_propagate_error(task, &err)) {
278         qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
279         error_free(err);
280         return;
281     }
282 
283     addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
284     g_assert(addr != NULL);
285     ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
286     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
287         qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
288                           addr->u.fd.str, -ret);
289         return;
290     }
291     g_assert(ret == 0);
292     qapi_free_SocketAddress(addr);
293 
294     s->nc.link_down = true;
295     s->listener = qio_net_listener_new();
296 
297     qemu_set_info_str(&s->nc, "listening");
298     net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
299     qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
300     qio_net_listener_add(s->listener, listen_sioc);
301 }
302 
303 static int net_stream_server_init(NetClientState *peer,
304                                   const char *model,
305                                   const char *name,
306                                   SocketAddress *addr,
307                                   Error **errp)
308 {
309     NetClientState *nc;
310     NetStreamState *s;
311     QIOChannelSocket *listen_sioc = qio_channel_socket_new();
312 
313     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
314     s = DO_UPCAST(NetStreamState, nc, nc);
315     qemu_set_info_str(&s->nc, "initializing");
316 
317     s->listen_ioc = QIO_CHANNEL(listen_sioc);
318     qio_channel_socket_listen_async(listen_sioc, addr, 0,
319                                     net_stream_server_listening, s,
320                                     NULL, NULL);
321 
322     return 0;
323 }
324 
325 static void net_stream_client_connected(QIOTask *task, gpointer opaque)
326 {
327     NetStreamState *s = opaque;
328     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
329     SocketAddress *addr;
330     gchar *uri;
331     int ret;
332     Error *err = NULL;
333 
334     if (qio_task_propagate_error(task, &err)) {
335         qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
336         error_free(err);
337         goto error;
338     }
339 
340     addr = qio_channel_socket_get_remote_address(sioc, NULL);
341     g_assert(addr != NULL);
342     uri = socket_uri(addr);
343     qemu_set_info_str(&s->nc, "%s", uri);
344     g_free(uri);
345 
346     ret = qemu_socket_try_set_nonblock(sioc->fd);
347     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
348         qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
349                           addr->u.fd.str, -ret);
350         qapi_free_SocketAddress(addr);
351         goto error;
352     }
353     g_assert(ret == 0);
354 
355     net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
356 
357     /* Disable Nagle algorithm on TCP sockets to reduce latency */
358     qio_channel_set_delay(s->ioc, false);
359 
360     s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
361                                             s, NULL);
362     s->nc.link_down = false;
363     qapi_event_send_netdev_stream_connected(s->nc.name, addr);
364     qapi_free_SocketAddress(addr);
365 
366     return;
367 error:
368     object_unref(OBJECT(s->ioc));
369     s->ioc = NULL;
370     net_stream_arm_reconnect(s);
371 }
372 
373 static gboolean net_stream_reconnect(gpointer data)
374 {
375     NetStreamState *s = data;
376     QIOChannelSocket *sioc;
377 
378     s->timer_tag = 0;
379 
380     sioc = qio_channel_socket_new();
381     s->ioc = QIO_CHANNEL(sioc);
382     qio_channel_socket_connect_async(sioc, s->addr,
383                                      net_stream_client_connected, s,
384                                      NULL, NULL);
385     return G_SOURCE_REMOVE;
386 }
387 
388 static void net_stream_arm_reconnect(NetStreamState *s)
389 {
390     if (s->reconnect && s->timer_tag == 0) {
391         qemu_set_info_str(&s->nc, "connecting");
392         s->timer_tag = g_timeout_add_seconds(s->reconnect,
393                                              net_stream_reconnect, s);
394     }
395 }
396 
397 static int net_stream_client_init(NetClientState *peer,
398                                   const char *model,
399                                   const char *name,
400                                   SocketAddress *addr,
401                                   uint32_t reconnect,
402                                   Error **errp)
403 {
404     NetStreamState *s;
405     NetClientState *nc;
406     QIOChannelSocket *sioc = qio_channel_socket_new();
407 
408     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
409     s = DO_UPCAST(NetStreamState, nc, nc);
410     qemu_set_info_str(&s->nc, "connecting");
411 
412     s->ioc = QIO_CHANNEL(sioc);
413     s->nc.link_down = true;
414 
415     s->reconnect = reconnect;
416     if (reconnect) {
417         s->addr = QAPI_CLONE(SocketAddress, addr);
418     }
419     qio_channel_socket_connect_async(sioc, addr,
420                                      net_stream_client_connected, s,
421                                      NULL, NULL);
422 
423     return 0;
424 }
425 
426 int net_init_stream(const Netdev *netdev, const char *name,
427                     NetClientState *peer, Error **errp)
428 {
429     const NetdevStreamOptions *sock;
430 
431     assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
432     sock = &netdev->u.stream;
433 
434     if (!sock->has_server || !sock->server) {
435         return net_stream_client_init(peer, "stream", name, sock->addr,
436                                       sock->has_reconnect ? sock->reconnect : 0,
437                                       errp);
438     }
439     if (sock->has_reconnect) {
440         error_setg(errp, "'reconnect' option is incompatible with "
441                          "socket in server mode");
442         return -1;
443     }
444     return net_stream_server_init(peer, "stream", name, sock->addr, errp);
445 }
446