1#!/usr/bin/env perl 2 3=head1 NAME 4 5tstatd - Logs real-time accounting daemon 6 7SYNOPSIS 8 9tstatd [ options ] plugin [zone1:]wildcard1 .. [zoneN:]wildcardN 10 11OPTIONS 12 13=over 14 15=item -a I<zone>, --agregate-zone=I<zone> 16 17Agregate data from all anonymous logs (wildcards without explicit 18zone specified) into I<zone>. Default behavior is to create new 19zone for each anonymous log from its file name. 20 21=item -b I<file>, --database-file=I<file> 22 23Use I<file> as persistent storage to keep accumulated data across 24daemon restarts. Default is auto generated from daemon name, 25specified identity and '.db' suffix. 26 27=item --basename 28 29Use only base name (excluding directories and suffix) of anonymous log file 30for auto-created zones. 31 32=item -c I<dir>, --change-dir=I<dir> 33 34Change current directory to I<dir> before wildcards expanding. 35 36=item -d, --debug 37 38Composition of options: C<--foreground> and C<--log-level=debug>. 39 40=item -f, --foreground 41 42Don't detach daemon from control terminal, logging to C<stderr> instead 43log file or syslog. 44 45=item --log-facility=I<name> 46 47Use I<name> as C<facility> for syslog logging (see syslog (3) for list 48of available values). Default is 'daemon'. 49 50=item --log-level=I<level> 51 52Set minimal logging level to I<level> (see syslog (3) for list of available 53values). Default is 'notice'. 54 55=item --log-file=I<file> 56 57Use logging to I<file> instead of syslog logging (which is default). 58 59=item -e I<num>, --expand-period=I<num> 60 61Do wildcards re-expanding and checking for new and missed logs 62every I<num> seconds. Default is '60'. 63 64=item -h, --help 65 66Print brief help message about available options. 67 68=item -i I<string>, --identity=I<string> 69 70Just a string used in title of daemon process, syslog ident (see syslog(3)), 71C<--database-file> and C<--pid-file>. Idea behind this options - multiple 72C<tstatd> instances running simultaneosly. 73 74=item -l [I<address>:]I<port>, --listen=[I<address>:]I<port> 75 76Specify I<address> and I<port> for TCP listen socket binding. 77Default is '127.0.0.1:3638'. 78 79=item --multiple 80 81With this option specified same log file could be included into several 82zones (if log name satisifies several wildcards). Default behavior is to 83include log file only in first satisified zone. 84 85=item -n I<num>, --windows-num=I<num> 86 87Set number of sliding-windows to I<num>. Default is '60'. 88 89=item -o I<string>, --options=I<string> 90 91Comma-separated plugin supported options (like a mount (8) options). 92 93=item --override-from=I<file> 94 95Load content of I<file> into plugin package namespace. 96This is way to easy customize plugin behavior without creating 97another plugin. 98 99=item -p I<file>, --pid-file=I<file> 100 101Use I<file> to keep daemon process id. Default is auto generated 102from daemon name, specified identity and '.pid' suffix. 103 104=item --parse-error=I<level> 105 106Do logging with I<level> (see syslog (3) for available values) about 107all unparsed log lines. Hint: use 'none' for ignoring such lines. 108Default is defining by plugin and usually is 'debug'. 109 110=item -r I<pattern>, --regex=I<pattern> 111 112Use I<pattern> instead of plugin default regular expression for 113matching log lines. 114 115=item --regex-from=I<file> 116 117Load regular expression from I<file> and use instead of plugin default 118regular expression for matching log lines. 119 120=item -s I<num>, --store-period=I<num> 121 122Store accumulated data in a persistent storage every I<num> seconds. 123Default is '60'. 124 125=item --timer=I<zone>:I<timer>:I<num> 126 127Create named I<timer> firing every I<num> seconds for I<zone>. 128 129=item -u <user>, --user=I<user> 130 131Change effective privileges of daemon process to I<user>. 132 133=item -v, --version 134 135Print version information of C<tstatd> and exit. 136 137=item -w I<num>, --window-size=<num> 138 139Set size (duration) of sliding window to I<num> seconds. 140Default is '10'. 141 142=back 143 144 145=head1 SEE ALSO 146 147L<Tail::Stat> 148 149 150=head1 AUTHOR 151 152Oleg A. Mamontov, C<< <oleg@mamontov.net> >> 153 154 155=head1 COPYRIGHT & LICENSE 156 157This program is free software; you can redistribute it and/or modify it 158under the terms of either: the GNU General Public License as published 159by the Free Software Foundation; or the Artistic License. 160 161See http://dev.perl.org/licenses/ for more information. 162 163 164=cut 165 166use strict; 167use warnings qw(all); 168 169use Cwd qw(getcwd realpath); 170use DateTime; 171use File::Basename qw(fileparse); 172use FindBin; 173use Getopt::Long qw(:config no_auto_abbrev bundling); 174use JSON::XS; 175use List::Util qw(min); 176use Log::Dispatch; 177use Log::Dispatch::File; 178use Log::Dispatch::Screen; 179use Log::Dispatch::Syslog; 180use Pid::File::Flock; 181use POE qw(Wheel::FollowTail Wheel::ListenAccept Wheel::ReadWrite); 182use POSIX qw(setsid setuid strftime); 183use Socket; 184use Tail::Stat; 185use Tie::Hash::Indexed; 186 187# parse command line 188my %opts; 189GetOptions(\%opts, qw/ 190 agregate-zone|a=s 191 basename 192 database-file|b=s 193 change-dir|c=s 194 debug|d 195 foreground|f 196 log-facility=s 197 log-file=s 198 log-level=s 199 expand-period|e=i 200 help|h 201 identity|i=s 202 listen|l=s 203 windows-num|n=i 204 multiple 205 options|o=s 206 override-from=s@ 207 parse-error=s 208 pid-file|p=s 209 regex|r=s 210 regex-from=s 211 timer=s@ 212 store-period|s=i 213 user|u=s 214 version|v 215 window-size|w=i 216/) or die usage(); 217 218# explicitly requested help 219die usage() if $opts{help}; 220 221# version requested 222print version() and exit if $opts{version}; 223 224# no arguments 225die usage() if @ARGV < 2; 226 227# try to load requested plugin 228my $pname = shift @ARGV; 229my $pclass = "Tail::Stat::Plugin::$pname"; 230eval "require $pclass" or die "can't load plugin '$pname': $@\n"; 231 232 233# parameters defaults & validation 234if (exists $opts{'agregate-zone'}) { 235 die "invalid zone: '$opts{'agregate-zone'}'\n" 236 if $opts{'agregate-zone'} =~ /[^a-z0-9_-]/; 237} 238 239if (exists $opts{identity}) { 240 die "invalid identity: '$opts{identity}'\n" if $opts{identity} =~ /[^\w]/; 241} 242 243$opts{'database-file'} ||= $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '').'.db'; 244$opts{'database-file'} = realpath $opts{'database-file'}; 245 246if (exists $opts{'change-dir'}) { 247 die "no such directory: '$opts{'change-dir'}'\n" 248 unless -d $opts{'change-dir'}; 249} 250 251$opts{'log-facility'} ||= 'daemon'; 252die "invalid log facility: '$opts{'log-facility'}'\n" 253 unless $opts{'log-facility'} =~ /^(auth|authpriv|cron|daemon|kern|local[0-7]|mail|news|syslog|user|uucp)$/; 254 255$opts{'log-level'} ||= 'notice'; 256die "invalid log level: '$opts{'log-level'}'\n" 257 unless $opts{'log-level'} =~ /^(debug|info|notice|warning|error|critical|alert|emergency)$/; 258 259if ($opts{debug}) { 260 $opts{'log-level'} = 'debug'; 261 delete $opts{'log-file'}; 262 $opts{foreground} = 1; 263} 264 265$opts{'log-file'} = realpath $opts{'log-file'} if exists $opts{'log-file'}; 266 267$opts{'expand-period'} = 60 unless exists $opts{'expand-period'}; 268die "invalid expand period: '$opts{'expand-period'}'\n" 269 if $opts{'expand-period'} =~ /[^\d]/; 270 271$opts{'listen'} ||= '127.0.0.1:3638'; 272 273$opts{'windows-num'} = 60 unless exists $opts{'windows-num'}; 274die "invalid windows number: '$opts{'windows-num'}'\n" 275 if $opts{'windows-num'} =~ /[^\d]/; 276 277$opts{'store-period'} = 10 unless exists $opts{'store-period'}; 278die "invalid store period: '$opts{'store-period'}'\n" 279 if $opts{'store-period'} =~ /[^\d]/; 280 281$opts{'window-size'} = 10 unless exists $opts{'window-size'}; 282die "invalid window size: '$opts{'window-size'}'\n" 283 if $opts{'window-size'} =~ /[^\d]/; 284 285$opts{'parse-error'} ||= $pclass->parse_error; 286die "invalid parse error: '$opts{'parse-error'}'\n" 287 unless $opts{'parse-error'} =~ /^(debug|info|none|notice|warning|error|critical|alert|emergency)$/; 288 289$opts{'pid-file'} ||= $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '').'.pid'; 290$opts{'pid-file'} = realpath $opts{'pid-file'}; 291 292if (exists $opts{'regex-from'}) { 293 die "options regex and regex-from are mutually exclusive\n" 294 if exists $opts{regex}; 295 local $/; 296 open FH, $opts{'regex-from'} or 297 die "can't read regex: $!\n"; 298 $opts{regex} = <FH>; 299} 300 301# loading overrides 302for ( @{ $opts{'override-from'} } ) { 303 local $/; 304 open FH, $_ or 305 die "can't read override from '$_': $!\n"; 306 eval "package $pclass; use strict; use warnings qw(all); ".<FH>; 307 die "can't apply overrides from '$_': $@\n" if $@; 308} 309 310defined (my $uid = $opts{'user'} ? getpwnam($opts{'user'}) : $>) or 311 die "unknown user: $opts{'user'}\n"; 312 313# grouping log files by zones, order does matter 314# due to support '--multiple' option 315my %zones; 316tie %zones, 'Tie::Hash::Indexed'; 317for (@ARGV) { 318 /^([\w\d\_-]+):(.+)/ && do { 319 push @{$zones{$1}}, $2; 320 next; 321 }; 322 push @{$zones{ 323 $opts{'agregate-zone'} || 324 ( $opts{basename} ? fileparse($_,qr/\.[^\.]+/) : $_ ) 325 }}, $_; 326} 327 328# parsing timers 329my %timers; 330my %units = ( 331 w => [ 'week', 7 * 86_400 ], 332 d => [ 'day', 86_400 ], 333 h => [ 'hour', 3_600 ], 334 m => [ 'minute', 60 ], 335 s => [ 'second', 1 ], 336); 337for (@{ $opts{timer} }) { 338 my ($z,$n,$p,$u) = /^(\S+):(\S+):(\d+)(w|d|h|m|s)?$/ or 339 die "invalid timer format: $_\n"; 340 die "no such zone '$z' for timer '$_'\n" unless exists $zones{$z}; 341 die "zone '$z' already has timer '$n'\n" if exists $timers{$z}{$n}; 342 343 $u ||= 's'; 344 $timers{$z}{$n} = [ $p * $units{$u}[1], $units{$u}[0] ]; 345} 346 347# listen socket 348my $sock = IO::Socket::INET->new( 349 ( 350 $opts{'listen'} =~ /:/ ? 351 ( LocalAddr => $opts{'listen'} ) : 352 ( LocalPort => $opts{'listen'} ) 353 ), 354 Listen => SOMAXCONN, 355 ReuseAddr => 1, 356) or die "can't create listen socket: $!\n"; 357 358# set process privileges 359setuid $uid or die "can't setuid to $opts{'user'}: $!\n" unless $uid == $>; 360 361# set process title 362$0 = $FindBin::RealScript.': '.$pname.($opts{identity} ? ' ['.$opts{identity}.']' : ''); 363 364# fork 365unless ($opts{foreground}) { 366 defined(my $pid = fork) or die "can't fork: $!\n"; 367 exit if $pid; 368} 369 370# protecting against second instance running 371Pid::File::Flock->new($opts{'pid-file'}) unless $opts{foreground}; 372 373# daemonize 374unless ($opts{foreground}) { 375 chdir '/' or die "can't chdir: $!\n"; 376 die "can't create new session: $!\n" if setsid == -1; 377 open STDIN, '</dev/null' or die "can't close stdin\n"; 378 open STDOUT, '>/dev/null' or die "can't close stdout\n"; 379 open STDERR, '>/dev/null' or die "can't close stderr\n"; 380} 381 382# logger 383(my $log = Log::Dispatch->new)->add(logger()); 384$log->notice("starting up"); 385 386# catch perl warnings 387$SIG{__WARN__} = sub { $log->warning(@_) }; 388 389# main POE session 390POE::Session->create( 391 inline_states => { 392 393 # initializing 394 _start => sub { 395 $log->debug("initializing POE session"); 396 397 # talk POE kernel adjust to the new situation 398 $_[KERNEL]->has_forked unless $opts{foreground}; 399 400 # signals 401 $log->debug("setting up signal handlers"); 402 $_[KERNEL]->sig(HUP => 'hangup'); 403 $_[KERNEL]->sig(INT => 'interrupt'); 404 $_[KERNEL]->sig(TERM => 'terminate'); 405 $_[KERNEL]->sig(USR1 => 'rotate'); 406 407 # statistics server 408 $log->debug("creating TCP server"); 409 $_[HEAP]->{server} = POE::Wheel::ListenAccept->new( 410 Handle => $sock, 411 AcceptEvent => 'server_accept', 412 ErrorEvent => 'server_error', 413 ); 414 415 # serializer 416 $log->debug("creating serializer"); 417 $_[HEAP]->{serial} = JSON::XS->new->pretty; 418 419 # creating plugin instance 420 my %popts; 421 for (split /,/, $opts{options} || '') { 422 my ($k,$v) = split /=/; 423 $popts{$k} = defined $v ? $v : 1; 424 } 425 $popts{regex} = $opts{regex} if exists $opts{regex}; 426 $_[HEAP]->{plugin} = $pclass->new(%popts); 427 428 # setting up zones 429 $_[HEAP]->{zones} = \%zones; 430 431 # load previous data 432 if (-f $opts{'database-file'}) { 433 $_[KERNEL]->call($_[SESSION], 'do_load') or return; 434 } 435 436 # create insufficient references 437 for (keys %zones) { 438 $_[HEAP]->{data}{$_}{public} ||= {}; 439 $_[HEAP]->{data}{$_}{private} ||= {}; 440 $_[HEAP]->{data}{$_}{windows} ||= []; 441 $_[HEAP]->{data}{$_}{windows}[0] ||= {}; 442 443 # call plugin initialization code 444 $_[HEAP]->{plugin}->init_zone( 445 $_, 446 $_[HEAP]->{data}{$_}{public}, 447 $_[HEAP]->{data}{$_}{private}, 448 $_[HEAP]->{data}{$_}{windows}[0], 449 ); 450 } 451 452 # expanding zones wildcards 453 $_[KERNEL]->call($_[SESSION], 'do_expand'); 454 455 # creating named timers 456 $_[HEAP]->{timers} = \%timers; 457 for my $z ( keys %{ $_[HEAP]->{timers} } ) { 458 for my $n ( keys %{ $_[HEAP]->{timers}{$z} } ) { 459 $_[KERNEL]->call($_[SESSION], 'set_timer', $z, $n); 460 } 461 } 462 463 # schedule save task 464 if ($opts{'store-period'}) { 465 $log->debug("scheduling saving heartbeat at $opts{'store-period'} second(s)"); 466 $_[KERNEL]->delay( save_heartbeat => $opts{'store-period'} ); 467 } 468 469 # schedule expanding wildcards 470 if ($opts{'expand-period'}) { 471 $log->debug("scheduling expanding heartbeat at $opts{'expand-period'} second(s)"); 472 $_[KERNEL]->delay( expand_heartbeat => $opts{'expand-period'} ); 473 } 474 475 # schedule windows heartbeat 476 if ($opts{'window-size'} && $opts{'windows-num'}) { 477 $log->debug("scheduling windows heartbeat at $opts{'window-size'} second(s)"); 478 $_[KERNEL]->delay( windows_heartbeat => $opts{'window-size'} ); 479 } 480 }, 481 482 # expanding task 483 expand_heartbeat => sub { 484 $log->debug("wildcards expanding heartbeat occurred"); 485 486 # expanding zones wildcards 487 $_[KERNEL]->call($_[SESSION], 'do_expand'); 488 489 # schedule next call 490 if ($opts{'expand-period'}) { 491 $log->debug("scheduling expanding heartbeat at $opts{'expand-period'} second(s)"); 492 $_[KERNEL]->delay( expand_heartbeat => $opts{'expand-period'} ); 493 } 494 }, 495 496 # setting named timer 497 set_timer => sub { 498 my $t = $_[HEAP]->{timers}{$_[ARG0]}{$_[ARG1]}; 499 my $next = DateTime->now( 500 time_zone => 'local' 501 )->add( 502 seconds => $t->[0] 503 )->truncate( 504 to => $t->[1] 505 ); 506 $log->debug("setting named timer '$_[ARG1]' for zone '$_[ARG0] at '".$next->strftime('%Y-%m-%d %H:%M:%S')."'"); 507 $_[KERNEL]->alarm_set( named_timer => $next->epoch, $_[ARG0], $_[ARG1] ); 508 }, 509 510 # named timer handler 511 named_timer => sub { 512 $log->debug("processing named timer '$_[ARG1]' for zone '$_[ARG0]"); 513 $_[HEAP]->{plugin}->process_timer( 514 $_[ARG1], 515 $_[HEAP]->{data}{$_[ARG0]}{public}, 516 $_[HEAP]->{data}{$_[ARG0]}{private}, 517 $_[HEAP]->{data}{$_[ARG0]}{windows} 518 ) ? do { 519 $log->debug("renewing timer '$_[ARG1]' for zone '$_[ARG0]"); 520 $_[KERNEL]->call($_[SESSION], 'set_timer', $_[ARG0], $_[ARG1]); 521 } : do { 522 $log->debug("clearing timer '$_[ARG1]' for zone '$_[ARG0]'"); 523 }; 524 }, 525 526 # expanding wildcards 527 do_expand => sub { 528 $log->debug("begin expanding wildcards"); 529 530 my $cwd = getcwd; 531 if ($opts{'change-dir'}) { 532 chdir $opts{'change-dir'} or 533 $log->warning("can't change directory to '$opts{'change-dir'}'"); 534 } 535 536 my %exif; # existing files 537 for my $zone ( keys %{ $_[HEAP]->{zones} } ) { 538 my @files; 539 push @files, map { realpath $_ } grep { -f } glob $_ 540 for @{ $_[HEAP]->{zones}{$zone} }; 541 $log->debug("found ".scalar(@files)." file(s) in zone '$zone'"); 542 543 # create missing watchers 544 FILE: 545 for my $f (@files) { 546 $exif{$f}++; 547 548 # searching for already monitored file 549 for my $w ( values %{ $_[HEAP]->{watchers} } ) { 550 next unless $f eq $w->[0]; 551 552 unless ($opts{multiple}) { 553 $log->debug("file '$f' already monitored, ignoring for zone '$zone'"); 554 next FILE; 555 } 556 557 # searching for already subscribed zone 558 for my $z ( @{ $w->[1] } ) { 559 next unless $z eq $zone; 560 $log->debug("zone already subscribed for '$f'"); 561 next FILE; 562 } 563 564 # subscribe to existing watcher 565 push @{ $w->[1] }, $zone; 566 $log->debug("zone subscribed for '$f'"); 567 next FILE; 568 } 569 570 # create new watcher and subscribe zone 571 my $w = POE::Wheel::FollowTail->new( 572 Filename => $f, 573 Filter => POE::Filter::Line->new( InputLiteral => "\n" ), 574 ErrorEvent => 'watcher_err', 575 InputEvent => 'watcher_line', 576 ResetEvent => 'watcher_roll', 577 ); 578 $log->debug("created new watcher [".$w->ID."] for '$f'"); 579 $log->debug("zone subscribed for '$f'"); 580 $_[HEAP]->{watchers}{$w->ID} = [ $f, [ $zone ], $w ]; 581 } 582 } 583 584 # remove excess watchers 585 for my $w (values %{ $_[HEAP]->{watchers} } ) { 586 next if $exif{$w->[0]}; # file exists 587 588 delete $_[HEAP]->{watchers}{$w->[2]->ID}; 589 $log->debug("excess watcher [".$w->[2]->ID."] removed for '".$w->[0]."'"); 590 } 591 592 if ($opts{'change-dir'}) { 593 chdir $cwd or 594 $log->warning("can't restore directory to '$cwd'"); 595 } 596 }, 597 598 # new log line 599 watcher_line => sub { 600 my $w = $_[HEAP]->{watchers}{$_[ARG1]}; 601 my @data = $_[HEAP]->{plugin}->process_line($_[ARG0]) or do { 602 return $log->log( 603 level => $opts{'parse-error'}, 604 message => "can't parse: '$_[ARG0]' from '$w->[0]'" 605 ) unless $opts{'parse-error'} eq 'none'; 606 }; 607 # subscribers loop 608 for my $z ( @{ $w->[1] } ) { 609 $_[HEAP]->{plugin}->process_data( 610 \@data, 611 $_[HEAP]->{data}{$z}{public}, 612 $_[HEAP]->{data}{$z}{private}, 613 $_[HEAP]->{data}{$z}{windows}[0] 614 ); 615 } 616 }, 617 618 # log rotating occurred 619 watcher_roll => sub { 620 my $w = $_[HEAP]->{watchers}{$_[ARG0]}; 621 # clear tail fragment 622 $w->[2][ POE::Wheel::FollowTail::SELF_FILTER ][ POE::Filter::Line::FRAMING_BUFFER ] =~ s/[^\n]+\z// 623 if $w->[2][ POE::Wheel::FollowTail::SELF_FILTER ][ POE::Filter::Line::FRAMING_BUFFER ]; 624 $log->info("rolled over '$w->[0]'"); 625 }, 626 627 # log tailing error 628 watcher_err => sub { 629 my $w = $_[HEAP]->{watchers}{$_[ARG3]}; 630 $log->error("$_[ARG0] failed ($_[ARG1] during tail '$w->[0]': $_[ARG2]"); 631 }, 632 633 # windows processing 634 windows_heartbeat => sub { 635 $log->debug("windows heartbeat occurred"); 636 637 # schedule windows processing 638 for ( keys %{ $_[HEAP]->{zones} } ) { 639 $_[KERNEL]->yield( do_window => $_ ); 640 } 641 642 # schedule next call 643 $log->debug("scheduling windows heartbeat at $opts{'window-size'} second(s)"); 644 $_[KERNEL]->delay( windows_heartbeat => $opts{'window-size'} ); 645 }, 646 647 do_window => sub { 648 # windows ring 649 my $wins = $_[HEAP]->{data}{$_[ARG0]}{windows}; 650 651 # call plugin handler with last complete window 652 $_[HEAP]->{plugin}->process_window( 653 $_[HEAP]->{data}{$_[ARG0]}{public}, 654 $_[HEAP]->{data}{$_[ARG0]}{private}, 655 $wins 656 ); 657 658 # slide windows 659 unshift @$wins, {}; 660 $#$wins = $opts{'windows-num'} - 1 661 if $#$wins > $opts{'windows-num'} - 1; 662 }, 663 664 # periodically task 665 save_heartbeat => sub { 666 $log->debug("saving heartbeat occurred"); 667 668 # save accumulated data 669 $_[KERNEL]->call($_[SESSION], 'do_save'); 670 671 # schedule next call 672 $log->debug("scheduling saving heartbeat at $opts{'store-period'} second(s)"); 673 $_[KERNEL]->delay( save_heartbeat => $opts{'store-period'} ); 674 }, 675 676 # loading stored data 677 do_load => sub { 678 $log->debug("loading stored data"); 679 open FH, $opts{'database-file'} or do { 680 $log->error("can't open database file: $!"); 681 return $_[KERNEL]->call($_[SESSION], 'shutdown'); 682 }; 683 local $/; 684 my $d = $_[HEAP]->{serial}->decode(<FH>) or do { 685 $log->error("can't read database file: $!"); 686 return $_[KERNEL]->call($_[SESSION], 'shutdown'); 687 }; 688 689 # assign 690 $_[HEAP]->{data} = $d->{zones} || {}; 691 }, 692 693 # store accumulated data 694 do_save => sub { 695 $log->debug("storing accumulated data"); 696 open FH, '>', $opts{'database-file'}.'~' or do { 697 return $log->warning("can't write database file: $!"); 698 }; 699 my $d = { zones => $_[HEAP]->{data} || {} }; 700 print FH $_[HEAP]->{serial}->encode($d); 701 close FH; 702 703 if (-f $opts{'database-file'}) { 704 unlink $opts{'database-file'} or do { 705 return $log->warning("can't remove old database file: $!"); 706 }; 707 }; 708 rename $opts{'database-file'}.'~', $opts{'database-file'} or do { 709 return $log->warning("can't rename new database file: $!"); 710 }; 711 }, 712 713 # new client accepted 714 server_accept => sub { 715 my ($port,$addr) = sockaddr_in $_[ARG1]; 716 $log->debug("client accepted from ".inet_ntoa($addr).":$port"); 717 my $c = POE::Wheel::ReadWrite->new( 718 Handle => $_[ARG0], 719 InputEvent => 'client_input', 720 ErrorEvent => 'client_error', 721 ); 722 $_[HEAP]->{clients}{$c->ID} = $c; 723 }, 724 725 # server error occurred 726 server_error => sub { 727 $log->error("$_[ARG0] failed ($_[ARG1] during serving: $_[ARG2]"); 728 $_[KERNEL]->call($_[SESSION], 'shutdown'); 729 }, 730 731 # got client command 732 client_input => sub { 733 $log->debug("got client command: '$_[ARG0]'"); 734 my $cln = $_[HEAP]->{clients}{$_[ARG1]} or 735 return $log->warning("unknown client #$_[ARG1]"); 736 737 for ($_[ARG0]) { 738 739 # zones list (active & inactive zones) 740 /^\s*zones\s*$/i and do { 741 $cln->put( 742 map { 'a:'.$_ } 743 keys %{ $_[HEAP]->{zones} }, 744 ); 745 $cln->put( 746 map { 'i:'.$_ } 747 grep { ! exists $_[HEAP]->{zones}{$_} } 748 keys %{ $_[HEAP]->{data} }, 749 ); 750 last; 751 }; 752 753 # wildcards list (active zones only) 754 /^\s*globs\s+(\S+)\s*$/i and do { 755 my $z = $_[HEAP]->{zones}{$1} or do { 756 $log->warning("invalid client globs query: '$1'"); 757 $cln->put('no such active zone'); 758 last; 759 }; 760 $cln->put( sort @{ $z } ); 761 last; 762 }; 763 764 # files list (active zones only) 765 /^\s*files\s+(\S+)\s*$/i and do { 766 $_[HEAP]->{zones}{$1} or do { 767 $log->warning("invalid client files query: '$1'"); 768 $cln->put('no such active zone'); 769 last; 770 }; 771 772 my @f; 773 for my $w ( values %{ $_[HEAP]->{watchers} } ) { 774 for my $z ( @{ $w->[1] } ) { 775 next unless $z eq $1; 776 push @f, [ $w->[0], $w->[2]->tell, -s $w->[0] ]; 777 last; 778 } 779 } 780 for ( sort { $a->[0] cmp $b->[0] } @f ) { 781 $cln->put( join ':', $_->[1] eq '0 but true' ? 0 : $_->[1], $_->[2], $_->[0] ); 782 } 783 last; 784 }; 785 786 # zone dump (active & inactive zones) 787 /^\s*dump\s+(\S+)\s*$/i and do { 788 $_[HEAP]->{zones}{$1} or $_[HEAP]->{data}{$1} or do { 789 $log->warning("invalid client dump query: '$1'"); 790 $cln->put('no such zone'); 791 last; 792 }; 793 my $wmax = $#{ $_[HEAP]->{data}{$1}{windows} }; 794 $cln->put( $_[HEAP]->{plugin}->dump_zone( $1, 795 $_[HEAP]->{data}{$1}{public}, 796 $_[HEAP]->{data}{$1}{private}, 797 [ @{ $_[HEAP]->{data}{$1}{windows} }[1..$wmax] ], 798 )); 799 last; 800 }; 801 802 # zone statistics (active & inactive zones) 803 /^\s*stats\s+(\S+)\s*$/i and do { 804 $_[HEAP]->{zones}{$1} or $_[HEAP]->{data}{$1} or do { 805 $log->warning("invalid client stats query: '$1'"); 806 $cln->put('no such zone'); 807 last; 808 }; 809 my $wmax = $#{ $_[HEAP]->{data}{$1}{windows} }; 810 $cln->put( $_[HEAP]->{plugin}->stats_zone( $1, 811 $_[HEAP]->{data}{$1}{public}, 812 $_[HEAP]->{data}{$1}{private}, 813 [ @{ $_[HEAP]->{data}{$1}{windows} }[1..$wmax] ], 814 )); 815 last; 816 }; 817 818 # remove zone statistics (inactive zones only) 819 /^\s*wipe\s+(\S+)\s*$/i and do { 820 if ($1 eq '*') { 821 for ( keys %{ $_[HEAP]->{data} } ) { 822 next if exists $_[HEAP]->{zones}{$_}; 823 delete $_[HEAP]->{data}{$_}; 824 } 825 } else { 826 $_[HEAP]->{data}{$1} or do { 827 $log->warning("invalid client wipe query: '$1'"); 828 $cln->put('no such inactive zone'); 829 last; 830 }; 831 $_[HEAP]->{zones}{$1} and do { 832 $log->warning("invalid client wipe query: '$1'"); 833 $cln->put('zone is active'); 834 last; 835 }; 836 delete $_[HEAP]->{data}{$1}; 837 } 838 $cln->put( 'ok' ); 839 $_[KERNEL]->call($_[SESSION], 'do_save'); 840 last; 841 }; 842 843 844 # disconnect request 845 /^\s*quit\s*$/i and do { 846 return delete $_[HEAP]->{clients}->{$_[ARG1]}; 847 }; 848 849 # invalid command 850 $log->warning("invalid client command: '$_[ARG0]'"); 851 $_[HEAP]->{clients}{$_[ARG1]}->put('error'); 852 } 853 854 # force buffer flush 855 $_[HEAP]->{clients}{$_[ARG1]}->flush; 856 $log->debug("buffer flushed"); 857 }, 858 859 # client errors (disconnect included) 860 client_error => sub { 861 $_[ARG1] ? 862 $log->error("$_[ARG0] ($_[ARG1] from client: $_[ARG2]") : 863 $log->debug("client disconnected"); 864 # drop client connection 865 delete $_[HEAP]->{clients}->{$_[ARG3]}; 866 }, 867 868 # got SIGHUP 869 hangup => sub { 870 $log->notice("got SIGHUP, re-expanging zones wildcards"); 871 872 # expanding zones wildcards 873 $_[KERNEL]->call($_[SESSION], 'do_expand'); 874 875 # keep signal handled 876 $_[KERNEL]->sig_handled; 877 }, 878 879 # got SIGINT 880 interrupt => sub { 881 $log->notice("got SIGINT, terminating"); 882 883 # keep signal handled 884 $_[KERNEL]->sig_handled; 885 886 # shutting down 887 $_[KERNEL]->call($_[SESSION], 'shutdown'); 888 }, 889 890 # got SIGTERM 891 terminate => sub { 892 $log->notice("got SIGTERM, terminating"); 893 894 # shutting down 895 $_[KERNEL]->call($_[SESSION], 'shutdown'); 896 897 # keep signal handled 898 $_[KERNEL]->sig_handled; 899 }, 900 901 # got SIGUSR1 902 rotate => sub { 903 $log->notice("got SIGUSR1, re-opening log file"); 904 905 # drop & create logger 906 $log->remove('main'); 907 $log->add(logger()); 908 909 # keep signal handled 910 $_[KERNEL]->sig_handled; 911 }, 912 913 # graceful exit 914 shutdown => sub { 915 $log->debug("gracefully shutting down"); 916 917 # store statistics 918 $_[KERNEL]->call($_[SESSION], 'do_save'); 919 920 # drop timers 921 $log->debug("removing alarms"); 922 $_[KERNEL]->alarm_remove_all; 923 924 # drop server 925 $log->debug("shutting down server"); 926 delete $_[HEAP]->{server}; 927 928 # drop clients 929 $log->debug("disconnecting clients"); 930 delete $_[HEAP]->{clients}{$_} for keys %{ $_[HEAP]->{clients} }; 931 932 # drop watchers 933 $log->debug("shutting down watchers"); 934 for (values %{ $_[HEAP]->{watchers} }) { 935 delete $_[HEAP]->{watchers}{$_->[2]->ID}; 936 $log->debug("shutdown watcher [".$_->[2]->ID."] for '$_->[0]'"); 937 } 938 }, 939 }, 940); 941 942 943# go! 944POE::Kernel->run; 945 946$log->notice("exit"); 947 948# log object create 949sub logger { 950 return Log::Dispatch::Screen->new( 951 name => 'main', 952 callbacks => [ \&pfmt, \&lfmt, \&dfmt ], 953 min_level => $opts{'log-level'}, 954 stderr => 1 955 ) if $opts{foreground}; 956 957 return Log::Dispatch::File->new( 958 name => 'main', 959 callbacks => [ \&pfmt, \&lfmt, \&dfmt ], 960 min_level => $opts{'log-level'}, 961 filename => $opts{'log-file'}, 962 mode => '>>' 963 ) if $opts{'log-file'}; 964 965 return Log::Dispatch::Syslog->new( 966 name => 'main', 967 callbacks => [ \&pfmt, ], 968 min_level => $opts{'log-level'}, 969 facility => $opts{'log-facility'}, 970 ident => $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '' ) 971 ); 972}; 973 974# log formatting routines 975sub pfmt { 976 my %m = @_; 977 sprintf "$$: %s\n", $m{message}; 978} 979sub lfmt { 980 my %m = @_; 981 sprintf "[%s] %s", $m{level}, $m{message}; 982} 983sub dfmt { 984 my %m = @_; 985 sprintf "%s %s", strftime("%Y/%m/%d %H:%M:%S",localtime), $m{message}; 986} 987 988sub usage { 989 <<EOM; 990 991Usage: $FindBin::RealScript [ options ] plugin [zone1:]wildcard1 .. [zoneN:]wildcardN 992 993More information available under 'Tail::Stat' man page. 994 995Options: 996 -a, --agregate-zone=ZONE agregating anonymous logs to zone 997 -b, --database-file=FILE persistent database file 998 --basename create anonymous zones from base name of log files 999 -c, --change-dir=DIR change directory before wildards expanding 1000 -d, --debug implies: --foreground --log-level=debug 1001 -f, --foreground no detach, logging to stderr 1002 --log-facility=NAME set facility for syslog logging 1003 --log-level=LEVEL minimum logging level 1004 --log-file=FILE logging to file instead syslog 1005 -e, --expand-period=SECONDS zones wildcard expand period 1006 -h, --help show this help message 1007 -i, --identity=STRING add string to process title, default pid-file, 1008 default database-file and syslog ident 1009 -l, --listen=[ADDR:]PORT TCP statistic server listen socket 1010 --multiple log includes in all expanded wildcard 1011 -n, --windows-num=NUM number of sliding windows (default 60) 1012 -o, --options=STRING comma-separated plugin supported options 1013 (like a mount (8) options) 1014 --override-from=FILE load overriding methods from file 1015 -p, --pid-file=FILE pid file path 1016 --parse-error=LEVEL logging level for unparsed lines 1017 -r, --regex=PATTERN override plugin regular expression 1018 --regex-from=FILE read regular expression from file 1019 -s, --store-period=SECONDS data store period (default 60) 1020 --timer=ZONE:NAME:PERIOD add named timer with fixed period 1021 -u, --user=LOGIN change effective process uid to 1022 -v, --version print version and exit 1023 -w, --window-size=SECONDS size of one sliding window (default 10) 1024 1025EOM 1026} 1027 1028sub version { 1029 <<EOM; 1030 1031$FindBin::RealScript version $Tail::Stat::VERSION 1032 1033Copyright (C) 2010 Oleg A. Mamontov 1034 1035This program is free software; you can redistribute it and/or modify it 1036under the terms of either: the GNU General Public License as published 1037by the Free Software Foundation; or the Artistic License. 1038 1039See http://dev.perl.org/licenses/ for more information. 1040 1041EOM 1042} 1043 1044