1#!/usr/bin/env perl
2
3=head1 NAME
4
5tstatd - Logs real-time accounting daemon
6
=head1 SYNOPSIS
8
9tstatd [ options ] plugin [zone1:]wildcard1 .. [zoneN:]wildcardN
10
=head1 OPTIONS
12
13=over
14
15=item -a I<zone>, --agregate-zone=I<zone>
16
Aggregate data from all anonymous logs (wildcards without an explicitly
specified zone) into I<zone>. Default behavior is to create a new
zone for each anonymous log, named after its file name.
20
21=item -b I<file>, --database-file=I<file>
22
23Use I<file> as persistent storage to keep accumulated data across
24daemon restarts. Default is auto generated from daemon name,
25specified identity and '.db' suffix.
26
27=item --basename
28
29Use only base name (excluding directories and suffix) of anonymous log file
30for auto-created zones.
31
32=item -c I<dir>, --change-dir=I<dir>
33
Change current directory to I<dir> before expanding wildcards.
35
36=item -d, --debug
37
38Composition of options: C<--foreground> and C<--log-level=debug>.
39
40=item -f, --foreground
41
Don't detach daemon from the controlling terminal; log to C<stderr> instead
of a log file or syslog.
44
45=item --log-facility=I<name>
46
47Use I<name> as C<facility> for syslog logging (see syslog (3) for list
48of available values). Default is 'daemon'.
49
50=item --log-level=I<level>
51
52Set minimal logging level to I<level> (see syslog (3) for list of available
53values). Default is 'notice'.
54
55=item --log-file=I<file>
56
57Use logging to I<file> instead of syslog logging (which is default).
58
59=item -e I<num>, --expand-period=I<num>
60
Re-expand wildcards and check for new and missing logs
every I<num> seconds. Default is '60'.
63
64=item -h, --help
65
66Print brief help message about available options.
67
68=item -i I<string>, --identity=I<string>
69
Just a string used in the title of the daemon process, the syslog ident
(see syslog(3)), and the default C<--database-file> and C<--pid-file> names.
The idea behind this option is to allow multiple C<tstatd> instances to run
simultaneously.
73
74=item -l [I<address>:]I<port>, --listen=[I<address>:]I<port>
75
76Specify I<address> and I<port> for TCP listen socket binding.
77Default is '127.0.0.1:3638'.
78
79=item --multiple
80
With this option specified the same log file can be included in several
zones (if the log name satisfies several wildcards). Default behavior is to
include a log file only in the first satisfied zone.
84
85=item -n I<num>, --windows-num=I<num>
86
87Set number of sliding-windows to I<num>. Default is '60'.
88
89=item -o I<string>, --options=I<string>
90
91Comma-separated plugin supported options (like a mount (8) options).
92
93=item --override-from=I<file>
94
Load content of I<file> into the plugin package namespace.
This is a way to easily customize plugin behavior without creating
another plugin.
98
99=item -p I<file>, --pid-file=I<file>
100
101Use I<file> to keep daemon process id. Default is auto generated
102from daemon name, specified identity and '.pid' suffix.
103
104=item --parse-error=I<level>
105
Log all unparsed log lines with I<level> (see syslog (3) for available
values). Hint: use 'none' to ignore such lines.
Default is defined by the plugin and is usually 'debug'.
109
110=item -r I<pattern>, --regex=I<pattern>
111
112Use I<pattern> instead of plugin default regular expression for
113matching log lines.
114
115=item --regex-from=I<file>
116
117Load regular expression from I<file> and use instead of plugin default
118regular expression for matching log lines.
119
120=item -s I<num>, --store-period=I<num>
121
122Store accumulated data in a persistent storage every I<num> seconds.
123Default is '60'.
124
125=item --timer=I<zone>:I<timer>:I<num>
126
127Create named I<timer> firing every I<num> seconds for I<zone>.
128
=item -u I<user>, --user=I<user>
130
131Change effective privileges of daemon process to I<user>.
132
133=item -v, --version
134
135Print version information of C<tstatd> and exit.
136
=item -w I<num>, --window-size=I<num>
138
139Set size (duration) of sliding window to I<num> seconds.
140Default is '10'.
141
142=back
143
144
145=head1 SEE ALSO
146
147L<Tail::Stat>
148
149
150=head1 AUTHOR
151
152Oleg A. Mamontov, C<< <oleg@mamontov.net> >>
153
154
155=head1 COPYRIGHT & LICENSE
156
157This program is free software; you can redistribute it and/or modify it
158under the terms of either: the GNU General Public License as published
159by the Free Software Foundation; or the Artistic License.
160
161See http://dev.perl.org/licenses/ for more information.
162
163
164=cut
165
166use strict;
167use warnings qw(all);
168
169use Cwd qw(getcwd realpath);
170use DateTime;
171use File::Basename qw(fileparse);
172use FindBin;
173use Getopt::Long qw(:config no_auto_abbrev bundling);
174use JSON::XS;
175use List::Util qw(min);
176use Log::Dispatch;
177use Log::Dispatch::File;
178use Log::Dispatch::Screen;
179use Log::Dispatch::Syslog;
180use Pid::File::Flock;
181use POE qw(Wheel::FollowTail Wheel::ListenAccept Wheel::ReadWrite);
182use POSIX qw(setsid setuid strftime);
183use Socket;
184use Tail::Stat;
185use Tie::Hash::Indexed;
186
# parse command line: every recognised option lands in %opts under its long name
my %opts;
my @option_specs = qw/
	agregate-zone|a=s
	basename
	database-file|b=s
	change-dir|c=s
	debug|d
	foreground|f
	log-facility=s
	log-file=s
	log-level=s
	expand-period|e=i
	help|h
	identity|i=s
	listen|l=s
	windows-num|n=i
	multiple
	options|o=s
	override-from=s@
	parse-error=s
	pid-file|p=s
	regex|r=s
	regex-from=s
	timer=s@
	store-period|s=i
	user|u=s
	version|v
	window-size|w=i
/;
GetOptions(\%opts, @option_specs) or die usage();

# explicitly requested help
if ($opts{help}) {
	die usage();
}

# version requested: leave only once the banner was actually printed
if ($opts{version}) {
	exit if print version();
}

# a plugin name and at least one wildcard are mandatory
die usage() unless @ARGV >= 2;
226
# try to load requested plugin
# The name is spliced into a string eval below, so it must look like a
# (possibly nested) package name component and nothing else.
my $pname  = shift @ARGV;
die "invalid plugin name: '$pname'\n"
	unless $pname =~ /\A\w+(?:::\w+)*\z/;
my $pclass = "Tail::Stat::Plugin::$pname";
eval "require $pclass" or die "can't load plugin '$pname': $@\n";
231
232
# parameters defaults & validation
# File and directory options are converted to absolute paths here because
# the daemon later changes its working directory to '/'.
if (exists $opts{'agregate-zone'}) {
	die "invalid zone: '$opts{'agregate-zone'}'\n"
		if $opts{'agregate-zone'} =~ /[^a-z0-9_-]/;
}

if (exists $opts{identity}) {
	die "invalid identity: '$opts{identity}'\n" if $opts{identity} =~ /[^\w]/;
}

# default database name: "<script>[-<identity>].db"
$opts{'database-file'} ||= $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '').'.db';
$opts{'database-file'} = realpath $opts{'database-file'};

if (exists $opts{'change-dir'}) {
	die "no such directory: '$opts{'change-dir'}'\n"
		unless -d $opts{'change-dir'};
	# resolve now: a relative directory would no longer be reachable
	# once the daemon has done chdir '/'
	$opts{'change-dir'} = realpath $opts{'change-dir'};
}

$opts{'log-facility'} ||= 'daemon';
die "invalid log facility: '$opts{'log-facility'}'\n"
	unless $opts{'log-facility'} =~ /^(auth|authpriv|cron|daemon|kern|local[0-7]|mail|news|syslog|user|uucp)$/;

$opts{'log-level'} ||= 'notice';
die "invalid log level: '$opts{'log-level'}'\n"
	unless $opts{'log-level'} =~ /^(debug|info|notice|warning|error|critical|alert|emergency)$/;

# --debug overrides any logging settings given explicitly
if ($opts{debug}) {
	$opts{'log-level'} = 'debug';
	delete $opts{'log-file'};
	$opts{foreground}  = 1;
}

$opts{'log-file'} = realpath $opts{'log-file'} if exists $opts{'log-file'};

# numeric options: an explicit 0 is accepted and switches the task off
$opts{'expand-period'} = 60 unless exists $opts{'expand-period'};
die "invalid expand period: '$opts{'expand-period'}'\n"
	if $opts{'expand-period'} =~ /[^\d]/;

$opts{'listen'} ||= '127.0.0.1:3638';

$opts{'windows-num'} = 60  unless exists $opts{'windows-num'};
die "invalid windows number: '$opts{'windows-num'}'\n"
	if $opts{'windows-num'} =~ /[^\d]/;

# documented default (manual and usage text) is 60 seconds
$opts{'store-period'} = 60 unless exists $opts{'store-period'};
die "invalid store period: '$opts{'store-period'}'\n"
	if $opts{'store-period'} =~ /[^\d]/;

$opts{'window-size'} = 10 unless exists $opts{'window-size'};
die "invalid window size: '$opts{'window-size'}'\n"
	if $opts{'window-size'} =~ /[^\d]/;

# the plugin class supplies the default level for unparsed lines
$opts{'parse-error'} ||= $pclass->parse_error;
die "invalid parse error: '$opts{'parse-error'}'\n"
	unless $opts{'parse-error'} =~ /^(debug|info|none|notice|warning|error|critical|alert|emergency)$/;

# default pid file name: "<script>[-<identity>].pid"
$opts{'pid-file'} ||= $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '').'.pid';
$opts{'pid-file'} = realpath $opts{'pid-file'};
291
# --regex-from: take the whole file content as the matching pattern
if (exists $opts{'regex-from'}) {
	die "options regex and regex-from are mutually exclusive\n"
		if exists $opts{regex};
	# three-argument open: the file name can never be taken for a mode or a pipe
	open my $regex_fh, '<', $opts{'regex-from'} or
		die "can't read regex: $!\n";
	$opts{regex} = do { local $/; <$regex_fh> };
	close $regex_fh;
}
300
# loading overrides: each file is compiled inside the plugin package
# NOTE: this deliberately executes arbitrary code taken from the named files
for my $override ( @{ $opts{'override-from'} } ) {
	# three-argument open: the file name can never be taken for a mode or a pipe
	open my $override_fh, '<', $override or
		die "can't read override from '$override': $!\n";
	my $code = do { local $/; <$override_fh> };
	close $override_fh;
	eval "package $pclass; use strict; use warnings qw(all); ".$code;
	die "can't apply overrides from '$override': $@\n" if $@;
}
309
# uid to run under: the one of --user when given, the current effective uid otherwise
my $uid = $opts{'user'} ? getpwnam($opts{'user'}) : $>;
die "unknown user: $opts{'user'}\n" unless defined $uid;
312
# grouping wildcards by zones; the hash keeps insertion order, which
# matters when '--multiple' is not in effect
my %zones;
tie %zones, 'Tie::Hash::Indexed';
for my $arg (@ARGV) {
	# explicit "zone:wildcard" form
	if ($arg =~ /^([\w\d\_-]+):(.+)/) {
		push @{ $zones{$1} }, $2;
		next;
	}

	# anonymous wildcard: aggregate zone, base name or the wildcard itself
	my $zone = $opts{'agregate-zone'}
		|| ( $opts{basename} ? fileparse($arg, qr/\.[^\.]+/) : $arg );
	push @{ $zones{$zone} }, $arg;
}
327
# parsing timers: "--timer=ZONE:NAME:NUM[w|d|h|m|s]"
# %timers maps zone => name => [ period in seconds, unit name ]
my %timers;
my %units = (
	w => [ 'week',   7 * 86_400 ],
	d => [ 'day',    86_400 ],
	h => [ 'hour',   3_600 ],
	m => [ 'minute', 60 ],
	s => [ 'second', 1 ],
);
for my $spec (@{ $opts{timer} }) {
	my ($z,$n,$p,$u) = $spec =~ /^(\S+):(\S+):(\d+)(w|d|h|m|s)?$/ or
		die "invalid timer format: $spec\n";
	# a zero period would make the timer fire again the moment it is renewed
	die "timer period must be positive: $spec\n" unless $p > 0;
	die "no such zone '$z' for timer '$spec'\n" unless exists $zones{$z};
	die "zone '$z' already has timer '$n'\n" if exists $timers{$z}{$n};

	# seconds is the default unit
	$u ||= 's';
	$timers{$z}{$n} = [ $p * $units{$u}[1], $units{$u}[0] ];
}
346
# listen socket
# Load the socket class explicitly instead of relying on another module
# having pulled it in.
require IO::Socket::INET;

# "addr:port" binds to a specific address, a bare value is taken as a port
my @bind_to = $opts{'listen'} =~ /:/
	? ( LocalAddr => $opts{'listen'} )
	: ( LocalPort => $opts{'listen'} );

my $sock = IO::Socket::INET->new(
	@bind_to,
	Listen    => SOMAXCONN,
	ReuseAddr => 1,
) or die "can't create listen socket: $!\n";
357
# set process privileges (nothing to do when already running as that uid)
if ($uid != $>) {
	setuid $uid or die "can't setuid to $opts{'user'}: $!\n";
}

# set process title: "<script>: <plugin>[ [<identity>]]"
my $title = $FindBin::RealScript.': '.$pname;
$title .= ' ['.$opts{identity}.']' if $opts{identity};
$0 = $title;

# background mode: fork, take the pid file lock, detach from the terminal
unless ($opts{foreground}) {

	# fork: the parent leaves, the child carries on
	my $pid = fork;
	die "can't fork: $!\n" unless defined $pid;
	exit if $pid;

	# protecting against second instance running
	Pid::File::Flock->new($opts{'pid-file'});

	# daemonize
	chdir('/') or die "can't chdir: $!\n";
	die "can't create new session: $!\n" if setsid() == -1;
	open(STDIN,  '<', '/dev/null') or die "can't close stdin\n";
	open(STDOUT, '>', '/dev/null') or die "can't close stdout\n";
	open(STDERR, '>', '/dev/null') or die "can't close stderr\n";
}
381
# logger: a dispatcher with a single output named 'main'
my $log = Log::Dispatch->new;
$log->add(logger());
$log->notice("starting up");

# catch perl warnings and route them into the log
$SIG{__WARN__} = sub {
	my @message = @_;
	$log->warning(@message);
};
388
389# main POE session
390POE::Session->create(
391	inline_states => {
392
393		# initializing
394		_start => sub {
395			$log->debug("initializing POE session");
396
397			# talk POE kernel adjust to the new situation
398			$_[KERNEL]->has_forked unless $opts{foreground};
399
400			# signals
401			$log->debug("setting up signal handlers");
402			$_[KERNEL]->sig(HUP  => 'hangup');
403			$_[KERNEL]->sig(INT  => 'interrupt');
404			$_[KERNEL]->sig(TERM => 'terminate');
405			$_[KERNEL]->sig(USR1 => 'rotate');
406
407			# statistics server
408			$log->debug("creating TCP server");
409			$_[HEAP]->{server} = POE::Wheel::ListenAccept->new(
410				Handle      => $sock,
411				AcceptEvent => 'server_accept',
412				ErrorEvent  => 'server_error',
413			);
414
415			# serializer
416			$log->debug("creating serializer");
417			$_[HEAP]->{serial} = JSON::XS->new->pretty;
418
419			# creating plugin instance
420			my %popts;
421			for (split /,/, $opts{options} || '') {
422				my ($k,$v) = split /=/;
423				$popts{$k} = defined $v ? $v : 1;
424			}
425			$popts{regex} = $opts{regex} if exists $opts{regex};
426			$_[HEAP]->{plugin} = $pclass->new(%popts);
427
428			# setting up zones
429			$_[HEAP]->{zones} = \%zones;
430
431			# load previous data
432			if (-f $opts{'database-file'}) {
433				$_[KERNEL]->call($_[SESSION], 'do_load') or return;
434			}
435
436			# create insufficient references
437			for (keys %zones) {
438				$_[HEAP]->{data}{$_}{public}     ||= {};
439				$_[HEAP]->{data}{$_}{private}    ||= {};
440				$_[HEAP]->{data}{$_}{windows}    ||= [];
441				$_[HEAP]->{data}{$_}{windows}[0] ||= {};
442
443				# call plugin initialization code
444				$_[HEAP]->{plugin}->init_zone(
445					$_,
446					$_[HEAP]->{data}{$_}{public},
447					$_[HEAP]->{data}{$_}{private},
448					$_[HEAP]->{data}{$_}{windows}[0],
449				);
450			}
451
452			# expanding zones wildcards
453			$_[KERNEL]->call($_[SESSION], 'do_expand');
454
455			# creating named timers
456			$_[HEAP]->{timers} = \%timers;
457			for my $z ( keys %{ $_[HEAP]->{timers} } ) {
458				for my $n ( keys %{ $_[HEAP]->{timers}{$z} } ) {
459					$_[KERNEL]->call($_[SESSION], 'set_timer', $z, $n);
460				}
461			}
462
463			# schedule save task
464			if ($opts{'store-period'}) {
465				$log->debug("scheduling saving heartbeat at $opts{'store-period'} second(s)");
466				$_[KERNEL]->delay( save_heartbeat => $opts{'store-period'} );
467			}
468
469			# schedule expanding wildcards
470			if ($opts{'expand-period'}) {
471				$log->debug("scheduling expanding heartbeat at $opts{'expand-period'} second(s)");
472				$_[KERNEL]->delay( expand_heartbeat => $opts{'expand-period'} );
473			}
474
475			# schedule windows heartbeat
476			if ($opts{'window-size'} && $opts{'windows-num'}) {
477				$log->debug("scheduling windows heartbeat at $opts{'window-size'} second(s)");
478				$_[KERNEL]->delay( windows_heartbeat => $opts{'window-size'} );
479			}
480		},
481
482		# expanding task
483		expand_heartbeat => sub {
484			$log->debug("wildcards expanding heartbeat occurred");
485
486			# expanding zones wildcards
487			$_[KERNEL]->call($_[SESSION], 'do_expand');
488
489			# schedule next call
490			if ($opts{'expand-period'}) {
491				$log->debug("scheduling expanding heartbeat at $opts{'expand-period'} second(s)");
492				$_[KERNEL]->delay( expand_heartbeat => $opts{'expand-period'} );
493			}
494		},
495
496		# setting named timer
497		set_timer => sub {
498			my $t = $_[HEAP]->{timers}{$_[ARG0]}{$_[ARG1]};
499			my $next = DateTime->now(
500				time_zone => 'local'
501			)->add(
502				seconds => $t->[0]
503			)->truncate(
504				to => $t->[1]
505			);
506			$log->debug("setting named timer '$_[ARG1]' for zone '$_[ARG0] at '".$next->strftime('%Y-%m-%d %H:%M:%S')."'");
507			$_[KERNEL]->alarm_set( named_timer => $next->epoch, $_[ARG0], $_[ARG1] );
508		},
509
510		# named timer handler
511		named_timer => sub {
512			$log->debug("processing named timer '$_[ARG1]' for zone '$_[ARG0]");
513			$_[HEAP]->{plugin}->process_timer(
514				$_[ARG1],
515				$_[HEAP]->{data}{$_[ARG0]}{public},
516				$_[HEAP]->{data}{$_[ARG0]}{private},
517				$_[HEAP]->{data}{$_[ARG0]}{windows}
518			) ? do {
519				$log->debug("renewing timer '$_[ARG1]' for zone '$_[ARG0]");
520				$_[KERNEL]->call($_[SESSION], 'set_timer', $_[ARG0], $_[ARG1]);
521			} : do {
522				$log->debug("clearing timer '$_[ARG1]' for zone '$_[ARG0]'");
523			};
524		},
525
526		# expanding wildcards
527		do_expand => sub {
528			$log->debug("begin expanding wildcards");
529
530			my $cwd = getcwd;
531			if ($opts{'change-dir'}) {
532				chdir $opts{'change-dir'} or
533					$log->warning("can't change directory to '$opts{'change-dir'}'");
534			}
535
536			my %exif;  # existing files
537			for my $zone ( keys %{ $_[HEAP]->{zones} } ) {
538				my @files;
539				push @files, map { realpath $_ } grep { -f } glob $_
540					for @{ $_[HEAP]->{zones}{$zone} };
541				$log->debug("found ".scalar(@files)." file(s) in zone '$zone'");
542
543				# create missing watchers
544				FILE:
545				for my $f (@files) {
546					$exif{$f}++;
547
548					# searching for already monitored file
549					for my $w ( values %{ $_[HEAP]->{watchers} } ) {
550						next unless $f eq $w->[0];
551
552						unless ($opts{multiple}) {
553							$log->debug("file '$f' already monitored, ignoring for zone '$zone'");
554							next FILE;
555						}
556
557						# searching for already subscribed zone
558						for my $z ( @{ $w->[1] } ) {
559							next unless $z eq $zone;
560							$log->debug("zone already subscribed for '$f'");
561							next FILE;
562						}
563
564						# subscribe to existing watcher
565						push @{ $w->[1] }, $zone;
566						$log->debug("zone subscribed for '$f'");
567						next FILE;
568					}
569
570					# create new watcher and subscribe zone
571					my $w = POE::Wheel::FollowTail->new(
572						Filename   => $f,
573						Filter     => POE::Filter::Line->new( InputLiteral => "\n" ),
574						ErrorEvent => 'watcher_err',
575						InputEvent => 'watcher_line',
576						ResetEvent => 'watcher_roll',
577					);
578					$log->debug("created new watcher [".$w->ID."] for '$f'");
579					$log->debug("zone subscribed for '$f'");
580					$_[HEAP]->{watchers}{$w->ID} = [ $f, [ $zone ], $w ];
581				}
582			}
583
584			# remove excess watchers
585			for my $w (values %{ $_[HEAP]->{watchers} } ) {
586				next if $exif{$w->[0]};  # file exists
587
588				delete $_[HEAP]->{watchers}{$w->[2]->ID};
589				$log->debug("excess watcher [".$w->[2]->ID."] removed for '".$w->[0]."'");
590			}
591
592			if ($opts{'change-dir'}) {
593				chdir $cwd or
594					$log->warning("can't restore directory to '$cwd'");
595			}
596		},
597
598		# new log line
599		watcher_line => sub {
600			my $w = $_[HEAP]->{watchers}{$_[ARG1]};
601			my @data = $_[HEAP]->{plugin}->process_line($_[ARG0]) or do {
602				return $log->log(
603					level   => $opts{'parse-error'},
604					message => "can't parse: '$_[ARG0]' from '$w->[0]'"
605				) unless $opts{'parse-error'} eq 'none';
606			};
607			# subscribers loop
608			for my $z ( @{ $w->[1] } ) {
609				$_[HEAP]->{plugin}->process_data(
610					\@data,
611					$_[HEAP]->{data}{$z}{public},
612					$_[HEAP]->{data}{$z}{private},
613					$_[HEAP]->{data}{$z}{windows}[0]
614				);
615			}
616		},
617
618		# log rotating occurred
619		watcher_roll => sub {
620			my $w = $_[HEAP]->{watchers}{$_[ARG0]};
621			# clear tail fragment
622			$w->[2][ POE::Wheel::FollowTail::SELF_FILTER ][ POE::Filter::Line::FRAMING_BUFFER ] =~ s/[^\n]+\z//
623				if $w->[2][ POE::Wheel::FollowTail::SELF_FILTER ][ POE::Filter::Line::FRAMING_BUFFER ];
624			$log->info("rolled over '$w->[0]'");
625		},
626
627		# log tailing error
628		watcher_err => sub {
629			my $w = $_[HEAP]->{watchers}{$_[ARG3]};
630			$log->error("$_[ARG0] failed ($_[ARG1] during tail '$w->[0]': $_[ARG2]");
631		},
632
633		# windows processing
634		windows_heartbeat => sub {
635			$log->debug("windows heartbeat occurred");
636
637			# schedule windows processing
638			for ( keys %{ $_[HEAP]->{zones} } ) {
639				$_[KERNEL]->yield( do_window => $_ );
640			}
641
642			# schedule next call
643			$log->debug("scheduling windows heartbeat at $opts{'window-size'} second(s)");
644			$_[KERNEL]->delay( windows_heartbeat => $opts{'window-size'} );
645		},
646
647		do_window => sub {
648			# windows ring
649			my $wins = $_[HEAP]->{data}{$_[ARG0]}{windows};
650
651			# call plugin handler with last complete window
652			$_[HEAP]->{plugin}->process_window(
653				$_[HEAP]->{data}{$_[ARG0]}{public},
654				$_[HEAP]->{data}{$_[ARG0]}{private},
655				$wins
656			);
657
658			# slide windows
659			unshift @$wins, {};
660			$#$wins = $opts{'windows-num'} - 1
661				if $#$wins > $opts{'windows-num'} - 1;
662		},
663
664		# periodically task
665		save_heartbeat => sub {
666			$log->debug("saving heartbeat occurred");
667
668			# save accumulated data
669			$_[KERNEL]->call($_[SESSION], 'do_save');
670
671			# schedule next call
672			$log->debug("scheduling saving heartbeat at $opts{'store-period'} second(s)");
673			$_[KERNEL]->delay( save_heartbeat => $opts{'store-period'} );
674		},
675
676		# loading stored data
677		do_load => sub {
678			$log->debug("loading stored data");
679			open FH, $opts{'database-file'} or do {
680				$log->error("can't open database file: $!");
681				return $_[KERNEL]->call($_[SESSION], 'shutdown');
682			};
683			local $/;
684			my $d = $_[HEAP]->{serial}->decode(<FH>) or do {
685				$log->error("can't read database file: $!");
686				return $_[KERNEL]->call($_[SESSION], 'shutdown');
687			};
688
689			# assign
690			$_[HEAP]->{data} = $d->{zones} || {};
691		},
692
693		# store accumulated data
694		do_save => sub {
695			$log->debug("storing accumulated data");
696			open FH, '>', $opts{'database-file'}.'~' or do {
697				return $log->warning("can't write database file: $!");
698			};
699			my $d = { zones => $_[HEAP]->{data} || {} };
700			print FH $_[HEAP]->{serial}->encode($d);
701			close FH;
702
703			if (-f $opts{'database-file'}) {
704				unlink $opts{'database-file'} or do {
705					return $log->warning("can't remove old database file: $!");
706				};
707			};
708			rename $opts{'database-file'}.'~', $opts{'database-file'} or do {
709				return $log->warning("can't rename new database file: $!");
710			};
711		},
712
713		# new client accepted
714		server_accept => sub {
715			my ($port,$addr) = sockaddr_in $_[ARG1];
716			$log->debug("client accepted from ".inet_ntoa($addr).":$port");
717			my $c = POE::Wheel::ReadWrite->new(
718				Handle     => $_[ARG0],
719				InputEvent => 'client_input',
720				ErrorEvent => 'client_error',
721			);
722			$_[HEAP]->{clients}{$c->ID} = $c;
723		},
724
725		# server error occurred
726		server_error => sub {
727			$log->error("$_[ARG0] failed ($_[ARG1] during serving: $_[ARG2]");
728			$_[KERNEL]->call($_[SESSION], 'shutdown');
729		},
730
731		# got client command
732		client_input => sub {
733			$log->debug("got client command: '$_[ARG0]'");
734			my $cln = $_[HEAP]->{clients}{$_[ARG1]} or
735				return $log->warning("unknown client #$_[ARG1]");
736
737			for ($_[ARG0]) {
738
739				# zones list (active & inactive zones)
740				/^\s*zones\s*$/i and do {
741					$cln->put(
742						map { 'a:'.$_ }
743						keys %{ $_[HEAP]->{zones} },
744					);
745					$cln->put(
746						map { 'i:'.$_ }
747						grep { ! exists $_[HEAP]->{zones}{$_} }
748						keys %{ $_[HEAP]->{data} },
749					);
750					last;
751				};
752
753				# wildcards list (active zones only)
754				/^\s*globs\s+(\S+)\s*$/i and do {
755					my $z = $_[HEAP]->{zones}{$1} or do {
756						$log->warning("invalid client globs query: '$1'");
757						$cln->put('no such active zone');
758						last;
759					};
760					$cln->put( sort @{ $z } );
761					last;
762				};
763
764				# files list (active zones only)
765				/^\s*files\s+(\S+)\s*$/i and do {
766					$_[HEAP]->{zones}{$1} or do {
767						$log->warning("invalid client files query: '$1'");
768						$cln->put('no such active zone');
769						last;
770					};
771
772					my @f;
773					for my $w ( values %{ $_[HEAP]->{watchers} } ) {
774						for my $z ( @{ $w->[1] } ) {
775							next unless $z eq $1;
776							push @f, [ $w->[0], $w->[2]->tell, -s $w->[0] ];
777							last;
778						}
779					}
780					for ( sort { $a->[0] cmp $b->[0] } @f ) {
781						$cln->put( join ':', $_->[1] eq '0 but true' ? 0 : $_->[1], $_->[2], $_->[0] );
782					}
783					last;
784				};
785
786				# zone dump (active & inactive zones)
787				/^\s*dump\s+(\S+)\s*$/i and do {
788					$_[HEAP]->{zones}{$1} or $_[HEAP]->{data}{$1} or do {
789						$log->warning("invalid client dump query: '$1'");
790						$cln->put('no such zone');
791						last;
792					};
793					my $wmax = $#{ $_[HEAP]->{data}{$1}{windows} };
794					$cln->put( $_[HEAP]->{plugin}->dump_zone( $1,
795						$_[HEAP]->{data}{$1}{public},
796						$_[HEAP]->{data}{$1}{private},
797						[ @{ $_[HEAP]->{data}{$1}{windows} }[1..$wmax] ],
798					));
799					last;
800				};
801
802				# zone statistics (active & inactive zones)
803				/^\s*stats\s+(\S+)\s*$/i and do {
804					$_[HEAP]->{zones}{$1} or $_[HEAP]->{data}{$1} or do {
805						$log->warning("invalid client stats query: '$1'");
806						$cln->put('no such zone');
807						last;
808					};
809					my $wmax = $#{ $_[HEAP]->{data}{$1}{windows} };
810					$cln->put( $_[HEAP]->{plugin}->stats_zone( $1,
811						$_[HEAP]->{data}{$1}{public},
812						$_[HEAP]->{data}{$1}{private},
813						[ @{ $_[HEAP]->{data}{$1}{windows} }[1..$wmax] ],
814					));
815					last;
816				};
817
818				# remove zone statistics (inactive zones only)
819				/^\s*wipe\s+(\S+)\s*$/i and do {
820					if ($1 eq '*') {
821						for ( keys %{ $_[HEAP]->{data} } ) {
822							next if exists $_[HEAP]->{zones}{$_};
823							delete $_[HEAP]->{data}{$_};
824						}
825					} else {
826						$_[HEAP]->{data}{$1} or do {
827							$log->warning("invalid client wipe query: '$1'");
828							$cln->put('no such inactive zone');
829							last;
830						};
831						$_[HEAP]->{zones}{$1} and do {
832							$log->warning("invalid client wipe query: '$1'");
833							$cln->put('zone is active');
834							last;
835						};
836						delete $_[HEAP]->{data}{$1};
837					}
838					$cln->put( 'ok' );
839					$_[KERNEL]->call($_[SESSION], 'do_save');
840					last;
841				};
842
843
844				# disconnect request
845				/^\s*quit\s*$/i and do {
846					return delete $_[HEAP]->{clients}->{$_[ARG1]};
847				};
848
849				# invalid command
850				$log->warning("invalid client command: '$_[ARG0]'");
851				$_[HEAP]->{clients}{$_[ARG1]}->put('error');
852			}
853
854			# force buffer flush
855			$_[HEAP]->{clients}{$_[ARG1]}->flush;
856			$log->debug("buffer flushed");
857		},
858
859		# client errors (disconnect included)
860		client_error => sub {
861			$_[ARG1] ?
862				$log->error("$_[ARG0] ($_[ARG1] from client: $_[ARG2]") :
863				$log->debug("client disconnected");
864			# drop client connection
865			delete $_[HEAP]->{clients}->{$_[ARG3]};
866		},
867
868		# got SIGHUP
869		hangup => sub {
870			$log->notice("got SIGHUP, re-expanging zones wildcards");
871
872			# expanding zones wildcards
873			$_[KERNEL]->call($_[SESSION], 'do_expand');
874
875			# keep signal handled
876			$_[KERNEL]->sig_handled;
877		},
878
879		# got SIGINT
880		interrupt => sub {
881			$log->notice("got SIGINT, terminating");
882
883			# keep signal handled
884			$_[KERNEL]->sig_handled;
885
886			# shutting down
887			$_[KERNEL]->call($_[SESSION], 'shutdown');
888		},
889
890		# got SIGTERM
891		terminate => sub {
892			$log->notice("got SIGTERM, terminating");
893
894			# shutting down
895			$_[KERNEL]->call($_[SESSION], 'shutdown');
896
897			# keep signal handled
898			$_[KERNEL]->sig_handled;
899		},
900
901		# got SIGUSR1
902		rotate => sub {
903			$log->notice("got SIGUSR1, re-opening log file");
904
905			# drop & create logger
906			$log->remove('main');
907			$log->add(logger());
908
909			# keep signal handled
910			$_[KERNEL]->sig_handled;
911		},
912
913		# graceful exit
914		shutdown => sub {
915			$log->debug("gracefully shutting down");
916
917			# store statistics
918			$_[KERNEL]->call($_[SESSION], 'do_save');
919
920			# drop timers
921			$log->debug("removing alarms");
922			$_[KERNEL]->alarm_remove_all;
923
924			# drop server
925			$log->debug("shutting down server");
926			delete $_[HEAP]->{server};
927
928			# drop clients
929			$log->debug("disconnecting clients");
930			delete $_[HEAP]->{clients}{$_} for keys %{ $_[HEAP]->{clients} };
931
932			# drop watchers
933			$log->debug("shutting down watchers");
934			for (values %{ $_[HEAP]->{watchers} }) {
935				delete $_[HEAP]->{watchers}{$_->[2]->ID};
936				$log->debug("shutdown watcher [".$_->[2]->ID."] for '$_->[0]'");
937			}
938		},
939	},
940);
941
942
# go! (returns when the event loop has finished)
POE::Kernel->run();

$log->notice('exit');
947
# log object create
# Builds the single log output named 'main': screen (stderr) in foreground
# mode, a file when --log-file is set, syslog otherwise.
sub logger {
	my %common = (
		name      => 'main',
		min_level => $opts{'log-level'},
	);

	# screen and file records carry pid, level and timestamp
	my @decorate = ( \&pfmt, \&lfmt, \&dfmt );

	if ($opts{foreground}) {
		return Log::Dispatch::Screen->new(
			%common,
			callbacks => [ @decorate ],
			stderr    => 1
		);
	}

	if ($opts{'log-file'}) {
		return Log::Dispatch::File->new(
			%common,
			callbacks => [ @decorate ],
			filename  => $opts{'log-file'},
			mode      => '>>'
		);
	}

	# syslog records only get the pid prefix
	my $ident = $FindBin::RealScript;
	$ident .= '-'.$opts{identity} if $opts{identity};
	return Log::Dispatch::Syslog->new(
		%common,
		callbacks => [ \&pfmt, ],
		facility  => $opts{'log-facility'},
		ident     => $ident
	);
}
973
# log formatting routines
# pfmt: prefixes the message with the process id and terminates the line
sub pfmt {
	my %record = @_;
	return $$ . ': ' . $record{message} . "\n";
}
# lfmt: prefixes the message with its level in square brackets
sub lfmt {
	my %record = @_;
	return '[' . $record{level} . '] ' . $record{message};
}
# dfmt: prefixes the message with the current local date and time
sub dfmt {
	my %record = @_;
	my @now    = localtime;
	return strftime('%Y/%m/%d %H:%M:%S', @now) . ' ' . $record{message};
}
987
# Returns the brief help text: invocation line and the list of options.
sub usage {
	return <<"END_OF_USAGE";

Usage: $FindBin::RealScript [ options ] plugin [zone1:]wildcard1 .. [zoneN:]wildcardN

More information available under 'Tail::Stat' man page.

Options:
    -a, --agregate-zone=ZONE     agregating anonymous logs to zone
    -b, --database-file=FILE     persistent database file
        --basename               create anonymous zones from base name of log files
    -c, --change-dir=DIR         change directory before wildards expanding
    -d, --debug                  implies: --foreground --log-level=debug
    -f, --foreground             no detach, logging to stderr
        --log-facility=NAME      set facility for syslog logging
        --log-level=LEVEL        minimum logging level
        --log-file=FILE          logging to file instead syslog
    -e, --expand-period=SECONDS  zones wildcard expand period
    -h, --help                   show this help message
    -i, --identity=STRING        add string to process title, default pid-file,
                                 default database-file and syslog ident
    -l, --listen=[ADDR:]PORT     TCP statistic server listen socket
        --multiple               log includes in all expanded wildcard
    -n, --windows-num=NUM        number of sliding windows (default 60)
    -o, --options=STRING         comma-separated plugin supported options
                                 (like a mount (8) options)
        --override-from=FILE     load overriding methods from file
    -p, --pid-file=FILE          pid file path
        --parse-error=LEVEL      logging level for unparsed lines
    -r, --regex=PATTERN          override plugin regular expression
        --regex-from=FILE        read regular expression from file
    -s, --store-period=SECONDS   data store period (default 60)
        --timer=ZONE:NAME:PERIOD add named timer with fixed period
    -u, --user=LOGIN             change effective process uid to
    -v, --version                print version and exit
    -w, --window-size=SECONDS    size of one sliding window (default 10)

END_OF_USAGE
}
1027
# Returns the version banner followed by the copyright and license notice.
sub version {
	return <<"END_OF_VERSION";

$FindBin::RealScript version $Tail::Stat::VERSION

Copyright (C) 2010 Oleg A. Mamontov

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.

END_OF_VERSION
}
1043
1044