1# This program is copyright 2009-2011 Percona Ireland Ltd.
2# Feedback and improvements are welcome.
3#
4# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
5# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
6# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
7#
8# This program is free software; you can redistribute it and/or modify it under
9# the terms of the GNU General Public License as published by the Free Software
10# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
11# systems, you can issue `man perlgpl' or `man perlartistic' to read these
12# licenses.
13#
14# You should have received a copy of the GNU General Public License along with
15# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
16# Place, Suite 330, Boston, MA  02111-1307  USA.
17# ###########################################################################
18# ProtocolParser package
19# ###########################################################################
20{
21# Package: ProtocolParser
22# ProtocolParser is a parent class for protocol-specific parsers.
23package ProtocolParser;
24
25use strict;
26use warnings FATAL => 'all';
27use English qw(-no_match_vars);
28use constant PTDEBUG => $ENV{PTDEBUG} || 0;
29
30use File::Basename qw(basename);
31use File::Temp qw(tempfile);
32
33eval {
34   require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib
35   IO::Uncompress::Inflate->import(qw(inflate $InflateError));
36};
37
38use Data::Dumper;
39$Data::Dumper::Indent    = 1;
40$Data::Dumper::Sortkeys  = 1;
41$Data::Dumper::Quotekeys = 0;
42
# Constructor.
#
# Optional Arguments (named):
#   server - Stored as-is; _get_session() uses it to watch a single server
#   port   - Stored as-is; _get_session() uses it to tell server from client
#   o      - Stored as-is; not read anywhere in this package
#
# Returns:
#   A new object blessed into $class, with an empty sessions table.
sub new {
   my ( $class, %args ) = @_;

   my %self = (
      sessions => {},
      map { ( $_ => $args{$_} ) } qw(server port o),
   );

   return bless \%self, $class;
}
55
# Parse one TCP packet and maybe return a protocol event.
#
# Required Arguments (named):
#   event - Packet hashref.  Keys read here: data_len, fin, rst, seq.
#
# Optional Arguments (named):
#   misc  - Passed through unchanged to _parse_packet()
#   stats - Hashref; its events_parsed counter is incremented once for every
#           packet handed to _parse_packet()
#
# Returns:
#   The value of the last _parse_packet() call, or nothing when the packet
#   carries no data, is not part of a watched session, or (in buffer mode)
#   the client has not closed the connection yet.
sub parse_event {
   my ( $self, %args ) = @_;
   my @required_args = qw(event);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $packet = $args{event};

   # Save each session's packets until it's closed by the client.
   # This allows us to ensure that packets are processed in order.
   if ( $self->{buffer} ) {
      my ($packet_from, $session) = $self->_get_session($packet);

      # _get_session() returns an empty list for packets that are not to or
      # from the server; there is nothing to buffer for those.
      return unless $session;

      if ( $packet->{data_len} ) {
         my $queue = $packet_from eq 'client' ? 'client_packets'
                   :                            'server_packets';
         push @{$session->{$queue}}, $packet;
         PTDEBUG && _d('Saved', $packet_from, 'packet');
      }

      # Process the session's packets when the client closes the connection.
      return unless ($packet_from eq 'client')
                    && ($packet->{fin} || $packet->{rst});

      # Client packets first, then server packets, each ordered by sequence
      # number.  A queue may not exist if that side never sent any data.
      my $event;
      foreach my $queue ( qw(client_packets server_packets) ) {
         my @ordered = sort { $a->{seq} <=> $b->{seq} }
                       @{ $session->{$queue} || [] };
         foreach my $saved_packet ( @ordered ) {
            $event = $self->_parse_packet($saved_packet, $args{misc});
            $args{stats}->{events_parsed}++ if $args{stats};
         }
      }

      return $event;
   }

   if ( !$packet->{data_len} ) {
      # Return early if there's no TCP data.  These are usually ACK packets, but
      # they could also be FINs in which case, we should close and delete the
      # client's session.
      PTDEBUG && _d('No TCP data');
      return;
   }

   my $event = $self->_parse_packet($packet, $args{misc});
   $args{stats}->{events_parsed}++ if $args{stats};
   return $event;
}
111
# Parse a single packet for its session and maybe return an event.
#
# Parameters:
#   $packet - Packet hashref; it should be a hashref from
#             TcpdumpParser::parse_event().  Keys read here: data, data_len,
#             raw_packet, seq, ts (plus whatever _get_session() reads).
#   $misc   - Optional hashref.  Only its "recurse" key is read here; it is
#             set when this sub re-processes saved out-of-order packets.
#
# Returns:
#   The event returned by _packet_from_server() or _packet_from_client(),
#   or nothing when the packet does not belong to a watched session or more
#   buffered data is still expected.
sub _parse_packet {
   my ( $self, $packet, $misc ) = @_;

   my ($packet_from, $session) = $self->_get_session($packet);

   # _get_session() returns an empty list when the packet is not to or from
   # the server.  Skip such packets instead of autovivifying a throwaway
   # session and dying on the undefined $packet_from below.
   return unless $session;
   PTDEBUG && _d('State:', $session->{state});

   # Save raw packets to dump later in case something fails.
   push @{$session->{raw_packets}}, $packet->{raw_packet}
      unless $misc->{recurse};

   if ( $session->{buff} ) {
      # Previous packets were not complete so account for this data
      # against what we are still expecting.
      $session->{buff_left} -= $packet->{data_len};
      if ( $session->{buff_left} > 0 ) {
         # NOTE(review): this packet's data is not appended to
         # $session->{buff} here even though the message below says so --
         # confirm whether that is intended before changing it.
         PTDEBUG && _d('Added data to buff; expecting', $session->{buff_left},
            'more bytes');
         return;
      }

      PTDEBUG && _d('Got all data; buff left:', $session->{buff_left});
      $packet->{data}       = $session->{buff} . $packet->{data};
      $packet->{data_len}  += length $session->{buff};
      $session->{buff}      = '';
      $session->{buff_left} = 0;
   }

   # Finally, parse the packet and maybe create an event.  Packets being
   # re-processed (recurse) were already converted from hex on their first
   # pass, so they must not be packed again.
   $packet->{data} = pack('H*', $packet->{data}) unless $misc->{recurse};
   my $event;
   if ( $packet_from eq 'server' ) {
      $event = $self->_packet_from_server($packet, $session, $misc);
   }
   elsif ( $packet_from eq 'client' ) {
      $event = $self->_packet_from_client($packet, $session, $misc);
   }
   else {
      # Should not get here.
      die 'Packet origin unknown';
   }
   PTDEBUG && _d('State:', $session->{state});

   if ( $session->{out_of_order} ) {
      PTDEBUG && _d('Session packets are out of order');
      push @{$session->{packets}}, $packet;

      # Track the earliest and latest timestamps seen.  An unset ts_min must
      # be treated as "no minimum yet": no string compares less than ''.
      $session->{ts_min} = $packet->{ts}
         if !$session->{ts_min} || $packet->{ts} lt $session->{ts_min};
      $session->{ts_max} = $packet->{ts}
         if $packet->{ts} gt ($session->{ts_max} || '');

      if ( $session->{have_all_packets} ) {
         PTDEBUG && _d('Have all packets; ordering and processing');
         delete $session->{out_of_order};
         delete $session->{have_all_packets};
         my @ordered = sort { $a->{seq} <=> $b->{seq} } @{$session->{packets}};
         foreach my $saved_packet ( @ordered ) {
            $event = $self->_parse_packet($saved_packet, { recurse => 1 });
         }
      }
   }

   PTDEBUG && _d('Done with packet; event:', Dumper($event));
   return $event;
}
176
# Work out which side sent a packet and find (or create) the session of the
# client involved.
#
# Parameters:
#   $packet - Packet hashref with src_host, src_port, dst_host and dst_port
#
# Returns:
#   ($packet_from, $session) where $packet_from is 'server' or 'client' and
#   $session is the hashref stored in $self->{sessions} for that client.
#   Returns nothing if the packet is not to or from the server.
sub _get_session {
   my ( $self, $packet ) = @_;

   my $src = "$packet->{src_host}:$packet->{src_port}";
   my $dst = "$packet->{dst_host}:$packet->{dst_port}";

   # Watch only the given server, if one was given.
   my $server = $self->{server};
   if ( $server ) {
      $server .= ":$self->{port}";
      unless ( $src eq $server || $dst eq $server ) {
         PTDEBUG && _d('Packet is not to or from', $server);
         return;
      }
   }

   # Auto-detect the server by looking for its port; the other endpoint
   # is the client.
   my ($packet_from, $client)
      = $src =~ m/:$self->{port}$/ ? ('server', $dst)
      : $dst =~ m/:$self->{port}$/ ? ('client', $src)
      :                              ();
   if ( !$packet_from ) {
      warn 'Packet is not to or from server: ', Dumper($packet);
      return;
   }
   PTDEBUG && _d('Client:', $client);

   # First time this client is seen: start a fresh session for it.
   my $sessions = $self->{sessions};
   unless ( exists $sessions->{$client} ) {
      PTDEBUG && _d('New session');
      $sessions->{$client} = {
         client      => $client,
         state       => undef,
         raw_packets => [],
         # ts -- wait for ts later.
      };
   }

   return ($packet_from, $sessions->{$client});
}
223
# Placeholder for the server-side packet handler.  _parse_packet() calls it
# as a method with ($packet, $session, $misc); this parent-class version
# always dies.
sub _packet_from_server {
   my $complaint = "Don't call parent class _packet_from_server()";
   die $complaint;
}
227
# Placeholder for the client-side packet handler.  _parse_packet() calls it
# as a method with ($packet, $session, $misc); this parent-class version
# always dies.
sub _packet_from_client {
   my $complaint = "Don't call parent class _packet_from_client()";
   die $complaint;
}
231
# Build an event hashref from a session's collected attributes.
#
# Parameters:
#   $session - Session hashref.  Its attribs hashref must be non-empty and
#              have a true "arg"; start_request, start_reply and end_reply
#              are read as timestamps (missing ones count as 0).
#   $packet  - Accepted but not used here.
#
# Returns:
#   Hashref with Query_time (request start to reply start) and
#   Transmit_time (reply start to reply end), plus every key/value from
#   $session->{attribs}; attribs win if they use the same key names.
sub make_event {
   my ( $self, $session, $packet ) = @_;

   die "Event has no attributes" unless scalar keys %{$session->{attribs}};
   my $attribs = $session->{attribs};
   die "Query has no arg attribute" unless $attribs->{arg};

   my ($start_request, $start_reply, $end_reply)
      = map { $session->{$_} || 0 } qw(start_request start_reply end_reply);
   PTDEBUG && _d('Request start:', $start_request,
      'reply start:', $start_reply, 'reply end:', $end_reply);

   my %event = (
      Query_time    => $self->timestamp_diff($start_request, $start_reply),
      Transmit_time => $self->timestamp_diff($start_reply, $end_reply),
      %{$attribs},
   );
   return \%event;
}
248
# Return the filehandle that failed sessions are written to, opening it on
# first use.
#
# The file is the one named by the PERCONA_TOOLKIT_TCP_ERRORS_FILE
# environment variable if that is set (opened for writing), else a new
# temp file /tmp/<program name>-errors.XXXXXXX that is not unlinked.
# The handle and file name are remembered in $self->{errors_fh} and
# $self->{errors_file}.
#
# Dies if the file named by the environment variable cannot be opened.
sub _get_errors_fh {
   my ( $self ) = @_;

   # Already open: hand back the same handle.
   if ( my $cached_fh = $self->{errors_fh} ) {
      return $cached_fh;
   }

   # Errors file isn't open yet; try to open it.
   my $errors_fh;
   my $filename = $ENV{PERCONA_TOOLKIT_TCP_ERRORS_FILE};
   if ( $filename ) {
      open $errors_fh, ">", $filename
         or die "Cannot open $filename for writing (supplied from "
              . "PERCONA_TOOLKIT_TCP_ERRORS_FILE): $OS_ERROR";
   }
   else {
      my $exec = basename($0);
      ($errors_fh, $filename) = tempfile("/tmp/$exec-errors.XXXXXXX", UNLINK => 0);
   }

   @{$self}{qw(errors_file errors_fh)} = ($filename, $errors_fh);
   return $errors_fh;
}
269
# Drop a session that could not be parsed and, unless
# $self->{_no_save_error} is set, save it to the errors file.
#
# Parameters:
#   $session - Session hashref; it is removed from $self->{sessions} by its
#              client key.  Its raw_packets are removed from it and
#              reason_for_failure is set on it before it is dumped.
#   $reason  - Why the session failed; written into the dump.
#
# What gets written: a Data::Dumper dump of the session with every line
# commented out with "# ", followed by the session's raw packets, one per
# line.  A warning naming the errors file is issued the first time that
# file is used.
#
# Returns:
#   Nothing.
sub fail_session {
   my ( $self, $session, $reason ) = @_;
   PTDEBUG && _d('Failed session', $session->{client}, 'because', $reason);
   delete $self->{sessions}->{$session->{client}};

   return if $self->{_no_save_error};

   my $errors_fh = $self->_get_errors_fh();

   warn "TCP session $session->{client} had errors, will save them in $self->{errors_file}\n"
      unless $self->{_warned_for}->{$self->{errors_file}}++;

   # The session may have no raw_packets at all, and _parse_packet() pushes
   # undef for packets without a raw_packet.  Neither must make this error
   # handler itself die, so tolerate a missing list and skip undef entries.
   my $raw_packets = delete $session->{raw_packets};
   my @raw_packets = grep { defined } @{ $raw_packets || [] };

   $session->{reason_for_failure} = $reason;
   my $session_dump = '# ' . Dumper($session);
   chomp $session_dump;
   $session_dump =~ s/\n/\n# /g;
   print {$errors_fh} join("\n", $session_dump, @raw_packets), "\n";
   return;
}
290
# Returns the difference between two tcpdump timestamps, in seconds,
# formatted with six decimal places.  Returns 0 if either timestamp is
# missing.
#
# The first 11 characters of each timestamp are taken as its date part and
# the rest as HH:MM:SS[.fraction].  If the date parts differ, exactly one
# day boundary is assumed to lie between the two timestamps.
sub timestamp_diff {
   my ( $self, $start, $end ) = @_;
   return 0 unless $start && $end;

   # Cut the date prefix off each timestamp, leaving only the time of day.
   my $start_date = substr($start, 0, 11, '');
   my $end_date   = substr($end,   0, 11, '');

   my ( $start_h, $start_m, $start_s ) = split(/:/, $start);
   my ( $end_h,   $end_m,   $end_s   ) = split(/:/, $end);

   # Seconds since midnight for each timestamp.
   my $end_secs   = ($end_h   * 3600 + $end_m   * 60 + $end_s);
   my $start_secs = ($start_h * 3600 + $start_m * 60 + $start_s);

   # Assume only one day boundary has been crossed, no DST, etc
   my $elapsed = $start_date eq $end_date
               ? $end_secs - $start_secs
               : ( 86_400 - $start_secs ) + $end_secs;

   return sprintf '%.6f', $elapsed;
}
308
# Takes a scalarref to a hex string of compressed data.
# Returns a scalarref to a hex string of the uncompressed data.
# The given hex string of compressed data is not modified.
#
# Parameters:
#   $data - Scalar reference to the hex string of compressed data
#   $len  - Number of uncompressed bytes to read; must be true
#
# Dies if an argument is missing, if $data is not a scalar reference, or
# if IO::Uncompress::Inflate cannot open or read the data.
sub uncompress_data {
   my ( $self, $data, $len ) = @_;
   die "I need data" unless $data;
   die "I need a len argument" unless $len;
   die "I need a scalar reference to data" unless ref $data eq 'SCALAR';
   PTDEBUG && _d('Uncompressing data');
   our $InflateError;

   # Pack hex string into compressed binary data.
   my $comp_bin_data = pack('H*', $$data);

   # Uncompress the compressed binary data.  The constructor is called with
   # the arrow form rather than indirect object syntax ("new Class(...)"),
   # which perl can misparse.
   my $uncomp_bin_data = '';
   my $z = IO::Uncompress::Inflate->new(\$comp_bin_data)
      or die "IO::Uncompress::Inflate failed: $InflateError";
   $z->read(\$uncomp_bin_data, $len)
      or die "IO::Uncompress::Inflate failed: $InflateError";

   # Unpack the uncompressed binary data back into a hex string.
   # This is the original MySQL packet(s).
   my $uncomp_data = unpack('H*', $uncomp_bin_data);

   return \$uncomp_data;
}
337
# Debug printer.  Writes one line to STDERR made of "# ", the caller's
# package and line number, the process ID, and the arguments joined by
# spaces.  Undefined arguments are shown as "undef", and every newline
# inside an argument is followed by "# " so continuation lines stay
# commented.
sub _d {
   my ($package, undef, $line) = caller 0;

   my @words;
   foreach my $arg ( @_ ) {
      my $word = defined $arg ? $arg : 'undef';
      $word =~ s/\n/\n# /g;
      push @words, $word;
   }

   print STDERR "# $package:$line $PID ", join(' ', @words), "\n";
}
345
3461;
347}
348# ###########################################################################
349# End ProtocolParser package
350# ###########################################################################
351