1# This program is copyright 2008-2011 Percona Ireland Ltd.
2# Feedback and improvements are welcome.
3#
4# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
5# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
6# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
7#
8# This program is free software; you can redistribute it and/or modify it under
9# the terms of the GNU General Public License as published by the Free Software
10# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
11# systems, you can issue `man perlgpl' or `man perlartistic' to read these
12# licenses.
13#
14# You should have received a copy of the GNU General Public License along with
15# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
16# Place, Suite 330, Boston, MA  02111-1307  USA.
17# ###########################################################################
18# EventAggregator package
19# ###########################################################################
20{
21# Package: EventAggregator
22# EventAggregator aggregates event values and calculates basic statistics.
23package EventAggregator;
24
25use strict;
26use warnings FATAL => 'all';
27use English qw(-no_match_vars);
28use constant PTDEBUG => $ENV{PTDEBUG} || 0;
29
30use List::Util qw(min max);
31use Data::Dumper;
32$Data::Dumper::Indent    = 1;
33$Data::Dumper::Sortkeys  = 1;
34$Data::Dumper::Quotekeys = 0;
35
36# ###########################################################################
37# Set up some constants for bucketing values.  It is impossible to keep all
38# values seen in memory, but putting them into logarithmically scaled buckets
39# and just incrementing the bucket each time works, although it is imprecise.
40# See http://code.google.com/p/maatkit/wiki/EventAggregatorInternals.
41# ###########################################################################
# Bucket geometry: NUM_BUCK buckets, each BUCK_SIZE times wider than the
# previous one, starting at MIN_BUCK.  BASE_LOG and BASE_OFFSET are the
# precomputed terms of the log-base-BUCK_SIZE index formula in bucket_idx().
use constant NUM_BUCK    => 1000;
use constant MIN_BUCK    => 0.000001;
use constant BUCK_SIZE   => 1.05;
use constant BASE_LOG    => log(BUCK_SIZE);
use constant BASE_OFFSET => abs(1 - log(MIN_BUCK) / BASE_LOG); # 284.1617969

# Starting value of every bucket, indexed by bucket number (see
# bucket_value()).  _calc_metrics() reads this table to turn bucket
# indexes back into values.
my @buck_vals = map { bucket_value($_) } ( 0 .. NUM_BUCK - 1 );
50
51# Sub: new
52#
53# Parameters:
54#   %args - Arguments
55#
56# Required Arguments:
57#   groupby - Attribute to group/aggregate classes by.
58#   worst   - Attribute which defines the worst event in a class.
59#             Samples of the worst attribute are saved.
60#
61# Optional Arguments:
62#   attributes        - Hashref of attributes to aggregate.  Keys are attribute
63#                       names used in the EventAggregator object and values are
64#                       attribute names used to get values from events.
65#                       Multiple attrib names in the arrayref specify alternate
66#                       attrib names in the event.  Example:
67#                       Fruit => ['apple', 'orange'].  An attrib called "Fruit"
68#                       will be created using the event's "apple" value or,
69#                       if that attrib doesn't exist, its "orange" value.
#                       If this option isn't specified, then all attributes
#                       are auto-detected and aggregated.
71#   ignore_attributes - Arrayref of auto-detected attributes to ignore.
72#                       This does not apply to the attributes specified
73#                       with the optional attributes option above.
74#   unroll_limit      - If this many events have been processed and some
75#                       handlers haven't been generated yet (due to lack
76#                       of sample data), unroll the loop anyway. (default 1000)
77#   attrib_limit      - Sanity limit for attribute values.  If the value
78#                       exceeds the limit, use the last-seen for this class;
79#                       if none, then 0.
80#   type_for          - Hashref of attribute=>type pairs.  See $type in
81#                       <make_handler()> for the list of types.
82#
83# Returns:
84#   EventAggregator object
sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(groupby worst) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $groupby    = $args{groupby};
   my $attributes = $args{attributes} || {};

   # The groupby attribute identifies the class, so it is never aggregated
   # or ignored as an attribute itself.
   my @attribs = grep { $_ ne $groupby } keys %$attributes;
   my @ignored = grep { $_ ne $groupby } @{ $args{ignore_attributes} || [] };

   # The unroll limit comes from, in order: the environment override
   # PT_QUERY_DIGEST_CHECK_ATTRIB_LIMIT (kept first so existing setups that
   # rely on it behave as before), the documented unroll_limit argument
   # (previously ignored), then the default of 1000.
   my $unroll_limit = $ENV{PT_QUERY_DIGEST_CHECK_ATTRIB_LIMIT}
                   || $args{unroll_limit}
                   || 1000;

   my $self = {
      groupby        => $groupby,
      # With no explicit attributes, every attribute is auto-detected.
      detect_attribs => %$attributes ? 0 : 1,
      all_attribs    => [ keys %$attributes ],
      # Values mirror the attributes hash (normally undef).
      # NOTE(review): presumably only key existence is consulted -- confirm
      # against the code that reads ignore_attribs.
      ignore_attribs => {
         map  { $_ => $attributes->{$_} } @ignored
      },
      attributes     => {
         map  { $_ => $attributes->{$_} } @attribs
      },
      # One sub per attribute, built by make_alt_attrib() from the
      # attribute's list of alternate names.
      alt_attribs    => {
         map  { $_ => make_alt_attrib(@{$attributes->{$_}}) } @attribs
      },
      worst          => $args{worst},
      unroll_limit   => $unroll_limit,
      attrib_limit   => $args{attrib_limit},
      result_classes => {},
      result_globals => {},
      result_samples => {},
      class_metrics  => {},
      global_metrics => {},
      n_events       => 0,
      unrolled_loops => undef,
      # Copy the caller's hash so later type detection doesn't modify it.
      type_for       => { %{$args{type_for} || { Query_time => 'num' }} },
   };
   return bless $self, $class;
}
124
# Wipe the aggregated data while keeping every container alive.  The
# generated aggregation subs hold references to some of these hashes, so the
# hashes are emptied in place rather than replaced with new ones.  The
# generated subs themselves are left untouched.
sub reset_aggregated_data {
   my ( $self ) = @_;

   # Per-class stores: empty each attribute's hash, keep the class entries.
   for my $attribs_of_class ( values %{ $self->{result_classes} } ) {
      %$_ = () for values %$attribs_of_class;
   }

   # Global stores: empty each attribute's hash.
   %$_ = () for values %{ $self->{result_globals} };

   # Samples: drop every saved event.
   %{ $self->{result_samples} } = ();

   $self->{n_events} = 0;
}
144
# Aggregate an event hashref's properties.  Code is built on the fly to do this,
# based on the values being passed in.  For the first unroll_limit events, a
# handler sub is made (via make_handler()) for each attribute as soon as an
# event provides a value for it.  Once more than unroll_limit events have been
# seen, the core code of every handler made so far is unrolled into a single
# subroutine which handles all later events.  For that reason, you can't
# re-use an instance.
#
# Parameters:
#   $event - Event hashref.  Events without a defined groupby attribute
#            are ignored.
#
# Returns:
#   Nothing useful from the handler loop; once the unrolled sub exists,
#   whatever that sub returns.
sub aggregate {
   my ( $self, $event ) = @_;

   my $group_by = $event->{$self->{groupby}};
   return unless defined $group_by;

   $self->{n_events}++;
   PTDEBUG && _d('Event', $self->{n_events});

   # Run only unrolled loops if available.
   return $self->{unrolled_loops}->($self, $event, $group_by)
      if $self->{unrolled_loops};

   # For the first unroll_limit events, auto-detect new attribs and
   # run attrib handlers.
   if ( $self->{n_events} <= $self->{unroll_limit} ) {

      $self->add_new_attributes($event) if $self->{detect_attribs};

      ATTRIB:
      foreach my $attrib ( keys %{$self->{attributes}} ) {

         # Attrib auto-detection can add a lot of attributes which some events
         # may or may not have.  Aggregating a nonexistent attrib is wasteful,
         # so we check that the attrib or one of its alternates exists.  If
         # one does, then we leave attrib alone because the handler sub will
         # also check alternates.
         if ( !exists $event->{$attrib} ) {
            PTDEBUG && _d("attrib doesn't exist in event:", $attrib);
            my $alt_attrib = $self->{alt_attribs}->{$attrib}->($event);
            PTDEBUG && _d('alt attrib:', $alt_attrib);
            next ATTRIB unless $alt_attrib;
         }

         # The value of the attribute ( $group_by ) may be an arrayref.
         GROUPBY:
         foreach my $val ( ref $group_by ? @$group_by : ($group_by) ) {
            my $class_attrib  = $self->{result_classes}->{$val}->{$attrib} ||= {};
            my $global_attrib = $self->{result_globals}->{$attrib} ||= {};
            my $samples       = $self->{result_samples};
            my $handler = $self->{handlers}->{ $attrib };
            if ( !$handler ) {
               $handler = $self->make_handler(
                  event      => $event,
                  attribute  => $attrib,
                  alternates => $self->{attributes}->{$attrib},
                  worst      => $self->{worst} eq $attrib,
               );
               # Saved even when undef, so the next event retries.
               $self->{handlers}->{$attrib} = $handler;
            }
            next GROUPBY unless $handler;
            $samples->{$val} ||= $event; # Initialize to the first event.

            # Pass the current class value, not the raw $group_by: the
            # handler keys the worst sample on this argument, and an
            # arrayref would stringify to a useless ARRAY(0x...) key.  This
            # matches the unrolled sub, which loops over each value too.
            $handler->($event, $class_attrib, $global_attrib, $samples, $val);
         }
      }
   }
   else {
      # After unroll_limit events, unroll the loops.
      $self->_make_unrolled_loops($event);
      # Run unrolled loops here once.  Next time, they'll be ran
      # before this if-else.
      $self->{unrolled_loops}->($self, $event, $group_by);
   }

   return;
}
216
# Combine the core code of every attribute handler made so far into one
# subroutine and save it in $self->{unrolled_loops}.  The sub takes
# ( $self, $event, $group_by ), like the call made in aggregate().  Whether
# it loops over $group_by is decided by whether the given event's groupby
# value is a reference.  Dies if the generated code does not compile.
sub _make_unrolled_loops {
   my ( $self, $event ) = @_;

   # These three lexicals are referred to BY NAME inside the generated code,
   # which closes over them when it is eval'ed below; do not rename them.
   my @attrs   = grep { $self->{handlers}->{$_} } keys %{$self->{attributes}};
   my $globs   = $self->{result_globals}; # Global stats for each
   my $samples = $self->{result_samples};

   # True when the groupby value is a reference (a list of class values).
   my $multi_valued = ref $event->{$self->{groupby}};

   # Everything else the generated code uses is declared inside it, so it
   # does not accidentally share state with this sub.
   my @body = (
      'my ( $self, $event, $group_by ) = @_;',
      'my ($val, $class, $global, $idx);',
   );
   push @body, 'foreach my $group_by ( @$group_by ) {' if $multi_valued;
   push @body,
      # Create and get each attribute's storage
      'my $temp = $self->{result_classes}->{ $group_by }
         ||= { map { $_ => { } } @attrs };',
      '$samples->{$group_by} ||= $event;'; # Always start with the first.

   # For each attribute: point $class and $global at its stores, then run
   # the core code saved by make_handler().
   foreach my $attrib ( @attrs ) {
      push @body,
         "\$class  = \$temp->{'${attrib}'};",
         "\$global = \$globs->{'${attrib}'};",
         $self->{unrolled_for}->{$attrib};
   }
   push @body, '}' if $multi_valued; # Close the loop opened above

   s/^/   /gm for @body; # Indent for debugging
   my @lines = ( 'sub {', @body, '}' );

   # Make the subroutine.
   my $code = join("\n", @lines);
   PTDEBUG && _d('Unrolled subroutine:', @lines);
   my $sub = eval $code;
   die $EVAL_ERROR if $EVAL_ERROR;
   $self->{unrolled_loops} = $sub;

   return;
}
265
# Return the aggregated results as a hashref with the keys classes, globals
# and samples, each pointing at the object's own store (not a copy).
sub results {
   my ( $self ) = @_;
   return +{
      map { ( $_ => $self->{"result_$_"} ) } qw(classes globals samples)
   };
}
275
# Replace the object's result stores with the classes, globals and samples
# entries of the given hashref (the shape returned by results()).
sub set_results {
   my ( $self, $results ) = @_;
   foreach my $kind ( qw(classes globals samples) ) {
      $self->{"result_$kind"} = $results->{$kind};
   }
   return;
}
283
# Return the metrics stores as a hashref with the keys classes and globals,
# pointing at the object's class_metrics and global_metrics (not copies).
sub stats {
   my ( $self ) = @_;
   my %metrics_for = (
      classes => $self->{class_metrics},
      globals => $self->{global_metrics},
   );
   return \%metrics_for;
}
291
# Return the hashref of attribute name => data type for the attributes
# this object is tracking.  This is the object's own hash, not a copy.
sub attributes {
   my $self = shift;
   return $self->{type_for};
}
298
# Replace the attribute name => type hashref with the given one.
sub set_attribute_types {
   my $self = shift;
   my ( $attrib_types ) = @_;
   $self->{type_for} = $attrib_types;
   return;
}
304
# Return the recorded type of the given attribute, or undef when no type
# has been recorded for it.  Types are recorded by new() and make_handler().
sub type_for {
   my $self   = shift;
   my $attrib = shift;
   return $self->{type_for}->{$attrib};
}
311
312# Sub: make_handler
313#   Make an attribute handler subroutine for <aggregate()>.  Each attribute
#   needs a handler to keep track of the min and max values, worst sample, etc.
315#   Handlers differ depending on the type of attribute (num, bool or string).
316#
317# Parameters:
318#   %args - Arguments
319#
320# Required Arguments:
321#   event     - Event hashref
322#   attribute - Attribute name
323#
324# Optional Arguments:
325#   alternates - Arrayref of alternate names for the attribute
326#   worst      - Keep a sample of the attribute's worst value (default no)
327#
328# Returns:
329#   A subroutine that can aggregate the attribute.
sub make_handler {
   my ( $self, %args ) = @_;
   my @required_args = qw(event attribute);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($event, $attrib) = @args{@required_args};

   # A sample value is needed to decide the attribute's type.  Without one,
   # return nothing; aggregate() retries on a later event.
   my $val;
   eval { $val= $self->_get_value(%args); };
   if ( $EVAL_ERROR ) {
      PTDEBUG && _d("Cannot make", $attrib, "handler:", $EVAL_ERROR);
      return;
   }
   return unless defined $val; # can't determine type if it's undef

   # Ripped off from Regexp::Common::number and modified.
   my $float_re = qr{[+-]?(?:(?=\d|[.])\d+(?:[.])\d{0,})(?:E[+-]?\d+)?}i;
   # An already recorded type wins; otherwise the type is guessed from the
   # attribute name and the sample value.
   my $type = $self->type_for($attrib)           ? $self->type_for($attrib)
            : $attrib =~ m/_crc$/                ? 'string'
            : $val    =~ m/^(?:\d+|$float_re)$/o ? 'num'
            : $val    =~ m/^(?:Yes|No)$/         ? 'bool'
            :                                      'string';
   PTDEBUG && _d('Type for', $attrib, 'is', $type, '(sample:', $val, ')');
   $self->{type_for}->{$attrib} = $type;

   # ########################################################################
   # Begin creating the handler subroutine by writing lines of code.
   # ########################################################################
   my @lines;

   # Some attrib types don't need us to track sum, unq or all--and some do.
   my %track = (
      sum => $type =~ m/num|bool/    ? 1 : 0,  # sum of values
      unq => $type =~ m/bool|string/ ? 1 : 0,  # count of unique values seen
      all => $type eq 'num'          ? 1 : 0,  # all values in bucketed list
   );

   # First, do any transformations to the value if needed.  Right now,
   # it's just bool type attribs that need to be transformed.
   my $trf = ($type eq 'bool') ? q{(($val || '') eq 'Yes') ? 1 : 0}
           :                     undef;
   if ( $trf ) {
      push @lines, q{$val = } . $trf . ';';
   }

   # Handle broken Query_time like 123.124345.8382 (issue 234).
   if ( $attrib eq 'Query_time' ) {
      push @lines, (
         '$val =~ s/^(\d+(?:\.\d+)?).*/$1/;',
         '$event->{\''.$attrib.'\'} = $val;',
      );
   }

   # Make sure the value is constrained to legal limits.  If it's out of
   # bounds, just use the last-seen value for it.
   if ( $type eq 'num' && $self->{attrib_limit} ) {
      push @lines, (
         "if ( \$val > $self->{attrib_limit} ) {",
         '   $val = $class->{last} ||= 0;',
         '}',
         '$class->{last} = $val;',
      );
   }

   # Update values for this attrib in the class and global stores.  We write
   # code for each store.  The placeholder word PLACE is replaced with either
   # $class or $global at the end of the loop.
   my $lt = $type eq 'num' ? '<' : 'lt';
   my $gt = $type eq 'num' ? '>' : 'gt';
   foreach my $place ( qw($class $global) ) {
      my @tmp;  # hold lines until PLACE placeholder is replaced

      # Track count of any and all values seen for this attribute.
      # This is mostly used for class->Query_time->cnt which represents
      # the number of queries in the class because all queries (should)
      # have a Query_time attribute.
      push @tmp, '++PLACE->{cnt};';  # count of all values seen

      # CRC attribs are bucketed in 1k buckets by % 1_000.  We must
      # convert the val early so min and max don't show, e.g. 996791064
      # whereas unq will contain 64 (996791064 % 1_000).
      if ( $attrib =~ m/_crc$/ ) {
         push @tmp, '$val = $val % 1_000;';
      }

      # Track min, max and sum of values.  Min and max for strings is
      # mostly used for timestamps; min ts is earliest and max ts is latest.
      push @tmp, (
         'PLACE->{min} = $val if !defined PLACE->{min} || $val '
            . $lt . ' PLACE->{min};',
      );
      push @tmp, (
         'PLACE->{max} = $val if !defined PLACE->{max} || $val '
         . $gt . ' PLACE->{max};',
      );
      if ( $track{sum} ) {
         push @tmp, 'PLACE->{sum} += $val;';
      }

      # Save all values in a bucketed list.  See bucket_idx() below.
      if ( $track{all} ) {
         push @tmp, (
            'exists PLACE->{all} or PLACE->{all} = {};',
            '++PLACE->{all}->{ EventAggregator::bucket_idx($val) };',
         );
      }

      # Replace PLACE with current variable, $class or $global.
      push @lines, map { s/PLACE/$place/g; $_ } @tmp;
   }

   # We only save unique and worst values for the class, not globally.
   # The trailing semicolon matters: when this attribute is also the worst
   # attribute, the "if ( ... ) {" block below follows this line, and
   # without the terminator the two would parse as one broken statement
   # and the handler would fail to compile.
   if ( $track{unq} ) {
      push @lines, '++$class->{unq}->{$val};';
   }

   # The class max was already updated above, so this saves the event as
   # the class sample whenever its value is the max seen so far.
   if ( $args{worst} ) {
      my $op = $type eq 'num' ? '>=' : 'ge';
      push @lines, (
         'if ( $val ' . $op . ' ($class->{max} || 0) ) {',
         '   $samples->{$group_by} = $event;',
         '}',
      );
   }

   # Make the core code.  This part is saved for later, as part of an
   # "unrolled" subroutine.
   my @unrolled = (
      # Get $val from primary attrib name.
      "\$val = \$event->{'$attrib'};",

      # Get $val from alternate attrib names.
      ( map  { "\$val = \$event->{'$_'} unless defined \$val;" }
        grep { $_ ne $attrib } @{$args{alternates}}
      ),

      # Execute the code lines, if $val is defined.
      'defined $val && do {',
         @lines,
      '};',
   );
   $self->{unrolled_for}->{$attrib} = join("\n", @unrolled);

   # Finally, make a complete subroutine by wrapping the core code inside
   # a "sub { ... }" template.
   my @code = (
      'sub {',
         # Get args and define all variables.
         'my ( $event, $class, $global, $samples, $group_by ) = @_;',
         'my ($val, $idx);',

         # Core code from above.
         $self->{unrolled_for}->{$attrib},

         'return;',
      '}',
   );
   $self->{code_for}->{$attrib} = join("\n", @code);
   PTDEBUG && _d($attrib, 'handler code:', $self->{code_for}->{$attrib});
   my $sub = eval $self->{code_for}->{$attrib};
   if ( $EVAL_ERROR ) {
      die "Failed to compile $attrib handler code: $EVAL_ERROR";
   }

   return $sub;
}
497
498# Sub: bucket_idx
499#   Return the bucket number for the given value. Buck numbers are zero-indexed,
500#   so although there are 1,000 buckets (NUM_BUCK), 999 is the greatest idx.
501#
502#   Notice that this sub is not a class method, so either call it
503#   from inside this module like bucket_idx() or outside this module
504#   like EventAggregator::bucket_idx().
505#
506#   The bucketed list works this way: each range of values from MIN_BUCK in
507#   increments of BUCK_SIZE (that is 5%) we consider a bucket.  We keep NUM_BUCK
508#   buckets.  The upper end of the range is more than 1.5e15 so it should be big
509#   enough for almost anything.  The buckets are accessed by log base BUCK_SIZE,
510#   so floor(log(N)/log(BUCK_SIZE)).  The smallest bucket's index is -284. We
511#   shift all values up 284 so we have values from 0 to 999 that can be used as
512#   array indexes.  A value that falls into a bucket simply increments the array
513#   entry.  We do NOT use POSIX::floor() because it is too expensive.
514#
515#   This eliminates the need to keep and sort all values to calculate median,
516#   standard deviation, 95th percentile, etc.  So memory usage is bounded by
517#   the number of distinct aggregated values, not the number of events.
518#
#   TODO: could export this by default to avoid having to specify the package::.
520#
521# Parameters:
522#   $val - Numeric value to bucketize
523#
524# Returns:
525#   Bucket number (0 to NUM_BUCK-1) for the value
sub bucket_idx {
   my ( $val ) = @_;

   # Everything below the smallest bucketed value lands in bucket 0.
   if ( $val < MIN_BUCK ) {
      return 0;
   }

   # Log base BUCK_SIZE of the value, shifted up by BASE_OFFSET so the
   # index starts at zero, and truncated with int().
   my $bucket = int( BASE_OFFSET + log($val) / BASE_LOG );

   # Values too big for the table are clamped into the last bucket.
   my $last_bucket = NUM_BUCK - 1;
   if ( $bucket > $last_bucket ) {
      return $last_bucket;
   }
   return $bucket;
}
532
533# Sub: bucket_value
534#   Return the value corresponding to the given bucket.  The value of each
535#   bucket is the first value that it covers. So the value of bucket 1 is
536#   0.000001000 because it covers [0.000001000, 0.000001050).
537#
#   Notice that this sub is not a class method, so either call it
#   from inside this module like bucket_value() or outside this module
#   like EventAggregator::bucket_value().
#
#   TODO: could export this by default to avoid having to specify the package::.
543#
544# Parameters:
545#   $bucket - Bucket number (0 to NUM_BUCK-1)
546#
547# Returns:
548#   Numeric value corresponding to the bucket
sub bucket_value {
   my ( $bucket ) = @_;

   # Bucket 0 holds everything below MIN_BUCK; its value is zero.
   if ( $bucket == 0 ) {
      return 0;
   }

   my $out_of_range = $bucket < 0 || $bucket > NUM_BUCK - 1;
   die "Invalid bucket: $bucket" if $out_of_range;

   # Buckets are shifted up by 1 to make room for bucket 0, so bucket N
   # starts at MIN_BUCK * BUCK_SIZE ** (N - 1).
   my $steps = $bucket - 1;
   return (BUCK_SIZE**$steps) * MIN_BUCK;
}
556
# Map the 1,000 base 1.05 buckets to 8 base 10 buckets.  Returns a list of
# 1,000 elements, one per base 1.05 bucket; each element is the index (0..7)
# of the base 10 bucket that the base 1.05 bucket falls into.  The base 10
# buckets start at 0, then at MIN_BUCK times 10, 100, ... 10**7.  The map is
# built on the first call and reused afterwards.
# TODO: right now it's hardcoded to buckets of 10, in the future maybe not.
{
   my @buck_tens;  # the map, kept between calls
   sub buckets_of {
      if ( !@buck_tens ) {
         # Starting value of each of the 8 base 10 buckets.
         my @base10_starts = ( 0, map { (10**$_)*MIN_BUCK } 1..7 );

         # Walk the base 10 buckets (all but the last).  The base 1.05
         # bucket holding the start of the NEXT base 10 bucket belongs to
         # that next bucket, so the current one gets the buckets from
         # $first up to the one just before it.
         my $first = 0;
         foreach my $tens ( 0 .. $#base10_starts - 1 ) {
            my $next_first = bucket_idx( $base10_starts[$tens + 1] );
            PTDEBUG && _d('Base 10 bucket', $tens, 'maps to',
               'base 1.05 buckets', $first, '..', $next_first-1);
            $buck_tens[$_] = $tens for $first .. $next_first - 1;
            $first = $next_first;
         }

         # All remaining base 1.05 buckets map to the last base 10 bucket.
         $buck_tens[$_] = 7 for $first .. NUM_BUCK - 1;
      }

      return @buck_tens;
   }
}
595
# Run _calc_metrics() for every attribute that has bucketed values (an
# {all} entry), both in the global store and in each class store, and save
# what it returns in global_metrics and class_metrics.  The attributes
# considered are those present in the global store.
sub calculate_statistical_metrics {
   my ( $self, %args ) = @_;
   my $globals        = $self->{result_globals};
   my $classes        = $self->{result_classes};
   my $global_metrics = $self->{global_metrics};
   my $class_metrics  = $self->{class_metrics};
   PTDEBUG && _d('Calculating statistical_metrics');

   foreach my $attrib ( keys %$globals ) {
      # Global metrics for this attribute.
      $global_metrics->{$attrib} = $self->_calc_metrics(
         $globals->{$attrib}->{all},
         $globals->{$attrib},
      ) if exists $globals->{$attrib}->{all};

      # Per-class metrics for this attribute.
      CLASS:
      foreach my $class ( keys %$classes ) {
         next CLASS unless exists $classes->{$class}->{$attrib}->{all};
         $class_metrics->{$class}->{$attrib} = $self->_calc_metrics(
            $classes->{$class}->{$attrib}->{all},
            $classes->{$class}->{$attrib},
         );
      }
   }

   return;
}
628
# Given a hashref of vals, returns a hashref with the following
# statistical metrics:
#
#    pct_95    => top bucket value in the 95th percentile
#    cutoff    => How many values fall into the 95th percentile
#    stddev    => of all values
#    median    => of all values
#
# The vals hashref maps bucket index (see bucket_idx()) to the number of
# values counted in that bucket.  $args should contain cnt, min and max
# properties.  Except for the exact cases below, the metrics are computed
# from bucket starting values, so they are approximations.
sub _calc_metrics {
   my ( $self, $vals, $args ) = @_;
   my $statistical_metrics = {
      pct_95    => 0,
      stddev    => 0,
      median    => 0,
      cutoff    => undef,
   };

   # These cases might happen when there is nothing to get from the event, for
   # example, processlist sniffing doesn't gather Rows_examined, so $args won't
   # have {cnt} or other properties.
   return $statistical_metrics
      unless defined $vals && %$vals && $args->{cnt};

   # Return accurate metrics for some cases.
   my $n_vals = $args->{cnt};
   if ( $n_vals == 1 || $args->{max} == $args->{min} ) {
      # One distinct value: it is both the median and the 95th percentile.
      my $v = $args->{max} || 0;
      return {
         pct_95 => $v,
         stddev => 0,
         median => $v,
         cutoff => $n_vals,
      };
   }
   elsif ( $n_vals == 2 ) {
      # Two values: they are min and max, so no buckets are needed.
      my $v    = $args->{max} || 0;
      my $mean = (($args->{min} || 0) + $v) / 2;
      return {
         pct_95 => $v,
         stddev => sqrt((($v - $mean) ** 2) *2),
         median => $mean,
         cutoff => $n_vals,
      };
   }

   # Determine cutoff point for 95% if there are at least 10 vals.  Cutoff
   # serves also for the number of vals left in the 95%.  E.g. with 50 vals
   # the cutoff is 47 which means there are 47 vals: 0..46.  $cutoff is NOT
   # an array index.
   my $cutoff = $n_vals >= 10 ? int ( $n_vals * 0.95 ) : $n_vals;
   $statistical_metrics->{cutoff} = $cutoff;

   # Calculate the standard deviation and median of all values.
   my $total_left = $n_vals;
   my $top_vals   = $n_vals - $cutoff; # vals > 95th
   my $sum_excl   = 0;
   my $sum        = 0;
   my $sumsq      = 0;
   my $mid        = int($n_vals / 2);
   my $median     = 0;
   my $prev       = NUM_BUCK-1; # Used for getting median when $cutoff is odd
   my $bucket_95  = 0; # top bucket in 95th

   PTDEBUG && _d('total vals:', $total_left, 'top vals:', $top_vals, 'mid:', $mid);

   # In ancient times we kept an array of 1k buckets for each numeric
   # attrib.  Each such array cost 32_300 bytes of memory (that's not
   # a typo; yes, it was verified).  But measurements showed that only
   # 1% of the buckets were used on average, meaning 99% of 32_300 was
   # wasted.  Now we store only the used buckets in a hashref which we
   # map to a 1k bucket array for processing, so we don't have to tinker
   # with the delicate code below.
   # http://code.google.com/p/maatkit/issues/detail?id=866
   my @buckets = (0) x NUM_BUCK;
   @buckets[ keys %$vals ] = values %$vals;

   # Walk the buckets from the highest down to the lowest.
   BUCKET:
   for my $bucket ( reverse 0..(NUM_BUCK-1) ) {
      my $val = $buckets[$bucket];
      next BUCKET unless $val;

      $total_left -= $val;
      $sum_excl   += $val;
      # First bucket at which more than $top_vals values have been passed.
      $bucket_95   = $bucket if !$bucket_95 && $sum_excl > $top_vals;

      if ( !$median && $total_left <= $mid ) {
         $median = (($cutoff % 2) || ($val > 1)) ? $buck_vals[$bucket]
                 : ($buck_vals[$bucket] + $buck_vals[$prev]) / 2;
      }

      $sum    += $val * $buck_vals[$bucket];
      $sumsq  += $val * ($buck_vals[$bucket]**2);
      $prev   =  $bucket;
   }

   # Variance as mean of squares minus square of mean.  The result is
   # floored at zero and capped at half the min..max range.
   my $var      = $sumsq/$n_vals - ( ($sum/$n_vals) ** 2 );
   my $stddev   = $var > 0 ? sqrt($var) : 0;
   my $maxstdev = (($args->{max} || 0) - ($args->{min} || 0)) / 2;
   $stddev      = $stddev > $maxstdev ? $maxstdev : $stddev;

   PTDEBUG && _d('sum:', $sum, 'sumsq:', $sumsq, 'stddev:', $stddev,
      'median:', $median, 'prev bucket:', $prev,
      'total left:', $total_left, 'sum excl', $sum_excl,
      'bucket 95:', $bucket_95, $buck_vals[$bucket_95]);

   $statistical_metrics->{stddev} = $stddev;
   $statistical_metrics->{pct_95} = $buck_vals[$bucket_95];
   $statistical_metrics->{median} = $median;

   return $statistical_metrics;
}
749
# Return a hashref of pre-digested metrics for one attribute of one class:
# cnt, pct, sum, min, max, avg, median, pct_95 and stddev.  pct is the
# class's share of the attribute's global count.  pct, avg, median, pct_95
# and stddev default to 0 when they cannot be computed.
# %args is:
#  attrib => the attribute to report on
#  where  => the value of the fingerprint for the attrib
sub metrics {
   my ( $self, %args ) = @_;
   defined $args{$_} or die "I need a $_ argument" for qw(attrib where);
   my ( $attrib, $where ) = @args{qw(attrib where)};

   my $stats      = $self->results();
   my $metrics    = $self->stats();
   my $store      = $stats->{classes}->{$where}->{$attrib};
   my $global_cnt = $stats->{globals}->{$attrib}->{cnt};

   # Both ratios stay 0 unless numerator and denominator are true.
   my ( $pct, $avg ) = ( 0, 0 );
   if ( $global_cnt && $store->{cnt} ) {
      $pct = $store->{cnt} / $global_cnt;
   }
   if ( $store->{sum} && $store->{cnt} ) {
      $avg = $store->{sum} / $store->{cnt};
   }

   my %digest = (
      cnt => $store->{cnt},
      pct => $pct,
      sum => $store->{sum},
      min => $store->{min},
      max => $store->{max},
      avg => $avg,
   );

   # The statistical metrics come from the class metrics store.
   foreach my $stat ( qw(median pct_95 stddev) ) {
      $digest{$stat} = $metrics->{classes}->{$where}->{$attrib}->{$stat} || 0;
   }

   return \%digest;
}
779
# Split the event classes into "top" events and the rest.  Classes are
# ranked by $args{orderby} of $args{attrib}, largest first; classes that
# do not have that value are ignored entirely.  %args looks like this:
#
#  attrib      order-by attribute (usually Query_time)
#  orderby     order-by aggregate expression (should be numeric, usually sum)
#  total       a class is top while the running orderby total of the
#              classes ranked before it is < this number...
#  count       ...and fewer than this many classes were ranked before it.
#              (A false total or count disables that criterion.)
#  ol_attrib   Otherwise the class is an outlier if the 95th percentile of
#              this attribute...
#  ol_limit    ...is >= this value, AND...
#  ol_freq     ...its cnt for ol_attrib is at least this (if given).
#
# Returns two arrayrefs.  The first lists the chosen events and the second
# the other events.  Every item is an arrayref of the event key, the
# reason (top|outlier|misc) and the 1-based rank of the event.
sub top_events {
   my ( $self, %args ) = @_;
   my $classes = $self->{result_classes};
   my ( $attrib, $orderby ) = @args{qw(attrib orderby)};

   # $groupby values which have the order-by value, biggest value first.
   my @ranked = reverse
      sort {
         $classes->{$a}->{$attrib}->{$orderby}
            <=> $classes->{$b}->{$attrib}->{$orderby}
      }
      grep { defined $classes->{$_}->{$attrib}->{$orderby} }
      keys %$classes;

   my ( @chosen, @other );
   my $running_total = 0;  # orderby sum of the classes already seen
   my $n_seen        = 0;  # number of classes already seen
   foreach my $groupby ( @ranked ) {
      my $reason = 'misc';

      if (    ( !$args{total} || $running_total < $args{total} )
           && ( !$args{count} || $n_seen < $args{count} ) ) {
         # Still within the top criterion.
         $reason = 'top';
      }
      elsif (    $args{ol_attrib}
              && (    !$args{ol_freq}
                   || $classes->{$groupby}->{$args{ol_attrib}}->{cnt}
                         >= $args{ol_freq} ) ) {
         # Frequent enough to be a notable outlier if its 95th
         # percentile reaches the limit.
         my $ol_stats
            = $self->{class_metrics}->{$groupby}->{$args{ol_attrib}};
         if ( ($ol_stats->{pct_95} || 0) >= $args{ol_limit} ) {
            $reason = 'outlier';
         }
      }

      my $item = [ $groupby, $reason, $n_seen + 1 ];
      if ( $reason eq 'misc' ) {
         push @other, $item;
      }
      else {
         push @chosen, $item;
      }

      $running_total += $classes->{$groupby}->{$attrib}->{$orderby};
      $n_seen++;
   }

   return ( \@chosen, \@other );
}
839
# Registers every attribute of $event that is not yet known.  An attribute
# is new if it is not the groupby attribute, not already a key of
# $self->{attributes} and not a key of $self->{ignore_attribs}.  Each new
# attribute is added to $self->{attributes} (with no alternates), gets an
# alt attrib sub in $self->{alt_attribs} and is appended to
# $self->{all_attribs}.  Does nothing if there is no $event.
sub add_new_attributes {
   my ( $self, $event ) = @_;
   return unless $event;

   ATTRIB:
   foreach my $attrib ( keys %$event ) {
      next ATTRIB if $attrib eq $self->{groupby};
      next ATTRIB if exists $self->{attributes}->{$attrib};
      next ATTRIB if exists $self->{ignore_attribs}->{$attrib};

      $self->{attributes}->{$attrib}  = [$attrib];
      $self->{alt_attribs}->{$attrib} = make_alt_attrib($attrib);
      push @{$self->{all_attribs}}, $attrib;
      PTDEBUG && _d('Added new attribute:', $attrib);
   }

   return;
}
861
# Returns $self->{all_attribs}, the arrayref of all the attributes that
# were either given explicitly to new() or that were auto-detected.
# The arrayref is the object's own list, not a copy.
sub get_attributes {
   my $self = shift;
   my $all_attribs = $self->{all_attribs};
   return $all_attribs;
}
868
# Returns the event counter stored in $self->{n_events}.
sub events_processed {
   my $self = shift;
   my $n_events = $self->{n_events};
   return $n_events;
}
873
# Makes a sub which finds the alternate attribute that an event has.
# The first given attribute is the primary one and the rest are its
# alternates, in order of preference.  The returned sub takes an event
# hashref and returns the name of the first alternate which exists in the
# event, or undef if none exists.  If no alternates are given, the
# returned sub does nothing and returns nothing.
#
# The sub is a closure over the alternate names rather than generated
# source passed to string eval, so attribute names containing quotes or
# backslashes are looked up literally and can neither break the sub nor
# run as code.
sub make_alt_attrib {
   my ( @attribs ) = @_;

   my $attrib = shift @attribs;  # Primary attribute.
   return sub {} unless @attribs;  # No alternates.

   my @alt_attribs = @attribs;  # Private copy for the closure.
   PTDEBUG && _d('alt attrib sub for', $attrib, ':', @alt_attribs);

   return sub {
      my ( $event ) = @_;
      my $alt_attrib;
      foreach my $candidate ( @alt_attribs ) {
         if ( exists $event->{$candidate} ) {
            $alt_attrib = $candidate;
            last;
         }
      }
      return $alt_attrib;
   };
}
892
# Merge/add the given list of EventAggregator objects.
# Returns a new EventAggregator obj whose results are the sum of the
# results of all given objs, or nothing if no objs are given.  Dies if
# the objs do not all have the same groupby and worst.  An error while
# merging the classes/samples or the globals of one obj is caught and
# reported with warn; merging then continues with what is left.
sub merge {
   my ( @ea_objs ) = @_;
   PTDEBUG && _d('Merging', scalar @ea_objs, 'ea');
   return unless scalar @ea_objs;

   # If all the ea don't have the same groupby and worst then adding
   # them will produce a nonsensical result.  (Maybe not if worst
   # differs but certainly if groupby differs).  And while checking this...
   my $ea1   = shift @ea_objs;
   my $r1    = $ea1->results;
   my $worst = $ea1->{worst};  # for merging, finding worst sample

   # ...get all attributes and their types to properly initialize the
   # returned ea obj;
   my %attrib_types = %{ $ea1->attributes() };

   foreach my $ea ( @ea_objs ) {
      die "EventAggregator objects have different groupby: "
         . "$ea1->{groupby} and $ea->{groupby}"
         unless $ea1->{groupby} eq $ea->{groupby};
      die "EventAggregator objects have different worst: "
         . "$ea1->{worst} and $ea->{worst}"
         unless $ea1->{worst} eq $ea->{worst};

      # The first type seen for an attribute wins.
      my $attrib_types = $ea->attributes();
      foreach my $attrib ( keys %$attrib_types ) {
         $attrib_types{$attrib} = $attrib_types->{$attrib}
            unless exists $attrib_types{$attrib};
      }
   }

   # First, deep copy the first ea obj.  Do not shallow copy, do deep copy
   # so the returned ea is truly its own obj and does not point to data
   # structs in one of the given ea.
   my $r_merged = {
      classes => {},
      globals => _deep_copy_attribs($r1->{globals}),
      samples => {},
   };
   foreach my $class ( keys %{$r1->{classes}} ) {
      $r_merged->{classes}->{$class}
         = _deep_copy_attribs($r1->{classes}->{$class});

      # Copy the sample (event) key by key into a new hash.
      @{$r_merged->{samples}->{$class}}{keys %{$r1->{samples}->{$class}}}
         = values %{$r1->{samples}->{$class}};
   }

   # Then, merge/add the other eas.  r1* is the eventual return val.
   # r2* is the current ea being merged/added into r1*.
   for my $i ( 0..$#ea_objs ) {
      PTDEBUG && _d('Merging ea obj', ($i + 1));
      my $r2 = $ea_objs[$i]->results;

      # Descend into each class (e.g. unique query/fingerprint), each
      # attribute (e.g. Query_time, etc.), and then each attribute
      # value (e.g. min, max, etc.).  If either a class or attrib is
      # missing in one of the results, deep copy the extant class/attrib;
      # if both exist, add/merge the results.
      eval {
         CLASS:
         foreach my $class ( keys %{$r2->{classes}} ) {
            my $r1_class = $r_merged->{classes}->{$class};
            my $r2_class = $r2->{classes}->{$class};

            if ( $r1_class && $r2_class ) {
               # Class exists in both results.  Add/merge all their attributes.
               CLASS_ATTRIB:
               foreach my $attrib ( keys %$r2_class ) {
                  PTDEBUG && _d('merge', $attrib);
                  if ( $r1_class->{$attrib} && $r2_class->{$attrib} ) {
                     _add_attrib_vals($r1_class->{$attrib}, $r2_class->{$attrib});
                  }
                  elsif ( !$r1_class->{$attrib} ) {
                     # Attrib is missing in r1; deep copy it from r2.
                     PTDEBUG && _d('copy', $attrib);
                     $r1_class->{$attrib} =
                        _deep_copy_attrib_vals($r2_class->{$attrib})
                  }
               }
            }
            elsif ( !$r1_class ) {
               # Class is missing in r1; deep copy it from r2.
               PTDEBUG && _d('copy class');
               $r_merged->{classes}->{$class} = _deep_copy_attribs($r2_class);
            }

            # Update the worst sample if either the r2 sample is worse than
            # the r1 or there's no such sample in r1.
            my $new_worst_sample;
            if ( $r_merged->{samples}->{$class} && $r2->{samples}->{$class} ) {
               if (   $r2->{samples}->{$class}->{$worst}
                    > $r_merged->{samples}->{$class}->{$worst} ) {
                  $new_worst_sample = $r2->{samples}->{$class}
               }
            }
            elsif ( !$r_merged->{samples}->{$class} ) {
               $new_worst_sample = $r2->{samples}->{$class};
            }
            # Events don't have references to other data structs
            # so we don't have to worry about doing a deep copy.
            if ( $new_worst_sample ) {
               PTDEBUG && _d('New worst sample:', $worst, '=',
                  $new_worst_sample->{$worst}, 'item:', substr($class, 0, 100));
               my %new_sample;
               @new_sample{keys %$new_worst_sample}
                  = values %$new_worst_sample;
               $r_merged->{samples}->{$class} = \%new_sample;
            }
         }
      };
      if ( $EVAL_ERROR ) {
         warn "Error merging class/sample: $EVAL_ERROR";
      }

      # Same as above but for the global attribs/vals.
      eval {
         PTDEBUG && _d('Merging global attributes');
         GLOBAL_ATTRIB:
         foreach my $attrib ( keys %{$r2->{globals}} ) {
            my $r1_global = $r_merged->{globals}->{$attrib};
            my $r2_global = $r2->{globals}->{$attrib};

            if ( $r1_global && $r2_global ) {
               # Global attrib exists in both results.  Add/merge all its values.
               PTDEBUG && _d('merge', $attrib);
               _add_attrib_vals($r1_global, $r2_global);
            }
            elsif ( !$r1_global ) {
               # Global attrib is missing in r1; deep copy it from r2 global.
               PTDEBUG && _d('copy', $attrib);
               $r_merged->{globals}->{$attrib}
                  = _deep_copy_attrib_vals($r2_global);
            }
         }
      };
      if ( $EVAL_ERROR ) {
         warn "Error merging globals: $EVAL_ERROR";
      }
   }

   # Create a new EventAggregator obj, initialize it with the summed results,
   # and return it.  The arrow form is an unambiguous class method call,
   # unlike the indirect object form (new EventAggregator ...).
   my $ea_merged = EventAggregator->new(
      groupby    => $ea1->{groupby},
      worst      => $ea1->{worst},
      attributes => { map { $_=>[$_] } keys %attrib_types },
   );
   $ea_merged->set_results($r_merged);
   $ea_merged->set_attribute_types(\%attrib_types);
   return $ea_merged;
}
1045
# Adds/merges the attribute values in the $vals2 hashref into the $vals1
# hashref; only $vals1 is changed.  Only the values (keys) which $vals1
# already has are merged:
#   * scalar max and min become the larger/smaller of the two, compared
#     numerically if $vals1 has a "sum" and as strings otherwise;
#   * any other scalar (cnt, sum, ...) is added;
#   * arrayrefs (buckets) are added element by element;
#   * hashrefs (unique value counts) are added key by key.
# Dies if a scalar value is undefined, if an arrayref or hashref is empty,
# or if the two values for the same key are not of the same kind.
sub _add_attrib_vals {
   my ( $vals1, $vals2 ) = @_;

   # Only numeric values have "sum".  This decides how min and max are
   # compared and it is the same for every value of the attribute.
   my $is_num = exists $vals1->{sum} ? 1 : 0;

   # Assuming both sets of values are the same attribute (that's the caller
   # responsibility), each should have the same values (min, max, unq, etc.)
   foreach my $val ( keys %$vals1 ) {
      my $mine   = $vals1->{$val};
      my $theirs = $vals2->{$val};
      my ( $my_type, $their_type ) = ( ref $mine, ref $theirs );

      if ( !$my_type && !$their_type ) {
         # min, max, cnt, sum should never be undef.
         die "undefined $val value"
            unless defined $mine && defined $theirs;

         if ( $val eq 'max' ) {
            my $keep_mine = $is_num ? $mine > $theirs : $mine gt $theirs;
            $vals1->{$val} = $keep_mine ? $mine : $theirs;
         }
         elsif ( $val eq 'min' ) {
            my $keep_mine = $is_num ? $mine < $theirs : $mine lt $theirs;
            $vals1->{$val} = $keep_mine ? $mine : $theirs;
         }
         else {
            $vals1->{$val} += $theirs;
         }
      }
      elsif ( $my_type eq 'ARRAY' && $their_type eq 'ARRAY' ) {
         # Value is an arrayref, so it should be 1k buckets.
         # Should never be empty.
         die "Empty $val arrayref" if !@$mine || !@$theirs;
         foreach my $i ( 0 .. $#{$mine} ) {
            $mine->[$i] += $theirs->[$i];
         }
      }
      elsif ( $my_type eq 'HASH' && $their_type eq 'HASH' ) {
         # Value is a hashref, probably for unq string occurrences.
         # Should never be empty.
         die "Empty $val hashref" if !%$mine || !%$theirs;
         foreach my $key ( keys %$theirs ) {
            $mine->{$key} += $theirs->{$key};
         }
      }
      else {
         # This shouldn't happen.
         PTDEBUG && _d('vals1:', Dumper($vals1));
         PTDEBUG && _d('vals2:', Dumper($vals2));
         die "$val type mismatch";
      }
   }

   return;
}
1108
# These _deep_copy_* subs only go 1 level deep because, so far,
# no ea data struct has a ref any deeper.
#
# Returns a new hashref with the same keys (attributes) as $attribs in
# which every value is what _deep_copy_attrib_vals() returns for the
# original value.
sub _deep_copy_attribs {
   my ( $attribs ) = @_;
   my %copy_of = map {
      ( $_ => scalar _deep_copy_attrib_vals($attribs->{$_}) )
   } keys %$attribs;
   return \%copy_of;
}
1119
# Returns a copy of the attribute values in $vals.  If $vals is a hashref
# (e.g. min, max, cnt, unq, all, etc.), the copy is a new hashref in which
# arrayref and hashref values are copied into new arrays/hashes (one level
# deep) and plain scalars are copied as-is, so changing the copy never
# changes the original.  Dies if a value is any other kind of reference.
# Anything which is not a hashref is returned unchanged.
sub _deep_copy_attrib_vals {
   my ( $vals ) = @_;
   return $vals unless ref $vals eq 'HASH';

   my $copy = {};
   foreach my $val ( keys %$vals ) {
      # Check the type of the value, not of the key: keys are always
      # plain strings, so checking them would share every nested array
      # and hash with the original instead of copying it.
      my $ref_type = ref $vals->{$val};
      if ( !$ref_type ) {
         $copy->{$val} = $vals->{$val};
      }
      elsif ( $ref_type eq 'ARRAY' ) {
         $copy->{$val} = [ @{$vals->{$val}} ];
      }
      elsif ( $ref_type eq 'HASH' ) {
         $copy->{$val} = { %{$vals->{$val}} };
      }
      else {
         die "I don't know how to deep copy a $ref_type reference";
      }
   }
   return $copy;
}
1153
# Sub: _get_value
#   Get the value of the attribute from the event or, if the event does not
#   have the attribute, the value of the first alternate attribute that the
#   event does have.  Undef is a valid value.  Nothing is returned if the
#   event or the attribute name is missing (false).  The sub dies if the
#   event has neither the attribute nor any of its alternates.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   event     - Event hashref
#   attribute - Attribute name
#
# Optional Arguments:
#   alternates - Arrayref of alternate attribute names
#
# Returns:
#   Value of attribute in the event, possibly undef
sub _get_value {
   my ( $self, %args ) = @_;
   my $event  = $args{event};
   my $attrib = $args{attribute};
   my $alts   = $args{alternates};
   return unless $event && $attrib;

   # The attribute itself always takes precedence over its alternates.
   return $event->{$attrib} if exists $event->{$attrib};

   die "Event does not have attribute $attrib and there are no alterantes"
      unless $alts;

   # First alternate which exists in the event wins, even if it's undef.
   foreach my $alt_attrib ( @$alts ) {
      return $event->{$alt_attrib} if exists $event->{$alt_attrib};
   }

   die "Event does not have attribute $attrib or any of its alternates";
}
1198
# Prints a debug line to STDERR: "# PACKAGE:LINE PID " followed by the
# given values joined with spaces.  PACKAGE and LINE are those of the
# caller.  Undefined values are printed as "undef" and every newline in a
# value is followed by "# " so multi-line values stay commented.
sub _d {
   my ( $package, undef, $line ) = caller 0;
   my @parts;
   foreach my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }
   print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}
1206
12071;
1208}
1209# ###########################################################################
1210# End EventAggregator package
1211# ###########################################################################
1212