1 /*
2 ** Copyright (C) 2014-2021 Cisco and/or its affiliates. All rights reserved.
3 ** Copyright (C) 2010-2013 Sourcefire, Inc.
4 ** Author: Michael R. Altizer <mialtize@cisco.com>
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU General Public License Version 2 as
8 ** published by the Free Software Foundation. You may not use, modify or
9 ** distribute this program under any other version of the GNU General
10 ** Public License.
11 **
12 ** This program is distributed in the hope that it will be useful,
13 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 ** GNU General Public License for more details.
16 **
17 ** You should have received a copy of the GNU General Public License
18 ** along with this program; if not, write to the Free Software
19 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #define _GNU_SOURCE // For POLLRDHUP
27
28 #include <errno.h>
29 #include <limits.h>
30 #include <linux/if_ether.h>
31 #include <linux/if_packet.h>
32 #include <net/if.h>
33 #include <net/if_arp.h>
34 #include <netinet/in.h>
35 #include <poll.h>
36 #include <stdbool.h>
37 #include <stdint.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <sys/ioctl.h>
42 #include <sys/mman.h>
43 #include <sys/socket.h>
44 #include <unistd.h>
45
46 #ifdef LIBPCAP_AVAILABLE
47 #include <pcap.h>
48 #include <pthread.h>
49 #else
50 #include "daq_dlt.h"
51 #endif
52
53 #include "daq_module_api.h"
54
55 #define DAQ_AFPACKET_VERSION 7
56
57 #define AF_PACKET_DEFAULT_BUFFER_SIZE 128
58 #define AF_PACKET_MAX_INTERFACES 32
59
60 #define SET_ERROR(modinst, ...) daq_base_api.set_errbuf(modinst, __VA_ARGS__)
61
/* Overlay for addressing a ring frame either through its TPACKET_V2 header
   or as raw bytes. */
union thdr
{
    struct tpacket2_hdr *h2;
    uint8_t *raw;
};
67
/* Node in the userspace view of a kernel ring: a pointer to one mmap'd frame
   plus a link to the next entry (linked circularly by set_up_ring()). */
typedef struct _af_packet_entry
{
    struct _af_packet_entry *next;  /* Next entry in the circular ring */
    union thdr hdr;                 /* This entry's frame within the mmap'd ring */
} AFPacketEntry;
73
/* Userspace bookkeeping for one kernel packet ring (RX or TX). */
typedef struct _af_packet_ring
{
    struct tpacket_req layout;  /* Block/frame geometry requested from the kernel */
    unsigned int size;          /* Total ring size in bytes (block size * block count) */
    void *start;                /* Start of this ring within the mmap'd region */
    AFPacketEntry *entries;     /* Array of frame entries, circularly linked */
    AFPacketEntry *cursor;      /* Current position in the ring */
} AFPacketRing;
82
/* Per-interface capture state: the packet socket, its TPACKET parameters,
   the mmap'd kernel ring(s), and inline-mode peering information. */
typedef struct _af_packet_instance
{
    struct _af_packet_instance *next;   /* Next instance in the context's list */
    int fd;                             /* PF_PACKET socket descriptor (-1 when closed) */
    unsigned tp_version;                /* TPACKET version in use (TPACKET_V2) */
    unsigned tp_hdrlen;                 /* Kernel's TPACKET_V2 header length */
    unsigned tp_reserve;                /* Per-frame headroom (VLAN tag reconstruction) */
    unsigned tp_frame_size;             /* Size of each ring frame in bytes */
    unsigned actual_snaplen;            /* Usable packet bytes per frame */
    void *buffer;                       /* mmap'd ring memory (MAP_FAILED when unmapped) */
    AFPacketRing rx_ring;               /* Userspace view of the kernel RX ring */
    AFPacketRing tx_ring;               /* Userspace view of the kernel TX ring (inline mode) */
    char *name;                         /* Interface name (strdup'd copy) */
    int index;                          /* Interface index from SIOCGIFINDEX */
    struct _af_packet_instance *peer;   /* Bridged partner instance (inline mode) */
    int mtu;                            /* Interface MTU from SIOCGIFMTU */
    bool active;                        /* True once start_instance() has succeeded */
} AFPacketInstance;
101
/* PACKET_FANOUT configuration parsed from the module's variables. */
typedef struct _af_packet_fanout_cfg
{
    uint16_t fanout_flags;  /* PACKET_FANOUT_FLAG_* bits */
    uint16_t fanout_type;   /* PACKET_FANOUT_* load-balancing mode */
    bool enabled;           /* True if fanout was requested */
} AFPacketFanoutCfg;
108
/* Descriptor wrapping a single received packet as a DAQ message. */
typedef struct _af_packet_pkt_desc
{
    DAQ_Msg_t msg;                      /* DAQ message handed to the application */
    DAQ_PktHdr_t pkthdr;                /* Packet header referenced by msg.hdr */
    uint8_t *data;                      /* malloc'd buffer for the packet contents (msg.data) */
    AFPacketInstance *instance;         /* Instance the packet arrived on */
    unsigned int length;                /* Length of the packet data */
    struct _af_packet_pkt_desc *next;   /* Next descriptor on the pool freelist */
} AFPacketPktDesc;
118
/* Preallocated pool of packet descriptors with a freelist of available ones. */
typedef struct _af_packet_msg_pool
{
    AFPacketPktDesc *pool;      /* Backing array of descriptors */
    AFPacketPktDesc *freelist;  /* Singly linked list of available descriptors */
    DAQ_MsgPoolInfo_t info;     /* Size/availability/memory accounting */
} AFPacketMsgPool;
125
/* Top-level module context: configuration parsed at instantiation time plus
   all runtime state for the set of capture instances. */
typedef struct _afpacket_context
{
    /* Configuration */
    char *device;               /* Colon-separated interface specification (strdup'd) */
    char *filter;               /* BPF filter string, if any */
    int snaplen;                /* Configured snap length */
    int timeout;                /* Receive timeout in ms (-1 = wait forever) */
    uint32_t ring_size;         /* Per-ring memory budget in bytes */
    AFPacketFanoutCfg fanout_cfg;   /* PACKET_FANOUT settings */
    bool use_tx_ring;           /* Use a memory-mapped TX ring for inline pairs */
    bool debug;                 /* Emit debugging output to stdout */
    /* State */
    DAQ_ModuleInstance_h modinst;   /* Handle for reporting errors to the DAQ library */
    AFPacketMsgPool pool;       /* Packet descriptor pool */
    AFPacketInstance *instances;    /* Linked list of per-interface instances */
    uint32_t intf_count;        /* Number of interfaces parsed from the spec */
#ifdef LIBPCAP_AVAILABLE
    struct bpf_program fcode;   /* Compiled BPF filter */
#endif
    volatile bool interrupted;  /* Set asynchronously to break out of receive loops */
    DAQ_Stats_t stats;          /* Accumulated software/hardware statistics */
    /* Message receive state */
    AFPacketInstance *curr_instance;    /* Next instance to poll for packets */
} AFPacket_Context_t;
150
/* VLAN definitions stolen from LibPCAP's vlan.h. */
struct vlan_tag {
    u_int16_t vlan_tpid;    /* ETH_P_8021Q */
    u_int16_t vlan_tci;     /* VLAN TCI */
};
/* Bytes an 802.1Q tag adds to a frame (TPID + TCI). */
#define VLAN_TAG_LEN 4
157
/* Configuration variables accepted by this module; exposed to the DAQ library
   via afpacket_daq_get_variable_descs() and consumed in afpacket_daq_instantiate(). */
static DAQ_VariableDesc_t afpacket_variable_descriptions[] = {
    { "buffer_size_mb", "Packet buffer space to allocate in megabytes", DAQ_VAR_DESC_REQUIRES_ARGUMENT },
    { "debug", "Enable debugging output to stdout", DAQ_VAR_DESC_FORBIDS_ARGUMENT },
    { "fanout_type", "Fanout loadbalancing method", DAQ_VAR_DESC_REQUIRES_ARGUMENT },
    { "fanout_flag", "Fanout loadbalancing option", DAQ_VAR_DESC_REQUIRES_ARGUMENT },
    { "use_tx_ring", "Use memory-mapped TX ring", DAQ_VAR_DESC_FORBIDS_ARGUMENT },
};
165
/* Offset of the 802.1Q TPID within an Ethernet header (after the two MAC addresses). */
static const int vlan_offset = 2 * ETH_ALEN;
/* Copy of the DAQ base API vtable, captured in afpacket_daq_module_load(). */
static DAQ_BaseAPI_t daq_base_api;
#ifdef LIBPCAP_AVAILABLE
/* NOTE(review): presumably serializes libpcap BPF compilation across threads —
   confirm against the filter-handling code elsewhere in this file. */
static pthread_mutex_t bpf_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
171
/* Release every resource owned by the context's packet descriptor pool and
   reset its bookkeeping to empty. */
static void destroy_packet_pool(AFPacket_Context_t *afpc)
{
    AFPacketMsgPool *pool = &afpc->pool;

    if (pool->pool)
    {
        /* Free each descriptor's data buffer before releasing the array itself. */
        for (; pool->info.size > 0; pool->info.size--)
            free(pool->pool[pool->info.size - 1].data);
        free(pool->pool);
        pool->pool = NULL;
    }
    pool->freelist = NULL;
    pool->info.available = 0;
    pool->info.mem_size = 0;
}
186
/* Allocate and initialize the pool of packet descriptors handed out with
   received messages, threading each one onto the freelist.  Every descriptor
   gets a data buffer of actual_snaplen bytes.  Returns DAQ_SUCCESS or
   DAQ_ERROR_NOMEM; on failure the pool may be partially constructed and the
   caller is expected to clean up with destroy_packet_pool(). */
static int create_packet_pool(AFPacket_Context_t *afpc, unsigned size)
{
    AFPacketMsgPool *pool = &afpc->pool;
    pool->pool = calloc(sizeof(AFPacketPktDesc), size);
    if (!pool->pool)
    {
        SET_ERROR(afpc->modinst, "%s: Could not allocate %zu bytes for a packet descriptor pool!",
                __func__, sizeof(AFPacketPktDesc) * size);
        return DAQ_ERROR_NOMEM;
    }
    pool->info.mem_size = sizeof(AFPacketPktDesc) * size;
    while (pool->info.size < size)
    {
        /* Allocate packet data and set up descriptor */
        AFPacketPktDesc *desc = &pool->pool[pool->info.size];
        /* NOTE(review): uses the first instance's actual_snaplen for every buffer;
           presumably all instances end up with the same value — confirm. */
        desc->data = malloc(afpc->instances->actual_snaplen);
        if (!desc->data)
        {
            SET_ERROR(afpc->modinst, "%s: Could not allocate %d bytes for a packet descriptor message buffer!",
                    __func__, afpc->instances->actual_snaplen);
            return DAQ_ERROR_NOMEM;
        }
        pool->info.mem_size += afpc->instances->actual_snaplen;

        /* Initialize non-zero invariant packet header fields. */
        DAQ_PktHdr_t *pkthdr = &desc->pkthdr;
        pkthdr->ingress_group = DAQ_PKTHDR_UNKNOWN;
        pkthdr->egress_group = DAQ_PKTHDR_UNKNOWN;

        /* Initialize non-zero invariant message header fields. */
        DAQ_Msg_t *msg = &desc->msg;
        msg->type = DAQ_MSG_TYPE_PACKET;
        msg->hdr_len = sizeof(desc->pkthdr);
        msg->hdr = &desc->pkthdr;
        msg->data = desc->data;
        msg->owner = afpc->modinst;
        msg->priv = desc;

        /* Place it on the free list */
        desc->next = pool->freelist;
        pool->freelist = desc;

        pool->info.size++;
    }
    pool->info.available = pool->info.size;
    return DAQ_SUCCESS;
}
234
/* Bind an instance's packet socket to its interface with the given protocol,
   then verify that no error is already pending on the socket.
   Returns DAQ_SUCCESS or DAQ_ERROR with the error buffer set. */
static int bind_instance_interface(AFPacket_Context_t *afpc, AFPacketInstance *instance, int protocol)
{
    /* Designated initializer zeroes all remaining sockaddr_ll fields. */
    struct sockaddr_ll sll = {
        .sll_family = AF_PACKET,
        .sll_ifindex = instance->index,
        .sll_protocol = htons(protocol),
    };

    if (bind(instance->fd, (struct sockaddr *) &sll, sizeof(sll)) == -1)
    {
        SET_ERROR(afpc->modinst, "%s: bind(%s): %s\n", __func__, instance->name, strerror(errno));
        return DAQ_ERROR;
    }

    /* Surface any pending socket error, e.g., the network is down. */
    int err = 0;
    socklen_t errlen = sizeof(err);
    if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) || err)
    {
        SET_ERROR(afpc->modinst, "%s: getsockopt: %s", __func__, err ? strerror(err) : strerror(errno));
        return DAQ_ERROR;
    }

    return DAQ_SUCCESS;
}
262
/* Build the userspace companion structure for a kernel ring: an array of
   entries pointing at each frame in the mmap'd region, linked into a circular
   list with the cursor on the first entry.
   Returns DAQ_SUCCESS, or DAQ_ERROR/DAQ_ERROR_NOMEM with the error buffer set. */
static int set_up_ring(AFPacket_Context_t *afpc, AFPacketInstance *instance, AFPacketRing *ring)
{
    unsigned int idx, block, frame, frame_offset;

    /* Guard against a degenerate layout: with zero frames, the circular-link
       step below would index entries[tp_frame_nr - 1] with an unsigned
       underflow and corrupt the heap. */
    if (ring->layout.tp_frame_nr == 0)
    {
        SET_ERROR(afpc->modinst, "%s: No frames in the ring layout for device %s!", __func__, instance->name);
        return DAQ_ERROR;
    }

    /* Allocate a ring to hold packet pointers. */
    ring->entries = calloc(ring->layout.tp_frame_nr, sizeof(AFPacketEntry));
    if (!ring->entries)
    {
        SET_ERROR(afpc->modinst, "%s: Could not allocate ring buffer entries for device %s!", __func__, instance->name);
        return DAQ_ERROR_NOMEM;
    }

    /* Set up the buffer entry pointers in the ring.  Frames never straddle a
       block boundary, so walk block by block. */
    idx = 0;
    for (block = 0; block < ring->layout.tp_block_nr; block++)
    {
        unsigned int block_offset = block * ring->layout.tp_block_size;
        for (frame = 0; frame < (ring->layout.tp_block_size / ring->layout.tp_frame_size) && idx < ring->layout.tp_frame_nr; frame++)
        {
            frame_offset = frame * ring->layout.tp_frame_size;
            ring->entries[idx].hdr.raw = (uint8_t *) ring->start + block_offset + frame_offset;
            ring->entries[idx].next = &ring->entries[idx + 1];
            idx++;
        }
    }
    /* Make this a circular buffer ... a RING if you will! */
    ring->entries[ring->layout.tp_frame_nr - 1].next = &ring->entries[0];
    /* Initialize our entry point into the ring as the first buffer entry. */
    ring->cursor = &ring->entries[0];

    return DAQ_SUCCESS;
}
295
/* Tear down a capture instance: the userspace ring arrays, the mmap'd ring
   memory, the kernel-side rings, the socket, and the instance itself.
   Safe to call with NULL or a partially constructed instance. */
static void destroy_instance(AFPacketInstance *instance)
{
    if (!instance)
        return;

    if (instance->fd != -1)
    {
        /* Free the userspace ring entry arrays (free(NULL) is a no-op). */
        free(instance->rx_ring.entries);
        instance->rx_ring.entries = NULL;
        free(instance->tx_ring.entries);
        instance->tx_ring.entries = NULL;

        /* Unmap the kernel packet ring memory if it was mapped. */
        if (instance->buffer != MAP_FAILED)
        {
            munmap(instance->buffer, instance->rx_ring.size + instance->tx_ring.size);
            instance->buffer = MAP_FAILED;
        }

        /* A zeroed tpacket_req asks the kernel to tear down its ring(s). */
        struct tpacket_req req;
        memset(&req, 0, sizeof(req));
        setsockopt(instance->fd, SOL_PACKET, PACKET_RX_RING, (void *) &req, sizeof(req));
        if (instance->tx_ring.size)
            setsockopt(instance->fd, SOL_PACKET, PACKET_TX_RING, (void *) &req, sizeof(req));

        close(instance->fd);
    }

    free(instance->name);
    instance->name = NULL;
    free(instance);
}
337
/* Query the ARP hardware type (ARPHRD_*) of the instance's interface.
   Returns the type, or DAQ_ERROR_NODEV/DAQ_ERROR on ioctl failure. */
static int iface_get_arptype(AFPacketInstance *instance)
{
    struct ifreq ifr;

    /* Zero the request first so no uninitialized stack bytes are handed to
       the kernel (only ifr_name is meaningful input for SIOCGIFHWADDR). */
    memset(&ifr, 0, sizeof(ifr));
    snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", instance->name);

    if (ioctl(instance->fd, SIOCGIFHWADDR, &ifr) == -1)
    {
        if (errno == ENODEV)
            return DAQ_ERROR_NODEV;
        return DAQ_ERROR;
    }

    return ifr.ifr_hwaddr.sa_family;
}
353
/* Create and configure a capture instance for a single interface: open a
   PF_PACKET raw socket, select TPACKET_V2, reserve VLAN-reconstruction
   headroom, configure TX behavior, bind to the interface, validate the link
   type is Ethernet(-like), and enable promiscuous mode.
   Returns the new instance, or NULL with the error buffer set. */
static AFPacketInstance *create_instance(AFPacket_Context_t *afpc, const char *device)
{
    AFPacketInstance *instance = NULL;
    struct ifreq ifr;
    socklen_t len;
    int val;

    instance = calloc(1, sizeof(AFPacketInstance));
    if (!instance)
    {
        SET_ERROR(afpc->modinst, "%s: Could not allocate a new instance structure.", __func__);
        goto err;
    }
    /* Mark the ring as unmapped so destroy_instance() won't munmap() garbage. */
    instance->buffer = MAP_FAILED;

    if ((instance->name = strdup(device)) == NULL)
    {
        SET_ERROR(afpc->modinst, "%s: Could not allocate a copy of the device name.", __func__);
        goto err;;
    }

    /* Open the PF_PACKET raw socket to receive all network traffic completely unmodified.
       We use 0 for the protocol so that the packet pseudo-interface will not go into a running
       state until we bind() it to an interface later with a real protocol. */
    instance->fd = socket(PF_PACKET, SOCK_RAW, 0);
    if (instance->fd == -1)
    {
        SET_ERROR(afpc->modinst, "%s: Could not open the PF_PACKET socket: %s", __func__, strerror(errno));
        goto err;
    }

    /* Find the device index of the specified interface. */
    memset(&ifr, 0, sizeof(ifr));
    /* NOTE(review): strncpy() leaves ifr_name unterminated when strlen(device)
       >= IFNAMSIZ; the instantiate path rejects such names, but snprintf()
       (as used below for the MTU query) would be safer. */
    strncpy(ifr.ifr_name, device, sizeof(ifr.ifr_name));
    if (ioctl(instance->fd, SIOCGIFINDEX, &ifr) == -1)
    {
        SET_ERROR(afpc->modinst, "%s: Could not find index for device %s", __func__, instance->name);
        goto err;
    }
    instance->index = ifr.ifr_ifindex;

    /* Probe whether the kernel supports TPACKET_V2 */
    val = TPACKET_V2;
    len = sizeof(val);
    if (getsockopt(instance->fd, SOL_PACKET, PACKET_HDRLEN, &val, &len) < 0)
    {
        SET_ERROR(afpc->modinst, "Couldn't retrieve TPACKET_V2 header length: %s", strerror(errno));
        goto err;
    }
    instance->tp_hdrlen = val;

    /* Tell the kernel to use TPACKET_V2 */
    val = TPACKET_V2;
    if (setsockopt(instance->fd, SOL_PACKET, PACKET_VERSION, &val, sizeof(val)) < 0)
    {
        SET_ERROR(afpc->modinst, "Couldn't activate TPACKET_V2 on packet socket: %s", strerror(errno));
        goto err;
    }
    instance->tp_version = TPACKET_V2;

    /* Reserve space for VLAN tag reconstruction */
    val = VLAN_TAG_LEN;
    if (setsockopt(instance->fd, SOL_PACKET, PACKET_RESERVE, &val, sizeof(val)) < 0)
    {
        SET_ERROR(afpc->modinst, "Couldn't set up a %d-byte reservation packet socket: %s", val, strerror(errno));
        goto err;
    }

    /* Bypass the kernel's qdisc layer when transmitting */
    val = 1;
    if (setsockopt(instance->fd, SOL_PACKET, PACKET_QDISC_BYPASS, &val, sizeof(val)) < 0)
    {
        SET_ERROR(afpc->modinst, "Couldn't configure bypassing qdisc on TX: %s", strerror(errno));
        goto err;
    }

    /* Don't block on malformed frames in the TX ring */
    val = 1;
    if (setsockopt(instance->fd, SOL_PACKET, PACKET_LOSS, &val, sizeof(val)) < 0)
    {
        SET_ERROR(afpc->modinst, "Couldn't configure dropping malformed TX packets: %s", strerror(errno));
        goto err;
    }

    /* Get the interface MTU */
    memset(&ifr, 0, sizeof(ifr));
    snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", instance->name);
    if (ioctl(instance->fd, SIOCGIFMTU, &ifr) == -1)
    {
        SET_ERROR(afpc->modinst, "%s: Could not query MTU for '%s': %s (%d)", __func__,
                instance->name, strerror(errno), errno);
        goto err;
    }
    instance->mtu = ifr.ifr_mtu;

    /* Get the current reservation.  Hopefully it's what we set it to earlier for VLAN
       reconstruction.  (len still holds sizeof(val) from the PACKET_HDRLEN query above.) */
    if (getsockopt(instance->fd, SOL_PACKET, PACKET_RESERVE, &val, &len) == -1)
    {
        SET_ERROR(afpc->modinst, "%s: Could not query packet reserved space for '%s': %s (%d)",
                __func__, instance->name, strerror(errno), errno);
        goto err;
    }
    instance->tp_reserve = val;

    /* Bind the socket to the interface with protocol 0 to associate it with the interface while not putting it
       into the running state yet. */
    if (bind_instance_interface(afpc, instance, 0) != 0)
        goto err;

    /* Verify that the link-layer type is ethernet as that's all we're supporting. */
    int arptype = iface_get_arptype(instance);
    if (arptype < 0)
    {
        SET_ERROR(afpc->modinst, "%s: failed to get interface type for device %s: (%d) %s",
                __func__, instance->name, errno, strerror(errno));
        goto err;
    }

    /* Normal loopback traffic presents itself as ethernet traffic with zeroed out MAC addresses,
       and injected traffic shows (ala tcpreplay) shows up with populated ethernet headers.  Either
       way, we can just treat it like normal ethernet traffic and handle it. */
    if (arptype != ARPHRD_ETHER && arptype != ARPHRD_LOOPBACK)
    {
        SET_ERROR(afpc->modinst, "%s: invalid interface type for device %s: %d",
                __func__, instance->name, arptype);
        goto err;
    }

    /* Turn on promiscuous mode for the device. */
    struct packet_mreq mr;
    memset(&mr, 0, sizeof(mr));
    mr.mr_ifindex = instance->index;
    mr.mr_type = PACKET_MR_PROMISC;
    if (setsockopt(instance->fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, &mr, sizeof(mr)) == -1)
    {
        SET_ERROR(afpc->modinst, "%s: setsockopt: %s", __func__, strerror(errno));
        goto err;
    }

    if (afpc->debug)
    {
        printf("[%s]\n", instance->name);
        printf(" TPacket Version: %u\n", instance->tp_version);
        printf(" TPacket Header Length: %u\n", instance->tp_hdrlen);
        printf(" MTU: %d\n", instance->mtu);
        printf(" Reservation: %u\n", instance->tp_reserve);
    }

    return instance;

err:
    destroy_instance(instance);
    return NULL;
}
508
/* Calculate the TPACKET frame size to use for this instance's rings.
   From packet_mmap.txt in the Linux kernel documentation:

    Frame structure:
   - Start. Frame must be aligned to TPACKET_ALIGNMENT=16
   - struct tpacket_hdr
   - pad to TPACKET_ALIGNMENT=16
   - struct sockaddr_ll
   - Gap, chosen so that packet data (Start+tp_net) aligns to
     TPACKET_ALIGNMENT=16
   - Start+tp_mac: [ Optional MAC header ]
   - Start+tp_net: Packet data, aligned to TPACKET_ALIGNMENT=16.
   - Pad to align to TPACKET_ALIGNMENT=16

   The space we reserve for VLAN reconstruction sits before the MAC header.
   ETH_HLEN should always be less than 16, but this mirrors the kernel's own
   offset math as closely as possible. */
static void calculate_frame_size(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    unsigned hdr_plus_sll = TPACKET_ALIGN(instance->tp_hdrlen) + sizeof(struct sockaddr_ll);
    unsigned gap = (ETH_HLEN < 16) ? 16 : ETH_HLEN;
    unsigned net_offset = TPACKET_ALIGN(hdr_plus_sll + gap) + instance->tp_reserve;
    unsigned mac_offset = net_offset - ETH_HLEN;

    instance->tp_frame_size = TPACKET_ALIGN(mac_offset + afpc->snaplen);
    instance->actual_snaplen = instance->tp_frame_size - mac_offset;
}
536
/* Compute a kernel ring layout (block size, block count, frame count) for the
   given page-allocation order, fitting as many whole frames as possible into
   the per-ring memory budget (afpc->ring_size).
   Returns DAQ_SUCCESS or DAQ_ERROR with the error buffer set. */
static int calculate_layout(AFPacket_Context_t *afpc, AFPacketInstance *instance, struct tpacket_req *layout, int order)
{
    /* Frames use the pre-calculated fixed size for this instance. */
    layout->tp_frame_size = instance->tp_frame_size;

    /* Start from <order> pages per block and double until a frame fits. */
    layout->tp_block_size = getpagesize() << order;
    while (layout->tp_block_size < layout->tp_frame_size)
        layout->tp_block_size <<= 1;

    unsigned int frames_per_block = layout->tp_block_size / layout->tp_frame_size;
    if (!frames_per_block)
    {
        SET_ERROR(afpc->modinst, "%s: Invalid frames per block (%u/%u) for %s",
                __func__, layout->tp_block_size, layout->tp_frame_size, afpc->device);
        return DAQ_ERROR;
    }

    /* Convert the memory budget into a whole number of blocks, then round the
       frame count down so it exactly fills those blocks. */
    layout->tp_frame_nr = afpc->ring_size / layout->tp_frame_size;
    layout->tp_block_nr = layout->tp_frame_nr / frames_per_block;
    layout->tp_frame_nr = layout->tp_block_nr * frames_per_block;

    if (afpc->debug)
    {
        printf("AFPacket Layout:\n");
        printf(" Frame Size: %u\n", layout->tp_frame_size);
        printf(" Frames: %u\n", layout->tp_frame_nr);
        printf(" Block Size: %u (Order %d)\n", layout->tp_block_size, order);
        printf(" Blocks: %u\n", layout->tp_block_nr);
        printf(" Wasted: %u\n", layout->tp_block_nr * (layout->tp_block_size % layout->tp_frame_size));
    }

    return DAQ_SUCCESS;
}
574
575 #define DEFAULT_ORDER 5
create_ring(AFPacket_Context_t * afpc,AFPacketInstance * instance,AFPacketRing * ring,int optname)576 static int create_ring(AFPacket_Context_t *afpc, AFPacketInstance *instance, AFPacketRing *ring, int optname)
577 {
578 /* Starting with page allocations of order 5, try to allocate an RX ring in the kernel. */
579 for (int order = DEFAULT_ORDER; order >= 0; order--)
580 {
581 if (calculate_layout(afpc, instance, &ring->layout, order))
582 return DAQ_ERROR;
583
584 /* Ask the kernel to create the ring. */
585 int rc = setsockopt(instance->fd, SOL_PACKET, optname, (void*) &ring->layout, sizeof(struct tpacket_req));
586 if (rc)
587 {
588 if (errno == ENOMEM)
589 {
590 if (afpc->debug)
591 printf("%s: Allocation of kernel packet ring failed with order %d, retrying...\n", instance->name, order);
592 continue;
593 }
594 SET_ERROR(afpc->modinst, "%s: Couldn't create kernel ring on packet socket: %s",
595 __func__, strerror(errno));
596 return DAQ_ERROR;
597 }
598 /* Store the total ring size for later. */
599 ring->size = ring->layout.tp_block_size * ring->layout.tp_block_nr;
600 if (afpc->debug)
601 printf("Created a ring of type %d with total size of %u\n", optname, ring->size);
602 return DAQ_SUCCESS;
603 }
604
605 /* If we got here, it means we failed allocation on order 0. */
606 SET_ERROR(afpc->modinst, "%s: Couldn't allocate enough memory for the kernel packet ring!", instance->name);
607 return DAQ_ERROR;
608 }
609
/* Map the kernel ring memory into userspace and record where each ring
   starts.  The TX ring, when present, immediately follows the RX ring.
   Returns DAQ_SUCCESS or DAQ_ERROR with the error buffer set. */
static int mmap_rings(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    unsigned int ringsize = instance->rx_ring.size + instance->tx_ring.size;

    instance->buffer = mmap(0, ringsize, PROT_READ | PROT_WRITE, MAP_SHARED, instance->fd, 0);
    if (instance->buffer == MAP_FAILED)
    {
        SET_ERROR(afpc->modinst, "%s: Could not MMAP the ring: %s", __func__, strerror(errno));
        return DAQ_ERROR;
    }

    instance->rx_ring.start = instance->buffer;
    if (instance->tx_ring.size)
        instance->tx_ring.start = (uint8_t *) instance->buffer + instance->rx_ring.size;

    return DAQ_SUCCESS;
}
628
/* Create the kernel RX ring (plus a TX ring for inline pairs when the TX ring
   is enabled), map them into userspace, and build the userspace ring views.
   Returns DAQ_SUCCESS or DAQ_ERROR. */
static int create_instance_rings(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    /* A TX ring is only wanted for bridged (inline) instances when configured. */
    bool want_tx = (instance->peer && afpc->use_tx_ring);

    /* Calculate the frame size to request from the kernel. */
    calculate_frame_size(afpc, instance);

    if (create_ring(afpc, instance, &instance->rx_ring, PACKET_RX_RING) != DAQ_SUCCESS)
        return DAQ_ERROR;
    if (want_tx && create_ring(afpc, instance, &instance->tx_ring, PACKET_TX_RING) != DAQ_SUCCESS)
        return DAQ_ERROR;

    /* Map the kernel ring(s) into userspace... */
    if (mmap_rings(afpc, instance) != DAQ_SUCCESS)
        return DAQ_ERROR;

    /* ...and build the userspace ring structure(s) on top of them. */
    if (set_up_ring(afpc, instance, &instance->rx_ring) != DAQ_SUCCESS)
        return DAQ_ERROR;
    if (want_tx && set_up_ring(afpc, instance, &instance->tx_ring) != DAQ_SUCCESS)
        return DAQ_ERROR;

    return DAQ_SUCCESS;
}
658
/* Join the instance's socket to a PACKET_FANOUT group.  The high 16 bits of
   the argument carry the fanout type and flags; the low 16 bits are the group
   ID, for which the interface index is used.
   Returns DAQ_SUCCESS or DAQ_ERROR with the error buffer set. */
static int configure_fanout(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    int fanout_arg = ((afpc->fanout_cfg.fanout_type | afpc->fanout_cfg.fanout_flags) << 16) | instance->index;

    if (setsockopt(instance->fd, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg)) == -1)
    {
        SET_ERROR(afpc->modinst, "%s: Could not configure packet fanout: %s", __func__, strerror(errno));
        return DAQ_ERROR;
    }
    return DAQ_SUCCESS;
}
672
/* Put an instance into the running state: rebind with a real protocol
   (ETH_P_ALL starts packet delivery) and, if requested, join the fanout
   group.  Returns 0 on success, -1 on failure. */
static int start_instance(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    if (bind_instance_interface(afpc, instance, ETH_P_ALL) != 0)
        return -1;

    /* Fanout configuration must happen after the final bind(). */
    if (afpc->fanout_cfg.enabled)
    {
        if (configure_fanout(afpc, instance) != DAQ_SUCCESS)
            return -1;
    }

    instance->active = true;
    return 0;
}
687
/* Pull kernel-side packet counters from each active instance and accumulate
   them into the context's hardware statistics. */
static void update_hw_stats(AFPacket_Context_t *afpc)
{
    struct tpacket_stats kstats;
    socklen_t len = sizeof(struct tpacket_stats);

    for (AFPacketInstance *instance = afpc->instances; instance; instance = instance->next)
    {
        if (!instance->active)
            continue;

        memset(&kstats, 0, len);
        if (getsockopt(instance->fd, SOL_PACKET, PACKET_STATISTICS, &kstats, &len) < 0)
        {
            fprintf(stderr, "Failed to get stats for %s: %d %s\n", instance->name, errno, strerror(errno));
            continue;
        }

        /* tp_packets is a superset of tp_drops as it is incremented prior to the
           processing that determines the copy will be dropped/not made. */
        afpc->stats.hw_packets_received += kstats.tp_packets - kstats.tp_drops;
        afpc->stats.hw_packets_dropped += kstats.tp_drops;
    }
}
710
/* Shut down the context's capture state: cache the final hardware counters,
   destroy every instance, and free the compiled BPF program (if built with
   libpcap).  Returns 0, or -1 when given a NULL context. */
static int af_packet_close(AFPacket_Context_t *afpc)
{
    if (!afpc)
        return -1;

    /* Grab the final hardware stats before the sockets go away. */
    update_hw_stats(afpc);

    for (AFPacketInstance *instance = afpc->instances; instance; )
    {
        AFPacketInstance *next = instance->next;
        destroy_instance(instance);
        instance = next;
    }
    afpc->instances = NULL;

#ifdef LIBPCAP_AVAILABLE
    pcap_freecode(&afpc->fcode);
#endif

    return 0;
}
733
/* Pair two already-created instances as an inline bridge by pointing their
   peer fields at each other.  Returns DAQ_SUCCESS, or DAQ_ERROR_NODEV if
   either named interface has no instance. */
static int create_bridge(AFPacket_Context_t *afpc, const char *device_name1, const char *device_name2)
{
    AFPacketInstance *peer1 = NULL;
    AFPacketInstance *peer2 = NULL;

    for (AFPacketInstance *inst = afpc->instances; inst; inst = inst->next)
    {
        if (strcmp(inst->name, device_name1) == 0)
            peer1 = inst;
        else if (strcmp(inst->name, device_name2) == 0)
            peer2 = inst;
    }

    if (!peer1 || !peer2)
        return DAQ_ERROR_NODEV;

    peer1->peer = peer2;
    peer2->peer = peer1;

    return DAQ_SUCCESS;
}
755
/* Zero the software statistics and flush each instance's kernel counters
   (reading PACKET_STATISTICS clears them as a side effect). */
static void reset_stats(AFPacket_Context_t *afpc)
{
    struct tpacket_stats kstats;
    socklen_t len = sizeof(struct tpacket_stats);

    memset(&afpc->stats, 0, sizeof(DAQ_Stats_t));
    for (AFPacketInstance *instance = afpc->instances; instance; instance = instance->next)
        getsockopt(instance->fd, SOL_PACKET, PACKET_STATISTICS, &kstats, &len);
}
767
afpacket_daq_module_load(const DAQ_BaseAPI_t * base_api)768 static int afpacket_daq_module_load(const DAQ_BaseAPI_t *base_api)
769 {
770 if (base_api->api_version != DAQ_BASE_API_VERSION || base_api->api_size != sizeof(DAQ_BaseAPI_t))
771 return DAQ_ERROR;
772
773 daq_base_api = *base_api;
774
775 return DAQ_SUCCESS;
776 }
777
afpacket_daq_module_unload(void)778 static int afpacket_daq_module_unload(void)
779 {
780 memset(&daq_base_api, 0, sizeof(daq_base_api));
781 return DAQ_SUCCESS;
782 }
783
afpacket_daq_get_variable_descs(const DAQ_VariableDesc_t ** var_desc_table)784 static int afpacket_daq_get_variable_descs(const DAQ_VariableDesc_t **var_desc_table)
785 {
786 *var_desc_table = afpacket_variable_descriptions;
787
788 return sizeof(afpacket_variable_descriptions) / sizeof(DAQ_VariableDesc_t);
789 }
790
/* Instantiate the AFPacket DAQ module: parse the interface specification and
   configuration variables, create an instance per interface (bridging pairs
   in non-passive modes), size and create the kernel rings, and build the
   message descriptor pool.  On success, stores the new context in *ctxt_ptr
   and returns DAQ_SUCCESS; otherwise returns a DAQ_ERROR_* code with the
   error buffer set and all partial state freed. */
static int afpacket_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInstance_h modinst, void **ctxt_ptr)
{
    AFPacket_Context_t *afpc;
    AFPacketInstance *afi;
    const char *size_str = NULL;
    char *name1, *name2, *dev;
    char intf[IFNAMSIZ];
    size_t len;
    int num_intfs = 0;
    int rval = DAQ_ERROR;

    afpc = calloc(1, sizeof(AFPacket_Context_t));
    if (!afpc)
    {
        SET_ERROR(modinst, "%s: Couldn't allocate memory for the new AFPacket context!", __func__);
        rval = DAQ_ERROR_NOMEM;
        goto err;
    }
    afpc->modinst = modinst;

    afpc->device = strdup(daq_base_api.config_get_input(modcfg));
    if (!afpc->device)
    {
        SET_ERROR(modinst, "%s: Couldn't allocate memory for the device string!", __func__);
        rval = DAQ_ERROR_NOMEM;
        goto err;
    }

    afpc->snaplen = daq_base_api.config_get_snaplen(modcfg);
    /* A configured timeout of 0 means wait forever (poll() semantics use -1). */
    afpc->timeout = (int) daq_base_api.config_get_timeout(modcfg);
    if (afpc->timeout == 0)
        afpc->timeout = -1;

    /* Reject specs that begin or end with ':' and, in passive mode, specs
       containing '::' (an empty interface name). */
    dev = afpc->device;
    if (*dev == ':' || ((len = strlen(dev)) > 0 && *(dev + len - 1) == ':') ||
            (daq_base_api.config_get_mode(modcfg) == DAQ_MODE_PASSIVE && strstr(dev, "::")))
    {
        SET_ERROR(modinst, "%s: Invalid interface specification: '%s'!", __func__, afpc->device);
        goto err;
    }

    /* Process the configuration variables (see afpacket_variable_descriptions). */
    const char *varKey, *varValue;
    daq_base_api.config_first_variable(modcfg, &varKey, &varValue);
    while (varKey)
    {
        if (!strcmp(varKey, "buffer_size_mb"))
            size_str = varValue;
        else if (!strcmp(varKey, "debug"))
            afpc->debug = true;
        else if (!strcmp(varKey, "fanout_type"))
        {
            if (!varValue)
            {
                SET_ERROR(modinst, "%s: %s requires an argument!", __func__, varKey);
                goto err;
            }
            /* Using anything other than 'hash' is probably asking for trouble, but
               I'll never stop you from shooting yourself in the foot. */
            if (!strcmp(varValue, "hash"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_HASH;
            else if (!strcmp(varValue, "lb"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_LB;
            else if (!strcmp(varValue, "cpu"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_CPU;
            else if (!strcmp(varValue, "rollover"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_ROLLOVER;
            else if (!strcmp(varValue, "rnd"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_RND;
            else if (!strcmp(varValue, "qm"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_QM;
            else
            {
                SET_ERROR(modinst, "%s: Unrecognized argument for %s: '%s'!", __func__, varKey, varValue);
                goto err;
            }
            afpc->fanout_cfg.enabled = true;
        }
        else if (!strcmp(varKey, "fanout_flag"))
        {
            if (!varValue)
            {
                SET_ERROR(modinst, "%s: %s requires an argument!", __func__, varKey);
                goto err;
            }
            if (!strcmp(varValue, "rollover"))
                afpc->fanout_cfg.fanout_flags |= PACKET_FANOUT_FLAG_ROLLOVER;
            else if (!strcmp(varValue, "defrag"))
                afpc->fanout_cfg.fanout_flags |= PACKET_FANOUT_FLAG_DEFRAG;
            else
            {
                SET_ERROR(modinst, "%s: Unrecognized argument for %s: '%s'!", __func__, varKey, varValue);
                goto err;
            }
        }
        else if (!strcmp(varKey, "use_tx_ring"))
            afpc->use_tx_ring = true;

        daq_base_api.config_next_variable(modcfg, &varKey, &varValue);
    }

    /* NOTE(review): a buffer_size_mb value of "max" falls back to the 128 MB
       default rather than any maximum — confirm that is the intended behavior. */
    uint32_t size;
    if (size_str && strcmp("max", size_str) != 0)
        size = strtoul(size_str, NULL, 10);
    else
        size = AF_PACKET_DEFAULT_BUFFER_SIZE;
    /* The size is specified in megabytes.  Convert it to bytes. */
    size = size * 1024 * 1024;

    /* Walk the colon-separated interface list, creating an instance per name. */
    while (*dev != '\0')
    {
        len = strcspn(dev, ":");
        if (len >= IFNAMSIZ)
        {
            SET_ERROR(modinst, "%s: Interface name too long! (%zu)", __func__, len);
            goto err;
        }
        if (len != 0)
        {
            afpc->intf_count++;
            /* NOTE(review): '>=' rejects the 32nd interface while the message
               implies a limit of 32 — possible off-by-one; confirm intent. */
            if (afpc->intf_count >= AF_PACKET_MAX_INTERFACES)
            {
                SET_ERROR(modinst, "%s: Using more than %d interfaces is not supported!", __func__, AF_PACKET_MAX_INTERFACES);
                goto err;
            }
            snprintf(intf, len + 1, "%s", dev);
            afi = create_instance(afpc, intf);
            if (!afi)
                goto err;

            afi->next = afpc->instances;
            afpc->instances = afi;
            num_intfs++;
            /* In non-passive modes, interfaces come in bridged pairs. */
            if (daq_base_api.config_get_mode(modcfg) != DAQ_MODE_PASSIVE)
            {
                if (num_intfs == 2)
                {
                    name1 = afpc->instances->next->name;
                    name2 = afpc->instances->name;

                    if (create_bridge(afpc, name1, name2) != DAQ_SUCCESS)
                    {
                        SET_ERROR(modinst, "%s: Couldn't create the bridge between %s and %s!", __func__, name1, name2);
                        goto err;
                    }
                    num_intfs = 0;
                }
                else if (num_intfs > 2)
                    break;
            }
        }
        else
            len = 1;    /* Skip over the ':' separator. */
        dev += len;
    }

    /* If there are any leftover unbridged interfaces and we're not in Passive mode, error out. */
    if (!afpc->instances || (daq_base_api.config_get_mode(modcfg) != DAQ_MODE_PASSIVE && num_intfs != 0))
    {
        SET_ERROR(modinst, "%s: Invalid interface specification: '%s'!", __func__, afpc->device);
        goto err;
    }

    /*
     * Determine the dimensions of the kernel RX/TX ring(s) to request.
     * Divide the total packet buffer memory evenly across the number of rings.
     * (One per passive interface, two per inline.)
     */
    unsigned num_rings = 0;
    for (afi = afpc->instances; afi; afi = afi->next)
        num_rings += (afi->peer && afpc->use_tx_ring) ? 2 : 1;
    afpc->ring_size = size / num_rings;

    /* Create the RX (and potentially TX) rings and map them into userspace. */
    for (afi = afpc->instances; afi; afi = afi->next)
    {
        if ((rval = create_instance_rings(afpc, afi)) != DAQ_SUCCESS)
            goto err;
    }

    /* Finally, create the message buffer pool. */
    uint32_t pool_size = daq_base_api.config_get_msg_pool_size(modcfg);
    if (pool_size == 0)
    {
        /* Default the pool size to 10% of the allocated RX frames. */
        for (afi = afpc->instances; afi; afi = afi->next)
            pool_size += afi->rx_ring.layout.tp_frame_nr;
        pool_size /= 10;
    }
    if ((rval = create_packet_pool(afpc, pool_size)) != DAQ_SUCCESS)
        goto err;

    afpc->curr_instance = afpc->instances;

    *ctxt_ptr = afpc;

    return DAQ_SUCCESS;

err:
    if (afpc)
    {
        af_packet_close(afpc);
        if (afpc->device)
            free(afpc->device);
        destroy_packet_pool(afpc);
        free(afpc);
    }
    return rval;
}
999
afpacket_daq_destroy(void * handle)1000 static void afpacket_daq_destroy(void *handle)
1001 {
1002 AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1003
1004 af_packet_close(afpc);
1005 if (afpc->device)
1006 free(afpc->device);
1007 if (afpc->filter)
1008 free(afpc->filter);
1009 destroy_packet_pool(afpc);
1010 free(afpc);
1011 }
1012
/*
 * Compile and install a BPF filter expression for software filtering in the
 * receive path.  Only available when built with libpcap; otherwise reports
 * DAQ_ERROR_NOTSUP.  The compiled program is consulted in msg_receive().
 *
 * NOTE(review): if pcap_compile_nopcap() fails, afpc->filter has already been
 * replaced with the new (uncompilable) string while afpc->fcode still holds
 * the previously compiled program — the two can be left inconsistent.  TODO
 * confirm whether callers treat a set_filter failure as fatal.
 */
static int afpacket_daq_set_filter(void *handle, const char *filter)
{
#ifdef LIBPCAP_AVAILABLE
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
    struct bpf_program fcode;

    /* Replace any previously stored copy of the filter string. */
    if (afpc->filter)
        free(afpc->filter);

    afpc->filter = strdup(filter);
    if (!afpc->filter)
    {
        SET_ERROR(afpc->modinst, "%s: Couldn't allocate memory for the filter string!", __func__);
        return DAQ_ERROR;
    }

    /* pcap's compiler is not thread-safe; serialize against other instances. */
    pthread_mutex_lock(&bpf_mutex);
    if (pcap_compile_nopcap(afpc->snaplen, DLT_EN10MB, &fcode, afpc->filter, 1, PCAP_NETMASK_UNKNOWN) == -1)
    {
        pthread_mutex_unlock(&bpf_mutex);
        SET_ERROR(afpc->modinst, "%s: BPF state machine compilation failed!", __func__);
        return DAQ_ERROR;
    }
    pthread_mutex_unlock(&bpf_mutex);

    /* Release the old compiled program and adopt the new one. */
    pcap_freecode(&afpc->fcode);
    afpc->fcode.bf_len = fcode.bf_len;
    afpc->fcode.bf_insns = fcode.bf_insns;

    return DAQ_SUCCESS;
#else
    return DAQ_ERROR_NOTSUP;
#endif
}
1047
afpacket_daq_start(void * handle)1048 static int afpacket_daq_start(void *handle)
1049 {
1050 AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1051 AFPacketInstance *instance;
1052
1053 for (instance = afpc->instances; instance; instance = instance->next)
1054 {
1055 if (start_instance(afpc, instance) != 0)
1056 return DAQ_ERROR;
1057 }
1058
1059 reset_stats(afpc);
1060
1061 return DAQ_SUCCESS;
1062 }
1063
/*
 * Transmit a raw packet out of the given egress instance.  A NULL egress is
 * silently treated as success (nothing to send out of).  Uses the TX ring
 * when one was allocated; otherwise falls back to a plain send() with a
 * short poll()-based retry on ENOBUFS.
 */
static inline int afpacket_transmit_packet(AFPacketInstance *egress, const uint8_t *packet_data, unsigned int len)
{
    if (!egress)
        return DAQ_SUCCESS;

    if (egress->tx_ring.size)
    {
        AFPacketEntry *slot = egress->tx_ring.cursor;

        if (slot->hdr.h2->tp_status != TP_STATUS_AVAILABLE)
        {
            /* FIXME: This should probably wait for a TX slot to free up via poll(). */
            return DAQ_ERROR_AGAIN;
        }
        /* Copy the payload into the ring frame, publish its length, and hand
           the slot to the kernel before kicking transmission. */
        memcpy(slot->hdr.raw + TPACKET_ALIGN(egress->tp_hdrlen), packet_data, len);
        slot->hdr.h2->tp_len = len;
        slot->hdr.h2->tp_status = TP_STATUS_SEND_REQUEST;
        /* FIXME: This should call sendto() with MSG_DONTWAIT and handle no-buffer conditions gracefully.
           Performance without MSG_DONTWAIT is apparently pretty miserable. */
        if (send(egress->fd, NULL, 0, 0) < 0)
            return DAQ_ERROR;
        egress->tx_ring.cursor = slot->next;
        return DAQ_SUCCESS;
    }

    /* No TX ring: send directly, briefly waiting out transient buffer
       exhaustion (ENOBUFS) via poll() before giving up. */
    while (send(egress->fd, packet_data, len, 0) < 0)
    {
        if (errno != ENOBUFS)
            return DAQ_ERROR;
        struct pollfd pfd;
        pfd.fd = egress->fd;
        pfd.revents = 0;
        pfd.events = POLLOUT;
        if (poll(&pfd, 1, 10) <= 0 || !(pfd.revents & POLLOUT))
            return DAQ_ERROR;
    }

    return DAQ_SUCCESS;
}
1107
/*
 * Wrapper around afpacket_transmit_packet() that validates the egress
 * instance, records errors in the instance error buffer, and bumps the
 * injected-packet counter on success.
 */
static int afpacket_inject_packet(AFPacket_Context_t *afpc, AFPacketInstance *egress, const uint8_t *data, uint32_t data_len)
{
    if (!egress)
    {
        SET_ERROR(afpc->modinst, "%s: Could not determine which instance to inject the packet out of!", __func__);
        return DAQ_ERROR;
    }

    int status = afpacket_transmit_packet(egress, data, data_len);
    if (status == DAQ_SUCCESS)
    {
        afpc->stats.packets_injected++;
        return DAQ_SUCCESS;
    }

    if (status == DAQ_ERROR_AGAIN)
        SET_ERROR(afpc->modinst, "%s: Could not send packet because the TX ring is full.", __func__);
    else
        SET_ERROR(afpc->modinst, "%s: Error sending packet: %s (%d)", __func__, strerror(errno), errno);

    return status;
}
1130
/*
 * Inject a new packet out of the interface whose index matches the given
 * packet header's ingress index.  Only DAQ_MSG_TYPE_PACKET messages are
 * supported.
 */
static int afpacket_daq_inject(void *handle, DAQ_MsgType type, const void *hdr, const uint8_t *data, uint32_t data_len)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;

    if (type != DAQ_MSG_TYPE_PACKET)
        return DAQ_ERROR_NOTSUP;

    const DAQ_PktHdr_t *pkthdr = (const DAQ_PktHdr_t *) hdr;

    /* Walk the instance list looking for the interface index named in the
       header; a NULL result is reported by afpacket_inject_packet(). */
    AFPacketInstance *instance = afpc->instances;
    while (instance && instance->index != pkthdr->ingress_index)
        instance = instance->next;

    return afpacket_inject_packet(afpc, instance, data, data_len);
}
1149
afpacket_daq_inject_relative(void * handle,const DAQ_Msg_t * msg,const uint8_t * data,uint32_t data_len,int reverse)1150 static int afpacket_daq_inject_relative(void *handle, const DAQ_Msg_t *msg, const uint8_t *data, uint32_t data_len, int reverse)
1151 {
1152 AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1153 AFPacketPktDesc *desc = (AFPacketPktDesc *) msg->priv;
1154 AFPacketInstance *egress = reverse ? desc->instance : desc->instance->peer;
1155
1156 return afpacket_inject_packet(afpc, egress, data, data_len);
1157 }
1158
afpacket_daq_interrupt(void * handle)1159 static int afpacket_daq_interrupt(void *handle)
1160 {
1161 AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1162
1163 afpc->interrupted = true;
1164
1165 return DAQ_SUCCESS;
1166 }
1167
afpacket_daq_stop(void * handle)1168 static int afpacket_daq_stop(void *handle)
1169 {
1170 AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1171
1172 af_packet_close(afpc);
1173
1174 return DAQ_SUCCESS;
1175 }
1176
/*
 * Module ioctl handler.  Currently implements only DIOCTL_GET_DEVICE_INDEX:
 * translate an interface name into the index assigned to its instance.
 */
static int afpacket_daq_ioctl(void *handle, DAQ_IoctlCmd cmd, void *arg, size_t arglen)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;

    /* Only supports GET_DEVICE_INDEX for now */
    if (cmd != DIOCTL_GET_DEVICE_INDEX || arglen != sizeof(DIOCTL_QueryDeviceIndex))
        return DAQ_ERROR_NOTSUP;

    DIOCTL_QueryDeviceIndex *qdi = (DIOCTL_QueryDeviceIndex *) arg;

    if (!qdi->device)
    {
        SET_ERROR(afpc->modinst, "No device name to find the index of!");
        return DAQ_ERROR_INVAL;
    }

    AFPacketInstance *instance = afpc->instances;
    while (instance)
    {
        if (strcmp(qdi->device, instance->name) == 0)
        {
            qdi->index = instance->index;
            return DAQ_SUCCESS;
        }
        instance = instance->next;
    }

    return DAQ_ERROR_NODEV;
}
1204
/*
 * Copy out the current statistics, first folding in the latest kernel
 * socket counters via update_hw_stats().
 */
static int afpacket_daq_get_stats(void *handle, DAQ_Stats_t *stats)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;

    update_hw_stats(afpc);
    *stats = afpc->stats;

    return DAQ_SUCCESS;
}
1214
afpacket_daq_reset_stats(void * handle)1215 static void afpacket_daq_reset_stats(void *handle)
1216 {
1217 AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1218
1219 reset_stats(afpc);
1220 }
1221
afpacket_daq_get_snaplen(void * handle)1222 static int afpacket_daq_get_snaplen(void *handle)
1223 {
1224 AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1225
1226 /* Note: This returns the maximum capture length that will be returned by the kernel.
1227 It is slightly larger than the originally requested snaplen due to reserving room
1228 for reconstructing the VLAN tag as well as rounding up due to alignment. */
1229
1230 return afpc->instances->actual_snaplen;
1231 }
1232
afpacket_daq_get_capabilities(void * handle)1233 static uint32_t afpacket_daq_get_capabilities(void *handle)
1234 {
1235 uint32_t capabilities = DAQ_CAPA_BLOCK | DAQ_CAPA_REPLACE | DAQ_CAPA_INJECT |
1236 DAQ_CAPA_UNPRIV_START | DAQ_CAPA_INTERRUPT | DAQ_CAPA_DEVICE_INDEX;
1237 #ifdef LIBPCAP_AVAILABLE
1238 capabilities |= DAQ_CAPA_BPF;
1239 #endif
1240 return capabilities;
1241 }
1242
afpacket_daq_get_datalink_type(void * handle)1243 static int afpacket_daq_get_datalink_type(void *handle)
1244 {
1245 return DLT_EN10MB;
1246 }
1247
/*
 * Round-robin search of the per-instance RX rings for a frame the kernel has
 * handed to userspace (TP_STATUS_USER set in the frame header).  The scan
 * starts at the instance after the one that produced the previous packet so
 * no single interface can starve the others.  On a hit, that instance becomes
 * the new current instance and its ring cursor is advanced past the returned
 * entry.  Returns NULL when no instance has a packet pending.
 */
static inline AFPacketEntry *find_packet(AFPacket_Context_t *afpc)
{
    AFPacketInstance *instance;
    AFPacketEntry *entry;

    instance = afpc->curr_instance;
    do
    {
        /* Step to the next instance, wrapping from list tail back to head;
           the do/while terminates once we've come full circle. */
        instance = instance->next ? instance->next : afpc->instances;
        if (instance->rx_ring.cursor->hdr.h2->tp_status & TP_STATUS_USER)
        {
            afpc->curr_instance = instance;
            entry = instance->rx_ring.cursor;
            instance->rx_ring.cursor = entry->next;
            return entry;
        }
    } while (instance != afpc->curr_instance);

    return NULL;
}
1268
/*
 * Block until one of the instances' packet sockets becomes readable, the
 * configured timeout expires, or the receive is interrupted.
 *
 * Returns:
 *   DAQ_RSTAT_OK          - at least one socket has data waiting
 *   DAQ_RSTAT_INTERRUPTED - afpacket_daq_interrupt() was called
 *   DAQ_RSTAT_TIMEOUT     - the full timeout elapsed with no traffic
 *   DAQ_RSTAT_ERROR       - socket hang-up/error or poll() failure
 *
 * NOTE(review): a negative afpc->timeout never reaches 0 here, giving the
 * intended "wait forever" behavior via repeated one-second polls — confirm
 * that is the documented meaning of a negative DAQ timeout.
 */
static inline DAQ_RecvStatus wait_for_packet(AFPacket_Context_t *afpc)
{
    AFPacketInstance *instance;
    struct pollfd pfd[AF_PACKET_MAX_INTERFACES];
    uint32_t i;

    /* One pollfd per instance; afpc->intf_count matches the list length. */
    for (i = 0, instance = afpc->instances; instance; i++, instance = instance->next)
    {
        pfd[i].fd = instance->fd;
        pfd[i].revents = 0;
        pfd[i].events = POLLIN;
    }
    /* Chop the timeout into one second chunks (plus any remainder) to improve responsiveness to
       interruption when there is no traffic and the timeout is very long (or unlimited). */
    int timeout = afpc->timeout;
    while (timeout != 0)
    {
        /* If the receive has been canceled, break out of the loop and return. */
        if (afpc->interrupted)
        {
            afpc->interrupted = false;
            return DAQ_RSTAT_INTERRUPTED;
        }

        /* Poll for at most one second per iteration; a negative remaining
           timeout (unlimited) always polls for a full second. */
        int poll_timeout;
        if (timeout >= 1000)
        {
            poll_timeout = 1000;
            timeout -= 1000;
        }
        else if (timeout > 0)
        {
            poll_timeout = timeout;
            timeout = 0;
        }
        else
            poll_timeout = 1000;

        int ret = poll(pfd, afpc->intf_count, poll_timeout);
        /* If some number of sockets have events returned, check them all for badness. */
        if (ret > 0)
        {
            for (i = 0; i < afpc->intf_count; i++)
            {
                if (pfd[i].revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL))
                {
                    if (pfd[i].revents & (POLLHUP | POLLRDHUP))
                        SET_ERROR(afpc->modinst, "%s: Hang-up on a packet socket", __func__);
                    else if (pfd[i].revents & POLLERR)
                        SET_ERROR(afpc->modinst, "%s: Encountered error condition on a packet socket", __func__);
                    else if (pfd[i].revents & POLLNVAL)
                        SET_ERROR(afpc->modinst, "%s: Invalid polling request on a packet socket", __func__);
                    return DAQ_RSTAT_ERROR;
                }
            }
            /* All good! A packet should be waiting for us somewhere. */
            return DAQ_RSTAT_OK;
        }
        /* If we were interrupted by a signal, start the loop over. The user should call daq_interrupt to actually exit. */
        if (ret < 0 && errno != EINTR)
        {
            SET_ERROR(afpc->modinst, "%s: Poll failed: %s (%d)", __func__, strerror(errno), errno);
            return DAQ_RSTAT_ERROR;
        }
    }

    return DAQ_RSTAT_TIMEOUT;
}
1337
/*
 * Receive up to max_recv packet messages into msgs[], returning the number of
 * messages produced and a receive status in *rstat.  For each packet: pull a
 * frame from an RX ring (round-robin via find_packet), validate its offsets,
 * reconstruct a stripped VLAN tag if necessary, optionally apply the BPF
 * filter (forwarding filtered packets inline), then copy the payload into a
 * descriptor from the free pool and hand it back as a DAQ message.  Blocks in
 * wait_for_packet() only when no messages have been gathered yet.
 */
static unsigned afpacket_daq_msg_receive(void *handle, const unsigned max_recv, const DAQ_Msg_t *msgs[], DAQ_RecvStatus *rstat)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
    AFPacketInstance *instance;
    DAQ_RecvStatus status = DAQ_RSTAT_OK;
    unsigned idx = 0;

    while (idx < max_recv)
    {
        /* Check to see if the receive has been canceled. If so, reset it and return appropriately. */
        if (afpc->interrupted)
        {
            afpc->interrupted = false;
            status = DAQ_RSTAT_INTERRUPTED;
            break;
        }

        /* Make sure that we have a packet descriptor available to populate. */
        AFPacketPktDesc *desc = afpc->pool.freelist;
        if (!desc)
        {
            status = DAQ_RSTAT_NOBUF;
            break;
        }

        /* Try to find a packet ready for processing from one of the RX rings. */
        AFPacketEntry *entry = find_packet(afpc);
        if (!entry)
        {
            /* Only block waiting for a packet if we haven't received anything yet. */
            /* FIXIT-L - Bad interaction with leading packets filtered by BPF? */
            if (idx != 0)
            {
                status = DAQ_RSTAT_WOULD_BLOCK;
                break;
            }
            status = wait_for_packet(afpc);
            if (status != DAQ_RSTAT_OK)
                break;
            continue;
        }

        /* Snapshot the TPACKET v2 frame metadata before touching the data.
           tp_usec is derived from the nanosecond timestamp. */
        unsigned int tp_len, tp_mac, tp_snaplen, tp_sec, tp_usec;
        tp_len = entry->hdr.h2->tp_len;
        tp_mac = entry->hdr.h2->tp_mac;
        tp_snaplen = entry->hdr.h2->tp_snaplen;
        tp_sec = entry->hdr.h2->tp_sec;
        tp_usec = entry->hdr.h2->tp_nsec / 1000;
        instance = afpc->curr_instance;
        /* Sanity-check the kernel-supplied offsets against the frame size. */
        if (tp_mac + tp_snaplen > instance->rx_ring.layout.tp_frame_size)
        {
            SET_ERROR(afpc->modinst, "%s: Corrupted frame on kernel ring (MAC offset %u + CapLen %u > FrameSize %d)",
                    __func__, tp_mac, tp_snaplen, afpc->curr_instance->rx_ring.layout.tp_frame_size);
            status = DAQ_RSTAT_ERROR;
            break;
        }

        uint8_t *data = entry->hdr.raw + tp_mac;
        /* Make a valiant attempt at reconstructing the VLAN tag if it has been stripped by moving the
           MAC addresses backward into the reserved space to make room for the VLAN tag and filling
           that tag structure in. */
        if (
                (entry->hdr.h2->tp_vlan_tci || (entry->hdr.h2->tp_status & TP_STATUS_VLAN_VALID)) &&
                tp_snaplen >= (unsigned int) vlan_offset)
        {
            struct vlan_tag *tag;

            data -= VLAN_TAG_LEN;
            memmove((void *) data, data + VLAN_TAG_LEN, vlan_offset);

            tag = (struct vlan_tag *) (data + vlan_offset);
            if (entry->hdr.h2->tp_vlan_tpid && (entry->hdr.h2->tp_status & TP_STATUS_VLAN_TPID_VALID))
                tag->vlan_tpid = htons(entry->hdr.h2->tp_vlan_tpid);
            else
                tag->vlan_tpid = htons(ETH_P_8021Q);
            tag->vlan_tci = htons(entry->hdr.h2->tp_vlan_tci);

            tp_snaplen += VLAN_TAG_LEN;
            tp_len += VLAN_TAG_LEN;
        }
#ifdef LIBPCAP_AVAILABLE
        /* Check to see if this hits the BPF. If it does, dispose of it and
           move on to the next packet (transmitting in the inline scenario). */
        if (afpc->fcode.bf_insns && bpf_filter(afpc->fcode.bf_insns, data, tp_len, tp_snaplen) == 0)
        {
            afpc->stats.packets_filtered++;
            afpacket_transmit_packet(instance->peer, data, tp_snaplen);
            entry->hdr.h2->tp_status = TP_STATUS_KERNEL;
            continue;
        }
#endif
        afpc->stats.packets_received++;

        /* Populate the packet descriptor, copying the packet data and releasing the packet
           ring entry back to the kernel for reuse. */
        memcpy(desc->data, data, tp_snaplen);
        entry->hdr.h2->tp_status = TP_STATUS_KERNEL;
        desc->instance = instance;
        desc->length = tp_snaplen;

        /* Next, set up the DAQ message. Most fields are prepopulated and unchanging. */
        DAQ_Msg_t *msg = &desc->msg;
        msg->data_len = tp_snaplen;

        /* Then, set up the DAQ packet header. */
        DAQ_PktHdr_t *pkthdr = &desc->pkthdr;
        pkthdr->ts.tv_sec = tp_sec;
        pkthdr->ts.tv_usec = tp_usec;
        pkthdr->pktlen = tp_len;
        pkthdr->ingress_index = instance->index;
        pkthdr->egress_index = instance->peer ? instance->peer->index : DAQ_PKTHDR_UNKNOWN;
        pkthdr->flags = 0;
        /* The following fields should remain in their virgin state:
            address_space_id (0)
            ingress_group (DAQ_PKTHDR_UNKNOWN)
            egress_group (DAQ_PKTHDR_UNKNOWN)
            opaque (0)
            flow_id (0)
        */

        /* Last, but not least, extract this descriptor from the free list and
           place the message in the return vector. */
        afpc->pool.freelist = desc->next;
        desc->next = NULL;
        afpc->pool.info.available--;
        msgs[idx] = &desc->msg;

        idx++;
    }

    *rstat = status;

    return idx;
}
1472
/* Collapse the full DAQ verdict set down to the only two actions this module
   can take: forward the packet (PASS) or drop it (BLOCK). */
static const DAQ_Verdict verdict_translation_table[MAX_DAQ_VERDICT] = {
    DAQ_VERDICT_PASS,       /* DAQ_VERDICT_PASS */
    DAQ_VERDICT_BLOCK,      /* DAQ_VERDICT_BLOCK */
    DAQ_VERDICT_PASS,       /* DAQ_VERDICT_REPLACE */
    DAQ_VERDICT_PASS,       /* DAQ_VERDICT_WHITELIST */
    DAQ_VERDICT_BLOCK,      /* DAQ_VERDICT_BLACKLIST */
    DAQ_VERDICT_PASS        /* DAQ_VERDICT_IGNORE */
};
1481
/*
 * Finalize a previously received message: account for the verdict, forward
 * the packet to the bridged peer when the (translated) verdict is PASS, and
 * return the descriptor to the free pool.
 */
static int afpacket_daq_msg_finalize(void *handle, const DAQ_Msg_t *msg, DAQ_Verdict verdict)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
    AFPacketPktDesc *desc = (AFPacketPktDesc *) msg->priv;

    /* Out-of-range verdicts are treated as PASS. */
    if (verdict >= MAX_DAQ_VERDICT)
        verdict = DAQ_VERDICT_PASS;
    afpc->stats.verdicts[verdict]++;

    if (verdict_translation_table[verdict] == DAQ_VERDICT_PASS)
        afpacket_transmit_packet(desc->instance->peer, desc->data, desc->length);

    /* Toss the descriptor back on the free list for reuse. */
    desc->next = afpc->pool.freelist;
    afpc->pool.freelist = desc;
    afpc->pool.info.available++;

    return DAQ_SUCCESS;
}
1502
/* Hand back a snapshot of the message pool accounting info. */
static int afpacket_daq_get_msg_pool_info(void *handle, DAQ_MsgPoolInfo_t *info)
{
    *info = ((AFPacket_Context_t *) handle)->pool.info;
    return DAQ_SUCCESS;
}
1511
/* Module dispatch table: the entry points the DAQ library uses to drive this
   module.  Exported as DAQ_MODULE_DATA when built as a shared object,
   otherwise linked in statically under a module-specific name. */
#ifdef BUILDING_SO
DAQ_SO_PUBLIC const DAQ_ModuleAPI_t DAQ_MODULE_DATA =
#else
const DAQ_ModuleAPI_t afpacket_daq_module_data =
#endif
{
    /* .api_version = */ DAQ_MODULE_API_VERSION,
    /* .api_size = */ sizeof(DAQ_ModuleAPI_t),
    /* .module_version = */ DAQ_AFPACKET_VERSION,
    /* .name = */ "afpacket",
    /* .type = */ DAQ_TYPE_INTF_CAPABLE | DAQ_TYPE_INLINE_CAPABLE | DAQ_TYPE_MULTI_INSTANCE,
    /* .load = */ afpacket_daq_module_load,
    /* .unload = */ afpacket_daq_module_unload,
    /* .get_variable_descs = */ afpacket_daq_get_variable_descs,
    /* .instantiate = */ afpacket_daq_instantiate,
    /* .destroy = */ afpacket_daq_destroy,
    /* .set_filter = */ afpacket_daq_set_filter,
    /* .start = */ afpacket_daq_start,
    /* .inject = */ afpacket_daq_inject,
    /* .inject_relative = */ afpacket_daq_inject_relative,
    /* .interrupt = */ afpacket_daq_interrupt,
    /* .stop = */ afpacket_daq_stop,
    /* .ioctl = */ afpacket_daq_ioctl,
    /* .get_stats = */ afpacket_daq_get_stats,
    /* .reset_stats = */ afpacket_daq_reset_stats,
    /* .get_snaplen = */ afpacket_daq_get_snaplen,
    /* .get_capabilities = */ afpacket_daq_get_capabilities,
    /* .get_datalink_type = */ afpacket_daq_get_datalink_type,
    /* .config_load = */ NULL,
    /* .config_swap = */ NULL,
    /* .config_free = */ NULL,
    /* .msg_receive = */ afpacket_daq_msg_receive,
    /* .msg_finalize = */ afpacket_daq_msg_finalize,
    /* .get_msg_pool_info = */ afpacket_daq_get_msg_pool_info,
};
1547