1 /*
2 ** Copyright (C) 2014-2021 Cisco and/or its affiliates. All rights reserved.
3 ** Copyright (C) 2010-2013 Sourcefire, Inc.
4 ** Author: Michael R. Altizer <mialtize@cisco.com>
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU General Public License Version 2 as
8 ** published by the Free Software Foundation.  You may not use, modify or
9 ** distribute this program under any other version of the GNU General
10 ** Public License.
11 **
12 ** This program is distributed in the hope that it will be useful,
13 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ** GNU General Public License for more details.
16 **
17 ** You should have received a copy of the GNU General Public License
18 ** along with this program; if not, write to the Free Software
19 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
20 */
21 
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 
26 #define _GNU_SOURCE // For POLLRDHUP
27 
28 #include <errno.h>
29 #include <limits.h>
30 #include <linux/if_ether.h>
31 #include <linux/if_packet.h>
32 #include <net/if.h>
33 #include <net/if_arp.h>
34 #include <netinet/in.h>
35 #include <poll.h>
36 #include <stdbool.h>
37 #include <stdint.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <sys/ioctl.h>
42 #include <sys/mman.h>
43 #include <sys/socket.h>
44 #include <unistd.h>
45 
46 #ifdef LIBPCAP_AVAILABLE
47 #include <pcap.h>
48 #include <pthread.h>
49 #else
50 #include "daq_dlt.h"
51 #endif
52 
53 #include "daq_module_api.h"
54 
/* Version number of this DAQ module. */
#define DAQ_AFPACKET_VERSION 7

/* NOTE(review): presumably the default for the "buffer_size_mb" variable, in megabytes - confirm at the point of use. */
#define AF_PACKET_DEFAULT_BUFFER_SIZE   128
/* NOTE(review): presumably the upper bound on interfaces in one input specification - confirm at the point of use. */
#define AF_PACKET_MAX_INTERFACES         32

/* Record a printf-style formatted error message on a module instance through the DAQ base API. */
#define SET_ERROR(modinst, ...)    daq_base_api.set_errbuf(modinst, __VA_ARGS__)
61 
/* Two views of the location of a single frame inside a memory-mapped packet ring:
    a typed pointer to its TPACKET_V2 header, or a raw byte pointer for address arithmetic. */
union thdr
{
    struct tpacket2_hdr *h2;    /* Frame viewed as a TPACKET_V2 header */
    uint8_t *raw;               /* Same location viewed as plain bytes */
};
67 
68 typedef struct _af_packet_entry
69 {
70     struct _af_packet_entry *next;
71     union thdr hdr;
72 } AFPacketEntry;
73 
74 typedef struct _af_packet_ring
75 {
76     struct tpacket_req layout;
77     unsigned int size;
78     void *start;
79     AFPacketEntry *entries;
80     AFPacketEntry *cursor;
81 } AFPacketRing;
82 
83 typedef struct _af_packet_instance
84 {
85     struct _af_packet_instance *next;
86     int fd;
87     unsigned tp_version;
88     unsigned tp_hdrlen;
89     unsigned tp_reserve;
90     unsigned tp_frame_size;
91     unsigned actual_snaplen;
92     void *buffer;
93     AFPacketRing rx_ring;
94     AFPacketRing tx_ring;
95     char *name;
96     int index;
97     struct _af_packet_instance *peer;
98     int mtu;
99     bool active;
100 } AFPacketInstance;
101 
/* Packet fanout settings gathered from the "fanout_type" and "fanout_flag" variables. */
typedef struct _af_packet_fanout_cfg
{
    uint16_t fanout_flags;  /* PACKET_FANOUT option flags, OR'd with the type when configuring fanout */
    uint16_t fanout_type;   /* PACKET_FANOUT_* load-balancing mode */
    bool enabled;           /* start_instance() only configures fanout when this is set */
} AFPacketFanoutCfg;
108 
109 typedef struct _af_packet_pkt_desc
110 {
111     DAQ_Msg_t msg;
112     DAQ_PktHdr_t pkthdr;
113     uint8_t *data;
114     AFPacketInstance *instance;
115     unsigned int length;
116     struct _af_packet_pkt_desc *next;
117 } AFPacketPktDesc;
118 
119 typedef struct _af_packet_msg_pool
120 {
121     AFPacketPktDesc *pool;
122     AFPacketPktDesc *freelist;
123     DAQ_MsgPoolInfo_t info;
124 } AFPacketMsgPool;
125 
126 typedef struct _afpacket_context
127 {
128     /* Configuration */
129     char *device;
130     char *filter;
131     int snaplen;
132     int timeout;
133     uint32_t ring_size;
134     AFPacketFanoutCfg fanout_cfg;
135     bool use_tx_ring;
136     bool debug;
137     /* State */
138     DAQ_ModuleInstance_h modinst;
139     AFPacketMsgPool pool;
140     AFPacketInstance *instances;
141     uint32_t intf_count;
142 #ifdef LIBPCAP_AVAILABLE
143     struct bpf_program fcode;
144 #endif
145     volatile bool interrupted;
146     DAQ_Stats_t stats;
147     /* Message receive state */
148     AFPacketInstance *curr_instance;
149 } AFPacket_Context_t;
150 
/* VLAN definitions borrowed from LibPCAP's vlan.h. */
struct vlan_tag {
    u_int16_t   vlan_tpid;      /* ETH_P_8021Q */
    u_int16_t   vlan_tci;       /* VLAN TCI */
};
/* Size in bytes of a VLAN tag (TPID + TCI). */
#define VLAN_TAG_LEN    4
157 
158 static DAQ_VariableDesc_t afpacket_variable_descriptions[] = {
159     { "buffer_size_mb", "Packet buffer space to allocate in megabytes", DAQ_VAR_DESC_REQUIRES_ARGUMENT },
160     { "debug", "Enable debugging output to stdout", DAQ_VAR_DESC_FORBIDS_ARGUMENT },
161     { "fanout_type", "Fanout loadbalancing method", DAQ_VAR_DESC_REQUIRES_ARGUMENT },
162     { "fanout_flag", "Fanout loadbalancing option", DAQ_VAR_DESC_REQUIRES_ARGUMENT },
163     { "use_tx_ring", "Use memory-mapped TX ring", DAQ_VAR_DESC_FORBIDS_ARGUMENT },
164 };
165 
166 static const int vlan_offset = 2 * ETH_ALEN;
167 static DAQ_BaseAPI_t daq_base_api;
168 #ifdef LIBPCAP_AVAILABLE
169 static pthread_mutex_t bpf_mutex = PTHREAD_MUTEX_INITIALIZER;
170 #endif
171 
/* Free every packet data buffer and the descriptor array of the context's message pool,
    then reset the pool's free list and accounting. Safe to call on an empty pool. */
static void destroy_packet_pool(AFPacket_Context_t *afpc)
{
    AFPacketMsgPool *msg_pool = &afpc->pool;
    AFPacketPktDesc *descs = msg_pool->pool;

    if (descs != NULL)
    {
        /* info.size counts the descriptors that were set up; walk it back down to zero. */
        for (; msg_pool->info.size > 0; msg_pool->info.size--)
            free(descs[msg_pool->info.size - 1].data);
        free(descs);
        msg_pool->pool = NULL;
    }

    msg_pool->info.mem_size = 0;
    msg_pool->info.available = 0;
    msg_pool->freelist = NULL;
}
186 
/* Allocate the packet descriptor pool for the context.
    'size' is the number of descriptors to create; each one gets its own data buffer sized to
    the first instance's actual snaplen and is pushed onto the pool's free list.
    Returns DAQ_SUCCESS, or DAQ_ERROR_NOMEM (with the error buffer set) on allocation failure.
    On failure the partially built pool is left in place; info.size reflects the descriptors
    completed so far, which is what destroy_packet_pool() walks. */
static int create_packet_pool(AFPacket_Context_t *afpc, unsigned size)
{
    AFPacketMsgPool *pool = &afpc->pool;
    const unsigned snaplen = afpc->instances->actual_snaplen;

    /* calloc() takes (element count, element size). */
    pool->pool = calloc(size, sizeof(AFPacketPktDesc));
    if (!pool->pool)
    {
        SET_ERROR(afpc->modinst, "%s: Could not allocate %zu bytes for a packet descriptor pool!",
                __func__, sizeof(AFPacketPktDesc) * size);
        return DAQ_ERROR_NOMEM;
    }
    pool->info.mem_size = sizeof(AFPacketPktDesc) * size;
    while (pool->info.size < size)
    {
        /* Allocate packet data and set up descriptor */
        AFPacketPktDesc *desc = &pool->pool[pool->info.size];
        desc->data = malloc(snaplen);
        if (!desc->data)
        {
            /* actual_snaplen is unsigned, so it must be printed with %u. */
            SET_ERROR(afpc->modinst, "%s: Could not allocate %u bytes for a packet descriptor message buffer!",
                    __func__, snaplen);
            return DAQ_ERROR_NOMEM;
        }
        pool->info.mem_size += snaplen;

        /* Initialize non-zero invariant packet header fields. */
        DAQ_PktHdr_t *pkthdr = &desc->pkthdr;
        pkthdr->ingress_group = DAQ_PKTHDR_UNKNOWN;
        pkthdr->egress_group = DAQ_PKTHDR_UNKNOWN;

        /* Initialize non-zero invariant message header fields. */
        DAQ_Msg_t *msg = &desc->msg;
        msg->type = DAQ_MSG_TYPE_PACKET;
        msg->hdr_len = sizeof(desc->pkthdr);
        msg->hdr = &desc->pkthdr;
        msg->data = desc->data;
        msg->owner = afpc->modinst;
        msg->priv = desc;

        /* Place it on the free list */
        desc->next = pool->freelist;
        pool->freelist = desc;

        pool->info.size++;
    }
    pool->info.available = pool->info.size;
    return DAQ_SUCCESS;
}
234 
/* Bind the instance's packet socket to its interface for the given ethernet protocol
    (host byte order; 0 associates the socket with the interface without receiving anything),
    then check the socket for a pending error.
    Returns DAQ_SUCCESS, or DAQ_ERROR with the error buffer set. */
static int bind_instance_interface(AFPacket_Context_t *afpc, AFPacketInstance *instance, int protocol)
{
    struct sockaddr_ll sll;
    /* Start at 0 so that, if getsockopt() itself fails, the message below reports errno
        rather than reading an indeterminate value. */
    int err = 0;
    socklen_t errlen = sizeof(err);

    /* Bind to the specified device so we only see packets from it. */
    memset(&sll, 0, sizeof(struct sockaddr_ll));
    sll.sll_family = AF_PACKET;
    sll.sll_ifindex = instance->index;
    sll.sll_protocol = htons(protocol);

    if (bind(instance->fd, (struct sockaddr *) &sll, sizeof(sll)) == -1)
    {
        SET_ERROR(afpc->modinst, "%s: bind(%s): %s\n", __func__, instance->name, strerror(errno));
        return DAQ_ERROR;
    }

    /* Any pending errors, e.g., network is down? */
    if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) || err)
    {
        SET_ERROR(afpc->modinst, "%s: getsockopt: %s", __func__, err ? strerror(err) : strerror(errno));
        return DAQ_ERROR;
    }

    return DAQ_SUCCESS;
}
262 
/* Build the userspace shadow of a kernel ring: one AFPacketEntry per frame, each pointing at
    its frame inside the mapped memory (ring->start must already be set) and linked into a
    circular list, with the cursor placed on the first entry.
    Returns DAQ_SUCCESS, DAQ_ERROR for an unusable layout, or DAQ_ERROR_NOMEM. */
static int set_up_ring(AFPacket_Context_t *afpc, AFPacketInstance *instance, AFPacketRing *ring)
{
    unsigned int idx, block, frame, frame_offset, frames_per_block;

    /* A layout without frames would make the circular-link step index entries[-1], and a
        zero frame size would divide by zero below. */
    if (ring->layout.tp_frame_nr == 0 || ring->layout.tp_frame_size == 0)
    {
        SET_ERROR(afpc->modinst, "%s: Invalid ring layout (%u frames of %u bytes) for device %s!",
                __func__, ring->layout.tp_frame_nr, ring->layout.tp_frame_size, instance->name);
        return DAQ_ERROR;
    }

    /* Allocate a ring to hold packet pointers. */
    ring->entries = calloc(ring->layout.tp_frame_nr, sizeof(AFPacketEntry));
    if (!ring->entries)
    {
        SET_ERROR(afpc->modinst, "%s: Could not allocate ring buffer entries for device %s!", __func__, instance->name);
        return DAQ_ERROR_NOMEM;
    }

    /* Set up the buffer entry pointers in the ring.  Frames never straddle blocks, so each
        block contributes a whole number of frames. */
    frames_per_block = ring->layout.tp_block_size / ring->layout.tp_frame_size;
    idx = 0;
    for (block = 0; block < ring->layout.tp_block_nr; block++)
    {
        unsigned int block_offset = block * ring->layout.tp_block_size;
        for (frame = 0; frame < frames_per_block && idx < ring->layout.tp_frame_nr; frame++)
        {
            frame_offset = frame * ring->layout.tp_frame_size;
            ring->entries[idx].hdr.raw = (uint8_t *) ring->start + block_offset + frame_offset;
            /* The last entry's link is corrected to wrap around right after the loops. */
            ring->entries[idx].next = &ring->entries[idx + 1];
            idx++;
        }
    }
    /* Make this a circular buffer ... a RING if you will! */
    ring->entries[ring->layout.tp_frame_nr - 1].next = &ring->entries[0];
    /* Initialize our entry point into the ring as the first buffer entry. */
    ring->cursor = &ring->entries[0];

    return DAQ_SUCCESS;
}
295 
/* Tear down an instance and free it.  Accepts NULL.
    When the socket is open (fd != -1) this releases the userspace rings, unmaps the kernel
    ring memory, asks the kernel to drop its rings, and closes the socket.  The name copy and
    the instance structure itself are always freed. */
static void destroy_instance(AFPacketInstance *instance)
{
    if (instance == NULL)
        return;

    if (instance->fd != -1)
    {
        /* Userspace shadows of the RX and TX rings (free(NULL) is a no-op). */
        free(instance->rx_ring.entries);
        instance->rx_ring.entries = NULL;
        free(instance->tx_ring.entries);
        instance->tx_ring.entries = NULL;

        /* The RX and TX rings share one mapping, so unmap their combined size. */
        if (instance->buffer != MAP_FAILED)
        {
            munmap(instance->buffer, instance->rx_ring.size + instance->tx_ring.size);
            instance->buffer = MAP_FAILED;
        }

        /* An all-zero request tells the kernel to destroy a ring. */
        struct tpacket_req empty_req;
        memset(&empty_req, 0, sizeof(empty_req));
        setsockopt(instance->fd, SOL_PACKET, PACKET_RX_RING, (void *) &empty_req, sizeof(empty_req));
        if (instance->tx_ring.size)
            setsockopt(instance->fd, SOL_PACKET, PACKET_TX_RING, (void *) &empty_req, sizeof(empty_req));

        close(instance->fd);
    }

    free(instance->name);
    instance->name = NULL;

    free(instance);
}
337 
/* Look up the hardware (ARPHRD_*) type of the instance's interface via SIOCGIFHWADDR.
    Returns the hardware address family on success; on failure returns DAQ_ERROR_NODEV when
    the ioctl reports ENODEV and DAQ_ERROR otherwise (callers treat results < 0 as errors). */
static int iface_get_arptype(AFPacketInstance *instance)
{
    struct ifreq req;

    snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", instance->name);

    if (ioctl(instance->fd, SIOCGIFHWADDR, &req) != -1)
        return req.ifr_hwaddr.sa_family;

    return (errno == ENODEV) ? DAQ_ERROR_NODEV : DAQ_ERROR;
}
353 
create_instance(AFPacket_Context_t * afpc,const char * device)354 static AFPacketInstance *create_instance(AFPacket_Context_t *afpc, const char *device)
355 {
356     AFPacketInstance *instance = NULL;
357     struct ifreq ifr;
358     socklen_t len;
359     int val;
360 
361     instance = calloc(1, sizeof(AFPacketInstance));
362     if (!instance)
363     {
364         SET_ERROR(afpc->modinst, "%s: Could not allocate a new instance structure.", __func__);
365         goto err;
366     }
367     instance->buffer = MAP_FAILED;
368 
369     if ((instance->name = strdup(device)) == NULL)
370     {
371         SET_ERROR(afpc->modinst, "%s: Could not allocate a copy of the device name.", __func__);
372         goto err;;
373     }
374 
375     /* Open the PF_PACKET raw socket to receive all network traffic completely unmodified.
376         We use 0 for the protocol so that the packet pseudo-interface will not go into a running
377         state until we bind() it to an interface later with a real protocol. */
378     instance->fd = socket(PF_PACKET, SOCK_RAW, 0);
379     if (instance->fd == -1)
380     {
381         SET_ERROR(afpc->modinst, "%s: Could not open the PF_PACKET socket: %s", __func__, strerror(errno));
382         goto err;
383     }
384 
385     /* Find the device index of the specified interface. */
386     memset(&ifr, 0, sizeof(ifr));
387     strncpy(ifr.ifr_name, device, sizeof(ifr.ifr_name));
388     if (ioctl(instance->fd, SIOCGIFINDEX, &ifr) == -1)
389     {
390         SET_ERROR(afpc->modinst, "%s: Could not find index for device %s", __func__, instance->name);
391         goto err;
392     }
393     instance->index = ifr.ifr_ifindex;
394 
395     /* Probe whether the kernel supports TPACKET_V2 */
396     val = TPACKET_V2;
397     len = sizeof(val);
398     if (getsockopt(instance->fd, SOL_PACKET, PACKET_HDRLEN, &val, &len) < 0)
399     {
400         SET_ERROR(afpc->modinst, "Couldn't retrieve TPACKET_V2 header length: %s", strerror(errno));
401         goto err;
402     }
403     instance->tp_hdrlen = val;
404 
405     /* Tell the kernel to use TPACKET_V2 */
406     val = TPACKET_V2;
407     if (setsockopt(instance->fd, SOL_PACKET, PACKET_VERSION, &val, sizeof(val)) < 0)
408     {
409         SET_ERROR(afpc->modinst, "Couldn't activate TPACKET_V2 on packet socket: %s", strerror(errno));
410         goto err;
411     }
412     instance->tp_version = TPACKET_V2;
413 
414     /* Reserve space for VLAN tag reconstruction */
415     val = VLAN_TAG_LEN;
416     if (setsockopt(instance->fd, SOL_PACKET, PACKET_RESERVE, &val, sizeof(val)) < 0)
417     {
418         SET_ERROR(afpc->modinst, "Couldn't set up a %d-byte reservation packet socket: %s", val, strerror(errno));
419         goto err;
420     }
421 
422     /* Bypass the kernel's qdisc layer when transmitting */
423     val = 1;
424     if (setsockopt(instance->fd, SOL_PACKET, PACKET_QDISC_BYPASS, &val, sizeof(val)) < 0)
425     {
426         SET_ERROR(afpc->modinst, "Couldn't configure bypassing qdisc on TX: %s", strerror(errno));
427         goto err;
428     }
429 
430     /* Don't block on malformed frames in the TX ring */
431     val = 1;
432     if (setsockopt(instance->fd, SOL_PACKET, PACKET_LOSS, &val, sizeof(val)) < 0)
433     {
434         SET_ERROR(afpc->modinst, "Couldn't configure dropping malformed TX packets: %s", strerror(errno));
435         goto err;
436     }
437 
438     /* Get the interface MTU */
439     memset (&ifr, 0, sizeof(ifr));
440     snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", instance->name);
441     if (ioctl(instance->fd, SIOCGIFMTU, &ifr) == -1)
442     {
443         SET_ERROR(afpc->modinst, "%s: Could not query MTU for '%s': %s (%d)", __func__,
444                 instance->name, strerror(errno), errno);
445         goto err;
446     }
447     instance->mtu = ifr.ifr_mtu;
448 
449     /* Get the current reservation.  Hopefully it's what we set it to earlier for VLAN reconstruction. */
450     if (getsockopt(instance->fd, SOL_PACKET, PACKET_RESERVE, &val, &len) == -1)
451     {
452         SET_ERROR(afpc->modinst, "%s: Could not query packet reserved space for '%s': %s (%d)",
453                 __func__, instance->name, strerror(errno), errno);
454         goto err;
455     }
456     instance->tp_reserve = val;
457 
458     /* Bind the socket to the interface with protocol 0 to associate it with the interface while not putting it
459         into the running state yet. */
460     if (bind_instance_interface(afpc, instance, 0) != 0)
461         goto err;
462 
463     /* Verify that the link-layer type is ethernet as that's all we're supporting. */
464     int arptype = iface_get_arptype(instance);
465     if (arptype < 0)
466     {
467         SET_ERROR(afpc->modinst, "%s: failed to get interface type for device %s: (%d) %s",
468                 __func__, instance->name, errno, strerror(errno));
469         goto err;
470     }
471 
472     /* Normal loopback traffic presents itself as ethernet traffic with zeroed out MAC addresses,
473         and injected traffic shows (ala tcpreplay) shows up with populated ethernet headers.  Either
474         way, we can just treat it like normal ethernet traffic and handle it. */
475     if (arptype != ARPHRD_ETHER && arptype != ARPHRD_LOOPBACK)
476     {
477         SET_ERROR(afpc->modinst, "%s: invalid interface type for device %s: %d",
478                 __func__, instance->name, arptype);
479         goto err;
480     }
481 
482     /* Turn on promiscuous mode for the device. */
483     struct packet_mreq mr;
484     memset(&mr, 0, sizeof(mr));
485     mr.mr_ifindex = instance->index;
486     mr.mr_type = PACKET_MR_PROMISC;
487     if (setsockopt(instance->fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, &mr, sizeof(mr)) == -1)
488     {
489         SET_ERROR(afpc->modinst, "%s: setsockopt: %s", __func__, strerror(errno));
490         goto err;
491     }
492 
493     if (afpc->debug)
494     {
495         printf("[%s]\n", instance->name);
496         printf("  TPacket Version: %u\n", instance->tp_version);
497         printf("  TPacket Header Length: %u\n", instance->tp_hdrlen);
498         printf("  MTU: %d\n", instance->mtu);
499         printf("  Reservation: %u\n", instance->tp_reserve);
500     }
501 
502     return instance;
503 
504 err:
505     destroy_instance(instance);
506     return NULL;
507 }
508 
/* Work out the TPACKET frame size for an instance and the snaplen that actually fits in it,
    storing them in instance->tp_frame_size and instance->actual_snaplen. */
static void calculate_frame_size(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    /* The arithmetic mirrors the kernel's.  From packet_mmap.txt in the Linux kernel documentation:

            Frame structure:
               - Start. Frame must be aligned to TPACKET_ALIGNMENT=16
               - struct tpacket_hdr
               - pad to TPACKET_ALIGNMENT=16
               - struct sockaddr_ll
               - Gap, chosen so that packet data (Start+tp_net) aligns to
                 TPACKET_ALIGNMENT=16
               - Start+tp_mac: [ Optional MAC header ]
               - Start+tp_net: Packet data, aligned to TPACKET_ALIGNMENT=16.
               - Pad to align to TPACKET_ALIGNMENT=16

        The reservation made for VLAN reconstruction sits in front of the MAC header.
        ETH_HLEN should always be below 16; the comparison is kept only so that this logic
        tracks the kernel's as closely as possible.
    */
    const unsigned mac_space = (ETH_HLEN < 16) ? 16 : ETH_HLEN;
    const unsigned hdr_and_sll = TPACKET_ALIGN(instance->tp_hdrlen) + sizeof(struct sockaddr_ll);
    const unsigned net_offset = TPACKET_ALIGN(hdr_and_sll + mac_space) + instance->tp_reserve;
    const unsigned mac_offset = net_offset - ETH_HLEN;
    const unsigned frame_size = TPACKET_ALIGN(mac_offset + afpc->snaplen);

    instance->tp_frame_size = frame_size;
    /* Rounding the frame up to the alignment may leave room for more than the requested snaplen. */
    instance->actual_snaplen = frame_size - mac_offset;
}
536 
/* Fill in a tpacket_req ring layout for the instance.
    'order' selects the starting block size as (page size << order); the block is doubled
    until one frame fits.  The frame count is derived from the configured ring size and then
    rounded down to a whole number of blocks.
    Returns DAQ_SUCCESS, or DAQ_ERROR with the error buffer set. */
static int calculate_layout(AFPacket_Context_t *afpc, AFPacketInstance *instance, struct tpacket_req *layout, int order)
{
    unsigned int block_size = getpagesize() << order;

    /* The frame size was worked out ahead of time by calculate_frame_size(). */
    layout->tp_frame_size = instance->tp_frame_size;

    /* Grow the block until it can hold at least one frame. */
    for (; block_size < layout->tp_frame_size; block_size <<= 1)
        ;
    layout->tp_block_size = block_size;

    const unsigned int per_block = block_size / layout->tp_frame_size;
    if (per_block == 0)
    {
        SET_ERROR(afpc->modinst, "%s: Invalid frames per block (%u/%u) for %s",
                __func__, layout->tp_block_size, layout->tp_frame_size, afpc->device);
        return DAQ_ERROR;
    }

    /* Frames that fit in the requested per-interface memory, converted to whole blocks;
        the frame count requested from the kernel must equal per_block * block count. */
    const unsigned int wanted_frames = afpc->ring_size / layout->tp_frame_size;
    const unsigned int blocks = wanted_frames / per_block;
    layout->tp_block_nr = blocks;
    layout->tp_frame_nr = blocks * per_block;

    if (afpc->debug)
    {
        printf("AFPacket Layout:\n");
        printf("  Frame Size: %u\n", layout->tp_frame_size);
        printf("  Frames:     %u\n", layout->tp_frame_nr);
        printf("  Block Size: %u (Order %d)\n", layout->tp_block_size, order);
        printf("  Blocks:     %u\n", layout->tp_block_nr);
        printf("  Wasted:     %u\n", layout->tp_block_nr * (layout->tp_block_size % layout->tp_frame_size));
    }

    return DAQ_SUCCESS;
}
574 
575 #define DEFAULT_ORDER 5
create_ring(AFPacket_Context_t * afpc,AFPacketInstance * instance,AFPacketRing * ring,int optname)576 static int create_ring(AFPacket_Context_t *afpc, AFPacketInstance *instance, AFPacketRing *ring, int optname)
577 {
578     /* Starting with page allocations of order 5, try to allocate an RX ring in the kernel. */
579     for (int order = DEFAULT_ORDER; order >= 0; order--)
580     {
581         if (calculate_layout(afpc, instance, &ring->layout, order))
582             return DAQ_ERROR;
583 
584         /* Ask the kernel to create the ring. */
585         int rc = setsockopt(instance->fd, SOL_PACKET, optname, (void*) &ring->layout, sizeof(struct tpacket_req));
586         if (rc)
587         {
588             if (errno == ENOMEM)
589             {
590                 if (afpc->debug)
591                     printf("%s: Allocation of kernel packet ring failed with order %d, retrying...\n", instance->name, order);
592                 continue;
593             }
594             SET_ERROR(afpc->modinst, "%s: Couldn't create kernel ring on packet socket: %s",
595                     __func__, strerror(errno));
596             return DAQ_ERROR;
597         }
598         /* Store the total ring size for later. */
599         ring->size = ring->layout.tp_block_size * ring->layout.tp_block_nr;
600         if (afpc->debug)
601             printf("Created a ring of type %d with total size of %u\n", optname, ring->size);
602         return DAQ_SUCCESS;
603     }
604 
605     /* If we got here, it means we failed allocation on order 0. */
606     SET_ERROR(afpc->modinst, "%s: Couldn't allocate enough memory for the kernel packet ring!", instance->name);
607     return DAQ_ERROR;
608 }
609 
/* Map the instance's kernel ring memory into userspace with a single mmap() covering the
    RX ring followed by the TX ring (if one was created), and record where each ring starts.
    Returns DAQ_SUCCESS, or DAQ_ERROR with the error buffer set. */
static int mmap_rings(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    const unsigned int total = instance->rx_ring.size + instance->tx_ring.size;
    void *mapping = mmap(0, total, PROT_READ | PROT_WRITE, MAP_SHARED, instance->fd, 0);

    instance->buffer = mapping;
    if (mapping == MAP_FAILED)
    {
        SET_ERROR(afpc->modinst, "%s: Could not MMAP the ring: %s", __func__, strerror(errno));
        return DAQ_ERROR;
    }

    /* The RX ring comes first in the mapping; the TX ring follows directly behind it. */
    instance->rx_ring.start = mapping;
    if (instance->tx_ring.size != 0)
        instance->tx_ring.start = (uint8_t *) mapping + instance->rx_ring.size;

    return DAQ_SUCCESS;
}
628 
/* Create and map all rings for an instance: compute the frame size, request the kernel RX
    ring (plus a TX ring when the instance is bridged and use_tx_ring is set), map the ring
    memory, and build the userspace ring(s) on top of it.
    Returns DAQ_SUCCESS, or DAQ_ERROR if any step fails. */
static int create_instance_rings(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    /* A TX ring is only wanted for inline (bridged) operation with the option enabled. */
    const bool want_tx = instance->peer && afpc->use_tx_ring;

    /* Frame size to request from the kernel. */
    calculate_frame_size(afpc, instance);

    /* Kernel rings first: RX always, TX when wanted... */
    if (create_ring(afpc, instance, &instance->rx_ring, PACKET_RX_RING) != DAQ_SUCCESS)
        return DAQ_ERROR;
    if (want_tx && create_ring(afpc, instance, &instance->tx_ring, PACKET_TX_RING) != DAQ_SUCCESS)
        return DAQ_ERROR;

    /* ...then map the memory backing the kernel ring(s) into userspace... */
    if (mmap_rings(afpc, instance) != DAQ_SUCCESS)
        return DAQ_ERROR;

    /* ...and finally lay userspace rings over the mapped RX (and TX) memory. */
    if (set_up_ring(afpc, instance, &instance->rx_ring) != DAQ_SUCCESS)
        return DAQ_ERROR;
    if (want_tx && set_up_ring(afpc, instance, &instance->tx_ring) != DAQ_SUCCESS)
        return DAQ_ERROR;

    return DAQ_SUCCESS;
}
658 
/* Join the instance's socket to a packet fanout group.
    The PACKET_FANOUT argument carries the group id in its low 16 bits and the fanout type
    OR'd with the option flags in its high 16 bits; the interface index is used as the group id.
    Returns DAQ_SUCCESS, or DAQ_ERROR with the error buffer set. */
static int configure_fanout(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    /* Assemble the argument in unsigned arithmetic: the uint16_t operands would otherwise
        promote to int, and shifting a value with bit 15 set left by 16 overflows a signed int. */
    const uint32_t mode = (uint32_t) (afpc->fanout_cfg.fanout_type | afpc->fanout_cfg.fanout_flags);
    /* Keep the group id inside its 16-bit field so a large interface index cannot spill
        into the type/flags half. */
    const uint32_t group_id = (uint32_t) instance->index & 0xFFFFu;
    uint32_t fanout_arg = (mode << 16) | group_id;

    if (setsockopt(instance->fd, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg)) == -1)
    {
        SET_ERROR(afpc->modinst, "%s: Could not configure packet fanout: %s", __func__, strerror(errno));
        return DAQ_ERROR;
    }

    return DAQ_SUCCESS;
}
672 
/* Put an instance into the running state: rebind its socket with ETH_P_ALL so packets start
    arriving, configure fanout when enabled, and mark the instance active.
    Returns 0 on success and -1 on failure (the failing helper sets the error buffer). */
static int start_instance(AFPacket_Context_t *afpc, AFPacketInstance *instance)
{
    /* Final binding of the socket (and with it the RX ring) to the interface. */
    if (bind_instance_interface(afpc, instance, ETH_P_ALL) != 0)
        return -1;

    /* Fanout can only be configured once the final binding is in place. */
    if (afpc->fanout_cfg.enabled)
    {
        if (configure_fanout(afpc, instance) != DAQ_SUCCESS)
            return -1;
    }

    instance->active = true;
    return 0;
}
687 
/* Fold the kernel's packet statistics for every active instance into the context's
    hardware counters.  A failed query is reported on stderr and that instance is skipped. */
static void update_hw_stats(AFPacket_Context_t *afpc)
{
    struct tpacket_stats kstats;
    socklen_t len = sizeof (struct tpacket_stats);

    for (AFPacketInstance *cur = afpc->instances; cur != NULL; cur = cur->next)
    {
        if (cur->active)
        {
            memset(&kstats, 0, len);
            if (getsockopt(cur->fd, SOL_PACKET, PACKET_STATISTICS, &kstats, &len) < 0)
            {
                fprintf(stderr, "Failed to get stats for %s: %d %s\n", cur->name, errno, strerror(errno));
                continue;
            }
            /* tp_packets already includes tp_drops: it is bumped before the processing that
                decides whether the copy is dropped, so subtract to get packets delivered. */
            afpc->stats.hw_packets_received += kstats.tp_packets - kstats.tp_drops;
            afpc->stats.hw_packets_dropped += kstats.tp_drops;
        }
    }
}
710 
/* Shut down the context's capture state: take a final hardware statistics snapshot, destroy
    every instance, and (with libpcap) release the compiled filter.
    Returns 0, or -1 when afpc is NULL. */
static int af_packet_close(AFPacket_Context_t *afpc)
{
    if (afpc == NULL)
        return -1;

    /* Grab the last hardware counters while the sockets still exist. */
    update_hw_stats(afpc);

    /* Pop instances off the head of the list until none remain. */
    for (AFPacketInstance *head = afpc->instances; head != NULL; head = afpc->instances)
    {
        afpc->instances = head->next;
        destroy_instance(head);
    }

#ifdef LIBPCAP_AVAILABLE
    pcap_freecode(&afpc->fcode);
#endif

    return 0;
}
733 
/* Pair up two existing instances, looked up by interface name, as each other's peer.
    Returns DAQ_SUCCESS, or DAQ_ERROR_NODEV when either name has no matching instance. */
static int create_bridge(AFPacket_Context_t *afpc, const char *device_name1, const char *device_name2)
{
    AFPacketInstance *first = NULL;
    AFPacketInstance *second = NULL;

    /* Scan the whole list; a later instance with the same name replaces an earlier match. */
    for (AFPacketInstance *cur = afpc->instances; cur != NULL; cur = cur->next)
    {
        if (strcmp(cur->name, device_name1) == 0)
            first = cur;
        else if (strcmp(cur->name, device_name2) == 0)
            second = cur;
    }

    if (first == NULL || second == NULL)
        return DAQ_ERROR_NODEV;

    first->peer = second;
    second->peer = first;

    return DAQ_SUCCESS;
}
755 
/* Zero the context's statistics and discard the kernel counters accumulated so far. */
static void reset_stats(AFPacket_Context_t *afpc)
{
    struct tpacket_stats discarded;
    socklen_t optlen = sizeof (struct tpacket_stats);
    AFPacketInstance *cur = afpc->instances;

    memset(&afpc->stats, 0, sizeof(DAQ_Stats_t));

    /* Querying PACKET_STATISTICS is done purely to clear each instance's stats;
        the values read back are thrown away. */
    while (cur != NULL)
    {
        getsockopt(cur->fd, SOL_PACKET, PACKET_STATISTICS, &discarded, &optlen);
        cur = cur->next;
    }
}
767 
afpacket_daq_module_load(const DAQ_BaseAPI_t * base_api)768 static int afpacket_daq_module_load(const DAQ_BaseAPI_t *base_api)
769 {
770     if (base_api->api_version != DAQ_BASE_API_VERSION || base_api->api_size != sizeof(DAQ_BaseAPI_t))
771         return DAQ_ERROR;
772 
773     daq_base_api = *base_api;
774 
775     return DAQ_SUCCESS;
776 }
777 
afpacket_daq_module_unload(void)778 static int afpacket_daq_module_unload(void)
779 {
780     memset(&daq_base_api, 0, sizeof(daq_base_api));
781     return DAQ_SUCCESS;
782 }
783 
afpacket_daq_get_variable_descs(const DAQ_VariableDesc_t ** var_desc_table)784 static int afpacket_daq_get_variable_descs(const DAQ_VariableDesc_t **var_desc_table)
785 {
786     *var_desc_table = afpacket_variable_descriptions;
787 
788     return sizeof(afpacket_variable_descriptions) / sizeof(DAQ_VariableDesc_t);
789 }
790 
/*
 * Instantiate hook: build a fully configured AFPacket context from a module configuration.
 *
 * Steps, in order:
 *   1. Allocate the context and copy the basic settings (input string, snaplen, timeout).
 *   2. Sanity check the colon-separated interface specification.
 *   3. Walk the module variables (buffer_size_mb, debug, fanout_type, fanout_flag, use_tx_ring).
 *   4. Create one instance per named interface; outside of passive mode, bridge them in pairs.
 *   5. Split the packet buffer memory across the rings and create/map them.
 *   6. Create the message (packet descriptor) pool.
 *
 * modcfg:   module configuration to read from.
 * modinst:  module instance handle; stored in the context and used for error reporting.
 * ctxt_ptr: on success, receives the new context.
 *
 * Returns DAQ_SUCCESS on success.  On failure, everything allocated so far is released and
 * DAQ_ERROR_NOMEM (allocation failures here), the error code of the failing ring/pool helper,
 * or DAQ_ERROR (everything else) is returned.
 */
static int afpacket_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInstance_h modinst, void **ctxt_ptr)
{
    AFPacket_Context_t *afpc;
    AFPacketInstance *afi;
    const char *size_str = NULL;
    const char *input;
    char *name1, *name2, *dev;
    char intf[IFNAMSIZ];
    size_t len;
    int num_intfs = 0;
    int rval = DAQ_ERROR;

    afpc = calloc(1, sizeof(AFPacket_Context_t));
    if (!afpc)
    {
        SET_ERROR(modinst, "%s: Couldn't allocate memory for the new AFPacket context!", __func__);
        rval = DAQ_ERROR_NOMEM;
        goto err;
    }
    afpc->modinst = modinst;

    /* Passing a NULL pointer to strdup() is undefined behavior, so reject a missing input up front. */
    input = daq_base_api.config_get_input(modcfg);
    if (!input)
    {
        SET_ERROR(modinst, "%s: No interface specification was provided!", __func__);
        goto err;
    }

    afpc->device = strdup(input);
    if (!afpc->device)
    {
        SET_ERROR(modinst, "%s: Couldn't allocate memory for the device string!", __func__);
        rval = DAQ_ERROR_NOMEM;
        goto err;
    }

    afpc->snaplen = daq_base_api.config_get_snaplen(modcfg);
    afpc->timeout = (int) daq_base_api.config_get_timeout(modcfg);
    /* A configured timeout of 0 is stored as -1, which wait_for_packet() treats as "no limit". */
    if (afpc->timeout == 0)
        afpc->timeout = -1;

    /* The specification may not start or end with a colon, and passive mode does not allow "::". */
    dev = afpc->device;
    if (*dev == ':' || ((len = strlen(dev)) > 0 && *(dev + len - 1) == ':') ||
            (daq_base_api.config_get_mode(modcfg) == DAQ_MODE_PASSIVE && strstr(dev, "::")))
    {
        SET_ERROR(modinst, "%s: Invalid interface specification: '%s'!", __func__, afpc->device);
        goto err;
    }

    const char *varKey, *varValue;
    daq_base_api.config_first_variable(modcfg, &varKey, &varValue);
    while (varKey)
    {
        if (!strcmp(varKey, "buffer_size_mb"))
            size_str = varValue;
        else if (!strcmp(varKey, "debug"))
            afpc->debug = true;
        else if (!strcmp(varKey, "fanout_type"))
        {
            if (!varValue)
            {
                SET_ERROR(modinst, "%s: %s requires an argument!", __func__, varKey);
                goto err;
            }
            /* Using anything other than 'hash' is probably asking for trouble, but
                I'll never stop you from shooting yourself in the foot. */
            if (!strcmp(varValue, "hash"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_HASH;
            else if (!strcmp(varValue, "lb"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_LB;
            else if (!strcmp(varValue, "cpu"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_CPU;
            else if (!strcmp(varValue, "rollover"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_ROLLOVER;
            else if (!strcmp(varValue, "rnd"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_RND;
            else if (!strcmp(varValue, "qm"))
                afpc->fanout_cfg.fanout_type = PACKET_FANOUT_QM;
            else
            {
                SET_ERROR(modinst, "%s: Unrecognized argument for %s: '%s'!", __func__, varKey, varValue);
                goto err;
            }
            afpc->fanout_cfg.enabled = true;
        }
        else if (!strcmp(varKey, "fanout_flag"))
        {
            if (!varValue)
            {
                SET_ERROR(modinst, "%s: %s requires an argument!", __func__, varKey);
                goto err;
            }
            /* Flags accumulate: the variable may be given more than once. */
            if (!strcmp(varValue, "rollover"))
                afpc->fanout_cfg.fanout_flags |= PACKET_FANOUT_FLAG_ROLLOVER;
            else if (!strcmp(varValue, "defrag"))
                afpc->fanout_cfg.fanout_flags |= PACKET_FANOUT_FLAG_DEFRAG;
            else
            {
                SET_ERROR(modinst, "%s: Unrecognized argument for %s: '%s'!", __func__, varKey, varValue);
                goto err;
            }
        }
        else if (!strcmp(varKey, "use_tx_ring"))
            afpc->use_tx_ring = true;

        daq_base_api.config_next_variable(modcfg, &varKey, &varValue);
    }

    /*
     * Work out the total packet buffer size.  A missing value or "max" selects the default.
     * An explicit value must contain a number and, once converted from megabytes to bytes,
     * must still fit in 32 bits; otherwise the multiplication below would silently wrap
     * (e.g. 4096 MB would become 0 bytes).
     */
    uint32_t size = AF_PACKET_DEFAULT_BUFFER_SIZE;
    if (size_str && strcmp("max", size_str) != 0)
    {
        const unsigned long max_size_mb = UINT32_MAX / (1024 * 1024);
        unsigned long size_mb;
        char *endptr;

        errno = 0;
        size_mb = strtoul(size_str, &endptr, 10);
        if (errno != 0 || endptr == size_str || size_mb == 0 || size_mb > max_size_mb)
        {
            SET_ERROR(modinst, "%s: Invalid buffer size '%s'! (must be between 1 and %lu megabytes)",
                    __func__, size_str, max_size_mb);
            goto err;
        }
        size = (uint32_t) size_mb;
    }
    /* The size is specified in megabytes.  Convert it to bytes. */
    size = size * 1024 * 1024;

    /* Walk the colon-separated interface list, creating one instance per non-empty name. */
    while (*dev != '\0')
    {
        len = strcspn(dev, ":");
        if (len >= IFNAMSIZ)
        {
            SET_ERROR(modinst, "%s: Interface name too long! (%zu)", __func__, len);
            goto err;
        }
        if (len != 0)
        {
            /* wait_for_packet() polls intf_count descriptors out of a fixed-size array, so the
                count has to stay bounded by AF_PACKET_MAX_INTERFACES. */
            afpc->intf_count++;
            if (afpc->intf_count >= AF_PACKET_MAX_INTERFACES)
            {
                SET_ERROR(modinst, "%s: Using more than %d interfaces is not supported!", __func__, AF_PACKET_MAX_INTERFACES);
                goto err;
            }
            /* len < IFNAMSIZ was checked above, so the name plus terminator fits in intf. */
            snprintf(intf, len + 1, "%s", dev);
            afi = create_instance(afpc, intf);
            if (!afi)
                goto err;

            /* New instances are pushed onto the head of the list. */
            afi->next = afpc->instances;
            afpc->instances = afi;
            num_intfs++;
            if (daq_base_api.config_get_mode(modcfg) != DAQ_MODE_PASSIVE)
            {
                if (num_intfs == 2)
                {
                    /* The two most recently created instances form a pair: the older one is
                        second in the list, the newer one is at the head. */
                    name1 = afpc->instances->next->name;
                    name2 = afpc->instances->name;

                    if (create_bridge(afpc, name1, name2) != DAQ_SUCCESS)
                    {
                        SET_ERROR(modinst, "%s: Couldn't create the bridge between %s and %s!", __func__, name1, name2);
                        goto err;
                    }
                    num_intfs = 0;
                }
                else if (num_intfs > 2)
                    break;
            }
        }
        else
            len = 1;    /* Empty token: step over the separator. */
        dev += len;
    }

    /* If there are any leftover unbridged interfaces and we're not in Passive mode, error out. */
    if (!afpc->instances || (daq_base_api.config_get_mode(modcfg) != DAQ_MODE_PASSIVE && num_intfs != 0))
    {
        SET_ERROR(modinst, "%s: Invalid interface specification: '%s'!", __func__, afpc->device);
        goto err;
    }

    /*
     * Determine the dimensions of the kernel RX/TX ring(s) to request.
     * Divide the total packet buffer memory evenly across the number of rings.
     * (One per passive interface, two per inline.)
     * The instance list is known to be non-empty here, so num_rings is at least 1.
     */
    unsigned num_rings = 0;
    for (afi = afpc->instances; afi; afi = afi->next)
        num_rings += (afi->peer && afpc->use_tx_ring) ? 2 : 1;
    afpc->ring_size = size / num_rings;

    /* Create the RX (and potentially TX) rings and map them into userspace. */
    for (afi = afpc->instances; afi; afi = afi->next)
    {
        if ((rval = create_instance_rings(afpc, afi)) != DAQ_SUCCESS)
            goto err;
    }

    /* Finally, create the message buffer pool. */
    uint32_t pool_size = daq_base_api.config_get_msg_pool_size(modcfg);
    if (pool_size == 0)
    {
        /* Default the pool size to 10% of the allocated RX frames. */
        for (afi = afpc->instances; afi; afi = afi->next)
            pool_size += afi->rx_ring.layout.tp_frame_nr;
        pool_size /= 10;
    }
    if ((rval = create_packet_pool(afpc, pool_size)) != DAQ_SUCCESS)
        goto err;

    afpc->curr_instance = afpc->instances;

    *ctxt_ptr = afpc;

    return DAQ_SUCCESS;

err:
    /* Common failure exit: tear down whatever was set up before the failure. */
    if (afpc)
    {
        af_packet_close(afpc);
        free(afpc->device);
        destroy_packet_pool(afpc);
        free(afpc);
    }
    return rval;
}
999 
afpacket_daq_destroy(void * handle)1000 static void afpacket_daq_destroy(void *handle)
1001 {
1002     AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1003 
1004     af_packet_close(afpc);
1005     if (afpc->device)
1006         free(afpc->device);
1007     if (afpc->filter)
1008         free(afpc->filter);
1009     destroy_packet_pool(afpc);
1010     free(afpc);
1011 }
1012 
afpacket_daq_set_filter(void * handle,const char * filter)1013 static int afpacket_daq_set_filter(void *handle, const char *filter)
1014 {
1015 #ifdef LIBPCAP_AVAILABLE
1016     AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1017     struct bpf_program fcode;
1018 
1019     if (afpc->filter)
1020         free(afpc->filter);
1021 
1022     afpc->filter = strdup(filter);
1023     if (!afpc->filter)
1024     {
1025         SET_ERROR(afpc->modinst, "%s: Couldn't allocate memory for the filter string!", __func__);
1026         return DAQ_ERROR;
1027     }
1028 
1029     pthread_mutex_lock(&bpf_mutex);
1030     if (pcap_compile_nopcap(afpc->snaplen, DLT_EN10MB, &fcode, afpc->filter, 1, PCAP_NETMASK_UNKNOWN) == -1)
1031     {
1032         pthread_mutex_unlock(&bpf_mutex);
1033         SET_ERROR(afpc->modinst, "%s: BPF state machine compilation failed!", __func__);
1034         return DAQ_ERROR;
1035     }
1036     pthread_mutex_unlock(&bpf_mutex);
1037 
1038     pcap_freecode(&afpc->fcode);
1039     afpc->fcode.bf_len = fcode.bf_len;
1040     afpc->fcode.bf_insns = fcode.bf_insns;
1041 
1042     return DAQ_SUCCESS;
1043 #else
1044     return DAQ_ERROR_NOTSUP;
1045 #endif
1046 }
1047 
afpacket_daq_start(void * handle)1048 static int afpacket_daq_start(void *handle)
1049 {
1050     AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1051     AFPacketInstance *instance;
1052 
1053     for (instance = afpc->instances; instance; instance = instance->next)
1054     {
1055         if (start_instance(afpc, instance) != 0)
1056             return DAQ_ERROR;
1057     }
1058 
1059     reset_stats(afpc);
1060 
1061     return DAQ_SUCCESS;
1062 }
1063 
/*
 * Transmit a packet out of the given instance.
 *
 * egress:      instance to send from; NULL means there is nowhere to send, which
 *              is treated as success.
 * packet_data: the bytes to send.
 * len:         number of bytes to send.
 *
 * If the instance has a TX ring, the packet is copied into the slot under the
 * ring cursor and the kernel is kicked with an empty send().  Otherwise the
 * packet is passed directly to send().
 *
 * Returns DAQ_SUCCESS, DAQ_ERROR_AGAIN when the TX ring slot is not available,
 * or DAQ_ERROR when sending fails.
 */
static inline int afpacket_transmit_packet(AFPacketInstance *egress, const uint8_t *packet_data, unsigned int len)
{
    if (!egress)
        return DAQ_SUCCESS;

    if (!egress->tx_ring.size)
    {
        /* No TX ring: plain send(), retrying for as long as the only problem is a lack
            of buffer space and the socket becomes writable again within 10ms. */
        for (;;)
        {
            if (send(egress->fd, packet_data, len, 0) >= 0)
                return DAQ_SUCCESS;

            if (errno != ENOBUFS)
                return DAQ_ERROR;

            struct pollfd pfd;
            pfd.fd = egress->fd;
            pfd.revents = 0;
            pfd.events = POLLOUT;
            if (poll(&pfd, 1, 10) <= 0 || !(pfd.revents & POLLOUT))
                return DAQ_ERROR;
        }
    }

    /* TX ring: the slot under the cursor must have been handed back by the kernel. */
    AFPacketEntry *slot = egress->tx_ring.cursor;
    if (slot->hdr.h2->tp_status != TP_STATUS_AVAILABLE)
    {
        /* FIXME: This should probably wait for a TX slot to free up via poll(). */
        return DAQ_ERROR_AGAIN;
    }

    memcpy(slot->hdr.raw + TPACKET_ALIGN(egress->tp_hdrlen), packet_data, len);
    slot->hdr.h2->tp_len = len;
    slot->hdr.h2->tp_status = TP_STATUS_SEND_REQUEST;

    /* FIXME: This should call sendto() with MSG_DONTWAIT and handle no-buffer conditions gracefully.
        Performance without MSG_DONTWAIT is apparently pretty miserable. */
    if (send(egress->fd, NULL, 0, 0) < 0)
        return DAQ_ERROR;

    /* The cursor only advances once the send request has been accepted. */
    egress->tx_ring.cursor = slot->next;

    return DAQ_SUCCESS;
}
1107 
/*
 * Inject a packet out of a specific instance, recording an error message on
 * failure and bumping the injected-packet counter on success.
 *
 * afpc:     the AFPacket context (for error reporting and statistics).
 * egress:   instance to transmit from; NULL is reported as an error.
 * data:     packet bytes to inject.
 * data_len: number of bytes to inject.
 *
 * Returns DAQ_SUCCESS, DAQ_ERROR when no egress instance was given, or
 * whatever error afpacket_transmit_packet() reported.
 */
static int afpacket_inject_packet(AFPacket_Context_t *afpc, AFPacketInstance *egress, const uint8_t *data, uint32_t data_len)
{
    if (!egress)
    {
        SET_ERROR(afpc->modinst, "%s: Could not determine which instance to inject the packet out of!", __func__);
        return DAQ_ERROR;
    }

    const int tx_result = afpacket_transmit_packet(egress, data, data_len);
    if (tx_result == DAQ_SUCCESS)
    {
        afpc->stats.packets_injected++;
        return DAQ_SUCCESS;
    }

    if (tx_result == DAQ_ERROR_AGAIN)
        SET_ERROR(afpc->modinst, "%s: Could not send packet because the TX ring is full.", __func__);
    else
        SET_ERROR(afpc->modinst, "%s: Error sending packet: %s (%d)", __func__, strerror(errno), errno);

    return tx_result;
}
1130 
/*
 * Inject hook: send a new packet out of the instance selected by the supplied
 * packet header.
 *
 * handle:   the AFPacket context.
 * type:     message type; only DAQ_MSG_TYPE_PACKET is supported.
 * hdr:      a DAQ_PktHdr_t whose ingress_index is matched against the
 *           instance indices to pick the instance to transmit from.
 * data:     packet bytes to inject.
 * data_len: number of bytes to inject.
 *
 * Returns DAQ_ERROR_NOTSUP for other message types, otherwise the result of
 * afpacket_inject_packet() (which reports an error if no instance matched).
 */
static int afpacket_daq_inject(void *handle, DAQ_MsgType type, const void *hdr, const uint8_t *data, uint32_t data_len)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;

    if (type != DAQ_MSG_TYPE_PACKET)
        return DAQ_ERROR_NOTSUP;

    const DAQ_PktHdr_t *pkthdr = (const DAQ_PktHdr_t *) hdr;

    /* Walk the list until an instance with a matching index turns up (or the list ends). */
    AFPacketInstance *egress = afpc->instances;
    while (egress && egress->index != pkthdr->ingress_index)
        egress = egress->next;

    return afpacket_inject_packet(afpc, egress, data, data_len);
}
1149 
afpacket_daq_inject_relative(void * handle,const DAQ_Msg_t * msg,const uint8_t * data,uint32_t data_len,int reverse)1150 static int afpacket_daq_inject_relative(void *handle, const DAQ_Msg_t *msg, const uint8_t *data, uint32_t data_len, int reverse)
1151 {
1152     AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1153     AFPacketPktDesc *desc = (AFPacketPktDesc *) msg->priv;
1154     AFPacketInstance *egress = reverse ? desc->instance : desc->instance->peer;
1155 
1156     return afpacket_inject_packet(afpc, egress, data, data_len);
1157 }
1158 
afpacket_daq_interrupt(void * handle)1159 static int afpacket_daq_interrupt(void *handle)
1160 {
1161     AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1162 
1163     afpc->interrupted = true;
1164 
1165     return DAQ_SUCCESS;
1166 }
1167 
afpacket_daq_stop(void * handle)1168 static int afpacket_daq_stop(void *handle)
1169 {
1170     AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1171 
1172     af_packet_close(afpc);
1173 
1174     return DAQ_SUCCESS;
1175 }
1176 
/*
 * Ioctl hook.  The only supported command is DIOCTL_GET_DEVICE_INDEX, which
 * looks up an instance by interface name and reports its index.
 *
 * handle: the AFPacket context.
 * cmd:    the ioctl command.
 * arg:    for DIOCTL_GET_DEVICE_INDEX, a DIOCTL_QueryDeviceIndex whose
 *         'device' is the name to look up and whose 'index' receives the result.
 * arglen: size of the structure behind arg.
 *
 * Returns DAQ_SUCCESS when the device was found, DAQ_ERROR_NOTSUP for any
 * other command or an unexpected argument size, DAQ_ERROR_INVAL when no
 * device name was supplied, and DAQ_ERROR_NODEV when no instance has that name.
 */
static int afpacket_daq_ioctl(void *handle, DAQ_IoctlCmd cmd, void *arg, size_t arglen)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;

    /* Only supports GET_DEVICE_INDEX for now */
    if (cmd != DIOCTL_GET_DEVICE_INDEX || arglen != sizeof(DIOCTL_QueryDeviceIndex))
        return DAQ_ERROR_NOTSUP;

    DIOCTL_QueryDeviceIndex *query = (DIOCTL_QueryDeviceIndex *) arg;
    if (!query->device)
    {
        SET_ERROR(afpc->modinst, "No device name to find the index of!");
        return DAQ_ERROR_INVAL;
    }

    AFPacketInstance *afi = afpc->instances;
    while (afi)
    {
        if (strcmp(query->device, afi->name) == 0)
        {
            query->index = afi->index;
            return DAQ_SUCCESS;
        }
        afi = afi->next;
    }

    return DAQ_ERROR_NODEV;
}
1204 
/*
 * Get-stats hook: refresh the kernel-derived counters, then copy the
 * context's statistics into the caller's structure.
 *
 * Always returns DAQ_SUCCESS.
 */
static int afpacket_daq_get_stats(void *handle, DAQ_Stats_t *stats)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;

    /* The refresh has to come first so the copy below reflects current values. */
    update_hw_stats(afpc);
    memcpy(stats, &afpc->stats, sizeof(*stats));

    return DAQ_SUCCESS;
}
1214 
afpacket_daq_reset_stats(void * handle)1215 static void afpacket_daq_reset_stats(void *handle)
1216 {
1217     AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1218 
1219     reset_stats(afpc);
1220 }
1221 
afpacket_daq_get_snaplen(void * handle)1222 static int afpacket_daq_get_snaplen(void *handle)
1223 {
1224     AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
1225 
1226     /* Note: This returns the maximum capture length that will be returned by the kernel.
1227         It is slightly larger than the originally requested snaplen due to reserving room
1228         for reconstructing the VLAN tag as well as rounding up due to alignment. */
1229 
1230     return afpc->instances->actual_snaplen;
1231 }
1232 
afpacket_daq_get_capabilities(void * handle)1233 static uint32_t afpacket_daq_get_capabilities(void *handle)
1234 {
1235     uint32_t capabilities = DAQ_CAPA_BLOCK | DAQ_CAPA_REPLACE | DAQ_CAPA_INJECT |
1236                             DAQ_CAPA_UNPRIV_START | DAQ_CAPA_INTERRUPT | DAQ_CAPA_DEVICE_INDEX;
1237 #ifdef LIBPCAP_AVAILABLE
1238     capabilities |= DAQ_CAPA_BPF;
1239 #endif
1240     return capabilities;
1241 }
1242 
afpacket_daq_get_datalink_type(void * handle)1243 static int afpacket_daq_get_datalink_type(void *handle)
1244 {
1245     return DLT_EN10MB;
1246 }
1247 
find_packet(AFPacket_Context_t * afpc)1248 static inline AFPacketEntry *find_packet(AFPacket_Context_t *afpc)
1249 {
1250     AFPacketInstance *instance;
1251     AFPacketEntry *entry;
1252 
1253     instance = afpc->curr_instance;
1254     do
1255     {
1256         instance = instance->next ? instance->next : afpc->instances;
1257         if (instance->rx_ring.cursor->hdr.h2->tp_status & TP_STATUS_USER)
1258         {
1259             afpc->curr_instance = instance;
1260             entry = instance->rx_ring.cursor;
1261             instance->rx_ring.cursor = entry->next;
1262             return entry;
1263         }
1264     } while (instance != afpc->curr_instance);
1265 
1266     return NULL;
1267 }
1268 
wait_for_packet(AFPacket_Context_t * afpc)1269 static inline DAQ_RecvStatus wait_for_packet(AFPacket_Context_t *afpc)
1270 {
1271     AFPacketInstance *instance;
1272     struct pollfd pfd[AF_PACKET_MAX_INTERFACES];
1273     uint32_t i;
1274 
1275     for (i = 0, instance = afpc->instances; instance; i++, instance = instance->next)
1276     {
1277         pfd[i].fd = instance->fd;
1278         pfd[i].revents = 0;
1279         pfd[i].events = POLLIN;
1280     }
1281     /* Chop the timeout into one second chunks (plus any remainer) to improve responsiveness to
1282         interruption when there is no traffic and the timeout is very long (or unlimited). */
1283     int timeout = afpc->timeout;
1284     while (timeout != 0)
1285     {
1286         /* If the receive has been canceled, break out of the loop and return. */
1287         if (afpc->interrupted)
1288         {
1289             afpc->interrupted = false;
1290             return DAQ_RSTAT_INTERRUPTED;
1291         }
1292 
1293         int poll_timeout;
1294         if (timeout >= 1000)
1295         {
1296             poll_timeout = 1000;
1297             timeout -= 1000;
1298         }
1299         else if (timeout > 0)
1300         {
1301             poll_timeout = timeout;
1302             timeout = 0;
1303         }
1304         else
1305             poll_timeout = 1000;
1306 
1307         int ret = poll(pfd, afpc->intf_count, poll_timeout);
1308         /* If some number of of sockets have events returned, check them all for badness. */
1309         if (ret > 0)
1310         {
1311             for (i = 0; i < afpc->intf_count; i++)
1312             {
1313                 if (pfd[i].revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL))
1314                 {
1315                     if (pfd[i].revents & (POLLHUP | POLLRDHUP))
1316                         SET_ERROR(afpc->modinst, "%s: Hang-up on a packet socket", __func__);
1317                     else if (pfd[i].revents & POLLERR)
1318                         SET_ERROR(afpc->modinst, "%s: Encountered error condition on a packet socket", __func__);
1319                     else if (pfd[i].revents & POLLNVAL)
1320                         SET_ERROR(afpc->modinst, "%s: Invalid polling request on a packet socket", __func__);
1321                     return DAQ_RSTAT_ERROR;
1322                 }
1323             }
1324             /* All good! A packet should be waiting for us somewhere. */
1325             return DAQ_RSTAT_OK;
1326         }
1327         /* If we were interrupted by a signal, start the loop over.  The user should call daq_interrupt to actually exit. */
1328         if (ret < 0 && errno != EINTR)
1329         {
1330             SET_ERROR(afpc->modinst, "%s: Poll failed: %s (%d)", __func__, strerror(errno), errno);
1331             return DAQ_RSTAT_ERROR;
1332         }
1333     }
1334 
1335     return DAQ_RSTAT_TIMEOUT;
1336 }
1337 
/*
 * Message-receive hook: pull up to max_recv packets off the RX rings and return
 * them as DAQ messages.
 *
 * handle:   the AFPacket context.
 * max_recv: maximum number of messages to return.
 * msgs:     output vector; entries [0, return value) are filled in.
 * rstat:    receives the status describing why the call returned.
 *
 * Each returned packet is copied out of the kernel ring into a descriptor taken
 * from the context's free list, and the ring entry is handed back to the kernel
 * right away.  The call only blocks (in wait_for_packet()) while nothing has
 * been received yet; once at least one message is queued, an empty ring ends
 * the call with DAQ_RSTAT_WOULD_BLOCK instead.
 *
 * Returns the number of messages placed in msgs.
 */
static unsigned afpacket_daq_msg_receive(void *handle, const unsigned max_recv, const DAQ_Msg_t *msgs[], DAQ_RecvStatus *rstat)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
    AFPacketInstance *instance;
    DAQ_RecvStatus status = DAQ_RSTAT_OK;
    unsigned idx = 0;

    while (idx < max_recv)
    {
        /* Check to see if the receive has been canceled.  If so, reset it and return appropriately. */
        if (afpc->interrupted)
        {
            afpc->interrupted = false;
            status = DAQ_RSTAT_INTERRUPTED;
            break;
        }

        /* Make sure that we have a packet descriptor available to populate. */
        /* The descriptor is only peeked at here; it is unlinked from the free list at the
           bottom of the loop, once a packet has actually been accepted. */
        AFPacketPktDesc *desc = afpc->pool.freelist;
        if (!desc)
        {
            status = DAQ_RSTAT_NOBUF;
            break;
        }

        /* Try to find a packet ready for processing from one of the RX rings. */
        AFPacketEntry *entry = find_packet(afpc);
        if (!entry)
        {
            /* Only block waiting for a packet if we haven't received anything yet. */
            /* FIXIT-L - Bad interaction with leading packets filtered by BPF? */
            if (idx != 0)
            {
                status = DAQ_RSTAT_WOULD_BLOCK;
                break;
            }
            status = wait_for_packet(afpc);
            if (status != DAQ_RSTAT_OK)
                break;
            /* Something should be ready now; go around again and look for it. */
            continue;
        }

        /* Snapshot the fields we need from the ring entry's header. */
        unsigned int tp_len, tp_mac, tp_snaplen, tp_sec, tp_usec;
        tp_len = entry->hdr.h2->tp_len;
        tp_mac = entry->hdr.h2->tp_mac;
        tp_snaplen = entry->hdr.h2->tp_snaplen;
        tp_sec = entry->hdr.h2->tp_sec;
        /* The ring header carries nanoseconds; the DAQ packet header wants microseconds. */
        tp_usec = entry->hdr.h2->tp_nsec / 1000;
        /* find_packet() made the instance that owns this entry the current instance. */
        instance = afpc->curr_instance;
        /* The captured bytes must lie entirely within a single ring frame. */
        if (tp_mac + tp_snaplen > instance->rx_ring.layout.tp_frame_size)
        {
            SET_ERROR(afpc->modinst, "%s: Corrupted frame on kernel ring (MAC offset %u + CapLen %u > FrameSize %d)",
                    __func__, tp_mac, tp_snaplen, afpc->curr_instance->rx_ring.layout.tp_frame_size);
            status = DAQ_RSTAT_ERROR;
            break;
        }

        uint8_t *data = entry->hdr.raw + tp_mac;
        /* Make a valiant attempt at reconstructing the VLAN tag if it has been stripped by moving the
           MAC addresses backward into the reserved space to make room for the VLAN tag and filling
           that tag structure in.  */
        if (
                (entry->hdr.h2->tp_vlan_tci || (entry->hdr.h2->tp_status & TP_STATUS_VLAN_VALID)) &&
                tp_snaplen >= (unsigned int) vlan_offset)
        {
            struct vlan_tag *tag;

            /* Source and destination overlap, hence memmove() rather than memcpy(). */
            data -= VLAN_TAG_LEN;
            memmove((void *) data, data + VLAN_TAG_LEN, vlan_offset);

            tag = (struct vlan_tag *) (data + vlan_offset);
            /* Use the TPID reported by the kernel when it is flagged valid; otherwise assume 802.1Q. */
            if (entry->hdr.h2->tp_vlan_tpid && (entry->hdr.h2->tp_status & TP_STATUS_VLAN_TPID_VALID))
                tag->vlan_tpid = htons(entry->hdr.h2->tp_vlan_tpid);
            else
                tag->vlan_tpid = htons(ETH_P_8021Q);
            tag->vlan_tci = htons(entry->hdr.h2->tp_vlan_tci);

            /* Both the captured and the on-wire lengths grow by the reinserted tag. */
            tp_snaplen += VLAN_TAG_LEN;
            tp_len += VLAN_TAG_LEN;
        }
#ifdef LIBPCAP_AVAILABLE
        /* Check to see if this hits the BPF.  If it does, dispose of it and
           move on to the next packet (transmitting in the inline scenario). */
        /* Note that the result of the transmit is not checked here. */
        if (afpc->fcode.bf_insns && bpf_filter(afpc->fcode.bf_insns, data, tp_len, tp_snaplen) == 0)
        {
            afpc->stats.packets_filtered++;
            afpacket_transmit_packet(instance->peer, data, tp_snaplen);
            entry->hdr.h2->tp_status = TP_STATUS_KERNEL;
            continue;
        }
#endif
        afpc->stats.packets_received++;

        /* Populate the packet descriptor, copying the packet data and releasing the packet
           ring entry back to the kernel for reuse. */
        memcpy(desc->data, data, tp_snaplen);
        entry->hdr.h2->tp_status = TP_STATUS_KERNEL;
        desc->instance = instance;
        desc->length = tp_snaplen;

        /* Next, set up the DAQ message.  Most fields are prepopulated and unchanging. */
        DAQ_Msg_t *msg = &desc->msg;
        msg->data_len = tp_snaplen;

        /* Then, set up the DAQ packet header. */
        DAQ_PktHdr_t *pkthdr = &desc->pkthdr;
        pkthdr->ts.tv_sec = tp_sec;
        pkthdr->ts.tv_usec = tp_usec;
        pkthdr->pktlen = tp_len;
        pkthdr->ingress_index = instance->index;
        pkthdr->egress_index = instance->peer ? instance->peer->index : DAQ_PKTHDR_UNKNOWN;
        pkthdr->flags = 0;
        /* The following fields should remain in their virgin state:
            address_space_id (0)
            ingress_group (DAQ_PKTHDR_UNKNOWN)
            egress_group (DAQ_PKTHDR_UNKNOWN)
            opaque (0)
            flow_id (0)
         */

        /* Last, but not least, extract this descriptor from the free list and
           place the message in the return vector. */
        afpc->pool.freelist = desc->next;
        desc->next = NULL;
        afpc->pool.info.available--;
        msgs[idx] = &desc->msg;

        idx++;
    }

    /* Report why the loop ended; this stays DAQ_RSTAT_OK when max_recv messages were collected. */
    *rstat = status;

    return idx;
}
1472 
/*
 * Maps each DAQ verdict (used as the array index) onto the action this module
 * actually takes: DAQ_VERDICT_PASS or DAQ_VERDICT_BLOCK.  The trailing comment
 * on each row names the verdict that row is meant to correspond to; the rows
 * are positional, so their order has to follow the DAQ_Verdict numbering.
 * afpacket_daq_msg_finalize() forwards the packet to the peer interface only
 * for entries that translate to DAQ_VERDICT_PASS.
 */
static const DAQ_Verdict verdict_translation_table[MAX_DAQ_VERDICT] = {
    DAQ_VERDICT_PASS,       /* DAQ_VERDICT_PASS */
    DAQ_VERDICT_BLOCK,      /* DAQ_VERDICT_BLOCK */
    DAQ_VERDICT_PASS,       /* DAQ_VERDICT_REPLACE */
    DAQ_VERDICT_PASS,       /* DAQ_VERDICT_WHITELIST */
    DAQ_VERDICT_BLOCK,      /* DAQ_VERDICT_BLACKLIST */
    DAQ_VERDICT_PASS        /* DAQ_VERDICT_IGNORE */
};
1481 
/*
 * Message-finalize hook: apply the verdict for a previously received message
 * and recycle its packet descriptor.
 *
 * handle:  the AFPacket context.
 * msg:     the message being finalized; its private data is the descriptor.
 * verdict: the verdict to apply.  Values at or beyond MAX_DAQ_VERDICT are
 *          treated as DAQ_VERDICT_PASS.
 *
 * The verdict counter is bumped for the (sanitized) verdict as given.  The
 * packet is transmitted out of the receiving instance's peer when the verdict
 * translates to DAQ_VERDICT_PASS; the result of that transmit is not checked.
 *
 * Always returns DAQ_SUCCESS.
 */
static int afpacket_daq_msg_finalize(void *handle, const DAQ_Msg_t *msg, DAQ_Verdict verdict)
{
    AFPacket_Context_t *afpc = (AFPacket_Context_t *) handle;
    AFPacketPktDesc *desc = (AFPacketPktDesc *) msg->priv;

    /* Sanitize and enact the verdict. */
    const DAQ_Verdict requested = (verdict < MAX_DAQ_VERDICT) ? verdict : DAQ_VERDICT_PASS;
    afpc->stats.verdicts[requested]++;

    const DAQ_Verdict action = verdict_translation_table[requested];
    if (action == DAQ_VERDICT_PASS)
        afpacket_transmit_packet(desc->instance->peer, desc->data, desc->length);

    /* Toss the descriptor back on the free list for reuse. */
    desc->next = afpc->pool.freelist;
    afpc->pool.freelist = desc;
    afpc->pool.info.available++;

    return DAQ_SUCCESS;
}
1502 
/*
 * Get-msg-pool-info hook: copy the context's message pool bookkeeping into
 * the caller's structure.
 *
 * Always returns DAQ_SUCCESS.
 */
static int afpacket_daq_get_msg_pool_info(void *handle, DAQ_MsgPoolInfo_t *info)
{
    const AFPacket_Context_t *afpc = (const AFPacket_Context_t *) handle;

    *info = afpc->pool.info;

    return DAQ_SUCCESS;
}
1511 
/*
 * The module API descriptor: identifies the module (API version and size,
 * module version, name, type flags) and lists the hook functions defined
 * above.
 *
 * When BUILDING_SO is defined, the descriptor is emitted under the
 * DAQ_MODULE_DATA name with DAQ_SO_PUBLIC linkage; otherwise it is emitted as
 * afpacket_daq_module_data.
 *
 * The initializer is positional, so the entries must stay in the order of the
 * fields in DAQ_ModuleAPI_t; the leading comment on each line names the field
 * it is meant to fill.  The config_load/config_swap/config_free hooks are not
 * provided (NULL).
 */
#ifdef BUILDING_SO
DAQ_SO_PUBLIC const DAQ_ModuleAPI_t DAQ_MODULE_DATA =
#else
const DAQ_ModuleAPI_t afpacket_daq_module_data =
#endif
{
    /* .api_version = */ DAQ_MODULE_API_VERSION,
    /* .api_size = */ sizeof(DAQ_ModuleAPI_t),
    /* .module_version = */ DAQ_AFPACKET_VERSION,
    /* .name = */ "afpacket",
    /* .type = */ DAQ_TYPE_INTF_CAPABLE | DAQ_TYPE_INLINE_CAPABLE | DAQ_TYPE_MULTI_INSTANCE,
    /* .load = */ afpacket_daq_module_load,
    /* .unload = */ afpacket_daq_module_unload,
    /* .get_variable_descs = */ afpacket_daq_get_variable_descs,
    /* .instantiate = */ afpacket_daq_instantiate,
    /* .destroy = */ afpacket_daq_destroy,
    /* .set_filter = */ afpacket_daq_set_filter,
    /* .start = */ afpacket_daq_start,
    /* .inject = */ afpacket_daq_inject,
    /* .inject_relative = */ afpacket_daq_inject_relative,
    /* .interrupt = */ afpacket_daq_interrupt,
    /* .stop = */ afpacket_daq_stop,
    /* .ioctl = */ afpacket_daq_ioctl,
    /* .get_stats = */ afpacket_daq_get_stats,
    /* .reset_stats = */ afpacket_daq_reset_stats,
    /* .get_snaplen = */ afpacket_daq_get_snaplen,
    /* .get_capabilities = */ afpacket_daq_get_capabilities,
    /* .get_datalink_type = */ afpacket_daq_get_datalink_type,
    /* .config_load = */ NULL,
    /* .config_swap = */ NULL,
    /* .config_free = */ NULL,
    /* .msg_receive = */ afpacket_daq_msg_receive,
    /* .msg_finalize = */ afpacket_daq_msg_finalize,
    /* .get_msg_pool_info = */ afpacket_daq_get_msg_pool_info,
};
1547