# This program is copyright 2007-2011 Baron Schwartz, 2011-2012 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA  02111-1307  USA.
# ###########################################################################
# MasterSlave package
# ###########################################################################
{
# Package: MasterSlave
# MasterSlave handles common tasks related to master-slave setups.
package MasterSlave;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;

# Sub: check_recursion_method
#   Check that the arrayref of recursion methods passed in is valid.
#   Dies with a user-facing message if it is not.
#
# Parameters:
#   $methods - Arrayref of recursion method strings
#
# Accepted values:
#   * exactly one of processlist, hosts, none, cluster, or dsn=<DSN>
#   * several elements, if they are all processlist/hosts, or if the first
#     element starts with dsn= (get_slaves() rejoins such elements with
#     commas)
#   * an empty list is not rejected
sub check_recursion_method {
   my ($methods) = @_;
   if ( @$methods != 1 ) {
      if ( grep({ !m/processlist|hosts/i } @$methods)
           && $methods->[0] !~ /^dsn=/i )
      {
         die "Invalid combination of recursion methods: "
            . join(", ", map { defined($_) ? $_ : 'undef' } @$methods) . ". "
            . "Only hosts and processlist may be combined.\n"
      }
   }
   else {
      my ($method) = @$methods;
      die "Invalid recursion method: " . ( $method || 'undef' )
         unless $method && $method =~ m/^(?:processlist$|hosts$|none$|cluster$|dsn=)/i;
   }
}

# Sub: new
#   Create a MasterSlave object.
#
# Required Arguments:
#   OptionParser - <OptionParser> object
#   DSNParser    - <DSNParser> object
#   Quoter       - <Quoter> object
#
# Optional Arguments:
#   channel - Replication channel name, read by <get_slave_status()> and
#             <wait_for_master()>
#
# Returns:
#   MasterSlave object
sub new {
   my ( $class, %args ) = @_;
   my @required_args = qw(OptionParser DSNParser Quoter);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $self = {
      %args,
      replication_thread => {},
   };
   return bless $self, $class;
}

# Sub: get_slaves
#   Find the slaves of a server using the resolved recursion method(s).
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   make_cxn - Callback that creates a connection object.  It is called with
#              (dsn => $dsn, dbh => $dbh) for processlist/hosts slaves, and
#              by <get_cxn_from_dsn_table()> for the dsn= method.
#
# Optional Arguments:
#   dbh - dbh of the starting server (required for processlist/hosts)
#   dsn - DSN of the starting server (required for processlist/hosts; also
#         used to resolve the default recursion method)
#
# Returns:
#   Arrayref of make_cxn results, one per slave found (empty for "none")
sub get_slaves {
   my ($self, %args) = @_;
   my @required_args = qw(make_cxn);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($make_cxn) = @args{@required_args};

   my $slaves  = [];
   my $dp      = $self->{DSNParser};
   my $methods = $self->_resolve_recursion_methods($args{dsn});

   return $slaves unless @$methods;

   if ( grep { m/processlist|hosts/i } @$methods ) {
      my @required_args = qw(dbh dsn);
      foreach my $arg ( @required_args ) {
         die "I need a $arg argument" unless $args{$arg};
      }
      my ($dbh, $dsn) = @args{@required_args};
      my $o = $self->{OptionParser};

      $self->recurse_to_slaves(
         {  dbh            => $dbh,
            dsn            => $dsn,
            slave_user     => $o->got('slave-user') ? $o->get('slave-user') : '',
            slave_password => $o->got('slave-password') ? $o->get('slave-password') : '',
            callback       => sub {
               my ( $dsn, $dbh, $level, $parent ) = @_;
               # Level 0 is the server we started from, not a slave.
               return unless $level;
               PTDEBUG && _d('Found slave:', $dp->as_string($dsn));
               my $slave_dsn = $dsn;
               if ($o->got('slave-user')) {
                  $slave_dsn->{u} = $o->get('slave-user');
                  PTDEBUG && _d("Using slave user ".$o->get('slave-user')." on ".$slave_dsn->{h}.":".$slave_dsn->{P});
               }
               if ($o->got('slave-password')) {
                  $slave_dsn->{p} = $o->get('slave-password');
                  PTDEBUG && _d("Slave password set");
               }
               push @$slaves, $make_cxn->(dsn => $slave_dsn, dbh => $dbh);
               return;
            },
         }
      );
   }
   elsif ( $methods->[0] =~ m/^dsn=/i ) {
      # Rejoin every element with commas (presumably a DSN containing commas
      # arrives split into several elements) and strip the dsn= prefix.
      (my $dsn_table_dsn = join ",", @$methods) =~ s/^dsn=//i;
      $slaves = $self->get_cxn_from_dsn_table(
         %args,
         dsn_table_dsn => $dsn_table_dsn,
      );
   }
   elsif ( $methods->[0] =~ m/none/i ) {
      PTDEBUG && _d('Not getting to slaves');
   }
   else {
      die "Unexpected recursion methods: @$methods";
   }

   return $slaves;
}

# Sub: _resolve_recursion_methods
#   Decide which recursion methods to use.  An explicitly given
#   recursion-method option wins; otherwise a DSN with a non-3306 port
#   forces the hosts method; otherwise the option's default is used.
#
# Parameters:
#   $dsn - DSN hashref of the starting server (optional)
#
# Returns:
#   Arrayref of recursion method strings
sub _resolve_recursion_methods {
   my ($self, $dsn) = @_;
   my $o = $self->{OptionParser};
   if ( $o->got('recursion-method') ) {
      return $o->get('recursion-method');
   }
   elsif ( $dsn && ($dsn->{P} || 3306) != 3306 ) {
      # Special case: hosts is best when port is non-standard.
      PTDEBUG && _d('Port number is non-standard; using only hosts method');
      return [qw(hosts)];
   }
   else {
      # Use the option's default.
      return $o->get('recursion-method');
   }
}

# Sub: recurse_to_slaves
#   Descend to slaves using the methods from <_resolve_recursion_methods()>.
#   The callback is called once for every server reached, including the
#   starting server at level 0, with the server's DSN, dbh, recursion level,
#   and parent DSN as args.  Each slave found is then visited by a recursive
#   call.
#
# Parameters:
#   $args  - Hashref of arguments
#   $level - Recursion level (default 0)
#
# Required Arguments:
#   dsn      - The DSN to connect to; if no dbh arg, connect using this.
#   callback - Code to execute for each server reached.
#
# Optional Arguments:
#   dbh             - dbh for dsn; if not given, one is created via DSNParser
#   recurse         - How many levels to recurse.  0 = none; if not given,
#                     the OptionParser's 'recurse' value is used
#                     (undef = infinite).
#   skip_callback   - Code to execute for servers that are skipped (no
#                     server ID, ID differs from what the master reported,
#                     or ID already seen).
#   slave_user      - User to connect to slaves as
#   slave_password  - Password to connect to slaves with
#   parent          - The DSN from which this call descended.
#   server_ids_seen - Hashref of visited server IDs; created on the first
#                     call and shared with recursive calls.
sub recurse_to_slaves {
   my ( $self, $args, $level ) = @_;
   $level ||= 0;
   my $dp             = $self->{DSNParser};
   # An explicit recurse => 0 means "do not recurse", so only fall back to
   # the option when the argument was not given at all.
   my $recurse        = defined $args->{recurse}
                      ? $args->{recurse}
                      : $self->{OptionParser}->get('recurse');
   my $dsn            = $args->{dsn};
   my $slave_user     = $args->{slave_user} || '';
   my $slave_password = $args->{slave_password} || '';

   my $methods = $self->_resolve_recursion_methods($dsn);
   PTDEBUG && _d('Recursion methods:', @$methods);
   if ( lc($methods->[0]) eq 'none' ) {
      PTDEBUG && _d('Not recursing to slaves');
      return;
   }

   # NOTE(review): $slave_dsn aliases $dsn, so slave_user/slave_password are
   # written into the caller's DSN hash, including the starting server's.
   my $slave_dsn = $dsn;
   if ($slave_user) {
      $slave_dsn->{u} = $slave_user;
      PTDEBUG && _d("Using slave user $slave_user on ".$slave_dsn->{h}.":".$slave_dsn->{P});
   }
   if ($slave_password) {
      $slave_dsn->{p} = $slave_password;
      PTDEBUG && _d("Slave password set");
   }

   my $dbh;
   eval {
      $dbh = $args->{dbh} || $dp->get_dbh(
         $dp->get_cxn_params($slave_dsn), { AutoCommit => 1 });
      PTDEBUG && _d('Connected to', $dp->as_string($slave_dsn));
   };
   if ( $EVAL_ERROR ) {
      print STDERR "Cannot connect to ", $dp->as_string($slave_dsn), "\n"
         or die "Cannot print: $OS_ERROR";
      return;
   }

   my $sql = 'SELECT @@SERVER_ID';
   PTDEBUG && _d($sql);
   my ($id) = $dbh->selectrow_array($sql);
   PTDEBUG && _d('Working on server ID', $id);
   my $master_thinks_i_am = $dsn->{server_id};
   if ( !defined $id
       || ( defined $master_thinks_i_am && $master_thinks_i_am != $id )
       || $args->{server_ids_seen}->{$id}++
   ) {
      PTDEBUG && _d('Server ID seen, or not what master said');
      if ( $args->{skip_callback} ) {
         $args->{skip_callback}->($dsn, $dbh, $level, $args->{parent});
      }
      return;
   }

   $args->{callback}->($dsn, $dbh, $level, $args->{parent});

   if ( !defined $recurse || $level < $recurse ) {

      my @slaves =
         grep { !$_->{master_id} || $_->{master_id} == $id } # Only my slaves.
         $self->find_slave_hosts($dp, $dbh, $dsn, $methods);

      foreach my $slave ( @slaves ) {
         PTDEBUG && _d('Recursing from',
            $dp->as_string($dsn), 'to', $dp->as_string($slave));
         $self->recurse_to_slaves(
            {  %$args,
               dsn            => $slave,
               dbh            => undef,
               parent         => $dsn,
               slave_user     => $slave_user,
               slave_password => $slave_password,
            },
            $level + 1,
         );
      }
   }
}

# Sub: find_slave_hosts
#   Find slave hosts by trying each of the given methods in order.
#   "processlist" looks in SHOW PROCESSLIST for threads doing a binlog dump;
#   "hosts" uses SHOW SLAVE HOSTS.  Searching stops as soon as a method
#   finds slaves.  (<_resolve_recursion_methods()> selects only the hosts
#   method when the port is non-standard, i.e. not 3306, because the port
#   reported by SHOW SLAVE HOSTS may then be important.)
#
# Parameters:
#   $dsn_parser - <DSNParser> object
#   $dbh        - dbh of the server whose slaves are wanted
#   $dsn        - DSN of that server, used as defaults for the slave DSNs
#   $methods    - Arrayref of methods to try, in order
#
# Returns:
#   A list of DSN hashes.  The 'source' key is either 'processlist' or
#   'hosts'; DSNs from the hosts method also carry server_id and master_id.
sub find_slave_hosts {
   my ( $self, $dsn_parser, $dbh, $dsn, $methods ) = @_;

   PTDEBUG && _d('Looking for slaves on', $dsn_parser->as_string($dsn),
      'using methods', @$methods);

   my @slaves;
   METHOD:
   foreach my $method ( @$methods ) {
      my $find_slaves = "_find_slaves_by_$method";
      PTDEBUG && _d('Finding slaves with', $find_slaves);
      @slaves = $self->$find_slaves($dsn_parser, $dbh, $dsn);
      last METHOD if @slaves;
   }

   PTDEBUG && _d('Found', scalar(@slaves), 'slaves');
   return @slaves;
}

# Find slaves from the processlist entries returned by get_connected_slaves().
sub _find_slaves_by_processlist {
   my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
   my @connected_slaves = $self->get_connected_slaves($dbh);
   my @slaves = $self->_process_slaves_list($dsn_parser, $dsn, \@connected_slaves);
   return @slaves;
}

# Turn processlist rows (lowercased keys) into slave DSNs.  The host is taken
# from the "host:port" Host column; rows whose Host has no ":port" suffix are
# skipped instead of dying on an undefined host.
sub _process_slaves_list {
   my ($self, $dsn_parser, $dsn, $connected_slaves) = @_;
   my @slaves = map {
      my $slave = $dsn_parser->parse("h=$_", $dsn);
      $slave->{source} = 'processlist';
      $slave;
   }
   grep { $_ }
   map {
      my ( $host ) = ($_->{host} || '') =~ m/^(.*):\d+$/;
      if ( defined $host ) {
         if ( $host eq 'localhost' ) {
            $host = '127.0.0.1'; # Replication never uses sockets.
         }
         if ( $host =~ m/::/ ) {
            # IPv6 address: bracket it so it can be used as a DSN host.
            $host = '['.$host.']';
         }
      }
      $host;
   } @$connected_slaves;

   return @slaves;
}

# SHOW SLAVE HOSTS is significantly less reliable.
# Machines tend to share the host list around with every machine in the
# replication hierarchy, but they don't update each other when machines
# disconnect or change to use a different master or something.  So there is
# lots of cruft in SHOW SLAVE HOSTS.
sub _find_slaves_by_hosts {
   my ( $self, $dsn_parser, $dbh, $dsn ) = @_;

   my @slaves;
   my $sql = 'SHOW SLAVE HOSTS';
   PTDEBUG && _d($dbh, $sql);
   @slaves = @{$dbh->selectall_arrayref($sql, { Slice => {} })};

   # Convert SHOW SLAVE HOSTS into DSN hashes.
   if ( @slaves ) {
      PTDEBUG && _d('Found some SHOW SLAVE HOSTS info');
      @slaves = map {
         my %hash;
         @hash{ map { lc $_ } keys %$_ } = values %$_;
         my $spec = "h=$hash{host},P=$hash{port}"
            . ( $hash{user} ? ",u=$hash{user}" : '')
            . ( $hash{password} ? ",p=$hash{password}" : '');
         my $slave_dsn = $dsn_parser->parse($spec, $dsn);
         $slave_dsn->{server_id} = $hash{server_id};
         $slave_dsn->{master_id} = $hash{master_id};
         $slave_dsn->{source}    = 'hosts';
         $slave_dsn;
      } @slaves;
   }

   return @slaves;
}

# Returns PROCESSLIST entries of connected slaves, normalized to lowercase
# column names.  Dies if the current user lacks the PROCESS privilege.
sub get_connected_slaves {
   my ( $self, $dbh ) = @_;

   # Check for the PROCESS privilege.
   my $show = "SHOW GRANTS FOR ";
   my $user = 'CURRENT_USER()';
   my $sql = $show . $user;
   PTDEBUG && _d($dbh, $sql);

   my $proc;
   eval {
      $proc = grep {
         m/ALL PRIVILEGES.*?\*\.\*|PROCESS/
      } @{$dbh->selectcol_arrayref($sql)};
   };
   if ( $EVAL_ERROR ) {

      if ( $EVAL_ERROR =~ m/no such grant defined for user/ ) {
         # Try again without a host.
         # NOTE(review): $user is 'CURRENT_USER()', which has no '@', so this
         # retry runs the same statement again.
         PTDEBUG && _d('Retrying SHOW GRANTS without host; error:',
            $EVAL_ERROR);
         ($user) = split('@', $user);
         $sql = $show . $user;
         PTDEBUG && _d($sql);
         eval {
            $proc = grep {
               m/ALL PRIVILEGES.*?\*\.\*|PROCESS/
            } @{$dbh->selectcol_arrayref($sql)};
         };
      }

      # The 2nd try above might have cleared $EVAL_ERROR.
      # If not, die now.
      die "Failed to $sql: $EVAL_ERROR" if $EVAL_ERROR;
   }
   if ( !$proc ) {
      die "You do not have the PROCESS privilege";
   }

   $sql = 'SHOW FULL PROCESSLIST';
   PTDEBUG && _d($dbh, $sql);
   # It's probably a slave if it's doing a binlog dump.
   return
      grep { $_->{command} =~ m/Binlog Dump/i }
      map  { # Lowercase the column names
         my %hash;
         @hash{ map { lc $_ } keys %$_ } = values %$_;
         \%hash;
      }
      @{$dbh->selectall_arrayref($sql, { Slice => {} })};
}

# Verifies that $master is really the master of $slave.  This is not an exact
# science, but there is a decent chance of catching some obvious cases when it
# is not the master.  If not the master, it dies; otherwise returns true.
sub is_master_of {
   my ( $self, $master, $slave ) = @_;
   my $master_status = $self->get_master_status($master)
      or die "The server specified as a master is not a master";
   my $slave_status  = $self->get_slave_status($slave)
      or die "The server specified as a slave is not a slave";
   my @connected     = $self->get_connected_slaves($master)
      or die "The server specified as a master has no connected slaves";
   my (undef, $port) = $master->selectrow_array("SHOW VARIABLES LIKE 'port'");

   if ( $port != $slave_status->{master_port} ) {
      die "The slave is connected to $slave_status->{master_port} "
         . "but the master's port is $port";
   }

   if ( !grep { $slave_status->{master_user} eq $_->{user} } @connected ) {
      die "I don't see any slave I/O thread connected with user "
         . $slave_status->{master_user};
   }

   if ( ($slave_status->{slave_io_state} || '')
      eq 'Waiting for master to send event' )
   {
      # The slave thinks its I/O thread is caught up to the master.  Let's
      # compare and make sure the master and slave are reasonably close to
      # each other.  Note that this is one of the few places where I check
      # the I/O thread positions instead of the SQL thread positions!
      # Master_Log_File/Read_Master_Log_Pos is the I/O thread's position on
      # the master.
      my ( $master_log_name, $master_log_num )
         = $master_status->{file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
      my ( $slave_log_name, $slave_log_num )
         = $slave_status->{master_log_file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
      if ( $master_log_name ne $slave_log_name
         || abs($master_log_num - $slave_log_num) > 1 )
      {
         die "The slave thinks it is reading from "
            . "$slave_status->{master_log_file},  but the "
            . "master is writing to $master_status->{file}";
      }
   }
   return 1;
}

# Figures out how to connect to the master, by examining SHOW SLAVE STATUS.
# But does NOT use the value from Master_User for the username, because
# typically we want to perform operations as the username that was specified
# (usually to the program's --user option, or in a DSN), rather than as the
# replication user, which is often restricted.  Returns undef if $dbh is not
# a slave.
sub get_master_dsn {
   my ( $self, $dbh, $dsn, $dsn_parser ) = @_;
   my $master = $self->get_slave_status($dbh) or return undef;
   my $spec   = "h=$master->{master_host},P=$master->{master_port}";
   return       $dsn_parser->parse($spec, $dsn);
}

# Sub: get_slave_status
#   Gets SHOW SLAVE STATUS, with column names all lowercased, as a hashref.
#   The statement handle is cached per dbh, and a dbh that returned nothing
#   is remembered as "not a slave" and not queried again.
#
# Returns:
#   Hashref of the (selected) status row, or a false value if the server is
#   not a slave.  Dies on ambiguous or invalid channel selection.
sub get_slave_status {
   my ( $self, $dbh ) = @_;

   if ( !$self->{not_a_slave}->{$dbh} ) {
      my $sth = $self->{sths}->{$dbh}->{SLAVE_STATUS}
            ||= $dbh->prepare('SHOW SLAVE STATUS');
      PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
      $sth->execute();
      my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows

      # If SHOW SLAVE STATUS returns more than one row it means that this
      # slave is connected to more than one master using replication
      # channels.  If we have a channel name as a parameter, we need to
      # select the correct row and return it.  If we don't have a channel
      # name as a parameter, there is no way to know what the correct master
      # is, so return an error.
      my $ss;
      if ( $sss_rows && @$sss_rows ) {
         if (scalar @$sss_rows > 1) {
            if (!$self->{channel}) {
               die 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
            }
            my $slave_use_channels;
            for my $row (@$sss_rows) {
               $row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
               if ($row->{channel_name}) {
                  $slave_use_channels = 1;
               }
               if ($row->{channel_name} eq $self->{channel}) {
                  $ss = $row;
                  last;
               }
            }
            if (!$ss && $slave_use_channels) {
               die 'This server is using replication channels but "channel" was not specified on the command line';
            }
         }
         else {
            # NOTE(review): the keys of this row have not been lowercased
            # yet, so {channel_name} is presumably never set here and this
            # check never fires; kept as-is to preserve current behavior.
            if ($sss_rows->[0]->{channel_name} && $sss_rows->[0]->{channel_name} ne $self->{channel}) {
               die 'This server is using replication channels but "channel" was not specified on the command line';
            }
            else {
               $ss = $sss_rows->[0];
            }
         }

         if ( $ss && %$ss ) {
            $ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
            return $ss;
         }
         if (!$ss && $self->{channel}) {
            die "Specified channel name is invalid";
         }
      }

      PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
      # The value of this post-increment (0) is the sub's false return value.
      $self->{not_a_slave}->{$dbh}++;
   }
}

# Sub: get_master_status
#   Gets SHOW MASTER STATUS, with column names all lowercased, as a hashref.
#   A dbh whose status is empty (or has fewer than 2 columns) is remembered
#   as "not a master".
#
# Returns:
#   Hashref of the status row, or false (empty return) if the server is not
#   a master -- on the first call as well as on later cached calls.
sub get_master_status {
   my ( $self, $dbh ) = @_;

   if ( $self->{not_a_master}->{$dbh} ) {
      PTDEBUG && _d('Server on dbh', $dbh, 'is not a master');
      return;
   }

   my $sth = $self->{sths}->{$dbh}->{MASTER_STATUS}
         ||= $dbh->prepare('SHOW MASTER STATUS');
   PTDEBUG && _d($dbh, 'SHOW MASTER STATUS');
   $sth->execute();
   my ($ms) = @{$sth->fetchall_arrayref({})};
   PTDEBUG && _d(
      $ms ? map { "$_=" . (defined $ms->{$_} ? $ms->{$_} : '') } keys %$ms
          : '');

   if ( !$ms || scalar keys %$ms < 2 ) {
      PTDEBUG && _d('Server on dbh', $dbh, 'does not seem to be a master');
      $self->{not_a_master}->{$dbh}++;
      # Return false now, exactly like later (cached) calls do, instead of
      # falling through and returning an empty -- but true -- hashref.
      return;
   }

   return { map { lc($_) => $ms->{$_} } keys %$ms }; # lowercase the keys
}

# Sub: wait_for_master
#   Execute MASTER_POS_WAIT() to make slave wait for its master.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   * master_status - Hashref returned by <get_master_status()>
#   * slave_dbh     - dbh for slave host
#
# Optional Arguments:
#   * timeout - Wait time in seconds (default 60)
#
# Returns:
#   Hashref with result of waiting, like:
#   (start code)
#   {
#     result => the result returned by MASTER_POS_WAIT: -1, undef, 0+
#     waited => the number of seconds waited, might be zero
#   }
#   (end code)
#   If <get_slave_status()> dies, the hashref instead has result => undef,
#   waited => 0 and an error message under the error key.
sub wait_for_master {
   my ( $self, %args ) = @_;
   my @required_args = qw(master_status slave_dbh);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($master_status, $slave_dbh) = @args{@required_args};
   my $timeout       = $args{timeout} || 60;

   my $result;
   my $waited;
   if ( $master_status ) {
      my $slave_status;
      eval {
          $slave_status = $self->get_slave_status($slave_dbh);
      };
      if ($EVAL_ERROR) {
          return {
              result => undef,
              waited => 0,
              error  =>'Wait for master: this is a multi-master slave but "channel" was not specified on the command line',
          };
      }
      my $server_version = VersionParser->new($slave_dbh);
      # MASTER_POS_WAIT() takes a channel argument only on newer servers.
      my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
      my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
      PTDEBUG && _d($slave_dbh, $sql);
      my $start = time;
      ($result) = $slave_dbh->selectrow_array($sql);

      # If MASTER_POS_WAIT() returned NULL and we waited at least 1s
      # and the time we waited is less than the timeout then this is
      # a strong indication that the slave was stopped while we were
      # waiting.
      $waited = time - $start;

      PTDEBUG && _d('Result of waiting:', $result);
      PTDEBUG && _d("Waited", $waited, "seconds");
   }
   else {
      PTDEBUG && _d('Not waiting: this server is not a master');
   }

   return {
      result => $result,
      waited => $waited,
   };
}

# Executes STOP SLAVE (statement handle cached per dbh).
sub stop_slave {
   my ( $self, $dbh ) = @_;
   my $sth = $self->{sths}->{$dbh}->{STOP_SLAVE}
         ||= $dbh->prepare('STOP SLAVE');
   PTDEBUG && _d($dbh, $sth->{Statement});
   $sth->execute();
}

# Executes START SLAVE, optionally with UNTIL the given { file, position }.
# Returns as soon as the statement has executed; it does not wait.
sub start_slave {
   my ( $self, $dbh, $pos ) = @_;
   if ( $pos ) {
      # Just like with CHANGE MASTER TO, you can't quote the position.
      my $sql = "START SLAVE UNTIL MASTER_LOG_FILE='$pos->{file}', "
              . "MASTER_LOG_POS=$pos->{position}";
      PTDEBUG && _d($dbh, $sql);
      $dbh->do($sql);
   }
   else {
      my $sth = $self->{sths}->{$dbh}->{START_SLAVE}
            ||= $dbh->prepare('START SLAVE');
      PTDEBUG && _d($dbh, $sth->{Statement});
      $sth->execute();
   }
}

# Waits for the slave to catch up to its master, using START SLAVE UNTIL.
# When complete, the slave is caught up to the master, and the slave process
# is stopped on both servers.  Returns the <wait_for_master()> result, or
# undef if the slave was already caught up.
sub catchup_to_master {
   my ( $self, $slave, $master, $timeout ) = @_;
   $self->stop_slave($master);
   $self->stop_slave($slave);
   my $slave_status  = $self->get_slave_status($slave);
   my $slave_pos     = $self->repl_posn($slave_status);
   my $master_status = $self->get_master_status($master);
   my $master_pos    = $self->repl_posn($master_status);
   PTDEBUG && _d('Master position:', $self->pos_to_string($master_pos),
      'Slave position:', $self->pos_to_string($slave_pos));

   my $result;
   if ( $self->pos_cmp($slave_pos, $master_pos) < 0 ) {
      PTDEBUG && _d('Waiting for slave to catch up to master');
      $self->start_slave($slave, $master_pos);

      # The slave may catch up instantly and stop, in which case
      # MASTER_POS_WAIT will return NULL and $result->{result} will be undef.
      # We must catch this; if it returns NULL, then we check that
      # its position is as desired.
      # TODO: what if master_pos_wait times out and $result == -1? retry?
      $result = $self->wait_for_master(
            master_status => $master_status,
            slave_dbh     => $slave,
            timeout       => $timeout,
      );
      if ($result->{error}) {
          die $result->{error};
      }
      if ( !defined $result->{result} ) {
         $slave_status = $self->get_slave_status($slave);
         if ( !$self->slave_is_running($slave_status) ) {
            PTDEBUG && _d('Master position:',
               $self->pos_to_string($master_pos),
               'Slave position:', $self->pos_to_string($slave_pos));
            $slave_pos = $self->repl_posn($slave_status);
            if ( $self->pos_cmp($slave_pos, $master_pos) != 0 ) {
               die "MASTER_POS_WAIT() returned NULL but slave has not "
                  . "caught up to master";
            }
            PTDEBUG && _d('Slave is caught up to master and stopped');
         }
         else {
            die "Slave has not caught up to master and it is still running";
         }
      }
   }
   else {
      PTDEBUG && _d("Slave is already caught up to master");
   }

   return $result;
}

# Makes one server catch up to the other in replication.  When complete, both
# servers are stopped and at the same position; otherwise it dies.
sub catchup_to_same_pos {
   my ( $self, $s1_dbh, $s2_dbh ) = @_;
   $self->stop_slave($s1_dbh);
   $self->stop_slave($s2_dbh);
   my $s1_status = $self->get_slave_status($s1_dbh);
   my $s2_status = $self->get_slave_status($s2_dbh);
   my $s1_pos    = $self->repl_posn($s1_status);
   my $s2_pos    = $self->repl_posn($s2_status);
   if ( $self->pos_cmp($s1_pos, $s2_pos) < 0 ) {
      $self->start_slave($s1_dbh, $s2_pos);
   }
   elsif ( $self->pos_cmp($s2_pos, $s1_pos) < 0 ) {
      $self->start_slave($s2_dbh, $s1_pos);
   }

   # NOTE(review): start_slave() returns right after START SLAVE UNTIL, and
   # nothing here waits for the lagging server to reach the UNTIL position,
   # so the check below can fail if that server has not stopped yet.

   # Re-fetch the replication statuses and positions.
   $s1_status = $self->get_slave_status($s1_dbh);
   $s2_status = $self->get_slave_status($s2_dbh);
   $s1_pos    = $self->repl_posn($s1_status);
   $s2_pos    = $self->repl_posn($s2_status);

   # Verify that they are both stopped and are at the same position.
   if ( $self->slave_is_running($s1_status)
     || $self->slave_is_running($s2_status)
     || $self->pos_cmp($s1_pos, $s2_pos) != 0)
   {
      die "The servers aren't both stopped at the same position";
   }

}

# Returns true if the slave's SQL thread is running (Slave_SQL_Running=Yes).
sub slave_is_running {
   my ( $self, $slave_status ) = @_;
   return ($slave_status->{slave_sql_running} || 'No') eq 'Yes';
}

# Returns true if the server's log_slave_updates option is enabled.
sub has_slave_updates {
   my ( $self, $dbh ) = @_;
   my $sql = q{SHOW VARIABLES LIKE 'log_slave_updates'};
   PTDEBUG && _d($dbh, $sql);
   my ($name, $value) = $dbh->selectrow_array($sql);
   return $value && $value =~ m/^(1|ON)$/;
}

# Extracts the replication position out of either SHOW MASTER STATUS or SHOW
# SLAVE STATUS, and returns it as a hashref { file, position }.  For SHOW
# SLAVE STATUS this is Relay_Master_Log_File/Exec_Master_Log_Pos.
sub repl_posn {
   my ( $self, $status ) = @_;
   if ( exists $status->{file} && exists $status->{position} ) {
      # It's the output of SHOW MASTER STATUS
      return {
         file     => $status->{file},
         position => $status->{position},
      };
   }
   else {
      return {
         file     => $status->{relay_master_log_file},
         position => $status->{exec_master_log_pos},
      };
   }
}

# Gets the slave's lag (Seconds_Behind_Master), or nothing if the server is
# not a slave.  TODO: permit using a heartbeat table.
sub get_slave_lag {
   my ( $self, $dbh ) = @_;
   my $stat = $self->get_slave_status($dbh);
   return unless $stat;  # server is not a slave
   return $stat->{seconds_behind_master};
}

# Compares two replication positions and returns -1, 0, or 1 just as the cmp
# operator does.  (Named $pos_a/$pos_b to avoid shadowing sort's $a/$b.)
sub pos_cmp {
   my ( $self, $pos_a, $pos_b ) = @_;
   return $self->pos_to_string($pos_a) cmp $self->pos_to_string($pos_b);
}

# Sub: short_host
#   Simplify a hostname as much as possible.  For purposes of replication, a
#   hostname is really just the combination of hostname and port, since
#   replication always uses TCP connections (it does not work via sockets).
#   If the port is the default 3306, it is omitted.  As a convenience, this
#   sub accepts either SHOW SLAVE STATUS or a DSN.
#
# Parameters:
#   $dsn - DSN hashref
#
# Returns:
#   Short hostname string
sub short_host {
   my ( $self, $dsn ) = @_;
   my ($host, $port);
   if ( $dsn->{master_host} ) {
      $host = $dsn->{master_host};
      $port = $dsn->{master_port};
   }
   else {
      $host = $dsn->{h};
      $port = $dsn->{P};
   }
   return ($host || '[default]') . ( ($port || 3306) == 3306 ? '' : ":$port" );
}

# Sub: is_replication_thread
#   Determine if a processlist item is a replication thread.
#
# Parameters:
#   $query - Hashref of a processlist item
#   %args  - Arguments
#
# Arguments:
#   type            - Which kind of repl thread to match:
#                     all, binlog_dump (master), slave_io, or slave_sql
#                     (default: all)
#   check_known_ids - Check known replication thread IDs (default: yes)
#
# Returns:
#   True if the proclist item is the given type of replication thread.
sub is_replication_thread {
   my ( $self, $query, %args ) = @_;
   return unless $query;

   my $type = lc($args{type} || 'all');
   # Group the alternation so ^ and $ anchor every name, not just the first
   # and last ones; otherwise e.g. "xslave_iox" would be accepted.
   die "Invalid type: $type"
      unless $type =~ m/^(?:binlog_dump|slave_io|slave_sql|all)$/i;

   my $match = 0;
   if ( $type =~ m/binlog_dump|all/i ) {
      # Also matches "Binlog Dump GTID", consistent with
      # get_connected_slaves(), which matches any "Binlog Dump" command.
      $match = 1
         if ($query->{Command} || $query->{command} || '') =~ m/^Binlog Dump/;
   }
   if ( !$match ) {
      # On a slave, there are two threads.  Both have user="system user".
      if ( ($query->{User} || $query->{user} || '') eq "system user" ) {
         PTDEBUG && _d("Slave replication thread");
         if ( $type ne 'all' ) {
            # Match a particular slave thread.
            my $state = $query->{State} || $query->{state} || '';

            if ( $state =~ m/^(?:init|end)$/ ) {
               # http://code.google.com/p/maatkit/issues/detail?id=1121
               PTDEBUG && _d("Special state:", $state);
               $match = 1;
            }
            else {
               # These patterns are abbreviated because if the first few
               # words match chances are very high it's the full slave thd
               # state.
               my ($slave_sql) = $state =~ m/
                  ^(Waiting\sfor\sthe\snext\sevent
                   |Reading\sevent\sfrom\sthe\srelay\slog
                   |Has\sread\sall\srelay\slog;\swaiting
                   |Making\stemp\sfile
                   |Waiting\sfor\sslave\smutex\son\sexit)/xi;

               # Type is either "slave_sql" or "slave_io".  The second line
               # implies that if this isn't the sql thread then it must be
               # the io thread, so match is true if we were supposed to match
               # the io thread.
               $match = $type eq 'slave_sql' &&  $slave_sql ? 1
                      : $type eq 'slave_io'  && !$slave_sql ? 1
                      :                                       0;
            }
         }
         else {
            # Type is "all" and it's not a master (binlog_dump) thread,
            # else we wouldn't have gotten here.  It's either of the 2
            # slave threads and we don't care which.
            $match = 1;
         }
      }
      else {
         PTDEBUG && _d('Not system user');
      }

      # MySQL loves to trick us.  Sometimes a slave replication thread will
      # temporarily morph into what looks like a regular user thread when
      # really it's still the same slave repl thread.  So here we save known
      # repl thread IDs and check if a non-matching event is actually a
      # known repl thread ID and if yes then we make it match.
      if ( !defined $args{check_known_ids} || $args{check_known_ids} ) {
         my $id = $query->{Id} || $query->{id};
         if ( $match ) {
            $self->{replication_thread}->{$id} = 1;
         }
         else {
            if ( $self->{replication_thread}->{$id} ) {
               PTDEBUG && _d("Thread ID is a known replication thread ID");
               $match = 1;
            }
         }
      }
   }

   PTDEBUG && _d('Matches', $type, 'replication thread:',
      ($match ? 'yes' : 'no'), '; match:', $match);

   return $match;
}


# Sub: get_replication_filters
#   Get any replication filters set on the host.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   dbh - dbh, master or slave
#
# Returns:
#   Hashref of any replication filters.  If none are set, an empty hashref
#   is returned.
sub get_replication_filters {
   my ( $self, %args ) = @_;
   my @required_args = qw(dbh);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($dbh) = @args{@required_args};

   my %filters = ();

   my $status = $self->get_master_status($dbh);
   if ( $status ) {
      foreach my $filter ( qw(binlog_do_db binlog_ignore_db) ) {
         my $value = $status->{$filter};
         $filters{$filter} = $value if defined $value && $value ne '';
      }
   }

   $status = $self->get_slave_status($dbh);
   if ( $status ) {
      foreach my $filter ( qw(
         replicate_do_db
         replicate_ignore_db
         replicate_do_table
         replicate_ignore_table
         replicate_wild_do_table
         replicate_wild_ignore_table
      ) ) {
         my $value = $status->{$filter};
         $filters{$filter} = $value if defined $value && $value ne '';
      }

      my $sql = "SHOW VARIABLES LIKE 'slave_skip_errors'";
      PTDEBUG && _d($dbh, $sql);
      my $row = $dbh->selectrow_arrayref($sql);
      # "OFF" in 5.0, "" in 5.1
      $filters{slave_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF';
   }

   return \%filters;
}


# Sub: pos_to_string
#   Stringify a position in a way that's string-comparable.
#
# Parameters:
#   $pos - Hashref with file and position
#
# Returns:
#   String like "file/posNNNNN" (position zero-padded to 20 digits)
sub pos_to_string {
   my ( $self, $pos ) = @_;
   my $fmt  = '%s/%020d';
   return sprintf($fmt, @{$pos}{qw(file position)});
}

# Forget every thread ID remembered by is_replication_thread().
sub reset_known_replication_threads {
   my ( $self ) = @_;
   $self->{replication_thread} = {};
   return;
}

# Sub: get_cxn_from_dsn_table
#   Read slave DSNs from a table (the dsn= recursion method).
#
# Required Arguments:
#   dsn_table_dsn - DSN string for the table; it must give D and t, or a
#                   database-qualified t
#   make_cxn      - Callback that creates connection objects
#
# Returns:
#   Arrayref of make_cxn results, one per row of the table's dsn column,
#   ordered by its id column
sub get_cxn_from_dsn_table {
   my ($self, %args) = @_;
   my @required_args = qw(dsn_table_dsn make_cxn);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($dsn_table_dsn, $make_cxn) = @args{@required_args};
   PTDEBUG && _d('DSN table DSN:', $dsn_table_dsn);

   my $dp = $self->{DSNParser};
   my $q  = $self->{Quoter};

   my $dsn = $dp->parse($dsn_table_dsn);
   my $dsn_table;
   if ( $dsn->{D} && $dsn->{t} ) {
      $dsn_table = $q->quote($dsn->{D}, $dsn->{t});
   }
   elsif ( $dsn->{t} && $dsn->{t} =~ m/\./ ) {
      $dsn_table = $q->quote($q->split_unquote($dsn->{t}));
   }
   else {
      die "DSN table DSN does not specify a database (D) "
        . "or a database-qualified table (t)";
   }

   my $dsn_tbl_cxn = $make_cxn->(dsn => $dsn);
   my $dbh         = $dsn_tbl_cxn->connect();
   my $sql         = "SELECT dsn FROM $dsn_table ORDER BY id";
   PTDEBUG && _d($sql);
   my $dsn_strings = $dbh->selectcol_arrayref($sql);
   my @cxn;
   if ( $dsn_strings ) {
      foreach my $dsn_string ( @$dsn_strings ) {
         PTDEBUG && _d('DSN from DSN table:', $dsn_string);
         push @cxn, $make_cxn->(dsn_string => $dsn_string);
      }
   }
   return \@cxn;
}

# Print a debug line to STDERR, prefixed with package, line and PID.
sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;
}
# ###########################################################################
# End MasterSlave package
# ###########################################################################