1 // log4cpp logging facility
2 #include "log4cpp/Category.hh"
3 #include "log4cpp/Appender.hh"
4 #include "log4cpp/FileAppender.hh"
5 #include "log4cpp/OstreamAppender.hh"
6 #include "log4cpp/Layout.hh"
7 #include "log4cpp/BasicLayout.hh"
8 #include "log4cpp/PatternLayout.hh"
9 #include "log4cpp/Priority.hh"
10 
11 #include <boost/version.hpp>
12 #include <boost/algorithm/string.hpp>
13 
14 #include "../fast_library.h"
15 
16 // For support uint32_t, uint16_t
17 #include <sys/types.h>
18 
19 // For config map operations
20 #include <string>
21 #include <map>
22 
23 #include <stdio.h>
24 #include <iostream>
25 #include <string>
26 
27 #include "../fastnetmon_packet_parser.h"
28 
29 // For support: IPPROTO_TCP, IPPROTO_ICMP, IPPROTO_UDP
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 
34 #include "afpacket_collector.h"
35 
36 #include <boost/thread.hpp>
37 #include <sys/mman.h>
38 #include <poll.h>
39 #include <arpa/inet.h>
40 #include <sys/socket.h>
41 #include <sys/ioctl.h>
42 #include <net/if.h>
43 #include <linux/if_packet.h>
44 #include <net/ethernet.h> /* the L2 protocols */
45 
46 #include "../unified_parser.hpp"
47 
48 bool afpacket_read_packet_length_from_ip_header = false;
49 
50 // Get log4cpp logger from main program
51 extern log4cpp::Category& logger;
52 
53 // Pass unparsed packets number to main program
54 extern uint64_t total_unparsed_packets;
55 
56 // Global configuration map
57 extern std::map<std::string, std::string> configuration_map;
58 
59 // This variable name should be uniq for every plugin!
60 process_packet_pointer afpacket_process_func_ptr = NULL;
61 
62 // 4194304 bytes
63 unsigned int blocksiz = 1 << 22;
64 // 2048 bytes
65 unsigned int framesiz = 1 << 11;
66 unsigned int blocknum = 64;
67 
68 struct block_desc {
69     uint32_t version;
70     uint32_t offset_to_priv;
71     struct tpacket_hdr_v1 h1;
72 };
73 
74 // Get interface number by name
get_interface_number_by_device_name(int socket_fd,std::string interface_name)75 int get_interface_number_by_device_name(int socket_fd, std::string interface_name) {
76     struct ifreq ifr;
77     memset(&ifr, 0, sizeof(ifr));
78 
79     if (interface_name.size() > IFNAMSIZ) {
80         return -1;
81     }
82 
83     strncpy(ifr.ifr_name, interface_name.c_str(), sizeof(ifr.ifr_name));
84 
85     if (ioctl(socket_fd, SIOCGIFINDEX, &ifr) == -1) {
86         return -1;
87     }
88 
89     return ifr.ifr_ifindex;
90 }
91 
flush_block(struct block_desc * pbd)92 void flush_block(struct block_desc *pbd) {
93     pbd->h1.block_status = TP_STATUS_KERNEL;
94 }
95 
walk_block(struct block_desc * pbd,const int block_num)96 void walk_block(struct block_desc *pbd, const int block_num) {
97     int num_pkts = pbd->h1.num_pkts, i;
98     unsigned long bytes = 0;
99     struct tpacket3_hdr *ppd;
100 
101     ppd = (struct tpacket3_hdr *) ((uint8_t *) pbd +
102                        pbd->h1.offset_to_first_pkt);
103     for (i = 0; i < num_pkts; ++i) {
104         bytes += ppd->tp_snaplen;
105 
106         // struct ethhdr *eth = (struct ethhdr *) ((uint8_t *) ppd + ppd->tp_mac);
107         // Print packets
108 
109         struct pfring_pkthdr packet_header;
110         memset(&packet_header, 0, sizeof(packet_header));
111         packet_header.len = ppd->tp_snaplen;
112         packet_header.caplen = ppd->tp_snaplen;
113 
114         u_int8_t timestamp = 0;
115         u_int8_t add_hash = 0;
116 
117         u_char* data_pointer = (u_char*)((uint8_t *) ppd + ppd->tp_mac);
118 
119         simple_packet packet;
120         int parser_result = parse_raw_packet_to_simple_packet((u_char*)data_pointer, ppd->tp_snaplen, packet, afpacket_read_packet_length_from_ip_header);
121 
122         //char print_buffer[512];
123         //fastnetmon_print_parsed_pkt(print_buffer, 512, data_pointer, &packet_header);
124         //printf("%s\n", print_buffer);
125 
126         ppd = (struct tpacket3_hdr *) ((uint8_t *) ppd +
127                            ppd->tp_next_offset);
128 
129         if (parser_result) {
130             afpacket_process_func_ptr(packet);
131         } else {
132             total_unparsed_packets++;
133         }
134     }
135 }
136 
setup_socket(std::string interface_name,int fanout_group_id)137 int setup_socket(std::string interface_name, int fanout_group_id) {
138     // More details here: http://man7.org/linux/man-pages/man7/packet.7.html
139     // We could use SOCK_RAW or SOCK_DGRAM for second argument
140     // SOCK_RAW - raw packets pass from the kernel
141     // SOCK_DGRAM - some amount of processing
142     // Third argument manage ether type of captured packets
143     int packet_socket = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
144 
145     if (packet_socket == -1) {
146         logger << log4cpp::Priority::ERROR << "Can't create AF_PACKET socket";
147         return -1;
148     }
149 
150     // We whould use V3 bcause it could read/pool in per block basis instead per packet
151     int version = TPACKET_V3;
152     int setsockopt_packet_version = setsockopt(packet_socket, SOL_PACKET, PACKET_VERSION, &version, sizeof(version));
153 
154     if (setsockopt_packet_version < 0) {
155         logger << log4cpp::Priority::ERROR << "Can't set packet v3 version";
156         return -1;
157     }
158 
159     int interface_number = get_interface_number_by_device_name(packet_socket, interface_name);
160 
161     if (interface_number == -1) {
162         logger << log4cpp::Priority::ERROR << "Can't get interface number by interface name for " << interface_name;
163         return -1;
164     }
165 
166     // Switch to PROMISC mode
167     struct packet_mreq sock_params;
168     memset(&sock_params, 0, sizeof(sock_params));
169     sock_params.mr_type = PACKET_MR_PROMISC;
170     sock_params.mr_ifindex = interface_number;
171 
172     int set_promisc = setsockopt(packet_socket, SOL_PACKET, PACKET_ADD_MEMBERSHIP, (void *)&sock_params, sizeof(sock_params));
173 
174     if (set_promisc == -1) {
175         logger << log4cpp::Priority::ERROR << "Can't enable promisc mode";
176         return -1;
177     }
178 
179     struct sockaddr_ll bind_address;
180     memset(&bind_address, 0, sizeof(bind_address));
181 
182     bind_address.sll_family = AF_PACKET;
183     bind_address.sll_protocol = htons(ETH_P_ALL);
184     bind_address.sll_ifindex = interface_number;
185 
186     // We will follow http://yusufonlinux.blogspot.ru/2010/11/data-link-access-and-zero-copy.html
187     // And this: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt
188 
189     struct tpacket_req3 req;
190     memset(&req, 0, sizeof(req));
191 
192     req.tp_block_size = blocksiz;
193     req.tp_frame_size = framesiz;
194     req.tp_block_nr = blocknum;
195     req.tp_frame_nr = (blocksiz * blocknum) / framesiz;
196 
197     req.tp_retire_blk_tov = 60; // Timeout in msec
198     req.tp_feature_req_word = TP_FT_REQ_FILL_RXHASH;
199 
200     int setsockopt_rx_ring = setsockopt(packet_socket, SOL_PACKET , PACKET_RX_RING , (void*)&req , sizeof(req));
201 
202     if (setsockopt_rx_ring == -1) {
203         logger << log4cpp::Priority::ERROR << "Can't enable RX_RING for AF_PACKET socket";
204         return -1;
205     }
206 
207     // We use per thread structures
208     uint8_t* mapped_buffer = NULL;
209     struct iovec* rd = NULL;
210 
211     mapped_buffer = (uint8_t*)mmap(NULL, req.tp_block_size * req.tp_block_nr, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_LOCKED, packet_socket, 0);
212 
213     if (mapped_buffer == MAP_FAILED) {
214         logger << log4cpp::Priority::ERROR << "MMAP failed";
215         return -1;
216     }
217 
218     // Allocate iov structure for each block
219     rd = (struct iovec*)malloc(req.tp_block_nr * sizeof(struct iovec));
220 
221     // Initilize iov structures
222     for (int i = 0; i < req.tp_block_nr; ++i) {
223         rd[i].iov_base = mapped_buffer + (i * req.tp_block_size);
224         rd[i].iov_len = req.tp_block_size;
225     }
226 
227     int bind_result = bind(packet_socket, (struct sockaddr *)&bind_address, sizeof(bind_address));
228 
229     if (bind_result == -1) {
230         logger << log4cpp::Priority::ERROR << "Can't bind to AF_PACKET socket";
231         return -1;
232     }
233 
234    if (fanout_group_id) {
235         // PACKET_FANOUT_LB - round robin
236         // PACKET_FANOUT_CPU - send packets to CPU where packet arrived
237         int fanout_type = PACKET_FANOUT_CPU;
238 
239         int fanout_arg = (fanout_group_id | (fanout_type << 16));
240 
241         int setsockopt_fanout = setsockopt(packet_socket, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg));
242 
243         if (setsockopt_fanout < 0) {
244             logger << log4cpp::Priority::ERROR << "Can't configure fanout error number: "<< errno << " error: " << strerror(errno);
245             return -1;
246         }
247     }
248 
249     unsigned int current_block_num = 0;
250 
251     struct pollfd pfd;
252     memset(&pfd, 0, sizeof(pfd));
253 
254     pfd.fd = packet_socket;
255     pfd.events = POLLIN | POLLERR;
256     pfd.revents = 0;
257 
258     while (true) {
259         struct block_desc *pbd = (struct block_desc *) rd[current_block_num].iov_base;
260 
261         if ((pbd->h1.block_status & TP_STATUS_USER) == 0) {
262             poll(&pfd, 1, -1);
263 
264             continue;
265         }
266 
267         walk_block(pbd, current_block_num);
268         flush_block(pbd);
269         current_block_num = (current_block_num + 1) % blocknum;
270     }
271 
272     return packet_socket;
273 }
274 
start_af_packet_capture(std::string interface_name,int fanout_group_id)275 void start_af_packet_capture(std::string interface_name, int fanout_group_id) {
276     setup_socket(interface_name, fanout_group_id);
277 }
278 
get_af_packet_stats()279 void get_af_packet_stats() {
280 // getsockopt PACKET_STATISTICS
281 }
282 
283 // Could get some speed up on NUMA servers
284 bool afpacket_execute_strict_cpu_affinity = true;
285 
start_afpacket_collection(process_packet_pointer func_ptr)286 void start_afpacket_collection(process_packet_pointer func_ptr) {
287     logger << log4cpp::Priority::INFO << "AF_PACKET plugin started";
288     afpacket_process_func_ptr = func_ptr;
289 
290     if (configuration_map.count("netmap_read_packet_length_from_ip_header") != 0) {
291         afpacket_read_packet_length_from_ip_header = configuration_map["netmap_read_packet_length_from_ip_header"] == "on";
292     }
293 
294     std::string interfaces_list = "";
295 
296     if (configuration_map.count("interfaces") != 0) {
297         interfaces_list = configuration_map["interfaces"];
298     }
299 
300     std::vector<std::string> interfaces_for_listen;
301     boost::split(interfaces_for_listen, interfaces_list, boost::is_any_of(","), boost::token_compress_on);
302 
303     logger << log4cpp::Priority::INFO << "AF_PACKET will listen on " << interfaces_for_listen.size() << " interfaces";
304 
305     if (interfaces_for_listen.size() == 0) {
306         logger << log4cpp::Priority::ERROR << "Please specify intreface for AF_PACKET";
307         return;
308     }
309 
310     if (interfaces_for_listen.size() > 1) {
311         logger << log4cpp::Priority::WARN << "We support only single interface for AF_PACKET, sorry!";
312     }
313 
314     std::string capture_interface = interfaces_for_listen[0];
315 
316     int fanout_group_id = getpid() & 0xffff;
317 
318     unsigned int num_cpus = sysconf(_SC_NPROCESSORS_ONLN);;
319     logger.info("We have %d cpus for AF_PACKET", num_cpus);
320 
321     if (num_cpus > 1) {
322         boost::thread_group packet_receiver_thread_group;
323 
324         for (int cpu = 0; cpu < num_cpus; cpu++) {
325 
326 // Well, we have thread attributes from Boost 1.50
327 #if defined(BOOST_THREAD_PLATFORM_PTHREAD) && BOOST_VERSION / 100 % 1000 >= 50 && defined(__GLIBC__)
328             boost::thread::attributes thread_attrs;
329 
330             if (afpacket_execute_strict_cpu_affinity) {
331                 cpu_set_t current_cpu_set;
332 
333                 int cpu_to_bind = cpu % num_cpus;
334                 CPU_ZERO(&current_cpu_set);
335                 // We count cpus from zero
336                 CPU_SET(cpu_to_bind, &current_cpu_set);
337 
338                 int set_affinity_result = pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), &current_cpu_set);
339 
340                 if (set_affinity_result != 0) {
341                     logger << log4cpp::Priority::ERROR << "Can't set CPU affinity for thread";
342                 }
343             }
344 
345             packet_receiver_thread_group.add_thread(
346                 new boost::thread(thread_attrs, boost::bind(start_af_packet_capture, capture_interface, fanout_group_id))
347             );
348 #else
349             logger.error("Sorry but CPU affinity did not supported for your platform");
350 
351             packet_receiver_thread_group.add_thread(
352                 new boost::thread(start_af_packet_capture, capture_interface, fanout_group_id)
353             );
354 #endif
355         }
356 
357         // Wait all processes for finish
358         packet_receiver_thread_group.join_all();
359     } else {
360         start_af_packet_capture(capture_interface, 0);
361     }
362 }
363