1#!/usr/bin/perl
2
3use strict;
4use threads;
5use threads::shared;
6use Sys::Syslog;
7use Data::Dumper;
8use Getopt::Long;
9use POSIX;
10use IO::Socket::Multicast;
11use JSON;
12
13# Native Maxmind library - http://www.maxmind.com/download/geoip/api/perl/
14# requires: http://www.maxmind.com/app/c
15use Geo::IP;
16
# set these to the same port and multicast (or unicast) address as the detector
use constant GROUP => '226.1.1.2';
use constant PORT  => '2000';

# per-source-address counters, filled by the receiver thread and drained by
# the main thread
my %ipc_source :shared;
my %ipc_customer :shared;

# shutdown flag, set by the signal handler and polled by both threads
my $time_to_die :shared = 0;

# command line switches
my $debug;
my $foreground=0;

# determines how often (in seconds) you want to aggregate and write out
# stats dumps
my $interval = 60;

# you can get the binary format GeoLiteCity.dat from Maxmind
# http://www.maxmind.com/app/geolitecity
my $geoip_db = "/usr/local/GeoLiteCity.dat";

# fail at startup when the database cannot be opened; otherwise every lookup
# would fail later and all coordinates would be written as 0,0
my $gi = Geo::IP->open($geoip_db,GEOIP_MEMORY_CACHE | GEOIP_CHECK_CACHE)
  or die "Can't open GeoIP database $geoip_db\n";
33
# adjust this to the directory where you want to keep the coordinate files
# (coords.txt and its temporary copy)
sub PATH {
  return '/tmp/';
}
36
# unbuffered STDOUT
$|=1;

# parse command line switches; "interval=i" only accepts whole numbers, so a
# non-numeric value is rejected here instead of silently counting as 0
GetOptions(
  "debug" => \$debug,
  "foreground" => \$foreground,
  "interval=i" => \$interval,
) or die "Usage: $0 [--debug] [--foreground] [--interval SECONDS]\n";

# main() schedules the next dump at time() + $interval; with a value of zero
# or less it would never sleep between dumps
die "--interval must be a positive number of seconds\n" unless $interval > 0;


main();
exit();
48
# Main loop of the collector.
#
# Daemonizes (unless --foreground was given), starts the get_data thread and
# then, once per $interval seconds, drains the shared %ipc_source counters
# and writes them with their GeoIP coordinates to PATH/coords.txt.
# Runs until $time_to_die is set, then waits for the receiver thread.
sub main() {

  # daemonize unless running in foreground
  daemonize() unless $foreground;

  # start the data acquisition thread; keep the handle, because join() is an
  # object method and croaks when called on the class
  my $collector = threads->new(\&get_data);

  while (! $time_to_die ) {

    # record when the next run is due to help evenly space runs
    my $next_run = time() + $interval;

    # take a private copy of the counters and clean out the shared structure
    # for the next sample period; the bare block releases the lock right
    # after the copy, so the receiver thread is not stalled while the GeoIP
    # lookups and the file write are running
    my %sample;
    {
      lock(%ipc_source);
      %sample = %ipc_source;
      %ipc_source = ();
    }

    _write_coords(\%sample);

    # sleep to make the interval; sleep() returns early when a signal
    # arrives, so re-check the shutdown flag before sleeping again
    while (! $time_to_die && (my $time_left = ($next_run - time())) > 0) {
      sleep($time_left);
    }
  }

  # get_data only looks at the shutdown flag between two recv() calls, so
  # this waits until it has returned from the recv() it is currently in
  $collector->join();
  return;
}

# Write one "address,count,latitude,longitude" line per entry of the given
# hash reference to PATH/coords.txt. The data goes to a temporary file first
# and is renamed into place, so a reader never sees a half-written file.
# Addresses whose GeoIP lookup fails are written with coordinates 0,0.
# Returns 1 on success; warns and returns 0 when the file can't be written.
sub _write_coords {
  my ($count_for) = @_;

  my $tmp_file = PATH()."/coords.txt.tmp";
  my $out_file = PATH()."/coords.txt";

  # open coordinates file for graph generation
  my $crds;
  unless (open($crds, '>', $tmp_file)) {
    warn "Can't write $tmp_file: $!\n";
    return 0;
  }

  foreach my $ip (keys %{$count_for}) {

    # the lookup dies when there is no record for the address; the
    # coordinates then keep their 0,0 default
    my ($lat, $lon) = (0, 0);
    eval {
      my $r = $gi->record_by_addr($ip);
      ($lat, $lon) = ($r->latitude, $r->longitude);
    };

    # write raw entry to coordinates file
    print {$crds} $ip.",".$count_for->{$ip}.",".$lat.",".$lon."\n";
  }

  # buffered write errors only show up at close
  unless (close($crds)) {
    warn "Can't finish writing $tmp_file: $!\n";
    unlink($tmp_file);
    return 0;
  }

  # replace the previous dump in one step, without going through a shell
  unless (rename($tmp_file, $out_file)) {
    warn "Can't rename $tmp_file to $out_file: $!\n";
    return 0;
  }

  return 1;
}
106
# fetch data from UDP multicast
#
# Body of the receiver thread. Listens on PORT as a member of GROUP, decodes
# every datagram as JSON and adds the counts found under its "data" key
# (address => count) to the shared %ipc_source hash. Loops until
# $time_to_die is set. Dies when the socket can't be set up.
sub get_data() {

  # set up our multicast listener
  # note: this will receive unicast fine too
  my $sock = IO::Socket::Multicast->new(LocalPort=>PORT,ReuseAddr=>1)
    || die "Couldn't create socket: $!\n";
  $sock->mcast_add(GROUP) || die "Couldn't set group: $!\n";


  while (  ! $time_to_die  ) {
    my $data;
    next unless $sock->recv($data,1500);

    # decode JSON; a malformed datagram is dropped
    my $obj = eval { decode_json($data) };
    if ($@) {
      warn "Ignoring undecodable datagram: $@" if $debug;
      next;
    }

    # dump the decoded message only when --debug was requested
    print Dumper($obj) if $debug;

    # only a message of the form { "data": { address: count, ... } } is used
    next unless ref($obj) eq 'HASH' && ref($obj->{data}) eq 'HASH';

    # one lock per datagram; it is released at the end of this iteration
    lock(%ipc_source);
    foreach my $ip (keys %{$obj->{data}}) {
      my $count = $obj->{data}->{$ip};

      # a nested structure is not a count; adding it would add the numeric
      # value of the reference
      next if ref($count);

      $ipc_source{$ip}+=$count;
    }
  }

  # done!
  threads->exit();
}
136
# daemonize application
#
# Changes to /, points STDIN and STDOUT at /dev/null, forks (the parent
# exits), starts a new session, redirects STDERR to STDOUT and installs the
# shutdown signal handlers. Dies when any of these steps fails.
sub daemonize {

  chdir '/' or die "Can't chdir to /: $!";
  open STDIN, '<', '/dev/null' or die "Can't read /dev/null: $!";
  open STDOUT, '>', '/dev/null' or die "Can't write to /dev/null: $!";

  # fork and exit parent
  my $pid = fork();
  die "Couldn't fork: $!" unless defined ($pid);
  exit if $pid;
  POSIX::setsid() || die ("$0 can't start a new session: $!");
  open STDERR, '>&', \*STDOUT or die "Can't dup stdout: $!";

  # signal handlers; SIGKILL can't be caught, so the shutdown request has to
  # come in as TERM (the default signal of kill) or INT
  $SIG{TERM} = \&handler;
  $SIG{INT}  = \&handler;
}
154
# signal handler: raise the shared shutdown flag that the loops in main and
# get_data poll; the signal name passed in by perl is not used
sub handler {
  return $time_to_die = 1;
}
158