xref: /qemu/net/af-xdp.c (revision e3404e01)
1 /*
2  * AF_XDP network backend.
3  *
4  * Copyright (c) 2023 Red Hat, Inc.
5  *
6  * Authors:
7  *  Ilya Maximets <i.maximets@ovn.org>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 
14 #include "qemu/osdep.h"
15 #include <bpf/bpf.h>
16 #include <linux/if_link.h>
17 #include <linux/if_xdp.h>
18 #include <net/if.h>
19 #include <xdp/xsk.h>
20 
21 #include "clients.h"
22 #include "monitor/monitor.h"
23 #include "net/net.h"
24 #include "qapi/error.h"
25 #include "qemu/cutils.h"
26 #include "qemu/error-report.h"
27 #include "qemu/iov.h"
28 #include "qemu/main-loop.h"
29 #include "qemu/memalign.h"
30 
31 
32 typedef struct AFXDPState {
33     NetClientState       nc;
34 
35     struct xsk_socket    *xsk;
36     struct xsk_ring_cons rx;
37     struct xsk_ring_prod tx;
38     struct xsk_ring_cons cq;
39     struct xsk_ring_prod fq;
40 
41     char                 ifname[IFNAMSIZ];
42     int                  ifindex;
43     bool                 read_poll;
44     bool                 write_poll;
45     uint32_t             outstanding_tx;
46 
47     uint64_t             *pool;
48     uint32_t             n_pool;
49     char                 *buffer;
50     struct xsk_umem      *umem;
51 
52     uint32_t             n_queues;
53     uint32_t             xdp_flags;
54     bool                 inhibit;
55 } AFXDPState;
56 
/* Upper bound on rx descriptors peeked and fq entries refilled per call. */
#define AF_XDP_BATCH_SIZE 64

/* fd handlers, declared early for af_xdp_update_fd_handler(). */
static void af_xdp_writable(void *opaque);
static void af_xdp_send(void *opaque);
61 
62 /* Set the event-loop handlers for the af-xdp backend. */
63 static void af_xdp_update_fd_handler(AFXDPState *s)
64 {
65     qemu_set_fd_handler(xsk_socket__fd(s->xsk),
66                         s->read_poll ? af_xdp_send : NULL,
67                         s->write_poll ? af_xdp_writable : NULL,
68                         s);
69 }
70 
71 /* Update the read handler. */
72 static void af_xdp_read_poll(AFXDPState *s, bool enable)
73 {
74     if (s->read_poll != enable) {
75         s->read_poll = enable;
76         af_xdp_update_fd_handler(s);
77     }
78 }
79 
80 /* Update the write handler. */
81 static void af_xdp_write_poll(AFXDPState *s, bool enable)
82 {
83     if (s->write_poll != enable) {
84         s->write_poll = enable;
85         af_xdp_update_fd_handler(s);
86     }
87 }
88 
89 static void af_xdp_poll(NetClientState *nc, bool enable)
90 {
91     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
92 
93     if (s->read_poll != enable || s->write_poll != enable) {
94         s->write_poll = enable;
95         s->read_poll  = enable;
96         af_xdp_update_fd_handler(s);
97     }
98 }
99 
100 static void af_xdp_complete_tx(AFXDPState *s)
101 {
102     uint32_t idx = 0;
103     uint32_t done, i;
104     uint64_t *addr;
105 
106     done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);
107 
108     for (i = 0; i < done; i++) {
109         addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
110         s->pool[s->n_pool++] = *addr;
111         s->outstanding_tx--;
112     }
113 
114     if (done) {
115         xsk_ring_cons__release(&s->cq, done);
116     }
117 }
118 
119 /*
120  * The fd_write() callback, invoked if the fd is marked as writable
121  * after a poll.
122  */
123 static void af_xdp_writable(void *opaque)
124 {
125     AFXDPState *s = opaque;
126 
127     /* Try to recover buffers that are already sent. */
128     af_xdp_complete_tx(s);
129 
130     /*
131      * Unregister the handler, unless we still have packets to transmit
132      * and kernel needs a wake up.
133      */
134     if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
135         af_xdp_write_poll(s, false);
136     }
137 
138     /* Flush any buffered packets. */
139     qemu_flush_queued_packets(&s->nc);
140 }
141 
142 static ssize_t af_xdp_receive(NetClientState *nc,
143                               const uint8_t *buf, size_t size)
144 {
145     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
146     struct xdp_desc *desc;
147     uint32_t idx;
148     void *data;
149 
150     /* Try to recover buffers that are already sent. */
151     af_xdp_complete_tx(s);
152 
153     if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
154         /* We can't transmit packet this size... */
155         return size;
156     }
157 
158     if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
159         /*
160          * Out of buffers or space in tx ring.  Poll until we can write.
161          * This will also kick the Tx, if it was waiting on CQ.
162          */
163         af_xdp_write_poll(s, true);
164         return 0;
165     }
166 
167     desc = xsk_ring_prod__tx_desc(&s->tx, idx);
168     desc->addr = s->pool[--s->n_pool];
169     desc->len = size;
170 
171     data = xsk_umem__get_data(s->buffer, desc->addr);
172     memcpy(data, buf, size);
173 
174     xsk_ring_prod__submit(&s->tx, 1);
175     s->outstanding_tx++;
176 
177     if (xsk_ring_prod__needs_wakeup(&s->tx)) {
178         af_xdp_write_poll(s, true);
179     }
180 
181     return size;
182 }
183 
184 /*
185  * Complete a previous send (backend --> guest) and enable the
186  * fd_read callback.
187  */
188 static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
189 {
190     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
191 
192     af_xdp_read_poll(s, true);
193 }
194 
195 static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
196 {
197     uint32_t i, idx = 0;
198 
199     /* Leave one packet for Tx, just in case. */
200     if (s->n_pool < n + 1) {
201         n = s->n_pool;
202     }
203 
204     if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
205         return;
206     }
207 
208     for (i = 0; i < n; i++) {
209         *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
210     }
211     xsk_ring_prod__submit(&s->fq, n);
212 
213     if (xsk_ring_prod__needs_wakeup(&s->fq)) {
214         /* Receive was blocked by not having enough buffers.  Wake it up. */
215         af_xdp_read_poll(s, true);
216     }
217 }
218 
219 static void af_xdp_send(void *opaque)
220 {
221     uint32_t i, n_rx, idx = 0;
222     AFXDPState *s = opaque;
223 
224     n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
225     if (!n_rx) {
226         return;
227     }
228 
229     for (i = 0; i < n_rx; i++) {
230         const struct xdp_desc *desc;
231         struct iovec iov;
232 
233         desc = xsk_ring_cons__rx_desc(&s->rx, idx++);
234 
235         iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
236         iov.iov_len = desc->len;
237 
238         s->pool[s->n_pool++] = desc->addr;
239 
240         if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
241                                      af_xdp_send_completed)) {
242             /*
243              * The peer does not receive anymore.  Packet is queued, stop
244              * reading from the backend until af_xdp_send_completed().
245              */
246             af_xdp_read_poll(s, false);
247 
248             /* Return unused descriptors to not break the ring cache. */
249             xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
250             n_rx = i + 1;
251             break;
252         }
253     }
254 
255     /* Release actually sent descriptors and try to re-fill. */
256     xsk_ring_cons__release(&s->rx, n_rx);
257     af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
258 }
259 
260 /* Flush and close. */
261 static void af_xdp_cleanup(NetClientState *nc)
262 {
263     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
264 
265     qemu_purge_queued_packets(nc);
266 
267     af_xdp_poll(nc, false);
268 
269     xsk_socket__delete(s->xsk);
270     s->xsk = NULL;
271     g_free(s->pool);
272     s->pool = NULL;
273     xsk_umem__delete(s->umem);
274     s->umem = NULL;
275     qemu_vfree(s->buffer);
276     s->buffer = NULL;
277 
278     /* Remove the program if it's the last open queue. */
279     if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
280         && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
281         fprintf(stderr,
282                 "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
283                 s->ifname, s->ifindex);
284     }
285 }
286 
287 static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
288 {
289     struct xsk_umem_config config = {
290         .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
291         .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
292         .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
293         .frame_headroom = 0,
294     };
295     uint64_t n_descs;
296     uint64_t size;
297     int64_t i;
298     int ret;
299 
300     /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
301     n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
302                + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
303     size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;
304 
305     s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
306     memset(s->buffer, 0, size);
307 
308     if (sock_fd < 0) {
309         ret = xsk_umem__create(&s->umem, s->buffer, size,
310                                &s->fq, &s->cq, &config);
311     } else {
312         ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
313                                        &s->fq, &s->cq, &config);
314     }
315 
316     if (ret) {
317         qemu_vfree(s->buffer);
318         error_setg_errno(errp, errno,
319                          "failed to create umem for %s queue_index: %d",
320                          s->ifname, s->nc.queue_index);
321         return -1;
322     }
323 
324     s->pool = g_new(uint64_t, n_descs);
325     /* Fill the pool in the opposite order, because it's a LIFO queue. */
326     for (i = n_descs; i >= 0; i--) {
327         s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
328     }
329     s->n_pool = n_descs;
330 
331     af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);
332 
333     return 0;
334 }
335 
336 static int af_xdp_socket_create(AFXDPState *s,
337                                 const NetdevAFXDPOptions *opts, Error **errp)
338 {
339     struct xsk_socket_config cfg = {
340         .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
341         .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
342         .libxdp_flags = 0,
343         .bind_flags = XDP_USE_NEED_WAKEUP,
344         .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
345     };
346     int queue_id, error = 0;
347 
348     s->inhibit = opts->has_inhibit && opts->inhibit;
349     if (s->inhibit) {
350         cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
351     }
352 
353     if (opts->has_force_copy && opts->force_copy) {
354         cfg.bind_flags |= XDP_COPY;
355     }
356 
357     queue_id = s->nc.queue_index;
358     if (opts->has_start_queue && opts->start_queue > 0) {
359         queue_id += opts->start_queue;
360     }
361 
362     if (opts->has_mode) {
363         /* Specific mode requested. */
364         cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
365                          ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
366         if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
367                                s->umem, &s->rx, &s->tx, &cfg)) {
368             error = errno;
369         }
370     } else {
371         /* No mode requested, try native first. */
372         cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;
373 
374         if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
375                                s->umem, &s->rx, &s->tx, &cfg)) {
376             /* Can't use native mode, try skb. */
377             cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
378             cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;
379 
380             if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
381                                    s->umem, &s->rx, &s->tx, &cfg)) {
382                 error = errno;
383             }
384         }
385     }
386 
387     if (error) {
388         error_setg_errno(errp, error,
389                          "failed to create AF_XDP socket for %s queue_id: %d",
390                          s->ifname, queue_id);
391         return -1;
392     }
393 
394     s->xdp_flags = cfg.xdp_flags;
395 
396     return 0;
397 }
398 
399 /* NetClientInfo methods. */
400 static NetClientInfo net_af_xdp_info = {
401     .type = NET_CLIENT_DRIVER_AF_XDP,
402     .size = sizeof(AFXDPState),
403     .receive = af_xdp_receive,
404     .poll = af_xdp_poll,
405     .cleanup = af_xdp_cleanup,
406 };
407 
408 static int *parse_socket_fds(const char *sock_fds_str,
409                              int64_t n_expected, Error **errp)
410 {
411     gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
412     int64_t i, n_sock_fds = g_strv_length(substrings);
413     int *sock_fds = NULL;
414 
415     if (n_sock_fds != n_expected) {
416         error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
417                    n_expected, n_sock_fds);
418         goto exit;
419     }
420 
421     sock_fds = g_new(int, n_sock_fds);
422 
423     for (i = 0; i < n_sock_fds; i++) {
424         sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
425         if (sock_fds[i] < 0) {
426             g_free(sock_fds);
427             sock_fds = NULL;
428             goto exit;
429         }
430     }
431 
432 exit:
433     g_strfreev(substrings);
434     return sock_fds;
435 }
436 
437 /*
438  * The exported init function.
439  *
440  * ... -netdev af-xdp,ifname="..."
441  */
442 int net_init_af_xdp(const Netdev *netdev,
443                     const char *name, NetClientState *peer, Error **errp)
444 {
445     const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
446     NetClientState *nc, *nc0 = NULL;
447     unsigned int ifindex;
448     uint32_t prog_id = 0;
449     g_autofree int *sock_fds = NULL;
450     int64_t i, queues;
451     Error *err = NULL;
452     AFXDPState *s;
453 
454     ifindex = if_nametoindex(opts->ifname);
455     if (!ifindex) {
456         error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
457                          opts->ifname);
458         return -1;
459     }
460 
461     queues = opts->has_queues ? opts->queues : 1;
462     if (queues < 1) {
463         error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
464                    queues, opts->ifname);
465         return -1;
466     }
467 
468     if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
469         error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
470         return -1;
471     }
472 
473     if (opts->sock_fds) {
474         sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
475         if (!sock_fds) {
476             return -1;
477         }
478     }
479 
480     for (i = 0; i < queues; i++) {
481         nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
482         qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
483         nc->queue_index = i;
484 
485         if (!nc0) {
486             nc0 = nc;
487         }
488 
489         s = DO_UPCAST(AFXDPState, nc, nc);
490 
491         pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
492         s->ifindex = ifindex;
493         s->n_queues = queues;
494 
495         if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
496             || af_xdp_socket_create(s, opts, errp)) {
497             /* Make sure the XDP program will be removed. */
498             s->n_queues = i;
499             error_propagate(errp, err);
500             goto err;
501         }
502     }
503 
504     if (nc0) {
505         s = DO_UPCAST(AFXDPState, nc, nc0);
506         if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
507             error_setg_errno(errp, errno,
508                              "no XDP program loaded on '%s', ifindex: %d",
509                              s->ifname, s->ifindex);
510             goto err;
511         }
512     }
513 
514     af_xdp_read_poll(s, true); /* Initially only poll for reads. */
515 
516     return 0;
517 
518 err:
519     if (nc0) {
520         qemu_del_net_client(nc0);
521     }
522 
523     return -1;
524 }
525