1# This program is copyright 2007-2011 Baron Schwartz, 2011-2012 Percona Ireland Ltd.
2# Feedback and improvements are welcome.
3#
4# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
5# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
6# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
7#
8# This program is free software; you can redistribute it and/or modify it under
9# the terms of the GNU General Public License as published by the Free Software
10# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
11# systems, you can issue `man perlgpl' or `man perlartistic' to read these
12# licenses.
13#
14# You should have received a copy of the GNU General Public License along with
15# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
16# Place, Suite 330, Boston, MA  02111-1307  USA.
17# ###########################################################################
18# MasterSlave package
19# ###########################################################################
20{
21# Package: MasterSlave
22# MasterSlave handles common tasks related to master-slave setups.
23package MasterSlave;
24
25use strict;
26use warnings FATAL => 'all';
27use English qw(-no_match_vars);
28use constant PTDEBUG => $ENV{PTDEBUG} || 0;
29
# Sub: check_recursion_method
#   Validate a list of --recursion-method values, dying with a descriptive
#   message when the list cannot be used.
#
# Parameters:
#   $methods - Arrayref of recursion method strings
#
# A lone method must be processlist, hosts, none, cluster, or a dsn=... spec
# (case-insensitive).  Several methods may only be processlist and/or hosts,
# unless the first one starts with dsn= (a DSN spec that was split on its
# commas when the option was parsed into a list).
sub check_recursion_method {
   my ($methods) = @_;

   if ( @$methods == 1 ) {
      my $method   = $methods->[0];
      my $is_known = $method
         && $method =~ m/^(?:processlist$|hosts$|none$|cluster$|dsn=)/i;
      die "Invalid recursion method: " . ( $method || 'undef' )
         unless $is_known;
   }
   else {
      # How many entries are something other than processlist/hosts?
      my $uncombinable = grep { !m/processlist|hosts/i } @$methods;
      if ( $uncombinable && $methods->[0] !~ /^dsn=/i ) {
         my $listed = join ", ",
            map { defined $_ ? $_ : 'undef' } @$methods;
         die "Invalid combination of recursion methods: $listed. "
           . "Only hosts and processlist may be combined.\n";
      }
   }
}
49
# Sub: new
#   Create a MasterSlave object.
#
# Required Arguments:
#   OptionParser - <OptionParser> object
#   DSNParser    - <DSNParser> object
#   Quoter       - <Quoter> object
#
# Any other arguments are stored in the object unchanged (for example
# channel, which get_slave_status() reads from $self).
#
# Returns:
#   Hashref object blessed into $class
sub new {
   my ( $class, %args ) = @_;
   for my $required ( qw(OptionParser DSNParser Quoter) ) {
      die "I need a $required argument" unless $args{$required};
   }
   # replication_thread remembers thread IDs already identified as
   # replication threads (see is_replication_thread()).
   my %self = ( %args, replication_thread => {} );
   return bless \%self, $class;
}
62
# Sub: get_slaves
#   Find this server's slaves with the resolved recursion method(s) and turn
#   each one into a connection through the make_cxn callback.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   make_cxn - Coderef called as make_cxn->(dsn => $dsn, dbh => $dbh) for
#              every slave found; its return value(s) are collected.
#
# Optional Arguments:
#   dsn - Master DSN.  Also passed to _resolve_recursion_methods(), which
#         picks "hosts" for a non-3306 port when --recursion-method was not
#         given.  Required when a processlist/hosts method is used.
#   dbh - Master dbh; required when a processlist/hosts method is used.
#   Any other args are forwarded to get_cxn_from_dsn_table() for dsn=...
#
# Returns:
#   Arrayref of what make_cxn returned (empty for "none" or no methods), or,
#   for a dsn=... method, whatever get_cxn_from_dsn_table() returns.
#   Dies on an unrecognized method list.
sub get_slaves {
   my ($self, %args) = @_;
   my @required_args = qw(make_cxn);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($make_cxn) = @args{@required_args};

   my $slaves  = [];
   my $dp      = $self->{DSNParser};
   my $methods = $self->_resolve_recursion_methods($args{dsn});

   return $slaves unless @$methods;

   if ( grep { m/processlist|hosts/i } @$methods ) {
      # Walking the replication tree needs a starting connection and DSN.
      my @required_args = qw(dbh dsn);
      foreach my $arg ( @required_args ) {
         die "I need a $arg argument" unless $args{$arg};
      }
      my ($dbh, $dsn) = @args{@required_args};
      my $o = $self->{OptionParser};

      $self->recurse_to_slaves(
         {  dbh            => $dbh,
            dsn            => $dsn,
            slave_user     => $o->got('slave-user') ? $o->get('slave-user') : '',
            slave_password => $o->got('slave-password') ? $o->get('slave-password') : '',
            callback  => sub {
               my ( $dsn, $dbh, $level, $parent ) = @_;
               # Level 0 is the starting server itself, not a slave.
               return unless $level;
               PTDEBUG && _d('Found slave:', $dp->as_string($dsn));
               # NOTE: $slave_dsn is the same hashref as $dsn, so the
               # credential overrides below modify the callback's DSN too.
               my $slave_dsn = $dsn;
               if ($o->got('slave-user')) {
                  $slave_dsn->{u} = $o->get('slave-user');
                  # NOTE(review): h or P may be undef here, which would be an
                  # uninitialized-value warning (fatal in this file) when
                  # PTDEBUG is on.
                  PTDEBUG && _d("Using slave user ".$o->get('slave-user')." on ".$slave_dsn->{h}.":".$slave_dsn->{P});
               }
               if ($o->got('slave-password')) {
                  $slave_dsn->{p} = $o->get('slave-password');
                  PTDEBUG && _d("Slave password set");
               }
               # The callback reuses the dbh recurse_to_slaves opened.
               push @$slaves, $make_cxn->(dsn => $slave_dsn, dbh => $dbh);
               return;
            },
         }
      );
   } elsif ( $methods->[0] =~ m/^dsn=/i ) {
      # A dsn=... value arrives split on its commas; rejoin the pieces and
      # strip the dsn= prefix to recover the DSN of the DSN table.
      (my $dsn_table_dsn = join ",", @$methods) =~ s/^dsn=//i;
      $slaves = $self->get_cxn_from_dsn_table(
         %args,
         dsn_table_dsn => $dsn_table_dsn,
      );
   }
   elsif ( $methods->[0] =~ m/none/i ) {
      PTDEBUG && _d('Not getting to slaves');
   }
   else {
      die "Unexpected recursion methods: @$methods";
   }

   return $slaves;
}
124
# Sub: _resolve_recursion_methods
#   Pick the recursion methods to use for a server.
#
# Parameters:
#   $dsn - DSN of the server (optional); only its port (P) is consulted
#
# Returns:
#   Arrayref of method names: the --recursion-method value when it was given
#   explicitly; [hosts] when $dsn has a port other than 3306 (the port from
#   SHOW SLAVE HOSTS may then matter); otherwise the option's default.
sub _resolve_recursion_methods {
   my ($self, $dsn) = @_;
   my $o = $self->{OptionParser};

   my $use_hosts_only = !$o->got('recursion-method')
      && $dsn
      && ( $dsn->{P} || 3306 ) != 3306;

   if ( $use_hosts_only ) {
      PTDEBUG && _d('Port number is non-standard; using only hosts method');
      return [qw(hosts)];
   }

   # get() yields the explicit value if given, else the option's default.
   return $o->get('recursion-method');
}
141
# Sub: recurse_to_slaves
#   Descend to slaves, depth-first, using the recursion methods chosen by
#   _resolve_recursion_methods() (processlist and/or SHOW SLAVE HOSTS).
#   The callback is called for every server reached, including the starting
#   server at level 0, with the server's DSN, dbh, recursion level, and
#   parent DSN as args.
#
# Parameters:
#   $args  - Hashref of arguments
#   $level - Recursion level (default 0)
#
# Required Arguments:
#   dsn      - The DSN to connect to; if no dbh arg, connect using this.
#   callback - Code to execute for each new server found.
#
# Optional Arguments:
#   dbh             - Existing dbh for dsn; otherwise one is opened.
#   recurse         - How many levels to recurse (0 = none).  When not given,
#                     the --recurse option is used; undef from both means
#                     no limit.
#   skip_callback   - Called instead of callback for servers that are
#                     skipped (server ID already seen, or not the ID the
#                     master reported).
#   parent          - The DSN from which this call descended.
#   slave_user      - If set, used as u= when connecting.
#   slave_password  - If set, used as p= when connecting.
#   server_ids_seen - Hashref of visited server IDs; created on demand and
#                     shared with the recursive calls.
#
# Returns:
#   Nothing useful; results are delivered through the callbacks.  Prints a
#   message to STDERR and skips a server it cannot connect to.
sub recurse_to_slaves {
   my ( $self, $args, $level ) = @_;
   $level ||= 0;
   my $dp = $self->{DSNParser};
   # An explicit recurse arg wins, including 0 ("don't descend"); only when
   # it is missing do we fall back to the --recurse option.
   my $recurse = defined $args->{recurse}
               ? $args->{recurse}
               : $self->{OptionParser}->get('recurse');
   my $dsn            = $args->{dsn};
   my $slave_user     = $args->{slave_user}     || '';
   my $slave_password = $args->{slave_password} || '';

   my $methods = $self->_resolve_recursion_methods($dsn);
   PTDEBUG && _d('Recursion methods:', @$methods);
   if ( lc($methods->[0]) eq 'none' ) {
      PTDEBUG && _d('Not recursing to slaves');
      return;
   }

   # $slave_dsn is the same hashref as $dsn, so the credential overrides
   # below are also visible in the DSN handed to the callbacks.
   my $slave_dsn = $dsn;
   if ($slave_user) {
      $slave_dsn->{u} = $slave_user;
      # h/P may be unset (e.g. default port); avoid uninitialized-value
      # warnings, which are fatal in this file, when debugging.
      PTDEBUG && _d("Using slave user $slave_user on "
         . ($slave_dsn->{h} || '') . ":" . ($slave_dsn->{P} || ''));
   }
   if ($slave_password) {
      $slave_dsn->{p} = $slave_password;
      PTDEBUG && _d("Slave password set");
   }

   my $dbh;
   eval {
      $dbh = $args->{dbh} || $dp->get_dbh(
         $dp->get_cxn_params($slave_dsn), { AutoCommit => 1 });
      PTDEBUG && _d('Connected to', $dp->as_string($slave_dsn));
   };
   if ( $EVAL_ERROR ) {
      print STDERR "Cannot connect to ", $dp->as_string($slave_dsn), "\n"
         or die "Cannot print: $OS_ERROR";
      return;
   }

   my $sql  = 'SELECT @@SERVER_ID';
   PTDEBUG && _d($sql);
   my ($id) = $dbh->selectrow_array($sql);
   PTDEBUG && _d('Working on server ID', $id);
   # server_id is present on DSNs built from SHOW SLAVE HOSTS.
   my $master_thinks_i_am = $dsn->{server_id};
   if ( !defined $id
       || ( defined $master_thinks_i_am && $master_thinks_i_am != $id )
       || $args->{server_ids_seen}->{$id}++
   ) {
      PTDEBUG && _d('Server ID seen, or not what master said');
      if ( $args->{skip_callback} ) {
         $args->{skip_callback}->($dsn, $dbh, $level, $args->{parent});
      }
      return;
   }

   $args->{callback}->($dsn, $dbh, $level, $args->{parent});

   if ( !defined $recurse || $level < $recurse ) {

      my @slaves =
         grep { !$_->{master_id} || $_->{master_id} == $id } # Only my slaves.
         $self->find_slave_hosts($dp, $dbh, $dsn, $methods);

      foreach my $slave ( @slaves ) {
         PTDEBUG && _d('Recursing from',
            $dp->as_string($dsn), 'to', $dp->as_string($slave));
         # The credential keys must be the literal names slave_user and
         # slave_password (the password used to be passed as the key).
         $self->recurse_to_slaves(
            {  %$args,
               dsn            => $slave,
               dbh            => undef,
               parent         => $dsn,
               slave_user     => $slave_user,
               slave_password => $slave_password,
            },
            $level + 1,
         );
      }
   }
}
232
# Sub: find_slave_hosts
#   Find a server's slaves by trying each recursion method in turn until one
#   of them returns something.  A method name M is dispatched to the method
#   _find_slaves_by_M: "processlist" looks for binlog dump threads in
#   SHOW FULL PROCESSLIST, and "hosts" reads SHOW SLAVE HOSTS.
#
# Parameters:
#   $dsn_parser - <DSNParser> object
#   $dbh        - dbh of the server whose slaves are wanted
#   $dsn        - DSN of that server
#   $methods    - Arrayref of method names, in order of preference
#
# Returns:
#   List of DSN hashrefs.  Each has a 'source' key ('processlist' or
#   'hosts'); those from SHOW SLAVE HOSTS also carry server_id and master_id.
sub find_slave_hosts {
   my ( $self, $dsn_parser, $dbh, $dsn, $methods ) = @_;

   PTDEBUG && _d('Looking for slaves on', $dsn_parser->as_string($dsn),
      'using methods', @$methods);

   my @pending = @$methods;
   my @slaves;
   # Stop at the first method that finds any slaves.
   while ( !@slaves && @pending ) {
      my $method      = shift @pending;
      my $find_slaves = "_find_slaves_by_$method";
      PTDEBUG && _d('Finding slaves with', $find_slaves);
      @slaves = $self->$find_slaves($dsn_parser, $dbh, $dsn);
   }

   PTDEBUG && _d('Found', scalar(@slaves), 'slaves');
   return @slaves;
}
263
# Sub: _find_slaves_by_processlist
#   Find slaves among the threads reported by get_connected_slaves() and
#   convert them to DSN hashrefs with _process_slaves_list().
#
# Parameters:
#   $dsn_parser - <DSNParser> object
#   $dbh        - dbh of the master
#   $dsn        - DSN of the master
#
# Returns:
#   List of DSN hashrefs (count in scalar context).
sub _find_slaves_by_processlist {
   my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
   return $self->_process_slaves_list(
      $dsn_parser,
      $dsn,
      [ $self->get_connected_slaves($dbh) ],
   );
}
270
# Sub: _process_slaves_list
#   Turn processlist entries of connected slaves into DSN hashrefs.
#
# Parameters:
#   $dsn_parser       - <DSNParser> object
#   $dsn              - DSN passed to $dsn_parser->parse() as its second
#                       argument (presumably defaults for the parts not in
#                       "h=..."; confirm against DSNParser)
#   $connected_slaves - Arrayref of processlist hashrefs with a lowercase
#                       "host" key in host:port form
#
# Returns:
#   List of DSN hashrefs, each with source => 'processlist'.  Entries whose
#   host is missing or not in host:port form are skipped.
sub _process_slaves_list {
   my ($self, $dsn_parser, $dsn, $connected_slaves) = @_;

   my @slaves;
   foreach my $proc ( @$connected_slaves ) {
      # Strip the client port.  A missing or port-less Host must be skipped
      # here: letting undef reach the comparisons below is an
      # uninitialized-value warning, which is fatal in this file.
      my ( $host ) = ( $proc->{host} || '' ) =~ m/^(.*):\d+$/;
      next unless $host;

      # Replication never uses sockets.
      $host = '127.0.0.1' if $host eq 'localhost';

      # Once the port is stripped, any remaining colon means an IPv6 address
      # (full form as well as "::"-compressed); bracket it unless it is
      # already bracketed.
      $host = "[$host]" if $host =~ m/:/ && $host !~ m/^\[/;

      my $slave = $dsn_parser->parse("h=$host", $dsn);
      $slave->{source} = 'processlist';
      push @slaves, $slave;
   }

   return @slaves;
}
292
# Sub: _find_slaves_by_hosts
#   Find slaves listed in SHOW SLAVE HOSTS.
#
#   SHOW SLAVE HOSTS is significantly less reliable than the processlist.
#   Machines tend to share the host list around with every machine in the
#   replication hierarchy, but they don't update each other when machines
#   disconnect or switch to a different master, so it often contains stale
#   entries.
#
# Parameters:
#   $dsn_parser - <DSNParser> object
#   $dbh        - dbh of the master
#   $dsn        - DSN passed to $dsn_parser->parse() as its second argument
#
# Returns:
#   List of DSN hashrefs built from each row's host and port (plus user and
#   password when set), with server_id, master_id, and source => 'hosts'.
sub _find_slaves_by_hosts {
   my ( $self, $dsn_parser, $dbh, $dsn ) = @_;

   my $sql = 'SHOW SLAVE HOSTS';
   PTDEBUG && _d($dbh, $sql);
   my @rows = @{ $dbh->selectall_arrayref($sql, { Slice => {} }) };
   PTDEBUG && @rows && _d('Found some SHOW SLAVE HOSTS info');

   my @slaves;
   foreach my $row ( @rows ) {
      # Lowercase the column names so they can be looked up uniformly.
      my %col = map { lc($_) => $row->{$_} } keys %$row;

      my @spec_parts = ( "h=$col{host}", "P=$col{port}" );
      push @spec_parts, "u=$col{user}"     if $col{user};
      push @spec_parts, "p=$col{password}" if $col{password};

      my $slave_dsn = $dsn_parser->parse(join(',', @spec_parts), $dsn);
      $slave_dsn->{server_id} = $col{server_id};
      $slave_dsn->{master_id} = $col{master_id};
      $slave_dsn->{source}    = 'hosts';
      push @slaves, $slave_dsn;
   }

   return @slaves;
}
325
# Sub: get_connected_slaves
#   Return the SHOW FULL PROCESSLIST entries that look like connected slaves
#   (Command matching /Binlog Dump/i), with column names lowercased.
#
#   First checks the PROCESS privilege via SHOW GRANTS FOR CURRENT_USER():
#   some grant line must contain PROCESS, or ALL PRIVILEGES followed later
#   by *.* .
#
# Parameters:
#   $dbh - dbh of the server to inspect
#
# Returns:
#   List of processlist hashrefs (their count in scalar context).
#   Dies if SHOW GRANTS fails or the PROCESS privilege is not found.
sub get_connected_slaves {
   my ( $self, $dbh ) = @_;

   # Check for the PROCESS privilege.
   my $show = "SHOW GRANTS FOR ";
   my $user = 'CURRENT_USER()';
   my $sql = $show . $user;
   PTDEBUG && _d($dbh, $sql);

   my $proc;
   eval {
      # grep in scalar context: $proc is the number of matching grant lines.
      $proc = grep {
         m/ALL PRIVILEGES.*?\*\.\*|PROCESS/
      } @{$dbh->selectcol_arrayref($sql)};
   };
   if ( $EVAL_ERROR ) {

      if ( $EVAL_ERROR =~ m/no such grant defined for user/ ) {
         # Try again without a host.
         # NOTE(review): $user is still the literal 'CURRENT_USER()', which
         # contains no '@', so split() leaves it unchanged and this retry
         # re-runs the same statement.  Confirm the intent (e.g. fetching
         # SELECT CURRENT_USER() first) before relying on it.
         PTDEBUG && _d('Retrying SHOW GRANTS without host; error:',
            $EVAL_ERROR);
         ($user) = split('@', $user);
         $sql    = $show . $user;
         PTDEBUG && _d($sql);
         eval {
            $proc = grep {
               m/ALL PRIVILEGES.*?\*\.\*|PROCESS/
            } @{$dbh->selectcol_arrayref($sql)};
         };
      }

      # The 2nd try above might have cleared $EVAL_ERROR.
      # If not, die now.
      die "Failed to $sql: $EVAL_ERROR" if $EVAL_ERROR;
   }
   if ( !$proc ) {
      die "You do not have the PROCESS privilege";
   }

   $sql = 'SHOW FULL PROCESSLIST';
   PTDEBUG && _d($dbh, $sql);
   # It's probably a slave if it's doing a binlog dump.
   # This final expression is the sub's return value.
   grep { $_->{command} =~ m/Binlog Dump/i }
   map  { # Lowercase the column names
      my %hash;
      @hash{ map { lc $_ } keys %$_ } = values %$_;
      \%hash;
   }
   @{$dbh->selectall_arrayref($sql, { Slice => {} })};
}
378
# Sub: is_master_of
#   Verify that $master is really the master of $slave.  This is not an
#   exact science, but it catches some obvious mismatches:
#   - the slave's Master_Port must equal the master's port variable;
#   - some connected slave thread must use the slave's Master_User;
#   - when the slave's I/O thread state is "Waiting for master to send
#     event", its Master_Log_File must share the master's binlog base name
#     and be at most one file number away.  These are I/O-thread positions,
#     not SQL-thread positions.
#
# Parameters:
#   $master - dbh of the supposed master
#   $slave  - dbh of the supposed slave
#
# Returns:
#   1 when no mismatch is found; dies with a description otherwise.
sub is_master_of {
   my ( $self, $master, $slave ) = @_;

   my $master_status = $self->get_master_status($master);
   die "The server specified as a master is not a master"
      unless $master_status;

   my $slave_status = $self->get_slave_status($slave);
   die "The server specified as a slave is not a slave"
      unless $slave_status;

   my @connected = $self->get_connected_slaves($master);
   die "The server specified as a master has no connected slaves"
      unless @connected;

   # The row is (Variable_name, Value); keep the value.
   my $port = ( $master->selectrow_array("SHOW VARIABLES LIKE 'port'") )[1];

   my $slave_master_port = $slave_status->{master_port};
   die "The slave is connected to $slave_master_port "
     . "but the master's port is $port"
      if $port != $slave_master_port;

   my $repl_user  = $slave_status->{master_user};
   my $user_found = grep { $repl_user eq $_->{user} } @connected;
   die "I don't see any slave I/O thread connected with user $repl_user"
      unless $user_found;

   my $io_state = $slave_status->{slave_io_state} || '';
   return 1 unless $io_state eq 'Waiting for master to send event';

   # Split "base.000123" into ("base", "123").
   my $binlog_re = qr/^(.*?)\.0*([1-9][0-9]*)$/;
   my ( $m_base, $m_num ) = $master_status->{file} =~ $binlog_re;
   my ( $s_base, $s_num ) = $slave_status->{master_log_file} =~ $binlog_re;
   if ( $m_base ne $s_base || abs( $m_num - $s_num ) > 1 ) {
      die "The slave thinks it is reading from "
        . "$slave_status->{master_log_file},  but the "
        . "master is writing to $master_status->{file}";
   }

   return 1;
}
425
# Sub: get_master_dsn
#   Build a DSN for this slave's master from the Master_Host and Master_Port
#   columns of SHOW SLAVE STATUS.  Master_User is deliberately NOT used:
#   operations should normally run as the user that was specified (e.g. the
#   program's --user option or a DSN's u=), not as the replication user,
#   which is often restricted.  $dsn is passed to $dsn_parser->parse() as the
#   second argument, presumably supplying those remaining parts.
#
# Parameters:
#   $dbh        - dbh of the slave
#   $dsn        - DSN of the slave
#   $dsn_parser - <DSNParser> object
#
# Returns:
#   The parsed master DSN, or undef when $dbh is not a slave.
sub get_master_dsn {
   my ( $self, $dbh, $dsn, $dsn_parser ) = @_;
   my $status = $self->get_slave_status($dbh);
   # Explicit undef (not a bare return) is kept for existing callers.
   return undef unless $status;
   my $spec = "h=$status->{master_host},P=$status->{master_port}";
   return $dsn_parser->parse($spec, $dsn);
}
437
# Sub: get_slave_status
#   Run SHOW SLAVE STATUS and return this slave's row with column names
#   lowercased.  The prepared statement is cached per dbh in $self->{sths}.
#
# Parameters:
#   $dbh - dbh of the server to inspect
#
# Returns:
#   Hashref of the lowercased SHOW SLAVE STATUS row.  When there is no usable
#   row, the dbh is recorded in $self->{not_a_slave} and a false value is
#   returned; later calls for that dbh return false without querying.
#
#   In the multi-row (replication channels) case, dies when $self->{channel}
#   is unset or matches no row's channel_name.
sub get_slave_status {
   my ( $self, $dbh ) = @_;

   if ( !$self->{not_a_slave}->{$dbh} ) {
      my $sth = $self->{sths}->{$dbh}->{SLAVE_STATUS}
            ||= $dbh->prepare('SHOW SLAVE STATUS');
      PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
      $sth->execute();
      my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows

      # If SHOW SLAVE STATUS returns more than one row it means that this slave is connected to more
      # than one master using replication channels.
      # If we have a channel name as a parameter, we need to select the correct row and return it.
      # If we don't have a channel name as a parameter, there is no way to know what the correct master is so,
      # return an error.
      my $ss;
      if ( $sss_rows && @$sss_rows ) {
          if (scalar @$sss_rows > 1) {
              if (!$self->{channel}) {
                  die 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
              }
              my $slave_use_channels;
              for my $row (@$sss_rows) {
                  # Assigning to the loop variable replaces the element in
                  # @$sss_rows itself with its lowercased copy.
                  $row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
                  if ($row->{channel_name}) {
                      $slave_use_channels = 1;
                  }
                  # NOTE(review): an undef channel_name (no such column) would
                  # be an uninitialized-value warning here, fatal in this file.
                  if ($row->{channel_name} eq $self->{channel}) {
                      $ss = $row;
                      last;
                  }
              }
              if (!$ss && $slave_use_channels) {
                 die 'This server is using replication channels but "channel" was not specified on the command line';
              }
          } else {
              # NOTE(review): this row's keys are not lowercased yet (that
              # happens below), so if the server reports the column as
              # Channel_Name this lookup is always undef and the check never
              # fires.  Making it live would make single-named-channel setups
              # without a channel die, so confirm intent before fixing.
              if ($sss_rows->[0]->{channel_name} && $sss_rows->[0]->{channel_name} ne $self->{channel}) {
                  die 'This server is using replication channels but "channel" was not specified on the command line';
              } else {
                  $ss = $sss_rows->[0];
              }
          }

          if ( $ss && %$ss ) {
             $ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
             return $ss;
          }
          if (!$ss && $self->{channel}) {
              die "Specified channel name is invalid";
          }
      }

      PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
      # NOTE(review): there is no explicit return; the sub's value is the
      # result of this post-increment (0 on the first call), i.e. false.
      $self->{not_a_slave}->{$dbh}++;
  }
}
495
# Sub: get_master_status
#   Run SHOW MASTER STATUS and return its row with column names lowercased.
#   The prepared statement is cached per dbh in $self->{sths}.
#
# Parameters:
#   $dbh - dbh of the server to inspect
#
# Returns:
#   Hashref of the lowercased row (e.g. file, position).  Returns nothing
#   (undef in scalar context) when the server does not look like a master:
#   no row, or a row with fewer than 2 columns.  That verdict is cached in
#   $self->{not_a_master}, so later calls skip the query.
sub get_master_status {
   my ( $self, $dbh ) = @_;

   if ( $self->{not_a_master}->{$dbh} ) {
      PTDEBUG && _d('Server on dbh', $dbh, 'is not a master');
      return;
   }

   my $sth = $self->{sths}->{$dbh}->{MASTER_STATUS}
         ||= $dbh->prepare('SHOW MASTER STATUS');
   PTDEBUG && _d($dbh, 'SHOW MASTER STATUS');
   $sth->execute();
   my ($ms) = @{$sth->fetchall_arrayref({})};
   PTDEBUG && _d(
      $ms ? map { "$_=" . (defined $ms->{$_} ? $ms->{$_} : '') } keys %$ms
          : '');

   if ( !$ms || scalar keys %$ms < 2 ) {
      PTDEBUG && _d('Server on dbh', $dbh, 'does not seem to be a master');
      $self->{not_a_master}->{$dbh}++;
      # Return false now.  Falling through used to return a (true) empty or
      # partial hashref on this first call only, which let callers such as
      # is_master_of() treat a non-master as a master.
      return;
   }

   return { map { lc($_) => $ms->{$_} } keys %$ms }; # lowercase the keys
}
521
# Sub: wait_for_master
#   Execute MASTER_POS_WAIT() on the slave so it waits until it reaches the
#   master's binlog file/position.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   * master_status - Hashref returned by <get_master_status()>; its file and
#                     position keys are used
#   * slave_dbh     - dbh for slave host
#
# Optional Arguments:
#   * timeout - Wait time in seconds (default 60)
#
# When $self->{channel} is set and the slave's version compares greater than
# '5.6', the channel name is passed as MASTER_POS_WAIT()'s fourth argument.
#
# Returns:
#   Hashref with result of waiting, like:
#   (start code)
#   {
#     result => the result returned by MASTER_POS_WAIT: -1, undef, 0+
#     waited => the number of seconds waited, might be zero
#     error  => only present when get_slave_status() died (result is then
#               undef and waited is 0)
#   }
#   (end code)
sub wait_for_master {
   my ( $self, %args ) = @_;
   my @required_args = qw(master_status slave_dbh);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($master_status, $slave_dbh) = @args{@required_args};
   my $timeout       = $args{timeout} || 60;

   my $result;
   my $waited;
   # NOTE(review): always true here; the required-args check above already
   # died on a false master_status, so the else branch is unreachable.
   if ( $master_status ) {
      my $slave_status;
      # Only used to detect the multi-channel error; $slave_status itself is
      # not used afterwards.
      # NOTE(review): any error from get_slave_status() is reported with the
      # multi-master message below, even if the cause was something else.
      eval {
          $slave_status = $self->get_slave_status($slave_dbh);
      };
      if ($EVAL_ERROR) {
          return {
              result => undef,
              waited => 0,
              error  =>'Wait for master: this is a multi-master slave but "channel" was not specified on the command line',
          };
      }
      # NOTE(review): relies on VersionParser's overloaded comparison.  The
      # MASTER_POS_WAIT() channel argument is believed to exist only from
      # MySQL 5.7, so confirm that 5.6.x versions do not compare > '5.6'.
      # The channel name is interpolated into the SQL without escaping.
      my $server_version = VersionParser->new($slave_dbh);
      my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
      my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
      PTDEBUG && _d($slave_dbh, $sql);
      my $start = time;
      ($result) = $slave_dbh->selectrow_array($sql);

      # Callers can combine result and waited: if MASTER_POS_WAIT() returned
      # NULL after waiting at least 1s but less than the timeout, that is a
      # strong indication that the slave was stopped while we were waiting.
      $waited = time - $start;

      PTDEBUG && _d('Result of waiting:', $result);
      PTDEBUG && _d("Waited", $waited, "seconds");
   }
   else {
      PTDEBUG && _d('Not waiting: this server is not a master');
   }

   return {
      result => $result,
      waited => $waited,
   };
}
591
# Sub: stop_slave
#   Run STOP SLAVE on $dbh.  The statement handle is prepared once per dbh
#   and cached in $self->{sths}.
#
# Returns:
#   The result of $sth->execute().
sub stop_slave {
   my ( $self, $dbh ) = @_;
   my $cache = $self->{sths}->{$dbh} ||= {};
   $cache->{STOP_SLAVE} = $dbh->prepare('STOP SLAVE')
      unless $cache->{STOP_SLAVE};
   my $sth = $cache->{STOP_SLAVE};
   PTDEBUG && _d($dbh, $sth->{Statement});
   return $sth->execute();
}
600
# Sub: start_slave
#   Start replication on $dbh.  With $pos (a { file, position } hashref),
#   runs START SLAVE UNTIL that master binlog position; without it, runs a
#   plain START SLAVE whose statement handle is cached per dbh.
#
# Returns:
#   The result of $dbh->do() or $sth->execute().
sub start_slave {
   my ( $self, $dbh, $pos ) = @_;

   if ( !$pos ) {
      my $sth = $self->{sths}->{$dbh}->{START_SLAVE}
            ||= $dbh->prepare('START SLAVE');
      PTDEBUG && _d($dbh, $sth->{Statement});
      return $sth->execute();
   }

   # As with CHANGE MASTER TO, the log position must not be quoted.
   my $sql = sprintf(
      "START SLAVE UNTIL MASTER_LOG_FILE='%s', MASTER_LOG_POS=%s",
      $pos->{file}, $pos->{position});
   PTDEBUG && _d($dbh, $sql);
   return $dbh->do($sql);
}
618
# Sub: catchup_to_master
#   Make the slave catch up to its master using START SLAVE UNTIL.  STOP
#   SLAVE is issued on both servers first; when this returns, the slave is
#   at the master's position and stopped.
#
# Parameters:
#   $slave   - dbh of the slave
#   $master  - dbh of the master
#   $timeout - Seconds passed to wait_for_master() (which defaults to 60)
#
# Returns:
#   The hashref from wait_for_master(), or undef when the slave was already
#   caught up.  Dies if wait_for_master() reports an error, or if the slave
#   ends up anywhere other than stopped at the master's position.
sub catchup_to_master {
   my ( $self, $slave, $master, $timeout ) = @_;
   $self->stop_slave($master);
   $self->stop_slave($slave);
   my $slave_status  = $self->get_slave_status($slave);
   my $slave_pos     = $self->repl_posn($slave_status);
   my $master_status = $self->get_master_status($master);
   my $master_pos    = $self->repl_posn($master_status);
   PTDEBUG && _d('Master position:', $self->pos_to_string($master_pos),
      'Slave position:', $self->pos_to_string($slave_pos));

   my $result;
   if ( $self->pos_cmp($slave_pos, $master_pos) < 0 ) {
      PTDEBUG && _d('Waiting for slave to catch up to master');
      $self->start_slave($slave, $master_pos);

      # The slave may catch up instantly and stop, in which case
      # MASTER_POS_WAIT will return NULL and $result->{result} will be undef.
      # We must catch this; if it returns NULL, then we check that
      # its position is as desired.
      # TODO: what if master_pos_wait times out and $result == -1? retry?
      $result = $self->wait_for_master(
         master_status => $master_status,
         slave_dbh     => $slave,
         timeout       => $timeout,
      );
      die $result->{error} if $result->{error};

      if ( !defined $result->{result} ) {
         $slave_status = $self->get_slave_status($slave);
         if ( $self->slave_is_running($slave_status) ) {
            die "Slave has not caught up to master and it is still running";
         }
         # Refresh the slave position before logging it; previously the
         # stale pre-wait position was logged.
         $slave_pos = $self->repl_posn($slave_status);
         PTDEBUG && _d('Master position:',
            $self->pos_to_string($master_pos),
            'Slave position:', $self->pos_to_string($slave_pos));
         if ( $self->pos_cmp($slave_pos, $master_pos) != 0 ) {
            die "MASTER_POS_WAIT() returned NULL but slave has not "
               . "caught up to master";
         }
         PTDEBUG && _d('Slave is caught up to master and stopped');
      }
   }
   else {
      PTDEBUG && _d("Slave is already caught up to master");
   }

   return $result;
}
676
# Sub: catchup_to_same_pos
#   Bring two replicating servers to the same position: stop both, then let
#   whichever is behind run (START SLAVE UNTIL) to the other's position.
#   Dies unless both end up stopped at the same position.
#
#   NOTE(review): the statuses are re-read immediately after START SLAVE
#   UNTIL, without waiting, so a server that is still catching up may be
#   seen as running; confirm callers tolerate this.
#
# Parameters:
#   $s1_dbh - dbh of the first server
#   $s2_dbh - dbh of the second server
sub catchup_to_same_pos {
   my ( $self, $s1_dbh, $s2_dbh ) = @_;
   my @dbhs = ( $s1_dbh, $s2_dbh );

   $self->stop_slave($_) foreach @dbhs;

   my ( $s1_pos, $s2_pos ) =
      map { $self->repl_posn($_) }
      map { scalar $self->get_slave_status($_) } @dbhs;

   # Work out which server (if either) is behind, and where it must go.
   my ( $lagging_dbh, $target_pos );
   if ( $self->pos_cmp($s1_pos, $s2_pos) < 0 ) {
      ( $lagging_dbh, $target_pos ) = ( $s1_dbh, $s2_pos );
   }
   elsif ( $self->pos_cmp($s2_pos, $s1_pos) < 0 ) {
      ( $lagging_dbh, $target_pos ) = ( $s2_dbh, $s1_pos );
   }
   $self->start_slave($lagging_dbh, $target_pos) if $lagging_dbh;

   # Re-read both statuses and positions, then check they agree.
   my @statuses  = map { scalar $self->get_slave_status($_) } @dbhs;
   my @positions = map { $self->repl_posn($_) } @statuses;

   my $in_sync = !$self->slave_is_running($statuses[0])
              && !$self->slave_is_running($statuses[1])
              && $self->pos_cmp(@positions) == 0;
   if ( !$in_sync ) {
      die "The servers aren't both stopped at the same position";
   }
}
709
# Sub: slave_is_running
#   Check the slave_sql_running column of a lowercased SHOW SLAVE STATUS
#   hashref.
#
# Returns:
#   1 when it is exactly 'Yes'; '' otherwise (including when it is missing).
sub slave_is_running {
   my ( $self, $slave_status ) = @_;
   my $sql_running = $slave_status->{slave_sql_running};
   return ( $sql_running || '' ) eq 'Yes';
}
715
# Sub: has_slave_updates
#   Report whether the server's log_slave_updates variable is enabled.
#
# Parameters:
#   $dbh - dbh of the server to check
#
# Returns:
#   True when the variable's value is 1 or ON; false otherwise (including
#   when SHOW VARIABLES returns no row for it).
sub has_slave_updates {
   my ( $self, $dbh ) = @_;
   my $sql = q{SHOW VARIABLES LIKE 'log_slave_updates'};
   PTDEBUG && _d($dbh, $sql);
   # The row is (Variable_name, Value); keep only the value.
   my $value = ( $dbh->selectrow_array($sql) )[1];
   return $value && $value =~ m/^(1|ON)$/;
}
724
# Sub: repl_posn
#   Extract a replication position from either a lowercased SHOW MASTER
#   STATUS hashref (file + position) or a lowercased SHOW SLAVE STATUS
#   hashref (relay_master_log_file + exec_master_log_pos).
#
# Returns:
#   Hashref { file => ..., position => ... }
sub repl_posn {
   my ( $self, $status ) = @_;
   my $is_master_status =
      exists $status->{file} && exists $status->{position};
   my ( $file_key, $pos_key ) = $is_master_status
      ? qw(file position)
      : qw(relay_master_log_file exec_master_log_pos);
   return {
      file     => $status->{$file_key},
      position => $status->{$pos_key},
   };
}
743
# Sub: get_slave_lag
#   Report replication lag as Seconds_Behind_Master from SHOW SLAVE STATUS.
#   TODO: permit using a heartbeat table.
#
# Returns:
#   The seconds_behind_master value (may be undef), or nothing when the
#   server is not a slave.
sub get_slave_lag {
   my ( $self, $dbh ) = @_;
   my $stat = $self->get_slave_status($dbh)
      or return;    # server is not a slave
   return $stat->{seconds_behind_master};
}
751
# Sub: pos_cmp
#   Compare two replication positions by their pos_to_string() forms.
#   Uses named lexicals instead of $a/$b to avoid shadowing sort's
#   package variables.
#
# Returns:
#   -1, 0, or 1, exactly as the cmp operator does.
sub pos_cmp {
   my ( $self, $pos_a, $pos_b ) = @_;
   my ( $str_a, $str_b ) = map { $self->pos_to_string($_) } $pos_a, $pos_b;
   return $str_a cmp $str_b;
}
758
# Sub: short_host
#   Simplify a host name for display.  For replication purposes a host is
#   really the host/port pair, since replication always uses TCP (never
#   sockets).  The port is omitted when it is the default 3306.  Accepts
#   either a lowercased SHOW SLAVE STATUS hashref (master_host/master_port,
#   used when master_host is true) or a DSN (h/P).
#
# Parameters:
#   $dsn - DSN or SHOW SLAVE STATUS hashref
#
# Returns:
#   "host" or "host:port"; "[default]" stands in for a missing host.
sub short_host {
   my ( $self, $dsn ) = @_;
   my ( $host, $port ) = $dsn->{master_host}
      ? @{$dsn}{qw(master_host master_port)}
      : @{$dsn}{qw(h P)};
   $host ||= '[default]';
   $port ||= 3306;
   return $port == 3306 ? $host : "$host:$port";
}
784
# Sub: is_replication_thread
#   Determine if a processlist item is a replication thread.
#
# Parameters:
#   $query - Hashref of a processlist item; columns may be Title-case
#            (Command, User, State, Id) or lowercase
#   %args  - Arguments
#
# Arguments:
#   type            - Which kind of repl thread to match:
#                     all, binlog_dump (master), slave_io, or slave_sql
#                     (default: all).  Any other value dies.
#   check_known_ids - Check known replication thread IDs (default: yes)
#
# Returns:
#   True (1) if the proclist item is the given type of replication thread,
#   otherwise 0; nothing if $query is false.
sub is_replication_thread {
   my ( $self, $query, %args ) = @_;
   return unless $query;

   my $type = lc($args{type} || 'all');
   # Group the alternation so both anchors apply to every alternative.
   # Without the group only binlog_dump was anchored at the start and all
   # at the end, so values such as "xslave_io" were accepted.
   die "Invalid type: $type"
      unless $type =~ m/^(?:binlog_dump|slave_io|slave_sql|all)\z/;

   my $match = 0;
   if ( $type =~ m/binlog_dump|all/i ) {
      # NOTE(review): exact comparison, so e.g. "Binlog Dump GTID" does not
      # match here (get_connected_slaves() uses a looser regex).
      $match = 1
         if ($query->{Command} || $query->{command} || '') eq "Binlog Dump";
   }
   if ( !$match ) {
      # On a slave, there are two threads.  Both have user="system user".
      if ( ($query->{User} || $query->{user} || '') eq "system user" ) {
         PTDEBUG && _d("Slave replication thread");
         if ( $type ne 'all' ) {
            # Match a particular slave thread.
            my $state = $query->{State} || $query->{state} || '';

            if ( $state =~ m/^init|end$/ ) {
               # http://code.google.com/p/maatkit/issues/detail?id=1121
               PTDEBUG && _d("Special state:", $state);
               $match = 1;
            }
            else {
               # These patterns are abbreviated because if the first few words
               # match chances are very high it's the full slave thd state.
               my ($slave_sql) = $state =~ m/
                  ^(Waiting\sfor\sthe\snext\sevent
                   |Reading\sevent\sfrom\sthe\srelay\slog
                   |Has\sread\sall\srelay\slog;\swaiting
                   |Making\stemp\sfile
                   |Waiting\sfor\sslave\smutex\son\sexit)/xi;

               # Type is either "slave_sql" or "slave_io".  The second line
               # implies that if this isn't the sql thread then it must be
               # the io thread, so match is true if we were supposed to match
               # the io thread.
               $match = $type eq 'slave_sql' &&  $slave_sql ? 1
                      : $type eq 'slave_io'  && !$slave_sql ? 1
                      :                                       0;
            }
         }
         else {
            # Type is "all" and it's not a master (binlog_dump) thread,
            # else we wouldn't have gotten here.  It's either of the 2
            # slave threads and we don't care which.
            $match = 1;
         }
      }
      else {
         PTDEBUG && _d('Not system user');
      }

      # MySQL loves to trick us.  Sometimes a slave replication thread will
      # temporarily morph into what looks like a regular user thread when
      # really it's still the same slave repl thread.  So here we save known
      # repl thread IDs and check if a non-matching event is actually a
      # known repl thread ID and if yes then we make it match.
      if ( !defined $args{check_known_ids} || $args{check_known_ids} ) {
         my $id = $query->{Id} || $query->{id};
         if ( !defined $id ) {
            # Nothing to remember or look up; an undef hash key would also
            # be an uninitialized-value warning, fatal in this file.
            PTDEBUG && _d('No thread ID; not checking known thread IDs');
         }
         elsif ( $match ) {
            $self->{replication_thread}->{$id} = 1;
         }
         elsif ( $self->{replication_thread}->{$id} ) {
            PTDEBUG && _d("Thread ID is a known replication thread ID");
            $match = 1;
         }
      }
   }

   PTDEBUG && _d('Matches', $type, 'replication thread:',
      ($match ? 'yes' : 'no'), '; match:', $match);

   return $match;
}
880
881
882# Sub: get_replication_filters
883#   Get any replication filters set on the host.
884#
885# Parameters:
886#   %args - Arguments
887#
888# Required Arguments:
889#   dbh - dbh, master or slave
890#
891# Returns:
892#   Hashref of any replication filters.  If none are set, an empty hashref
893#   is returned.
sub get_replication_filters {
   my ( $self, %args ) = @_;
   my @required_args = qw(dbh);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($dbh) = @args{@required_args};

   my %filters = ();

   # Copy each named filter from a status hashref into %filters, skipping
   # filters whose value is undef or the empty string (i.e. not set).
   my $copy_set_filters = sub {
      my ( $status, @names ) = @_;
      foreach my $name ( @names ) {
         next unless defined $status->{$name} && $status->{$name} ne '';
         $filters{$name} = $status->{$name};
      }
      return;
   };

   my $status = $self->get_master_status($dbh);
   if ( $status ) {
      $copy_set_filters->($status, qw(
         binlog_do_db
         binlog_ignore_db
      ));
   }

   $status = $self->get_slave_status($dbh);
   if ( $status ) {
      $copy_set_filters->($status, qw(
         replicate_do_db
         replicate_ignore_db
         replicate_do_table
         replicate_ignore_table
         replicate_wild_do_table
         replicate_wild_ignore_table
      ));

      my $sql = "SHOW VARIABLES LIKE 'slave_skip_errors'";
      PTDEBUG && _d($dbh, $sql);
      # selectrow_arrayref returns undef when no row matches, so guard $row
      # before looking at the value column.
      my $row = $dbh->selectrow_arrayref($sql);
      # "OFF" in 5.0, "" in 5.1
      if ( $row && $row->[1] && $row->[1] ne 'OFF' ) {
         $filters{slave_skip_errors} = $row->[1];
      }
   }

   return \%filters;
}
936
937
938# Sub: pos_to_string
939#   Stringify a position in a way that's string-comparable.
940#
941# Parameters:
942#   $pos - Hashref with file and position
943#
944# Returns:
945#   String like "file/posNNNNN"
sub pos_to_string {
   my ( $self, $pos ) = @_;
   my ( $file, $offset ) = @{$pos}{qw(file position)};
   # Zero-padding the position to 20 digits makes positions within the same
   # file sort correctly under plain string comparison.
   return sprintf('%s/%020d', $file, $offset);
}
951
# Sub: reset_known_replication_threads
#   Forget every thread ID cached in $self->{replication_thread} (the cache
#   that is_replication_thread() fills and consults).  Returns nothing.
sub reset_known_replication_threads {
   my $self = shift;
   $self->{replication_thread} = {};
   return;
}
957
# Sub: get_cxn_from_dsn_table
#   Read DSN strings from a DSN table and build a connection object for each.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   dsn_table_dsn - DSN string naming the DSN table, either with D and t or
#                   with a database-qualified t (db.tbl)
#   make_cxn      - Coderef, called as make_cxn(dsn => $dsn) for the DSN
#                   table itself and make_cxn(dsn_string => $str) per row
#
# Returns:
#   Arrayref of what make_cxn returned for each row, in ORDER BY id order
sub get_cxn_from_dsn_table {
   my ($self, %args) = @_;
   my @required_args = qw(dsn_table_dsn make_cxn);
   for my $required ( @required_args ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $dsn_table_dsn = $args{dsn_table_dsn};
   my $make_cxn      = $args{make_cxn};
   PTDEBUG && _d('DSN table DSN:', $dsn_table_dsn);

   my ($parser, $quoter) = @{$self}{qw(DSNParser Quoter)};
   my $dsn = $parser->parse($dsn_table_dsn);

   # Quoted, database-qualified name of the table that holds the DSNs.
   my $table =
        $dsn->{D} && $dsn->{t}          ? $quoter->quote($dsn->{D}, $dsn->{t})
      : $dsn->{t} && $dsn->{t} =~ m/\./ ? $quoter->quote($quoter->split_unquote($dsn->{t}))
      :   die "DSN table DSN does not specify a database (D) "
            . "or a database-qualified table (t)";

   my $table_cxn = $make_cxn->(dsn => $dsn);
   my $dbh       = $table_cxn->connect();
   my $sql       = "SELECT dsn FROM $table ORDER BY id";
   PTDEBUG && _d($sql);
   my $rows      = $dbh->selectcol_arrayref($sql);

   my @cxn = map {
      my $dsn_string = $_;
      PTDEBUG && _d('DSN from DSN table:', $dsn_string);
      $make_cxn->(dsn_string => $dsn_string);
   } @{ $rows || [] };

   return \@cxn;
}
997
# Sub: _d
#   Print one debug line to STDERR, prefixed with the calling package, the
#   caller's line number and the process ID.  Undefined arguments print as
#   "undef", and embedded newlines are continued with "# " so every output
#   line stays a comment.
sub _d {
   my ($package, undef, $line) = caller 0;
   my @parts;
   foreach my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }
   print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}
1005
10061;
1007}
1008# ###########################################################################
1009# End MasterSlave package
1010# ###########################################################################
1011