1# This program is copyright 2007-2011 Percona Ireland Ltd.
2# Feedback and improvements are welcome.
3#
4# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
5# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
6# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
7#
8# This program is free software; you can redistribute it and/or modify it under
9# the terms of the GNU General Public License as published by the Free Software
10# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
11# systems, you can issue `man perlgpl' or `man perlartistic' to read these
12# licenses.
13#
14# You should have received a copy of the GNU General Public License along with
15# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
16# Place, Suite 330, Boston, MA  02111-1307  USA.
17# ###########################################################################
18# MySQLProtocolParser package
19# ###########################################################################
20{
21# Package: MySQLProtocolParser
22# MySQLProtocolParser parses MySQL events from tcpdump files.
# The packets come from TcpdumpParser.  MySQLProtocolParser::parse_packet()
24# should be first in the callback chain because it creates events for
25# subsequent callbacks.  So the sequence is:
26#    1. mk-query-digest calls TcpdumpParser::parse_event($fh, ..., @callbacks)
27#    2. TcpdumpParser::parse_event() extracts raw MySQL packets from $fh and
28#       passes them to the callbacks, the first of which is
29#       MySQLProtocolParser::parse_packet().
30#    3. MySQLProtocolParser::parse_packet() makes events from the packets
31#       and returns them to TcpdumpParser::parse_event().
32#    4. TcpdumpParser::parse_event() passes the newly created events to
33#       the subsequent callbacks.
34# At times MySQLProtocolParser::parse_packet() will not return an event
35# because it usually takes a few packets to create one event.  In such
36# cases, TcpdumpParser::parse_event() will not call the other callbacks.
37package MySQLProtocolParser;
38
39use strict;
40use warnings FATAL => 'all';
41use English qw(-no_match_vars);
42use constant PTDEBUG => $ENV{PTDEBUG} || 0;
43
44eval {
45   require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib
46   IO::Uncompress::Inflate->import(qw(inflate $InflateError));
47};
48
49use Data::Dumper;
50$Data::Dumper::Indent    = 1;
51$Data::Dumper::Sortkeys  = 1;
52$Data::Dumper::Quotekeys = 0;
53
54BEGIN { our @ISA = 'ProtocolParser'; }
55
56use constant {
57   COM_SLEEP               => '00',
58   COM_QUIT                => '01',
59   COM_INIT_DB             => '02',
60   COM_QUERY               => '03',
61   COM_FIELD_LIST          => '04',
62   COM_CREATE_DB           => '05',
63   COM_DROP_DB             => '06',
64   COM_REFRESH             => '07',
65   COM_SHUTDOWN            => '08',
66   COM_STATISTICS          => '09',
67   COM_PROCESS_INFO        => '0a',
68   COM_CONNECT             => '0b',
69   COM_PROCESS_KILL        => '0c',
70   COM_DEBUG               => '0d',
71   COM_PING                => '0e',
72   COM_TIME                => '0f',
73   COM_DELAYED_INSERT      => '10',
74   COM_CHANGE_USER         => '11',
75   COM_BINLOG_DUMP         => '12',
76   COM_TABLE_DUMP          => '13',
77   COM_CONNECT_OUT         => '14',
78   COM_REGISTER_SLAVE      => '15',
79   COM_STMT_PREPARE        => '16',
80   COM_STMT_EXECUTE        => '17',
81   COM_STMT_SEND_LONG_DATA => '18',
82   COM_STMT_CLOSE          => '19',
83   COM_STMT_RESET          => '1a',
84   COM_SET_OPTION          => '1b',
85   COM_STMT_FETCH          => '1c',
86   SERVER_QUERY_NO_GOOD_INDEX_USED => 16,
87   SERVER_QUERY_NO_INDEX_USED      => 32,
88};
89
90my %com_for = (
91   '00' => 'COM_SLEEP',
92   '01' => 'COM_QUIT',
93   '02' => 'COM_INIT_DB',
94   '03' => 'COM_QUERY',
95   '04' => 'COM_FIELD_LIST',
96   '05' => 'COM_CREATE_DB',
97   '06' => 'COM_DROP_DB',
98   '07' => 'COM_REFRESH',
99   '08' => 'COM_SHUTDOWN',
100   '09' => 'COM_STATISTICS',
101   '0a' => 'COM_PROCESS_INFO',
102   '0b' => 'COM_CONNECT',
103   '0c' => 'COM_PROCESS_KILL',
104   '0d' => 'COM_DEBUG',
105   '0e' => 'COM_PING',
106   '0f' => 'COM_TIME',
107   '10' => 'COM_DELAYED_INSERT',
108   '11' => 'COM_CHANGE_USER',
109   '12' => 'COM_BINLOG_DUMP',
110   '13' => 'COM_TABLE_DUMP',
111   '14' => 'COM_CONNECT_OUT',
112   '15' => 'COM_REGISTER_SLAVE',
113   '16' => 'COM_STMT_PREPARE',
114   '17' => 'COM_STMT_EXECUTE',
115   '18' => 'COM_STMT_SEND_LONG_DATA',
116   '19' => 'COM_STMT_CLOSE',
117   '1a' => 'COM_STMT_RESET',
118   '1b' => 'COM_SET_OPTION',
119   '1c' => 'COM_STMT_FETCH',
120);
121
# Client capability flags, keyed by name.  Each flag is a single bit, so
# the values are written as shifts to make the bit position explicit.
my %flag_for = (
   'CLIENT_LONG_PASSWORD'     => 1 << 0,   # new more secure passwords
   'CLIENT_FOUND_ROWS'        => 1 << 1,   # Found instead of affected rows
   'CLIENT_LONG_FLAG'         => 1 << 2,   # Get all column flags
   'CLIENT_CONNECT_WITH_DB'   => 1 << 3,   # One can specify db on connect
   'CLIENT_NO_SCHEMA'         => 1 << 4,   # Don't allow database.table.column
   'CLIENT_COMPRESS'          => 1 << 5,   # Can use compression protocol
   'CLIENT_ODBC'              => 1 << 6,   # Odbc client
   'CLIENT_LOCAL_FILES'       => 1 << 7,   # Can use LOAD DATA LOCAL
   'CLIENT_IGNORE_SPACE'      => 1 << 8,   # Ignore spaces before '('
   'CLIENT_PROTOCOL_41'       => 1 << 9,   # New 4.1 protocol
   'CLIENT_INTERACTIVE'       => 1 << 10,  # This is an interactive client
   'CLIENT_SSL'               => 1 << 11,  # Switch to SSL after handshake
   'CLIENT_IGNORE_SIGPIPE'    => 1 << 12,  # IGNORE sigpipes
   'CLIENT_TRANSACTIONS'      => 1 << 13,  # Client knows about transactions
   'CLIENT_RESERVED'          => 1 << 14,  # Old flag for 4.1 protocol
   'CLIENT_SECURE_CONNECTION' => 1 << 15,  # New 4.1 authentication
   'CLIENT_MULTI_STATEMENTS'  => 1 << 16,  # Enable/disable multi-stmt support
   'CLIENT_MULTI_RESULTS'     => 1 << 17,  # Enable/disable multi-results
);
142
# Numeric column/parameter type codes.
use constant {
   MYSQL_TYPE_DECIMAL      => 0,
   MYSQL_TYPE_TINY         => 1,
   MYSQL_TYPE_SHORT        => 2,
   MYSQL_TYPE_LONG         => 3,
   MYSQL_TYPE_FLOAT        => 4,
   MYSQL_TYPE_DOUBLE       => 5,
   MYSQL_TYPE_NULL         => 6,
   MYSQL_TYPE_TIMESTAMP    => 7,
   MYSQL_TYPE_LONGLONG     => 8,
   MYSQL_TYPE_INT24        => 9,
   MYSQL_TYPE_DATE         => 10,
   MYSQL_TYPE_TIME         => 11,
   MYSQL_TYPE_DATETIME     => 12,
   MYSQL_TYPE_YEAR         => 13,
   MYSQL_TYPE_NEWDATE      => 14,
   MYSQL_TYPE_VARCHAR      => 15,
   MYSQL_TYPE_BIT          => 16,
   MYSQL_TYPE_NEWDECIMAL   => 246,
   MYSQL_TYPE_ENUM         => 247,
   MYSQL_TYPE_SET          => 248,
   MYSQL_TYPE_TINY_BLOB    => 249,
   MYSQL_TYPE_MEDIUM_BLOB  => 250,
   MYSQL_TYPE_LONG_BLOB    => 251,
   MYSQL_TYPE_BLOB         => 252,
   MYSQL_TYPE_VAR_STRING   => 253,
   MYSQL_TYPE_STRING       => 254,
   MYSQL_TYPE_GEOMETRY     => 255,
};

# Reverse lookup: numeric type code => constant name.  The codes are read
# back from the constants above so the two tables cannot drift apart; a
# misspelled name here fails immediately when the module is loaded.
my %type_for = map { ( __PACKAGE__->can($_)->() => $_ ) } qw(
   MYSQL_TYPE_DECIMAL
   MYSQL_TYPE_TINY
   MYSQL_TYPE_SHORT
   MYSQL_TYPE_LONG
   MYSQL_TYPE_FLOAT
   MYSQL_TYPE_DOUBLE
   MYSQL_TYPE_NULL
   MYSQL_TYPE_TIMESTAMP
   MYSQL_TYPE_LONGLONG
   MYSQL_TYPE_INT24
   MYSQL_TYPE_DATE
   MYSQL_TYPE_TIME
   MYSQL_TYPE_DATETIME
   MYSQL_TYPE_YEAR
   MYSQL_TYPE_NEWDATE
   MYSQL_TYPE_VARCHAR
   MYSQL_TYPE_BIT
   MYSQL_TYPE_NEWDECIMAL
   MYSQL_TYPE_ENUM
   MYSQL_TYPE_SET
   MYSQL_TYPE_TINY_BLOB
   MYSQL_TYPE_MEDIUM_BLOB
   MYSQL_TYPE_LONG_BLOB
   MYSQL_TYPE_BLOB
   MYSQL_TYPE_VAR_STRING
   MYSQL_TYPE_STRING
   MYSQL_TYPE_GEOMETRY
);
202
# Handlers for unpacking a value of a given type, keyed by type NAME (the
# fat comma quotes each key, so these are the strings stored as values in
# %type_for, not the numeric constants).  Each inline handler returns two
# values: the unpacked value and a byte count (0 for NULL, otherwise the
# width passed to to_num(), or 8 for a double).  String types are
# delegated to unpack_string().
#
# The key for 2-byte integers used to be spelled "MySQL_TYPE_SHORT".  No
# name in %type_for has that spelling, so a lookup by type name could
# never find the handler; it is now "MYSQL_TYPE_SHORT" like the rest.
my %unpack_type = (
   MYSQL_TYPE_NULL       => sub { return 'NULL', 0; },
   MYSQL_TYPE_TINY       => sub { return to_num(@_, 1), 1; },
   MYSQL_TYPE_SHORT      => sub { return to_num(@_, 2), 2; },
   MYSQL_TYPE_LONG       => sub { return to_num(@_, 4), 4; },
   MYSQL_TYPE_LONGLONG   => sub { return to_num(@_, 8), 8; },
   MYSQL_TYPE_DOUBLE     => sub { return to_double(@_), 8; },
   MYSQL_TYPE_VARCHAR    => \&unpack_string,
   MYSQL_TYPE_VAR_STRING => \&unpack_string,
   MYSQL_TYPE_STRING     => \&unpack_string,
);
214
# Creates a parser object.  Recognized args:
#   server     - host of the MySQL server to watch.  When given,
#                parse_event() ignores every packet that is not to or from
#                "server:port".  When omitted, any packet to or from the
#                port is treated as MySQL traffic.
#   port       - server port; defaults to '3306'.
#   o          - stored on the object unchanged.
#   null_event - value parse_event() returns when a packet yields no event.
# version is a placeholder for handling differences between MySQL v4.0 and
# older and v4.1 and newer; it is fixed at '41' and not used yet.
sub new {
   my ( $class, %args ) = @_;

   my $port = $args{port} || '3306';

   my %parser = (
      server         => $args{server},
      port           => $port,
      version        => '41',    # MySQL proto version; not used yet
      sessions       => {},      # per-client session state, keyed by host:port
      o              => $args{o},
      fake_thread_id => 2**32,   # see _make_event()
      null_event     => $args{null_event},
   );
   my $self = bless \%parser, $class;

   PTDEBUG && $self->{server} && _d('Watching only server', $self->{server});
   return $self;
}
233
# Parses one TCP packet and returns the MySQL event it completes, if any.
#
# Required args:
#   event - packet hashref from TcpdumpParser::parse_event().  This sub
#           reads src_host, src_port, dst_host, dst_port, data, data_len,
#           ts, syn, fin and raw_packet from it.
# Optional args:
#   misc  - placeholder for future features; passed through unchanged to
#           _packet_from_server() and _packet_from_client().
#   stats - hashref whose events_parsed counter is incremented each time a
#           packet gets as far as one of those two subs and control
#           returns here.
#
# Returns the event hashref when the packet completes an event, otherwise
# $self->{null_event}.  Dies if the event arg is missing.
sub parse_event {
   my ( $self, %args ) = @_;
   my @required_args = qw(event);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $packet = $args{event};

   my $src_host = "$packet->{src_host}:$packet->{src_port}";
   my $dst_host = "$packet->{dst_host}:$packet->{dst_port}";

   if ( my $server = $self->{server} ) {  # Watch only the given server.
      $server .= ":$self->{port}";
      if ( $src_host ne $server && $dst_host ne $server ) {
         PTDEBUG && _d('Packet is not to or from', $server);
         return $self->{null_event};
      }
   }

   # Work out which side sent the packet: whichever end uses the
   # configured port is the server, the other end is the client.  The port
   # is matched literally (\Q..\E), like the exact "ne" comparisons used
   # for the server filter above, so a port value is never interpreted as
   # a regex.
   my $port_re = qr/:\Q$self->{port}\E$/;
   my $packet_from;
   my $client;
   if ( $src_host =~ $port_re ) {
      $packet_from = 'server';
      $client      = $dst_host;
   }
   elsif ( $dst_host =~ $port_re ) {
      $packet_from = 'client';
      $client      = $src_host;
   }
   else {
      PTDEBUG && _d('Packet is not to or from a MySQL server');
      return $self->{null_event};
   }
   PTDEBUG && _d('Client', $client);

   # Get the client's session info or create a new session if
   # we catch the TCP SYN sequence or the packetno is 0.
   my $packetno = -1;
   if ( $packet->{data_len} >= 5 ) {
      # 5 bytes is the minimum length of any valid MySQL packet.
      # If there's less, it's probably some TCP control packet
      # with other data.  Peek at the MySQL packet number.  The
      # only time a server sends packetno 0 is for its handshake.
      # Client packetno 0 marks start of new query.
      $packetno = to_num(substr($packet->{data}, 6, 2));
   }
   if ( !exists $self->{sessions}->{$client} ) {
      if ( $packet->{syn} ) {
         PTDEBUG && _d('New session (SYN)');
      }
      elsif ( $packetno == 0 ) {
         PTDEBUG && _d('New session (packetno 0)');
      }
      else {
         PTDEBUG && _d('Ignoring mid-stream', $packet_from, 'data,',
            'packetno', $packetno);
         return $self->{null_event};
      }

      $self->{sessions}->{$client} = {
         client        => $client,
         ts            => $packet->{ts},
         state         => undef,
         compress      => undef,
         raw_packets   => [],
         buff          => '',
         sths          => {},
         attribs       => {},
         n_queries     => 0,
      };
   }
   my $session = $self->{sessions}->{$client};
   PTDEBUG && _d('Client state:', $session->{state});

   # Save raw packets to dump later in case something fails.
   push @{$session->{raw_packets}}, $packet->{raw_packet};

   # Check client port reuse.
   # http://code.google.com/p/maatkit/issues/detail?id=794
   if ( $packet->{syn} && ($session->{n_queries} > 0 || $session->{state}) ) {
      PTDEBUG && _d('Client port reuse and last session did not quit');
      # Fail the session so we can see the last thing the previous
      # session was doing.
      $self->fail_session($session,
            'client port reuse and last session did not quit');
      # Then recurse to create a New session.
      return $self->parse_event(%args);
   }

   # Return early if there's no TCP/MySQL data.  These are usually
   # TCP control packets: SYN, ACK, FIN, etc.
   if ( $packet->{data_len} == 0 ) {
      PTDEBUG && _d('TCP control:',
         map { uc $_ } grep { $packet->{$_} } qw(syn ack fin rst));
      if ( $packet->{'fin'}
           && ($session->{state} || '') eq 'server_handshake' ) {
         # The server sent its handshake but the connection was closed
         # before the client answered: report it as a failed Connect.
         PTDEBUG && _d('Client aborted connection');
         my $event = {
            cmd => 'Admin',
            arg => 'administrator command: Connect',
            ts  => $packet->{ts},
         };
         $session->{attribs}->{Error_msg} = 'Client closed connection during handshake';
         $event = $self->_make_event($event, $packet, $session);
         delete $self->{sessions}->{$session->{client}};
         return $event;
      }
      return $self->{null_event};
   }

   # If the compressed packet cannot be uncompressed then we're helpless.
   # Return the null event, like every other "no event" path in this sub,
   # rather than a bare undef/empty list.
   if ( $session->{compress} ) {
      return $self->{null_event}
         unless $self->uncompress_packet($packet, $session);
   }

   if ( $session->{buff} && $packet_from eq 'client' ) {
      # Previous packets were not complete so append this data
      # to what we've been buffering.  Afterwards, do *not* attempt
      # to remove_mysql_header() because it was already done (from
      # the first packet).
      $session->{buff}      .= $packet->{data};
      $packet->{data}        = $session->{buff};
      $session->{buff_left} -= $packet->{data_len};

      # We didn't remove_mysql_header(), so mysql_data_len isn't set.
      # So set it to the real, complete data len (from the first
      # packet's MySQL header).
      $packet->{mysql_data_len} = $session->{mysql_data_len};
      $packet->{number}         = $session->{number};

      PTDEBUG && _d('Appending data to buff; expecting',
         $session->{buff_left}, 'more bytes');
   }
   else {
      # Remove the first MySQL header.  A single TCP packet can contain many
      # MySQL packets, but we only look at the first.  The 2nd and subsequent
      # packets are usually parts of a result set returned by the server, but
      # we're not interested in result sets.
      eval {
         remove_mysql_header($packet);
      };
      if ( $EVAL_ERROR ) {
         PTDEBUG && _d('remove_mysql_header() failed; failing session');
         $session->{EVAL_ERROR} = $EVAL_ERROR;
         $self->fail_session($session, 'remove_mysql_header() failed');
         return $self->{null_event};
      }
   }

   # Finally, parse the packet and maybe create an event.
   # The returned event may be empty if no event was ready to be created.
   my $event;
   if ( $packet_from eq 'server' ) {
      $event = $self->_packet_from_server($packet, $session, $args{misc});
   }
   elsif ( $packet_from eq 'client' ) {
      if ( $session->{buff} ) {
         if ( $session->{buff_left} <= 0 ) {
            PTDEBUG && _d('Data is complete');
            $self->_delete_buff($session);
         }
         else {
            return $self->{null_event};  # waiting for more data; buff_left was reported earlier
         }
      }
      elsif ( $packet->{mysql_data_len} > ($packet->{data_len} - 4) ) {

         # http://code.google.com/p/maatkit/issues/detail?id=832
         if ( $session->{cmd} && ($session->{state} || '') eq 'awaiting_reply' ) {
            PTDEBUG && _d('No server OK to previous command (frag)');
            $self->fail_session($session, 'no server OK to previous command');
            # The MySQL header is removed by this point, so put it back
            # before re-parsing the same packet.
            $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
            return $self->parse_event(%args);
         }

         # There is more MySQL data than this packet contains.
         # Save the data and the original MySQL header values
         # then wait for the rest of the data.
         $session->{buff}           = $packet->{data};
         $session->{mysql_data_len} = $packet->{mysql_data_len};
         $session->{number}         = $packet->{number};

         # Do this just once here.  For the next packets, buff_left
         # will be decremented above.
         $session->{buff_left}
            ||= $packet->{mysql_data_len} - ($packet->{data_len} - 4);

         PTDEBUG && _d('Data not complete; expecting',
            $session->{buff_left}, 'more bytes');
         return $self->{null_event};
      }

      if ( $session->{cmd} && ($session->{state} || '') eq 'awaiting_reply' ) {
         # Buffer handling above should ensure that by this point we have
         # the full client query.  If there's a previous client query for
         # which we're "awaiting_reply" and then we get another client
         # query, chances are we missed the server's OK response to the
         # first query.  So fail the first query and re-parse this second
         # query.
         PTDEBUG && _d('No server OK to previous command');
         $self->fail_session($session, 'no server OK to previous command');
         # The MySQL header is removed by this point, so put it back.
         $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
         return $self->parse_event(%args);
      }

      $event = $self->_packet_from_client($packet, $session, $args{misc});
   }
   else {
      # Should not get here.
      die 'Packet origin unknown';
   }

   PTDEBUG && _d('Done parsing packet; client state:', $session->{state});
   if ( $session->{closed} ) {
      delete $self->{sessions}->{$session->{client}};
      PTDEBUG && _d('Session deleted');
   }

   $args{stats}->{events_parsed}++ if $args{stats};
   return $event || $self->{null_event};
}
462
# Interprets one packet sent by the server in light of the session state.
# Of everything a server can send, only these matter here:
#    * the connection handshake, for the thread_id
#    * OK and Error packets, for errors, warnings, etc.
#    * the first packet of any other reply to a pending command, which
#      fires the event for that command
# Everything else is ignored.  Returns an event (from _make_event()) when
# one is ready, otherwise returns nothing.  Dies if $packet or $session is
# missing.  $misc is accepted but not used.
sub _packet_from_server {
   my ( $self, $packet, $session, $misc ) = @_;
   die "I need a packet"  unless $packet;
   die "I need a session" unless $session;

   PTDEBUG && _d('Packet is from server; client state:', $session->{state});

   # Same TCP sequence number as the previous server packet: record the
   # retransmission and do nothing else.
   my $seq = $packet->{seq};
   if ( ($session->{server_seq} || '') eq $seq ) {
      push @{ $session->{server_retransmissions} }, $seq;
      PTDEBUG && _d('TCP retransmission');
      return;
   }
   $session->{server_seq} = $seq;

   # Work on a copy of the data; the leading two characters (the first
   # byte, the data being compared as hex text) are cut off and classify
   # the packet.
   my $payload    = $packet->{data};
   my $first_byte = substr($payload, 0, 2, '');
   PTDEBUG && _d('First byte of packet:', $first_byte);
   unless ( $first_byte ) {
      $self->fail_session($session, 'no first byte');
      return;
   }

   # Text for events describing commands other than queries.
   my $admin_arg_for = sub {
      my ( $code ) = @_;
      return 'administrator command: '
           . ucfirst(lc(substr($com_for{$code}, 4)));
   };

   # No session state means this is a server response caught mid-stream.
   # It's only safe to look for the server handshake; otherwise wait
   # until the client sends a command.
   my $state = $session->{state};
   if ( !$state ) {
      # 0a is protocol v10 and 33 is the minimum possible length of a
      # valid server handshake packet; other packets may start with 0a
      # but none of them would be that long.  The run of zeros is a
      # further indicator.
      my $is_handshake = $first_byte eq '0a'
                      && length($payload) >= 33
                      && $payload =~ m/00{13}/;
      if ( $is_handshake ) {
         my $handshake = parse_server_handshake_packet($payload);
         unless ( $handshake ) {
            $self->fail_session($session, 'failed to parse server handshake');
            return;
         }
         $session->{state}     = 'server_handshake';
         $session->{thread_id} = $handshake->{thread_id};

         # See http://code.google.com/p/maatkit/issues/detail?id=794
         $session->{ts} ||= $packet->{ts};
         return;
      }

      if ( $session->{buff} ) {
         $self->fail_session($session,
            'got server response before full buffer');
         return;
      }

      PTDEBUG && _d('Ignoring mid-stream server response');
      return;
   }

   # ---- OK packet ----
   if ( $first_byte eq '00' ) {
      if ( $state eq 'client_auth' ) {
         # We logged in OK!  Trigger an admin Connect command.  Only now
         # does the compression negotiated by the client take effect.
         $session->{compress} = delete $session->{will_compress};
         PTDEBUG && $session->{compress} && _d('Packets will be compressed');

         PTDEBUG && _d('Admin command: Connect');
         my %connect = (
            cmd => 'Admin',
            arg => 'administrator command: Connect',
            ts  => $packet->{ts}, # Events are timestamped when they end
         );
         return $self->_make_event(\%connect, $packet, $session);
      }

      my $cmd = $session->{cmd};
      if ( !$cmd ) {
         PTDEBUG && _d('Looks like an OK packet but session has no cmd');
         return;
      }

      # This OK should be ack'ing a query or something sent earlier by
      # the client.  The OK for a prepared statement has its own format.
      my $com        = $cmd->{cmd};
      my $is_prepare = $com eq COM_STMT_PREPARE;
      my $ok;
      if ( $is_prepare ) {
         PTDEBUG && _d('OK for prepared statement');
         $ok = parse_ok_prepared_statement_packet($payload);
      }
      else {
         $ok = parse_ok_packet($payload);
      }
      if ( !$ok ) {
         $self->fail_session($session,
            $is_prepare ? 'failed to parse OK prepared statement packet'
                        : 'failed to parse OK packet');
         return;
      }

      if ( $is_prepare ) {
         my $sth_id = $ok->{sth_id};
         $session->{attribs}->{Statement_id} = $sth_id;

         # Save all sth info, used in parse_execute_packet().
         $session->{sths}->{$sth_id} = $ok;
         $ok->{statement}            = $cmd->{arg};
      }

      my ( $event_cmd, $event_arg );
      if ( $is_prepare ) {
         $event_cmd = 'Query';
         $event_arg = "PREPARE $cmd->{arg}";
      }
      elsif (    $com eq COM_QUERY
              || $com eq COM_STMT_EXECUTE
              || $com eq COM_STMT_RESET ) {
         $event_cmd = 'Query';
         $event_arg = $cmd->{arg};
      }
      else {
         $event_arg = $admin_arg_for->($com);
         $event_cmd = 'Admin';
      }

      my %event = (
         cmd           => $event_cmd,
         arg           => $event_arg,
         ts            => $packet->{ts},
         Insert_id     => $ok->{insert_id},
         Warning_count => $ok->{warnings},
         Rows_affected => $ok->{affected_rows},
      );
      return $self->_make_event(\%event, $packet, $session);
   }

   # ---- Error packet ----
   if ( $first_byte eq 'ff' ) {
      my $error = parse_error_packet($payload);
      if ( !$error ) {
         $self->fail_session($session, 'failed to parse error packet');
         return;
      }

      if ( $state eq 'client_auth' || $state eq 'server_handshake' ) {
         PTDEBUG && _d('Connection failed');
         my %event = (
            cmd      => 'Admin',
            arg      => 'administrator command: Connect',
            ts       => $packet->{ts},
            Error_no => $error->{errno},
         );
         $session->{attribs}->{Error_msg} = $error->{message};
         $session->{closed} = 1;  # delete session when done
         return $self->_make_event(\%event, $packet, $session);
      }

      if ( !$session->{cmd} ) {
         PTDEBUG && _d('Looks like an error packet but client is not '
            . 'authenticating and session has no cmd');
         return;
      }

      # This error should be in response to a query or something
      # sent earlier by the client.
      my $com      = $session->{cmd}->{cmd};
      my $is_query = $com eq COM_QUERY || $com eq COM_STMT_EXECUTE;
      my %event    = ( ts => $packet->{ts} );
      if ( $is_query ) {
         $event{cmd} = 'Query';
         $event{arg} = $session->{cmd}->{arg};
      }
      else {
         $event{arg} = $admin_arg_for->($com);
         $event{cmd} = 'Admin';
      }
      # https://bugs.launchpad.net/percona-toolkit/+bug/823411
      $event{Error_no} = $error->{errno} if $error->{errno};

      $session->{attribs}->{Error_msg} = $error->{message};
      return $self->_make_event(\%event, $packet, $session);
   }

   # ---- EOF packet ----
   if ( $first_byte eq 'fe' && $packet->{mysql_data_len} < 9 ) {
      my $wants_old_password = $packet->{mysql_data_len} == 1
                            && $state eq 'client_auth'
                            && $packet->{number} == 2;
      if ( $wants_old_password ) {
         PTDEBUG && _d('Server has old password table;',
            'client will resend password using old algorithm');
         $session->{state} = 'client_auth_resend';
      }
      else {
         PTDEBUG && _d('Got an EOF packet');
         $self->fail_session($session, 'got an unexpected EOF packet');
         # ^^^ We shouldn't reach this because EOF should come after a
         # header, field, or row data packet; and we should be firing the
         # event and returning when we see that.  See SVN history for some
         # good stuff we could do if we wanted to handle EOF packets.
      }
      return;
   }

   # ---- Anything else: result set header, field, row data, etc. ----
   # Since we do NOT always have all the data the server sent to the
   # client, we can't always do any processing of results.  So when
   # we get one of these, we just fire the event even if the query
   # is not done.  This means we will NOT process EOF packets
   # themselves (see above).
   if ( !$session->{cmd} ) {
      PTDEBUG && _d('Unknown in-stream server response');
      return;
   }

   PTDEBUG && _d('Got a row/field/result packet');
   my $com = $session->{cmd}->{cmd};
   PTDEBUG && _d('Responding to client', $com_for{$com});

   my $event = { ts => $packet->{ts} };
   if ( $com eq COM_QUERY || $com eq COM_STMT_EXECUTE ) {
      $event->{cmd} = 'Query';
      $event->{arg} = $session->{cmd}->{arg};
   }
   else {
      $event->{arg} = $admin_arg_for->($com);
      $event->{cmd} = 'Admin';
   }

   # We DID get all the data in the packet, so look to see if the end of
   # the data appears to be an EOF packet carrying a warning count and
   # status flags.
   if ( $packet->{complete} ) {
      my ( $warnings_hex, $status_hex ) = $payload =~ m/fe(.{4})(.{4})\Z/;
      if ( $warnings_hex ) {
         $event->{Warnings} = to_num($warnings_hex);
         my $flags = to_num($status_hex); # TODO set all flags?
         $event->{No_good_index_used}
            = ( $flags & SERVER_QUERY_NO_GOOD_INDEX_USED ) ? 1 : 0;
         $event->{No_index_used}
            = ( $flags & SERVER_QUERY_NO_INDEX_USED ) ? 1 : 0;
      }
   }

   return $self->_make_event($event, $packet, $session);
}
729
# Handles one packet sent by the client, driven by the session's state:
#    * server_handshake:   the packet is the client handshake; user, db and
#                          the CLIENT_COMPRESS flag are saved and the state
#                          becomes client_auth
#    * client_auth_resend: the packet is not parsed; the state goes back to
#                          client_auth
#    * awaiting_reply:     more data for the command already in progress;
#                          nothing is done with it here
#    * anything else:      the packet must be number 0 and hold a COM command
# Only COM_QUIT and COM_STMT_CLOSE make an event right away.  Every other
# command is saved in $session->{cmd} and the session waits for the server's
# reply.  Returns the event if one was made, otherwise returns nothing.
# $misc is accepted but not used in this sub.
sub _packet_from_client {
   my ( $self, $packet, $session, $misc ) = @_;
   die "I need a packet"  unless $packet;
   die "I need a session" unless $session;

   PTDEBUG && _d('Packet is from client; state:', $session->{state});

   # A packet with the same seq as the previous client packet is a TCP
   # retransmission: note it and skip the packet.
   my $last_seq = $session->{client_seq} || '';
   if ( $last_seq eq $packet->{seq} ) {
      push @{ $session->{client_retransmissions} }, $packet->{seq};
      PTDEBUG && _d('TCP retransmission');
      return;
   }
   $session->{client_seq} = $packet->{seq};

   my $ts    = $packet->{ts};
   my $state = $session->{state} || '';

   if ( $state eq 'server_handshake' ) {
      PTDEBUG && _d('Expecting client authentication packet');
      # The connection is a 3-way handshake:
      #    server > client  (protocol version, thread id, etc.)
      #    client > server  (user, pass, default db, etc.)
      #    server > client  OK if login succeeds
      # This is the 2nd step, and pos_in_log points at it.  The connection
      # is logged even if the login later fails (bad password, etc.).
      my $handshake = parse_client_handshake_packet($packet->{data});
      unless ( $handshake ) {
         $self->fail_session($session, 'failed to parse client handshake');
         return;
      }
      $session->{state}      = 'client_auth';
      $session->{pos_in_log} = $packet->{pos_in_log};
      $session->{user}       = $handshake->{user};
      $session->{db}         = $handshake->{db};

      # will_compress only becomes $session->{compress} once the server's
      # final handshake packet is received, so parse_packet() does not try
      # to decompress that final packet.  Compressed packets can only
      # begin after the full handshake is done.
      $session->{will_compress} = $handshake->{flags}->{CLIENT_COMPRESS};
      return;
   }

   if ( $state eq 'client_auth_resend' ) {
      # Don't know how to parse this packet.
      PTDEBUG && _d('Client resending password using old algorithm');
      $session->{state} = 'client_auth';
      return;
   }

   if ( $state eq 'awaiting_reply' ) {
      my $arg = $session->{cmd}->{arg} ? substr($session->{cmd}->{arg}, 0, 50)
              : 'unknown';
      PTDEBUG && _d('More data for previous command:', $arg, '...');
      return;
   }

   # Any other state: it should be a command if it's the first packet
   # (number 0).  Commands that take arguments (COM_CHANGE_USER,
   # COM_PROCESS_KILL) are ignored.
   if ( $packet->{number} != 0 ) {
      $self->fail_session($session, 'client cmd not packet 0');
      return;
   }

   # $session->{compress} is only undefined when the client handshake was
   # not seen (otherwise it is 0 or 1), so compression has to be detected
   # in-stream.  Detection may rewrite $packet->{data}, which is why the
   # data is read only after this point.
   if ( !defined $session->{compress} ) {
      return unless $self->detect_compression($packet, $session);
   }
   my $data = $packet->{data};

   my $com = parse_com_packet($data, $packet->{mysql_data_len});
   unless ( $com ) {
      $self->fail_session($session, 'failed to parse COM packet');
      return;
   }
   my $code = $com->{code};

   if ( $code eq COM_STMT_EXECUTE ) {
      PTDEBUG && _d('Execute prepared statement');
      my $exec = parse_execute_packet($com->{data}, $session->{sths});
      unless ( $exec ) {
         # Not a session failure: the statement handle ID may simply
         # be unknown.
         PTDEBUG && _d('Failed to parse execute packet');
         $session->{state} = undef;
         return;
      }
      $com->{data} = $exec->{arg};
      $session->{attribs}->{Statement_id} = $exec->{sth_id};
   }
   elsif ( $code eq COM_STMT_RESET ) {
      my $sth_id = get_sth_id($com->{data});
      unless ( $sth_id ) {
         $self->fail_session($session,
            'failed to parse prepared statement reset packet');
         return;
      }
      $com->{data} = "RESET $sth_id";
      $session->{attribs}->{Statement_id} = $sth_id;
   }

   # Remember the command; the event is normally made when the server
   # replies to it.
   $session->{state}      = 'awaiting_reply';
   $session->{pos_in_log} = $packet->{pos_in_log};
   $session->{ts}         = $ts;
   $session->{cmd}        = {
      cmd => $code,
      arg => $com->{data},
   };

   if ( $code eq COM_QUIT ) { # Fire right away; will cleanup later.
      PTDEBUG && _d('Got a COM_QUIT');

      # See http://code.google.com/p/maatkit/issues/detail?id=794
      $session->{closed} = 1;  # delete session when done

      my $quit_event = {
         cmd => 'Admin',
         arg => 'administrator command: Quit',
         ts  => $ts,
      };
      return $self->_make_event($quit_event, $packet, $session);
   }

   if ( $code eq COM_STMT_CLOSE ) {
      # Apparently, these are not acknowledged by the server.
      my $sth_id = get_sth_id($com->{data});
      unless ( $sth_id ) {
         $self->fail_session($session,
            'failed to parse prepared statement close packet');
         return;
      }
      delete $session->{sths}->{$sth_id};

      my $close_event = {
         cmd => 'Query',
         arg => "DEALLOCATE PREPARE $sth_id",
         ts  => $ts,
      };
      return $self->_make_event($close_event, $packet, $session);
   }

   return;
}
880
# Builds and returns the event hashref for a finished command, then resets
# the session so that it waits for the client's next command.
#    $event   - partial event: cmd, arg, ts and, optionally, Rows_affected,
#               Warning_count, No_good_index_used, No_index_used, Error_no
#    $packet  - packet that completed the event; its ts ends Query_time
#    $session - session the event belongs to; it is modified in place
sub _make_event {
   my ( $self, $event, $packet, $session ) = @_;
   PTDEBUG && _d('Making event');

   # The packets and buffered data that preceded this event are done with.
   $session->{raw_packets} = [];
   $self->_delete_buff($session);

   unless ( $session->{thread_id} ) {
      # Only the server handshake packet gives the thread id, so sessions
      # caught mid-stream get a fake one.
      PTDEBUG && _d('Giving session fake thread id', $self->{fake_thread_id});
      $session->{thread_id} = $self->{fake_thread_id}++;
   }

   my ($host, $port) = $session->{client} =~ m/((?:\d+\.){3}\d+)\:(\w+)/;
   my %new_event = (
      cmd                => $event->{cmd},
      arg                => $event->{arg},
      bytes              => length( $event->{arg} ),
      ts                 => tcp_timestamp( $event->{ts} ),
      host               => $host,
      ip                 => $host,
      port               => $port,
      db                 => $session->{db},
      user               => $session->{user},
      Thread_id          => $session->{thread_id},
      pos_in_log         => $session->{pos_in_log},
      Query_time         => timestamp_diff($session->{ts}, $packet->{ts}),
      Rows_affected      => ($event->{Rows_affected} || 0),
      Warning_count      => ($event->{Warning_count} || 0),
      No_good_index_used => ($event->{No_good_index_used} ? 'Yes' : 'No'),
      No_index_used      => ($event->{No_index_used}      ? 'Yes' : 'No'),
   );

   # Session attributes collected for this event override the defaults.
   foreach my $attrib ( keys %{ $session->{attribs} } ) {
      $new_event{$attrib} = $session->{attribs}->{$attrib};
   }

   # Optional attributes are copied only when the event defines them.
   # https://bugs.launchpad.net/percona-toolkit/+bug/823411
   $new_event{$_} = $event->{$_}
      for grep { defined $event->{$_} } qw(Error_no);

   PTDEBUG && _d('Properties of event:', Dumper(\%new_event));

   # Without cmd the same event cannot be made again if the server sends
   # extra stuff that looks like a result set, etc.
   delete $session->{cmd};

   # With no state, everything from the server is ignored until the
   # client says something again.
   $session->{state} = undef;

   # Attribs belong to this event only.
   $session->{attribs} = {};

   $session->{n_queries}++;
   $session->{server_retransmissions} = [];
   $session->{client_retransmissions} = [];

   return \%new_event;
}
942
# Converts a tcpdump timestamp to the slow-log format by rewriting a leading
# YYYY-MM-DD date as YYMMDD.  The rest of the string is left as it is, and a
# value without such a leading date is returned unchanged.
sub tcp_timestamp {
   my ( $ts ) = @_;
   ( my $slowlog_ts = $ts ) =~ s/^\d\d(\d\d)-(\d\d)-(\d\d)/$1$2$3/;
   return $slowlog_ts;
}
949
# Returns the time elapsed from $start to $end, in seconds, formatted with
# six decimal places.  Both arguments are tcpdump timestamps whose first 11
# characters are the date part and whose remainder is HH:MM:SS[.fraction].
# If the date parts differ, exactly one day boundary is assumed to have been
# crossed (no DST, etc).
sub timestamp_diff {
   my ( $start, $end ) = @_;

   # Cut the date off the front of each copy; only the time of day remains.
   my $start_date = substr($start, 0, 11, '');
   my $end_date   = substr($end,   0, 11, '');

   # Seconds since midnight for the start and the end time.
   my ( $start_secs, $end_secs ) = map {
      my ( $hours, $mins, $secs ) = split(/:/, $_);
      $hours * 3600 + $mins * 60 + $secs;
   } ( $start, $end );

   my $elapsed = $start_date eq $end_date
               ? $end_secs - $start_secs
               : ( 86_400 - $start_secs ) + $end_secs;
   return sprintf '%.6f', $elapsed;
}
966
# Converts a string of hex digits (two per byte, high nibble first) to the
# bytes they encode.
sub to_string {
   my ( $data ) = @_;
   my $bytes = pack('H*', $data);
   return $bytes;
}
972
# Unpacks a length-coded string from the front of the hex string $data.
# Returns a list of two values: the string wrapped in double quotes, and
# the number of bytes that the length prefix and the string occupy together.
# $data itself is not modified.  Dies if decode_len() does.
sub unpack_string {
   my ( $data ) = @_;
   my ( $rest, $len, $encode_len ) = decode_len($data);

   # decode_len() returns nothing when there is no data at all.
   return '""', 0 unless defined $len;

   # A zero-length string is the empty string.  It must not be unpacked
   # with the H* template because that would swallow everything left in
   # the data, i.e. the values of the following parameters.
   my $string = $len ? pack('H' . ($len * 2), $rest) : '';
   return "\"$string\"", $encode_len + $len;
}
982
# Decodes the length prefix of a length-coded value at the front of the hex
# string $data.  Returns a list of three values:
#    * the data that follows the length prefix
#    * the decoded length, in bytes
#    * the size of the whole length prefix, in bytes (marker byte included)
# Returns nothing if $data is false.  Dies if the first byte is not a valid
# length marker.
sub decode_len {
   my ( $data ) = @_;
   return unless $data;

   # first byte hex   length is
   # ========== ====  ===================
   # 0-251      0-FB  the first byte
   # 252        FC    in the next 2 bytes
   # 253        FD    in the next 3 bytes
   # 254        FE    in the next 8 bytes
   my $first_byte = to_num(substr($data, 0, 2, ''));

   # The 4-arg substr calls below cut the length bytes off the front of
   # the data, as was done for the first byte.  encode_len counts the
   # marker byte plus the length bytes because callers add it to the
   # length to know how many bytes the whole value occupies.
   my $len;
   my $encode_len;
   if ( $first_byte <= 251 ) {
      $len        = $first_byte;
      $encode_len = 1;
   }
   elsif ( $first_byte == 252 ) {
      $len        = to_num(substr($data, 0, 4, ''));
      $encode_len = 3;
   }
   elsif ( $first_byte == 253 ) {
      $len        = to_num(substr($data, 0, 6, ''));
      $encode_len = 4;
   }
   elsif ( $first_byte == 254 ) {
      $len        = to_num(substr($data, 0, 16, ''));
      $encode_len = 9;
   }
   else {
      # This shouldn't happen, but it may if we're passed data
      # that isn't length encoded.
      PTDEBUG && _d('data:', $data, 'first byte:', $first_byte);
      die "Invalid length encoded byte: $first_byte";
   }

   PTDEBUG && _d('len:', $len, 'encode len', $encode_len);
   return $data, $len, $encode_len;
}
1023
# Converts a little-endian number, given as a hex string, to a Perl number.
# The MySQL protocol stores all numbers with the least significant byte
# first, so byte number $i (counting from 0) is worth 256 ** $i.  If $len is
# given and true, only the first $len bytes of $str are used.  A trailing
# odd hex digit is ignored.
sub to_num {
   my ( $str, $len ) = @_;
   $str = substr($str, 0, $len * 2) if $len;

   my $result   = 0;
   my $byte_pos = 0;
   while ( $str =~ m/(..)/g ) {
      $result += hex($1) * (16 ** ($byte_pos * 2));
      $byte_pos++;
   }
   return $result;
}
1039
# Converts a hex string to a double: the hex digits are packed into bytes
# and the bytes are unpacked as a double in the machine's native format.
sub to_double {
   my ( $str ) = @_;
   my $bytes = pack('H*', $str);
   return unpack('d', $bytes);
}
1044
# Accepts a reference to a hex string, which it modifies.  Cuts a
# length-coded binary off the front of the string and returns its value as
# an integer.  The first byte is always removed from the string:
#    * less than 251:  the byte is the value
#    * 252, 253, 254:  the value is in the next 2, 3 or 8 bytes, which
#                      are removed too
#    * anything else:  there is no value, so nothing is returned (undef in
#                      scalar context); this covers 251 and 255
sub get_lcb {
   my ( $string ) = @_;
   my $first_byte = hex(substr($$string, 0, 2, ''));
   return $first_byte if $first_byte < 251;

   # Number of hex chars holding the value for each multi-byte marker.
   my %hex_len_for = (
      252 => 4,
      253 => 6,
      254 => 16,
   );
   my $hex_len = $hex_len_for{$first_byte};

   # Return explicitly instead of falling off the end of the sub, which
   # would return whatever the last evaluated expression happened to be.
   return unless $hex_len;

   return to_num(substr($$string, 0, $hex_len, ''));
}
1064
# Parses an error packet given as a hex string.  Structure (the offsets are
# positions in the hex string):
# Offset  Bytes               Field
# ======  =================   ====================================
#         00 00 00 01         MySQL proto header (already removed)
#         ff                  Error  (already removed)
# 0       00 00               Error number
# 4       23                  SQL state marker, always '#'
# 6       00 00 00 00 00      SQL state
# 16      00 ...              Error message
# Returns a hashref with errno, sqlstate (the marker and the SQL state
# combined into one value) and message.  If there is no '#' marker, sqlstate
# is empty and the message is everything after the error number.  Returns
# nothing if the data is false, shorter than 16 hex chars, or has no message.
sub parse_error_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('ERROR data:', $data);
   if ( length($data) < 16 ) {
      PTDEBUG && _d('Error packet is too short:', $data);
      return;
   }

   my $errno  = to_num(substr($data, 0, 4));
   my $marker = to_string(substr($data, 4, 2));

   my ( $sqlstate, $message );
   if ( $marker eq '#' ) {
      $sqlstate = $marker . to_string(substr($data, 6, 10));
      $message  = to_string(substr($data, 16));
   }
   else {
      $sqlstate = '';
      $message  = to_string(substr($data, 4));
   }
   return unless $message;

   my $pkt = {
      errno    => $errno,
      sqlstate => $sqlstate,
      message  => $message,
   };
   PTDEBUG && _d('Error packet:', Dumper($pkt));
   return $pkt;
}
1104
# Parses an OK packet given as a hex string.  Structure:
# Bytes         Field
# ===========   ====================================
# 00 00 00 01   MySQL proto header (already removed)
# 00            OK/Field count (already removed)
# 1-9           Affected rows (LCB)
# 1-9           Insert ID (LCB)
# 00 00         Server status
# 00 00         Warning count
# 00 ...        Message (optional)
# Returns a hashref with affected_rows, insert_id, status, warnings and
# message, or nothing if the data is false or shorter than 12 hex chars.
sub parse_ok_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('OK data:', $data);
   if ( length($data) < 12 ) {
      PTDEBUG && _d('OK packet is too short:', $data);
      return;
   }

   # Each field is cut off the front of $data, so the order of these
   # statements is the order of the fields in the packet.
   my %pkt;
   $pkt{affected_rows} = get_lcb(\$data);
   $pkt{insert_id}     = get_lcb(\$data);
   $pkt{status}        = to_num(substr($data, 0, 4, ''));
   $pkt{warnings}      = to_num(substr($data, 0, 4, ''));

   # Whatever is left is the message.  It might be something like
   # Records: 2  Duplicates: 0  Warnings: 0
   $pkt{message}       = to_string($data);

   PTDEBUG && _d('OK packet:', Dumper(\%pkt));
   return \%pkt;
}
1140
# Parses the OK packet for a prepared statement, given as a hex string.
# Structure:
# Bytes         Field
# ===========   ====================================
# 00            OK  (already removed)
# 00 00 00 00   Statement handler ID
# 00 00         Number of columns in result set
# 00 00         Number of parameters (?) in query
# Returns a hashref with sth_id, num_cols and num_params, or nothing if the
# data is false or shorter than 8 hex chars.
sub parse_ok_prepared_statement_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('OK prepared statement data:', $data);
   if ( length($data) < 8 ) {
      PTDEBUG && _d('OK prepared statement packet is too short:', $data);
      return;
   }

   # Cuts the next field, given its size in hex chars, off the front of
   # the data and returns its numeric value.
   my $next_num = sub {
      my ( $hex_len ) = @_;
      return to_num(substr($data, 0, $hex_len, ''));
   };

   my %pkt;
   $pkt{sth_id}     = $next_num->(8);
   $pkt{num_cols}   = $next_num->(4);
   $pkt{num_params} = $next_num->(4);

   PTDEBUG && _d('OK prepared packet:', Dumper(\%pkt));
   return \%pkt;
}
1167
# Parses the server handshake packet, given as a hex string.  Currently only
# the server version, the thread id and the server capability flags are
# captured.  Returns a hashref with server_version, thread_id and flags
# (the hashref made by parse_flags()), or nothing if the data is false or
# does not look like a server handshake.
sub parse_server_handshake_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('Server handshake data:', $data);

   # The server version is matched one whole byte (two hex chars) at a
   # time so that its terminating 00 can only be found on a byte boundary.
   # Otherwise a version ending in a char like "0" (hex 30) would end one
   # hex char too early and every field after it would be shifted.
   my $handshake_pattern = qr{
                        # Bytes                Name
      ^                 # -----                ----
      ((?:..)+?)00      # n Null-Term String   server_version
      (.{8})            # 4                    thread_id
      .{16}             # 8                    scramble_buff
      .{2}              # 1                    filler: always 0x00
      (.{4})            # 2                    server_capabilities
      .{2}              # 1                    server_language
      .{4}              # 2                    server_status
      .{26}             # 13                   filler: always 0x00
                        # 13                   rest of scramble_buff
   }x;
   my ( $server_version, $thread_id, $flags ) = $data =~ m/$handshake_pattern/;

   # All three captures are undefined if the pattern did not match.
   # Returning nothing, as parse_client_handshake_packet() does, is better
   # than letting parse_flags() die for lack of flags.
   if ( !defined $flags ) {
      PTDEBUG && _d('Did not match server handshake packet');
      return;
   }

   my $pkt = {
      server_version => to_string($server_version),
      thread_id      => to_num($thread_id),
      flags          => parse_flags($flags),
   };
   PTDEBUG && _d('Server handshake packet:', Dumper($pkt));
   return $pkt;
}
1195
# Parses the client handshake packet, given as a hex string.  Currently only
# the user, the default database and the client flags are captured.
# Returns a hashref with user, db ('' if there is none) and flags (the
# hashref made by parse_flags()), or nothing if the data is false or does
# not look like a client handshake.
sub parse_client_handshake_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('Client handshake data:', $data);

   # The user name is matched one whole byte (two hex chars) at a time and
   # may be empty: a client that sends no user name sends only the
   # terminating 00.
   my ( $flags, $user, $buff_len ) = $data =~ m{
      ^
      (.{8})         # Client flags
      .{10}          # Max packet size, charset
      (?:00){23}     # Filler
      ((?:..)*?)00   # Null-terminated user name, may be empty
      (..)           # Length-coding byte for scramble buff
   }x;

   # This packet is easy to detect because it's the only case where
   # the server sends the client a packet first (its handshake) and
   # then the client only and ever sends back its handshake.
   if ( !$buff_len ) {
      PTDEBUG && _d('Did not match client handshake packet');
      return;
   }

   my $code_len = hex($buff_len);
   my $db;

   #  Only try to get the db if CLIENT_CONNECT_WITH_DB flag is set
   #  https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse41
   my $capability_flags = to_num($flags); # $flags is stored as little endian.

   if ($capability_flags & $flag_for{CLIENT_CONNECT_WITH_DB}) {
      # $user is a string of hex digits, so it is safe to interpolate.
      # The db name is matched one whole byte at a time, like the user.
      ( $db ) = $data =~ m!
         ^.{64}${user}00..   # Everything matched before
         (?:..){$code_len}   # The scramble buffer
         ((?:..)*?)00.*\Z    # The database name
      !x;
   }

   my $pkt = {
      user  => to_string($user),
      db    => $db ? to_string($db) : '',
      flags => parse_flags($flags),
   };
   PTDEBUG && _d('Client handshake packet:', Dumper($pkt));
   return $pkt;
}
1241
# Parses a COM (command) packet.  $data is the packet data as a hex string
# and $len is the length of the packet data in bytes.  COM data is not
# 00-terminated, but the MySQL client appends \0, so the packet length has
# to be used to know where the data ends.
# Returns a hashref with code (the first byte, as two hex chars), com (the
# %com_for entry for that code) and data, or nothing if $data or $len is
# false or the code is not in %com_for.
sub parse_com_packet {
   my ( $data, $len ) = @_;
   return unless $data && $len;
   PTDEBUG && _d('COM data:',
      (substr($data, 0, 100).(length $data > 100 ? '...' : '')),
      'len:', $len);

   my $code = substr($data, 0, 2);
   my $com  = $com_for{$code};
   unless ( $com ) {
      PTDEBUG && _d('Did not match COM packet');
      return;
   }

   # Data for the most common COM, e.g. COM_QUERY, is text: it is the
   # $len - 1 bytes after the code byte.  Prepared statement commands are
   # not text, so their data is left as the whole hex string (code byte
   # included); COM_STMT_EXECUTE can be parsed by parse_execute_packet().
   my $is_binary = $code eq COM_STMT_EXECUTE
                || $code eq COM_STMT_CLOSE
                || $code eq COM_STMT_RESET;
   if ( !$is_binary ) {
      $data = to_string(substr($data, 2, ($len - 1) * 2));
   }

   my $pkt = {
      code => $code,
      com  => $com,
      data => $data,
   };
   PTDEBUG && _d('COM packet:', Dumper($pkt));
   return $pkt;
}
1273
# Parses a COM_STMT_EXECUTE packet, given as a hex string, and fills the
# parameter values into the statement that was prepared.  $sths is a hashref
# of known statement handles keyed on statement id; each handle has
# statement and num_params, and its types are saved here.
# Execute prepared statement packet structure:
# Bytes              Field
# ===========        ========================================
# 00                 Code 17, COM_STMT_EXECUTE
# 00 00 00 00        Statement handler ID
# 00                 flags
# 00 00 00 00        Iteration count (reserved, always 1)
# (param_count+7)/8  NULL bitmap
# 00                 1 if new parameters, else 0
# n*2                Parameter types (only if new parameters)
# Returns a hashref with sth_id and arg ("EXECUTE <statement>"), or nothing
# if an argument is false or the statement handle is unknown.
sub parse_execute_packet {
   my ( $data, $sths ) = @_;
   return unless $data && $sths;

   my $sth_id = to_num(substr($data, 2, 8));
   return unless defined $sth_id;

   my $sth = $sths->{$sth_id};
   if ( !$sth ) {
      PTDEBUG && _d('Skipping unknown statement handle', $sth_id);
      return;
   }
   my $null_count  = int(($sth->{num_params} + 7) / 8) || 1;
   my $null_bitmap = to_num(substr($data, 20, $null_count * 2));
   PTDEBUG && _d('NULL bitmap:', $null_bitmap, 'count:', $null_count);

   # This chops off everything up to the byte for new params.
   substr($data, 0, 20 + ($null_count * 2), '');

   my $new_params = to_num(substr($data, 0, 2, ''));
   my @types;
   if ( $new_params ) {
      PTDEBUG && _d('New param types');
      # It seems all params are type 254, MYSQL_TYPE_STRING.  Perhaps
      # this depends on the client.  If we ever need these types, they
      # can be saved here.  Otherwise for now I just want to see the
      # types in debug output.
      for my $i ( 0..($sth->{num_params}-1) ) {
         my $type = to_num(substr($data, 0, 4, ''));
         push @types, $type_for{$type};
         PTDEBUG && _d('Param', $i, 'type:', $type, $type_for{$type});
      }
      $sth->{types} = \@types;
   }
   else {
      # Retrieve previous param types if there are param vals (data).
      # There are none if the packet that sent them was never seen; the
      # values then fall through to the "no handler" case below instead
      # of dying on an undefined array reference.
      @types = @{ $sth->{types} || [] } if $data;
   }

   # $data should now be truncated up to the parameter values.

   my $arg  = $sth->{statement};
   PTDEBUG && _d('Statement:', $arg);

   # Where to look for the next ? in the statement.  It always moves past
   # the value just filled in so that a ? inside a value is never taken
   # for the placeholder of the next param.
   my $search_from = 0;

   for my $i ( 0..($sth->{num_params}-1) ) {
      my $val;
      my $len;  # in bytes
      if ( $null_bitmap & (2**$i) ) {
         PTDEBUG && _d('Param', $i, 'is NULL (bitmap)');
         $val = 'NULL';
         $len = 0;
      }
      else {
         my $type    = $types[$i];
         my $handler = defined $type ? $unpack_type{$type} : undef;
         if ( $handler ) {
            ($val, $len) = $handler->($data);
         }
         else {
            # TODO: this is probably going to break parsing other param vals
            PTDEBUG && _d('No handler for param', $i, 'type', $type);
            $val = '?';
            $len = 0;
         }
      }

      # Replace ? in prepared statement with value.
      PTDEBUG && _d('Param', $i, 'val:', $val);
      my $placeholder = index($arg, '?', $search_from);
      if ( $placeholder >= 0 ) {
         my $text = defined $val ? $val : '';
         substr($arg, $placeholder, 1, $text);
         $search_from = $placeholder + length $text;
      }

      # Remove this param val from the data, putting us at the next one.
      substr($data, 0, $len * 2, '') if $len;
   }

   my $pkt = {
      sth_id => $sth_id,
      arg    => "EXECUTE $arg",
   };
   PTDEBUG && _d('Execute packet:', Dumper($pkt));
   return $pkt;
}
1362
# Returns the statement handle id stored in the 4 bytes that follow the
# first byte of the hex string $data, or nothing if $data is false.
sub get_sth_id {
   my ( $data ) = @_;
   return $data ? to_num(substr($data, 2, 8)) : ();
}
1369
# Decodes capability flags given as a little-endian hex string.  Returns a
# hashref with one key for every flag name in %flag_for; the value is 1 if
# the flag's bit is set in $flags, else 0.  Dies if $flags is false.
sub parse_flags {
   my ( $flags ) = @_;
   die "I need flags" unless $flags;
   PTDEBUG && _d('Flag data:', $flags);

   my $flags_dec = to_num($flags);
   my %is_set    = map {
      $_ => ( $flags_dec & $flag_for{$_} ? 1 : 0 )
   } keys %flag_for;
   return \%is_set;
}
1382
# Takes a scalarref to a hex string of compressed data and the length, in
# bytes, of the uncompressed data.  Returns a scalarref to a hex string of
# the uncompressed data.  The given hex string of compressed data is not
# modified.  Dies if an argument is missing, if $data is not a scalar
# reference, or if the data cannot be inflated.
sub uncompress_data {
   my ( $data, $len ) = @_;
   die "I need data" unless $data;
   die "I need a len argument" unless $len;
   die "I need a scalar reference to data" unless ref $data eq 'SCALAR';
   PTDEBUG && _d('Uncompressing data');
   our $InflateError;

   # Pack hex string into compressed binary data.
   my $comp_bin_data = pack('H*', $$data);

   # Uncompress the compressed binary data.  The constructor is called
   # with the arrow syntax because the indirect object syntax
   # (new Class(...)) can be parsed in unintended ways.
   my $uncomp_bin_data = '';
   my $z = IO::Uncompress::Inflate->new(\$comp_bin_data)
      or die "IO::Uncompress::Inflate failed: $InflateError";

   # read() returns the number of bytes it uncompressed, zero at eof, or a
   # negative number on error.  A negative number is true, so only a
   # positive number means success.
   my $bytes_read = $z->read(\$uncomp_bin_data, $len);
   die "IO::Uncompress::Inflate failed: $InflateError"
      unless defined $bytes_read && $bytes_read > 0;

   # Unpack the uncompressed binary data back into a hex string.
   # This is the original MySQL packet(s).
   my $uncomp_data = unpack('H*', $uncomp_bin_data);

   return \$uncomp_data;
}
1411
# Detects in-stream whether the client compresses its packets and sets
# $session->{compress} to 1 or 0 accordingly.  If compression is detected,
# the packet is uncompressed and its MySQL header removed.
# Returns 1 on success or 0 on failure.  Failure is probably detecting
# compression but not being able to uncompress (uncompress_packet()
# returns 0).
sub detect_compression {
   my ( $self, $packet, $session ) = @_;
   PTDEBUG && _d('Checking for client compression');

   # This is a necessary hack for detecting compression in-stream without
   # having seen the client handshake and CLIENT_COMPRESS flag.  If the
   # client is compressing packets, there will be an extra 7 bytes before
   # the regular MySQL header.  For short COM_QUERY commands, these 7 bytes
   # are usually zero where we'd expect to see 03 for COM_QUERY.  So if the
   # packet parses as a COM_SLEEP (00), which is not a command that the
   # client can send, then chances are the client is using compression.
   my $com = parse_com_packet($packet->{data}, $packet->{mysql_data_len});
   my $looks_compressed = $com && $com->{code} eq COM_SLEEP;

   if ( !$looks_compressed ) {
      PTDEBUG && _d('Client is NOT using compression');
      $session->{compress} = 0;
      return 1;
   }

   PTDEBUG && _d('Client is using compression');
   $session->{compress} = 1;

   # Since parse_packet() didn't know the packet was compressed, it
   # called remove_mysql_header() which removed the first 4 of 7 bytes
   # of the compression header.  These 4 bytes must be restored before
   # uncompressing and removing the MySQL header.  This is only done once.
   $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
   return 0 unless $self->uncompress_packet($packet, $session);
   remove_mysql_header($packet);

   return 1;
}
1445
# Removes the compression header from the packet's data and uncompresses
# the data if it is really compressed.  Returns 1 when that is done.
# Returns 0, after failing the session, if the compression header cannot
# be parsed.  If the data cannot be uncompressed, the session is failed and
# this sub dies; that is usually due to IO::Uncompress not being available.
sub uncompress_packet {
   my ( $self, $packet, $session ) = @_;
   die "I need a packet"  unless $packet;
   die "I need a session" unless $session;

   # From the doc: "A compressed packet header is:
   #    packet length (3 bytes),
   #    packet number (1 byte),
   #    and Uncompressed Packet Length (3 bytes).
   # The Uncompressed Packet Length is the number of bytes
   # in the original, uncompressed packet. If this is zero
   # then the data is not compressed."

   my $data;
   my $comp_hdr;
   my $comp_data_len;
   my $pkt_num;
   my $uncomp_data_len;
   eval {
      $data            = \$packet->{data};
      $comp_hdr        = substr($$data, 0, 14, '');
      $comp_data_len   = to_num(substr($comp_hdr, 0, 6));
      $pkt_num         = to_num(substr($comp_hdr, 6, 2));
      $uncomp_data_len = to_num(substr($comp_hdr, 8, 6));
      PTDEBUG && _d('Compression header data:', $comp_hdr,
         'compressed data len (bytes)', $comp_data_len,
         'number', $pkt_num,
         'uncompressed data len (bytes)', $uncomp_data_len);
   };
   if ( my $header_error = $EVAL_ERROR ) {
      $session->{EVAL_ERROR} = $header_error;
      $self->fail_session($session, 'failed to parse compression header');
      return 0;
   }

   if ( $uncomp_data_len ) {
      eval {
         $data = uncompress_data($data, $uncomp_data_len);
         $packet->{data} = $$data;
      };
      # The error is copied right away because $EVAL_ERROR is a global
      # that any eval run by fail_session() would overwrite before the
      # error is reported below.
      if ( my $uncompress_error = $EVAL_ERROR ) {
         $session->{EVAL_ERROR} = $uncompress_error;
         $self->fail_session($session, 'failed to uncompress data');
         die "Cannot uncompress packet.  Check that IO::Uncompress::Inflate "
            . "is installed.\nError: $uncompress_error";
      }
   }
   else {
      PTDEBUG && _d('Packet is not really compressed');
      $packet->{data} = $$data;
   }

   return 1;
}
1502
# Removes the first 4 bytes (8 hex chars) of the packet data, which should
# be a MySQL header: 3 bytes packet length, 1 byte packet number.  The
# header and its two fields are saved in the packet as mysql_hdr,
# mysql_data_len and number.  Dies if no packet is given.
sub remove_mysql_header {
   my ( $packet ) = @_;
   die "I need a packet" unless $packet;

   # NOTE: the 4-arg substr modifies the packet's data!  If we had all the
   # data in the TCP packets, we could change this to a while loop; while
   # get-a-packet-from-$data, do stuff, etc.  But we don't, and we don't
   # want to either.
   my $header   = substr($packet->{data}, 0, 8, '');
   my $data_len = to_num(substr($header, 0, 6));
   my $number   = to_num(substr($header, 6, 2));
   PTDEBUG && _d('MySQL packet: header data', $header,
      'data len (bytes)', $data_len, 'number', $number);

   @{$packet}{qw(mysql_hdr mysql_data_len number)}
      = ( $header, $data_len, $number );

   return;
}
1525
# Deletes everything that was added to the session for buffering a large
# query received in multiple packets: buff, buff_left and mysql_data_len.
# Other session keys are left alone.  Returns nothing.
sub _delete_buff {
   my ( $self, $session ) = @_;
   # A hash slice deletes all the keys at once; map should not be used in
   # void context just for its side effects.
   delete @{$session}{qw(buff buff_left mysql_data_len)};
   return;
}
1533
# Prints a debug line to STDERR: "# package:line PID " followed by the
# arguments joined with spaces.  Undefined arguments are printed as 'undef'
# and every newline inside an argument is followed by "# ", so that each
# line of the output begins with "# ".
sub _d {
   my ($package, undef, $line) = caller 0;
   my @parts;
   foreach my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }
   print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}
1541
15421;
1543}
1544# ###########################################################################
1545# End MySQLProtocolParser package
1546# ###########################################################################
1547