#!/usr/bin/perl -T

#------------------------------------------------------------------------------
# This is amavisd-status, a program to show status of child processes
# in amavisd-new.
#
# Author: Mark Martinec <Mark.Martinec@ijs.si>
#
# Copyright (c) 2012-2016, Mark Martinec
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of the Jozef Stefan Institute.

# (the above license is the 2-clause BSD license, also known as
#  a "Simplified BSD License", and pertains to this program only)
#
# Patches and problem reports are welcome.
# The latest version of this program is available at:
#   http://www.ijs.si/software/amavisd/
#------------------------------------------------------------------------------

use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';

use Errno qw(ESRCH ENOENT);
use POSIX qw(strftime);
use Time::HiRes ();

use vars qw($VERSION);  $VERSION = 2.011000;
use vars qw($myversion $myproduct_name $myversion_id $myversion_date);
use vars qw($outer_sock_specs);

$myproduct_name = 'amavisd-status';
$myversion_id = '2.11.0';  $myversion_date = '20150720';
$myversion = "$myproduct_name-$myversion_id ($myversion_date)";

### USER CONFIGURABLE:

# should match a socket of the same name in amavis-services
$outer_sock_specs = "tcp://127.0.0.1:23232";

### END OF USER CONFIGURABLE


# Pick the first available Perl binding to ZeroMQ and paper over the naming
# differences between them, so that the rest of the program can uniformly
# call zmq_sendmsg, zmq_recvmsg and zmq_sendstr.  Dies at compile time when
# none of the three supported modules can be loaded.
#
use vars qw($zmq_mod_name $zmq_mod_version $zmq_lib_version);
BEGIN {
  my($zmq_major, $zmq_minor, $zmq_patch);
  if (eval { require ZMQ::LibZMQ3 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ3';  # new interface module to zmq v3 or libxs
    import ZMQ::LibZMQ3;  import ZMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ3::zmq_version();
  # *zmq_sendmsg   [native]                   # (socket,msgobj,flags)
  # *zmq_recvmsg   [native]                   # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send($_[0], $_[1], length $_[1], $_[2]||0);
      $rv == -1 ? undef : $rv;
    };
  } elsif (eval { require ZMQ::LibZMQ2 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ2';  # new interface module to zmq v2
    import ZMQ::LibZMQ2;  import ZMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ2::zmq_version();
    # zmq v2/v3 incompatible renaming
    *zmq_sendmsg = \&ZMQ::LibZMQ2::zmq_send;  # (socket,msgobj,flags)
    *zmq_recvmsg = \&ZMQ::LibZMQ2::zmq_recv;  # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send(@_);  $rv == -1 ? undef : $rv;
    };
  } elsif (eval { require ZeroMQ::Constants && require ZeroMQ::Raw }) {
    $zmq_mod_name = 'ZeroMQ';  # old interface module to zmq v2
    import ZeroMQ::Raw;  import ZeroMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZeroMQ::version();
    # zmq v2/v3 incompatible renaming
    *zmq_sendmsg = \&ZeroMQ::Raw::zmq_send;   # (socket,msgobj,flags)
    *zmq_recvmsg = \&ZeroMQ::Raw::zmq_recv;   # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send(@_);  $rv == -1 ? undef : $rv;
    };
  } else {
    die "Perl modules ZMQ::LibZMQ3 or ZMQ::LibZMQ2 or ZeroMQ not available\n";
  }
  $zmq_mod_version = $zmq_mod_name->VERSION;
  $zmq_lib_version = join('.', $zmq_major, $zmq_minor, $zmq_patch);
  1;
}

# Returns a one-line description of the selected binding module, its version,
# and the version of the underlying library as reported by that module.
#
sub zmq_version {
  sprintf("%s %s, lib %s",
          $zmq_mod_name, $zmq_mod_version, $zmq_lib_version);
};

# Receives one message part from a socket and stores its payload into the
# caller's buffer (second argument, modified in place through @_ aliasing),
# starting at the given offset (default 0).
#
# Arguments: (socket, buffer, offset)
# Returns:   in scalar context the length of the received payload;
#            in list context a pair (length, more), where 'more' is the
#            ZMQ_RCVMORE socket option value.
#            On a failure the buffer is truncated at the offset and
#            an empty list (undef in scalar context) is returned.
#
sub zmq_recvstr {  # (socket,buffer,offset) -> (len,more)
  my $sock = $_[0];
  my $offset = $_[2] || 0;
  my $zm = zmq_recvmsg($sock);  # a copy of a received msg obj
  if (!$zm) { substr($_[1],$offset) = ''; return }
  ($offset ? substr($_[1],$offset) : $_[1]) = zmq_msg_data($zm);
  my $len = length($_[1]) - $offset;
  zmq_msg_close($zm);
  return $len  if !wantarray;
  my $more = zmq_getsockopt($sock, ZMQ_RCVMORE);
  if ($more == -1) { substr($_[1],$offset) = ''; return }
  ($len, $more);
};


my $wakeuptime = 1;  # -w, sleep time in seconds, may be fractional
my $repeatcount;     # -c, repeat count (when defined)

my $zmq_poll_units = 1000;  # milliseconds since zmq v3
$zmq_poll_units *= 1000  if $zmq_lib_version =~ /^[012]\./;  # microseconds

# Formats an elapsed time together with a progress bar.
#
# Arguments: $t          elapsed time in seconds (truncated to an integer)
#            $state_bar  string of per-second state letters of a busy process
#            $idling     true for an idle process
# Returns a string consisting of days/hours:minutes:seconds, a space,
# and a bar.  For an idle process the bar consists of $t dots; for a busy
# process it is as long as $state_bar, with '=' positions replaced by
# non-blank state letters.  The bar template is 35 characters long, with
# a ':' as every tenth character; a longer $state_bar is cut down to fit,
# keeping its last character and appending a '>'.
#
sub fmt_age($$$) {
  my($t,$state_bar,$idling) = @_;
  $t = int($t);
  my $char = $idling ? '.' : '=';
  my $bar_l = $idling ? $t : length($state_bar);
  my $bar = substr( ($char x 9 . ':') x 3 . $char x 5, 0,$bar_l);
  if (!$idling) {
    # a state bar longer than the template is shortened, flagged by a '>'
    $state_bar = substr($state_bar,0,length($bar)-2) . substr($state_bar,-1,1)
                 . '>'  if length($state_bar) > length($bar);
    # overlay state letters onto the bar, but keep the ':' decade markers
    for my $j (0 .. length($bar)-1) {
      substr($bar,$j,1) = substr($state_bar,$j,1)
        if substr($bar,$j,1) eq '=' && substr($state_bar,$j,1) ne ' ';
    }
  }
  my $s = $t % 60;  $t = int($t/60);
  my $m = $t % 60;  $t = int($t/60);
  my $h = $t % 24;  $t = int($t/24);
  my $d = $t;
  my $str = sprintf("%d:%02d:%02d", $h,$m,$s);
  $str = (!$d ? " " : sprintf("%dd",$d)) . $str;
  $str . ' ' . $bar;
};

# Prints a legend of state letters and a command line synopsis to STDOUT.
#
sub usage() {
  print <<'EOD';
States legend:
  A  accepted a connection
  b  begin with a protocol for accepting a request
  m  'MAIL FROM' smtp command started a new transaction in the same session
  d  transferring data from MTA to amavisd
  =  content checking just started
  G  generating and verifying unique mail_id
  D  decoding of mail parts
  V  virus scanning
  S  spam scanning
  P  pen pals database lookup and updates
  r  preparing results
  Q  quarantining and preparing/sending notifications
  F  forwarding mail to MTA
  .  content checking just finished
  sp space indicates idle (elapsed time bar is showing dots)

EOD
  print "Usage: $0 [-c <count>] [-w <wait-interval>]\n";
}

my($zmq_ctx, $zmq_sock);
my %process;  # associative array on pid
my $any_events = 0;

# A zmq_poll callback: reads all parts of a pending message from $zmq_sock
# and updates the %process table accordingly.  Sets $any_events to true.
# Two kinds of messages are recognized:
#   "am.st <pid> <time> <state> [<task_id>]"  - a state change of a process;
#     states FLUSH, exiting and purged remove entries from the table;
#     <time> is either an absolute Unix time (starts a new task), or
#     a number of milliseconds since the last absolute time;
#   "am.proc.busy <pid>..." / "am.proc.idle <pid>..."  - a list of processes;
#     refreshes the 'tick' of known processes, and adds unknown ones.
# Anything else is reported on STDERR.  Dies if receiving fails.
#
sub process_message {
  my($msgstr, $msgstr_l, $more, $p);
  for (;;) {
    ($msgstr_l,$more) = zmq_recvstr($zmq_sock,$msgstr);
    defined $msgstr_l  or die "zmq_recvstr failed: $!";
    $any_events = 1;
    local($1);
    if (!defined $msgstr) {
      # should not happen (except on a failure of zmq_recvmsg)
    } elsif ($msgstr =~ /^am\.st \d+\s+/s) {
      my($subscription_chan, $pid, $time, $state, $task_id) =
        split(' ',$msgstr);
      if ($state eq 'FLUSH') {
        %process = ();  # flush all kept state (e.g. on a restart)
        printf STDERR ("state flushed (restart)\n");
      } elsif ($state eq 'exiting' || $state eq 'purged') {
        delete $process{$pid};  # may or may not exist
      } else {
        $state = ' '  if $state eq '-';
        $p = $process{$pid};
        if ($p) {
          $p->{state} = $state;
          $p->{task_id} = $task_id;
        } else {  # new process appeared
          $process{$pid} = $p = {
            state => $state,
            task_id => $task_id,
            timestamp => undef,
            base_timestamp => undef,
            last_displ_timestamp => undef,
            state_bars => undef,
          };
        }
        my $now = Time::HiRes::time;
        if ($time > 1e9) {  # Unix time in seconds with fraction (> Y2000)
          $p->{base_timestamp} = $p->{timestamp} = $time;
          $p->{state_bars} = '';  # reset for a new task
        } elsif (!$p->{base_timestamp}) {  # delta time but no base
          $p->{timestamp} = $now;
          $p->{base_timestamp} = $p->{timestamp} - $time/1000;  # estimate
        } else {  # delta time in ms since base_timestamp
          $p->{timestamp} = $p->{base_timestamp} + $time/1000;
        }
        $p->{tick} = $now;
      }
    } elsif ($msgstr =~ /^am\.proc\.(busy|idle) /) {
      my $proc_state = $1;  # save the capture before it can get clobbered
      my($subscription_chan, @pid_list) = split(' ',$msgstr);
      my $now = Time::HiRes::time;
      for my $pid (@pid_list) {
        # must look up the entry of this very pid: updating a leftover $p
        # would leave the tick of a live process stale, and the process
        # would eventually get dropped from the table as a stale entry
        $p = $process{$pid};
        if ($p) {
          $p->{tick} = $now;
        } else {
          $process{$pid} = $p = {
            state => $proc_state eq 'busy' ? '?' : ' ',
            base_timestamp => $now, timestamp => $now, tick => $now,
          };
        }
      }
    } else {
      print STDERR "Unrecognized message received: $msgstr\n";
    }
    last if !$more;
  }
  1;
}

# Writes a report to STDERR: one line for each non-idle process (sorted
# by pid), followed by a count of active and idling processes, and a bar
# with a '*' for each active process, a '.' for each idling process, and
# ':' characters marking a recent (exponentially decaying) peak of the
# number of active processes.  Appends the current state letter to the
# state_bars of each active process, and clears state_bars of idle ones.
#
use vars qw($peak_active $last_peak_reading_time);
BEGIN { $peak_active = 0 }
sub display_state() {
  my $num_idling = 0;
  my $num_active = 0;
  my $now = Time::HiRes::time;
  for my $pid (sort { $a <=> $b } keys %process) {
    my $p = $process{$pid};
    my $idling = !defined $p->{task_id} && $p->{state} =~ /^[. ]\z/s;
    my $age = $now - $p->{base_timestamp};
    if ($idling) {
      $num_idling++;
      $p->{state_bars} = '';
      next;  # suppress reporting idle processes (or comment-out)
    } else {
      $num_active++;
      my $len = int($age + 0.5);
      $len = 1  if $len < 1;
      my $str = $p->{state_bars};
      $str = ''  if !defined $str;
      if ($len > length $str) {  # replicate last character to desired size
        my $ch = $str eq '' ? '=' : substr($str,-1,1);
        $str .= $ch x ($len - length $str);
      }
      substr($str,$len-1,1) = $p->{state};
      $p->{state_bars} = $str;
    }
    printf STDERR ("PID %5d: %-11s %s\n",
           $pid, $p->{task_id} || $p->{state},
           fmt_age($age, $p->{state_bars}, $idling) );
  }
  $now = Time::HiRes::time;
  if ($num_active > $peak_active) {
    $peak_active = $num_active;
  } elsif ($peak_active >= 0.1) {  # exponential decay of a peak indicator
    my $halflife = 8;  # seconds
    my $weight = exp(-(($now - $last_peak_reading_time) / $halflife) * log(2));
    $peak_active *= $weight;
    $peak_active = $num_active  if $num_active > $peak_active;
  }
  $last_peak_reading_time = $now;
  my $bar = ('*' x $num_active) . ('.' x $num_idling);
  my $ipeak_active = int($peak_active+0.5);
  if ($ipeak_active > $num_active) {
    # overwrite positions between the current and the peak count with ':',
    # extending the bar with blanks first if the peak reaches beyond its end
    my $padding = $ipeak_active - ($num_active + $num_idling);
    $bar .= ' ' x $padding  if $padding > 0;
    substr($bar, $num_active, $ipeak_active-$num_active) =
      ':' x ($ipeak_active-$num_active);
  }
  printf STDERR ("%d active, %d idling processes\n", $num_active, $num_idling);
  printf STDERR ("%s\n", $bar);
}

# main program starts here

  my $normal_termination = 0;
  $SIG{INT}  = sub { die "\n" };  # do the END code block when interrupted
  $SIG{TERM} = sub { die "\n" };  # do the END code block when killed

  # options always come in pairs: an option letter followed by its value;
  # anything else (including a missing value) shows usage and exits
  while (@ARGV) {
    my $opt = shift @ARGV;
    my $val = shift @ARGV;
    if ($opt eq '-w' && $val =~ /^\+?\d+(?:\.\d*)?\z/) { $wakeuptime = $val }
    elsif ($opt eq '-c' && $val =~ /^[+-]?\d+\z/) { $repeatcount = $val }
    else { usage(); exit 1 }
  }

  $zmq_ctx = zmq_init();
  $zmq_ctx or die "Can't create ZMQ context: $!";
  $zmq_sock = zmq_socket($zmq_ctx,ZMQ_SUB);
  $zmq_sock or die "Can't create ZMQ socket: $!";

  my $sock_ipv4only = 1;  # a ZMQ default
  # a socket specification with two colons is taken to be an IPv6 address
  if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($zmq_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt failed: $!";
    $sock_ipv4only = 0;
  }
  zmq_setsockopt($zmq_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
  zmq_setsockopt($zmq_sock, ZMQ_SUBSCRIBE, 'am.proc.') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";

  zmq_connect($zmq_sock, $outer_sock_specs) != -1
    or die "zmq_connect to $outer_sock_specs failed: $!";

  print <<'EOD';
process-id task-id     elapsed in    elapsed-bar (dots indicate idle)
           or state   idle or busy
EOD

  my $last_display_time;
  for (;;) {
    if (defined $repeatcount) {
      last  if $repeatcount <= 0;
      $repeatcount--;
    }
    $| = 0;
    print "\n";

    # the first report follows shortly after a start, subsequent reports
    # are $wakeuptime seconds apart, or sooner if events keep arriving
    my $now = Time::HiRes::time;
    my $redraw_at =
      defined $last_display_time ? $last_display_time + $wakeuptime
                                 : $now + 0.2;
    for (;;) {
      my $timeout = $redraw_at - $now;
      $timeout = 0  if $timeout < 0;
      $any_events = 0;
      zmq_poll(
        [
          { socket => $zmq_sock,
            events => ZMQ_POLLIN,
            callback => \&process_message,
          },
        ],
        $timeout * $zmq_poll_units
      ) != -1  or die "zmq_poll failed: $!";
      $now = Time::HiRes::time;
      last  if $now >= $redraw_at;
      last  if $any_events && $now > $last_display_time + 0.2;
    }

    while (my($pid,$p) = each %process) {  # remove stale entries
      delete $process{$pid}  if $p && $now - $p->{tick} > 10*60;
    }
    display_state();
    $last_display_time = Time::HiRes::time;

    $| = 1;  # flush STDOUT
  } # forever

  $normal_termination = 1;

END {
  # ignoring status
  zmq_close($zmq_sock)  if $zmq_sock;
  zmq_term($zmq_ctx)    if $zmq_ctx;
  print "exited\n"  if !$normal_termination;

  close(STDOUT) or die "Error closing STDOUT: $!";
  close(STDERR) or die "Error closing STDERR: $!";
}