xref: /qemu/tests/vhost-user-bridge.c (revision 33848cee)
1 /*
2  * Vhost User Bridge
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * Authors:
7  *  Victor Kaplansky <victork@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 /*
14  * TODO:
15  *     - main should get parameters from the command line.
16  *     - implement all request handlers. Still not implemented:
17  *          vubr_get_queue_num_exec()
18  *          vubr_send_rarp_exec()
19  *     - test for broken requests and virtqueue.
20  *     - implement features defined by Virtio 1.0 spec.
21  *     - support mergeable buffers and indirect descriptors.
22  *     - implement clean shutdown.
23  *     - implement non-blocking writes to UDP backend.
24  *     - implement polling strategy.
25  *     - implement clean starting/stopping of vq processing
26  *     - implement clean starting/stopping of used and buffers
27  *       dirty page logging.
28  */
29 
30 #define _FILE_OFFSET_BITS 64
31 
32 #include "qemu/osdep.h"
33 #include "qemu/iov.h"
34 #include "standard-headers/linux/virtio_net.h"
35 #include "contrib/libvhost-user/libvhost-user.h"
36 
/* Compile-time switch for debug output; a non-zero value enables it. */
#define VHOST_USER_BRIDGE_DEBUG 1

/*
 * printf() the arguments when VHOST_USER_BRIDGE_DEBUG is non-zero.
 * The do/while (0) wrapper lets the macro be used like a single statement.
 */
#define DPRINT(...) \
    do { \
        if (VHOST_USER_BRIDGE_DEBUG) { \
            printf(__VA_ARGS__); \
        } \
    } while (0)
45 
/*
 * Dispatcher callback: called with the socket that became readable and
 * the context pointer that was registered together with it.
 */
typedef void (*CallbackFunc)(int sock, void *ctx);

/* Handler registered for one socket in a Dispatcher. */
typedef struct Event {
    void *ctx;              /* opaque pointer handed back to callback */
    CallbackFunc callback;  /* run by dispatcher_wait() for a ready socket */
} Event;
52 
/* select()-based event dispatcher state. */
typedef struct Dispatcher {
    int max_sock;               /* highest descriptor added, -1 after init */
    fd_set fdset;               /* descriptors currently being watched */
    Event events[FD_SETSIZE];   /* handlers, indexed by descriptor number */
} Dispatcher;
58 
/* State of one bridge instance. */
typedef struct VubrDev {
    VuDev vudev;                /* embedded libvhost-user device; callbacks
                                 * get back to VubrDev via container_of() */
    Dispatcher dispatcher;      /* event loop state for the watched sockets */
    int backend_udp_sock;       /* UDP socket bound to the local address */
    struct sockaddr_in backend_udp_dest; /* address TX datagrams are sent to */
    int hdrlen;                 /* virtio-net header length; set to 10 or 12
                                 * by vubr_set_features() */
    int sock;                   /* UNIX socket created by vubr_new() */
    int ready;                  /* not referenced by the code in this file */
    int quit;                   /* set by vubr_panic(); ends vubr_run() */
} VubrDev;
69 
/*
 * Report a fatal error and terminate the process.
 *
 * @s is handed to perror(), so stderr receives @s followed by the text for
 * the current errno.  The process then exits with status 1; this function
 * never returns to its caller.
 */
static void
vubr_die(const char *s)
{
    perror(s);

    exit(1);
}
76 
77 static int
78 dispatcher_init(Dispatcher *dispr)
79 {
80     FD_ZERO(&dispr->fdset);
81     dispr->max_sock = -1;
82     return 0;
83 }
84 
85 static int
86 dispatcher_add(Dispatcher *dispr, int sock, void *ctx, CallbackFunc cb)
87 {
88     if (sock >= FD_SETSIZE) {
89         fprintf(stderr,
90                 "Error: Failed to add new event. sock %d should be less than %d\n",
91                 sock, FD_SETSIZE);
92         return -1;
93     }
94 
95     dispr->events[sock].ctx = ctx;
96     dispr->events[sock].callback = cb;
97 
98     FD_SET(sock, &dispr->fdset);
99     if (sock > dispr->max_sock) {
100         dispr->max_sock = sock;
101     }
102     DPRINT("Added sock %d for watching. max_sock: %d\n",
103            sock, dispr->max_sock);
104     return 0;
105 }
106 
107 static int
108 dispatcher_remove(Dispatcher *dispr, int sock)
109 {
110     if (sock >= FD_SETSIZE) {
111         fprintf(stderr,
112                 "Error: Failed to remove event. sock %d should be less than %d\n",
113                 sock, FD_SETSIZE);
114         return -1;
115     }
116 
117     FD_CLR(sock, &dispr->fdset);
118     DPRINT("Sock %d removed from dispatcher watch.\n", sock);
119     return 0;
120 }
121 
122 /* timeout in us */
123 static int
124 dispatcher_wait(Dispatcher *dispr, uint32_t timeout)
125 {
126     struct timeval tv;
127     tv.tv_sec = timeout / 1000000;
128     tv.tv_usec = timeout % 1000000;
129 
130     fd_set fdset = dispr->fdset;
131 
132     /* wait until some of sockets become readable. */
133     int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv);
134 
135     if (rc == -1) {
136         vubr_die("select");
137     }
138 
139     /* Timeout */
140     if (rc == 0) {
141         return 0;
142     }
143 
144     /* Now call callback for every ready socket. */
145 
146     int sock;
147     for (sock = 0; sock < dispr->max_sock + 1; sock++) {
148         /* The callback on a socket can remove other sockets from the
149          * dispatcher, thus we have to check that the socket is
150          * still not removed from dispatcher's list
151          */
152         if (FD_ISSET(sock, &fdset) && FD_ISSET(sock, &dispr->fdset)) {
153             Event *e = &dispr->events[sock];
154             e->callback(sock, e->ctx);
155         }
156     }
157 
158     return 0;
159 }
160 
161 static void
162 vubr_handle_tx(VuDev *dev, int qidx)
163 {
164     VuVirtq *vq = vu_get_queue(dev, qidx);
165     VubrDev *vubr = container_of(dev, VubrDev, vudev);
166     int hdrlen = vubr->hdrlen;
167     VuVirtqElement *elem = NULL;
168 
169     assert(qidx % 2);
170 
171     for (;;) {
172         ssize_t ret;
173         unsigned int out_num;
174         struct iovec sg[VIRTQUEUE_MAX_SIZE], *out_sg;
175 
176         elem = vu_queue_pop(dev, vq, sizeof(VuVirtqElement));
177         if (!elem) {
178             break;
179         }
180 
181         out_num = elem->out_num;
182         out_sg = elem->out_sg;
183         if (out_num < 1) {
184             fprintf(stderr, "virtio-net header not in first element\n");
185             break;
186         }
187         if (VHOST_USER_BRIDGE_DEBUG) {
188             iov_hexdump(out_sg, out_num, stderr, "TX:", 1024);
189         }
190 
191         if (hdrlen) {
192             unsigned sg_num = iov_copy(sg, ARRAY_SIZE(sg),
193                                        out_sg, out_num,
194                                        hdrlen, -1);
195             out_num = sg_num;
196             out_sg = sg;
197         }
198 
199         struct msghdr msg = {
200             .msg_name = (struct sockaddr *) &vubr->backend_udp_dest,
201             .msg_namelen = sizeof(struct sockaddr_in),
202             .msg_iov = out_sg,
203             .msg_iovlen = out_num,
204         };
205         do {
206             ret = sendmsg(vubr->backend_udp_sock, &msg, 0);
207         } while (ret == -1 && (errno == EAGAIN || errno == EINTR));
208 
209         if (ret == -1) {
210             vubr_die("sendmsg()");
211         }
212 
213         vu_queue_push(dev, vq, elem, 0);
214         vu_queue_notify(dev, vq);
215 
216         free(elem);
217         elem = NULL;
218     }
219 
220     free(elem);
221 }
222 
/*
 * Give back @bytes bytes that were earlier cut off the front of a vector.
 *
 * @front: first entry of the original vector
 * @iov:   entry the vector currently starts at (@front or a later entry)
 * @bytes: number of bytes that were removed from the front
 *
 * The lengths of the entries in [@front, @iov) are subtracted from @bytes;
 * the remainder is the amount by which @iov itself was shortened, so its
 * base is moved back and its length grown by that much.
 */
static void
iov_restore_front(struct iovec *front, struct iovec *iov, size_t bytes)
{
    struct iovec *cur;

    /* Walk up to @iov; the cursor, not @front, must drive the loop. */
    for (cur = front; cur != iov; cur++) {
        assert(bytes >= cur->iov_len);
        bytes -= cur->iov_len;
    }

    cur->iov_base = (char *) cur->iov_base - bytes;
    cur->iov_len += bytes;
}
235 
/*
 * Shorten the entry of @iov in which byte offset @bytes falls so that the
 * entries up to and including it describe exactly @bytes bytes.
 *
 * @iov:   vector to adjust
 * @iovc:  number of entries in @iov
 * @bytes: number of bytes to keep
 *
 * Entries after the shortened one are left untouched.  If @bytes equals
 * the total length of the vector nothing needs shortening and the vector
 * is left as is.  Asking for more bytes than the vector holds is a
 * programming error and trips the assertion.
 */
static void
iov_truncate(struct iovec *iov, unsigned iovc, size_t bytes)
{
    unsigned i;

    for (i = 0; i < iovc; i++, iov++) {
        if (bytes < iov->iov_len) {
            iov->iov_len = bytes;
            return;
        }

        bytes -= iov->iov_len;
    }

    /* Nothing left over means @bytes was an exact fit, which is fine. */
    assert(bytes == 0 && "couldn't truncate iov");
}
252 
253 static void
254 vubr_backend_recv_cb(int sock, void *ctx)
255 {
256     VubrDev *vubr = (VubrDev *) ctx;
257     VuDev *dev = &vubr->vudev;
258     VuVirtq *vq = vu_get_queue(dev, 0);
259     VuVirtqElement *elem = NULL;
260     struct iovec mhdr_sg[VIRTQUEUE_MAX_SIZE];
261     struct virtio_net_hdr_mrg_rxbuf mhdr;
262     unsigned mhdr_cnt = 0;
263     int hdrlen = vubr->hdrlen;
264     int i = 0;
265     struct virtio_net_hdr hdr = {
266         .flags = 0,
267         .gso_type = VIRTIO_NET_HDR_GSO_NONE
268     };
269 
270     DPRINT("\n\n   ***   IN UDP RECEIVE CALLBACK    ***\n\n");
271     DPRINT("    hdrlen = %d\n", hdrlen);
272 
273     if (!vu_queue_enabled(dev, vq) ||
274         !vu_queue_avail_bytes(dev, vq, hdrlen, 0)) {
275         DPRINT("Got UDP packet, but no available descriptors on RX virtq.\n");
276         return;
277     }
278 
279     do {
280         struct iovec *sg;
281         ssize_t ret, total = 0;
282         unsigned int num;
283 
284         elem = vu_queue_pop(dev, vq, sizeof(VuVirtqElement));
285         if (!elem) {
286             break;
287         }
288 
289         if (elem->in_num < 1) {
290             fprintf(stderr, "virtio-net contains no in buffers\n");
291             break;
292         }
293 
294         sg = elem->in_sg;
295         num = elem->in_num;
296         if (i == 0) {
297             if (hdrlen == 12) {
298                 mhdr_cnt = iov_copy(mhdr_sg, ARRAY_SIZE(mhdr_sg),
299                                     sg, elem->in_num,
300                                     offsetof(typeof(mhdr), num_buffers),
301                                     sizeof(mhdr.num_buffers));
302             }
303             iov_from_buf(sg, elem->in_num, 0, &hdr, sizeof hdr);
304             total += hdrlen;
305             assert(iov_discard_front(&sg, &num, hdrlen) == hdrlen);
306         }
307 
308         struct msghdr msg = {
309             .msg_name = (struct sockaddr *) &vubr->backend_udp_dest,
310             .msg_namelen = sizeof(struct sockaddr_in),
311             .msg_iov = sg,
312             .msg_iovlen = elem->in_num,
313             .msg_flags = MSG_DONTWAIT,
314         };
315         do {
316             ret = recvmsg(vubr->backend_udp_sock, &msg, 0);
317         } while (ret == -1 && (errno == EINTR));
318 
319         if (i == 0) {
320             iov_restore_front(elem->in_sg, sg, hdrlen);
321         }
322 
323         if (ret == -1) {
324             if (errno == EWOULDBLOCK) {
325                 vu_queue_rewind(dev, vq, 1);
326                 break;
327             }
328 
329             vubr_die("recvmsg()");
330         }
331 
332         total += ret;
333         iov_truncate(elem->in_sg, elem->in_num, total);
334         vu_queue_fill(dev, vq, elem, total, i++);
335 
336         free(elem);
337         elem = NULL;
338     } while (false); /* could loop if DONTWAIT worked? */
339 
340     if (mhdr_cnt) {
341         mhdr.num_buffers = i;
342         iov_from_buf(mhdr_sg, mhdr_cnt,
343                      0,
344                      &mhdr.num_buffers, sizeof mhdr.num_buffers);
345     }
346 
347     vu_queue_flush(dev, vq, i);
348     vu_queue_notify(dev, vq);
349 
350     free(elem);
351 }
352 
353 static void
354 vubr_receive_cb(int sock, void *ctx)
355 {
356     VubrDev *vubr = (VubrDev *)ctx;
357 
358     if (!vu_dispatch(&vubr->vudev)) {
359         fprintf(stderr, "Error while dispatching\n");
360     }
361 }
362 
/* What vubr_set_watch() records so that watch_cb() can call the library. */
typedef struct WatchData {
    VuDev *dev;         /* device passed as first argument to cb */
    vu_watch_cb cb;     /* library callback to run when the fd is ready */
    void *data;         /* opaque pointer passed as last argument to cb */
} WatchData;
368 
369 static void
370 watch_cb(int sock, void *ctx)
371 {
372     struct WatchData *wd = ctx;
373 
374     wd->cb(wd->dev, VU_WATCH_IN, wd->data);
375 }
376 
377 static void
378 vubr_set_watch(VuDev *dev, int fd, int condition,
379                vu_watch_cb cb, void *data)
380 {
381     VubrDev *vubr = container_of(dev, VubrDev, vudev);
382     static WatchData watches[FD_SETSIZE];
383     struct WatchData *wd = &watches[fd];
384 
385     wd->cb = cb;
386     wd->data = data;
387     wd->dev = dev;
388     dispatcher_add(&vubr->dispatcher, fd, wd, watch_cb);
389 }
390 
391 static void
392 vubr_remove_watch(VuDev *dev, int fd)
393 {
394     VubrDev *vubr = container_of(dev, VubrDev, vudev);
395 
396     dispatcher_remove(&vubr->dispatcher, fd);
397 }
398 
399 static int
400 vubr_send_rarp_exec(VuDev *dev, VhostUserMsg *vmsg)
401 {
402     DPRINT("Function %s() not implemented yet.\n", __func__);
403     return 0;
404 }
405 
406 static int
407 vubr_process_msg(VuDev *dev, VhostUserMsg *vmsg, int *do_reply)
408 {
409     switch (vmsg->request) {
410     case VHOST_USER_SEND_RARP:
411         *do_reply = vubr_send_rarp_exec(dev, vmsg);
412         return 1;
413     default:
414         /* let the library handle the rest */
415         return 0;
416     }
417 
418     return 0;
419 }
420 
421 static void
422 vubr_set_features(VuDev *dev, uint64_t features)
423 {
424     VubrDev *vubr = container_of(dev, VubrDev, vudev);
425 
426     if ((features & (1ULL << VIRTIO_F_VERSION_1)) ||
427         (features & (1ULL << VIRTIO_NET_F_MRG_RXBUF))) {
428         vubr->hdrlen = 12;
429     } else {
430         vubr->hdrlen = 10;
431     }
432 }
433 
434 static uint64_t
435 vubr_get_features(VuDev *dev)
436 {
437     return 1ULL << VIRTIO_NET_F_GUEST_ANNOUNCE |
438         1ULL << VIRTIO_NET_F_MRG_RXBUF;
439 }
440 
441 static void
442 vubr_queue_set_started(VuDev *dev, int qidx, bool started)
443 {
444     VuVirtq *vq = vu_get_queue(dev, qidx);
445 
446     if (qidx % 2 == 1) {
447         vu_set_queue_handler(dev, vq, started ? vubr_handle_tx : NULL);
448     }
449 }
450 
451 static void
452 vubr_panic(VuDev *dev, const char *msg)
453 {
454     VubrDev *vubr = container_of(dev, VubrDev, vudev);
455 
456     fprintf(stderr, "PANIC: %s\n", msg);
457 
458     dispatcher_remove(&vubr->dispatcher, dev->sock);
459     vubr->quit = 1;
460 }
461 
462 static const VuDevIface vuiface = {
463     .get_features = vubr_get_features,
464     .set_features = vubr_set_features,
465     .process_msg = vubr_process_msg,
466     .queue_set_started = vubr_queue_set_started,
467 };
468 
469 static void
470 vubr_accept_cb(int sock, void *ctx)
471 {
472     VubrDev *dev = (VubrDev *)ctx;
473     int conn_fd;
474     struct sockaddr_un un;
475     socklen_t len = sizeof(un);
476 
477     conn_fd = accept(sock, (struct sockaddr *) &un, &len);
478     if (conn_fd == -1) {
479         vubr_die("accept()");
480     }
481     DPRINT("Got connection from remote peer on sock %d\n", conn_fd);
482 
483     vu_init(&dev->vudev,
484             conn_fd,
485             vubr_panic,
486             vubr_set_watch,
487             vubr_remove_watch,
488             &vuiface);
489 
490     dispatcher_add(&dev->dispatcher, conn_fd, ctx, vubr_receive_cb);
491     dispatcher_remove(&dev->dispatcher, sock);
492 }
493 
494 static VubrDev *
495 vubr_new(const char *path, bool client)
496 {
497     VubrDev *dev = (VubrDev *) calloc(1, sizeof(VubrDev));
498     struct sockaddr_un un;
499     CallbackFunc cb;
500     size_t len;
501 
502     /* Get a UNIX socket. */
503     dev->sock = socket(AF_UNIX, SOCK_STREAM, 0);
504     if (dev->sock == -1) {
505         vubr_die("socket");
506     }
507 
508     un.sun_family = AF_UNIX;
509     strcpy(un.sun_path, path);
510     len = sizeof(un.sun_family) + strlen(path);
511 
512     if (!client) {
513         unlink(path);
514 
515         if (bind(dev->sock, (struct sockaddr *) &un, len) == -1) {
516             vubr_die("bind");
517         }
518 
519         if (listen(dev->sock, 1) == -1) {
520             vubr_die("listen");
521         }
522         cb = vubr_accept_cb;
523 
524         DPRINT("Waiting for connections on UNIX socket %s ...\n", path);
525     } else {
526         if (connect(dev->sock, (struct sockaddr *)&un, len) == -1) {
527             vubr_die("connect");
528         }
529         vu_init(&dev->vudev,
530                 dev->sock,
531                 vubr_panic,
532                 vubr_set_watch,
533                 vubr_remove_watch,
534                 &vuiface);
535         cb = vubr_receive_cb;
536     }
537 
538     dispatcher_init(&dev->dispatcher);
539 
540     dispatcher_add(&dev->dispatcher, dev->sock, (void *)dev, cb);
541 
542     return dev;
543 }
544 
/*
 * Store the IPv4 address named by @host in @saddr->sin_addr.
 *
 * A @host that starts with a digit is parsed with inet_aton(); anything
 * else is resolved with gethostbyname().  Only sin_addr is written; the
 * other members of @saddr are left alone.
 *
 * Exits with status 1 when @host cannot be parsed or resolved, or when
 * the resolver does not return an IPv4 address.
 */
static void
vubr_set_host(struct sockaddr_in *saddr, const char *host)
{
    /* <ctype.h> functions need an unsigned char value, not a plain char. */
    if (isdigit((unsigned char) host[0])) {
        if (!inet_aton(host, &saddr->sin_addr)) {
            fprintf(stderr, "inet_aton() failed.\n");
            exit(1);
        }
    } else {
        struct hostent *he = gethostbyname(host);

        if (!he) {
            fprintf(stderr, "gethostbyname() failed.\n");
            exit(1);
        }
        /* Make sure the first address really is a struct in_addr. */
        if (he->h_addrtype != AF_INET ||
            (size_t) he->h_length != sizeof(saddr->sin_addr) ||
            !he->h_addr_list[0]) {
            fprintf(stderr, "gethostbyname() returned no IPv4 address.\n");
            exit(1);
        }
        /* memcpy() avoids assuming the resolver's buffer is aligned. */
        memcpy(&saddr->sin_addr, he->h_addr_list[0],
               sizeof(saddr->sin_addr));
    }
}
563 
564 static void
565 vubr_backend_udp_setup(VubrDev *dev,
566                        const char *local_host,
567                        const char *local_port,
568                        const char *remote_host,
569                        const char *remote_port)
570 {
571     int sock;
572     const char *r;
573 
574     int lport, rport;
575 
576     lport = strtol(local_port, (char **)&r, 0);
577     if (r == local_port) {
578         fprintf(stderr, "lport parsing failed.\n");
579         exit(1);
580     }
581 
582     rport = strtol(remote_port, (char **)&r, 0);
583     if (r == remote_port) {
584         fprintf(stderr, "rport parsing failed.\n");
585         exit(1);
586     }
587 
588     struct sockaddr_in si_local = {
589         .sin_family = AF_INET,
590         .sin_port = htons(lport),
591     };
592 
593     vubr_set_host(&si_local, local_host);
594 
595     /* setup destination for sends */
596     dev->backend_udp_dest = (struct sockaddr_in) {
597         .sin_family = AF_INET,
598         .sin_port = htons(rport),
599     };
600     vubr_set_host(&dev->backend_udp_dest, remote_host);
601 
602     sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
603     if (sock == -1) {
604         vubr_die("socket");
605     }
606 
607     if (bind(sock, (struct sockaddr *)&si_local, sizeof(si_local)) == -1) {
608         vubr_die("bind");
609     }
610 
611     dev->backend_udp_sock = sock;
612     dispatcher_add(&dev->dispatcher, sock, dev, vubr_backend_recv_cb);
613     DPRINT("Waiting for data from udp backend on %s:%d...\n",
614            local_host, lport);
615 }
616 
617 static void
618 vubr_run(VubrDev *dev)
619 {
620     while (!dev->quit) {
621         /* timeout 200ms */
622         dispatcher_wait(&dev->dispatcher, 200000);
623         /* Here one can try polling strategy. */
624     }
625 }
626 
/*
 * Return a newly allocated, NUL-terminated copy of the first @len bytes of
 * @s, or NULL when memory is exhausted.
 */
static char *
vubr_copy_str(const char *s, size_t len)
{
    char *copy = malloc(len + 1);

    if (copy) {
        memcpy(copy, s, len);
        copy[len] = '\0';
    }
    return copy;
}

/*
 * Split @buf of the form "host:port" at its first ':'.
 *
 * On success *@host and *@port are set to newly allocated copies of the
 * two parts (either may be empty) and 0 is returned; the caller owns the
 * copies.  @buf itself is not modified.
 *
 * Returns -1, leaving *@host and *@port untouched, when @buf contains no
 * ':' or memory cannot be allocated.
 */
static int
vubr_parse_host_port(const char **host, const char **port, const char *buf)
{
    const char *sep = strchr(buf, ':');
    char *host_copy;
    char *port_copy;

    if (!sep) {
        return -1;
    }

    host_copy = vubr_copy_str(buf, (size_t) (sep - buf));
    port_copy = vubr_copy_str(sep + 1, strlen(sep + 1));
    if (!host_copy || !port_copy) {
        free(host_copy);
        free(port_copy);
        return -1;
    }

    *host = host_copy;
    *port = port_copy;
    return 0;
}
640 
/* Built-in defaults; also shown in the usage text printed by main(). */
#define DEFAULT_UD_SOCKET "/tmp/vubr.sock"
#define DEFAULT_LHOST "127.0.0.1"
#define DEFAULT_LPORT "4444"
#define DEFAULT_RHOST "127.0.0.1"
#define DEFAULT_RPORT "5555"

/* Effective settings; main() replaces them from the -u, -l and -r options. */
static const char *ud_socket_path = DEFAULT_UD_SOCKET;
static const char *lhost = DEFAULT_LHOST;
static const char *lport = DEFAULT_LPORT;
static const char *rhost = DEFAULT_RHOST;
static const char *rport = DEFAULT_RPORT;
652 
653 int
654 main(int argc, char *argv[])
655 {
656     VubrDev *dev;
657     int opt;
658     bool client = false;
659 
660     while ((opt = getopt(argc, argv, "l:r:u:c")) != -1) {
661 
662         switch (opt) {
663         case 'l':
664             if (vubr_parse_host_port(&lhost, &lport, optarg) < 0) {
665                 goto out;
666             }
667             break;
668         case 'r':
669             if (vubr_parse_host_port(&rhost, &rport, optarg) < 0) {
670                 goto out;
671             }
672             break;
673         case 'u':
674             ud_socket_path = strdup(optarg);
675             break;
676         case 'c':
677             client = true;
678             break;
679         default:
680             goto out;
681         }
682     }
683 
684     DPRINT("ud socket: %s (%s)\n", ud_socket_path,
685            client ? "client" : "server");
686     DPRINT("local:     %s:%s\n", lhost, lport);
687     DPRINT("remote:    %s:%s\n", rhost, rport);
688 
689     dev = vubr_new(ud_socket_path, client);
690     if (!dev) {
691         return 1;
692     }
693 
694     vubr_backend_udp_setup(dev, lhost, lport, rhost, rport);
695     vubr_run(dev);
696 
697     vu_deinit(&dev->vudev);
698 
699     return 0;
700 
701 out:
702     fprintf(stderr, "Usage: %s ", argv[0]);
703     fprintf(stderr, "[-c] [-u ud_socket_path] [-l lhost:lport] [-r rhost:rport]\n");
704     fprintf(stderr, "\t-u path to unix doman socket. default: %s\n",
705             DEFAULT_UD_SOCKET);
706     fprintf(stderr, "\t-l local host and port. default: %s:%s\n",
707             DEFAULT_LHOST, DEFAULT_LPORT);
708     fprintf(stderr, "\t-r remote host and port. default: %s:%s\n",
709             DEFAULT_RHOST, DEFAULT_RPORT);
710     fprintf(stderr, "\t-c client mode\n");
711 
712     return 1;
713 }
714