1 // log4cpp logging facility
2 #include "log4cpp/Category.hh"
3 #include "log4cpp/Appender.hh"
4 #include "log4cpp/FileAppender.hh"
5 #include "log4cpp/OstreamAppender.hh"
6 #include "log4cpp/Layout.hh"
7 #include "log4cpp/BasicLayout.hh"
8 #include "log4cpp/PatternLayout.hh"
9 #include "log4cpp/Priority.hh"
10 
11 #include <boost/version.hpp>
12 #include <boost/algorithm/string.hpp>
13 
14 #include "../fast_library.h"
15 
16 // For support uint32_t, uint16_t
17 #include <sys/types.h>
18 
19 // For config map operations
20 #include <string>
21 #include <map>
22 
23 #include <stdio.h>
24 #include <iostream>
25 #include <string>
26 #define NETMAP_WITH_LIBS
27 
28 // Disable debug messages from Netmap
29 #define NETMAP_NO_DEBUG
30 #include <net/netmap_user.h>
31 #include <boost/thread.hpp>
32 
33 #if defined(__FreeBSD__) || defined(__DragonFly__)
// On FreeBSD the function pthread_attr_setaffinity_np is declared here
35 #include <pthread_np.h>
36 
// The CPU set type also has a different name there
38 typedef cpuset_t cpu_set_t;
39 #endif
40 
41 #include "../fastnetmon_packet_parser.h"
42 
43 #include "../unified_parser.hpp"
44 
// For polling operations
46 #include <poll.h>
47 
48 // For support: IPPROTO_TCP, IPPROTO_ICMP, IPPROTO_UDP
49 #include <sys/types.h>
50 #include <sys/socket.h>
51 #include <netinet/in.h>
52 
53 #include "netmap_collector.h"
54 
55 // By default we read packet size from link layer
56 // But in case of Juniper we could crop first X bytes from packet:
57 // maximum-packet-length 110;
58 // And this option become mandatory if we want correct bps speed in toolkit
59 bool netmap_read_packet_length_from_ip_header = false;
60 
61 uint32_t netmap_sampling_ratio = 1;
62 
63 /* prototypes */
64 void netmap_thread(struct nm_desc* netmap_descriptor, int netmap_thread);
65 void consume_pkt(u_char* buffer, int len, int thread_number);
66 
67 // Get log4cpp logger from main program
68 extern log4cpp::Category& logger;
69 
70 // Pass unparsed packets number to main program
71 extern uint64_t total_unparsed_packets;
72 
73 // Global configuration map
74 extern std::map<std::string, std::string> configuration_map;
75 
76 u_int num_cpus = 0;
77 
78 // This variable name should be uniq for every plugin!
79 process_packet_pointer netmap_process_func_ptr = NULL;
80 
81 bool execute_strict_cpu_affinity = true;
82 
receive_packets(struct netmap_ring * ring,int thread_number)83 int receive_packets(struct netmap_ring* ring, int thread_number) {
84     u_int cur, rx, n;
85 
86     cur = ring->cur;
87     n = nm_ring_space(ring);
88 
89     for (rx = 0; rx < n; rx++) {
90         struct netmap_slot* slot = &ring->slot[cur];
91         char* p = NETMAP_BUF(ring, slot->buf_idx);
92 
93         // process data
94         consume_pkt((u_char*)p, slot->len, thread_number);
95 
96         cur = nm_ring_next(ring, cur);
97     }
98 
99     ring->head = ring->cur = cur;
100     return (rx);
101 }
102 
consume_pkt(u_char * buffer,int len,int thread_number)103 void consume_pkt(u_char* buffer, int len, int thread_number) {
104     // We should fill this structure for passing to FastNetMon
105     simple_packet packet;
106 
107     packet.sample_ratio = netmap_sampling_ratio;
108 
109     if (!parse_raw_packet_to_simple_packet(buffer, len, packet, netmap_read_packet_length_from_ip_header)) {
110         total_unparsed_packets++;
111 
112         return;
113     }
114 
115     netmap_process_func_ptr(packet);
116 }
117 
receiver(std::string interface_for_listening)118 void receiver(std::string interface_for_listening) {
119     struct nm_desc* netmap_descriptor;
120 
121     struct nmreq base_nmd;
122     bzero(&base_nmd, sizeof(base_nmd));
123 
124     // Magic from pkt-gen.c
125     base_nmd.nr_tx_rings = base_nmd.nr_rx_rings = 0;
126     base_nmd.nr_tx_slots = base_nmd.nr_rx_slots = 0;
127 
128     std::string interface = "";
129     std::string system_interface_name = "";
130     // If we haven't netmap: prefix in interface name we will append it
131     if (interface_for_listening.find("netmap:") == std::string::npos) {
132         system_interface_name = interface_for_listening;
133 
134         interface = "netmap:" + interface_for_listening;
135     } else {
136         // We should skip netmap prefix
137         system_interface_name = boost::replace_all_copy(interface_for_listening, "netmap:", "");
138 
139         interface = interface_for_listening;
140     }
141 
142 #ifdef __linux__
143     manage_interface_promisc_mode(system_interface_name, true);
144     logger.warn("Please disable all types of offload for this NIC manually: ethtool -K %s gro off gso off tso off lro off", system_interface_name.c_str());
145 #endif
146 
147     netmap_descriptor = nm_open(interface.c_str(), &base_nmd, 0, NULL);
148 
149     if (netmap_descriptor == NULL) {
150         logger.error("Can't open netmap device %s", interface.c_str());
151         return;
152     }
153 
154     logger.info("Mapped %dKB memory at %p", netmap_descriptor->req.nr_memsize >> 10, netmap_descriptor->mem);
155     logger.info("We have %d tx and %d rx rings", netmap_descriptor->req.nr_tx_rings,
156                 netmap_descriptor->req.nr_rx_rings);
157 
158     if (num_cpus > netmap_descriptor->req.nr_rx_rings) {
159         num_cpus = netmap_descriptor->req.nr_rx_rings;
160 
161         logger.info("We have number of CPUs bigger than number of NIC RX queues. Set number of "
162                     "CPU's to number of threads");
163     }
164 
165     /*
166         protocol stack and may cause a reset of the card,
167         which in turn may take some time for the PHY to
168         reconfigure. We do the open here to have time to reset.
169     */
170 
171     int wait_link = 2;
172     logger.info("Wait %d seconds for NIC reset", wait_link);
173     sleep(wait_link);
174 
175     boost::thread_group packet_receiver_thread_group;
176 
177     for (int i = 0; i < num_cpus; i++) {
178         struct nm_desc nmd = *netmap_descriptor;
179         // This operation is VERY important!
180         nmd.self = &nmd;
181 
182         uint64_t nmd_flags = 0;
183 
184         if (nmd.req.nr_flags != NR_REG_ALL_NIC) {
185             logger.error("Ooops, main descriptor should be with NR_REG_ALL_NIC flag");
186         }
187 
188         nmd.req.nr_flags = NR_REG_ONE_NIC;
189         nmd.req.nr_ringid = i;
190 
191         /* Only touch one of the rings (rx is already ok) */
192         nmd_flags |= NETMAP_NO_TX_POLL;
193 
194         struct nm_desc* new_nmd =
195         nm_open(interface.c_str(), NULL, nmd_flags | NM_OPEN_IFNAME | NM_OPEN_NO_MMAP, &nmd);
196 
197         if (new_nmd == NULL) {
198             logger.error("Can't open netmap descriptor for netmap per hardware queue thread");
199             return;
200         }
201 
202         logger.info("My first ring is %d and last ring id is %d I'm thread %d",
203                     new_nmd->first_rx_ring, new_nmd->last_rx_ring, i);
204 
205 
206         /*
207         logger<< log4cpp::Priority::INFO<< "We are using Boost "
208             << BOOST_VERSION / 100000     << "."  // major version
209             << BOOST_VERSION / 100 % 1000 << "."  // minior version
210             << BOOST_VERSION % 100;
211         */
212 
213         logger.info("Start new netmap thread %d", i);
214 
215 // Well, we have thread attributes from Boost 1.50
216 
217 #if defined(BOOST_THREAD_PLATFORM_PTHREAD) && BOOST_VERSION / 100 % 1000 >= 50 && !defined(__APPLE__) && defined(__GLIBC__)
218         /* Bind to certain core */
219         boost::thread::attributes thread_attrs;
220 
221         if (execute_strict_cpu_affinity) {
222             cpu_set_t current_cpu_set;
223 
224             int cpu_to_bind = i % num_cpus;
225 
226             CPU_ZERO(&current_cpu_set);
227             // We count cpus from zero
228             CPU_SET(cpu_to_bind, &current_cpu_set);
229 
230             logger.info("I will bind this thread to logical CPU: %d", cpu_to_bind);
231 
232             int set_affinity_result =
233             pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), &current_cpu_set);
234 
235             if (set_affinity_result != 0) {
236                 logger.error("Can't specify CPU affinity for netmap thread");
237             }
238         }
239 
240         // Start thread and pass netmap descriptor to it
241         packet_receiver_thread_group.add_thread(
242         new boost::thread(thread_attrs, boost::bind(netmap_thread, new_nmd, i)));
243 #else
244         logger.error("Sorry but CPU affinity did not supported for your platform");
245         packet_receiver_thread_group.add_thread(new boost::thread(netmap_thread, new_nmd, i));
246 #endif
247     }
248 
249     // Wait all threads for completion
250     packet_receiver_thread_group.join_all();
251 }
252 
netmap_thread(struct nm_desc * netmap_descriptor,int thread_number)253 void netmap_thread(struct nm_desc* netmap_descriptor, int thread_number) {
254     struct nm_pkthdr h;
255     u_char* buf;
256     struct pollfd fds;
257     fds.fd = netmap_descriptor->fd; // NETMAP_FD(netmap_descriptor);
258     fds.events = POLLIN;
259 
260     struct netmap_ring* rxring = NULL;
261     struct netmap_if* nifp = netmap_descriptor->nifp;
262 
263     // printf("Reading from fd %d thread id: %d", netmap_descriptor->fd, thread_number);
264 
265     for (;;) {
266         // We will wait 1000 microseconds for retry, for infinite timeout please use -1
267         int poll_result = poll(&fds, 1, 1000);
268 
269         if (poll_result == 0) {
270             // printf("poll return 0 return code");
271             continue;
272         }
273 
274         if (poll_result == -1) {
275             logger.error("Netmap plugin: poll failed with return code -1");
276         }
277 
278         for (int i = netmap_descriptor->first_rx_ring; i <= netmap_descriptor->last_rx_ring; i++) {
279             // printf("Check ring %d from thread %d", i, thread_number);
280             rxring = NETMAP_RXRING(nifp, i);
281 
282             if (nm_ring_empty(rxring)) {
283                 continue;
284             }
285 
286             receive_packets(rxring, thread_number);
287         }
288 
289         // TODO: this code could add performance degradation
290         // Add interruption point for correct toolkit shutdown
291         // boost::this_thread::interruption_point();
292     }
293 
294     // nm_close(netmap_descriptor);
295 }
296 
start_netmap_collection(process_packet_pointer func_ptr)297 void start_netmap_collection(process_packet_pointer func_ptr) {
298     logger << log4cpp::Priority::INFO << "Netmap plugin started";
299     netmap_process_func_ptr = func_ptr;
300 
301     num_cpus = sysconf(_SC_NPROCESSORS_ONLN);
302     logger.info("We have %d cpus", num_cpus);
303 
304     std::string interfaces_list = "";
305 
306     if (configuration_map.count("interfaces") != 0) {
307         interfaces_list = configuration_map["interfaces"];
308     }
309 
310     if (configuration_map.count("netmap_sampling_ratio") != 0) {
311         netmap_sampling_ratio = convert_string_to_integer(configuration_map["netmap_sampling_ratio"]);
312     }
313 
314     if (configuration_map.count("netmap_read_packet_length_from_ip_header") != 0) {
315         netmap_read_packet_length_from_ip_header = configuration_map["netmap_read_packet_length_from_ip_header"] == "on";
316     }
317 
318     std::vector<std::string> interfaces_for_listen;
319     boost::split(interfaces_for_listen, interfaces_list, boost::is_any_of(","), boost::token_compress_on);
320 
321     logger << log4cpp::Priority::INFO << "netmap will listen on " << interfaces_for_listen.size() << " interfaces";
322 
323     // Thread group for all "master" processes
324     boost::thread_group netmap_main_threads;
325 
326     for (std::vector<std::string>::iterator interface = interfaces_for_listen.begin();
327         interface != interfaces_for_listen.end(); ++interface) {
328 
329         logger << log4cpp::Priority::INFO << "netmap will sniff interface: " << *interface;
330 
331         netmap_main_threads.add_thread( new boost::thread(receiver, *interface) );
332     }
333 
334     netmap_main_threads.join_all();
335 }
336