1 // log4cpp logging facility
2 #include "log4cpp/Category.hh"
3 #include "log4cpp/Appender.hh"
4 #include "log4cpp/FileAppender.hh"
5 #include "log4cpp/OstreamAppender.hh"
6 #include "log4cpp/Layout.hh"
7 #include "log4cpp/BasicLayout.hh"
8 #include "log4cpp/PatternLayout.hh"
9 #include "log4cpp/Priority.hh"
10
11 #include <boost/version.hpp>
12 #include <boost/algorithm/string.hpp>
13
14 #include "../fast_library.h"
15
16 // For support uint32_t, uint16_t
17 #include <sys/types.h>
18
19 // For config map operations
20 #include <string>
21 #include <map>
22
23 #include <stdio.h>
24 #include <iostream>
25 #include <string>
26 #define NETMAP_WITH_LIBS
27
28 // Disable debug messages from Netmap
29 #define NETMAP_NO_DEBUG
30 #include <net/netmap_user.h>
31 #include <boost/thread.hpp>
32
33 #if defined(__FreeBSD__) || defined(__DragonFly__)
34 // On FreeBSD function pthread_attr_setaffinity_np declared here
35 #include <pthread_np.h>
36
37 // Also we have different type name for cpu set's store
38 typedef cpuset_t cpu_set_t;
39 #endif
40
41 #include "../fastnetmon_packet_parser.h"
42
43 #include "../unified_parser.hpp"
44
45 // For pooling operations
46 #include <poll.h>
47
48 // For support: IPPROTO_TCP, IPPROTO_ICMP, IPPROTO_UDP
49 #include <sys/types.h>
50 #include <sys/socket.h>
51 #include <netinet/in.h>
52
53 #include "netmap_collector.h"
54
55 // By default we read packet size from link layer
56 // But in case of Juniper we could crop first X bytes from packet:
57 // maximum-packet-length 110;
58 // And this option become mandatory if we want correct bps speed in toolkit
59 bool netmap_read_packet_length_from_ip_header = false;
60
61 uint32_t netmap_sampling_ratio = 1;
62
63 /* prototypes */
64 void netmap_thread(struct nm_desc* netmap_descriptor, int netmap_thread);
65 void consume_pkt(u_char* buffer, int len, int thread_number);
66
67 // Get log4cpp logger from main program
68 extern log4cpp::Category& logger;
69
70 // Pass unparsed packets number to main program
71 extern uint64_t total_unparsed_packets;
72
73 // Global configuration map
74 extern std::map<std::string, std::string> configuration_map;
75
76 u_int num_cpus = 0;
77
78 // This variable name should be uniq for every plugin!
79 process_packet_pointer netmap_process_func_ptr = NULL;
80
81 bool execute_strict_cpu_affinity = true;
82
receive_packets(struct netmap_ring * ring,int thread_number)83 int receive_packets(struct netmap_ring* ring, int thread_number) {
84 u_int cur, rx, n;
85
86 cur = ring->cur;
87 n = nm_ring_space(ring);
88
89 for (rx = 0; rx < n; rx++) {
90 struct netmap_slot* slot = &ring->slot[cur];
91 char* p = NETMAP_BUF(ring, slot->buf_idx);
92
93 // process data
94 consume_pkt((u_char*)p, slot->len, thread_number);
95
96 cur = nm_ring_next(ring, cur);
97 }
98
99 ring->head = ring->cur = cur;
100 return (rx);
101 }
102
consume_pkt(u_char * buffer,int len,int thread_number)103 void consume_pkt(u_char* buffer, int len, int thread_number) {
104 // We should fill this structure for passing to FastNetMon
105 simple_packet packet;
106
107 packet.sample_ratio = netmap_sampling_ratio;
108
109 if (!parse_raw_packet_to_simple_packet(buffer, len, packet, netmap_read_packet_length_from_ip_header)) {
110 total_unparsed_packets++;
111
112 return;
113 }
114
115 netmap_process_func_ptr(packet);
116 }
117
receiver(std::string interface_for_listening)118 void receiver(std::string interface_for_listening) {
119 struct nm_desc* netmap_descriptor;
120
121 struct nmreq base_nmd;
122 bzero(&base_nmd, sizeof(base_nmd));
123
124 // Magic from pkt-gen.c
125 base_nmd.nr_tx_rings = base_nmd.nr_rx_rings = 0;
126 base_nmd.nr_tx_slots = base_nmd.nr_rx_slots = 0;
127
128 std::string interface = "";
129 std::string system_interface_name = "";
130 // If we haven't netmap: prefix in interface name we will append it
131 if (interface_for_listening.find("netmap:") == std::string::npos) {
132 system_interface_name = interface_for_listening;
133
134 interface = "netmap:" + interface_for_listening;
135 } else {
136 // We should skip netmap prefix
137 system_interface_name = boost::replace_all_copy(interface_for_listening, "netmap:", "");
138
139 interface = interface_for_listening;
140 }
141
142 #ifdef __linux__
143 manage_interface_promisc_mode(system_interface_name, true);
144 logger.warn("Please disable all types of offload for this NIC manually: ethtool -K %s gro off gso off tso off lro off", system_interface_name.c_str());
145 #endif
146
147 netmap_descriptor = nm_open(interface.c_str(), &base_nmd, 0, NULL);
148
149 if (netmap_descriptor == NULL) {
150 logger.error("Can't open netmap device %s", interface.c_str());
151 return;
152 }
153
154 logger.info("Mapped %dKB memory at %p", netmap_descriptor->req.nr_memsize >> 10, netmap_descriptor->mem);
155 logger.info("We have %d tx and %d rx rings", netmap_descriptor->req.nr_tx_rings,
156 netmap_descriptor->req.nr_rx_rings);
157
158 if (num_cpus > netmap_descriptor->req.nr_rx_rings) {
159 num_cpus = netmap_descriptor->req.nr_rx_rings;
160
161 logger.info("We have number of CPUs bigger than number of NIC RX queues. Set number of "
162 "CPU's to number of threads");
163 }
164
165 /*
166 protocol stack and may cause a reset of the card,
167 which in turn may take some time for the PHY to
168 reconfigure. We do the open here to have time to reset.
169 */
170
171 int wait_link = 2;
172 logger.info("Wait %d seconds for NIC reset", wait_link);
173 sleep(wait_link);
174
175 boost::thread_group packet_receiver_thread_group;
176
177 for (int i = 0; i < num_cpus; i++) {
178 struct nm_desc nmd = *netmap_descriptor;
179 // This operation is VERY important!
180 nmd.self = &nmd;
181
182 uint64_t nmd_flags = 0;
183
184 if (nmd.req.nr_flags != NR_REG_ALL_NIC) {
185 logger.error("Ooops, main descriptor should be with NR_REG_ALL_NIC flag");
186 }
187
188 nmd.req.nr_flags = NR_REG_ONE_NIC;
189 nmd.req.nr_ringid = i;
190
191 /* Only touch one of the rings (rx is already ok) */
192 nmd_flags |= NETMAP_NO_TX_POLL;
193
194 struct nm_desc* new_nmd =
195 nm_open(interface.c_str(), NULL, nmd_flags | NM_OPEN_IFNAME | NM_OPEN_NO_MMAP, &nmd);
196
197 if (new_nmd == NULL) {
198 logger.error("Can't open netmap descriptor for netmap per hardware queue thread");
199 return;
200 }
201
202 logger.info("My first ring is %d and last ring id is %d I'm thread %d",
203 new_nmd->first_rx_ring, new_nmd->last_rx_ring, i);
204
205
206 /*
207 logger<< log4cpp::Priority::INFO<< "We are using Boost "
208 << BOOST_VERSION / 100000 << "." // major version
209 << BOOST_VERSION / 100 % 1000 << "." // minior version
210 << BOOST_VERSION % 100;
211 */
212
213 logger.info("Start new netmap thread %d", i);
214
215 // Well, we have thread attributes from Boost 1.50
216
217 #if defined(BOOST_THREAD_PLATFORM_PTHREAD) && BOOST_VERSION / 100 % 1000 >= 50 && !defined(__APPLE__) && defined(__GLIBC__)
218 /* Bind to certain core */
219 boost::thread::attributes thread_attrs;
220
221 if (execute_strict_cpu_affinity) {
222 cpu_set_t current_cpu_set;
223
224 int cpu_to_bind = i % num_cpus;
225
226 CPU_ZERO(¤t_cpu_set);
227 // We count cpus from zero
228 CPU_SET(cpu_to_bind, ¤t_cpu_set);
229
230 logger.info("I will bind this thread to logical CPU: %d", cpu_to_bind);
231
232 int set_affinity_result =
233 pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), ¤t_cpu_set);
234
235 if (set_affinity_result != 0) {
236 logger.error("Can't specify CPU affinity for netmap thread");
237 }
238 }
239
240 // Start thread and pass netmap descriptor to it
241 packet_receiver_thread_group.add_thread(
242 new boost::thread(thread_attrs, boost::bind(netmap_thread, new_nmd, i)));
243 #else
244 logger.error("Sorry but CPU affinity did not supported for your platform");
245 packet_receiver_thread_group.add_thread(new boost::thread(netmap_thread, new_nmd, i));
246 #endif
247 }
248
249 // Wait all threads for completion
250 packet_receiver_thread_group.join_all();
251 }
252
netmap_thread(struct nm_desc * netmap_descriptor,int thread_number)253 void netmap_thread(struct nm_desc* netmap_descriptor, int thread_number) {
254 struct nm_pkthdr h;
255 u_char* buf;
256 struct pollfd fds;
257 fds.fd = netmap_descriptor->fd; // NETMAP_FD(netmap_descriptor);
258 fds.events = POLLIN;
259
260 struct netmap_ring* rxring = NULL;
261 struct netmap_if* nifp = netmap_descriptor->nifp;
262
263 // printf("Reading from fd %d thread id: %d", netmap_descriptor->fd, thread_number);
264
265 for (;;) {
266 // We will wait 1000 microseconds for retry, for infinite timeout please use -1
267 int poll_result = poll(&fds, 1, 1000);
268
269 if (poll_result == 0) {
270 // printf("poll return 0 return code");
271 continue;
272 }
273
274 if (poll_result == -1) {
275 logger.error("Netmap plugin: poll failed with return code -1");
276 }
277
278 for (int i = netmap_descriptor->first_rx_ring; i <= netmap_descriptor->last_rx_ring; i++) {
279 // printf("Check ring %d from thread %d", i, thread_number);
280 rxring = NETMAP_RXRING(nifp, i);
281
282 if (nm_ring_empty(rxring)) {
283 continue;
284 }
285
286 receive_packets(rxring, thread_number);
287 }
288
289 // TODO: this code could add performance degradation
290 // Add interruption point for correct toolkit shutdown
291 // boost::this_thread::interruption_point();
292 }
293
294 // nm_close(netmap_descriptor);
295 }
296
start_netmap_collection(process_packet_pointer func_ptr)297 void start_netmap_collection(process_packet_pointer func_ptr) {
298 logger << log4cpp::Priority::INFO << "Netmap plugin started";
299 netmap_process_func_ptr = func_ptr;
300
301 num_cpus = sysconf(_SC_NPROCESSORS_ONLN);
302 logger.info("We have %d cpus", num_cpus);
303
304 std::string interfaces_list = "";
305
306 if (configuration_map.count("interfaces") != 0) {
307 interfaces_list = configuration_map["interfaces"];
308 }
309
310 if (configuration_map.count("netmap_sampling_ratio") != 0) {
311 netmap_sampling_ratio = convert_string_to_integer(configuration_map["netmap_sampling_ratio"]);
312 }
313
314 if (configuration_map.count("netmap_read_packet_length_from_ip_header") != 0) {
315 netmap_read_packet_length_from_ip_header = configuration_map["netmap_read_packet_length_from_ip_header"] == "on";
316 }
317
318 std::vector<std::string> interfaces_for_listen;
319 boost::split(interfaces_for_listen, interfaces_list, boost::is_any_of(","), boost::token_compress_on);
320
321 logger << log4cpp::Priority::INFO << "netmap will listen on " << interfaces_for_listen.size() << " interfaces";
322
323 // Thread group for all "master" processes
324 boost::thread_group netmap_main_threads;
325
326 for (std::vector<std::string>::iterator interface = interfaces_for_listen.begin();
327 interface != interfaces_for_listen.end(); ++interface) {
328
329 logger << log4cpp::Priority::INFO << "netmap will sniff interface: " << *interface;
330
331 netmap_main_threads.add_thread( new boost::thread(receiver, *interface) );
332 }
333
334 netmap_main_threads.join_all();
335 }
336