xref: /qemu/net/af-xdp.c (revision ac132d05)
1 /*
2  * AF_XDP network backend.
3  *
4  * Copyright (c) 2023 Red Hat, Inc.
5  *
6  * Authors:
7  *  Ilya Maximets <i.maximets@ovn.org>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 
14 #include "qemu/osdep.h"
15 #include <bpf/bpf.h>
16 #include <inttypes.h>
17 #include <linux/if_link.h>
18 #include <linux/if_xdp.h>
19 #include <net/if.h>
20 #include <xdp/xsk.h>
21 
22 #include "clients.h"
23 #include "monitor/monitor.h"
24 #include "net/net.h"
25 #include "qapi/error.h"
26 #include "qemu/cutils.h"
27 #include "qemu/error-report.h"
28 #include "qemu/iov.h"
29 #include "qemu/main-loop.h"
30 #include "qemu/memalign.h"
31 
32 
/*
 * Per-queue state of the af-xdp backend.  One instance is created for
 * every queue by net_init_af_xdp().
 */
typedef struct AFXDPState {
    /* Generic net client; DO_UPCAST() maps it back to AFXDPState. */
    NetClientState       nc;

    /* AF_XDP socket handle and its four rings. */
    struct xsk_socket    *xsk;
    struct xsk_ring_cons rx;    /* Consumed in af_xdp_send(). */
    struct xsk_ring_prod tx;    /* Produced in af_xdp_receive(). */
    struct xsk_ring_cons cq;    /* Drained in af_xdp_complete_tx(). */
    struct xsk_ring_prod fq;    /* Filled in af_xdp_fq_refill(). */

    /* Interface name and index this queue is attached to. */
    char                 ifname[IFNAMSIZ];
    int                  ifindex;
    /* Whether the read / write fd handlers are currently registered. */
    bool                 read_poll;
    bool                 write_poll;
    /* Submitted to the tx ring, but not yet seen on the cq ring. */
    uint32_t             outstanding_tx;

    /* Stack of free umem frame addresses; n_pool is the current depth. */
    uint64_t             *pool;
    uint32_t             n_pool;
    /* Memory area registered as the umem, and the umem handle itself. */
    char                 *buffer;
    struct xsk_umem      *umem;

    /* Queue count and XDP flags consulted by af_xdp_cleanup(). */
    uint32_t             n_queues;
    uint32_t             xdp_flags;
    /* Set from the 'inhibit' option: XDP program load is inhibited. */
    bool                 inhibit;
} AFXDPState;
57 
/*
 * Maximum number of Rx descriptors peeked by one af_xdp_send() call, and
 * the number of buffers it asks af_xdp_fq_refill() to put back afterwards.
 */
#define AF_XDP_BATCH_SIZE 64

/*
 * Forward declarations: both functions are installed as fd handlers by
 * af_xdp_update_fd_handler(), which is defined before them.
 */
static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);
62 
63 /* Set the event-loop handlers for the af-xdp backend. */
64 static void af_xdp_update_fd_handler(AFXDPState *s)
65 {
66     qemu_set_fd_handler(xsk_socket__fd(s->xsk),
67                         s->read_poll ? af_xdp_send : NULL,
68                         s->write_poll ? af_xdp_writable : NULL,
69                         s);
70 }
71 
72 /* Update the read handler. */
73 static void af_xdp_read_poll(AFXDPState *s, bool enable)
74 {
75     if (s->read_poll != enable) {
76         s->read_poll = enable;
77         af_xdp_update_fd_handler(s);
78     }
79 }
80 
81 /* Update the write handler. */
82 static void af_xdp_write_poll(AFXDPState *s, bool enable)
83 {
84     if (s->write_poll != enable) {
85         s->write_poll = enable;
86         af_xdp_update_fd_handler(s);
87     }
88 }
89 
90 static void af_xdp_poll(NetClientState *nc, bool enable)
91 {
92     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
93 
94     if (s->read_poll != enable || s->write_poll != enable) {
95         s->write_poll = enable;
96         s->read_poll  = enable;
97         af_xdp_update_fd_handler(s);
98     }
99 }
100 
101 static void af_xdp_complete_tx(AFXDPState *s)
102 {
103     uint32_t idx = 0;
104     uint32_t done, i;
105     uint64_t *addr;
106 
107     done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);
108 
109     for (i = 0; i < done; i++) {
110         addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
111         s->pool[s->n_pool++] = *addr;
112         s->outstanding_tx--;
113     }
114 
115     if (done) {
116         xsk_ring_cons__release(&s->cq, done);
117     }
118 }
119 
120 /*
121  * The fd_write() callback, invoked if the fd is marked as writable
122  * after a poll.
123  */
124 static void af_xdp_writable(void *opaque)
125 {
126     AFXDPState *s = opaque;
127 
128     /* Try to recover buffers that are already sent. */
129     af_xdp_complete_tx(s);
130 
131     /*
132      * Unregister the handler, unless we still have packets to transmit
133      * and kernel needs a wake up.
134      */
135     if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
136         af_xdp_write_poll(s, false);
137     }
138 
139     /* Flush any buffered packets. */
140     qemu_flush_queued_packets(&s->nc);
141 }
142 
143 static ssize_t af_xdp_receive(NetClientState *nc,
144                               const uint8_t *buf, size_t size)
145 {
146     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
147     struct xdp_desc *desc;
148     uint32_t idx;
149     void *data;
150 
151     /* Try to recover buffers that are already sent. */
152     af_xdp_complete_tx(s);
153 
154     if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
155         /* We can't transmit packet this size... */
156         return size;
157     }
158 
159     if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
160         /*
161          * Out of buffers or space in tx ring.  Poll until we can write.
162          * This will also kick the Tx, if it was waiting on CQ.
163          */
164         af_xdp_write_poll(s, true);
165         return 0;
166     }
167 
168     desc = xsk_ring_prod__tx_desc(&s->tx, idx);
169     desc->addr = s->pool[--s->n_pool];
170     desc->len = size;
171 
172     data = xsk_umem__get_data(s->buffer, desc->addr);
173     memcpy(data, buf, size);
174 
175     xsk_ring_prod__submit(&s->tx, 1);
176     s->outstanding_tx++;
177 
178     if (xsk_ring_prod__needs_wakeup(&s->tx)) {
179         af_xdp_write_poll(s, true);
180     }
181 
182     return size;
183 }
184 
185 /*
186  * Complete a previous send (backend --> guest) and enable the
187  * fd_read callback.
188  */
189 static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
190 {
191     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
192 
193     af_xdp_read_poll(s, true);
194 }
195 
196 static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
197 {
198     uint32_t i, idx = 0;
199 
200     /* Leave one packet for Tx, just in case. */
201     if (s->n_pool < n + 1) {
202         n = s->n_pool;
203     }
204 
205     if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
206         return;
207     }
208 
209     for (i = 0; i < n; i++) {
210         *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
211     }
212     xsk_ring_prod__submit(&s->fq, n);
213 
214     if (xsk_ring_prod__needs_wakeup(&s->fq)) {
215         /* Receive was blocked by not having enough buffers.  Wake it up. */
216         af_xdp_read_poll(s, true);
217     }
218 }
219 
220 static void af_xdp_send(void *opaque)
221 {
222     uint32_t i, n_rx, idx = 0;
223     AFXDPState *s = opaque;
224 
225     n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
226     if (!n_rx) {
227         return;
228     }
229 
230     for (i = 0; i < n_rx; i++) {
231         const struct xdp_desc *desc;
232         struct iovec iov;
233 
234         desc = xsk_ring_cons__rx_desc(&s->rx, idx++);
235 
236         iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
237         iov.iov_len = desc->len;
238 
239         s->pool[s->n_pool++] = desc->addr;
240 
241         if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
242                                      af_xdp_send_completed)) {
243             /*
244              * The peer does not receive anymore.  Packet is queued, stop
245              * reading from the backend until af_xdp_send_completed().
246              */
247             af_xdp_read_poll(s, false);
248 
249             /* Return unused descriptors to not break the ring cache. */
250             xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
251             n_rx = i + 1;
252             break;
253         }
254     }
255 
256     /* Release actually sent descriptors and try to re-fill. */
257     xsk_ring_cons__release(&s->rx, n_rx);
258     af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
259 }
260 
261 /* Flush and close. */
262 static void af_xdp_cleanup(NetClientState *nc)
263 {
264     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
265 
266     qemu_purge_queued_packets(nc);
267 
268     af_xdp_poll(nc, false);
269 
270     xsk_socket__delete(s->xsk);
271     s->xsk = NULL;
272     g_free(s->pool);
273     s->pool = NULL;
274     xsk_umem__delete(s->umem);
275     s->umem = NULL;
276     qemu_vfree(s->buffer);
277     s->buffer = NULL;
278 
279     /* Remove the program if it's the last open queue. */
280     if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
281         && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
282         fprintf(stderr,
283                 "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
284                 s->ifname, s->ifindex);
285     }
286 }
287 
288 static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
289 {
290     struct xsk_umem_config config = {
291         .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
292         .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
293         .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
294         .frame_headroom = 0,
295     };
296     uint64_t n_descs;
297     uint64_t size;
298     int64_t i;
299     int ret;
300 
301     /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
302     n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
303                + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
304     size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;
305 
306     s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
307     memset(s->buffer, 0, size);
308 
309     if (sock_fd < 0) {
310         ret = xsk_umem__create(&s->umem, s->buffer, size,
311                                &s->fq, &s->cq, &config);
312     } else {
313         ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
314                                        &s->fq, &s->cq, &config);
315     }
316 
317     if (ret) {
318         qemu_vfree(s->buffer);
319         error_setg_errno(errp, errno,
320                          "failed to create umem for %s queue_index: %d",
321                          s->ifname, s->nc.queue_index);
322         return -1;
323     }
324 
325     s->pool = g_new(uint64_t, n_descs);
326     /* Fill the pool in the opposite order, because it's a LIFO queue. */
327     for (i = n_descs; i >= 0; i--) {
328         s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
329     }
330     s->n_pool = n_descs;
331 
332     af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);
333 
334     return 0;
335 }
336 
337 static int af_xdp_socket_create(AFXDPState *s,
338                                 const NetdevAFXDPOptions *opts, Error **errp)
339 {
340     struct xsk_socket_config cfg = {
341         .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
342         .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
343         .libxdp_flags = 0,
344         .bind_flags = XDP_USE_NEED_WAKEUP,
345         .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
346     };
347     int queue_id, error = 0;
348 
349     s->inhibit = opts->has_inhibit && opts->inhibit;
350     if (s->inhibit) {
351         cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
352     }
353 
354     if (opts->has_force_copy && opts->force_copy) {
355         cfg.bind_flags |= XDP_COPY;
356     }
357 
358     queue_id = s->nc.queue_index;
359     if (opts->has_start_queue && opts->start_queue > 0) {
360         queue_id += opts->start_queue;
361     }
362 
363     if (opts->has_mode) {
364         /* Specific mode requested. */
365         cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
366                          ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
367         if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
368                                s->umem, &s->rx, &s->tx, &cfg)) {
369             error = errno;
370         }
371     } else {
372         /* No mode requested, try native first. */
373         cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;
374 
375         if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
376                                s->umem, &s->rx, &s->tx, &cfg)) {
377             /* Can't use native mode, try skb. */
378             cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
379             cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;
380 
381             if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
382                                    s->umem, &s->rx, &s->tx, &cfg)) {
383                 error = errno;
384             }
385         }
386     }
387 
388     if (error) {
389         error_setg_errno(errp, error,
390                          "failed to create AF_XDP socket for %s queue_id: %d",
391                          s->ifname, queue_id);
392         return -1;
393     }
394 
395     s->xdp_flags = cfg.xdp_flags;
396 
397     return 0;
398 }
399 
/*
 * NetClientInfo methods.
 *
 * This table is passed to qemu_new_net_client() for every queue;
 * .size makes the allocated client as large as AFXDPState.
 */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,     /* Peer --> AF_XDP socket (Tx). */
    .poll = af_xdp_poll,           /* Enable/disable both fd handlers. */
    .cleanup = af_xdp_cleanup,     /* Release per-queue resources. */
};
408 
409 static int *parse_socket_fds(const char *sock_fds_str,
410                              int64_t n_expected, Error **errp)
411 {
412     gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
413     int64_t i, n_sock_fds = g_strv_length(substrings);
414     int *sock_fds = NULL;
415 
416     if (n_sock_fds != n_expected) {
417         error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
418                    n_expected, n_sock_fds);
419         goto exit;
420     }
421 
422     sock_fds = g_new(int, n_sock_fds);
423 
424     for (i = 0; i < n_sock_fds; i++) {
425         sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
426         if (sock_fds[i] < 0) {
427             g_free(sock_fds);
428             sock_fds = NULL;
429             goto exit;
430         }
431     }
432 
433 exit:
434     g_strfreev(substrings);
435     return sock_fds;
436 }
437 
438 /*
439  * The exported init function.
440  *
441  * ... -netdev af-xdp,ifname="..."
442  */
443 int net_init_af_xdp(const Netdev *netdev,
444                     const char *name, NetClientState *peer, Error **errp)
445 {
446     const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
447     NetClientState *nc, *nc0 = NULL;
448     unsigned int ifindex;
449     uint32_t prog_id = 0;
450     int *sock_fds = NULL;
451     int64_t i, queues;
452     Error *err = NULL;
453     AFXDPState *s;
454 
455     ifindex = if_nametoindex(opts->ifname);
456     if (!ifindex) {
457         error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
458                          opts->ifname);
459         return -1;
460     }
461 
462     queues = opts->has_queues ? opts->queues : 1;
463     if (queues < 1) {
464         error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
465                    queues, opts->ifname);
466         return -1;
467     }
468 
469     if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
470         error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
471         return -1;
472     }
473 
474     if (opts->sock_fds) {
475         sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
476         if (!sock_fds) {
477             return -1;
478         }
479     }
480 
481     for (i = 0; i < queues; i++) {
482         nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
483         qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
484         nc->queue_index = i;
485 
486         if (!nc0) {
487             nc0 = nc;
488         }
489 
490         s = DO_UPCAST(AFXDPState, nc, nc);
491 
492         pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
493         s->ifindex = ifindex;
494         s->n_queues = queues;
495 
496         if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
497             || af_xdp_socket_create(s, opts, errp)) {
498             /* Make sure the XDP program will be removed. */
499             s->n_queues = i;
500             error_propagate(errp, err);
501             goto err;
502         }
503     }
504 
505     if (nc0) {
506         s = DO_UPCAST(AFXDPState, nc, nc0);
507         if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
508             error_setg_errno(errp, errno,
509                              "no XDP program loaded on '%s', ifindex: %d",
510                              s->ifname, s->ifindex);
511             goto err;
512         }
513     }
514 
515     af_xdp_read_poll(s, true); /* Initially only poll for reads. */
516 
517     return 0;
518 
519 err:
520     g_free(sock_fds);
521     if (nc0) {
522         qemu_del_net_client(nc0);
523     }
524 
525     return -1;
526 }
527