// log4cpp logging facility
#include "log4cpp/Category.hh"
#include "log4cpp/Appender.hh"
#include "log4cpp/FileAppender.hh"
#include "log4cpp/OstreamAppender.hh"
#include "log4cpp/Layout.hh"
#include "log4cpp/BasicLayout.hh"
#include "log4cpp/PatternLayout.hh"
#include "log4cpp/Priority.hh"

#include <boost/version.hpp>
#include <boost/algorithm/string.hpp>

#include "../fast_library.h"

// For support uint32_t, uint16_t
#include <sys/types.h>

// For config map operations
#include <string>
#include <map>

#include <stdio.h>
#include <iostream>
#include <string>

// C library and POSIX headers for functions this file calls directly
// (string/memory helpers, errno, allocation, process and descriptor calls)
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

// For the parsed list of interfaces
#include <vector>

#include "../fastnetmon_packet_parser.h"

// For support: IPPROTO_TCP, IPPROTO_ICMP, IPPROTO_UDP
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include "afpacket_collector.h"

#include <boost/thread.hpp>
#include <sys/mman.h>
#include <poll.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <linux/if_packet.h>
#include <net/ethernet.h> /* the L2 protocols */

#include "../unified_parser.hpp"
47
48 bool afpacket_read_packet_length_from_ip_header = false;
49
50 // Get log4cpp logger from main program
51 extern log4cpp::Category& logger;
52
53 // Pass unparsed packets number to main program
54 extern uint64_t total_unparsed_packets;
55
56 // Global configuration map
57 extern std::map<std::string, std::string> configuration_map;
58
59 // This variable name should be uniq for every plugin!
60 process_packet_pointer afpacket_process_func_ptr = NULL;
61
62 // 4194304 bytes
63 unsigned int blocksiz = 1 << 22;
64 // 2048 bytes
65 unsigned int framesiz = 1 << 11;
66 unsigned int blocknum = 64;
67
// View of the descriptor that sits at the very beginning of every RX ring block.
// Ring memory is cast to this type; this file only reads and writes members of h1
// (block_status, num_pkts, offset_to_first_pkt), the first two fields are kept so
// that h1 lands on the right offset.
struct block_desc {
    uint32_t version;
    uint32_t offset_to_priv;
    tpacket_hdr_v1 h1;
};
73
/*
 * Resolves a network interface name to its kernel interface index.
 *
 * socket_fd      - open socket descriptor used as the target of the ioctl call
 * interface_name - name of the interface, for example "eth0"
 *
 * Returns the interface index, or -1 when the name is empty, when it does not fit into
 * ifreq::ifr_name together with its terminating NUL, or when the SIOCGIFINDEX ioctl fails.
 */
int get_interface_number_by_device_name(int socket_fd, std::string interface_name) {
    struct ifreq ifr;
    memset(&ifr, 0, sizeof(ifr));

    // ifr_name must stay NUL terminated, so the longest acceptable name is one byte
    // shorter than the buffer
    if (interface_name.empty() || interface_name.size() >= sizeof(ifr.ifr_name)) {
        return -1;
    }

    // The structure is zeroed above, so the byte after the copied name is already NUL
    strncpy(ifr.ifr_name, interface_name.c_str(), sizeof(ifr.ifr_name) - 1);

    if (ioctl(socket_fd, SIOCGIFINDEX, &ifr) == -1) {
        return -1;
    }

    return ifr.ifr_ifindex;
}
91
flush_block(struct block_desc * pbd)92 void flush_block(struct block_desc *pbd) {
93 pbd->h1.block_status = TP_STATUS_KERNEL;
94 }
95
walk_block(struct block_desc * pbd,const int block_num)96 void walk_block(struct block_desc *pbd, const int block_num) {
97 int num_pkts = pbd->h1.num_pkts, i;
98 unsigned long bytes = 0;
99 struct tpacket3_hdr *ppd;
100
101 ppd = (struct tpacket3_hdr *) ((uint8_t *) pbd +
102 pbd->h1.offset_to_first_pkt);
103 for (i = 0; i < num_pkts; ++i) {
104 bytes += ppd->tp_snaplen;
105
106 // struct ethhdr *eth = (struct ethhdr *) ((uint8_t *) ppd + ppd->tp_mac);
107 // Print packets
108
109 struct pfring_pkthdr packet_header;
110 memset(&packet_header, 0, sizeof(packet_header));
111 packet_header.len = ppd->tp_snaplen;
112 packet_header.caplen = ppd->tp_snaplen;
113
114 u_int8_t timestamp = 0;
115 u_int8_t add_hash = 0;
116
117 u_char* data_pointer = (u_char*)((uint8_t *) ppd + ppd->tp_mac);
118
119 simple_packet packet;
120 int parser_result = parse_raw_packet_to_simple_packet((u_char*)data_pointer, ppd->tp_snaplen, packet, afpacket_read_packet_length_from_ip_header);
121
122 //char print_buffer[512];
123 //fastnetmon_print_parsed_pkt(print_buffer, 512, data_pointer, &packet_header);
124 //printf("%s\n", print_buffer);
125
126 ppd = (struct tpacket3_hdr *) ((uint8_t *) ppd +
127 ppd->tp_next_offset);
128
129 if (parser_result) {
130 afpacket_process_func_ptr(packet);
131 } else {
132 total_unparsed_packets++;
133 }
134 }
135 }
136
setup_socket(std::string interface_name,int fanout_group_id)137 int setup_socket(std::string interface_name, int fanout_group_id) {
138 // More details here: http://man7.org/linux/man-pages/man7/packet.7.html
139 // We could use SOCK_RAW or SOCK_DGRAM for second argument
140 // SOCK_RAW - raw packets pass from the kernel
141 // SOCK_DGRAM - some amount of processing
142 // Third argument manage ether type of captured packets
143 int packet_socket = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
144
145 if (packet_socket == -1) {
146 logger << log4cpp::Priority::ERROR << "Can't create AF_PACKET socket";
147 return -1;
148 }
149
150 // We whould use V3 bcause it could read/pool in per block basis instead per packet
151 int version = TPACKET_V3;
152 int setsockopt_packet_version = setsockopt(packet_socket, SOL_PACKET, PACKET_VERSION, &version, sizeof(version));
153
154 if (setsockopt_packet_version < 0) {
155 logger << log4cpp::Priority::ERROR << "Can't set packet v3 version";
156 return -1;
157 }
158
159 int interface_number = get_interface_number_by_device_name(packet_socket, interface_name);
160
161 if (interface_number == -1) {
162 logger << log4cpp::Priority::ERROR << "Can't get interface number by interface name for " << interface_name;
163 return -1;
164 }
165
166 // Switch to PROMISC mode
167 struct packet_mreq sock_params;
168 memset(&sock_params, 0, sizeof(sock_params));
169 sock_params.mr_type = PACKET_MR_PROMISC;
170 sock_params.mr_ifindex = interface_number;
171
172 int set_promisc = setsockopt(packet_socket, SOL_PACKET, PACKET_ADD_MEMBERSHIP, (void *)&sock_params, sizeof(sock_params));
173
174 if (set_promisc == -1) {
175 logger << log4cpp::Priority::ERROR << "Can't enable promisc mode";
176 return -1;
177 }
178
179 struct sockaddr_ll bind_address;
180 memset(&bind_address, 0, sizeof(bind_address));
181
182 bind_address.sll_family = AF_PACKET;
183 bind_address.sll_protocol = htons(ETH_P_ALL);
184 bind_address.sll_ifindex = interface_number;
185
186 // We will follow http://yusufonlinux.blogspot.ru/2010/11/data-link-access-and-zero-copy.html
187 // And this: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt
188
189 struct tpacket_req3 req;
190 memset(&req, 0, sizeof(req));
191
192 req.tp_block_size = blocksiz;
193 req.tp_frame_size = framesiz;
194 req.tp_block_nr = blocknum;
195 req.tp_frame_nr = (blocksiz * blocknum) / framesiz;
196
197 req.tp_retire_blk_tov = 60; // Timeout in msec
198 req.tp_feature_req_word = TP_FT_REQ_FILL_RXHASH;
199
200 int setsockopt_rx_ring = setsockopt(packet_socket, SOL_PACKET , PACKET_RX_RING , (void*)&req , sizeof(req));
201
202 if (setsockopt_rx_ring == -1) {
203 logger << log4cpp::Priority::ERROR << "Can't enable RX_RING for AF_PACKET socket";
204 return -1;
205 }
206
207 // We use per thread structures
208 uint8_t* mapped_buffer = NULL;
209 struct iovec* rd = NULL;
210
211 mapped_buffer = (uint8_t*)mmap(NULL, req.tp_block_size * req.tp_block_nr, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_LOCKED, packet_socket, 0);
212
213 if (mapped_buffer == MAP_FAILED) {
214 logger << log4cpp::Priority::ERROR << "MMAP failed";
215 return -1;
216 }
217
218 // Allocate iov structure for each block
219 rd = (struct iovec*)malloc(req.tp_block_nr * sizeof(struct iovec));
220
221 // Initilize iov structures
222 for (int i = 0; i < req.tp_block_nr; ++i) {
223 rd[i].iov_base = mapped_buffer + (i * req.tp_block_size);
224 rd[i].iov_len = req.tp_block_size;
225 }
226
227 int bind_result = bind(packet_socket, (struct sockaddr *)&bind_address, sizeof(bind_address));
228
229 if (bind_result == -1) {
230 logger << log4cpp::Priority::ERROR << "Can't bind to AF_PACKET socket";
231 return -1;
232 }
233
234 if (fanout_group_id) {
235 // PACKET_FANOUT_LB - round robin
236 // PACKET_FANOUT_CPU - send packets to CPU where packet arrived
237 int fanout_type = PACKET_FANOUT_CPU;
238
239 int fanout_arg = (fanout_group_id | (fanout_type << 16));
240
241 int setsockopt_fanout = setsockopt(packet_socket, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg));
242
243 if (setsockopt_fanout < 0) {
244 logger << log4cpp::Priority::ERROR << "Can't configure fanout error number: "<< errno << " error: " << strerror(errno);
245 return -1;
246 }
247 }
248
249 unsigned int current_block_num = 0;
250
251 struct pollfd pfd;
252 memset(&pfd, 0, sizeof(pfd));
253
254 pfd.fd = packet_socket;
255 pfd.events = POLLIN | POLLERR;
256 pfd.revents = 0;
257
258 while (true) {
259 struct block_desc *pbd = (struct block_desc *) rd[current_block_num].iov_base;
260
261 if ((pbd->h1.block_status & TP_STATUS_USER) == 0) {
262 poll(&pfd, 1, -1);
263
264 continue;
265 }
266
267 walk_block(pbd, current_block_num);
268 flush_block(pbd);
269 current_block_num = (current_block_num + 1) % blocknum;
270 }
271
272 return packet_socket;
273 }
274
start_af_packet_capture(std::string interface_name,int fanout_group_id)275 void start_af_packet_capture(std::string interface_name, int fanout_group_id) {
276 setup_socket(interface_name, fanout_group_id);
277 }
278
// Placeholder, does nothing yet.
// TODO: read capture statistics with getsockopt PACKET_STATISTICS
void get_af_packet_stats() {
}
282
283 // Could get some speed up on NUMA servers
284 bool afpacket_execute_strict_cpu_affinity = true;
285
start_afpacket_collection(process_packet_pointer func_ptr)286 void start_afpacket_collection(process_packet_pointer func_ptr) {
287 logger << log4cpp::Priority::INFO << "AF_PACKET plugin started";
288 afpacket_process_func_ptr = func_ptr;
289
290 if (configuration_map.count("netmap_read_packet_length_from_ip_header") != 0) {
291 afpacket_read_packet_length_from_ip_header = configuration_map["netmap_read_packet_length_from_ip_header"] == "on";
292 }
293
294 std::string interfaces_list = "";
295
296 if (configuration_map.count("interfaces") != 0) {
297 interfaces_list = configuration_map["interfaces"];
298 }
299
300 std::vector<std::string> interfaces_for_listen;
301 boost::split(interfaces_for_listen, interfaces_list, boost::is_any_of(","), boost::token_compress_on);
302
303 logger << log4cpp::Priority::INFO << "AF_PACKET will listen on " << interfaces_for_listen.size() << " interfaces";
304
305 if (interfaces_for_listen.size() == 0) {
306 logger << log4cpp::Priority::ERROR << "Please specify intreface for AF_PACKET";
307 return;
308 }
309
310 if (interfaces_for_listen.size() > 1) {
311 logger << log4cpp::Priority::WARN << "We support only single interface for AF_PACKET, sorry!";
312 }
313
314 std::string capture_interface = interfaces_for_listen[0];
315
316 int fanout_group_id = getpid() & 0xffff;
317
318 unsigned int num_cpus = sysconf(_SC_NPROCESSORS_ONLN);;
319 logger.info("We have %d cpus for AF_PACKET", num_cpus);
320
321 if (num_cpus > 1) {
322 boost::thread_group packet_receiver_thread_group;
323
324 for (int cpu = 0; cpu < num_cpus; cpu++) {
325
326 // Well, we have thread attributes from Boost 1.50
327 #if defined(BOOST_THREAD_PLATFORM_PTHREAD) && BOOST_VERSION / 100 % 1000 >= 50 && defined(__GLIBC__)
328 boost::thread::attributes thread_attrs;
329
330 if (afpacket_execute_strict_cpu_affinity) {
331 cpu_set_t current_cpu_set;
332
333 int cpu_to_bind = cpu % num_cpus;
334 CPU_ZERO(¤t_cpu_set);
335 // We count cpus from zero
336 CPU_SET(cpu_to_bind, ¤t_cpu_set);
337
338 int set_affinity_result = pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), ¤t_cpu_set);
339
340 if (set_affinity_result != 0) {
341 logger << log4cpp::Priority::ERROR << "Can't set CPU affinity for thread";
342 }
343 }
344
345 packet_receiver_thread_group.add_thread(
346 new boost::thread(thread_attrs, boost::bind(start_af_packet_capture, capture_interface, fanout_group_id))
347 );
348 #else
349 logger.error("Sorry but CPU affinity did not supported for your platform");
350
351 packet_receiver_thread_group.add_thread(
352 new boost::thread(start_af_packet_capture, capture_interface, fanout_group_id)
353 );
354 #endif
355 }
356
357 // Wait all processes for finish
358 packet_receiver_thread_group.join_all();
359 } else {
360 start_af_packet_capture(capture_interface, 0);
361 }
362 }
363