1# This program is copyright 2007-2011 Percona Ireland Ltd. 2# Feedback and improvements are welcome. 3# 4# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED 5# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF 6# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE. 7# 8# This program is free software; you can redistribute it and/or modify it under 9# the terms of the GNU General Public License as published by the Free Software 10# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar 11# systems, you can issue `man perlgpl' or `man perlartistic' to read these 12# licenses. 13# 14# You should have received a copy of the GNU General Public License along with 15# this program; if not, write to the Free Software Foundation, Inc., 59 Temple 16# Place, Suite 330, Boston, MA 02111-1307 USA. 17# ########################################################################### 18# MySQLProtocolParser package 19# ########################################################################### 20{ 21# Package: MySQLProtocolParser 22# MySQLProtocolParser parses MySQL events from tcpdump files. 23# The packets come from TcpdumpParser. MySQLProtocolParse::parse_packet() 24# should be first in the callback chain because it creates events for 25# subsequent callbacks. So the sequence is: 26# 1. mk-query-digest calls TcpdumpParser::parse_event($fh, ..., @callbacks) 27# 2. TcpdumpParser::parse_event() extracts raw MySQL packets from $fh and 28# passes them to the callbacks, the first of which is 29# MySQLProtocolParser::parse_packet(). 30# 3. MySQLProtocolParser::parse_packet() makes events from the packets 31# and returns them to TcpdumpParser::parse_event(). 32# 4. TcpdumpParser::parse_event() passes the newly created events to 33# the subsequent callbacks. 34# At times MySQLProtocolParser::parse_packet() will not return an event 35# because it usually takes a few packets to create one event. In such 36# cases, TcpdumpParser::parse_event() will not call the other callbacks. 37package MySQLProtocolParser; 38 39use strict; 40use warnings FATAL => 'all'; 41use English qw(-no_match_vars); 42use constant PTDEBUG => $ENV{PTDEBUG} || 0; 43 44eval { 45 require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib 46 IO::Uncompress::Inflate->import(qw(inflate $InflateError)); 47}; 48 49use Data::Dumper; 50$Data::Dumper::Indent = 1; 51$Data::Dumper::Sortkeys = 1; 52$Data::Dumper::Quotekeys = 0; 53 54BEGIN { our @ISA = 'ProtocolParser'; } 55 56use constant { 57 COM_SLEEP => '00', 58 COM_QUIT => '01', 59 COM_INIT_DB => '02', 60 COM_QUERY => '03', 61 COM_FIELD_LIST => '04', 62 COM_CREATE_DB => '05', 63 COM_DROP_DB => '06', 64 COM_REFRESH => '07', 65 COM_SHUTDOWN => '08', 66 COM_STATISTICS => '09', 67 COM_PROCESS_INFO => '0a', 68 COM_CONNECT => '0b', 69 COM_PROCESS_KILL => '0c', 70 COM_DEBUG => '0d', 71 COM_PING => '0e', 72 COM_TIME => '0f', 73 COM_DELAYED_INSERT => '10', 74 COM_CHANGE_USER => '11', 75 COM_BINLOG_DUMP => '12', 76 COM_TABLE_DUMP => '13', 77 COM_CONNECT_OUT => '14', 78 COM_REGISTER_SLAVE => '15', 79 COM_STMT_PREPARE => '16', 80 COM_STMT_EXECUTE => '17', 81 COM_STMT_SEND_LONG_DATA => '18', 82 COM_STMT_CLOSE => '19', 83 COM_STMT_RESET => '1a', 84 COM_SET_OPTION => '1b', 85 COM_STMT_FETCH => '1c', 86 SERVER_QUERY_NO_GOOD_INDEX_USED => 16, 87 SERVER_QUERY_NO_INDEX_USED => 32, 88}; 89 90my %com_for = ( 91 '00' => 'COM_SLEEP', 92 '01' => 'COM_QUIT', 93 '02' => 'COM_INIT_DB', 94 '03' => 'COM_QUERY', 95 '04' => 'COM_FIELD_LIST', 96 '05' => 'COM_CREATE_DB', 97 '06' => 'COM_DROP_DB', 98 '07' => 'COM_REFRESH', 99 '08' => 'COM_SHUTDOWN', 100 '09' => 'COM_STATISTICS', 101 '0a' => 'COM_PROCESS_INFO', 102 '0b' => 'COM_CONNECT', 103 '0c' => 'COM_PROCESS_KILL', 104 '0d' => 'COM_DEBUG', 105 '0e' => 'COM_PING', 106 '0f' => 'COM_TIME', 107 '10' => 'COM_DELAYED_INSERT', 108 '11' => 'COM_CHANGE_USER', 109 '12' => 'COM_BINLOG_DUMP', 110 '13' => 'COM_TABLE_DUMP', 111 '14' => 'COM_CONNECT_OUT', 112 '15' => 'COM_REGISTER_SLAVE', 113 '16' => 'COM_STMT_PREPARE', 114 '17' => 'COM_STMT_EXECUTE', 115 '18' => 'COM_STMT_SEND_LONG_DATA', 116 '19' => 'COM_STMT_CLOSE', 117 '1a' => 'COM_STMT_RESET', 118 '1b' => 'COM_SET_OPTION', 119 '1c' => 'COM_STMT_FETCH', 120); 121 122my %flag_for = ( 123 'CLIENT_LONG_PASSWORD' => 1, # new more secure passwords 124 'CLIENT_FOUND_ROWS' => 2, # Found instead of affected rows 125 'CLIENT_LONG_FLAG' => 4, # Get all column flags 126 'CLIENT_CONNECT_WITH_DB' => 8, # One can specify db on connect 127 'CLIENT_NO_SCHEMA' => 16, # Don't allow database.table.column 128 'CLIENT_COMPRESS' => 32, # Can use compression protocol 129 'CLIENT_ODBC' => 64, # Odbc client 130 'CLIENT_LOCAL_FILES' => 128, # Can use LOAD DATA LOCAL 131 'CLIENT_IGNORE_SPACE' => 256, # Ignore spaces before '(' 132 'CLIENT_PROTOCOL_41' => 512, # New 4.1 protocol 133 'CLIENT_INTERACTIVE' => 1024, # This is an interactive client 134 'CLIENT_SSL' => 2048, # Switch to SSL after handshake 135 'CLIENT_IGNORE_SIGPIPE' => 4096, # IGNORE sigpipes 136 'CLIENT_TRANSACTIONS' => 8192, # Client knows about transactions 137 'CLIENT_RESERVED' => 16384, # Old flag for 4.1 protocol 138 'CLIENT_SECURE_CONNECTION' => 32768, # New 4.1 authentication 139 'CLIENT_MULTI_STATEMENTS' => 65536, # Enable/disable multi-stmt support 140 'CLIENT_MULTI_RESULTS' => 131072, # Enable/disable multi-results 141); 142 143use constant { 144 MYSQL_TYPE_DECIMAL => 0, 145 MYSQL_TYPE_TINY => 1, 146 MYSQL_TYPE_SHORT => 2, 147 MYSQL_TYPE_LONG => 3, 148 MYSQL_TYPE_FLOAT => 4, 149 MYSQL_TYPE_DOUBLE => 5, 150 MYSQL_TYPE_NULL => 6, 151 MYSQL_TYPE_TIMESTAMP => 7, 152 MYSQL_TYPE_LONGLONG => 8, 153 MYSQL_TYPE_INT24 => 9, 154 MYSQL_TYPE_DATE => 10, 155 MYSQL_TYPE_TIME => 11, 156 MYSQL_TYPE_DATETIME => 12, 157 MYSQL_TYPE_YEAR => 13, 158 MYSQL_TYPE_NEWDATE => 14, 159 MYSQL_TYPE_VARCHAR => 15, 160 MYSQL_TYPE_BIT => 16, 161 MYSQL_TYPE_NEWDECIMAL => 246, 162 MYSQL_TYPE_ENUM => 247, 163 MYSQL_TYPE_SET => 248, 164 MYSQL_TYPE_TINY_BLOB => 249, 165 MYSQL_TYPE_MEDIUM_BLOB => 250, 166 MYSQL_TYPE_LONG_BLOB => 251, 167 MYSQL_TYPE_BLOB => 252, 168 MYSQL_TYPE_VAR_STRING => 253, 169 MYSQL_TYPE_STRING => 254, 170 MYSQL_TYPE_GEOMETRY => 255, 171}; 172 173my %type_for = ( 174 0 => 'MYSQL_TYPE_DECIMAL', 175 1 => 'MYSQL_TYPE_TINY', 176 2 => 'MYSQL_TYPE_SHORT', 177 3 => 'MYSQL_TYPE_LONG', 178 4 => 'MYSQL_TYPE_FLOAT', 179 5 => 'MYSQL_TYPE_DOUBLE', 180 6 => 'MYSQL_TYPE_NULL', 181 7 => 'MYSQL_TYPE_TIMESTAMP', 182 8 => 'MYSQL_TYPE_LONGLONG', 183 9 => 'MYSQL_TYPE_INT24', 184 10 => 'MYSQL_TYPE_DATE', 185 11 => 'MYSQL_TYPE_TIME', 186 12 => 'MYSQL_TYPE_DATETIME', 187 13 => 'MYSQL_TYPE_YEAR', 188 14 => 'MYSQL_TYPE_NEWDATE', 189 15 => 'MYSQL_TYPE_VARCHAR', 190 16 => 'MYSQL_TYPE_BIT', 191 246 => 'MYSQL_TYPE_NEWDECIMAL', 192 247 => 'MYSQL_TYPE_ENUM', 193 248 => 'MYSQL_TYPE_SET', 194 249 => 'MYSQL_TYPE_TINY_BLOB', 195 250 => 'MYSQL_TYPE_MEDIUM_BLOB', 196 251 => 'MYSQL_TYPE_LONG_BLOB', 197 252 => 'MYSQL_TYPE_BLOB', 198 253 => 'MYSQL_TYPE_VAR_STRING', 199 254 => 'MYSQL_TYPE_STRING', 200 255 => 'MYSQL_TYPE_GEOMETRY', 201); 202 203my %unpack_type = ( 204 MYSQL_TYPE_NULL => sub { return 'NULL', 0; }, 205 MYSQL_TYPE_TINY => sub { return to_num(@_, 1), 1; }, 206 MySQL_TYPE_SHORT => sub { return to_num(@_, 2), 2; }, 207 MYSQL_TYPE_LONG => sub { return to_num(@_, 4), 4; }, 208 MYSQL_TYPE_LONGLONG => sub { return to_num(@_, 8), 8; }, 209 MYSQL_TYPE_DOUBLE => sub { return to_double(@_), 8; }, 210 MYSQL_TYPE_VARCHAR => \&unpack_string, 211 MYSQL_TYPE_VAR_STRING => \&unpack_string, 212 MYSQL_TYPE_STRING => \&unpack_string, 213); 214 215# server is the "host:port" of the sever being watched. It's auto-guessed if 216# not specified. version is a placeholder for handling differences between 217# MySQL v4.0 and older and v4.1 and newer. Currently, we only handle v4.1. 218sub new { 219 my ( $class, %args ) = @_; 220 221 my $self = { 222 server => $args{server}, 223 port => $args{port} || '3306', 224 version => '41', # MySQL proto version; not used yet 225 sessions => {}, 226 o => $args{o}, 227 fake_thread_id => 2**32, # see _make_event() 228 null_event => $args{null_event}, 229 }; 230 PTDEBUG && $self->{server} && _d('Watching only server', $self->{server}); 231 return bless $self, $class; 232} 233 234# The packet arg should be a hashref from TcpdumpParser::parse_event(). 235# misc is a placeholder for future features. 236sub parse_event { 237 my ( $self, %args ) = @_; 238 my @required_args = qw(event); 239 foreach my $arg ( @required_args ) { 240 die "I need a $arg argument" unless $args{$arg}; 241 } 242 my $packet = @args{@required_args}; 243 244 my $src_host = "$packet->{src_host}:$packet->{src_port}"; 245 my $dst_host = "$packet->{dst_host}:$packet->{dst_port}"; 246 247 if ( my $server = $self->{server} ) { # Watch only the given server. 248 $server .= ":$self->{port}"; 249 if ( $src_host ne $server && $dst_host ne $server ) { 250 PTDEBUG && _d('Packet is not to or from', $server); 251 return $self->{null_event}; 252 } 253 } 254 255 # Auto-detect the server by looking for port 3306 or port "mysql" (sometimes 256 # tcpdump will substitute the port by a lookup in /etc/protocols). 257 my $packet_from; 258 my $client; 259 if ( $src_host =~ m/:$self->{port}$/ ) { 260 $packet_from = 'server'; 261 $client = $dst_host; 262 } 263 elsif ( $dst_host =~ m/:$self->{port}$/ ) { 264 $packet_from = 'client'; 265 $client = $src_host; 266 } 267 else { 268 PTDEBUG && _d('Packet is not to or from a MySQL server'); 269 return $self->{null_event}; 270 } 271 PTDEBUG && _d('Client', $client); 272 273 # Get the client's session info or create a new session if 274 # we catch the TCP SYN sequence or the packetno is 0. 275 my $packetno = -1; 276 if ( $packet->{data_len} >= 5 ) { 277 # 5 bytes is the minimum length of any valid MySQL packet. 278 # If there's less, it's probably some TCP control packet 279 # with other data. Peek at the MySQL packet number. The 280 # only time a server sends packetno 0 is for its handshake. 281 # Client packetno 0 marks start of new query. 282 $packetno = to_num(substr($packet->{data}, 6, 2)); 283 } 284 if ( !exists $self->{sessions}->{$client} ) { 285 if ( $packet->{syn} ) { 286 PTDEBUG && _d('New session (SYN)'); 287 } 288 elsif ( $packetno == 0 ) { 289 PTDEBUG && _d('New session (packetno 0)'); 290 } 291 else { 292 PTDEBUG && _d('Ignoring mid-stream', $packet_from, 'data,', 293 'packetno', $packetno); 294 return $self->{null_event}; 295 } 296 297 $self->{sessions}->{$client} = { 298 client => $client, 299 ts => $packet->{ts}, 300 state => undef, 301 compress => undef, 302 raw_packets => [], 303 buff => '', 304 sths => {}, 305 attribs => {}, 306 n_queries => 0, 307 }; 308 } 309 my $session = $self->{sessions}->{$client}; 310 PTDEBUG && _d('Client state:', $session->{state}); 311 312 # Save raw packets to dump later in case something fails. 313 push @{$session->{raw_packets}}, $packet->{raw_packet}; 314 315 # Check client port reuse. 316 # http://code.google.com/p/maatkit/issues/detail?id=794 317 if ( $packet->{syn} && ($session->{n_queries} > 0 || $session->{state}) ) { 318 PTDEBUG && _d('Client port reuse and last session did not quit'); 319 # Fail the session so we can see the last thing the previous 320 # session was doing. 321 $self->fail_session($session, 322 'client port reuse and last session did not quit'); 323 # Then recurse to create a New session. 324 return $self->parse_event(%args); 325 } 326 327 # Return early if there's no TCP/MySQL data. These are usually 328 # TCP control packets: SYN, ACK, FIN, etc. 329 if ( $packet->{data_len} == 0 ) { 330 PTDEBUG && _d('TCP control:', 331 map { uc $_ } grep { $packet->{$_} } qw(syn ack fin rst)); 332 if ( $packet->{'fin'} 333 && ($session->{state} || '') eq 'server_handshake' ) { 334 PTDEBUG && _d('Client aborted connection'); 335 my $event = { 336 cmd => 'Admin', 337 arg => 'administrator command: Connect', 338 ts => $packet->{ts}, 339 }; 340 $session->{attribs}->{Error_msg} = 'Client closed connection during handshake'; 341 $event = $self->_make_event($event, $packet, $session); 342 delete $self->{sessions}->{$session->{client}}; 343 return $event; 344 } 345 return $self->{null_event}; 346 } 347 348 # Return unless the compressed packet can be uncompressed. 349 # If it cannot, then we're helpless and must return. 350 if ( $session->{compress} ) { 351 return unless $self->uncompress_packet($packet, $session); 352 } 353 354 if ( $session->{buff} && $packet_from eq 'client' ) { 355 # Previous packets were not complete so append this data 356 # to what we've been buffering. Afterwards, do *not* attempt 357 # to remove_mysql_header() because it was already done (from 358 # the first packet). 359 $session->{buff} .= $packet->{data}; 360 $packet->{data} = $session->{buff}; 361 $session->{buff_left} -= $packet->{data_len}; 362 363 # We didn't remove_mysql_header(), so mysql_data_len isn't set. 364 # So set it to the real, complete data len (from the first 365 # packet's MySQL header). 366 $packet->{mysql_data_len} = $session->{mysql_data_len}; 367 $packet->{number} = $session->{number}; 368 369 PTDEBUG && _d('Appending data to buff; expecting', 370 $session->{buff_left}, 'more bytes'); 371 } 372 else { 373 # Remove the first MySQL header. A single TCP packet can contain many 374 # MySQL packets, but we only look at the first. The 2nd and subsequent 375 # packets are usually parts of a result set returned by the server, but 376 # we're not interested in result sets. 377 eval { 378 remove_mysql_header($packet); 379 }; 380 if ( $EVAL_ERROR ) { 381 PTDEBUG && _d('remove_mysql_header() failed; failing session'); 382 $session->{EVAL_ERROR} = $EVAL_ERROR; 383 $self->fail_session($session, 'remove_mysql_header() failed'); 384 return $self->{null_event}; 385 } 386 } 387 388 # Finally, parse the packet and maybe create an event. 389 # The returned event may be empty if no event was ready to be created. 390 my $event; 391 if ( $packet_from eq 'server' ) { 392 $event = $self->_packet_from_server($packet, $session, $args{misc}); 393 } 394 elsif ( $packet_from eq 'client' ) { 395 if ( $session->{buff} ) { 396 if ( $session->{buff_left} <= 0 ) { 397 PTDEBUG && _d('Data is complete'); 398 $self->_delete_buff($session); 399 } 400 else { 401 return $self->{null_event}; # waiting for more data; buff_left was reported earlier 402 } 403 } 404 elsif ( $packet->{mysql_data_len} > ($packet->{data_len} - 4) ) { 405 406 # http://code.google.com/p/maatkit/issues/detail?id=832 407 if ( $session->{cmd} && ($session->{state} || '') eq 'awaiting_reply' ) { 408 PTDEBUG && _d('No server OK to previous command (frag)'); 409 $self->fail_session($session, 'no server OK to previous command'); 410 # The MySQL header is removed by this point, so put it back. 411 $packet->{data} = $packet->{mysql_hdr} . $packet->{data}; 412 return $self->parse_event(%args); 413 } 414 415 # There is more MySQL data than this packet contains. 416 # Save the data and the original MySQL header values 417 # then wait for the rest of the data. 418 $session->{buff} = $packet->{data}; 419 $session->{mysql_data_len} = $packet->{mysql_data_len}; 420 $session->{number} = $packet->{number}; 421 422 # Do this just once here. For the next packets, buff_left 423 # will be decremented above. 424 $session->{buff_left} 425 ||= $packet->{mysql_data_len} - ($packet->{data_len} - 4); 426 427 PTDEBUG && _d('Data not complete; expecting', 428 $session->{buff_left}, 'more bytes'); 429 return $self->{null_event}; 430 } 431 432 if ( $session->{cmd} && ($session->{state} || '') eq 'awaiting_reply' ) { 433 # Buffer handling above should ensure that by this point we have 434 # the full client query. If there's a previous client query for 435 # which we're "awaiting_reply" and then we get another client 436 # query, chances are we missed the server's OK response to the 437 # first query. So fail the first query and re-parse this second 438 # query. 439 PTDEBUG && _d('No server OK to previous command'); 440 $self->fail_session($session, 'no server OK to previous command'); 441 # The MySQL header is removed by this point, so put it back. 442 $packet->{data} = $packet->{mysql_hdr} . $packet->{data}; 443 return $self->parse_event(%args); 444 } 445 446 $event = $self->_packet_from_client($packet, $session, $args{misc}); 447 } 448 else { 449 # Should not get here. 450 die 'Packet origin unknown'; 451 } 452 453 PTDEBUG && _d('Done parsing packet; client state:', $session->{state}); 454 if ( $session->{closed} ) { 455 delete $self->{sessions}->{$session->{client}}; 456 PTDEBUG && _d('Session deleted'); 457 } 458 459 $args{stats}->{events_parsed}++ if $args{stats}; 460 return $event || $self->{null_event}; 461} 462 463# Handles a packet from the server given the state of the session. 464# The server can send back a lot of different stuff, but luckily 465# we're only interested in 466# * Connection handshake packets for the thread_id 467# * OK and Error packets for errors, warnings, etc. 468# Anything else is ignored. Returns an event if one was ready to be 469# created, otherwise returns nothing. 470sub _packet_from_server { 471 my ( $self, $packet, $session, $misc ) = @_; 472 die "I need a packet" unless $packet; 473 die "I need a session" unless $session; 474 475 PTDEBUG && _d('Packet is from server; client state:', $session->{state}); 476 477 if ( ($session->{server_seq} || '') eq $packet->{seq} ) { 478 push @{ $session->{server_retransmissions} }, $packet->{seq}; 479 PTDEBUG && _d('TCP retransmission'); 480 return; 481 } 482 $session->{server_seq} = $packet->{seq}; 483 484 my $data = $packet->{data}; 485 486 # The first byte in the packet indicates whether it's an OK, 487 # ERROR, EOF packet. If it's not one of those, we test 488 # whether it's an initialization packet (the first thing the 489 # server ever sends the client). If it's not that, it could 490 # be a result set header, field, row data, etc. 491 492 my ( $first_byte ) = substr($data, 0, 2, ''); 493 PTDEBUG && _d('First byte of packet:', $first_byte); 494 if ( !$first_byte ) { 495 $self->fail_session($session, 'no first byte'); 496 return; 497 } 498 499 # If there's no session state, then we're catching a server response 500 # mid-stream. It's only safe to wait until the client sends a command 501 # or to look for the server handshake. 502 if ( !$session->{state} ) { 503 if ( $first_byte eq '0a' && length $data >= 33 && $data =~ m/00{13}/ ) { 504 # It's the handshake packet from the server to the client. 505 # 0a is protocol v10 which is essentially the only version used 506 # today. 33 is the minimum possible length for a valid server 507 # handshake packet. It's probably a lot longer. Other packets 508 # may start with 0a, but none that can would be >= 33. The 13-byte 509 # 00 scramble buffer is another indicator. 510 my $handshake = parse_server_handshake_packet($data); 511 if ( !$handshake ) { 512 $self->fail_session($session, 'failed to parse server handshake'); 513 return; 514 } 515 $session->{state} = 'server_handshake'; 516 $session->{thread_id} = $handshake->{thread_id}; 517 518 # See http://code.google.com/p/maatkit/issues/detail?id=794 519 $session->{ts} = $packet->{ts} unless $session->{ts}; 520 } 521 elsif ( $session->{buff} ) { 522 $self->fail_session($session, 523 'got server response before full buffer'); 524 return; 525 } 526 else { 527 PTDEBUG && _d('Ignoring mid-stream server response'); 528 return; 529 } 530 } 531 else { 532 if ( $first_byte eq '00' ) { 533 if ( ($session->{state} || '') eq 'client_auth' ) { 534 # We logged in OK! Trigger an admin Connect command. 535 536 $session->{compress} = $session->{will_compress}; 537 delete $session->{will_compress}; 538 PTDEBUG && $session->{compress} && _d('Packets will be compressed'); 539 540 PTDEBUG && _d('Admin command: Connect'); 541 return $self->_make_event( 542 { cmd => 'Admin', 543 arg => 'administrator command: Connect', 544 ts => $packet->{ts}, # Events are timestamped when they end 545 }, 546 $packet, $session 547 ); 548 } 549 elsif ( $session->{cmd} ) { 550 # This OK should be ack'ing a query or something sent earlier 551 # by the client. OK for prepared statement are special. 552 my $com = $session->{cmd}->{cmd}; 553 my $ok; 554 if ( $com eq COM_STMT_PREPARE ) { 555 PTDEBUG && _d('OK for prepared statement'); 556 $ok = parse_ok_prepared_statement_packet($data); 557 if ( !$ok ) { 558 $self->fail_session($session, 559 'failed to parse OK prepared statement packet'); 560 return; 561 } 562 my $sth_id = $ok->{sth_id}; 563 $session->{attribs}->{Statement_id} = $sth_id; 564 565 # Save all sth info, used in parse_execute_packet(). 566 $session->{sths}->{$sth_id} = $ok; 567 $session->{sths}->{$sth_id}->{statement} 568 = $session->{cmd}->{arg}; 569 } 570 else { 571 $ok = parse_ok_packet($data); 572 if ( !$ok ) { 573 $self->fail_session($session, 'failed to parse OK packet'); 574 return; 575 } 576 } 577 578 my $arg; 579 if ( $com eq COM_QUERY 580 || $com eq COM_STMT_EXECUTE || $com eq COM_STMT_RESET ) { 581 $com = 'Query'; 582 $arg = $session->{cmd}->{arg}; 583 } 584 elsif ( $com eq COM_STMT_PREPARE ) { 585 $com = 'Query'; 586 $arg = "PREPARE $session->{cmd}->{arg}"; 587 } 588 else { 589 $arg = 'administrator command: ' 590 . ucfirst(lc(substr($com_for{$com}, 4))); 591 $com = 'Admin'; 592 } 593 594 return $self->_make_event( 595 { cmd => $com, 596 arg => $arg, 597 ts => $packet->{ts}, 598 Insert_id => $ok->{insert_id}, 599 Warning_count => $ok->{warnings}, 600 Rows_affected => $ok->{affected_rows}, 601 }, 602 $packet, $session 603 ); 604 } 605 else { 606 PTDEBUG && _d('Looks like an OK packet but session has no cmd'); 607 } 608 } 609 elsif ( $first_byte eq 'ff' ) { 610 my $error = parse_error_packet($data); 611 if ( !$error ) { 612 $self->fail_session($session, 'failed to parse error packet'); 613 return; 614 } 615 my $event; 616 617 if ( $session->{state} eq 'client_auth' 618 || $session->{state} eq 'server_handshake' ) { 619 PTDEBUG && _d('Connection failed'); 620 $event = { 621 cmd => 'Admin', 622 arg => 'administrator command: Connect', 623 ts => $packet->{ts}, 624 Error_no => $error->{errno}, 625 }; 626 $session->{attribs}->{Error_msg} = $error->{message}; 627 $session->{closed} = 1; # delete session when done 628 return $self->_make_event($event, $packet, $session); 629 } 630 elsif ( $session->{cmd} ) { 631 # This error should be in response to a query or something 632 # sent earlier by the client. 633 my $com = $session->{cmd}->{cmd}; 634 my $arg; 635 636 if ( $com eq COM_QUERY || $com eq COM_STMT_EXECUTE ) { 637 $com = 'Query'; 638 $arg = $session->{cmd}->{arg}; 639 } 640 else { 641 $arg = 'administrator command: ' 642 . ucfirst(lc(substr($com_for{$com}, 4))); 643 $com = 'Admin'; 644 } 645 646 $event = { 647 cmd => $com, 648 arg => $arg, 649 ts => $packet->{ts}, 650 }; 651 if ( $error->{errno} ) { 652 # https://bugs.launchpad.net/percona-toolkit/+bug/823411 653 $event->{Error_no} = $error->{errno}; 654 } 655 $session->{attribs}->{Error_msg} = $error->{message}; 656 return $self->_make_event($event, $packet, $session); 657 } 658 else { 659 PTDEBUG && _d('Looks like an error packet but client is not ' 660 . 'authenticating and session has no cmd'); 661 } 662 } 663 elsif ( $first_byte eq 'fe' && $packet->{mysql_data_len} < 9 ) { 664 # EOF packet 665 if ( $packet->{mysql_data_len} == 1 666 && $session->{state} eq 'client_auth' 667 && $packet->{number} == 2 ) 668 { 669 PTDEBUG && _d('Server has old password table;', 670 'client will resend password using old algorithm'); 671 $session->{state} = 'client_auth_resend'; 672 } 673 else { 674 PTDEBUG && _d('Got an EOF packet'); 675 $self->fail_session($session, 'got an unexpected EOF packet'); 676 # ^^^ We shouldn't reach this because EOF should come after a 677 # header, field, or row data packet; and we should be firing the 678 # event and returning when we see that. See SVN history for some 679 # good stuff we could do if we wanted to handle EOF packets. 680 } 681 } 682 else { 683 # Since we do NOT always have all the data the server sent to the 684 # client, we can't always do any processing of results. So when 685 # we get one of these, we just fire the event even if the query 686 # is not done. This means we will NOT process EOF packets 687 # themselves (see above). 688 if ( $session->{cmd} ) { 689 PTDEBUG && _d('Got a row/field/result packet'); 690 my $com = $session->{cmd}->{cmd}; 691 PTDEBUG && _d('Responding to client', $com_for{$com}); 692 my $event = { ts => $packet->{ts} }; 693 if ( $com eq COM_QUERY || $com eq COM_STMT_EXECUTE ) { 694 $event->{cmd} = 'Query'; 695 $event->{arg} = $session->{cmd}->{arg}; 696 } 697 else { 698 $event->{arg} = 'administrator command: ' 699 . ucfirst(lc(substr($com_for{$com}, 4))); 700 $event->{cmd} = 'Admin'; 701 } 702 703 # We DID get all the data in the packet. 704 if ( $packet->{complete} ) { 705 # Look to see if the end of the data appears to be an EOF 706 # packet. 707 my ( $warning_count, $status_flags ) 708 = $data =~ m/fe(.{4})(.{4})\Z/; 709 if ( $warning_count ) { 710 $event->{Warnings} = to_num($warning_count); 711 my $flags = to_num($status_flags); # TODO set all flags? 712 $event->{No_good_index_used} 713 = $flags & SERVER_QUERY_NO_GOOD_INDEX_USED ? 1 : 0; 714 $event->{No_index_used} 715 = $flags & SERVER_QUERY_NO_INDEX_USED ? 1 : 0; 716 } 717 } 718 719 return $self->_make_event($event, $packet, $session); 720 } 721 else { 722 PTDEBUG && _d('Unknown in-stream server response'); 723 } 724 } 725 } 726 727 return; 728} 729 730# Handles a packet from the client given the state of the session. 731# The client doesn't send a wide and exotic array of packets like 732# the server. Even so, we're only interested in: 733# * Users and dbs from connection handshake packets 734# * SQL statements from COM_QUERY commands 735# Anything else is ignored. Returns an event if one was ready to be 736# created, otherwise returns nothing. 737sub _packet_from_client { 738 my ( $self, $packet, $session, $misc ) = @_; 739 die "I need a packet" unless $packet; 740 die "I need a session" unless $session; 741 742 PTDEBUG && _d('Packet is from client; state:', $session->{state}); 743 744 if ( ($session->{client_seq} || '') eq $packet->{seq} ) { 745 push @{ $session->{client_retransmissions} }, $packet->{seq}; 746 PTDEBUG && _d('TCP retransmission'); 747 return; 748 } 749 $session->{client_seq} = $packet->{seq}; 750 751 my $data = $packet->{data}; 752 my $ts = $packet->{ts}; 753 754 if ( ($session->{state} || '') eq 'server_handshake' ) { 755 PTDEBUG && _d('Expecting client authentication packet'); 756 # The connection is a 3-way handshake: 757 # server > client (protocol version, thread id, etc.) 758 # client > server (user, pass, default db, etc.) 759 # server > client OK if login succeeds 760 # pos_in_log refers to 2nd handshake from the client. 761 # A connection is logged even if the client fails to 762 # login (bad password, etc.). 763 my $handshake = parse_client_handshake_packet($data); 764 if ( !$handshake ) { 765 $self->fail_session($session, 'failed to parse client handshake'); 766 return; 767 } 768 $session->{state} = 'client_auth'; 769 $session->{pos_in_log} = $packet->{pos_in_log}; 770 $session->{user} = $handshake->{user}; 771 $session->{db} = $handshake->{db}; 772 773 # $session->{will_compress} will become $session->{compress} when 774 # the server's final handshake packet is received. This prevents 775 # parse_packet() from trying to decompress that final packet. 776 # Compressed packets can only begin after the full handshake is done. 777 $session->{will_compress} = $handshake->{flags}->{CLIENT_COMPRESS}; 778 } 779 elsif ( ($session->{state} || '') eq 'client_auth_resend' ) { 780 # Don't know how to parse this packet. 781 PTDEBUG && _d('Client resending password using old algorithm'); 782 $session->{state} = 'client_auth'; 783 } 784 elsif ( ($session->{state} || '') eq 'awaiting_reply' ) { 785 my $arg = $session->{cmd}->{arg} ? substr($session->{cmd}->{arg}, 0, 50) 786 : 'unknown'; 787 PTDEBUG && _d('More data for previous command:', $arg, '...'); 788 return; 789 } 790 else { 791 # Otherwise, it should be a query if its the first packet (number 0). 792 # We ignore the commands that take arguments (COM_CHANGE_USER, 793 # COM_PROCESS_KILL). 794 if ( $packet->{number} != 0 ) { 795 $self->fail_session($session, 'client cmd not packet 0'); 796 return; 797 } 798 799 # Detect compression in-stream only if $session->{compress} is 800 # not defined. This means we didn't see the client handshake. 801 # If we had seen it, $session->{compress} would be defined as 0 or 1. 802 if ( !defined $session->{compress} ) { 803 return unless $self->detect_compression($packet, $session); 804 $data = $packet->{data}; 805 } 806 807 my $com = parse_com_packet($data, $packet->{mysql_data_len}); 808 if ( !$com ) { 809 $self->fail_session($session, 'failed to parse COM packet'); 810 return; 811 } 812 813 if ( $com->{code} eq COM_STMT_EXECUTE ) { 814 PTDEBUG && _d('Execute prepared statement'); 815 my $exec = parse_execute_packet($com->{data}, $session->{sths}); 816 if ( !$exec ) { 817 # This does not signal a failure, it could just be that 818 # the statement handle ID is unknown. 819 PTDEBUG && _d('Failed to parse execute packet'); 820 $session->{state} = undef; 821 return; 822 } 823 $com->{data} = $exec->{arg}; 824 $session->{attribs}->{Statement_id} = $exec->{sth_id}; 825 } 826 elsif ( $com->{code} eq COM_STMT_RESET ) { 827 my $sth_id = get_sth_id($com->{data}); 828 if ( !$sth_id ) { 829 $self->fail_session($session, 830 'failed to parse prepared statement reset packet'); 831 return; 832 } 833 $com->{data} = "RESET $sth_id"; 834 $session->{attribs}->{Statement_id} = $sth_id; 835 } 836 837 $session->{state} = 'awaiting_reply'; 838 $session->{pos_in_log} = $packet->{pos_in_log}; 839 $session->{ts} = $ts; 840 $session->{cmd} = { 841 cmd => $com->{code}, 842 arg => $com->{data}, 843 }; 844 845 if ( $com->{code} eq COM_QUIT ) { # Fire right away; will cleanup later. 846 PTDEBUG && _d('Got a COM_QUIT'); 847 848 # See http://code.google.com/p/maatkit/issues/detail?id=794 849 $session->{closed} = 1; # delete session when done 850 851 return $self->_make_event( 852 { cmd => 'Admin', 853 arg => 'administrator command: Quit', 854 ts => $ts, 855 }, 856 $packet, $session 857 ); 858 } 859 elsif ( $com->{code} eq COM_STMT_CLOSE ) { 860 # Apparently, these are not acknowledged by the server. 861 my $sth_id = get_sth_id($com->{data}); 862 if ( !$sth_id ) { 863 $self->fail_session($session, 864 'failed to parse prepared statement close packet'); 865 return; 866 } 867 delete $session->{sths}->{$sth_id}; 868 return $self->_make_event( 869 { cmd => 'Query', 870 arg => "DEALLOCATE PREPARE $sth_id", 871 ts => $ts, 872 }, 873 $packet, $session 874 ); 875 } 876 } 877 878 return; 879} 880 881# Make and return an event from the given packet and session. 882sub _make_event { 883 my ( $self, $event, $packet, $session ) = @_; 884 PTDEBUG && _d('Making event'); 885 886 # Clear packets that preceded this event. 887 $session->{raw_packets} = []; 888 $self->_delete_buff($session); 889 890 if ( !$session->{thread_id} ) { 891 # Only the server handshake packet gives the thread id, so for 892 # sessions caught mid-stream we assign a fake thread id. 893 PTDEBUG && _d('Giving session fake thread id', $self->{fake_thread_id}); 894 $session->{thread_id} = $self->{fake_thread_id}++; 895 } 896 897 my ($host, $port) = $session->{client} =~ m/((?:\d+\.){3}\d+)\:(\w+)/; 898 my $new_event = { 899 cmd => $event->{cmd}, 900 arg => $event->{arg}, 901 bytes => length( $event->{arg} ), 902 ts => tcp_timestamp( $event->{ts} ), 903 host => $host, 904 ip => $host, 905 port => $port, 906 db => $session->{db}, 907 user => $session->{user}, 908 Thread_id => $session->{thread_id}, 909 pos_in_log => $session->{pos_in_log}, 910 Query_time => timestamp_diff($session->{ts}, $packet->{ts}), 911 Rows_affected => ($event->{Rows_affected} || 0), 912 Warning_count => ($event->{Warning_count} || 0), 913 No_good_index_used => ($event->{No_good_index_used} ? 'Yes' : 'No'), 914 No_index_used => ($event->{No_index_used} ? 'Yes' : 'No'), 915 }; 916 @{$new_event}{keys %{$session->{attribs}}} = values %{$session->{attribs}}; 917 # https://bugs.launchpad.net/percona-toolkit/+bug/823411 918 foreach my $opt_attrib ( qw(Error_no) ) { 919 if ( defined $event->{$opt_attrib} ) { 920 $new_event->{$opt_attrib} = $event->{$opt_attrib}; 921 } 922 } 923 PTDEBUG && _d('Properties of event:', Dumper($new_event)); 924 925 # Delete cmd to prevent re-making the same event if the 926 # server sends extra stuff that looks like a result set, etc. 927 delete $session->{cmd}; 928 929 # Undef the session state so that we ignore everything from 930 # the server and wait until the client says something again. 931 $session->{state} = undef; 932 933 # Clear the attribs for this event. 934 $session->{attribs} = {}; 935 936 $session->{n_queries}++; 937 $session->{server_retransmissions} = []; 938 $session->{client_retransmissions} = []; 939 940 return $new_event; 941} 942 943# Extracts a slow-log-formatted timestamp from the tcpdump timestamp format. 944sub tcp_timestamp { 945 my ( $ts ) = @_; 946 $ts =~ s/^\d\d(\d\d)-(\d\d)-(\d\d)/$1$2$3/; 947 return $ts; 948} 949 950# Returns the difference between two tcpdump timestamps. 951sub timestamp_diff { 952 my ( $start, $end ) = @_; 953 my $sd = substr($start, 0, 11, ''); 954 my $ed = substr($end, 0, 11, ''); 955 my ( $sh, $sm, $ss ) = split(/:/, $start); 956 my ( $eh, $em, $es ) = split(/:/, $end); 957 my $esecs = ($eh * 3600 + $em * 60 + $es); 958 my $ssecs = ($sh * 3600 + $sm * 60 + $ss); 959 if ( $sd eq $ed ) { 960 return sprintf '%.6f', $esecs - $ssecs; 961 } 962 else { # Assume only one day boundary has been crossed, no DST, etc 963 return sprintf '%.6f', ( 86_400 - $ssecs ) + $esecs; 964 } 965} 966 967# Converts hexadecimal to string. 968sub to_string { 969 my ( $data ) = @_; 970 return pack('H*', $data); 971} 972 973sub unpack_string { 974 my ( $data ) = @_; 975 my $len = 0; 976 my $encode_len = 0; 977 ($data, $len, $encode_len) = decode_len($data); 978 my $t = 'H' . ($len ? $len * 2 : '*'); 979 $data = pack($t, $data); 980 return "\"$data\"", $encode_len + $len; 981} 982 983sub decode_len { 984 my ( $data ) = @_; 985 return unless $data; 986 987 # first byte hex len 988 # ========== ==== ============= 989 # 0-251 0-FB Same 990 # 252 FC Len in next 2 991 # 253 FD Len in next 4 992 # 254 FE Len in next 8 993 my $first_byte = to_num(substr($data, 0, 2, '')); 994 995 my $len; 996 my $encode_len; 997 if ( $first_byte <= 251 ) { 998 $len = $first_byte; 999 $encode_len = 1; 1000 } 1001 elsif ( $first_byte == 252 ) { 1002 $len = to_num(substr($data, 4, '')); 1003 $encode_len = 2; 1004 } 1005 elsif ( $first_byte == 253 ) { 1006 $len = to_num(substr($data, 6, '')); 1007 $encode_len = 3; 1008 } 1009 elsif ( $first_byte == 254 ) { 1010 $len = to_num(substr($data, 16, '')); 1011 $encode_len = 8; 1012 } 1013 else { 1014 # This shouldn't happen, but it may if we're passed data 1015 # that isn't length encoded. 1016 PTDEBUG && _d('data:', $data, 'first byte:', $first_byte); 1017 die "Invalid length encoded byte: $first_byte"; 1018 } 1019 1020 PTDEBUG && _d('len:', $len, 'encode len', $encode_len); 1021 return $data, $len, $encode_len; 1022} 1023 1024# All numbers are stored with the least significant byte first in the MySQL 1025# protocol (little endian). 1026# This function converts from little endian to big endian 1027sub to_num { 1028 my ( $str, $len ) = @_; 1029 if ( $len ) { 1030 $str = substr($str, 0, $len * 2); 1031 } 1032 my @bytes = $str =~ m/(..)/g; 1033 my $result = 0; 1034 foreach my $i ( 0 .. $#bytes ) { 1035 $result += hex($bytes[$i]) * (16 ** ($i * 2)); 1036 } 1037 return $result; 1038} 1039 1040sub to_double { 1041 my ( $str ) = @_; 1042 return unpack('d', pack('H*', $str)); 1043} 1044 1045# Accepts a reference to a string, which it will modify. Extracts a 1046# length-coded binary off the front of the string and returns that value as an 1047# integer. 1048sub get_lcb { 1049 my ( $string ) = @_; 1050 my $first_byte = hex(substr($$string, 0, 2, '')); 1051 if ( $first_byte < 251 ) { 1052 return $first_byte; 1053 } 1054 elsif ( $first_byte == 252 ) { 1055 return to_num(substr($$string, 0, 4, '')); 1056 } 1057 elsif ( $first_byte == 253 ) { 1058 return to_num(substr($$string, 0, 6, '')); 1059 } 1060 elsif ( $first_byte == 254 ) { 1061 return to_num(substr($$string, 0, 16, '')); 1062 } 1063} 1064 1065# Error packet structure: 1066# Offset Bytes Field 1067# ====== ================= ==================================== 1068# 00 00 00 01 MySQL proto header (already removed) 1069# ff Error (already removed) 1070# 0 00 00 Error number 1071# 4 23 SQL state marker, always '#' 1072# 6 00 00 00 00 00 SQL state 1073# 16 00 ... Error message 1074# The sqlstate marker and actual sqlstate are combined into one value. 1075sub parse_error_packet { 1076 my ( $data ) = @_; 1077 return unless $data; 1078 PTDEBUG && _d('ERROR data:', $data); 1079 if ( length $data < 16 ) { 1080 PTDEBUG && _d('Error packet is too short:', $data); 1081 return; 1082 } 1083 my $errno = to_num(substr($data, 0, 4)); 1084 my $marker = to_string(substr($data, 4, 2)); 1085 my $sqlstate = ''; 1086 my $message = ''; 1087 if ( $marker eq '#' ) { 1088 $sqlstate = to_string(substr($data, 6, 10)); 1089 $message = to_string(substr($data, 16)); 1090 } 1091 else { 1092 $marker = ''; 1093 $message = to_string(substr($data, 4)); 1094 } 1095 return unless $message; 1096 my $pkt = { 1097 errno => $errno, 1098 sqlstate => $marker . $sqlstate, 1099 message => $message, 1100 }; 1101 PTDEBUG && _d('Error packet:', Dumper($pkt)); 1102 return $pkt; 1103} 1104 1105# OK packet structure: 1106# Bytes Field 1107# =========== ==================================== 1108# 00 00 00 01 MySQL proto header (already removed) 1109# 00 OK/Field count (already removed) 1110# 1-9 Affected rows (LCB) 1111# 1-9 Insert ID (LCB) 1112# 00 00 Server status 1113# 00 00 Warning count 1114# 00 ... Message (optional) 1115sub parse_ok_packet { 1116 my ( $data ) = @_; 1117 return unless $data; 1118 PTDEBUG && _d('OK data:', $data); 1119 if ( length $data < 12 ) { 1120 PTDEBUG && _d('OK packet is too short:', $data); 1121 return; 1122 } 1123 my $affected_rows = get_lcb(\$data); 1124 my $insert_id = get_lcb(\$data); 1125 my $status = to_num(substr($data, 0, 4, '')); 1126 my $warnings = to_num(substr($data, 0, 4, '')); 1127 my $message = to_string($data); 1128 # Note: $message is discarded. It might be something like 1129 # Records: 2 Duplicates: 0 Warnings: 0 1130 my $pkt = { 1131 affected_rows => $affected_rows, 1132 insert_id => $insert_id, 1133 status => $status, 1134 warnings => $warnings, 1135 message => $message, 1136 }; 1137 PTDEBUG && _d('OK packet:', Dumper($pkt)); 1138 return $pkt; 1139} 1140 1141# OK prepared statement packet structure: 1142# Bytes Field 1143# =========== ==================================== 1144# 00 OK (already removed) 1145# 00 00 00 00 Statement handler ID 1146# 00 00 Number of columns in result set 1147# 00 00 Number of parameters (?) in query 1148sub parse_ok_prepared_statement_packet { 1149 my ( $data ) = @_; 1150 return unless $data; 1151 PTDEBUG && _d('OK prepared statement data:', $data); 1152 if ( length $data < 8 ) { 1153 PTDEBUG && _d('OK prepared statement packet is too short:', $data); 1154 return; 1155 } 1156 my $sth_id = to_num(substr($data, 0, 8, '')); 1157 my $num_cols = to_num(substr($data, 0, 4, '')); 1158 my $num_params = to_num(substr($data, 0, 4, '')); 1159 my $pkt = { 1160 sth_id => $sth_id, 1161 num_cols => $num_cols, 1162 num_params => $num_params, 1163 }; 1164 PTDEBUG && _d('OK prepared packet:', Dumper($pkt)); 1165 return $pkt; 1166} 1167 1168# Currently we only capture and return the thread id. 1169sub parse_server_handshake_packet { 1170 my ( $data ) = @_; 1171 return unless $data; 1172 PTDEBUG && _d('Server handshake data:', $data); 1173 my $handshake_pattern = qr{ 1174 # Bytes Name 1175 ^ # ----- ---- 1176 (.+?)00 # n Null-Term String server_version 1177 (.{8}) # 4 thread_id 1178 .{16} # 8 scramble_buff 1179 .{2} # 1 filler: always 0x00 1180 (.{4}) # 2 server_capabilities 1181 .{2} # 1 server_language 1182 .{4} # 2 server_status 1183 .{26} # 13 filler: always 0x00 1184 # 13 rest of scramble_buff 1185 }x; 1186 my ( $server_version, $thread_id, $flags ) = $data =~ m/$handshake_pattern/; 1187 my $pkt = { 1188 server_version => to_string($server_version), 1189 thread_id => to_num($thread_id), 1190 flags => parse_flags($flags), 1191 }; 1192 PTDEBUG && _d('Server handshake packet:', Dumper($pkt)); 1193 return $pkt; 1194} 1195 1196# Currently we only capture and return the user and default database. 1197sub parse_client_handshake_packet { 1198 my ( $data ) = @_; 1199 return unless $data; 1200 PTDEBUG && _d('Client handshake data:', $data); 1201 my ( $flags, $user, $buff_len ) = $data =~ m{ 1202 ^ 1203 (.{8}) # Client flags 1204 .{10} # Max packet size, charset 1205 (?:00){23} # Filler 1206 ((?:..)+?)00 # Null-terminated user name 1207 (..) # Length-coding byte for scramble buff 1208 }x; 1209 1210 # This packet is easy to detect because it's the only case where 1211 # the server sends the client a packet first (its handshake) and 1212 # then the client only and ever sends back its handshake. 1213 if ( !$buff_len ) { 1214 PTDEBUG && _d('Did not match client handshake packet'); 1215 return; 1216 } 1217 1218 my $code_len = hex($buff_len); 1219 my $db; 1220 1221 # Only try to get the db if CLIENT_CONNECT_WITH_DB flag is set 1222 # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse41 1223 my $capability_flags = to_num($flags); # $flags is stored as little endian. 1224 1225 if ($capability_flags & $flag_for{CLIENT_CONNECT_WITH_DB}) { 1226 ( $db ) = $data =~ m! 1227 ^.{64}${user}00.. # Everything matched before 1228 (?:..){$code_len} # The scramble buffer 1229 (.*?)00.*\Z # The database name 1230 !x; 1231 } 1232 1233 my $pkt = { 1234 user => to_string($user), 1235 db => $db ? to_string($db) : '', 1236 flags => parse_flags($flags), 1237 }; 1238 PTDEBUG && _d('Client handshake packet:', Dumper($pkt)); 1239 return $pkt; 1240} 1241 1242# COM data is not 00-terminated, but the the MySQL client appends \0, 1243# so we have to use the packet length to know where the data ends. 1244sub parse_com_packet { 1245 my ( $data, $len ) = @_; 1246 return unless $data && $len; 1247 PTDEBUG && _d('COM data:', 1248 (substr($data, 0, 100).(length $data > 100 ? '...' : '')), 1249 'len:', $len); 1250 my $code = substr($data, 0, 2); 1251 my $com = $com_for{$code}; 1252 if ( !$com ) { 1253 PTDEBUG && _d('Did not match COM packet'); 1254 return; 1255 } 1256 if ( $code ne COM_STMT_EXECUTE 1257 && $code ne COM_STMT_CLOSE 1258 && $code ne COM_STMT_RESET ) 1259 { 1260 # Data for the most common COM, e.g. COM_QUERY, is text. 1261 # COM_STMT_EXECUTE is not, so we leave it binary; it can 1262 # be parsed by parse_execute_packet(). 1263 $data = to_string(substr($data, 2, ($len - 1) * 2)); 1264 } 1265 my $pkt = { 1266 code => $code, 1267 com => $com, 1268 data => $data, 1269 }; 1270 PTDEBUG && _d('COM packet:', Dumper($pkt)); 1271 return $pkt; 1272} 1273 1274# Execute prepared statement packet structure: 1275# Bytes Field 1276# =========== ======================================== 1277# 00 Code 17, COM_STMT_EXECUTE 1278# 00 00 00 00 Statement handler ID 1279# 00 flags 1280# 00 00 00 00 Iteration count (reserved, always 1) 1281# (param_count+7)/8 NULL bitmap 1282# 00 1 if new parameters, else 0 1283# n*2 Parameter types (only if new parameters) 1284sub parse_execute_packet { 1285 my ( $data, $sths ) = @_; 1286 return unless $data && $sths; 1287 1288 my $sth_id = to_num(substr($data, 2, 8)); 1289 return unless defined $sth_id; 1290 1291 my $sth = $sths->{$sth_id}; 1292 if ( !$sth ) { 1293 PTDEBUG && _d('Skipping unknown statement handle', $sth_id); 1294 return; 1295 } 1296 my $null_count = int(($sth->{num_params} + 7) / 8) || 1; 1297 my $null_bitmap = to_num(substr($data, 20, $null_count * 2)); 1298 PTDEBUG && _d('NULL bitmap:', $null_bitmap, 'count:', $null_count); 1299 1300 # This chops off everything up to the byte for new params. 1301 substr($data, 0, 20 + ($null_count * 2), ''); 1302 1303 my $new_params = to_num(substr($data, 0, 2, '')); 1304 my @types; 1305 if ( $new_params ) { 1306 PTDEBUG && _d('New param types'); 1307 # It seems all params are type 254, MYSQL_TYPE_STRING. Perhaps 1308 # this depends on the client. If we ever need these types, they 1309 # can be saved here. Otherwise for now I just want to see the 1310 # types in debug output. 1311 for my $i ( 0..($sth->{num_params}-1) ) { 1312 my $type = to_num(substr($data, 0, 4, '')); 1313 push @types, $type_for{$type}; 1314 PTDEBUG && _d('Param', $i, 'type:', $type, $type_for{$type}); 1315 } 1316 $sth->{types} = \@types; 1317 } 1318 else { 1319 # Retrieve previous param types if there are param vals (data). 1320 @types = @{$sth->{types}} if $data; 1321 } 1322 1323 # $data should now be truncated up to the parameter values. 1324 1325 my $arg = $sth->{statement}; 1326 PTDEBUG && _d('Statement:', $arg); 1327 for my $i ( 0..($sth->{num_params}-1) ) { 1328 my $val; 1329 my $len; # in bytes 1330 if ( $null_bitmap & (2**$i) ) { 1331 PTDEBUG && _d('Param', $i, 'is NULL (bitmap)'); 1332 $val = 'NULL'; 1333 $len = 0; 1334 } 1335 else { 1336 if ( $unpack_type{$types[$i]} ) { 1337 ($val, $len) = $unpack_type{$types[$i]}->($data); 1338 } 1339 else { 1340 # TODO: this is probably going to break parsing other param vals 1341 PTDEBUG && _d('No handler for param', $i, 'type', $types[$i]); 1342 $val = '?'; 1343 $len = 0; 1344 } 1345 } 1346 1347 # Replace ? in prepared statement with value. 1348 PTDEBUG && _d('Param', $i, 'val:', $val); 1349 $arg =~ s/\?/$val/; 1350 1351 # Remove this param val from the data, putting us at the next one. 1352 substr($data, 0, $len * 2, '') if $len; 1353 } 1354 1355 my $pkt = { 1356 sth_id => $sth_id, 1357 arg => "EXECUTE $arg", 1358 }; 1359 PTDEBUG && _d('Execute packet:', Dumper($pkt)); 1360 return $pkt; 1361} 1362 1363sub get_sth_id { 1364 my ( $data ) = @_; 1365 return unless $data; 1366 my $sth_id = to_num(substr($data, 2, 8)); 1367 return $sth_id; 1368} 1369 1370sub parse_flags { 1371 my ( $flags ) = @_; 1372 die "I need flags" unless $flags; 1373 PTDEBUG && _d('Flag data:', $flags); 1374 my %flags = %flag_for; 1375 my $flags_dec = to_num($flags); 1376 foreach my $flag ( keys %flag_for ) { 1377 my $flagno = $flag_for{$flag}; 1378 $flags{$flag} = ($flags_dec & $flagno ? 1 : 0); 1379 } 1380 return \%flags; 1381} 1382 1383# Takes a scalarref to a hex string of compressed data. 1384# Returns a scalarref to a hex string of the uncompressed data. 1385# The given hex string of compressed data is not modified. 1386sub uncompress_data { 1387 my ( $data, $len ) = @_; 1388 die "I need data" unless $data; 1389 die "I need a len argument" unless $len; 1390 die "I need a scalar reference to data" unless ref $data eq 'SCALAR'; 1391 PTDEBUG && _d('Uncompressing data'); 1392 our $InflateError; 1393 1394 # Pack hex string into compressed binary data. 1395 my $comp_bin_data = pack('H*', $$data); 1396 1397 # Uncompress the compressed binary data. 1398 my $uncomp_bin_data = ''; 1399 my $z = new IO::Uncompress::Inflate( 1400 \$comp_bin_data 1401 ) or die "IO::Uncompress::Inflate failed: $InflateError"; 1402 my $status = $z->read(\$uncomp_bin_data, $len) 1403 or die "IO::Uncompress::Inflate failed: $InflateError"; 1404 1405 # Unpack the uncompressed binary data back into a hex string. 1406 # This is the original MySQL packet(s). 1407 my $uncomp_data = unpack('H*', $uncomp_bin_data); 1408 1409 return \$uncomp_data; 1410} 1411 1412# Returns 1 on success or 0 on failure. Failure is probably 1413# detecting compression but not being able to uncompress 1414# (uncompress_packet() returns 0). 1415sub detect_compression { 1416 my ( $self, $packet, $session ) = @_; 1417 PTDEBUG && _d('Checking for client compression'); 1418 # This is a necessary hack for detecting compression in-stream without 1419 # having seen the client handshake and CLIENT_COMPRESS flag. If the 1420 # client is compressing packets, there will be an extra 7 bytes before 1421 # the regular MySQL header. For short COM_QUERY commands, these 7 bytes 1422 # are usually zero where we'd expect to see 03 for COM_QUERY. So if we 1423 # parse this packet and it looks like a COM_SLEEP (00) which is not a 1424 # command that the client can send, then chances are the client is using 1425 # compression. 1426 my $com = parse_com_packet($packet->{data}, $packet->{mysql_data_len}); 1427 if ( $com && $com->{code} eq COM_SLEEP ) { 1428 PTDEBUG && _d('Client is using compression'); 1429 $session->{compress} = 1; 1430 1431 # Since parse_packet() didn't know the packet was compressed, it 1432 # called remove_mysql_header() which removed the first 4 of 7 bytes 1433 # of the compression header. We must restore these 4 bytes, then 1434 # uncompress and remove the MySQL header. We only do this once. 1435 $packet->{data} = $packet->{mysql_hdr} . $packet->{data}; 1436 return 0 unless $self->uncompress_packet($packet, $session); 1437 remove_mysql_header($packet); 1438 } 1439 else { 1440 PTDEBUG && _d('Client is NOT using compression'); 1441 $session->{compress} = 0; 1442 } 1443 return 1; 1444} 1445 1446# Returns 1 if the packet was uncompressed or 0 if we can't uncompress. 1447# Failure is usually due to IO::Uncompress not being available. 1448sub uncompress_packet { 1449 my ( $self, $packet, $session ) = @_; 1450 die "I need a packet" unless $packet; 1451 die "I need a session" unless $session; 1452 1453 # From the doc: "A compressed packet header is: 1454 # packet length (3 bytes), 1455 # packet number (1 byte), 1456 # and Uncompressed Packet Length (3 bytes). 1457 # The Uncompressed Packet Length is the number of bytes 1458 # in the original, uncompressed packet. If this is zero 1459 # then the data is not compressed." 1460 1461 my $data; 1462 my $comp_hdr; 1463 my $comp_data_len; 1464 my $pkt_num; 1465 my $uncomp_data_len; 1466 eval { 1467 $data = \$packet->{data}; 1468 $comp_hdr = substr($$data, 0, 14, ''); 1469 $comp_data_len = to_num(substr($comp_hdr, 0, 6)); 1470 $pkt_num = to_num(substr($comp_hdr, 6, 2)); 1471 $uncomp_data_len = to_num(substr($comp_hdr, 8, 6)); 1472 PTDEBUG && _d('Compression header data:', $comp_hdr, 1473 'compressed data len (bytes)', $comp_data_len, 1474 'number', $pkt_num, 1475 'uncompressed data len (bytes)', $uncomp_data_len); 1476 }; 1477 if ( $EVAL_ERROR ) { 1478 $session->{EVAL_ERROR} = $EVAL_ERROR; 1479 $self->fail_session($session, 'failed to parse compression header'); 1480 return 0; 1481 } 1482 1483 if ( $uncomp_data_len ) { 1484 eval { 1485 $data = uncompress_data($data, $uncomp_data_len); 1486 $packet->{data} = $$data; 1487 }; 1488 if ( $EVAL_ERROR ) { 1489 $session->{EVAL_ERROR} = $EVAL_ERROR; 1490 $self->fail_session($session, 'failed to uncompress data'); 1491 die "Cannot uncompress packet. Check that IO::Uncompress::Inflate " 1492 . "is installed.\nError: $EVAL_ERROR"; 1493 } 1494 } 1495 else { 1496 PTDEBUG && _d('Packet is not really compressed'); 1497 $packet->{data} = $$data; 1498 } 1499 1500 return 1; 1501} 1502 1503# Removes the first 4 bytes of the packet data which should be 1504# a MySQL header: 3 bytes packet length, 1 byte packet number. 1505sub remove_mysql_header { 1506 my ( $packet ) = @_; 1507 die "I need a packet" unless $packet; 1508 1509 # NOTE: the data is modified by the inmost substr call here! If we 1510 # had all the data in the TCP packets, we could change this to a while 1511 # loop; while get-a-packet-from-$data, do stuff, etc. But we don't, 1512 # and we don't want to either. 1513 my $mysql_hdr = substr($packet->{data}, 0, 8, ''); 1514 my $mysql_data_len = to_num(substr($mysql_hdr, 0, 6)); 1515 my $pkt_num = to_num(substr($mysql_hdr, 6, 2)); 1516 PTDEBUG && _d('MySQL packet: header data', $mysql_hdr, 1517 'data len (bytes)', $mysql_data_len, 'number', $pkt_num); 1518 1519 $packet->{mysql_hdr} = $mysql_hdr; 1520 $packet->{mysql_data_len} = $mysql_data_len; 1521 $packet->{number} = $pkt_num; 1522 1523 return; 1524} 1525 1526# Delete anything we added to the session related to 1527# buffering a large query received in multiple packets. 1528sub _delete_buff { 1529 my ( $self, $session ) = @_; 1530 map { delete $session->{$_} } qw(buff buff_left mysql_data_len); 1531 return; 1532} 1533 1534sub _d { 1535 my ($package, undef, $line) = caller 0; 1536 @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } 1537 map { defined $_ ? $_ : 'undef' } 1538 @_; 1539 print STDERR "# $package:$line $PID ", join(' ', @_), "\n"; 1540} 1541 15421; 1543} 1544# ########################################################################### 1545# End MySQLProtocolParser package 1546# ########################################################################### 1547