1# This program is copyright 2008-2011 Percona Ireland Ltd. 2# Feedback and improvements are welcome. 3# 4# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED 5# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF 6# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE. 7# 8# This program is free software; you can redistribute it and/or modify it under 9# the terms of the GNU General Public License as published by the Free Software 10# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar 11# systems, you can issue `man perlgpl' or `man perlartistic' to read these 12# licenses. 13# 14# You should have received a copy of the GNU General Public License along with 15# this program; if not, write to the Free Software Foundation, Inc., 59 Temple 16# Place, Suite 330, Boston, MA 02111-1307 USA. 17# ########################################################################### 18# EventAggregator package 19# ########################################################################### 20{ 21# Package: EventAggregator 22# EventAggregator aggregates event values and calculates basic statistics. 23package EventAggregator; 24 25use strict; 26use warnings FATAL => 'all'; 27use English qw(-no_match_vars); 28use constant PTDEBUG => $ENV{PTDEBUG} || 0; 29 30use List::Util qw(min max); 31use Data::Dumper; 32$Data::Dumper::Indent = 1; 33$Data::Dumper::Sortkeys = 1; 34$Data::Dumper::Quotekeys = 0; 35 36# ########################################################################### 37# Set up some constants for bucketing values. It is impossible to keep all 38# values seen in memory, but putting them into logarithmically scaled buckets 39# and just incrementing the bucket each time works, although it is imprecise. 40# See http://code.google.com/p/maatkit/wiki/EventAggregatorInternals. 
# ###########################################################################

# Bucketing parameters.  MIN_BUCK is declared first so that BASE_OFFSET can
# be derived from it instead of repeating the literal.
use constant MIN_BUCK    => .000001;   # values below this go into bucket 0
use constant BUCK_SIZE   => 1.05;      # each bucket is 5% wider than the last
use constant BASE_LOG    => log(BUCK_SIZE);
use constant BASE_OFFSET => abs(1 - log(MIN_BUCK) / BASE_LOG); # 284.1617969
use constant NUM_BUCK    => 1000;

# Lower bound of every bucket, indexed by bucket number (see bucket_value()).
# _calc_metrics() reads this table to turn bucket counts back into sums,
# medians and percentiles.
my @buck_vals = map { bucket_value($_) } 0 .. NUM_BUCK - 1;

# Sub: new
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   groupby - Attribute to group/aggregate classes by.
#   worst   - Attribute which defines the worst event in a class.
#             Samples of the worst attribute are saved.
#
# Optional Arguments:
#   attributes        - Hashref of attributes to aggregate.  Keys are the
#                       attribute names used in the EventAggregator object;
#                       values are arrayrefs of attribute names used to get
#                       values from events.  Multiple names in the arrayref
#                       are alternate names in the event.  Example:
#                       Fruit => ['apple', 'orange'].  An attrib called "Fruit"
#                       will be created using the event's "apple" value or,
#                       if that attrib doesn't exist, its "orange" value.
#                       If this option isn't specified, then all attributes
#                       are auto-detected and aggregated.
#   ignore_attributes - Arrayref of auto-detected attributes to ignore.
#                       This does not apply to the attributes specified
#                       with the optional attributes option above.
#   unroll_limit      - If this many events have been processed and some
#                       handlers haven't been generated yet (due to lack
#                       of sample data), unroll the loop anyway. (default 1000)
#   attrib_limit      - Sanity limit for attribute values.  If the value
#                       exceeds the limit, use the last-seen value for this
#                       class; if none, then 0.
#   type_for          - Hashref of attribute=>type pairs.  See $type in
#                       <make_handler()> for the list of types.
#
# Returns:
#   EventAggregator object
sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(groupby worst) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $attributes = $args{attributes} || {};
   my $self = {
      groupby        => $args{groupby},
      # Auto-detect attributes only when none were given explicitly.
      detect_attribs => scalar keys %$attributes == 0 ? 1 : 0,
      all_attribs    => [ keys %$attributes ],
      # The groupby attribute is never aggregated, so it is never listed
      # as ignored either.
      ignore_attribs => {
         map  { $_ => $args{attributes}->{$_} }
         grep { $_ ne $args{groupby} }
              @{$args{ignore_attributes}}
      },
      attributes     => {
         map  { $_ => $args{attributes}->{$_} }
         grep { $_ ne $args{groupby} }
              keys %$attributes
      },
      alt_attribs    => {
         map  { $_ => make_alt_attrib(@{$args{attributes}->{$_}}) }
         grep { $_ ne $args{groupby} }
              keys %$attributes
      },
      worst          => $args{worst},
      # The environment variable still overrides everything, exactly as
      # before.  Otherwise honor the documented unroll_limit argument, which
      # used to be silently ignored, and fall back to 1000.
      unroll_limit   => $ENV{PT_QUERY_DIGEST_CHECK_ATTRIB_LIMIT}
                        || $args{unroll_limit}
                        || 1000,
      attrib_limit   => $args{attrib_limit},
      result_classes => {},
      result_globals => {},
      result_samples => {},
      class_metrics  => {},
      global_metrics => {},
      n_events       => 0,
      unrolled_loops => undef,
      # Copy the caller's hashref so later type detection doesn't mutate it.
      type_for       => { %{$args{type_for} || { Query_time => 'num' }} },
   };
   return bless $self, $class;
}

# Delete all collected data, but don't delete things like the generated
# subroutines. Resetting aggregated data is an interesting little exercise.
# The generated functions that do aggregation have private namespaces with
# references to some of the data. Thus, they will not necessarily do as
# expected if the stored data is simply wiped out. Instead, it needs to be
# zeroed out without replacing the actual objects.
sub reset_aggregated_data {
   my ( $self ) = @_;
   # Empty each store in place: the unrolled/generated subs hold references
   # to these hashes, so they must not be replaced.
   foreach my $class ( values %{$self->{result_classes}} ) {
      foreach my $attrib ( values %$class ) {
         delete @{$attrib}{keys %$attrib};
      }
   }
   foreach my $class ( values %{$self->{result_globals}} ) {
      delete @{$class}{keys %$class};
   }
   delete @{$self->{result_samples}}{keys %{$self->{result_samples}}};
   $self->{n_events} = 0;
}

# Aggregate an event hashref's properties. Code is built on the fly to do this,
# based on the values being passed in: for the first unroll_limit events each
# attribute gets its own handler sub, and after that the handlers' code is
# unrolled into a single subroutine that handles whole events. For that
# reason, you can't re-use an instance.
sub aggregate {
   my ( $self, $event ) = @_;

   my $group_by = $event->{$self->{groupby}};
   return unless defined $group_by;

   $self->{n_events}++;
   PTDEBUG && _d('Event', $self->{n_events});

   # Run only unrolled loops if available.
   return $self->{unrolled_loops}->($self, $event, $group_by)
      if $self->{unrolled_loops};

   # For the first unroll_limit events, auto-detect new attribs and
   # run attrib handlers.
   if ( $self->{n_events} <= $self->{unroll_limit} ) {

      $self->add_new_attributes($event) if $self->{detect_attribs};

      ATTRIB:
      foreach my $attrib ( keys %{$self->{attributes}} ) {

         # Attrib auto-detection can add a lot of attributes which some events
         # may or may not have. Aggregating a nonexistent attrib is wasteful,
         # so we check that the attrib or one of its alternates exists. If
         # one does, then we leave attrib alone because the handler sub will
         # also check alternates.
         if ( !exists $event->{$attrib} ) {
            PTDEBUG && _d("attrib doesn't exist in event:", $attrib);
            my $alt_attrib = $self->{alt_attribs}->{$attrib}->($event);
            PTDEBUG && _d('alt attrib:', $alt_attrib);
            next ATTRIB unless $alt_attrib;
         }

         # The value of the attribute ( $group_by ) may be an arrayref.
         GROUPBY:
         foreach my $val ( ref $group_by ? @$group_by : ($group_by) ) {
            my $class_attrib  = $self->{result_classes}->{$val}->{$attrib} ||= {};
            my $global_attrib = $self->{result_globals}->{$attrib} ||= {};
            my $samples       = $self->{result_samples};
            my $handler       = $self->{handlers}->{ $attrib };
            if ( !$handler ) {
               $handler = $self->make_handler(
                  event      => $event,
                  attribute  => $attrib,
                  alternates => $self->{attributes}->{$attrib},
                  worst      => $self->{worst} eq $attrib,
               );
               $self->{handlers}->{$attrib} = $handler;
            }
            next GROUPBY unless $handler;
            $samples->{$val} ||= $event; # Initialize to the first event.
            # Pass this class's key ($val), not $group_by: when group_by is
            # an arrayref the handler's worst-sample code would otherwise
            # store the sample under the stringified arrayref ("ARRAY(0x..)").
            $handler->($event, $class_attrib, $global_attrib, $samples, $val);
         }
      }
   }
   else {
      # After unroll_limit events, unroll the loops.
      $self->_make_unrolled_loops($event);
      # Run unrolled loops here once. Next time, they'll be run
      # before this if-else.
      $self->{unrolled_loops}->($self, $event, $group_by);
   }

   return;
}

sub _make_unrolled_loops {
   my ( $self, $event ) = @_;

   my $group_by = $event->{$self->{groupby}};

   # Combine the handlers built so far into one faster sub. Attributes that
   # never got a handler are left out. Start by getting direct handles to
   # the data stores the generated code closes over, so it doesn't have to
   # look them up via $self on every event.
   my @attrs   = grep { $self->{handlers}->{$_} } keys %{$self->{attributes}};
   my $globs   = $self->{result_globals}; # Global stats for each attrib
   my $samples = $self->{result_samples};

   # Now the tricky part -- must make sure only the desired variables from
   # the outer scope are re-used, and any variables that should have their
   # own scope are declared within the subroutine.
   my @lines = (
      'my ( $self, $event, $group_by ) = @_;',
      'my ($val, $class, $global, $idx);',
      (ref $group_by ? ('foreach my $group_by ( @$group_by ) {') : ()),
      # Create and get each attribute's storage
      'my $temp = $self->{result_classes}->{ $group_by }
         ||= { map { $_ => { } } @attrs };',
      '$samples->{$group_by} ||= $event;', # Always start with the first.
   );
   foreach my $attr ( @attrs ) {
      push @lines, (
         # A class first seen before unrolling may lack some attributes (its
         # early events didn't have them). Create the store here; otherwise
         # the handler code autovivifies a detached hash in $class and the
         # values are silently lost.
         '$class  = $temp->{\''  . $attr . '\'} ||= {};',
         '$global = $globs->{\'' . $attr . '\'} ||= {};',
         $self->{unrolled_for}->{$attr},
      );
   }
   if ( ref $group_by ) {
      push @lines, '}'; # Close the loop opened above
   }
   @lines = map { s/^/   /gm; $_ } @lines; # Indent for debugging
   unshift @lines, 'sub {';
   push @lines, '}';

   # Make the subroutine.
   my $code = join("\n", @lines);
   PTDEBUG && _d('Unrolled subroutine:', @lines);
   my $sub = eval $code;
   die $EVAL_ERROR if $EVAL_ERROR;
   $self->{unrolled_loops} = $sub;

   return;
}

# Return the aggregated results.
sub results {
   my $self = shift;
   my %results;
   @results{qw(classes globals samples)}
      = @{$self}{qw(result_classes result_globals result_samples)};
   return \%results;
}

# Replace the aggregated results with the given hashref, which has the same
# shape as what results() returns (classes, globals and samples).
# NOTE(review): an already-unrolled loop closes over the old globals and
# samples hashes, so it keeps writing to those; confirm callers only use
# this before aggregating.
sub set_results {
   my $self    = shift;
   my $results = shift;
   $self->{"result_$_"} = $results->{$_} for qw(classes globals samples);
   return;
}

# Return the per-class and global statistical metrics computed by
# calculate_statistical_metrics().
sub stats {
   my $self = shift;
   my %stats = (
      classes => $self->{class_metrics},
      globals => $self->{global_metrics},
   );
   return \%stats;
}

# Return the attributes that this object is tracking, and their data types,
# as a hashref of name => type.
sub attributes {
   my $self  = shift;
   my $types = $self->{type_for};
   return $types;
}

# Replace the attribute => type hashref wholesale.
sub set_attribute_types {
   my $self = shift;
   $self->{type_for} = shift;
   return;
}

# Return the type of one attribute: either given to new()/set_attribute_types()
# or decided by make_handler() from a sample value.
sub type_for {
   my $self   = shift;
   my $attrib = shift;
   return $self->{type_for}{$attrib};
}

# Sub: make_handler
# Make an attribute handler subroutine for <aggregate()>. Each attribute
# needs a handler to keep track of the min and max values, worst sample, etc.
# Handlers differ depending on the type of attribute (num, bool or string).
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   event     - Event hashref
#   attribute - Attribute name
#
# Optional Arguments:
#   alternates - Arrayref of alternate names for the attribute
#   worst      - Keep a sample of the attribute's worst value (default no)
#
# Returns:
#   A subroutine that can aggregate the attribute.
sub make_handler {
   my ( $self, %args ) = @_;
   my @required_args = qw(event attribute);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $attrib = $args{attribute};

   # Get a sample value for the attribute via _get_value(); its form decides
   # the attribute's type below.
   my $val;
   eval { $val = $self->_get_value(%args); };
   if ( $EVAL_ERROR ) {
      PTDEBUG && _d("Cannot make", $attrib, "handler:", $EVAL_ERROR);
      return;
   }
   return unless defined $val; # can't determine type if it's undef

   # Ripped off from Regexp::Common::number and modified.
   my $float_re = qr{[+-]?(?:(?=\d|[.])\d+(?:[.])\d{0,})(?:E[+-]?\d+)?}i;
   my $type = $self->type_for($attrib)           ? $self->type_for($attrib)
            : $attrib =~ m/_crc$/                ? 'string'
            : $val    =~ m/^(?:\d+|$float_re)$/o ? 'num'
            : $val    =~ m/^(?:Yes|No)$/         ? 'bool'
            :                                      'string';
   PTDEBUG && _d('Type for', $attrib, 'is', $type, '(sample:', $val, ')');
   $self->{type_for}->{$attrib} = $type;

   # ########################################################################
   # Begin creating the handler subroutine by writing lines of code.
   # ########################################################################
   my @lines;

   # Some attrib types don't need us to track sum, unq or all--and some do.
   my %track = (
      sum => $type =~ m/num|bool/    ? 1 : 0, # sum of values
      unq => $type =~ m/bool|string/ ? 1 : 0, # count of unique values seen
      all => $type eq 'num'          ? 1 : 0, # all values in bucketed list
   );

   # First, do any transformations to the value if needed. Right now,
   # it's just bool type attribs that need to be transformed.
   my $trf = ($type eq 'bool') ? q{(($val || '') eq 'Yes') ? 1 : 0}
           :                     undef;
   if ( $trf ) {
      push @lines, q{$val = } . $trf . ';';
   }

   # Handle broken Query_time like 123.124345.8382 (issue 234).
   if ( $attrib eq 'Query_time' ) {
      push @lines, (
         '$val =~ s/^(\d+(?:\.\d+)?).*/$1/;',
         '$event->{\''.$attrib.'\'} = $val;',
      );
   }

   # Make sure the value is constrained to legal limits. If it's out of
   # bounds, just use the last-seen value for it.
   if ( $type eq 'num' && $self->{attrib_limit} ) {
      push @lines, (
         "if ( \$val > $self->{attrib_limit} ) {",
         '   $val = $class->{last} ||= 0;',
         '}',
         '$class->{last} = $val;',
      );
   }

   # Update values for this attrib in the class and global stores. We write
   # code for each store. The placeholder word PLACE is replaced with either
   # $class or $global at the end of the loop.
   my $lt = $type eq 'num' ? '<' : 'lt';
   my $gt = $type eq 'num' ? '>' : 'gt';
   foreach my $place ( qw($class $global) ) {
      my @tmp; # hold lines until PLACE placeholder is replaced

      # Track count of any and all values seen for this attribute.
      # This is mostly used for class->Query_time->cnt which represents
      # the number of queries in the class because all queries (should)
      # have a Query_time attribute.
      push @tmp, '++PLACE->{cnt};'; # count of all values seen

      # CRC attribs are bucketed in 1k buckets by % 1_000. We must
      # convert the val early so min and max don't show, e.g. 996791064
      # whereas unq will contain 64 (996791064 % 1_000).
      if ( $attrib =~ m/_crc$/ ) {
         push @tmp, '$val = $val % 1_000;';
      }

      # Track min, max and sum of values. Min and max for strings is
      # mostly used for timestamps; min ts is earliest and max ts is latest.
      push @tmp, (
         'PLACE->{min} = $val if !defined PLACE->{min} || $val '
            . $lt . ' PLACE->{min};',
      );
      push @tmp, (
         'PLACE->{max} = $val if !defined PLACE->{max} || $val '
            . $gt . ' PLACE->{max};',
      );
      if ( $track{sum} ) {
         push @tmp, 'PLACE->{sum} += $val;';
      }

      # Save all values in a bucketed list. See bucket_idx() below.
      if ( $track{all} ) {
         push @tmp, (
            'exists PLACE->{all} or PLACE->{all} = {};',
            '++PLACE->{all}->{ EventAggregator::bucket_idx($val) };',
         );
      }

      # Replace PLACE with current variable, $class or $global.
      push @lines, map { s/PLACE/$place/g; $_ } @tmp;
   }

   # We only save unique and worst values for the class, not globally.
   if ( $track{unq} ) {
      # The trailing semicolon matters: without it, a following worst-sample
      # "if (...) {" line turns this into a statement modifier and the
      # generated handler fails to compile (bool/string worst attributes).
      push @lines, '++$class->{unq}->{$val};';
   }

   if ( $args{worst} ) {
      my $op = $type eq 'num' ? '>=' : 'ge';
      push @lines, (
         'if ( $val ' . $op . ' ($class->{max} || 0) ) {',
         '   $samples->{$group_by} = $event;',
         '}',
      );
   }

   # Make the core code. This part is saved for later, as part of an
   # "unrolled" subroutine.
   my @unrolled = (
      # Get $val from primary attrib name.
      "\$val = \$event->{'$attrib'};",

      # Get $val from alternate attrib names.
      ( map  { "\$val = \$event->{'$_'} unless defined \$val;" }
        grep { $_ ne $attrib } @{$args{alternates}}
      ),

      # Execute the code lines, if $val is defined.
      'defined $val && do {',
      @lines,
      '};',
   );
   $self->{unrolled_for}->{$attrib} = join("\n", @unrolled);

   # Finally, make a complete subroutine by wrapping the core code inside
   # a "sub { ... }" template.
   my @code = (
      'sub {',
      # Get args and define all variables.
      'my ( $event, $class, $global, $samples, $group_by ) = @_;',
      'my ($val, $idx);',

      # Core code from above.
      $self->{unrolled_for}->{$attrib},

      'return;',
      '}',
   );
   $self->{code_for}->{$attrib} = join("\n", @code);
   PTDEBUG && _d($attrib, 'handler code:', $self->{code_for}->{$attrib});
   my $sub = eval $self->{code_for}->{$attrib};
   if ( $EVAL_ERROR ) {
      die "Failed to compile $attrib handler code: $EVAL_ERROR";
   }

   return $sub;
}

# Sub: bucket_idx
# Return the bucket number for the given value.
# Bucket numbers are zero-indexed, so although there are 1,000 buckets
# (NUM_BUCK), 999 is the greatest index.
#
# This is a plain function, not a method: call it as bucket_idx() from
# inside this module and as EventAggregator::bucket_idx() from outside.
#
# How the buckets work: starting at MIN_BUCK, each bucket covers a range
# BUCK_SIZE (5%) wider than the previous one, and there are NUM_BUCK of them.
# The last bucket starts at about 1.4e15, and larger values are clamped
# into it. A value's bucket is int(BASE_OFFSET + log(N)/log(BUCK_SIZE)).
# BASE_OFFSET (~284.16) shifts the otherwise negative log indexes up so they
# run from 0 to NUM_BUCK-1. Anything below MIN_BUCK, including zero, goes
# into bucket 0. int() is used instead of POSIX::floor() because the latter
# is too expensive.
#
# Keeping only per-bucket counts avoids storing and sorting every value to
# get the median, standard deviation, 95th percentile, etc. Memory use is
# bounded by the number of distinct buckets used, not by the number of
# events.
#
# TODO: could export this by default to avoid having to specify package::.
#
# Parameters:
#   $val - Numeric value to bucketize
#
# Returns:
#   Bucket number (0 to NUM_BUCK-1) for the value
sub bucket_idx {
   my $val = shift;
   return 0 if $val < MIN_BUCK;

   my $last = NUM_BUCK - 1;
   my $idx  = int( BASE_OFFSET + log($val) / BASE_LOG );
   return $last if $idx > $last;
   return $idx;
}

# Sub: bucket_value
# Return the value corresponding to the given bucket. The value of each
# bucket is the first value that it covers. So the value of bucket 1 is
# 0.000001000 because it covers [0.000001000, 0.000001050).
#
# Notice that this sub is not a class method, so either call it
# from inside this module like bucket_value() or outside this module
# like EventAggregator::bucket_value().
#
# TODO: could export this by default to avoid having to specify package::.
#
# Parameters:
#   $bucket - Bucket number (0 to NUM_BUCK-1)
#
# Returns:
#   Numeric value corresponding to the bucket
sub bucket_value {
   my ( $bucket ) = @_;
   return 0 if $bucket == 0;
   die "Invalid bucket: $bucket" if $bucket < 0 || $bucket > (NUM_BUCK-1);
   # $bucket - 1 because buckets are shifted up by 1 to handle zero values.
   return (BUCK_SIZE**($bucket-1)) * MIN_BUCK;
}

# Map the 1,000 base 1.05 buckets to 8 base 10 buckets. Returns an array
# of 1,000 buckets, the value of each represents its index in an 8 bucket
# base 10 array. For example: base 10 bucket 0 represents vals (0, 0.00001),
# and base 1.05 buckets 0..47 hold values below 0.00001 (bucket 48, which
# contains 0.00001, starts base 10 bucket 1). So the first 48 elements of
# the returned array will have 0 as their values.
# The map is computed once and cached in @buck_tens.
# TODO: right now it's hardcoded to buckets of 10, in the future maybe not.
{
   my @buck_tens;
   sub buckets_of {
      return @buck_tens if @buck_tens;

      # To make a more precise map, we first set the starting values for
      # each of the 8 base 10 buckets: 0, 0.00001, 0.0001, ..., 10.
      my $start_bucket  = 0;
      my @base10_starts = ( 0, map { (10**$_) * MIN_BUCK } 1 .. 7 );

      # Then find the base 1.05 buckets that correspond to each
      # base 10 bucket. The last value in each bucket's range belongs
      # to the next bucket, so $next_bucket-1 represents the real last
      # base 1.05 bucket in which the base 10 bucket's range falls.
      for my $base10_bucket ( 0 .. $#base10_starts - 1 ) {
         my $next_bucket = bucket_idx( $base10_starts[$base10_bucket + 1] );
         PTDEBUG && _d('Base 10 bucket', $base10_bucket, 'maps to',
            'base 1.05 buckets', $start_bucket, '..', $next_bucket-1);
         for my $base1_05_bucket ( $start_bucket .. $next_bucket - 1 ) {
            $buck_tens[$base1_05_bucket] = $base10_bucket;
         }
         $start_bucket = $next_bucket;
      }

      # Map all remaining base 1.05 buckets to base 10 bucket 7 which
      # is for vals >= 10.
      $buck_tens[$_] = 7 for $start_bucket .. NUM_BUCK - 1;

      return @buck_tens;
   }
}

# Calculate 95%, stddev and median for numeric attributes in the
# global and classes stores that have all values (1k buckets).
# Save the metrics in global_metrics and class_metrics.
sub calculate_statistical_metrics {
   my ( $self, %args ) = @_;
   my $classes        = $self->{result_classes};
   my $globals        = $self->{result_globals};
   my $class_metrics  = $self->{class_metrics};
   my $global_metrics = $self->{global_metrics};
   PTDEBUG && _d('Calculating statistical_metrics');
   foreach my $attrib ( keys %$globals ) {
      if ( exists $globals->{$attrib}->{all} ) {
         $global_metrics->{$attrib}
            = $self->_calc_metrics(
               $globals->{$attrib}->{all},
               $globals->{$attrib},
            );
      }

      foreach my $class ( keys %$classes ) {
         # Fetch the store one level at a time: a chained
         # exists $classes->{$class}->{$attrib}->{all} would autovivify an
         # empty $attrib entry in every class that never saw the attribute.
         my $class_attrib = $classes->{$class}->{$attrib};
         next unless $class_attrib && exists $class_attrib->{all};
         $class_metrics->{$class}->{$attrib}
            = $self->_calc_metrics($class_attrib->{all}, $class_attrib);
      }
   }

   return;
}

# Given a hashref of vals, returns a hashref with the following
# statistical metrics:
#
#    pct_95 => top bucket value in the 95th percentile
#    cutoff => How many values fall into the 95th percentile
#    stddev => of all values
#    median => of all values
#
# The vals hashref maps bucket numbers (see bucket_idx()) to the count of
# values in that bucket. $args should contain cnt, min and max properties.
top of this file). $args should contain cnt, min and max properties. 639sub _calc_metrics { 640 my ( $self, $vals, $args ) = @_; 641 my $statistical_metrics = { 642 pct_95 => 0, 643 stddev => 0, 644 median => 0, 645 cutoff => undef, 646 }; 647 648 # These cases might happen when there is nothing to get from the event, for 649 # example, processlist sniffing doesn't gather Rows_examined, so $args won't 650 # have {cnt} or other properties. 651 return $statistical_metrics 652 unless defined $vals && %$vals && $args->{cnt}; 653 654 # Return accurate metrics for some cases. 655 my $n_vals = $args->{cnt}; 656 if ( $n_vals == 1 || $args->{max} == $args->{min} ) { 657 my $v = $args->{max} || 0; 658 my $bucket = int(6 + ( log($v > 0 ? $v : MIN_BUCK) / log(10))); 659 $bucket = $bucket > 7 ? 7 : $bucket < 0 ? 0 : $bucket; 660 return { 661 pct_95 => $v, 662 stddev => 0, 663 median => $v, 664 cutoff => $n_vals, 665 }; 666 } 667 elsif ( $n_vals == 2 ) { 668 foreach my $v ( $args->{min}, $args->{max} ) { 669 my $bucket = int(6 + ( log($v && $v > 0 ? $v : MIN_BUCK) / log(10))); 670 $bucket = $bucket > 7 ? 7 : $bucket < 0 ? 0 : $bucket; 671 } 672 my $v = $args->{max} || 0; 673 my $mean = (($args->{min} || 0) + $v) / 2; 674 return { 675 pct_95 => $v, 676 stddev => sqrt((($v - $mean) ** 2) *2), 677 median => $mean, 678 cutoff => $n_vals, 679 }; 680 } 681 682 # Determine cutoff point for 95% if there are at least 10 vals. Cutoff 683 # serves also for the number of vals left in the 95%. E.g. with 50 vals 684 # the cutoff is 47 which means there are 47 vals: 0..46. $cutoff is NOT 685 # an array index. 686 my $cutoff = $n_vals >= 10 ? int ( $n_vals * 0.95 ) : $n_vals; 687 $statistical_metrics->{cutoff} = $cutoff; 688 689 # Calculate the standard deviation and median of all values. 
690 my $total_left = $n_vals; 691 my $top_vals = $n_vals - $cutoff; # vals > 95th 692 my $sum_excl = 0; 693 my $sum = 0; 694 my $sumsq = 0; 695 my $mid = int($n_vals / 2); 696 my $median = 0; 697 my $prev = NUM_BUCK-1; # Used for getting median when $cutoff is odd 698 my $bucket_95 = 0; # top bucket in 95th 699 700 PTDEBUG && _d('total vals:', $total_left, 'top vals:', $top_vals, 'mid:', $mid); 701 702 # In ancient times we kept an array of 1k buckets for each numeric 703 # attrib. Each such array cost 32_300 bytes of memory (that's not 704 # a typo; yes, it was verified). But measurements showed that only 705 # 1% of the buckets were used on average, meaning 99% of 32_300 was 706 # wasted. Now we store only the used buckets in a hashref which we 707 # map to a 1k bucket array for processing, so we don't have to tinker 708 # with the delitcate code below. 709 # http://code.google.com/p/maatkit/issues/detail?id=866 710 my @buckets = map { 0 } (0..NUM_BUCK-1); 711 map { $buckets[$_] = $vals->{$_} } keys %$vals; 712 $vals = \@buckets; # repoint vals from given hashref to our array 713 714 BUCKET: 715 for my $bucket ( reverse 0..(NUM_BUCK-1) ) { 716 my $val = $vals->[$bucket]; 717 next BUCKET unless $val; 718 719 $total_left -= $val; 720 $sum_excl += $val; 721 $bucket_95 = $bucket if !$bucket_95 && $sum_excl > $top_vals; 722 723 if ( !$median && $total_left <= $mid ) { 724 $median = (($cutoff % 2) || ($val > 1)) ? $buck_vals[$bucket] 725 : ($buck_vals[$bucket] + $buck_vals[$prev]) / 2; 726 } 727 728 $sum += $val * $buck_vals[$bucket]; 729 $sumsq += $val * ($buck_vals[$bucket]**2); 730 $prev = $bucket; 731 } 732 733 my $var = $sumsq/$n_vals - ( ($sum/$n_vals) ** 2 ); 734 my $stddev = $var > 0 ? sqrt($var) : 0; 735 my $maxstdev = (($args->{max} || 0) - ($args->{min} || 0)) / 2; 736 $stddev = $stddev > $maxstdev ? 
$maxstdev : $stddev; 737 738 PTDEBUG && _d('sum:', $sum, 'sumsq:', $sumsq, 'stddev:', $stddev, 739 'median:', $median, 'prev bucket:', $prev, 740 'total left:', $total_left, 'sum excl', $sum_excl, 741 'bucket 95:', $bucket_95, $buck_vals[$bucket_95]); 742 743 $statistical_metrics->{stddev} = $stddev; 744 $statistical_metrics->{pct_95} = $buck_vals[$bucket_95]; 745 $statistical_metrics->{median} = $median; 746 747 return $statistical_metrics; 748} 749 750# Return a hashref of the metrics for some attribute, pre-digested. 751# %args is: 752# attrib => the attribute to report on 753# where => the value of the fingerprint for the attrib 754sub metrics { 755 my ( $self, %args ) = @_; 756 foreach my $arg ( qw(attrib where) ) { 757 die "I need a $arg argument" unless defined $args{$arg}; 758 } 759 my $attrib = $args{attrib}; 760 my $where = $args{where}; 761 762 my $stats = $self->results(); 763 my $metrics = $self->stats(); 764 my $store = $stats->{classes}->{$where}->{$attrib}; 765 my $global_cnt = $stats->{globals}->{$attrib}->{cnt}; 766 767 return { 768 cnt => $store->{cnt}, 769 pct => $global_cnt && $store->{cnt} ? $store->{cnt} / $global_cnt : 0, 770 sum => $store->{sum}, 771 min => $store->{min}, 772 max => $store->{max}, 773 avg => $store->{sum} && $store->{cnt} ? $store->{sum} / $store->{cnt} : 0, 774 median => $metrics->{classes}->{$where}->{$attrib}->{median} || 0, 775 pct_95 => $metrics->{classes}->{$where}->{$attrib}->{pct_95} || 0, 776 stddev => $metrics->{classes}->{$where}->{$attrib}->{stddev} || 0, 777 }; 778} 779 780# Find the top N or top % event keys, in sorted order, optionally including 781# outliers (ol_...) that are notable for some reason. %args looks like this: 782# 783# attrib order-by attribute (usually Query_time) 784# orderby order-by aggregate expression (should be numeric, usually sum) 785# total include events whose summed attribs are <= this number... 786# count ...or this many events, whichever is less... 
#    ol_attrib   ...or events where the 95th percentile of this attribute...
#    ol_limit    ...is at least this value, AND...
#    ol_freq     ...this attribute's count in the class is at least this.
#
# The return value is two arrayrefs. The first is a list of arrayrefs of the
# chosen events; each is [event key, reason, rank] where reason is 'top' or
# 'outlier' and rank is the 1-based position in the sorted order. The second
# lists the remaining events as [event key, 'misc', rank].
sub top_events {
   my ( $self, %args ) = @_;
   my $classes = $self->{result_classes};

   # Collect each class's order-by value once. Reading the store level by
   # level (instead of a chained ->{...}->{...} lookup) avoids autovivifying
   # an empty attrib hash in classes that lack it; classes without a defined
   # value are left out (defensive programming).
   my %orderby_val;
   foreach my $groupby ( keys %$classes ) {
      my $store = $classes->{$groupby}->{$args{attrib}};
      next unless $store && defined $store->{$args{orderby}};
      $orderby_val{$groupby} = $store->{$args{orderby}};
   }
   my @sorted = reverse sort {   # Sorted list of $groupby values
      $orderby_val{$a} <=> $orderby_val{$b}
   } keys %orderby_val;

   my @chosen; # top events
   my @other;  # other events (< top)
   my ($total, $count) = (0, 0);
   foreach my $groupby ( @sorted ) {
      my $rank = $count + 1;

      if (    (!$args{total} || $total < $args{total})
           && (!$args{count} || $count < $args{count}) ) {
         # Events that fall into the top criterion
         push @chosen, [$groupby, 'top', $rank];
      }
      elsif ( $args{ol_attrib} ) {
         # Possible outlier. A class without the ol_attrib store (or its
         # metrics) counts as 0 instead of dying on an "uninitialized"
         # warning, which is fatal under "use warnings FATAL => 'all'".
         my $ol_store    = $classes->{$groupby}->{$args{ol_attrib}};
         my $ol_cnt      = $ol_store ? ($ol_store->{cnt} || 0) : 0;
         my $frequent    = !$args{ol_freq} || $ol_cnt >= $args{ol_freq};
         my $class_stats = $self->{class_metrics}->{$groupby};
         my $ol_stats    = $class_stats ? $class_stats->{$args{ol_attrib}} : undef;
         my $pct_95      = $ol_stats ? ($ol_stats->{pct_95} || 0) : 0;
         if ( $frequent && $pct_95 >= $args{ol_limit} ) {
            push @chosen, [$groupby, 'outlier', $rank];
         }
         else {
            push @other, [$groupby, 'misc', $rank];
         }
      }
      else {
         # Events not in the top criterion
         push @other, [$groupby, 'misc', $rank];
      }

      $total += $orderby_val{$groupby};
      $count++;
   }
   return \@chosen, \@other;
}

# Adds all new attributes in $event to $self->{attributes}.
sub add_new_attributes {
   my ( $self, $event ) = @_;
   return unless $event;

   # Every key of the event is a new attribute unless it is the groupby
   # attribute, is already being aggregated, or is explicitly ignored.
   my @new_attribs = grep {
         $_ ne $self->{groupby}
      && !exists $self->{attributes}->{$_}
      && !exists $self->{ignore_attribs}->{$_}
   } keys %$event;

   foreach my $attrib ( @new_attribs ) {
      $self->{attributes}->{$attrib}  = [$attrib];
      $self->{alt_attribs}->{$attrib} = make_alt_attrib($attrib);
      push @{$self->{all_attribs}}, $attrib;
      PTDEBUG && _d('Added new attribute:', $attrib);
   }

   return;
}

# Returns an arrayref of all the attributes that were either given
# explicitly to new() or that were auto-detected.
sub get_attributes {
   my ( $self ) = @_;
   return $self->{all_attribs};
}

# Returns $self->{n_events} (presumably the number of events processed so
# far; it is maintained outside this section).
sub events_processed {
   my ( $self ) = @_;
   return $self->{n_events};
}

# Returns a coderef that, given an event hashref, returns the name of the
# first alternate attribute (every argument after the first) that exists in
# the event, or undef if none does.  With no alternates, returns a sub that
# does nothing.
sub make_alt_attrib {
   my ( @attribs ) = @_;

   my $attrib = shift @attribs;    # Primary attribute.
   return sub {} unless @attribs;  # No alternates.

   # A closure rather than string-eval'ed code: the attribute names are only
   # ever used as hash keys, so names containing quotes, backslashes or other
   # Perl metacharacters cannot break (or inject into) generated code.
   PTDEBUG && _d('alt attrib sub for', $attrib, ':', @attribs);
   return sub {
      my ( $event ) = @_;
      my $alt_attrib;
      foreach my $alt ( @attribs ) {
         if ( exists $event->{$alt} ) {
            $alt_attrib = $alt;
            last;
         }
      }
      return $alt_attrib;
   };
}

# Merge/add the given list of EventAggregator objects.
# Returns a new EventAggregator obj, or nothing if no objects are given.
# Dies if the objects do not all have the same groupby and worst attribute.
# Errors while merging one object's classes or globals are reported with
# warn() and the merge continues with the next object.
sub merge {
   my ( @ea_objs ) = @_;
   PTDEBUG && _d('Merging', scalar @ea_objs, 'ea');
   return unless scalar @ea_objs;

   # If all the ea don't have the same groupby and worst then adding
   # them will produce a nonsensical result.  (Maybe not if worst
   # differs but certainly if groupby differs.)  And while checking this...
   my $ea1   = shift @ea_objs;
   my $r1    = $ea1->results;
   my $worst = $ea1->{worst};  # for merging, finding worst sample

   # ...get all attributes and their types to properly initialize the
   # returned ea obj.  The first type seen for an attribute wins.
   my %attrib_types = %{ $ea1->attributes() };

   foreach my $ea ( @ea_objs ) {
      die "EventAggregator objects have different groupby: "
        . "$ea1->{groupby} and $ea->{groupby}"
        unless $ea1->{groupby} eq $ea->{groupby};
      die "EventAggregator objects have different worst: "
        . "$ea1->{worst} and $ea->{worst}"
        unless $ea1->{worst} eq $ea->{worst};

      my $attrib_types = $ea->attributes();
      foreach my $attrib ( keys %$attrib_types ) {
         $attrib_types{$attrib} = $attrib_types->{$attrib}
            unless exists $attrib_types{$attrib};
      }
   }

   # First, deep copy the first ea obj.  Do not shallow copy, do deep copy
   # so the returned ea is truly its own obj and does not point to data
   # structs in one of the given ea.
   my $r_merged = {
      classes => {},
      globals => _deep_copy_attribs($r1->{globals}),
      samples => {},
   };
   foreach my $class ( keys %{$r1->{classes}} ) {
      $r_merged->{classes}->{$class}
         = _deep_copy_attribs($r1->{classes}->{$class});

      # Copy the sample only if the class has one.  Dereferencing a missing
      # sample would autovivify an empty hash in both $r1 and the merged
      # results, and that empty "sample" would later be compared against
      # another ea's sample as if its worst value were undef (fatal under
      # FATAL warnings, aborting the merge of the remaining classes).
      my $sample = $r1->{samples}->{$class};
      $r_merged->{samples}->{$class} = { %$sample } if $sample;
   }

   # Then, merge/add the other eas.  r1* is the eventual return val.
   # r2* is the current ea being merged/added into r1*.
   for my $i ( 0..$#ea_objs ) {
      PTDEBUG && _d('Merging ea obj', ($i + 1));
      my $r2 = $ea_objs[$i]->results;

      # Descend into each class (e.g. unique query/fingerprint), each
      # attribute (e.g. Query_time, etc.), and then each attribute
      # value (e.g. min, max, etc.).  If either a class or attrib is
      # missing in one of the results, deep copy the extant class/attrib;
      # if both exist, add/merge the results.
      eval {
         CLASS:
         foreach my $class ( keys %{$r2->{classes}} ) {
            my $r1_class = $r_merged->{classes}->{$class};
            my $r2_class = $r2->{classes}->{$class};

            if ( $r1_class && $r2_class ) {
               # Class exists in both results.  Add/merge all their attributes.
               CLASS_ATTRIB:
               foreach my $attrib ( keys %$r2_class ) {
                  PTDEBUG && _d('merge', $attrib);
                  if ( $r1_class->{$attrib} && $r2_class->{$attrib} ) {
                     _add_attrib_vals($r1_class->{$attrib},
                        $r2_class->{$attrib});
                  }
                  elsif ( !$r1_class->{$attrib} ) {
                     PTDEBUG && _d('copy', $attrib);
                     $r1_class->{$attrib}
                        = _deep_copy_attrib_vals($r2_class->{$attrib});
                  }
               }
            }
            elsif ( !$r1_class ) {
               # Class is missing in r1; deep copy it from r2.
               PTDEBUG && _d('copy class');
               $r_merged->{classes}->{$class} = _deep_copy_attribs($r2_class);
            }

            # Update the worst sample if either the r2 sample is worse than
            # the r1 sample or there's no such sample in r1.
            my $r1_sample = $r_merged->{samples}->{$class};
            my $r2_sample = $r2->{samples}->{$class};
            my $new_worst_sample;
            if ( $r1_sample && $r2_sample ) {
               $new_worst_sample = $r2_sample
                  if $r2_sample->{$worst} > $r1_sample->{$worst};
            }
            elsif ( !$r1_sample ) {
               $new_worst_sample = $r2_sample;
            }

            # Events don't have references to other data structs
            # so we don't have to worry about doing a deep copy.
            if ( $new_worst_sample ) {
               PTDEBUG && _d('New worst sample:', $worst, '=',
                  $new_worst_sample->{$worst}, 'item:', substr($class, 0, 100));
               $r_merged->{samples}->{$class} = { %$new_worst_sample };
            }
         }
      };
      if ( $EVAL_ERROR ) {
         warn "Error merging class/sample: $EVAL_ERROR";
      }

      # Same as above but for the global attribs/vals.
      eval {
         PTDEBUG && _d('Merging global attributes');
         GLOBAL_ATTRIB:
         foreach my $attrib ( keys %{$r2->{globals}} ) {
            my $r1_global = $r_merged->{globals}->{$attrib};
            my $r2_global = $r2->{globals}->{$attrib};

            if ( $r1_global && $r2_global ) {
               # Global attrib exists in both results.  Add/merge all its values.
               PTDEBUG && _d('merge', $attrib);
               _add_attrib_vals($r1_global, $r2_global);
            }
            elsif ( !$r1_global ) {
               # Global attrib is missing in r1; deep copy it from r2 global.
               PTDEBUG && _d('copy', $attrib);
               $r_merged->{globals}->{$attrib}
                  = _deep_copy_attrib_vals($r2_global);
            }
         }
      };
      if ( $EVAL_ERROR ) {
         warn "Error merging globals: $EVAL_ERROR";
      }
   }

   # Create a new EventAggregator obj, initialize it with the summed results,
   # and return it.
   my $ea_merged = EventAggregator->new(
      groupby    => $ea1->{groupby},
      worst      => $ea1->{worst},
      attributes => { map { $_ => [$_] } keys %attrib_types },
   );
   $ea_merged->set_results($r_merged);
   $ea_merged->set_attribute_types(\%attrib_types);
   return $ea_merged;
}

# Adds/merges vals2 attrib values into vals1; vals1 is modified in place.
# Only the value names (min, max, cnt, ...) present in vals1 are merged.
# Dies on undef scalars, empty bucket arrays/hashes, or mismatched types.
sub _add_attrib_vals {
   my ( $vals1, $vals2 ) = @_;

   # Only numeric attribs have "sum"; min/max of other attribs are compared
   # as strings.  The key set of vals1 does not change below, so this is
   # loop-invariant.
   my $is_num = exists $vals1->{sum} ? 1 : 0;

   # Assuming both sets of values are the same attribute (that's the caller's
   # responsibility), each should have the same values (min, max, unq, etc.)
   foreach my $val ( keys %$vals1 ) {
      my $val1 = $vals1->{$val};
      my $val2 = $vals2->{$val};

      if ( (!ref $val1) && (!ref $val2) ) {
         # min, max, cnt, sum should never be undef.
         die "undefined $val value" unless defined $val1 && defined $val2;

         if ( $val eq 'max' ) {
            $vals1->{$val} = $is_num ? ($val1 > $val2 ? $val1 : $val2)
                           :           ($val1 gt $val2 ? $val1 : $val2);
         }
         elsif ( $val eq 'min' ) {
            $vals1->{$val} = $is_num ? ($val1 < $val2 ? $val1 : $val2)
                           :           ($val1 lt $val2 ? $val1 : $val2);
         }
         else {
            $vals1->{$val} += $val2;
         }
      }
      elsif ( (ref $val1 eq 'ARRAY') && (ref $val2 eq 'ARRAY') ) {
         # Value is an arrayref, so it should be 1k buckets.
         # Should never be empty.
         die "Empty $val arrayref" unless @$val1 && @$val2;
         for my $i ( 0..$#$val1 ) {
            $val1->[$i] += $val2->[$i];
         }
      }
      elsif ( (ref $val1 eq 'HASH') && (ref $val2 eq 'HASH') ) {
         # Value is a hashref, probably for unq string occurrences.
         # Should never be empty.
         die "Empty $val hashref" unless %$val1 and %$val2;
         foreach my $key ( keys %$val2 ) {
            $val1->{$key} += $val2->{$key};
         }
      }
      else {
         # This shouldn't happen.
         PTDEBUG && _d('vals1:', Dumper($vals1));
         PTDEBUG && _d('vals2:', Dumper($vals2));
         die "$val type mismatch";
      }
   }

   return;
}

# These _deep_copy_* subs only go 1 level deep because, so far,
# no ea data struct has a ref any deeper.
sub _deep_copy_attribs {
   my ( $attribs ) = @_;
   # Copy each attribute's values hashref independently.
   my $copy = {};
   foreach my $attrib ( keys %$attribs ) {
      $copy->{$attrib} = _deep_copy_attrib_vals($attribs->{$attrib});
   }
   return $copy;
}

# Returns a copy of one attribute's values hashref.  Values that are
# arrayrefs (e.g. buckets) or hashrefs (e.g. unique value counts) are copied
# into new structures, so the copy shares no refs with the original and can
# be modified (e.g. by _add_attrib_vals) without touching the source.
# Input that is not a hashref is returned as-is.  Dies on other ref types.
sub _deep_copy_attrib_vals {
   my ( $vals ) = @_;
   return $vals unless ref $vals eq 'HASH';

   my $copy = {};
   foreach my $val ( keys %$vals ) {
      # Inspect the type of the VALUE, not of the key: hash keys are always
      # plain strings, so testing the key never copied nested structures.
      my $ref_type = ref $vals->{$val};
      if ( !$ref_type ) {
         $copy->{$val} = $vals->{$val};
      }
      elsif ( $ref_type eq 'ARRAY' ) {
         $copy->{$val} = [ @{$vals->{$val}} ];
      }
      elsif ( $ref_type eq 'HASH' ) {
         $copy->{$val} = { %{$vals->{$val}} };
      }
      else {
         die "I don't know how to deep copy a $ref_type reference";
      }
   }
   return $copy;
}

# Sub: _get_value
# Get the value of the attribute (or one of its alternatives) from the event.
# Undef is a valid value.  If neither the attrib nor any of its alternatives
# exists in the event, then this sub dies.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   event     - Event hashref
#   attribute - Attribute name
#
# Optional Arguments:
#   alternates - Arrayref of alternate attribute names, tried in order
#
# Returns:
#   Value of attribute in the event, possibly undef.  Returns nothing if
#   event or attribute is false/missing.
sub _get_value {
   my ( $self, %args ) = @_;
   my ( $event, $attrib, $alts ) = @args{qw(event attribute alternates)};
   return unless $event && $attrib;

   # The primary attribute wins whenever its key exists, even if undef.
   return $event->{$attrib} if exists $event->{$attrib};

   die "Event does not have attribute $attrib and there are no alternates"
      unless $alts;

   # First alternate whose key exists in the event wins.
   foreach my $alt_attrib ( @$alts ) {
      return $event->{$alt_attrib} if exists $event->{$alt_attrib};
   }
   die "Event does not have attribute $attrib or any of its alternates";
}

# Debug helper: prints its args to STDERR prefixed with the caller's package,
# line number and PID.  Undef args print as 'undef', and embedded newlines
# are followed by '# ' so multi-line output stays commented.
sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;
}
# ###########################################################################
# End EventAggregator package
# ###########################################################################