1 // SPDX-License-Identifier: GPL-2.0
2 /* Copyright(c) 2020 Intel Corporation. */
3 
4 #define _GNU_SOURCE
5 #include <poll.h>
6 #include <pthread.h>
7 #include <signal.h>
8 #include <sched.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/mman.h>
13 #include <sys/resource.h>
14 #include <sys/socket.h>
15 #include <sys/types.h>
16 #include <time.h>
17 #include <unistd.h>
18 #include <getopt.h>
19 #include <netinet/ether.h>
20 #include <net/if.h>
21 
22 #include <linux/bpf.h>
23 #include <linux/if_link.h>
24 #include <linux/if_xdp.h>
25 
26 #include <bpf/libbpf.h>
27 #include <bpf/xsk.h>
28 #include <bpf/bpf.h>
29 
/* Number of elements in array x. Only meaningful when x is a true array
 * (not a pointer), since it relies on sizeof of the whole object.
 */
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))

/* Short aliases for the fixed-width unsigned integer types used throughout
 * this file.
 */
typedef __u64 u64;
typedef __u32 u32;
typedef __u16 u16;
typedef __u8  u8;
36 
/* This program illustrates the packet forwarding between multiple AF_XDP
 * sockets in a multi-threaded environment. All threads are sharing a common
 * buffer pool, with each socket having its own private buffer cache.
40  *
41  * Example 1: Single thread handling two sockets. The packets received by socket
42  * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
43  * QB), while the packets received by socket B are forwarded to socket A. The
44  * thread is running on CPU core X:
45  *
46  *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
47  *
48  * Example 2: Two threads, each handling two sockets. The thread running on CPU
49  * core X forwards all the packets received by socket A to socket B, and all the
50  * packets received by socket B to socket A. The thread running on CPU core Y is
51  * performing the same packet forwarding between sockets C and D:
52  *
53  *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
54  *         -c CX -c CY
55  */
56 
57 /*
58  * Buffer pool and buffer cache
59  *
60  * For packet forwarding, the packet buffers are typically allocated from the
61  * pool for packet reception and freed back to the pool for further reuse once
62  * the packet transmission is completed.
63  *
64  * The buffer pool is shared between multiple threads. In order to minimize the
65  * access latency to the shared buffer pool, each thread creates one (or
66  * several) buffer caches, which, unlike the buffer pool, are private to the
67  * thread that creates them and therefore cannot be shared with other threads.
68  * The access to the shared pool is only needed either (A) when the cache gets
69  * empty due to repeated buffer allocations and it needs to be replenished from
 * the pool, or (B) when the cache gets full due to repeated buffer free and it
 * needs to be flushed back to the pool.
72  *
73  * In a packet forwarding system, a packet received on any input port can
74  * potentially be transmitted on any output port, depending on the forwarding
 * configuration. For AF_XDP sockets, for this to work with zero-copy of the
 * packet buffers, it is required that the buffer pool memory fits into the
 * UMEM area shared by all the sockets.
78  */
79 
/* Creation-time parameters of the shared buffer pool (see bpool_init()). */
struct bpool_params {
	/* Requested number of buffers; bpool_init() rounds it up to a whole
	 * number of slabs.
	 */
	u32 n_buffers;
	/* Size of one buffer in bytes; buffer i lives at offset
	 * i * buffer_size inside the mmap-ed area.
	 */
	u32 buffer_size;
	/* Extra flags OR-ed into MAP_PRIVATE | MAP_ANONYMOUS for mmap(). */
	int mmap_flags;

	/* Maximum number of buffer caches; two reserved slabs are set aside
	 * for each of them.
	 */
	u32 n_users_max;
	/* Number of buffer slots in every slab. */
	u32 n_buffers_per_slab;
};
88 
89 /* This buffer pool implementation organizes the buffers into equally sized
90  * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
91  * pool that are completely filled with buffer pointers (full slabs).
92  *
93  * Each buffer cache has a slab for buffer allocation and a slab for buffer
94  * free, with both of these slabs initially empty. When the cache's allocation
95  * slab goes empty, it is swapped with one of the available full slabs from the
96  * pool, if any is available. When the cache's free slab goes full, it is
97  * swapped for one of the empty slabs from the pool, which is guaranteed to
98  * succeed.
99  *
100  * Partially filled slabs never get traded between the cache and the pool
101  * (except when the cache itself is destroyed), which enables fast operation
102  * through pointer swapping.
103  */
struct bpool {
	/* Copy of the creation parameters, with n_buffers rounded up. */
	struct bpool_params params;
	/* Serializes access to the slab arrays and counters below. */
	pthread_mutex_t lock;
	/* Base address of the mmap-ed packet buffer memory (the UMEM area). */
	void *addr;

	/* Slab pointer array: entries [0, n_slabs_available) are full slabs,
	 * the caches trade their empty slabs into the entries above that.
	 */
	u64 **slabs;
	/* Stack of slabs handed out to caches, two per bcache_init(). */
	u64 **slabs_reserved;
	/* Backing storage for the buffer offsets held by *slabs*. */
	u64 *buffers;
	/* Backing storage for the slots of the reserved slabs. */
	u64 *buffers_reserved;

	u64 n_slabs;
	u64 n_slabs_reserved;
	/* Total number of buffers after rounding up to whole slabs. */
	u64 n_buffers;

	/* Number of full slabs currently held by the pool. */
	u64 n_slabs_available;
	/* Number of reserved slabs not yet handed out to a cache. */
	u64 n_slabs_reserved_available;

	/* Copy of the UMEM configuration used at creation time. */
	struct xsk_umem_config umem_cfg;
	/* Fill and completion rings passed to xsk_umem__create(). */
	struct xsk_ring_prod umem_fq;
	struct xsk_ring_cons umem_cq;
	/* UMEM handle created on top of *addr*. */
	struct xsk_umem *umem;
};
126 
127 static struct bpool *
bpool_init(struct bpool_params * params,struct xsk_umem_config * umem_cfg)128 bpool_init(struct bpool_params *params,
129 	   struct xsk_umem_config *umem_cfg)
130 {
131 	struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
132 	u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
133 	u64 slabs_size, slabs_reserved_size;
134 	u64 buffers_size, buffers_reserved_size;
135 	u64 total_size, i;
136 	struct bpool *bp;
137 	u8 *p;
138 	int status;
139 
140 	/* mmap prep. */
141 	if (setrlimit(RLIMIT_MEMLOCK, &r))
142 		return NULL;
143 
144 	/* bpool internals dimensioning. */
145 	n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
146 		params->n_buffers_per_slab;
147 	n_slabs_reserved = params->n_users_max * 2;
148 	n_buffers = n_slabs * params->n_buffers_per_slab;
149 	n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
150 
151 	slabs_size = n_slabs * sizeof(u64 *);
152 	slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
153 	buffers_size = n_buffers * sizeof(u64);
154 	buffers_reserved_size = n_buffers_reserved * sizeof(u64);
155 
156 	total_size = sizeof(struct bpool) +
157 		slabs_size + slabs_reserved_size +
158 		buffers_size + buffers_reserved_size;
159 
160 	/* bpool memory allocation. */
161 	p = calloc(total_size, sizeof(u8));
162 	if (!p)
163 		return NULL;
164 
165 	/* bpool memory initialization. */
166 	bp = (struct bpool *)p;
167 	memcpy(&bp->params, params, sizeof(*params));
168 	bp->params.n_buffers = n_buffers;
169 
170 	bp->slabs = (u64 **)&p[sizeof(struct bpool)];
171 	bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
172 		slabs_size];
173 	bp->buffers = (u64 *)&p[sizeof(struct bpool) +
174 		slabs_size + slabs_reserved_size];
175 	bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
176 		slabs_size + slabs_reserved_size + buffers_size];
177 
178 	bp->n_slabs = n_slabs;
179 	bp->n_slabs_reserved = n_slabs_reserved;
180 	bp->n_buffers = n_buffers;
181 
182 	for (i = 0; i < n_slabs; i++)
183 		bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
184 	bp->n_slabs_available = n_slabs;
185 
186 	for (i = 0; i < n_slabs_reserved; i++)
187 		bp->slabs_reserved[i] = &bp->buffers_reserved[i *
188 			params->n_buffers_per_slab];
189 	bp->n_slabs_reserved_available = n_slabs_reserved;
190 
191 	for (i = 0; i < n_buffers; i++)
192 		bp->buffers[i] = i * params->buffer_size;
193 
194 	/* lock. */
195 	status = pthread_mutex_init(&bp->lock, NULL);
196 	if (status) {
197 		free(p);
198 		return NULL;
199 	}
200 
201 	/* mmap. */
202 	bp->addr = mmap(NULL,
203 			n_buffers * params->buffer_size,
204 			PROT_READ | PROT_WRITE,
205 			MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
206 			-1,
207 			0);
208 	if (bp->addr == MAP_FAILED) {
209 		pthread_mutex_destroy(&bp->lock);
210 		free(p);
211 		return NULL;
212 	}
213 
214 	/* umem. */
215 	status = xsk_umem__create(&bp->umem,
216 				  bp->addr,
217 				  bp->params.n_buffers * bp->params.buffer_size,
218 				  &bp->umem_fq,
219 				  &bp->umem_cq,
220 				  umem_cfg);
221 	if (status) {
222 		munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
223 		pthread_mutex_destroy(&bp->lock);
224 		free(p);
225 		return NULL;
226 	}
227 	memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
228 
229 	return bp;
230 }
231 
232 static void
bpool_free(struct bpool * bp)233 bpool_free(struct bpool *bp)
234 {
235 	if (!bp)
236 		return;
237 
238 	xsk_umem__delete(bp->umem);
239 	munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
240 	pthread_mutex_destroy(&bp->lock);
241 	free(bp);
242 }
243 
/* Buffer cache: a private front-end to the shared pool *bp*. */
struct bcache {
	/* Pool this cache trades slabs with. */
	struct bpool *bp;

	/* Slab that buffers are allocated from (see bcache_cons()). */
	u64 *slab_cons;
	/* Slab that freed buffers are stored into (see bcache_prod()). */
	u64 *slab_prod;

	/* Number of buffers still available in *slab_cons*. */
	u64 n_buffers_cons;
	/* Number of buffers currently stored in *slab_prod*. */
	u64 n_buffers_prod;
};
253 
254 static u32
bcache_slab_size(struct bcache * bc)255 bcache_slab_size(struct bcache *bc)
256 {
257 	struct bpool *bp = bc->bp;
258 
259 	return bp->params.n_buffers_per_slab;
260 }
261 
262 static struct bcache *
bcache_init(struct bpool * bp)263 bcache_init(struct bpool *bp)
264 {
265 	struct bcache *bc;
266 
267 	bc = calloc(1, sizeof(struct bcache));
268 	if (!bc)
269 		return NULL;
270 
271 	bc->bp = bp;
272 	bc->n_buffers_cons = 0;
273 	bc->n_buffers_prod = 0;
274 
275 	pthread_mutex_lock(&bp->lock);
276 	if (bp->n_slabs_reserved_available == 0) {
277 		pthread_mutex_unlock(&bp->lock);
278 		free(bc);
279 		return NULL;
280 	}
281 
282 	bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
283 	bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
284 	bp->n_slabs_reserved_available -= 2;
285 	pthread_mutex_unlock(&bp->lock);
286 
287 	return bc;
288 }
289 
290 static void
bcache_free(struct bcache * bc)291 bcache_free(struct bcache *bc)
292 {
293 	struct bpool *bp;
294 
295 	if (!bc)
296 		return;
297 
298 	/* In order to keep this example simple, the case of freeing any
299 	 * existing buffers from the cache back to the pool is ignored.
300 	 */
301 
302 	bp = bc->bp;
303 	pthread_mutex_lock(&bp->lock);
304 	bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
305 	bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
306 	bp->n_slabs_reserved_available += 2;
307 	pthread_mutex_unlock(&bp->lock);
308 
309 	free(bc);
310 }
311 
312 /* To work correctly, the implementation requires that the *n_buffers* input
313  * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
314  * is typically the case, with one exception taking place when large number of
315  * buffers are allocated at init time (e.g. for the UMEM fill queue setup).
316  */
317 static inline u32
bcache_cons_check(struct bcache * bc,u32 n_buffers)318 bcache_cons_check(struct bcache *bc, u32 n_buffers)
319 {
320 	struct bpool *bp = bc->bp;
321 	u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
322 	u64 n_buffers_cons = bc->n_buffers_cons;
323 	u64 n_slabs_available;
324 	u64 *slab_full;
325 
326 	/*
327 	 * Consumer slab is not empty: Use what's available locally. Do not
328 	 * look for more buffers from the pool when the ask can only be
329 	 * partially satisfied.
330 	 */
331 	if (n_buffers_cons)
332 		return (n_buffers_cons < n_buffers) ?
333 			n_buffers_cons :
334 			n_buffers;
335 
336 	/*
337 	 * Consumer slab is empty: look to trade the current consumer slab
338 	 * (full) for a full slab from the pool, if any is available.
339 	 */
340 	pthread_mutex_lock(&bp->lock);
341 	n_slabs_available = bp->n_slabs_available;
342 	if (!n_slabs_available) {
343 		pthread_mutex_unlock(&bp->lock);
344 		return 0;
345 	}
346 
347 	n_slabs_available--;
348 	slab_full = bp->slabs[n_slabs_available];
349 	bp->slabs[n_slabs_available] = bc->slab_cons;
350 	bp->n_slabs_available = n_slabs_available;
351 	pthread_mutex_unlock(&bp->lock);
352 
353 	bc->slab_cons = slab_full;
354 	bc->n_buffers_cons = n_buffers_per_slab;
355 	return n_buffers;
356 }
357 
358 static inline u64
bcache_cons(struct bcache * bc)359 bcache_cons(struct bcache *bc)
360 {
361 	u64 n_buffers_cons = bc->n_buffers_cons - 1;
362 	u64 buffer;
363 
364 	buffer = bc->slab_cons[n_buffers_cons];
365 	bc->n_buffers_cons = n_buffers_cons;
366 	return buffer;
367 }
368 
369 static inline void
bcache_prod(struct bcache * bc,u64 buffer)370 bcache_prod(struct bcache *bc, u64 buffer)
371 {
372 	struct bpool *bp = bc->bp;
373 	u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
374 	u64 n_buffers_prod = bc->n_buffers_prod;
375 	u64 n_slabs_available;
376 	u64 *slab_empty;
377 
378 	/*
379 	 * Producer slab is not yet full: store the current buffer to it.
380 	 */
381 	if (n_buffers_prod < n_buffers_per_slab) {
382 		bc->slab_prod[n_buffers_prod] = buffer;
383 		bc->n_buffers_prod = n_buffers_prod + 1;
384 		return;
385 	}
386 
387 	/*
388 	 * Producer slab is full: trade the cache's current producer slab
389 	 * (full) for an empty slab from the pool, then store the current
390 	 * buffer to the new producer slab. As one full slab exists in the
391 	 * cache, it is guaranteed that there is at least one empty slab
392 	 * available in the pool.
393 	 */
394 	pthread_mutex_lock(&bp->lock);
395 	n_slabs_available = bp->n_slabs_available;
396 	slab_empty = bp->slabs[n_slabs_available];
397 	bp->slabs[n_slabs_available] = bc->slab_prod;
398 	bp->n_slabs_available = n_slabs_available + 1;
399 	pthread_mutex_unlock(&bp->lock);
400 
401 	slab_empty[0] = buffer;
402 	bc->slab_prod = slab_empty;
403 	bc->n_buffers_prod = 1;
404 }
405 
406 /*
407  * Port
408  *
409  * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
410  * packet forwarding to happen with no packet buffer copy, all the sockets need
411  * to share the same UMEM area, which is used as the buffer pool memory.
412  */
/* Capacity of struct burst_rx, in packets. Can be overridden at build time. */
#ifndef MAX_BURST_RX
#define MAX_BURST_RX 64
#endif

/* Capacity of struct burst_tx, in packets. Can be overridden at build time. */
#ifndef MAX_BURST_TX
#define MAX_BURST_TX 64
#endif
420 
/* Packets returned by one port_rx_burst() call: descriptor address and length
 * of each packet, stored at matching indices.
 */
struct burst_rx {
	u64 addr[MAX_BURST_RX];
	u32 len[MAX_BURST_RX];
};
425 
/* Packets accumulated for transmission by port_tx_burst(). */
struct burst_tx {
	u64 addr[MAX_BURST_TX];
	u32 len[MAX_BURST_TX];
	/* Number of valid entries in addr[] and len[]. */
	u32 n_pkts;
};
431 
/* Creation-time parameters of a forwarding port (see port_init()). */
struct port_params {
	/* Socket configuration handed to xsk_socket__create_shared(). */
	struct xsk_socket_config xsk_cfg;
	/* Buffer pool whose UMEM the socket shares. */
	struct bpool *bp;
	/* Network interface name. */
	const char *iface;
	/* Queue of *iface* the socket is bound to. */
	u32 iface_queue;
};
438 
/* Forwarding port: one AF_XDP socket plus its private buffer cache. */
struct port {
	/* Copy of the parameters the port was created with. */
	struct port_params params;

	/* Buffer cache private to this port. */
	struct bcache *bc;

	/* Socket RX/TX rings and this socket's UMEM fill/completion rings,
	 * all set up by xsk_socket__create_shared().
	 */
	struct xsk_ring_cons rxq;
	struct xsk_ring_prod txq;
	struct xsk_ring_prod umem_fq;
	struct xsk_ring_cons umem_cq;
	struct xsk_socket *xsk;
	/* Set to 1 once port_init() has populated the fill ring. */
	int umem_fq_initialized;

	/* Running totals, updated by port_rx_burst() / port_tx_burst(). */
	u64 n_pkts_rx;
	u64 n_pkts_tx;
};
454 
455 static void
port_free(struct port * p)456 port_free(struct port *p)
457 {
458 	if (!p)
459 		return;
460 
461 	/* To keep this example simple, the code to free the buffers from the
462 	 * socket's receive and transmit queues, as well as from the UMEM fill
463 	 * and completion queues, is not included.
464 	 */
465 
466 	if (p->xsk)
467 		xsk_socket__delete(p->xsk);
468 
469 	bcache_free(p->bc);
470 
471 	free(p);
472 }
473 
474 static struct port *
port_init(struct port_params * params)475 port_init(struct port_params *params)
476 {
477 	struct port *p;
478 	u32 umem_fq_size, pos = 0;
479 	int status, i;
480 
481 	/* Memory allocation and initialization. */
482 	p = calloc(sizeof(struct port), 1);
483 	if (!p)
484 		return NULL;
485 
486 	memcpy(&p->params, params, sizeof(p->params));
487 	umem_fq_size = params->bp->umem_cfg.fill_size;
488 
489 	/* bcache. */
490 	p->bc = bcache_init(params->bp);
491 	if (!p->bc ||
492 	    (bcache_slab_size(p->bc) < umem_fq_size) ||
493 	    (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
494 		port_free(p);
495 		return NULL;
496 	}
497 
498 	/* xsk socket. */
499 	status = xsk_socket__create_shared(&p->xsk,
500 					   params->iface,
501 					   params->iface_queue,
502 					   params->bp->umem,
503 					   &p->rxq,
504 					   &p->txq,
505 					   &p->umem_fq,
506 					   &p->umem_cq,
507 					   &params->xsk_cfg);
508 	if (status) {
509 		port_free(p);
510 		return NULL;
511 	}
512 
513 	/* umem fq. */
514 	xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
515 
516 	for (i = 0; i < umem_fq_size; i++)
517 		*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
518 			bcache_cons(p->bc);
519 
520 	xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
521 	p->umem_fq_initialized = 1;
522 
523 	return p;
524 }
525 
526 static inline u32
port_rx_burst(struct port * p,struct burst_rx * b)527 port_rx_burst(struct port *p, struct burst_rx *b)
528 {
529 	u32 n_pkts, pos, i;
530 
531 	/* Free buffers for FQ replenish. */
532 	n_pkts = ARRAY_SIZE(b->addr);
533 
534 	n_pkts = bcache_cons_check(p->bc, n_pkts);
535 	if (!n_pkts)
536 		return 0;
537 
538 	/* RXQ. */
539 	n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
540 	if (!n_pkts) {
541 		if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
542 			struct pollfd pollfd = {
543 				.fd = xsk_socket__fd(p->xsk),
544 				.events = POLLIN,
545 			};
546 
547 			poll(&pollfd, 1, 0);
548 		}
549 		return 0;
550 	}
551 
552 	for (i = 0; i < n_pkts; i++) {
553 		b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
554 		b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
555 	}
556 
557 	xsk_ring_cons__release(&p->rxq, n_pkts);
558 	p->n_pkts_rx += n_pkts;
559 
560 	/* UMEM FQ. */
561 	for ( ; ; ) {
562 		int status;
563 
564 		status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
565 		if (status == n_pkts)
566 			break;
567 
568 		if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
569 			struct pollfd pollfd = {
570 				.fd = xsk_socket__fd(p->xsk),
571 				.events = POLLIN,
572 			};
573 
574 			poll(&pollfd, 1, 0);
575 		}
576 	}
577 
578 	for (i = 0; i < n_pkts; i++)
579 		*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
580 			bcache_cons(p->bc);
581 
582 	xsk_ring_prod__submit(&p->umem_fq, n_pkts);
583 
584 	return n_pkts;
585 }
586 
587 static inline void
port_tx_burst(struct port * p,struct burst_tx * b)588 port_tx_burst(struct port *p, struct burst_tx *b)
589 {
590 	u32 n_pkts, pos, i;
591 	int status;
592 
593 	/* UMEM CQ. */
594 	n_pkts = p->params.bp->umem_cfg.comp_size;
595 
596 	n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
597 
598 	for (i = 0; i < n_pkts; i++) {
599 		u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
600 
601 		bcache_prod(p->bc, addr);
602 	}
603 
604 	xsk_ring_cons__release(&p->umem_cq, n_pkts);
605 
606 	/* TXQ. */
607 	n_pkts = b->n_pkts;
608 
609 	for ( ; ; ) {
610 		status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
611 		if (status == n_pkts)
612 			break;
613 
614 		if (xsk_ring_prod__needs_wakeup(&p->txq))
615 			sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
616 			       NULL, 0);
617 	}
618 
619 	for (i = 0; i < n_pkts; i++) {
620 		xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
621 		xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
622 	}
623 
624 	xsk_ring_prod__submit(&p->txq, n_pkts);
625 	if (xsk_ring_prod__needs_wakeup(&p->txq))
626 		sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
627 	p->n_pkts_tx += n_pkts;
628 }
629 
630 /*
631  * Thread
632  *
633  * Packet forwarding threads.
634  */
/* Capacity of the per-thread port arrays in struct thread_data. Can be
 * overridden at build time.
 */
#ifndef MAX_PORTS_PER_THREAD
#define MAX_PORTS_PER_THREAD 16
#endif
638 
/* Per-thread forwarding state, consumed by thread_func(). */
struct thread_data {
	/* Packets received on ports_rx[i] are transmitted on ports_tx[i]. */
	struct port *ports_rx[MAX_PORTS_PER_THREAD];
	struct port *ports_tx[MAX_PORTS_PER_THREAD];
	/* Number of valid entries in ports_rx[] / ports_tx[]. */
	u32 n_ports_rx;
	/* Scratch RX burst, reused for every RX port. */
	struct burst_rx burst_rx;
	/* One pending TX burst per (RX port, TX port) pair. */
	struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
	/* CPU core the thread pins itself to. */
	u32 cpu_core_id;
	/* Set to non-zero by main() to make the thread leave its loop. */
	int quit;
};
648 
swap_mac_addresses(void * data)649 static void swap_mac_addresses(void *data)
650 {
651 	struct ether_header *eth = (struct ether_header *)data;
652 	struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
653 	struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
654 	struct ether_addr tmp;
655 
656 	tmp = *src_addr;
657 	*src_addr = *dst_addr;
658 	*dst_addr = tmp;
659 }
660 
661 static void *
thread_func(void * arg)662 thread_func(void *arg)
663 {
664 	struct thread_data *t = arg;
665 	cpu_set_t cpu_cores;
666 	u32 i;
667 
668 	CPU_ZERO(&cpu_cores);
669 	CPU_SET(t->cpu_core_id, &cpu_cores);
670 	pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);
671 
672 	for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
673 		struct port *port_rx = t->ports_rx[i];
674 		struct port *port_tx = t->ports_tx[i];
675 		struct burst_rx *brx = &t->burst_rx;
676 		struct burst_tx *btx = &t->burst_tx[i];
677 		u32 n_pkts, j;
678 
679 		/* RX. */
680 		n_pkts = port_rx_burst(port_rx, brx);
681 		if (!n_pkts)
682 			continue;
683 
684 		/* Process & TX. */
685 		for (j = 0; j < n_pkts; j++) {
686 			u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
687 			u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
688 						     addr);
689 
690 			swap_mac_addresses(pkt);
691 
692 			btx->addr[btx->n_pkts] = brx->addr[j];
693 			btx->len[btx->n_pkts] = brx->len[j];
694 			btx->n_pkts++;
695 
696 			if (btx->n_pkts == MAX_BURST_TX) {
697 				port_tx_burst(port_tx, btx);
698 				btx->n_pkts = 0;
699 			}
700 		}
701 	}
702 
703 	return NULL;
704 }
705 
706 /*
707  * Process
708  */
/* Default buffer pool parameters; main() copies them into bpool_params
 * before the command line is parsed.
 */
static const struct bpool_params bpool_params_default = {
	.n_buffers = 64 * 1024,
	.buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
	.mmap_flags = 0,

	/* Each port creates one buffer cache in port_init(), so this also
	 * bounds the number of ports that can be created.
	 */
	.n_users_max = 16,
	/* Same value as umem_cfg_default.fill_size: port_init() rejects a
	 * slab smaller than the fill ring.
	 */
	.n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
};
717 
/* Default UMEM configuration; main() copies it into umem_cfg. */
static const struct xsk_umem_config umem_cfg_default = {
	.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
	.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
	.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
	.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
	.flags = 0,
};
725 
/* Default port parameters; main() copies them into every port_params[]
 * entry. The pool and interface fields are filled in later: *bp* by main()
 * and *iface* / *iface_queue* by parse_args().
 */
static const struct port_params port_params_default = {
	.xsk_cfg = {
		.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
		.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
		.libbpf_flags = 0,
		.xdp_flags = XDP_FLAGS_DRV_MODE,
		.bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
	},

	.bp = NULL,
	.iface = NULL,
	.iface_queue = 0,
};
739 
/* Capacity of the process-wide port tables. Can be overridden at build
 * time.
 */
#ifndef MAX_PORTS
#define MAX_PORTS 64
#endif

/* Capacity of the process-wide thread tables. Can be overridden at build
 * time.
 */
#ifndef MAX_THREADS
#define MAX_THREADS 64
#endif
747 
/* Effective buffer pool / UMEM configuration and the pool itself. */
static struct bpool_params bpool_params;
static struct xsk_umem_config umem_cfg;
static struct bpool *bp;

/* Port tables; the first n_ports entries are in use. n_pkts_rx[] and
 * n_pkts_tx[] hold the counter values seen by the previous statistics
 * print-out, used to compute the packet rates.
 */
static struct port_params port_params[MAX_PORTS];
static struct port *ports[MAX_PORTS];
static u64 n_pkts_rx[MAX_PORTS];
static u64 n_pkts_tx[MAX_PORTS];
static int n_ports;

/* Thread tables; the first n_threads entries are in use. */
static pthread_t threads[MAX_THREADS];
static struct thread_data thread_data[MAX_THREADS];
static int n_threads;
761 
762 static void
print_usage(char * prog_name)763 print_usage(char *prog_name)
764 {
765 	const char *usage =
766 		"Usage:\n"
767 		"\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
768 		"\n"
769 		"-c CORE        CPU core to run a packet forwarding thread\n"
770 		"               on. May be invoked multiple times.\n"
771 		"\n"
772 		"-b SIZE        Number of buffers in the buffer pool shared\n"
773 		"               by all the forwarding threads. Default: %u.\n"
774 		"\n"
775 		"-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
776 		"               pair specifies one forwarding port. May be\n"
777 		"               invoked multiple times.\n"
778 		"\n"
779 		"-q QUEUE       Network interface queue for RX and TX. Each\n"
780 		"               (INTERFACE, QUEUE) pair specified one\n"
781 		"               forwarding port. Default: %u. May be invoked\n"
782 		"               multiple times.\n"
783 		"\n";
784 	printf(usage,
785 	       prog_name,
786 	       bpool_params_default.n_buffers,
787 	       port_params_default.iface_queue);
788 }
789 
790 static int
parse_args(int argc,char ** argv)791 parse_args(int argc, char **argv)
792 {
793 	struct option lgopts[] = {
794 		{ NULL,  0, 0, 0 }
795 	};
796 	int opt, option_index;
797 
798 	/* Parse the input arguments. */
799 	for ( ; ;) {
800 		opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
801 		if (opt == EOF)
802 			break;
803 
804 		switch (opt) {
805 		case 'b':
806 			bpool_params.n_buffers = atoi(optarg);
807 			break;
808 
809 		case 'c':
810 			if (n_threads == MAX_THREADS) {
811 				printf("Max number of threads (%d) reached.\n",
812 				       MAX_THREADS);
813 				return -1;
814 			}
815 
816 			thread_data[n_threads].cpu_core_id = atoi(optarg);
817 			n_threads++;
818 			break;
819 
820 		case 'i':
821 			if (n_ports == MAX_PORTS) {
822 				printf("Max number of ports (%d) reached.\n",
823 				       MAX_PORTS);
824 				return -1;
825 			}
826 
827 			port_params[n_ports].iface = optarg;
828 			port_params[n_ports].iface_queue = 0;
829 			n_ports++;
830 			break;
831 
832 		case 'q':
833 			if (n_ports == 0) {
834 				printf("No port specified for queue.\n");
835 				return -1;
836 			}
837 			port_params[n_ports - 1].iface_queue = atoi(optarg);
838 			break;
839 
840 		default:
841 			printf("Illegal argument.\n");
842 			return -1;
843 		}
844 	}
845 
846 	optind = 1; /* reset getopt lib */
847 
848 	/* Check the input arguments. */
849 	if (!n_ports) {
850 		printf("No ports specified.\n");
851 		return -1;
852 	}
853 
854 	if (!n_threads) {
855 		printf("No threads specified.\n");
856 		return -1;
857 	}
858 
859 	if (n_ports % n_threads) {
860 		printf("Ports cannot be evenly distributed to threads.\n");
861 		return -1;
862 	}
863 
864 	return 0;
865 }
866 
867 static void
print_port(u32 port_id)868 print_port(u32 port_id)
869 {
870 	struct port *port = ports[port_id];
871 
872 	printf("Port %u: interface = %s, queue = %u\n",
873 	       port_id, port->params.iface, port->params.iface_queue);
874 }
875 
876 static void
print_thread(u32 thread_id)877 print_thread(u32 thread_id)
878 {
879 	struct thread_data *t = &thread_data[thread_id];
880 	u32 i;
881 
882 	printf("Thread %u (CPU core %u): ",
883 	       thread_id, t->cpu_core_id);
884 
885 	for (i = 0; i < t->n_ports_rx; i++) {
886 		struct port *port_rx = t->ports_rx[i];
887 		struct port *port_tx = t->ports_tx[i];
888 
889 		printf("(%s, %u) -> (%s, %u), ",
890 		       port_rx->params.iface,
891 		       port_rx->params.iface_queue,
892 		       port_tx->params.iface,
893 		       port_tx->params.iface_queue);
894 	}
895 
896 	printf("\n");
897 }
898 
/* Print one horizontal rule of the port statistics table. */
static void
print_port_stats_separator(void)
{
	const char *port_col = "----";
	const char *count_col = "------------";
	const char *rate_col = "-------------";

	printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
	       port_col,
	       count_col,
	       rate_col,
	       count_col,
	       rate_col);
}
909 
/* Print the column titles of the port statistics table, framed by two
 * horizontal rules.
 */
static void
print_port_stats_header(void)
{
	static const char * const titles[] = {
		"Port",
		"RX packets",
		"RX rate (pps)",
		"TX packets",
		"TX_rate (pps)",
	};

	print_port_stats_separator();
	printf("| %4s | %12s | %13s | %12s | %13s |\n",
	       titles[0], titles[1], titles[2], titles[3], titles[4]);
	print_port_stats_separator();
}
922 
/* Close the port statistics table: a horizontal rule and an empty line. */
static void
print_port_stats_trailer(void)
{
	print_port_stats_separator();
	printf("\n");
}
929 
930 static void
print_port_stats(int port_id,u64 ns_diff)931 print_port_stats(int port_id, u64 ns_diff)
932 {
933 	struct port *p = ports[port_id];
934 	double rx_pps, tx_pps;
935 
936 	rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
937 	tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
938 
939 	printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
940 	       port_id,
941 	       p->n_pkts_rx,
942 	       rx_pps,
943 	       p->n_pkts_tx,
944 	       tx_pps);
945 
946 	n_pkts_rx[port_id] = p->n_pkts_rx;
947 	n_pkts_tx[port_id] = p->n_pkts_tx;
948 }
949 
950 static void
print_port_stats_all(u64 ns_diff)951 print_port_stats_all(u64 ns_diff)
952 {
953 	int i;
954 
955 	print_port_stats_header();
956 	for (i = 0; i < n_ports; i++)
957 		print_port_stats(i, ns_diff);
958 	print_port_stats_trailer();
959 }
960 
/* Set by signal_handler() and polled by main(). It is written from a signal
 * handler, hence volatile sig_atomic_t.
 */
static volatile sig_atomic_t quit;

/* Signal handler: request program termination by raising *quit*. */
static void
signal_handler(int sig)
{
	(void)sig; /* the same action is taken for every signal */
	quit = 1;
}
968 
remove_xdp_program(void)969 static void remove_xdp_program(void)
970 {
971 	int i;
972 
973 	for (i = 0 ; i < n_ports; i++)
974 		bpf_set_link_xdp_fd(if_nametoindex(port_params[i].iface), -1,
975 				    port_params[i].xsk_cfg.xdp_flags);
976 }
977 
main(int argc,char ** argv)978 int main(int argc, char **argv)
979 {
980 	struct timespec time;
981 	u64 ns0;
982 	int i;
983 
984 	/* Parse args. */
985 	memcpy(&bpool_params, &bpool_params_default,
986 	       sizeof(struct bpool_params));
987 	memcpy(&umem_cfg, &umem_cfg_default,
988 	       sizeof(struct xsk_umem_config));
989 	for (i = 0; i < MAX_PORTS; i++)
990 		memcpy(&port_params[i], &port_params_default,
991 		       sizeof(struct port_params));
992 
993 	if (parse_args(argc, argv)) {
994 		print_usage(argv[0]);
995 		return -1;
996 	}
997 
998 	/* Buffer pool initialization. */
999 	bp = bpool_init(&bpool_params, &umem_cfg);
1000 	if (!bp) {
1001 		printf("Buffer pool initialization failed.\n");
1002 		return -1;
1003 	}
1004 	printf("Buffer pool created successfully.\n");
1005 
1006 	/* Ports initialization. */
1007 	for (i = 0; i < MAX_PORTS; i++)
1008 		port_params[i].bp = bp;
1009 
1010 	for (i = 0; i < n_ports; i++) {
1011 		ports[i] = port_init(&port_params[i]);
1012 		if (!ports[i]) {
1013 			printf("Port %d initialization failed.\n", i);
1014 			return -1;
1015 		}
1016 		print_port(i);
1017 	}
1018 	printf("All ports created successfully.\n");
1019 
1020 	/* Threads. */
1021 	for (i = 0; i < n_threads; i++) {
1022 		struct thread_data *t = &thread_data[i];
1023 		u32 n_ports_per_thread = n_ports / n_threads, j;
1024 
1025 		for (j = 0; j < n_ports_per_thread; j++) {
1026 			t->ports_rx[j] = ports[i * n_ports_per_thread + j];
1027 			t->ports_tx[j] = ports[i * n_ports_per_thread +
1028 				(j + 1) % n_ports_per_thread];
1029 		}
1030 
1031 		t->n_ports_rx = n_ports_per_thread;
1032 
1033 		print_thread(i);
1034 	}
1035 
1036 	for (i = 0; i < n_threads; i++) {
1037 		int status;
1038 
1039 		status = pthread_create(&threads[i],
1040 					NULL,
1041 					thread_func,
1042 					&thread_data[i]);
1043 		if (status) {
1044 			printf("Thread %d creation failed.\n", i);
1045 			return -1;
1046 		}
1047 	}
1048 	printf("All threads created successfully.\n");
1049 
1050 	/* Print statistics. */
1051 	signal(SIGINT, signal_handler);
1052 	signal(SIGTERM, signal_handler);
1053 	signal(SIGABRT, signal_handler);
1054 
1055 	clock_gettime(CLOCK_MONOTONIC, &time);
1056 	ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
1057 	for ( ; !quit; ) {
1058 		u64 ns1, ns_diff;
1059 
1060 		sleep(1);
1061 		clock_gettime(CLOCK_MONOTONIC, &time);
1062 		ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
1063 		ns_diff = ns1 - ns0;
1064 		ns0 = ns1;
1065 
1066 		print_port_stats_all(ns_diff);
1067 	}
1068 
1069 	/* Threads completion. */
1070 	printf("Quit.\n");
1071 	for (i = 0; i < n_threads; i++)
1072 		thread_data[i].quit = 1;
1073 
1074 	for (i = 0; i < n_threads; i++)
1075 		pthread_join(threads[i], NULL);
1076 
1077 	for (i = 0; i < n_ports; i++)
1078 		port_free(ports[i]);
1079 
1080 	bpool_free(bp);
1081 
1082 	remove_xdp_program();
1083 
1084 	return 0;
1085 }
1086