1#!/usr/bin/perl -T
2
3#------------------------------------------------------------------------------
4# This is amavisd-status, a program to show status of child processes
5# in amavisd-new.
6#
7# Author: Mark Martinec <Mark.Martinec@ijs.si>
8#
9# Copyright (c) 2012-2016, Mark Martinec
10# All rights reserved.
11#
12# Redistribution and use in source and binary forms, with or without
13# modification, are permitted provided that the following conditions
14# are met:
15# 1. Redistributions of source code must retain the above copyright notice,
16#    this list of conditions and the following disclaimer.
17# 2. Redistributions in binary form must reproduce the above copyright notice,
18#    this list of conditions and the following disclaimer in the documentation
19#    and/or other materials provided with the distribution.
20#
21# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
25# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31# POSSIBILITY OF SUCH DAMAGE.
32#
33# The views and conclusions contained in the software and documentation are
34# those of the authors and should not be interpreted as representing official
35# policies, either expressed or implied, of the Jozef Stefan Institute.
36
37# (the above license is the 2-clause BSD license, also known as
38#  a "Simplified BSD License", and pertains to this program only)
39#
40# Patches and problem reports are welcome.
41# The latest version of this program is available at:
42#   http://www.ijs.si/software/amavisd/
43#------------------------------------------------------------------------------
44
45use strict;
46use re 'taint';
47use warnings;
48use warnings FATAL => qw(utf8 void);
49no warnings 'uninitialized';
50
51use Errno qw(ESRCH ENOENT);
52use POSIX qw(strftime);
53use Time::HiRes ();
54
55use vars qw($VERSION);  $VERSION = 2.011000;
56use vars qw($myversion $myproduct_name $myversion_id $myversion_date);
57use vars qw($outer_sock_specs);
58
59$myproduct_name = 'amavisd-status';
60$myversion_id = '2.11.0'; $myversion_date = '20150720';
61$myversion = "$myproduct_name-$myversion_id ($myversion_date)";
62
63### USER CONFIGURABLE:
64
65# should match a socket of the same name in amavis-services
66$outer_sock_specs = "tcp://127.0.0.1:23232";
67
68### END OF USER CONFIGURABLE
69
70
71use vars qw($zmq_mod_name $zmq_mod_version $zmq_lib_version);
72BEGIN {
73  my($zmq_major, $zmq_minor, $zmq_patch);
74  if (eval { require ZMQ::LibZMQ3 && require ZMQ::Constants }) {
75    $zmq_mod_name = 'ZMQ::LibZMQ3';  # new interface module to zmq v3 or libxs
76    import ZMQ::LibZMQ3;  import ZMQ::Constants qw(:all);
77    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ3::zmq_version();
78  # *zmq_sendmsg   [native]                   # (socket,msgobj,flags)
79  # *zmq_recvmsg   [native]                   # (socket,flags) -> msgobj
80    *zmq_sendstr = sub {                      # (socket,string,flags)
81      my $rv = zmq_send($_[0], $_[1], length $_[1], $_[2]||0);
82      $rv == -1 ? undef : $rv;
83    };
84  } elsif (eval { require ZMQ::LibZMQ2 && require ZMQ::Constants }) {
85    $zmq_mod_name = 'ZMQ::LibZMQ2';  # new interface module to zmq v2
86    import ZMQ::LibZMQ2;  import ZMQ::Constants qw(:all);
87    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ2::zmq_version();
88    # zmq v2/v3 incompatibile renaming
89    *zmq_sendmsg = \&ZMQ::LibZMQ2::zmq_send;  # (socket,msgobj,flags)
90    *zmq_recvmsg = \&ZMQ::LibZMQ2::zmq_recv;  # (socket,flags) -> msgobj
91    *zmq_sendstr = sub {                      # (socket,string,flags)
92      my $rv = zmq_send(@_);  $rv == -1 ? undef : $rv;
93    };
94  } elsif (eval { require ZeroMQ::Constants && require ZeroMQ::Raw }) {
95    $zmq_mod_name = 'ZeroMQ';  # old interface module to zmq v2
96    import ZeroMQ::Raw;  import ZeroMQ::Constants qw(:all);
97    ($zmq_major, $zmq_minor, $zmq_patch) = ZeroMQ::version();
98    # zmq v2/v3 incompatibile renaming
99    *zmq_sendmsg = \&ZeroMQ::Raw::zmq_send;   # (socket,msgobj,flags)
100    *zmq_recvmsg = \&ZeroMQ::Raw::zmq_recv;   # (socket,flags) -> msgobj
101    *zmq_sendstr = sub {                      # (socket,string,flags)
102      my $rv = zmq_send(@_);  $rv == -1 ? undef : $rv;
103    };
104  } else {
105    die "Perl modules ZMQ::LibZMQ3 or ZMQ::LibZMQ2 or ZeroMQ not available\n";
106  }
107  $zmq_mod_version = $zmq_mod_name->VERSION;
108  $zmq_lib_version = join('.', $zmq_major, $zmq_minor, $zmq_patch);
109  1;
110}
111
112sub zmq_version {
113  sprintf("%s %s, lib %s",
114          $zmq_mod_name, $zmq_mod_version, $zmq_lib_version);
115};
116
117sub zmq_recvstr {               # (socket,buffer,offset) -> (len,more)
118  my $sock = $_[0];
119  my $offset = $_[2] || 0;
120  my $zm = zmq_recvmsg($sock);  # a copy of a received msg obj
121  if (!$zm) { substr($_[1],$offset) = ''; return }
122  ($offset ? substr($_[1],$offset) : $_[1]) = zmq_msg_data($zm);
123  my $len = length($_[1]) - $offset;
124  zmq_msg_close($zm);
125  return $len  if !wantarray;
126  my $more = zmq_getsockopt($sock, ZMQ_RCVMORE);
127  if ($more == -1) { substr($_[1],$offset) = ''; return }
128  ($len, $more);
129};
130
131
132my $wakeuptime = 1;    # -w, sleep time in seconds, may be fractional
133my $repeatcount;       # -c, repeat count (when defined)
134
135my $zmq_poll_units = 1000;  # milliseconds since zmq v3
136$zmq_poll_units *= 1000  if $zmq_lib_version =~ /^[012]\./;  # microseconds
137
138sub fmt_age($$$) {
139  my($t,$state_bar,$idling) = @_;
140  $t = int($t);
141  my $char = $idling ? '.' : '=';
142  my $bar_l = $idling ? $t : length($state_bar);
143  my $bar = substr( ($char x 9 . ':') x 3 . $char x 5, 0,$bar_l);
144  if (!$idling) {
145    $state_bar = substr($state_bar,0,length($bar)-2) . substr($state_bar,-1,1)
146                 . '>'  if length($state_bar) > length($bar);
147    for my $j (0 .. length($bar)-1) {
148      substr($bar,$j,1) = substr($state_bar,$j,1)
149        if substr($bar,$j,1) eq '=' && substr($state_bar,$j,1) ne ' ';
150    }
151  }
152  my $s = $t % 60;  $t = int($t/60);
153  my $m = $t % 60;  $t = int($t/60);
154  my $h = $t % 24;  $t = int($t/24);
155  my $d = $t;
156  my $str = sprintf("%d:%02d:%02d", $h,$m,$s);
157  $str = (!$d ? "  " : sprintf("%dd",$d)) . $str;
158  $str . ' ' . $bar;
159};
160
161sub usage() {
162  print <<'EOD';
163States legend:
164  A  accepted a connection
165  b  begin with a protocol for accepting a request
166  m  'MAIL FROM' smtp command started a new transaction in the same session
167  d  transferring data from MTA to amavisd
168  =  content checking just started
169  G  generating and verifying unique mail_id
170  D  decoding of mail parts
171  V  virus scanning
172  S  spam scanning
173  P  pen pals database lookup and updates
174  r  preparing results
175  Q  quarantining and preparing/sending notifications
176  F  forwarding mail to MTA
177  .  content checking just finished
178  sp space indicates idle (elapsed time bar is showing dots)
179
180EOD
181  print "Usage: $0 [-c <count>] [-w <wait-interval>]\n";
182}
183
184my($zmq_ctx, $zmq_sock);
185my %process; # associative array on pid
186my $any_events = 0;
187
188sub process_message {
189  my($msgstr, $msgstr_l, $more, $val, $p);
190  for (;;) {
191    ($msgstr_l,$more) = zmq_recvstr($zmq_sock,$msgstr);
192    defined $msgstr_l  or die "zmq_recvstr failed: $!";
193    $any_events = 1;
194    local($1);
195    if (!defined $msgstr) {
196      # should not happen (except on a failure of zmq_recvmsg)
197    } elsif ($msgstr =~ /^am\.st \d+\s+/s) {
198      my($subscription_chan, $pid, $time, $state, $task_id) =
199        split(' ',$msgstr);
200      if ($state eq 'FLUSH') {
201        %process = ();  # flush all kept state (e.g. on a restart)
202        printf STDERR ("state flushed (restart)\n");
203      } elsif ($state eq 'exiting' || $state eq 'purged') {
204        delete $process{$pid};  # may or may not exist
205      } else {
206        $state = ' ' if $state eq '-';
207        $p = $process{$pid};
208        if ($p) {
209          $p->{state} = $state;
210          $p->{task_id} = $task_id;
211        } else {  # new process appeared
212          $process{$pid} = $p = {
213            state     => $state,
214            task_id   => $task_id,
215            timestamp => undef,
216            base_timestamp => undef,
217            last_displ_timestamp => undef,
218            state_bars => undef,
219          };
220        }
221        my $now = Time::HiRes::time;
222        if ($time > 1e9) {  # Unix time in seconds with fraction (> Y2000)
223          $p->{base_timestamp} = $p->{timestamp} = $time;
224          $p->{state_bars} = '';  # reset for a new task
225        } elsif (!$p->{base_timestamp}) {  # delta time but no base
226          $p->{timestamp} = $now;
227          $p->{base_timestamp} = $p->{timestamp} - $time/1000;  # estimate
228        } else {  # delta time in ms since base_timestamp
229          $p->{timestamp} = $p->{base_timestamp} + $time/1000;
230        }
231        $p->{tick} = $now;
232      }
233    } elsif ($msgstr =~ /^am\.proc\.(busy|idle) /) {
234      my($subscription_chan, @pid_list) = split(' ',$msgstr);
235      my $now = Time::HiRes::time;
236      for my $pid (@pid_list) {
237        if ($process{$pid}) {
238          $p->{tick} = $now;
239        } else {
240          $process{$pid} = $p = {
241            state => $1 eq 'busy' ? '?' : ' ',
242            base_timestamp => $now, timestamp => $now, tick => $now,
243          };
244        }
245      }
246    } else {
247      print STDERR "Unrecognized message received: $msgstr\n";
248    }
249    last if !$more;
250  }
251  1;
252}
253
254use vars qw($peak_active $last_peak_reading_time);
255BEGIN { $peak_active = 0 }
256sub display_state() {
257  my $num_idling = 0;
258  my $num_active = 0;
259  my $now = Time::HiRes::time;
260  for my $pid (sort { $a <=> $b } keys %process) {
261    my $p = $process{$pid};
262    my $idling = !defined $p->{task_id} && $p->{state} =~ /^[. ]\z/s;
263    my $age = $now - $p->{base_timestamp};
264    if ($idling) {
265      $num_idling++;
266      $p->{state_bars} = '';
267      next;  # suppress reporting idle processes (or comment-out)
268    } else {
269      $num_active++;
270      my $len = int($age + 0.5);
271      $len = 1  if $len < 1;
272      my $str = $p->{state_bars};
273      $str = ''  if !defined $str;
274      if ($len > length $str) {  # replicate last character to desired size
275        my $ch = $str eq '' ? '=' : substr($str,-1,1);
276        $str .= $ch x ($len - length $str);
277      }
278      substr($str,$len-1,1) = $p->{state};
279      $p->{state_bars} = $str;
280    }
281    printf STDERR ("PID %5d: %-11s %s\n",
282                   $pid, $p->{task_id} || $p->{state},
283                   fmt_age($age, $p->{state_bars}, $idling) );
284  }
285  $now = Time::HiRes::time;
286  if ($num_active > $peak_active) {
287    $peak_active = $num_active;
288  } elsif ($peak_active >= 0.1) {  # exponential decay of a peak indicator
289    my $halflife = 8;  # seconds
290    my $weight = exp(-(($now - $last_peak_reading_time) / $halflife) * log(2));
291    $peak_active *= $weight;
292    $peak_active = $num_active  if $num_active > $peak_active;
293  }
294  $last_peak_reading_time = $now;
295  my $bar = ('*' x $num_active) . ('.' x $num_idling);
296  my $ipeak_active = int($peak_active+0.5);
297  if ($ipeak_active > $num_active) {
298    my $padding = $ipeak_active - ($num_active + $num_idling);
299    $bar .= ' ' x $padding  if $padding > 0;
300    substr($bar, $num_active, $ipeak_active-$num_active) =
301      ':' x ($ipeak_active-$num_active);
302  }
303  printf STDERR ("%d active, %d idling processes\n", $num_active, $num_idling);
304  printf STDERR ("%s\n", $bar);
305}
306
307# main program starts here
308
309  my $normal_termination = 0;
310  $SIG{INT}  = sub { die "\n" };  # do the END code block when interrupted
311  $SIG{TERM} = sub { die "\n" };  # do the END code block when killed
312
313  while (@ARGV) {
314    my $opt = shift @ARGV;
315    my $val = shift @ARGV;
316    if ($opt eq '-w' && $val =~ /^\+?\d+(?:\.\d*)?\z/) { $wakeuptime = $val }
317    elsif ($opt eq '-c' && $val =~ /^[+-]?\d+\z/) { $repeatcount = $val }
318    else { usage(); exit 1 }
319  }
320
321  $zmq_ctx = zmq_init();
322  $zmq_ctx or die "Can't create ZMQ context: $!";
323  $zmq_sock = zmq_socket($zmq_ctx,ZMQ_SUB);
324  $zmq_sock or die "Can't create ZMQ socket: $!";
325
326  my $sock_ipv4only = 1;  # a ZMQ default
327  if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
328    zmq_setsockopt($zmq_sock, ZMQ_IPV4ONLY(), 0) != -1
329      or die "zmq_setsockopt failed: $!";
330    $sock_ipv4only = 0;
331  }
332  zmq_setsockopt($zmq_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
333    or die "zmq_setsockopt SUBSCRIBE failed: $!";
334  zmq_setsockopt($zmq_sock, ZMQ_SUBSCRIBE, 'am.proc.') != -1
335    or die "zmq_setsockopt SUBSCRIBE failed: $!";
336
337  zmq_connect($zmq_sock, $outer_sock_specs) != -1
338    or die "zmq_connect to $outer_sock_specs failed: $!";
339
340  print <<'EOD';
341process-id task-id     elapsed in    elapsed-bar (dots indicate idle)
342           or state   idle or busy
343EOD
344
345  my $last_display_time;
346  for (;;) {
347    if (defined $repeatcount) {
348      last  if $repeatcount <= 0;
349      $repeatcount--;
350    }
351    $| = 0;
352    print "\n";
353
354    my $now = Time::HiRes::time;
355    my $redraw_at =
356      defined $last_display_time ? $last_display_time + $wakeuptime
357                                 : $now + 0.2;
358    for (;;) {
359      my $timeout = $redraw_at - $now;
360      $timeout = 0  if $timeout < 0;
361      $any_events = 0;
362      zmq_poll(
363        [
364          { socket => $zmq_sock,
365            events => ZMQ_POLLIN,
366            callback => \&process_message,
367          },
368        ],
369        $timeout * $zmq_poll_units
370      ) != -1  or die "zmq_poll failed: $!";
371      $now = Time::HiRes::time;
372      last if $now >= $redraw_at;
373      last if $any_events && $now > $last_display_time + 0.2;
374    }
375
376    while (my($pid,$p) = each %process) {  # remove stale entries
377      delete $process{$pid}  if $p && $now - $p->{tick} > 10*60;
378    }
379    display_state();
380    $last_display_time = Time::HiRes::time;
381
382    $| = 1;  # flush STDOUT
383  } # forever
384
385  $normal_termination = 1;
386
387END {
388  # ignoring status
389  zmq_close($zmq_sock) if $zmq_sock;
390  zmq_term($zmq_ctx)   if $zmq_ctx;
391  print "exited\n" if !$normal_termination;
392
393  close(STDOUT) or die "Error closing STDOUT: $!";
394  close(STDERR) or die "Error closing STDERR: $!";
395}
396