1#!/usr/bin/env perl
2# -*-mode:cperl; indent-tabs-mode: nil-*-
3
4package BucardoTesting;
5
6## Helper module for the Bucardo tests
7## Contains shared code for setup and breakdown
8
9use strict;
10use warnings;
11use utf8;
12
13use Encode qw/ decode /;
14use Encode::Locale;
15use DBI;
16use DBD::Pg;
17use Time::HiRes qw/sleep gettimeofday tv_interval/;
18use Cwd;
19use Data::Dumper;
20use Symbol;
21require Test::More;
22
23use vars qw/$SQL $sth $count $COM %dbh/;
24
25my $DEBUG = $ENV{BUCARDO_DEBUG} || 0;
26
27$ENV{BUCARDO_CONFIRM} = 0 if exists $ENV{BUCARDO_CONFIRM};
28
29use base 'Exporter';
30our @EXPORT = qw/%tabletype %tabletypemysql %tabletypemariadb %tabletypeoracle %tabletypesqlite %tabletypefirebird
31                 %sequences %val
32                 compare_tables bc_deeply clear_notices wait_for_notice
33                 $location $oldherd_msg $newherd_msg $addtable_msg $deltable_msg $nomatch_msg/;
34
35## Special global vars for munging the data
36my (%gsth, %gdbh);
37
38my $dbname = 'bucardo_test';
39
40## We need to use the local Bucardo.pm, not a system installed one!
41$ENV{PERL5LIB} = '.';
42
43## Shortcuts for ease of changes and smaller text:
44our $addtable_msg = 'Added the following tables or sequences';
45our $deltable_msg = 'Removed the following tables';
46our $nomatch_msg = 'Did not find matches for the following terms';
47our $oldherd_msg = 'The following tables or sequences are now part of the relgroup';
48our $newherd_msg = 'The following tables or sequences are now part of the relgroup';
49
50our $location = 'setup';
51my $testmsg  = ' ?';
52my $testline = '?';
53## Sometimes, we want to stop as soon as we see an error
54my $bail_on_error = $ENV{BUCARDO_TESTBAIL} || 0;
55my $total_errors = 0;
56## Used by the tt sub
57my %timing;
58
59## If true, turns off the epoch "time" output at the end of each testing output line
60my $notime = 1;
61
62my $user = qx{whoami};
63chomp $user;
64
65my $FRESHLOG = 1;
66if ($FRESHLOG) {
67    unlink 'tmp/bucardo.log';
68}
69
70my $piddir = 'pid';
71if (! -e $piddir) {
72    mkdir $piddir;
73}
74
75if ($ENV{BUCARDO_LOG_ERROR_CONTEXT}) {
76    no strict 'refs';
77    no warnings qw/prototype redefine/;
78    my ($package) = caller();
79
80    # wrap these routines
81    for my $subname ( qw(ok is like) ) {
82
83        my $glob = qualify_to_ref($subname,$package);
84
85        if (my $sub = *$glob{CODE}) {
86            *$glob = sub {
87                # get result; this is not a general wrapper, since most of
88                # the testing ignores return values here, we aren't worried
89                # about wantarray, etc; we need the return value to decide
90                # if we're going to output a bunch of additional debugging
91                # information.
92                my $res = $sub->( @_ );
93                if (!$res) {
94                    _log_context("@_");
95                }
96                $res;
97            }
98        }
99    }
100}
101
102## Test databases are labelled as A, B, C, etc.
103my @dbs = qw/A B C D E/;
104
105### TODO: Add point type (which has no natural ordering operator!)
106
107our %tabletype =
108    (
109     'bucardo_test1'  => 'SMALLINT',
110     'bucardo_test2'  => 'INT',
111     'Bucardo_test3'  => 'BIGINT',
112     'bucardo_test4'  => 'TEXT',
113     'bucardo_test5'  => 'DATE',
114     'bucardo_test6'  => 'TIMESTAMP',
115     'bucardo_test7'  => 'NUMERIC',
116     'bucardo_test8'  => 'BYTEA',
117     'bucardo_test9'  => 'int_unsigned',
118     'bucardo_test10' => 'TIMESTAMPTZ',
119     'bucardo space test' => 'INT',
120     );
121
122our %tabletypemysql =
123    (
124     'bucardo_test1'  => 'SMALLINT',
125     'bucardo_test2'  => 'INT',
126     'Bucardo_test3'  => 'BIGINT',
127     'bucardo_test4'  => 'VARCHAR(700)',
128     'bucardo_test5'  => 'DATE',
129     'bucardo_test6'  => 'DATETIME',
130     'bucardo_test7'  => 'NUMERIC(5,1)',
131     'bucardo_test8'  => 'VARBINARY(1000)',
132     'bucardo_test9'  => 'INTEGER UNSIGNED',
133     'bucardo_test10' => 'DATETIME',
134     'bucardo space test' => 'INT',
135     );
136
137our %tabletypemariadb =
138    (
139     'bucardo_test1'  => 'SMALLINT',
140     'bucardo_test2'  => 'INT',
141     'Bucardo_test3'  => 'BIGINT',
142     'bucardo_test4'  => 'VARCHAR(700)',
143     'bucardo_test5'  => 'DATE',
144     'bucardo_test6'  => 'DATETIME',
145     'bucardo_test7'  => 'NUMERIC(5,1)',
146     'bucardo_test8'  => 'VARBINARY(1000)',
147     'bucardo_test9'  => 'INTEGER UNSIGNED',
148     'bucardo_test10' => 'DATETIME',
149     'bucardo space test' => 'INT',
150     );
151
152our %tabletypefirebird =
153    (
154     'bucardo_test1'  => 'SMALLINT',
155     'bucardo_test2'  => 'INT',
156     'Bucardo_test3'  => 'BIGINT',
157     'bucardo_test4'  => 'VARCHAR(700)',
158     'bucardo_test5'  => 'DATE',
159     'bucardo_test6'  => 'DATETIME',
160     'bucardo_test7'  => 'NUMERIC(5,1)',
161     'bucardo_test8'  => 'VARBINARY(1000)',
162     'bucardo_test9'  => 'INTEGER UNSIGNED',
163     'bucardo_test10' => 'TIMESTAMP',
164     'bucardo space test' => 'INT',
165     );
166
167our %tabletypeoracle =
168    (
169     'bucardo_test1'  => 'SMALLINT',
170     'bucardo_test2'  => 'INT',
171     'Bucardo_test3'  => 'BIGINT',
172     'bucardo_test4'  => 'NVARCHAR2(1000)',
173     'bucardo_test5'  => 'DATE',
174     'bucardo_test6'  => 'TIMESTAMP',
175     'bucardo_test7'  => 'NUMERIC(5,1)',
176     'bucardo_test8'  => 'BLOB',
177     'bucardo_test9'  => 'INTEGER',
178     'bucardo_test10' => 'TIMESTAMP WITH TIME ZONE',
179     'bucardo space test' => 'INT',
180     );
181
182our %tabletypesqlite =
183    (
184     'bucardo_test1'  => 'SMALLINT',
185     'bucardo_test2'  => 'INT',
186     'Bucardo_test3'  => 'BIGINT',
187     'bucardo_test4'  => 'VARCHAR(1000)',
188     'bucardo_test5'  => 'DATE',
189     'bucardo_test6'  => 'DATETIME',
190     'bucardo_test7'  => 'NUMERIC(5,1)',
191     'bucardo_test8'  => 'VARBINARY(1000)',
192     'bucardo_test9'  => 'INTEGER UNSIGNED',
193     'bucardo_test10' => 'DATETIME',
194     'bucardo space test' => 'INT',
195     );
196
197
198our @tables2empty = (qw/droptest_bucardo/);
199
200our %sequences =
201    (
202    'bucardo_test_seq1' => '',
203    'bucardo_test_seq2' => '',
204    'Bucardo_test_seq3' => '',
205    );
206
207my %debug = (
208             recreatedb     => 0,
209             recreateschema => 1,
210             recreateuser   => 0,
211         );
212
213my $DEBUGDIR = ".";
214-e $DEBUGDIR or mkdir $DEBUGDIR;
215
216## To avoid stepping on other instance's toes
217my $PIDDIR = "/tmp/bucardo_testing_$ENV{USER}";
218mkdir $PIDDIR if ! -e $PIDDIR;
219
220## Let pg_config guide us to a likely initdb/pg_ctl location
221my $output = qx{pg_config --bindir};
222chomp $output;
223my $bindir = $output =~ m{^/} ? $1 : '';
224
225## Location of files
226my $initdb = $ENV{PGBINDIR} ? "$ENV{PGBINDIR}/initdb" : $bindir ? "$bindir/initdb" : 'initdb';
227my $pg_ctl = $ENV{PGBINDIR} ? "$ENV{PGBINDIR}/pg_ctl" : $bindir ? "$bindir/pg_ctl" : 'pg_ctl';
228
229## Get the default initdb location
230my $pgversion = qx{$initdb -V};
231my ($pg_ver, $pg_major_version, $pg_minor_version, $pg_point_version);
232if (defined $pgversion and $pgversion =~ /initdb \(PostgreSQL\) (\d+\..*)/) {
233    $pg_ver = $1;
234    ($pg_major_version, $pg_minor_version, $pg_point_version) = split /\./, $pg_ver;
235    $pg_minor_version =~ s/(\d+).+/$1/;
236}
237else {
238    die qq{Could not determine initdb version information from running "$initdb -V"\n};
239}
240
241sub pg_major_version { join '.', $pg_major_version, $pg_minor_version }
242
243## Each database can also have a custom version
244## We do this by setting PGBINDIR[A-Z]
245## This allows us to test (for example) a 8.1 master and an 8.4 slave
246my %pgver;
247my %clusterinfo;
248my $lport = 58920;
249for my $name ('A'..'Z') {
250    $lport++;
251    $clusterinfo{$name}{port} = $lport;
252
253    my $lbindir = $ENV{PGBINDIR} || '';
254    my $linitdb = $initdb;
255    my $lpgctl  = $pg_ctl;
256    my $localver = $pg_ver;
257    my ($lmaj,$lmin,$lrev) = ($pg_major_version, $pg_minor_version, $pg_point_version);
258    if (exists $ENV{"PGBINDIR$name"}) {
259        $lbindir = $ENV{"PGBINDIR$name"};
260        -d $lbindir or die qq{Invalid ENV "PGBINDIR$name"\n};
261        $linitdb = "$lbindir/initdb";
262        $lpgctl = "$lbindir/pg_ctl";
263
264        $COM = "$linitdb -V";
265        my $answer = qx{$COM};
266        die "Cannot find version from: $COM" if $answer !~ /initdb \(PostgreSQL\) (\d+\..*)/;
267        $localver = $1;
268        ($lmaj,$lmin,$lrev) = split /\./, $localver;
269        $lmin =~ s/(\d+).+/$1/;
270    }
271    $pgver{$name} = {
272        bindir  => $lbindir,
273        initdb  => $linitdb,
274        pgctl   => $lpgctl,
275        version => $localver,
276        ver     => "$lmaj.$lmin",
277        vmaj    => $lmaj,
278        vmin    => $lmin,
279        vrev    => $lrev,
280        dirname => "bucardo_test_database_${name}_$lmaj.$lmin",
281        port    => $lport,
282    };
283}
284
285# Set a semi-unique name to make killing old tests easier
286my $xname = "bctest_$ENV{USER}";
287
288## Maximum time to wait for bucardo to return
289my $ALARM_BUCARDO = 25;
290## Maximum time to wait for a kid to appear via pg_listener
291my $ALARM_WAIT4KID = 3;
292## How long to wait for most syncs to take effect?
293my $TIMEOUT_SYNCWAIT = 3;
294## How long to sleep between checks for sync being done?
295my $TIMEOUT_SLEEP = 0.1;
296## How long to wait for a notice to be issued?
297my $TIMEOUT_NOTICE = 4;
298
299## Bail if the bucardo file does not exist / does not compile
300for my $file (qw/bucardo Bucardo.pm/) {
301    if (! -e $file) {
302        die "Cannot run without file $file\n";
303    }
304    eval {
305        $ENV{BUCARDO_TEST} = 1;
306        require $file;
307        $ENV{BUCARDO_TEST} = 0;
308    };
309    if ($@) {
310        die "Cannot run unless $file compiles cleanly: $@\n";
311    }
312}
313
314## Prepare some test values for easy use
315## The secondary names are for other databases, e.g. MySQL
316our %val;
317my $xvalmax = 30;
318for (1..$xvalmax) {
319    $val{SMALLINT}{$_} = $_;
320    $val{INT}{$_} = 1234567+$_;
321    $val{BIGINT}{$_} = 7777777777 + $_;
322    $val{TEXT}{$_} = $val{'VARCHAR(1000)'}{$_} = $val{'VARCHAR(700)'}{$_} = "\\Pbc'$_";
323    $val{DATE}{$_} = sprintf '2001-10-%02d', $_;
324    $val{TIMESTAMP}{$_} = $val{DATE}{$_} . ' 12:34:56';
325    $val{NUMERIC}{$_} = $val{'NUMERIC(5,1)'}{$_} = 0.7 + $_;
326    $val{BYTEA}{$_} = "$_\0Z";
327    $val{int_unsigned}{$_} = $val{'INTEGER UNSIGNED'}{$_} = 5000 + $_;
328    $val{TIMESTAMPTZ}{$_} = $val{DATETIME}{$_} = $val{DATE}{$_} . ' 11:22:33+00';
329    $val{DATETIME}{$_} =~ s/\+00//;
330    $val{TIMESTAMPTZNOZERO} = $val{DATE}{$_} . ' 11:22:33';
331}
332
333
334sub diag {
335    Test::More::diag(@_);
336}
337
338
339sub new {
340
341    ## Create a new BucardoTesting object.
342    ## Arguments:
343    ## 1. Hashref of options (optional)
344    ## Returns: reference to a new BucardoTesting object
345
346    my $class = shift;
347    my $arg   = shift || {};
348    my $self  = {};
349    bless $self, $class;
350
351    if ($arg->{notime}) {
352        $notime = 1;
353    }
354
355    ## Make a note of which file invoked us for later debugging
356    $self->{file} = (caller)[1];
357
358    ## Bail on first error? Default is ENV, then false.
359    $bail_on_error = exists $arg->{bail} ? $arg->{bail} : $ENV{BUCARDO_TESTBAIL} || 0;
360
361    ## Name of the test schema
362    $self->{schema} = 'bucardo_schema';
363
364    ## Let's find out where bucardo is. Prefer the blib ones, which are shebang adjusted
365    if (-e 'blib/script/bucardo') {
366        $self->{bucardo} = 'blib/script/bucardo';
367    }
368    elsif (-e '../blib/script/bucardo') {
369        $self->{bucardo} = '../blib/script/bucardo';
370    }
371    elsif (-e './bucardo') {
372        $self->{bucardo} = './bucardo';
373    }
374    elsif (-e '../bucardo') {
375        $self->{bucardo} = '../bucardo';
376    }
377    else {
378        die qq{Could not find bucardo\n};
379    }
380
381    ## Handle both old and new way of setting location
382    if ($location eq 'setup' and $arg->{location}) {
383        $location = $self->{location} = $arg->{location};
384    }
385
386
387    return $self;
388
389} ## end of new
390
391
392sub debug {
393
394    ## Simply internal debugging routine, prints a message if $DEBUG is set
395    ## Arguments:
396    ## 1. Message to print
397    ## 2. Optional level, defaults to 0
398    ## Returns: nothing
399
400    $DEBUG or return;
401
402    my $msg = shift || 'No message?!';
403    my $level = shift || 0;
404
405    return if $DEBUG < $level;
406
407    chomp $msg;
408    warn "DEBUG: $msg\n";
409
410    return;
411
412} ## end of debug
413
414
415sub empty_cluster {
416
417    ## Empty out a cluster's databases
418    ## Creates the cluster and 'bucardo_test' database as needed
419    ## For existing databases, removes all known schemas
420    ## Always recreates the public schema
421    ## Arguments: one
422    ## 1. Name of the cluster
423    ## Returns: arrayref of database handles to the 'bucardo_test*' databases
424
425    my $self = shift;
426    my $clustername = shift or die;
427
428    ## Create the cluster if needed
429    $self->create_cluster($clustername);
430
431    ## Start it up if needed
432    $self->start_cluster($clustername);
433
434    my $alldbh;
435
436    ## Get a handle to the postgres database
437    my $masterdbh = $self->connect_database($clustername, 'postgres');
438
439    my $dbh;
440    if (database_exists($masterdbh, $dbname)) {
441        $dbh = $self->connect_database($clustername, $dbname);
442        ## Remove any of our known schemas
443        my @slist;
444        for my $sname (qw/ public bucardo freezer tschema /) {
445            push @slist => $sname if $self->drop_schema($dbh, $sname);
446        }
447        debug(qq{Schemas dropped from $dbname on $clustername: } . join ',' => @slist);
448
449        ## Recreate the public schema
450        $dbh->do("CREATE SCHEMA public");
451        $dbh->commit();
452    }
453    else {
454        local $masterdbh->{AutoCommit} = 1;
455        debug(qq{Creating database $dbname});
456        $masterdbh->do("CREATE DATABASE $dbname");
457        $dbh = $self->connect_database($clustername, $dbname);
458    }
459
460    $masterdbh->disconnect();
461
462    return $dbh;
463
464} ## end of empty_cluster
465
466
467sub create_cluster {
468
469    ## Create a cluster if it does not already exist
470    ## Runs initdb, then modifies postgresql.conf
471    ## Arguments:
472    ## 1. Name of the cluster
473    ## Returns: nothing
474
475    my $self = shift;
476    my $clustername = shift or die;
477
478    my $line = (caller)[2];
479    my $info = $pgver{$clustername}
480        or die qq{No such cluster as "$clustername" (called from line $line)\n};
481
482    my $dirname = $info->{dirname};
483
484    if (-d $dirname) {
485        ## Sometimes these test clusters get left in a broken state.
486        my $file = "$dirname/postgresql.conf";
487        if (! -e $file) {
488            ## Just move it out of the way, rather than deleting it
489            rename $dirname, "$dirname.old";
490        }
491        return;
492    }
493
494    my $localinitdb = $info->{initdb};
495
496    debug(qq{Running $localinitdb for cluster "$clustername"});
497    my $com = qq{$localinitdb -D $dirname 2>&1};
498    debug($com);
499    my $res = qx{$com};
500    die $res if $? != 0;
501    if ($DEBUG) {
502        warn Dumper $res;
503    }
504
505    ## Make some minor adjustments
506    my $connections = $clustername eq 'A' ? 150 : 75;
507    my $file = "$dirname/postgresql.conf";
508    open my $fh, '>>', $file or die qq{Could not open "$file": $!\n};
509    printf {$fh} "
510
511port                       = %d
512max_connections            = $connections
513random_page_cost           = 2.5
514log_statement              = 'all'
515log_min_duration_statement = 0
516client_min_messages        = WARNING
517log_line_prefix            = '%s %s[%s] '
518listen_addresses           = ''
519
520",
521    $info->{port}, '%m', '%d', '%p';
522
523    ## Make some per-version adjustments
524    if ($info->{ver} >= 8.3) {
525        print {$fh} "logging_collector = off\n";
526    }
527    else {
528        print {$fh} "redirect_stderr   = off\n";
529    }
530    close $fh or die qq{Could not close "$file": $!\n};
531
532    return;
533
534
535} ## end of create_cluster
536
537
538sub start_cluster {
539
540    ## Startup a cluster if not already running
541    ## Arguments:
542    ## 1. Name of the cluster
543    ## Returns: nothing
544
545    my $self = shift;
546    my $clustername = shift || 'A';
547
548    ## Create the cluster if needed
549    $self->create_cluster($clustername);
550
551    my $info = $pgver{$clustername};
552
553    my $dirname = $info->{dirname};
554
555    ## Check the PID file. If it exists and is active, simply return
556    my $pidfile = "$dirname/postmaster.pid";
557    if (-e $pidfile) {
558        open my $fh, '<', $pidfile or die qq{Could not open "$pidfile": $!\n};
559        <$fh> =~ /(\d+)/ or die qq{No PID found in file "$pidfile"\n};
560        my $pid = $1;
561        close $fh or die qq{Could not close "$pidfile": $!\n};
562        ## An active process should respond to a "ping kill"
563        $count = kill 0 => $pid;
564        return if 1 == $count;
565        ## If no response, remove the pidfile ourselves and go on
566        debug(qq{Server seems to have died, removing file "$pidfile"});
567        unlink $pidfile or die qq{Could not remove file "$pidfile"\n};
568    }
569
570    my $port = $info->{port};
571    debug(qq{Starting cluster "$clustername" on port $port});
572
573    ## If not Windows, we'll use Unix sockets with a custom socket dir
574    my $option = '';
575    if ($^O !~ /Win32/) {
576        my $sockdir = "$dirname/socket";
577        -e $sockdir or mkdir $sockdir;
578        $option = q{-o '-k socket'};
579        ## Older versions do not assume socket is right off of data dir
580        if ($info->{ver} <= 8.0) {
581            $option = qq{-o '-k $dirname/socket'};
582        }
583    }
584
585    ## Attempt to start it up with a pg_ctl call
586    my $localpgctl = $info->{pgctl};
587
588    $COM = qq{$localpgctl $option -l $dirname/pg.log -D $dirname start};
589    debug(qq{Running: $COM});
590    qx{$COM};
591
592    ## Wait for the pidfile to appear
593    my $maxwaitseconds = 20;
594    my $loops = 0;
595    {
596        last if -e $pidfile;
597        sleep 0.1;
598        if ($loops++ > ($maxwaitseconds * 10)) {
599            Test::More::BAIL_OUT ( 'Failed to connect to database' );
600            die "Failed to startup cluster $clustername, command was $COM\n";
601        }
602        redo;
603    }
604
605    ## Keep attempting to get a database connection until we get one or timeout
606    $maxwaitseconds = 20;
607
608    my $dbhost = getcwd;
609    $dbhost .= "/$dirname/socket";
610
611    ## Using the "invalidname" is a nice way to work around locale issues
612    my $dsn = "dbi:Pg:dbname=invalidname;port=$port;host=$dbhost";
613    my $dbh;
614
615    debug(qq{Connecting as $dsn});
616
617    $loops = 0;
618  LOOP: {
619        eval {
620            $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
621        };
622        last if $@ =~ /"invalidname"/;
623        sleep 0.1;
624        if ($loops++ > ($maxwaitseconds * 10)) {
625            die "Database did not come up: dsn was $dsn\n";
626        }
627        redo;
628    }
629
630    return;
631
632} ## end of start_cluster
633
634
635sub connect_database {
636
637    ## Return a connection to a database within a cluster
638    ## Arguments:
639    ## 1. Name of the cluster
640    ## 2. Name of the database (optional, defaults to 'bucardo_test')
641    ## Returns: database handle
642
643    my $self = shift;
644    my $clustername = shift or die;
645    my $ldbname = shift || $dbname;
646
647    ## This may be one of the "extra" databases. In which case the true cluster must be revealed:
648    $clustername =~ s/\d+$//;
649
650    ## Create and start the cluster as needed
651    $self->start_cluster($clustername);
652
653    ## Build the DSN to connect with
654    my $info = $pgver{$clustername};
655    my $dbport = $info->{port};
656    my $dbhost = getcwd . "/$info->{dirname}/socket";
657    my $dsn = "dbi:Pg:dbname=$ldbname;port=$dbport;host=$dbhost";
658
659    ## If we already have a cached version and it responds, return it
660    if (exists $dbh{$dsn}) {
661        my $dbh = $dbh{$dsn};
662        $dbh->ping and return $dbh;
663        ## No ping? Remove from the cache
664        $dbh->disconnect();
665        delete $dbh{$dsn};
666    }
667
668    my $dbh;
669    eval {
670        $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
671    };
672    if ($@) {
673        if ($ldbname eq 'postgres' and $@ =~ /"postgres"/) {
674
675            ## Probably an older version that uses template1
676            (my $localdsn = $dsn) =~ s/dbname=postgres/dbname=template1/;
677
678            ## Give up right away if we are already trying template1
679            die $@ if $localdsn eq $dsn;
680
681            debug(qq{Connection failed, trying to connect to template1 to create a postgres database});
682
683            ## Connect as template1 and create a postgres database
684            $dbh = DBI->connect($localdsn, '', '', { AutoCommit=>1, RaiseError=>1, PrintError=>0 });
685            $dbh->do('CREATE DATABASE postgres');
686            $dbh->disconnect();
687
688            ## Reconnect to our new database
689            $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
690        }
691        else {
692            die "$@\n";
693        }
694    }
695
696    $dbh->do(q{SET TIME ZONE 'UTC'});
697
698    if ($DEBUG) {
699        my $file = 'bucardo.debug.dsns.txt';
700        if (open my $fh, '>>', $file) {
701            print {$fh} "\n$dsn\n";
702            my ($host,$port,$db);
703            $dsn =~ /port=(\d+)/ and $port=$1;
704            $dsn =~ /dbname=(.+?);/ and $db=$1;
705            $dsn =~ /host=(.+)/ and $host=$1;
706            printf {$fh} "psql%s%s%s\n", " -h $host", " -p $port", " $db";
707            close $fh or die qq{Could not close file "$file": $!\n};
708        }
709    }
710
711    $dbh->commit();
712
713    return $dbh;
714
715} ## end of connect_database
716
717
718sub drop_schema {
719
720    ## Drop a schema if it exists
721    ## Two arguments:
722    ## 1. database handle
723    ## 2. name of the schema
724    ## Returns 1 if dropped, 0 if not
725
726    my ($self,$dbh,$sname) = @_;
727
728    return 0 if ! schema_exists($dbh, $sname);
729
730    local $dbh->{AutoCommit} = 1;
731    local $dbh->{Warn} = 0;
732    $dbh->do("DROP SCHEMA $sname CASCADE");
733
734    return 1;
735
736} ## end of drop_schema
737
738
739sub repopulate_cluster {
740
741    ## Make sure a cluster is empty, then add in the sample data
742    ## Arguments: two
743    ## 1. Name of the cluster
744    ## 2. Optional - number of additional databases to create
745    ## Returns: database handle to the 'bucardo_test' database
746
747    my $self = shift;
748    my $clustername = shift or die;
749    my $extradbs = shift || 0;
750
751    Test::More::note("Recreating cluster $clustername");
752
753    my $dbh = $self->empty_cluster($clustername);
754    $self->add_test_schema($dbh, $clustername);
755
756    ## Now recreate all the extra databases via templating
757    for my $number (1..$extradbs) {
758        my $dbname2 = "$dbname$number";
759        local $dbh->{AutoCommit} = 1;
760        if (database_exists($dbh, $dbname2)) {
761            ## First, kill other sessions!
762            my $odbh = $self->connect_database($clustername, $dbname2);
763            eval {
764                $SQL = 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = ? AND pid <> pg_backend_pid()';
765                $sth = $odbh->prepare($SQL);
766                $odbh->execute($dbname2);
767                $odbh->commit();
768            };
769            $odbh->disconnect();
770            $dbh->do("DROP DATABASE $dbname2");
771        }
772        $dbh->do("CREATE DATABASE $dbname2 TEMPLATE $dbname");
773    }
774
775    ## Store our names away
776    $gdbh{$clustername} = $dbh;
777
778    return $dbh;
779
780} ## end of repopulate_cluster
781
782
783sub add_test_schema {
784
785    ## Add an empty test schema to a database
786    ## Arguments: two
787    ## 1. database handle (usually to 'bucardo_test')
788    ## 2. Cluster name
789    ## Returns: nothing
790
791    my $self = shift;
792    my $dbh = shift or die;
793    my $clustername = shift or die;
794
795    my ($tcount,$scount,$fcount) = (0,0,0);
796
797    ## Empty out or create the droptest table
798    if (table_exists($dbh => 'droptest_bucardo')) {
799        $dbh->do('TRUNCATE TABLE droptest_bucardo');
800    }
801    else {
802        $tcount++;
803        $dbh->do(q{
804            CREATE TABLE droptest_bucardo (
805              name TEXT NOT NULL,
806              type TEXT NOT NULL
807            )
808        });
809    }
810
811    ## Create the language if needed
812    if (!language_exists($dbh => 'plpgsql')) {
813        debug(q{Creating language plpgsql});
814        $dbh->do('CREATE LANGUAGE plpgsql');
815    }
816
817    ## Create supporting functions as needed
818    if (!function_exists($dbh => 'trigger_test')) {
819        $fcount++;
820        $dbh->do(q{
821                CREATE FUNCTION trigger_test()
822                RETURNS trigger
823                LANGUAGE plpgsql
824                AS $_$ BEGIN
825                INSERT INTO droptest_bucardo(name,type)
826                    VALUES (TG_RELNAME, 'trigger');
827                RETURN NULL;
828                END;
829                $_$
830            });
831    }
832    if (!function_exists($dbh => 'trigger_test_zero')) {
833        $fcount++;
834        $dbh->do(q{
835                CREATE FUNCTION trigger_test_zero()
836                RETURNS trigger
837                LANGUAGE plpgsql
838                AS $_$ BEGIN
839                INSERT INTO droptest_bucardo(name,type)
840                    VALUES (TG_RELNAME, 'trigger');
841                RETURN NULL;
842                END;
843                $_$;
844            });
845    }
846
847    ## Create our helper domain for pseudo-types
848    if (domain_exists($dbh => 'int_unsigned')) {
849        $dbh->do('DROP DOMAIN int_unsigned CASCADE');
850    }
851    $dbh->do('CREATE DOMAIN int_unsigned INTEGER CHECK (value >= 0)');
852
853    ## Create one table for each table type
854    for my $table (sort keys %tabletype) {
855
856        local $dbh->{Warn} = 0;
857
858        ## Does the table already exist? If so, drop it.
859        if (table_exists($dbh => $table)) {
860            $dbh->do(qq{DROP TABLE "$table"});
861        }
862
863        my $pkeyname = $table =~ /test5/ ? q{"id space"} : 'id';
864        my $pkindex = $table =~ /test2/ ? '' : 'PRIMARY KEY';
865        $SQL = qq{
866            CREATE TABLE "$table" (
867                $pkeyname    $tabletype{$table} NOT NULL $pkindex};
868        $SQL .= $table =~ /X/ ? "\n)" : qq{,
869                data1 TEXT                   NULL,
870                inty  SMALLINT               NULL,
871                booly BOOLEAN                NULL,
872                bite1 BYTEA                  NULL,
873                bite2 BYTEA                  NULL,
874                email TEXT                   NULL UNIQUE
875            )
876            };
877
878        $dbh->do($SQL);
879        $tcount++;
880
881        if ($table =~ /test2/) {
882            $dbh->do(qq{ALTER TABLE "$table" ADD CONSTRAINT multipk PRIMARY KEY ($pkeyname,data1)});
883        }
884
885        ## Create a trigger to test trigger supression during syncs
886        $SQL = qq{
887            CREATE TRIGGER "bctrig_$table"
888            AFTER INSERT OR UPDATE ON "$table"
889            FOR EACH ROW EXECUTE PROCEDURE trigger_test()
890            };
891        $table =~ /0/ and ($SQL =~ s/trigger_test/trigger_test_zero/);
892        $dbh->do($SQL);
893
894        ## Create a rule to test rule supression during syncs
895        $SQL = qq{
896            CREATE OR REPLACE RULE "bcrule_$table"
897            AS ON INSERT TO "$table"
898            DO ALSO INSERT INTO droptest_bucardo(name,type) VALUES ('$table','rule')
899            };
900        $table =~ /0/ and $SQL =~ s/NEW.inty/0/;
901        $dbh->do($SQL);
902    }
903
904    ## Create the foreign key tables
905    #$dbh->do('CREATE TABLE bucardo_fkey1 (fkid INTEGER NOT NULL PRIMARY KEY, data2 TEXT)');
906    $SQL = q{
907ALTER TABLE bucardo_fkey1
908  ADD CONSTRAINT "bucardo_fkey1"
909  FOREIGN KEY (fkid)
910  REFERENCES bucardo_test1 (id)
911  ON DELETE CASCADE ON UPDATE CASCADE
912};
913    #$dbh->do($SQL);
914
915    ## Create one sequence for each table type
916    for my $seq (sort keys %sequences) {
917
918        local $dbh->{Warn} = 0;
919
920        ## Does the sequence already exist? If so, drop it.
921        if (table_exists($dbh => $seq)) {
922            $dbh->do(qq{DROP SEQUENCE "$seq"});
923        }
924
925        $SQL = qq{CREATE SEQUENCE "$seq"};
926        $dbh->do($SQL);
927        $scount++;
928    }
929
930    debug("Test objects created for $clustername. Tables: $tcount  Sequences: $scount  Functions: $fcount");
931#    diag("Test objects created for $clustername. Tables: $tcount  Sequences: $scount  Functions: $fcount");
932
933    $dbh->commit() if ! $dbh->{AutoCommit};
934
935    return;
936
937} ## end of add_test_schema
938
939sub mock_serialization_failure {
940    my ($self, $dbh, $table) = @_;
941    return if $dbh->{pg_server_version} < 80401;
942    $table ||= 'bucardo_test1';
943
944    # Mock a serialization failure on every other INSERT. Runs only when
945    # `session_replica_role` is "replica", which it true for Bucardo targets.
946    $dbh->do(qq{
947        DROP SEQUENCE IF EXISTS serial_seq;
948        CREATE SEQUENCE serial_seq;
949
950        CREATE OR REPLACE FUNCTION mock_serial_fail(
951        ) RETURNS trigger LANGUAGE plpgsql AS \$_\$
952        BEGIN
953            IF nextval('serial_seq') % 2 = 0 THEN RETURN NEW; END IF;
954            RAISE EXCEPTION 'Serialization error'
955                  USING ERRCODE = 'serialization_failure';
956        END;
957        \$_\$;
958
959        CREATE TRIGGER mock_serial_fail AFTER INSERT ON "$table"
960            FOR EACH ROW EXECUTE PROCEDURE mock_serial_fail();
961        ALTER TABLE "$table" ENABLE REPLICA TRIGGER mock_serial_fail;
962    });
963    $dbh->commit;
964
965    return 1;
966} ## end of mock_serialization_failure
967
968sub unmock_serialization_failure {
969    my ($self, $dbh, $table) = @_;
970    return if $dbh->{pg_server_version} < 80401;
971    $table ||= 'bucardo_test1';
972
973    $dbh->do(qq{
974        DROP TRIGGER IF EXISTS mock_serial_fail ON "$table";
975        DROP FUNCTION IF EXISTS mock_serial_fail();
976        DROP SEQUENCE IF EXISTS serial_seq;
977    });
978
979    return 1;
980} ## end of unmock_serialization_failure
981
982sub add_test_databases {
983
984    ## Add one or more databases to the bucardo.db table
985    ## Arguments:
986    ## 1. White-space separated db names
987    ## Returns: nothing
988
989    my $self = shift;
990    my $string = shift or die;
991
992    for my $db (split /\s+/ => $string) {
993        my $ctlargs = $self->add_db_args($db);
994        my $i = $self->ctl("add database bucardo_test $ctlargs");
995        die $i if $i =~ /ERROR/;
996    }
997
998    return;
999
1000} ## end of add_test_databases
1001
1002
1003sub add_db_args {
1004
1005    ## Arguments:
1006    ## 1. Name of a cluster
1007    ## Returns: DSN-like string to connect to that cluster
1008    ## Allows for "same" databases o the form X# e.g. A1, B1
1009    ## May return string or array depending on how it was called
1010
1011    my $self = shift;
1012    my $clustername = shift or die;
1013
1014    $clustername =~ s/\d+$//;
1015
1016    ## Build the DSN to connect with
1017    my $info = $pgver{$clustername};
1018    my $dbport = $info->{port};
1019    my $dbhost = getcwd . "/$info->{dirname}/socket";
1020    my $dsn = "dbi:Pg:dbname=$dbname;port=$dbport;host=$dbhost";
1021
1022    return wantarray
1023        ? ($user,$dbport,$dbhost)
1024        : "name=$dbname user=$user port=$dbport host=$dbhost";
1025
1026} ## end of add_db_args
1027
1028
1029sub stop_bucardo {
1030
1031    ## Stops Bucardo via a bucardo request
1032    ## Arguments: none
1033    ## Returns: 1
1034
1035    my $self = shift;
1036
1037    $self->ctl('stop testing');
1038
1039    sleep 0.2;
1040
1041    return 1;
1042
1043} ## end of stop_bucardo
1044
1045
1046sub ctl {
1047
1048    ## Run a simple non-forking command against bucardo
1049    ## Emulates a command-line invocation
1050    ## Arguments:
1051    ## 1. String to pass to bucardo
1052    ## 2. Database name to connect to. Used only when we're not confident the bucardo database exists already.
1053    ## Returns: answer as a string
1054
1055    my ($self,$args, $db) = @_;
1056    $db ||= 'bucardo';
1057
1058    my $info;
1059    my $ctl = $self->{bucardo};
1060
1061    ## Build the connection options
1062    my $bc = $self->{bcinfo};
1063    my $connopts = '';
1064    for my $arg (qw/host port pass/) {
1065        my $val = 'DB' . (uc $arg) . '_bucardo';
1066        next unless exists $bc->{$val} and length $bc->{$val};
1067        $connopts .= " --db$arg=$bc->{$val}";
1068    }
1069    $connopts .= " --dbname=$db --log-dest .";
1070    $connopts .= " --dbuser=$user";
1071    ## Just hard-code these, no sense in multiple Bucardo base dbs yet:
1072    $connopts .= " --dbport=58921";
1073    my $dbhost = getcwd;
1074    my $dirname = $pgver{A}{dirname};
1075    $dbhost .= "/$dirname/socket";
1076    $connopts .= " --dbhost=$dbhost";
1077    $connopts .= " --no-bucardorc";
1078
1079    ## Whitespace cleanup
1080    $args =~ s/^\s+//s;
1081
1082    ## Allow the caller to look better
1083    $args =~ s/^bucardo\s+//;
1084
1085    ## Set a timeout
1086    alarm 0;
1087    eval {
1088        local $SIG{ALRM} = sub { die "Alarum!\n"; };
1089        alarm $ALARM_BUCARDO;
1090        debug("Script: $ctl Connection options: $connopts Args: $args", 3);
1091        $info = decode( locale => qx{$ctl $connopts $args 2>&1} );
1092        debug("Exit value: $?", 3);
1093        die $info if $? != 0;
1094        alarm 0;
1095    };
1096
1097    if ($@ =~ /Alarum/ or $info =~ /Alarum/) {
1098        return __PACKAGE__ . ' timeout hit, giving up';
1099    }
1100    if ($@) {
1101        return "Error running bucardo: " . decode( locale => $@ ) . "\n";
1102    }
1103
1104    debug("bucardo said: $info", 3);
1105
1106    return $info;
1107
1108} ## end of ctl
1109
1110
1111sub restart_bucardo {
1112
1113    ## Start Bucardo, but stop first if it is already running
1114    ## Arguments: one, two, or three
1115    ## 1. database handle to the bucardo_control_test db
1116    ## 2. The notice we wait for, defaults to: bucardo_started
1117    ## 3. The message to give to the "pass" function, defaults to: Bucardo was started
1118    ## Returns: nothing
1119
1120    my ($self,$dbh,$notice,$passmsg) = @_;
1121
1122    my $line = (caller)[2];
1123
1124    $notice ||= 'bucardo_started';
1125    $passmsg ||= "Bucardo was started (caller line $line)";
1126
1127    $self->stop_bucardo();
1128
1129    ## Because the stop signal arrives before the PID is removed, sleep a bit
1130    sleep 2;
1131
1132    pass("Starting up Bucardo (caller line $line)");
1133    $dbh->do('LISTEN bucardo');
1134    $dbh->do('LISTEN bucardo_boot');
1135    $dbh->do("LISTEN $notice");
1136    $dbh->do('LISTEN bucardo_nosyncs');
1137    $dbh->commit();
1138
1139    my $output = $self->ctl('start --exit-on-nosync --quickstart testing');
1140
1141    my $bail = 50;
1142    my $n;
1143  WAITFORIT: {
1144        if ($bail--<0) {
1145            $output =~ s/^/#     /gmx;
1146            my $time = localtime;
1147            die "Bucardo did not start, but we waited!\nTime: $time\nStart output:\n\n$output\n";
1148        }
1149        while ($n = $dbh->func('pg_notifies')) {
1150            my ($name, $pid, $payload) = @$n;
1151            if ($dbh->{pg_server_version} >= 9999990000) {
1152                next if $name ne 'bucardo';
1153                $name = $payload;
1154            }
1155            last WAITFORIT if $name eq $notice;
1156        }
1157        $dbh->commit();
1158        sleep 0.2;
1159        redo;
1160    }
1161    pass($passmsg);
1162
1163    ## There is a race condition here for testing
1164    ## Bucardo starts up, and gives the notice above.
1165    ## However, after it does so, CTLs and KIDs start up and look for new rows
1166    ## If the caller of this function makes changes right away and then kicks,
1167    ## Bucardo may see them on the "startup kick" and thus the caller will
1168    ## get a "syncdone" message that was not initiated by *their* kick.
1169    ## One way around this is to make sure your caller immediately does a
1170    ## kick 0, which will flush out the startup kick. If it arrives after the
1171    ## startup kick, then it simply returns as a sync with no activity
1172
1173    return 1;
1174
1175} ## end of restart_bucardo
1176
1177sub setup_bucardo {
1178
1179    ## Installs bucardo via "bucardo install" into a database
1180    ## The database will be emptied out first if it already exists
1181    ## If it does not exist, it will be created
1182    ## If the cluster does not exist, it will be created
1183    ## Arguments:
1184    ## 1. Name of the cluster
1185    ## Returns: database handle to the bucardo database
1186
1187    my $self = shift;
1188    my $clustername = shift or die;
1189
1190    Test::More::note('Installing Bucardo');
1191
1192    $self->create_cluster($clustername);
1193    my $dbh = $self->connect_database($clustername, 'postgres');
1194    if (database_exists($dbh,'bucardo')) {
1195        my $retries = 5;
1196        my $pidcol = $dbh->{pg_server_version} >= 90200 ? 'pid' : 'procpid';
1197        do {
1198            ## Kick off all other people
1199            $SQL = qq{SELECT $pidcol FROM pg_stat_activity WHERE datname = 'bucardo' and $pidcol <> pg_backend_pid()};
1200            for my $row (@{$dbh->selectall_arrayref($SQL)}) {
1201                my $pid = $row->[0];
1202                $SQL = 'SELECT pg_terminate_backend(?)';
1203                $sth = $dbh->prepare($SQL);
1204                $sth->execute($pid);
1205            }
1206            $dbh->commit();
1207        } while ($dbh->selectrow_array(qq{SELECT count(*) FROM pg_stat_activity WHERE datname = 'bucardo' and $pidcol <> pg_backend_pid()}))[0] && $retries--;
1208        debug(qq{Dropping database bucardo from cluster $clustername});
1209        local $dbh->{AutoCommit} = 1;
1210        $dbh->do('DROP DATABASE bucardo');
1211    }
1212
1213    ## Make sure we have a postgres role
1214    if (! user_exists($dbh, 'postgres')) {
1215        $dbh->do('CREATE USER postgres SUPERUSER');
1216        $dbh->commit();
1217    }
1218
1219    ## Now run the install. Timeout after a few seconds
1220    debug(qq{Running bucardo install on cluster $clustername});
1221    my $info = $self->ctl('install --batch', 'postgres');
1222
1223    if ($info !~ /Installation is now complete/) {
1224        die "Installation failed: $info\n";
1225    }
1226
1227    ## Reconnect to the new database
1228    $dbh = $self->connect_database($clustername, 'bucardo');
1229
1230    ## Make some adjustments
1231    $sth = $dbh->prepare('UPDATE bucardo.bucardo_config SET setting = $2 WHERE name = $1');
1232    $count = $sth->execute('piddir' => $PIDDIR);
1233    $count = $sth->execute('reason_file' => "$PIDDIR/reason");
1234    $count = $sth->execute('sendmail_file' => 'debug.sendmail.txt');
1235    $count = $sth->execute('audit_pid' => 1);
1236    $dbh->commit();
1237
1238    ## Adjust a second way
1239    $self->ctl('set log_level=debug log_microsecond=1 log_showline=1');
1240
1241    debug(qq{Install complete});
1242
1243    return $dbh;
1244
1245} ## end of setup_bucardo
1246
1247# utility sub called on test error to output pg and bucardo logs to a single
1248# output file with context; mainly useful for CI debugging/output
1249sub _log_context {
1250    return unless $ENV{BUCARDO_LOG_ERROR_CONTEXT};
1251
1252    warn "Logging context for @_; dir=$ENV{PWD}\n";
1253    system("echo '====================' >> log.context");
1254    system("date >> log.context");
1255    system(sprintf "echo '%s' >> log.context", quotemeta($_[0])) if $_[0];
1256    system("tail -n 100 log.bucardo bucardo_test_database_*/pg.log 2>/dev/null >> log.context");
1257}
1258
1259## Utility functions for object existences:
1260sub thing_exists {
1261    my ($dbh,$name,$table,$column) = @_;
1262    my $SQL = "SELECT 1 FROM $table WHERE $column = ?";
1263    ## Only want tables from the public schema for now
1264    if ($table eq 'pg_class') {
1265        $SQL .= qq{ AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')};
1266    }
1267    my $sth = $dbh->prepare($SQL);
1268    $count = $sth->execute($name);
1269    $sth->finish();
1270    $dbh->commit() if ! $dbh->{AutoCommit};
1271    return $count < 1 ? 0 : $count;
1272}
1273sub schema_exists   { return thing_exists(@_, 'pg_namespace', 'nspname'); }
1274sub language_exists { return thing_exists(@_, 'pg_language',  'lanname'); }
1275sub database_exists { return thing_exists(@_, 'pg_database',  'datname'); }
1276sub user_exists     { return thing_exists(@_, 'pg_user',      'usename'); }
1277sub table_exists    { return thing_exists(@_, 'pg_class',     'relname'); }
1278sub function_exists { return thing_exists(@_, 'pg_proc',      'proname'); }
1279sub domain_exists   { return thing_exists(@_, 'pg_type',      'typname'); }
1280
1281
1282sub wait_for_notice {
1283
1284    ## Wait until a named NOTIFY is issued
1285    ## Arguments:
1286    ## 1. The listen string or array of strings
1287    ## 2. Seconds until we give up
1288    ## 3. Seconds we sleep between checks
1289    ## 4. Boolean: bail out if not found (defaults to true)
1290    ## Returns true if the NOTIFY was recieved.
1291
1292    my $self = shift;
1293    my $dbh = shift;
1294    my $text = shift;
1295    my $timeout = shift || $TIMEOUT_NOTICE;
1296    my $sleep = shift || $TIMEOUT_SLEEP;
1297    my $bail = shift;
1298    $bail = 0 if !defined($bail);
1299    my $n;
1300    my %wait_for;
1301    for my $str (ref $text ? @{ $text } : $text) {
1302        $wait_for{$str}++;
1303    }
1304
1305    eval {
1306        local $SIG{ALRM} = sub { die "Lookout!\n"; };
1307        alarm $timeout;
1308      N: {
1309            while ($n = $dbh->func('pg_notifies')) {
1310                my ($name, $pid, $payload) = @$n;
1311                $name = $payload if length $payload;
1312                if (exists $wait_for{$name}) {
1313                    if (--$wait_for{$name} == 0) {
1314                        delete $wait_for{$name};
1315                        last N unless %wait_for;
1316                    }
1317                }
1318                else {
1319                    debug("notice was $name", 1);
1320                }
1321            }
1322            sleep $sleep;
1323            redo;
1324        }
1325        alarm 0;
1326    };
1327    if ($@) {
1328        if ($@ =~ /Lookout/o) {
1329            my $line = (caller)[2];
1330            my $now = scalar localtime;
1331            my $texts = join '", "', keys %wait_for;
1332            my $pl = keys %wait_for > 1 ? 's' : '';
1333            my $notice = qq{Gave up waiting for notice$pl "$texts": timed out at $timeout from line $line. Time=$now};
1334            if ($bail) {
1335                Test::More::BAIL_OUT ($notice);
1336            }
1337            else {
1338                die $notice;
1339            }
1340            return;
1341        }
1342    }
1343    return 1;
1344
1345} ## end of wait_for_notice
1346
1347## Older methods:
1348
1349sub fresh_database {
1350
1351    ## Drop and create the bucardo_test database
1352    ## First arg is cluster name
1353    ## Second arg is hashref, can be 'dropdb'
1354
1355    my $self = shift;
1356    my $name = shift || 'A';
1357    my $arg = shift || {};
1358
1359    my $dirname = $pgver{$name}{dirname};
1360
1361    ## Just in case
1362    -d $dirname or $self->create_cluster($name);
1363    -e "$dirname/postmaster.pid" or $self->start_cluster($name);
1364
1365    my $dbh = $self->connect_database($name, 'postgres');
1366
1367    my $brandnew = 0;
1368    {
1369        if (database_exists($dbh => $dbname) and $arg->{dropdb}) {
1370            local $dbh->{AutoCommit} = 1;
1371            debug("Dropping database $dbname");
1372            $dbh->do("DROP DATABASE $dbname");
1373        }
1374        if (!database_exists($dbh => $dbname)) {
1375            local $dbh->{AutoCommit} = 1;
1376            debug("Creating database $dbname");
1377            $dbh->do("CREATE DATABASE $dbname");
1378            $brandnew = 1;
1379            $dbh->disconnect();
1380        }
1381    }
1382
1383    $dbh = $self->connect_database($name, $dbname);
1384
1385    return $dbh if $brandnew;
1386
1387    $self->empty_test_database($dbh);
1388
1389    return $dbh;
1390
1391} ## end of fresh_database
1392
1393
1394
1395sub create_database {
1396
1397    ## Create a new database
1398    ## First argument is the cluster name
1399    ## Second argument is the name of the database
1400    ## If the database already exists, nothing will be done
1401    ## Returns a database handle to the database
1402
1403    my $self = shift;
1404    my $clustername = shift or die;
1405    my $dbname = shift or die;
1406
1407    my $dirname = $pgver{$clustername}{dirname};
1408
1409    ## Create the cluster if needed
1410    -d $dirname or $self->create_cluster($clustername);
1411
1412    ## Start the cluster up if needed
1413    -e "$dirname/postmaster.pid" or $self->start_cluster($clustername);
1414
1415    ## Connect to the database
1416
1417    my $dbh = $self->connect_database($clustername, 'postgres');
1418
1419    if (! database_exists($dbh => $dbname)) {
1420        local $dbh->{AutoCommit} = 1;
1421        debug("Creating database $dbname");
1422        $dbh->do("CREATE DATABASE $dbname");
1423        $dbh->disconnect();
1424    }
1425
1426    $dbh = $self->connect_database($clustername, $dbname);
1427
1428    return $dbh;
1429
1430} ## end of create_database
1431
1432
1433sub empty_test_database {
1434
1435    ## Wipe all data tables from a test database
1436    ## Takes a database handle as only arg
1437
1438    my $self = shift;
1439    my $dbh = shift;
1440
1441    if ($dbh->{pg_server_version} >= 80300) {
1442        $dbh->do(q{SET session_replication_role = 'replica'});
1443    }
1444
1445    for my $table (sort keys %tabletype) {
1446        $dbh->do(qq{TRUNCATE TABLE "$table"});
1447    }
1448
1449    for my $table (@tables2empty) {
1450        $dbh->do(qq{TRUNCATE TABLE "$table"});
1451    }
1452
1453    if ($dbh->{pg_server_version} >= 80300) {
1454        $dbh->do(q{SET session_replication_role = 'origin'});
1455    }
1456    $dbh->commit;
1457
1458    return;
1459
1460} ## end of empty_test_database
1461
1462END {
1463#    __PACKAGE__->shutdown_cluster($_) for keys %pgver;
1464}
1465
1466sub shutdown_cluster {
1467
1468    ## Shutdown a cluster if running
1469    ## Takes the cluster name
1470
1471    my $self = shift;
1472    my $name = shift;
1473
1474    my $dirname = $pgver{$name}{dirname};
1475
1476    return if ! -d $dirname;
1477
1478    my $pidfile = "$dirname/postmaster.pid";
1479    return if ! -e $pidfile;
1480
1481    Test::More::note("Stopping cluster $name");
1482    my @cmd = ($pg_ctl, '-D', $dirname, '-s', '-m', 'fast', 'stop');
1483    system(@cmd) == 0 or die "@cmd failed: $?\n";
1484
1485    ## Hang around until the PID file is gone
1486    my $loops = 0;
1487    {
1488        sleep 0.2;
1489        last if ! -e $pidfile;
1490        redo;
1491    }
1492
1493    delete $gdbh{$name};
1494
1495    return;
1496
1497} ## end of shutdown_cluster
1498
1499
1500sub remove_cluster {
1501
1502    ## Remove a cluster, shutting it down first
1503    ## Takes the cluster name
1504
1505    my $self = shift;
1506    my $name = shift;
1507
1508    my $dirname = $pgver{$name}{dirname};
1509
1510    return if ! -d $dirname;
1511
1512    ## Just in case
1513    $self->shutdown_cluster($name);
1514
1515    system("rm -fr $dirname");
1516
1517    return;
1518
1519} ## end of remove_cluster
1520
1521
1522
1523
1524
1525
1526
1527
1528sub tt {
1529    ## Simple timing routine. Call twice with the same arg, before and after
1530    my $name = shift or die qq{Need a name!\n};
1531    if (exists $timing{$name}) {
1532        my $newtime = tv_interval($timing{$name});
1533        debug("Timing for $name: $newtime");
1534        delete $timing{$name};
1535    }
1536    else {
1537        $timing{$name} = [gettimeofday];
1538    }
1539    return;
1540} ## end of tt
1541
1542sub t {
1543
1544    $testmsg = shift || '';
1545    $testline = shift || (caller)[2];
1546    $testmsg =~ s/^\s+//;
1547    if ($location) {
1548        $testmsg = "($location) $testmsg";
1549    }
1550    $testmsg .= " [line: $testline]";
1551    my $time = time;
1552    $testmsg .= " [time: $time]" unless $notime;
1553
1554    return;
1555
1556} ## end of t
1557
1558
1559sub add_test_tables_to_herd {
1560
1561    ## Add all of the test tables (and sequences) to a herd
1562    ## Create the herd if it does not exist
1563    ## First arg is database name, second arg is the herdname
1564
1565    my $self = shift;
1566    my $db = shift;
1567    my $herd = shift;
1568
1569    my $result = $self->ctl("add herd $herd");
1570    if ($result !~ /Added herd/) {
1571        die "Failed to add herd $herd: $result\n";
1572    }
1573
1574    my $addstring = join ' ' => sort keys %tabletype;
1575    my $com = "add table $addstring db=$db herd=$herd";
1576    $result = $self->ctl($com);
1577    if ($result !~ /Added table/) {
1578        die "Failed to add tables: $result (command was: $com)\n";
1579    }
1580
1581    $addstring = join ' ' => sort keys %sequences;
1582    $com = "add sequence $addstring db=$db herd=$herd";
1583    $result = $self->ctl($com);
1584    if ($result !~ /Added sequence/) {
1585        die "Failed to add sequences: $result (command was: $com)\n";
1586    }
1587
1588    return;
1589
1590} ## end of add_test_tables_to_herd
1591
1592
1593sub bc_deeply {
1594
1595    my ($exp,$dbh,$sql,$msg,$oline) = @_;
1596    my $line = (caller)[2];
1597
1598    local $Data::Dumper::Terse = 1;
1599    local $Data::Dumper::Indent = 0;
1600
1601    die "Very invalid statement from line $line: $sql\n" if $sql !~ /^\s*select/i;
1602
1603    my $got;
1604    eval {
1605        $got = $dbh->selectall_arrayref($sql);
1606    };
1607    if ($@) {
1608        die "bc_deeply failed from line $line. SQL=$sql\n$@\n";
1609    }
1610
1611    local $Test::Builder::Level = $Test::Builder::Level + 1;
1612    return is_deeply($got,$exp,$msg,$oline||(caller)[2]);
1613
1614} ## end of bc_deeply
1615
1616sub clear_notices {
1617    my $dbh = shift;
1618    my $timeout = shift || $TIMEOUT_NOTICE;
1619    sleep $timeout;
1620    0 while (my $n = $dbh->func('pg_notifies'));
1621}
1622
1623
1624sub get_pgctl_options {
1625    my $dirname = shift;
1626    my $option;
1627    if ($^O !~ /Win32/) {
1628        my $sockdir = "$dirname/socket";
1629        -e $sockdir or mkdir $sockdir;
1630        $option = q{-o '-k socket'};
1631    }
1632    return $option;
1633}
1634
1635sub remove_single_dir {
1636
1637    my $dirname = shift;
1638    print "Removing test database in $dirname\n";
1639    # Try stopping PostgreSQL
1640    my $options = get_pgctl_options($dirname);
1641    qx{$pg_ctl $options -l $dirname/pg.log -D $dirname stop -m immediate};
1642    sleep 2;
1643    qx{rm -rf $dirname};
1644    return;
1645
1646}
1647
1648sub drop_database {
1649
1650    my ($self, $dir) = @_;
1651    if ($dir eq 'all') {
1652        ok(opendir(my $dh, '.'), 'Open current directory to clean up');
1653        my @test_db_dirs = grep { -d $_ && /^bucardo_test_database/ } readdir $dh;
1654        close($dh);
1655
1656        for my $dirname (@test_db_dirs) {
1657            remove_single_dir($dirname);
1658        }
1659    }
1660    else {
1661        remove_single_dir($dir);
1662    }
1663    return;
1664}
1665
1666
1667sub add_row_to_database {
1668
1669    ## Add a row to each table in one of the databases
1670    ## Arguments: three
1671    ## 1. Database name to use
1672    ## 2. Value to use (lookup, not the direct value)
1673    ## 3. Do we commit or not? Boolean, defaults to true
1674    ## Returns: undef
1675
1676    my ($self, $dbname, $xval, $commit) = @_;
1677
1678
1679    if ($xval > $xvalmax) {
1680        die "Too high of an ID: max is $xvalmax\n";
1681    }
1682
1683    $commit = 1 if ! defined $commit;
1684
1685    my $dbh = $gdbh{$dbname} or die "No such database: $dbname";
1686
1687    ## Loop through each table we know about
1688    for my $table (sort keys %tabletype) {
1689
1690        ## Look up the actual value to use
1691        my $type = $tabletype{$table};
1692        my $value = $val{$type}{$xval};
1693
1694        ## Prepare it if we have not already
1695        if (! exists $gsth{$dbh}{insert}{$xval}{$table}) {
1696
1697            ## Handle odd pkeys
1698            my $pkey = $table =~ /test5/ ? q{"id space"} : 'id';
1699
1700            ## Put some standard values in, plus a single placeholder
1701            my $SQL = qq{INSERT INTO "$table"($pkey,data1,inty,booly) VALUES (?,'foo',$xval,'true')};
1702            $gsth{$dbh}{insert}{$xval}{$table} = $dbh->prepare($SQL);
1703
1704            ## If this is a bytea, we need to tell DBD::Pg about it
1705            if ('BYTEA' eq $type) {
1706                $gsth{$dbh}{insert}{$xval}{$table}->bind_param(1, undef, {pg_type => PG_BYTEA});
1707            }
1708
1709        }
1710
1711        ## Execute!
1712        $gsth{$dbh}{insert}{$xval}{$table}->execute($value);
1713
1714    }
1715
1716    $dbh->commit() if $commit;
1717
1718    return undef;
1719
1720} ## end of add_row_to_database
1721
1722
1723sub update_row_in_database {
1724
1725    ## Change a row in each table in a database
1726    ## We always change the "inty" field
1727    ## Arguments: four
1728    ## 1. Database name to use
1729    ## 2. Primary key to update
1730    ## 3. New value
1731    ## 4. Do we commit or not? Boolean, defaults to true
1732    ## Returns: undef
1733
1734    my ($self, $dbname, $pkeyvalue, $newvalue, $commit) = @_;
1735
1736    $commit = 1 if ! defined $commit;
1737
1738    my $dbh = $gdbh{$dbname} or die "No such database: $dbname";
1739
1740    ## Loop through each table we know about
1741    for my $table (sort keys %tabletype) {
1742
1743        ## Look up the actual value to use
1744        my $type = $tabletype{$table};
1745        my $value = $val{$type}{$pkeyvalue};
1746
1747        ## Prepare it if we have not already
1748        if (! exists $gsth{$dbh}{update}{inty}{$table}) {
1749
1750            ## Handle odd pkeys
1751            my $pkey = $table =~ /test5/ ? q{"id space"} : 'id';
1752
1753            my $SQL = qq{UPDATE "$table" SET inty=? WHERE $pkey = ?};
1754            $gsth{$dbh}{update}{inty}{$table} = $dbh->prepare($SQL);
1755
1756            if ('BYTEA' eq $type) {
1757                $gsth{$dbh}{update}{inty}{$table}->bind_param(2, undef, {pg_type => PG_BYTEA});
1758            }
1759
1760        }
1761
1762        ## Execute!
1763        $gsth{$dbh}{update}{inty}{$table}->execute($newvalue,$value);
1764
1765    }
1766
1767    $dbh->commit() if $commit;
1768
1769    return undef;
1770
1771} ## end of update_row_in_database
1772
1773
1774sub remove_row_from_database {
1775
1776    ## Delete a row from each table in one of the databases
1777    ## Arguments: three
1778    ## 1. Database name to use
1779    ## 2. Value to use (lookup, not the direct value). Can be an arrayref.
1780    ## 3. Do we commit or not? Boolean, defaults to true
1781    ## Returns: undef
1782
1783    my ($self, $dbname, $val, $commit) = @_;
1784
1785    $commit = 1 if ! defined $commit;
1786
1787    my $dbh = $gdbh{$dbname} or die "No such database: $dbname";
1788
1789    ## Loop through each table we know about
1790    for my $table (sort keys %tabletype) {
1791
1792        ## Prepare it if we have not already
1793        if (! exists $gsth{$dbh}{delete}{$table}) {
1794
1795            ## Delete, based on the inty
1796            my $SQL = qq{DELETE FROM "$table" WHERE inty = ?};
1797            $gsth{$dbh}{delete}{$table} = $dbh->prepare($SQL);
1798
1799        }
1800
1801        ## Execute it.
1802        if (ref $val) {
1803            for (@$val) {
1804                $gsth{$dbh}{delete}{$table}->execute($_);
1805            }
1806        }
1807        else {
1808            $gsth{$dbh}{delete}{$table}->execute($val);
1809        }
1810
1811    }
1812
1813    $dbh->commit() if $commit;
1814
1815    return undef;
1816
1817} ## end of remove_row_from_database
1818
1819
1820sub truncate_all_tables {
1821
1822    ## Truncate all the tables
1823    ## Arguments: two
1824    ## 1. Database to use
1825    ## 3. Do we commit or not? Boolean, defaults to true
1826    ## Returns: undef
1827
1828    my ($self, $dbname, $commit) = @_;
1829
1830    $commit = 1 if ! defined $commit;
1831
1832    my $dbh = $gdbh{$dbname} or die "No such database: $dbname";
1833
1834    ## Loop through each table we know about
1835    for my $table (sort keys %tabletype) {
1836        $dbh->do(qq{TRUNCATE Table "$table"});
1837    }
1838
1839    $dbh->commit() if $commit;
1840
1841    return undef;
1842
1843} ## end of truncate_all_tables
1844
1845
1846sub delete_all_tables {
1847
1848    ## Delete all the tables.
1849    ## Mostly for old versions that do not support truncate triggers.
1850    ## Arguments: two
1851    ## 1. Database to use
1852    ## 3. Do we commit or not? Boolean, defaults to true
1853    ## Returns: undef
1854
1855    my ($self, $dbname, $commit) = @_;
1856
1857    $commit = 1 if ! defined $commit;
1858
1859    my $dbh = $gdbh{$dbname} or die "No such database: $dbname";
1860
1861    ## Loop through each table we know about
1862    for my $table (sort keys %tabletype) {
1863        $dbh->do(qq{DELETE FROM "$table"});
1864    }
1865
1866    $dbh->commit() if $commit;
1867
1868    return undef;
1869
1870} ## end of delete_all_tables
1871
1872
1873sub check_for_row {
1874
1875    ## Check that a given row is on the database as expected: checks the inty column only
1876    ## Arguments: two or three or four
1877    ## 1. The result we are expecting, as an arrayref
1878    ## 2. A list of database names (should be inside gdbh)
1879    ## 3. Optional text to append to output message
1880    ## 4. Optional tables to limit checking to
1881    ## Returns: undef
1882
1883    my ($self, $res, $dblist, $text, $filter) = @_;
1884
1885    ## Get largest tablename
1886    my $maxtable = 1;
1887    for my $table (keys %tabletype) {
1888        ## Allow skipping tables
1889        if (defined $filter) {
1890            my $f = $filter;
1891            if ($f =~ s/^\!//) {
1892                if ($table =~ /$f$/) {
1893                    delete $tabletype{$table};
1894                    next;
1895                }
1896            }
1897            else {
1898                if ($table !~ /$f$/) {
1899                    delete $tabletype{$table};
1900                    next;
1901                }
1902            }
1903        }
1904        $maxtable = length $table if length $table > $maxtable;
1905    }
1906
1907    for my $dbname (@$dblist) {
1908
1909        if (! $gdbh{$dbname}) {
1910            $gdbh{$dbname} = $self->connect_database($dbname,$BucardoTesting::dbname);
1911        }
1912
1913        my $dbh = $gdbh{$dbname};
1914
1915        my $maxdbtable = $maxtable + 1 + length $dbname;
1916
1917        for my $table (sort keys %tabletype) {
1918
1919            ## Handle odd pkeys
1920            my $pkey = $table =~ /test5/ ? q{"id space"} : 'id';
1921
1922            my $type = $tabletype{$table};
1923            my $t = sprintf qq{%-*s copy ok (%s)},
1924                $maxdbtable,
1925                "$dbname.$table",
1926                    $type;
1927
1928            ## Change the message if no rows
1929            if (ref $res eq 'ARRAY' and ! defined $res->[0]) {
1930                $t = sprintf qq{No rows as expected in %-*s for pkey type %s},
1931                    $maxdbtable,
1932                    "$dbname.$table",
1933                    $type;
1934            }
1935
1936            if (defined $text and length $text) {
1937                $t .= " $text";
1938            }
1939
1940            my $SQL = qq{SELECT inty FROM "$table" ORDER BY inty};
1941            $table =~ /X/ and $SQL =~ s/inty/$pkey/;
1942
1943            local $Test::Builder::Level = $Test::Builder::Level + 1;
1944            my $result = bc_deeply($res, $dbh, $SQL, $t, (caller)[2]);
1945            $dbh->commit();
1946            if (!$result) {
1947                my $line = (caller)[2];
1948                Test::More::BAIL_OUT("Stopping on a failed 'check_for_row' test from line $line");
1949            }
1950        }
1951    }
1952
1953    return;
1954
1955} ## end of check_for_row
1956
1957
1958sub check_sequences_same {
1959
1960    ## Check that sequences are the same across all databases
1961    ## Arguments: one
1962    ## 1. A list of database names (should be inside gdbh)
1963    ## Returns: undef
1964
1965    my ($self, $dblist) = @_;
1966
1967    for my $seq (sort keys %sequences) {
1968
1969        $SQL = qq{SELECT * FROM "$seq"};
1970
1971        ## The first we come across will be the standard for the others
1972        my (%firstone, $firstdb);
1973
1974        ## Store failure messages
1975        my @msg;
1976
1977        for my $dbname (@$dblist) {
1978
1979            my $dbh = $gdbh{$dbname} or die "Invalid database name: $dbname";
1980
1981            my $sth = $dbh->prepare($SQL);
1982            $sth->execute();
1983            my $info = $sth->fetchall_arrayref({})->[0];
1984
1985            if (! defined $firstone{$seq}) {
1986                $firstone{$seq} = $info;
1987                $firstdb = $dbname;
1988                next;
1989            }
1990
1991            ## Compare certain items
1992            for my $item (qw/ last_value start_value increment_by min_value max_value is_cycled is_called/) {
1993                my ($uno,$dos) = ($firstone{$seq}->{$item}, $info->{$item});
1994                next if ! defined $uno or ! defined $dos;
1995                if ($uno ne $dos) {
1996                    push @msg, "$item is different on $firstdb vs $dbname: $uno vs $dos";
1997                }
1998            }
1999
2000        } ## end each sequence
2001
2002        if (@msg) {
2003            Test::More::fail("Sequence $seq NOT the same");
2004            for (@msg) {
2005                diag($_);
2006            }
2007        }
2008        else {
2009            Test::More::pass("Sequence $seq is the same across all databases");
2010        }
2011
2012    } ## end each database
2013
2014
2015    return;
2016
2017
2018} ## end of check_sequences_same
2019
2020
2021
2022
2023## Hack to override some Test::More methods
2024## no critic
2025
2026sub is_deeply {
2027
2028    t($_[2],$_[3] || (caller)[2]);
2029    local $Test::Builder::Level = $Test::Builder::Level + 1;
2030    my $rv = Test::More::is_deeply($_[0],$_[1],$testmsg);
2031    return $rv if $rv;
2032    if ($bail_on_error and ++$total_errors => $bail_on_error) {
2033        my $line = (caller)[2];
2034        my $time = time;
2035        diag("GOT: ".Dumper $_[0]);
2036        diag("EXPECTED: ".Dumper $_[1]);
2037        Test::More::BAIL_OUT("Stopping on a failed 'is_deeply' test from line $line. Time: $time");
2038    }
2039} ## end of is_deeply
2040sub like($$;$) {
2041    t($_[2],(caller)[2]);
2042    local $Test::Builder::Level = $Test::Builder::Level + 1;
2043    my $rv = Test::More::like($_[0],$_[1],$testmsg);
2044    return $rv if $rv;
2045    if ($bail_on_error and ++$total_errors => $bail_on_error) {
2046        my $line = (caller)[2];
2047        my $time = time;
2048#        Test::More::diag("GOT: ".Dumper $_[0]);
2049#        Test::More::diag("EXPECTED: ".Dumper $_[1]);
2050        Test::More::BAIL_OUT("Stopping on a failed 'like' test from line $line. Time: $time");
2051    }
2052} ## end of like
2053sub pass(;$) {
2054    t($_[0],$_[1]||(caller)[2]);
2055    local $Test::Builder::Level = $Test::Builder::Level + 1;
2056    Test::More::pass($testmsg);
2057} ## end of pass
2058sub is($$;$) {
2059    t($_[2],(caller)[2]);
2060    local $Test::Builder::Level = $Test::Builder::Level + 1;
2061    my $rv = Test::More::is($_[0],$_[1],$testmsg);
2062    return $rv if $rv;
2063    ## Where exactly did this fail?
2064    my $char = 0;
2065    my $onelen = length $_[0];
2066    my $twolen = length $_[1];
2067    my $line = 1;
2068    my $lchar = 1;
2069    for ($char = 0; $char < $onelen and $char < $twolen; $char++) {
2070        my $one = ord(substr($_[0],$char,1));
2071        my $two = ord(substr($_[1],$char,1));
2072        if ($one != $two) {
2073            diag("First difference at character $char ($one vs $two) (line $line, char $lchar)");
2074            last;
2075        }
2076        if (10 == $one) {
2077            $line++;
2078            $lchar = 1;
2079        }
2080        else {
2081            $lchar++;
2082        }
2083    }
2084    if ($bail_on_error and ++$total_errors => $bail_on_error) {
2085        my $line = (caller)[2];
2086        my $time = time;
2087        Test::More::BAIL_OUT("Stopping on a failed 'is' test from line $line. Time: $time");
2088    }
2089} ## end of is
2090sub isa_ok($$;$) {
2091    t("Object isa $_[1]",(caller)[2]);
2092    my ($name, $type, $msg) = ($_[0],$_[1]);
2093    local $Test::Builder::Level = $Test::Builder::Level + 1;
2094    if (ref $name and ref $name eq $type) {
2095        Test::More::pass($testmsg);
2096        return;
2097    }
2098    if ($bail_on_error and ++$total_errors => $bail_on_error) {
2099        Test::More::BAIL_OUT("Stopping on a failed test");
2100    }
2101} ## end of isa_ok
2102sub ok($;$) {
2103    t($_[1]||$testmsg);
2104    local $Test::Builder::Level = $Test::Builder::Level + 1;
2105    my $rv = Test::More::ok($_[0],$testmsg);
2106    return $rv if $rv;
2107    if ($bail_on_error and ++$total_errors => $bail_on_error) {
2108        my $line = (caller)[2];
2109        my $time = time;
2110        Test::More::BAIL_OUT("Stopping on a failed 'ok' test from line $line. Time: $time");
2111    }
2112} ## end of ok
2113
2114## use critic
2115
2116
21171;
2118