#!/usr/bin/perl

# Aggregator daemon: listens for JSON datagrams of the form
#   { "data": { "<ip>": <count>, ... } }
# on a UDP multicast group, sums the counts per source IP, and once per
# interval writes "<ip>,<count>,<latitude>,<longitude>" lines to
# PATH/coords.txt (0,0 when the address cannot be geolocated).
#
# Options:
#   --debug         dump every decoded packet and report malformed ones
#   --foreground    do not daemonize
#   --interval N    seconds between dumps (default 60)

use strict;
use warnings;
use threads;
use threads::shared;
use Sys::Syslog;
use Data::Dumper;
use Getopt::Long;
use POSIX;
use IO::Select;
use IO::Socket::Multicast;
use JSON;

# Native Maxmind library - http://www.maxmind.com/download/geoip/api/perl/
# requires: http://www.maxmind.com/app/c
use Geo::IP;

# set these to the same port and multicast (or unicast) address as the detector
use constant GROUP => '226.1.1.2';
use constant PORT  => '2000';

# you can get the binary format GeoLiteCity.dat from Maxmind
# http://www.maxmind.com/app/geolitecity
use constant GEOIP_DB => '/usr/local/GeoLiteCity.dat';

my %ipc_source :shared;
my %ipc_customer :shared;
my $time_to_die :shared = 0;
my $debug;
my $foreground = 0;

# determines how often you want to aggregate and write out stats dumps
my $interval = 60;

# GeoIP handle; opened in main() once the listener thread has been started,
# so that only the main thread ever holds it.
my $gi;

# adjust this to the path where you want to keep the coordinates file
sub PATH {'/tmp/'}

$| = 1;

GetOptions(
    "debug"      => \$debug,
    "foreground" => \$foreground,
    "interval=i" => \$interval,
);

# a zero or negative interval would turn the main loop into a busy loop
die "interval must be a positive number of seconds\n" unless $interval > 0;

main();
exit();

# Entry point: daemonizes (unless --foreground), starts the listener thread,
# then dumps the aggregated counts every $interval seconds until a TERM or
# INT signal sets $time_to_die.
sub main {

    # daemonize unless running in foreground
    daemonize() unless $foreground;

    # SIGKILL cannot be trapped, so shut down on TERM/INT instead.  The
    # handlers are installed here so that foreground mode gets them too.
    $SIG{TERM} = \&handler;
    $SIG{INT}  = \&handler;

    # start data acquisition thread; keep the object so it can be joined
    my $listener = threads->create(\&get_data);

    $gi = Geo::IP->open(GEOIP_DB, GEOIP_MEMORY_CACHE | GEOIP_CHECK_CACHE)
        or warn "Can't open " . GEOIP_DB . "; all coordinates will be 0,0\n";

    while (!$time_to_die) {

        # record the deadline first to help evenly space runs
        my $next_run = time() + $interval;

        _write_coords(_drain_sources());

        # sleep to make the interval; a signal interrupts sleep(), and
        # re-checking $time_to_die lets shutdown happen promptly
        while (!$time_to_die) {
            my $time_left = $next_run - time();
            last if $time_left <= 0;
            sleep($time_left);
        }
    }

    $listener->join();
    return;
}

# Returns a reference to a private copy of the per-IP counters and resets
# the shared structure for the next sample period.  The lock is held only
# for the copy, so the listener is not stalled by lookups or file I/O.
sub _drain_sources {
    lock(%ipc_source);
    my %snapshot = %ipc_source;
    %ipc_source = ();
    return \%snapshot;
}

# Returns (latitude, longitude) for an address, or (0, 0) when the database
# is unavailable, the lookup fails, or the record has no coordinates.
sub _locate {
    my ($ip) = @_;

    my $record = $gi ? eval { $gi->record_by_addr($ip) } : undef;
    return (0, 0) unless $record;

    my ($lat, $lon) = ($record->latitude, $record->longitude);
    return (0, 0) unless defined $lat && defined $lon;
    return ($lat, $lon);
}

# Writes one "ip,count,lat,lon" line per entry of the given hash reference
# to a temporary file and renames it over coords.txt, so readers never see
# a half-written file.  Failures are reported with warn and the dump for
# this interval is skipped.
sub _write_coords {
    my ($count_for) = @_;

    my $tmp_file = PATH . "/coords.txt.tmp";
    my $out_file = PATH . "/coords.txt";

    my $fh;
    unless (open($fh, '>', $tmp_file)) {
        warn "Can't write $tmp_file: $!\n";
        return;
    }

    foreach my $ip (keys %{$count_for}) {
        my ($lat, $lon) = _locate($ip);
        print {$fh} join(",", $ip, $count_for->{$ip}, $lat, $lon), "\n";
    }

    unless (close($fh)) {
        warn "Can't close $tmp_file: $!\n";
        return;
    }

    rename($tmp_file, $out_file)
        or warn "Can't rename $tmp_file to $out_file: $!\n";
    return;
}

# Listener thread body: receives datagrams from the multicast group and
# adds the per-IP counts into %ipc_source until $time_to_die is set.
# Dies (ending only this thread) if the socket cannot be set up.
sub get_data {

    # set up our multicast listener
    # note: this will receive unicast fine too
    my $sock = IO::Socket::Multicast->new(LocalPort => PORT, ReuseAddr => 1)
        or die "Couldn't create socket on port " . PORT . ": $!\n";
    $sock->mcast_add(GROUP) || die "Couldn't set group: $!\n";

    my $select = IO::Select->new($sock);

    while (!$time_to_die) {

        # poll with a timeout so $time_to_die is re-checked regularly
        # instead of blocking in recv() forever
        my @ready = $select->can_read(1);
        next unless @ready;

        my $data;
        next unless defined $sock->recv($data, 1500);

        # decode JSON; malformed packets are dropped
        eval {
            my $obj = decode_json $data;
            print Dumper $obj if $debug;

            my $counts = $obj->{data};

            # one lock per packet rather than one per address
            lock(%ipc_source);
            foreach my $ip (keys %{$counts}) {
                $ipc_source{$ip} += $counts->{$ip};
            }
        };
        warn "Discarding malformed packet: $@" if $@ && $debug;
    }

    # done!
    threads->exit();
}

# Detaches from the terminal: chdir to /, point the standard handles at
# /dev/null, fork (the parent exits) and start a new session.
# Dies if any step fails.
sub daemonize {

    chdir '/' or die "Can't chdir to /: $!";
    open(STDIN,  '<', '/dev/null') or die "Can't read /dev/null: $!";
    open(STDOUT, '>', '/dev/null') or die "Can't write to /dev/null: $!";

    # fork and exit parent
    my $pid = fork();
    die "Couldn't fork: $!" unless defined($pid);
    exit if $pid;

    POSIX::setsid() || die("$0 can't start a new session: $!");
    open(STDERR, '>&', \*STDOUT) or die "Can't dup stdout: $!";
    return;
}

# Signal handler: asks the main loop and the listener thread to stop.
sub handler {
    $time_to_die = 1;
}