1#!/usr/bin/env perl
2# -*-mode:cperl; indent-tabs-mode: nil-*-
3
4## Slony migrator
5##
6## Greg Sabino Mullane <greg@turnstep.com>, Joshua Tolley <josh@endpoint.com>
7## End Point Corporation http://www.endpoint.com/
8## BSD licensed, see complete license at bottom of this script
9## The latest version can be found in the Bucardo distribution at:
10## http://www.bucardo.org/
11##
12## See the HISTORY section for other contributors
13
14package slony_migrator;
15
16use 5.006001;
17use strict;
18use warnings;
19use Getopt::Long qw/GetOptions/;
20Getopt::Long::Configure(qw/no_ignore_case/);
21use File::Basename qw/basename/;
22use File::Temp qw/tempfile tempdir/;
23File::Temp->safe_level( File::Temp::MEDIUM );
24use Cwd;
25use Data::Dumper qw/Dumper/;
26$Data::Dumper::Varname = 'SLONY';
27$Data::Dumper::Indent = 2;
28$Data::Dumper::Useqq = 1;
29
our $VERSION = '0.0.3';

## Package globals shared by the whole script.
## Declared with "our" instead of the obsolete "use vars" pragma; they are
## the very same package variables, so code elsewhere in this file is unaffected.
our (%opt, $PSQL, $res, $COM, $SQL, $db);

## If psql is not in your path, it is recommended that you hardcode its
## location here, as an alternative to the --PSQL option
$PSQL = '';

## Name of the slonik executable
our $SLONIK = 'slonik';

## If this is true, $opt{PSQL} is disabled for security reasons
our $NO_PSQL_OPTION = 1;

## If true, we show how long each query took by default. Requires Time::HiRes to be installed.
$opt{showtime} = 0;

## Which user to connect as if --dbuser is not given
$opt{defaultuser} = 'postgres';

## Default time display format, used for last_vacuum and last_analyze
our $SHOWTIME = 'HH24:MI FMMonth DD, YYYY';

## Nothing below this line should need to be changed for normal usage.
## If you do find yourself needing to change something,
## please email the author as it probably indicates something
## that could be made into a command-line option or moved above.

## $ME is the name we were invoked as; $ME2 is the canonical script name
our $ME = basename($0);
our $ME2 = 'slony_migrator.pl';
our $USAGE = qq{\nUsage: $ME <options>\n Try "$ME --help" for a complete list of options\n\n};

## Global error string, mostly used for MRTG error handling
our $ERROR = '';

## For options that take a time e.g. --critical="10 minutes" Fractions are allowed.
## Captures the number in $1 and the (optional) unit word in $2.
our $timere = qr{^\s*(\d+(?:\.\d+)?)\s*(\w*)\s*$}i;

## Defaults that the command line may override
$opt{test} = 0;
$opt{timeout} = 10;
69
## Command-line switches understood by this script, in Getopt::Long syntax
my @option_spec = (
    'version|V',
    'verbose|v+',
    'help|h',

    'host|H=s@',
    'port=s@',
    'dbname|db=s@',
    'dbuser|u=s@',
    'dbpass=s@',
    'timeout=i',

    'PSQL=s',

    'slonyschema=s',
    'slonyset=i',

    'slonik',
    'bucardo',
    'check',
);

## Show the usage message and stop if the options cannot be parsed,
## if %opt ends up empty, or if non-option arguments are left over
if (! GetOptions(\%opt, @option_spec) or ! keys %opt or @ARGV) {
    die $USAGE;
}

## Verbosity level: each -v on the command line adds one
our $VERBOSE = $opt{verbose} || 0;

if ($VERBOSE >= 3) {
    warn Dumper \%opt;
}
99
## --version: report the script name and version, then stop
if ($opt{version}) {
    print "$ME2 version $VERSION\n";
    exit 0;
}

## --help: print the summary of options, then stop
if ($opt{help}) {
    print <<"END_OF_HELP";
Usage: $ME2 <options>
Slony Migrator
This is version $VERSION.

Main functional options:
  --bucardo          print commands to migrate this Slony cluster to  Bucardo replication
  --slonik           print slonik scripts to recreate this Slony cluster

Common connection options:
 -H,  --host=NAME    hostname(s) to connect to; defaults to none (Unix socket)
 -p,  --port=NUM     port(s) to connect to; defaults to 5432.
 -db, --dbname=NAME  database name(s) to connect to; defaults to 'postgres' or 'template1'
 -u   --dbuser=NAME  database user(s) to connect as; defaults to 'postgres'
      --dbpass=PASS  database password(s); use a .pgpass file instead when possible

Other options:
  --PSQL=FILE        location of the psql executable; avoid using if possible
  -v, --verbose      verbosity level; can be used more than once to increase the level
  -h, --help         display this help information
  -t X, --timeout=X  how long in seconds before we timeout. Defaults to 10 seconds.
  --check            sanity checks the schema (experimental)

For a complete list of options and full documentation, please view the POD for this file.
Two ways to do this is to run:
pod2text $ME | less
pod2man $ME | man -l -
Or simply visit: https://bucardo.org/


END_OF_HELP
    exit 0;
}
138
## Die if Time::HiRes is needed but not found.
## The module is loaded at runtime so that it is only a requirement when
## $opt{showtime} is enabled; run_command calls the imported gettimeofday
## and tv_interval functions.
if ($opt{showtime}) {
    ## The trailing "1" makes the eval return true only when both steps worked,
    ## which is more reliable than inspecting $@ afterwards
    my $loaded = eval {
        require Time::HiRes;
        Time::HiRes->import(qw/gettimeofday tv_interval sleep/);
        1;
    };
    if (! $loaded) {
        die qq{Cannot find Time::HiRes, needed if 'showtime' is true\n};
    }
}
149
## Everything from here on out needs psql, so find and verify a working version:
if ($NO_PSQL_OPTION) {
    ## The --PSQL option is ignored entirely when it has been disabled
    delete $opt{PSQL};
}

## A hardcoded $PSQL wins; otherwise use --PSQL, and finally search the path
if (! defined $PSQL or ! length $PSQL) {
    if (exists $opt{PSQL}) {
        $PSQL = $opt{PSQL};
        $PSQL =~ m{^/[\w\d\/]*psql$} or die qq{Invalid psql argument: must be full path to a file named psql\n};
        -e $PSQL or die qq{Cannot find given psql executable: $PSQL\n};
    }
    else {
        chomp($PSQL = qx{which psql});
        $PSQL or die qq{Could not find a suitable psql executable\n};
    }
}
-x $PSQL or die qq{The file "$PSQL" does not appear to be executable\n};

## Ask psql for its version. The command may produce nothing at all, so
## avoid matching against an undefined value.
$res = qx{$PSQL --version};
$res = '' if ! defined $res;

## The minor part is optional so that version strings without one
## (for example "16beta1" or "17devel") are accepted as well
$res =~ /^psql \(PostgreSQL\) (\d+(?:\.\d+)?)/ or die qq{Could not determine psql version\n};
our $psql_version = $1;

$VERBOSE >= 1 and warn qq{psql=$PSQL version=$psql_version\n};

## Database to connect to when --dbname is not given
$opt{defaultdb} = $psql_version >= 7.4 ? 'postgres' : 'template1';
174
## Name of the schema holding the Slony catalog:
## taken from --slonyschema when given, otherwise detected
my $schema = $opt{slonyschema} ? $opt{slonyschema} : find_slony_schema();

## Look up the Postgres version, the Slony version, and the local node id.
## Not strictly needed, but it doubles as a sanity check of the schema name.
my ($postgres_version, $slony_version, $slony_node) = find_slony_version($schema);

## Every action needs information from the Slony catalog, so load it once.
## get_slony_info reads these tables:
##   sl_node, sl_nodelock, sl_set, sl_subscribe,
##   sl_path, sl_listen, sl_table, sl_sequence
my $slonyinfo = get_slony_info($schema);

## Optional consistency checks always run before the main action
if (defined $opt{check}) {
    sanitycheck();
}

## Exactly one main action: --slonik wins over --bucardo,
## and a plain summary is the fallback
if (defined $opt{slonik}) {
    print_slonik($slonyinfo);
}
elsif (defined $opt{bucardo}) {
    make_bucardo_init($slonyinfo);
}
else {
    printinfo();
}

exit 0;
204
sub sanitycheck {

    ## Run basic consistency checks and print the findings to stdout:
    ## 1. Every table in sl_table must have a log trigger and a denyaccess trigger
    ## 2. The contents of the main Slony tables must be the same in every
    ##    database that was queried
    ## Takes no arguments and returns nothing.
    ## Uses the file-level $schema and $slonyinfo variables.

    print "Beginning sanity check...\n";
    print " * Checking for triggers...\n";
    for my $trigname (($schema.'_logtrigger', $schema.'_denyaccess')) {
        ## Tables that have no trigger whose name matches $trigname
        my $SQL = qq{SELECT tab_relname
                    FROM (
                        SELECT tab_relname, tgname FROM $schema.sl_table
                        LEFT JOIN (
                            SELECT tgrelid, tgname FROM pg_trigger
                            WHERE tgname ~ '$trigname'
                        ) f ON ( tab_reloid = tgrelid)) g
                        WHERE tgname IS NULL};
        my $res = run_command($SQL);
        for my $db (@{$res->{db}}) {
            my $s = defined $db->{slurp} ? $db->{slurp} : '';
            for my $row (split /\n/ => $s) {
                ## Strip the padding psql puts around values, and ignore blank lines
                $row =~ s/^\s+//;
                $row =~ s/\s+$//;
                next if ! length $row;
                print "Table $row is missing the $trigname trigger in database at " . $db->{pname} . "\n";
            }
        }
    }

    my @tables = qw/ sl_path sl_subscribe sl_set sl_node sl_table sl_listen /;
    print ' * Making sure ' . (join ' ', @tables) . " match between databases...\n";
    for my $table (@tables) {
        ## Walk the list of databases, comparing each one with its neighbor
        reduce(
            sub {
                my ($left, $right) = @_;
                ## Row order is not significant, so compare sorted copies
                my $left_rows  = join ("\n", sort( split /\n/, $left->{slurp}));
                my $right_rows = join ("\n", sort( split /\n/, $right->{slurp}));
                if ($left_rows ne $right_rows) {
                    print "Difference in $table instances between databases at \"" .
                        $left->{pname} . '" and "' . $right->{pname} . "\"\n";
                }
                ## reduce() feeds our return value back in as the first argument
                ## of the next call, so it must be a database hashref. Returning
                ## the result of print (as this code once did) broke the check
                ## as soon as there were more than two databases.
                return $right;
            },
            @{$slonyinfo->{$table}{db}});
    }
    return;
}
239
sub reduce {

    ## Left fold: combine a list into a single value.
    ## Arguments: a code reference, a starting value, then the remaining items.
    ## The code reference is called as $code->($accumulated, $item) for each
    ## remaining item, and its return value becomes the new accumulated value.
    ## Returns the final accumulated value (the starting value if there are
    ## no further items).

    my ($callback, $accumulator) = splice @_, 0, 2;
    $accumulator = $callback->($accumulator, $_) for @_;
    return $accumulator;
}
246
sub printinfo {

    ## Default action: print a summary of the Slony cluster to stdout.
    ## Shows the versions found, the schema and the local node, then for each
    ## replication set its origin ("master") node and every subscribed
    ## ("slave") node.
    ## Takes no arguments and returns nothing; reads the file-level
    ## $slonyinfo, $schema and version variables.

    print "Slony version: $slony_version\n",
          "psql version: $psql_version\n",
          "Postgres version: $postgres_version\n",
          "Slony schema: $schema\n",
          "Local node: $slony_node\n";

    ## Connection strings are currently always displayed
    my $showconn = 1;

    my @set_ids = sort { $a <=> $b } keys %{$slonyinfo->{set}};

    for my $slony_set (@set_ids) {

        ## Overall set information
        my $set = $slonyinfo->{set}{$slony_set};
        my $comm = $set->{comment} || '';
        print "SET $slony_set: $comm\n";
        print " This set is locked by txn $set->{locked}\n" if $set->{locked};

        ## The master is the origin node of the set
        my $origin = $set->{origin};
        my $master = $slonyinfo->{node}{$origin};
        my ($master_active, $master_pid) = ('No', '');
        if ($master->{active}) {
            ($master_active, $master_pid) = ('Yes', "  PID: $master->{pid}");
        }
        my $master_conn = $showconn ? "  ($slonyinfo->{path}{$origin}{conninfo})" : '';
        printf qq{* Master node: $origin  Active: %s%s  Comment: "%s"\n%s\n},
            $master_active,
            $master_pid,
            $master->{comment},
            $master_conn;

        ## All slaves subscribed to this set
        my @matching = grep { $_ == $slony_set } keys %{$slonyinfo->{sub}};
        for my $sub (@matching) {
            my $subscribers = $slonyinfo->{sub}{$sub};
            for my $slave (sort { $a <=> $b } keys %{$subscribers}) {
                my $subinfo = $subscribers->{$slave};
                my $path = $slonyinfo->{path}{$slave};
                ## The subscription status is looked up on the slave itself
                my $active = find_slave_status($path->{conninfo}, $slave, $slony_set, $subinfo->{provider});
                my $slave_conn = $showconn ? " ($slonyinfo->{path}{$slave}{conninfo})" : '';
                printf qq{  ** Slave node: %2d  Active: %3s  Forward: %3s  Provider: %2d  Comment: "%s"\n    %s\n},
                    $slave,
                    ($active eq 't' ? 'Yes' : 'No'),
                    ($subinfo->{forward} ? 'Yes' : 'No'),
                    $subinfo->{provider},
                    $slonyinfo->{node}{$slave}{comment},
                    $slave_conn;
            }
        }

    }
    return;
} ## End of printinfo
294
295
sub pretty_size {

    ## Transform number of bytes to a SI display similar to Postgres' format
    ## Arguments:
    ##   1. Number of bytes
    ##   2. Optional: if true, show a whole number instead of two decimals
    ## Returns a string such as "20.00 kB", or "N bytes" for values below 10240.
    ## Values too large for the biggest unit are returned as the plain number.

    my $bytes = shift;
    my $rounded = shift || 0;

    return "$bytes bytes" if $bytes < 10240;

    ## Units in ascending order: $unit[$p-1] stands for 1024**$p bytes.
    ## Note that zetta (ZB) comes before yotta (YB).
    my @unit = qw/kB MB GB TB PB EB ZB YB/;

    for my $p (1 .. @unit) {
        ## Pick the largest unit that the value does not exceed by a factor of 1024
        if ($bytes <= 1024**($p+1)) {
            $bytes /= 1024**$p;
            my $format = $rounded ? '%d %s' : '%.2f %s';
            return sprintf $format, $bytes, $unit[$p-1];
        }
    }

    return $bytes;

} ## end of pretty_size
319
320
sub run_command {

    ## Run a command string against each of our databases using psql
    ## Arguments:
    ##   1. The SQL string to run (defaults to an empty string)
    ##   2. Optional hashref of arguments:
    ## "failok" - don't die if psql exited with a non-zero status
    ## "target" - hashref describing a single connection (port, host, dbname,
    ##            dbuser); used instead of generating a target list
    ## "timeout" - change the timeout from the default of $opt{timeout}
    ## "regex" - the query output must match this or we die
    ## "emptyok" - it's okay to not match any rows at all
    ## "version" - hashref of alternate queries, keyed by Postgres version
    ## "dbnumber" - connect with an alternate set of params, e.g. port2 dbname2
    ## Returns a hashref with these keys:
    ##   command - the string that was passed in
    ##   db      - arrayref with one hashref per target; after a successful run
    ##             each has "ok" set and the psql output in "slurp", after a
    ##             failure "fail" holds the exit status and "error" the message
    ##   hosts   - number of distinct hosts that were connected to
    ## Dies on timeouts, on failed queries (unless failok) and on regex mismatches.

    my $string = shift || '';
    my $arg = shift || {};
    my $info = { command => $string, db => [], hosts => 0 };

    $VERBOSE >= 3 and warn qq{Starting run_command with "$string"\n};

    my (%host,$passfile,$passfh,$tempdir,$tempfile,$tempfh,$errorfile,$errfh);

    ## Build a list of all databases to connect to.
    ## Number is determined by host, port, and db arguments
    ## Multi-args are grouped together: host, port, dbuser, dbpass
    ## Grouped are kept together for first pass
    ## The final arg in a group is passed on
    ##
    ## Examples:
    ## --host=a,b --port=5433 --db=c
    ## Connects twice to port 5433, using database c, to hosts a and b
    ## a-5433-c b-5433-c
    ##
    ## --host=a,b --port=5433 --db=c,d
    ## Connects four times: a-5433-c a-5433-d b-5433-c b-5433-d
    ##
    ## --host=a,b --host=foo --port=1234 --port=5433 --db=e,f
    ## Connects six times: a-1234-e a-1234-f b-1234-e b-1234-f foo-5433-e foo-5433-f
    ##
    ## --host=a,b --host=x --port=5432,5433 --dbuser=alice --dbuser=bob -db=baz
    ## Connects three times: a-5432-alice-baz b-5433-alice-baz x-5433-bob-baz

    ## The final list of targets:
    my @target;

    ## Default connection options
    my $conn =
        {
         host   => ['<none>'],
         port   => [5432],
         dbname => [$opt{defaultdb}],
         dbuser => [$opt{defaultuser}],
         dbpass => [''],
         inputfile => [''],
         };

    ## Index of the option occurrence (first --host, second --host, ...) being processed
    my $gbin = 0;
  GROUP: {
        ## This level controls a "group" of targets

        ## If we were passed in a target, use that and move on
        if (exists $arg->{target}) {
            push @target, $arg->{target};
            last GROUP;
        }

        my %group;
        my $foundgroup = 0;
        for my $v (keys %$conn) {
            my $vname = $v;
            ## Something new?
            if ($arg->{dbnumber}) {
                $v .= "$arg->{dbnumber}";
            }
            if (defined $opt{$v}->[$gbin]) {
                my $new = $opt{$v}->[$gbin];
                $new =~ s/\s+//g;
                ## Set this as the new default
                $conn->{$vname} = [split /,/ => $new];
                $foundgroup = 1;
            }
            $group{$vname} = $conn->{$vname};
        }

        if (!$foundgroup) { ## Nothing new, so we bail
            last GROUP;
        }
        $gbin++;

        ## Now break the newly created group into individual targets
        my $tbin = 0;
      TARGET: {
            my $foundtarget = 0;
            ## Settings for the single target being assembled in this pass
            my %temptarget;
            for my $g (keys %group) {
                if (defined $group{$g}->[$tbin]) {
                    $conn->{$g} = [$group{$g}->[$tbin]];
                    $foundtarget = 1;
                }
                $temptarget{$g} = $conn->{$g}[0] || '';
            }

            ## Leave if nothing new
            last TARGET if ! $foundtarget;

            ## Add to our master list
            push @target, \%temptarget;

            $tbin++;
            redo;
        } ## end TARGET

        redo;
    } ## end GROUP

    if (! @target) {
        die qq{No target databases found\n};
    }

    ## Create a temp file to store our results
    $tempdir = tempdir(CLEANUP => 1);
    ($tempfh,$tempfile) = tempfile('slony_bucardo_migrator.XXXXXXX', SUFFIX => '.tmp', DIR => $tempdir);

    ## Create another one to catch any errors
    ## NOTE(review): nothing in this function appears to send the stderr of
    ## psql to this file, so it looks like it always stays empty - confirm
    ($errfh,$errorfile) = tempfile('slony_bucardo_migrator.XXXXXXX', SUFFIX => '.tmp', DIR => $tempdir);

    for $db (@target) {

        ## Just to keep things clean:
        truncate $tempfh, 0;
        truncate $errfh, 0;

        ## Store this target in the list of results.
        ## The version handling at the bottom of this loop uses "redo", which
        ## runs the loop body a second time for the same target, so make sure
        ## each target is only recorded once.
        if (! @{$info->{db}} or $info->{db}[-1] != $db) {
            push @{$info->{db}}, $db;
        }

        ## Human-readable description of this connection
        $db->{pname} = "port=$db->{port} host=$db->{host} db=$db->{dbname} user=$db->{dbuser}";
        my @args = ('-q', '-U', "$db->{dbuser}", '-d', $db->{dbname}, '-t');
        if ($db->{host} ne '<none>') {
            push @args => '-h', $db->{host};
            $host{$db->{host}}++; ## For the overall count
        }
        push @args => '-p', $db->{port};

        if (defined $db->{dbpass} and length $db->{dbpass}) {
            ## Make a custom PGPASSFILE. Far better to simply use your own .pgpass of course
            ## NOTE(review): PGPASSFILE stays set in %ENV for the rest of the run
            ($passfh,$passfile) = tempfile('nagios.XXXXXXXX', SUFFIX => '.tmp', DIR => $tempdir);
            $VERBOSE >= 3 and warn "Created temporary pgpass file $passfile\n";
            $ENV{PGPASSFILE} = $passfile;
            printf $passfh "%s:%s:%s:%s:%s\n",
                $db->{host} eq '<none>' ? '*' : $db->{host},
                $db->{port},   $db->{dbname},
                $db->{dbuser}, $db->{dbpass};
            close $passfh or die qq{Could not close $passfile: $!\n};
        }


        ## psql writes the query output to our temp file
        push @args, '-o', $tempfile;

        ## If we've got different SQL, use this first run to simply grab the version
        ## Then we'll use that info to pick the real query
        if ($arg->{version}) {
            $arg->{oldstring} = $string;
            $string = 'SELECT version()';
        }

        if (defined $db->{inputfile} and length $db->{inputfile}) {
            push @args, '-f', $db->{inputfile};
        } else {
            push @args, '-c', $string;
        }

        $VERBOSE >= 3 and warn Dumper \@args;

        local $SIG{ALRM} = sub { die 'Timed out' };
        my $timeout = $arg->{timeout} || $opt{timeout};
        alarm 0;

        ## gettimeofday and tv_interval are only available when showtime is on
        my $start = $opt{showtime} ? [gettimeofday()] : 0;
        eval {
            alarm $timeout;
            ## List form of system: the arguments never pass through a shell
            $res = system $PSQL => @args;
        };
        my $err = $@;
        alarm 0;
        if ($err) {
            if ($err =~ /Timed out/) {
                die qq{Command: "$string" timed out! Consider boosting --timeout higher than $timeout\n};
            }
            else {
                die q{Unknown error inside of the "run_command" function};
            }
        }

        $db->{totaltime} = sprintf '%.2f', $opt{showtime} ? tv_interval($start) : 0;

        if ($res) {
            ## The exit status of psql lives in the high byte
            $res >>= 8;
            $db->{fail} = $res;
            $VERBOSE >= 3 and !$arg->{failok} and warn qq{System call failed with a $res\n};
            seek $errfh, 0, 0;
            {
                local $/;
                $db->{error} = <$errfh> || '';
                $db->{error} =~ s/\s*$//;
                $db->{error} =~ s/^psql: //;
                $ERROR = $db->{error};
            }
            if (!$db->{ok} and !$arg->{failok}) {
                die "Query failed: $string\n";
            }
        }
        else {
            seek $tempfh, 0, 0;
            {
                local $/;
                $db->{slurp} = <$tempfh>;
            }
            $db->{ok} = 1;

            ## Allow an empty query (no matching rows) if requested
            if ($arg->{emptyok} and $db->{slurp} =~ /^\s*$/o) {
            }
            ## If we were provided with a regex, check and bail if it fails
            elsif ($arg->{regex}) {
                if ($db->{slurp} !~ $arg->{regex}) {
                    die "Regex failed for query: $string\n";
                }
            }

        }

        ## If we are running different queries based on the version,
        ## find the version we are using, replace the string as needed,
        ## then re-run the command to this connection.
        ## NOTE(review): "version" is removed from the caller's hashref here,
        ## so only the first target is probed for its version - confirm intended
        if ($arg->{version}) {
            if ($db->{error}) {
                die $db->{error};
            }
            if ($db->{slurp} !~ /PostgreSQL (\d+\.\d+)/) {
                die qq{Could not determine version of Postgres!\n};
            }
            $db->{version} = $1;
            $string = $arg->{version}{$db->{version}} || $arg->{oldstring};
            delete $arg->{version};
            redo;
        }
    } ## end each database

    ## The temp files are removed along with $tempdir (CLEANUP => 1)

    $info->{hosts} = keys %host;

    $VERBOSE >= 3 and warn Dumper $info;

    return $info;


} ## end of run_command
582
583
sub size_in_bytes {

    ## Given a number and a unit, return the number of bytes.
    ## Arguments:
    ##   1. The number
    ##   2. Optional unit; only its first letter matters, in any case.
    ##      Known letters: b (bytes), k, m, g, t, p, e, z, y.
    ##      No unit at all means bytes. The letter "s" is also treated as
    ##      bytes, because that is the internal default this function has
    ##      always used.
    ## Dies on an unknown unit, rather than silently treating it as the
    ## largest one.

    my ($val, $unit) = @_;
    my $letter = lc substr($unit || 's', 0, 1);

    ## Exponent of 1024 for each unit letter
    my %power_of = (
        s => 0, b => 0,
        k => 1, m => 2, g => 3, t => 4,
        p => 5, e => 6, z => 7, y => 8,
    );

    exists $power_of{$letter}
        or die qq{Unknown size unit "$unit"\n};

    return $val * 1024**$power_of{$letter};

} ## end of size_in_bytes
595
596
sub size_in_seconds {

    ## Convert a time such as "10 minutes" or "1.5h" into a number of seconds.
    ## Arguments:
    ##   1. The string to convert; must match the file-level $timere regex
    ##   2. Name of the option being parsed, only used in the error message
    ## Only the first letter of the unit matters: s (seconds, the default),
    ## m (minutes), h (hours); anything else is taken to be days.
    ## Returns an empty string for empty input, otherwise the seconds with
    ## needless trailing zeroes removed. Dies if the string is not a valid time.

    my ($string,$type) = @_;

    return '' if ! defined $string or ! length $string;
    if ($string !~ $timere) {
        my $l = substr($type,0,1);
        die qq{Value for '$type' must be a valid time. Examples: -$l 1s  -$l "10 minutes"\n};
    }
    my ($val,$unit) = ($1,lc substr($2||'s',0,1));

    ## Seconds per unit; a day has 86400 seconds
    my $multiplier = $unit eq 's' ? 1
                   : $unit eq 'm' ? 60
                   : $unit eq 'h' ? 3600
                   :                86400;

    ## Format with plenty of decimals, then trim the zeroes we do not need
    my $tempval = sprintf '%.9f', $val * $multiplier;
    $tempval =~ s/0+$//;
    $tempval = int $tempval if $tempval =~ /\.$/;
    return $tempval;

} ## end of size_in_seconds
613
614
sub get_slony_info {

    ## Extract some information from the Slony sl_ tables
    ## Argument: name of the Slony schema
    ## Returns a hashref holding two kinds of entries:
    ##  * parsed data, taken from the first database queried, under the keys
    ##    node, set, sub, path, listen, table and sequence
    ##  * the complete run_command result for each table (covering every
    ##    database), under the name of the table, e.g. sl_node

    my $schema = shift;
    my %info;

    ## Run a query; returns the run_command result followed by one arrayref
    ## of fields for each row found in the first database.
    ## Blank lines are skipped and the padding around values is removed.
    my $fetch = sub {
        my $query = shift;
        $SQL = $query;
        my $result = run_command($SQL);
        my $slurp = $result->{db}[0]{slurp};
        $slurp = '' if ! defined $slurp;
        my @rows;
        for my $row (grep { /\S/ } split /\n/ => $slurp) {
            my @fields = split /\s*\|\s*/ => $row;
            for my $field (@fields) {
                $field =~ s/^\s+//;
                $field =~ s/\s+$//;
            }
            push @rows, \@fields;
        }
        return $result, @rows;
    };

    ## Postgres booleans arrive as the strings 't' and 'f'. Both of those are
    ## true as far as Perl is concerned, so they must be compared explicitly.
    my $is_true = sub {
        my $value = shift;
        return (defined $value and $value eq 't') ? 1 : 0;
    };

    my ($result, @rows);

    ## sl_node
    ## (A variant of this query that also selected no_spool is left disabled)
    ($result, @rows) = $fetch->(qq{SELECT no_id, no_active, no_comment FROM $schema.sl_node});
    for my $f (@rows) {
        my $id = int $f->[0];
        $info{node}{$id}{active} = $is_true->($f->[1]);
        $info{node}{$id}{comment} = $f->[2];
    }
    $info{sl_node} = $result;

    ## sl_nodelock
    ($result, @rows) = $fetch->(qq{SELECT nl_nodeid, nl_conncnt, nl_backendpid FROM $schema.sl_nodelock});
    for my $f (@rows) {
        my $id = int $f->[0];
        $info{node}{$id}{connectnumber} = $f->[1];
        $info{node}{$id}{pid} = int $f->[2];
    }
    $info{sl_nodelock} = $result;

    ## sl_set
    ($result, @rows) = $fetch->(qq{SELECT set_id, set_origin, set_locked, set_comment FROM $schema.sl_set});
    for my $f (@rows) {
        my $id = int $f->[0];
        $info{set}{$id}{origin} = $f->[1];
        $info{set}{$id}{locked} = $f->[2];
        $info{set}{$id}{comment} = $f->[3];
    }
    $info{sl_set} = $result;

    ## sl_subscribe: keyed by set, then by receiver
    ($result, @rows) = $fetch->(qq{SELECT sub_set, sub_provider, sub_receiver, sub_forward, sub_active FROM $schema.sl_subscribe});
    for my $f (@rows) {
        my $id = int $f->[0];
        $info{sub}{$id}{$f->[2]}{provider} = $f->[1];
        $info{sub}{$id}{$f->[2]}{forward}  = $is_true->($f->[3]);
        $info{sub}{$id}{$f->[2]}{active}   = $is_true->($f->[4]);
    }
    $info{sl_subscribe} = $result;

    ## sl_path
    ## NOTE(review): keyed by pa_server only, so when a server has several
    ## rows the last one read wins - confirm that is acceptable
    ($result, @rows) = $fetch->(qq{SELECT pa_server, pa_client, pa_connretry, pa_conninfo FROM $schema.sl_path});
    for my $f (@rows) {
        my $id = int $f->[0];
        $info{path}{$id}{client} = $f->[1];
        $info{path}{$id}{delay} = $f->[2];
        $info{path}{$id}{conninfo} = $f->[3];
    }
    $info{sl_path} = $result;


    ## sl_listen
    ($result, @rows) = $fetch->(qq{SELECT li_origin, li_provider, li_receiver FROM $schema.sl_listen});
    for my $f (@rows) {
        my $id = int $f->[0];
        $info{listen}{$id}{provider} = $f->[1];
        $info{listen}{$id}{receiver} = $f->[2];
    }
    $info{sl_listen} = $result;


    ## sl_table
    ($result, @rows) = $fetch->(qq{SELECT tab_id, tab_nspname || '.' || tab_relname, tab_set, tab_idxname, tab_comment, set_origin FROM $schema.sl_table JOIN $schema.sl_set ON (set_id = tab_set) ORDER BY tab_set, tab_id});
    for my $f (@rows) {
        my $id                     = int $f->[0];
        $info{table}{$id}{FQN}     = $f->[1];
        $info{table}{$id}{set}     = int $f->[2];
        $info{table}{$id}{key}     = $f->[3];
        $info{table}{$id}{comment} = $f->[4];
        $info{table}{$id}{origin}  = int $f->[5];
    }
    $info{sl_table} = $result;

    ## sl_sequence
    ($result, @rows) = $fetch->(qq{SELECT seq_id, seq_nspname || '.' || seq_relname, seq_set, seq_comment, set_origin FROM $schema.sl_sequence JOIN $schema.sl_set ON (set_id = seq_set) ORDER BY seq_set, seq_id});
    for my $f (@rows) {
        my $id                        = int $f->[0];
        $info{sequence}{$id}{FQN}     = $f->[1];
        $info{sequence}{$id}{set}     = int $f->[2];
        $info{sequence}{$id}{comment} = $f->[3];
        $info{sequence}{$id}{origin}  = int $f->[4];
    }
    $info{sl_sequence} = $result;

    return \%info;

} ## end of get_slony_info
735
736
sub find_slony_schema {

    ## Attempt to figure out the name of the Slony schema
    ## Looks for schemas containing a function named slonyversion, using
    ## the first database connected to.
    ## Returns the name of the schema, quoted if needed
    ## Dies if none found, or more than one found

    $SQL = q{SELECT quote_ident(nspname) FROM pg_namespace WHERE oid IN}.
        q{(SELECT pronamespace FROM pg_proc WHERE proname = 'slonyversion')};

    my $info = run_command($SQL);

    ## The query returns a single column, so each schema is on a line of its
    ## own. Only the surrounding padding is removed: whitespace inside a
    ## quoted identifier is part of the name.
    my @names;
    if (defined $info->{db}[0] and defined $info->{db}[0]{slurp}) {
        for my $line (split /\n/ => $info->{db}[0]{slurp}) {
            $line =~ s/^\s+//;
            $line =~ s/\s+$//;
            push @names, $line if length $line;
        }
    }

    if (@names > 1) {
        ## Or should we simply show them all?
        my $list = join ',' => map { qq{"$_"} } @names;
        die "Please specify a slony scheme. We found: $list\n";
    }

    my $schema = @names ? $names[0] : '';
    if (! length $schema) {
        die "Could not find a slony schema, please specify one using the --slonyschema option\n";
    }

    return $schema;

} ## end of find_slony_schema
768
769
sub find_slony_version {

    ## Look up the Postgres version, the Slony version (via the slonyversion()
    ## function) and the id of the local Slony node, in a single query.
    ## Argument: name of the Slony schema
    ## Returns a list: Postgres version, Slony version, local node id
    ## Dies if any of them cannot be determined

    my ($schema) = @_;

    ## The schema name is also passed as a string literal, so double any quotes
    (my $safeschema = $schema) =~ s/'/''/g;

    $SQL = qq{SELECT version(), $schema.slonyversion(), $schema.getlocalnodeid('$safeschema')};

    my $info = run_command($SQL, { regex => qr{([\d\.]+)} });

    my ($pg_version, $sl_version, $sl_node) = (0,0,0);
    my $first = $info->{db}[0];
    if (defined $first
        and exists $first->{slurp}
        and $first->{slurp} =~ /PostgreSQL (\S+).*\| ([\d\.]+)\s*\|\s*(\d+)/) {
        ($pg_version, $sl_version, $sl_node) = ($1,$2,$3);
    }

    ## Usually due to an incorrect schema
    die "Could not determine the version of Slony\n" unless $sl_version;
    die "Could not determine the local Slony node\n" unless $sl_node;
    die "Could not determine the version of Postgres\n" unless $pg_version;

    return $pg_version, $sl_version, $sl_node;

} ## end of find_slony_version
798
799
sub find_slave_status {

    ## Ask a slave node whether its subscription to a set is active.
    ## Arguments:
    ##   1. conninfo string for the slave (as stored in sl_path)
    ##   2. node id of the slave
    ##   3. id of the set
    ##   4. node id of the provider
    ## Returns the value of sub_active as printed by psql ('t' or 'f')
    ## Dies if the conninfo lacks a host, dbname or user, if more than one
    ## subscription row is found, or if no status can be determined.

    my ($conninfo, $slave, $slony_set, $provider) = @_;
    my $info;

    # Create a new target for $PSQL query because
    # sl_subscribe.sub_active is only meaningful on the slave

    # parse out connection information from $conninfo
    my %target = ();
    # Figure out a way to fail gracefully if the port selection doesn't work
    $target{port}   = $conninfo =~ /port=(\d+)/   ? $1 : ($opt{port}[0] || 5432);
    $target{host}   = $conninfo =~ /host=(\S+)/   ? $1 : die 'No host found?';
    $target{dbname} = $conninfo =~ /dbname=(\S+)/ ? $1 : die 'No dbname found?';
    $target{dbuser} = $conninfo =~ /user=(\S+)/   ? $1 : die 'No dbuser found?';

    # A failure to query the slave is reported here; the lack of a status
    # then makes us die further down
    eval {
        my $SQL = qq{SELECT sub_active FROM $schema.sl_subscribe WHERE sub_receiver = $slave }.
            qq{AND sub_provider = $provider AND sub_set = $slony_set};
        $info = run_command($SQL, { target => \%target });
    };
    if ($@) {
        print "Failed\n";
    }

    # The query returns a single column, so every subscription found is on a
    # line of its own
    my @statuses;
    if (defined $info->{db}[0] and defined $info->{db}[0]{slurp}) {
        for my $line (split /\n/ => $info->{db}[0]{slurp}) {
            $line =~ s/\s+//g;
            push @statuses, $line if length $line;
        }
    }

    if (@statuses > 1) {
        die "Oops, found more than one subscription on set $slony_set to provider $provider from node $slave\n";
    }

    my $status = @statuses ? $statuses[0] : '';
    if (!length $status) {
        die qq{Could not figure out status of slave $slave};
    }

    return $status;

} ## end of find_slave_status
842
sub get_slony_set {

    ## Decide which replication set to work on.
    ## The --slonyset option wins when given. Otherwise the single set known
    ## to $slonyinfo is used.
    ## Returns the set id, or undef when no sets are known.
    ## Dies when there are several sets to choose from.

    return $opt{slonyset} if defined $opt{slonyset};

    my @found = keys %{$slonyinfo->{set}};

    if (@found > 1) {
        my $list = join ', ' => @found;
        die "Please specify a set with the --slonyset option. We found $list\n";
    }

    my $only_set = @found ? $found[0] : undef;

    return $only_set;

} ## end of get_slony_set
863
864
865#
866# Slonyinfo helpers
867#
868
sub get_conninfo {

    ## Return the conninfo string stored in $slonyinfo for the given node.
    ## Dies if there is no path entry with a conninfo for that node.

    my $node = $_[0];
    my $path = $slonyinfo->{path}{$node};

    if (! defined $path or ! exists $path->{conninfo}) {
        die "ERROR: Unable to find node $node. Are you sure that node exists?\n";
    }

    return $path->{conninfo};
}
878
sub get_master {

    ## Return the origin node of the set chosen by get_slony_set().
    ## No check is made that the set exists in $slonyinfo; the result is
    ## undef in that case.

    my $chosen_set = get_slony_set();
    my $set_info = $slonyinfo->{set}{$chosen_set};
    my $origin = $set_info->{origin};

    return $origin;
}
887
888# returns a string suitable for passing to slonik
sub create_store_paths {

    # Build the "store path" commands needed to connect a new node with
    # every node already known to $slonyinfo, one pair per existing node:
    #   store path ( server=OLD, client=NEW, conninfo='conninfo of OLD' );
    #   store path ( server=NEW, client=OLD, conninfo='conninfo of NEW' );
    # Arguments: id of the new node, conninfo string of the new node
    # Returns a string suitable for passing to slonik (empty if there are
    # no existing nodes). Dies, via get_conninfo, if an existing node has
    # no conninfo.

    my ($new_node, $new_conninfo) = @_;
    my $paths = '';

    # Node ids are numbers, so sort them as such (2 before 10)
    foreach my $old_node (sort { $a <=> $b } keys %{$slonyinfo->{node}}) {
        my $old_conninfo = get_conninfo($old_node);
        $paths .= qq{store path ( server=$old_node, client=$new_node, conninfo='$old_conninfo' );\n};
        $paths .= qq{store path ( server=$new_node, client=$old_node, conninfo='$new_conninfo' );\n};
    }

    return $paths;
}
903
# Generates the "admin conninfo" line for every node listed in $slonyinfo.
# An optional node id may be given; that node is left out of the output.
# Nodes are emitted in sorted order, as create_store_paths does, so the
# generated text is the same from one run to the next.
# Returns a string suitable for passing to slonik (undef if no node was
# emitted).
sub create_admin_paths {

    # can indicate a node to skip
    my ($skip_node) = @_;
    my $connections;
    for my $node (sort keys %{$slonyinfo->{node}}) {
        ## Node ids are compared as numbers
        next if (defined $skip_node and $node == $skip_node);
        my $conninfo = get_conninfo($node);
        $connections .= qq{node $node admin conninfo='$conninfo';\n};
    }

    return $connections;
}
921
922#
923# Utility functions
924#
925
## Print a prompt on the currently selected output handle and read one line
## from STDIN.
##
## Arguments:
##   $prompt_string - text shown to the user
##   $default       - optional; when true it is shown in brackets and is
##                    returned if the user enters nothing (or STDIN is at
##                    end of file). A false value means "no default".
##
## Returns the line entered with its trailing newline removed. Without a
## default, returns undef when STDIN is at end of file.
sub prompt_user {
    my ($prompt_string, $default) = @_;

    ## Unbuffer output so the prompt is visible before we block on STDIN
    $| = 1;

    if ($default) {
        print $prompt_string, '[', $default, ']: ';
    } else {
        print $prompt_string, ': ';
    }

    ## Read into a lexical: assigning to the global $_ would overwrite (or
    ## die on) whatever the caller currently has aliased to it
    my $answer = <STDIN>;
    chomp $answer if defined $answer;

    if ($default) {
        ## Test the length, not the truth, so an answer of "0" is honored
        return (defined $answer and length $answer) ? $answer : $default;
    }

    return $answer;
}
944
## Print the "./bucardo add ..." commands that describe the Slony cluster
## held in $info. The cluster name is the file-level $schema with any
## leading underscore removed.
##
## Output, in order:
##   1. One "add db" line per entry in $info->{path}. Entries whose conninfo
##      is missing or still '<event pending>' are skipped with a warning.
##   2. For every set returned by get_ordered_subscribes(), walking the set
##      from its origin: each node that has children gets an "add table" /
##      "add sequence" line for every object in that set, then one "add sync"
##      line per child. A child that itself has children is marked with
##      target_makedelta=on.
##
## Hash keys are visited in sorted order so the output is reproducible.
## Returns nothing.
sub make_bucardo_init {
    my $info = shift;
    my $cluster_name = $schema;
    $cluster_name =~ s/^_//;

    PATHS:
    for my $p (sort keys %{$info->{path}}) {
        my $name = $cluster_name . '_' . $p;
        my $conninfo = $info->{path}{$p}{conninfo};
        if (!defined $conninfo or $conninfo eq '<event pending>') {
            warn "Couldn't get connection info for database $name.";
            next PATHS;
        }

        my ($dbname, $conn) = ('', '');
        ## split ' ' ignores leading whitespace, so no empty first option
        for my $pair (split ' ', $conninfo) {
            ## Limit of 2: only the first '=' separates key from value
            my ($key, $value) = split /=/, $pair, 2;
            $value = '' if !defined $value;
            if ($key eq 'dbname') {
                $dbname = $value;
            }
            else {
                ## Every other option is handed to bucardo unchanged
                $conn .= " $key=$value";
            }
        }
        print "./bucardo add db $name dbname=$dbname $conn\n";
    }

    for my $set (@{ get_ordered_subscribes($info->{sub}, $info->{set}, $info->{node}) }) {
        traverse_set($set, sub {
            my $node = shift;

            ## Only nodes that feed other nodes need a herd and syncs
            my @children = @{ $node->{children} || [] };
            return if !@children;

            my $set_num = $set->{set_num};
            my $source_db = $cluster_name . '_' . $node->{num};
            my $herd = $cluster_name . '_node' . $node->{num} . '_set' . $set_num;

            ## Tables first, then sequences, restricted to this set
            for my $kind (qw/table sequence/) {
                for my $id (sort keys %{ $info->{$kind} }) {
                    my $object = $info->{$kind}{$id};
                    next if $object->{set} != $set_num;
                    print "./bucardo add $kind $object->{FQN} db=$source_db autokick=true conflict_strategy=source herd=$herd\n";
                }
            }

            for my $child (@children) {
                my $targetdbname = $cluster_name . '_' . $child;
                my $syncname = $cluster_name . '_set' . $set_num . '_node' . $node->{num} . '_to_node' . $child;
                my $childnode = $set->{$child};
                ## A child that has children of its own cascades the changes
                my $cascade = (exists $childnode->{children} and @{ $childnode->{children} })
                    ? ' target_makedelta=on'
                    : '';
                print "./bucardo add sync $syncname source=$herd targetdb=$targetdbname type=pushdelta$cascade\n";
            }
            return;
        }, { include_origin => 1 });
    }
    return;
}
1014
## Print a slonik script that recreates the Slony cluster held in $info.
## The cluster name is the file-level $schema with any leading underscore
## removed.
##
## Output, in order: CLUSTER NAME, one NODE ... ADMIN CONNINFO per path,
## INIT CLUSTER for the first path id, STORE NODE for every other node,
## STORE PATH per path, an ECHO reminder, CREATE SET per set, SET ADD TABLE
## and SET ADD SEQUENCE per object, and finally SUBSCRIBE SET for every
## subscription in the order produced by walking each set from its origin.
##
## All hash keys are visited in sorted order so that the script, and in
## particular the node chosen for INIT CLUSTER, is the same on every run.
## Returns nothing.
sub print_slonik {
    my $info = shift;
    my $cluster = $schema;

    $cluster =~ s/^_//;
    print "CLUSTER NAME = $cluster;\n";

    ## The first (true) path id in sorted order is used for INIT CLUSTER
    my $master_id;
    for my $p (sort keys %{$info->{path}}) {
        $master_id ||= $p;
        print "NODE $p ADMIN CONNINFO = '$info->{path}{$p}{conninfo}';\n";
    }

    # Set up nodes
    print "INIT CLUSTER (ID = $master_id, COMMENT = '$info->{node}{$master_id}{comment}');\n";
    for my $p (sort keys %{$info->{node}}) {
        next if $p eq $master_id;
        # TODO Make sure EVENT NODE is right, here
        print "STORE NODE (ID = $p, EVENT NODE = $master_id, COMMENT = '$info->{node}{$p}{comment}');\n";
    }

    # Set up paths
    for my $p (sort keys %{$info->{path}}) {
        my $path = $info->{path}{$p};
        print "STORE PATH (SERVER = $p, CLIENT = $path->{client}, CONNINFO = '$path->{conninfo}', CONNRETRY = $path->{delay});\n";
    }

    print "ECHO 'Please start up replication nodes here';\n";

    for my $p (sort keys %{$info->{set}}) {
        my $set_info = $info->{set}{$p};
        print "TRY {\n"
            . "    CREATE SET (ID = $p, ORIGIN = $set_info->{origin}, COMMENT = '$set_info->{comment}');\n"
            . "} ON ERROR {\n"
            . "    EXIT -1;\n"
            . "}\n";
    }

    for my $p (sort keys %{$info->{table}}) {
        my $table = $info->{table}{$p};
        print "SET ADD TABLE (ID = $p, ORIGIN = $table->{origin}, SET ID = $table->{set}"
            . ", FULLY QUALIFIED NAME = '$table->{FQN}', KEY = '$table->{key}'"
            . ", COMMENT = '$table->{comment}');\n";
    }

    for my $p (sort keys %{$info->{sequence}}) {
        my $sequence = $info->{sequence}{$p};
        print "SET ADD SEQUENCE (ID = $p, ORIGIN = $sequence->{origin}, SET ID = $sequence->{set}"
            . ", FULLY QUALIFIED NAME = '$sequence->{FQN}', COMMENT = '$sequence->{comment}');\n";
    }

    ## The origin itself is not visited: it does not subscribe to its own set
    for my $set (@{ get_ordered_subscribes($info->{sub}, $info->{set}, $info->{node}) }) {
        traverse_set($set, sub {
            my $node = shift;
            ## NOTE(review): any true value prints YES; a literal 'f' would
            ## also count as true - confirm how forward is stored in $info
            my $forward = $node->{forward} ? 'YES' : 'NO';
            print "SUBSCRIBE SET (ID = $set->{set_num}, PROVIDER = $node->{parent}, RECEIVER = $node->{num}, "
                . "FORWARD = $forward);\n";
        }, {} );
    }
    return;
}
1078
## Walk a subscription tree depth-first: call $callback with $node, then
## recurse into each id listed in $node->{children}, looking the child node
## up by that id in $set. Returns nothing.
sub process_child {
    my ($set, $node, $callback) = @_;

    ## A node is always visited before any of its descendants
    $callback->($node);

    for my $child_num (@{$node->{children}}) {
        process_child($set, $set->{$child_num}, $callback);
    }

    return;
}
1085
## Walk one set structure (as built by get_ordered_subscribes), calling
## $callback for every node reachable from the origin's children.
## If $args->{include_origin} is true the origin node is passed to the
## callback first. Child ids with no entry in $set are skipped.
## Returns nothing.
sub traverse_set {
    my ($set, $callback, $args) = @_;

    if (exists ($args->{include_origin}) and $args->{include_origin}) {
        $callback->($set->{origin});
    }

    for my $child_num (@{$set->{origin}{children}}) {
        next if !exists $set->{$child_num};
        process_child($set, $set->{$child_num}, $callback);
    }

    return;
}
1092
## Turn the flat subscription data into one tree per set.
##
## Arguments:
##   $subs  - hashref: set id => { receiver id => { provider, forward } }
##   $sets  - hashref: set id => { origin => node id }
##   $nodes - accepted so existing callers keep working; not used here
##
## Returns an arrayref with one hashref per set (in sorted set id order).
## Each hashref holds:
##   set_num   => the set id
##   origin    => the origin's node hashref (also stored under its node id)
##   <node id> => { num, parent, forward, children => [receiver ids] }
## Bucardo needs to know each set; slonik just needs a valid subscribe
## order, which a walk from the origin down provides.
sub get_ordered_subscribes {
    my ($subs, $sets, $nodes) = @_;
    my @results;

    ## Sorted, so the result does not depend on hash ordering
    for my $set_num (sort keys %{$subs}) {
        my $origin = { num => $sets->{$set_num}{origin}, };
        my $set = { set_num => $set_num, origin => $origin, $origin->{num} => $origin };

        for my $recv (sort keys %{ $subs->{$set_num} }) {
            my $subscription = $subs->{$set_num}{$recv};
            my $prov = $subscription->{provider};

            ## The receiver may already exist as a placeholder created when
            ## one of its own subscribers was handled first. Either way, its
            ## forward flag must come from its own subscription.
            my $node = $set->{$recv} ||= { num => $recv };
            $node->{forward} = $subscription->{forward};
            $node->{parent} = $prov;

            ## A provider not seen yet gets a bare placeholder. It must not
            ## inherit the receiver's forward flag.
            $set->{$prov} ||= { num => $prov };
            push @{ $set->{$prov}{children} }, $recv;
        }
        push @results, $set;
    }
    return \@results;
}
1123
1124=pod
1125
1126=head1 NAME
1127
1128B<slony_migrator.pl> - Slony-to-Bucardo migration tool
1129
1130=head1 SYNOPSIS
1131
1132Provides information about a running Slony cluster, including a summary
1133description (default), Slonik scripts (the --slonik option), and
1134Slony-to-Bucardo migration scripts (the --bucardo option).
1135
1136=head1 DESCRIPTION
1137
1138Connects to a running Slony cluster and provides one of the following: A
1139summary of the sets and nodes involved in the cluster, a slonik script to
1140rebuild the cluster from scratch, or bucardo commands to build the same
1141cluster based on Bucardo. This last will allow migration from Slony to Bucardo.
1142
=head1 OPTIONS FOR PRINCIPAL FUNCTIONS
1144
1145=over 4
1146
1147=item B<--bucardo>
1148
1149Returns a list of bucardo commands which will allow migration of a Slony
1150cluster off of Slony and on to Bucardo. After installing Bucardo with
1151I<bucardo install>, these scripts will tell Bucardo about all the tables
1152and sequences in the Slony sets, each node in the Slony cluster, and configure
1153Bucardo to replicate those objects in the same way Slony does. This includes
1154the use of cascaded replication.
1155
1156=item B<--slonik>
1157
1158Returns a Slonik script which will recreate the Slony cluster from scratch.
1159
1160=back
1161
1162=head1 DATABASE CONNECTION OPTIONS
1163
1164=over 4
1165
1166=item B<-H NAME> or B<--host=NAME>
1167
1168Connect to the host indicated by NAME.
1169
1170=item B<-p PORT> or B<--port=PORT>
1171
1172Connects using the specified PORT number.
1173
1174=item B<-db NAME> or B<--dbname=NAME>
1175
1176Specifies which database to connect to. If no dbname option is provided,
1177defaults to 'postgres' if psql is version 8 or greater, and 'template1'
1178otherwise.
1179
1180=item B<-u USERNAME> or B<--dbuser=USERNAME>
1181
1182The name of the database user to connect as. If this is not provided, the
1183default is 'postgres'.
1184
1185=item B<--dbpass=PASSWORD>
1186
1187Provides the password to connect to the database with. Use of this option is highly discouraged.
1188Instead, one should use a .pgpass file.
1189
1190=back
1191
1192=head1 OTHER OPTIONS
1193
1194Other options include:
1195
1196=over 4
1197
1198=item B<-t VAL> or B<--timeout=VAL>
1199
1200Sets the timeout in seconds after which the script will abort whatever it is doing
1201and return an UNKNOWN status. The timeout is per Postgres cluster, not for the entire
1202script. The default value is 10; the units are always in seconds.
1203
1204=item B<-h> or B<--help>
1205
1206Displays a help screen with a summary of all actions and options.
1207
1208=item B<-V> or B<--version>
1209
1210Shows the current version.
1211
1212=item B<-v> or B<--verbose>
1213
1214Set the verbosity level. Can call more than once to boost the level. Setting it to three
1215or higher (in other words, issuing C<-v -v -v>) turns on debugging information for this
1216program which is sent to stderr.
1217
1218=item B<--PSQL=PATH>
1219
1220Tells the script where to find the psql program. Useful if you have more than
1221one version of the psql executable on your system, or if there is no psql program
1222in your path. Note that this option is in all uppercase. By default, this option
1223is I<not allowed>. To enable it, you must change the C<$NO_PSQL_OPTION> near the
1224top of the script to 0. Avoid using this option if you can, and instead hard-code
1225your psql location into the C<$PSQL> variable, also near the top of the script.
1226
1227=back
1228
1229=head1 DEPENDENCIES
1230
Access to a working version of psql, and Perl v5.6.1 or later. Also the
Time::HiRes Perl module if C<$opt{showtime}> is set to true (it is off by
default).
1234
1235=head1 DEVELOPMENT
1236
1237Development happens using the git system. You can clone the latest version by doing:
1238
1239 git clone https://bucardo.org/bucardo.git/
1240
1241=head1 HISTORY
1242
1243=over 4
1244
1245=item B<Version 0.0.3>, first release
1246
1247=back
1248
1249=head1 BUGS AND LIMITATIONS
1250
Slony paths aren't all captured, so --slonik output might need some tweaking to
work correctly.
1253
1254Please report any problems to josh@endpoint.com.
1255
1256=head1 AUTHORS
1257
1258 Greg Sabino Mullane <greg@turnstep.com>
 Selena Deckelmann <selena@endpoint.com>
1260 Joshua Tolley <josh@endpoint.com>
1261
1262=head1 LICENSE AND COPYRIGHT
1263
1264Copyright (c) 2007-2009 Greg Sabino Mullane <greg@turnstep.com>.
1265
1266Redistribution and use in source and binary forms, with or without
1267modification, are permitted provided that the following conditions are met:
1268
1269  1. Redistributions of source code must retain the above copyright notice,
1270     this list of conditions and the following disclaimer.
1271  2. Redistributions in binary form must reproduce the above copyright notice,
1272     this list of conditions and the following disclaimer in the documentation
1273     and/or other materials provided with the distribution.
1274
1275THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR IMPLIED
1276WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
1277MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
1278EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
1279EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
1280OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
1281INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
1282CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
1283IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
1284OF SUCH DAMAGE.
1285
1286=cut
1287