# This program is copyright 2009-2011 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# ProtocolParser package
# ###########################################################################
{
# Package: ProtocolParser
# ProtocolParser is a parent class for protocol-specific parsers.
# Subclasses must implement _packet_from_server() and _packet_from_client().
package ProtocolParser;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;

use File::Basename qw(basename);
use File::Temp qw(tempfile);

# Loaded inside eval so that a missing module does not prevent this package
# from loading; it is only needed by uncompress_data().
eval {
   require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib
   IO::Uncompress::Inflate->import(qw(inflate $InflateError));
};

use Data::Dumper;
$Data::Dumper::Indent    = 1;
$Data::Dumper::Sortkeys  = 1;
$Data::Dumper::Quotekeys = 0;

# Constructs a parser.  Arguments:
#   server - optional host; when set, only packets to or from server:port
#            are accepted by _get_session()
#   port   - the server's port; used to tell client packets from server ones
#   o      - stored as-is (presumably an OptionParser object; confirm in caller)
# Note: $self->{buffer} (read by parse_event()) is not set here; it is
# presumably set by a subclass or caller -- TODO confirm.
sub new {
   my ( $class, %args ) = @_;

   my $self = {
      server   => $args{server},
      port     => $args{port},
      sessions => {},
      o        => $args{o},
   };

   return bless $self, $class;
}

# Parses one packet and maybe returns an event.
# Required args:
#   event - packet hashref (from TcpdumpParser::parse_event())
# Optional args:
#   misc  - passed through to _parse_packet()
#   stats - hashref whose events_parsed counter is incremented once per
#           packet handed to _parse_packet()
# Returns the event produced by the last parsed packet, or nothing.
#
# When $self->{buffer} is true, packets with data are saved per session and
# only parsed (client packets, then server packets, each sorted by seq) once
# the client sends a FIN or RST.
sub parse_event {
   my ( $self, %args ) = @_;
   my @required_args = qw(event);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($packet) = @args{@required_args};

   # Save each session's packets until it's closed by the client.
   # This allows us to ensure that packets are processed in order.
   if ( $self->{buffer} ) {
      my ($packet_from, $session) = $self->_get_session($packet);

      # _get_session() returns nothing for packets that are not to or from
      # the watched server; skip them rather than dying on an undef origin.
      return unless $session;

      if ( $packet->{data_len} ) {
         if ( $packet_from eq 'client' ) {
            push @{$session->{client_packets}}, $packet;
            PTDEBUG && _d('Saved client packet');
         }
         else {
            push @{$session->{server_packets}}, $packet;
            PTDEBUG && _d('Saved server packet');
         }
      }

      # Process the session's packets when the client closes the connection.
      return unless ($packet_from eq 'client')
                 && ($packet->{fin} || $packet->{rst});

      my $event;
      foreach my $key ( qw(client_packets server_packets) ) {
         # Either list may not exist if that side never sent data.
         foreach my $saved_packet ( sort { $a->{seq} <=> $b->{seq} }
                                    @{ $session->{$key} || [] } )
         {
            $event = $self->_parse_packet($saved_packet, $args{misc});
            $args{stats}->{events_parsed}++ if $args{stats};
         }
      }

      return $event;
   }

   if ( $packet->{data_len} == 0 ) {
      # Return early if there's no TCP data.  These are usually ACK packets,
      # but they could also be FINs; this method does not close the client's
      # session for them.
      PTDEBUG && _d('No TCP data');
      return;
   }

   my $event = $self->_parse_packet($packet, $args{misc});
   $args{stats}->{events_parsed}++ if $args{stats};
   return $event;
}

# Parses one packet within its client session and maybe returns an event.
# The packet arg should be a hashref from TcpdumpParser::parse_event().
# misc is a placeholder for future features; misc->{recurse} is set when
# re-parsing saved out-of-order packets, whose data was already packed from
# hex and whose raw packet was already saved on the first pass.
# Returns nothing for packets that are not to or from the watched server.
sub _parse_packet {
   my ( $self, $packet, $misc ) = @_;

   my ($packet_from, $session) = $self->_get_session($packet);
   return unless $session;  # Not to or from the watched server.
   PTDEBUG && _d('State:', $session->{state});

   # Save raw packets to dump later in case something fails.
   push @{$session->{raw_packets}}, $packet->{raw_packet}
      unless $misc->{recurse};

   if ( $session->{buff} ) {
      # Previous packets were not complete so append this data
      # to what we've been buffering.
      $session->{buff_left} -= $packet->{data_len};
      if ( $session->{buff_left} > 0 ) {
         PTDEBUG && _d('Added data to buff; expecting', $session->{buff_left},
            'more bytes');
         return;
      }

      PTDEBUG && _d('Got all data; buff left:', $session->{buff_left});
      $packet->{data}       = $session->{buff} . $packet->{data};
      $packet->{data_len}  += length $session->{buff};
      $session->{buff}      = '';
      $session->{buff_left} = 0;
   }

   # Finally, parse the packet and maybe create an event.
   $packet->{data} = pack('H*', $packet->{data}) unless $misc->{recurse};
   my $event;
   if ( $packet_from eq 'server' ) {
      $event = $self->_packet_from_server($packet, $session, $misc);
   }
   elsif ( $packet_from eq 'client' ) {
      $event = $self->_packet_from_client($packet, $session, $misc);
   }
   else {
      # Should not get here.
      die 'Packet origin unknown';
   }
   PTDEBUG && _d('State:', $session->{state});

   if ( $session->{out_of_order} ) {
      PTDEBUG && _d('Session packets are out of order');
      push @{$session->{packets}}, $packet;

      # Track the earliest and latest packet timestamps.  ts_min must be
      # seeded when unset: no string is lt '', so comparing against
      # ($session->{ts_min} || '') would never set it.
      $session->{ts_min} = $packet->{ts}
         if !$session->{ts_min} || $packet->{ts} lt $session->{ts_min};
      $session->{ts_max} = $packet->{ts}
         if $packet->{ts} gt ($session->{ts_max} || '');

      if ( $session->{have_all_packets} ) {
         PTDEBUG && _d('Have all packets; ordering and processing');
         delete $session->{out_of_order};
         delete $session->{have_all_packets};
         # The sorted list is a copy, so packets pushed during the
         # recursive calls do not affect this loop.
         foreach my $saved_packet ( sort { $a->{seq} <=> $b->{seq} }
                                    @{$session->{packets}} )
         {
            $event = $self->_parse_packet($saved_packet, { recurse => 1 });
         }
      }
   }

   PTDEBUG && _d('Done with packet; event:', Dumper($event));
   return $event;
}

# Returns ($packet_from, $session) for the packet: $packet_from is 'server'
# if the packet's source port is $self->{port}, else 'client' if its
# destination port is; $session is the client's session hashref, created the
# first time the client (host:port) is seen.  Returns an empty list if
# $self->{server} is set and the packet is not to or from it; warns and
# returns an empty list if neither end uses the server's port.
sub _get_session {
   my ( $self, $packet ) = @_;

   my $src_host = "$packet->{src_host}:$packet->{src_port}";
   my $dst_host = "$packet->{dst_host}:$packet->{dst_port}";

   if ( my $server = $self->{server} ) {  # Watch only the given server.
      $server .= ":$self->{port}";
      if ( $src_host ne $server && $dst_host ne $server ) {
         PTDEBUG && _d('Packet is not to or from', $server);
         return;
      }
   }

   # Auto-detect the server by looking for its port.
   my $packet_from;
   my $client;
   if ( $src_host =~ m/:$self->{port}$/ ) {
      $packet_from = 'server';
      $client      = $dst_host;
   }
   elsif ( $dst_host =~ m/:$self->{port}$/ ) {
      $packet_from = 'client';
      $client      = $src_host;
   }
   else {
      warn 'Packet is not to or from server: ', Dumper($packet);
      return;
   }
   PTDEBUG && _d('Client:', $client);

   # Get the client's session info or create a new session if the
   # client hasn't been seen before.
   if ( !exists $self->{sessions}->{$client} ) {
      PTDEBUG && _d('New session');
      $self->{sessions}->{$client} = {
         client      => $client,
         state       => undef,
         raw_packets => [],
         # ts -- wait for ts later.
      };
   }
   my $session = $self->{sessions}->{$client};

   return $packet_from, $session;
}

# Abstract: subclasses must parse a packet sent by the server.
sub _packet_from_server {
   die "Don't call parent class _packet_from_server()";
}

# Abstract: subclasses must parse a packet sent by the client.
sub _packet_from_client {
   die "Don't call parent class _packet_from_client()";
}

# Builds an event hashref from a session.  Query_time is the difference
# between start_request and start_reply, Transmit_time between start_reply
# and end_reply (each via timestamp_diff(), which gives 0 if either is
# missing).  All of the session's attribs are copied into the event.
# Dies if the session has no attribs or no arg attrib.
sub make_event {
   my ( $self, $session, $packet ) = @_;
   die "Event has no attributes" unless scalar keys %{$session->{attribs}};
   die "Query has no arg attribute" unless $session->{attribs}->{arg};
   my $start_request = $session->{start_request} || 0;
   my $start_reply   = $session->{start_reply}   || 0;
   my $end_reply     = $session->{end_reply}     || 0;
   PTDEBUG && _d('Request start:', $start_request,
      'reply start:', $start_reply, 'reply end:', $end_reply);
   my $event = {
      Query_time    => $self->timestamp_diff($start_request, $start_reply),
      Transmit_time => $self->timestamp_diff($start_reply, $end_reply),
   };
   @{$event}{keys %{$session->{attribs}}} = values %{$session->{attribs}};
   return $event;
}

# Returns the filehandle used to save failed sessions, opening it on first
# use.  If PERCONA_TOOLKIT_TCP_ERRORS_FILE is set, that file is opened for
# writing (truncated); otherwise a temp file /tmp/<program>-errors.XXXXXXX
# is created and not unlinked.  The filename is saved in $self->{errors_file}.
sub _get_errors_fh {
   my ( $self ) = @_;
   return $self->{errors_fh} if $self->{errors_fh};

   my $exec = basename($0);
   # Errors file isn't open yet; try to open it.
   my ($errors_fh, $filename);
   if ( $filename = $ENV{PERCONA_TOOLKIT_TCP_ERRORS_FILE} ) {
      open $errors_fh, ">", $filename
         or die "Cannot open $filename for writing (supplied from "
              . "PERCONA_TOOLKIT_TCP_ERRORS_FILE): $OS_ERROR";
   }
   else {
      ($errors_fh, $filename) = tempfile("/tmp/$exec-errors.XXXXXXX", UNLINK => 0);
   }

   $self->{errors_file} = $filename;
   $self->{errors_fh}   = $errors_fh;
   return $errors_fh;
}

# Forgets the session and, unless $self->{_no_save_error} is set, writes a
# commented Data::Dumper dump of the session (including the failure reason)
# followed by its raw packets to the errors file.  Warns once per errors file.
sub fail_session {
   my ( $self, $session, $reason ) = @_;
   PTDEBUG && _d('Failed session', $session->{client}, 'because', $reason);
   delete $self->{sessions}->{$session->{client}};

   return if $self->{_no_save_error};

   my $errors_fh = $self->_get_errors_fh();

   warn "TCP session $session->{client} had errors, will save them in $self->{errors_file}\n"
      unless $self->{_warned_for}->{$self->{errors_file}}++;

   my $raw_packets = delete $session->{raw_packets};
   $session->{reason_for_failure} = $reason;
   my $session_dump = '# ' . Dumper($session);
   chomp $session_dump;
   $session_dump =~ s/\n/\n# /g;
   print $errors_fh join("\n", $session_dump, @$raw_packets), "\n";
   return;
}

# Returns the difference in seconds (formatted '%.6f') between two tcpdump
# timestamps, or 0 if either is missing.  Each timestamp is expected to be an
# 11-character date prefix followed by H:M:S (presumably
# 'YYYY-MM-DD HH:MM:SS.ffffff' -- confirm against TcpdumpParser).
sub timestamp_diff {
   my ( $self, $start, $end ) = @_;
   return 0 unless $start && $end;
   my $sd = substr($start, 0, 11, '');
   my $ed = substr($end,   0, 11, '');
   my ( $sh, $sm, $ss ) = split(/:/, $start);
   my ( $eh, $em, $es ) = split(/:/, $end);
   my $esecs = ($eh * 3600 + $em * 60 + $es);
   my $ssecs = ($sh * 3600 + $sm * 60 + $ss);
   if ( $sd eq $ed ) {
      return sprintf '%.6f', $esecs - $ssecs;
   }
   else { # Assume only one day boundary has been crossed, no DST, etc
      return sprintf '%.6f', ( 86_400 - $ssecs ) + $esecs;
   }
}

# Takes a scalarref to a hex string of compressed data and the number of
# uncompressed bytes to read.
# Returns a scalarref to a hex string of the uncompressed data.
# The given hex string of compressed data is not modified.
# Dies if IO::Uncompress::Inflate cannot be created, reports an error, or
# yields no data.
sub uncompress_data {
   my ( $self, $data, $len ) = @_;
   die "I need data" unless $data;
   die "I need a len argument" unless $len;
   die "I need a scalar reference to data" unless ref $data eq 'SCALAR';
   PTDEBUG && _d('Uncompressing data');
   our $InflateError;

   # Pack hex string into compressed binary data.
   my $comp_bin_data = pack('H*', $$data);

   # Uncompress the compressed binary data.
   my $uncomp_bin_data = '';
   my $z = IO::Uncompress::Inflate->new(
      \$comp_bin_data
   ) or die "IO::Uncompress::Inflate failed: $InflateError";

   # read() returns the number of bytes read, 0 at end of input, or a
   # negative number on error; a plain "or die" would miss the error case.
   my $status = $z->read(\$uncomp_bin_data, $len);
   die "IO::Uncompress::Inflate failed: $InflateError"
      unless defined $status && $status > 0;

   # Unpack the uncompressed binary data back into a hex string.
   # This is the original MySQL packet(s).
   my $uncomp_data = unpack('H*', $uncomp_bin_data);

   return \$uncomp_data;
}

# Prints a debug line to STDERR prefixed with the caller's package, line and
# PID.  Undefined args print as 'undef'; embedded newlines are commented out.
sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;
}
# ###########################################################################
# End ProtocolParser package
# ###########################################################################