#!/usr/bin/env perl
# -*-mode:cperl; indent-tabs-mode: nil-*-

package BucardoTesting;

## Helper module for the Bucardo tests
## Contains shared code for setup and breakdown

use strict;
use warnings;
use utf8;

use Encode qw/ decode /;
use Encode::Locale;
use DBI;
use DBD::Pg;
use Time::HiRes qw/sleep gettimeofday tv_interval/;
use Cwd;
use Data::Dumper;
use Symbol;
## Loaded with require, so nothing from Test::More is imported into this package
require Test::More;

## Package variables used as shared scratch space by the subs in this file
use vars qw/$SQL $sth $count $COM %dbh/;

## Debug level, taken from the BUCARDO_DEBUG environment variable (0 if unset)
my $DEBUG = $ENV{BUCARDO_DEBUG} || 0;

## If BUCARDO_CONFIRM is present in the environment, force it to 0
$ENV{BUCARDO_CONFIRM} = 0 if exists $ENV{BUCARDO_CONFIRM};

## Names pushed into the namespace of any package that uses this module
use base 'Exporter';
our @EXPORT = qw/%tabletype %tabletypemysql %tabletypemariadb %tabletypeoracle %tabletypesqlite %tabletypefirebird
                 %sequences %val
                 compare_tables bc_deeply clear_notices wait_for_notice
                 $location $oldherd_msg $newherd_msg $addtable_msg $deltable_msg $nomatch_msg/;

## Special global vars for munging the data
my (%gsth, %gdbh);

## Name of the main test database created inside each cluster
my $dbname = 'bucardo_test';

## We need to use the local Bucardo.pm, not a system installed one!
$ENV{PERL5LIB} = '.';

## Canned fragments of bucardo output, exported so tests can match against them:
our $addtable_msg = 'Added the following tables or sequences';
our $deltable_msg = 'Removed the following tables';
our $nomatch_msg  = 'Did not find matches for the following terms';
our $oldherd_msg  = 'The following tables or sequences are now part of the relgroup';
our $newherd_msg  = 'The following tables or sequences are now part of the relgroup';

our $location = 'setup';
my $testmsg   = ' ?';
my $testline  = '?';
## Stop at the first error when BUCARDO_TESTBAIL is set
my $bail_on_error = $ENV{BUCARDO_TESTBAIL} || 0;
my $total_errors  = 0;
## Used by the tt sub
my %timing;

## If true, turns off the epoch "time" output at the end of each testing output line
my $notime = 1;

## Operating system user running the tests, as reported by whoami
my $user = qx{whoami};
chomp $user;

## Start each run with a fresh log file
my $FRESHLOG = 1;
unlink 'tmp/bucardo.log' if $FRESHLOG;

## Make sure the local pid directory is there
my $piddir = 'pid';
-e $piddir or mkdir $piddir;

## When BUCARDO_LOG_ERROR_CONTEXT is set, wrap the ok/is/like functions found
## in the calling package so that every failing test also records log context
if ($ENV{BUCARDO_LOG_ERROR_CONTEXT}) {
    no strict 'refs';
    no warnings qw/prototype redefine/;
    my ($package) = caller();

    for my $subname ( qw(ok is like) ) {

        my $slot = qualify_to_ref($subname,$package);

        ## Nothing to wrap if the caller has no such function
        my $original = *$slot{CODE} or next;

        *$slot = sub {
            ## This is not a general wrapper: most of the testing ignores
            ## return values, so wantarray is not honored. The result is
            ## needed only to decide whether to dump extra debugging info.
            my $outcome = $original->( @_ );
            _log_context("@_") unless $outcome;
            return $outcome;
        };
    }
}

## Test databases are labelled as A, B, C, etc.
my @dbs = qw/A B C D E/;

### TODO: Add point type (which has no natural ordering operator!)

## Column type used for the primary key of each Postgres test table
our %tabletype =
    (
     'bucardo_test1'      => 'SMALLINT',
     'bucardo_test2'      => 'INT',
     'Bucardo_test3'      => 'BIGINT',
     'bucardo_test4'      => 'TEXT',
     'bucardo_test5'      => 'DATE',
     'bucardo_test6'      => 'TIMESTAMP',
     'bucardo_test7'      => 'NUMERIC',
     'bucardo_test8'      => 'BYTEA',
     'bucardo_test9'      => 'int_unsigned',
     'bucardo_test10'     => 'TIMESTAMPTZ',
     'bucardo space test' => 'INT',
     );

## The same tables, with the column types to use for other database systems
our %tabletypemysql =
    (
     'bucardo_test1'      => 'SMALLINT',
     'bucardo_test2'      => 'INT',
     'Bucardo_test3'      => 'BIGINT',
     'bucardo_test4'      => 'VARCHAR(700)',
     'bucardo_test5'      => 'DATE',
     'bucardo_test6'      => 'DATETIME',
     'bucardo_test7'      => 'NUMERIC(5,1)',
     'bucardo_test8'      => 'VARBINARY(1000)',
     'bucardo_test9'      => 'INTEGER UNSIGNED',
     'bucardo_test10'     => 'DATETIME',
     'bucardo space test' => 'INT',
     );

our %tabletypemariadb =
    (
     'bucardo_test1'      => 'SMALLINT',
     'bucardo_test2'      => 'INT',
     'Bucardo_test3'      => 'BIGINT',
     'bucardo_test4'      => 'VARCHAR(700)',
     'bucardo_test5'      => 'DATE',
     'bucardo_test6'      => 'DATETIME',
     'bucardo_test7'      => 'NUMERIC(5,1)',
     'bucardo_test8'      => 'VARBINARY(1000)',
     'bucardo_test9'      => 'INTEGER UNSIGNED',
     'bucardo_test10'     => 'DATETIME',
     'bucardo space test' => 'INT',
     );

our %tabletypefirebird =
    (
     'bucardo_test1'      => 'SMALLINT',
     'bucardo_test2'      => 'INT',
     'Bucardo_test3'      => 'BIGINT',
     'bucardo_test4'      => 'VARCHAR(700)',
     'bucardo_test5'      => 'DATE',
     'bucardo_test6'      => 'DATETIME',
     'bucardo_test7'      => 'NUMERIC(5,1)',
     'bucardo_test8'      => 'VARBINARY(1000)',
     'bucardo_test9'      => 'INTEGER UNSIGNED',
     'bucardo_test10'     => 'TIMESTAMP',
     'bucardo space test' => 'INT',
     );

our %tabletypeoracle =
    (
     'bucardo_test1'      => 'SMALLINT',
     'bucardo_test2'      => 'INT',
     'Bucardo_test3'      => 'BIGINT',
     'bucardo_test4'      => 'NVARCHAR2(1000)',
     'bucardo_test5'      => 'DATE',
     'bucardo_test6'      => 'TIMESTAMP',
     'bucardo_test7'      => 'NUMERIC(5,1)',
     'bucardo_test8'      => 'BLOB',
     'bucardo_test9'      => 'INTEGER',
     'bucardo_test10'     => 'TIMESTAMP WITH TIME ZONE',
     'bucardo space test' => 'INT',
     );

our %tabletypesqlite =
    (
     'bucardo_test1'      => 'SMALLINT',
     'bucardo_test2'      => 'INT',
     'Bucardo_test3'      => 'BIGINT',
     'bucardo_test4'      => 'VARCHAR(1000)',
     'bucardo_test5'      => 'DATE',
     'bucardo_test6'      => 'DATETIME',
     'bucardo_test7'      => 'NUMERIC(5,1)',
     'bucardo_test8'      => 'VARBINARY(1000)',
     'bucardo_test9'      => 'INTEGER UNSIGNED',
     'bucardo_test10'     => 'DATETIME',
     'bucardo space test' => 'INT',
     );


our @tables2empty = (qw/droptest_bucardo/);

## Test sequences: one is created for each key of this hash
our %sequences =
    (
     'bucardo_test_seq1' => '',
     'bucardo_test_seq2' => '',
     'Bucardo_test_seq3' => '',
     );

my %debug = (
             recreatedb     => 0,
             recreateschema => 1,
             recreateuser   => 0,
             );

my $DEBUGDIR = ".";
-e $DEBUGDIR or mkdir $DEBUGDIR;

## To avoid stepping on other instance's toes
## USER is not always set (e.g. minimal or containerized environments),
## so fall back to the name that whoami reported
my $PIDDIR = '/tmp/bucardo_testing_' . (defined $ENV{USER} ? $ENV{USER} : $user);
mkdir $PIDDIR if ! -e $PIDDIR;

## Let pg_config guide us to a likely initdb/pg_ctl location
my $output = qx{pg_config --bindir};
chomp $output;
## Only trust the answer if it is an absolute path. The path must be captured
## here: without the parens, $1 is not set by this match and the pg_config
## directory would never be used.
my $bindir = $output =~ m{^(/.*)} ? $1 : '';

## Location of files: PGBINDIR wins, then the pg_config directory, then the PATH
my $initdb = $ENV{PGBINDIR} ? "$ENV{PGBINDIR}/initdb" : $bindir ? "$bindir/initdb" : 'initdb';
my $pg_ctl = $ENV{PGBINDIR} ? "$ENV{PGBINDIR}/pg_ctl" : $bindir ? "$bindir/pg_ctl" : 'pg_ctl';

## Get the default initdb location
my $pgversion = qx{$initdb -V};
my ($pg_ver, $pg_major_version, $pg_minor_version, $pg_point_version);
if (defined $pgversion and $pgversion =~ /initdb \(PostgreSQL\) (\d+\..*)/) {
    $pg_ver = $1;
    ($pg_major_version, $pg_minor_version, $pg_point_version) = split /\./, $pg_ver;
    ## Strip anything trailing the leading digits of the second component
    $pg_minor_version =~ s/(\d+).+/$1/;
}
else {
    die qq{Could not determine initdb version information from running "$initdb -V"\n};
}

## Returns the first two components of the default initdb version, e.g. "9.6"
sub pg_major_version { join '.', $pg_major_version, $pg_minor_version }

## Each database can also have a custom version
## We do this by setting PGBINDIR[A-Z]
## This allows us to test (for example) a 8.1 master and an 8.4 slave
## Every cluster name from A to Z gets its own port, starting at 58921 for A
my %pgver;
my %clusterinfo;
my $lport = 58920;
for my $name ('A'..'Z') {
    $lport++;
    $clusterinfo{$name}{port} = $lport;

    ## Start from the defaults found above
    my $lbindir = $ENV{PGBINDIR} || '';
    my $linitdb = $initdb;
    my $lpgctl = $pg_ctl;
    my $localver = $pg_ver;
    my ($lmaj,$lmin,$lrev) = ($pg_major_version, $pg_minor_version, $pg_point_version);
    ## A per-cluster bin directory overrides them, and its version is probed again
    if (exists $ENV{"PGBINDIR$name"}) {
        $lbindir = $ENV{"PGBINDIR$name"};
        -d $lbindir or die qq{Invalid ENV "PGBINDIR$name"\n};
        $linitdb = "$lbindir/initdb";
        $lpgctl = "$lbindir/pg_ctl";

        $COM = "$linitdb -V";
        my $answer = qx{$COM};
        die "Cannot find version from: $COM" if $answer !~ /initdb \(PostgreSQL\) (\d+\..*)/;
        $localver = $1;
        ($lmaj,$lmin,$lrev) = split /\./, $localver;
        $lmin =~ s/(\d+).+/$1/;
    }
    $pgver{$name} = {
        bindir  => $lbindir,
        initdb  => $linitdb,
        pgctl   => $lpgctl,
        version => $localver,
        ver     => "$lmaj.$lmin",
        vmaj    => $lmaj,
        vmin    => $lmin,
        vrev    => $lrev,
        dirname => "bucardo_test_database_${name}_$lmaj.$lmin",
        port    => $lport,
    };
}

# Set a semi-unique name to make killing old tests easier
my $xname = 'bctest_' . (defined $ENV{USER} ? $ENV{USER} : $user);

## Maximum time to wait for
## bucardo to return
my $ALARM_BUCARDO = 25;
## Maximum time to wait for a kid to appear via pg_listener
my $ALARM_WAIT4KID = 3;
## How long to wait for most syncs to take effect?
my $TIMEOUT_SYNCWAIT = 3;
## How long to sleep between checks for sync being done?
my $TIMEOUT_SLEEP = 0.1;
## How long to wait for a notice to be issued?
my $TIMEOUT_NOTICE = 4;

## Bail if the bucardo file does not exist / does not compile
## NOTE(review): a bare "require $file" searches @INC, so this depends on the
## current directory being in @INC - confirm how the tests are invoked
for my $file (qw/bucardo Bucardo.pm/) {
    if (! -e $file) {
        die "Cannot run without file $file\n";
    }
    eval {
        $ENV{BUCARDO_TEST} = 1;
        require $file;
        $ENV{BUCARDO_TEST} = 0;
    };
    if ($@) {
        die "Cannot run unless $file compiles cleanly: $@\n";
    }
}

## Prepare some test values for easy use
## The secondary names are for other databases, e.g. MySQL
## Layout is $val{TYPE}{N} for N from 1 to $xvalmax
our %val;
my $xvalmax = 30;
for (1..$xvalmax) {
    $val{SMALLINT}{$_} = $_;
    $val{INT}{$_} = 1234567+$_;
    $val{BIGINT}{$_} = 7777777777 + $_;
    $val{TEXT}{$_} = $val{'VARCHAR(1000)'}{$_} = $val{'VARCHAR(700)'}{$_} = "\\Pbc'$_";
    $val{DATE}{$_} = sprintf '2001-10-%02d', $_;
    $val{TIMESTAMP}{$_} = $val{DATE}{$_} . ' 12:34:56';
    $val{NUMERIC}{$_} = $val{'NUMERIC(5,1)'}{$_} = 0.7 + $_;
    $val{BYTEA}{$_} = "$_\0Z";
    $val{int_unsigned}{$_} = $val{'INTEGER UNSIGNED'}{$_} = 5000 + $_;
    $val{TIMESTAMPTZ}{$_} = $val{DATETIME}{$_} = $val{DATE}{$_} . ' 11:22:33+00';
    $val{DATETIME}{$_} =~ s/\+00//;
    ## NOTE(review): unlike every other entry this is a plain scalar that is
    ## overwritten on each pass (it ends up holding the value for $xvalmax).
    ## Looks like a missing {$_}, but left alone because %val is exported and
    ## callers may read it as a scalar - confirm before changing
    $val{TIMESTAMPTZNOZERO} = $val{DATE}{$_} . ' 11:22:33';
}


sub diag {

    ## Thin wrapper: hand all arguments to Test::More::diag

    Test::More::diag(@_);
}


sub new {

    ## Create a new BucardoTesting object.
    ## Arguments:
    ## 1. Hashref of options (optional). Keys read here: notime, bail, location
    ## Returns: reference to a new BucardoTesting object
    ## Dies if no bucardo script can be found

    my $class = shift;
    my $arg = shift || {};
    my $self = {};
    bless $self, $class;

    if ($arg->{notime}) {
        $notime = 1;
    }

    ## Make a note of which file invoked us for later debugging
    $self->{file} = (caller)[1];

    ## Bail on first error? Default is ENV, then false.
    $bail_on_error = exists $arg->{bail} ? $arg->{bail} : $ENV{BUCARDO_TESTBAIL} || 0;

    ## Name of the test schema
    $self->{schema} = 'bucardo_schema';

    ## Let's find out where bucardo is. Prefer the blib ones, which are shebang adjusted
    if (-e 'blib/script/bucardo') {
        $self->{bucardo} = 'blib/script/bucardo';
    }
    elsif (-e '../blib/script/bucardo') {
        $self->{bucardo} = '../blib/script/bucardo';
    }
    elsif (-e './bucardo') {
        $self->{bucardo} = './bucardo';
    }
    elsif (-e '../bucardo') {
        $self->{bucardo} = '../bucardo';
    }
    else {
        die qq{Could not find bucardo\n};
    }

    ## Handle both old and new way of setting location
    if ($location eq 'setup' and $arg->{location}) {
        $location = $self->{location} = $arg->{location};
    }


    return $self;

} ## end of new


sub debug {

    ## Simply internal debugging routine, prints a message if $DEBUG is set
    ## Called as a plain function, not as a method
    ## Arguments:
    ## 1. Message to print
    ## 2. Optional level, defaults to 0
    ## Returns: nothing

    $DEBUG or return;

    my $msg = shift || 'No message?!';
    my $level = shift || 0;

    return if $DEBUG < $level;

    chomp $msg;
    warn "DEBUG: $msg\n";

    return;

} ## end of debug


sub empty_cluster {

    ## Empty out a cluster's databases
    ## Creates the cluster and 'bucardo_test' database as needed
    ## For existing databases, removes all known schemas
    ## Always recreates the public schema
    ## Arguments: one
    ## 1. Name of the cluster
    ## Returns: database handle to the 'bucardo_test' database

    my $self = shift;
    my $clustername = shift or die;

    ## Create the cluster if needed
    $self->create_cluster($clustername);

    ## Start it up if needed
    $self->start_cluster($clustername);

    ## Get a handle to the postgres database
    my $masterdbh = $self->connect_database($clustername, 'postgres');

    my $dbh;
    if (database_exists($masterdbh, $dbname)) {
        $dbh = $self->connect_database($clustername, $dbname);
        ## Remove any of our known schemas
        my @slist;
        for my $sname (qw/ public bucardo freezer tschema /) {
            push @slist => $sname if $self->drop_schema($dbh, $sname);
        }
        debug(qq{Schemas dropped from $dbname on $clustername: } . join ',' => @slist);

        ## Recreate the public schema
        $dbh->do("CREATE SCHEMA public");
        $dbh->commit();
    }
    else {
        ## CREATE DATABASE cannot run inside a transaction block
        local $masterdbh->{AutoCommit} = 1;
        debug(qq{Creating database $dbname});
        $masterdbh->do("CREATE DATABASE $dbname");
        $dbh = $self->connect_database($clustername, $dbname);
    }

    $masterdbh->disconnect();

    return $dbh;

} ## end of empty_cluster


sub create_cluster {

    ## Create a cluster if it does not already exist
    ## Runs initdb, then modifies postgresql.conf
    ## A leftover directory without a postgresql.conf is treated as broken:
    ## it is renamed to "<dirname>.old" and the cluster is built again
    ## Arguments:
    ## 1. Name of the cluster
    ## Returns: nothing
    ## Dies on an unknown cluster name, or if the rename or initdb fails

    my $self = shift;
    my $clustername = shift or die;

    my $line = (caller)[2];
    my $info = $pgver{$clustername}
        or die qq{No such cluster as "$clustername" (called from line $line)\n};

    my $dirname = $info->{dirname};

    if (-d $dirname) {
        ## A directory holding a postgresql.conf is an existing cluster: nothing to do
        return if -e "$dirname/postgresql.conf";

        ## Sometimes these test clusters get left in a broken state.
        ## Just move it out of the way, rather than deleting it,
        ## then carry on and create a fresh cluster in its place
        rename $dirname, "$dirname.old"
            or die qq{Could not rename "$dirname" to "$dirname.old": $!\n};
    }

    my $localinitdb = $info->{initdb};

    debug(qq{Running $localinitdb for cluster "$clustername"});
    my $com = qq{$localinitdb -D $dirname 2>&1};
    debug($com);
    my $res = qx{$com};
    die $res if $? != 0;
    if ($DEBUG) {
        warn Dumper $res;
    }

    ## Make some minor adjustments
    my $connections = $clustername eq 'A' ? 150 : 75;
    my $file = "$dirname/postgresql.conf";
    open my $fh, '>>', $file or die qq{Could not open "$file": $!\n};
    ## The log_line_prefix escapes are passed as arguments so that printf
    ## does not try to interpret them itself
    printf {$fh} "

port = %d
max_connections = $connections
random_page_cost = 2.5
log_statement = 'all'
log_min_duration_statement = 0
client_min_messages = WARNING
log_line_prefix = '%s %s[%s] '
listen_addresses = ''

",
    $info->{port}, '%m', '%d', '%p';

    ## Make some per-version adjustments
    if ($info->{ver} >= 8.3) {
        print {$fh} "logging_collector = off\n";
    }
    else {
        print {$fh} "redirect_stderr = off\n";
    }
    close $fh or die qq{Could not close "$file": $!\n};

    return;


} ## end of create_cluster


sub start_cluster {

    ## Startup a cluster if not already running
    ## Arguments:
    ## 1. Name of the cluster (defaults to 'A')
    ## Returns: nothing
    ## Dies (or bails out of the test run) if the cluster does not come up

    my $self = shift;
    my $clustername = shift || 'A';

    ## Create the cluster if needed
    $self->create_cluster($clustername);

    my $info = $pgver{$clustername};

    my $dirname = $info->{dirname};

    ## Check the PID file. If it exists and is active, simply return
    my $pidfile = "$dirname/postmaster.pid";
    if (-e $pidfile) {
        open my $fh, '<', $pidfile or die qq{Could not open "$pidfile": $!\n};
        <$fh> =~ /(\d+)/ or die qq{No PID found in file "$pidfile"\n};
        my $pid = $1;
        close $fh or die qq{Could not close "$pidfile": $!\n};
        ## An active process should respond to a "ping kill"
        $count = kill 0 => $pid;
        return if 1 == $count;
        ## If no response, remove the pidfile ourselves and go on
        debug(qq{Server seems to have died, removing file "$pidfile"});
        unlink $pidfile or die qq{Could not remove file "$pidfile"\n};
    }

    my $port = $info->{port};
    debug(qq{Starting cluster "$clustername" on port $port});

    ## If not Windows, we'll use Unix sockets with a custom socket dir
    my $option = '';
    if ($^O !~ /Win32/) {
        my $sockdir = "$dirname/socket";
        -e $sockdir or mkdir $sockdir;
        $option = q{-o '-k socket'};
        ## Older versions do not assume socket is right off of data dir
        if ($info->{ver} <= 8.0) {
            $option = qq{-o '-k $dirname/socket'};
        }
    }

    ## Attempt to start it up with a pg_ctl call
    my $localpgctl = $info->{pgctl};

    $COM = qq{$localpgctl $option -l $dirname/pg.log -D $dirname start};
    debug(qq{Running: $COM});
    qx{$COM};

    ## Wait for the pidfile to appear
    my $maxwaitseconds = 20;
    my $loops = 0;
    {
        last if -e $pidfile;
        sleep 0.1;
        if ($loops++ > ($maxwaitseconds * 10)) {
            Test::More::BAIL_OUT ( 'Failed to connect to database' );
            die "Failed to startup cluster $clustername, command was $COM\n";
        }
        redo;
    }

    ## Keep attempting to get a database connection until we get one or timeout
    $maxwaitseconds = 20;

    my $dbhost = getcwd;
    $dbhost .= "/$dirname/socket";

    ## Using the "invalidname" is a nice way to work around locale issues
    my $dsn = "dbi:Pg:dbname=invalidname;port=$port;host=$dbhost";
    my $dbh;

    debug(qq{Connecting as $dsn});

    ## The server counts as up once it answers with an error that names the
    ## nonexistent database
    $loops = 0;
  LOOP: {
        eval {
            $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
        };
        last if $@ =~ /"invalidname"/;
        sleep 0.1;
        if ($loops++ > ($maxwaitseconds * 10)) {
            die "Database did not come up: dsn was $dsn\n";
        }
        redo;
    }

    return;

} ## end of start_cluster


sub connect_database {

    ## Return a connection to a database within a cluster
    ## The handle has AutoCommit off, RaiseError on, and its time zone set to UTC
    ## Arguments:
    ## 1. Name of the cluster
    ## 2. Name of the database (optional, defaults to 'bucardo_test')
    ## Returns: database handle

    my $self = shift;
    my $clustername = shift or die;
    my $ldbname = shift || $dbname;

    ## This may be one of the "extra" databases. In which case the true cluster must be revealed:
    $clustername =~ s/\d+$//;

    ## Create and start the cluster as needed
    $self->start_cluster($clustername);

    ## Build the DSN to connect with
    my $info = $pgver{$clustername};
    my $dbport = $info->{port};
    my $dbhost = getcwd . "/$info->{dirname}/socket";
    my $dsn = "dbi:Pg:dbname=$ldbname;port=$dbport;host=$dbhost";

    ## If we already have a cached version and it responds, return it
    ## NOTE(review): nothing in the code visible here ever stores a handle in
    ## %dbh, so this branch may never run - confirm before relying on the cache
    if (exists $dbh{$dsn}) {
        my $dbh = $dbh{$dsn};
        $dbh->ping and return $dbh;
        ## No ping? Remove from the cache
        $dbh->disconnect();
        delete $dbh{$dsn};
    }

    my $dbh;
    eval {
        $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
    };
    if ($@) {
        if ($ldbname eq 'postgres' and $@ =~ /"postgres"/) {

            ## Probably an older version that uses template1
            (my $localdsn = $dsn) =~ s/dbname=postgres/dbname=template1/;

            ## Give up right away if we are already trying template1
            die $@ if $localdsn eq $dsn;

            debug(qq{Connection failed, trying to connect to template1 to create a postgres database});

            ## Connect as template1 and create a postgres database
            $dbh = DBI->connect($localdsn, '', '', { AutoCommit=>1, RaiseError=>1, PrintError=>0 });
            $dbh->do('CREATE DATABASE postgres');
            $dbh->disconnect();

            ## Reconnect to our new database
            $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
        }
        else {
            die "$@\n";
        }
    }

    $dbh->do(q{SET TIME ZONE 'UTC'});

    ## When debugging, keep a record of each DSN plus a matching psql command line
    if ($DEBUG) {
        my $file = 'bucardo.debug.dsns.txt';
        if (open my $fh, '>>', $file) {
            print {$fh} "\n$dsn\n";
            my ($host,$port,$db);
            $dsn =~ /port=(\d+)/ and $port=$1;
            $dsn =~ /dbname=(.+?);/ and $db=$1;
            $dsn =~ /host=(.+)/ and $host=$1;
            printf {$fh} "psql%s%s%s\n", " -h $host", " -p $port", " $db";
            close $fh or die qq{Could not close file "$file": $!\n};
        }
    }

    $dbh->commit();

    return $dbh;

} ## end of connect_database


sub drop_schema {

    ## Drop a schema if it exists
    ## Two arguments:
    ## 1. database handle
    ## 2. name of the schema
    ## Returns 1 if dropped, 0 if not

    my ($self,$dbh,$sname) = @_;

    return 0 if ! schema_exists($dbh, $sname);

    ## Both settings are restored when this sub returns
    local $dbh->{AutoCommit} = 1;
    local $dbh->{Warn} = 0;
    $dbh->do("DROP SCHEMA $sname CASCADE");

    return 1;

} ## end of drop_schema


sub repopulate_cluster {

    ## Make sure a cluster is empty, then add in the sample data
    ## Arguments: two
    ## 1. Name of the cluster
    ## 2. Optional - number of additional databases to create
    ## Returns: database handle to the 'bucardo_test' database

    my $self = shift;
    my $clustername = shift or die;
    my $extradbs = shift || 0;

    Test::More::note("Recreating cluster $clustername");

    my $dbh = $self->empty_cluster($clustername);
    $self->add_test_schema($dbh, $clustername);

    ## Now recreate all the extra databases via templating
    for my $number (1..$extradbs) {
        my $dbname2 = "$dbname$number";
        ## Neither DROP DATABASE nor CREATE DATABASE can run inside a transaction block
        local $dbh->{AutoCommit} = 1;
        if (database_exists($dbh, $dbname2)) {
            ## First, kill other sessions!
            my $odbh = $self->connect_database($clustername, $dbname2);
            ## Same version check as setup_bucardo: older servers call the column procpid
            my $pidcol = $odbh->{pg_server_version} >= 90200 ? 'pid' : 'procpid';
            ## Best effort only: any failure here is ignored and the drop is still attempted
            eval {
                $SQL = "SELECT pg_terminate_backend($pidcol) FROM pg_stat_activity WHERE datname = ? AND $pidcol <> pg_backend_pid()";
                $sth = $odbh->prepare($SQL);
                ## Must be run on the statement handle: a database handle has no
                ## execute method, so calling it there only ever raised an error
                ## that the eval swallowed
                $sth->execute($dbname2);
                $sth->finish();
                $odbh->commit();
            };
            $odbh->disconnect();
            $dbh->do("DROP DATABASE $dbname2");
        }
        $dbh->do("CREATE DATABASE $dbname2 TEMPLATE $dbname");
    }

    ## Store our names away
    $gdbh{$clustername} = $dbh;

    return $dbh;

} ## end of repopulate_cluster


sub add_test_schema {

    ## Add an empty test schema to a database
    ## Creates the droptest table, the trigger functions, the int_unsigned
    ## domain, one table per entry in %tabletype (each with a trigger and a
    ## rule), and one sequence per entry in %sequences
    ## Arguments: two
    ## 1. database handle (usually to 'bucardo_test')
    ## 2. Cluster name
    ## Returns: nothing

    my $self = shift;
    my $dbh = shift or die;
    my $clustername = shift or die;

    ## Counts of objects created, for the debug message at the end
    my ($tcount,$scount,$fcount) = (0,0,0);

    ## Empty out or create the droptest table
    if (table_exists($dbh => 'droptest_bucardo')) {
        $dbh->do('TRUNCATE TABLE droptest_bucardo');
    }
    else {
        $tcount++;
        $dbh->do(q{
            CREATE TABLE droptest_bucardo (
              name TEXT NOT NULL,
              type TEXT NOT NULL
            )
        });
    }

    ## Create the language if needed
    if (!language_exists($dbh => 'plpgsql')) {
        debug(q{Creating language plpgsql});
        $dbh->do('CREATE LANGUAGE plpgsql');
    }

    ## Create supporting functions as needed
    if (!function_exists($dbh => 'trigger_test')) {
        $fcount++;
        $dbh->do(q{
                CREATE FUNCTION trigger_test()
                RETURNS trigger
                LANGUAGE plpgsql
                AS $_$ BEGIN
                INSERT INTO droptest_bucardo(name,type)
                    VALUES (TG_RELNAME, 'trigger');
                RETURN NULL;
                END;
                $_$
            });
    }
    if (!function_exists($dbh => 'trigger_test_zero')) {
        $fcount++;
        $dbh->do(q{
                CREATE FUNCTION trigger_test_zero()
                RETURNS trigger
                LANGUAGE plpgsql
                AS $_$ BEGIN
                INSERT INTO droptest_bucardo(name,type)
                    VALUES (TG_RELNAME, 'trigger');
                RETURN NULL;
                END;
                $_$;
            });
    }

    ## Create our helper domain for pseudo-types
    if (domain_exists($dbh => 'int_unsigned')) {
        $dbh->do('DROP DOMAIN int_unsigned CASCADE');
    }
    $dbh->do('CREATE DOMAIN int_unsigned INTEGER CHECK (value >= 0)');

    ## Create one table for each table type
    for my $table (sort keys %tabletype) {

        local $dbh->{Warn} = 0;

        ## Does the table already exist? If so, drop it.
        if (table_exists($dbh => $table)) {
            $dbh->do(qq{DROP TABLE "$table"});
        }

        ## The test5 table gets a key column with a space in its name; the
        ## test2 table gets a two-column primary key, added further down
        my $pkeyname = $table =~ /test5/ ? q{"id space"} : 'id';
        my $pkindex = $table =~ /test2/ ? '' : 'PRIMARY KEY';
        $SQL = qq{
            CREATE TABLE "$table" (
                $pkeyname $tabletype{$table} NOT NULL $pkindex};
        $SQL .= $table =~ /X/ ? "\n)" : qq{,
                data1 TEXT NULL,
                inty SMALLINT NULL,
                booly BOOLEAN NULL,
                bite1 BYTEA NULL,
                bite2 BYTEA NULL,
                email TEXT NULL UNIQUE
            )
            };

        $dbh->do($SQL);
        $tcount++;

        if ($table =~ /test2/) {
            $dbh->do(qq{ALTER TABLE "$table" ADD CONSTRAINT multipk PRIMARY KEY ($pkeyname,data1)});
        }

        ## Create a trigger to test trigger suppression during syncs
        $SQL = qq{
            CREATE TRIGGER "bctrig_$table"
            AFTER INSERT OR UPDATE ON "$table"
            FOR EACH ROW EXECUTE PROCEDURE trigger_test()
            };
        $table =~ /0/ and ($SQL =~ s/trigger_test/trigger_test_zero/);
        $dbh->do($SQL);

        ## Create a rule to test rule suppression during syncs
        $SQL = qq{
            CREATE OR REPLACE RULE "bcrule_$table"
            AS ON INSERT TO "$table"
            DO ALSO INSERT INTO droptest_bucardo(name,type) VALUES ('$table','rule')
            };
        $table =~ /0/ and $SQL =~ s/NEW.inty/0/;
        $dbh->do($SQL);
    }

    ## Create the foreign key tables
    ## (currently disabled: the statement is built but never run)
    #$dbh->do('CREATE TABLE bucardo_fkey1 (fkid INTEGER NOT NULL PRIMARY KEY, data2 TEXT)');
    $SQL = q{
ALTER TABLE bucardo_fkey1
  ADD CONSTRAINT "bucardo_fkey1"
  FOREIGN KEY (fkid)
  REFERENCES bucardo_test1 (id)
  ON DELETE CASCADE ON UPDATE CASCADE
};
    #$dbh->do($SQL);

    ## Create one sequence for each entry in %sequences
    for my $seq (sort keys %sequences) {

        local $dbh->{Warn} = 0;

        ## Does the sequence already exist? If so, drop it.
        if (table_exists($dbh => $seq)) {
            $dbh->do(qq{DROP SEQUENCE "$seq"});
        }

        $SQL = qq{CREATE SEQUENCE "$seq"};
        $dbh->do($SQL);
        $scount++;
    }

    debug("Test objects created for $clustername. Tables: $tcount Sequences: $scount Functions: $fcount");
#   diag("Test objects created for $clustername. Tables: $tcount Sequences: $scount Functions: $fcount");

    $dbh->commit() if ! $dbh->{AutoCommit};

    return;

} ## end of add_test_schema

sub mock_serialization_failure {

    ## Install a trigger that raises a serialization failure on every other INSERT
    ## Arguments:
    ## 1. database handle
    ## 2. Name of the table (optional, defaults to 'bucardo_test1')
    ## Returns: 1, or nothing at all (without touching the database) when the
    ## server version is below 80401

    my ($self, $dbh, $table) = @_;
    return if $dbh->{pg_server_version} < 80401;
    $table ||= 'bucardo_test1';

    # Mock a serialization failure on every other INSERT. Runs only when
    # `session_replica_role` is "replica", which is true for Bucardo targets.
    $dbh->do(qq{
        DROP SEQUENCE IF EXISTS serial_seq;
        CREATE SEQUENCE serial_seq;

        CREATE OR REPLACE FUNCTION mock_serial_fail(
        ) RETURNS trigger LANGUAGE plpgsql AS \$_\$
        BEGIN
            IF nextval('serial_seq') % 2 = 0 THEN RETURN NEW; END IF;
            RAISE EXCEPTION 'Serialization error'
                  USING ERRCODE = 'serialization_failure';
        END;
        \$_\$;

        CREATE TRIGGER mock_serial_fail AFTER INSERT ON "$table"
            FOR EACH ROW EXECUTE PROCEDURE mock_serial_fail();
        ALTER TABLE "$table" ENABLE REPLICA TRIGGER mock_serial_fail;
    });
    $dbh->commit;

    return 1;
} ## end of mock_serialization_failure

sub unmock_serialization_failure {

    ## Remove the objects created by mock_serialization_failure
    ## Arguments:
    ## 1. database handle
    ## 2. Name of the table (optional, defaults to 'bucardo_test1')
    ## Returns: 1, or nothing at all when the server version is below 80401
    ## NOTE(review): unlike mock_serialization_failure, this does not commit,
    ## so presumably the caller is expected to - confirm

    my ($self, $dbh, $table) = @_;
    return if $dbh->{pg_server_version} < 80401;
    $table ||= 'bucardo_test1';

    $dbh->do(qq{
        DROP TRIGGER IF EXISTS mock_serial_fail ON "$table";
        DROP FUNCTION IF EXISTS mock_serial_fail();
        DROP SEQUENCE IF EXISTS serial_seq;
    });

    return 1;
} ## end of unmock_serialization_failure

sub add_test_databases {

    ## Add one or more databases to the bucardo.db table
    ## Arguments:
    ## 1. White-space separated db names
    ## Returns: nothing
    ## Dies with the output of bucardo if that output contains ERROR

    my $self = shift;
    my $string = shift or die;

    for my $db (split /\s+/ => $string) {
        my $ctlargs = $self->add_db_args($db);
        my $i = $self->ctl("add database bucardo_test $ctlargs");
        die $i if $i =~ /ERROR/;
    }

    return;

} ## end of add_test_databases


sub add_db_args {

    ## Arguments:
    ## 1. Name of a cluster
    ## Returns: the settings needed to connect to that cluster
    ## Allows for "same" databases of the form X# e.g. A1, B1
    ## In list context returns (user, port, host); otherwise returns a
    ## single string of the form "name=... user=... port=... host=..."

    my $self = shift;
    my $clustername = shift or die;

    ## Strip any trailing number to get back to the real cluster name
    $clustername =~ s/\d+$//;

    ## Gather the connection details
    my $info = $pgver{$clustername};
    my $dbport = $info->{port};
    my $dbhost = getcwd . "/$info->{dirname}/socket";

    return wantarray
        ? ($user,$dbport,$dbhost)
        : "name=$dbname user=$user port=$dbport host=$dbhost";

} ## end of add_db_args


sub stop_bucardo {

    ## Stops Bucardo via a bucardo request
    ## Arguments: none
    ## Returns: 1

    my $self = shift;

    $self->ctl('stop testing');

    sleep 0.2;

    return 1;

} ## end of stop_bucardo


sub ctl {

    ## Run a simple non-forking command against bucardo
    ## Emulates a command-line invocation
    ## Arguments:
    ## 1. String to pass to bucardo
    ## 2. Database name to connect to. Used only when we're not confident the bucardo database exists already.
    ## Returns: answer as a string
    ## Never dies on a failed command: a timeout or a non-zero exit is
    ## reported through the returned string instead

    my ($self,$args, $db) = @_;
    $db ||= 'bucardo';

    my $info;
    my $ctl = $self->{bucardo};

    ## Build the connection options
    my $bc = $self->{bcinfo};
    my $connopts = '';
    for my $arg (qw/host port pass/) {
        my $val = 'DB' . (uc $arg) . '_bucardo';
        next unless exists $bc->{$val} and length $bc->{$val};
        $connopts .= " --db$arg=$bc->{$val}";
    }
    $connopts .= " --dbname=$db --log-dest .";
    $connopts .= " --dbuser=$user";
    ## Just hard-code these, no sense in multiple Bucardo base dbs yet:
    $connopts .= " --dbport=58921";
    my $dbhost = getcwd;
    my $dirname = $pgver{A}{dirname};
    $dbhost .= "/$dirname/socket";
    $connopts .= " --dbhost=$dbhost";
    $connopts .= " --no-bucardorc";

    ## Whitespace cleanup
    $args =~ s/^\s+//s;

    ## Allow the caller to look better
    $args =~ s/^bucardo\s+//;

    ## Set a timeout
    alarm 0;
    eval {
        local $SIG{ALRM} = sub { die "Alarum!\n"; };
        alarm $ALARM_BUCARDO;
        debug("Script: $ctl Connection options: $connopts Args: $args", 3);
        $info = decode( locale => qx{$ctl $connopts $args 2>&1} );
        debug("Exit value: $?", 3);
        die $info if $? != 0;
        alarm 0;
    };
    my $err = $@;

    ## A die inside the eval skips the "alarm 0" above. Cancel it here as well,
    ## or the timer would still fire later, after the handler has been removed
    alarm 0;

    if ($err =~ /Alarum/ or (defined $info and $info =~ /Alarum/)) {
        return __PACKAGE__ . ' timeout hit, giving up';
    }
    if ($err) {
        ## NOTE(review): when the error is the command output it has already
        ## been through decode once above - confirm a second decode is wanted
        return "Error running bucardo: " . decode( locale => $err ) . "\n";
    }

    debug("bucardo said: $info", 3);

    return $info;

} ## end of ctl


sub restart_bucardo {

    ## Start Bucardo, but stop first if it is already running
    ## Arguments: one, two, or three
    ## 1. database handle to the bucardo_control_test db
    ## 2. The notice we wait for, defaults to: bucardo_started
    ## 3. The message to give to the "pass" function, defaults to: Bucardo was started
    ## Returns: 1
    ## Dies if the notice has not been seen after repeated checks

    my ($self,$dbh,$notice,$passmsg) = @_;

    my $line = (caller)[2];

    $notice ||= 'bucardo_started';
    $passmsg ||= "Bucardo was started (caller line $line)";

    $self->stop_bucardo();

    ## Because the stop signal arrives before the PID is removed, sleep a bit
    sleep 2;

    pass("Starting up Bucardo (caller line $line)");
    $dbh->do('LISTEN bucardo');
    $dbh->do('LISTEN bucardo_boot');
    $dbh->do("LISTEN $notice");
    $dbh->do('LISTEN bucardo_nosyncs');
    $dbh->commit();

    my $output = $self->ctl('start --exit-on-nosync --quickstart testing');

    ## Number of checks (0.2 seconds apart) to make before giving up
    my $bail = 50;
    my $n;
  WAITFORIT: {
        if ($bail--<0) {
            $output =~ s/^/#     /gmx;
            my $time = localtime;
            die "Bucardo did not start, but we waited!\nTime: $time\nStart output:\n\n$output\n";
        }
        while ($n = $dbh->func('pg_notifies')) {
            my ($name, $pid, $payload) = @$n;
            if ($dbh->{pg_server_version} >= 9999990000) {
                next if $name ne 'bucardo';
                $name = $payload;
            }
            last WAITFORIT if $name eq $notice;
        }
        $dbh->commit();
        sleep 0.2;
        redo;
    }
    pass($passmsg);

    ## There is a race condition here for testing
    ## Bucardo starts up, and gives the notice above.
    ## However, after it does so, CTLs and KIDs start up and look for new rows
    ## If the caller of this function makes changes right away and then kicks,
    ## Bucardo may see them on the "startup kick" and thus the caller will
    ## get a "syncdone" message that was not initiated by *their* kick.
    ## One way around this is to make sure your caller immediately does a
    ## kick 0, which will flush out the startup kick. If it arrives after the
    ## startup kick, then it simply returns as a sync with no activity

    return 1;

} ## end of restart_bucardo

sub setup_bucardo {

    ## Installs bucardo via "bucardo install" into a database
    ## The database will be emptied out first if it already exists
    ## If it does not exist, it will be created
    ## If the cluster does not exist, it will be created
    ## Arguments:
    ## 1. Name of the cluster
    ## Returns: database handle to the bucardo database
    ## Dies if the install does not report success

    my $self = shift;
    my $clustername = shift or die;

    Test::More::note('Installing Bucardo');

    $self->create_cluster($clustername);
    my $dbh = $self->connect_database($clustername, 'postgres');
    if (database_exists($dbh,'bucardo')) {
        my $retries = 5;
        ## Column name depends on the server version
        my $pidcol = $dbh->{pg_server_version} >= 90200 ? 'pid' : 'procpid';
        ## Keep terminating until nobody else is connected, or we run out of retries
        do {
            ## Kick off all other people
            $SQL = qq{SELECT $pidcol FROM pg_stat_activity WHERE datname = 'bucardo' and $pidcol <> pg_backend_pid()};
            for my $row (@{$dbh->selectall_arrayref($SQL)}) {
                my $pid = $row->[0];
                $SQL = 'SELECT pg_terminate_backend(?)';
                $sth = $dbh->prepare($SQL);
                $sth->execute($pid);
            }
            $dbh->commit();
        } while ($dbh->selectrow_array(qq{SELECT count(*) FROM pg_stat_activity WHERE datname = 'bucardo' and $pidcol <> pg_backend_pid()}))[0] && $retries--;
        debug(qq{Dropping database bucardo from cluster $clustername});
        local $dbh->{AutoCommit} = 1;
        $dbh->do('DROP DATABASE bucardo');
    }

    ## Make sure we have a postgres role
    if (! user_exists($dbh, 'postgres')) {
        $dbh->do('CREATE USER postgres SUPERUSER');
        $dbh->commit();
    }

    ## Now run the install. Timeout after a few seconds
    debug(qq{Running bucardo install on cluster $clustername});
    my $info = $self->ctl('install --batch', 'postgres');

    if ($info !~ /Installation is now complete/) {
        die "Installation failed: $info\n";
    }

    ## Reconnect to the new database
    $dbh = $self->connect_database($clustername, 'bucardo');

    ## Make some adjustments
    $sth = $dbh->prepare('UPDATE bucardo.bucardo_config SET setting = $2 WHERE name = $1');
    $count = $sth->execute('piddir' => $PIDDIR);
    $count = $sth->execute('reason_file' => "$PIDDIR/reason");
    $count = $sth->execute('sendmail_file' => 'debug.sendmail.txt');
    $count = $sth->execute('audit_pid' => 1);
    $dbh->commit();

    ## Adjust a second way
    $self->ctl('set log_level=debug log_microsecond=1 log_showline=1');

    debug(qq{Install complete});

    return $dbh;

} ## end of setup_bucardo

# utility sub called on test error to output pg and bucardo logs to a single
# output file with context; mainly useful for CI debugging/output
# Does nothing unless BUCARDO_LOG_ERROR_CONTEXT is set. Appends to log.context:
# a separator, the date, the first argument (if true), and the last 100 lines
# of the bucardo and postgres logs.
sub _log_context {
    return unless $ENV{BUCARDO_LOG_ERROR_CONTEXT};

    warn "Logging context for @_; dir=$ENV{PWD}\n";
    system("echo '====================' >> log.context");
    system("date >> log.context");
    if ($_[0]) {
        # Append the message from Perl rather than through the shell: building
        # an echo command from it breaks on any message holding a single quote
        if (open my $fh, '>>', 'log.context') {
            print {$fh} "$_[0]\n";
            close $fh or warn qq{Could not close "log.context": $!\n};
        }
        else {
            warn qq{Could not open "log.context": $!\n};
        }
    }
    system("tail -n 100 log.bucardo bucardo_test_database_*/pg.log 2>/dev/null >> log.context");
}

## Utility functions for object existences:
sub thing_exists {
    my ($dbh,$name,$table,$column) = @_;
    my $SQL = "SELECT 1 FROM $table WHERE $column = ?";
    ## Only want tables from the public schema for now
    if ($table eq 'pg_class') {
        $SQL .= qq{ AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')};
    }
    my $sth = $dbh->prepare($SQL);
    $count = $sth->execute($name);
    $sth->finish();
    $dbh->commit() if !
$dbh->{AutoCommit}; 1271 return $count < 1 ? 0 : $count; 1272} 1273sub schema_exists { return thing_exists(@_, 'pg_namespace', 'nspname'); } 1274sub language_exists { return thing_exists(@_, 'pg_language', 'lanname'); } 1275sub database_exists { return thing_exists(@_, 'pg_database', 'datname'); } 1276sub user_exists { return thing_exists(@_, 'pg_user', 'usename'); } 1277sub table_exists { return thing_exists(@_, 'pg_class', 'relname'); } 1278sub function_exists { return thing_exists(@_, 'pg_proc', 'proname'); } 1279sub domain_exists { return thing_exists(@_, 'pg_type', 'typname'); } 1280 1281 1282sub wait_for_notice { 1283 1284 ## Wait until a named NOTIFY is issued 1285 ## Arguments: 1286 ## 1. The listen string or array of strings 1287 ## 2. Seconds until we give up 1288 ## 3. Seconds we sleep between checks 1289 ## 4. Boolean: bail out if not found (defaults to true) 1290 ## Returns true if the NOTIFY was recieved. 1291 1292 my $self = shift; 1293 my $dbh = shift; 1294 my $text = shift; 1295 my $timeout = shift || $TIMEOUT_NOTICE; 1296 my $sleep = shift || $TIMEOUT_SLEEP; 1297 my $bail = shift; 1298 $bail = 0 if !defined($bail); 1299 my $n; 1300 my %wait_for; 1301 for my $str (ref $text ? @{ $text } : $text) { 1302 $wait_for{$str}++; 1303 } 1304 1305 eval { 1306 local $SIG{ALRM} = sub { die "Lookout!\n"; }; 1307 alarm $timeout; 1308 N: { 1309 while ($n = $dbh->func('pg_notifies')) { 1310 my ($name, $pid, $payload) = @$n; 1311 $name = $payload if length $payload; 1312 if (exists $wait_for{$name}) { 1313 if (--$wait_for{$name} == 0) { 1314 delete $wait_for{$name}; 1315 last N unless %wait_for; 1316 } 1317 } 1318 else { 1319 debug("notice was $name", 1); 1320 } 1321 } 1322 sleep $sleep; 1323 redo; 1324 } 1325 alarm 0; 1326 }; 1327 if ($@) { 1328 if ($@ =~ /Lookout/o) { 1329 my $line = (caller)[2]; 1330 my $now = scalar localtime; 1331 my $texts = join '", "', keys %wait_for; 1332 my $pl = keys %wait_for > 1 ? 
's' : ''; 1333 my $notice = qq{Gave up waiting for notice$pl "$texts": timed out at $timeout from line $line. Time=$now}; 1334 if ($bail) { 1335 Test::More::BAIL_OUT ($notice); 1336 } 1337 else { 1338 die $notice; 1339 } 1340 return; 1341 } 1342 } 1343 return 1; 1344 1345} ## end of wait_for_notice 1346 1347## Older methods: 1348 1349sub fresh_database { 1350 1351 ## Drop and create the bucardo_test database 1352 ## First arg is cluster name 1353 ## Second arg is hashref, can be 'dropdb' 1354 1355 my $self = shift; 1356 my $name = shift || 'A'; 1357 my $arg = shift || {}; 1358 1359 my $dirname = $pgver{$name}{dirname}; 1360 1361 ## Just in case 1362 -d $dirname or $self->create_cluster($name); 1363 -e "$dirname/postmaster.pid" or $self->start_cluster($name); 1364 1365 my $dbh = $self->connect_database($name, 'postgres'); 1366 1367 my $brandnew = 0; 1368 { 1369 if (database_exists($dbh => $dbname) and $arg->{dropdb}) { 1370 local $dbh->{AutoCommit} = 1; 1371 debug("Dropping database $dbname"); 1372 $dbh->do("DROP DATABASE $dbname"); 1373 } 1374 if (!database_exists($dbh => $dbname)) { 1375 local $dbh->{AutoCommit} = 1; 1376 debug("Creating database $dbname"); 1377 $dbh->do("CREATE DATABASE $dbname"); 1378 $brandnew = 1; 1379 $dbh->disconnect(); 1380 } 1381 } 1382 1383 $dbh = $self->connect_database($name, $dbname); 1384 1385 return $dbh if $brandnew; 1386 1387 $self->empty_test_database($dbh); 1388 1389 return $dbh; 1390 1391} ## end of fresh_database 1392 1393 1394 1395sub create_database { 1396 1397 ## Create a new database 1398 ## First argument is the cluster name 1399 ## Second argument is the name of the database 1400 ## If the database already exists, nothing will be done 1401 ## Returns a database handle to the database 1402 1403 my $self = shift; 1404 my $clustername = shift or die; 1405 my $dbname = shift or die; 1406 1407 my $dirname = $pgver{$clustername}{dirname}; 1408 1409 ## Create the cluster if needed 1410 -d $dirname or 
$self->create_cluster($clustername); 1411 1412 ## Start the cluster up if needed 1413 -e "$dirname/postmaster.pid" or $self->start_cluster($clustername); 1414 1415 ## Connect to the database 1416 1417 my $dbh = $self->connect_database($clustername, 'postgres'); 1418 1419 if (! database_exists($dbh => $dbname)) { 1420 local $dbh->{AutoCommit} = 1; 1421 debug("Creating database $dbname"); 1422 $dbh->do("CREATE DATABASE $dbname"); 1423 $dbh->disconnect(); 1424 } 1425 1426 $dbh = $self->connect_database($clustername, $dbname); 1427 1428 return $dbh; 1429 1430} ## end of create_database 1431 1432 1433sub empty_test_database { 1434 1435 ## Wipe all data tables from a test database 1436 ## Takes a database handle as only arg 1437 1438 my $self = shift; 1439 my $dbh = shift; 1440 1441 if ($dbh->{pg_server_version} >= 80300) { 1442 $dbh->do(q{SET session_replication_role = 'replica'}); 1443 } 1444 1445 for my $table (sort keys %tabletype) { 1446 $dbh->do(qq{TRUNCATE TABLE "$table"}); 1447 } 1448 1449 for my $table (@tables2empty) { 1450 $dbh->do(qq{TRUNCATE TABLE "$table"}); 1451 } 1452 1453 if ($dbh->{pg_server_version} >= 80300) { 1454 $dbh->do(q{SET session_replication_role = 'origin'}); 1455 } 1456 $dbh->commit; 1457 1458 return; 1459 1460} ## end of empty_test_database 1461 1462END { 1463# __PACKAGE__->shutdown_cluster($_) for keys %pgver; 1464} 1465 1466sub shutdown_cluster { 1467 1468 ## Shutdown a cluster if running 1469 ## Takes the cluster name 1470 1471 my $self = shift; 1472 my $name = shift; 1473 1474 my $dirname = $pgver{$name}{dirname}; 1475 1476 return if ! -d $dirname; 1477 1478 my $pidfile = "$dirname/postmaster.pid"; 1479 return if ! -e $pidfile; 1480 1481 Test::More::note("Stopping cluster $name"); 1482 my @cmd = ($pg_ctl, '-D', $dirname, '-s', '-m', 'fast', 'stop'); 1483 system(@cmd) == 0 or die "@cmd failed: $?\n"; 1484 1485 ## Hang around until the PID file is gone 1486 my $loops = 0; 1487 { 1488 sleep 0.2; 1489 last if ! 
-e $pidfile; 1490 redo; 1491 } 1492 1493 delete $gdbh{$name}; 1494 1495 return; 1496 1497} ## end of shutdown_cluster 1498 1499 1500sub remove_cluster { 1501 1502 ## Remove a cluster, shutting it down first 1503 ## Takes the cluster name 1504 1505 my $self = shift; 1506 my $name = shift; 1507 1508 my $dirname = $pgver{$name}{dirname}; 1509 1510 return if ! -d $dirname; 1511 1512 ## Just in case 1513 $self->shutdown_cluster($name); 1514 1515 system("rm -fr $dirname"); 1516 1517 return; 1518 1519} ## end of remove_cluster 1520 1521 1522 1523 1524 1525 1526 1527 1528sub tt { 1529 ## Simple timing routine. Call twice with the same arg, before and after 1530 my $name = shift or die qq{Need a name!\n}; 1531 if (exists $timing{$name}) { 1532 my $newtime = tv_interval($timing{$name}); 1533 debug("Timing for $name: $newtime"); 1534 delete $timing{$name}; 1535 } 1536 else { 1537 $timing{$name} = [gettimeofday]; 1538 } 1539 return; 1540} ## end of tt 1541 1542sub t { 1543 1544 $testmsg = shift || ''; 1545 $testline = shift || (caller)[2]; 1546 $testmsg =~ s/^\s+//; 1547 if ($location) { 1548 $testmsg = "($location) $testmsg"; 1549 } 1550 $testmsg .= " [line: $testline]"; 1551 my $time = time; 1552 $testmsg .= " [time: $time]" unless $notime; 1553 1554 return; 1555 1556} ## end of t 1557 1558 1559sub add_test_tables_to_herd { 1560 1561 ## Add all of the test tables (and sequences) to a herd 1562 ## Create the herd if it does not exist 1563 ## First arg is database name, second arg is the herdname 1564 1565 my $self = shift; 1566 my $db = shift; 1567 my $herd = shift; 1568 1569 my $result = $self->ctl("add herd $herd"); 1570 if ($result !~ /Added herd/) { 1571 die "Failed to add herd $herd: $result\n"; 1572 } 1573 1574 my $addstring = join ' ' => sort keys %tabletype; 1575 my $com = "add table $addstring db=$db herd=$herd"; 1576 $result = $self->ctl($com); 1577 if ($result !~ /Added table/) { 1578 die "Failed to add tables: $result (command was: $com)\n"; 1579 } 1580 1581 
$addstring = join ' ' => sort keys %sequences; 1582 $com = "add sequence $addstring db=$db herd=$herd"; 1583 $result = $self->ctl($com); 1584 if ($result !~ /Added sequence/) { 1585 die "Failed to add sequences: $result (command was: $com)\n"; 1586 } 1587 1588 return; 1589 1590} ## end of add_test_tables_to_herd 1591 1592 1593sub bc_deeply { 1594 1595 my ($exp,$dbh,$sql,$msg,$oline) = @_; 1596 my $line = (caller)[2]; 1597 1598 local $Data::Dumper::Terse = 1; 1599 local $Data::Dumper::Indent = 0; 1600 1601 die "Very invalid statement from line $line: $sql\n" if $sql !~ /^\s*select/i; 1602 1603 my $got; 1604 eval { 1605 $got = $dbh->selectall_arrayref($sql); 1606 }; 1607 if ($@) { 1608 die "bc_deeply failed from line $line. SQL=$sql\n$@\n"; 1609 } 1610 1611 local $Test::Builder::Level = $Test::Builder::Level + 1; 1612 return is_deeply($got,$exp,$msg,$oline||(caller)[2]); 1613 1614} ## end of bc_deeply 1615 1616sub clear_notices { 1617 my $dbh = shift; 1618 my $timeout = shift || $TIMEOUT_NOTICE; 1619 sleep $timeout; 1620 0 while (my $n = $dbh->func('pg_notifies')); 1621} 1622 1623 1624sub get_pgctl_options { 1625 my $dirname = shift; 1626 my $option; 1627 if ($^O !~ /Win32/) { 1628 my $sockdir = "$dirname/socket"; 1629 -e $sockdir or mkdir $sockdir; 1630 $option = q{-o '-k socket'}; 1631 } 1632 return $option; 1633} 1634 1635sub remove_single_dir { 1636 1637 my $dirname = shift; 1638 print "Removing test database in $dirname\n"; 1639 # Try stopping PostgreSQL 1640 my $options = get_pgctl_options($dirname); 1641 qx{$pg_ctl $options -l $dirname/pg.log -D $dirname stop -m immediate}; 1642 sleep 2; 1643 qx{rm -rf $dirname}; 1644 return; 1645 1646} 1647 1648sub drop_database { 1649 1650 my ($self, $dir) = @_; 1651 if ($dir eq 'all') { 1652 ok(opendir(my $dh, '.'), 'Open current directory to clean up'); 1653 my @test_db_dirs = grep { -d $_ && /^bucardo_test_database/ } readdir $dh; 1654 close($dh); 1655 1656 for my $dirname (@test_db_dirs) { 1657 
remove_single_dir($dirname); 1658 } 1659 } 1660 else { 1661 remove_single_dir($dir); 1662 } 1663 return; 1664} 1665 1666 1667sub add_row_to_database { 1668 1669 ## Add a row to each table in one of the databases 1670 ## Arguments: three 1671 ## 1. Database name to use 1672 ## 2. Value to use (lookup, not the direct value) 1673 ## 3. Do we commit or not? Boolean, defaults to true 1674 ## Returns: undef 1675 1676 my ($self, $dbname, $xval, $commit) = @_; 1677 1678 1679 if ($xval > $xvalmax) { 1680 die "Too high of an ID: max is $xvalmax\n"; 1681 } 1682 1683 $commit = 1 if ! defined $commit; 1684 1685 my $dbh = $gdbh{$dbname} or die "No such database: $dbname"; 1686 1687 ## Loop through each table we know about 1688 for my $table (sort keys %tabletype) { 1689 1690 ## Look up the actual value to use 1691 my $type = $tabletype{$table}; 1692 my $value = $val{$type}{$xval}; 1693 1694 ## Prepare it if we have not already 1695 if (! exists $gsth{$dbh}{insert}{$xval}{$table}) { 1696 1697 ## Handle odd pkeys 1698 my $pkey = $table =~ /test5/ ? q{"id space"} : 'id'; 1699 1700 ## Put some standard values in, plus a single placeholder 1701 my $SQL = qq{INSERT INTO "$table"($pkey,data1,inty,booly) VALUES (?,'foo',$xval,'true')}; 1702 $gsth{$dbh}{insert}{$xval}{$table} = $dbh->prepare($SQL); 1703 1704 ## If this is a bytea, we need to tell DBD::Pg about it 1705 if ('BYTEA' eq $type) { 1706 $gsth{$dbh}{insert}{$xval}{$table}->bind_param(1, undef, {pg_type => PG_BYTEA}); 1707 } 1708 1709 } 1710 1711 ## Execute! 1712 $gsth{$dbh}{insert}{$xval}{$table}->execute($value); 1713 1714 } 1715 1716 $dbh->commit() if $commit; 1717 1718 return undef; 1719 1720} ## end of add_row_to_database 1721 1722 1723sub update_row_in_database { 1724 1725 ## Change a row in each table in a database 1726 ## We always change the "inty" field 1727 ## Arguments: four 1728 ## 1. Database name to use 1729 ## 2. Primary key to update 1730 ## 3. New value 1731 ## 4. Do we commit or not? 
Boolean, defaults to true 1732 ## Returns: undef 1733 1734 my ($self, $dbname, $pkeyvalue, $newvalue, $commit) = @_; 1735 1736 $commit = 1 if ! defined $commit; 1737 1738 my $dbh = $gdbh{$dbname} or die "No such database: $dbname"; 1739 1740 ## Loop through each table we know about 1741 for my $table (sort keys %tabletype) { 1742 1743 ## Look up the actual value to use 1744 my $type = $tabletype{$table}; 1745 my $value = $val{$type}{$pkeyvalue}; 1746 1747 ## Prepare it if we have not already 1748 if (! exists $gsth{$dbh}{update}{inty}{$table}) { 1749 1750 ## Handle odd pkeys 1751 my $pkey = $table =~ /test5/ ? q{"id space"} : 'id'; 1752 1753 my $SQL = qq{UPDATE "$table" SET inty=? WHERE $pkey = ?}; 1754 $gsth{$dbh}{update}{inty}{$table} = $dbh->prepare($SQL); 1755 1756 if ('BYTEA' eq $type) { 1757 $gsth{$dbh}{update}{inty}{$table}->bind_param(2, undef, {pg_type => PG_BYTEA}); 1758 } 1759 1760 } 1761 1762 ## Execute! 1763 $gsth{$dbh}{update}{inty}{$table}->execute($newvalue,$value); 1764 1765 } 1766 1767 $dbh->commit() if $commit; 1768 1769 return undef; 1770 1771} ## end of update_row_in_database 1772 1773 1774sub remove_row_from_database { 1775 1776 ## Delete a row from each table in one of the databases 1777 ## Arguments: three 1778 ## 1. Database name to use 1779 ## 2. Value to use (lookup, not the direct value). Can be an arrayref. 1780 ## 3. Do we commit or not? Boolean, defaults to true 1781 ## Returns: undef 1782 1783 my ($self, $dbname, $val, $commit) = @_; 1784 1785 $commit = 1 if ! defined $commit; 1786 1787 my $dbh = $gdbh{$dbname} or die "No such database: $dbname"; 1788 1789 ## Loop through each table we know about 1790 for my $table (sort keys %tabletype) { 1791 1792 ## Prepare it if we have not already 1793 if (! exists $gsth{$dbh}{delete}{$table}) { 1794 1795 ## Delete, based on the inty 1796 my $SQL = qq{DELETE FROM "$table" WHERE inty = ?}; 1797 $gsth{$dbh}{delete}{$table} = $dbh->prepare($SQL); 1798 1799 } 1800 1801 ## Execute it. 
1802 if (ref $val) { 1803 for (@$val) { 1804 $gsth{$dbh}{delete}{$table}->execute($_); 1805 } 1806 } 1807 else { 1808 $gsth{$dbh}{delete}{$table}->execute($val); 1809 } 1810 1811 } 1812 1813 $dbh->commit() if $commit; 1814 1815 return undef; 1816 1817} ## end of remove_row_from_database 1818 1819 1820sub truncate_all_tables { 1821 1822 ## Truncate all the tables 1823 ## Arguments: two 1824 ## 1. Database to use 1825 ## 3. Do we commit or not? Boolean, defaults to true 1826 ## Returns: undef 1827 1828 my ($self, $dbname, $commit) = @_; 1829 1830 $commit = 1 if ! defined $commit; 1831 1832 my $dbh = $gdbh{$dbname} or die "No such database: $dbname"; 1833 1834 ## Loop through each table we know about 1835 for my $table (sort keys %tabletype) { 1836 $dbh->do(qq{TRUNCATE Table "$table"}); 1837 } 1838 1839 $dbh->commit() if $commit; 1840 1841 return undef; 1842 1843} ## end of truncate_all_tables 1844 1845 1846sub delete_all_tables { 1847 1848 ## Delete all the tables. 1849 ## Mostly for old versions that do not support truncate triggers. 1850 ## Arguments: two 1851 ## 1. Database to use 1852 ## 3. Do we commit or not? Boolean, defaults to true 1853 ## Returns: undef 1854 1855 my ($self, $dbname, $commit) = @_; 1856 1857 $commit = 1 if ! defined $commit; 1858 1859 my $dbh = $gdbh{$dbname} or die "No such database: $dbname"; 1860 1861 ## Loop through each table we know about 1862 for my $table (sort keys %tabletype) { 1863 $dbh->do(qq{DELETE FROM "$table"}); 1864 } 1865 1866 $dbh->commit() if $commit; 1867 1868 return undef; 1869 1870} ## end of delete_all_tables 1871 1872 1873sub check_for_row { 1874 1875 ## Check that a given row is on the database as expected: checks the inty column only 1876 ## Arguments: two or three or four 1877 ## 1. The result we are expecting, as an arrayref 1878 ## 2. A list of database names (should be inside gdbh) 1879 ## 3. Optional text to append to output message 1880 ## 4. 
Optional tables to limit checking to 1881 ## Returns: undef 1882 1883 my ($self, $res, $dblist, $text, $filter) = @_; 1884 1885 ## Get largest tablename 1886 my $maxtable = 1; 1887 for my $table (keys %tabletype) { 1888 ## Allow skipping tables 1889 if (defined $filter) { 1890 my $f = $filter; 1891 if ($f =~ s/^\!//) { 1892 if ($table =~ /$f$/) { 1893 delete $tabletype{$table}; 1894 next; 1895 } 1896 } 1897 else { 1898 if ($table !~ /$f$/) { 1899 delete $tabletype{$table}; 1900 next; 1901 } 1902 } 1903 } 1904 $maxtable = length $table if length $table > $maxtable; 1905 } 1906 1907 for my $dbname (@$dblist) { 1908 1909 if (! $gdbh{$dbname}) { 1910 $gdbh{$dbname} = $self->connect_database($dbname,$BucardoTesting::dbname); 1911 } 1912 1913 my $dbh = $gdbh{$dbname}; 1914 1915 my $maxdbtable = $maxtable + 1 + length $dbname; 1916 1917 for my $table (sort keys %tabletype) { 1918 1919 ## Handle odd pkeys 1920 my $pkey = $table =~ /test5/ ? q{"id space"} : 'id'; 1921 1922 my $type = $tabletype{$table}; 1923 my $t = sprintf qq{%-*s copy ok (%s)}, 1924 $maxdbtable, 1925 "$dbname.$table", 1926 $type; 1927 1928 ## Change the message if no rows 1929 if (ref $res eq 'ARRAY' and ! 
defined $res->[0]) { 1930 $t = sprintf qq{No rows as expected in %-*s for pkey type %s}, 1931 $maxdbtable, 1932 "$dbname.$table", 1933 $type; 1934 } 1935 1936 if (defined $text and length $text) { 1937 $t .= " $text"; 1938 } 1939 1940 my $SQL = qq{SELECT inty FROM "$table" ORDER BY inty}; 1941 $table =~ /X/ and $SQL =~ s/inty/$pkey/; 1942 1943 local $Test::Builder::Level = $Test::Builder::Level + 1; 1944 my $result = bc_deeply($res, $dbh, $SQL, $t, (caller)[2]); 1945 $dbh->commit(); 1946 if (!$result) { 1947 my $line = (caller)[2]; 1948 Test::More::BAIL_OUT("Stopping on a failed 'check_for_row' test from line $line"); 1949 } 1950 } 1951 } 1952 1953 return; 1954 1955} ## end of check_for_row 1956 1957 1958sub check_sequences_same { 1959 1960 ## Check that sequences are the same across all databases 1961 ## Arguments: one 1962 ## 1. A list of database names (should be inside gdbh) 1963 ## Returns: undef 1964 1965 my ($self, $dblist) = @_; 1966 1967 for my $seq (sort keys %sequences) { 1968 1969 $SQL = qq{SELECT * FROM "$seq"}; 1970 1971 ## The first we come across will be the standard for the others 1972 my (%firstone, $firstdb); 1973 1974 ## Store failure messages 1975 my @msg; 1976 1977 for my $dbname (@$dblist) { 1978 1979 my $dbh = $gdbh{$dbname} or die "Invalid database name: $dbname"; 1980 1981 my $sth = $dbh->prepare($SQL); 1982 $sth->execute(); 1983 my $info = $sth->fetchall_arrayref({})->[0]; 1984 1985 if (! defined $firstone{$seq}) { 1986 $firstone{$seq} = $info; 1987 $firstdb = $dbname; 1988 next; 1989 } 1990 1991 ## Compare certain items 1992 for my $item (qw/ last_value start_value increment_by min_value max_value is_cycled is_called/) { 1993 my ($uno,$dos) = ($firstone{$seq}->{$item}, $info->{$item}); 1994 next if ! defined $uno or ! 
defined $dos; 1995 if ($uno ne $dos) { 1996 push @msg, "$item is different on $firstdb vs $dbname: $uno vs $dos"; 1997 } 1998 } 1999 2000 } ## end each sequence 2001 2002 if (@msg) { 2003 Test::More::fail("Sequence $seq NOT the same"); 2004 for (@msg) { 2005 diag($_); 2006 } 2007 } 2008 else { 2009 Test::More::pass("Sequence $seq is the same across all databases"); 2010 } 2011 2012 } ## end each database 2013 2014 2015 return; 2016 2017 2018} ## end of check_sequences_same 2019 2020 2021 2022 2023## Hack to override some Test::More methods 2024## no critic 2025 2026sub is_deeply { 2027 2028 t($_[2],$_[3] || (caller)[2]); 2029 local $Test::Builder::Level = $Test::Builder::Level + 1; 2030 my $rv = Test::More::is_deeply($_[0],$_[1],$testmsg); 2031 return $rv if $rv; 2032 if ($bail_on_error and ++$total_errors => $bail_on_error) { 2033 my $line = (caller)[2]; 2034 my $time = time; 2035 diag("GOT: ".Dumper $_[0]); 2036 diag("EXPECTED: ".Dumper $_[1]); 2037 Test::More::BAIL_OUT("Stopping on a failed 'is_deeply' test from line $line. Time: $time"); 2038 } 2039} ## end of is_deeply 2040sub like($$;$) { 2041 t($_[2],(caller)[2]); 2042 local $Test::Builder::Level = $Test::Builder::Level + 1; 2043 my $rv = Test::More::like($_[0],$_[1],$testmsg); 2044 return $rv if $rv; 2045 if ($bail_on_error and ++$total_errors => $bail_on_error) { 2046 my $line = (caller)[2]; 2047 my $time = time; 2048# Test::More::diag("GOT: ".Dumper $_[0]); 2049# Test::More::diag("EXPECTED: ".Dumper $_[1]); 2050 Test::More::BAIL_OUT("Stopping on a failed 'like' test from line $line. 
Time: $time"); 2051 } 2052} ## end of like 2053sub pass(;$) { 2054 t($_[0],$_[1]||(caller)[2]); 2055 local $Test::Builder::Level = $Test::Builder::Level + 1; 2056 Test::More::pass($testmsg); 2057} ## end of pass 2058sub is($$;$) { 2059 t($_[2],(caller)[2]); 2060 local $Test::Builder::Level = $Test::Builder::Level + 1; 2061 my $rv = Test::More::is($_[0],$_[1],$testmsg); 2062 return $rv if $rv; 2063 ## Where exactly did this fail? 2064 my $char = 0; 2065 my $onelen = length $_[0]; 2066 my $twolen = length $_[1]; 2067 my $line = 1; 2068 my $lchar = 1; 2069 for ($char = 0; $char < $onelen and $char < $twolen; $char++) { 2070 my $one = ord(substr($_[0],$char,1)); 2071 my $two = ord(substr($_[1],$char,1)); 2072 if ($one != $two) { 2073 diag("First difference at character $char ($one vs $two) (line $line, char $lchar)"); 2074 last; 2075 } 2076 if (10 == $one) { 2077 $line++; 2078 $lchar = 1; 2079 } 2080 else { 2081 $lchar++; 2082 } 2083 } 2084 if ($bail_on_error and ++$total_errors => $bail_on_error) { 2085 my $line = (caller)[2]; 2086 my $time = time; 2087 Test::More::BAIL_OUT("Stopping on a failed 'is' test from line $line. Time: $time"); 2088 } 2089} ## end of is 2090sub isa_ok($$;$) { 2091 t("Object isa $_[1]",(caller)[2]); 2092 my ($name, $type, $msg) = ($_[0],$_[1]); 2093 local $Test::Builder::Level = $Test::Builder::Level + 1; 2094 if (ref $name and ref $name eq $type) { 2095 Test::More::pass($testmsg); 2096 return; 2097 } 2098 if ($bail_on_error and ++$total_errors => $bail_on_error) { 2099 Test::More::BAIL_OUT("Stopping on a failed test"); 2100 } 2101} ## end of isa_ok 2102sub ok($;$) { 2103 t($_[1]||$testmsg); 2104 local $Test::Builder::Level = $Test::Builder::Level + 1; 2105 my $rv = Test::More::ok($_[0],$testmsg); 2106 return $rv if $rv; 2107 if ($bail_on_error and ++$total_errors => $bail_on_error) { 2108 my $line = (caller)[2]; 2109 my $time = time; 2110 Test::More::BAIL_OUT("Stopping on a failed 'ok' test from line $line. 
Time: $time"); 2111 } 2112} ## end of ok 2113 2114## use critic 2115 2116 21171; 2118