1 /* Author: pavel.odintsov@gmail.com */
2 /* License: GPLv2 */
3 
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <errno.h>
7 #include <string.h>
8 #include <unistd.h>
9 #include <new>
10 #include <signal.h>
11 #include <time.h>
12 #include <math.h>
13 
14 #include <sys/socket.h>
15 #include <sys/resource.h>
16 #include <sys/stat.h>
17 #include <arpa/inet.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
20 #include <netinet/udp.h>
21 #include <netinet/ip_icmp.h>
22 #include <net/if_arp.h> // struct arphdr
23 #include <netinet/if_ether.h>
24 #include <netinet/in.h>
25 
26 #include "libpatricia/patricia.h"
27 #include "fastnetmon_types.h"
28 #include "fastnetmon_packet_parser.h"
29 #include "fast_library.h"
30 #include "packet_storage.h"
31 #include "bgp_flow_spec.h"
32 
33 // Here we store variables which differs for different paltforms
34 #include "fast_platform.h"
35 
36 #ifdef ENABLE_DPI
37 #include "fast_dpi.h"
38 #endif
39 
40 #ifdef FASTNETMON_API
41 #include <grpc++/grpc++.h>
42 #include "fastnetmon.grpc.pb.h"
43 #endif
44 
45 // Plugins
46 #include "sflow_plugin/sflow_collector.h"
47 #include "netflow_plugin/netflow_collector.h"
48 #include "pcap_plugin/pcap_collector.h"
49 
50 #ifdef NETMAP_PLUGIN
51 #include "netmap_plugin/netmap_collector.h"
52 #endif
53 
54 #ifdef PF_RING
55 #include "pfring_plugin/pfring_collector.h"
56 #endif
57 
58 #ifdef SNABB_SWITCH
59 #include "snabbswitch_plugin/snabbswitch_collector.h"
60 #endif
61 
62 #ifdef FASTNETMON_ENABLE_AFPACKET
63 #include "afpacket_plugin/afpacket_collector.h"
64 #endif
65 
66 #ifdef PF_RING
67 #include "actions/pfring_hardware_filter_action.h"
68 #endif
69 
70 #ifdef ENABLE_GOBGP
71 #include "actions/gobgp_action.h"
72 #endif
73 
74 // Yes, maybe it's not an good idea but with this we can guarantee working code in example plugin
75 #include "example_plugin/example_collector.h"
76 
77 #include <algorithm>
78 #include <iostream>
79 #include <map>
80 #include <fstream>
81 
82 #include <vector>
83 #include <utility>
84 #include <sstream>
85 
86 #include <boost/thread.hpp>
87 #include <boost/thread/mutex.hpp>
88 #include <boost/regex.hpp>
89 
90 #include <boost/program_options.hpp>
91 
92 // log4cpp logging facility
93 #include "log4cpp/RemoteSyslogAppender.hh"
94 #include "log4cpp/SyslogAppender.hh"
95 #include "log4cpp/Category.hh"
96 #include "log4cpp/Appender.hh"
97 #include "log4cpp/FileAppender.hh"
98 #include "log4cpp/OstreamAppender.hh"
99 #include "log4cpp/Layout.hh"
100 #include "log4cpp/BasicLayout.hh"
101 #include "log4cpp/PatternLayout.hh"
102 #include "log4cpp/Priority.hh"
103 
104 
105 // Boost libs
106 #include <boost/algorithm/string.hpp>
107 #include <boost/algorithm/string/trim.hpp>
108 
109 #ifdef GEOIP
110 #include "GeoIP.h"
111 #endif
112 
113 #ifdef REDIS
114 #include <hiredis/hiredis.h>
115 #endif
116 
117 #ifdef MONGO
118 #include <bson.h>
119 #include <mongoc.h>
120 #endif
121 
122 // #define IPV6_HASH_COUNTERS
123 
124 #ifdef IPV6_HASH_COUNTERS
125 #include "concurrentqueue.h"
126 #endif
127 
128 #ifdef FASTNETMON_API
129 using grpc::Server;
130 using grpc::ServerBuilder;
131 using grpc::ServerContext;
132 using grpc::Status;
133 using fastmitigation::BanListRequest;
134 using fastmitigation::BanListReply;
135 using fastmitigation::Fastnetmon;
136 
137 std::unique_ptr<Server> api_server;
138 bool enable_api = false;
139 #endif
140 
141 time_t last_call_of_traffic_recalculation;
142 
143 std::string cli_stats_file_path = "/tmp/fastnetmon.dat";
144 
145 unsigned int stats_thread_sleep_time = 3600;
146 unsigned int stats_thread_initial_call_delay = 30;
147 
148 unsigned int recalculate_speed_timeout = 1;
149 
150 // Send or not any details about attack for ban script call over stdin
151 bool notify_script_pass_details = true;
152 
153 bool pfring_hardware_filters_enabled = false;
154 
155 bool notify_script_enabled = true;
156 
157 // We could collect attack dumps in pcap format
158 bool collect_attack_pcap_dumps = false;
159 
160 // We could process this dumps with DPI
161 bool process_pcap_attack_dumps_with_dpi = false;
162 
163 bool unban_only_if_attack_finished = true;
164 
165 logging_configuration_t logging_configuration;
166 
167 // Variable with all data from main screen
168 std::string screen_data_stats = "";
169 
170 // Global map with parsed config file
171 typedef std::map<std::string, std::string> configuration_map_t;
172 configuration_map_t configuration_map;
173 
174 // Every X seconds we will run ban list cleaner thread
175 // If customer uses ban_time smaller than this value we will use ban_time/2 as unban_iteration_sleep_time
176 int unban_iteration_sleep_time = 60;
177 
178 bool unban_enabled = true;
179 
180 #ifdef ENABLE_DPI
181 struct ndpi_detection_module_struct* my_ndpi_struct = NULL;
182 
183 u_int32_t ndpi_size_flow_struct = 0;
184 u_int32_t ndpi_size_id_struct = 0;
185 #endif
186 
187 #ifdef ENABLE_GOBGP
188 bool gobgp_enabled = false;
189 #endif
190 
191 #ifdef MONGO
192 std::string mongodb_host = "localhost";
193 unsigned int mongodb_port = 27017;
194 bool mongodb_enabled = false;
195 
196 std::string mongodb_database_name = "fastnetmon";
197 #endif
198 
199 /* Configuration block, we must move it to configuration file  */
200 #ifdef REDIS
201 unsigned int redis_port = 6379;
202 std::string redis_host = "127.0.0.1";
203 
204 // redis key prefix
205 std::string redis_prefix = "";
206 
207 // because it's additional and very specific feature we should disable it by default
208 bool redis_enabled = false;
209 #endif
210 
211 bool monitor_local_ip_addresses = true;
212 
213 // This flag could enable print of ban actions and thresholds on the client's screen
214 bool print_configuration_params_on_the_screen = false;
215 
216 // Trigger for enable or disable traffic counting for whole subnets
217 bool enable_subnet_counters = false;
218 
219 // We will announce whole subnet instead single IP with BGP if this flag enabled
220 bool exabgp_announce_whole_subnet = false;
221 
222 // We will announce only /32 host
223 bool exabgp_announce_host = false;
224 
225 // With this flag we will announce more specfic then whole block Flow Spec announces
226 bool exabgp_flow_spec_announces = false;
227 
228 ban_settings_t global_ban_settings;
229 
init_global_ban_settings()230 void init_global_ban_settings() {
231     // ban Configuration params
232     global_ban_settings.enable_ban_for_pps = false;
233     global_ban_settings.enable_ban_for_bandwidth = false;
234     global_ban_settings.enable_ban_for_flows_per_second = false;
235 
236     // We must ban IP if it exceeed this limit in PPS
237     global_ban_settings.ban_threshold_pps = 20000;
238 
239     // We must ban IP of it exceed this limit for number of flows in any direction
240     global_ban_settings.ban_threshold_flows = 3500;
241 
242     // We must ban client if it exceed 1GBps
243     global_ban_settings.ban_threshold_mbps = 1000;
244 
245     // Disable per protocol thresholds too
246     global_ban_settings.enable_ban_for_tcp_pps = false;
247     global_ban_settings.enable_ban_for_tcp_bandwidth = false;
248 
249     global_ban_settings.enable_ban_for_udp_pps = false;
250     global_ban_settings.enable_ban_for_udp_bandwidth = false;
251 
252     global_ban_settings.enable_ban_for_icmp_pps = false;
253     global_ban_settings.enable_ban_for_icmp_bandwidth = false;
254 
255     // Ban enable/disable flag
256     global_ban_settings.enable_ban = true;
257 }
258 
259 bool enable_conection_tracking = true;
260 
261 bool enable_snabbswitch_collection = false;
262 bool enable_afpacket_collection = false;
263 bool enable_data_collection_from_mirror = true;
264 bool enable_netmap_collection = false;
265 bool enable_sflow_collection = false;
266 bool enable_netflow_collection = false;
267 bool enable_pcap_collection = false;
268 
269 // Time consumed by reaclculation for all IPs
270 struct timeval speed_calculation_time;
271 
272 // Time consumed by drawing stats for all IPs
273 struct timeval drawing_thread_execution_time;
274 
275 // Global thread group for packet capture threads
276 boost::thread_group packet_capture_plugin_thread_group;
277 
278 // Global thread group for service processes (speed recalculation,
279 // screen updater and ban list cleaner)
280 boost::thread_group service_thread_group;
281 
282 // Total number of hosts in our networks
283 // We need this as global variable because it's very important value for configuring data structures
284 unsigned int total_number_of_hosts_in_our_networks = 0;
285 
286 #ifdef GEOIP
287 GeoIP* geo_ip = NULL;
288 #endif
289 
290 // IPv4 lookup trees
291 patricia_tree_t* lookup_tree_ipv4, *whitelist_tree_ipv4;
292 
293 // IPv6 lookup trees
294 patricia_tree_t* lookup_tree_ipv6, *whitelist_tree_ipv6;
295 
296 bool DEBUG = 0;
297 
298 // flag about dumping all packets to log
299 bool DEBUG_DUMP_ALL_PACKETS = false;
300 
301 // dump "other" packets
302 bool DEBUG_DUMP_OTHER_PACKETS = false;
303 
304 // Period for update screen for console version of tool
305 unsigned int check_period = 3;
306 
307 // Standard ban time in seconds for all attacks but you can tune this value
308 int global_ban_time = 1800;
309 
310 // We calc average pps/bps for this time
311 double average_calculation_amount = 15;
312 
313 // We calc average pps/bps for subnets with this time, we use longer value for calculation average network traffic
314 double average_calculation_amount_for_subnets = 30;
315 
316 // Show average or absolute value of speed
317 bool print_average_traffic_counts = true;
318 
319 // Key used for sorting clients in output.  Allowed sort params: packets/bytes/flows
320 std::string sort_parameter = "packets";
321 
322 // Number of lines in program output
323 unsigned int max_ips_in_list = 7;
324 
325 // Number of lines for sending ben attack details to email
326 unsigned int ban_details_records_count = 500;
327 
328 // We haven't option for configure it with configuration file
329 unsigned int number_of_packets_for_pcap_attack_dump = 500;
330 
331 // log file
332 log4cpp::Category& logger = log4cpp::Category::getRoot();
333 
334 // We storae all active BGP Flow Spec announces here
335 typedef std::map<std::string, uint32_t> active_flow_spec_announces_t;
336 active_flow_spec_announces_t active_flow_spec_announces;
337 
338 /* Configuration block ends */
339 
340 // We count total number of incoming/outgoing/internal and other traffic type packets/bytes
341 // And initilize by 0 all fields
342 total_counter_element total_counters[4];
343 total_counter_element total_speed_counters[4];
344 total_counter_element total_speed_average_counters[4];
345 
346 // Total amount of non parsed packets
347 uint64_t total_unparsed_packets = 0;
348 uint64_t total_unparsed_packets_speed = 0;
349 
350 // Total amount of IPv6 packets
351 uint64_t total_ipv6_packets = 0;
352 
353 // IPv6 traffic which belongs to our own networks
354 uint64_t our_ipv6_packets = 0;
355 
356 uint64_t incoming_total_flows_speed = 0;
357 uint64_t outgoing_total_flows_speed = 0;
358 
359 map_of_vector_counters SubnetVectorMap;
360 
361 // Here we store taffic per subnet
362 map_for_subnet_counters PerSubnetCountersMap;
363 
364 // Here we store traffic speed per subnet
365 map_for_subnet_counters PerSubnetSpeedMap;
366 
367 // Here we store average speed per subnet
368 map_for_subnet_counters PerSubnetAverageSpeedMap;
369 
370 // Flow tracking structures
371 map_of_vector_counters_for_flow SubnetVectorMapFlow;
372 
373 /* End of our data structs */
374 boost::mutex ban_list_details_mutex;
375 boost::mutex ban_list_mutex;
376 boost::mutex flow_counter;
377 
378 // map for flows
379 std::map<uint64_t, int> FlowCounter;
380 
381 // Struct for string speed per IP
382 map_of_vector_counters SubnetVectorMapSpeed;
383 
384 // Struct for storing average speed per IP for specified interval
385 map_of_vector_counters SubnetVectorMapSpeedAverage;
386 
387 #ifdef GEOIP
388 map_for_counters GeoIpCounter;
389 #endif
390 
391 // In ddos info we store attack power and direction
392 std::map<uint32_t, banlist_item> ban_list;
393 std::map<uint32_t, std::vector<simple_packet> > ban_list_details;
394 
395 host_group_map_t host_groups;
396 
397 // Here we store assignment from subnet to certain host group for fast lookup
398 subnet_to_host_group_map_t subnet_to_host_groups;
399 
400 host_group_ban_settings_map_t host_group_ban_settings_map;
401 
402 std::vector<subnet_t> our_networks;
403 std::vector<subnet_t> whitelist_networks;
404 
405 // ExaBGP support flag
406 bool exabgp_enabled = false;
407 std::string exabgp_community = "";
408 
409 // We could use separate communities for subnet and host announces
410 std::string exabgp_community_subnet = "";
411 std::string exabgp_community_host = "";
412 
413 
414 std::string exabgp_command_pipe = "/var/run/fastnetmon/exabgp.cmd";
415 std::string exabgp_next_hop = "";
416 
417 // Graphite monitoring
418 bool graphite_enabled = false;
419 std::string graphite_host = "127.0.0.1";
420 unsigned short int graphite_port = 2003;
421 
422 // Default graphite namespace
423 std::string graphite_prefix = "fastnetmon";
424 
425 bool process_incoming_traffic = true;
426 bool process_outgoing_traffic = true;
427 
428 // Prototypes
429 #ifdef ENABLE_DPI
430 void init_current_instance_of_ndpi();
431 #endif
432 
433 inline void build_average_speed_counters_from_speed_counters( map_element* current_average_speed_element, map_element& new_speed_element, double exp_value, double exp_power);
434 inline void build_speed_counters_from_packet_counters(map_element& new_speed_element, map_element* vector_itr, double speed_calc_period);
435 void execute_ip_ban(uint32_t client_ip, map_element average_speed_element, std::string flow_attack_details, subnet_t customer_subnet);
436 std::string get_attack_description_in_json(uint32_t client_ip, attack_details& current_attack);
437 logging_configuration_t read_logging_settings(configuration_map_t configuration_map);
438 std::string get_amplification_attack_type(amplification_attack_type_t attack_type);
439 std::string generate_flow_spec_for_amplification_attack(amplification_attack_type_t amplification_attack_type, std::string destination_ip);
440 bool exabgp_flow_spec_ban_manage(std::string action, std::string flow_spec_rule_as_text);
441 void call_attack_details_handlers(uint32_t client_ip, attack_details& current_attack, std::string attack_fingerprint);
442 void call_ban_handlers(uint32_t client_ip, attack_details& current_attack, std::string flow_attack_details);
443 void call_unban_handlers(uint32_t client_ip, attack_details& current_attack);
444 ban_settings_t read_ban_settings(configuration_map_t configuration_map, std::string host_group_name = "");
445 void exabgp_prefix_ban_manage(std::string action, std::string prefix_as_string_with_mask, std::string exabgp_next_hop,
446     std::string exabgp_community);
447 std::string print_subnet_load();
448 bool we_should_ban_this_ip(map_element* current_average_speed_element, ban_settings_t current_ban_settings);
449 unsigned int get_max_used_protocol(uint64_t tcp, uint64_t udp, uint64_t icmp);
450 void print_attack_details_to_file(std::string details, std::string client_ip_as_string, attack_details current_attack);
451 std::string print_ban_thresholds(ban_settings_t current_ban_settings);
452 bool load_configuration_file();
453 std::string print_flow_tracking_for_ip(conntrack_main_struct& conntrack_element, std::string client_ip);
454 void convert_integer_to_conntrack_hash_struct(packed_session* packed_connection_data,
455                                               packed_conntrack_hash* unpacked_data);
456 uint64_t convert_conntrack_hash_struct_to_integer(packed_conntrack_hash* struct_value);
457 void cleanup_ban_list();
458 std::string get_attack_description(uint32_t client_ip, attack_details& current_attack);
459 void send_attack_details(uint32_t client_ip, attack_details current_attack_details);
460 void free_up_all_resources();
461 std::string print_ddos_attack_details();
462 void recalculate_speed();
463 std::string print_channel_speed(std::string traffic_type, direction packet_direction);
464 void process_packet(simple_packet& current_packet);
465 void traffic_draw_program();
466 void interruption_signal_handler(int signal_number);
467 
468 #ifdef FASTNETMON_API
silent_logging_function(gpr_log_func_args * args)469 void silent_logging_function(gpr_log_func_args *args) {
470     // We do not want any logging here
471 }
472 
473 // Logic and data behind the server's behavior.
474 class FastnetmonApiServiceImpl final : public Fastnetmon::Service {
GetBanlist(::grpc::ServerContext * context,const::fastmitigation::BanListRequest * request,::grpc::ServerWriter<::fastmitigation::BanListReply> * writer)475     Status GetBanlist(::grpc::ServerContext* context, const ::fastmitigation::BanListRequest* request, ::grpc::ServerWriter< ::fastmitigation::BanListReply>* writer) override {
476         logger << log4cpp::Priority::INFO << "API we asked for banlist";
477 
478         for (std::map<uint32_t, banlist_item>::iterator itr = ban_list.begin(); itr != ban_list.end(); ++itr) {
479             std::string client_ip_as_string = convert_ip_as_uint_to_string(itr->first);
480 
481             BanListReply reply;
482             reply.set_ip_address( client_ip_as_string + "/32" );
483             writer->Write(reply);
484         }
485 
486         return Status::OK;
487     }
488 
ExecuteBan(ServerContext * context,const fastmitigation::ExecuteBanRequest * request,fastmitigation::ExecuteBanReply * reply)489     Status ExecuteBan(ServerContext* context, const fastmitigation::ExecuteBanRequest* request, fastmitigation::ExecuteBanReply* reply) override {
490         logger << log4cpp::Priority::INFO << "API we asked for ban for IP: " << request->ip_address();
491 
492         if (!is_v4_host(request->ip_address())) {
493             logger << log4cpp::Priority::ERROR << "IP bad format";
494             return Status::CANCELLED;
495         }
496 
497         uint32_t client_ip = convert_ip_as_string_to_uint(request->ip_address());
498 
499         struct attack_details current_attack;
500         ban_list_mutex.lock();
501         ban_list[client_ip] = current_attack;
502         ban_list_mutex.unlock();
503 
504         ban_list_details_mutex.lock();
505         ban_list_details[client_ip] = std::vector<simple_packet>();
506         ban_list_details_mutex.unlock();
507 
508         logger << log4cpp::Priority::INFO << "API call ban handlers manually";
509 
510         std::string flow_attack_details = "manually triggered attack";
511         call_ban_handlers(client_ip, current_attack, flow_attack_details);
512 
513         return Status::OK;
514     }
515 
ExecuteUnBan(ServerContext * context,const fastmitigation::ExecuteBanRequest * request,fastmitigation::ExecuteBanReply * reply)516     Status ExecuteUnBan(ServerContext* context, const fastmitigation::ExecuteBanRequest* request, fastmitigation::ExecuteBanReply* reply) override {
517         logger << log4cpp::Priority::INFO << "API: We asked for unban for IP: " << request->ip_address();
518 
519         if (!is_v4_host(request->ip_address())) {
520             logger << log4cpp::Priority::ERROR << "IP bad format";
521             return Status::CANCELLED;
522         }
523 
524         uint32_t banned_ip = convert_ip_as_string_to_uint(request->ip_address());
525 
526         if (ban_list.count(banned_ip) == 0) {
527             logger << log4cpp::Priority::ERROR << "API: Could not find IP in ban list";
528             return Status::CANCELLED;
529         }
530 
531         banlist_item ban_details = ban_list[banned_ip];
532 
533         logger << log4cpp::Priority::INFO << "API: call unban handlers";
534         call_unban_handlers(banned_ip, ban_details);
535 
536         logger << log4cpp::Priority::INFO << "API: remove IP from ban list";
537 
538         ban_list_mutex.lock();
539         ban_list.erase(banned_ip);
540         ban_list_mutex.unlock();
541 
542         return Status::OK;
543     }
544 };
545 
546 // We could not define this variable in top of the file because we should define class before
547 FastnetmonApiServiceImpl api_service;
548 
StartupApiServer()549 std::unique_ptr<Server> StartupApiServer() {
550     std::string server_address("127.0.0.1:50052");
551     ServerBuilder builder;
552     // Listen on the given address without any authentication mechanism.
553     builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
554     // Register "service" as the instance through which we'll communicate with
555     // clients. In this case it corresponds to an *synchronous* service.
556     builder.RegisterService(&api_service);
557 
558     // Finally assemble the server.
559     std::unique_ptr<Server> current_api_server(builder.BuildAndStart());
560     logger << log4cpp::Priority::INFO << "API server listening on " << server_address;
561 
562     return current_api_server;
563 }
564 
RunApiServer()565 void RunApiServer() {
566     api_server = StartupApiServer();
567 
568     // Wait for the server to shutdown. Note that some other thread must be
569     // responsible for shutting down the server for this call to ever return.
570     api_server->Wait();
571     logger << log4cpp::Priority::INFO << "API server got shutdown signal";
572 }
573 #endif
574 
575 
576 /* Class for custom comparison fields by different fields */
577 template <typename T>
578 class TrafficComparatorClass {
579     private:
580     sort_type sort_field;
581     direction sort_direction;
582 
583     public:
TrafficComparatorClass(direction sort_direction,sort_type sort_field)584     TrafficComparatorClass(direction sort_direction, sort_type sort_field) {
585         this->sort_field = sort_field;
586         this->sort_direction = sort_direction;
587     }
588 
operator ()(T a,T b)589     bool operator()(T a, T b) {
590         if (sort_field == FLOWS) {
591             if (sort_direction == INCOMING) {
592                 return a.second.in_flows > b.second.in_flows;
593             } else if (sort_direction == OUTGOING) {
594                 return a.second.out_flows > b.second.out_flows;
595             } else {
596                 return false;
597             }
598         } else if (sort_field == PACKETS) {
599             if (sort_direction == INCOMING) {
600                 return a.second.in_packets > b.second.in_packets;
601             } else if (sort_direction == OUTGOING) {
602                 return a.second.out_packets > b.second.out_packets;
603             } else {
604                 return false;
605             }
606         } else if (sort_field == BYTES) {
607             if (sort_direction == INCOMING) {
608                 return a.second.in_bytes > b.second.in_bytes;
609             } else if (sort_direction == OUTGOING) {
610                 return a.second.out_bytes > b.second.out_bytes;
611             } else {
612                 return false;
613             }
614         } else {
615             return false;
616         }
617     }
618 };
619 
sigpipe_handler_for_popen(int signo)620 void sigpipe_handler_for_popen(int signo) {
621     logger << log4cpp::Priority::ERROR << "Sorry but we experienced error with popen. "
622            << "Please check your scripts. They should receive data on stdin! Optionally you could disable passing any details with configuration param: notify_script_pass_details = no";
623 
624     // Well, we do not need exit here because we have another options to notifying about atatck
625     // exit(1);
626 }
627 
628 // exec command and pass data to it stdin
exec_with_stdin_params(std::string cmd,std::string params)629 bool exec_with_stdin_params(std::string cmd, std::string params) {
630     FILE* pipe = popen(cmd.c_str(), "w");
631     if (!pipe) {
632         logger << log4cpp::Priority::ERROR << "Can't execute program " << cmd
633                << " error code: " << errno << " error text: " << strerror(errno);
634         return false;
635     }
636 
637     int fputs_ret = fputs(params.c_str(), pipe);
638 
639     if (fputs_ret) {
640         pclose(pipe);
641         return true;
642     } else {
643         logger << log4cpp::Priority::ERROR << "Can't pass data to stdin of program " << cmd;
644         pclose(pipe);
645         return false;
646     }
647 }
648 
649 #ifdef GEOIP
geoip_init()650 bool geoip_init() {
651     // load GeoIP ASN database to memory
652     geo_ip = GeoIP_open("/usr/local/share/fastnetmon/GeoIPASNum.dat", GEOIP_MEMORY_CACHE);
653 
654     if (geo_ip == NULL) {
655         return false;
656     } else {
657         return true;
658     }
659 }
660 #endif
661 
662 #ifdef REDIS
redis_init_connection()663 redisContext* redis_init_connection() {
664     struct timeval timeout = { 1, 500000 }; // 1.5 seconds
665     redisContext* redis_context = redisConnectWithTimeout(redis_host.c_str(), redis_port, timeout);
666     if (redis_context->err) {
667         logger << log4cpp::Priority::ERROR << "Redis connection error:" << redis_context->errstr;
668         return NULL;
669     }
670 
671     // We should check connection with ping because redis do not check connection
672     redisReply* reply = (redisReply*)redisCommand(redis_context, "PING");
673     if (reply) {
674         freeReplyObject(reply);
675     } else {
676         return NULL;
677     }
678 
679     return redis_context;
680 }
681 #endif
682 
683 #ifdef MONGO
store_data_in_mongo(std::string key_name,std::string attack_details_json)684 void store_data_in_mongo(std::string key_name, std::string attack_details_json) {
685     mongoc_client_t *client;
686     mongoc_collection_t *collection;
687     mongoc_cursor_t *cursor;
688     bson_error_t error;
689     bson_oid_t oid;
690     bson_t *doc;
691 
692     mongoc_init ();
693 
694     std::string collection_name = "attacks";
695     std::string connection_string = "mongodb://" + mongodb_host + ":" + convert_int_to_string(mongodb_port) + "/";
696 
697     client = mongoc_client_new (connection_string.c_str());
698 
699     if (!client) {
700         logger << log4cpp::Priority::ERROR << "Can't connect to MongoDB database";
701         return;
702     }
703 
704     bson_error_t bson_from_json_error;
705     bson_t* bson_data = bson_new_from_json((const uint8_t *)attack_details_json.c_str(), attack_details_json.size(), &bson_from_json_error);
706     if (!bson_data) {
707         logger << log4cpp::Priority::ERROR << "Could not convert JSON to BSON";
708         return;
709     }
710 
711     // logger << log4cpp::Priority::INFO << bson_as_json(bson_data, NULL);
712 
713     collection = mongoc_client_get_collection (client, mongodb_database_name.c_str(), collection_name.c_str());
714 
715     doc = bson_new ();
716     bson_oid_init (&oid, NULL);
717     BSON_APPEND_OID (doc, "_id", &oid);
718     bson_append_document(doc,  key_name.c_str(), key_name.size(), bson_data);
719 
720     // logger << log4cpp::Priority::INFO << bson_as_json(doc, NULL);
721 
722     if (!mongoc_collection_insert (collection, MONGOC_INSERT_NONE, doc, NULL, &error)) {
723         logger << log4cpp::Priority::ERROR << "Could not store data to MongoDB: " << error.message;
724     }
725 
726     // TODO: destroy bson_data too!
727 
728     bson_destroy (doc);
729     mongoc_collection_destroy (collection);
730     mongoc_client_destroy (client);
731 
732 }
733 #endif
734 
735 #ifdef REDIS
store_data_in_redis(std::string key_name,std::string attack_details)736 void store_data_in_redis(std::string key_name, std::string attack_details) {
737     redisReply* reply = NULL;
738     redisContext* redis_context = redis_init_connection();
739 
740     if (!redis_context) {
741         logger << log4cpp::Priority::ERROR << "Could not initiate connection to Redis";
742         return;
743     }
744 
745     reply = (redisReply*)redisCommand(redis_context, "SET %s %s", key_name.c_str(), attack_details.c_str());
746 
747     // If we store data correctly ...
748     if (!reply) {
749         logger << log4cpp::Priority::ERROR << "Can't increment traffic in redis error_code: " << redis_context->err
750             << " error_string: " << redis_context->errstr;
751 
752         // Handle redis server restart corectly
753         if (redis_context->err == 1 or redis_context->err == 3) {
754             // Connection refused
755             logger << log4cpp::Priority::ERROR << "Unfortunately we can't store data in Redis because server reject connection";
756         }
757     } else {
758         freeReplyObject(reply);
759     }
760 
761     redisFree(redis_context);
762 }
763 #endif
764 
draw_table(direction data_direction,bool do_redis_update,sort_type sort_item)765 std::string draw_table(direction data_direction, bool do_redis_update, sort_type sort_item) {
766     std::vector<pair_of_map_elements> vector_for_sort;
767 
768     std::stringstream output_buffer;
769 
770     // Preallocate memory for sort vector
771     // We use total networks size for this vector
772     vector_for_sort.reserve(total_number_of_hosts_in_our_networks);
773 
774     // Switch to Average speed there!!!
775     map_of_vector_counters* current_speed_map = NULL;
776 
777     if (print_average_traffic_counts) {
778         current_speed_map = &SubnetVectorMapSpeedAverage;
779     } else {
780         current_speed_map = &SubnetVectorMapSpeed;
781     }
782 
783     map_element zero_map_element;
784     memset(&zero_map_element, 0, sizeof(zero_map_element));
785 
786     unsigned int count_of_zero_speed_packets = 0;
787     for (map_of_vector_counters::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) {
788         for (vector_of_counters::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) {
789             int current_index = vector_itr - itr->second.begin();
790 
791             // convert to host order for math operations
792             uint32_t subnet_ip = ntohl(itr->first.first);
793             uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
794 
795             // covnert to our standard network byte order
796             uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
797 
798             // Do not add zero speed packets to sort list
799             if (memcmp((void*)&zero_map_element, &*vector_itr, sizeof(map_element)) != 0) {
800                 vector_for_sort.push_back(std::make_pair(client_ip, *vector_itr));
801             } else {
802                 count_of_zero_speed_packets++;
803             }
804         }
805     }
806 
807     // Sort only first X elements in this vector
808     unsigned int shift_for_sort = max_ips_in_list;
809 
810     if (data_direction == INCOMING or data_direction == OUTGOING) {
811         // Because in another case we will got segmentation fault
812         unsigned int vector_size = vector_for_sort.size();
813 
814         if (vector_size < shift_for_sort) {
815             shift_for_sort = vector_size;
816         }
817 
818         std::partial_sort(vector_for_sort.begin(), vector_for_sort.begin() + shift_for_sort, vector_for_sort.end(),
819                   TrafficComparatorClass<pair_of_map_elements>(data_direction, sort_item));
820     } else {
821         logger << log4cpp::Priority::ERROR << "Unexpected bahaviour on sort function";
822         return "Internal error";
823     }
824 
825     unsigned int element_number = 0;
826 
827     // In this loop we print only top X talkers in our subnet to screen buffer
828     for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin(); ii != vector_for_sort.end(); ++ii) {
829         // Print first max_ips_in_list elements in list, we will show top X "huge" channel loaders
830         if (element_number >= max_ips_in_list) {
831             break;
832         }
833 
834         uint32_t client_ip = (*ii).first;
835         std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);
836 
837         uint64_t pps = 0;
838         uint64_t bps = 0;
839         uint64_t flows = 0;
840 
841         uint64_t pps_average = 0;
842         uint64_t bps_average = 0;
843         uint64_t flows_average = 0;
844 
845         // Here we could have average or instantaneous speed
846         map_element* current_speed_element = &ii->second;
847 
848         // Create polymorphic pps, byte and flow counters
849         if (data_direction == INCOMING) {
850             pps = current_speed_element->in_packets;
851             bps = current_speed_element->in_bytes;
852             flows = current_speed_element->in_flows;
853         } else if (data_direction == OUTGOING) {
854             pps = current_speed_element->out_packets;
855             bps = current_speed_element->out_bytes;
856             flows = current_speed_element->out_flows;
857         }
858 
859         uint64_t mbps = convert_speed_to_mbps(bps);
860         uint64_t mbps_average = convert_speed_to_mbps(bps_average);
861 
862         std::string is_banned = ban_list.count(client_ip) > 0 ? " *banned* " : "";
863 
864         // We use setw for alignment
865         output_buffer << client_ip_as_string << "\t\t";
866 
867         output_buffer << std::setw(6) << pps << " pps ";
868         output_buffer << std::setw(6) << mbps << " mbps ";
869         output_buffer << std::setw(6) << flows << " flows ";
870 
871         output_buffer << is_banned << std::endl;
872 
873         element_number++;
874     }
875 
876     graphite_data_t graphite_data;
877 
878     // TODO: add graphite operations time to the config file
879     if (graphite_enabled) {
880         for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin(); ii != vector_for_sort.end(); ++ii) {
881             uint32_t client_ip = (*ii).first;
882             std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);
883 
884             uint64_t pps = 0;
885             uint64_t bps = 0;
886             uint64_t flows = 0;
887 
888             // Here we could have average or instantaneous speed
889             map_element* current_speed_element = &ii->second;
890 
891             // Create polymorphic pps, byte and flow counters
892             if (data_direction == INCOMING) {
893                 pps = current_speed_element->in_packets;
894                 bps = current_speed_element->in_bytes;
895                 flows = current_speed_element->in_flows;
896             } else if (data_direction == OUTGOING) {
897                 pps = current_speed_element->out_packets;
898                 bps = current_speed_element->out_bytes;
899                 flows = current_speed_element->out_flows;
900             }
901 
902             std::string direction_as_string;
903 
904             if (data_direction == INCOMING) {
905                 direction_as_string = "incoming";
906             } else if (data_direction == OUTGOING) {
907                 direction_as_string = "outgoing";
908             }
909 
910             std::string ip_as_string_with_dash_delimiters = client_ip_as_string;
911             // Replace dots by dashes
912             std::replace(ip_as_string_with_dash_delimiters.begin(),
913                 ip_as_string_with_dash_delimiters.end(), '.', '_');
914 
915             std::string graphite_current_prefix = graphite_prefix + ".hosts." + ip_as_string_with_dash_delimiters + "." + direction_as_string;
916 
917             if (print_average_traffic_counts) {
918                 graphite_current_prefix = graphite_current_prefix + ".average";
919             }
920 
921             // We do not store zero data to Graphite
922             if (pps != 0) {
923                 graphite_data[ graphite_current_prefix + ".pps"   ] = pps;
924             }
925 
926             if (bps != 0) {
927                 graphite_data[ graphite_current_prefix + ".bps"  ]  = bps * 8;
928             }
929 
930             if (flows != 0) {
931                 graphite_data[ graphite_current_prefix + ".flows" ] = flows;
932             }
933         }
934     }
935 
936     // TODO: we should switch to piclke format instead text
937     // TODO: we should check packet size for Graphite
938     // logger << log4cpp::Priority::INFO << "We will write " << graphite_data.size() << " records to Graphite";
939 
940     if (graphite_enabled) {
941         bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
942 
943         if (!graphite_put_result) {
944             logger << log4cpp::Priority::ERROR << "Can't store data to Graphite";
945         }
946     }
947 
948     return output_buffer.str();
949 }
950 
951 // TODO: move to lirbary
952 // read whole file to vector
read_file_to_vector(std::string file_name)953 std::vector<std::string> read_file_to_vector(std::string file_name) {
954     std::vector<std::string> data;
955     std::string line;
956 
957     std::ifstream reading_file;
958 
959     reading_file.open(file_name.c_str(), std::ifstream::in);
960     if (reading_file.is_open()) {
961         while (getline(reading_file, line)) {
962             boost::algorithm::trim(line);
963             data.push_back(line);
964         }
965     } else {
966         logger << log4cpp::Priority::ERROR << "Can't open file: " << file_name;
967     }
968 
969     return data;
970 }
971 
parse_hostgroups(std::string name,std::string value)972 void parse_hostgroups(std::string name, std::string value) {
973     // We are creating new host group of subnets
974     if (name != "hostgroup") {
975         return;
976     }
977 
978     std::vector<std::string> splitted_new_host_group;
979     // We have new host groups in form:
980     // hostgroup = new_host_group_name:11.22.33.44/32,....
981     split(splitted_new_host_group, value, boost::is_any_of(":"), boost::token_compress_on);
982 
983     if (splitted_new_host_group.size() != 2) {
984         logger << log4cpp::Priority::ERROR << "We can't parse new host group";
985         return;
986     }
987 
988     boost::algorithm::trim(splitted_new_host_group[0]);
989     boost::algorithm::trim(splitted_new_host_group[1]);
990 
991     std::string host_group_name = splitted_new_host_group[0];
992 
993     if (host_groups.count(host_group_name) > 0) {
994         logger << log4cpp::Priority::WARN << "We already have this host group (" << host_group_name << "). Please check!";
995         return;
996     }
997 
998     // Split networks
999     std::vector<std::string> hostgroup_subnets = split_strings_to_vector_by_comma(splitted_new_host_group[1]);
1000     for (std::vector<std::string>::iterator itr = hostgroup_subnets.begin(); itr != hostgroup_subnets.end(); ++itr) {
1001         subnet_t subnet = convert_subnet_from_string_to_binary_with_cidr_format(*itr);
1002 
1003         host_groups[ host_group_name ].push_back( subnet );
1004 
1005         logger << log4cpp::Priority::WARN << "We add subnet " << convert_subnet_to_string( subnet )
1006             << " to host group " << host_group_name;
1007 
1008         // And add to subnet to host group lookup hash
1009         if (subnet_to_host_groups.count(subnet) > 0) {
1010             // Huston, we have problem! Subnet to host group mapping should map single subnet to single group!
1011             logger << log4cpp::Priority::WARN << "Seems you have specified single subnet " << *itr
1012                 << " to multiple host groups, please fix it, it's prohibited";
1013         } else {
1014             subnet_to_host_groups[ subnet ] = host_group_name;
1015         }
1016     }
1017 
1018     logger << log4cpp::Priority::INFO << "We have created host group " << host_group_name << " with "
1019         << host_groups[ host_group_name ].size() << " subnets";
1020 }
1021 
1022 // Load configuration
load_configuration_file()1023 bool load_configuration_file() {
1024     std::ifstream config_file(global_config_path.c_str());
1025     std::string line;
1026 
1027     if (!config_file.is_open()) {
1028         logger << log4cpp::Priority::ERROR << "Can't open config file";
1029         return false;
1030     }
1031 
1032     while (getline(config_file, line)) {
1033         std::vector<std::string> parsed_config;
1034         boost::algorithm::trim(line);
1035 
1036         if (line.find("#") == 0 or line.empty()) {
1037             // Ignore comments line
1038             continue;
1039         }
1040 
1041         boost::split(parsed_config, line, boost::is_any_of("="), boost::token_compress_on);
1042 
1043         if (parsed_config.size() == 2) {
1044             boost::algorithm::trim(parsed_config[0]);
1045             boost::algorithm::trim(parsed_config[1]);
1046 
1047             configuration_map[parsed_config[0]] = parsed_config[1];
1048 
1049             // Well, we parse host groups here
1050             parse_hostgroups(parsed_config[0], parsed_config[1]);
1051         } else {
1052             logger << log4cpp::Priority::ERROR << "Can't parse config line: '" << line << "'";
1053         }
1054     }
1055 
1056     if (configuration_map.count("enable_connection_tracking")) {
1057         if (configuration_map["enable_connection_tracking"] == "on") {
1058             enable_conection_tracking = true;
1059         } else {
1060             enable_conection_tracking = false;
1061         }
1062     }
1063 
1064     if (configuration_map.count("ban_time") != 0) {
1065         global_ban_time = convert_string_to_integer(configuration_map["ban_time"]);
1066 
1067         // Completely disable unban option
1068         if (global_ban_time == 0) {
1069             unban_enabled = false;
1070         }
1071     }
1072 
1073     if (configuration_map.count("pid_path") != 0) {
1074         pid_path = configuration_map["pid_path"];
1075     }
1076 
1077     if (configuration_map.count("cli_stats_file_path") != 0) {
1078         cli_stats_file_path = configuration_map["cli_stats_file_path"];
1079     }
1080 
1081     if (configuration_map.count("unban_only_if_attack_finished") != 0) {
1082         if (configuration_map["unban_only_if_attack_finished"] == "on") {
1083             unban_only_if_attack_finished = true;
1084         } else {
1085             unban_only_if_attack_finished = false;
1086         }
1087     }
1088 
1089     if(configuration_map.count("graphite_prefix") != 0) {
1090         graphite_prefix = configuration_map["graphite_prefix"];
1091     }
1092 
1093     if (configuration_map.count("average_calculation_time") != 0) {
1094         average_calculation_amount =
1095         convert_string_to_integer(configuration_map["average_calculation_time"]);
1096     }
1097 
1098     if (configuration_map.count("average_calculation_time_for_subnets") != 0) {
1099         average_calculation_amount_for_subnets =
1100         convert_string_to_integer(configuration_map["average_calculation_time_for_subnets"]);
1101     }
1102 
1103     if (configuration_map.count("monitor_local_ip_addresses") != 0) {
1104         monitor_local_ip_addresses = configuration_map["monitor_local_ip_addresses"] == "on" ? true : false;
1105     }
1106 
1107 #ifdef FASTNETMON_API
1108     if (configuration_map.count("enable_api") != 0) {
1109         enable_api = configuration_map["enable_api"] == "on";
1110     }
1111 #endif
1112 
1113 #ifdef ENABLE_GOBGP
1114     // GoBGP configuration
1115     if (configuration_map.count("gobgp") != 0) {
1116         gobgp_enabled = configuration_map["gobgp"] == "on";
1117     }
1118 #endif
1119 
1120     // ExaBGP configuration
1121 
1122     if (configuration_map.count("exabgp") != 0) {
1123         if (configuration_map["exabgp"] == "on") {
1124             exabgp_enabled = true;
1125         } else {
1126             exabgp_enabled = false;
1127         }
1128     }
1129 
1130     if (exabgp_enabled) {
1131         // TODO: add community format validation
1132         if (configuration_map.count("exabgp_community")) {
1133             exabgp_community = configuration_map["exabgp_community"];
1134         }
1135 
1136         if (configuration_map.count("exabgp_community_subnet")) {
1137             exabgp_community_subnet = configuration_map["exabgp_community_subnet"];
1138         } else {
1139             exabgp_community_subnet = exabgp_community;
1140         }
1141 
1142         if (configuration_map.count("exabgp_community_host")) {
1143             exabgp_community_host = configuration_map["exabgp_community_host"];
1144         } else {
1145             exabgp_community_host = exabgp_community;
1146         }
1147 
1148         if (exabgp_enabled && exabgp_announce_whole_subnet && exabgp_community_subnet.empty()) {
1149             logger << log4cpp::Priority::ERROR
1150                 << "You enabled exabgp for subnet but not specified community, we disable exabgp support";
1151 
1152             exabgp_enabled = false;
1153         }
1154 
1155         if (exabgp_enabled && exabgp_announce_host && exabgp_community_host.empty()) {
1156             logger << log4cpp::Priority::ERROR
1157                 << "You enabled exabgp for host but not specified community, we disable exabgp support";
1158 
1159             exabgp_enabled = false;
1160         }
1161     }
1162 
1163     if (exabgp_enabled) {
1164         exabgp_command_pipe = configuration_map["exabgp_command_pipe"];
1165 
1166         if (exabgp_command_pipe.empty()) {
1167             logger << log4cpp::Priority::ERROR << "You enabled exabgp but not specified "
1168                                                   "exabgp_command_pipe, so we disable exabgp "
1169                                                   "support";
1170 
1171             exabgp_enabled = false;
1172         }
1173     }
1174 
1175     if (exabgp_enabled) {
1176         exabgp_next_hop = configuration_map["exabgp_next_hop"];
1177 
1178         if (exabgp_next_hop.empty()) {
1179             logger
1180             << log4cpp::Priority::ERROR
1181             << "You enabled exabgp but not specified exabgp_next_hop, so we disable exabgp support";
1182 
1183             exabgp_enabled = false;
1184         }
1185 
1186         if (configuration_map.count("exabgp_flow_spec_announces") != 0) {
1187             exabgp_flow_spec_announces = configuration_map["exabgp_flow_spec_announces"] == "on";
1188         }
1189 
1190         if (exabgp_enabled) {
1191             logger << log4cpp::Priority::INFO << "ExaBGP support initialized correctly";
1192         }
1193     }
1194 
1195     if (configuration_map.count("sflow") != 0) {
1196         if (configuration_map["sflow"] == "on") {
1197             enable_sflow_collection = true;
1198         } else {
1199             enable_sflow_collection = false;
1200         }
1201     }
1202 
1203     if (configuration_map.count("pfring_hardware_filters_enabled") != 0) {
1204         pfring_hardware_filters_enabled = configuration_map["pfring_hardware_filters_enabled"] == "on";
1205     }
1206 
1207     if (configuration_map.count("netflow") != 0) {
1208         if (configuration_map["netflow"] == "on") {
1209             enable_netflow_collection = true;
1210         } else {
1211             enable_netflow_collection = false;
1212         }
1213     }
1214 
1215     if (configuration_map.count("exabgp_announce_whole_subnet") != 0) {
1216         exabgp_announce_whole_subnet = configuration_map["exabgp_announce_whole_subnet"] == "on" ? true : false;
1217     }
1218 
1219     if (configuration_map.count("exabgp_announce_host") != 0) {
1220         exabgp_announce_host = configuration_map["exabgp_announce_host"] == "on" ? true : false;
1221     }
1222 
1223     if (configuration_map.count("enable_subnet_counters") != 0) {
1224         enable_subnet_counters = configuration_map["enable_subnet_counters"] == "on" ? true : false;
1225     }
1226 
1227     // Graphite
1228     if (configuration_map.count("graphite") != 0) {
1229         graphite_enabled = configuration_map["graphite"] == "on" ? true : false;
1230     }
1231 
1232     if (configuration_map.count("graphite_host") != 0) {
1233         graphite_host = configuration_map["graphite_host"];
1234     }
1235 
1236     if (configuration_map.count("graphite_port") != 0) {
1237         graphite_port = convert_string_to_integer(configuration_map["graphite_port"]);
1238     }
1239 
1240     if (configuration_map.count("graphite_number_of_ips") != 0) {
1241         logger << log4cpp::Priority::ERROR << "Sorry, you have used deprecated function graphite_number_of_ips";
1242     }
1243 
1244     if (configuration_map.count("process_incoming_traffic") != 0) {
1245         process_incoming_traffic = configuration_map["process_incoming_traffic"] == "on" ? true : false;
1246     }
1247 
1248     if (configuration_map.count("process_outgoing_traffic") != 0) {
1249         process_outgoing_traffic = configuration_map["process_outgoing_traffic"] == "on" ? true : false;
1250     }
1251 
1252     if (configuration_map.count("mirror") != 0) {
1253         if (configuration_map["mirror"] == "on") {
1254             enable_data_collection_from_mirror = true;
1255         } else {
1256             enable_data_collection_from_mirror = false;
1257         }
1258     }
1259 
1260     if (configuration_map.count("mirror_netmap") != 0) {
1261         if (configuration_map["mirror_netmap"] == "on") {
1262             enable_netmap_collection = true;
1263         } else {
1264             enable_netmap_collection = false;
1265         }
1266     }
1267 
1268     if (configuration_map.count("mirror_snabbswitch") != 0) {
1269         enable_snabbswitch_collection = configuration_map["mirror_snabbswitch"] == "on";
1270     }
1271 
1272     if (configuration_map.count("mirror_afpacket") != 0) {
1273         enable_afpacket_collection = configuration_map["mirror_afpacket"] == "on";
1274     }
1275 
1276     if (enable_netmap_collection && enable_data_collection_from_mirror) {
1277         logger << log4cpp::Priority::ERROR << "You have enabled pfring and netmap data collection "
1278                                               "from mirror which strictly prohibited, please "
1279                                               "select one";
1280         exit(1);
1281     }
1282 
1283     if (configuration_map.count("pcap") != 0) {
1284         if (configuration_map["pcap"] == "on") {
1285             enable_pcap_collection = true;
1286         } else {
1287             enable_pcap_collection = false;
1288         }
1289     }
1290 
1291     // Read global ban configuration
1292     global_ban_settings = read_ban_settings(configuration_map, "");
1293 
1294     logging_configuration = read_logging_settings(configuration_map);
1295 
1296     // logger << log4cpp::Priority::INFO << "We read global ban settings: " << print_ban_thresholds(global_ban_settings);
1297 
1298     // Read host group ban settings
1299     for (host_group_map_t::iterator hostgroup_itr = host_groups.begin(); hostgroup_itr != host_groups.end(); ++hostgroup_itr) {
1300         std::string host_group_name = hostgroup_itr->first;
1301 
1302         logger << log4cpp::Priority::INFO << "We will read ban settings for " << host_group_name;
1303 
1304         host_group_ban_settings_map[ host_group_name ] =  read_ban_settings(configuration_map, host_group_name);
1305 
1306         //logger << log4cpp::Priority::INFO << "We read " << host_group_name << " ban settings "
1307         //    << print_ban_thresholds(host_group_ban_settings_map[ host_group_name ]);
1308     }
1309 
1310     if (configuration_map.count("white_list_path") != 0) {
1311         white_list_path = configuration_map["white_list_path"];
1312     }
1313 
1314     if (configuration_map.count("networks_list_path") != 0) {
1315         networks_list_path = configuration_map["networks_list_path"];
1316     }
1317 
1318 #ifdef REDIS
1319     if (configuration_map.count("redis_port") != 0) {
1320         redis_port = convert_string_to_integer(configuration_map["redis_port"]);
1321     }
1322 
1323     if (configuration_map.count("redis_host") != 0) {
1324         redis_host = configuration_map["redis_host"];
1325     }
1326 
1327     if (configuration_map.count("redis_prefix") != 0) {
1328         redis_prefix = configuration_map["redis_prefix"];
1329     }
1330 
1331     if (configuration_map.count("redis_enabled") != 0) {
1332         // We use yes and on because it's stupid typo :(
1333         if (configuration_map["redis_enabled"] == "on" or configuration_map["redis_enabled"] == "yes") {
1334             redis_enabled = true;
1335         } else {
1336             redis_enabled = false;
1337         }
1338     }
1339 #endif
1340 
1341 #ifdef MONGO
1342     if (configuration_map.count("mongodb_enabled") != 0) {
1343         if (configuration_map["mongodb_enabled"] == "on") {
1344             mongodb_enabled = true;
1345         }
1346     }
1347 
1348     if (configuration_map.count("mongodb_host") != 0) {
1349         mongodb_host = configuration_map["mongodb_host"];
1350     }
1351 
1352     if (configuration_map.count("mongodb_port") != 0) {
1353         mongodb_port = convert_string_to_integer(configuration_map["mongodb_port"]);
1354     }
1355 
1356     if (configuration_map.count("mongodb_database_name") != 0) {
1357         mongodb_database_name = configuration_map["mongodb_database_name"];
1358     }
1359 #endif
1360 
1361     if (configuration_map.count("ban_details_records_count") != 0) {
1362         ban_details_records_count =
1363         convert_string_to_integer(configuration_map["ban_details_records_count"]);
1364     }
1365 
1366     if (configuration_map.count("check_period") != 0) {
1367         check_period = convert_string_to_integer(configuration_map["check_period"]);
1368     }
1369 
1370     if (configuration_map.count("sort_parameter") != 0) {
1371         sort_parameter = configuration_map["sort_parameter"];
1372     }
1373 
1374     if (configuration_map.count("max_ips_in_list") != 0) {
1375         max_ips_in_list = convert_string_to_integer(configuration_map["max_ips_in_list"]);
1376     }
1377 
1378     if (configuration_map.count("notify_script_path") != 0) {
1379         notify_script_path = configuration_map["notify_script_path"];
1380     }
1381 
1382     if (configuration_map.count("notify_script_pass_details") != 0) {
1383         notify_script_pass_details = configuration_map["notify_script_pass_details"] == "on" ? true : false;
1384     }
1385 
1386     if (file_exists(notify_script_path)) {
1387         notify_script_enabled = true;
1388     } else {
1389         logger << log4cpp::Priority::ERROR << "We can't find notify script " << notify_script_path;
1390         notify_script_enabled = false;
1391     }
1392 
1393     if (configuration_map.count("collect_attack_pcap_dumps") != 0) {
1394         collect_attack_pcap_dumps = configuration_map["collect_attack_pcap_dumps"] == "on" ? true : false;
1395     }
1396 
1397     if (configuration_map.count("process_pcap_attack_dumps_with_dpi") != 0) {
1398         if (collect_attack_pcap_dumps) {
1399             process_pcap_attack_dumps_with_dpi = configuration_map["process_pcap_attack_dumps_with_dpi"] == "on" ? true : false;
1400         }
1401     }
1402 
1403     return true;
1404 }
1405 
1406 /* Enable core dumps for simplify debug tasks */
enable_core_dumps()1407 void enable_core_dumps() {
1408     struct rlimit rlim;
1409 
1410     int result = getrlimit(RLIMIT_CORE, &rlim);
1411 
1412     if (result) {
1413         logger << log4cpp::Priority::ERROR << "Can't get current rlimit for RLIMIT_CORE";
1414         return;
1415     } else {
1416         rlim.rlim_cur = rlim.rlim_max;
1417         setrlimit(RLIMIT_CORE, &rlim);
1418     }
1419 }
1420 
subnet_vectors_allocator(prefix_t * prefix,void * data)1421 void subnet_vectors_allocator(prefix_t* prefix, void* data) {
1422     // Network byte order
1423     uint32_t subnet_as_integer = prefix->add.sin.s_addr;
1424 
1425     u_short bitlen = prefix->bitlen;
1426     double base = 2;
1427     int network_size_in_ips = pow(base, 32 - bitlen);
1428     // logger<< log4cpp::Priority::INFO<<"Subnet: "<<prefix->add.sin.s_addr<<" network size:
1429     // "<<network_size_in_ips;
1430     logger << log4cpp::Priority::INFO << "I will allocate " << network_size_in_ips
1431            << " records for subnet " << subnet_as_integer << " cidr mask: " << bitlen;
1432 
1433     subnet_t current_subnet = std::make_pair(subnet_as_integer, bitlen);
1434 
1435     map_element zero_map_element;
1436     memset(&zero_map_element, 0, sizeof(zero_map_element));
1437 
1438     // Initilize our counters with fill constructor
1439     try {
1440         SubnetVectorMap[current_subnet]             = vector_of_counters(network_size_in_ips, zero_map_element);
1441         SubnetVectorMapSpeed[current_subnet]        = vector_of_counters(network_size_in_ips, zero_map_element);
1442         SubnetVectorMapSpeedAverage[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element);
1443     } catch (std::bad_alloc& ba) {
1444         logger << log4cpp::Priority::ERROR << "Can't allocate memory for counters";
1445         exit(1);
1446     }
1447 
1448     // Initilize map element
1449     SubnetVectorMapFlow[current_subnet] = vector_of_flow_counters(network_size_in_ips);
1450 
1451     // On creating it initilizes by zeros
1452     conntrack_main_struct zero_conntrack_main_struct;
1453     std::fill(SubnetVectorMapFlow[current_subnet].begin(),
1454               SubnetVectorMapFlow[current_subnet].end(), zero_conntrack_main_struct);
1455 
1456     PerSubnetCountersMap[current_subnet] = zero_map_element;
1457     PerSubnetSpeedMap[current_subnet] = zero_map_element;
1458 }
1459 
zeroify_all_counters()1460 void zeroify_all_counters() {
1461     map_element zero_map_element;
1462     memset(&zero_map_element, 0, sizeof(zero_map_element));
1463 
1464     for (map_of_vector_counters::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) {
1465         // logger<< log4cpp::Priority::INFO<<"Zeroify "<<itr->first;
1466         std::fill(itr->second.begin(), itr->second.end(), zero_map_element);
1467     }
1468 }
1469 
zeroify_all_flow_counters()1470 void zeroify_all_flow_counters() {
1471     // On creating it initilizes by zeros
1472     conntrack_main_struct zero_conntrack_main_struct;
1473 
1474     // Iterate over map
1475     for (map_of_vector_counters_for_flow::iterator itr = SubnetVectorMapFlow.begin();
1476          itr != SubnetVectorMapFlow.end(); ++itr) {
1477         // Iterate over vector
1478         for (vector_of_flow_counters::iterator vector_iterator = itr->second.begin();
1479              vector_iterator != itr->second.end(); ++vector_iterator) {
1480             // TODO: rewrite this monkey code
1481             vector_iterator->in_tcp.clear();
1482             vector_iterator->in_udp.clear();
1483             vector_iterator->in_icmp.clear();
1484             vector_iterator->in_other.clear();
1485 
1486             vector_iterator->out_tcp.clear();
1487             vector_iterator->out_udp.clear();
1488             vector_iterator->out_icmp.clear();
1489             vector_iterator->out_other.clear();
1490         }
1491     }
1492 }
1493 
load_our_networks_list()1494 bool load_our_networks_list() {
1495     if (file_exists(white_list_path)) {
1496         unsigned int network_entries = 0;
1497         std::vector<std::string> network_list_from_config = read_file_to_vector(white_list_path);
1498 
1499         for (std::vector<std::string>::iterator ii = network_list_from_config.begin();
1500              ii != network_list_from_config.end(); ++ii) {
1501             std::string text_subnet = *ii;
1502             if (text_subnet.empty()) {
1503                 continue;
1504             }
1505             if (is_v4_host(text_subnet)) {
1506                 logger << log4cpp::Priority::INFO << "Assuming /32 netmask for " << text_subnet;
1507                 text_subnet += "/32";
1508             } else if (!is_cidr_subnet(text_subnet)) {
1509                 logger << log4cpp::Priority::ERROR << "Can't parse line from whitelist: " << text_subnet;
1510                 continue;
1511             }
1512             network_entries++;
1513             make_and_lookup(whitelist_tree_ipv4, const_cast<char*> (text_subnet.c_str()));
1514 
1515         }
1516 
1517         logger << log4cpp::Priority::INFO << "We loaded " << network_entries
1518                << " networks from whitelist file";
1519     }
1520 
1521     std::vector<std::string> networks_list_ipv4_as_string;
1522     std::vector<std::string> networks_list_ipv6_as_string;
1523 
1524     // We can bould "our subnets" automatically here
1525     if (file_exists("/proc/vz/version")) {
1526         logger << log4cpp::Priority::INFO << "We found OpenVZ";
1527         // Add /32 CIDR mask for every IP here
1528         std::vector<std::string> openvz_ips = read_file_to_vector("/proc/vz/veip");
1529         for (std::vector<std::string>::iterator ii = openvz_ips.begin(); ii != openvz_ips.end(); ++ii) {
1530             // skip header
1531             if (strstr(ii->c_str(), "Version") != NULL) {
1532                 continue;
1533             }
1534 
1535             /*
1536                 Example data for this lines:
1537                 2a03:f480:1:17:0:0:0:19          0
1538                             185.4.72.40          0
1539             */
1540 
1541             if (strstr(ii->c_str(), ":") == NULL) {
1542                 // IPv4
1543 
1544                 std::vector<std::string> subnet_as_string;
1545                 split(subnet_as_string, *ii, boost::is_any_of(" "), boost::token_compress_on);
1546 
1547                 std::string openvz_subnet = subnet_as_string[1] + "/32";
1548                 networks_list_ipv4_as_string.push_back(openvz_subnet);
1549             } else {
1550                 // IPv6
1551 
1552                 std::vector<std::string> subnet_as_string;
1553                 split(subnet_as_string, *ii, boost::is_any_of(" "), boost::token_compress_on);
1554 
1555                 std::string openvz_subnet = subnet_as_string[1] + "/128";
1556                 networks_list_ipv6_as_string.push_back(openvz_subnet);
1557             }
1558         }
1559 
1560         logger << log4cpp::Priority::INFO << "We loaded " << networks_list_ipv4_as_string.size()
1561                << " IPv4 networks from /proc/vz/veip";
1562 
1563         logger << log4cpp::Priority::INFO << "We loaded " << networks_list_ipv6_as_string.size()
1564                 << " IPv6 networks from /proc/vz/veip";
1565     }
1566 
1567     if (monitor_local_ip_addresses && file_exists("/sbin/ip")) {
1568         logger << log4cpp::Priority::INFO
1569                << "We are working on Linux and could use ip tool for detecting local IP's";
1570 
1571         ip_addresses_list_t ip_list = get_local_ip_v4_addresses_list();
1572 
1573         logger << log4cpp::Priority::INFO << "We found " << ip_list.size()
1574                << " local IP addresses and will monitor they";
1575 
1576         for (ip_addresses_list_t::iterator iter = ip_list.begin(); iter != ip_list.end(); ++iter) {
1577             // TODO: add IPv6 here
1578             networks_list_ipv4_as_string.push_back(*iter + "/32");
1579         }
1580     }
1581 
1582     if (file_exists(networks_list_path)) {
1583         std::vector<std::string> network_list_from_config = read_file_to_vector(networks_list_path);
1584 
1585         for (std::vector<std::string>::iterator line_itr = network_list_from_config.begin(); line_itr != network_list_from_config.end(); ++line_itr) {
1586 
1587             if (line_itr->length() == 0) {
1588                 // Skip blank lines in subnet list file silently
1589                 continue;
1590             }
1591 
1592             if (strstr(line_itr->c_str(), ":") == NULL) {
1593                 networks_list_ipv4_as_string.push_back(*line_itr);
1594             } else {
1595                 networks_list_ipv6_as_string.push_back(*line_itr);
1596             }
1597         }
1598 
1599         logger << log4cpp::Priority::INFO << "We loaded " << network_list_from_config.size()
1600                << " networks from networks file";
1601     }
1602 
1603     // Some consistency checks
1604     assert(convert_ip_as_string_to_uint("255.255.255.0") == convert_cidr_to_binary_netmask(24));
1605     assert(convert_ip_as_string_to_uint("255.255.255.255") == convert_cidr_to_binary_netmask(32));
1606 
1607     logger << log4cpp::Priority::INFO << "Totally we have " << networks_list_ipv4_as_string.size() << " IPv4 subnets";
1608     logger << log4cpp::Priority::INFO << "Totally we have " << networks_list_ipv6_as_string.size() << " IPv6 subnets";
1609 
1610     for (std::vector<std::string>::iterator ii = networks_list_ipv4_as_string.begin();
1611          ii != networks_list_ipv4_as_string.end(); ++ii) {
1612 
1613         if (!is_cidr_subnet(*ii)) {
1614             logger << log4cpp::Priority::ERROR << "Can't parse line from subnet list: '" << *ii << "'";
1615             continue;
1616         }
1617 
1618         std::string network_address_in_cidr_form = *ii;
1619 
1620         unsigned int cidr_mask = get_cidr_mask_from_network_as_string(network_address_in_cidr_form);
1621         std::string network_address = get_net_address_from_network_as_string(network_address_in_cidr_form);
1622 
1623         double base = 2;
1624         total_number_of_hosts_in_our_networks += pow(base, 32 - cidr_mask);
1625 
1626         // Make sure it's "subnet address" and not an host address
1627         uint32_t subnet_address_as_uint = convert_ip_as_string_to_uint(network_address);
1628         uint32_t subnet_address_netmask_binary = convert_cidr_to_binary_netmask(cidr_mask);
1629         uint32_t generated_subnet_address = subnet_address_as_uint & subnet_address_netmask_binary;
1630 
1631         if (subnet_address_as_uint != generated_subnet_address) {
1632             std::string new_network_address_as_string =
1633             convert_ip_as_uint_to_string(generated_subnet_address) + "/" + convert_int_to_string(cidr_mask);
1634 
1635             logger << log4cpp::Priority::WARN << "We will use " << new_network_address_as_string << " instead of "
1636                    << network_address_in_cidr_form << " because it's host address";
1637 
1638             network_address_in_cidr_form = new_network_address_as_string;
1639         }
1640 
1641         make_and_lookup(lookup_tree_ipv4, const_cast<char*>(network_address_in_cidr_form.c_str()));
1642     }
1643 
1644     for (std::vector<std::string>::iterator ii = networks_list_ipv6_as_string.begin();
1645          ii != networks_list_ipv6_as_string.end(); ++ii) {
1646 
1647         // TODO: add IPv6 subnet format validation
1648         make_and_lookup_ipv6(lookup_tree_ipv6, (char*)ii->c_str());
1649     }
1650 
1651     logger << log4cpp::Priority::INFO
1652            << "Total number of monitored hosts (total size of all networks): " << total_number_of_hosts_in_our_networks;
1653 
1654     // 3 - speed counter, average speed counter and data counter
1655     uint64_t memory_requirements = 3 * sizeof(map_element) * total_number_of_hosts_in_our_networks / 1024 / 1024;
1656 
1657     logger << log4cpp::Priority::INFO
1658         << "We need " << memory_requirements << " MB of memory for storing counters for your networks";
1659 
1660     /* Preallocate data structures */
1661     patricia_process(lookup_tree_ipv4, (void_fn_t)subnet_vectors_allocator);
1662 
1663     logger << log4cpp::Priority::INFO << "We start total zerofication of counters";
1664     zeroify_all_counters();
1665     logger << log4cpp::Priority::INFO << "We finished zerofication";
1666 
1667     logger << log4cpp::Priority::INFO << "We loaded " << networks_list_ipv4_as_string.size()
1668            << " IPv4 subnets to our in-memory list of networks";
1669 
1670     return true;
1671 }
1672 
1673 #ifdef IPV6_HASH_COUNTERS
1674 
1675 moodycamel::ConcurrentQueue<simple_packet> multi_process_queue_for_ipv6_counters;
1676 
ipv6_traffic_processor()1677 void ipv6_traffic_processor() {
1678     simple_packet packets_from_queue[32];
1679 
1680     while (true) {
1681         std::size_t count = 0;
1682 
1683         while ((count = multi_process_queue_for_ipv6_counters.try_dequeue_bulk(packets_from_queue, 32)) != 0) {
1684             for (std::size_t i = 0; i != count; ++i) {
1685 #ifdef USE_NEW_ATOMIC_BUILTINS
1686                 __atomic_add_fetch(&total_ipv6_packets, 1, __ATOMIC_RELAXED);
1687 #else
1688                 __sync_fetch_and_add(&total_ipv6_packets, 1);
1689 #endif
1690 
1691                 direction packet_direction = packets_from_queue[i].packet_direction;
1692 
1693                 uint64_t sampled_number_of_packets = packets_from_queue[i].number_of_packets * packets_from_queue[i].sample_ratio;
1694                 uint64_t sampled_number_of_bytes = packets_from_queue[i].length * packets_from_queue[i].sample_ratio;
1695 
1696 #ifdef USE_NEW_ATOMIC_BUILTINS
1697                 __atomic_add_fetch(&total_counters[packet_direction].packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1698                 __atomic_add_fetch(&total_counters[packet_direction].bytes,   sampled_number_of_bytes, __ATOMIC_RELAXED);
1699 #else
1700                 __sync_fetch_and_add(&total_counters[packet_direction].packets, sampled_number_of_packets);
1701                 __sync_fetch_and_add(&total_counters[packet_direction].bytes,   sampled_number_of_bytes);
1702 #endif
1703 
1704                 if (packet_direction != OTHER) {
1705 #ifdef USE_NEW_ATOMIC_BUILTINS
1706                     __atomic_add_fetch(&our_ipv6_packets, 1, __ATOMIC_RELAXED);
1707 #else
1708                     __sync_fetch_and_add(&our_ipv6_packets, 1);
1709 #endif
1710                 }
1711             }
1712         }
1713     }
1714 }
1715 
1716 #endif
1717 
1718 /* Process simple unified packet */
process_packet(simple_packet & current_packet)1719 void process_packet(simple_packet& current_packet) {
1720     // Packets dump is very useful for bug hunting
1721     if (DEBUG_DUMP_ALL_PACKETS) {
1722         logger << log4cpp::Priority::INFO << "Dump: " << print_simple_packet(current_packet);
1723     }
1724 
1725     if (current_packet.ip_protocol_version == 6) {
1726 #ifdef IPV6_HASH_COUNTERS
1727         current_packet.packet_direction = get_packet_direction_ipv6(lookup_tree_ipv6, current_packet.src_ipv6, current_packet.dst_ipv6);
1728 
1729         // TODO: move to bulk operations here!
1730         multi_process_queue_for_ipv6_counters.enqueue(current_packet);
1731 #else
1732 
1733 
1734 #ifdef USE_NEW_ATOMIC_BUILTINS
1735         __atomic_add_fetch(&total_ipv6_packets, 1, __ATOMIC_RELAXED);
1736 #else
1737         __sync_fetch_and_add(&total_ipv6_packets, 1);
1738 #endif
1739 
1740 #endif
1741 
1742         return;
1743     }
1744 
1745     // We do not process IPv6 at all on this mement
1746     if (current_packet.ip_protocol_version != 4) {
1747         return;
1748     }
1749 
1750     // Subnet for found IPs
1751     unsigned long subnet = 0;
1752     unsigned int subnet_cidr_mask = 0;
1753 
1754     direction packet_direction = get_packet_direction(lookup_tree_ipv4, current_packet.src_ip, current_packet.dst_ip, subnet, subnet_cidr_mask);
1755 
1756     // It's useful in case when we can't find what packets do not processed correctly
1757     if (DEBUG_DUMP_OTHER_PACKETS && packet_direction == OTHER) {
1758         logger << log4cpp::Priority::INFO << "Dump other: " << print_simple_packet(current_packet);
1759     }
1760 
1761     // Skip processing of specific traffic direction
1762     if ((packet_direction == INCOMING && !process_incoming_traffic) or
1763         (packet_direction == OUTGOING && !process_outgoing_traffic)) {
1764         return;
1765     }
1766 
1767     subnet_t current_subnet = std::make_pair(subnet, subnet_cidr_mask);
1768 
1769     uint32_t subnet_in_host_byte_order = 0;
1770     // We operate in host bytes order and need to convert subnet
1771     if (subnet != 0) {
1772         subnet_in_host_byte_order = ntohl(current_subnet.first);
1773     }
1774 
1775     // Try to find map key for this subnet
1776     map_of_vector_counters::iterator itr;
1777 
1778     // Iterator for subnet counter
1779     subnet_counter_t* subnet_counter = NULL;
1780 
1781     if (packet_direction == OUTGOING or packet_direction == INCOMING) {
1782         // Find element in map of vectors
1783         itr = SubnetVectorMap.find(current_subnet);
1784 
1785         if (itr == SubnetVectorMap.end()) {
1786             logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map";
1787             return;
1788         }
1789 
1790         if (enable_subnet_counters) {
1791             map_for_subnet_counters::iterator subnet_iterator;
1792 
1793             // Find element in map of subnet counters
1794             subnet_iterator = PerSubnetCountersMap.find(current_subnet);
1795 
1796             if (subnet_iterator == PerSubnetCountersMap.end()) {
1797                 logger << log4cpp::Priority::ERROR << "Can't find counter structure for subnet";
1798                 return;
1799             }
1800 
1801             subnet_counter = &subnet_iterator->second;
1802         }
1803     }
1804 
1805     map_of_vector_counters_for_flow::iterator itr_flow;
1806 
1807     if (enable_conection_tracking) {
1808         if (packet_direction == OUTGOING or packet_direction == INCOMING) {
1809             itr_flow = SubnetVectorMapFlow.find(current_subnet);
1810 
1811             if (itr_flow == SubnetVectorMapFlow.end()) {
1812                 logger << log4cpp::Priority::ERROR
1813                        << "Can't find vector address in subnet flow map";
1814                 return;
1815             }
1816         }
1817     }
1818 
1819     /* Because we support mirroring, sflow and netflow we should support different cases:
1820         - One packet passed for processing (mirror)
1821         - Multiple packets ("flows") passed for processing (netflow)
1822         - One sampled packed passed for processing (netflow)
1823         - Another combinations of this three options
1824     */
1825 
1826     uint64_t sampled_number_of_packets = current_packet.number_of_packets * current_packet.sample_ratio;
1827     uint64_t sampled_number_of_bytes = current_packet.length * current_packet.sample_ratio;
1828 
1829 #ifdef USE_NEW_ATOMIC_BUILTINS
1830     __atomic_add_fetch(&total_counters[packet_direction].packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1831     __atomic_add_fetch(&total_counters[packet_direction].bytes,   sampled_number_of_bytes, __ATOMIC_RELAXED);
1832 #else
1833     __sync_fetch_and_add(&total_counters[packet_direction].packets, sampled_number_of_packets);
1834     __sync_fetch_and_add(&total_counters[packet_direction].bytes,   sampled_number_of_bytes);
1835 #endif
1836 
1837     // Incerementi main and per protocol packet counters
1838     if (packet_direction == OUTGOING) {
1839         int64_t shift_in_vector = (int64_t)ntohl(current_packet.src_ip) - (int64_t)subnet_in_host_byte_order;
1840 
1841         if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) {
1842             logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
1843                    << " which located outside allocated vector with size " << itr->second.size();
1844 
1845             logger << log4cpp::Priority::ERROR
1846                    << "We expect issues with this packet in OUTGOING direction: "
1847                    << print_simple_packet(current_packet);
1848 
1849             return;
1850         }
1851 
1852         map_element* current_element = &itr->second[shift_in_vector];
1853 
1854         // Main packet/bytes counter
1855 #ifdef USE_NEW_ATOMIC_BUILTINS
1856         __atomic_add_fetch(&current_element->out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1857         __atomic_add_fetch(&current_element->out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
1858 #else
1859         __sync_fetch_and_add(&current_element->out_packets, sampled_number_of_packets);
1860         __sync_fetch_and_add(&current_element->out_bytes, sampled_number_of_bytes);
1861 #endif
1862 
1863         // Fragmented IP packets
1864         if (current_packet.ip_fragmented) {
1865 #ifdef USE_NEW_ATOMIC_BUILTINS
1866             __atomic_add_fetch(&current_element->fragmented_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1867             __atomic_add_fetch(&current_element->fragmented_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
1868 #else
1869             __sync_fetch_and_add(&current_element->fragmented_out_packets, sampled_number_of_packets);
1870             __sync_fetch_and_add(&current_element->fragmented_out_bytes, sampled_number_of_bytes);
1871 #endif
1872         }
1873 
1874         // TODO: add another counters
1875         if (enable_subnet_counters) {
1876 #ifdef USE_NEW_ATOMIC_BUILTINS
1877             __atomic_add_fetch(&subnet_counter->out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1878             __atomic_add_fetch(&subnet_counter->out_bytes,   sampled_number_of_bytes, __ATOMIC_RELAXED);
1879 #else
1880             __sync_fetch_and_add(&subnet_counter->out_packets, sampled_number_of_packets);
1881             __sync_fetch_and_add(&subnet_counter->out_bytes,   sampled_number_of_bytes);
1882 #endif
1883         }
1884 
1885         conntrack_main_struct* current_element_flow = NULL;
1886         if (enable_conection_tracking) {
1887             current_element_flow = &itr_flow->second[shift_in_vector];
1888         }
1889 
1890         // Collect data when ban client
1891         if (!ban_list_details.empty() && ban_list_details.count(current_packet.src_ip) > 0 &&
1892             ban_list_details[current_packet.src_ip].size() < ban_details_records_count) {
1893 
1894             ban_list_details_mutex.lock();
1895 
1896             if (collect_attack_pcap_dumps) {
1897                 // this code SHOULD NOT be called without mutex!
1898                 if (current_packet.packet_payload_length > 0 && current_packet.packet_payload_pointer != NULL) {
1899                     ban_list[current_packet.src_ip].pcap_attack_dump.write_packet(current_packet.packet_payload_pointer,
1900                         current_packet.packet_payload_length);
1901                 }
1902             }
1903 
1904             ban_list_details[current_packet.src_ip].push_back(current_packet);
1905             ban_list_details_mutex.unlock();
1906         }
1907 
1908         uint64_t connection_tracking_hash = 0;
1909 
1910         if (enable_conection_tracking) {
1911             packed_conntrack_hash flow_tracking_structure;
1912             flow_tracking_structure.opposite_ip = current_packet.dst_ip;
1913             flow_tracking_structure.src_port = current_packet.source_port;
1914             flow_tracking_structure.dst_port = current_packet.destination_port;
1915 
1916             // convert this struct to 64 bit integer
1917             connection_tracking_hash = convert_conntrack_hash_struct_to_integer(&flow_tracking_structure);
1918         }
1919 
1920         if (current_packet.protocol == IPPROTO_TCP) {
1921 #ifdef USE_NEW_ATOMIC_BUILTINS
1922             __atomic_add_fetch(&current_element->tcp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1923             __atomic_add_fetch(&current_element->tcp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
1924 #else
1925             __sync_fetch_and_add(&current_element->tcp_out_packets, sampled_number_of_packets);
1926             __sync_fetch_and_add(&current_element->tcp_out_bytes, sampled_number_of_bytes);
1927 #endif
1928 
1929             if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
1930 #ifdef USE_NEW_ATOMIC_BUILTINS
1931                 __atomic_add_fetch(&current_element->tcp_syn_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1932                 __atomic_add_fetch(&current_element->tcp_syn_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
1933 #else
1934                 __sync_fetch_and_add(&current_element->tcp_syn_out_packets, sampled_number_of_packets);
1935                 __sync_fetch_and_add(&current_element->tcp_syn_out_bytes, sampled_number_of_bytes);
1936 #endif
1937             }
1938 
1939             if (enable_conection_tracking) {
1940                 flow_counter.lock();
1941                 conntrack_key_struct* conntrack_key_struct_ptr =
1942                 &current_element_flow->out_tcp[connection_tracking_hash];
1943 
1944                 conntrack_key_struct_ptr->packets += sampled_number_of_packets;
1945                 conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
1946 
1947                 flow_counter.unlock();
1948             }
1949         } else if (current_packet.protocol == IPPROTO_UDP) {
1950 #ifdef USE_NEW_ATOMIC_BUILTINS
1951             __atomic_add_fetch(&current_element->udp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1952             __atomic_add_fetch(&current_element->udp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
1953 #else
1954             __sync_fetch_and_add(&current_element->udp_out_packets, sampled_number_of_packets);
1955             __sync_fetch_and_add(&current_element->udp_out_bytes, sampled_number_of_bytes);
1956 #endif
1957 
1958             if (enable_conection_tracking) {
1959                 flow_counter.lock();
1960                 conntrack_key_struct* conntrack_key_struct_ptr =
1961                 &current_element_flow->out_udp[connection_tracking_hash];
1962 
1963                 conntrack_key_struct_ptr->packets += sampled_number_of_packets;
1964                 conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
1965 
1966                 flow_counter.unlock();
1967             }
1968         } else if (current_packet.protocol == IPPROTO_ICMP) {
1969 #ifdef USE_NEW_ATOMIC_BUILTINS
1970             __atomic_add_fetch(&current_element->icmp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1971             __atomic_add_fetch(&current_element->icmp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
1972 #else
1973             __sync_fetch_and_add(&current_element->icmp_out_packets, sampled_number_of_packets);
1974             __sync_fetch_and_add(&current_element->icmp_out_bytes, sampled_number_of_bytes);
1975 #endif
1976             // no flow tracking for icmp
1977         } else {
1978         }
1979 
1980     } else if (packet_direction == INCOMING) {
1981         int64_t shift_in_vector = (int64_t)ntohl(current_packet.dst_ip) - (int64_t)subnet_in_host_byte_order;
1982 
1983         if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) {
1984             logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
1985                    << " which located outside allocated vector with size " << itr->second.size();
1986 
1987             logger << log4cpp::Priority::ERROR
1988                    << "We expect issues with this packet in INCOMING direction: "
1989                    << print_simple_packet(current_packet);
1990 
1991             return;
1992         }
1993 
1994         map_element* current_element = &itr->second[shift_in_vector];
1995 
1996         // Main packet/bytes counter
1997 #ifdef USE_NEW_ATOMIC_BUILTINS
1998         __atomic_add_fetch(&current_element->in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
1999         __atomic_add_fetch(&current_element->in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
2000 #else
2001         __sync_fetch_and_add(&current_element->in_packets, sampled_number_of_packets);
2002         __sync_fetch_and_add(&current_element->in_bytes, sampled_number_of_bytes);
2003 #endif
2004 
2005         if (enable_subnet_counters) {
2006 #ifdef USE_NEW_ATOMIC_BUILTINS
2007             __atomic_add_fetch(&subnet_counter->in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
2008             __atomic_add_fetch(&subnet_counter->in_bytes,   sampled_number_of_bytes, __ATOMIC_RELAXED);
2009 #else
2010             __sync_fetch_and_add(&subnet_counter->in_packets, sampled_number_of_packets);
2011             __sync_fetch_and_add(&subnet_counter->in_bytes,   sampled_number_of_bytes);
2012 #endif
2013         }
2014 
2015         // Count fragmented IP packets
2016         if (current_packet.ip_fragmented) {
2017 #ifdef USE_NEW_ATOMIC_BUILTINS
2018             __atomic_add_fetch(&current_element->fragmented_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
2019             __atomic_add_fetch(&current_element->fragmented_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
2020 #else
2021             __sync_fetch_and_add(&current_element->fragmented_in_packets, sampled_number_of_packets);
2022             __sync_fetch_and_add(&current_element->fragmented_in_bytes, sampled_number_of_bytes);
2023 #endif
2024         }
2025 
2026         conntrack_main_struct* current_element_flow = NULL;
2027 
2028         if (enable_conection_tracking) {
2029             current_element_flow = &itr_flow->second[shift_in_vector];
2030         }
2031 
2032         uint64_t connection_tracking_hash = 0;
2033         if (enable_conection_tracking) {
2034             packed_conntrack_hash flow_tracking_structure;
2035             flow_tracking_structure.opposite_ip = current_packet.src_ip;
2036             flow_tracking_structure.src_port = current_packet.source_port;
2037             flow_tracking_structure.dst_port = current_packet.destination_port;
2038 
2039             // convert this struct to 64 bit integer
2040             connection_tracking_hash = convert_conntrack_hash_struct_to_integer(&flow_tracking_structure);
2041         }
2042 
2043         // Collect attack details
2044         if (!ban_list_details.empty() && ban_list_details.count(current_packet.dst_ip) > 0 &&
2045             ban_list_details[current_packet.dst_ip].size() < ban_details_records_count) {
2046 
2047             ban_list_details_mutex.lock();
2048 
2049             if (collect_attack_pcap_dumps) {
2050                 // this code SHOULD NOT be called without mutex!
2051                 if (current_packet.packet_payload_length > 0 && current_packet.packet_payload_pointer != NULL) {
2052                     ban_list[current_packet.dst_ip].pcap_attack_dump.write_packet(current_packet.packet_payload_pointer,
2053                         current_packet.packet_payload_length);
2054                 }
2055             }
2056 
2057             ban_list_details[current_packet.dst_ip].push_back(current_packet);
2058             ban_list_details_mutex.unlock();
2059         }
2060 
2061         if (current_packet.protocol == IPPROTO_TCP) {
2062 #ifdef USE_NEW_ATOMIC_BUILTINS
2063             __atomic_add_fetch(&current_element->tcp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
2064             __atomic_add_fetch(&current_element->tcp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
2065 #else
2066             __sync_fetch_and_add(&current_element->tcp_in_packets, sampled_number_of_packets);
2067             __sync_fetch_and_add(&current_element->tcp_in_bytes, sampled_number_of_bytes);
2068 #endif
2069 
2070             if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
2071 #ifdef USE_NEW_ATOMIC_BUILTINS
2072                 __atomic_add_fetch(&current_element->tcp_syn_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
2073                 __atomic_add_fetch(&current_element->tcp_syn_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
2074 #else
2075                 __sync_fetch_and_add(&current_element->tcp_syn_in_packets, sampled_number_of_packets);
2076                 __sync_fetch_and_add(&current_element->tcp_syn_in_bytes, sampled_number_of_bytes);
2077 #endif
2078             }
2079 
2080             if (enable_conection_tracking) {
2081                 flow_counter.lock();
2082                 conntrack_key_struct* conntrack_key_struct_ptr =
2083                 &current_element_flow->in_tcp[connection_tracking_hash];
2084 
2085                 conntrack_key_struct_ptr->packets += sampled_number_of_packets;
2086                 conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
2087 
2088                 flow_counter.unlock();
2089             }
2090         } else if (current_packet.protocol == IPPROTO_UDP) {
2091 #ifdef USE_NEW_ATOMIC_BUILTINS
2092             __atomic_add_fetch(&current_element->udp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
2093             __atomic_add_fetch(&current_element->udp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
2094 #else
2095             __sync_fetch_and_add(&current_element->udp_in_packets, sampled_number_of_packets);
2096             __sync_fetch_and_add(&current_element->udp_in_bytes, sampled_number_of_bytes);
2097 #endif
2098 
2099             if (enable_conection_tracking) {
2100                 flow_counter.lock();
2101                 conntrack_key_struct* conntrack_key_struct_ptr =
2102                 &current_element_flow->in_udp[connection_tracking_hash];
2103 
2104                 conntrack_key_struct_ptr->packets += sampled_number_of_packets;
2105                 conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
2106                 flow_counter.unlock();
2107             }
2108         } else if (current_packet.protocol == IPPROTO_ICMP) {
2109 #ifdef USE_NEW_ATOMIC_BUILTINS
2110             __atomic_add_fetch(&current_element->icmp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
2111             __atomic_add_fetch(&current_element->icmp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
2112 #else
2113             __sync_fetch_and_add(&current_element->icmp_in_packets, sampled_number_of_packets);
2114             __sync_fetch_and_add(&current_element->icmp_in_bytes, sampled_number_of_bytes);
2115 #endif
2116             // no flow tracking for icmp
2117         } else {
2118             // TBD
2119         }
2120 
2121     } else if (packet_direction == INTERNAL) {
2122     }
2123 }
2124 
2125 #ifdef GEOIP
get_asn_for_ip(uint32_t ip)2126 unsigned int get_asn_for_ip(uint32_t ip) {
2127     char* asn_raw = GeoIP_org_by_name(geo_ip, convert_ip_as_uint_to_string(remote_ip).c_str());
2128     uint32_t asn_number = 0;
2129 
2130     if (asn_raw == NULL) {
2131         asn_number = 0;
2132     } else {
2133         // split string: AS1299 TeliaSonera International Carrier
2134         std::vector<std::string> asn_as_string;
2135         split(asn_as_string, asn_raw, boost::is_any_of(" "), boost::token_compress_on);
2136 
2137         // free up original string
2138         free(asn_raw);
2139 
2140         // extract raw number
2141         asn_number = convert_string_to_integer(asn_as_string[0].substr(2));
2142     }
2143 
2144     return asn_number;
2145 }
2146 #endif
2147 
2148 // It's vizualization thread :)
screen_draw_thread()2149 void screen_draw_thread() {
2150     // we need wait one second for calculating speed by recalculate_speed
2151 
2152     //#include <sys/prctl.h>
2153     // prctl(PR_SET_NAME , "fastnetmon calc thread", 0, 0, 0);
2154 
2155     // Sleep for a half second for shift against calculatiuon thread
2156     boost::this_thread::sleep(boost::posix_time::milliseconds(500));
2157 
2158     while (true) {
2159         // Available only from boost 1.54: boost::this_thread::sleep_for(
2160         // boost::chrono::seconds(check_period) );
2161         boost::this_thread::sleep(boost::posix_time::seconds(check_period));
2162         traffic_draw_program();
2163     }
2164 }
2165 
recalculate_speed_thread_handler()2166 void recalculate_speed_thread_handler() {
2167     while (true) {
2168         // recalculate data every one second
2169         // Available only from boost 1.54: boost::this_thread::sleep_for( boost::chrono::seconds(1)
2170         // );
2171         boost::this_thread::sleep(boost::posix_time::seconds(recalculate_speed_timeout));
2172         recalculate_speed();
2173     }
2174 }
2175 
2176 // Get ban settings for this subnet or return global ban settings
get_ban_settings_for_this_subnet(subnet_t subnet,std::string & host_group_name)2177 ban_settings_t get_ban_settings_for_this_subnet(subnet_t subnet, std::string& host_group_name) {
2178     // Try to find host group for this subnet
2179     subnet_to_host_group_map_t::iterator host_group_itr = subnet_to_host_groups.find( subnet );
2180 
2181     if (host_group_itr == subnet_to_host_groups.end()) {
2182         // We haven't host groups for all subnets, it's OK
2183         // logger << log4cpp::Priority::INFO << "We haven't custom host groups for this network. We will use global ban settings";
2184         host_group_name = "global";
2185         return global_ban_settings;
2186     }
2187 
2188     host_group_name = host_group_itr->second;
2189 
2190     // We found host group for this subnet
2191     host_group_ban_settings_map_t::iterator hostgroup_settings_itr =
2192         host_group_ban_settings_map.find(host_group_itr->second);
2193 
2194     if (hostgroup_settings_itr == host_group_ban_settings_map.end()) {
2195         logger << log4cpp::Priority::ERROR << "We can't find ban settings for host group " << host_group_itr->second;
2196         return global_ban_settings;
2197     }
2198 
2199     // We found ban settings for this host group and use they instead global
2200     return hostgroup_settings_itr->second;
2201 }
2202 
2203 /* Calculate speed for all connnections */
recalculate_speed()2204 void recalculate_speed() {
2205     // logger<< log4cpp::Priority::INFO<<"We run recalculate_speed";
2206 
2207     struct timeval start_calc_time;
2208     gettimeofday(&start_calc_time, NULL);
2209 
2210     double speed_calc_period = recalculate_speed_timeout;
2211     time_t start_time;
2212     time(&start_time);
2213 
2214     // If we got 1+ seconds lag we should use new "delta" or skip this step
2215     double time_difference = difftime(start_time, last_call_of_traffic_recalculation);
2216 
2217     if (time_difference < 1) {
2218         // It could occur on program start
2219         logger << log4cpp::Priority::INFO
2220                << "We skip one iteration of speed_calc because it runs so early!";
2221         return;
2222     } else if (int(time_difference) == int(speed_calc_period)) {
2223         // All fine, we run on time
2224     } else {
2225         logger << log4cpp::Priority::INFO
2226                << "Time from last run of speed_recalc is soooo big, we got ugly lags: " << time_difference;
2227         speed_calc_period = time_difference;
2228     }
2229 
2230     map_element zero_map_element;
2231     memset(&zero_map_element, 0, sizeof(zero_map_element));
2232 
2233     uint64_t incoming_total_flows = 0;
2234     uint64_t outgoing_total_flows = 0;
2235 
2236     if (enable_subnet_counters) {
2237         for (map_for_subnet_counters::iterator itr = PerSubnetSpeedMap.begin(); itr != PerSubnetSpeedMap.end(); ++itr) {
2238             subnet_t current_subnet = itr->first;
2239 
2240             map_for_subnet_counters::iterator iter_subnet = PerSubnetCountersMap.find(current_subnet);
2241 
2242             if (iter_subnet == PerSubnetCountersMap.end()) {
2243                 logger << log4cpp::Priority::INFO<<"Can't find traffic counters for subnet";
2244                 break;
2245             }
2246 
2247             subnet_counter_t* subnet_traffic = &iter_subnet->second;
2248 
2249             subnet_counter_t new_speed_element;
2250 
2251             new_speed_element.in_packets = uint64_t((double)subnet_traffic->in_packets / speed_calc_period);
2252             new_speed_element.in_bytes   = uint64_t((double)subnet_traffic->in_bytes   / speed_calc_period);
2253 
2254             new_speed_element.out_packets = uint64_t((double)subnet_traffic->out_packets / speed_calc_period);
2255             new_speed_element.out_bytes   = uint64_t((double)subnet_traffic->out_bytes   / speed_calc_period);
2256 
2257             /* Moving average recalculation for subnets */
2258             /* http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance */
2259             double exp_power_subnet = -speed_calc_period / average_calculation_amount_for_subnets;
2260             double exp_value_subnet = exp(exp_power_subnet);
2261 
2262             map_element* current_average_speed_element = &PerSubnetAverageSpeedMap[current_subnet];
2263 
2264             current_average_speed_element->in_bytes = uint64_t(new_speed_element.in_bytes +
2265                 exp_value_subnet * ((double)current_average_speed_element->in_bytes - (double)new_speed_element.in_bytes));
2266 
2267             current_average_speed_element->out_bytes = uint64_t(new_speed_element.out_bytes +
2268                 exp_value_subnet * ((double)current_average_speed_element->out_bytes - (double)new_speed_element.out_bytes));
2269 
2270             current_average_speed_element->in_packets = uint64_t(new_speed_element.in_packets +
2271                 exp_value_subnet * ((double)current_average_speed_element->in_packets - (double)new_speed_element.in_packets));
2272 
2273             current_average_speed_element->out_packets = uint64_t(new_speed_element.out_packets +
2274                 exp_value_subnet * ((double)current_average_speed_element->out_packets - (double)new_speed_element.out_packets));
2275 
2276             // Update speed calculation structure
2277             PerSubnetSpeedMap[current_subnet] = new_speed_element;
2278             *subnet_traffic = zero_map_element;
2279 
2280             //logger << log4cpp::Priority::INFO<<convert_subnet_to_string(current_subnet)
2281             //    << "in pps: " << new_speed_element.in_packets << " out pps: " << new_speed_element.out_packets;
2282         }
2283     }
2284 
2285     for (map_of_vector_counters::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) {
2286         for (vector_of_counters::iterator vector_itr = itr->second.begin();
2287              vector_itr != itr->second.end(); ++vector_itr) {
2288             int current_index = vector_itr - itr->second.begin();
2289 
2290             // New element
2291             map_element new_speed_element;
2292 
2293             // convert to host order for math operations
2294             uint32_t subnet_ip = ntohl(itr->first.first);
2295             uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
2296 
2297             // covnert to our standard network byte order
2298             uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
2299 
2300             // Calculate speed for IP or whole subnet
2301             build_speed_counters_from_packet_counters(new_speed_element, & *vector_itr, speed_calc_period);
2302 
2303             conntrack_main_struct* flow_counter_ptr = &SubnetVectorMapFlow[itr->first][current_index];
2304 
2305             if (enable_conection_tracking) {
2306                 // todo: optimize this operations!
2307                 // it's really bad and SLOW CODE
2308                 uint64_t total_out_flows =
2309                     (uint64_t)flow_counter_ptr->out_tcp.size() + (uint64_t)flow_counter_ptr->out_udp.size() +
2310                     (uint64_t)flow_counter_ptr->out_icmp.size() + (uint64_t)flow_counter_ptr->out_other.size();
2311 
2312                 uint64_t total_in_flows =
2313                     (uint64_t)flow_counter_ptr->in_tcp.size() + (uint64_t)flow_counter_ptr->in_udp.size() +
2314                     (uint64_t)flow_counter_ptr->in_icmp.size() + (uint64_t)flow_counter_ptr->in_other.size();
2315 
2316                 new_speed_element.out_flows = uint64_t((double)total_out_flows / speed_calc_period);
2317                 new_speed_element.in_flows = uint64_t((double)total_in_flows / speed_calc_period);
2318 
2319                 // Increment global counter
2320                 outgoing_total_flows += new_speed_element.out_flows;
2321                 incoming_total_flows += new_speed_element.in_flows;
2322             } else {
2323                 new_speed_element.out_flows = 0;
2324                 new_speed_element.in_flows = 0;
2325             }
2326 
2327             /* Moving average recalculation */
2328             // http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance
2329             // double speed_calc_period = 1;
2330             double exp_power = -speed_calc_period / average_calculation_amount;
2331             double exp_value = exp(exp_power);
2332 
2333             map_element* current_average_speed_element = &SubnetVectorMapSpeedAverage[itr->first][current_index];
2334 
2335             // Calculate average speed from per-second speed
2336             build_average_speed_counters_from_speed_counters(current_average_speed_element, new_speed_element, exp_value, exp_power);
2337 
2338             if (enable_conection_tracking) {
2339                 current_average_speed_element->out_flows = uint64_t(
2340                     new_speed_element.out_flows +
2341                     exp_value * ((double)current_average_speed_element->out_flows - (double)new_speed_element.out_flows));
2342 
2343                 current_average_speed_element->in_flows = uint64_t(
2344                     new_speed_element.in_flows +
2345                     exp_value * ((double)current_average_speed_element->in_flows - (double)new_speed_element.in_flows));
2346             }
2347 
2348             /* Moving average recalculation end */
2349             std::string host_group_name;
2350             ban_settings_t current_ban_settings = get_ban_settings_for_this_subnet(itr->first, host_group_name);
2351 
2352             if (we_should_ban_this_ip(current_average_speed_element, current_ban_settings)) {
2353                 logger << log4cpp::Priority::DEBUG << "We have found host group for this host as: " << host_group_name;
2354 
2355                 std::string flow_attack_details = "";
2356 
2357                 if (enable_conection_tracking) {
2358                     flow_attack_details =
2359                     print_flow_tracking_for_ip(*flow_counter_ptr, convert_ip_as_uint_to_string(client_ip));
2360                 }
2361 
2362                 // TODO: we should pass type of ddos ban source (pps, flowd, bandwidth)!
2363                 execute_ip_ban(client_ip, *current_average_speed_element, flow_attack_details, itr->first);
2364             }
2365 
2366             SubnetVectorMapSpeed[itr->first][current_index] = new_speed_element;
2367 
2368             *vector_itr = zero_map_element;
2369         }
2370     }
2371 
2372     // Calculate global flow speed
2373     incoming_total_flows_speed = uint64_t((double)incoming_total_flows / (double)speed_calc_period);
2374     outgoing_total_flows_speed = uint64_t((double)outgoing_total_flows / (double)speed_calc_period);
2375 
2376     if (enable_conection_tracking) {
2377         // Clean Flow Counter
2378         flow_counter.lock();
2379         zeroify_all_flow_counters();
2380         flow_counter.unlock();
2381     }
2382 
2383     total_unparsed_packets_speed = uint64_t((double)total_unparsed_packets / (double)speed_calc_period);
2384     total_unparsed_packets = 0;
2385 
2386     for (unsigned int index = 0; index < 4; index++) {
2387         total_speed_counters[index].bytes =
2388         uint64_t((double)total_counters[index].bytes / (double)speed_calc_period);
2389 
2390         total_speed_counters[index].packets =
2391         uint64_t((double)total_counters[index].packets / (double)speed_calc_period);
2392 
2393         double exp_power = -speed_calc_period / average_calculation_amount;
2394         double exp_value = exp(exp_power);
2395 
2396         total_speed_average_counters[index].bytes = uint64_t(total_speed_counters[index].bytes + exp_value *
2397             ((double) total_speed_average_counters[index].bytes - (double) total_speed_counters[index].bytes));
2398 
2399         total_speed_average_counters[index].packets = uint64_t(total_speed_counters[index].packets + exp_value *
2400             ((double) total_speed_average_counters[index].packets - (double) total_speed_counters[index].packets));
2401 
2402         // nullify data counters after speed calculation
2403         total_counters[index].bytes = 0;
2404         total_counters[index].packets = 0;
2405     }
2406 
2407     // Set time of previous startup
2408     time(&last_call_of_traffic_recalculation);
2409 
2410     struct timeval finish_calc_time;
2411     gettimeofday(&finish_calc_time, NULL);
2412 
2413     timeval_subtract(&speed_calculation_time, &finish_calc_time, &start_calc_time);
2414 }
2415 
print_screen_contents_into_file(std::string screen_data_stats_param)2416 void print_screen_contents_into_file(std::string screen_data_stats_param) {
2417     std::ofstream screen_data_file;
2418     screen_data_file.open(cli_stats_file_path.c_str(), std::ios::trunc);
2419 
2420     if (screen_data_file.is_open()) {
2421         // Set 660 permissions to file for security reasons
2422         chmod(cli_stats_file_path.c_str(), S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2423 
2424         screen_data_file << screen_data_stats_param;
2425         screen_data_file.close();
2426     } else {
2427         logger << log4cpp::Priority::ERROR << "Can't print program screen into file";
2428     }
2429 }
2430 
traffic_draw_program()2431 void traffic_draw_program() {
2432     std::stringstream output_buffer;
2433 
2434     // logger<<log4cpp::Priority::INFO<<"Draw table call";
2435 
2436     struct timeval start_calc_time;
2437     gettimeofday(&start_calc_time, NULL);
2438 
2439     sort_type sorter;
2440     if (sort_parameter == "packets") {
2441         sorter = PACKETS;
2442     } else if (sort_parameter == "bytes") {
2443         sorter = BYTES;
2444     } else if (sort_parameter == "flows") {
2445         sorter = FLOWS;
2446     } else {
2447         logger << log4cpp::Priority::INFO << "Unexpected sorter type: " << sort_parameter;
2448         sorter = PACKETS;
2449     }
2450 
2451     output_buffer << "FastNetMon " << fastnetmon_version
2452                   << " Pavel Odintsov: fastnetmon.com"
2453                   << "\n"
2454                   << "IPs ordered by: " << sort_parameter << "\n";
2455 
2456     output_buffer << print_channel_speed("Incoming traffic", INCOMING) << std::endl;
2457 
2458     if (process_incoming_traffic) {
2459         output_buffer << draw_table(INCOMING, true, sorter);
2460         output_buffer << std::endl;
2461     }
2462 
2463     output_buffer << print_channel_speed("Outgoing traffic", OUTGOING) << std::endl;
2464 
2465     if (process_outgoing_traffic) {
2466         output_buffer << draw_table(OUTGOING, false, sorter);
2467         output_buffer << std::endl;
2468     }
2469 
2470     output_buffer << print_channel_speed("Internal traffic", INTERNAL) << std::endl;
2471 
2472     output_buffer << std::endl;
2473 
2474     output_buffer << print_channel_speed("Other traffic", OTHER) << std::endl;
2475 
2476     output_buffer << std::endl;
2477 
2478     // Application statistics
2479     output_buffer << "Screen updated in:\t\t" << drawing_thread_execution_time.tv_sec << " sec "
2480                   << drawing_thread_execution_time.tv_usec << " microseconds\n";
2481 
2482     output_buffer << "Traffic calculated in:\t\t" << speed_calculation_time.tv_sec << " sec "
2483                   << speed_calculation_time.tv_usec << " microseconds\n";
2484 
2485     if (speed_calculation_time.tv_sec > 0) {
2486         output_buffer << "ALERT! Toolkit working incorrectly! We should calculate speed in ~1 second\n";
2487     }
2488 
2489 #ifdef IPV6_HASH_COUNTERS
2490     output_buffer << "Total amount of IPv6 packets: " << total_ipv6_packets << "\n";
2491 #endif
2492 
2493     output_buffer << "Total amount of IPv6 packets related to our own network: " << our_ipv6_packets << "\n";
2494     output_buffer << "Not processed packets: " << total_unparsed_packets_speed << " pps\n";
2495 
2496     // Print backend stats
2497     if (enable_pcap_collection) {
2498         output_buffer << get_pcap_stats() << "\n";
2499     }
2500 
2501 #ifdef PF_RING
2502     if (enable_data_collection_from_mirror) {
2503         output_buffer << get_pf_ring_stats();
2504     }
2505 #endif
2506 
2507     // Print thresholds
2508     if (print_configuration_params_on_the_screen) {
2509         output_buffer << "\n" << print_ban_thresholds(global_ban_settings);
2510     }
2511 
2512     if (!ban_list.empty()) {
2513         output_buffer << std::endl << "Ban list:" << std::endl;
2514         output_buffer << print_ddos_attack_details();
2515     }
2516 
2517     if (enable_subnet_counters) {
2518         output_buffer << std::endl << "Subnet load:" << std::endl;
2519         output_buffer << print_subnet_load() << "\n";
2520     }
2521 
2522     screen_data_stats = output_buffer.str();
2523 
2524     // Print screen contents into file
2525     print_screen_contents_into_file(screen_data_stats);
2526 
2527     struct timeval end_calc_time;
2528     gettimeofday(&end_calc_time, NULL);
2529 
2530     timeval_subtract(&drawing_thread_execution_time, &end_calc_time, &start_calc_time);
2531 }
2532 
2533 // pretty print channel speed in pps and MBit
print_channel_speed(std::string traffic_type,direction packet_direction)2534 std::string print_channel_speed(std::string traffic_type, direction packet_direction) {
2535     uint64_t speed_in_pps = total_speed_average_counters[packet_direction].packets;
2536     uint64_t speed_in_bps = total_speed_average_counters[packet_direction].bytes;
2537 
2538     unsigned int number_of_tabs = 1;
2539     // We need this for correct alignment of blocks
2540     if (traffic_type == "Other traffic") {
2541         number_of_tabs = 2;
2542     }
2543 
2544     std::stringstream stream;
2545     stream << traffic_type;
2546 
2547     for (unsigned int i = 0; i < number_of_tabs; i++) {
2548         stream << "\t";
2549     }
2550 
2551     uint64_t speed_in_mbps = convert_speed_to_mbps(speed_in_bps);
2552 
2553     stream << std::setw(6) << speed_in_pps << " pps " << std::setw(6) << speed_in_mbps << " mbps";
2554 
2555     if (traffic_type == "Incoming traffic" or traffic_type == "Outgoing traffic") {
2556         if (packet_direction == INCOMING) {
2557             stream << " " << std::setw(6) << incoming_total_flows_speed << " flows";
2558         } else if (packet_direction == OUTGOING) {
2559             stream << " " << std::setw(6) << outgoing_total_flows_speed << " flows";
2560         }
2561 
2562         if (graphite_enabled) {
2563             graphite_data_t graphite_data;
2564 
2565             std::string direction_as_string;
2566 
2567             if (packet_direction == INCOMING) {
2568                 direction_as_string = "incoming";
2569 
2570                 graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] = incoming_total_flows_speed;
2571             } else if (packet_direction == OUTGOING) {
2572                 direction_as_string = "outgoing";
2573 
2574                 graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] = outgoing_total_flows_speed;
2575             }
2576 
2577             graphite_data[graphite_prefix + ".total." + direction_as_string + ".pps"] = speed_in_pps;
2578             graphite_data[graphite_prefix + ".total." + direction_as_string + ".bps"] = speed_in_bps * 8;
2579 
2580             bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
2581 
2582             if (!graphite_put_result) {
2583                 logger << log4cpp::Priority::ERROR << "Can't store data to Graphite";
2584             }
2585         }
2586     }
2587 
2588     return stream.str();
2589 }
2590 
file_is_appendable(std::string path)2591 bool file_is_appendable(std::string path) {
2592     std::ofstream check_appendable_file;
2593 
2594     check_appendable_file.open(path.c_str(), std::ios::app);
2595 
2596     if (check_appendable_file.is_open()) {
2597         // all fine, just close file
2598         check_appendable_file.close();
2599 
2600         return true;
2601     } else {
2602         return false;
2603     }
2604 }
2605 
init_logging()2606 void init_logging() {
2607     // So log4cpp will never notify you if it could not write to log file due to permissions issues
2608     // We will check it manually
2609 
2610     if (!file_is_appendable(log_file_path)) {
2611         std::cerr << "Can't open log file " << log_file_path << " for writing! Please check file and folder permissions" << std::endl;
2612         exit(EXIT_FAILURE);
2613     }
2614 
2615     log4cpp::PatternLayout* layout = new log4cpp::PatternLayout();
2616     layout->setConversionPattern("%d [%p] %m%n");
2617 
2618     log4cpp::Appender* appender = new log4cpp::FileAppender("default", log_file_path);
2619     appender->setLayout(layout);
2620 
2621     logger.setPriority(log4cpp::Priority::INFO);
2622     logger.addAppender(appender);
2623 
2624     logger << log4cpp::Priority::INFO << "Logger initialized!";
2625 }
2626 
reconfigure_logging()2627 void reconfigure_logging() {
2628     log4cpp::PatternLayout* layout = new log4cpp::PatternLayout();
2629     layout->setConversionPattern("[%p] %m%n");
2630 
2631     if (logging_configuration.local_syslog_logging) {
2632         log4cpp::Appender* local_syslog_appender = new log4cpp::SyslogAppender("fastnetmon", "fastnetmon", LOG_USER);
2633 	local_syslog_appender->setLayout(layout);
2634         logger.addAppender(local_syslog_appender);
2635 
2636         logger << log4cpp::Priority::INFO << "We start local syslog logging corectly";
2637     }
2638 
2639     if (logging_configuration.remote_syslog_logging) {
2640         log4cpp::Appender* remote_syslog_appender = new log4cpp::RemoteSyslogAppender(
2641             "fastnetmon", "fastnetmon", logging_configuration.remote_syslog_server, LOG_USER, logging_configuration.remote_syslog_port);
2642 
2643 	remote_syslog_appender->setLayout(layout);
2644         logger.addAppender(remote_syslog_appender);
2645 
2646         logger << log4cpp::Priority::INFO << "We start remote syslog logging corectly";
2647     }
2648 }
2649 
2650 // Call fork function
do_fork()2651 int do_fork() {
2652     int status = 0;
2653 
2654     switch (fork()) {
2655     case 0:
2656         // It's child
2657         break;
2658     case -1:
2659         /* fork failed */
2660         status = -1;
2661         break;
2662     default:
2663         // We should close master process with _exit(0)
2664         // We should not call exit() because it will destroy all global variables for program
2665         _exit(0);
2666     }
2667 
2668     return status;
2669 }
2670 
2671 
redirect_fds()2672 void redirect_fds() {
2673     // Close stdin, stdout and stderr
2674     close(0);
2675     close(1);
2676     close(2);
2677 
2678     if (open("/dev/null", O_RDWR) != 0) {
2679         // We can't notify anybody now
2680         exit(1);
2681     }
2682 
2683     // Create copy of zero decriptor for 1 and 2 fd's
2684     // We do not need return codes here but we need do it for suppressing complaints from compiler
2685     int first_dup_result  = dup(0);
2686     int second_dup_result = dup(0);
2687 }
2688 
main(int argc,char ** argv)2689 int main(int argc, char** argv) {
2690     bool daemonize = false;
2691 
2692     namespace po = boost::program_options;
2693 
2694     try {
2695         po::options_description desc("Allowed options");
2696         desc.add_options()
2697             ("help", "produce help message")
2698             ("version", "show version")
2699             ("daemonize", "detach from the terminal")
2700             ("configuration_file", po::value<std::string>(), "set path to custom configuration file")
2701             ("log_file", po::value<std::string>(), "set path to custom log file")
2702         ;
2703 
2704         po::variables_map vm;
2705         po::store(po::parse_command_line(argc, argv, desc), vm);
2706         po::notify(vm);
2707 
2708         if (vm.count("help")) {
2709             std::cout << desc << std::endl;
2710             exit(EXIT_SUCCESS);
2711         }
2712 
2713         if (vm.count("version")) {
2714             std::cout << "Version: " << fastnetmon_version << std::endl;
2715             exit(EXIT_SUCCESS);
2716         }
2717 
2718         if (vm.count("daemonize")) {
2719             daemonize = true;
2720         }
2721 
2722         if (vm.count("configuration_file")) {
2723             global_config_path = vm["configuration_file"].as<std::string>();
2724             std::cout << "We will use custom path to configuration file: " << global_config_path << std::endl;
2725         }
2726 
2727         if (vm.count("log_file")) {
2728             log_file_path = vm["log_file"].as<std::string>();
2729             std::cout << "We will use custom path to log file: " << log_file_path << std::endl;
2730         }
2731     } catch (po::error& e) {
2732         std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
2733         exit(EXIT_FAILURE);
2734     }
2735 
2736     // We use ideas from here https://github.com/bmc/daemonize/blob/master/daemon.c
2737 
2738     if (daemonize) {
2739         int status = 0;
2740 
2741         printf("We will run in daemonized mode\n");
2742 
2743         if ((status = do_fork()) < 0) {
2744             // fork failed
2745             status = -1;
2746         } else if (setsid() < 0) {
2747             // Create new session
2748             status = -1;
2749         } else if ((status = do_fork()) < 0) {
2750             status = -1;
2751         } else {
2752             // Clear inherited umask
2753             umask(0);
2754 
2755             // Chdir to root
2756             int chdir_result = chdir("/");
2757 
2758             // close all descriptors because we are daemon!
2759             redirect_fds();
2760         }
2761     }
2762 
2763     // enable core dumps
2764     enable_core_dumps();
2765 
2766     init_logging();
2767 
2768 #ifdef FASTNETMON_API
2769     gpr_set_log_function(silent_logging_function);
2770 #endif
2771 
2772     // Set default ban configuration
2773     init_global_ban_settings();
2774 
2775     // We should read configurartion file _after_ logging initialization
2776     bool load_config_result = load_configuration_file();
2777 
2778     if (!load_config_result) {
2779         std::cerr << "Can't open config file " << global_config_path << " please create it!" << std::endl;
2780         exit(1);
2781     }
2782 
2783     if (file_exists(pid_path)) {
2784         pid_t pid_from_file = 0;
2785 
2786         if (read_pid_from_file(pid_from_file, pid_path)) {
2787             // We could read pid
2788             if (pid_from_file > 0) {
2789                 // We use signal zero for check process existence
2790                 int kill_result = kill(pid_from_file, 0);
2791 
2792                 if (kill_result == 0) {
2793                     logger << log4cpp::Priority::ERROR
2794                            << "FastNetMon is already running with pid: " << pid_from_file;
2795                     exit(1);
2796                 } else {
2797                     // Yes, we have pid with pid but it's zero
2798                 }
2799             } else {
2800                 // pid from file is broken, we assume tool is not running
2801             }
2802         } else {
2803             // We can't open file, let's assume it's broken and tool is not running
2804         }
2805     } else {
2806         // no pid file
2807     }
2808 
2809     // If we not failed in check steps we could run toolkit
2810     bool print_pid_to_file_result = print_pid_to_file(getpid(), pid_path);
2811 
2812     if (!print_pid_to_file_result) {
2813         logger << log4cpp::Priority::ERROR << "Could not create pid file, please check permissions: " << pid_path;
2814         exit(EXIT_FAILURE);
2815     }
2816 
2817 #ifdef ENABLE_DPI
2818     init_current_instance_of_ndpi();
2819 #endif
2820 
2821     lookup_tree_ipv4 = New_Patricia(32);
2822     whitelist_tree_ipv4 = New_Patricia(32);
2823 
2824     lookup_tree_ipv6 = New_Patricia(128);
2825     whitelist_tree_ipv6 = New_Patricia(128);
2826 
2827     // nullify total counters
2828     for (int index = 0; index < 4; index++) {
2829         total_counters[index].bytes = 0;
2830         total_counters[index].packets = 0;
2831 
2832         total_speed_counters[index].bytes = 0;
2833         total_speed_counters[index].packets = 0;
2834 
2835         total_speed_average_counters[index].bytes = 0;
2836         total_speed_average_counters[index].packets = 0;
2837     }
2838 
2839     /* Create folder for attack details */
2840     if (!folder_exists(attack_details_folder)) {
2841         int mkdir_result = mkdir(attack_details_folder.c_str(), S_IRWXU);
2842 
2843         if (mkdir_result != 0) {
2844             logger << log4cpp::Priority::ERROR << "Can't create folder for attack details: " << attack_details_folder;
2845             exit(1);
2846         }
2847     }
2848 
2849     if (getenv("DUMP_ALL_PACKETS") != NULL) {
2850         DEBUG_DUMP_ALL_PACKETS = true;
2851     }
2852 
2853     if (getenv("DUMP_OTHER_PACKETS") != NULL) {
2854         DEBUG_DUMP_OTHER_PACKETS = true;
2855     }
2856 
2857     if (sizeof(packed_conntrack_hash) != sizeof(uint64_t) or sizeof(packed_conntrack_hash) != 8) {
2858         logger << log4cpp::Priority::INFO << "Assertion about size of packed_conntrack_hash, it's "
2859                << sizeof(packed_conntrack_hash) << " instead 8";
2860         exit(1);
2861     }
2862 
2863     logger << log4cpp::Priority::INFO << "Read configuration file";
2864 
2865     // Reconfigure logging. We will enable specific logging methods here
2866     reconfigure_logging();
2867 
2868     load_our_networks_list();
2869 
2870     // Setup CTRL+C handler
2871     if (signal(SIGINT, interruption_signal_handler) == SIG_ERR) {
2872         logger << log4cpp::Priority::ERROR << "Can't setup SIGINT handler";
2873         exit(1);
2874     }
2875 
2876     /* Without this SIGPIPE error could shutdown toolkit on call of exec_with_stdin_params */
2877     if (signal(SIGPIPE, sigpipe_handler_for_popen) == SIG_ERR) {
2878         logger << log4cpp::Priority::ERROR << "Can't setup SIGPIPE handler";
2879         exit(1);
2880     }
2881 
2882 #ifdef GEOIP
2883     // Init GeoIP
2884     if (!geoip_init()) {
2885         logger << log4cpp::Priority::ERROR << "Can't load geoip tables";
2886         exit(1);
2887     }
2888 #endif
2889     // Init previous run date
2890     time(&last_call_of_traffic_recalculation);
2891 
2892     // We call init for each action
2893 #ifdef ENABLE_GOBGP
2894     if (gobgp_enabled) {
2895         gobgp_action_init();
2896     }
2897 #endif
2898 
2899 #ifdef IPV6_HASH_COUNTERS
2900     service_thread_group.add_thread(new boost::thread(ipv6_traffic_processor));
2901 #endif
2902 
2903 #ifdef FASTNETMON_API
2904     if (enable_api) {
2905         service_thread_group.add_thread(new boost::thread(RunApiServer));
2906     }
2907 #endif
2908 
2909     // Run screen draw thread
2910     service_thread_group.add_thread(new boost::thread(screen_draw_thread));
2911 
2912     // start thread for recalculating speed in realtime
2913     service_thread_group.add_thread(new boost::thread(recalculate_speed_thread_handler));
2914 
2915     // Run banlist cleaner thread
2916     if (unban_enabled) {
2917         service_thread_group.add_thread(new boost::thread(cleanup_ban_list));
2918     }
2919 
2920 #ifdef PF_RING
2921     if (enable_data_collection_from_mirror) {
2922         packet_capture_plugin_thread_group.add_thread(new boost::thread(start_pfring_collection, process_packet));
2923     }
2924 #endif
2925 
2926 #ifdef NETMAP_PLUGIN
2927     // netmap processing
2928     if (enable_netmap_collection) {
2929         packet_capture_plugin_thread_group.add_thread(new boost::thread(start_netmap_collection, process_packet));
2930     }
2931 #endif
2932 
2933 #ifdef SNABB_SWITCH
2934     if (enable_snabbswitch_collection) {
2935         packet_capture_plugin_thread_group.add_thread(new boost::thread(start_snabbswitch_collection, process_packet));
2936     }
2937 #endif
2938 
2939 #ifdef FASTNETMON_ENABLE_AFPACKET
2940     if (enable_afpacket_collection) {
2941         packet_capture_plugin_thread_group.add_thread(new boost::thread(start_afpacket_collection, process_packet));
2942     }
2943 #endif
2944 
2945     if (enable_sflow_collection) {
2946         packet_capture_plugin_thread_group.add_thread(new boost::thread(start_sflow_collection, process_packet));
2947     }
2948 
2949     if (enable_netflow_collection) {
2950         packet_capture_plugin_thread_group.add_thread(new boost::thread(start_netflow_collection, process_packet));
2951     }
2952 
2953     if (enable_pcap_collection) {
2954         packet_capture_plugin_thread_group.add_thread(new boost::thread(start_pcap_collection, process_packet));
2955     }
2956 
2957     // Wait for all threads in capture thread group
2958     packet_capture_plugin_thread_group.join_all();
2959 
2960     // Wait for all service threads
2961     service_thread_group.join_all();
2962 
2963     free_up_all_resources();
2964 
2965     return 0;
2966 }
2967 
free_up_all_resources()2968 void free_up_all_resources() {
2969 #ifdef GEOIP
2970     // Free up geoip handle
2971     GeoIP_delete(geo_ip);
2972 #endif
2973 
2974     Destroy_Patricia(lookup_tree_ipv4, (void_fn_t)0);
2975     Destroy_Patricia(whitelist_tree_ipv4, (void_fn_t)0);
2976 
2977     Destroy_Patricia(lookup_tree_ipv6, (void_fn_t)0);
2978     Destroy_Patricia(whitelist_tree_ipv6, (void_fn_t)0);
2979 }
2980 
2981 // For correct program shutdown by CTRL+C
interruption_signal_handler(int signal_number)2982 void interruption_signal_handler(int signal_number) {
2983 
2984     logger << log4cpp::Priority::INFO << "SIGNAL captured, prepare toolkit shutdown";
2985 
2986 #ifdef FASTNETMON_API
2987     logger << log4cpp::Priority::INFO << "Send shutdown command to API server";
2988     api_server->Shutdown();
2989 #endif
2990 
2991     logger << log4cpp::Priority::INFO << "Interrupt service threads";
2992     service_thread_group.interrupt_all();
2993 
2994     logger << log4cpp::Priority::INFO << "Wait while they finished";
2995     service_thread_group.join_all();
2996 
2997     logger << log4cpp::Priority::INFO << "Interrupt packet capture treads";
2998     packet_capture_plugin_thread_group.interrupt_all();
2999 
3000     logger << log4cpp::Priority::INFO << "Wait while they finished";
3001     packet_capture_plugin_thread_group.join_all();
3002 
3003     logger << log4cpp::Priority::INFO << "Shutdown main process";
3004 
3005     // TODO: we should REMOVE this exit command and wait for correct toolkit shutdown
3006     exit(1);
3007 }
3008 
detect_attack_protocol(map_element & speed_element,direction attack_direction)3009 unsigned int detect_attack_protocol(map_element& speed_element, direction attack_direction) {
3010     if (attack_direction == INCOMING) {
3011         return get_max_used_protocol(speed_element.tcp_in_packets, speed_element.udp_in_packets,
3012                                      speed_element.icmp_in_packets);
3013     } else {
3014         // OUTGOING
3015         return get_max_used_protocol(speed_element.tcp_out_packets, speed_element.udp_out_packets,
3016                                      speed_element.icmp_out_packets);
3017     }
3018 }
3019 
3020 #define my_max_on_defines(a, b) (a > b ? a : b)
get_max_used_protocol(uint64_t tcp,uint64_t udp,uint64_t icmp)3021 unsigned int get_max_used_protocol(uint64_t tcp, uint64_t udp, uint64_t icmp) {
3022     unsigned int max = my_max_on_defines(my_max_on_defines(udp, tcp), icmp);
3023 
3024     if (max == tcp) {
3025         return IPPROTO_TCP;
3026     } else if (max == udp) {
3027         return IPPROTO_UDP;
3028     } else if (max == icmp) {
3029         return IPPROTO_ICMP;
3030     }
3031 
3032     return 0;
3033 }
3034 
exabgp_ban_manage(std::string action,std::string ip_as_string,attack_details current_attack)3035 void exabgp_ban_manage(std::string action, std::string ip_as_string, attack_details current_attack) {
3036     // We will announce whole subent here
3037     if (exabgp_announce_whole_subnet) {
3038         std::string subnet_as_string_with_mask = convert_subnet_to_string(current_attack.customer_network);
3039 
3040         exabgp_prefix_ban_manage(action, subnet_as_string_with_mask, exabgp_next_hop, exabgp_community_subnet);
3041     }
3042 
3043     // And we could announce single host here (/32)
3044     if (exabgp_announce_host) {
3045         std::string ip_as_string_with_mask = ip_as_string + "/32";
3046 
3047         exabgp_prefix_ban_manage(action, ip_as_string_with_mask, exabgp_next_hop, exabgp_community_host);
3048     }
3049 }
3050 
3051 // Low level ExaBGP ban management
exabgp_prefix_ban_manage(std::string action,std::string prefix_as_string_with_mask,std::string exabgp_next_hop,std::string exabgp_community)3052 void exabgp_prefix_ban_manage(std::string action, std::string prefix_as_string_with_mask,
3053     std::string exabgp_next_hop, std::string exabgp_community) {
3054 
3055     /* Buffer for BGP message */
3056     char bgp_message[256];
3057 
3058     if (action == "ban") {
3059         sprintf(bgp_message, "announce route %s next-hop %s community %s\n",
3060             prefix_as_string_with_mask.c_str(), exabgp_next_hop.c_str(), exabgp_community.c_str());
3061     } else {
3062         sprintf(bgp_message, "withdraw route %s next-hop %s\n", prefix_as_string_with_mask.c_str(), exabgp_next_hop.c_str());
3063     }
3064 
3065     logger << log4cpp::Priority::INFO << "ExaBGP announce message: " << bgp_message;
3066 
3067     int exabgp_pipe = open(exabgp_command_pipe.c_str(), O_WRONLY);
3068 
3069     if (exabgp_pipe <= 0) {
3070         logger << log4cpp::Priority::ERROR << "Can't open ExaBGP pipe " << exabgp_command_pipe
3071                << " Ban is not executed";
3072         return;
3073     }
3074 
3075     int wrote_bytes = write(exabgp_pipe, bgp_message, strlen(bgp_message));
3076 
3077     if (wrote_bytes != strlen(bgp_message)) {
3078         logger << log4cpp::Priority::ERROR << "Can't write message to ExaBGP pipe";
3079     }
3080 
3081     close(exabgp_pipe);
3082 }
3083 
exabgp_flow_spec_ban_manage(std::string action,std::string flow_spec_rule_as_text)3084 bool exabgp_flow_spec_ban_manage(std::string action, std::string flow_spec_rule_as_text) {
3085     std::string announce_action;
3086 
3087     if (action == "ban") {
3088         announce_action = "announce";
3089     } else {
3090         announce_action = "withdraw";
3091     }
3092 
3093     // Trailing \n is very important!
3094     std::string bgp_message = announce_action + " " + flow_spec_rule_as_text + "\n";
3095 
3096     int exabgp_pipe = open(exabgp_command_pipe.c_str(), O_WRONLY);
3097 
3098     if (exabgp_pipe <= 0) {
3099         logger << log4cpp::Priority::ERROR << "Can't open ExaBGP pipe for flow spec announce " << exabgp_command_pipe;
3100         return false;
3101     }
3102 
3103     int wrote_bytes = write(exabgp_pipe, bgp_message.c_str(), bgp_message.size());
3104 
3105     if (wrote_bytes != bgp_message.size()) {
3106         logger << log4cpp::Priority::ERROR << "Can't write message to ExaBGP pipe";
3107         return false;
3108     }
3109 
3110     close(exabgp_pipe);
3111     return true;
3112 }
3113 
execute_ip_ban(uint32_t client_ip,map_element average_speed_element,std::string flow_attack_details,subnet_t customer_subnet)3114 void execute_ip_ban(uint32_t client_ip, map_element average_speed_element, std::string flow_attack_details, subnet_t customer_subnet) {
3115     struct attack_details current_attack;
3116     uint64_t pps = 0;
3117 
3118     uint64_t in_pps = average_speed_element.in_packets;
3119     uint64_t out_pps = average_speed_element.out_packets;
3120     uint64_t in_bps = average_speed_element.in_bytes;
3121     uint64_t out_bps = average_speed_element.out_bytes;
3122     uint64_t in_flows = average_speed_element.in_flows;
3123     uint64_t out_flows = average_speed_element.out_flows;
3124 
3125     direction data_direction;
3126 
3127     if (!global_ban_settings.enable_ban) {
3128         logger << log4cpp::Priority::INFO << "We do not ban: " << convert_ip_as_uint_to_string(client_ip)
3129                << " because ban disabled completely";
3130         return;
3131     }
3132 
3133     // Detect attack direction with simple heuristic
3134     if (abs(int((int)in_pps - (int)out_pps)) < 1000) {
3135         // If difference between pps speed is so small we should do additional investigation using
3136         // bandwidth speed
3137         if (in_bps > out_bps) {
3138             data_direction = INCOMING;
3139             pps = in_pps;
3140         } else {
3141             data_direction = OUTGOING;
3142             pps = out_pps;
3143         }
3144     } else {
3145         if (in_pps > out_pps) {
3146             data_direction = INCOMING;
3147             pps = in_pps;
3148         } else {
3149             data_direction = OUTGOING;
3150             pps = out_pps;
3151         }
3152     }
3153 
3154     current_attack.attack_protocol = detect_attack_protocol(average_speed_element, data_direction);
3155 
3156     if (ban_list.count(client_ip) > 0) {
3157         if (ban_list[client_ip].attack_direction != data_direction) {
3158             logger << log4cpp::Priority::INFO
3159                    << "We expected very strange situation: attack direction for "
3160                    << convert_ip_as_uint_to_string(client_ip) << " was changed";
3161 
3162             return;
3163         }
3164 
3165         // update attack power
3166         if (pps > ban_list[client_ip].max_attack_power) {
3167             ban_list[client_ip].max_attack_power = pps;
3168         }
3169 
3170         return;
3171     }
3172 
3173     prefix_t prefix_for_check_adreess;
3174     prefix_for_check_adreess.add.sin.s_addr = client_ip;
3175     prefix_for_check_adreess.family = AF_INET;
3176     prefix_for_check_adreess.bitlen = 32;
3177 
3178     bool in_white_list = (patricia_search_best2(whitelist_tree_ipv4, &prefix_for_check_adreess, 1) != NULL);
3179 
3180     if (in_white_list) {
3181         return;
3182     }
3183 
3184     std::string data_direction_as_string = get_direction_name(data_direction);
3185 
3186     logger << log4cpp::Priority::INFO << "We run execute_ip_ban code with following params "
3187         << " in_pps: "  << in_pps
3188         << " out_pps: " << out_pps
3189         << " in_bps: "  << in_bps
3190         << " out_bps: " << out_bps
3191         << " and we decide it's " << data_direction_as_string << " attack";
3192 
3193     std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
3194     std::string pps_as_string = convert_int_to_string(pps);
3195 
3196     // Store information about subnet
3197     current_attack.customer_network = customer_subnet;
3198 
3199     // Store ban time
3200     time(&current_attack.ban_timestamp);
3201     // set ban time in seconds
3202     current_attack.ban_time = global_ban_time;
3203     current_attack.unban_enabled = unban_enabled;
3204 
3205     // Pass main information about attack
3206     current_attack.attack_direction = data_direction;
3207     current_attack.attack_power = pps;
3208     current_attack.max_attack_power = pps;
3209 
3210     current_attack.in_packets = in_pps;
3211     current_attack.out_packets = out_pps;
3212 
3213     current_attack.in_bytes = in_bps;
3214     current_attack.out_bytes = out_bps;
3215 
3216     // pass flow information
3217     current_attack.in_flows = in_flows;
3218     current_attack.out_flows = out_flows;
3219 
3220     current_attack.fragmented_in_packets = average_speed_element.fragmented_in_packets;
3221     current_attack.tcp_in_packets = average_speed_element.tcp_in_packets;
3222     current_attack.tcp_syn_in_packets = average_speed_element.tcp_syn_in_packets;
3223     current_attack.udp_in_packets = average_speed_element.udp_in_packets;
3224     current_attack.icmp_in_packets = average_speed_element.icmp_in_packets;
3225 
3226     current_attack.fragmented_out_packets = average_speed_element.fragmented_out_packets;
3227     current_attack.tcp_out_packets = average_speed_element.tcp_out_packets;
3228     current_attack.tcp_syn_out_packets = average_speed_element.tcp_syn_out_packets;
3229     current_attack.udp_out_packets = average_speed_element.udp_out_packets;
3230     current_attack.icmp_out_packets = average_speed_element.icmp_out_packets;
3231 
3232     current_attack.fragmented_out_bytes = average_speed_element.fragmented_out_bytes;
3233     current_attack.tcp_out_bytes = average_speed_element.tcp_out_bytes;
3234     current_attack.tcp_syn_out_bytes = average_speed_element.tcp_syn_out_bytes;
3235     current_attack.udp_out_bytes = average_speed_element.udp_out_bytes;
3236     current_attack.icmp_out_bytes = average_speed_element.icmp_out_bytes;
3237 
3238     current_attack.fragmented_in_bytes = average_speed_element.fragmented_in_bytes;
3239     current_attack.tcp_in_bytes = average_speed_element.tcp_in_bytes;
3240     current_attack.tcp_syn_in_bytes = average_speed_element.tcp_syn_in_bytes;
3241     current_attack.udp_in_bytes = average_speed_element.udp_in_bytes;
3242     current_attack.icmp_in_bytes = average_speed_element.icmp_in_bytes;
3243 
3244     current_attack.average_in_packets = average_speed_element.in_packets;
3245     current_attack.average_in_bytes = average_speed_element.in_bytes;
3246     current_attack.average_in_flows = average_speed_element.in_flows;
3247 
3248     current_attack.average_out_packets = average_speed_element.out_packets;
3249     current_attack.average_out_bytes = average_speed_element.out_bytes;
3250     current_attack.average_out_flows = average_speed_element.out_flows;
3251 
3252     if (collect_attack_pcap_dumps) {
3253         bool buffer_allocation_result = current_attack.pcap_attack_dump.allocate_buffer( number_of_packets_for_pcap_attack_dump );
3254 
3255         if (!buffer_allocation_result) {
3256             logger << log4cpp::Priority::ERROR << "Can't allocate buffer for attack, switch off this option completely ";
3257             collect_attack_pcap_dumps = false;
3258         }
3259 
3260     }
3261 
3262     ban_list_mutex.lock();
3263     ban_list[client_ip] = current_attack;
3264     ban_list_mutex.unlock();
3265 
3266     ban_list_details_mutex.lock();
3267     ban_list_details[client_ip] = std::vector<simple_packet>();
3268     ban_list_details_mutex.unlock();
3269 
3270     logger << log4cpp::Priority::INFO << "Attack with direction: " << data_direction_as_string
3271            << " IP: " << client_ip_as_string << " Power: " << pps_as_string;
3272 
3273     call_ban_handlers(client_ip, ban_list[client_ip], flow_attack_details);
3274 }
3275 
call_ban_handlers(uint32_t client_ip,attack_details & current_attack,std::string flow_attack_details)3276 void call_ban_handlers(uint32_t client_ip, attack_details& current_attack, std::string flow_attack_details) {
3277     std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
3278     std::string pps_as_string = convert_int_to_string(current_attack.attack_power);
3279     std::string data_direction_as_string = get_direction_name(current_attack.attack_direction);
3280 
3281     bool store_attack_details_to_file = true;
3282 
3283     std::string basic_attack_information = get_attack_description(client_ip, current_attack);
3284 
3285     std::string basic_attack_information_in_json = get_attack_description_in_json(client_ip, current_attack);
3286 
3287     std::string full_attack_description = basic_attack_information + flow_attack_details;
3288 
3289     if (store_attack_details_to_file) {
3290         print_attack_details_to_file(full_attack_description, client_ip_as_string, current_attack);
3291     }
3292 
3293     if (pfring_hardware_filters_enabled) {
3294 #ifdef PF_RING
3295         logger << log4cpp::Priority::INFO
3296             << "We will block traffic to/from this IP with hardware filters";
3297         pfring_hardware_filter_action_block(client_ip_as_string);
3298 #else
3299         logger << log4cpp::Priority::ERROR << "You haven't compiled PF_RING hardware filters support";
3300 #endif
3301     }
3302 
3303     if (notify_script_enabled) {
3304         std::string script_call_params = notify_script_path + " " + client_ip_as_string + " " +
3305                                          data_direction_as_string + " " + pps_as_string +
3306                                          " " + "ban";
3307         logger << log4cpp::Priority::INFO << "Call script for ban client: " << client_ip_as_string;
3308 
3309         // We should execute external script in separate thread because any lag in this code will be
3310         // very distructive
3311 
3312         if (notify_script_pass_details) {
3313             // We will pass attack details over stdin
3314             boost::thread exec_thread(exec_with_stdin_params, script_call_params, full_attack_description);
3315             exec_thread.detach();
3316         } else {
3317             // Do not pass anything to script
3318             boost::thread exec_thread(exec, script_call_params);
3319             exec_thread.detach();
3320         }
3321 
3322         logger << log4cpp::Priority::INFO << "Script for ban client is finished: " << client_ip_as_string;
3323     }
3324 
3325     if (exabgp_enabled) {
3326         logger << log4cpp::Priority::INFO << "Call ExaBGP for ban client started: " << client_ip_as_string;
3327 
3328         boost::thread exabgp_thread(exabgp_ban_manage, "ban", client_ip_as_string, current_attack);
3329         exabgp_thread.detach();
3330 
3331         logger << log4cpp::Priority::INFO << "Call to ExaBGP for ban client is finished: " << client_ip_as_string;
3332     }
3333 
3334 #ifdef ENABLE_GOBGP
3335     if (gobgp_enabled) {
3336         logger << log4cpp::Priority::INFO << "Call GoBGP for ban client started: " << client_ip_as_string;
3337 
3338         boost::thread gobgp_thread(gobgp_ban_manage, "ban", client_ip_as_string, current_attack);
3339         gobgp_thread.detach();
3340 
3341         logger << log4cpp::Priority::INFO << "Call to GoBGP for ban client is finished: " << client_ip_as_string;
3342     }
3343 #endif
3344 
3345 #ifdef REDIS
3346     if (redis_enabled) {
3347         std::string redis_key_name = client_ip_as_string + "_information";
3348 
3349         if (!redis_prefix.empty()) {
3350             redis_key_name = redis_prefix + "_" + client_ip_as_string + "_information";
3351         }
3352 
3353         logger << log4cpp::Priority::INFO << "Start data save in Redis in key: " << redis_key_name;
3354         boost::thread redis_store_thread(store_data_in_redis, redis_key_name, basic_attack_information_in_json);
3355         redis_store_thread.detach();
3356         logger << log4cpp::Priority::INFO << "Finish data save in Redis in key: " << redis_key_name;
3357 
3358         // If we have flow dump put in redis too
3359         if (!flow_attack_details.empty()) {
3360             std::string redis_key_name = client_ip_as_string + "_flow_dump";
3361 
3362             if (!redis_prefix.empty()) {
3363                 redis_key_name = redis_prefix + "_" + client_ip_as_string + "_flow_dump";
3364             }
3365 
3366             logger << log4cpp::Priority::INFO << "Start data save in redis in key: " << redis_key_name;
3367             boost::thread redis_store_thread(store_data_in_redis, redis_key_name, flow_attack_details);
3368             redis_store_thread.detach();
3369             logger << log4cpp::Priority::INFO << "Finish data save in redis in key: " << redis_key_name;
3370         }
3371     }
3372 #endif
3373 
3374 #ifdef MONGO
3375     if (mongodb_enabled) {
3376         std::string mongo_key_name = client_ip_as_string + "_information_" +
3377             print_time_t_in_fastnetmon_format(current_attack.ban_timestamp);
3378 
3379         // We could not use dot in key names: http://docs.mongodb.org/manual/core/document/#dot-notation
3380         std::replace(mongo_key_name.begin(), mongo_key_name.end(), '.', '_');
3381 
3382         logger << log4cpp::Priority::INFO << "Start data save in Mongo in key: " << mongo_key_name;
3383         boost::thread  mongo_store_thread(store_data_in_mongo, mongo_key_name, basic_attack_information_in_json);
3384         mongo_store_thread.detach();
3385         logger << log4cpp::Priority::INFO << "Finish data save in Mongo in key: " << mongo_key_name;
3386     }
3387 #endif
3388 }
3389 
3390 
3391 /* Thread for cleaning up ban list */
cleanup_ban_list()3392 void cleanup_ban_list() {
3393     // If we use very small ban time we should call ban_cleanup thread more often
3394     if (unban_iteration_sleep_time > global_ban_time) {
3395         unban_iteration_sleep_time = int(global_ban_time / 2);
3396 
3397         logger << log4cpp::Priority::INFO << "You are using enough small ban time " << global_ban_time
3398             << " we need reduce unban_iteration_sleep_time twices to " << unban_iteration_sleep_time << " seconds";
3399     }
3400 
3401     logger << log4cpp::Priority::INFO << "Run banlist cleanup thread, we will awake every " << unban_iteration_sleep_time << " seconds";
3402 
3403     while (true) {
3404         boost::this_thread::sleep(boost::posix_time::seconds(unban_iteration_sleep_time));
3405 
3406         time_t current_time;
3407         time(&current_time);
3408 
3409         std::vector<uint32_t> ban_list_items_for_erase;
3410 
3411         for (std::map<uint32_t, banlist_item>::iterator itr = ban_list.begin(); itr != ban_list.end(); ++itr) {
3412             uint32_t client_ip = itr->first;
3413 
3414             // This IP should be banned permanentely and we skip any processing
3415             if (!itr->second.unban_enabled) {
3416                 continue;
3417             }
3418 
3419             double time_difference = difftime(current_time, itr->second.ban_timestamp);
3420             int ban_time = itr->second.ban_time;
3421 
3422             // Yes, we reached end of ban time for this customer
3423             bool we_could_unban_this_ip = time_difference > ban_time;
3424 
3425             // We haven't reached time for unban yet
3426             if (!we_could_unban_this_ip) {
3427                 continue;
3428             }
3429 
3430             // Check about ongoing attack
3431             if (unban_only_if_attack_finished) {
3432                 std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
3433                 uint32_t subnet_in_host_byte_order = ntohl(itr->second.customer_network.first);
3434                 int64_t shift_in_vector = (int64_t)ntohl(client_ip) - (int64_t)subnet_in_host_byte_order;
3435 
3436                 // Try to find average speed element
3437                 map_of_vector_counters::iterator itr_average_speed =
3438                     SubnetVectorMapSpeedAverage.find(itr->second.customer_network);
3439 
3440                 if (itr_average_speed == SubnetVectorMapSpeedAverage.end()) {
3441                     logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map for unban function";
3442                     continue;
3443                 }
3444 
3445                 if (shift_in_vector < 0 or shift_in_vector >= itr_average_speed->second.size()) {
3446                     logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
3447                         << " which located outside allocated vector with size " << itr_average_speed->second.size();
3448 
3449                     continue;
3450                 }
3451 
3452                 map_element* average_speed_element = &itr_average_speed->second[shift_in_vector];
3453 
3454                 // We get ban settings from host subnet
3455                 std::string host_group_name;
3456                 ban_settings_t current_ban_settings = get_ban_settings_for_this_subnet(itr->second.customer_network, host_group_name);
3457 
3458                 if (we_should_ban_this_ip(average_speed_element, current_ban_settings)) {
3459                     logger << log4cpp::Priority::ERROR << "Attack to IP " << client_ip_as_string
3460                         << " still going! We should not unblock this host";
3461 
3462                     // Well, we still saw attack, skip to next iteration
3463                     continue;
3464                 }
3465             }
3466 
3467             // Add this IP to remove list
3468             // We will remove keyas really after this loop
3469             ban_list_items_for_erase.push_back(itr->first);
3470 
3471             // Call all hooks for unban
3472             call_unban_handlers(itr->first, itr->second);
3473         }
3474 
3475         // Remove all unbanned hosts from the ban list
3476         for (std::vector<uint32_t>::iterator itr = ban_list_items_for_erase.begin(); itr != ban_list_items_for_erase.end(); ++itr) {
3477             ban_list_mutex.lock();
3478             ban_list.erase(*itr);
3479             ban_list_mutex.unlock();
3480         }
3481     }
3482 }
3483 
call_unban_handlers(uint32_t client_ip,attack_details & current_attack)3484 void call_unban_handlers(uint32_t client_ip, attack_details& current_attack) {
3485     std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
3486 
3487     logger << log4cpp::Priority::INFO << "We will unban banned IP: " << client_ip_as_string
3488         << " because it ban time " << current_attack.ban_time << " seconds is ended";
3489 
3490     if (notify_script_enabled) {
3491         std::string data_direction_as_string = get_direction_name(current_attack.attack_direction);
3492         std::string pps_as_string = convert_int_to_string(current_attack.attack_power);
3493 
3494         std::string script_call_params = notify_script_path + " " + client_ip_as_string +
3495             " " + data_direction_as_string + " " +  pps_as_string + " unban";
3496 
3497         logger << log4cpp::Priority::INFO << "Call script for unban client: " << client_ip_as_string;
3498 
3499         // We should execute external script in separate thread because any lag in this
3500         // code will be very distructive
3501         boost::thread exec_thread(exec, script_call_params);
3502         exec_thread.detach();
3503 
3504         logger << log4cpp::Priority::INFO << "Script for unban client is finished: " << client_ip_as_string;
3505     }
3506 
3507     if (exabgp_enabled) {
3508         logger << log4cpp::Priority::INFO
3509                << "Call ExaBGP for unban client started: " << client_ip_as_string;
3510 
3511         boost::thread exabgp_thread(exabgp_ban_manage, "unban", client_ip_as_string, current_attack);
3512         exabgp_thread.detach();
3513 
3514         logger << log4cpp::Priority::INFO << "Call to ExaBGP for unban client is finished: " << client_ip_as_string;
3515     }
3516 
3517 #ifdef ENABLE_GOBGP
3518     if (gobgp_enabled) {
3519         logger << log4cpp::Priority::INFO << "Call GoBGP for unban client started: " << client_ip_as_string;
3520 
3521         boost::thread gobgp_thread(gobgp_ban_manage, "unban", client_ip_as_string, current_attack);
3522         gobgp_thread.detach();
3523 
3524         logger << log4cpp::Priority::INFO << "Call to GoBGP for unban client is finished: " << client_ip_as_string;
3525     }
3526 #endif
3527 }
3528 
print_ddos_attack_details()3529 std::string print_ddos_attack_details() {
3530     std::stringstream output_buffer;
3531 
3532     for (std::map<uint32_t, banlist_item>::iterator ii = ban_list.begin(); ii != ban_list.end(); ++ii) {
3533         uint32_t client_ip = (*ii).first;
3534 
3535         std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
3536         std::string max_pps_as_string = convert_int_to_string(((*ii).second).max_attack_power);
3537         std::string attack_direction = get_direction_name(((*ii).second).attack_direction);
3538 
3539         output_buffer << client_ip_as_string << "/" << max_pps_as_string << " pps "
3540                       << attack_direction << " at "
3541                       << print_time_t_in_fastnetmon_format(ii->second.ban_timestamp) << std::endl;
3542 
3543         send_attack_details(client_ip, (*ii).second);
3544     }
3545 
3546 
3547     return output_buffer.str();
3548 }
3549 
get_attack_description(uint32_t client_ip,attack_details & current_attack)3550 std::string get_attack_description(uint32_t client_ip, attack_details& current_attack) {
3551     std::stringstream attack_description;
3552 
3553     attack_description << "IP: " << convert_ip_as_uint_to_string(client_ip) << "\n";
3554     attack_description << serialize_attack_description(current_attack) << "\n";
3555 
3556     if (enable_subnet_counters) {
3557         // Got subnet tracking structure
3558         // TODO: we suppose case "no key exists" is not possible
3559         map_element network_speed_meter = PerSubnetSpeedMap[ current_attack.customer_network ];
3560         map_element average_network_speed_meter = PerSubnetAverageSpeedMap[ current_attack.customer_network ];
3561 
3562         attack_description <<"Network: " << convert_subnet_to_string(current_attack.customer_network) << "\n";
3563 
3564         attack_description << serialize_network_load_to_text(network_speed_meter, false);
3565         attack_description << serialize_network_load_to_text(average_network_speed_meter, true);
3566     }
3567 
3568     attack_description << serialize_statistic_counters_about_attack(current_attack);
3569 
3570     return attack_description.str();
3571 }
3572 
get_attack_description_in_json(uint32_t client_ip,attack_details & current_attack)3573 std::string get_attack_description_in_json(uint32_t client_ip, attack_details& current_attack) {
3574     json_object* jobj = json_object_new_object();
3575 
3576     json_object_object_add(jobj, "ip", json_object_new_string(convert_ip_as_uint_to_string(client_ip).c_str()));
3577     json_object_object_add(jobj, "attack_details", serialize_attack_description_to_json(current_attack) );
3578 
3579     if (enable_subnet_counters) {
3580         map_element network_speed_meter = PerSubnetSpeedMap[ current_attack.customer_network ];
3581         map_element average_network_speed_meter = PerSubnetAverageSpeedMap[ current_attack.customer_network ];
3582 
3583         json_object_object_add(jobj, "network_load", serialize_network_load_to_json(network_speed_meter));
3584         json_object_object_add(jobj, "network_average_load", serialize_network_load_to_json(average_network_speed_meter));
3585     }
3586 
3587     // So we haven't statistic_counters here but from my point of view they are useless
3588 
3589     std::string json_as_text = json_object_to_json_string(jobj);
3590 
3591     // Free memory
3592     json_object_put(jobj);
3593 
3594     return json_as_text;
3595 }
3596 
generate_simple_packets_dump(std::vector<simple_packet> & ban_list_details)3597 std::string generate_simple_packets_dump(std::vector<simple_packet>& ban_list_details) {
3598     std::stringstream attack_details;
3599 
3600     std::map<unsigned int, unsigned int> protocol_counter;
3601     for (std::vector<simple_packet>::iterator iii = ban_list_details.begin(); iii != ban_list_details.end(); ++iii) {
3602             attack_details << print_simple_packet(*iii);
3603 
3604         protocol_counter[iii->protocol]++;
3605     }
3606 
3607     std::map<unsigned int, unsigned int>::iterator max_proto =
3608     std::max_element(protocol_counter.begin(), protocol_counter.end(), protocol_counter.value_comp());
3609     /*
3610     attack_details
3611         << "\n"
3612         << "We got more packets (" << max_proto->second << " from " << ban_details_records_count
3613         << ") for protocol: " << get_protocol_name_by_number(max_proto->first) << "\n";
3614     */
3615 
3616     return attack_details.str();
3617 }
3618 
send_attack_details(uint32_t client_ip,attack_details current_attack_details)3619 void send_attack_details(uint32_t client_ip, attack_details current_attack_details) {
3620     std::string pps_as_string = convert_int_to_string(current_attack_details.attack_power);
3621     std::string attack_direction = get_direction_name(current_attack_details.attack_direction);
3622     std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
3623 
3624     // Very strange code but it work in 95% cases
3625     if (ban_list_details.count(client_ip) > 0 && ban_list_details[client_ip].size() >= ban_details_records_count) {
3626         std::stringstream attack_details;
3627 
3628         attack_details << get_attack_description(client_ip, current_attack_details) << "\n\n";
3629         attack_details << generate_simple_packets_dump(ban_list_details[client_ip]);
3630 
3631         logger << log4cpp::Priority::INFO << "Attack with direction: " << attack_direction
3632                << " IP: " << client_ip_as_string << " Power: " << pps_as_string
3633                << " traffic samples collected";
3634 
3635         call_attack_details_handlers(client_ip, current_attack_details, attack_details.str());
3636 
3637         // TODO: here we have definitely RACE CONDITION!!! FIX IT
3638 
3639         // Remove key and prevent collection new data about this attack
3640         ban_list_details_mutex.lock();
3641         ban_list_details.erase(client_ip);
3642         ban_list_details_mutex.unlock();
3643     }
3644 }
3645 
3646 #ifdef ENABLE_DPI
3647 // Parse raw binary stand-alone packet with nDPI
dpi_parse_packet(char * buffer,uint32_t len,uint32_t snap_len,struct ndpi_id_struct * src,struct ndpi_id_struct * dst,struct ndpi_flow_struct * flow,std::string & parsed_packet_as_string)3648 ndpi_protocol dpi_parse_packet(char* buffer, uint32_t len, uint32_t snap_len, struct ndpi_id_struct *src, struct ndpi_id_struct *dst, struct ndpi_flow_struct *flow, std::string& parsed_packet_as_string) {
3649     struct pfring_pkthdr packet_header;
3650     memset(&packet_header, 0, sizeof(packet_header));
3651     packet_header.len = len;
3652     packet_header.caplen = snap_len;
3653 
3654     fastnetmon_parse_pkt((u_char*)buffer, &packet_header, 4, 1, 0);
3655 
3656     uint32_t current_tickt = 0;
3657     uint8_t* iph = (uint8_t*)(&buffer[packet_header.extended_hdr.parsed_pkt.offset.l3_offset]);
3658     unsigned int ipsize = packet_header.len;
3659 
3660     ndpi_protocol detected_protocol = ndpi_detection_process_packet(my_ndpi_struct, flow, iph, ipsize, current_tickt, src, dst);
3661 
3662     // So bad approach :(
3663     char print_buffer[512];
3664     fastnetmon_print_parsed_pkt(print_buffer, 512, (u_char*)buffer, &packet_header);
3665 
3666     parsed_packet_as_string = std::string(print_buffer);
3667 
3668     return detected_protocol;
3669 }
3670 #endif
3671 
3672 #ifdef ENABLE_DPI
init_current_instance_of_ndpi()3673 void init_current_instance_of_ndpi() {
3674     my_ndpi_struct = init_ndpi();
3675 
3676     if (my_ndpi_struct == NULL) {
3677         logger << log4cpp::Priority::ERROR << "Can't load nDPI, disable it!";
3678         process_pcap_attack_dumps_with_dpi = false;
3679 
3680         return;
3681     }
3682 
3683     // Load sizes of main parsing structures
3684     ndpi_size_id_struct   = ndpi_detection_get_sizeof_ndpi_id_struct();
3685     ndpi_size_flow_struct = ndpi_detection_get_sizeof_ndpi_flow_struct();
3686 }
3687 
3688 // Zeroify nDPI structure without memory leaks
zeroify_ndpi_flow(struct ndpi_flow_struct * flow)3689 void zeroify_ndpi_flow(struct ndpi_flow_struct* flow) {
3690     if (flow->http.url) {
3691         ndpi_free(flow->http.url);
3692     }
3693 
3694     if (flow->http.content_type) {
3695         ndpi_free(flow->http.content_type);
3696     }
3697 
3698     memset(flow, 0, ndpi_size_flow_struct);
3699 }
3700 
3701 // Run flow spec mitigation rule
launch_bgp_flow_spec_rule(amplification_attack_type_t attack_type,std::string client_ip_as_string)3702 void launch_bgp_flow_spec_rule(amplification_attack_type_t attack_type, std::string client_ip_as_string) {
3703     logger << log4cpp::Priority::INFO << "We detected this attack as: " << get_amplification_attack_type(attack_type);
3704 
3705     std::string flow_spec_rule_text = generate_flow_spec_for_amplification_attack(attack_type, client_ip_as_string);
3706 
3707     logger << log4cpp::Priority::INFO << "We have generated BGP Flow Spec rule for this attack: " << flow_spec_rule_text;
3708 
3709     if (exabgp_flow_spec_announces) {
3710         active_flow_spec_announces_t::iterator itr = active_flow_spec_announces.find(flow_spec_rule_text);
3711 
3712         if (itr == active_flow_spec_announces.end()) {
3713             // We havent this flow spec rule active yet
3714 
3715             logger << log4cpp::Priority::INFO << "We will publish flow spec announce about this attack";
3716             bool exabgp_publish_result = exabgp_flow_spec_ban_manage("ban", flow_spec_rule_text);
3717 
3718             if (exabgp_publish_result) {
3719                 active_flow_spec_announces[ flow_spec_rule_text ] = 1;
3720             }
3721         } else {
3722             // We have already blocked this attack
3723             logger << log4cpp::Priority::INFO << "The same rule was already sent to ExaBGP formerly";
3724         }
3725     } else {
3726           logger << log4cpp::Priority::INFO << "exabgp_flow_spec_announces disabled. We will not talk to ExaBGP";
3727     }
3728 }
3729 
3730 // Not so pretty copy and paste from pcap_reader()
3731 // TODO: rewrite to memory parser
produce_dpi_dump_for_pcap_dump(std::string pcap_file_path,std::stringstream & ss,std::string client_ip_as_string)3732 void produce_dpi_dump_for_pcap_dump(std::string pcap_file_path, std::stringstream& ss, std::string client_ip_as_string) {
3733     int filedesc = open(pcap_file_path.c_str(), O_RDONLY);
3734 
3735     if (filedesc <= 0) {
3736         logger << log4cpp::Priority::ERROR << "Can't open file for DPI";
3737         return;
3738     }
3739 
3740     struct fastnetmon_pcap_file_header pcap_header;
3741     ssize_t file_header_readed_bytes = read(filedesc, &pcap_header, sizeof(struct fastnetmon_pcap_file_header));
3742 
3743     if (file_header_readed_bytes != sizeof(struct fastnetmon_pcap_file_header)) {
3744         logger << log4cpp::Priority::ERROR << "Can't read pcap file header";
3745         return;
3746     }
3747 
3748     // http://www.tcpdump.org/manpages/pcap-savefile.5.html
3749     if (pcap_header.magic == 0xa1b2c3d4 or pcap_header.magic == 0xd4c3b2a1) {
3750         // printf("Magic readed correctly\n");
3751     } else {
3752         logger << log4cpp::Priority::ERROR << "Magic in file header broken";
3753         return;
3754     }
3755 
3756     // Buffer for packets
3757     char packet_buffer[pcap_header.snaplen];
3758 
3759     unsigned int total_packets_number = 0;
3760 
3761     uint64_t dns_amplification_packets = 0;
3762     uint64_t ntp_amplification_packets = 0;
3763     uint64_t ssdp_amplification_packets = 0;
3764     uint64_t snmp_amplification_packets = 0;
3765 
3766 
3767     struct ndpi_id_struct *src = (struct ndpi_id_struct*)malloc(ndpi_size_id_struct);
3768     memset(src, 0, ndpi_size_id_struct);
3769 
3770     struct ndpi_id_struct* dst = (struct ndpi_id_struct*)malloc(ndpi_size_id_struct);
3771     memset(dst, 0, ndpi_size_id_struct);
3772 
3773     struct ndpi_flow_struct* flow = (struct ndpi_flow_struct*)malloc(ndpi_size_flow_struct);
3774     memset(flow, 0, ndpi_size_flow_struct);
3775 
3776     while (1) {
3777         struct fastnetmon_pcap_pkthdr pcap_packet_header;
3778         ssize_t packet_header_readed_bytes =
3779         read(filedesc, &pcap_packet_header, sizeof(struct fastnetmon_pcap_pkthdr));
3780 
3781         if (packet_header_readed_bytes != sizeof(struct fastnetmon_pcap_pkthdr)) {
3782             if (packet_header_readed_bytes != 0) {
3783                 logger << log4cpp::Priority::INFO << "All packet read ? (" << packet_header_readed_bytes << ", " << errno << ")";
3784             }
3785             // We haven't any packets
3786             break;
3787         }
3788 
3789         if (pcap_packet_header.incl_len > pcap_header.snaplen) {
3790             logger << log4cpp::Priority::ERROR << "Please enlarge packet buffer for DPI";
3791             return;
3792         }
3793 
3794         ssize_t packet_payload_readed_bytes = read(filedesc, packet_buffer, pcap_packet_header.incl_len);
3795 
3796         if (pcap_packet_header.incl_len != packet_payload_readed_bytes) {
3797             logger << log4cpp::Priority::ERROR << "I read packet header but can't read packet payload";
3798             return;
3799         }
3800 
3801         // The flow must be reset to zero state - in other case the DPI will not detect all packets properly.
3802         // To use flow properly there must be much more complicated code (with flow buffer for each flow probably)
3803         // following code is copied from ndpi_free_flow() just to be sure there will be no memory leaks due to memset()
3804         zeroify_ndpi_flow(flow);
3805 
3806         std::string parsed_packet_as_string;
3807 
3808         ndpi_protocol detected_protocol = dpi_parse_packet(packet_buffer, pcap_packet_header.orig_len, pcap_packet_header.incl_len, src, dst, flow, parsed_packet_as_string);
3809 
3810 #if NDPI_MAJOR >= 2
3811         u_int16_t app_protocol = detected_protocol.app_protocol;
3812 #else
3813         u_int16_t app_protocol = detected_protocol.protocol;
3814 #endif
3815         char* protocol_name = ndpi_get_proto_name(my_ndpi_struct, app_protocol);
3816         char* master_protocol_name = ndpi_get_proto_name(my_ndpi_struct, detected_protocol.master_protocol);
3817 
3818         if (app_protocol == NDPI_PROTOCOL_DNS) {
3819             // It's answer for ANY request with so much
3820             if (flow->protos.dns.query_type == 255 && flow->protos.dns.num_queries < flow->protos.dns.num_answers) {
3821                 dns_amplification_packets++;
3822             }
3823 
3824         } else if (app_protocol == NDPI_PROTOCOL_NTP) {
3825             // Detect packets with type MON_GETLIST_1
3826             if (flow->protos.ntp.version == 2 && flow->protos.ntp.request_code == 42) {
3827                 ntp_amplification_packets++;
3828             }
3829         } else if (app_protocol == NDPI_PROTOCOL_SSDP) {
3830             // So, this protocol completely unexpected in WAN networks
3831             ssdp_amplification_packets++;
3832         } else if (app_protocol == NDPI_PROTOCOL_SNMP) {
3833             // TODO: we need detailed tests for SNMP!
3834             snmp_amplification_packets++;
3835         }
3836 
3837         ss << parsed_packet_as_string << " protocol: " << protocol_name << " master_protocol: " << master_protocol_name << "\n";
3838 
3839         total_packets_number++;
3840     }
3841 
3842     // Free up all memory
3843     ndpi_free_flow(flow);
3844     free(dst);
3845     free(src);
3846 
3847     close(filedesc);
3848 
3849     logger << log4cpp::Priority::INFO
3850            << "DPI pkt stats: total:"  << total_packets_number
3851                            << " DNS:"  << dns_amplification_packets
3852                            << " NTP:"  << ntp_amplification_packets
3853                            << " SSDP:" << ssdp_amplification_packets
3854                            << " SNMP:" << snmp_amplification_packets;
3855 
3856     amplification_attack_type_t attack_type;
3857 
3858     // Attack type in unknown by default
3859     attack_type = AMPLIFICATION_ATTACK_UNKNOWN;
3860 
3861     // Detect amplification attack type
3862     if ( (double)dns_amplification_packets / (double)total_packets_number > 0.2) {
3863         launch_bgp_flow_spec_rule(AMPLIFICATION_ATTACK_DNS, client_ip_as_string);
3864     } else if ( (double)ntp_amplification_packets / (double)total_packets_number > 0.2) {
3865         launch_bgp_flow_spec_rule(AMPLIFICATION_ATTACK_NTP, client_ip_as_string);
3866     } else if ( (double)ssdp_amplification_packets / (double)total_packets_number > 0.2) {
3867         launch_bgp_flow_spec_rule(AMPLIFICATION_ATTACK_SSDP, client_ip_as_string);
3868     } else if ( (double)snmp_amplification_packets / (double)total_packets_number > 0.2) {
3869         launch_bgp_flow_spec_rule(AMPLIFICATION_ATTACK_SNMP, client_ip_as_string);
3870     } else {
3871         /*TODO
3872             - full IP ban should be announced here !
3873             - and maybe some protocol/port based statistics could be used to filter new/unknown attacks...
3874         */
3875 
3876         logger << log4cpp::Priority::ERROR << "We can't detect attack type with DPI. It's not so critical, only for your information";
3877     }
3878 }
3879 
3880 #endif
3881 
call_attack_details_handlers(uint32_t client_ip,attack_details & current_attack,std::string attack_fingerprint)3882 void call_attack_details_handlers(uint32_t client_ip, attack_details& current_attack, std::string attack_fingerprint) {
3883     std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
3884     std::string attack_direction = get_direction_name(current_attack.attack_direction);
3885     std::string pps_as_string = convert_int_to_string(current_attack.attack_power);
3886 
3887     // We place this variables here because we need this paths from DPI parser code
3888     std::string ban_timestamp_as_string = print_time_t_in_fastnetmon_format(current_attack.ban_timestamp);
3889     std::string attack_pcap_dump_path = attack_details_folder + "/" + client_ip_as_string + "_" + ban_timestamp_as_string + ".pcap";
3890 
3891     if (collect_attack_pcap_dumps) {
3892         int pcap_fump_filedesc = open(attack_pcap_dump_path.c_str(), O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR);
3893         if (pcap_fump_filedesc <= 0) {
3894             logger << log4cpp::Priority::ERROR << "Can't open file for storing pcap dump: " << attack_pcap_dump_path;
3895         } else {
3896             ssize_t wrote_bytes = write(pcap_fump_filedesc,
3897                 (void*)current_attack.pcap_attack_dump.get_buffer_pointer(),
3898                 current_attack.pcap_attack_dump.get_used_memory());
3899 
3900             if (wrote_bytes != current_attack.pcap_attack_dump.get_used_memory()) {
3901                  logger << log4cpp::Priority::ERROR << "Can't wrote all attack details to the disk correctly";
3902             }
3903 
3904             close (pcap_fump_filedesc);
3905 
3906             // Freeup memory
3907             current_attack.pcap_attack_dump.deallocate_buffer();
3908         }
3909     }
3910 
3911 #ifdef ENABLE_DPI
3912     // Yes, will be fine to read packets from the memory but we haven't this code yet
3913     // Thus we could read from file with not good performance because it's simpler
3914     if (collect_attack_pcap_dumps && process_pcap_attack_dumps_with_dpi) {
3915         std::stringstream string_buffer_for_dpi_data;
3916 
3917         string_buffer_for_dpi_data << "\n\nDPI\n\n";
3918 
3919         produce_dpi_dump_for_pcap_dump(attack_pcap_dump_path, string_buffer_for_dpi_data, client_ip_as_string);
3920 
3921         attack_fingerprint = attack_fingerprint + string_buffer_for_dpi_data.str();
3922     }
3923 #endif
3924 
3925     print_attack_details_to_file(attack_fingerprint, client_ip_as_string, current_attack);
3926 
3927     // Pass attack details to script
3928     if (notify_script_enabled) {
3929             logger << log4cpp::Priority::INFO
3930                    << "Call script for notify about attack details for: " << client_ip_as_string;
3931 
3932             std::string script_params = notify_script_path + " " + client_ip_as_string + " " +
3933                                         attack_direction + " " + pps_as_string + " attack_details";
3934 
3935             // We should execute external script in separate thread because any lag in this code
3936             // will be very distructive
3937             boost::thread exec_with_params_thread(exec_with_stdin_params, script_params, attack_fingerprint);
3938             exec_with_params_thread.detach();
3939 
3940             logger << log4cpp::Priority::INFO
3941                    << "Script for notify about attack details is finished: " << client_ip_as_string;
3942         }
3943 
3944 #ifdef REDIS
3945         if (redis_enabled) {
3946             std::string redis_key_name = client_ip_as_string + "_packets_dump";
3947 
3948             if (!redis_prefix.empty()) {
3949                 redis_key_name = redis_prefix + "_" + client_ip_as_string + "_packets_dump";
3950             }
3951 
3952             logger << log4cpp::Priority::INFO << "Start data save in redis for key: " << redis_key_name;
3953             boost::thread redis_store_thread(store_data_in_redis, redis_key_name, attack_fingerprint);
3954             redis_store_thread.detach();
3955             logger << log4cpp::Priority::INFO << "Finish data save in redis for key: " << redis_key_name;
3956         }
3957 #endif
3958 }
3959 
convert_conntrack_hash_struct_to_integer(packed_conntrack_hash * struct_value)3960 uint64_t convert_conntrack_hash_struct_to_integer(packed_conntrack_hash* struct_value) {
3961     uint64_t unpacked_data = 0;
3962     memcpy(&unpacked_data, struct_value, sizeof(uint64_t));
3963     return unpacked_data;
3964 }
3965 
convert_integer_to_conntrack_hash_struct(packed_session * packed_connection_data,packed_conntrack_hash * unpacked_data)3966 void convert_integer_to_conntrack_hash_struct(packed_session* packed_connection_data,
3967                                               packed_conntrack_hash* unpacked_data) {
3968     memcpy(unpacked_data, packed_connection_data, sizeof(uint64_t));
3969 }
3970 
print_flow_tracking_for_specified_protocol(contrack_map_type & protocol_map,std::string client_ip,direction flow_direction)3971 std::string print_flow_tracking_for_specified_protocol(contrack_map_type& protocol_map,
3972                                                        std::string client_ip,
3973                                                        direction flow_direction) {
3974     std::stringstream buffer;
3975     // We shoud iterate over all fields
3976 
3977     int printed_records = 0;
3978     for (contrack_map_type::iterator itr = protocol_map.begin(); itr != protocol_map.end(); ++itr) {
3979         // We should limit number of records in flow dump because syn flood attacks produce
3980         // thounsands of lines
3981         if (printed_records > ban_details_records_count) {
3982             buffer << "Flows have cropped due to very long list.\n";
3983             break;
3984         }
3985 
3986         uint64_t packed_connection_data = itr->first;
3987         packed_conntrack_hash unpacked_key_struct;
3988         convert_integer_to_conntrack_hash_struct(&packed_connection_data, &unpacked_key_struct);
3989 
3990         std::string opposite_ip_as_string = convert_ip_as_uint_to_string(unpacked_key_struct.opposite_ip);
3991         if (flow_direction == INCOMING) {
3992             buffer << client_ip << ":" << unpacked_key_struct.dst_port << " < "
3993                    << opposite_ip_as_string << ":" << unpacked_key_struct.src_port << " ";
3994         } else if (flow_direction == OUTGOING) {
3995             buffer << client_ip << ":" << unpacked_key_struct.src_port << " > "
3996                    << opposite_ip_as_string << ":" << unpacked_key_struct.dst_port << " ";
3997         }
3998 
3999         buffer << itr->second.bytes << " bytes " << itr->second.packets << " packets";
4000         buffer << "\n";
4001 
4002         printed_records++;
4003     }
4004 
4005     return buffer.str();
4006 }
4007 
4008 /*
4009     Attack types:
4010         - syn flood: one local port, multiple remote hosts (and maybe multiple remote ports) and
4011    small packet size
4012 */
4013 
4014 /* Iterate over all flow tracking table */
process_flow_tracking_table(conntrack_main_struct & conntrack_element,std::string client_ip)4015 bool process_flow_tracking_table(conntrack_main_struct& conntrack_element, std::string client_ip) {
4016     std::map<uint32_t, unsigned int> uniq_remote_hosts_which_generate_requests_to_us;
4017     std::map<unsigned int, unsigned int> uniq_local_ports_which_target_of_connectiuons_from_inside;
4018 
4019     /* Process incoming TCP connections */
4020     for (contrack_map_type::iterator itr = conntrack_element.in_tcp.begin();
4021          itr != conntrack_element.in_tcp.end(); ++itr) {
4022         uint64_t packed_connection_data = itr->first;
4023         packed_conntrack_hash unpacked_key_struct;
4024         convert_integer_to_conntrack_hash_struct(&packed_connection_data, &unpacked_key_struct);
4025 
4026         uniq_remote_hosts_which_generate_requests_to_us[unpacked_key_struct.opposite_ip]++;
4027         uniq_local_ports_which_target_of_connectiuons_from_inside[unpacked_key_struct.dst_port]++;
4028 
4029         // we can calc average packet size
4030         // string opposite_ip_as_string =
4031         // convert_ip_as_uint_to_string(unpacked_key_struct.opposite_ip);
4032         // unpacked_key_struct.src_port
4033         // unpacked_key_struct.dst_port
4034         // itr->second.packets
4035         // itr->second.bytes
4036     }
4037 
4038     return true;
4039 }
4040 
print_flow_tracking_for_ip(conntrack_main_struct & conntrack_element,std::string client_ip)4041 std::string print_flow_tracking_for_ip(conntrack_main_struct& conntrack_element, std::string client_ip) {
4042     std::stringstream buffer;
4043 
4044     std::string in_tcp =
4045     print_flow_tracking_for_specified_protocol(conntrack_element.in_tcp, client_ip, INCOMING);
4046     std::string in_udp =
4047     print_flow_tracking_for_specified_protocol(conntrack_element.in_udp, client_ip, INCOMING);
4048 
4049     unsigned long long total_number_of_incoming_tcp_flows = conntrack_element.in_tcp.size();
4050     unsigned long long total_number_of_incoming_udp_flows = conntrack_element.in_udp.size();
4051 
4052     unsigned long long total_number_of_outgoing_tcp_flows = conntrack_element.out_tcp.size();
4053     unsigned long long total_number_of_outgoing_udp_flows = conntrack_element.out_udp.size();
4054 
4055     bool we_have_incoming_flows = in_tcp.length() > 0 or in_udp.length() > 0;
4056     if (we_have_incoming_flows) {
4057         buffer << "Incoming\n\n";
4058 
4059         if (in_tcp.length() > 0) {
4060             buffer << "TCP flows: " << total_number_of_incoming_tcp_flows << "\n";
4061             buffer << in_tcp << "\n";
4062         }
4063 
4064         if (in_udp.length() > 0) {
4065             buffer << "UDP flows: " << total_number_of_incoming_udp_flows << "\n";
4066             buffer << in_udp << "\n";
4067         }
4068     }
4069 
4070     std::string out_tcp =
4071     print_flow_tracking_for_specified_protocol(conntrack_element.out_tcp, client_ip, OUTGOING);
4072     std::string out_udp =
4073     print_flow_tracking_for_specified_protocol(conntrack_element.out_udp, client_ip, OUTGOING);
4074 
4075     bool we_have_outgoing_flows = out_tcp.length() > 0 or out_udp.length() > 0;
4076 
4077     // print delimiter if we have flows in both directions
4078     if (we_have_incoming_flows && we_have_outgoing_flows) {
4079         buffer << "\n";
4080     }
4081 
4082     if (we_have_outgoing_flows) {
4083         buffer << "Outgoing\n\n";
4084 
4085         if (out_tcp.length() > 0) {
4086             buffer << "TCP flows: " << total_number_of_outgoing_tcp_flows << "\n";
4087             buffer << out_tcp << "\n";
4088         }
4089 
4090         if (out_udp.length() > 0) {
4091             buffer << "UDP flows: " << total_number_of_outgoing_udp_flows << "\n";
4092             buffer << out_udp << "\n";
4093         }
4094     }
4095 
4096     return buffer.str();
4097 }
4098 
print_subnet_load()4099 std::string print_subnet_load() {
4100     std::stringstream buffer;
4101 
4102     sort_type sorter;
4103     if (sort_parameter == "packets") {
4104         sorter = PACKETS;
4105     } else if (sort_parameter == "bytes") {
4106         sorter = BYTES;
4107     } else if (sort_parameter == "flows") {
4108         sorter = FLOWS;
4109     } else {
4110         logger << log4cpp::Priority::INFO << "Unexpected sorter type: " << sort_parameter;
4111         sorter = PACKETS;
4112     }
4113 
4114     std::vector<pair_of_map_for_subnet_counters_elements_t> vector_for_sort;
4115     vector_for_sort.reserve(PerSubnetSpeedMap.size());
4116 
4117     for (map_for_subnet_counters::iterator itr = PerSubnetSpeedMap.begin(); itr != PerSubnetSpeedMap.end(); ++itr) {
4118         vector_for_sort.push_back(std::make_pair(itr->first, itr->second));
4119     }
4120 
4121     std::sort(vector_for_sort.begin(), vector_for_sort.end(),
4122         TrafficComparatorClass<pair_of_map_for_subnet_counters_elements_t>(INCOMING, sorter));
4123 
4124     graphite_data_t graphite_data;
4125 
4126     for (std::vector<pair_of_map_for_subnet_counters_elements_t>::iterator itr = vector_for_sort.begin(); itr != vector_for_sort.end(); ++itr) {
4127         map_element* speed = &itr->second;
4128         std::string subnet_as_string = convert_subnet_to_string(itr->first);
4129 
4130         buffer
4131             << std::setw(18)
4132             << std::left
4133             << subnet_as_string;
4134 
4135         if (graphite_enabled) {
4136             std::string subnet_as_string_as_dash_delimiters = subnet_as_string;
4137 
4138             // Replace dots by dashes
4139             std::replace(subnet_as_string_as_dash_delimiters.begin(),
4140                 subnet_as_string_as_dash_delimiters.end(), '.', '_');
4141 
4142             // Replace / by dashes too
4143             std::replace(subnet_as_string_as_dash_delimiters.begin(),
4144                 subnet_as_string_as_dash_delimiters.end(), '/', '_');
4145 
4146             graphite_data[ graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".incoming.pps" ] = speed->in_packets;
4147             graphite_data[ graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".outgoing.pps" ] = speed->out_packets;
4148 
4149             graphite_data[ graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".incoming.bps" ] = speed->in_bytes * 8;
4150             graphite_data[ graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".outgoing.bps" ] = speed->out_bytes * 8;
4151         }
4152 
4153         buffer
4154             << " "
4155             << "pps in: "   << std::setw(8) << speed->in_packets
4156             << " out: "     << std::setw(8) << speed->out_packets
4157             << " mbps in: " << std::setw(5) << convert_speed_to_mbps(speed->in_bytes)
4158             << " out: "     << std::setw(5) << convert_speed_to_mbps(speed->out_bytes)
4159             << "\n";
4160     }
4161 
4162     if (graphite_enabled) {
4163         bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
4164 
4165         if (!graphite_put_result) {
4166             logger << log4cpp::Priority::ERROR << "Can't store network load data to Graphite";
4167         }
4168     }
4169 
4170     return buffer.str();
4171 }
4172 
print_ban_thresholds(ban_settings_t current_ban_settings)4173 std::string print_ban_thresholds(ban_settings_t current_ban_settings) {
4174     std::stringstream output_buffer;
4175 
4176     output_buffer << "Configuration params:\n";
4177     if (current_ban_settings.enable_ban) {
4178         output_buffer << "We call ban script: yes\n";
4179     } else {
4180         output_buffer << "We call ban script: no\n";
4181     }
4182 
4183     output_buffer << "Packets per second: ";
4184     if (current_ban_settings.enable_ban_for_pps) {
4185         output_buffer << current_ban_settings.ban_threshold_pps;
4186     } else {
4187         output_buffer << "disabled";
4188     }
4189 
4190     output_buffer << "\n";
4191 
4192     output_buffer << "Mbps per second: ";
4193     if (current_ban_settings.enable_ban_for_bandwidth) {
4194         output_buffer << current_ban_settings.ban_threshold_mbps;
4195     } else {
4196         output_buffer << "disabled";
4197     }
4198 
4199     output_buffer << "\n";
4200 
4201     output_buffer << "Flows per second: ";
4202     if (current_ban_settings.enable_ban_for_flows_per_second) {
4203         output_buffer << current_ban_settings.ban_threshold_flows;
4204     } else {
4205         output_buffer << "disabled";
4206     }
4207 
4208     output_buffer << "\n";
4209     return output_buffer.str();
4210 }
4211 
print_attack_details_to_file(std::string details,std::string client_ip_as_string,attack_details current_attack)4212 void print_attack_details_to_file(std::string details, std::string client_ip_as_string, attack_details current_attack) {
4213     std::ofstream my_attack_details_file;
4214 
4215     std::string ban_timestamp_as_string = print_time_t_in_fastnetmon_format(current_attack.ban_timestamp);
4216     std::string attack_dump_path = attack_details_folder + "/" + client_ip_as_string + "_" + ban_timestamp_as_string + ".txt";
4217 
4218     my_attack_details_file.open(attack_dump_path.c_str(), std::ios::app);
4219 
4220     if (my_attack_details_file.is_open()) {
4221         my_attack_details_file << details << "\n\n";
4222         my_attack_details_file.close();
4223     } else {
4224         logger << log4cpp::Priority::ERROR << "Can't print attack details to file";
4225     }
4226 }
4227 
read_logging_settings(configuration_map_t configuration_map)4228 logging_configuration_t read_logging_settings(configuration_map_t configuration_map) {
4229     logging_configuration_t logging_configuration_temp;
4230 
4231     if (configuration_map.count("logging:local_syslog_logging") != 0) {
4232         logging_configuration_temp.local_syslog_logging = configuration_map["logging:local_syslog_logging"] == "on";
4233     }
4234 
4235     if (configuration_map.count("logging:remote_syslog_logging") != 0) {
4236         logging_configuration_temp.remote_syslog_logging = configuration_map["logging:remote_syslog_logging"] == "on";
4237     }
4238 
4239     if (configuration_map.count("logging:remote_syslog_server") != 0) {
4240         logging_configuration_temp.remote_syslog_server = configuration_map["logging:remote_syslog_server"];
4241     }
4242 
4243     if (configuration_map.count("logging:remote_syslog_port") != 0) {
4244         logging_configuration_temp.remote_syslog_port = convert_string_to_integer(configuration_map["logging:remote_syslog_port"]);
4245     }
4246 
4247     if (logging_configuration_temp.remote_syslog_logging) {
4248         if (logging_configuration_temp.remote_syslog_port > 0 && !logging_configuration_temp.remote_syslog_server.empty()) {
4249             logger << log4cpp::Priority::INFO << "We have configured remote syslog logging corectly";
4250         } else {
4251             logger << log4cpp::Priority::ERROR << "You have enabled remote logging but haven't specified port or host";
4252             logging_configuration_temp.remote_syslog_logging = false;
4253         }
4254     }
4255 
4256     if (logging_configuration_temp.local_syslog_logging) {
4257         logger << log4cpp::Priority::INFO << "We have configured local syslog logging corectly";
4258     }
4259 
4260     return logging_configuration_temp;
4261 }
4262 
read_ban_settings(configuration_map_t configuration_map,std::string host_group_name)4263 ban_settings_t read_ban_settings(configuration_map_t configuration_map, std::string host_group_name) {
4264     ban_settings_t ban_settings;
4265 
4266     std::string prefix = "";
4267     if (host_group_name != "") {
4268         prefix = host_group_name + "_";
4269     }
4270 
4271     if (configuration_map.count(prefix + "enable_ban") != 0) {
4272         ban_settings.enable_ban = configuration_map[prefix + "enable_ban"] == "on";
4273     }
4274 
4275     if (configuration_map.count(prefix + "ban_for_pps") != 0) {
4276         ban_settings.enable_ban_for_pps = configuration_map[prefix + "ban_for_pps"] == "on";
4277     }
4278 
4279     if (configuration_map.count(prefix + "ban_for_bandwidth") != 0) {
4280         ban_settings.enable_ban_for_bandwidth = configuration_map[prefix + "ban_for_bandwidth"] == "on";
4281     }
4282 
4283     if (configuration_map.count(prefix + "ban_for_flows") != 0) {
4284         ban_settings.enable_ban_for_flows_per_second = configuration_map[prefix + "ban_for_flows"] == "on";
4285     }
4286 
4287     // Per protocol bandwidth triggers
4288     if (configuration_map.count(prefix + "ban_for_tcp_bandwidth") != 0) {
4289         ban_settings.enable_ban_for_tcp_bandwidth = configuration_map[prefix + "ban_for_tcp_bandwidth"] == "on";
4290     }
4291 
4292     if (configuration_map.count(prefix + "ban_for_udp_bandwidth") != 0) {
4293         ban_settings.enable_ban_for_udp_bandwidth = configuration_map[prefix + "ban_for_udp_bandwidth"] == "on";
4294     }
4295 
4296     if (configuration_map.count(prefix + "ban_for_icmp_bandwidth") != 0) {
4297         ban_settings.enable_ban_for_icmp_bandwidth = configuration_map[prefix + "ban_for_icmp_bandwidth"] == "on";
4298     }
4299 
4300     // Per protocol pps ban triggers
4301     if (configuration_map.count(prefix + "ban_for_tcp_pps") != 0) {
4302         ban_settings.enable_ban_for_tcp_pps = configuration_map[prefix + "ban_for_tcp_pps"] == "on";
4303     }
4304 
4305     if (configuration_map.count(prefix + "ban_for_udp_pps") != 0) {
4306         ban_settings.enable_ban_for_udp_pps = configuration_map[prefix + "ban_for_udp_pps"] == "on";
4307     }
4308 
4309     if (configuration_map.count(prefix + "ban_for_icmp_pps") != 0) {
4310         ban_settings.enable_ban_for_icmp_pps = configuration_map[prefix + "ban_for_icmp_pps"] == "on";
4311     }
4312 
4313     // Pps per protocol thresholds
4314     if (configuration_map.count(prefix + "threshold_tcp_pps") != 0) {
4315         ban_settings.ban_threshold_tcp_pps = convert_string_to_integer(configuration_map[prefix + "threshold_tcp_pps"]);
4316     }
4317 
4318     if (configuration_map.count(prefix + "threshold_udp_pps") != 0) {
4319         ban_settings.ban_threshold_udp_pps = convert_string_to_integer(configuration_map[prefix + "threshold_udp_pps"]);
4320     }
4321 
4322     if (configuration_map.count(prefix + "threshold_icmp_pps") != 0) {
4323         ban_settings.ban_threshold_icmp_pps = convert_string_to_integer(configuration_map[prefix + "threshold_icmp_pps"]);
4324     }
4325 
4326     // Bandwidth per protocol thresholds
4327     if (configuration_map.count(prefix + "threshold_tcp_mbps") != 0) {
4328         ban_settings.ban_threshold_tcp_mbps = convert_string_to_integer(configuration_map[prefix + "threshold_tcp_mbps"]);
4329     }
4330 
4331     if (configuration_map.count(prefix + "threshold_udp_mbps") != 0) {
4332         ban_settings.ban_threshold_udp_mbps = convert_string_to_integer(configuration_map[prefix + "threshold_udp_mbps"]);
4333     }
4334 
4335     if (configuration_map.count(prefix + "threshold_icmp_mbps") != 0) {
4336         ban_settings.ban_threshold_icmp_mbps = convert_string_to_integer(configuration_map[prefix + "threshold_icmp_mbps"]);
4337     }
4338 
4339     if (configuration_map.count(prefix + "threshold_pps") != 0) {
4340         ban_settings.ban_threshold_pps = convert_string_to_integer(configuration_map[prefix + "threshold_pps"]);
4341     }
4342 
4343     if (configuration_map.count(prefix + "threshold_mbps") != 0) {
4344         ban_settings.ban_threshold_mbps = convert_string_to_integer(configuration_map[prefix + "threshold_mbps"]);
4345     }
4346 
4347     if (configuration_map.count(prefix + "threshold_flows") != 0) {
4348         ban_settings.ban_threshold_flows = convert_string_to_integer(configuration_map[prefix + "threshold_flows"]);
4349     }
4350 
4351     return ban_settings;
4352 }
4353 
4354 
4355 
exceed_pps_speed(uint64_t in_counter,uint64_t out_counter,unsigned int threshold)4356 bool exceed_pps_speed(uint64_t in_counter, uint64_t out_counter, unsigned int threshold) {
4357     if (in_counter > threshold or out_counter > threshold) {
4358         return true;
4359     } else {
4360         return false;
4361     }
4362 }
4363 
exceed_flow_speed(uint64_t in_counter,uint64_t out_counter,unsigned int threshold)4364 bool exceed_flow_speed(uint64_t in_counter, uint64_t out_counter, unsigned int threshold) {
4365     if (in_counter > threshold or out_counter > threshold) {
4366         return true;
4367     } else {
4368         return false;
4369     }
4370 }
4371 
exceed_mbps_speed(uint64_t in_counter,uint64_t out_counter,unsigned int threshold_mbps)4372 bool exceed_mbps_speed(uint64_t in_counter, uint64_t out_counter, unsigned int threshold_mbps) {
4373     if (convert_speed_to_mbps(in_counter) > threshold_mbps or convert_speed_to_mbps(out_counter) > threshold_mbps) {
4374         return true;
4375     } else {
4376         return false;
4377     }
4378 }
4379 
4380 // Return true when we should ban this IP
we_should_ban_this_ip(map_element * average_speed_element,ban_settings_t current_ban_settings)4381 bool we_should_ban_this_ip(map_element* average_speed_element, ban_settings_t current_ban_settings) {
4382     // we detect overspeed by packets
4383     bool attack_detected_by_pps = false;
4384     bool attack_detected_by_bandwidth = false;
4385     bool attack_detected_by_flow = false;
4386     if (current_ban_settings.enable_ban_for_pps &&
4387         exceed_pps_speed(average_speed_element->in_packets, average_speed_element->out_packets, current_ban_settings.ban_threshold_pps)) {
4388         logger << log4cpp::Priority::DEBUG  << "We detected this attack by pps limit";
4389         return true;
4390     }
4391 
4392     if (current_ban_settings.enable_ban_for_bandwidth &&
4393         exceed_mbps_speed(average_speed_element->in_bytes, average_speed_element->out_bytes, current_ban_settings.ban_threshold_mbps)) {
4394         logger << log4cpp::Priority::DEBUG  << "We detected this attack by mbps limit";
4395         return true;
4396     }
4397 
4398     if (current_ban_settings.enable_ban_for_flows_per_second &&
4399         exceed_flow_speed(average_speed_element->in_flows, average_speed_element->out_flows, current_ban_settings.ban_threshold_flows)) {
4400         logger << log4cpp::Priority::DEBUG  << "We detected this attack by flow limit";
4401         return true;
4402     }
4403 
4404     // We could try per protocol thresholds here
4405 
4406     // Per protocol pps thresholds
4407     if (current_ban_settings.enable_ban_for_tcp_pps &&
4408         exceed_pps_speed(average_speed_element->tcp_in_packets, average_speed_element->tcp_out_packets, current_ban_settings.ban_threshold_tcp_pps)) {
4409         logger << log4cpp::Priority::DEBUG  << "We detected this attack by tcp pps limit";
4410         return true;
4411     }
4412 
4413     if (current_ban_settings.enable_ban_for_udp_pps &&
4414         exceed_pps_speed(average_speed_element->udp_in_packets, average_speed_element->udp_out_packets, current_ban_settings.ban_threshold_udp_pps)) {
4415         logger << log4cpp::Priority::DEBUG  << "We detected this attack by udp pps limit";
4416         return true;
4417     }
4418 
4419     if (current_ban_settings.enable_ban_for_icmp_pps &&
4420         exceed_pps_speed(average_speed_element->icmp_in_packets, average_speed_element->icmp_out_packets, current_ban_settings.ban_threshold_icmp_pps)) {
4421         logger << log4cpp::Priority::DEBUG  << "We detected this attack by icmp pps limit";
4422         return true;
4423     }
4424 
4425     // Per protocol bandwidth thresholds
4426     if (current_ban_settings.enable_ban_for_tcp_bandwidth &&
4427         exceed_mbps_speed(average_speed_element->tcp_in_bytes, average_speed_element->tcp_out_bytes, current_ban_settings.ban_threshold_tcp_mbps)) {
4428         logger << log4cpp::Priority::DEBUG  << "We detected this attack by tcp mbps limit";
4429         return true;
4430     }
4431 
4432     if (current_ban_settings.enable_ban_for_udp_bandwidth &&
4433         exceed_mbps_speed(average_speed_element->udp_in_bytes, average_speed_element->udp_out_bytes, current_ban_settings.ban_threshold_udp_mbps)) {
4434         logger << log4cpp::Priority::DEBUG  << "We detected this attack by udp mbps limit";
4435         return true;
4436     }
4437 
4438     if (current_ban_settings.enable_ban_for_icmp_bandwidth &&
4439         exceed_mbps_speed(average_speed_element->icmp_in_bytes, average_speed_element->icmp_out_bytes, current_ban_settings.ban_threshold_icmp_mbps)) {
4440         logger << log4cpp::Priority::DEBUG  << "We detected this attack by icmp mbps limit";
4441         return true;
4442     }
4443 
4444     return false;
4445 }
4446 
generate_flow_spec_for_amplification_attack(amplification_attack_type_t amplification_attack_type,std::string destination_ip)4447 std::string generate_flow_spec_for_amplification_attack(amplification_attack_type_t amplification_attack_type, std::string destination_ip) {
4448     exabgp_flow_spec_rule_t exabgp_rule;
4449 
4450     bgp_flow_spec_action_t my_action;
4451 
4452     // We drop all traffic by default
4453     my_action.set_type(FLOW_SPEC_ACTION_DISCARD);
4454 
4455     // Assign action to the rule
4456     exabgp_rule.set_action( my_action );
4457 
4458     // TODO: rewrite!
4459     exabgp_rule.set_destination_subnet( convert_subnet_from_string_to_binary_with_cidr_format( destination_ip + "/32") );
4460 
4461     // We use only UDP here
4462     exabgp_rule.add_protocol(FLOW_SPEC_PROTOCOL_UDP);
4463 
4464     if (amplification_attack_type == AMPLIFICATION_ATTACK_DNS) {
4465         exabgp_rule.add_source_port(53);
4466     } else if (amplification_attack_type == AMPLIFICATION_ATTACK_NTP) {
4467         exabgp_rule.add_source_port(123);
4468     } else if (amplification_attack_type == AMPLIFICATION_ATTACK_SSDP) {
4469         exabgp_rule.add_source_port(1900);
4470     } else if (amplification_attack_type == AMPLIFICATION_ATTACK_SNMP) {
4471         exabgp_rule.add_source_port(161);
4472     } else if (amplification_attack_type == AMPLIFICATION_ATTACK_CHARGEN) {
4473         exabgp_rule.add_source_port(19);
4474     }
4475 
4476     return exabgp_rule.serialize_single_line_exabgp_v4_configuration();
4477 }
4478 
get_amplification_attack_type(amplification_attack_type_t attack_type)4479 std::string get_amplification_attack_type(amplification_attack_type_t attack_type) {
4480     if (attack_type == AMPLIFICATION_ATTACK_UNKNOWN) {
4481         return "unknown";
4482     } else if (attack_type == AMPLIFICATION_ATTACK_DNS) {
4483         return "dns_amplification";
4484     } else if (attack_type == AMPLIFICATION_ATTACK_NTP) {
4485         return "ntp_amplification";
4486     } else if (attack_type == AMPLIFICATION_ATTACK_SSDP) {
4487         return "ssdp_amplification";
4488     } else if (attack_type == AMPLIFICATION_ATTACK_SNMP) {
4489         return "snmp_amplification";
4490     } else if (attack_type == AMPLIFICATION_ATTACK_CHARGEN) {
4491         return "chargen_amplification";
4492     } else {
4493         return "unexpected";
4494     }
4495 }
4496 
4497 // We calculate speed from packet counters here
build_speed_counters_from_packet_counters(map_element & new_speed_element,map_element * vector_itr,double speed_calc_period)4498 inline void build_speed_counters_from_packet_counters(map_element& new_speed_element, map_element* vector_itr, double speed_calc_period) {
4499     // calculate_speed(new_speed_element speed_element, vector_itr* );
4500     new_speed_element.in_packets = uint64_t((double)vector_itr->in_packets / speed_calc_period);
4501     new_speed_element.out_packets = uint64_t((double)vector_itr->out_packets / speed_calc_period);
4502 
4503     new_speed_element.in_bytes = uint64_t((double)vector_itr->in_bytes / speed_calc_period);
4504     new_speed_element.out_bytes = uint64_t((double)vector_itr->out_bytes / speed_calc_period);
4505 
4506     // Fragmented
4507     new_speed_element.fragmented_in_packets =
4508     uint64_t((double)vector_itr->fragmented_in_packets / speed_calc_period);
4509     new_speed_element.fragmented_out_packets =
4510     uint64_t((double)vector_itr->fragmented_out_packets / speed_calc_period);
4511 
4512     new_speed_element.fragmented_in_bytes =
4513     uint64_t((double)vector_itr->fragmented_in_bytes / speed_calc_period);
4514     new_speed_element.fragmented_out_bytes =
4515     uint64_t((double)vector_itr->fragmented_out_bytes / speed_calc_period);
4516 
4517     // By protocol counters
4518 
4519     // TCP
4520     new_speed_element.tcp_in_packets = uint64_t((double)vector_itr->tcp_in_packets / speed_calc_period);
4521     new_speed_element.tcp_out_packets =
4522     uint64_t((double)vector_itr->tcp_out_packets / speed_calc_period);
4523 
4524     new_speed_element.tcp_in_bytes = uint64_t((double)vector_itr->tcp_in_bytes / speed_calc_period);
4525     new_speed_element.tcp_out_bytes = uint64_t((double)vector_itr->tcp_out_bytes / speed_calc_period);
4526 
4527     // TCP syn
4528     new_speed_element.tcp_syn_in_packets =
4529     uint64_t((double)vector_itr->tcp_syn_in_packets / speed_calc_period);
4530     new_speed_element.tcp_syn_out_packets =
4531     uint64_t((double)vector_itr->tcp_syn_out_packets / speed_calc_period);
4532 
4533     new_speed_element.tcp_syn_in_bytes =
4534     uint64_t((double)vector_itr->tcp_syn_in_bytes / speed_calc_period);
4535     new_speed_element.tcp_syn_out_bytes =
4536     uint64_t((double)vector_itr->tcp_syn_out_bytes / speed_calc_period);
4537 
4538     // UDP
4539     new_speed_element.udp_in_packets = uint64_t((double)vector_itr->udp_in_packets / speed_calc_period);
4540     new_speed_element.udp_out_packets =
4541     uint64_t((double)vector_itr->udp_out_packets / speed_calc_period);
4542 
4543     new_speed_element.udp_in_bytes = uint64_t((double)vector_itr->udp_in_bytes / speed_calc_period);
4544     new_speed_element.udp_out_bytes = uint64_t((double)vector_itr->udp_out_bytes / speed_calc_period);
4545 
4546     // ICMP
4547     new_speed_element.icmp_in_packets =
4548     uint64_t((double)vector_itr->icmp_in_packets / speed_calc_period);
4549     new_speed_element.icmp_out_packets =
4550     uint64_t((double)vector_itr->icmp_out_packets / speed_calc_period);
4551 
4552     new_speed_element.icmp_in_bytes = uint64_t((double)vector_itr->icmp_in_bytes / speed_calc_period);
4553     new_speed_element.icmp_out_bytes = uint64_t((double)vector_itr->icmp_out_bytes / speed_calc_period);
4554 }
4555 
build_average_speed_counters_from_speed_counters(map_element * current_average_speed_element,map_element & new_speed_element,double exp_value,double exp_power)4556 inline void build_average_speed_counters_from_speed_counters(
4557     map_element* current_average_speed_element,
4558     map_element& new_speed_element,
4559     double exp_value,
4560     double exp_power) {
4561 
4562     // Global bytes counters
4563     current_average_speed_element->in_bytes = uint64_t(
4564         new_speed_element.in_bytes +
4565         exp_value * ((double)current_average_speed_element->in_bytes - (double)new_speed_element.in_bytes));
4566 
4567     current_average_speed_element->out_bytes = uint64_t(
4568         new_speed_element.out_bytes +
4569         exp_value * ((double)current_average_speed_element->out_bytes - (double)new_speed_element.out_bytes));
4570 
4571     // Global packet counters
4572     current_average_speed_element->in_packets = uint64_t(
4573         new_speed_element.in_packets +
4574         exp_value * ((double)current_average_speed_element->in_packets - (double)new_speed_element.in_packets));
4575 
4576     current_average_speed_element->out_packets = uint64_t(
4577         new_speed_element.out_packets +
4578         exp_value * ((double)current_average_speed_element->out_packets - (double)new_speed_element.out_packets));
4579 
4580     // Per packet type packet counters for in traffic
4581    current_average_speed_element->fragmented_in_packets = uint64_t(
4582         new_speed_element.fragmented_in_packets +
4583         exp_value * ((double)current_average_speed_element->fragmented_in_packets - (double)new_speed_element.fragmented_in_packets));
4584 
4585     current_average_speed_element->tcp_in_packets = uint64_t(
4586         new_speed_element.tcp_in_packets +
4587         exp_value * ((double)current_average_speed_element->tcp_in_packets - (double)new_speed_element.tcp_in_packets));
4588 
4589     current_average_speed_element->tcp_syn_in_packets = uint64_t(
4590         new_speed_element.tcp_syn_in_packets +
4591         exp_value * ((double)current_average_speed_element->tcp_syn_in_packets - (double)new_speed_element.tcp_syn_in_packets));
4592 
4593     current_average_speed_element->udp_in_packets = uint64_t(
4594         new_speed_element.udp_in_packets +
4595         exp_value * ((double)current_average_speed_element->udp_in_packets - (double)new_speed_element.udp_in_packets));
4596 
4597     current_average_speed_element->icmp_in_packets = uint64_t(
4598         new_speed_element.icmp_in_packets +
4599         exp_value * ((double)current_average_speed_element->icmp_in_packets - (double)new_speed_element.icmp_in_packets));
4600 
4601     // Per packet type packets counters for out
4602     current_average_speed_element->fragmented_out_packets = uint64_t(
4603         new_speed_element.fragmented_out_packets +
4604         exp_value * ((double)current_average_speed_element->fragmented_out_packets - (double)new_speed_element.fragmented_out_packets));
4605 
4606     current_average_speed_element->tcp_out_packets = uint64_t(
4607         new_speed_element.tcp_out_packets +
4608         exp_value * ((double)current_average_speed_element->tcp_out_packets - (double)new_speed_element.tcp_out_packets));
4609 
4610     current_average_speed_element->tcp_syn_out_packets = uint64_t(
4611         new_speed_element.tcp_syn_out_packets +
4612         exp_value * ((double)current_average_speed_element->tcp_syn_out_packets - (double)new_speed_element.tcp_syn_out_packets));
4613 
4614     current_average_speed_element->udp_out_packets = uint64_t(
4615         new_speed_element.udp_out_packets +
4616         exp_value * ((double)current_average_speed_element->udp_out_packets - (double)new_speed_element.udp_out_packets));
4617 
4618     current_average_speed_element->icmp_out_packets = uint64_t(
4619         new_speed_element.icmp_out_packets +
4620         exp_value * ((double)current_average_speed_element->icmp_out_packets - (double)new_speed_element.icmp_out_packets));
4621 
4622     // Per packet type bytes counter for out
4623     current_average_speed_element->fragmented_out_bytes = uint64_t(
4624         new_speed_element.fragmented_out_bytes +
4625         exp_value * ((double)current_average_speed_element->fragmented_out_bytes - (double)new_speed_element.fragmented_out_bytes));
4626 
4627     current_average_speed_element->tcp_out_bytes = uint64_t(
4628         new_speed_element.tcp_out_bytes +
4629         exp_value * ((double)current_average_speed_element->tcp_out_bytes - (double)new_speed_element.tcp_out_bytes));
4630 
4631     current_average_speed_element->tcp_syn_out_bytes = uint64_t(
4632         new_speed_element.tcp_syn_out_bytes +
4633         exp_value * ((double)current_average_speed_element->tcp_syn_out_bytes - (double)new_speed_element.tcp_syn_out_bytes));
4634 
4635     current_average_speed_element->udp_out_bytes = uint64_t(
4636         new_speed_element.udp_out_bytes +
4637         exp_value * ((double)current_average_speed_element->udp_out_bytes - (double)new_speed_element.udp_out_bytes));
4638 
4639     current_average_speed_element->icmp_out_bytes = uint64_t(
4640         new_speed_element.icmp_out_bytes +
4641         exp_value * ((double)current_average_speed_element->icmp_out_bytes - (double)new_speed_element.icmp_out_bytes));
4642 
4643     // Per packet type bytes counter for in
4644     current_average_speed_element->fragmented_in_bytes = uint64_t(
4645         new_speed_element.fragmented_in_bytes +
4646         exp_value * ((double)current_average_speed_element->fragmented_in_bytes - (double)new_speed_element.fragmented_in_bytes));
4647 
4648     current_average_speed_element->tcp_in_bytes = uint64_t(
4649         new_speed_element.tcp_in_bytes +
4650         exp_value * ((double)current_average_speed_element->tcp_in_bytes - (double)new_speed_element.tcp_in_bytes));
4651 
4652     current_average_speed_element->tcp_syn_in_bytes = uint64_t(
4653         new_speed_element.tcp_syn_in_bytes +
4654         exp_value * ((double)current_average_speed_element->tcp_syn_in_bytes - (double)new_speed_element.tcp_syn_in_bytes));
4655 
4656     current_average_speed_element->udp_in_bytes = uint64_t(
4657         new_speed_element.udp_in_bytes +
4658         exp_value * ((double)current_average_speed_element->udp_in_bytes - (double)new_speed_element.udp_in_bytes));
4659 
4660     current_average_speed_element->icmp_in_bytes = uint64_t(
4661         new_speed_element.icmp_in_bytes +
4662         exp_value * ((double)current_average_speed_element->icmp_in_bytes - (double)new_speed_element.icmp_in_bytes));
4663 }
4664 
4665